[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架
时间:2022-05-05
本文章向大家介绍[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
简介:
本文描述了使用Go语言实现的、适应于Go语言并发模型的一种支持多路复用的网络协议处理框架,并提供了框架的代码实现。作者将这种框架用于网络游戏服务器中的协议处理,但也可用于其他领域。
应用背景:
在网络游戏服务器设计中,一般都会遇到协议多路复用的场景。比如登录服务器和玩家客户端之间有1:N的多个TCP连接;登录服务器和游戏服务器之间是1:1的TCP连接。玩家登录游戏的大致流程是这样的:
- 玩家连接登录服务器
- 登录服务器向数据库请求玩家数据
- 登录服务器获取到玩家数据,把玩家数据转发给游戏服务器进行加载包括创建玩家对象等
- 登录服务器获取到加载成功回应后,通知玩家客户端可以进入游戏世界
在3和4中,因为登录服务器和游戏服务器通常只有一个TCP连接,所有玩家数据都是通过这个连接进行传输,所以需要从协议包中区分出是哪个玩家的数据。通常这个区分的依据可以是玩家的角色名,但是也可以更通用一些,用一个数字ID来区分,这样就把协议包的分发处理和协议包中与游戏逻辑有关的内容分离开来。
协议说明:
通常网游的网络协议都是报文的形式,即使底层是使用TCP,也会用一些方法把数据拆分成一个个的报文(本文中称为协议包)。因此,本文也基于这一假设,但是对于具体的协议包格式,本文没有特别限制,只是要求协议包中能够容纳一个32字节的ID。
协议包的处理大概可以分为以下两种类型。其他更复杂的会话可以由以下两种类型组合而成。
- 发送一个数据包并等待回应。比如登录服务器等待游戏服务器加载玩家数据的结果通知。
- 发送一个数据包,不需要回应。比如游戏服务器加载玩家数据后,给登录服务器发送结果通知。
框架说明:
Go语言是一种支持高并发的编程语言,它支持高并发的方式是大量轻量级的goroutine并发执行。在每个goroutine中的操作基本上都是同步阻塞的,这样可以极大地简化程序逻辑,使得代码清晰易读,容易维护。基于这点,本文实现的框架的调用接口也是使用同步方式的。
- 如果一个协议包需要等待回应,就在调用函数上阻塞等待。这个调用的签名为: func (p *Connection) Query(data []byte) ([]byte, error) 注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
- 如果发送一个协议包是对于接收到的某个协议包的回应,则调用: func (p *Connection) Reply(query, answer []byte) error 注意:answer的控制权会转交给框架,因此函数调用后不能修改answer的内容。
- 如果一个协议包不需要回应,就直接调用发送函数: func (p *Connection) Write(data []byte) error 注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
- 调用者需要实现的接口:
- Socket。用于协议包的收发。基本上是net.TCPConn的简单封装,在头部加上一个协议包的长度。
- DataHandler。用于协议处理,即没有通过Query返回的协议包会分发给此接口处理。
- ErrorHandler。用于错误处理。当断线时,会调用此接口。
- IdentityHandler。用于读取和设置会话ID。
5. 关于goroutine安全的说明:
ErrorHandler和DataHandler的函数实现中不能直接调用(*Connection).Close,否则会导致死锁。
导出类型、函数和接口:
type Connection
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, ehErrorHandler) *Connection
func (p *Connection) Start()
func (p *Connection) Close()
func (p *Connection) Query(data []byte) (res []byte, err error)
func (p *Connection) Reply(query, answer []byte) error
func (p *Connection) Write(data []byte) error
type Socket interface {
Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {
Process([]byte)
}
type ErrorHandler interface {
OnError(error)
}
type IdentityHandler interface {
GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
完整的代码实现:
package multiplexer
import (
"errors"
"sync"
"sync/atomic"
)
var (
ERR_EXIT = errors.New("exit")
)
type Socket interface {
Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {
Process([]byte)
}
type ErrorHandler interface {
OnError(error)
}
type IdentityHandler interface {
GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
type Connection struct {
conn Socket
wg sync.WaitGroup
mutex sync.Mutex
applicants map[uint32]chan []byte
chexit chan bool
chsend chan []byte
chch chan chan []byte
dh DataHandler
ih IdentityHandler
eh ErrorHandler
identity uint32
}
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler)*Connection {
count := maxcount
if count < 1024 {
count = 1024
}
chch := make(chan chan []byte, count)
for i := 0; i < count; i++ {
chch <- make(chan []byte, 1)
}
return &Connection{
conn: conn,
applicants: make(map[uint32]chan []byte, count),
chsend: make(chan []byte, count),
chexit: make(chan bool),
chch: chch,
dh: dh,
ih: ih,
eh: eh,
}
}
func (p *Connection) Start() {
p.wg.Add(2)
go func() {
defer p.wg.Done()
p.recv()
}()
go func() {
defer p.wg.Done()
p.send()
}()
}
func (p *Connection) Close() {
close(p.chexit)
p.conn.Close()
p.wg.Wait()
}
func (p *Connection) Query(data []byte) (res []byte, err error) {
var ch chan []byte
select {
case <-p.chexit:
return nil, ERR_EXIT
case ch = <-p.chch:
defer func() {
p.chch <- ch
}()
}
id := p.newIdentity()
p.ih.SetIdentity(data, id)
p.addApplicant(id, ch)
defer func() {
if err != nil {
p.popApplicant(id)
}
}()
if err := p.Write(data); err != nil {
return nil, err
}
select {
case <-p.chexit:
return nil, ERR_EXIT
case res = <-ch:
break
}
return res, nil
}
func (p *Connection) Reply(query, answer []byte) error {
// put back the identity attached to the query
id := p.ih.GetIdentity(query)
p.ih.SetIdentity(answer, id)
return p.Write(answer)
}
func (p *Connection) Write(data []byte) error {
select {
case <-p.chexit:
return ERR_EXIT
case p.chsend <- data:
break
}
return nil
}
func (p *Connection) send() {
for {
select {
case <-p.chexit:
return
case data := <-p.chsend:
if p.conn.Write(data) != nil {
return
}
}
}
}
func (p *Connection) recv() (err error) {
defer func() {
if err != nil {
select {
case <-p.chexit:
err = nil
default:
p.eh.OnError(err)
}
}
}()
for {
select {
case <-p.chexit:
return nil
default:
break
}
data, err := p.conn.Read()
if err != nil {
return err
}
if id := p.ih.GetIdentity(data); id > 0 {
ch, ok := p.popApplicant(id)
if ok {
ch <- data
continue
}
}
p.dh.Process(data)
}
return nil
}
func (p *Connection) newIdentity() uint32 {
return atomic.AddUint32(&p.identity, 1)
}
func (p *Connection) addApplicant(identity uint32, ch chan []byte) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.applicants[identity] = ch
}
func (p *Connection) popApplicant(identity uint32) (chan []byte, bool) {
p.mutex.Lock()
defer p.mutex.Unlock()
ch, ok := p.applicants[identity]
if !ok {
return nil, false
}
delete(p.applicants, identity)
return ch, true
}
- PhalGo-初识PhalGO
- 【学术】如何在神经网络中选择正确的激活函数
- PhalGo-Echo路由
- PhalGo-介绍
- 数据库分库分表中间件 Sharding-JDBC 源码分析 —— 分布式主键
- [喵咪Golang(2)]安装和Helloworld
- LSTM的简单介绍,附情感分析应用
- 使用实体嵌入的结构化数据进行深度学习
- Otter-入门篇3(Node搭建)
- PhalGo-Respones
- 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写
- Otter-入门篇2(Manager安装配置)
- Java的字符串常量相关的一个问题
- [喵咪Liunx(3)]端口转发工具rinetd
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- CNS图表复现08—肿瘤单细胞数据第一次分群通用规则
- Gunicorn运行与配置方法
- 如何使用Linux文本操作命令ed进行提权nov5详解
- linux系统用户管理与grep正则表达式示例教程
- 在Linux中如何查找最大的10个文件方法汇总
- CNS图表复现09—上皮细胞可以区分为恶性与否
- CentOS 6/7环境下通过yum安装php7的方法
- Centos7.3服务器搭建LNMP环境的方法
- Linux中解除端口占用的方法
- ubuntu服务器环境下安装python的方法
- 带你入门Linux中size命令的6个例子
- 详解CentOS重启后resolv.conf被重置的解决方案
- scRNA-seq Clustering quality control(二)
- ubuntu环境下安装memcache及启动的方法
- Linux下批量修改服务器用户密码方法步骤