编程语言
首页 > 编程语言> > Tornado 文件下载Hash值不同 (阅读tornado源码记录)

Tornado 文件下载Hash值不同 (阅读tornado源码记录)

作者:互联网

版本信息

python3.7
tornado==4.3.0

问题描述: 多次下载同样的文件,每次文件的hash均不相同.

下载文件的示例接口:

import tornado
from tornado.concurrent import futures
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler

@run_on_executor
@tornado.web.asynchronous
class XXX_Handler(RequestHandler):
    executor = futures.ThreadPoolExecutor()

    def get():
        data = open("/data/xxx.csv",encoding="utf-8")
        chunk = data.read(65536)
        while chunk:
            self.write(chunk)
            self.flush()
            chunk = data.read(65536)
        data.close()
        self.set_status(200)
        return self.finish()        

推测可能与线程池有关,果真去掉@run_on_executor则正常.
好奇心驱使进行具体原因调查,后面涉及到Tornado相关源码的查看与记录。

  1. 通过RequestHandlerflush方法 检查到self.stream.write 中的 data每次的hash都是相同,继续向下调查。
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            self._pending_write.add_done_callback(self._on_write_complete)
        return future
  1. 查看self.stream如何产生的, 这里就看到TCP server的class,来监听socket连接的请求,使用sock的文件描述记录下载, 分配一个handler来处理, 添加一个读事件(事件都是主线程来处理), 然后获取connection连接, 实例化IOStream 来处理消息的接收和响应
                stream = IOStream(connection, io_loop=self.io_loop,
                                  max_buffer_size=self.max_buffer_size,
                                  read_chunk_size=self.read_chunk_size)
            future = self.handle_stream(stream, address)
            if future is not None:
                self.io_loop.add_future(future, lambda f: f.result())
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)

3.知道通过IOstream来传送数据,查看它的write方法.主要就是按照大小将上层传过来的数据切分到指定大小,
(1).通过self._handle_write来发送 self._write_buffer保存的chunk数据

        if not self._connecting:
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
            self._maybe_add_error_listener()
        return future

(2). 查看self._handle_write, self._handle_write是ThreadPoolExecutor的线程池来处理的,发生(Resource temporarily unavailable)错误失败, 会返回到上面的函数中添加写事件来处理(主线程) self._add_io_state(self.io_loop.WRITE)
如果发送错误比较多,这就导致出现多线程都在写self._write_buffer的问题, 会出现顺序错误或者重复数据的问题从而导致hash结果不同。

            except (socket.error, IOError, OSError) as e:
                if e.args[0] in _ERRNO_WOULDBLOCK:
                    self._write_buffer_frozen = True
                    break
                else:
                    if not self._is_connreset(e):
                        # Broken pipe errors are usually caused by connection
                        # reset, and its better to not log EPIPE errors to
                        # minimize log spam
                        gen_log.warning("Write error on %s: %s",
                                        self.fileno(), e)
                    self.close(exc_info=True)
                    return

测试使用硬核的方法,失败后去除掉 主线程重试的写事件,而是交给当前线程一直重试。。。这个测试可以获得正确的hash,不过最好是去掉@run_on_executor的使用.

标签:Hash,Tornado,self,write,源码,._,future,data,chunk
来源: https://www.cnblogs.com/dncey/p/16672474.html