c# – 使用AsObservable观察TPL数据流块而不消耗消息
作者:互联网
我有一系列TPL Dataflow块,希望观察系统内部的某些进展.
我知道我可以将TransformBlock堵塞到我想要观察的网格中,让它发布到某种程度的进度更新程序,然后将消息保持不变到下一个块.我不喜欢这个解决方案,因为块的纯粹是因为它的副作用,我还必须在任何我想要观察的地方改变块链接逻辑.
所以我想知道我是否可以使用ISourceBlock< T> .AsObservable来观察网格内的消息传递而不改变它并且不消耗消息.如果有效,这似乎是一个更纯粹,更实用的解决方案.
根据我对Rx的有限理解,这意味着我需要observable是热而不是冷,以便我的进度更新程序看到消息但不消耗它.而.Publish().RefCount()似乎是让观察变得热的方式.但是,它根本无法正常工作 – 而是阻止每个消息接收和消耗每个消息.
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});
// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
block1.Post(v);
}
block1.Complete();
结果是不确定的,但我得到的东西是这样的:
block2:21
progress:22
progress:24
block2:23
progress:25
那么,我做错了什么,或者由于TPL Dataflow AsObservable的实现方式,这是不可能的?
我意识到我还可以使用Observable / Observer对替换block1和block2之间的LinkTo,这可能有效,但LinkTo与下游BoundedCapacity = 1是我首先使用TPL Dataflow的全部原因.
编辑:
一些澄清:
>我打算在block2中设置BoundedCapacity = 1.虽然在这个简单的例子中没有必要,但下游约束的情况是我发现TPL Dataflow非常有用的地方.
>为了澄清我在第二段中拒绝的解决方案,将添加在block1和block2之间链接的以下块:
var progressBlock = new TransformBlock< int,int>(i => {SomeUpdateProgressMethod(i); return i;});
>我还想保持背压,以便如果进一步的上游块将工作分配给block1以及其他等效工作者,如果该链已经忙,它将不会将工作发送到block1.
解决方法:
您的代码的问题在于您正在连接两个block1的使用者.然后,数据流只是给出了消费者首先存在的价值.
因此,您需要将block1中的值广播到另外两个块中,然后才能独立使用这些值.
只是旁注,不要这样做.发布().RefCount(),因为它没有按照你的想法去做.它将有效地使一个运行只能观察到,在一次运行期间将允许多个观察者连接并查看相同的值.它与数据源无关,也与Dataflow块的交互方式无关.
试试这段代码:
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);
// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
block1.Post(v);
}
block1.Complete();
这给了我:
block2:21 block2:22 block2:23 block2:24 block2:25 progress:21 progress:22 progress:23 progress:24 progress:25
这就是我认为你想要的.
现在,除此之外,使用Rx可能是一个更好的选择.它比任何TPL或Dataflow选项更强大,更具说服力.
您的代码归结为:
Observable
.Range(1, 5)
.Select(i => i + 20)
.Do(i => Debug.Print("progress:" + i.ToString()));
.Subscribe(i => Debug.Print("block2:" + i.ToString()));
这几乎可以给你相同的结果.
标签:c,net,system-reactive,tpl-dataflow,rx-net 来源: https://codeday.me/bug/20190622/1264859.html