Redis 6.0 IO线程功能分析
Redis多线程原理
Redis 6.0
的亮点之一就是支持多线程,Redis 分 主线程
和 IO线程
,IO线程
只用于读取客户端的命令和发送回复数据给客户端,处理客户端命令还是在 主线程
进行,如下图所示:
从上图可知,主线程
主要负责接收客户端连接,并且分发到各个 IO线程
,而 IO线程
负责读取客户端命令。命令读取完成后,由 主线程
执行命令。主线程
执行完命令后,再由 IO线程
把回复数据发送给客户端。
读者可能会问,为什么处理命令不在 IO线程
进行,我觉得主要有两个原因:
- 如果处理命令在
IO线程
进行,那么就会涉及到竞争的问题。因为 Redis 的数据库是共享的,所以如果多个线程同时操作数据库,那么就必须要对数据库进行上锁,而上锁是一个比较耗时的操作(因为上锁可能会导致线程上下文切换)。 - 由于 Redis 6.0 以前一直都是由单线程执行命令的,所以如果要改为多线程执行命令,那么需要修改大量代码,而且可能会引入新的问题(比如bug)。所以,为了稳定性,继续使用单线程执行命令是最好的选择。
为什么要使用多线程呢?主要为了使用多核CPU的优势,下面是使用多线程的测试数据(数据来源网络):
从上面的测试结果可以看出,多线程版本的 Redis 读写QPS都要比单线程版本的高。
Redis 多线程实现
要开启 Redis 的 IO线程
功能,可以在配置文件中加入以下配置项:
io-threads-do-reads yes # 开启IO线程
io-threads 6 # 设置IO线程数
Redis 在启动时会根据配置文件中设置的 IO线程
数来启动 IO线程
,启动 IO线程
在函数 initThreadedIO()
中完成,代码如下:
void initThreadedIO(void) {
io_threads_active = 0;
if (server.io_threads_num == 1) return;
...
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
initThreadedIO()
函数的主要工作是:
- 为每个IO线程创建一个链表,用于放置要进行IO操作的客户端连接。
- 为每个IO线程创建一个锁,用于主线程与IO线程的通信。
- 调用
pthread_create()
系统调用来创建IO线程,IO线程的主体函数是IOThreadMain()
。
下面我们来分析一下IO线程的主体函数主要完成的工作:
void *IOThreadMain(void *myid) {
long id = (unsigned long)myid;
...
while (1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
if (io_threads_pending[id] == 0) { // 不等于0表示有客户端连接需要处理
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
...
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
...
}
}
IO线程的主体函数主要完成以下几个操作:
- 等待主线程分配客户端连接(对应IO线程的
io_threads_list
链表不为空)。 - 判断当前是进行读操作还是写操作(
io_threads_op
等于IO_THREADS_OP_WRITE
表示要进行写操作,而io_threads_op
等于IO_THREADS_OP_READ
表示要进行读操作)。如果是进行写操作,那么就调用writeToClient()
函数向客户端连接进行发送数据。如果是读操作,那么就调用readQueryFromClient()
函数读取客户端连接的请求。 - 完成对客户端连接的读写操作后,需要清空对应IO线程的
io_threads_list
链表和计数器io_threads_pending
,用于通知主线程已经完成读写操作。
那么,主线程是怎样分配客户端连接给各个IO线程的呢?
主线程在接收到客户端连接后,会把客户端连接添加到事件驱动库中监听其读事件,读事件的回调函数为 readQueryFromClient()
。也就是说,当客户端连接可读时会触发调用 readQueryFromClient()
函数,而 readQueryFromClient()
函数会调用 postponeClientRead()
函数判断当前 Redis 是否开启了 IO线程
功能,代码如下:
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
postponeClientRead()
函数主要判断 Redis 是否开启了 IO线程
功能,如果开启了就调用 listAddNodeHead()
函数把客户端连接添加到 clients_pending_read
链表中,并且设置客户端连接的 CLIENT_PENDING_READ
标志位,表示当前连接已经在 clients_pending_read
链表中,防止二次添加。
把客户端连接添加到 clients_pending_read
链表后,主线程会在 handleClientsWithPendingReadsUsingThreads()
函数中把客户端连接分配给各个 IO线程
。代码如下:
int handleClientsWithPendingReadsUsingThreads(void) {
...
/* 分配给各个IO线程 */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置各个IO线程负责的客户端连接数
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程也要负责一部分客户端连接的读写操作
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 等待所有IO线程完成
while (1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
// 执行各个客户端连接的命令
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
continue;
}
}
processInputBuffer(c);
}
return processed;
}
handleClientsWithPendingReadsUsingThreads()
函数主要完成以下几个操作:
- 分配客户端连接给各个
IO线程
(添加到对应IO线程
的io_threads_list
链表中),分配策略为轮询。 - 设置各个
IO线程
负责的客户端连接数io_threads_pending
。 - 处理主线程负责那部分客户端连接的读写操作。
- 等待所有
IO线程
完成读取客户端连接请求的命令。 - 执行各个客户端连接请求的命令。
前面说过,IO线程
在完成读取客户端连接的请求后,会把 io_threads_pending
计数器清零,主线程就是通过检测 io_threads_pending
计数器来判断是否所有 IO线程
都完成了对客户端连接的读取命令操作。
但这里要吐槽一下的是,在等待 IO线程
读取客户端请求时,居然用了一个死循环来等待,这样有可能会导致CPU使用率飙升的问题,有可能影响其他服务的运行(不知道作者怎么想的)。我觉得比较合适的方式是,各个 IO线程
完成了读取命令操作后,通过一个信号来通知主线程。
- hiveQL去重
- springboot高并发redis细粒度加锁(key粒度加锁)
- java使用spark/spark-sql处理schema数据
- redis的发布订阅模式pubsub
- linux命令和awk
- django集成celery之callback方式link_error和on_failure
- 使用beanstalkd实现定制化持续集成过程中pipeline
- 用SQLite查看编辑android导出的微信聊天记录
- 使用HDFS客户端java api读取hadoop集群上的信息
- 使用Fabric一键批量部署上线/线上环境监控
- springboot使用zookeeper(curator)实现注册发现与负载均衡
- django使用xlwt导出excel文件
- redis的sentinel主从切换(failover)与Jedis线程池自动重连
- Hadoop通过HCatalog编写Mapreduce任务访问hive库中schema数据
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Qt音视频开发37-USB摄像头解码ffmpeg方案
- Navicat 环境测试 innodb 的事务隔离级别 产生的幻读 和 不可重复读
- leetcode之最长回文串
- Discourse CentOS 8 全新安装手册
- 3分钟短文:Laravel路子真野啊!路由昵称前缀中间件
- CSS中重要的BFC概念
- Redis哨兵集群中哨兵挂了,主从库还能切换吗?
- 你的 Redis 为什么变慢了?
- 解决Maven依赖冲突的好帮手,这款IDEA插件了解一下?
- Python爬虫实现HTTP网络请求多种实现方式
- 在tensorflow以及keras安装目录查询操作(windows下)
- Python调用OpenCV实现图像平滑代码实例
- php微信公众号开发之音乐信息
- Laravel关联模型中过滤结果为空的结果集(has和with区别)
- php微信公众号开发之二级菜单