Thursday, April 12, 2012

Awaitable Queues Part 2: Unbounded Awaitable Queue

As a first attempt at an awaitable queue, we make the queue unbounded. The producer side of the queue is then synchronous, and we can concentrate on making the consumer side awaitable. The code for the queue can be found at the end of this post.


(This is revision 2 of the text.)

Transformation applied by the compiler

To take the magic out of the async and await combination, it helps to understand the transformation the compiler applies to an async method. The compiler converts an async method into a class that implements a state machine. The state machine has one method (MoveNext) that is a transformation of the original async method.

Every await keyword in the async method corresponds to a wait state in the state machine. Thus, when the compiler sees an await keyword like this:

async Task AsyncMethod()
{
  ...
  var result = await xxx;
  ...
}

It will convert that code to something similar to:

class AsyncMethodFsm {
  State state;

  ...

  void MoveNext() {
    if(state == State.WaitingForXXXToFinish) {
      goto LabelXXXIsFinished;
    }
    ...
    this.awaiter = xxx.GetAwaiter();
    if(!awaiter.IsCompleted) {
      state = State.WaitingForXXXToFinish;
      awaiter.OnCompleted(MoveNext);
      return;
    }
  LabelXXXIsFinished:
    var result = awaiter.GetResult();
    ...
  }
}

When AsyncMethod is called, an AsyncMethodFsm object is allocated and its MoveNext method is called for the first time. MoveNext then runs until it reaches an awaiter that cannot complete synchronously (its IsCompleted property returns false). In that case MoveNext remembers where it was and installs a callback on the awaiter by calling OnCompleted with a delegate to itself. When the awaiter has finished doing whatever it was doing, it invokes that callback; MoveNext jumps to the saved position and calls GetResult to retrieve the result of xxx.
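The same steps can be performed by hand against a real awaiter. The following is only a sketch, not what the compiler emits: ManualAwait and its rest parameter are hypothetical names, and the awaiter is the one that Task<int> provides.

```csharp
using System;
using System.Threading.Tasks;

public static class ManualAwait
{
    // Hand-written equivalent of:  var result = await xxx;  rest(result);
    public static void Run(Task<int> xxx, Action<int> rest)
    {
        var awaiter = xxx.GetAwaiter();
        if (awaiter.IsCompleted)
        {
            // Fast path: the result is already there, no callback is needed.
            rest(awaiter.GetResult());
            return;
        }
        // Slow path: register a callback that resumes once xxx has finished.
        awaiter.OnCompleted(() => rest(awaiter.GetResult()));
    }
}
```

Calling ManualAwait.Run(Task.FromResult(41), r => Console.WriteLine(r)) takes the fast path; passing the Task of a TaskCompletionSource<int> that has not been completed yet takes the callback path.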

Code Walkthrough

At the end of this post the whole code is provided. I only show some parts that should help to understand the rest:

  public class AwaitableUnboundedQueue<T>
  {
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private bool isCompleted;
    private readonly List<ConsumerSession> consumers = 
                         new List<ConsumerSession>();

The state of the object is protected by one lock. I use the "queue" field as the lock object, plus a minor trick: the state of each ConsumerSession is held in a volatile field.

Notice that there are two "fast cases" for the queue:
  • Consuming from a non-empty queue. In this case the consumer does not take the "queue" lock and causes no lock contention with the producer. That is the reason we use ConcurrentQueue.
  • Producing an item on an empty queue with a waiting consumer. The producer pushes the value directly through to the consumer, bypassing the queueing of the item. The consumer can then use the value without taking a lock.
Unfortunately there is still contention when the queue is empty. In that case both consumer and producer need the main "queue" lock (TODO).

public ConsumerSession CreateConsumerSession()
{
  lock(queue) {
    var awaiter = new ConsumerSession(this);
    consumers.Add(awaiter);
    return awaiter;
  }
}

Every consuming session is stored in the consumers list. When the Dispose method of the consuming session is called the session will be removed from the list again.

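// INotifyCompletion is required for awaiters by the released C# 5 compiler
public class ConsumerSession : IDisposable, System.Runtime.CompilerServices.INotifyCompletion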
{
  private readonly AwaitableUnboundedQueue<T> owner;

  // note: the producer can change the consumer session state only while the session is in the Waiting state.
  // In any other state only the consumer session itself is able to change the state.
  private enum AwaiterState
  {
    None,
    Waiting,
    ReadyWithValue,
    ReadyWithComplete,
    Disposed
  }  
  private volatile AwaiterState state;
  private T value;
  private Action continuation;

  public ConsumerSession(AwaitableUnboundedQueue<T> owner)
  {
    this.owner = owner;
  }

  // only allowed to be called from the session thread.
  public void Dispose()
  {
    lock(owner.queue) {
      if(state != AwaiterState.Disposed) {
        state = AwaiterState.Disposed;
        value = default(T);
        owner.consumers.Remove(this);
      }
    }
  }


The ConsumerSession object is really a state machine. Furthermore, when an item is retrieved it is stored in the "value" field.

Hopefully a picture says more than a thousand words, because here is one. The state machine starts in "None" and ends in "Disposed":
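As a textual companion, the transitions can also be read off the code at the end of this post and written down as a predicate. This is only a sketch: AwaiterTransitions and IsAllowed are hypothetical names that do not appear in the queue, and the enum is a public copy of the private one.

```csharp
public enum AwaiterState { None, Waiting, ReadyWithValue, ReadyWithComplete, Disposed }

public static class AwaiterTransitions
{
    // Returns true when the queue code can move a session from one state to another.
    public static bool IsAllowed(AwaiterState from, AwaiterState to)
    {
        if (from == AwaiterState.Disposed) return false;  // Disposed is the end state
        if (to == AwaiterState.Disposed) return true;     // Dispose works from any live state
        if (to == AwaiterState.None) return false;        // nothing returns to the start state
        if (from == AwaiterState.Waiting)
        {
            // Producer side: TrySetValue or TryCompleteAdding.
            return to == AwaiterState.ReadyWithValue || to == AwaiterState.ReadyWithComplete;
        }
        // Consumer side: GetAwaiter moves to Waiting or to one of the Ready states.
        return true;
    }
}
```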



The "await session" statement will call ConsumerSession.GetAwaiter:

public ConsumerSession GetAwaiter()
{
  var currentState = state; 
  if(currentState == AwaiterState.Waiting)
    throw new ArgumentException("ConsumerSession is already in use");
  if(currentState == AwaiterState.Disposed)
    throw new ObjectDisposedException("ConsumerSession has been disposed");

  if(owner.queue.TryDequeue(out value)) {
    state = AwaiterState.ReadyWithValue;
  } else {
    lock(owner.queue) {
      if(owner.queue.TryDequeue(out value)) {
        state = AwaiterState.ReadyWithValue;
      } else if(owner.isCompleted) {
        value = default(T);
        state = AwaiterState.ReadyWithComplete;
      } else {
        value = default(T);
        state = AwaiterState.Waiting;
      }
    }
  }
  return this;
}

public bool IsCompleted { get {
  return state == AwaiterState.ReadyWithValue || state == AwaiterState.ReadyWithComplete;
}}

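public void OnCompleted(Action action)
{
  bool alreadyReady;
  lock(owner.queue) {
    alreadyReady = state == AwaiterState.ReadyWithValue || 
                   state == AwaiterState.ReadyWithComplete;
    if(!alreadyReady) continuation = action;
  }
  // run the continuation outside the lock, so consumer code never executes while the queue is locked
  if(alreadyReady) action();
}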

public bool GetResult()
{
  return state != AwaiterState.ReadyWithComplete && state != AwaiterState.Disposed;
}

The GetAwaiter method examines the queue. If an item is available or the queue is completed, the session switches to one of the "Ready" states; otherwise the session switches to the "Waiting" state.

The other three methods that are called by the await transformation are pretty straightforward.

In OnCompleted we must test the state again as the state could have changed between this call and the previous call to IsCompleted. If the session happens to be ready we execute the action parameter immediately.

Notice: we call the continuation without handling exceptions. The continuation already catches any exception and passes it to the Task object that is the result of the async method. Only exceptions that cannot be swallowed, like ThreadAbortException, will leak out of the continuation.
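To see the four awaiter members cooperate with a consuming loop, here is a minimal stand-in that always completes synchronously. It is a sketch under assumptions: ArraySession and Demo are hypothetical names, and the class implements INotifyCompletion because the released C# 5 compiler requires that of an awaiter.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical stand-in for ConsumerSession that serves items from a fixed list.
public class ArraySession<T> : INotifyCompletion
{
    private readonly Queue<T> items;
    private bool hasValue;

    public ArraySession(IEnumerable<T> source) { items = new Queue<T>(source); }

    public T Value { get; private set; }

    // Called once per await: fetch the next item, like the queue's GetAwaiter does.
    public ArraySession<T> GetAwaiter()
    {
        hasValue = items.Count > 0;
        Value = hasValue ? items.Dequeue() : default(T);
        return this;
    }

    // Always ready, so the state machine never needs to suspend.
    public bool IsCompleted { get { return true; } }

    // Required by the pattern; not reached here because IsCompleted is always true.
    public void OnCompleted(Action continuation) { continuation(); }

    // true: Value holds an item; false: there is nothing left to consume.
    public bool GetResult() { return hasValue; }
}

public static class Demo
{
    public static async Task<int> SumAsync(ArraySession<int> session)
    {
        var sum = 0;
        while (await session) sum += session.Value;
        return sum;
    }
}
```

Because GetResult returns a bool, "await session" can be used directly as the loop condition. A loop over the real queue would look the same, with the session obtained from CreateConsumerSession and disposed afterwards.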

public void Add(T item)
{
  Action continuation = null;
  lock(queue) {
    if(queue.IsEmpty) {
      foreach(var awaiter in consumers) {
        continuation = awaiter.TrySetValue(item);
        if(continuation != null) break;
      }
    }
    if(continuation == null) queue.Enqueue(item);
  }
  if(continuation != null && continuation != Sentinel) {
    continuation();
  }
}

When the queue is empty, the Add method checks whether a consumer is waiting for data. If so, it passes the value directly to that consumer and calls the consumer's continuation. I think that is rather neat. The preferred case for an unbounded queue is a consumer that is faster than the producer, and in exactly that case the code bypasses the overhead of queuing the item and hands it straight to the consumer.
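The handoff idea can be shown in isolation. The sketch below swaps in a different technique from the queue in this post: it uses TaskCompletionSource instead of a custom awaiter, and HandoffQueue, TakeAsync and gate are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public class HandoffQueue<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> waiters =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> waiter = null;
        lock (gate)
        {
            // A waiting consumer gets the item directly; otherwise it is queued.
            if (waiters.Count > 0) waiter = waiters.Dequeue();
            else items.Enqueue(item);
        }
        // Complete outside the lock, since this may run the consumer's continuation.
        if (waiter != null) waiter.SetResult(item);
    }

    public Task<T> TakeAsync()
    {
        lock (gate)
        {
            if (items.Count > 0) return Task.FromResult(items.Dequeue());
            var waiter = new TaskCompletionSource<T>();
            waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }
}
```

An item added while a consumer is waiting never touches the items queue, which is the same bypass that Add performs above.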

Notice: the continuation is called directly and is not posted to any saved synchronization context. Making it compatible with the awaiter provided for Tasks (together with a ConfigureAwait(false) to disable that behavior) is still a TODO.
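One possible direction for that TODO is to wrap the continuation at the moment it is registered. This is only a sketch of the idea and not part of the queue: ContinuationPosting and CaptureContext are hypothetical names, and the ConfigureAwait(false) side is left out.

```csharp
using System;
using System.Threading;

public static class ContinuationPosting
{
    // Wraps a continuation so that it runs on the synchronization context
    // that was current when this method was called.
    public static Action CaptureContext(Action continuation)
    {
        var context = SynchronizationContext.Current;
        if (context == null) return continuation;   // no context: call it directly
        return () => context.Post(_ => continuation(), null);
    }
}
```

OnCompleted could then store ContinuationPosting.CaptureContext(action) instead of action, so that a producer thread posts the continuation back to the consumer's context rather than running it inline.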

The code



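  using System;
  using System.Collections.Concurrent;
  using System.Collections.Generic;
  using System.Diagnostics;

  public class AwaitableUnboundedQueue<T>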
  {
    // special value of an action to denote both success and no action. 
    private static readonly Action Sentinel = () => { };

    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private bool isCompleted;
    private readonly List<ConsumerSession> consumers = new List<ConsumerSession>();

    public void Add(T item)
    {
      Action continuation = null;
      lock(queue) {
        if(queue.IsEmpty) {
          foreach(var awaiter in consumers) {
            continuation = awaiter.TrySetValue(item);
            if(continuation != null) break;
          }
        }
        if(continuation == null) queue.Enqueue(item);
      }
      if(continuation != null && continuation != Sentinel) {
        continuation();
      }
    }

    public void CompleteAdding()
    {
      List<Action> continuations;
      lock(queue) {
        if(isCompleted) return;
        isCompleted = true;
        if(queue.Count > 0) return;
        // queue is empty: notify the awaiters immediately
        continuations = new List<Action>();
        foreach(var consumingAwaiter in consumers) {
          var action = consumingAwaiter.TryCompleteAdding();
          if(action != null) {
            continuations.Add(action);
          }
        }
      }
      foreach(var action in continuations) {
        action();
      }
    }

    /// <summary>
    /// Consuming a value is done using an awaiter.
    /// </summary>
    /// <returns></returns>
    public ConsumerSession CreateConsumerSession()
    {
        lock(queue) {
          var awaiter = new ConsumerSession(this);
          consumers.Add(awaiter);
          return awaiter;
        }
    }

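    // INotifyCompletion is required for awaiters by the released C# 5 compiler
    public class ConsumerSession : IDisposable, System.Runtime.CompilerServices.INotifyCompletion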
    {
      private readonly AwaitableUnboundedQueue<T> owner;

      private volatile AwaiterState state;
      private T value;
      private Action continuation;

      public ConsumerSession(AwaitableUnboundedQueue<T> owner)
      {
        this.owner = owner;
      }

      public ConsumerSession GetAwaiter()
      {
        var currentState = state; 
        if(currentState == AwaiterState.Waiting)
          throw new ArgumentException("ConsumerSession is already in use");
        if(currentState == AwaiterState.Disposed)
          throw new ObjectDisposedException("ConsumerSession has been disposed");

        if(owner.queue.TryDequeue(out value)) {
          state = AwaiterState.ReadyWithValue;
        } else {
          lock(owner.queue) {
            if(owner.queue.TryDequeue(out value)) {
              state = AwaiterState.ReadyWithValue;
            } else if(owner.isCompleted) {
              value = default(T);
              state = AwaiterState.ReadyWithComplete;
            } else {
              value = default(T);
              state = AwaiterState.Waiting;
            }
          }
        }
        return this;
      }

      public T Value { [DebuggerStepThrough] get { return value; } }

      public bool IsCompleted
      {
        get { return state == AwaiterState.ReadyWithValue || state == AwaiterState.ReadyWithComplete; } }

      public bool GetResult()
      {
        return state != AwaiterState.ReadyWithComplete && state != AwaiterState.Disposed;
      }

      public void OnCompleted(Action action)
      {
        bool alreadyReady;
        lock(owner.queue) {
          alreadyReady = state == AwaiterState.ReadyWithValue || state == AwaiterState.ReadyWithComplete;
          if(!alreadyReady) continuation = action;
        }
        // this is the rare case where the queue state changed between IsCompleted and OnCompleted,
        // so the chance of creating a stack overflow is minimal. The continuation runs outside the
        // lock, so consumer code never executes while the queue is locked.
        if(alreadyReady) action();
      }

      // only called while holding lock
      internal Action TrySetValue(T item)
      {
        if(state != AwaiterState.Waiting) {          
          return null;
        }
        value = item;
        state = AwaiterState.ReadyWithValue;
        var tmp = continuation;
        continuation = null;
        return tmp ?? Sentinel;
      }

      // only called while holding lock
      internal Action TryCompleteAdding()
      {
        if(state != AwaiterState.Waiting) {
          return null;
        }
        value = default(T);
        state = AwaiterState.ReadyWithComplete;
        var tmp = continuation;
        continuation = null;
        return tmp;
      }

      public void Dispose()
      {
        lock(owner.queue) {
          if(state != AwaiterState.Disposed) {
            value = default(T);
            state = AwaiterState.Disposed;
            owner.consumers.Remove(this);
          }
        }
      }

      // note: only the consumer session thread itself is able to move out of a None, ReadyWithValue 
      // or ReadyWithComplete state. If one of these three states is set, the consumer knows 
      // the state won't be changed by the producer.
      private enum AwaiterState
      {
        None,
        Waiting,
        ReadyWithValue,
        ReadyWithComplete,
        Disposed
      }
    }
  }
