Browse by Tags

All Tags » reactive extensions   (RSS)

The Problem

A common UI pattern is to expose text in a read-only container, then swap it for an editable input box based on a command. There are multiple approaches to doing this (including just changing the style of the same container). Swapping between a read-only TextBlock to a TextBox is easy enough, but what if you want to also focus and select the TextBox so the user can simply begin typing? And what if the UI elements are nestled deep with in a data template so there is no straightforward way to reference them?

A Solution

I say, "a solution" because there are probably other ones, but this is how I recently tackled the problem.

Unique Identifier

First, I figured no matter how nested the text box would be, it most likely is data bound to some data element. So, in order to uniquely identify the "transaction" I could expose a unique identifier on the bound object. Assume the bound field is "Name":

public string NameIdentifier { get; private set; }

...

// constructor
NameIdentifier = Guid.NewGuid.ToString();

The Message

Using the Event Aggregator pattern, I created a message payload specifically for the "message" that the text item should receive focus:

public class TextFocusEvent
{
    public TextFocusEvent(string identifier)
    {
        Identifier = identifier;
    }
    public string Identifier { get; set; }
}

In the "edit command" we can now publish our message that the text box should receive focus (notice I'm using the DelegateCommand from Prism):

EditCommand = new DelegateCommand<object>(
    obj =>
        {
            if (!EditCommand.CanExecute(obj)) return;

            _oldName = _name;
            IsInEditMode = true;
            EventAggregator.Publish(new TextFocusEvent(NameIdentifier));
        },
    obj => !IsInEditMode);

In this case, you can infer we have an IsInEditMode flag exposed, which we can use to bind the visibility of the TextBlock and TextBox to swap them out. The event saves the old name so if the user cancels, it can be put back. We've just published an event to let the world know that text box deserves some focus. Now let's catch it!

The Behavior

I decided to go with a behavior that could subscribe to the message. Because my event aggregator is based on Reactive Extensions (Rx), instead of being a straight event subscription, it actually returns IObservable, which can be filtered using LINQ. This way an attached behavior can simply listen for the specific identifier it is "tuned" to. We want to databind the identifier because it is generated by the view model, so we expose that property as a dependency property. Here's the behavior:

public class TextBoxFocusListenerBehavior : Behavior<TextBox>
{
    [Import]
    public IEventAggregator EventAggregator { get; set; }

    public static readonly DependencyProperty IdentifierProperty =
            DependencyProperty.Register("Identifier", 
            typeof (string), 
            typeof (TextBoxFocusListenerBehavior),
            new PropertyMetadata(string.Empty));

    private IDisposable _listener;

    public string Identifier
    {
        get { return GetValue(IdentifierProperty).ToString(); }
        set { SetValue(IdentifierProperty, value);}
    }

    protected override void OnAttached()
    {
        if (DesignerProperties.IsInDesignTool)
        {
            return;
        }

        if (EventAggregator == null)
        {
            CompositionInitializer.SatisfyImports(this);
        }

        if (EventAggregator != null)
        {
            var query = from evt in EventAggregator.GetEvent<TextFocusEvent>()
                        where evt.Identifier.Equals(Identifier)
                        select true;

            _listener = query.Subscribe(evt =>
                                            {
                                                if (AssociatedObject.Focus())
                                                {
                                                    AssociatedObject.SelectAll();
                                                }
                                            });
        }

        base.OnAttached();
    }

    protected override void OnDetaching()
    {
        if (_listener != null)
        {
            _listener.Dispose();
        }
        base.OnDetaching();
    }
}

So what's going on? First, we are importing the global event aggregator (we know this fires on the UI thread, so I'm not using a mutex to check to see if I need to satisfy the imports and request the reference from MEF). To stay design-time friendly, we don't try to compose if we're in design mode.

When the behavior is attached, the subscription is made for the event. Notice, however, that a filter is being used to filter only the identifier we are interested in. When a new event is pushed to us, we simply set the focus and auto-select the text. This has the effect of highlighting the text box so the user can begin typing right away. When the behavior is detached, we dispose of the subscription.

The XAML

Now we can put it all together and attach the behavior in our XAML:

