Working with the Task API and async/await makes consuming asynchronous operations very straightforward. However, what if I want to process several asynchronous operations concurrently?

Imagine we want to take a set of color images and turn them into grey-scale. We could have code something like this:

var colorImages = Directory.GetFiles(directory,
                                     "*.jpg",
                                     SearchOption.AllDirectories)
                           .Select(fn => new Uri(fn))
                           .ToList();

foreach (Uri colorImage in colorImages)
{
    BitmapSource grayScaleImage 
            = await transforms.CreateGrayScaleImageAsync(colorImage);
    TransformedImages.Add(grayScaleImage);
}

Now, the problem with this code is that we are only processing one image at a time - each one asynchronously, so we don't block the UI, but still only one at a time. To process the images concurrently we need to start them all off before awaiting any of the results. Here's another formulation:

var colorImages = Directory.GetFiles(directory,
                                     "*.jpg",
                                     SearchOption.AllDirectories)   
                           .Select(fn => new Uri(fn))
                           .ToList();

List<Task<BitmapSource>> tasks = new List<Task<BitmapSource>>();
foreach (Uri colourImage in colorImages)
{
    tasks.Add(transforms.CreateGrayScaleImageAsync(colourImage));
}

foreach (Task<BitmapSource> task in tasks)
{
    TransformedImages.Add(await task);
}

Here we add all of the tasks that represent the asynchronous transformation of the images to a list, and only once they have all been started do we await their completion. This is definitely better as we are now using all of the cores on the machine for the work. However, there is still an issue: we are processing the results in the order the transformations were started, not the order in which they complete. If the first image were a lot larger than the others, we wouldn't see any results until that first image had finished processing.

What we really need is the ability to know when the first one completes so we can process its results while the others are still being converted - enter Task.WhenAny.

var colorImages = Directory.GetFiles(directory,
                                     "*.jpg", 
                                     SearchOption.AllDirectories)
                           .Select(fn => new Uri(fn))
                           .ToList();

var outstandingTasks = new List<Task<BitmapSource>>();
foreach (Uri colourImage in colorImages)
{
    outstandingTasks.Add(transforms.CreateGrayScaleImageAsync(colourImage));
}

while (outstandingTasks.Count > 0)
{
    Task<BitmapSource> completedTask = await Task.WhenAny(outstandingTasks);

    TransformedImages.Add(await completedTask);

    outstandingTasks.Remove(completedTask);
}

Task.WhenAny takes a set of tasks and returns a task that completes as soon as the first of them completes; awaiting it gives us that first completed task - and, unlike Task.WaitAny, it is awaitable. This again means we don't block the UI, and as each task completes we can process its result and remove it from the list of outstanding tasks.

While this appears to give us an optimal solution, there is a hidden problem. To understand the issue we need to consider how Task.WhenAny might be implemented. To wait for each of those outstanding tasks to complete it would need to set up a continuation on each and then do some processing when the first one finishes. So if we passed 10 tasks initially, 10 continuations would be set up. The next time round the loop, 9 continuations; the time after that, 8; and so on. More generally, for n tasks we end up setting up n + (n-1) + ... + 1 = n(n+1)/2 continuations (55 in the case of 10 outstanding tasks). This is hugely inefficient.
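Purely to make that continuation cost concrete, here is a very rough sketch of the wiring a WhenAny-style helper needs. This is not how Task.WhenAny is actually implemented - WhenAnySketch is a made-up name, and it leans on the TaskCompletionSource<T> type that is introduced properly below:

static Task<Task<T>> WhenAnySketch<T>(IEnumerable<Task<T>> tasks)
{
    // A single controllable task whose result will be whichever source task finishes first
    var winner = new TaskCompletionSource<Task<T>>();

    foreach (Task<T> task in tasks)
    {
        // One continuation per task - TrySetResult means only the first task
        // to complete wins the race; the later continuations become no-ops
        task.ContinueWith(t => winner.TrySetResult(t));
    }

    return winner.Task;
}

Each call attaches one continuation to every task it is given, which is exactly where the 10 + 9 + 8 + ... continuations come from when we call it repeatedly in a loop.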

In an ideal world we want to await the tasks in the order that they will complete in the future. Unfortunately, since we can't predict the future, this is not possible. However, what if we could construct a set of tasks that will complete in the order that the original ones complete? What we need is the ability to create tasks whose lifetimes are fully under programmatic control - this is where TaskCompletionSource<T> comes in. Here are the essential parts of the API:

public class TaskCompletionSource<T>
{
    public Task<T> Task { get; }

    public void SetCanceled();
    public void SetException(Exception exception);       
    public void SetResult(T result);

    ...
}

The idea is that we create an instance of TaskCompletionSource<T> and hand someone the Task property, which they can then await. We decide how and when that task completes using SetResult, SetException and SetCanceled. Also, as Task<T> derives from Task, we can use this even when we have no result to produce, which can be very useful when unit testing asynchronous components.
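As a minimal, contrived illustration of that split between consumer and producer (the 42 is just for show):

var tcs = new TaskCompletionSource<int>();

// The consumer just sees an ordinary Task<int> and can await it like any other
Task<int> consumerTask = tcs.Task;

// ...later, on whatever thread we like, we decide how that task ends:
tcs.SetResult(42);                           // consumerTask is now RanToCompletion
// tcs.SetException(new TimeoutException()); // ...or it could be Faulted
// tcs.SetCanceled();                        // ...or Canceled

int answer = await consumerTask;             // yields 42 without ever having blocked

// Because Task<int> derives from Task, tcs.Task can also be handed out as a
// plain Task when there is no meaningful result - completing it is just a signal
Task signal = tcs.Task;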

So, using TaskCompletionSource<T>, let's create a method that takes a set of tasks and, using a minimal number of continuations, returns another set of tasks that will complete in the order the original ones complete.

public static class TaskExtensions
{
    public static IEnumerable<Task<T>> GetInCompletingOrder<T>(this IEnumerable<Task<T>> source)
    {
        Task<T>[] tasks = source.ToArray();

        // One TaskCompletionSource per source task - their tasks are what we hand back
        TaskCompletionSource<T>[] tcss = new TaskCompletionSource<T>[tasks.Length];

        // Index of the next TaskCompletionSource to complete
        // (-1 so the first Interlocked.Increment yields 0)
        int currentSlot = -1;

        for (int i = 0; i < tasks.Length; i++)
        {
            tcss[i] = new TaskCompletionSource<T>();

            // One continuation per source task, fired when that task completes
            tasks[i].ContinueWith(prev =>
            {
                // Claim the next slot in a thread-safe way, as more than one
                // source task could complete at the same time
                int indexToSet = Interlocked.Increment(ref currentSlot);

                tcss[indexToSet].SetResult(prev.Result);
            });
        }

        return tcss.Select(t => t.Task);
    }
}

Let's walk through the code.

  • We create a set of TaskCompletionSource<T> instances so we have one for each source Task.
  • We loop through the tasks, initializing the TaskCompletionSource array and setting up a continuation on each source task which will fire when that task completes.
  • We return the TaskCompletionSource tasks using LINQ.
  • When a continuation fires, we increment which TaskCompletionSource we are going to complete, in a thread-safe way (as more than one source task could complete simultaneously), and then set the result of that TaskCompletionSource to the result of the completed source task.
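
To see the reordering in isolation, here is a small, self-contained example using Task.Delay to fake work of different lengths (the DelayedValue and Demo names, and the values and delays, are made up for this illustration):

static async Task<int> DelayedValue(int value, int delayMs)
{
    await Task.Delay(delayMs);
    return value;
}

static async Task Demo()
{
    var tasks = new List<Task<int>>
    {
        DelayedValue(1, 300),
        DelayedValue(2, 100),
        DelayedValue(3, 200)
    };

    // The tasks were started in the order 1, 2, 3 but finish in the order 2, 3, 1
    foreach (Task<int> task in tasks.GetInCompletingOrder())
    {
        Console.WriteLine(await task);   // prints 2, then 3, then 1
    }
}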

For production use we should make this extension method more robust: we should use separate continuations on each source task, one for each potential completion type, to make sure we complete the corresponding TaskCompletionSource task in the correct way (RanToCompletion, Faulted or Canceled). For this blog post, though, that would obscure the technique being used.
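To give a flavour of what that would look like - a sketch only, still glossing over some edge cases - the single continuation inside the for loop could be replaced with three, one per completion type. Exactly one of the three fires for each source task, so the slot counting still works:

tasks[i].ContinueWith(prev =>
{
    int indexToSet = Interlocked.Increment(ref currentSlot);
    tcss[indexToSet].SetResult(prev.Result);
}, TaskContinuationOptions.OnlyOnRanToCompletion);

tasks[i].ContinueWith(prev =>
{
    int indexToSet = Interlocked.Increment(ref currentSlot);
    tcss[indexToSet].SetException(prev.Exception.InnerExceptions);
}, TaskContinuationOptions.OnlyOnFaulted);

tasks[i].ContinueWith(prev =>
{
    int indexToSet = Interlocked.Increment(ref currentSlot);
    tcss[indexToSet].SetCanceled();
}, TaskContinuationOptions.OnlyOnCanceled);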

Let's use this GetInCompletingOrder extension method in our original code:

var colorImages = Directory.GetFiles(directory,
                                     "*.jpg",
                                     SearchOption.AllDirectories)
                           .Select(fn => new Uri(fn))
                           .ToList();

List<Task<BitmapSource>> tasks = new List<Task<BitmapSource>>();
foreach (Uri colourImage in colorImages)
{
    tasks.Add(transforms.CreateGrayScaleImageAsync(colourImage));
}


foreach (var task in tasks.GetInCompletingOrder())
{
    TransformedImages.Add(await task);
}

Now we have all of our images being processed concurrently, and we collect the results in the order they complete using a minimal number of continuations. But the most powerful thing is that the extension method we have written is not tied to our problem domain - it can be used with any set of tasks as general-purpose utility code.

Posted in Code on Wednesday, 23 May, 2018 by Richard Blewett tagged with async C#
