Wednesday, 23 May 2018
Richard Blewett
10 minute read
Imagine we want to take a set of colour images and turn them into greyscale. We could have code something like this:
var colorImages = Directory.GetFiles(directory, "*.jpg", SearchOption.AllDirectories)
.Select(fn => new Uri(fn))
.ToList();
foreach (Uri colorImage in colorImages)
{
BitmapSource grayScaleImage = await transforms.CreateGrayScaleImageAsync(colorImage);
TransformedImages.Add(grayScaleImage);
}
Now, the problem with this code is we are only processing one image at a time - each one asynchronously so we don't block the UI - but still only one at a time. To process images concurrently we need to start them all off before awaiting any for the results. Here's another formulation:
var colorImages = Directory.GetFiles(directory, "*.jpg", SearchOption.AllDirectories)
.Select(fn => new Uri(fn))
.ToList();
List<Task<BitmapSource>> tasks = new List<Task<BitmapSource>>();
foreach (Uri colourImage in colorImages)
{
tasks.Add(transforms.CreateGrayScaleImageAsync(colourImage));
}
foreach (Task<BitmapSource> task in tasks)
{
TransformedImages.Add(await task);
}
Here we add all of the tasks that represent the asynchronous transforming of images to a list and only once started do we await their completion. This is definitely better as we are now using all of the cores on the machine for the work. However, there is still an issue. We are processing the results in the order that the transformations were started, not the order in which they complete. If the first image were a lot larger then we wouldn't see any images until that first image had finished processing.
What we really need is the ability to know when the first one completes so we can process its results while the others are still being converted - enter Task.WhenAny
.
Task.WhenAny
takes a set of tasks on which to wait and returns the first Task
to complete - but, unlike Task.WaitAny
, it is awaitable. This again means we don't block the UI but as each task completes we can process its results and remove it from the list of outstanding tasks. While this appears to give us an optimal solution, there is a hidden problem.
To understand the issue we need to consider how Task.WhenAny
might be implemented. To wait for each of those outstanding tasks to complete we would need to set up a continuation on each and then do some processing when the first one finishes. So, if we passed 10 tasks initially we would have to set up 10 continuations. The next time round the loop we would set up nine continuations; the time after that eight continuations, and so on. More generally, for n
tasks we need to set up n(n+1)/2
continuations (55 in the case of 10 outstanding tasks).
This is hugely inefficient.
In an ideal world we want to await the tasks in the order that they will complete in the future. Unfortunately, not being able to predict the future, this is not possible.
However, what if we could construct a set of tasks that will complete in the order that the original ones will complete? What we need is to be able to create tasks whose lifetimes are fully under programmatic control - this is where TaskCompletionSource<T>
comes in. Here are the essential parts of the API:
public class TaskCompletionSource<T>
{
public Task<T> Task { get; }
public void SetCanceled();
public void SetException(Exception exception);
public void SetResult(T result);
// ...
}
The idea is that we can create an instance of TaskCompletionSource<T>
and hand someone the Task
property which they can now await. We can then decide how and when that task completes using SetResult
, SetException
and SetCanceled
. Also, as Task<T>
derives from Task
you can also use this even if we have no results to produce, which can be very useful when unit testing asynchronous components.
So, using TaskCompletionSource<T>
let's create a method that will take a set of tasks and, using a minimal number of continuations, return another set of tasks that will complete in the order the passed ones complete.
public static class TaskExtensions
{
public static IEnumerable<Task<T>> GetInCompletingOrder<T>(this IEnumerable<Task<T>> source)
{
Task<T>[] tasks = source.ToArray();
TaskCompletionSource<T>[] tcss = new TaskCompletionSource<T>[tasks.Length];
int currentSlot = -1;
for (int i = 0; i < tasks.Length; i++)
{
tcss[i] = new TaskCompletionSource<T>();
tasks[i].ContinueWith(prev =>
{
int indexToSet = Interlocked.Increment(ref currentSlot);
tcss[indexToSet].SetResult(prev.Result);
});
}
return tcss.Select(t => t.Task);
}
}
Lets walk through the code:
TaskCompletionSource<T>
so we have one for each source Task
TaskCompletionSource
array and setting up a continuation on the current source task which will fire when the source task completesTaskCompletionSource
tasks using LINQTaskCompletionSource
we are going to complete, in a thread-safe way (as more than one source task could complete simultaneously), and then set the result of that TaskCompletionSource
to the result of the completed source taskTaskCompletionSource
task in the correct way (RanToCompletion
, Faulted
, Canceled
), but for this blog post that would obscure the technique being usedLet's use this GetInCompletingOrder
extension method in our original code:
var colorImages = Directory.GetFiles(directory, "*.jpg", SearchOption.AllDirectories)
.Select(fn => new Uri(fn))
.ToList();
List<Task<BitmapSource>> tasks = new List<Task<BitmapSource>>();
foreach (Uri colourImage in colorImages)
{
tasks.Add(transforms.CreateGrayScaleImageAsync(colourImage));
}
foreach (var task in tasks.GetInCompletingOrder())
{
TransformedImages.Add(await task);
}
Now we have all of our images being processed concurrently and then we collect the results in the order they complete using a minimal number of continuations. But the most powerful thing is that the extension method we have written is not tied to our problem domain. It can be used with any set of tasks as general purpose utility code.
Last updated: Monday, 12 June 2023
Director
He/him
Richard is a Director at Rock Solid Knowledge.
We're proud to be a Certified B Corporation, meeting the highest standards of social and environmental impact.
+44 333 939 8119