Parallel execution of multiple results in Caliburn Micro

by Petr Hošek

While working with Caliburn Micro, we have came up with the need for parallel execution of multiple results. This is especially useful for concurrent load of data from multiple sources (e.g. filter content).

The framework itself contains only support for sequential execution represented by the SequentialResult class. Therefore, to add support for parallel execution, we have decided to implement our own ParallelResult class in the similar fashion.

First of all, we had to implement the IResult interface:

public class ParallelResult : IResult
{
    private readonly IEnumerator results;

    public ParallelResult(IEnumerable children)
    {
        this.results = children.GetEnumerator();
    }

    public ParallelResult(params IResult[] children)
        : this(children as IEnumerable)
    {
    }

    public event EventHandler Completed;

    public void Execute(ActionExecutionContext context)
    {
        throw new NotImplementedException();
    }

    private void OnComplete(Exception error, bool wasCancelled)
    {
        var handler = Completed;
        if (handler != null)
        {
            handler(this, new ResultCompletionEventArgs { Error = error, WasCancelled = wasCancelled });
        }
    }
}

Both constructors resembles the existing SequentialResult class. Now, we need to implement the logic of this class. Our goal is to run all tasks in parallel and notify observers when all tasks have finished. To do that, we have decided to use Reactive Extensions for .NET (Rx), library for composing asynchronous and event-based programs using observable collections.

The implementation looks as follows:

public void Execute(ActionExecutionContext context)
{
    IoC.BuildUp(this);
    var observables = Observable.Merge(Scheduler.ThreadPool, ChildObservables(context).ToArray());
    observables.Subscribe(_ => { }, ex => OnComplete(ex, true), () => OnComplete(null, false));
}

private IEnumerable<IObservable> ChildObservables(ActionExecutionContext context)
{
    while (results.MoveNext())
    {
        IResult child = results.Current;
        yield return Observable.Create(subscribe =>
        {
            IoC.BuildUp(child);
            child.Completed += (sender, args) =>
            {
                if (args.Error != null || args.WasCancelled)
                {
                    subscribe.OnError(args.Error);
                }
                subscribe.OnCompleted();
            };
            child.Execute(context);
            return () => { };
        });
    }
}

Using the ChildObservables(ActionExecutionContext) method, we convert each task (represented by IResult interface) to instance of observable collection. Then, in the Execute(ActionExecuteContext) method, we merge all observable collections and subscribe to resulting obserable collection in order to be notified of its status.

When all tasks finish, OnComplete(Exception, bool) method is called. This method notifies all the clients subscribed to Completed event. This allows instances of ParallelResult class to be composed in the same way as instances of SequentialResult class.

Tags: Caliburn Micro, Rx

2 Comments

  • ahmet said

    can you fix the typo? : "private IEnumerable>"

  • Augustin Šulc said

    Typo fixed.
    I haven't tried it but I think 'private IEnumerable<IObservable> ChildObservables' was the original text.

Add a Comment