编程语言
首页 > 编程语言> > c# – 使用线程和EventWaitHandle的生产者/消费者模式

c# – 使用线程和EventWaitHandle的生产者/消费者模式

作者:互联网

我想这是一种代码审查,但这是我对生产者/消费者模式的实现.我想知道的是,在某种情况下,ReceivingThread()或SendingThread()方法中的while循环可能会停止执行.请注意EnqueueSend(DataSendEnqeueInfo信息)是从多个不同的线程调用的,我可能不能在这里使用任务,因为我必须在一个单独的线程中使用命令.

private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;

private void ReceivingThread()
{
    while (mIsRunning)
    {
        mRcWaitHandle.WaitOne();
        DataRecievedEnqeueInfo item = null;
        while (mReceivingThreadQueue.Count > 0)
        {
            lock (mReceivingQueueLock)
            {
                item = mReceivingThreadQueue.Dequeue();
            }
            ProcessReceivingItem(item);
        }
        mRcWaitHandle.Reset();
    }
}

private void SendingThread()
{
    while (mIsRunning)
    {
        mSeWaitHandle.WaitOne();
        while (mSendingThreadQueue.Count > 0)
        {
            DataSendEnqeueInfo item = null;
            lock (mSendingQueueLock)
            {
                item = mSendingThreadQueue.Dequeue();
            }
            ProcessSendingItem(item);
        }
        mSeWaitHandle.Reset();
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    lock (mReceivingQueueLock)
    {
        mReceivingThreadQueue.Enqueue(info);
        mRcWaitHandle.Set();
    }
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
     lock (mSendingQueueLock)
    {
        mSendingThreadQueue.Enqueue(info);
        mSeWaitHandle.Set();
    }
}

P.S这里的想法是当队列为空时使用WaitHandles将线程置于休眠状态,并在新项目入队时发出信号以启动它.

UPDATE
对于可能尝试使用TPL或任务实现生产者/消费者模式的人,我将离开这个https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/.

解决方法:

就个人而言,对于简单的生产者 – 消费者问题,我只使用BlockingCollection.不需要手动编写自己的同步逻辑.如果队列中没有项目,则消费线程也将阻塞.

如果您使用此类,以下是您的代码:

private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();

public void Stop()
{
    // No need for mIsRunning. Makes the enumerables in the GetConsumingEnumerable() calls
    // below to complete.
    mReceivingThreadQueue.CompleteAdding();
    mSendingThreadQueue.CompleteAdding();
}

private void ReceivingThread()
{
    foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
    {
        ProcessReceivingItem(item);
    }
}

private void SendingThread()
{
    foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
    {
        ProcessSendingItem(item);
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    // You can also use TryAdd() if there is a possibility that you
    // can add items after you have stopped. Otherwise, this can throw an
    // an exception after CompleteAdding() has been called.
    mReceivingThreadQueue.Add(info);
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
    mSendingThreadQueue.Add(info);
}

标签:c,multithreading,tpl-dataflow,blockingcollection,producer-consumer
来源: https://codeday.me/bug/20190611/1218014.html