<TextBox Text="{Binding Name, Mode=TwoWay}">
    <i:Interaction.Behaviors>
             <behaviors:TextBoxFocusListenerBehavior Identifier="{Binding NameIdentifier}"/>
    </i:Interaction.Behaviors>
</TextBox>
<HyperlinkButton Content="edit" Command="{Binding EditCommand}"/>

I've left out the nuances of the TextBlock and related code to swap into/out of view, but you get the point ... now, even if the text box is buried within a set of data templates, simple data-binding gives us the way to tie the edit event with the focus behavior. Of course, the event aggregator is a generic approach: you could also create a more strongly typed message contract between the behavior and the event.
Jeremy Likness

I've been doing quite a bit with Reactive Extensions (Rx) for Silverlight lately. One idea that I keep exploring is the concept of creative intuitive sequential workflows for asynchronous operations. You can read about my explorations using Wintellect's own Power Threading Library in this post along with a simple solution using an interface and enumerators in part 2 of that series. I'm tackling the problem from a different angle.

First, my disclaimers: this is more of an exploratory, "Here's what can be done" post. Please don't take this as best practice or guidance, but more of another way of looking at the solution as well as an opportunity to dive deeper into Reactive Extensions. Also understand I am by no means an expert with Rx so I welcome feedback related to this experiment.

The concept is straightforward: there are often times we want an asynchronous set of operations to perform sequentially. Perhaps you must load a list from a service, then load the selected item, then trigger an animation. This can be done either by chaining the completed events or nesting lambda expressions, but is there a cleaner way?

To me, a cleaner solution would make it easy to see what happens sequentially, and also easy to fire the sequence. If it becomes too complex to force the sequential workflow, it buys us nothing from the traditional methods of wiring into completed events or nesting lambda expressions.

So, I started with the idea of having a helper object that I could feed operations to in sequence, then fire it off. What would that look like?

First, I made an interface to keep track of operations as they run and complete. This is by no means a perfect design but for now it's a sort of "recrusive container" for work done. The interface looks like this:

public interface ISequentialResult 
{
    List<ISequentialResult> Results { get; set; }
    object Value { get; set; }        
}

The result is simple: it aggregates all previous results, and contains a pointer to whatever is processing the current action in the sequence. Simple enough. Now on to a base class:

public abstract class BaseResult<T> : ISequentialResult 
{
    protected IObserver<ISequentialResult> Observer { get; set; }

    protected BaseResult(IObserver<ISequentialResult> observer)
    {
        Results = new List<ISequentialResult>();
        Observer = observer;
    }

    public List<ISequentialResult> Results { get; set;}

    public object Value { get; set; }       

    public T TypedValue
    {
        get { return (T) Value; }
        set { Value = value; }
    }
}

The base class does a little more for us. First, it introduces the concept of an IObserver. The observer is simply a class that receives notifications, and the type is the type of the notification. In this case, we allow for a derived type that will create the observer as well as take our "object" value and type it to a specific value. You'll see how the observer works with our sequences in a bit.

Next, I created three implementations. One is just a default that does nothing, and the other two handle Click events and Storyboard interactions. Let's take a look at the handler for storyboards:

public class StoryboardResult : BaseResult<Storyboard>  
{
    public StoryboardResult(IObserver<ISequentialResult> observer, Storyboard sb) : base(observer)
    {
        TypedValue = sb;
        sb.Completed += SbCompleted;
        sb.Begin();
    }

    void SbCompleted(object sender, EventArgs e)
    {
        TypedValue.Completed -= SbCompleted;
        Observer.OnNext(this);
        Observer.OnCompleted();
    }       
}

You'll notice a few interesting things. We take in the story board, wire into its Completed event, then fire it off (we could have changed the interface to have an Execute method to give this more flexibility). The story board completion is the important piece to consider. First, the event is unhooked. Then, we pass the result to the observer. The observer is waiting for ISequentialResult notifications. We provide it with one, but only after the story board is completed. We also tell the observer "we're done" (an observer can listen for multiple push notifications, so we could have created a handler that never completed, or one that aggregated multiple storyboards and only completed when all story boards were done).

With that in mind, take a look at the Click handler:

public class ClickResult : BaseResult<ButtonBase>
{
    public ClickResult(IObserver<ISequentialResult> observer, ButtonBase button) : base(observer)
    {
        TypedValue = button;
        button.Click += ButtonClick;
    }

