Optimistic Concurrency Mongo C# — Digitteck
Optimistic Concurrency Mongo C#
dotnet·8 February 2022·5 min read

Optimistic Concurrency Mongo C#

Locking is required when we need to guarantee data consistency — preventing one user from overwriting changes another user has already made. There are two types:

  • Optimistic — assumes collisions are rare; validates before write
  • Pessimistic — exclusive lock while editing; no reads or writes until released

When Pessimistic?

A pessimistic lock prevents all concurrent reads and writes while a process holds it. This hurts performance when only writes need to be blocked and reads could safely proceed. A useful case: a worker implementing the outbox pattern that reads a database entry and sends an event — scaling that worker to several instances requires ensuring that each entry is picked up by exactly one of them.

When Optimistic?

Optimistic locking assumes the collection does not change often and that collisions are rare. Reads are always allowed; colliding writes are detected and rejected. The most common technique is a timestamp — each document stores a timestamp, and any write must present the same timestamp that is currently stored. If they differ, another writer got there first and the write is refused.

Implementation

Assumptions:

  • Write concern is at least Acknowledged (Majority recommended for stronger consistency)
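  • Multiple documents cannot be deleted, updated or replaced with a single LINQ filter expression — each document's timestamp must be validated individually, so bulk operations are built from one write model per document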

The base document carries the timestamp used for all concurrency checks:

csharp
/// <summary>
/// Base type for stored documents. Carries the identifier and the
/// <see cref="Timestamp"/> value that is compared on every optimistic-locking check.
/// </summary>
public class BaseDocument
{
    /// <summary>
    /// Creates a document whose <see cref="Timestamp"/> is the current UTC time in ticks.
    /// Marked with <c>[BsonConstructor]</c>.
    /// </summary>
    [BsonConstructor]
    public BaseDocument()
    {
        Timestamp = DateTimeOffset.UtcNow.Ticks;
    }

    /// <summary>
    /// Creates a document with the given identifier; the timestamp is initialized
    /// the same way as in the parameterless constructor.
    /// </summary>
    /// <param name="id">Identifier to assign to <see cref="Id"/>.</param>
    public BaseDocument(Guid id)
        : this()
    {
        Id = id;
    }

    /// <summary>
    /// Document identifier, mapped as the BSON id with a string representation.
    /// </summary>
    [BsonId(IdGenerator = typeof(GuidGenerator)), BsonIgnoreIfDefault, BsonRepresentation(BsonType.String)]
    public Guid Id { get; set; }

    /// <summary>
    /// Version marker of the document, expressed in UTC ticks and stored under
    /// the element name "Timestamp".
    /// </summary>
    [BsonElement("Timestamp")]
    public long Timestamp { get; set; }
}

The collection wrapper enforces optimistic locking on every write operation — any mismatch between the stored and provided timestamp throws a concurrency exception:

csharp
/// <summary>
/// Wraps an <see cref="IMongoCollection{TDocument}"/> and applies optimistic locking to
/// every delete, replace and update: a write only takes effect when both the <c>Id</c> and
/// the <c>Timestamp</c> held by the caller match the stored document.
/// </summary>
/// <remarks>
/// When a session is supplied it is used for the write and for the follow-up existence
/// check, so both observe the same session state.
/// After a failed <c>*Many</c> operation the in-memory timestamps of the passed documents
/// may no longer match the database (the bulk result does not say which items succeeded);
/// reload the documents before retrying.
/// </remarks>
/// <typeparam name="TDocument">Document type; must derive from <see cref="BaseDocument"/>.</typeparam>
public sealed class CollectionUnitOptimistic<TDocument>
    where TDocument : BaseDocument
{
    private IMongoCollection<TDocument> MongoCollection { get; }

    /// <summary>Creates the wrapper around the given collection.</summary>
    /// <param name="mongoCollection">Collection all operations are executed against.</param>
    public CollectionUnitOptimistic(IMongoCollection<TDocument> mongoCollection)
    {
        Ensure.IsNotNull(mongoCollection, nameof(mongoCollection));
        MongoCollection = mongoCollection;
    }

    /// <summary>Counts the documents matching <paramref name="filter"/>.</summary>
    /// <param name="filter">Predicate selecting the documents to count.</param>
    /// <param name="countOptions">Optional driver count options.</param>
    /// <param name="session">Optional session to run the count in.</param>
    /// <returns>The number of matching documents.</returns>
    public Task<long> CountAsync(
        Expression<Func<TDocument, bool>> filter,
        CountOptions countOptions = null,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(filter, nameof(filter));
        return session != null
            ? MongoCollection.CountDocumentsAsync(session, filter, countOptions)
            : MongoCollection.CountDocumentsAsync(filter, countOptions);
    }

    /// <summary>
    /// Deletes the document with the same id and timestamp as <paramref name="document"/>.
    /// </summary>
    /// <remarks>
    /// Throws <c>ThrowHelpers.NotAcknowledged()</c> when the write is not acknowledged and
    /// <c>ThrowHelpers.ConcurrencyDeleteOneFailed(document)</c> when nothing was deleted
    /// although a document with that id still exists. A document that is already gone is
    /// not treated as an error.
    /// </remarks>
    /// <param name="document">Document whose id and timestamp identify the version to delete.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    public async Task DeleteOneAsync(
        BaseDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));

        FilterDefinition<TDocument> filter = VersionFilter(document.Id, document.Timestamp);

        DeleteResult result = session == null
            ? await MongoCollection.DeleteOneAsync(filter).ConfigureAwait(false)
            : await MongoCollection.DeleteOneAsync(session, filter).ConfigureAwait(false);

        if (!result.IsAcknowledged)
            throw ThrowHelpers.NotAcknowledged();

        if (result.DeletedCount != 1
            && await CountExistingAsync(new[] { document.Id }, session).ConfigureAwait(false) == 1)
            throw ThrowHelpers.ConcurrencyDeleteOneFailed(document);
    }

    /// <summary>
    /// Deletes every passed document whose id and timestamp still match the stored version.
    /// </summary>
    /// <remarks>
    /// An empty sequence is a no-op. Throws <c>ThrowHelpers.NotAcknowledged()</c> when the
    /// write is not acknowledged and <c>ThrowHelpers.ConcurrencyException()</c> when at
    /// least one of the documents still exists after the bulk delete, i.e. its timestamp
    /// did not match.
    /// </remarks>
    /// <param name="documents">Documents to delete.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    public async Task DeleteManyAsync(
        IEnumerable<BaseDocument> documents,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));

        // Materialize once: the sequence is needed for the models and for the id list.
        List<BaseDocument> targets = documents.ToList();
        if (targets.Count == 0)
            return;

        List<DeleteOneModel<TDocument>> deleteModels = targets
            .Select(d => new DeleteOneModel<TDocument>(VersionFilter(d.Id, d.Timestamp)))
            .ToList();

        BulkWriteResult<TDocument> result = session == null
            ? await MongoCollection.BulkWriteAsync(deleteModels).ConfigureAwait(false)
            : await MongoCollection.BulkWriteAsync(session, deleteModels).ConfigureAwait(false);

        if (!result.IsAcknowledged)
            throw ThrowHelpers.NotAcknowledged();

        if (result.DeletedCount != deleteModels.Count)
        {
            // Anything that still exists under one of these ids was skipped because its
            // timestamp differed; documents that were already gone are not an error.
            List<Guid> ids = targets.Select(d => d.Id).ToList();
            long remaining = await CountExistingAsync(ids, session).ConfigureAwait(false);
            if (remaining > 0)
                throw ThrowHelpers.ConcurrencyException();
        }
    }

    /// <summary>
    /// Inserts the documents after assigning a new id to those with an empty id and a
    /// fresh timestamp to all of them.
    /// </summary>
    /// <param name="documents">Documents to insert; an empty list is a no-op.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    /// <returns>The same list instance that was passed in.</returns>
    public async Task<List<TDocument>> InsertManyAsync(
        List<TDocument> documents,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));

        if (documents.Count == 0)
            return documents;

        foreach (TDocument document in documents)
        {
            document.Id = (document.Id == Guid.Empty) ? Guid.NewGuid() : document.Id;
            document.Timestamp = DateTimeOffset.UtcNow.Ticks;
        }

        if (session == null)
            await MongoCollection.InsertManyAsync(documents).ConfigureAwait(false);
        else
            await MongoCollection.InsertManyAsync(session, documents).ConfigureAwait(false);

        return documents;
    }

    /// <summary>
    /// Inserts the document after assigning a new id (when empty) and a fresh timestamp.
    /// </summary>
    /// <param name="document">Document to insert.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    /// <returns>The same document instance that was passed in.</returns>
    public async Task<TDocument> InsertOneAsync(
        TDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));

        document.Id = (document.Id == Guid.Empty) ? Guid.NewGuid() : document.Id;
        document.Timestamp = DateTimeOffset.UtcNow.Ticks;

        if (session == null)
            await MongoCollection.InsertOneAsync(document).ConfigureAwait(false);
        else
            await MongoCollection.InsertOneAsync(session, document).ConfigureAwait(false);

        return document;
    }

    /// <summary>
    /// Replaces the stored document that has the same id and timestamp as
    /// <paramref name="document"/>, stamping the replacement with a new timestamp.
    /// </summary>
    /// <remarks>
    /// Throws <c>ThrowHelpers.NotAcknowledged()</c> when the write is not acknowledged and
    /// <c>ThrowHelpers.ConcurrencyReplaceOneFail(document)</c> when no document matched
    /// although one with that id exists. When nothing matched, the document's original
    /// timestamp is restored so the caller's instance is left unchanged.
    /// </remarks>
    /// <param name="document">Replacement document; its current timestamp is the expected stored version.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    public async Task ReplaceOneAsync(
        TDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));
        Ensure.IsNotEmpty(document.Id, nameof(document.Id));

        long expectedTimestamp = document.Timestamp;
        document.Timestamp = DateTimeOffset.UtcNow.Ticks;

        FilterDefinition<TDocument> filter = VersionFilter(document.Id, expectedTimestamp);
        var options = new ReplaceOptions { IsUpsert = false };

        ReplaceOneResult result = session == null
            ? await MongoCollection.ReplaceOneAsync(filter, document, options).ConfigureAwait(false)
            : await MongoCollection.ReplaceOneAsync(session, filter, document, options).ConfigureAwait(false);

        if (!result.IsAcknowledged)
            throw ThrowHelpers.NotAcknowledged();

        // MatchedCount (not ModifiedCount) tells whether the id + timestamp filter hit.
        if (result.MatchedCount == 0)
        {
            // Nothing was written, so the new timestamp must not leak to the caller.
            document.Timestamp = expectedTimestamp;

            if (await CountExistingAsync(new[] { document.Id }, session).ConfigureAwait(false) == 1)
                throw ThrowHelpers.ConcurrencyReplaceOneFail(document);
        }
    }

    /// <summary>
    /// Replaces every passed document whose id and timestamp still match the stored
    /// version, stamping each replacement with a new timestamp.
    /// </summary>
    /// <remarks>
    /// An empty sequence is a no-op. Throws <c>ThrowHelpers.NotAcknowledged()</c> when the
    /// write is not acknowledged. When fewer documents matched than were passed,
    /// <c>ThrowHelpers.ConcurrencyException()</c> is thrown if at least one existing
    /// document was left unmatched (timestamp mismatch); if the unmatched documents simply
    /// do not exist, <c>ThrowHelpers.NotAcknowledged()</c> is thrown.
    /// </remarks>
    /// <param name="documents">Replacement documents.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    public async Task ReplaceManyAsync(
        IEnumerable<TDocument> documents,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));

        // Materialize once: a lazy sequence would otherwise be re-enumerated several times.
        List<TDocument> items = documents.ToList();
        if (items.Count == 0)
            return;

        var replaceModels = new List<ReplaceOneModel<TDocument>>(items.Count);

        foreach (TDocument document in items)
        {
            // The filter must capture the timestamp BEFORE it is overwritten below.
            FilterDefinition<TDocument> filter = VersionFilter(document.Id, document.Timestamp);
            document.Timestamp = DateTimeOffset.UtcNow.Ticks;

            replaceModels.Add(new ReplaceOneModel<TDocument>(filter, document) { IsUpsert = false });
        }

        BulkWriteResult<TDocument> result = session == null
            ? await MongoCollection.BulkWriteAsync(replaceModels).ConfigureAwait(false)
            : await MongoCollection.BulkWriteAsync(session, replaceModels).ConfigureAwait(false);

        if (!result.IsAcknowledged)
            throw ThrowHelpers.NotAcknowledged();

        if (result.MatchedCount != items.Count)
        {
            List<Guid> ids = items.Select(d => d.Id).ToList();
            long existing = await CountExistingAsync(ids, session).ConfigureAwait(false);

            // More existing documents than matches: some were skipped on timestamp.
            if (existing > result.MatchedCount)
                throw ThrowHelpers.ConcurrencyException();

            throw ThrowHelpers.NotAcknowledged();
        }
    }

    /// <summary>
    /// Replaces the stored document with the same id and timestamp, or inserts the
    /// document when no document with that id exists.
    /// </summary>
    /// <remarks>
    /// Throws <c>ThrowHelpers.NotAcknowledged()</c> when the write is not acknowledged.
    /// If a document with the id exists but carries a different timestamp, the upsert
    /// attempts an insert and the server reports a duplicate key; outside a transaction
    /// that case is reported as <c>ThrowHelpers.ConcurrencyReplaceOneFail(document)</c>.
    /// Inside a transaction, or when the duplicate key stems from another unique index,
    /// the driver's <see cref="MongoWriteException"/> is rethrown unchanged. The
    /// document's original timestamp is restored whenever the write is rejected this way.
    /// </remarks>
    /// <param name="document">Document to write; an empty id is replaced by a new one.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    public async Task ReplaceOneUpsertAsync(
        TDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));

        document.Id = (document.Id == Guid.Empty) ? Guid.NewGuid() : document.Id;

        long expectedTimestamp = document.Timestamp;
        document.Timestamp = DateTimeOffset.UtcNow.Ticks;

        FilterDefinition<TDocument> filter = VersionFilter(document.Id, expectedTimestamp);
        var options = new ReplaceOptions { IsUpsert = true };

        ReplaceOneResult result;
        try
        {
            // The session overload must not be called with a null session.
            result = session == null
                ? await MongoCollection.ReplaceOneAsync(filter, document, options).ConfigureAwait(false)
                : await MongoCollection.ReplaceOneAsync(session, filter, document, options).ConfigureAwait(false);
        }
        catch (MongoWriteException ex) when (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
        {
            document.Timestamp = expectedTimestamp;

            // A failed write leaves an active transaction unusable, so only verify the
            // cause of the duplicate key when no transaction is in progress.
            bool inTransaction = session != null && session.IsInTransaction;
            if (!inTransaction
                && await CountExistingAsync(new[] { document.Id }, session).ConfigureAwait(false) == 1)
                throw ThrowHelpers.ConcurrencyReplaceOneFail(document);

            throw;
        }

        if (!result.IsAcknowledged)
            throw ThrowHelpers.NotAcknowledged();

        // A successful upsert either matched the stored version or inserted a new
        // document; an insert (UpsertedId set) is not a conflict.
        if (result.MatchedCount == 0 && result.UpsertedId == null)
        {
            document.Timestamp = expectedTimestamp;

            if (await CountExistingAsync(new[] { document.Id }, session).ConfigureAwait(false) == 1)
                throw ThrowHelpers.ConcurrencyReplaceOneFail(document);
        }
    }

    /// <summary>
    /// Applies <paramref name="update"/> to every passed document whose id and timestamp
    /// still match the stored version, and sets a new stored timestamp on each.
    /// </summary>
    /// <remarks>
    /// An empty list is a no-op. The in-memory documents are not modified. Throws
    /// <c>ThrowHelpers.NotAcknowledged()</c> when the write is not acknowledged. When fewer
    /// documents matched than were passed, <c>ThrowHelpers.ConcurrencyException()</c> is
    /// thrown if at least one existing document was left unmatched; if the unmatched
    /// documents simply do not exist, <c>ThrowHelpers.NotAcknowledged()</c> is thrown.
    /// </remarks>
    /// <param name="documents">Documents identifying the versions to update.</param>
    /// <param name="update">Update applied to each matched document.</param>
    /// <param name="session">Optional session to run the operation in.</param>
    public async Task UpdateManyAsync(
        List<TDocument> documents,
        UpdateDefinition<TDocument> update,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));
        Ensure.IsNotNull(update, nameof(update));

        if (documents.Count == 0)
            return;

        List<UpdateOneModel<TDocument>> updateModels = documents
            .Select(d => new UpdateOneModel<TDocument>(
                VersionFilter(d.Id, d.Timestamp),
                update.Set(x => x.Timestamp, DateTimeOffset.UtcNow.Ticks)) { IsUpsert = false })
            .ToList();

        BulkWriteResult<TDocument> result = session == null
            ? await MongoCollection.BulkWriteAsync(updateModels).ConfigureAwait(false)
            : await MongoCollection.BulkWriteAsync(session, updateModels).ConfigureAwait(false);

        if (!result.IsAcknowledged)
            throw ThrowHelpers.NotAcknowledged();

        if (result.MatchedCount != documents.Count)
        {
            List<Guid> ids = documents.Select(d => d.Id).ToList();
            long existing = await CountExistingAsync(ids, session).ConfigureAwait(false);

            // More existing documents than matches: some were skipped on timestamp.
            if (existing > result.MatchedCount)
                throw ThrowHelpers.ConcurrencyException();

            throw ThrowHelpers.NotAcknowledged();
        }
    }

    /// <summary>Builds the optimistic-locking filter: same id AND same timestamp.</summary>
    private static FilterDefinition<TDocument> VersionFilter(Guid id, long timestamp)
    {
        return Builders<TDocument>.Filter.Eq(x => x.Id, id)
            & Builders<TDocument>.Filter.Eq(x => x.Timestamp, timestamp);
    }

    /// <summary>
    /// Counts the stored documents whose id is one of <paramref name="ids"/>, using the
    /// session when one is supplied.
    /// </summary>
    private Task<long> CountExistingAsync(IEnumerable<Guid> ids, IClientSessionHandle session)
    {
        FilterDefinition<TDocument> byIds = Builders<TDocument>.Filter.In(x => x.Id, ids);
        return session == null
            ? MongoCollection.CountDocumentsAsync(byIds)
            : MongoCollection.CountDocumentsAsync(session, byIds);
    }
}

Tags

.NETC#MongoDBConcurrency
digitteck

© 2026 Digitteck