C# 流水线 生产者/消费者链 Producer/Consumer
作者:互联网
<body>
-
manager.cs
using System; using System.Collections.Concurrent; using System.Threading; using bntu.pcm.plworker; using bntu.pcm.works; /* * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/ * pcm 是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的 */ namespace bntu.pcm { /// <summary> /// 管理流水线工程(定义流水线,添加流程) /// </summary> class Manager { public static void Main(string[] args) { // 定义流水线工程,并为其添加流程 Pipleline pipleline = new Pipleline(new BlockingCollection<string>()); pipleline.AddItem<string, int>(ReadGrayImage.Read); pipleline.AddItem<int, double>(GenerateDeepthImage.Generate); pipleline.AddItem<double, double>(CalculateDeepthImage.Calculate); // 将整个流水线作为后台线程(这是因为前台线程是整个流水线的输入) Thread thread = new Thread(() => pipleline.PiplelineWork()); thread.IsBackground = true; thread.Start(); // 整个流水线的输入作为前台线程(这个while循环模拟相机不断输出图像) string image_path = GetImagePath(); while (image_path != null) { // 为整个流水线的输入缓冲区添加元素 pipleline.HIB.Add(image_path); image_path = GetImagePath(); } } // 模拟图片的编号 private static int i = 0; /// <summary> /// 用于模拟相机输出图像 /// </summary> /// <returns>模拟图像</returns> public static string GetImagePath() { return i++.ToString(); } } }
-
PiplelineItem.cs
using System; using System.Collections.Concurrent; /* * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/ * pcm 是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的 * plworker 是pipleline worker的缩写,是“生/消”模型的特殊形式——即流水线模型 */ namespace bntu.pcm.plworker { /// <summary> /// 流水线上的流程定义 /// </summary> /// <typeparam name="INPUT">该流程的输入缓冲区的类型</typeparam> /// <typeparam name="OUTPUT">该流程的输出缓冲区的类型</typeparam> public class PiplelineItem<INPUT, OUTPUT> { // 该流程的输出缓冲区 public BlockingCollection<OUTPUT> output; public BlockingCollection<OUTPUT> Output { get => output; } /// <summary> /// 流程的构造函数 /// </summary> public PiplelineItem() { // 流程的输出缓冲区由其自己定义,然后为后一流程提供一个作为输入缓冲区的接口 this.output = new BlockingCollection<OUTPUT>(); } /// <summary> /// 该流程的操作过程(从输入缓冲区取走,然后处理后放入输出缓冲区) /// </summary> /// <param name="input_buffer">该流程的输入缓冲区(上一流程的输出缓冲区)</param> /// <param name="handle">该流程中的具体操作</param> private void Action(BlockingCollection<INPUT> input_buffer, Func<INPUT, OUTPUT> handle) { try { // 从输入缓冲区中取走元素 foreach (var item in input_buffer.GetConsumingEnumerable()) { // 经过指定操作后放入输出缓冲区 this.output.Add(handle(item)); } } finally { // 当输入缓冲区取空了就需要告知输出缓冲区没有新的元素进入,否则线程无法结束 this.output.CompleteAdding(); } } /// <summary> /// 获取该流程的动作(也就是操作或者称为任务) /// </summary> /// <param name="input_buffer">该流程的输入缓冲区(上一流程的输出缓冲区)</param> /// <param name="handle">该流程中的具体操作</param> /// <returns>该流程的任务</returns> public Action GetPiplelineItemAction(object input_buffer, object handle) { // 在该pipleline_item中包含了input的信息,所以在PiplelineItem可以进行类型转换,而在Pipleline中就不行 BlockingCollection<INPUT> _input_buffer = input_buffer as BlockingCollection<INPUT>; Func<INPUT, OUTPUT> _handle = handle as Func<INPUT, OUTPUT>; // this指针包含了模板信息,所以action就固定成Task了,而不再是Pipleline中的Task<dynamic> return () => this.Action(_input_buffer, _handle); } } }
-
Pipleline.cs
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; /* * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/ * pcm 是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的 * plworker 是pipleline worker的缩写,是“生/消”模型的特殊形式——即流水线模型 */ namespace bntu.pcm.plworker { /// <summary> /// 流水线的结构定义 /// </summary> public class Pipleline { // 整个流水线的输入缓冲区 private dynamic head_input_buffer; // 流水线上的所有流程组成的列表,各个流程中执行的操作组成的刘表 private List<object> pipleline_item_list, handle_list; public dynamic HIB { get => head_input_buffer; } /// <summary> /// 流水线构造函数 /// </summary> /// <param name="head_input_buffer">第一个输入缓冲区,也是整个流水线的输入缓冲区</param> public Pipleline(dynamic head_input_buffer) { this.head_input_buffer = head_input_buffer; this.pipleline_item_list = new List<object>(); this.handle_list = new List<object>(); } /// <summary> /// 为流水线添加一个流程 /// </summary> /// <typeparam name="INPUT">输入缓冲区的类型</typeparam> /// <typeparam name="OUTPUT">输出缓冲区的类型</typeparam> /// <param name="handle">该流程需要执行的操作</param> public void AddItem<INPUT, OUTPUT>(Func<INPUT, OUTPUT> handle) { this.pipleline_item_list.Add(new PiplelineItem<INPUT, OUTPUT>()); this.handle_list.Add(handle); } /// <summary> /// 构建流水线工程(也就是把流水线的各个结点都联系起来) /// </summary> public void PiplelineWork() { // 缓冲区列表,任务列表,任务工厂 var buffer_list = new List<object>(); var task_list = new List<Task>(); var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); // 先将整个流水线的输入缓冲区加入流水线再说 buffer_list.Add(this.head_input_buffer); // 浅浅用一波zip函数(没啥实质性用处,主要是方便) foreach ((dynamic pipleline_item, dynamic handle) in this.pipleline_item_list.Zip(this.handle_list, (pipleline_item, handle) => new KeyValuePair<dynamic, dynamic>(pipleline_item, handle))) { // 获取该流程的输入缓冲区(其实就是上一个流程的输出缓冲区) var input_buffer = buffer_list.Last(); // 将流程需要执行的操作作为一个新的任务,并将该线程(任务)加入任务列表 task_list.Add(taskFactory.StartNew(pipleline_item.GetPiplelineItemAction(input_buffer, handle))); // 将该流程的输出缓冲区加入缓冲区列表,作为后一个流程的输入缓冲区 buffer_list.Add(pipleline_item.Output); } // 等待所有所需的结果(就是已经结束咧) Task.WaitAll(task_list.ToArray()); } } }
-
CalculateDeepthImage.cs
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; /* * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/ * pcm 是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的 * works 是流水线的各个流程 */ namespace bntu.pcm.works { /// <summary> /// 模拟进行深度图的计算 /// </summary> class CalculateDeepthImage { public static double Calculate(double d) { Console.WriteLine("计算深度图像:" + d); Task.Delay(new Random().Next(300)); return d; } } }
-
GenerateDeepthImage.cs
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; /* * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/ * pcm 是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的 * works 是流水线的各个流程 */ namespace bntu.pcm.works { /// <summary> /// 模拟通过灰度图像产生深度图 /// </summary> class GenerateDeepthImage { public static double Generate(int i) { Console.WriteLine("产生深度图像:" + i * 1.0); Task.Delay(new Random().Next(200)); return i * 1.0; } } }
-
ReadGrayImage.cs
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; /* * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/ * pcm 是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的 * works 是流水线的各个流程 */ namespace bntu.pcm.works { /// <summary> /// 模拟读取灰度图像 /// </summary> class ReadGrayImage { public static int Read(string s) { Console.WriteLine("读取灰度图像:" + s); Task.Delay(new Random().Next(100)); return Convert.ToInt32(s); } } }
标签:Producer,C#,流程,System,流水线,缓冲区,using,Consumer,pipleline 来源: https://www.cnblogs.com/SimbaWang/p/16676365.html