The await keyword of C# can be applied to any object that exposes a suitable GetAwaiter method (not only Tasks), and because such objects can be reused, it should be possible to build a less wasteful version of an asynchronous queue.
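To make this concrete, here is a minimal sketch of an object that can be awaited more than once. The ReusableSignal type and its Set method are made up for this illustration; what the compiler actually requires is GetAwaiter, IsCompleted, GetResult and an awaiter that implements INotifyCompletion. The sketch is not thread-safe and assumes a single waiter at a time.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical type for illustration only; not part of the queue API in this post.
public sealed class ReusableSignal : INotifyCompletion
{
    private Action _continuation;   // the code that follows the pending await
    private int _value;

    // The compiler looks for GetAwaiter(); returning 'this' avoids a second object.
    public ReusableSignal GetAwaiter() => this;

    public bool IsCompleted { get; private set; }

    // Called when the await resumes; resets the state so the object can be awaited again.
    public int GetResult()
    {
        IsCompleted = false;
        return _value;
    }

    // Called only when IsCompleted was false at the moment of the await.
    public void OnCompleted(Action continuation) => _continuation = continuation;

    // Completes the pending await, or the next one if nobody is waiting yet.
    public void Set(int value)
    {
        _value = value;
        IsCompleted = true;
        var c = _continuation;
        _continuation = null;       // clear first: the continuation may register a new one
        c?.Invoke();
    }
}

public static class ReusableSignalDemo
{
    public static async Task<int> SumTwoAsync(ReusableSignal signal)
    {
        int first = await signal;   // the same object...
        int second = await signal;  // ...is awaited again, without a Task for the wait itself
        return first + second;
    }
}
```

Calling SumTwoAsync and then Set twice resumes the method twice on the same object; the only Task involved is the one returned by the async method itself.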
A Task-based blocking queue
A fully asynchronous queue would allow asynchronous adding and asynchronous taking. The queue API would look something like this:
// implementation using Tasks:
class AsyncBlockingQueue<T> {
    public Task AddAsync(T x);
    public void CompleteAdding();
    public Task<T> TakeAsync();
}
The API is inspired by BlockingCollection<T>.
Producer example
As a usage example, here is a producer Task that pulls items from an enumerable and adds them to the queue. The producer will be throttled when the queue is full:
private async Task Producer<T>(IEnumerable<T> list, AsyncBlockingQueue<T> q)
{
    foreach (var item in list) {
        // Completes immediately while there is room; otherwise waits for a consumer.
        await q.AddAsync(item);
    }
    q.CompleteAdding();
}
Note: there is no need for the CompleteAdding method to be async, as the producer can't put more items on the queue after that. Returning a Task from CompleteAdding would be meaningless, as there is no longer any reason to throttle the producer.
Consumer example
This example code takes all elements from the queue and pushes them to an observer:
private async Task Consume<T>(AsyncBlockingQueue<T> q,
                              IObserver<T> observer)
{
    while (true) {
        T t;
        try {
            t = await q.TakeAsync();
        } catch (InvalidOperationException) {
            // this exception marks the end of the queue
            observer.OnCompleted();
            return;
        } catch (Exception ex) {
            observer.OnError(ex);
            throw;
        }
        observer.OnNext(t);
    }
}
The exception handling around TakeAsync makes the code a little awkward. That is because the completion of the queue is signaled with an InvalidOperationException. Notice that the code makes sure that the observer will always get either an OnCompleted or an OnError (queue broken). The one exception is when OnNext itself throws: the assumption is then that the observer itself is broken, and we stop copying to it immediately.
An awaitable blocking queue
The disadvantage of the API just presented is that every enqueue or dequeue allocates a Task, and internally probably a TaskCompletionSource as well. This might be okay, but when the queue has a lot of data throughput this becomes wasteful, and the garbage collector will be kept busy removing these objects. In the case of AddAsync we might be lucky and be able to return the same cached completed Task over and over again (when the producer is slow). For the consumer, however, caching is not available (at least not with the API provided).
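To show where those allocations come from, here is one possible sketch of the Task-based API. It is not the implementation this series will arrive at; the capacity constructor parameter and the internal queues are assumptions made for the illustration. The comments mark which paths reuse the cached completed Task and which ones allocate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative sketch only; the capacity parameter is an assumption.
public sealed class AsyncBlockingQueue<T>
{
    private readonly object _gate = new object();
    private readonly int _capacity;
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _takers =
        new Queue<TaskCompletionSource<T>>();
    private readonly Queue<(T Item, TaskCompletionSource<bool> Done)> _adders =
        new Queue<(T Item, TaskCompletionSource<bool> Done)>();
    private bool _completed;

    public AsyncBlockingQueue(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public Task AddAsync(T item)
    {
        lock (_gate)
        {
            if (_completed) throw new InvalidOperationException("Adding is completed.");
            if (_takers.Count > 0)
            {
                _takers.Dequeue().SetResult(item);  // hand over to a waiting consumer
                return Task.CompletedTask;          // cached Task, no allocation
            }
            if (_items.Count < _capacity)
            {
                _items.Enqueue(item);
                return Task.CompletedTask;          // cached Task, no allocation
            }
            // Queue is full: allocate a TaskCompletionSource to throttle the producer.
            var done = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _adders.Enqueue((item, done));
            return done.Task;
        }
    }

    public Task<T> TakeAsync()
    {
        lock (_gate)
        {
            if (_items.Count > 0)
            {
                T item = _items.Dequeue();
                if (_adders.Count > 0)
                {
                    var (pending, done) = _adders.Dequeue();
                    _items.Enqueue(pending);
                    done.SetResult(true);           // release one throttled producer
                }
                return Task.FromResult(item);       // a new Task<T> for the taken item
            }
            if (_completed)
                return Task.FromException<T>(
                    new InvalidOperationException("The queue is completed and empty."));
            // Queue is empty: allocate a TaskCompletionSource for the waiting consumer.
            var taker = new TaskCompletionSource<T>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _takers.Enqueue(taker);
            return taker.Task;
        }
    }

    public void CompleteAdding()
    {
        lock (_gate)
        {
            _completed = true;
            // Consumers only wait while the queue is empty, so they can all be failed now.
            while (_takers.Count > 0)
                _takers.Dequeue().SetException(
                    new InvalidOperationException("The queue is completed and empty."));
        }
    }
}
```

In this sketch the producer gets the cached Task whenever there is room, while the consumer gets a freshly created Task<T> on every path, which is the asymmetry described above.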
An alternative would be to change the API on two points:
- We assume that the producer and consumer are long-lived. A producer or consumer would make use of the same queue over a longer session in which potentially many items get added or taken.
- The queue will be used with the async/await language feature. It is okay not to return Task objects.
class AwaitableBlockingQueue<T> {
    public ProducingAwaiter CreateProducerSession();
    public void CompleteAdding();
    public ConsumingAwaiter CreateConsumerSession();

    // An awaiter must implement INotifyCompletion for await to compile.
    public class ProducingAwaiter : INotifyCompletion, IDisposable {
        public void Dispose();
        public ProducingAwaiter Add(T item);
        public ProducingAwaiter GetAwaiter();
        public bool IsCompleted { get; }
        public void GetResult();
        public void OnCompleted(Action action);
    }

    public class ConsumingAwaiter : INotifyCompletion, IDisposable {
        public void Dispose();
        public T Value { get; }
        public ConsumingAwaiter GetAwaiter();
        public bool IsCompleted { get; }
        public bool GetResult();
        public void OnCompleted(Action action);
    }
}
Even though the API looks daunting, usage is still quite simple, as most of the methods provided are aimed at the async/await language feature.
Producer example
private async Task Producer<T>(IEnumerable<T> list, AwaitableBlockingQueue<T> q)
{
    using (var session = q.CreateProducerSession()) {
        foreach (var item in list) {
            await session.Add(item);
        }
    }
    q.CompleteAdding();
}
The code is nearly the same as in the previous producer example except that now a session object is created. The code still looks neat.
Consumer example
private async Task Consume<T>(AwaitableBlockingQueue<T> q,
                              IObserver<T> observer)
{
    using (var session = q.CreateConsumerSession()) {
        try {
            while (await session) {
                observer.OnNext(session.Value);
            }
        } catch (Exception ex) {
            observer.OnError(ex);
            throw;
        }
        observer.OnCompleted();
    }
}
Awaiting the session now returns a boolean that denotes whether an item was taken; it is false once the queue is completed and empty. This makes the code more readable. After awaiting the session, the taken item is available through the Value property.
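As a rough idea of how awaiting a session could yield a boolean and a Value, here is a small sketch with a synchronous Add and a single consumer. The names ConsumerOnlyQueue, Session and ConsumerDemo are invented for this illustration and the design is only an assumption, not the implementation this series will end up with.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical sketch: unbounded queue, one consumer session at a time.
public sealed class ConsumerOnlyQueue<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private bool _completed;
    private Action _waiting;    // continuation of the consumer, if it is waiting

    public void Add(T item)
    {
        Action c;
        lock (_gate) { _items.Enqueue(item); c = _waiting; _waiting = null; }
        c?.Invoke();            // resumes the consumer on the caller's thread
    }

    public void CompleteAdding()
    {
        Action c;
        lock (_gate) { _completed = true; c = _waiting; _waiting = null; }
        c?.Invoke();
    }

    public Session CreateConsumerSession() => new Session(this);

    public sealed class Session : INotifyCompletion, IDisposable
    {
        private readonly ConsumerOnlyQueue<T> _q;
        internal Session(ConsumerOnlyQueue<T> q) { _q = q; }

        public T Value { get; private set; }
        public Session GetAwaiter() => this;    // the session is its own awaiter

        public bool IsCompleted
        {
            get { lock (_q._gate) return _q._items.Count > 0 || _q._completed; }
        }

        public void OnCompleted(Action continuation)
        {
            bool ready;
            lock (_q._gate)
            {
                // The state may have changed since IsCompleted was read.
                ready = _q._items.Count > 0 || _q._completed;
                if (!ready) _q._waiting = continuation;
            }
            if (ready) Task.Run(continuation);  // avoid re-entering the caller
        }

        // true: an item was taken into Value; false: completed and drained.
        public bool GetResult()
        {
            lock (_q._gate)
            {
                if (_q._items.Count == 0) return false;
                Value = _q._items.Dequeue();
                return true;
            }
        }

        public void Dispose() { }
    }
}

public static class ConsumerDemo
{
    public static async Task<List<T>> DrainAsync<T>(ConsumerOnlyQueue<T> q)
    {
        var seen = new List<T>();
        using (var session = q.CreateConsumerSession())
            while (await session) seen.Add(session.Value);
        return seen;
    }
}
```

The same Session object is awaited for every item, so no Task is created per item; only the async method itself returns one.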
The next post will be the implementation of a first version where only the consuming part is asynchronous.