C++实现epoll echo服务器

时间:2022-07-24
本文章向大家介绍C++实现epoll echo服务器,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

epoll简介

通常来说,实现处理tcp请求,为一个连接一个线程,在高并发的场景,这种多线程模型与Epoll相比就显得相形见绌了。epoll是linux2.6内核的一个新的系统调用,epoll在设计之初,就是为了替代select, poll线性复杂度的模型,epoll的时间复杂度为O(1), 也就意味着,epoll在高并发场景,随着文件描述符的增长,有良好的可扩展性。

  • selectpoll监听文件描述符list,进行一个线性的查找 O(n)
  • epoll: 使用了内核文件级别的回调机制O(1)

关键函数

  • epoll_create1: 创建一个epoll实例,返回文件描述符
  • epoll_ctl: 将监听的文件描述符添加到epoll实例中,实例代码为将标准输入文件描述符添加到epoll中
  • epoll_wait: 等待epoll事件从epoll实例中发生, 并返回事件以及对应文件描述符

epoll 关键的核心数据结构如下:

```cpp

typedef union epoll_data

{

void *ptr;

int fd;

uint32_t u32;

uint64_t u64;

} epoll_data_t;

struct epoll_event

{

uint32_t events; / Epoll events /

epoll_data_t data; / User data variable /

};

```

epoll高效原理

epoll使用RB-Tree红黑树去监听并维护所有文件描述符,RB-Tree的根节点

调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件.

epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已.

那么,这个准备就绪list链表是怎么维护的呢?

当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。

epoll相比于select并不是在所有情况下都要高效,例如在如果有少于1024个文件描述符监听,且大多数socket都是出于活跃繁忙的状态,这种情况下,select要比epoll更为高效,因为epoll会有更多次的系统调用,用户态和内核态会有更加频繁的切换。

epoll高效的本质在于:

  • 减少了用户态和内核态的文件句柄拷贝
  • 减少了对可读可写文件句柄的遍历
  • mmap 加速了内核与用户空间的信息传递,epoll是通过内核与用户mmap同一块内存,避免了无谓的内存拷贝
  • IO性能不会随着监听的文件描述的数量增长而下降
  • 使用红黑树存储fd,以及对应的回调函数,其插入,查找,删除的性能不错,相比于hash,不必预先分配很多的空间

epoll实现echo server

借鉴TCP Echo Server Example in C++ Using Epoll的实现

#ifndef __EPOLLER_H__
#define __EPOLLER_H__

#include <string>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <unordered_map>

#define MAX_PENDING 1024
#define BUFFER_SIZE 1024

class Handler {
public:
	virtual ~Handler() {}
	virtual int handle(epoll_event e) = 0;
};

/**
 * epoll 事件轮询
 */ 
class IOLoop {
public:
	static IOLoop *Instance()
	{
		static IOLoop instance;
		return &instance;
	}
	~IOLoop() 
	{
		for (auto it : handlers_) {
			delete it.second;
		}
	}

	void start()
	{
		const uint64_t MAX_EVENTS = 10;
		epoll_event events[MAX_EVENTS];
		while (true)
		{
			// -1 只没有事件一直阻塞
			int nfds = epoll_wait(epfd_, events, MAX_EVENTS, -1/*Timeout*/);
			for (int i = 0; i < nfds; ++i) {
				int fd = events[i].data.fd;
				Handler* handler = handlers_[fd];
				handler->handle(events[i]);
			}
		}
	}

	void addHandler(int fd, Handler* handler, unsigned int events)
	{
		handlers_[fd] = handler;
		epoll_event e;
		e.data.fd = fd;
		e.events = events;

		if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &e) < 0) {
			std::cout << "Failed to insert handler to epoll" << std::endl;
		}
	}

	void modifyHandler(int fd, unsigned int events) 
	{
		struct epoll_event event;
		event.events = events;
		epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &event);
	}

	void removeHandler(int fd) 
	{
		Handler* handler = handlers_[fd];
		handlers_.erase(fd);
		delete handler;
		//将fd从epoll堆删除
		epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, NULL);
	}