    void ButtonClick(object sender, System.Windows.RoutedEventArgs e)
    {
        TypedValue.Click -= ButtonClick;
        Observer.OnNext(this);
        Observer.OnCompleted();                 
    }
}

This is very similar to the storyboard. There is nothing to "kick off" but instead this result simply registers itself whenever the target is clicked. Why would we do that? I'll get to the example in a minute.

The most interesting part of workflow is the sequencer itself. This is the class that will manage the asychronous events and coordinate them. The key is that while they run sequentially, they also run asynchronously so there is no blocking.

Here is the sequencer class:

public class Sequencer : IDisposable 
{
    private IDisposable _sequence;
    private ISequentialResult _result;

    private readonly List<Func<IObserver<ISequentialResult>,ISequentialResult,ISequentialResult>> _sequences
        = new List<Func<IObserver<ISequentialResult>, ISequentialResult,ISequentialResult>>();

    public void AddSequence(Func<IObserver<ISequentialResult>, ISequentialResult,ISequentialResult> sequence)
    {
        _sequences.Add(sequence);
    }
        
    public void Run(Action<Exception> exception, Action<ISequentialResult> completed)
    {
        _sequence = Observable.Iterate(_Sequencer)
            .Subscribe(
                next => { },
                exception,
                ()=>completed(_result));
    }

    private IEnumerable<IObservable<Object>> _Sequencer()
    {
        var x = 0;

        _result = new DefaultResult(null) {Value = this};

        while (x (
                observer =>
                    {
                        var newResult = sequence(observer, _result);
                        newResult.Results.Add(_result);
                        newResult.Results.AddRange(_result.Results);
                        _result = newResult;
                        return () => { };
                    }
                ).Start();

            yield return step;                

            x++;
        }

        _result.Results.Add(_result);
    }

    public void Dispose()
    {
        _sequence.Dispose();
    }
}

There's a lot going on here, so let's break it down.

First, you'll notice two references: one to a disposable object, and one to a result. More on those in a bit.

The list may seem confusing at first, but it's only because of all of the type specifiers. Each "step" is represented by a function. The sequence will call the function with an observer and the previous result, and expect to get a new result back. Think of it as "here's who is watching, and what happened the last time ... now give me what you have."

A method is provided to add these steps to the sequence.

The run method is interesting. This is where we set up our disposable reference because we're creating a long-running observer that we want to dispose of when the sequence itself is disposed. The iterate function takes a list of observable sequences and observes them sequentially. We provide an enumerator that feeds the observable sequences, and for each sequence in the "outer loop" (the main algorithm that drives the sequential work flow) we simply drive through the collection. If an exception is encountered, we'll call back with the exception. When completed, we call back with the final result.

To better understand what's going on, take a look at the enumerator. For each iteration, we create a new observable stream. We call the function I mentioned earlier and pass the observer in. When we receive the result, we stack it recursively and store it, then iterate to the next in line. The yield statement ensures the sequential operation completes (when we call the OnCompleted in our result) before the next step begins.

That's the complicated part: setting up the core framework. Now comes the easy part: plugging into it.

In the XAML I placed a large red rectangle. There are three storyboards tied to the rectangle. One changes the colors, one shrinks it, and one twists it using the plane projection. There are also three buttons. One button kicks off the story boards. One button kicks off a sequential workflow that will run the story boards in order. The final button resets the story boards by calling Stop on them.

Let's have some fun. First, kicking off the story boards is simple enough:

private void Button_Click(object sender, RoutedEventArgs e)
{
    StoryTwist.Begin();
    StoryShrink.Begin();
    StoryColor.Begin();
}

The reset button is also easy:

private void Button_Click_2(object sender, RoutedEventArgs e)
{
    StoryTwist.Stop();
    StoryShrink.Stop();
    StoryColor.Stop();
}

Now for the sequential storyboards. This is where things should get easier for us. Instead of having to wire in several completed events or nesting lambdas, let's see what it looks like to run them in order using our sequencer:

