How to implement a continuous producer-consumer pattern inside a Windows Service
Author: Internet
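Here is what I want to do:
- Keep an in-memory queue of the items that need processing (i.e. IsProcessed = 0)
- Every 5 seconds, fetch the unprocessed items from the DB and add them to the queue if they are not already in it
- Continuously pull items from the queue, process them, and update each item in the DB as soon as it has been processed (IsProcessed = 1)
- Do all of this "as parallel as possible"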
I have a service constructor like this:
public MyService()
{
Ticker.Elapsed += FillQueue;
}
When the service starts, I start the timer:
protected override void OnStart(string[] args)
{
Ticker.Enabled = true;
Task.Run(() => { ConsumeWork(); });
}
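The question's snippets rely on a `Ticker` timer and a `Work` queue that are never declared. A plausible set of declarations is sketched below. It assumes `System.Timers.Timer` (suggested by the `ElapsedEventArgs` parameter of `FillQueue`) and a plain `Queue<T>`. The `Item` type and the `ServiceFields` holder class are hypothetical. The sketch also makes visible that one non-thread-safe queue is shared by the timer callback and the `Task.Run` consumer.

```csharp
using System.Collections.Generic;

// Hypothetical item type: the question never shows its definition.
public class Item
{
    public int Id { get; set; }
}

// Hypothetical holder for the static fields the question's snippets use.
public static class ServiceFields
{
    // Fully qualified to avoid a clash with System.Threading.Timer.
    // 5000 ms matches the "every 5 seconds" requirement; AutoReset keeps it firing.
    // Enabled stays false until OnStart sets it to true.
    public static readonly System.Timers.Timer Ticker =
        new System.Timers.Timer(5000) { AutoReset = true };

    // Plain Queue<T>: NOT thread-safe, yet FillQueue (timer thread-pool thread)
    // and ConsumeWork (the Task.Run thread) both read and modify it.
    public static readonly Queue<Item> Work = new Queue<Item>();
}
```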
My FillQueue looks like this:
private static void FillQueue(object source, ElapsedEventArgs e)
{
var items = GetUnprocessedItemsFromDb();
foreach(var item in items)
{
if(!Work.Contains(item))
{
Work.Enqueue(item);
}
}
}
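One detail in FillQueue is worth checking. `Work.Contains(item)` uses the default equality comparer. If `Item` is a class that does not override `Equals`, a row loaded again from the DB is a new object and is never considered "already in the queue". The sketch below demonstrates this with a hypothetical `Item` that has an `Id` key, and contrasts it with a lookup by key. The `QueueDedupDemo` name is illustrative only.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical row type without an Equals override (so reference equality applies).
public class Item
{
    public int Id { get; set; }
}

public static class QueueDedupDemo
{
    // Mirrors FillQueue's check: Queue<T>.Contains uses EqualityComparer<T>.Default,
    // which compares references for a class that does not override Equals.
    public static bool ContainsByReference(Queue<Item> work, Item fresh) => work.Contains(fresh);

    // Compares by key instead, so a freshly loaded copy of the same row is detected.
    // This is a linear scan; a HashSet of queued ids would make the lookup O(1).
    public static bool ContainsById(Queue<Item> work, Item fresh) => work.Any(w => w.Id == fresh.Id);
}
```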
My ConsumeWork looks like this:
private static void ConsumeWork()
{
while(true)
{
if(Work.Count > 0)
{
var item = Work.Peek();
Process(item);
Work.Dequeue();
}
else
{
Thread.Sleep(500);
}
}
}
However, this is probably a naive implementation, and I am wondering whether .NET has a class designed for exactly this kind of scenario.
Solution:
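Although @JSteward's answer is a good start, you can improve on it by mixing up TPL-Dataflow and Rx.NET extensions. A dataflow block can easily become an observer of your data, and the Rx Timer will save you a lot of trouble (Rx.Timer explanation).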
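`Observable.Timer(dueTime, period)` produces the value 0 once `dueTime` has elapsed, and after that one increasing value per `period`. The helper below computes only that nominal schedule and does not reference Rx. `TimerSchedule` is a hypothetical name. With the 60-second due time and 5-second period used in the code, the first DB poll happens a full minute after the subscription.

```csharp
using System;

// Nominal emission schedule of Observable.Timer(dueTime, period):
// value 0 after dueTime, value n after dueTime + n * period (ignoring scheduler jitter).
public static class TimerSchedule
{
    public static TimeSpan NominalFireTime(long n, TimeSpan dueTime, TimeSpan period)
    {
        if (n < 0) throw new ArgumentOutOfRangeException(nameof(n));
        return dueTime + TimeSpan.FromTicks(period.Ticks * n);
    }
}
```

If the service should poll immediately after `OnStart`, passing `TimeSpan.Zero` as the due time makes the first value fire right away.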
We can adapt the MSDN article to your needs as follows:
private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;
// the following statements run inside a method, e.g. OnStart
var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
        // fire first event after 1 minute waiting
        TimeSpan.FromSeconds(DueIntervalInSeconds),
        // fire all next events each 5 seconds
        TimeSpan.FromSeconds(EventIntervalInSeconds))
    // each number will have a timestamp
    .Timestamp()
    // each time we select some items to process
    .SelectMany(GetItemsFromDB)
    // filter out items that are already in process
    .Where(i => !_processedItemIds.ContainsKey(i.Id));
var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
{
    // we can process as many items at once as there are processors
    MaxDegreeOfParallelism = Environment.ProcessorCount,
});
IDisposable subscription = source.Subscribe(action.AsObserver());
Also, your check for already-processed items is not very reliable. An item can be selected from the DB as unprocessed after its processing has finished but before the database has been updated. In that case the item has already been removed from the Queue<T>, and the producer adds it to the queue again. That is why I added a thread-safe cache of in-flight ids to this solution. HashSet<T> is not thread-safe, and ConcurrentBag<T> cannot remove a specific element, so a ConcurrentDictionary<Guid, byte> is used as a concurrent set (the byte value is ignored):
private static async Task ProcessItem(Item item)
{
    // TryAdd is an atomic check-and-add, so two workers can never both claim the same id
    if (!_processedItemIds.TryAdd(item.Id, 0))
    {
        return;
    }
    // actual work here
    // save item as processed in database
    // we need to wait to ensure the item does not appear in the queue again
    // (note: this await keeps one of the block's MaxDegreeOfParallelism slots busy for the whole delay)
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));
    // clear the processed cache to reduce memory usage
    _processedItemIds.TryRemove(item.Id, out _);
}
public class Item
{
    public Guid Id { get; set; }
}
// temporary cache for items in process (ConcurrentDictionary used as a concurrent set)
private static readonly ConcurrentDictionary<Guid, byte> _processedItemIds = new ConcurrentDictionary<Guid, byte>();
private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");
    // return items from DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}
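The in-flight cache can be isolated and tested on its own. `ConcurrentBag<T>` only offers `TryTake`, which removes an arbitrary element, and it has no lookup of its own: `Contains` on it is the LINQ linear scan. For this reason, `ConcurrentDictionary<TKey, TValue>` with an ignored value is a common stand-in for a concurrent set. The sketch below uses a hypothetical `InFlightIds` wrapper to show the atomic claim/release semantics the processing step relies on.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical wrapper: a concurrent "set" of in-flight ids (the byte value is unused).
public sealed class InFlightIds
{
    private readonly ConcurrentDictionary<Guid, byte> _ids = new ConcurrentDictionary<Guid, byte>();

    // Atomic check-and-add: for a given id, exactly one concurrent caller gets true.
    public bool TryClaim(Guid id) => _ids.TryAdd(id, 0);

    // Lookup used by the producer-side filter.
    public bool IsInFlight(Guid id) => _ids.ContainsKey(id);

    // Forget the id, e.g. once the DB row is known to have IsProcessed = 1.
    public bool Release(Guid id) => _ids.TryRemove(id, out _);
}
```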
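You can implement cache cleanup in other ways as well. For example, you can start a "GC" timer that periodically removes processed items from the cache.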
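A "GC"-style cleanup can be sketched as a cache that records when each id was added, plus a sweep that drops entries older than a time-to-live. A periodic timer would call the sweep. This sketch assumes a hypothetical `ExpiringIdCache` class. The current time is passed in as a parameter so the logic can be tested without waiting. One advantage over awaiting a delay per item is that no processing slot is held while an id waits to expire.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical cache: each id remembers when it was added; Sweep removes expired ids.
public sealed class ExpiringIdCache
{
    private readonly ConcurrentDictionary<Guid, DateTime> _addedAt = new ConcurrentDictionary<Guid, DateTime>();
    private readonly TimeSpan _ttl;

    public ExpiringIdCache(TimeSpan ttl) => _ttl = ttl;

    public bool TryAdd(Guid id, DateTime nowUtc) => _addedAt.TryAdd(id, nowUtc);

    public bool Contains(Guid id) => _addedAt.ContainsKey(id);

    // Intended to be called from a periodic timer, e.g. Sweep(DateTime.UtcNow).
    public int Sweep(DateTime nowUtc)
    {
        // ConcurrentDictionary's ICollection<KeyValuePair>.Remove only removes the entry
        // if both key and value still match, so an id re-added meanwhile is kept.
        ICollection<KeyValuePair<Guid, DateTime>> entries = _addedAt;
        int removed = 0;
        foreach (var entry in _addedAt) // enumeration is safe during concurrent updates
        {
            if (nowUtc - entry.Value >= _ttl && entries.Remove(entry))
            {
                removed++;
            }
        }
        return removed;
    }
}
```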
To stop the events and item processing, you should dispose the subscription and possibly complete the ActionBlock:
subscription.Dispose();
action.Complete();
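The same shutdown order (stop producing, then let the consumer drain) has a counterpart in the base class library. This may matter if you would rather not depend on Rx.NET, which ships as a separate NuGet package. The sketch below is a swapped-in technique, not the answer's Rx/Dataflow code. It uses `BlockingCollection<T>` as the queue, a fixed number of consumer tasks that block instead of polling, a `ConcurrentDictionary` as the in-flight set, and `CompleteAdding()` for shutdown. `BlockingPipeline` and its members are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical BCL-only pipeline: BlockingCollection<T> plus N long-running consumers.
public sealed class BlockingPipeline<T>
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly ConcurrentDictionary<Guid, byte> _inFlight = new ConcurrentDictionary<Guid, byte>();
    private readonly Func<T, Guid> _key;
    private readonly Task[] _consumers;

    public BlockingPipeline(Func<T, Guid> key, Action<T> process, int degreeOfParallelism)
    {
        _key = key;
        _consumers = Enumerable.Range(0, degreeOfParallelism)
            .Select(i => Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; the loop ends after CompleteAdding()
                // once the remaining items have been taken.
                foreach (var item in _queue.GetConsumingEnumerable())
                {
                    try { process(item); } // e.g. do the work, then set IsProcessed = 1
                    finally { _inFlight.TryRemove(_key(item), out _); }
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();
    }

    // Producer side (e.g. the 5-second poll): skips items already queued or in progress.
    // Throws InvalidOperationException if called after Stop().
    public int Offer(IEnumerable<T> items)
    {
        int added = 0;
        foreach (var item in items)
        {
            if (_inFlight.TryAdd(_key(item), 0))
            {
                _queue.Add(item);
                added++;
            }
        }
        return added;
    }

    // Shutdown: stop accepting work, let consumers drain the queue, then wait for them.
    // If a process call threw, WaitAll surfaces it as an AggregateException.
    public void Stop()
    {
        _queue.CompleteAdding();
        Task.WaitAll(_consumers);
    }
}
```

Releasing an id as soon as `process` returns assumes the DB update inside `process` has committed by then. A poll whose query started earlier can still re-offer the item, which is the window the answer covers by keeping ids cached for two polling intervals.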
You can find more information about Rx.NET in their guidelines on GitHub.
Tags: asynchronous, windows-services, tpl-dataflow, c#, .net
Source: https://codeday.me/bug/20191111/2020868.html