系统相关
首页 > 系统相关> > CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式

CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式

作者:互联网

这是我想做的事情:

>在需要处理的项目的内存中保持队列(即IsProcessed = 0)
>每5秒从db中获取未处理的项目,如果它们尚未在队列中,则添加它们
>从队列中连续提取项目,进行处理,并且每次处理项目时,都在db中对其进行更新(IsProcessed = 1)
>做到“尽可能平行”

我有一个服务的构造函数,例如

public MyService()
{
    Ticker.Elapsed += FillQueue;
}

当服务启动时,我启动计时器

protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;
    Task.Run(() => { ConsumeWork(); });
}

我的FillQueue就像

private static async void FillQueue(object source, ElapsedEventArgs e)   
{
    var items = GetUnprocessedItemsFromDb();
    foreach(var item in items)
    {
        if(!Work.Contains(item))
        {
            Work.Enqueue(item);
        }   
    }
}

我的ConsumeWork就像

private static void ConsumeWork()
{
    while(true)
    {
        if(Work.Count > 0)
        {
            var item = Work.Peek();
            Process(item);
            Work.Dequeue();
        }
        else
        {
            Thread.Sleep(500);
        }
    }
}

但是,这可能是一个幼稚的实现,并且我想知道.NET是否具有正是这种情况所需的任何类型的类.

解决方法:

尽管@JSteward的答案是一个好的开始,但是您可以使用mixing up TPL-Dataflow和Rx.NET extensions进行改进,因为数据流块很容易成为数据的观察者,而使用Rx Timer则可以为您省去很多麻烦(Rx.Timer explanation) .

我们可以根据您的需要调整MSDN article,如下所示:

private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
        // fire first event after 1 minute waiting
        TimeSpan.FromSeconds(DueIntervalInSeconds),
        // fire all next events each 5 seconds
        TimeSpan.FromSeconds(EventIntervalInSeconds))
    // each number will have a timestamp
    .Timestamp()
    // each time we select some items to process
    .SelectMany(GetItemsFromDB)
    // filter already added
    .Where(i => !_processedItemIds.Contains(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
    {
        // we can start as many item processing as processor count
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

IDisposable subscription = source.Subscribe(action.AsObserver());

另外,您对已处理项目的检查也不是很准确,因为有可能在完成处理时从db中选择未处理项目,但未在数据库中对其进行更新.在这种情况下,项目将从Queue< T>中删除,然后由生产者再次添加到队列中,这就是为什么我将ConcurrentBag<T>添加到此解决方案中(HashSet< T>不是线程安全的):

private static async Task ProcessItem(Item item)
{
    if (_processedItemIds.Contains(item.Id))
    {
        return;
    }

    _processedItemIds.Add(item.Id);
    // actual work here

    // save item as processed in database

    // we need to wait to ensure item not to appear in queue again 
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

    // clear the processed cache to reduce memory usage
    _processedItemIds.Remove(item.Id);
}

public class Item
{
    public Guid Id { get; set; }
}

// temporary cache for items in process
private static ConcurrentBag<Guid> _processedItemIds = new ConcurrentBag<Guid>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // return items from DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}

您可以通过其他方式实现缓存清除,例如,启动“ GC”计时器,该计时器将定期从缓存中删除已处理的项目.

要停止事件和处理项目,您应该处置订阅,并可能完成ActionBlock:

subscription.Dispose();
action.Complete();

您可以在他们的guidelines on github中找到有关Rx.Net的更多信息.

标签:asynchronous,windows-services,tpl-dataflow,c,net
来源: https://codeday.me/bug/20191111/2020868.html