muduo TcpServer粗略过程
使用 muduoexamplesasiochatserver.cc 作例子
首先需要知道:
EventLoop:一个事件分发器类,拥有 Poller 对象,事件处理函数是 loop(),在这里捕获注册的 Channel 事件,并调用相应的回调函数。
while (!quit_)
{
...
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
// 通过 revents_ 来处理响应的事件
currentActiveChannel_->handleEvent(pollReturnTime_);
}
...
}
下面这几个类,都拥有一个 EventLoop * 对象,他们的事件注册,就注册到他们分配到的 EventLoop * 对象中。
Channel:一个封装 fd 的类,方便向 EventLoop 注册事件.比如,可读,可写事件。
Acceptor:负责接受连接的类,拥有一个Channel对象,向 EventLoop 注册为可读,则当有人来连接的时候,就能调用相应的回调。
TcpConnecton:也拥有一个 Channel 对象,已建立连接以后,再生成的对象,用来处理,连接之后的操作,发送,接受,断开。
然后开始来看吧,客户使用的代码,极其简单,就建立起一个 ChatServer . 这里的 loop 为 baseloop 处于主线程的条件下。
ChatServer server(&loop, serverAddr);
server.start();
loop.loop();
再来看看 server.start() 干了什么.先在构造函数中,初始化了几个成员,threadPool_,acceptor_。设置了acceptor_的回调.
1.threadPool_.start(),开启创建 N 个 EventLoopThread 个对象,即开了了 N 个 EventLoop 对象,并执行他们的 loop() 函数。
2.loop_runInLoop(..listen..),runInLoop 简单说明一下,就是保证你的函数执行的环境是跟你的 EventLoop 处于同一个线程中,保证同步。 最终,这个函数会调用 acceptor_.listen(),这个函数的本质就是向他的 loop 注册可读事件,可读的话就是有人来连接啦。可以自己跟踪下。
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
// Acceptor::handleRead函数中会回调用TcpServer::newConnection
// _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::start()
{
....
threadPool_->start(threadInitCallback_);
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
....
}
到这里为止,就已经注册了 accept fd 的可读事件,那么当有连接到来的时候,就会触发,在 loop() 函数中,然后去调用响应的回调函数,在 TcpServer 构造函数中可以看到,注册了 acceptor_ 可读回调是 TcpServer::newConnecton() ,所以,就来看看这个函数又看了什么。
1.可以看到,通过传过来的参数,创建一个 TcpConnecton 对象,并存储到 connections_ 中,这是一个 map 容器。threadPool_->getNextLoop() 就是在线程池中分配一个 loop 给他,负载均衡吧。如果之前设置的线程数为0,则大家共用主线程的 loop.
2.注册,响应的回调函数,可读,可写,有连接(就是调用用户设置的有连接来的回调函数),关闭。
3.最后调用,ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));这个函数里面,实际就是将 TcpConnection 中的 Channel 注册为可读,这样就能开始接收 client 的数据。然后也可以调用 TcpConnection 的 send 来发数据。
这里的接受和发送设计一个 Buffer 类,这是一个 缓存类,非阻塞I/O必备,这里就不说先。
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
// 按照轮叫的方式选择一个EventLoop
EventLoop* ioLoop = threadPool_->getNextLoop();
// 记录那个 nextConnId_ 有什么意义吗?
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
// getLocalAddr listenAddr 不应该就是 localAddr 了吗?
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
// 每个TcpConnection 里面都有一个 Channel
// 每一个 channel 里面 有一个 loop
// 每一个 loop 里面有一个 poller_(),每一个loop不同线程
// 每一个 loop 里面有一个 vector<Channel *>
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
// connections_ is a map<string,TcpConnectionPtr>
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);//改变connection 改为用户定义的,跟构造函数的不一样
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe//跨线程的调用
// TcpConnection所对应的通道加入到Poller关注
// 嗯,这个时候这里调用 ioLoop->runInLoop() 的话,似乎就不是他那里的环境
// 没错,这里的 ioloop 是在其他线程中创建的。
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//各自线程调用
}
当已经建立连接好以后,我们再来看看,在最初的 loop() 函数中的 Channel::handleEvent() 函数.可以看到,通过判断事件来种类,来调用响应的回调函数。
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
大致逻辑是这样,服务端的过程比客户端的过程要复杂一些,毕竟服务端要管理这么多个连接。客户端的过程大家可以自己去看一下。本文只是粗略的解释了服务端的过程,其中涉及的多线程应该注意的事项还无功力能总结。
- 修改Apache的超时设置,解决长连接请求超时问题
- Oracle 12cR2初体验(r11笔记第91天)
- MySQL中的undo截断(r11笔记第89天)
- Linux系统 df 命令显示异常、分区丢失问题解决
- MySQL主从、字典死锁、连接数的Python监控脚本
- MySQL Online DDL(二)(r11笔记第88天)
- 转-Android上面运行golang
- Golang适合高并发场景的原因分析
- 浅谈MySQL中的事务隔离级别(r11笔记第86天)
- 巧用echo命令解决Samba批量添加用户难题
- 分分钟搭建MySQL Group Replication测试环境(r11笔记第82天)
- MySQL 5.7 Group Replication错误总结(r11笔记第84天)
- 空结构体struct{}解析
- 动态创建MySQL Group Replication的节点(r11笔记第84天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 015.Nginx重定向
- 明了 | MongoDB 外键的基本使用
- 优雅 | koa处理异常
- iOS开发之Context Menus
- 快速解释如何使用pandas的inplace参数
- bam格式转bigWig
- 46. Vue路由传参的基本使用
- H3C在端口同时配置MAC地址认证和802.1x
- proxmox notes
- Unix每分钟监控进程的状态
- webpack实战——资源输入与输出
- R语言读取 xlsx 和xls 文件
- pytest文档42-fixture参数化params
- 搭建node服务(三):使用TypeScript
- Antd for Vue使用Form组件报错You cannot set a form field before rendering