Locking is required when we need to guarantee data consistency — preventing one user from overwriting changes another user has already made. There are two types:
- Optimistic — assumes collisions are rare; validates before write
- Pessimistic — exclusive lock while editing; no reads or writes until released
When Pessimistic?
A pessimistic lock blocks all concurrent reads and writes for as long as a process holds it. That is needlessly expensive when only writes have to be blocked and concurrent reads would be safe. A case where it is useful: a worker implementing the outbox pattern reads a database entry and sends an event — scaling that worker out to several instances requires ensuring that each entry is picked up by exactly one instance.
When Optimistic?
Optimistic locking assumes the collection changes infrequently and collisions are rare. Reads are always allowed; conflicting writes are detected and rejected. The most common technique is a timestamp: each document stores a timestamp, and every write must present the timestamp it originally read. If that value differs from the one currently stored, another writer got there first and the write is rejected.
Implementation
Assumptions:
- Write concern is at least Acknowledged (Majority recommended for stronger consistency)
- Multiple documents cannot be deleted, updated, or replaced through a single LINQ filter expression — each document's timestamp must be validated individually
The base document carries the timestamp used for all concurrency checks:
/// <summary>
/// Base type for stored documents. Carries the identifier and the
/// <see cref="Timestamp"/> value that write operations compare against the stored
/// copy to detect concurrent modification.
/// </summary>
public class BaseDocument
{
    /// <summary>
    /// Creates a document with a default <see cref="Id"/> and a
    /// <see cref="Timestamp"/> taken from the current UTC tick count.
    /// </summary>
    [BsonConstructor]
    public BaseDocument()
    {
        Timestamp = DateTimeOffset.UtcNow.Ticks;
    }

    /// <summary>
    /// Creates a document with the given identifier; the timestamp is initialised
    /// by the parameterless constructor.
    /// </summary>
    /// <param name="id">Identifier assigned to <see cref="Id"/>.</param>
    public BaseDocument(Guid id)
        : this()
    {
        Id = id;
    }

    /// <summary>Document identifier, mapped as the BSON id and represented as a string.</summary>
    [BsonId(IdGenerator = typeof(GuidGenerator))]
    [BsonIgnoreIfDefault]
    [BsonRepresentation(BsonType.String)]
    public Guid Id { get; set; }

    /// <summary>Version stamp (UTC ticks) stored under the "Timestamp" element.</summary>
    [BsonElement("Timestamp")]
    public long Timestamp { get; set; }
}