private:
	IOLoop()
	{
		epfd_ = epoll_create1(0);  //flag=0 等价于epll_craete
		if (epfd_ < 0) {
			std::cout << "Failed to create epoll" << std::endl;
			exit(1);
		}
	}

private:
	int epfd_;
	std::unordered_map<int, Handler*> handlers_;
};

class EchoHandler : public Handler {
public:
	EchoHandler() {}
	virtual int handle(epoll_event e) override
	{
		int fd = e.data.fd;
		if (e.events & EPOLLHUP) {
			IOLoop::Instance()->removeHandler(fd);
			return -1;
		}

		if (e.events & EPOLLERR) {
			return -1;
		}

		if (e.events & EPOLLOUT)
		{
			if (received > 0)
			{
				std::cout << "Writing: " << buffer << std::endl;
				if (send(fd, buffer, received, 0) != received)
				{
					std::cout << "Error writing to socket" << std::endl;
				}
			}

			IOLoop::Instance()->modifyHandler(fd, EPOLLIN);
		}

		if (e.events & EPOLLIN)
		{
			std::cout << "read" << std::endl;
			received = recv(fd, buffer, BUFFER_SIZE, 0);
			if (received < 0) {
				std::cout << "Error reading from socket" << std::endl;
			}
			else if (received > 0) {
				buffer[received] = 0;
				std::cout << "Reading: " << buffer << std::endl;
				if (strcmp(buffer, "stop") == 0) {
					std::cout << "stop----" << std::endl;
				}
			}

			if (received > 0) {
				IOLoop::Instance()->modifyHandler(fd, EPOLLOUT);
			} else {
				std::cout << "disconnect fd=" << fd << std::endl;
				IOLoop::Instance()->removeHandler(fd);
			}
		}

		return 0;
	}

private:
	int received = 0;
	char buffer[BUFFER_SIZE];
};

class ServerHandler : public Handler {
public:
	ServerHandler(int port)
	{
		int fd;
		struct sockaddr_in addr;
		memset(&addr, 0, sizeof(addr));

		if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
		{
			std::cout << "Failed to create server socket" << std::endl;
			exit(1);
		}

		addr.sin_family = AF_INET;
		addr.sin_addr.s_addr = htonl(INADDR_ANY);
		addr.sin_port = htons(port);

		if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
		{
			std::cout << "Failed to bind server socket" << std::endl;
			exit(1);
		}

		if (listen(fd, MAX_PENDING) < 0)
		{
			std::cout << "Failed to listen on server socket" << std::endl;
			exit(1);
		}
		setnonblocking(fd);

		IOLoop::Instance()->addHandler(fd, this, EPOLLIN);
	}

	virtual int handle(epoll_event e) override
	{
		int fd = e.data.fd;
		struct sockaddr_in client_addr;
		socklen_t ca_len = sizeof(client_addr);

		int client = accept(fd, (struct sockaddr*)&client_addr, &ca_len);

		if (client < 0)
		{
			std::cout << "Error accepting connection" << std::endl;
			return -1;
		}

		std::cout << "accept connected: " << inet_ntoa(client_addr.sin_addr) << std::endl;
		Handler* clientHandler = new EchoHandler();
		IOLoop::Instance()->addHandler(client, clientHandler, EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR);
		return 0;
	}

private:
	void setnonblocking(int fd)
	{
		int flags = fcntl(fd, F_GETFL, 0);
		fcntl(fd, F_SETFL, flags | O_NONBLOCK);
	}
};

#endif /* __EPOLLER_H__ */

启动一个服务只需如下:

#include "Epoller.h"

int main(int argc, char** argv) {
	ServerHandler serverhandler(8877);
	IOLoop::Instance()->start();
	return 0;
}