03. Parallel

Parallel

Whenever you perform parallel calculations, whether with the Parallel class or with Task.WhenAll, you will more than likely benefit greatly from the alternatives provided here.

To use these alternatives, follow these steps:

  1. Create a type that implements the best-suited "delegate" interface, either IAction<T> or IAsyncAction<T>
  2. Create a local instance of that "delegate" type
  3. Use the .Concurrent() extension, which works on any ICollection<T>, to access the special functions that utilize the "delegate" type

Extension Methods On .Concurrent()

public static Task InvokeAsync<T>(this Concurrent<T> concurrentReference, in IAsyncAction<T> action, CancellationToken token = default)
public static void ForEach<T>(this Concurrent<T> concurrentReference, IAction<T> action)
public static Task ForEachAsync<T>(this Concurrent<T> concurrentReference, IAsyncAction<T> action, int degreeOfParallelization = -1, CancellationToken token = default)
  • InvokeAsync processes the whole collection in parallel, scaling dynamically with available system resources, similar to Task.WhenAll, which it uses internally.
  • ForEach processes a non-async action in parallel using maximal concurrency.
  • ForEachAsync is special: its degreeOfParallelization parameter limits how many actions execute in parallel at a time. This is not batching; batching would split the collection into a fixed number of parts, whereas this splits the collection so that each sub-collection holds a fixed number of elements. Setting the parameter to -1 processes as many elements in parallel as the machine has CPU threads. For example, on an 8-thread machine it will process 8 elements in parallel, then another 8, and so on until it is finished. This allows much greater control over resource usage (see the sketch after this list).
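For illustration, assuming items is a List<int> and action is an existing IAsyncAction<int> implementation (like the one built in the example below), capping the concurrency might look like this:

// Hypothetical call site: process at most 4 elements at a time;
// the default (-1) instead matches the number of CPU threads.
await items.Concurrent().ForEachAsync(action, degreeOfParallelization: 4);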

Example

To best show how this is used, I will walk through an example that uses a List<int>: for each item in that list, it waits 2 seconds and then adds 2 to the number.

Implementing IAsyncAction

Here we can really see the benefits of using IAsyncAction instead of a lambda: it allows greater control over the properties and their signatures, and gives the JIT compiler vastly more room for optimization.

First, implement the interface. You can use a struct, but a sealed class performs better in this case because a single instance will be used, whereas a struct would be memory-copied:

public sealed class ParallelListFunction : IAsyncAction<int> { }

There is only one method to implement, async Task InvokeAsync(int input), and we will get to it shortly. But first we need somewhere to store the outputs, and possibly some constants; since this is a class, we can take advantage of readonly fields, constructors, and even static members.
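For reference, the generic shape of the interface is roughly the following (inferred from the method described above, so treat it as a sketch rather than the exact declaration):

public interface IAsyncAction<in T> {
    Task InvokeAsync(T input);
}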

Creating Fields And Custom Constructor

private readonly ConcurrentDictionary<int, int> _dict;
private static readonly TimeSpan _delay = TimeSpan.FromSeconds(2);

public ParallelListFunction(ConcurrentDictionary<int, int> outputDict) {
    _dict = outputDict;
}

We now have a static readonly field for the delay, which reduces memory allocation and increases performance, and a readonly injected field that holds the reference to our output dictionary.

Now we can implement the main function. Take note that the input is the element pulled from the collection.

Implementing InvokeAsync

public async Task InvokeAsync(int input) {
    await Task.Delay(_delay);
    _dict[input] = input + 2;
}

Now that we have finished implementing the parallel action, we can go to the call site and use the following code to execute it.

Performing The Parallel Calculation

List<int> lst = ...
ConcurrentDictionary<int, int> outputDict = new();
var processor = new ParallelListFunction(outputDict); // Using the new constructor we added.
await lst.Concurrent().ForEachAsync(processor);
// Now the output dict is populated with the results.

While this example requires much more code than a lambda, it considerably reduces memory allocation. And as the function executed in parallel grows longer and more complicated, the gains in readability, maintainability, debuggability, and performance grow with it.

Notes

  • All the methods that take an optional CancellationToken parameter will actually cancel the parallel execution; as you might know, Task.WhenAll isn't easy to cancel by default (see the sketch after this list).
  • The Concurrent() extension is required both to separate these functions from the native ones, making sure users know exactly which one they are using, and to enforce that they are only used on types implementing the ICollection<T> interface, which allows optimization via the Count property and faster iteration.
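For instance, cancelling the whole run after a timeout might look like this (a sketch reusing the processor instance from the example above):

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); // cancel after 30 seconds
// When the token fires, the parallel execution itself is cancelled.
await lst.Concurrent().ForEachAsync(processor, token: cts.Token);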

AsyncLocal (Since Sharpify v1.5.0)

An entirely new access point to entirely new concurrent APIs was added, starting with AsyncLocal<TList> where TList : IList<TItem>. This constraint is enforced to optimize memory allocation by using rented buffers.

On any such TList you can create a new AsyncLocal over it, or use the new extension methods AsAsyncLocal() and AsAsyncLocal(TItem? @default), which do it for you. To save you the research and simplify: wrapping the collection in AsyncLocal means any changes made to the collection during asynchronous execution will be discarded. This does not make it immutable, at least not from a logical standpoint, but it ensures that no backtracking or change synchronization needs to happen.
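For illustration, wrapping a List<int> with each overload might look like this (the default-value argument of the second overload presumably exists to let the compiler infer TItem):

List<int> lst = new() { 1, 2, 3 };
var localA = lst.AsAsyncLocal<List<int>, int>(); // Overload 1: explicit type parameters
var localB = lst.AsAsyncLocal(default(int));     // Overload 2: TItem inferred from the default value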

You can then use the same old IAsyncAction to perform whatever you want. The two most notable APIs are:

public static async Task InvokeAsync<TList, TItem>(
        this AsyncLocal<TList> asyncLocalReference,
        IAsyncAction<TItem> action,
        CancellationToken token = default) where TList : IList<TItem>;
// and
public static async Task ForEachAsync<TList, TItem>(
        this AsyncLocal<TList> asyncLocalReference,
        IAsyncAction<TItem> action,
        int degreeOfParallelism = -1,
        bool loadBalance = false,
        CancellationToken token = default) where TList : IList<TItem>;

Both use reworked and optimized implementations; benchmarks show improved performance and vastly improved memory efficiency.

Notice that ForEachAsync now has a new loadBalance parameter, which further customizes the behavior of the function. The default is false, and my benchmarks show it was faster that way; however, if your tasks vary in load, it might be the other way around, so experiment to find the best setup for your workload.
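For example, if your actions vary widely in duration, you might try the following (a sketch reusing the earlier processor):

// loadBalance: true enables a load-balancing partitioning strategy, useful when
// item costs vary; the default static split was faster in the author's benchmarks.
await lst.AsAsyncLocal<List<int>, int>().ForEachAsync(processor, degreeOfParallelism: 4, loadBalance: true);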

IAsyncValueAction{T} (Also since Sharpify v1.5.0)

A new interface, IAsyncValueAction{T}, was added. It is similar to IAsyncAction, the only difference being that it returns a ValueTask instead of a Task.
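Its shape presumably mirrors IAsyncAction{T}, just returning a ValueTask (an inferred sketch, not the exact declaration):

public interface IAsyncValueAction<in T> {
    ValueTask InvokeAsync(T input);
}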

To match the newer APIs, the access point is AsyncLocal<TList>, and there are currently two APIs:

public static async ValueTask WhenAllAsync<TList, TItem>(
        this AsyncLocal<TList> asyncLocalReference,
        IAsyncValueAction<TItem> action,
        CancellationToken token = default) where TList : IList<TItem>;

It is essentially a Task.WhenAll that works on ValueTasks, but built in a very efficient way.

As you might know, a ValueTask doesn't necessarily allocate a Task; sometimes it returns a value, or a cached one. Using this information, we can make tremendous optimizations.

and:

public static ValueTask ForEach<TList, TItem>(
        this AsyncLocal<TList> asyncLocalReference,
        IAction<TItem> action,
        int degreeOfParallelism = -1,
        CancellationToken token = default) where TList : IList<TItem>;

This will force synchronous (yet awaitable) parallel execution with the help of Parallel.ForEach.
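Conceptually (a rough sketch, not the library's actual code, and assuming IAction<T> exposes a single Invoke(T) method, which is an inferred signature) it behaves like:

// Rough conceptual equivalent of the ForEach API above; not Sharpify's code.
static void ForEachSketch<T>(IList<T> items, IAction<T> action,
                             int degreeOfParallelism = -1, CancellationToken token = default) {
    var options = new ParallelOptions {
        MaxDegreeOfParallelism = degreeOfParallelism, // -1 means unbounded here as well
        CancellationToken = token
    };
    Parallel.ForEach(items, options, item => action.Invoke(item));
}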

Summary of what happens behind the scenes

  1. Two buffers are rented: one for ValueTasks and one for Tasks.
  2. We populate the first buffer by invoking the action on the values.
  3. We iterate once more, checking the first buffer for completed value tasks.
  4. We populate the second buffer with any incomplete value tasks, wrapping each in a Task.
  5. If all have completed, we return early; in that case even the outer method's ValueTask doesn't allocate a Task.
  6. Otherwise, we execute the remaining Tasks asynchronously and await them.
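A minimal sketch of this flow, using ArrayPool-backed rented buffers (WhenAllSketch is an illustrative name; the real implementation differs in its details):

using System.Buffers; // for ArrayPool<T>

// Illustrative sketch of the steps above, not Sharpify's actual code.
static async ValueTask WhenAllSketch<T>(IList<T> items, IAsyncValueAction<T> action) {
    int count = items.Count;
    // 1. Rent the two buffers.
    ValueTask[] valueTasks = ArrayPool<ValueTask>.Shared.Rent(count);
    Task[] tasks = ArrayPool<Task>.Shared.Rent(count);
    // 2. Populate the first buffer by invoking the action on every element.
    for (int i = 0; i < count; i++) {
        valueTasks[i] = action.InvokeAsync(items[i]);
    }
    // 3 + 4. Wrap only the value tasks that did not complete synchronously.
    int pending = 0;
    for (int i = 0; i < count; i++) {
        if (!valueTasks[i].IsCompletedSuccessfully) {
            tasks[pending++] = valueTasks[i].AsTask();
        }
    }
    ArrayPool<ValueTask>.Shared.Return(valueTasks, clearArray: true);
    // 5. Early return: nothing was asynchronous, so even this method's
    // ValueTask completes without allocating a Task.
    if (pending == 0) {
        ArrayPool<Task>.Shared.Return(tasks);
        return;
    }
    // 6. Await only the incomplete ones.
    await Task.WhenAll(new ArraySegment<Task>(tasks, 0, pending));
    ArrayPool<Task>.Shared.Return(tasks, clearArray: true);
}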

As you might expect, this can massively increase the performance and memory efficiency of executing ValueTask actions.

It also opens the gateway to creating parallel paths for previously synchronous methods. A synchronous method, if wrapped in an IAsyncValueAction, returns an already-completed result and does not allocate a task; doing this for any number of actions, together with the nature of this method, guarantees that not even a single Task will be allocated, effectively parallelizing the previously synchronous code for free.
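For instance, a purely synchronous computation wrapped in an IAsyncValueAction{T} might look like this (a hedged sketch; AddTwoSync is illustrative and not part of the library):

public sealed class AddTwoSync : IAsyncValueAction<int> {
    private readonly ConcurrentDictionary<int, int> _dict;

    public AddTwoSync(ConcurrentDictionary<int, int> dict) => _dict = dict;

    public ValueTask InvokeAsync(int input) {
        _dict[input] = input + 2;       // purely synchronous work
        return ValueTask.CompletedTask; // already completed, no Task allocated
    }
}

Running such an action through WhenAllAsync takes the early-return path from the summary above, so the entire parallel invocation completes without allocating a single Task.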

Example (Modification of the previous example)

We can use the same IAsyncAction implementation from before, ParallelListFunction.

And since we already know that List<int> implements IList<int>, all we need to do is use the new API entry point:

List<int> lst = ...
ConcurrentDictionary<int, int> outputDict = new();
var processor = new ParallelListFunction(outputDict); // Using the new constructor we added.
await lst.AsAsyncLocal<List<int>, int>().ForEachAsync(processor); // AsAsyncLocal - Overload 1
// Now the output dict is populated with the results.

Notice that all that has changed is that .Concurrent() became .AsAsyncLocal(). But the performance gain is massive.

More complex example (Dictionary Source)

Let's look at an example closer to the real world. Say we have a Dictionary<string, StudentInfo> that represents a key-value pair of studentId to StudentInfo, and we want to update the grades concurrently.

To make the example shorter, say you have implemented an IAsyncAction that performs an API call and returns a tuple of studentId and Grades, which it enqueues to a ConcurrentQueue of the corresponding type.
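A minimal sketch of such an action (FetchGradesAsync stands in for your real API call; StudentInfo and Grades are the domain types of this example):

// A hedged sketch; FetchGradesAsync is a placeholder, not a real method.
public sealed class GetStudentGrades : IAsyncAction<KeyValuePair<string, StudentInfo>> {
    private readonly ConcurrentQueue<(string studentId, Grades grades)> _queue;

    public GetStudentGrades(ConcurrentQueue<(string studentId, Grades grades)> queue) => _queue = queue;

    public async Task InvokeAsync(KeyValuePair<string, StudentInfo> input) {
        Grades grades = await FetchGradesAsync(input.Key); // placeholder API call
        _queue.Enqueue((input.Key, grades));
    }
}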

Now the main action:

Dictionary<string, StudentInfo> source = ...;
ConcurrentQueue<(string studentId, Grades grades)> queue = new();
var action = new GetStudentGrades(queue); // implementation of IAsyncAction

// we rent a buffer for the source entries
var (buffer, entries) = source.RentBufferAndCopyEntries(); // a new dictionary extension
// perform the calculation
await entries.AsAsyncLocal(default(KeyValuePair<string, StudentInfo>)).ForEachAsync(action); // that is all to perform the calculation -- AsAsyncLocal Overload 2
// return the buffer
buffer.ReturnRentedBuffer(); // new extension for arrays
// We now finished concurrently updating the grades, using minimal memory allocations
// All that's left is to update the source

while (queue.TryDequeue(out var entry)) {
    source[entry.studentId].Grades = entry.grades;
}

// All done
