C#-ReactiveExtensions BufferWithPredicate
作者:互联网
Rx具有BufferWithTime,BufferWithCount和BufferWithTimeOrCount的方法,我想编写一个BufferWithPredicate方法,如下所示:
public IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)
本质上,除非谓词返回false,否则新项目将添加到现有缓冲区中,在这种情况下,将返回缓冲区并启动新的缓冲区.谓词将下一项和缓冲区作为参数.
我该如何实现?
解决方法:
这应该为您做.我正在使用Observable.Defer,以便它也可以与冷的Observables一起使用:
public static class MyObservableExtensions
{
public static IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)
{
return Observable.Defer(() =>
{
var result = new Subject<IList<T>>();
var list = new List<T>();
input.Subscribe(item =>
{
if (predicate(item, list))
{
list.Add(item);
}
else
{
result.OnNext(list);
list = new List<T>();
list.Add(item);
}
},
() => result.OnNext(list));
return result;
});
}
}
用法:
var observable = new[] { 2, 4, 6, 8, 10, 12, 13, 14, 15 }.ToObservable();
var result = observable.BufferWithPredicate((item, list) => item % 2 == 0);
result.Subscribe(l => Console.WriteLine("New list arrived. Count = {0}", l.Count));
输出:
"New list arrived. Count = 6"
"New list arrived. Count = 3"
标签:system-reactive,c 来源: https://codeday.me/bug/20191208/2090822.html