private void Button_Click_1(object sender, RoutedEventArgs e)
{
    if (_sequence != null)
    {
        _sequence.Dispose();
    }

    _sequence = new Sequencer();
    _sequence.AddSequence((o, r) => new StoryboardResult(o, StoryColor));
    _sequence.AddSequence((o, r) => new StoryboardResult(o, StoryShrink));
    _sequence.AddSequence((o, r) => new StoryboardResult(o, StoryTwist));
    _sequence.Run(ex => MessageBox.Show(ex.Message),
                    result =>
                        {
                            foreach (var storyboard in
                                result.Results.Select(sequence => sequence.Value).OfType<Storyboard>())
                            {
                                storyboard.Stop();
                            }
                            _sequence.Dispose();
                            _sequence = null;
                        });
}       

So what's going on? If we have a previous sequence running, we dispose it so we can start fresh. Then, we simply add each story board in the order we want it to fire (the observer is provided to us by the sequencer). Finally, we run it. If there is an error, show it. When the sequence is done, we iterate the results and find any result that was a story board and call the "stop" method. This means after the sequence completes, it will automatically restore the rectangle to its original state.

Finally, to show just how powerful it is to drive sequential workflows without blocking, I added one more method:

private void _WaitForThreeClicks()
{
    _buttonSequence = new Sequencer();
    _buttonSequence.AddSequence((o, r) => new ClickResult(o, ResetButton));
    _buttonSequence.AddSequence((o, r) => new ClickResult(o, ResetButton));
    _buttonSequence.AddSequence((o, r) => new ClickResult(o, ResetButton));
    _buttonSequence.Run(ex=>MessageBox.Show(ex.Message),
        result=>
            {
                MessageBox.Show("You clicked the Reset button 3 times!");
                _buttonSequence.Dispose();
                _WaitForThreeClicks();
            });
}

This is a fun method. It literally creates three sequences, all "waiting" for the reset button to be clicked. After the sequence completes, we show a message indicating that 3 clicks happened, then restart the sequence recursively. I'll call it the first time just after IinitializeComponent:

public MainPage()
{
    InitializeComponent();  
    _WaitForThreeClicks();
}      

There you have it! When you run the code, you'll get this:


Notice that you can keep clicking the sequence to start it over: there is no blocking. And whatever order you decide to click on other buttons, the reset button always faithfully shows a "3 click" message on the third click.

You can download the full source code for this solution here.
Jeremy Likness

I've been working with the Reactive Extensions (Rx) library quite a bit lately and am very impressed. While it is a new way of thinking about services, it certainly makes life much easier. In this example, I'll show you a way to simplify your web service calls using Rx. In fact, even if you don't use Reactive Extensions, you may benefit from the proxy wrappers that I'll describe.

I'm assuming you are familiar with Silverlight, web services, and have some exposure to the Managed Extensibility Framework. You'll also want to make sure you've got the latest version of Rx for Silverlight 4.

Let's get started! First, create a new Silverlight 4 Application. Keep all of the defaults: we do want a web project, but we aren't using RIA.

The Service: Server Side

Let's create a simple calculator service. Sure, it is a simple example, but it will make it easier to focus on the details of Rx rather than puzzling over a more complex web service example.

Create a new service and call it "Calculator." Just place it in the root of the web application. Create a contract and implement it, so that your service ends up looking like this:

namespace RxWebServices.Web
{
    [ServiceContract(Namespace = "http://csharperimage/")]
    public interface ICalculator
    {
        [OperationContract]
        long Add(int operand1, int operand2);       
    }

    [AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
    public class Calculator : ICalculator
    {
        readonly Random _random = new Random();

        public long Add(int operand1, int operand2)
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(_random.Next(1000) + 50));
            return operand1 + operand2;
        }        
    }
}

Notice I built in a delay. This is important to see how Rx helps us handle the asynchronous nature of web service calls.

Go ahead and hit CTRL+F5 to build and run without debugging. This will set up the service end point for us to grab in Silverlight. Now, in your Silverlight project, let's set some things up.

The Service: Client Side

First, we want to add some references to both Reactive Extensions (Rx) and the Managed Extensibility Framework. Below, I've highlighted the references to add:


Now we can add our service reference. Right-click references, choose "Add Service" and select "Discover Services in Solution". You should be able to select the calculator service. Put it in the namespace "Calculator Service" as depicted below.


Making Things Easy: ServiceProxy

