Intro
When working with extensive datasets (for this discussion, I've zeroed in on MongoDB), it's imperative to understand how the database responds to the queries our applications launch against it. Not long ago, I found myself crafting some seemingly straightforward MongoDB queries, for a task as "innocent" as deleting all records associated with user x. While it might sound trivial, executing such commands isn't always as straightforward as it appears. In this article I will show you how I handled MongoDB batch processing.
In the course of my research, I came across numerous sources that echoed a common theme; one of them can be found at https://www.mongodb.com/community/forums/t/delete-many-with-limit/11940:
- Performance Impact: Deleting a large number of entries in a single call can impact the performance of your MongoDB server. This could lead to increased resource usage and potentially slow down other operations. In some cases, it might even cause the server to become unresponsive during the deletion process.
- Locking: MongoDB uses locks for write operations. Deleting a large number of documents in one go can cause extensive locking, affecting the availability of the database for other operations during the delete process.
- Network Traffic: Transmitting a large batch of delete requests can generate significant network traffic between your application and the database server. This can impact both the database and network performance.
- Timeouts: Depending on the execution environment and network conditions, very large delete operations might run into timeouts or connection issues, causing incomplete or failed deletions.
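For context, the single-call deletion these points warn about is a plain DeleteManyAsync over the whole result set. A minimal sketch, with illustrative collection and field names (the article never shows this code, it only argues against the pattern):

using MongoDB.Bson;
using MongoDB.Driver;
using System.Threading.Tasks;

public static class NaiveDelete
{
    // One DeleteManyAsync removes every matching document in a single operation.
    // On a large collection, this is exactly the pattern the points above warn about.
    // The BsonDocument collection type and the "UserId" field name are illustrative only.
    public static Task DeleteAllForUserAsync(IMongoCollection<BsonDocument> collection, string userId)
    {
        var filter = Builders<BsonDocument>.Filter.Eq("UserId", userId);
        return collection.DeleteManyAsync(filter);
    }
}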
Given the considerations outlined above, I charted out a methodical batching approach, broken down as follows:
- Identify and Batch Entities: Initiate a search for the entities slated for deletion, grouping them into manageable batches. For this, I took advantage of the BatchSize option on the FindOptions parameter accepted by MongoDB's FindAsync method (see the sketch right after this list).
- Sequential Deletion: Process the removal of these entities one batch at a time. This ensures a controlled and system-friendly deletion pace.
- Intersperse with Pauses: Introduce brief intervals between each deletion batch. This not only eases the load on the database but also ensures optimal operational continuity.
- Background Processing: To execute the above steps without taxing the primary application flow, I delegated the process to a background worker, utilizing the capabilities of Hangfire.
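Here is a minimal sketch of the first step: setting FindOptions.BatchSize so the driver streams results in fixed-size batches. The method and parameter names are mine, chosen for illustration; my actual repository method appears later in the article.

using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using MongoDB.Driver;

public static class BatchedFind
{
    // With BatchSize set, the returned cursor loads at most batchSize
    // documents per round trip to the server instead of one huge result.
    public static Task<IAsyncCursor<TDocument>> FindInBatchesAsync<TDocument>(
        IMongoCollection<TDocument> collection,
        Expression<Func<TDocument, bool>> filter,
        int batchSize)
    {
        var options = new FindOptions<TDocument> { BatchSize = batchSize };
        return collection.FindAsync(filter, options);
    }
}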
Batching The Mongo Cursor
For those well-versed in MongoDB, you're likely familiar with the fact that the FindAsync method yields an IAsyncCursor. This IAsyncCursor exposes a Current property holding the list of entities loaded for the current batch, and it is structured to guide you to the subsequent batch via MoveNextAsync (the raw pattern is sketched after the list below). Here's why I devised a specific enumerator:
- Item-centric Iteration: I aimed for the iterator to yield single items upon traversal, as opposed to returning an entire batch.
- Automated Batch Loading: I envisioned a streamlined enumerator, one that would autonomously fetch the next batch once the present batch concludes its iteration.
- Decoupling from IAsyncCursor: My goal was to abstract away the MongoDB-specific IAsyncCursor and expose a more universal IAsyncEnumerable instead, detaching the implementation from MongoDB's particulars.
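Before the final enumerator, here is the raw cursor pattern it abstracts away, as a small sketch (the wrapping class and method names are illustrative):

using System.Threading.Tasks;
using MongoDB.Driver;

public static class CursorDemo
{
    // The raw consumption pattern the enumerator hides from callers:
    // MoveNextAsync pulls the next batch from the server, Current exposes it as a list.
    public static async Task ConsumeAsync<TDocument>(IAsyncCursor<TDocument> asyncCursor)
    {
        while (await asyncCursor.MoveNextAsync())
        {
            foreach (var document in asyncCursor.Current)
            {
                // handle one document of the current batch
            }
        }
    }
}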
Here's my final formulation: an enumerator that accepts the IAsyncCursor, ensures individual item iteration, and autonomously pulls batches from MongoDB into memory:
public sealed class BatchAsyncEnumerable<TDocument> : IAsyncEnumerable<TDocument>
    where TDocument : BaseDocument
{
    private readonly IAsyncCursor<TDocument> _mongoAsyncCursor;

    public BatchAsyncEnumerable(IAsyncCursor<TDocument> mongoAsyncCursor)
    {
        _mongoAsyncCursor = mongoAsyncCursor;
    }

    public IAsyncEnumerator<TDocument> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        return new BatchAsyncEnumerator<TDocument>(_mongoAsyncCursor);
    }
}
public sealed class BatchAsyncEnumerator<TDocument> : IAsyncEnumerator<TDocument>
    where TDocument : BaseDocument
{
    private readonly IAsyncCursor<TDocument> _asyncCursor;
    private List<TDocument> currentBatch;
    private int currentBatchIndex;

    public BatchAsyncEnumerator(IAsyncCursor<TDocument> asyncCursor)
    {
        _asyncCursor = asyncCursor;
        currentBatchIndex = -1;
    }

    /// <summary>
    /// Returns the current element, which represents the current position in the current mongo batch.
    /// <para>The position is maintained as a list index into the current batch; when the index reaches
    /// the end of the list, the enumerator fetches the next batch from the async cursor.</para>
    /// </summary>
    public TDocument Current
    {
        get
        {
            if (currentBatch == null || currentBatch.Count == 0)
            {
                throw new InvalidOperationException("Current item is null.");
            }

            if (currentBatchIndex == -1)
            {
                throw new InvalidOperationException("Current item cannot be retrieved. Call MoveNextAsync first.");
            }

            return currentBatch[currentBatchIndex];
        }
    }

    /// <summary>
    /// Disposes the async cursor of the mongo database.
    /// </summary>
    public ValueTask DisposeAsync()
    {
        _asyncCursor.Dispose();
        return default;
    }

    /// <summary>
    /// Returns the next item from the current batch using an index that maintains the current position.
    /// <para>If the index has reached the end of the list, the method fetches the next batch, or returns
    /// false if there is no further non-empty batch.</para>
    /// </summary>
    public async ValueTask<bool> MoveNextAsync()
    {
        // On the first iteration the current batch is null; afterwards, advance within
        // the current batch (list) until its end is reached.
        if (currentBatch != null && currentBatchIndex < currentBatch.Count - 1)
        {
            currentBatchIndex++;
            return true;
        }

        // If there is another batch, the async cursor's MoveNextAsync returns true.
        if (await _asyncCursor.MoveNextAsync().ConfigureAwait(false))
        {
            // It seems mongo can return true for an empty batch; GetCurrentBatch detects this.
            return GetCurrentBatch();
        }

        // No more batches: reset the enumerator state.
        currentBatch = null;
        currentBatchIndex = -1;
        return false;
    }

    private bool GetCurrentBatch()
    {
        if (_asyncCursor.Current == null)
        {
            currentBatch = null;
            currentBatchIndex = -1;
            return false;
        }

        currentBatch = _asyncCursor.Current.ToList();

        // If the mongo current batch is an empty list, reset the enumerator state.
        if (currentBatch.Count == 0)
        {
            currentBatch = null;
            currentBatchIndex = -1;
            return false;
        }

        currentBatchIndex = 0;
        return true;
    }
}
Here is an example of how you would use this enumerator (note that I am aware of the trade-off of using an Expression filter instead of a FilterDefinition directly):
public async Task<IAsyncEnumerable<TDocument>> FindManyAsync(Expression<Func<TDocument, bool>> filter)
{
    var asyncCursor = await _mongoCollection.FindAsync(filter, new FindOptions<TDocument>
    {
        BatchSize = 1000
    });

    return new BatchAsyncEnumerable<TDocument>(asyncCursor);
}
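With this in place, a caller can iterate single documents with await foreach while the enumerable transparently pulls 1000-document batches behind the scenes. A hypothetical caller fragment (UserId and Id are illustrative properties of the document type, not from the article):

// Hypothetical caller: one document at a time, batches loaded as needed.
var documents = await repository.FindManyAsync(d => d.UserId == userId);
await foreach (var document in documents)
{
    Console.WriteLine(document.Id);
}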
Partitioning
The following section shows how to create a custom enumerator which takes in an IAsyncEnumerable<T> and returns an IAsyncEnumerable<IEnumerable<T>> with a custom partitionSize.
It might appear contradictory at first glance: why iterate through a bulked collection item by item only to regroup the items later? The answer lies in design choice, not necessity. You could, in theory, design an iterator that dispenses each set of items sequentially. My personal preference was to station the prior batch enumerator at the repository level; this placement keeps the repository tidy, hiding the granular details about bulking. Now, since I intend to process items in bulk, I've layered a new iterator on top to reassemble these items. It's advisable (though not obligatory) for the batch count from the initial stage to align with the partitionSize we'll designate in this step.
public class PartitionAsyncEnumerable<T> : IAsyncEnumerable<IEnumerable<T>>
{
    private readonly IAsyncEnumerable<T> _enumerable;
    private readonly int _partitionSize;

    public PartitionAsyncEnumerable(IAsyncEnumerable<T> enumerable, int partitionSize)
    {
        _enumerable = enumerable;
        _partitionSize = partitionSize;
    }

    public IAsyncEnumerator<IEnumerable<T>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        return new PartitionAsyncEnumerator<T>(_enumerable, _partitionSize);
    }
}

public class PartitionAsyncEnumerator<T> : IAsyncEnumerator<IEnumerable<T>>
{
    private readonly IAsyncEnumerator<T> _enumerator;
    private readonly int _partitionSize;
    private List<T> _current = new();

    public IEnumerable<T> Current => _current;

    public PartitionAsyncEnumerator(IAsyncEnumerable<T> enumerable, int partitionSize)
    {
        _partitionSize = partitionSize;
        _enumerator = enumerable.GetAsyncEnumerator();
    }

    public ValueTask DisposeAsync()
    {
        return _enumerator.DisposeAsync();
    }

    public async ValueTask<bool> MoveNextAsync()
    {
        // Allocate a fresh list for every partition so a previously returned
        // Current is not mutated while the caller still holds a reference to it.
        _current = new List<T>(_partitionSize);

        while (await _enumerator.MoveNextAsync())
        {
            _current.Add(_enumerator.Current);

            if (_current.Count == _partitionSize)
            {
                break;
            }
        }

        return _current.Count > 0;
    }
}
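To make the partition semantics concrete, here is a self-contained demo independent of MongoDB (the class and method names are illustrative): the numbers 1..7 partitioned into groups of 3 yield [1,2,3], [4,5,6], [7].

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PartitionDemo
{
    // A small async source standing in for the repository's IAsyncEnumerable.
    private static async IAsyncEnumerable<int> NumbersAsync()
    {
        for (var i = 1; i <= 7; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task RunAsync()
    {
        var partitions = new PartitionAsyncEnumerable<int>(NumbersAsync(), partitionSize: 3);

        // Prints "1, 2, 3", then "4, 5, 6", then "7".
        await foreach (var partition in partitions)
        {
            Console.WriteLine(string.Join(", ", partition));
        }
    }
}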
Batch Processing
In the following section, I am using a background job spawned with Hangfire. I won't go into the details of using Hangfire, just the relevant code.
I started by creating an extension method for my Partitioner enumerator:
public static class ExtensionsEnumerable
{
    public static IAsyncEnumerator<IEnumerable<T>> Partition<T>(this IAsyncEnumerable<T> enumerable, int partSize)
    {
        return new PartitionAsyncEnumerator<T>(enumerable, partSize);
    }
}
Then I created a batch-processing extension method which takes a factory producing an IAsyncEnumerable<T>, partitions it, and executes a function over each partition:
public static class Batching
{
    public static async Task BatchProcessAsync<T>(
        Func<ushort, Task<IAsyncEnumerable<T>>> enumerableTask,
        Func<List<T>, Task> execution,
        ushort batchSize = 1000)
    {
        var enumerable = await enumerableTask(batchSize);
        IAsyncEnumerator<IEnumerable<T>> enumerator = enumerable.Partition<T>(batchSize);

        while (await enumerator.MoveNextAsync())
        {
            await execution(enumerator.Current.ToList());
            await Pauser.PauseAsync();
        }

        await Pauser.PauseAsync();
    }
}
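The Pauser helper is not shown in the article. Conceptually it is just a short pause between batches; the comment in the final snippet mentions a simple Thread.Sleep, and Task.Delay is the non-blocking equivalent in async code. A minimal sketch, where the 200 ms value is my own assumption:

using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the Pauser used above: a short non-blocking delay
// between batches to give the database room to breathe. 200 ms is illustrative.
public static class Pauser
{
    public static Task PauseAsync() => Task.Delay(TimeSpan.FromMilliseconds(200));
}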
And the final process used by the background job would be:
public async Task ProcessAsync(JobRemove message)
{
    await Batching.BatchProcessAsync(
        // returns the first enumerator we defined
        async batchSize => await _repository.FindAsync(message.SomeId, batchSize),
        async entities =>
        {
            await DeleteAsync(entities);
            await Pauser.PauseAsync(); // simple Thread.Sleep to destress the database
        },
        batchSize: 1000);
}
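DeleteAsync is not shown in the article either. Assuming BaseDocument exposes an Id property, one plausible implementation inside the repository class deletes each partition with a single DeleteManyAsync using an $in filter over the batch's ids; treat this as a sketch, not my actual code:

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MongoDB.Driver;

// Hypothetical sketch: one DeleteManyAsync per partition, filtered by the
// batch's ids. Assumes BaseDocument exposes an Id property.
private async Task DeleteAsync(List<TDocument> entities)
{
    var ids = entities.Select(e => e.Id).ToList();
    var filter = Builders<TDocument>.Filter.In(e => e.Id, ids);
    await _mongoCollection.DeleteManyAsync(filter);
}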
Thank you
Thank you for reading the article.
You can support me at https://www.buymeacoffee.com/ramihamati
You can also find this article at https://medium.com/@hamatirami