Vaughan Reid's blog

Rx as part of your standard toolkit

For years I heard about Rx in .NET but never found a compelling reason to use it. About a year ago my team started using it, and let me just say that things have changed. Now I don't think there's a compelling reason NOT to use it as part of your standard toolset. It's a great tool with a really interesting way of looking at things, and I'd argue that you should strongly consider it whenever you are building multi-threaded applications. The reason is that it has a really nice solution to one of the biggest issues with multi-threaded code: testability. Without a test proving that a piece of work does what it should, can you really say it's working as expected? What happens if your elegant code has a bug? How sure are you that your fix isn't breaking something else? I'm not saying that Rx can or should be used for all multi-threaded code, but since it's built with testing in mind it's a very useful tool to have in the belt. Let me show you an example.


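using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public class MultithreadTest
{
    public void RunWithTpl()
    {
        var taskFactory = new TaskFactory();
        // LongRunning hints that the work should get its own thread.
        taskFactory.StartNew(DoWork, TaskCreationOptions.LongRunning);
    }

    public void RunWithRx(IScheduler scheduler)
    {
        // The injected scheduler decides where DoWork runs.
        var subscription = CreateStream()
            .ObserveOn(scheduler)
            .Subscribe(_ => DoWork());
    }

    private IObservable<Unit> CreateStream()
    {
        return Observable.Create<Unit>(obs =>
        {
            // Emit a single value to trigger the work.
            obs.OnNext(Unit.Default);

            return Disposable.Empty;
        });
    }

    public void DoWork()
    {
        Console.WriteLine("Starting work");
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine("Finished work");
    }
}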

This is the class I'll use for the examples. A few things to note:

  1. RunWithTpl just starts a new task that will run the DoWork method on a new thread.

  2. The RunWithRx example is a little confusing at first, and there are many ways to do it. In this example I just created a stream that emits a single Unit value. The part to take note of is the injected IScheduler. Because of the ObserveOn call, the DoWork method will execute on whichever scheduler I inject. This is the secret sauce of multi-threaded code in Rx!

So the TPL example is run as:


MultithreadTest test = new MultithreadTest();
test.RunWithTpl();

It works as expected: it runs the code on a new thread. You will remember that I injected a scheduler into the Rx method. This is where it gets interesting. The scheduler implementation determines where and when the work runs. So, for example, if I just want a new thread I can use the EventLoopScheduler.


MultithreadTest test = new MultithreadTest();
IScheduler scheduler = new EventLoopScheduler();
test.RunWithRx(scheduler);

What if we decide that we don't want to run on a new thread but on the current thread? You can use:

MultithreadTest test = new MultithreadTest();
IScheduler scheduler = Scheduler.Immediate;
test.RunWithRx(scheduler);

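Or the thread pool (newer Rx releases mark Scheduler.ThreadPool as obsolete and point you to Scheduler.Default or ThreadPoolScheduler.Instance instead):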

MultithreadTest test = new MultithreadTest();
IScheduler scheduler = Scheduler.ThreadPool;
test.RunWithRx(scheduler);
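
If you want to see the difference between these schedulers for yourself, here is a minimal sketch that reports which thread the callback ran on. SchedulerThreadsSketch and ThreadIdFor are hypothetical names I made up for this illustration, and I've assumed Scheduler.Default as the pool-based scheduler. Treat it as a rough sketch rather than anything definitive.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerThreadsSketch
{
    // Hypothetical helper: returns the managed thread id the callback ran on.
    public static int ThreadIdFor(IScheduler scheduler)
    {
        var observedThreadId = -1;

        using (var done = new ManualResetEventSlim())
        using (Observable.Return(Unit.Default)
                   .ObserveOn(scheduler)
                   .Subscribe(_ =>
                   {
                       observedThreadId = Thread.CurrentThread.ManagedThreadId;
                       done.Set();
                   }))
        {
            // Block until the callback has run (or give up after 5 seconds).
            done.Wait(TimeSpan.FromSeconds(5));
        }

        return observedThreadId;
    }

    public static void Main()
    {
        Console.WriteLine($"caller:     {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine($"immediate:  {ThreadIdFor(Scheduler.Immediate)}");

        // EventLoopScheduler owns a dedicated thread, so dispose it when done.
        using (var eventLoop = new EventLoopScheduler())
        {
            Console.WriteLine($"event loop: {ThreadIdFor(eventLoop)}");
        }

        Console.WriteLine($"default:    {ThreadIdFor(Scheduler.Default)}");
    }
}
```

With Scheduler.Immediate the id should match the caller's, while the event loop and default schedulers should report a different thread. The calling code never changes; only the scheduler passed in does.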

This is great, but you might ask: how do I test this? The Microsoft.Reactive.Testing package gives you what you need.

MultithreadTest test = new MultithreadTest();
TestScheduler scheduler = new TestScheduler();
test.RunWithRx(scheduler);
scheduler.AdvanceBy(1);

In your tests you would use the TestScheduler instead and call the AdvanceBy method to kick the work into action. Nothing runs until you advance the scheduler's virtual clock, and when you do, the work runs synchronously on the test's own thread. I'll write other posts about what else you can do with the TestScheduler, but as you can see this abstracts the threading away from the code and allows you to test it.
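
To make that concrete, here is a rough sketch of what such a test could look like. CountingWorker is a hypothetical stand-in for MultithreadTest that counts calls instead of sleeping, and I've used plain exceptions rather than assuming any particular test framework.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical stand-in for MultithreadTest: counts runs instead of sleeping.
public class CountingWorker
{
    public int RunCount { get; private set; }

    public IDisposable RunWithRx(IScheduler scheduler)
    {
        return Observable.Return(Unit.Default)
            .ObserveOn(scheduler)
            .Subscribe(_ => RunCount++);
    }
}

public static class TestSchedulerSketch
{
    public static void Main()
    {
        var scheduler = new TestScheduler();
        var worker = new CountingWorker();

        using (worker.RunWithRx(scheduler))
        {
            // Nothing has run yet: the work is queued on the virtual clock.
            if (worker.RunCount != 0) throw new Exception("work ran too early");

            // Advance virtual time by one tick so the queued work executes.
            scheduler.AdvanceBy(1);

            if (worker.RunCount != 1) throw new Exception("work did not run");
        }

        Console.WriteLine("work ran only after AdvanceBy");
    }
}
```

The test never sleeps or waits on another thread. The work happens at the exact point where the test advances the clock, which is what makes the result repeatable.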