IO处理线程
时间:2022-04-22
本文章向大家介绍IO处理线程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
客户IO处理,是在工作线程,_WorkerThreadProc中完成的
函数,在完成端口上调用GetQueuedCompletionStatus函数等待IO完成,并调用自定义函数HandleIO来处理IO,具体代码如下:
DOWRD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
::OutputDebugString("Worker Thread startup...n");
#endif //_DEBUG
CIOCPServer *pThis = (CIOCPServer*)lpParam;
CIOCPBuffer *pBuffer;
DWORD dwKey;
DWORD dwTrans;
LPOVERLAPPED lpol;
while(TRUE)
{
//在关联到此完成端口的所有套接字上等待IO完成
BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,&dwTrans,(LPDWORD)&dwKey,(LPOVERLAPPED)&lpol,WSA_INFINITE);
if(dwTrans == -1)
{
#ifdef _DEBUG
::OutputDebugString("Worker Thread startup...n");
#endif //_DEBUG
::ExitThread(0);
}
pBuffer=CONTAINING_RECORD(lpol,CIOCPBuffer,ol);
int nError = NO_ERROR;
if(!bOK)
{
SOCKET s;
if(pBuffer->nOperation == OP_ACCEPT)
{
s=pThis->m_sListen;
}
else
{
if(dwKey == 0)
break;
s=((CIOCPContext*)dwKey)->s;
}
DWORD dwFlags = 0;
if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags))
{
nError = ::WSAGetLastError();
}
}
pThis->HandleIO(dwKey,pBuffer,dwTrans,nError);
}
#ifdef _DEBUG
::OutputDebugString("Worker Thread out...n");
#endif //_DEBUG
return 0;
}
SendText成员函数用于在连接上发送数据,执行时先申请一个缓冲区对象,把用户将要发送的数据复制到里面,然后调用postSend成员函数投递这个缓冲区对象
BOOL CIOCPServer::SendText(CIOCPContext *pContext,char *pszText,int nLen)
{
CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
if(pBuffer != NULL)
{
memcpy(pBuffer->buff,pszText,nLen);
return PostSend(pContext,pBuffer);
}
return FALSE;
}
下面的HandleIO函数是关键,
处理完成的IO,投递新的IO请求,释放完成的缓冲区对象,关闭客户上下文对象
下面是主要的实现代码:
void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer,DOWRD dwTrans,int nError)
{
CIOCPContext *pContext = (CIOCPContext*)dwKey;
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..n");
#endif //_DEBUG
//减少套接字未决IO计数
if(pContext!=NULL)
{
::EnterCriticalSection(&pContext->Lock);
if(pBuffer->nOperation == OP_READ)
pContext->nOutstandingRecv--;
else if(pBuffer->nOperation == OP_WRITE)
pContext->nOutstandingSend--;
::LeaveCriticalSection(&pContext->Lock);
//检查套接字是否已经打开
if(pContext->bClosing)
{
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..n");
#endif //_DEBUG
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
//释放已关闭套接字的未决IO
ReleaseBuffer(pBuffer);
return;
}
}
else
{
RemovePendingAccept(pBuffer);
}
//检查套接字上发生的错误,然后直接关闭套接字
if(nError!=NO_ERROR)
{
if(pBuffer->nOperation != OP_ACCEPT)
{
OnConnectionError(pContext,pBuffer,nError);
CloseAConnection(pContext);
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..n");
#endif //_DEBUG
}
else//在监听套接字上发生错误
{
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..n");
#endif //_DEBUG
}
ReleaseBuffer(pBuffer);
return;
}
//开始处理
if(pBuffer->nOperation == OP_ACCEPT)
{
if(dwTrans == 0)
{
#ifdef _DEBUG
::OutputDebugString("HandleIO startup..n");
#endif //_DEBUG
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
else
{
//为接收新连接的申请客户上下文对象
CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
if(pClient != NULL)
{
if(AddAConnection(pCliebt))
{
//取得用户地址
int nLocalLen,nRmoteLen;
m_lpfnGetAcceptExSockaddrs(
pBuffer->buff,
pBuffer->nLen-((sizeof(sockaddr_in)+16)*2),
sizeof(sockaddr_in)+16,
sizeof(sockaddr_in)+16,
(SOCKADDR **)&pLocalAddr,
&nLocalLen,
(SOCKADDR **)&pRemoteAddr,
&nRmoteLen);
memcpy(&pClient->addrLocal,pLocalAddr,nLocalLen);
memcpy(&pClient->addrRemote,pRemoteAddr,nRmoteLen);
//关联新连接到完成端口对象
::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion,(DWORD)pClient,0);
//通知用户
pBuffer->nLen = dwTrans;
OnConnectionEstablished(pClient,pBuffer);
//向新连接投递Read请求
for(int i=0;i<5;i++)
{
CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
if(p != NULL)
{
if(!PostRecv(pClient,p))
{
CloseAConnection(pClient);
break;
}
}
}
}
else
{
CloseAConnection(pClient);
ReleaseContext(pClient);
}
}
else
{
//资源不足,关闭与客户的连接即可
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
//Accept请求完成,释放IO缓冲区
ReleaseBuffer(pBuffer);
//通知监听线程继续再投递一个Accept请求
::InterlockedDecrement(&m_nRepostCount);
::SetEvent(m_hRepostEvent);
}
else if(pBuffer->nOperation == OP_READ)
{
if(dwTrans == 0)
{
//先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext,pBuffer);
//再关闭连接
CloseAConnection(pContext);
//释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
ReleaseBuffer(pBuffer);
}
else
{
pBuffer->nLen = dwTrans;
//按照IO投递的顺序读取接收到的数据
CIOCPBuffer *p = GetNextReadBuffer(pContext,pBuffer);
while(p!=NULL)
{
OnReadCompleted(pContext,p);
//增加要读的序列号的值
::InterlockedDecrement((LONG*)pContext->nCurrentReadSequence);
//释放IO
ReleaseBuffer(p);
p = GetNextReadBuffer(pContext,NULL);
}
//继续投递一个新的请求
pBuffer = AllocateBuffer(BUFFER_SIZE);
if(pBuffer==NULL || !PostRecv(pContext,pBuffer))
{
CloseAConnection(pContext);
}
}
}
else if(pBuffer->nOperation == OP_WRITE)
{
if(dwTrans == 0)
{
//先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext,pBuffer);
//再关闭连接
CloseAConnection(pContext);
//释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
ReleaseBuffer(pBuffer);
}
else
{
//写操作完成,通知用户
pBuffer->nLen = dwTrans;
OnWriteCompleted(pContext,pBuffer);
//释放SendText函数申请缓冲区
ReleaseBuffer(pBuffer);
}
}
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 从头创建您自己的vue.js——第4部分(构建反应性)
- Oracle 数据库-服务器端字符集查看方法
- Pywinauto 应用后端类型选择错误:AttributeError: 'NoneType' object has no attribute 'backend'. 原因及解决办法
- 恕我直言你可能真的不会java第11篇-Stream API终端操作
- Python+selenium 自动化-滚动的使用方法,如何滚动到元素的位置
- 恕我直言你可能真的不会java第10篇-集合元素归约
- Python+selenium 自动化-模拟键盘输入、点击操作,如何查看所支持的全部键位名称
- Uber为什么放弃Postgres选择迁移到MySQL?
- BAT 批处理命令 - 文件批量复制、克隆功能实例演示
- 【35期】谈谈你对Java线程之间通信方式的理解
- mac 技术篇-修改hosts文件,hosts文件位置
- 一行能装逼的 JavaScript 代码
- python 技术篇-时间戳的获取,记录程序处理时间
- 基于SpringBoot AOP面向切面编程实现Redis分布式锁
- Python+Selenium 技巧篇-svg标签内元素的xpath定位方式