Services can seem complex, but with the factory patterns provided by the framework and the support of relative paths, abstracting the creation of an end point is easy. I like to create a proxy class thas manages the end points for me. In this example, I store the end point as a constant. However, you can easily make it a parameter for your Silverlight application and construct it on the fly. All my "consumer" really cares about is the service contract, not the details of how to wire in the service endpoint. So, let's make it easy. Take a look at the following class. The class itself is never instanced directly, but it will export the service contract so that wherever I import it, I'll have a fully wired version of the proxy ready to use.

Create a folder called "Implementation" and add "ServiceProxy.cs". Your class will look like this:

namespace RxWebServices.Implementation
{
    public  class ServiceProxy
    {
        private const string CALCULATOR_SERVICE = "../Calculator.svc";
        private const string NOT_SUPPORTED = "Type {0} not supported";                

        private static readonly Dictionary<Type, Uri> _serviceMap
            = new Dictionary<Type, Uri> {{typeof (ICalculator), new Uri(CALCULATOR_SERVICE,UriKind.Relative)}};

        public static T GetProxyFor<T>()
        {
            if (!_serviceMap.ContainsKey(typeof(T)))
            {
                throw new TypeLoadException(string.Format(NOT_SUPPORTED, typeof (T).FullName));
            }

            return
                new ChannelFactory<T>(new BasicHttpBinding(), new EndpointAddress(_serviceMap[typeof (T)])).
                    CreateChannel();
        }

        [Export]
        public ICalculator CalculatorService
        {
            get { return GetProxyFor<ICalculator>(); }
        }
    }
}

Take a look. We are mapping the service contract to the end points. In our case, it is relative to the site serving the Silverlight application. Because the application is in ClientBin, we back up one level to access the service. Note this will work just as easily for a service hosted somewhere else: I would simply specify a relative or absolute uri. We only have one service, but the dictionary makes it easy to map multiple ones. The export uses the channel factory to generate an instance and return the client.

Our Internal Contract

I rarely let the rest of my application concern itself with the details of the service. Any other area of my application is simply asking for results based on input, regardless of how it is obtained. Therefore, I'll create a very light contract for the calculator service internally - one that is easy to mock and test.

Create a folder called "Contract" and add one interface, ICalculatorService. The interface looks like this:

namespace RxWebServices.Contract
{
    public interface ICalculatorService
    {
        IObservable<long> Add(int operand1, int operand2);
    }
}

Here is where things get interesting. You should be familiar with IEnumerable which we'll call a "pull" sequence of elements: you pull the values from the iterator. With Reactive Extensions, we invert this using IObservable to create a "push" sequence. With the push sequence, you subscribe and receive an event (pushed to you) when an element is available. In this case, we'll subscribe by sending in two operands, and wait to be pushed the result when it comes back.

Wrapping the Service

Now we've got a service proxy and an interface. Let's satisfy the contract. I'll show you the code, then explain it. Under the implementation folder, create a Calculator.cs class and wire it up like this:

namespace RxWebServices.Implementation
{
    [Export(typeof(ICalculatorService))]
    public class Calculator : ICalculatorService, IPartImportsSatisfiedNotification
    {
        [Import]
        public ICalculator CalculatorProxy { get; set; }

        private Func<int,int,IObservable<long>> _calculatorService;            

        public IObservable<long> Add(int operand1, int operand2)
        {
            return _calculatorService(operand1, operand2);
        }

        public void OnImportsSatisfied()
        {
            _calculatorService = Observable.FromAsyncPattern<int, int, long>
                (CalculatorProxy.BeginAdd, CalculatorProxy.EndAdd);
        }
    }
}

Let's break it down. First, you'll notice we import the calculator service. This is the actual proxy we set up in the previous class. When the import is satisfied, we use a helper method provided by Rx to convert the asynchronous call into an observable list. The FromAsyncPattern takes in the types of the inputs, followed by the type of the output. It creates a function that, when called, returns an observable list of the results. In this case, we cast it from the beginning call to our calculator service to the return call. This is the way we take the asynchronous call and turn it into an observable list.

When we actually want to use the method, we call the function with the inputs, and receive the output as the observable. Thus, we do all of the conversion internally, hide the implementation details, and just return a stream that can be subscribed to in order to fetch the results.

Take a look at the signature for the actual service:

private interface ICalculator
{
    IAsyncResult BeginAdd(int operand1, int operand2, AsyncCallback callback, object asyncState);
    