The collection wrapper enforces optimistic locking on every write operation — any mismatch between the stored and provided timestamp throws a concurrency exception:
/// <summary>
/// Wraps an <see cref="IMongoCollection{TDocument}"/> and applies optimistic locking
/// to every write: each delete/replace/update is filtered on both the document id and
/// the timestamp the caller holds, so a write only succeeds when the stored timestamp
/// still matches. Successful writes stamp the document with a fresh timestamp.
/// </summary>
/// <typeparam name="TDocument">Document type; must derive from <see cref="BaseDocument"/>.</typeparam>
public sealed class CollectionUnitOptimistic<TDocument>
    where TDocument : BaseDocument
{
    private IMongoCollection<TDocument> MongoCollection { get; }

    /// <summary>Creates a wrapper around the given collection.</summary>
    /// <param name="mongoCollection">Collection that all operations are executed against.</param>
    public CollectionUnitOptimistic(IMongoCollection<TDocument> mongoCollection)
    {
        Ensure.IsNotNull(mongoCollection, nameof(mongoCollection));
        MongoCollection = mongoCollection;
    }

    /// <summary>Counts the documents matching <paramref name="filter"/>.</summary>
    /// <param name="filter">Predicate selecting the documents to count.</param>
    /// <param name="countOptions">Optional driver count options.</param>
    /// <param name="session">Optional session; when supplied the count runs inside it.</param>
    /// <returns>The number of matching documents.</returns>
    public Task<long> CountAsync(
        Expression<Func<TDocument, bool>> filter,
        CountOptions countOptions = null,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(filter, nameof(filter));

        return session != null
            ? MongoCollection.CountDocumentsAsync(session, filter, countOptions)
            : MongoCollection.CountDocumentsAsync(filter, countOptions);
    }

    /// <summary>
    /// Deletes one document if its stored timestamp still equals
    /// <paramref name="document"/>'s timestamp.
    /// </summary>
    /// <param name="document">Document whose id and timestamp identify the version to delete.</param>
    /// <param name="session">Optional session used for the delete and the follow-up existence check.</param>
    /// <remarks>
    /// Throws the <c>ThrowHelpers.NotAcknowledged</c> exception when the write is not
    /// acknowledged, and the <c>ThrowHelpers.ConcurrencyDeleteOneFailed</c> exception when
    /// nothing was deleted although a document with that id still exists. A document that
    /// no longer exists is not treated as an error.
    /// </remarks>
    public async Task DeleteOneAsync(
        BaseDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));

        FilterDefinition<TDocument> filter = VersionFilter(document.Id, document.Timestamp);

        DeleteResult result = session == null
            ? await MongoCollection.DeleteOneAsync(filter).ConfigureAwait(false)
            : await MongoCollection.DeleteOneAsync(session, filter).ConfigureAwait(false);

        if (!result.IsAcknowledged)
        {
            throw ThrowHelpers.NotAcknowledged();
        }

        if (result.DeletedCount == 1)
        {
            return;
        }

        // Nothing deleted: it is only a conflict if the id is still present.
        long stillPresent = await CountByIdsAsync(new[] { document.Id }, session).ConfigureAwait(false);
        if (stillPresent == 1)
        {
            throw ThrowHelpers.ConcurrencyDeleteOneFailed(document);
        }
    }

    /// <summary>
    /// Deletes every given document whose stored timestamp still equals the timestamp
    /// the caller holds, using one bulk write.
    /// </summary>
    /// <param name="documents">Documents to delete; enumerated once. An empty sequence is a no-op.</param>
    /// <param name="session">Optional session used for the bulk write and the follow-up check.</param>
    /// <remarks>
    /// Throws the <c>ThrowHelpers.ConcurrencyException</c> exception when, after the bulk
    /// write, any of the requested ids is still present (it was not deleted because its
    /// timestamp no longer matched). Documents that were already gone are not an error.
    /// </remarks>
    public async Task DeleteManyAsync(
        IEnumerable<BaseDocument> documents,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));

        // Materialise once: the sequence is needed for the models and for the id list.
        List<BaseDocument> targets = documents.ToList();
        if (targets.Count == 0)
        {
            // NOTE(review): the driver is expected to reject an empty bulk request list.
            return;
        }

        List<DeleteOneModel<TDocument>> deleteModels = targets
            .Select(target => new DeleteOneModel<TDocument>(VersionFilter(target.Id, target.Timestamp)))
            .ToList();

        BulkWriteResult<TDocument> result = session == null
            ? await MongoCollection.BulkWriteAsync(deleteModels).ConfigureAwait(false)
            : await MongoCollection.BulkWriteAsync(session, deleteModels).ConfigureAwait(false);

        if (!result.IsAcknowledged)
        {
            throw ThrowHelpers.NotAcknowledged();
        }

        if (result.DeletedCount != deleteModels.Count)
        {
            // Successfully deleted documents no longer exist, so ANY id that is still
            // present was skipped because of a timestamp mismatch.
            List<Guid> ids = targets.Select(target => target.Id).Distinct().ToList();
            long remaining = await CountByIdsAsync(ids, session).ConfigureAwait(false);
            if (remaining > 0)
            {
                throw ThrowHelpers.ConcurrencyException();
            }
        }
    }

    /// <summary>
    /// Inserts the given documents, assigning a new id to any document whose id is
    /// <see cref="Guid.Empty"/> and stamping each with a fresh timestamp.
    /// </summary>
    /// <param name="documents">Documents to insert. An empty list is a no-op.</param>
    /// <param name="session">Optional session the insert runs in.</param>
    /// <returns>The same list instance, with ids and timestamps populated.</returns>
    public async Task<List<TDocument>> InsertManyAsync(
        List<TDocument> documents,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));

        if (documents.Count == 0)
        {
            return documents;
        }

        foreach (TDocument document in documents)
        {
            if (document.Id == Guid.Empty)
            {
                document.Id = Guid.NewGuid();
            }

            document.Timestamp = NewTimestamp();
        }

        if (session == null)
        {
            await MongoCollection.InsertManyAsync(documents).ConfigureAwait(false);
        }
        else
        {
            await MongoCollection.InsertManyAsync(session, documents).ConfigureAwait(false);
        }

        return documents;
    }

    /// <summary>
    /// Inserts one document, assigning a new id when its id is <see cref="Guid.Empty"/>
    /// and stamping it with a fresh timestamp.
    /// </summary>
    /// <param name="document">Document to insert.</param>
    /// <param name="session">Optional session the insert runs in.</param>
    /// <returns>The same document instance, with id and timestamp populated.</returns>
    public async Task<TDocument> InsertOneAsync(
        TDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));

        if (document.Id == Guid.Empty)
        {
            document.Id = Guid.NewGuid();
        }

        document.Timestamp = NewTimestamp();

        if (session == null)
        {
            await MongoCollection.InsertOneAsync(document).ConfigureAwait(false);
        }
        else
        {
            await MongoCollection.InsertOneAsync(session, document).ConfigureAwait(false);
        }

        return document;
    }

    /// <summary>
    /// Replaces the stored document if its timestamp still equals the timestamp carried
    /// by <paramref name="document"/>. The instance's timestamp is advanced before the
    /// write is sent and is not rolled back if the write fails.
    /// </summary>
    /// <param name="document">Replacement document; its current timestamp is the expected stored version.</param>
    /// <param name="session">Optional session used for the replace and the follow-up existence check.</param>
    /// <remarks>
    /// Throws the <c>ThrowHelpers.ConcurrencyReplaceOneFail</c> exception when nothing was
    /// modified although a document with that id exists.
    /// </remarks>
    public async Task ReplaceOneAsync(
        TDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));
        Ensure.IsNotEmpty(document.Id, nameof(document.Id));

        long expectedTimestamp = document.Timestamp;
        document.Timestamp = NewTimestamp();

        FilterDefinition<TDocument> filter = VersionFilter(document.Id, expectedTimestamp);
        var options = new ReplaceOptions { IsUpsert = false };

        ReplaceOneResult result = session == null
            ? await MongoCollection.ReplaceOneAsync(filter, document, options).ConfigureAwait(false)
            : await MongoCollection.ReplaceOneAsync(session, filter, document, options).ConfigureAwait(false);

        if (!result.IsAcknowledged)
        {
            throw ThrowHelpers.NotAcknowledged();
        }

        if (result.ModifiedCount == 0
            && await CountByIdsAsync(new[] { document.Id }, session).ConfigureAwait(false) == 1)
        {
            throw ThrowHelpers.ConcurrencyReplaceOneFail(document);
        }
    }

    /// <summary>
    /// Replaces every given document whose stored timestamp still equals the timestamp
    /// the caller holds, using one bulk write. Each instance's timestamp is advanced
    /// before the write is sent.
    /// </summary>
    /// <param name="documents">Replacement documents; enumerated once. An empty sequence is a no-op.</param>
    /// <param name="session">Optional session used for the bulk write and the follow-up check.</param>
    /// <remarks>
    /// When fewer documents were modified than requested: throws the
    /// <c>ThrowHelpers.ConcurrencyException</c> exception if all requested ids exist
    /// (so the shortfall is a timestamp mismatch), otherwise the
    /// <c>ThrowHelpers.NotAcknowledged</c> exception — the same rule UpdateManyAsync applies.
    /// </remarks>
    public async Task ReplaceManyAsync(
        IEnumerable<TDocument> documents,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));

        // Materialise once: the loop below mutates the documents, and the count and id
        // list must refer to exactly the same instances.
        List<TDocument> targets = documents.ToList();
        if (targets.Count == 0)
        {
            // NOTE(review): the driver is expected to reject an empty bulk request list.
            return;
        }

        var replaceModels = new List<ReplaceOneModel<TDocument>>(targets.Count);
        foreach (TDocument target in targets)
        {
            // Build the filter from the timestamp the caller read, then advance it.
            FilterDefinition<TDocument> filter = VersionFilter(target.Id, target.Timestamp);
            target.Timestamp = NewTimestamp();
            replaceModels.Add(new ReplaceOneModel<TDocument>(filter, target) { IsUpsert = false });
        }

        BulkWriteResult<TDocument> result = session == null
            ? await MongoCollection.BulkWriteAsync(replaceModels).ConfigureAwait(false)
            : await MongoCollection.BulkWriteAsync(session, replaceModels).ConfigureAwait(false);

        if (!result.IsAcknowledged)
        {
            throw ThrowHelpers.NotAcknowledged();
        }

        if (result.ModifiedCount != targets.Count)
        {
            List<Guid> ids = targets.Select(target => target.Id).Distinct().ToList();
            long existing = await CountByIdsAsync(ids, session).ConfigureAwait(false);

            // Every id exists yet not every replace applied: timestamps did not match.
            if (existing == ids.Count)
            {
                throw ThrowHelpers.ConcurrencyException();
            }

            throw ThrowHelpers.NotAcknowledged();
        }
    }

    /// <summary>
    /// Replaces the stored document when its timestamp still matches, or inserts the
    /// document when no document with that id exists. An empty id is replaced by a new one.
    /// </summary>
    /// <param name="document">Document to write; its current timestamp is the expected stored version.</param>
    /// <param name="session">Optional session used for the write and the follow-up existence check.</param>
    /// <remarks>
    /// Throws the <c>ThrowHelpers.ConcurrencyReplaceOneFail</c> exception when a document
    /// with the same id exists under a different timestamp. In that case the filter
    /// misses, the upsert attempts an insert, and the server reports a duplicate key;
    /// that error is translated here (the original driver exception is not preserved
    /// because the helper's signature takes only the document).
    /// </remarks>
    public async Task ReplaceOneUpsertAsync(
        TDocument document,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(document, nameof(document));

        if (document.Id == Guid.Empty)
        {
            document.Id = Guid.NewGuid();
        }

        long expectedTimestamp = document.Timestamp;
        document.Timestamp = NewTimestamp();

        FilterDefinition<TDocument> filter = VersionFilter(document.Id, expectedTimestamp);
        var options = new ReplaceOptions { IsUpsert = true };

        ReplaceOneResult result;
        try
        {
            // The session overload must only be used with a real session.
            result = session == null
                ? await MongoCollection.ReplaceOneAsync(filter, document, options).ConfigureAwait(false)
                : await MongoCollection.ReplaceOneAsync(session, filter, document, options).ConfigureAwait(false);
        }
        catch (MongoWriteException ex)
            when (ex.WriteError != null && ex.WriteError.Category == ServerErrorCategory.DuplicateKey)
        {
            throw ThrowHelpers.ConcurrencyReplaceOneFail(document);
        }

        if (!result.IsAcknowledged)
        {
            throw ThrowHelpers.NotAcknowledged();
        }

        // An upsert that inserted reports no modification but carries an upserted id;
        // that is a success, not a conflict.
        bool inserted = !(result.UpsertedId is null);
        if (result.ModifiedCount == 0
            && !inserted
            && await CountByIdsAsync(new[] { document.Id }, session).ConfigureAwait(false) == 1)
        {
            throw ThrowHelpers.ConcurrencyReplaceOneFail(document);
        }
    }

    /// <summary>
    /// Applies <paramref name="update"/> to every given document whose stored timestamp
    /// still equals the timestamp the caller holds, and sets a fresh timestamp on each
    /// updated document. The in-memory instances are not modified.
    /// </summary>
    /// <param name="documents">Documents identifying the ids and expected timestamps. An empty list is a no-op.</param>
    /// <param name="update">Update applied to each matching document.</param>
    /// <param name="session">Optional session used for the bulk write and the follow-up check.</param>
    /// <remarks>
    /// When fewer documents were modified than requested: throws the
    /// <c>ThrowHelpers.ConcurrencyException</c> exception if all requested ids exist,
    /// otherwise the <c>ThrowHelpers.NotAcknowledged</c> exception.
    /// </remarks>
    public async Task UpdateManyAsync(
        List<TDocument> documents,
        UpdateDefinition<TDocument> update,
        IClientSessionHandle session = null)
    {
        Ensure.IsNotNull(documents, nameof(documents));
        Ensure.IsNotNull(update, nameof(update));

        if (documents.Count == 0)
        {
            // NOTE(review): the driver is expected to reject an empty bulk request list.
            return;
        }

        List<UpdateOneModel<TDocument>> updateModels = documents
            .Select(target => new UpdateOneModel<TDocument>(
                VersionFilter(target.Id, target.Timestamp),
                update.Set(stored => stored.Timestamp, NewTimestamp())) { IsUpsert = false })
            .ToList();

        BulkWriteResult<TDocument> result = session == null
            ? await MongoCollection.BulkWriteAsync(updateModels).ConfigureAwait(false)
            : await MongoCollection.BulkWriteAsync(session, updateModels).ConfigureAwait(false);

        if (!result.IsAcknowledged)
        {
            throw ThrowHelpers.NotAcknowledged();
        }

        if (result.ModifiedCount != documents.Count)
        {
            List<Guid> ids = documents.Select(target => target.Id).Distinct().ToList();
            long existing = await CountByIdsAsync(ids, session).ConfigureAwait(false);

            // Every id exists yet not every update applied: timestamps did not match.
            if (existing == ids.Count)
            {
                throw ThrowHelpers.ConcurrencyException();
            }

            throw ThrowHelpers.NotAcknowledged();
        }
    }

    /// <summary>Produces the timestamp written with every insert, replace and update.</summary>
    private static long NewTimestamp()
    {
        return DateTimeOffset.UtcNow.Ticks;
    }

    /// <summary>Builds the filter matching one document by id and expected timestamp.</summary>
    private static FilterDefinition<TDocument> VersionFilter(Guid id, long timestamp)
    {
        return Builders<TDocument>.Filter.Eq(stored => stored.Id, id)
            & Builders<TDocument>.Filter.Eq(stored => stored.Timestamp, timestamp);
    }

    /// <summary>
    /// Counts the stored documents whose id is in <paramref name="ids"/>, inside
    /// <paramref name="session"/> when one is supplied so the count observes the same
    /// state as the preceding write.
    /// </summary>
    private Task<long> CountByIdsAsync(IEnumerable<Guid> ids, IClientSessionHandle session)
    {
        FilterDefinition<TDocument> filter = Builders<TDocument>.Filter.In(stored => stored.Id, ids);

        return session == null
            ? MongoCollection.CountDocumentsAsync(filter)
            : MongoCollection.CountDocumentsAsync(session, filter);
    }
}