编程语言
首页 > 编程语言> > c#-如何使用Reactive Extensions(Rx.Net)等待值或直到经过固定时间

c#-如何使用Reactive Extensions(Rx.Net)等待值或直到经过固定时间

作者:互联网

我想等待(阻塞)线程,直到某个时间过去或另一个流泵入一个值,我以为以下方法可以实现此目的,但是由于第一个流为空,因此它引发了异常,

 // class level subject manipulated by another thread...
 _updates = new Subject<Unit>();
 ...
 // wait for up to 5 seconds before carrying on...    
 var result = Observable.Timer(DateTime.Now.AddSeconds(5))
    .TakeUntil(_updates)
    .Wait();

如何获得长达5秒钟的阻塞能力,或者直到另一股视频流产生一个值为止?

解决方法:

您可以像这样使用Observable.Timeout:

 var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();

我使用Take(1),因为超时期望序列完成,而不仅仅是产生下一个值.在超时时,它将抛出System.TimeoutException.

如果您不希望出现异常,则可以使用Catch来提供一些值:

var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
    .Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all

如果您的单位确实是@Shlomo提到的rx单位-您可以这样更改:

var result = _updates.Select(c => (Unit?) c).Take(1)
    .Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();

还是照常捕获该异常.

标签:system-reactive,c
来源: https://codeday.me/bug/20191111/2020462.html