    long EndAdd(IAsyncResult result);
}

To use Rx, we want a function that takes all of the inputs up until the AsyncCallback parameter, and returns an observable list of the return value. In this case, our two inputs are int, and it returns a long, so our function signature is Func<int,int,IObservable<long>>. By using these same types on the FromAsyncPattern extension method, Rx will return us the appropriate function and expect a pointer to the methods to start and the end the call.

Fibonnacci Sequence

Now we can get to the fun part: using the service. We'll use the service two different ways to illustrate how the observable lists work. In the MainPage.xaml, add some rows, a set of buttons, and a stackpanel. Generate code behind for the buttons. It will look something like this:

<Grid x:Name="LayoutRoot" Background="White">
    <Grid.RowDefinitions>
        <RowDefinition Height="Auto"/>
        <RowDefinition Height="*"/>
    </Grid.RowDefinitions>
    <StackPanel Orientation="Horizontal" HorizontalAlignment="Center">
        <Button Content=" GO " Click="Button_Click" Margin="5"/>
        <Button Content=" GO " Click="Button_Click_1" Margin="5"/>
    </StackPanel>
    <StackPanel Orientation="Horizontal" Grid.Row="1" HorizontalAlignment="Stretch" VerticalAlignment="Stretch" x:Name="MainSurface"/>
</Grid>

Next, let's go to the code behind and wire in the first example. First, we'll add some properties we're going to be using:

[Import]
public ICalculatorService Calculator { get; set; }

private IDisposable _sequence;

private readonly Subject<long> _watcher = new Subject<long>();

private int _x, _y, _iterations;

The first piece is the service, which we import using MEF. When we subscribe to services, we receive a disposable observer. In order to cancel observations in progress and start new ones, we'll keep a reference to this using the _sequence field.

What's Your Favorite Subject?

The subject is interesting. Subjects are used to set up a publisher/subscriber model. The subject here is a long. Anyone with access to the subject can publish (send it a long value) and/or subscribe (receive notifications when values are published). We'll use this to bridge between our UI and the service.

Finally, we've got some local variables to use to keep track of state.

Next, we'll set everything up in the constructor:

public MainPage()
{
    InitializeComponent();

    if (DesignerProperties.IsInDesignTool) return;

    CompositionInitializer.SatisfyImports(this);

    _watcher.ObserveOnDispatcher().Subscribe(
        answer =>
            {
                var grid = new Grid
                                {
                                    Width = answer,
                                    Height = answer,
                                    Background = new SolidColorBrush(Colors.Red),
                                    Margin = new Thickness(5, 5, 5, 5)
                                };
                var tb = new TextBlock {Margin = new Thickness(2, 2, 2, 2), Text = answer.ToString()};
                grid.Children.Add(tb);
                MainSurface.Children.Add(grid);
                _Add();
            });
}

