12 | Tornado源码分析:BaseIOStream 对象(上)

时间:2022-07-25
本文章向大家介绍12 | Tornado源码分析:BaseIOStream 对象(上),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

hello 大家好 通过前几期我们已经聊了 Tornado 中协程的创建、运行,本期我们开始聊聊 tornado 中 网络读写数据处理相关的内容,这部分还是比较复杂的我们打算拆分成几期来聊。

本期我们先聊聊大体的概念性质的和整体部分代码的概况。tornado.iostream 模块提供了一个向非阻塞文件或者socket句柄写数据或者读数据的工具,其中 BaseIOStream 用于读写的通用接口,SSLIOStream 是IOStream 的SSl 封装版本,PipeIOStream 是基于管道的IOStream 实现的。

我们先看一下我整理过后的这部分的核心代码逻辑:

# -*- encoding: utf-8 -*-
# !/usr/bin/python
"""
@File    : BaseIOStream_Learn.py
@Time    : 2020/09/12 15:24
@Author  : haishiniu
@Software: PyCharm
"""

import numbers
import socket
import sys
import errno
from tornado import ioloop, stack_context
from tornado.concurrent import TracebackFuture
from tornado.iostream import UnsatisfiableReadError, StreamBufferFullError
from tornado.log import app_log, gen_log
from tornado.util import errno_from_exception


class BaseIOStream(object):
    def __init__(self, io_loop=None, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        self.io_loop = io_loop or ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        self._read_buffer = bytearray()
        self._read_buffer_pos = 0
        self._read_buffer_size = 0
        self._read_bytes = None
        self._read_callback = None
        self._read_future = None
        self._state = None

    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)
        try:
            self._try_inline_read()
        except:
            if future is not None:
                future.add_done_callback(lambda f: f.exception())
            raise
        return future

    def _set_read_callback(self, callback):
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"

        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future

    def _try_inline_read(self):

        self._run_streaming_callback()
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return

        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            self._maybe_run_close_callback()
            raise

        if pos is not None:
            self._read_from_buffer(pos)
            return
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

    def _find_read_pos(self):
        if (self._read_bytes is not None and (self._read_buffer_size >= self._read_bytes or (self._read_partial and self._read_buffer_size > 0))):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes

        return None

    def _read_from_buffer(self, pos):
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

    def _run_read_callback(self, size, streaming):
        if 1:
            pass
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
            if self._read_future is not None:
                assert callback is None
                self._read_future = None

                future.set_result(self._consume(size))

        if callback is not None:
            assert (self._read_future is None) or streaming
            self._run_callback(callback, self._consume(size))
        else:
            self._maybe_add_error_listener()

    def _run_callback(self, callback, *args):
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)

                self.close(exc_info=True)
                raise
            finally:
                self._maybe_add_error_listener()
        with stack_context.NullContext():
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

    def _maybe_add_error_listener(self):
        if self._pending_callbacks != 0:
            return
        if self._state is None or self._state == ioloop.IOLoop.ERROR:
            if self.closed():
                self._maybe_run_close_callback()
            elif (self._read_buffer_size == 0 and
                  self._close_callback is not None):
                self._add_io_state(ioloop.IOLoop.READ)

    def _consume(self, loc):
        if loc == 0:
            return b""
        assert loc <= self._read_buffer_size

        b = (memoryview(self._read_buffer)
             [self._read_buffer_pos:self._read_buffer_pos + loc]
             ).tobytes()
        self._read_buffer_pos += loc
        self._read_buffer_size -= loc

        if self._read_buffer_pos > self._read_buffer_size:
            del self._read_buffer[:self._read_buffer_pos]
            self._read_buffer_pos = 0
        return b

    def _add_io_state(self, state):
        if self.closed():
            # connection has been closed, so there can be no future events
            return
        if self._state is None:
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    def _read_to_buffer_loop(self):
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            else:
                target_bytes = 0
            next_find_pos = 0
            self._pending_callbacks += 1
            while not self.closed():
                if self._read_to_buffer() == 0:
                    break
                self._run_streaming_callback()

                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    def _handle_events(self, fd, events):
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            ...
            if events & self.io_loop.READ:
                self._handle_read()
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, 
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)

        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise

    def _handle_read(self):
        try:
            pos = self._read_to_buffer_loop()
        except UnsatisfiableReadError:
            raise
        except Exception as e:
            gen_log.warning("error on read: %s" % e)
            self.close(exc_info=True)
            return
        if pos is not None:
            self._read_from_buffer(pos)
            return
        else:
            self._maybe_run_close_callback()

    def _read_to_buffer(self):
        while True:
            try:
                chunk = self.read_from_fd()
            except (socket.error, IOError, OSError) as e:
                if errno_from_exception(e) == errno.EINTR:
                    continue
                if self._is_connreset(e):
                    self.close(exc_info=True)
                    return
                self.close(exc_info=True)
                raise
            break
        if chunk is None:
            return 0
        self._read_buffer += chunk
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return len(chunk)

好的,本期我们就先简单的介绍了一下BaseIOStream 这个类的核心逻辑,大家可以先自行看看是否能看懂这部分的逻辑,下期我们会给出这部分核心代码的批注,敬请期待,感谢大家的支持,谢谢!