TransWikia.com

How to Separate IObservable and IObserver

Stack Overflow Asked by Christian Findlay on December 16, 2020

Update: check out the example at the bottom

I need to message between classes. The publisher will loop indefinitely, call some method to get data, and then pass the result of that call into OnNext. There can be many subscribers, but there should only ever be one IObservable, and one long-running task. Here is an implementation.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            var subject = new Subject<string>();

            //Create a class and inject the subject as IObserver
            new Publisher(subject);

            //Create a class and inject the subject as IObservable
            new Subscriber(subject, 1.ToString());
            new Subscriber(subject, 2.ToString());
            new Subscriber(subject, 3.ToString());

            //Run the loop for 3 seconds
            await Task.Delay(3000);
        }

        class Publisher
        {
            public Publisher(IObserver<string> observer)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        observer.OnNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            public string Name;

            //Listen for OnNext and write to the debug window when it happens
            public Subscriber(IObservable<string> observable, string name)
            {
                Name = name;
                var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
            }
        }
    }
}

Output:

Name: 1 Message: Hi

Name: 2 Message: Hi

Name: 3 Message: Hi

Name: 1 Message: Hi

Name: 2 Message: Hi

Name: 3 Message: Hi

This works fine. Notice that only one IObserver sends messages, but all subscriptions pick up the message. But, how do I separate the IObservable and the IObserver ? They are glued together as a Subject. Here is another approach.

[TestMethod]
public async Task RunMessagingAsync2()
{
    var observers = new List<IObserver<string>>();

    var observable = Observable.Create(
    (IObserver<string> observer) =>
    {
        observers.Add(observer);

        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    observer.OnNext(GetSomeData());
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                await Task.Delay(500);
            }
        });

        return Disposable.Create(() => { });
    });

    //Create a class and inject the subject as IObservable
    new Subscriber(observable);
    new Subscriber(observable);

    //Run the loop for 10 seconds
    await Task.Delay(10000);

    Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}

The problem here is that this creates two separate Tasks and two separate IObservers. Every subscription creates a new IObserver. You can confirm that because the Assert here fails. This doesn’t really make any sense to me. From what I understand of Reactive programming, I wouldn’t expect the Subscribe method here to create a new IObserver each time. Check out this gist. It is a slight modification of the Observable.Create example. It shows how the Subscribe method causes an IObserver to be created each time it is called. How can I achieve the functionality from the first example without using a Subject?

Here is another approach that does not use Reactive UI at all… You could create a Subject from the publisher if you want to, but it is not necessary.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";
   
        class Publisher
        {
            public Publisher(Action<string> onNext)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data, publish it with OnNext and wait 500 milliseconds
                        onNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            //Listen for OnNext and write to the debug window when it happens
            public void ReceiveMessage(string message) => Debug.WriteLine(message);
        }

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            //Create a class and inject the subject as IObservable
            var subscriber = new Subscriber();

            //Create a class and inject the subject as IObserver
            new Publisher(subscriber.ReceiveMessage);

            //Run the loop for 10 seconds
            await Task.Delay(10000);
        }
    }
}

Lastly, I should add that ReactiveUI used to have a MessageBus class. I’m not sure if it got removed or not, but it is no longer recommended. What do they suggest we use instead?

Working Example

This version is correct. I guess the only thing I’m asking now is how do I do the equivalent of this with Observable.Create? The problem with Observable.Create is that it runs the action for each subscription. That is not the intended functionality. The long running task here only runs once no matter how many subscriptions there are.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    internal class BasicObservable<T> : IObservable<T>
    {
        List<IObserver<T>> _observers = new List<IObserver<T>>();

        public BasicObservable(
            Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default
            ) =>

            Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
                        var data = getData();
                        _observers.ForEach(o => o.OnNext(data));
                    }
                    catch (Exception ex)
                    {
                        _observers.ForEach(o => o.OnError(ex));
                    }
                }

                _observers.ForEach(o => o.OnCompleted());

            }, cancellationToken);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _observers.Add(observer);
            return Disposable.Create(observer, (o) => _observers.Remove(o));
        }
    }

    public static class ObservableExtensions
    {
        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, default, cancellationToken);

        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,
            TimeSpan? interval = null,
            CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData, interval, cancellationToken);
    }

    [TestClass]
    public class UnitTest1
    {
        string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            Func<string> getData = GetData;

            var publisher = getData.CreateObservable(cancellationToken);

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; true; i++)
            {
                if (i >= 5)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }
        }
    }

}