The first thing you'll notice is that if we're in the designer, all bets are off and we drop out. Otherwise, we compose the parts, which gives us our service import. Next, we'll subscribe to our subject. Notice that we don't have any service interaction yet. The subscription basically breaks down like this:

  • I'm interested in the subject with long values
  • When something happens, let me know on the dispatcher thread (as I'm going to do something with the UI)
  • When a long value is observed, give it to me: I'll make a grid as big as the value I received, put some text inside it, add it to the stack panel and then call the _Add method

That's very simple and straightforward. No we can explore the missing method. First, let's kick things off when the user clicks the first button. I want to use the add service to compute a fibonnacci ratio (each number is the sum of the previous two, started with 1 and 1). I'll implement the button click code-behind and add the missing method here:

private void Button_Click(object sender, RoutedEventArgs e)
{
    if (_sequence != null)
    {
        _sequence.Dispose();
    }

    MainSurface.Children.Clear();

    _x = 1;
    _y = 1;
    _iterations = 0;

    _watcher.OnNext(_x);            
}      
  
private void _Add()
{         
    _sequence.Dispose();

    if (++_iterations == 20)
    {
        return;
    }

    _sequence = Calculator.Add(_x, _y).Subscribe(answer =>
                                            {
                                                _x = _y;
                                                _y = (int)answer;
                                                _watcher.OnNext(answer);                                                     
                                            });
}

So the first part should be straight forward. If we had another sequence, dispose it. This will cancel any observations in progress. Clear the surface, initialize our variables, and then call the OnNext function on our subject. What's that? Simple: we just published a number. The subject will receive the number (1) and then push it to any subscriptions. We subscribed earlier, so we'll create a 1x1 grid and call the _Add method.

This method is even more interesting. First, we stop after 20 iterations. No sense in going to infinity. Next, we subscribe to the calculator service. Subscriptions to observable lists are the same as subscriptions to subjects. We're asking to watch for a value, and initiating the "watch" by sending in our first values (1 and 1). When we receive the answer, we shift the numbers to continue the sequence, and then publish the number to the subject.

This allows us to "daisy chain" service calls. We wait until we receive the first answer before we ask the next question. At this point, if you hit F5 (or CTRL-F5) to run it, and click the first button, you should see this:

Note if you keep clicking while it is rendering, it will start over. There will be no "hanging" calls because the calls are daisy chained. We are also not blocking the UI while waiting, or you wouldn't be able to click the button again. You can clearly see the delays on the server as the results are returned.

Here is a simplified overview of what is happening:

Random Addition

Now we'll throw another function into the mix. It's time to set up the second button. For this button, we're going to add two methods. The first is an enumerable that returns nothing but random numbers. It loops infinitely so we obtain as many numbers as we like, and we'll receive them in tuples:

private static IEnumerable<Tuple<int,int>> _RandomNumbers()
{
    var random = new Random();

    while (true)
    {
        yield return Tuple.Create(random.Next(100), random.Next(100));                
    }
}

In the event handler for the second button, add this bit of code:

private void Button_Click_1(object sender, RoutedEventArgs e)
{
    if (_sequence != null)
    {
        _sequence.Dispose();
    }

    MainSurface.Children.Clear();

    _sequence = _RandomNumbers()
        .ToObservable()
        .Take(20)                
        .Subscribe(numbers => Calculator.Add(numbers.Item1, numbers.Item2)
                                    .ObserveOnDispatcher()
                                    .Subscribe(result =>
                                                    {
                                                        var text = string.Format("{0}+{1}={2}", numbers.Item1,
                                                                                numbers.Item2, result);
                                                        var tb = new TextBlock
                                                                    {Margin = new Thickness(5, 5, 5, 5), Text = text};
                                                        MainSurface.Children.Add(tb);
                                                    }));                
}    

This is a little different. First, we're taking the enumerable list of random numbers and turning it into an observable list so the values will be pushed to us. This is just by way of demonstration; we could have just as easily iterated the list with a foreach loop instead. What's interesting here is that I can limit how many I grab with the Take(20) extension. I subscribe like I do to any other observable list, and when I receive the next number pair, I turn around and subscribe to the calculator service to add the numbers for me. Instead of publishing the result to the subject, I'm handling it myself. I observe on the dispatcher thread, then add a text block with the addition statement to the stack panel.

Go ahead and run the application, click the button, and you'll receive output that looks like this:

Observations (Pardon the Pun)

If you run this and click the go button, you might notice something interesting. No matter how many times you click, you get the full sequence of numbers. In other words, if I let 5 numbers come back, then click go, I'll receive a sequence of 35 numbers, not 25.

Even more interesting is if you click the second go button, wait until most (but not all) of the 20 numbers return, then click the first go button. You'll see the screen clear, but you'll receive a few sequences of added numbers before the fibonnacci sequence starts.

What's Going On?

But we disposed of the subscription, right? Not exactly. In this implementation, we always getting the same service subscription. The subscription we cancel is the outer observation. To better understand this, load up a tool like Fiddler and watch the service calls. In the first example, the call is made, there is a delay, it returns, and then the next call is made.

In the second example, however, almost all of the calls are made almost all at once. They return at different rates due to the delays on the server. So, when you start a new sequence, you subscribe to the same service and therefore get to watch the results return that hadn't made it back from the initial calls.

This is important to understand as you are building your extensions, because in some cases you might want a new observable sequence, while in others it makes sense to keep the existing one. It depends on your needs and the desired behavior.

Hopefully this will help open some new doors of understanding about Reactive Extensions!

Click here to download the source code for this article

Jeremy Likness

Reactive Extensions (Rx) is a product from Microsoft Research that simplifies the management and composition of asynchronous events. If you read my earlier post on Asynchronous Workflows, you'll understand why the asynchronous programming model can sometimes lead to confusing and hard-to-maintain code.

There is nothing wrong with the model, but the fact that you must subscribe to an event or register a callback means your code can end up nested and scattered to the winds. Add to that the complexity of aggregating events (for example, firing multiple asynchronous events and then waiting for them all to complete before going on) and it's enough to require a visit to the pharmacy. OK, that was a bad lead-in because of course that's where you can get your Rx (prescription).

There is a lot to explore in the framework, but you can start by visiting the main site and downloading the extensions (they are available for .NET 3.5, .NET 4.0, Silverlight 3, Silverlight 4 and JavaScript). Probably the best and easiest way to learn using Rx is to follow the hands-on labs that you can download in PDF format from this blog post.

I'd like to give a gentle introduction by demonstrating how it solved a very specific need of mine. A very common pattern is what I'll call "synchronized lists" where one list triggers a change in the other. The classic example is selecting a country to get a list of states for that country. Selecting a country means we fetch or filter a new list of states, and if we're kind to the end user we'll also give them a default (either a state or perhaps a "select one" prompt). One test would be changing the country and validating that the list of states updated correctly. Another test, important for data-binding, is to ensure that setting the country raises both the country and the state property changed events. If someone were to come along and change the setter to the private property, the property change notification would fail to fire and we'd lose our data-binding.

To test is straightforward: set up the view model, register to the event, then listen and confirm we get the properties we are looking for. Implementing it is another story. The original syntax would involve registering a lambda that adds to a list, then waiting for a particular time (to make sure all events have fired) and then check the list. There is no "clean" way to do it without timers or a counter or making assumptions about the order the properties may come in.

The Reactive Extensions simplify this tremendously. Take a look at the following code:

public void TestChangeCurrentCountry_RaisesPropertyChanged()
{
    var properties = new List<string>();
    var expected = new[] {"CurrentCountry", "CurrentState"};

    // set it up
    _target.OnImportsSatisfied();

    var propertyChanged = (from evt 
      in Observable.FromEvent<PropertyChangedEventArgs>(_target, "PropertyChanged")
      select evt.EventArgs.PropertyName).Take(2);
                
    using (propertyChanged.Subscribe(
      properties.Add,
      ex => Assert.Fail(ex.Message),
      () =>
         {                                                 
            CollectionAssert.AreEquivalent(expected, properties,
            "Invalid properties were raised on the notify change event.");
            EnqueueTestComplete();
         }))
    {
        _target.CurrentCountry = _countries[1];
    }
}

What's going on here? I set up a list to grab properties as they are raised, and I initialize a list of expected properties. This makes the conditions of the test very clear. I'm using MEF, so what happens in unit tests is that I set up the view model with a bunch of mock objects to return my data: i.e. "if you ask for a country, I'll give you this" and so forth. I explicitly call "OnImportsSatisfied" to simulate what would happen if I were actually composing the view model with MEF (in this case, however, my test is composing it).

In this case, the view model will call the country service to populate the list, then the state service, and set defaults. All of these are fed based on the mock configurations.

Once we have the view model prepared, the Rx steps in. I set up an observable collection that is based on the property changed event. Notice we pass the expected EventArgs, the object to inspect and the event to listen for. The power of Rx is that it will asynchronously build the list for me as the event is raised, in this case returning the property names (because of my select statement). I am only expecting 2, so I can use the standard LINQ "Take(2)" for the list to stop after the second property changed event (otherwise, it would continue listening and building out the list ad infinitum).

The using statement provides an action as items are added (note I'm using a method group to specify that the string is passed to the add method of the properties list.) If an exception is thrown, I'll fail the test gracefully with an assert. The last parameter is the action to perform when complete, which is after both properties have been added. Here, I compare the two lists and let the Silverlight Unit Testing Framework know that the test is complete.

The entire statement wraps the functionality I'll use to trigger the behaviors - in this case, changing the country to set off the property change notification.

While this is a very simple example, I encourage you to download the hands on labs to see more complex examples that compose multiple asynchronous events together in a single collection. It's a powerful framework that deserves a close look!

Jeremy Likness