其他分享
首页 > 其他分享> > 等待观察者完成不使用锁

等待观察者完成不使用锁

作者:互联网

我有一个运行有多个观察者的可观察间隔的应用程序.每间隔0.5s,就会从Web服务器加载一些XML数据,然后观察者在后台线程上进行一些特定于应用程序的处理.一旦不再需要数据,就可以取消订阅和可观察的间隔,因此不再调用观察者的OnNext / OnCompleted / one rror.到现在为止还挺好.

我的问题:在极少数情况下,有可能在调用Dispose之后,观察者的OnNext方法仍在运行!在处置后继续进行进一步的操作之前,我想确保OnNext已完成.

我当前的解决方案:我在观察者类中引入了一个locker字段(请参见代码).处置后,我尝试获取锁,并且仅在获取锁之后才继续.虽然此解决方案有效(?),但对我来说有点不对劲.

问题:是否有更优雅,更“ Rx方式”来解决此问题?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RxExperimental
{
    internal sealed class MyXmlDataFromWeb
    {
        public string SomeXmlDataFromWeb { get; set; }
    }

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
    {
        private readonly object _locker = new object();
        private readonly string _observerName;

        public MyObserver(string observerName) {
            this._observerName = observerName;
        }

        public object Locker {
            get { return this._locker; }
        }

        public void OnCompleted() {
            lock (this._locker) {
                Console.WriteLine("{0}: Completed.", this._observerName);
            }
        }

        public void one rror(Exception error) {
            lock (this._locker) {
                Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
            }
        }

        public void OnNext(MyXmlDataFromWeb value) {
            lock (this._locker) {
                Console.WriteLine("  {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine("  {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
                Thread.Sleep(5000); // simulate some long running operation
                Console.WriteLine("  {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
            }
        }
    }

    internal sealed class Program
    {
        private static void Main() {
            const int interval = 500;
            //
            var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
                var data = new MyXmlDataFromWeb {
                    SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
                };
                return data;
            }).Publish();
            //
            var observer1 = new MyObserver("Observer 1");
            var observer2 = new MyObserver("Observer 2");
            //
            var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
            var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
            //
            var connection = dataSource.Connect();
            //
            Console.WriteLine("Press any key to cancel ...");
            Console.ReadLine();
            //
            subscription1.Dispose();
            subscription2.Dispose();
            connection.Dispose();
            //
            lock (observer1.Locker) {
                Console.WriteLine("Observer 1 completed.");
            }
            lock (observer2.Locker) {
                Console.WriteLine("Observer 2 completed.");
            }
            //
            Console.WriteLine("Can only be executed, after all observers completed.");
        }
    }
}

解决方法:

是的,还有更多的接收方法.

第一个观察结果是,取消订阅可观察流实际上与观察者中正在发生的事情无关.确实没有任何反馈.由于您需要确定何时结束观察,因此需要将其建模到可观察的流中.换句话说,您应该完成流,而不是取消订阅流,以便能够观察OnComplete事件.在您的情况下,您可以使用TakeUntil结束可观察对象,而不是取消订阅.

第二个观察结果是您的主程序需要观察“观察者”何时完成工作.但是,由于您使“观察者”成为实际的IObservable,因此您实际上没有办法做到这一点.当人们首次开始使用Rx时,这是常见的混淆源.如果将“观察者”建模为可观察链中的另一个链接,则您的主程序可以观察者.具体来说,您的“观察者”无非是将输入的Xml数据映射为“完成”消息的映射操作(具有副作用).

因此,如果您重构代码,就可以得到想要的…

public class MyObserver
{
    private readonly string _name;

    public MyObserver(string name) { _name = name; }

    public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source)
    {
        return source.Select(value =>
        {
            Thread.Sleep(5000); // simulate work
            return Unit.Default;
        });
    }
}

// main
var endSignal = new Subject<Unit>();
var dataSource = Observable
    .Interval(...)
    .Select(...)
    .TakeUntil(endSignal)
    .Publish();
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
var results1 = observer1.Handle(dataSource.ObserveOn(...));
var results2 = observer2.Handle(dataSource.ObserveOn(...));
// since you just want to know when they are all done
// just merge them.
// use ToTask() to subscribe them and collect the results
// as a Task
var processingDone = results1.Merge(results2).Count().ToTask();

dataSource.Connect();

Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();

// end the stream
endSignal.OnNext(Unit.Default);

// wait for the processing to complete.
// use await, or Task.Result
var numProcessed = await processingDone;

标签:multithreading,system-reactive,c
来源: https://codeday.me/bug/20191120/2047591.html