编程语言
首页 > 编程语言> > Python;异步处理我需要一个单独的线程吗?

Python;异步处理我需要一个单独的线程吗?

作者:互联网

从asyncore的文档中:https://docs.python.org/2/library/asyncore.html

import asyncore, socket

class HTTPClient(asyncore.dispatcher):

  def __init__(self, host, path):
      asyncore.dispatcher.__init__(self)
      self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
      self.connect( (host, 80) )
      self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

  def handle_connect(self):
      pass

  def handle_close(self):
      self.close()

  def handle_read(self):
      print self.recv(8192)

  def writable(self):
      return (len(self.buffer) > 0)

  def handle_write(self):
      sent = self.send(self.buffer)
      self.buffer = self.buffer[sent:]

  client = HTTPClient('www.python.org', '/')
  asyncore.loop()

现在假设我们有:

def handle_read(self):
    data = self.recv(8192)
    //SOME REALLY LONG AND COMPLICATED THING

这是由于asyncore的轮询/选择方法在Asyncore本身中处理的,还是我需要这样做:

def handle_read(self):
    data = self.recv(8192)
    h = Handler(data)
    h.start()

class Handler(threading.Thread):
    def __init__(self, data):
        threading.Thread.__init__(self)
        self.data = data
    def run():
        //LONG AND COMPLICATED THING WITH DATA

如果我确实需要线程,启动后是否需要h.join()?这似乎可行,但是由于连接块,我不确定为什么.

解决方法:

简短的答案

Is this handled in Asyncore itself due to asyncore’s polling/select
methodlogy?

不,异步处理程序无法单独在handle_read()中处理长时间的阻塞任务,因为只有一个线程.该线程正在做一些漫长的工作,并且不能被同一线程中断.

但是,这种阻止实现是有意义的.唯一的问题是网络传输速度较慢.例如,如果长时间的任务需要1秒钟,则最大数据传输速率为每秒8192字节.尽管数据速率较慢,但是网络连接稳定并且可以正常工作.这由操作系统内核中的TCP协议实现处理.

…or do I need to do…? If I do need a thread, do I want h.join()
after start?

以上线程用法均无意义.但是,仍然可以使用帮助程序线程以最大速率下载数据并并行处理该数据,请参见下面的说明.

TCP协议

TCP提供可靠,有序和经过错误检查的流交付.

Data transfer:

Flow control — limits the rate a sender transfers data to guarantee
reliable delivery. The receiver continually hints the sender on how
much data can be received (controlled by the sliding window). When the
receiving host’s buffer fills, the next acknowledgment contains a 0 in
the window size, to stop transfer and allow the data in the buffer to
be processed.

When a receiver advertises a window size of 0, the sender stops
sending data and starts the persist timer. The persist timer is used
to protect TCP from a deadlock situation that could arise if a
subsequent window size update from the receiver is lost, and the
sender cannot send more data until receiving a new window size update
from the receiver. When the persist timer expires, the TCP sender
attempts recovery by sending a small packet so that the receiver
responds by sending another acknowledgement containing the new window
size.

因此,当由于handle_read()中的较长任务而无法从套接字读取数据时,套接字缓冲区将变满. TCP连接挂起,并且不接收任何新的数据包. recv()之后,可以接收新数据,因此将TCP ACK数据包发送到发送方以更新TCP窗口大小.

当设置限制数据传输速率时,使用文件下载器应用程序会观察到类似的行为.例如,如果将限制设置为1Kb / s,则下载程序可能每秒调用一次recv(1000).即使物理网络连接能够发送1Mb / s,也将仅接收1Kb / s.在这种情况下,可以通过tcpdump或Wireshark TCP零窗口数据包和TCP窗口更新数据包查看.

尽管应用程序可以处理长时间的阻塞任务,但通常将网络连接视为瓶颈.因此,最好尽快释放网络.

如果长时间的任务需要更长的时间,那么数据下载最简单的解决方案是下载所有内容,然后才处理下载的数据.但是,如果数据下载时间与数据处理任务的时间相称,可能是不可接受的.例如,如果下载并行执行,则2小时内可以完成2小时的下载.

每个数据块的线程

如果在handle_read()中创建了新线程,并且主线程不等待帮助线程完成(没有join()),则应用程序可能会创建大量线程.请注意,handle_read()每秒可能被调用数千次,如果每个长任务花费的时间超过了每秒,那么应用程序可能会创建数百个线程,最后它可能会被异常杀死.这种解决方案没有意义,因为无法控制线程数,而且这些线程处理的数据块也是随机的.函数recv(8192)最多接收8192字节,但它也可能接收较小的数据块.

每个数据块的线程并与主线程联接

创建线程并立即通过join()阻止主线程的执行没有任何意义,因为这样的解决方案并不比没有任何线程的初始解决方案更好.

一些帮助程序线程和后来的join()可用于并行执行某些操作.例如:

# Start detached thread
h.start()
# Do something in parallel to that thread
# ...
# Wait the thread to finish
h.join()

但是,这里不是这种情况.

持久性工作线程和生产者-消费者数据交换

可以创建一个负责数据处理的持久性工作线程(或多个使用所有CPU内核).它应该在asyncore.loop()之前启动,例如:

handler = Handler()
asyncore.loop()

现在,一旦处理程序线程准备就绪,它就可以获取所有下载的数据进行处理,同时主线程可以继续进行数据下载.当处理程序线程繁忙时,下载程序会将数据追加到其数据缓冲区.需要注意线程之间的正确同步:

>如果将下载的数据附加到下载程序缓冲区,则处理程序线程应等待,然后才能访问该缓冲区;
>如果处理程序正在读取缓冲区,则下载程序应等待,然后才能追加到缓冲区;
>如果处理程序无关,并且缓冲区为空,则应冻结并等待新的下载数据.

可以使用threading condition object和生产者-消费者示例来实现:

# create a new condition variable on __init__
cv = threading.Condition()

# Consume one item by Handler
cv.acquire()
while not an_item_is_available():
    cv.wait()
get_an_available_item()
cv.release()
# DO SOME REALLY LONG AND COMPLICATED THING

# Produce one item by Downloader
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()

在这里,make_an_item_available()可能与将下载的数据附加到缓冲区和/或设置一些其他共享状态变量(例如在handle_close()中)有关.处理程序线程应在cv.release()之后执行其较长的任务,因此下载程序可以在此较长的任务中获取锁定并将新数据附加到缓冲区.

标签:asyncore,asynchronous,python
来源: https://codeday.me/bug/20191027/1946391.html