# Idempotency for Subscribers

## Overview

**Idempotency** ensures that the effects of each message are applied **only once** per subscriber, even when the message bus delivers the same message multiple times. This is critical for maintaining data integrity in distributed systems.

### Why Do Messages Get Redelivered?

Messaging systems such as NServiceBus, RabbitMQ, and Azure Service Bus may redeliver messages for several reasons:

| Scenario                   | Description                                             |
| -------------------------- | ------------------------------------------------------- |
| **Transient Failures**     | Network timeouts, database connection issues            |
| **Process Crashes**        | Application crashes before acknowledging the message    |
| **Retry Policies**         | Automatic retries on failed message processing          |
| **Infrastructure Issues**  | Message broker restarts, failovers                      |
| **At-Least-Once Delivery** | Most message buses guarantee delivery but may duplicate |

### What Problems Can Duplicate Processing Cause?

Without idempotency, duplicate message processing can lead to:

* 📧 **Multiple emails sent** to the same user
* 💳 **Duplicate financial transactions**
* 📦 **Duplicate orders or shipments**
* 🔢 **Incorrect counters or statistics**
* 💾 **Duplicate database records**
* 🔗 **Inconsistent data across systems**
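
To make the counter problem concrete, here is a minimal sketch that contrasts a naive handler with one that deduplicates by event ID. The `OrderPlaced` and `RevenueCounter` types are hypothetical and exist only for this illustration; they are not FlexBase types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type used only for this illustration.
public sealed record OrderPlaced(string EventId, decimal Amount);

public sealed class RevenueCounter
{
    private readonly HashSet<string> _seenEventIds = new();

    public decimal NaiveTotal { get; private set; }
    public decimal DeduplicatedTotal { get; private set; }

    // Naive handler: every delivery changes state, so a redelivery double-counts.
    public void HandleNaively(OrderPlaced message) => NaiveTotal += message.Amount;

    // Deduplicating handler: HashSet.Add returns false for an EventId seen before,
    // so the amount is only added on the first delivery.
    public void HandleOnce(OrderPlaced message)
    {
        if (_seenEventIds.Add(message.EventId))
        {
            DeduplicatedTotal += message.Amount;
        }
    }
}
```

Delivering the same `OrderPlaced` message twice doubles `NaiveTotal` but leaves `DeduplicatedTotal` unchanged after the first delivery.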

***

## Quick Start

FlexBase provides **automatic idempotency** for subscribers using the Decorator Pattern. No changes are required to your existing subscriber code.

### Step 1: Add Configuration to appsettings.json

```json
{
  "FlexIdempotency": {
    "Enabled": true,
    "StoreType": "Cache",
    "DefaultTimeToLiveDays": 7,
    "KeyPrefix": "flex:idempotency"
  },
  "FlexCache": {
    "MaxPayloadBytes": 1048576,
    "DefaultExpirationMinutes": 60
  }
}
```

### Step 2: Register Idempotency Services

In your `CommonHostConfig.cs` or startup configuration:

```csharp
services.AddFlexIdempotency(configuration);
```

That's it! All your subscribers now have automatic idempotency protection.

***

## Configuration Options

### FlexIdempotency Section

| Setting                 | Default              | Description                                        |
| ----------------------- | -------------------- | -------------------------------------------------- |
| `Enabled`               | `true`               | Enable/disable idempotency checking globally       |
| `StoreType`             | `"Cache"`            | Storage backend: `"Cache"` or `"Database"`         |
| `DefaultTimeToLiveDays` | `7`                  | How long to remember processed events              |
| `KeyPrefix`             | `"flex:idempotency"` | Prefix for cache keys (Cache store only)           |
| `EnableAutoCleanup`     | `false`              | Auto-cleanup expired records (Database store only) |
| `CleanupBatchSize`      | `1000`               | Records per cleanup batch (Database store only)    |
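
If you want to inspect these values in your own code, one option is to bind the section to a plain class with the standard `Microsoft.Extensions.Configuration` binder. This is only a sketch: `IdempotencySettingsSketch` and `IdempotencySettingsReader` are hypothetical names, and the property defaults simply mirror the table above.

```csharp
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical POCO mirroring the documented settings; not a FlexBase type.
public sealed class IdempotencySettingsSketch
{
    public bool Enabled { get; set; } = true;
    public string StoreType { get; set; } = "Cache";
    public int DefaultTimeToLiveDays { get; set; } = 7;
    public string KeyPrefix { get; set; } = "flex:idempotency";
    public bool EnableAutoCleanup { get; set; } = false;
    public int CleanupBatchSize { get; set; } = 1000;
}

public static class IdempotencySettingsReader
{
    // Binds the "FlexIdempotency" section; falls back to the defaults above
    // when the section is missing from configuration.
    public static IdempotencySettingsSketch Read(IConfiguration configuration) =>
        configuration.GetSection("FlexIdempotency").Get<IdempotencySettingsSketch>()
        ?? new IdempotencySettingsSketch();
}
```

Settings that are absent from `appsettings.json` keep the property initializer values, so a section that only sets `StoreType` still yields a TTL of 7 days.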

### FlexCache Section (for Cache Store)

| Setting                    | Default         | Description                    |
| -------------------------- | --------------- | ------------------------------ |
| `MaxPayloadBytes`          | `1048576` (1MB) | Maximum cache entry size       |
| `DefaultExpirationMinutes` | `60`            | Default cache entry expiration |

***

## Store Types

FlexBase offers two storage backends for tracking processed events:

### Option 1: Cache Store (Default)

Uses **FlexHybridCache** with L1 (in-memory) + L2 (distributed) architecture.

```json
{
  "FlexIdempotency": {
    "StoreType": "Cache"
  }
}
```

**Advantages:**

* ✅ High performance (in-memory L1 cache)
* ✅ Scales across multiple instances (with distributed L2)
* ✅ Simple setup for development

**Cache Backend Options:**

| Backend        | Use Case                       | Setup Complexity      |
| -------------- | ------------------------------ | --------------------- |
| In-Memory Only | Development, single instance   | None                  |
| Redis          | Production, high performance   | Requires Redis server |
| SQL Server     | Production, no Redis available | Requires cache table  |
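
The following sketch illustrates why the L2 layer matters for multi-instance deployments. It is a simplified model built on plain collections, not the FlexHybridCache implementation; `TwoLevelMarkerCache` is a hypothetical name, and the shared `ISet<string>` stands in for Redis or SQL Server.

```csharp
using System.Collections.Generic;

// Illustrative two-level lookup; not the FlexHybridCache implementation.
public sealed class TwoLevelMarkerCache
{
    // L1: private to this process (lost on restart, invisible to other instances).
    private readonly HashSet<string> _l1 = new();

    // L2: stand-in for a distributed backend shared by all instances.
    private readonly ISet<string> _l2;

    public TwoLevelMarkerCache(ISet<string> sharedL2) => _l2 = sharedL2;

    public bool Contains(string key)
    {
        if (_l1.Contains(key)) return true;   // fast path, no network hop

        if (_l2.Contains(key))
        {
            _l1.Add(key);                     // promote to L1 for later lookups
            return true;
        }

        return false;
    }

    public void Add(string key)
    {
        _l1.Add(key);
        _l2.Add(key);
    }
}
```

Two instances that share the same L2 set see each other's markers; two instances with separate sets behave like the in-memory-only backend and can each process the same event.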

### Option 2: Database Store (EF Core)

Uses your application's **Entity Framework Core DbContext** to store processed events.

```json
{
  "FlexIdempotency": {
    "StoreType": "Database"
  }
}
```

**Advantages:**

* ✅ Data persists across application restarts
* ✅ Queryable (you can analyze processed events)
* ✅ Uses existing database infrastructure
* ✅ No additional cache infrastructure needed

**Disadvantages:**

* ⚠️ Slower than cache-based storage
* ⚠️ Increases database load
* ⚠️ Requires EF Core migration

***

## Cache Store Configuration

### Development: In-Memory Only

For local development, no additional configuration is needed. FlexHybridCache uses in-memory storage by default.

```csharp
// Just register idempotency - cache is in-memory by default
services.AddFlexIdempotency(configuration);
```

> ⚠️ **Warning:** The in-memory cache is lost on application restart and is not shared across instances. Use a distributed cache for production.

***

### Production: Redis Backend

Redis provides the best performance for distributed caching.

#### Step 1: Install NuGet Package

```bash
dotnet add package Microsoft.Extensions.Caching.StackExchangeRedis
```

#### Step 2: Configure Redis

```csharp
// In your ConfigureServices or Program.cs
services.AddStackExchangeRedisCache(options =>
{
    options.Configuration = configuration.GetConnectionString("Redis");
    options.InstanceName = "YourApp:";
});

// Then add idempotency (uses Redis as L2 cache automatically)
services.AddFlexIdempotency(configuration);
```

#### Step 3: Add Connection String

```json
{
  "ConnectionStrings": {
    "Redis": "localhost:6379,abortConnect=false"
  }
}
```

***

### Production: SQL Server Distributed Cache

If you don't have Redis, you can use SQL Server as a distributed cache backend.

#### Step 1: Install NuGet Package

```bash
dotnet add package Microsoft.Extensions.Caching.SqlServer
```

#### Step 2: Create the Cache Table

**Option A: Using dotnet tool**

```bash
dotnet tool install --global dotnet-sql-cache
dotnet sql-cache create "YourConnectionString" dbo DistributedCache
```

**Option B: Using SQL Script**

```sql
CREATE TABLE [dbo].[DistributedCache] (
    [Id] NVARCHAR(449) NOT NULL PRIMARY KEY,
    [Value] VARBINARY(MAX) NOT NULL,
    [ExpiresAtTime] DATETIMEOFFSET NOT NULL,
    [SlidingExpirationInSeconds] BIGINT NULL,
    [AbsoluteExpiration] DATETIMEOFFSET NULL
);

CREATE NONCLUSTERED INDEX [IX_DistributedCache_ExpiresAtTime]
    ON [dbo].[DistributedCache]([ExpiresAtTime]);
```

#### Step 3: Configure SQL Server Distributed Cache

```csharp
// In your ConfigureServices or Program.cs
services.AddDistributedSqlServerCache(options =>
{
    options.ConnectionString = configuration.GetConnectionString("CacheDb");
    options.SchemaName = "dbo";
    options.TableName = "DistributedCache";
    options.ExpiredItemsDeletionInterval = TimeSpan.FromMinutes(30);
});

// Then add idempotency (uses SQL Server as L2 cache automatically)
services.AddFlexIdempotency(configuration);
```

#### Step 4: Add Connection String

```json
{
  "ConnectionStrings": {
    "CacheDb": "Server=.;Database=YourDb;Trusted_Connection=True;TrustServerCertificate=True"
  }
}
```

***

## Database Store Configuration (EF Core)

For full database persistence with a typed entity, use the EF Core store.

### Step 1: Configure Store Type

```json
{
  "FlexIdempotency": {
    "StoreType": "Database",
    "DefaultTimeToLiveDays": 7,
    "EnableAutoCleanup": false,
    "CleanupBatchSize": 1000
  }
}
```

### Step 2: Choose Your DbContext Approach

FlexBase provides two options for the Database store:

| Approach              | Description                                               | Best For                            |
| --------------------- | --------------------------------------------------------- | ----------------------------------- |
| **Dedicated Context** | Uses `FlexProcessedEventDbContext` (provided by FlexBase) | Isolation, separate migrations      |
| **Shared Context**    | Add entity to your `ApplicationEFDbContext`               | Simplicity, single migration stream |

***

#### Option A: Dedicated Context (Recommended for Isolation)

Use the built-in `FlexProcessedEventDbContext`. No changes to your application DbContext are required.

**Register in IdempotencyConfig.cs:**

```csharp
using Sumeru.Flex.Data.EntityFramework.Idempotency;

// For dedicated context with same connection string as app database
services.AddFlexProcessedEventStoreEfCore(
    options => options.UseSqlServer(configuration.GetConnectionString("AppDbConnection")),
    storeOptions => 
    {
        storeOptions.DefaultTimeToLive = TimeSpan.FromDays(7);
        storeOptions.EnableAutoCleanup = false;
    });
```

**Create migrations:**

```bash
# From your Migrations project
cd YourApp.Migrations.SqlServer

dotnet ef migrations add AddFlexProcessedEventsTable \
    --context FlexProcessedEventDbContext \
    --output-dir Idempotency \
    --startup-project ../YourApp.EndPoint.WebAPI

dotnet ef database update \
    --context FlexProcessedEventDbContext \
    --startup-project ../YourApp.EndPoint.WebAPI
```

> 💡 **Tip:** Using `--output-dir Idempotency` keeps idempotency migrations separate from your main application migrations.

***

#### Option B: Shared Context (Simpler Setup)

Add the `FlexProcessedEventEntity` to your existing `ApplicationEFDbContext`. This keeps all entities and migrations together.

**Step B1: Modify your ApplicationEFDbContext:**

```csharp
using Sumeru.Flex.Data.EntityFramework.Idempotency;

public class ApplicationEFDbContext : FlexEFDbContext
{
    // Add this DbSet alongside your other entities
    public DbSet<FlexProcessedEventEntity> FlexProcessedEvents { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        // Add this configuration
        FlexProcessedEventDbContext.ConfigureProcessedEventEntity(modelBuilder);
        
        // ... your other entity configurations ...
    }
}
```

**Step B2: Register in IdempotencyConfig.cs:**

```csharp
using Sumeru.Flex.Data.EntityFramework.Idempotency;

// For shared context - uses your existing ApplicationEFDbContext
services.AddFlexProcessedEventStoreEfCore<ApplicationEFDbContext>(
    storeOptions => 
    {
        storeOptions.DefaultTimeToLive = TimeSpan.FromDays(7);
        storeOptions.EnableAutoCleanup = false;
    });
```

**Step B3: Create migrations:**

```bash
# From your Migrations project
cd YourApp.Migrations.SqlServer

dotnet ef migrations add AddFlexProcessedEventsTable \
    --context ApplicationEFDbContext \
    --startup-project ../YourApp.EndPoint.WebAPI

dotnet ef database update \
    --context ApplicationEFDbContext \
    --startup-project ../YourApp.EndPoint.WebAPI
```

***

### Which Approach Should I Choose?

| Consideration            | Dedicated Context                                   | Shared Context                        |
| ------------------------ | --------------------------------------------------- | ------------------------------------- |
| **Setup complexity**     | More configuration                                  | Simpler                               |
| **Migration management** | Separate migrations                                 | Single migration stream               |
| **Transaction scope**    | Separate transactions                               | Same transaction as business entities |
| **Connection pooling**   | Separate pool only if the connection string differs | Shares connection pool                |
| **Schema isolation**     | Fully isolated                                      | Mixed with app entities               |

**Recommendation:**

* Use **Shared Context** for most applications (simpler, fewer moving parts)
* Use **Dedicated Context** if you need schema isolation or want a separate database

### Database Table Structure

The migration creates a `FlexProcessedEvents` table:

| Column           | Type            | Description                          |
| ---------------- | --------------- | ------------------------------------ |
| `Id`             | `bigint`        | Auto-increment primary key           |
| `EventId`        | `nvarchar(450)` | Unique event identifier              |
| `SubscriberType` | `nvarchar(450)` | Fully qualified subscriber type name |
| `CorrelationId`  | `nvarchar(450)` | Optional correlation ID for tracing  |
| `ProcessedAtUtc` | `datetime2`     | When the event was processed         |
| `ExpiresAtUtc`   | `datetime2`     | When the record can be cleaned up    |

**Indexes:**

* Composite unique index on `(EventId, SubscriberType)`
* Index on `ExpiresAtUtc` for cleanup queries

### Cleanup of Expired Records

**For Cache Store:** Expired records are automatically cleaned up by the cache infrastructure (Redis, HybridCache). No action needed.

**For Database Store:** Records have an `ExpiresAtUtc` column and **expired records are automatically ignored** during idempotency checks. However, you need to periodically clean up expired records to prevent table growth.

FlexBase provides a `CleanupExpiredAsync()` method you can call from your own scheduled job:

```csharp
using System;
using System.Threading.Tasks;
using Hangfire;

// Example: Hangfire recurring job
public class ProcessedEventCleanupJob
{
    private readonly EfCoreProcessedEventStore _store;

    public ProcessedEventCleanupJob(EfCoreProcessedEventStore store)
    {
        _store = store;
    }

    public async Task Execute()
    {
        int totalCleaned = 0;
        int cleaned;
        
        // Process in batches until all expired records are deleted
        do
        {
            cleaned = await _store.CleanupExpiredAsync();
            totalCleaned += cleaned;
        } while (cleaned > 0);
        
        // Log result
        Console.WriteLine($"Cleaned up {totalCleaned} expired idempotency records");
    }
}

// Register with Hangfire (or your preferred scheduler)
RecurringJob.AddOrUpdate<ProcessedEventCleanupJob>(
    "cleanup-expired-idempotency-records",
    job => job.Execute(),
    Cron.Daily);  // Run daily
```

**Alternative: Direct SQL cleanup**

If you prefer, you can also clean up using a SQL Agent job or scheduled query:

```sql
-- Delete all records whose expiry time has passed
DELETE FROM FlexProcessedEvents 
WHERE ExpiresAtUtc <= GETUTCDATE();
```

> **Note:** Even without cleanup, expired records don't affect correctness - they are ignored during idempotency checks. Cleanup is only needed to manage table size.
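
The expiry rule can be modelled in memory to show why stale rows are harmless. This is a sketch under assumptions: `ProcessedEventRow` and `ExpiryAwareLookup` are hypothetical names that mirror a few of the documented columns, and the comparison operators are chosen to match the SQL cleanup predicate above rather than taken from FlexBase's source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory row mirroring some of the documented columns.
public sealed record ProcessedEventRow(
    string EventId,
    string SubscriberType,
    DateTime ProcessedAtUtc,
    DateTime ExpiresAtUtc);

public static class ExpiryAwareLookup
{
    // A row only counts as "already processed" while it has not expired.
    public static bool IsAlreadyProcessed(
        IEnumerable<ProcessedEventRow> rows,
        string eventId,
        string subscriberType,
        DateTime nowUtc) =>
        rows.Any(r => r.EventId == eventId
                   && r.SubscriberType == subscriberType
                   && r.ExpiresAtUtc > nowUtc);

    // Mirrors the SQL predicate "ExpiresAtUtc <= now"; returns the number of rows removed.
    public static int RemoveExpired(List<ProcessedEventRow> rows, DateTime nowUtc) =>
        rows.RemoveAll(r => r.ExpiresAtUtc <= nowUtc);
}
```

Because the lookup filters on `ExpiresAtUtc`, skipping cleanup changes only the size of the collection, never the answer to the duplicate check.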

***

## Opting Out of Idempotency

Some subscribers should always run, even on retries (e.g., logging, metrics, notifications). Use the `[SkipIdempotency]` attribute:

```csharp
using Sumeru.Flex;

[SkipIdempotency]
public class AuditLogSubscriber : IAuditLogSubscriber
{
    public async Task Execute(UserLoggedInEvent @event, IFlexServiceBusContext context)
    {
        // This will run on EVERY message, including duplicates
        await LogUserActivity(@event);
    }
}
```
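
How an attribute-based opt-out can be detected is sketched below using standard .NET reflection. The attribute and subscriber types here are hypothetical stand-ins with a `Sketch` suffix so they do not collide with the real `[SkipIdempotency]` attribute; how FlexBase performs this check internally is not described in this guide.

```csharp
using System;

// Hypothetical marker attribute, standing in for the real [SkipIdempotency].
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class SkipIdempotencySketchAttribute : Attribute
{
}

// Hypothetical subscribers: one opted out, one protected.
[SkipIdempotencySketch]
public sealed class AuditSubscriberSketch
{
}

public sealed class BillingSubscriberSketch
{
}

public static class IdempotencyOptOut
{
    // True when the subscriber class carries the marker attribute.
    public static bool IsOptedOut(Type subscriberType) =>
        Attribute.IsDefined(subscriberType, typeof(SkipIdempotencySketchAttribute));
}
```

A gatekeeper could call `IsOptedOut` once per subscriber type and bypass the duplicate check when it returns `true`.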

***

## How It Works

FlexBase automatically adds a "gatekeeper" in front of your subscribers. This gatekeeper checks if an event has already been processed before allowing it through to your code.

**The key point:** Your existing subscriber code doesn't change at all. FlexBase handles everything behind the scenes when you register your services.

### What Happens When a Message Arrives

```
    ┌─────────────────────────────────────────────────────────────┐
    │                    MESSAGE ARRIVES                          │
    │              (might be first time or a retry)               │
    └─────────────────────────────────────────────────────────────┘
                                │
                                ▼
    ┌─────────────────────────────────────────────────────────────┐
    │              IDEMPOTENCY CHECK (Automatic)                  │
    │                                                             │
    │   "Have I seen this EventId + Subscriber before?"           │
    │                                                             │
    │          YES (duplicate)  │  NO (first time)                │
    │               │           │        │                        │
    │               ▼           │        ▼                        │
    │         ┌─────────┐       │   ┌──────────────┐              │
    │         │  SKIP   │       │   │ Mark as      │              │
    │         │  (done) │       │   │ "processed"  │              │
    │         └─────────┘       │   └──────────────┘              │
    │                           │        │                        │
    └───────────────────────────│────────│────────────────────────┘
                                         │
                                         ▼
    ┌─────────────────────────────────────────────────────────────┐
    │               YOUR SUBSCRIBER RUNS                          │
    │                                                             │
    │   - Your business logic executes                            │
    │   - Database operations happen                              │
    │   - Emails get sent, etc.                                   │
    │                                                             │
    │   (This only runs ONCE per unique message)                  │
    └─────────────────────────────────────────────────────────────┘
```

### Why Your Code Doesn't Need to Change

When you call `AddFlexIdempotency()`, FlexBase automatically:

1. **Intercepts** all calls to your subscribers
2. **Checks** if the `EventId` has been seen before
3. **Blocks duplicates** from reaching your code
4. **Passes through** new messages to your subscriber

Your subscriber just receives messages and processes them - it doesn't know (or care) that there's a gatekeeper in front of it.
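
The gatekeeper idea can be sketched as a classic decorator. Everything in this block is hypothetical (`IEventHandlerSketch`, `IdempotentHandlerDecorator`, `CountingHandler`) and uses an in-memory dictionary in place of a real store. Releasing the marker when the inner handler throws is one possible design choice made for this sketch; this guide does not say how FlexBase treats failed handlers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// All names here are hypothetical; they are not FlexBase types.
public interface IEventHandlerSketch<TEvent>
{
    Task Handle(string eventId, TEvent @event);
}

public sealed class IdempotentHandlerDecorator<TEvent> : IEventHandlerSketch<TEvent>
{
    private readonly IEventHandlerSketch<TEvent> _inner;
    private readonly ConcurrentDictionary<string, DateTime> _processed;

    public IdempotentHandlerDecorator(
        IEventHandlerSketch<TEvent> inner,
        ConcurrentDictionary<string, DateTime> processed)
    {
        _inner = inner;
        _processed = processed;
    }

    public async Task Handle(string eventId, TEvent @event)
    {
        // Track per subscriber type so other subscribers can still handle the same event.
        var key = $"{_inner.GetType().FullName}:{eventId}";

        // TryAdd is atomic: only the first caller for a key gets through.
        if (!_processed.TryAdd(key, DateTime.UtcNow))
        {
            return; // duplicate, skip the inner handler
        }

        try
        {
            await _inner.Handle(eventId, @event);
        }
        catch
        {
            // Assumption for this sketch: release the marker so a retry can run.
            _processed.TryRemove(key, out _);
            throw;
        }
    }
}

// Sample inner handler that counts how often the business logic actually ran.
public sealed class CountingHandler : IEventHandlerSketch<string>
{
    public int Calls { get; private set; }

    public Task Handle(string eventId, string @event)
    {
        Calls++;
        return Task.CompletedTask;
    }
}
```

Wrapping a `CountingHandler` in the decorator and delivering the same event ID twice runs the inner handler once; the inner class itself contains no deduplication logic.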

### Key Points

1. **No code changes required** - Your subscribers work exactly as before
2. **EventId-based deduplication** - Each event has a unique `EventId` (auto-generated)
3. **Per-subscriber tracking** - The same event can be processed by different subscribers
4. **TTL-based expiry** - Records expire after the configured time; with the Database store, expired rows are ignored but still need cleanup to be removed from the table

***

## Troubleshooting

### Events Being Skipped Unexpectedly

1. **Check EventId generation** - Events should have unique `EventId` values
2. **Verify TTL settings** - If TTL is too long, old events may still be "remembered"
3. **Check cache connectivity** - Ensure Redis/SQL Server is reachable

### Duplicate Processing Still Occurring

1. **Verify registration** - Ensure `AddFlexIdempotency()` is called
2. **Check store type** - Verify the configured store is working
3. **Check for `[SkipIdempotency]`** - Subscriber may be opted out
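
To verify that a distributed cache backend is actually reachable, a small round-trip probe against `IDistributedCache` can help. This sketch uses only the standard `Microsoft.Extensions.Caching.Distributed` API; `DistributedCacheProbe` is a hypothetical helper, and it assumes the L2 backend is registered as `IDistributedCache`, as in the Redis and SQL Server setups shown earlier.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;

// Hypothetical helper; not part of FlexBase.
public static class DistributedCacheProbe
{
    // Writes a short-lived value, reads it back, and removes it again.
    // Returns false if the backend throws or returns something unexpected.
    public static async Task<bool> CanRoundTripAsync(IDistributedCache cache)
    {
        var key = $"probe:{Guid.NewGuid():N}";
        var options = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(30)
        };

        try
        {
            await cache.SetStringAsync(key, "ok", options);
            var value = await cache.GetStringAsync(key);
            await cache.RemoveAsync(key);
            return value == "ok";
        }
        catch (Exception)
        {
            return false;
        }
    }
}
```

Resolve `IDistributedCache` from the service provider at startup and log the result; a `false` here usually points at the connection string or network rather than at the idempotency configuration.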

### Debug Logging

Enable debug logging to see idempotency decisions:

```json
{
  "Serilog": {
    "MinimumLevel": {
      "Override": {
        "Sumeru.Flex": "Debug"
      }
    }
  }
}
```

Log messages:

* `"Event {EventId} already processed by {Subscriber}; skipping"` - Duplicate blocked
* `"Successfully processed event {EventId} by {Subscriber}"` - First-time processing

***

## Best Practices

### 1. Choose the Right Store Type

| Scenario                        | Recommended Store  |
| ------------------------------- | ------------------ |
| Development                     | Cache (in-memory)  |
| Single instance, simple app     | Cache (in-memory)  |
| Multi-instance, high throughput | Cache + Redis      |
| Multi-instance, no Redis        | Cache + SQL Server |
| Need queryable history          | Database (EF Core) |
| Regulatory/audit requirements   | Database (EF Core) |

### 2. Set Appropriate TTL

* **Too short:** Events may be reprocessed if redelivered after TTL expires
* **Too long:** Storage grows unnecessarily and cleanup takes longer

Recommended: **7 days** (default) for most scenarios
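
The trade-off can be estimated with simple arithmetic. The helper below is a sketch with hypothetical names; it assumes a record expires at `ProcessedAtUtc + TTL` and that one record is kept per event per subscriber, which matches the table structure described earlier.

```csharp
using System;

// Hypothetical helper for back-of-the-envelope TTL planning.
public static class TtlEstimates
{
    // Records live for the TTL, so roughly eventsPerDay * subscribers * TTL days
    // accumulate before expiry starts balancing new inserts.
    public static long SteadyStateRecords(long eventsPerDay, int subscribersPerEvent, TimeSpan ttl) =>
        (long)(eventsPerDay * subscribersPerEvent * ttl.TotalDays);

    // A redelivery is still recognised as a duplicate only before the record expires.
    public static bool IsStillDeduplicated(DateTime processedAtUtc, TimeSpan ttl, DateTime redeliveredAtUtc) =>
        redeliveredAtUtc < processedAtUtc + ttl;
}
```

For example, with illustrative numbers of 10,000 events per day, 3 subscribers per event, and a 7-day TTL, the store holds about 210,000 records at steady state. A redelivery 8 days after processing would no longer be recognised as a duplicate.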

### 3. Monitor Storage Growth

For the Database store, periodically review the `FlexProcessedEvents` table size. If it keeps growing, set `EnableAutoCleanup` to `true` or schedule a cleanup job.

### 4. Use SkipIdempotency Sparingly

Only use `[SkipIdempotency]` for:

* Logging/auditing subscribers
* Metrics/telemetry collectors
* Notification dispatchers (where duplicates are acceptable)

***

## Complete Example

### appsettings.json

```json
{
  "ConnectionStrings": {
    "Redis": "localhost:6379,abortConnect=false"
  },
  "FlexIdempotency": {
    "Enabled": true,
    "StoreType": "Cache",
    "DefaultTimeToLiveDays": 7,
    "KeyPrefix": "flex:idempotency"
  },
  "FlexCache": {
    "MaxPayloadBytes": 1048576,
    "DefaultExpirationMinutes": 60
  }
}
```

### IdempotencyConfig.cs

```csharp
using System;
using Microsoft.Extensions.Caching.Hybrid;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Sumeru.Flex;

public static class IdempotencyConfig
{
    public static IServiceCollection AddFlexIdempotency(
        this IServiceCollection services, 
        IConfiguration configuration)
    {
        var section = configuration.GetSection("FlexIdempotency");
        var enabled = section.GetValue<bool>("Enabled", true);

        if (!enabled)
            return services;

        // Configure cache
        services.AddFlexHybridCache(options =>
        {
            var cacheSection = configuration.GetSection("FlexCache");
            options.MaximumPayloadBytes = cacheSection.GetValue<int>("MaxPayloadBytes", 1024 * 1024);
            options.DefaultEntryOptions = new HybridCacheEntryOptions
            {
                Expiration = TimeSpan.FromMinutes(
                    cacheSection.GetValue<int>("DefaultExpirationMinutes", 60))
            };
        });

        // Configure idempotency store
        var options = new FlexProcessedEventStoreOptions
        {
            DefaultTimeToLive = TimeSpan.FromDays(
                section.GetValue<int>("DefaultTimeToLiveDays", 7)),
            KeyPrefix = section.GetValue<string>("KeyPrefix", "flex:idempotency")!
        };

        services.AddSingleton(options);
        services.AddSingleton<IFlexProcessedEventStore, FlexCacheProcessedEventStore>();

        return services;
    }
}
```

### CommonHostConfig.cs

```csharp
.ConfigureServices((hostingContext, services) =>
{
    // ... other services ...

    // Add Redis distributed cache (for production)
    services.AddStackExchangeRedisCache(options =>
    {
        options.Configuration = hostingContext.Configuration.GetConnectionString("Redis");
        options.InstanceName = "MyApp:";
    });

    // Add idempotency
    services.AddFlexIdempotency(hostingContext.Configuration);
});
```

***

## Summary

| Feature                   | Description                                   |
| ------------------------- | --------------------------------------------- |
| **Purpose**               | Prevent duplicate event processing            |
| **Default Store**         | Cache (FlexHybridCache)                       |
| **Alternative Store**     | Database (EF Core)                            |
| **Cache Backends**        | In-memory, Redis, SQL Server                  |
| **Opt-out**               | `[SkipIdempotency]` attribute                 |
| **Configuration**         | `FlexIdempotency` section in appsettings.json |
| **Code Changes Required** | None (Decorator Pattern)                      |