3 Answers

At first you must familiarize yourself with the theory of "cold" and "hot" observables. Here is the definition from the Introduction to RX.

  1. Cold are sequences that are passive and start producing notifications on request (when subscribed to).
  2. Hot are sequences that are active and produce notifications regardless of subscriptions.

What you want is a hot observable, and the problem is that the Observable.Create method creates cold observables. But you can make any observable hot by using the Publish operator. This operator provides a way to have a single underlying subscription shared by multiple independent observers. Example:

int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
    _ = Task.Run(async () =>
    {
        while (true)
        {
            observer.OnNext(++index);
            await Task.Delay(1000);
        }
    });
    return Disposable.Empty;
});

IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop

hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));

The coldObservable created by the Observable.Create is subscribed when the hotObservable.Connect method is invoked, and then all notifications generated by that single subscription are propagated to all subscribers of the hotObservable.

Output:

Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...

Important: the purpose of the example above is to demonstrate the Publish operator, and not to serve as an example of good quality RX code. One of its problems is that by subscribing the observers after connecting to the source becomes theoretically possible that the first notification will not be send to some or all of the observers, because it may be created before their subscription. There is a race condition in other words.

There is an alternative method of managing the lifetime of an IConnectableObservable, the operator RefCount:

Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

var hotObservable = coldObservable.Publish().RefCount();

This way you don't need to Connect manually. The connection occurs automatically with the first subscription, and it is disposed automatically with the last unsubscription.

Correct answer by Theodor Zoulias on December 16, 2020

I've added this as an answer because I feel that the code that Christian posted in his answer is dangerous as it's mixing Tasks and Rx and there are race conditions.

Here's an alternative that fixes most of these issues:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IDisposable Subscriber(IObservable<string> observable, string name) =>
        observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData());
                
        var publisher = coldObservable.Publish();

        var subscriptions =
            new CompositeDisposable(
                Subscriber(publisher, "One"),
                Subscriber(publisher, "Two"),
                publisher.Connect());

        await Task.Delay(TimeSpan.FromSeconds(5.0));

        subscriptions.Dispose();
    }
}

Better yet, though, I would look at doing it this way:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
        observable.Select(s => $"Name: {name} Message: {s}");
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData())
                .Do(_ => Debug.WriteLine("Called GetData()"))
                .Publish(published =>
                    Observable
                        .Merge(
                            Subscriber(published, "One"),
                            Subscriber(published, "Two")))
                .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
                .Do(x => Debug.WriteLine(x));
    
        await coldObservable;
    }
}

It's always best to use the inbuilt operators for Rx rather than hybrid approaches with tasks.

Answered by Enigmativity on December 16, 2020

Thanks to the answer above, I eventually got the desired result without having to implement IObservable. Theodor was correct. The answer was to convert the IObservable to hot with the Publish() method.

I wrote an article about this here

While this works, Enigmativity's answer above is far better.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Observables
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable, string name)
        {
            Name = name;
            observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    [TestClass]
    public class UnitTest1
    {
        static string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            var coldObservable = Observable.Create<string>(observer =>
            {
                _ = Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var data = GetData();
                        observer.OnNext(data);
                        await Task.Delay(1000);
                    }
                }, cancellationToken);

                return Disposable.Empty;
            });

            var publisher = coldObservable.Publish();
            var connection = publisher.Connect();

            new Subscriber(publisher, "One");
            new Subscriber(publisher, "Two");

            for (var i = 0; i < 5; i++)
            {
                if (i == 4)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }

            connection.Dispose();
        }
    }
}

Answered by Christian Findlay on December 16, 2020

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP