Tornado 文件下载Hash值不同 (阅读tornado源码记录)
作者:互联网
版本信息
python3.7
tornado==4.3.0
问题描述: 多次下载同样的文件,每次文件的hash均不相同.
下载文件的示例接口:
import tornado
from tornado.concurrent import futures
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
@run_on_executor
@tornado.web.asynchronous
class XXX_Handler(RequestHandler):
executor = futures.ThreadPoolExecutor()
def get():
data = open("/data/xxx.csv",encoding="utf-8")
chunk = data.read(65536)
while chunk:
self.write(chunk)
self.flush()
chunk = data.read(65536)
data.close()
self.set_status(200)
return self.finish()
推测可能与线程池有关,果真去掉@run_on_executor则正常.
好奇心驱使进行具体原因调查,后面涉及到Tornado相关源码的查看与记录。
- 通过
RequestHandler
的flush
方法 检查到self.stream.write 中的 data每次的hash都是相同,继续向下调查。
else:
if callback is not None:
self._write_callback = stack_context.wrap(callback)
else:
future = self._write_future = Future()
data = b"\r\n".join(lines) + b"\r\n\r\n"
if chunk:
data += self._format_chunk(chunk)
self._pending_write = self.stream.write(data)
self._pending_write.add_done_callback(self._on_write_complete)
return future
- 查看self.stream如何产生的, 这里就看到TCP server的class,来监听socket连接的请求,使用sock的文件描述记录下载, 分配一个handler来处理, 添加一个读事件(事件都是主线程来处理), 然后获取connection连接, 实例化IOStream 来处理消息的接收和响应。
stream = IOStream(connection, io_loop=self.io_loop,
max_buffer_size=self.max_buffer_size,
read_chunk_size=self.read_chunk_size)
future = self.handle_stream(stream, address)
if future is not None:
self.io_loop.add_future(future, lambda f: f.result())
except Exception:
app_log.error("Error in connection callback", exc_info=True)
3.知道通过IOstream来传送数据,查看它的write方法.主要就是按照大小将上层传过来的数据切分到指定大小,
(1).通过self._handle_write来发送 self._write_buffer保存的chunk数据
if not self._connecting:
self._handle_write()
if self._write_buffer:
self._add_io_state(self.io_loop.WRITE)
self._maybe_add_error_listener()
return future
(2). 查看self._handle_write, self._handle_write是ThreadPoolExecutor的线程池来处理的,发生(Resource temporarily unavailable)错误失败, 会返回到上面的函数中添加写事件来处理(主线程) self._add_io_state(self.io_loop.WRITE)
。
如果发送错误比较多,这就导致出现多线程都在写self._write_buffer
的问题, 会出现顺序错误或者重复数据的问题从而导致hash结果不同。
except (socket.error, IOError, OSError) as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
self._write_buffer_frozen = True
break
else:
if not self._is_connreset(e):
# Broken pipe errors are usually caused by connection
# reset, and its better to not log EPIPE errors to
# minimize log spam
gen_log.warning("Write error on %s: %s",
self.fileno(), e)
self.close(exc_info=True)
return
测试使用硬核的方法,失败后去除掉 主线程重试的写事件,而是交给当前线程一直重试。。。这个测试可以获得正确的hash,不过最好是去掉@run_on_executor
的使用.
标签:Hash,Tornado,self,write,源码,._,future,data,chunk 来源: https://www.cnblogs.com/dncey/p/16672474.html