13 | Tornado源码分析:BaseIOStream 对象(下)

时间:2022-07-25
本文章向大家介绍13 | Tornado源码分析:BaseIOStream 对象(下),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

hello 大家好 上期我们已经介绍了 tornado.iostream 模块,也整理了核心代码,不知大家是否理解其中的运作原理,本期我们对这部分的源码进行批注并进行总结。 # -*- encoding: utf-8 -*- # !/usr/bin/python """ @File : __init__.py.py @Time : 2020/09/13 15:24 @Author : haishiniu @Software: PyCharm """ import numbers import socket import sys import errno from tornado import ioloop, stack_context from tornado.concurrent import TracebackFuture from tornado.iostream import UnsatisfiableReadError, StreamBufferFullError from tornado.log import app_log, gen_log from tornado.util import errno_from_exception class BaseIOStream(object): def __init__(self, io_loop=None, max_buffer_size=None, read_chunk_size=None, max_write_buffer_size=None): self.io_loop = io_loop or ioloop.IOLoop.current() self.max_buffer_size = max_buffer_size or 104857600 # 每次<fd>.read调用最多读取的字节数 self.read_chunk_size = min(read_chunk_size or 65536,self.max_buffer_size // 2) # 读缓冲区:读缓冲区中的数据分为已经被消费 + 尚未被消费的。 self._read_buffer = bytearray() # 读指针指向第一个尚未被消费的字节。随着缓冲区中的数据被消费,读指针会右移。 # 当读指针大于缓冲区大小时,缓冲区会向右收缩,释放空间。 self._read_buffer_pos = 0 # 读缓冲区的大小(特指未被消费的那部分缓冲区的大小) self._read_buffer_size = 0 # read_bytes()方法的第一个参数 self._read_bytes = None # read callback 当读操作完成之后,会调用该回调函数 self._read_callback = None # read future 当读操作完成时,会将数据或异常信息填充到该对象中; self._read_future = None # 关注的事件 self._state = None # 异步的读取指定数量的字节。 # 如果指定了callback,那么当读取到指定数量的数据之后,会使用数据作为第一个参数调用这个回调函数; # 如果没有指定callback,则返回一个Future对象。 # 本次我们只解析 streaming_callback、partial为 默认值的情况。 def read_bytes(self, num_bytes, callback=None, streaming_callback=None, partial=False): future = self._set_read_callback(callback) assert isinstance(num_bytes, numbers.Integral) self._read_bytes = num_bytes self._read_partial = partial self._streaming_callback = stack_context.wrap(streaming_callback) try: self._try_inline_read() except: if future is not None: future.add_done_callback(lambda f: f.exception()) raise return future # 如果callback为None,则返回一个Future对象,当读操作完成时,会将数据或异常信息填充到该对象中; # 否则,将其设置为read callback,当读操作完成之后,会调用该回调函数。 def _set_read_callback(self, callback): # 如果 read callback 和 read future 都不为None,说明已经有一个读操作正在执行,抛出错误。 assert self._read_callback is None, "Already reading" assert self._read_future is None, "Already reading" if callback is not None: self._read_callback = stack_context.wrap(callback) else: self._read_future = TracebackFuture() return self._read_future # 尝试从读缓冲区,完成当前的读操作。 # 如果读操作能够被满足,则在下一次IOLoop迭代时,执行 read callback;否则,将文件描述符添加到 IOLoop上,并且关注其上的读操作。 def _try_inline_read(self): self._run_streaming_callback() # 1,尝试从读缓冲区完成当前挂起的读操作: pos = self._find_read_pos() if pos is not None: # 1.1,如果可以完成,那么从 read buffer 读取数据之后,使用数据调用 read callback,或 填充read future self._read_from_buffer(pos) return self._check_closed() try: # 2.尝试从fd上读取能够完成当前读操作的数据 pos = self._read_to_buffer_loop() except Exception: self._maybe_run_close_callback() raise # 2.1 如果能够从fd上读取到能完成当前读操作的数据,那么从read buffer读取数据 if pos is not None: self._read_from_buffer(pos) return # 3,否则将fd添加到IOLoop,并关注其上的读操作 if self.closed(): self._maybe_run_close_callback() else: self._add_io_state(ioloop.IOLoop.READ) # 尝试从读缓冲区中找到满足 # 当前挂起的读请求的位置,如果当前读请求能够被满足,则返回这个位置,否则返回None。比如,当前的读请求是read_bytes(num_bytes, partial=False),那么当读缓冲区中有 # 大于等于 num_bytes 个字节时,则返回num_bytes 否则,返回None def _find_read_pos(self): if (self._read_bytes is not None and (self._read_buffer_size >= self._read_bytes or (self._read_partial and self._read_buffer_size > 0))): num_bytes = min(self._read_bytes, self._read_buffer_size) return num_bytes return None # 从读缓冲区中,完成当前挂起的读请求。 def _read_from_buffer(self, pos): self._read_bytes = self._read_delimiter = self._read_regex = None self._read_partial = False self._run_read_callback(pos, False) def _run_read_callback(self, size, streaming): if 1: pass else: callback = self._read_callback self._read_callback = self._streaming_callback = None if self._read_future is not None: assert callback is None # 如果没有设置read callback,则将数据保存到read future future = self._read_future self._read_future = None future.set_result(self._consume(size)) if callback is not None: assert (self._read_future is None) or streaming # 如果设置了read callback,那么则在下一次IOLoop迭代时,调度它 self._run_callback(callback, self._consume(size)) else: self._maybe_add_error_listener() def _run_callback(self, callback, *args): def wrapper(): self._pending_callbacks -= 1 try: return callback(*args) except Exception: raise finally: self._maybe_add_error_listener() with stack_context.NullContext(): self._pending_callbacks += 1 self.io_loop.add_callback(wrapper) # 该方法用于检测:一个没有活跃读、写请求的连接是否被关闭,为此,必须监听读事件。然而,在连接刚建立的时候,执行这个操作是无用的, # 因为这种情况下,我们会立刻进行读写。在IOStream中,很多地方都插入了这个检查,当连接空闲时,那么就监听其上的读事件。 def _maybe_add_error_listener(self): if self._pending_callbacks != 0: return if self._state is None or self._state == ioloop.IOLoop.ERROR: if self.closed(): self._maybe_run_close_callback() elif (self._read_buffer_size == 0 and self._close_callback is not None): self._add_io_state(ioloop.IOLoop.READ) # 从 read buffer上读取loc个字节,并返回。 def _consume(self, loc): if loc == 0: return b"" assert loc <= self._read_buffer_size # 这里用到了memoryview,memoryview为支持buffer protocol的对象,提供了基于字节的访问接口。 # 使用memoryview不会发生内存拷贝。 b = (memoryview(self._read_buffer) [self._read_buffer_pos:self._read_buffer_pos + loc] ).tobytes() # 移动读指针 和 修改缓冲区大小 self._read_buffer_pos += loc self._read_buffer_size -= loc # 当 读指针 大于 缓冲区大小 的时候,会对缓冲区进行收缩: # 1,删除已经被消费内容的缓冲区 # 2,将读指针归零 if self._read_buffer_pos > self._read_buffer_size: del self._read_buffer[:self._read_buffer_pos] self._read_buffer_pos = 0 return b # 向事件处理函数添加`state` def _add_io_state(self, state): if self.closed(): return if self._state is None: self._state = ioloop.IOLoop.ERROR | state with stack_context.NullContext(): self.io_loop.add_handler( self.fileno(), self._handle_events, self._state) elif not self._state & state: self._state = self._state | state self.io_loop.update_handler(self.fileno(), self._state) # 尝试从 fd上 读取期望数量的字节, # 如果读取到的数据,能够满足当前的读操作,则返回位置;否则返回None。 def _read_to_buffer_loop(self): try: if self._read_bytes is not None: target_bytes = self._read_bytes else: target_bytes = 0 next_find_pos = 0 self._pending_callbacks += 1 while not self.closed(): # 如果fd的输入缓冲区为空,则退出 if self._read_to_buffer() == 0: break self._run_streaming_callback() if (target_bytes is not None and self._read_buffer_size >= target_bytes): break if self._read_buffer_size >= next_find_pos: pos = self._find_read_pos() if pos is not None: return pos next_find_pos = self._read_buffer_size * 2 return self._find_read_pos() finally: self._pending_callbacks -= 1 def _handle_events(self, fd, events): if self.closed(): gen_log.warning("Got events for closed stream %s", fd) return try: if events & self.io_loop.READ: self._handle_read() # 在处理完读写事件之后,会更改关注的事件。 # 如果当前没有读操作,那么则不再关注读事件,否则关注; # 如果当前没有写操作,那么则不再关注写事件,否则关注; # 如果当前连接空闲(也就是没有读写操作,并且读缓冲区为空), # 则关注读操作(目的是检测连接是否断开),否则不关注; # 最后,更新关注的事件 state = self.io_loop.ERROR if self.reading(): state |= self.io_loop.READ if self.writing(): state |= self.io_loop.WRITE if state == self.io_loop.ERROR and self._read_buffer_size == 0: state |= self.io_loop.READ if state != self._state: assert self._state is not None, "shouldn't happen: _handle_events without self._state" self._state = state self.io_loop.update_handler(self.fileno(), self._state) except UnsatisfiableReadError as e: gen_log.info("Unsatisfiable read, closing connection: %s" % e) self.close(exc_info=True) except Exception: gen_log.error("Uncaught exception, closing connection.", exc_info=True) self.close(exc_info=True) raise # 从fd上读取数据,并追加到读缓冲区的末尾。 # 该方法返回实际读取的自节数。如果没有读到任何数据,则返回0。 # 出错时,关闭fd,并抛出异常。 def _read_to_buffer(self): while True: try: chunk = self.read_from_fd() except (socket.error, IOError, OSError) as e: if errno_from_exception(e) == errno.EINTR: continue if self._is_connreset(e): self.close(exc_info=True) return self.close(exc_info=True) raise break if chunk is None: return 0 self._read_buffer += chunk self._read_buffer_size += len(chunk) if self._read_buffer_size > self.max_buffer_size: gen_log.error("Reached maximum read buffer size") self.close() raise StreamBufferFullError("Reached maximum read buffer size") return len(chunk)

好的,以上就是本期分享的源码批注,我们再来简单总结一下:

read_bytes(num_bytes)的大致执行流程是:

1. 如果read buffer有num_bytes个字节,goto 步骤4

2. 从fd的缓冲区读数据到read buffer,一直到:

读到了期望数量的数据,goto 步骤4

或者,fd的缓冲区被读空

3. 将fd 注册到IOLoop,并关注其上的读事件,每次IOLoop迭代的时候,如果fd上有读事件发生,则执行步骤2

4. 从read buffer上消费num_bytes个字节,

若设置了 read callback ,则在下一次IOLoop迭代的时候,执行callback

否则,将数据填充到 read future

好的,本期我们给出这部分核心代码的批注,希望以上内容对你有所帮助,感谢大家的支持,谢谢!