(八)高性能服务器架构设计总结4——以flamigo服务器代码为例

时间:2022-05-07
本文章向大家介绍(八)高性能服务器架构设计总结4——以flamigo服务器代码为例,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

二、架构篇

一个项目的服务器端往往由很多服务组成,就算单个服务在性能上做到极致,支持的并发数量也是有限的,举个简单的例子,假如一个聊天服务器,每个用户的信息是1k,那对于一个8G的内存的机器,在不考虑其它的情况下8*1024*1024*1024 / 100 = 1024,实际有838万,但实际这只是非常理想的情况。所以我们有时候需要需要某个服务部署多套,就单个服务的实现来讲还是《框架篇》中介绍的。我们举个例子:

这是蘑菇街TeamTalk的服务器架构。MsgServer是聊天服务,可以部署多套,每个聊天服务器启动时都会告诉loginSever和routeSever自己的ip地址和端口号,当有用户上下或者下线的时候,MsgServer也会告诉loginSever和routeSever自己上面最新的用户数量和用户id列表。现在一个用户需要登录,先连接loginServer,loginServer根据记录的各个MsgServer上的用户情况,返回一个最小负载的MsgServer的ip地址和端口号给客户端,客户端再利用这个ip地址和端口号去登录MsgServer。当聊天时,位于A MsgServer上的用户给另外一个用户发送消息,如果该用户不在同一个MsgServer上,MsgServer将消息转发给RouteServer,RouteServer根据自己记录的用户id信息找到目标用户所在的MsgServer并转发给对应的MsgServer。

上面是分布式部署的一个例子。我们再来看另外一个例子,这个例子是单个服务的策略,实际服务器在处理网络数据的时候,如果同时有多个socket上有数据要处理,可能会出现一直服务前几个socket,直到前几个socket处理完毕后再处理后面几个socket的数据。这就相当于,你去饭店吃饭,大家都点了菜,但是有些桌子上一直在上菜,而有些桌子上一直没有菜。这样肯定不好,我们来看下如何避免这种现象:

int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession)  
{  
    //NET_IO_LOG0("CFtdEngine::HandlePackagen");  
    FTDC_PACKAGE_DEBUG(pFTDCPackage);  
  
    if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin)  
    {  
        if (!IsSessionLogin(pSession->GetSessionID()))  
        {  
            SendErrorRsp(pFTDCPackage, pSession, 1, "客户未登录");  
            return 0;  
        }  
    }  
      
    CalcFlux(pSession, pFTDCPackage->Length());  //统计流量  
  
    REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "登录请求%0x", pFTDCPackage->GetTID());   
      
    int nRet = 0;  
    switch(pFTDCPackage->GetTID())   
    {  
      
    case FTD_TID_ReqUserLogin:  
        ///huwp:20070608:检查过高版本的API将被禁止登录  
        if (pFTDCPackage->GetVersion()>FTD_VERSION)  
        {  
            SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version");  
            return 0;  
        }  
        nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession);  
        FTDRequestIndex.incValue();  
        break;  
    case FTD_TID_ReqCheckUserLogin:  
        nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession);  
        FTDRequestIndex.incValue();  
        break;  
    case FTD_TID_ReqSubscribeTopic:  
        nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession);  
        FTDRequestIndex.incValue();  
        break;    
    }  
      
    return 0;  
}  

当有某个socket上有数据可读时,接着接收该socket上的数据,对接收到的数据进行解包,然后调用CalcFlux(pSession, pFTDCPackage->Length())进行流量统计:

void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux)  
{  
    TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID());  
    if (pSessionInfo != NULL)  
    {  
        //流量控制改为计数  
        pSessionInfo->nCommFlux ++;   
        ///若流量超过规定,则挂起该会话的读操作  
        if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux)  
        {  
            pSession->SuspendRead(true);  
        }  
    }  
}  

该函数会先让某个连接会话(Session)处理的包数量递增,接着判断是否超过最大包数量,则设置读挂起标志:

void CSession::SuspendRead(bool bSuspend)  
{  
    m_bSuspendRead = bSuspend;  
}  

这样下次将会从检测的socket列表中排除该socket:

void CEpollReactor::RegisterIO(CEventHandler *pEventHandler)  
{  
    int nReadID, nWriteID;  
    pEventHandler->GetIds(&nReadID, &nWriteID);  
    if (nWriteID != 0 && nReadID ==0)  
    {  
        nReadID = nWriteID;  
    }  
    if (nReadID != 0)  
    {  
        m_mapEventHandlerId[pEventHandler] = nReadID;  
        struct epoll_event ev;  
        ev.data.ptr = pEventHandler;  
        if(epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0)  
        {  
            perror("epoll_ctl EPOLL_CTL_ADD");  
        }  
    }  
}  
void CSession::GetIds(int *pReadId, int *pWriteId)  
{  
    m_pChannelProtocol->GetIds(pReadId,pWriteId);  
    if (m_bSuspendRead)  
    {  
        *pReadId = 0;  
    }  
}  

也就是说不再检测该socket上是否有数据可读。然后在定时器里1秒后重置该标志,这样这个socket上有数据的话又可以重新检测到了:

const int SESSION_CHECK_TIMER_ID    = 9;  
const int SESSION_CHECK_INTERVAL    = 1000;  
SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);  
void CFrontEngine::OnTimer(int nIDEvent)  
{  
    if (nIDEvent == SESSION_CHECK_TIMER_ID)  
    {  
        CSessionMap::iterator itor = m_mapSession.Begin();  
        while (!itor.IsEnd())  
        {  
            TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID());  
            if (pFind != NULL)  
            {  
                CheckSession(*itor, pFind);  
            }  
            itor++;  
        }  
    }  
}  
  
void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo)  
{  
    ///重新开始计算流量  
    pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux;  
    if (pSessionInfo->nCommFlux < 0)  
    {  
        pSessionInfo->nCommFlux = 0;  
    }  
    ///若流量超过规定,则挂起该会话的读操作  
    pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);
}

这就相当与饭店里面先给某一桌客人上一些菜,让他们先吃着,等上了一些菜之后不会再给这桌继续上菜了,而是给其它空桌上菜,大家都吃上后,继续回来给原先的桌子继续上菜。实际上我们的饭店都是这么做的。上面的例子是单服务流量控制的实现的一个非常好的思路,它保证了每个客户端都能均衡地得到服务,而不是一些客户端等很久才有响应。

另外加快服务器处理速度的策略可能就是缓存了,缓存实际上是以空间换取时间的策略。对于一些反复使用的,但是不经常改变的信息,如果从原始地点加载这些信息就比较耗时的数据(比如从磁盘中、从数据库中),我们就可以使用缓存。所以时下像redis、leveldb、fastdb等各种内存数据库大行其道。我在flamingo中用户的基本信息都是缓存在聊天服务程序中的,而文件服务启动时会去加载指定目录里面的所有程序名称,这些文件的名称都是md5,为该文件内容的md5。这样当客户端上传了新文件请求时,如果其传上来的文件md5已经位于缓存中,则表明该文件在服务器上已经存在,这个时候服务器就不必再接收该文件了,而是告诉客户端文件已经上传成功了。

说了这么多,一般来说,一个服务器的架构,往往更多取决于其具体的业务,我们要在结合当前的情况来实际去组织铺排,没有一套系统是万能的。多思考,多实践,多总结,相信很快你也能拥有很不错的架构能力。

鉴于笔者能力和经验有限,文中难免有错漏之处,欢迎提意见。 交流QQ群:49114021