为了成为自由自在的人而奋斗!
分类: C/C++
2014-12-23 13:07:31
epoll实现多个tcp client, 在主线程可以做一些操作, 比如通过输入选择哪个客户端发送指令到服务器。
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
// NOTE(review): file-scope using-directive; it makes every std name visible unqualified.
using namespace std;
// Descriptor of the epoll instance shared by InitCon(), ThreadRun() and main();
// main() overwrites the initial 0 with the result of epoll_create().
int fd = 0;
// Not referenced anywhere in the visible code.
struct epoll_event* m_events = NULL;
// Number of clients main() creates; also the epoll_create() size hint, the length of the
// event buffer and the maxevents argument of epoll_wait(). main() may replace it from argv[1].
int g_MaxClientNum = MAXEVENTS;
// Adds O_NONBLOCK to the file status flags of descriptor `fd`.
// Returns -1 when the current flags cannot be read; otherwise returns whatever
// the F_SETFL call returns (0 on success, -1 on failure).
int set_nonblock(int fd)
{
    const int current = fcntl(fd, F_GETFL, 0);
    if (current != -1) {
        const int wanted = current | O_NONBLOCK;
        return fcntl(fd, F_SETFL, wanted);
    }
    return -1;
}
// Out-of-class definition of the static counter COneEmlutor::RecvMsgCount, starting at 0.
// In the visible code it is only mentioned by a commented-out printf in ThreadRun().
int COneEmlutor::RecvMsgCount = 0;
// Creates this client's TCP socket, connects it (blocking) to m_strSrvIp:m_nSrvPort,
// switches it to non-blocking mode and registers it with the global epoll instance `fd`
// for edge-triggered read/write notifications.
//
// Returns 0 on success (m_nStatus is set to 1) and -1 on any failure. On failure the
// socket is closed, m_nSock is reset to -1 and m_nStatus to 0, so no descriptor leaks
// and the object is left in the same "invalid link" state OnRecv() uses after a disconnect.
int COneEmlutor::InitCon()
{
    m_nSock = socket(AF_INET, SOCK_STREAM, 0);
    if (m_nSock < 0) {
        printf("socket error\r\n");
        m_nSock = -1;
        m_nStatus = 0;
        return -1;
    }

    /* Destination address; zero the whole structure, not just sin_zero. */
    struct sockaddr_in des_addr;
    memset(&des_addr, 0, sizeof(des_addr));
    des_addr.sin_family = AF_INET;
    des_addr.sin_port = htons(m_nSrvPort);
    des_addr.sin_addr.s_addr = inet_addr(m_strSrvIp.c_str());

    const char* pszError = NULL;
    if (des_addr.sin_addr.s_addr == INADDR_NONE) {
        /* inet_addr() reports an unparsable address this way. */
        pszError = "invalid server address\r\n";
    } else if (connect(m_nSock, (struct sockaddr *)&des_addr, sizeof(des_addr)) < 0) {
        pszError = "connect failed\r\n";
    } else if (set_nonblock(m_nSock)) {
        pszError = "set_nonblock() failed";
    } else {
        epoll_event_t ev;
        memset(&ev, 0, sizeof(ev));
        /* The descriptor itself is stored in data.ptr; ThreadRun() converts it back with a
           (long) cast, so widen through long instead of casting int straight to a pointer. */
        ev.data.ptr = reinterpret_cast<void*>(static_cast<long>(m_nSock));
        ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
        if (epoll_ctl(fd, EPOLL_CTL_ADD, m_nSock, &ev) == -1) {
            pszError = "epoll_ctl(EPOLL_CTL_ADD) failed";
        }
    }

    if (pszError != NULL) {
        printf("%s", pszError);
        close(m_nSock);
        m_nSock = -1;
        m_nStatus = 0;
        return -1;
    }

    m_nStatus = 1;
    return 0;
}
// Handles an epoll notification for this client's socket.
//
// nEvent is the event mask reported by epoll_wait(); only EPOLLIN is acted on.
// The socket is registered edge-triggered (EPOLLET in InitCon), so the readable
// case keeps calling recv() until it would block. When the peer closes the
// connection or recv() fails for real, the socket is closed, m_nSock becomes -1
// and m_nStatus becomes 0.
void COneEmlutor::OnRecv(unsigned int nEvent)
{
    if (0 == m_nStatus)
    {
        printf("COneEmlutor::OnRecv error, invalid link kkkkkkkkkkkkkkkkkkkkkkkkkk\n");
        return;
    }
    if (!(nEvent & EPOLLIN))
    {
        return;
    }

    while (0 != m_nStatus)
    {
        const int nRtn = static_cast<int>(::recv(m_nSock, m_pRcvBuf + m_nUnParse, m_nBufRemain, 0));
        if (nRtn > 0)
        {
            printf("Recv nRtn %d\n", nRtn);
            // do something
            // NOTE(review): m_nUnParse / m_nBufRemain are not advanced here, so each
            // recv() writes to the same position - confirm once parsing is added.
            continue;
        }

        if (nRtn < 0)
        {
            if (errno == EINTR)
            {
                continue;   // interrupted by a signal: the link is still fine, retry
            }
            if (errno == EWOULDBLOCK || errno == EAGAIN)
            {
                printf("Read Next Time\n");
                break;      // drained; wait for the next notification
            }
        }

        /* nRtn == 0 (peer closed) or an unrecoverable error: drop the session. */
        close(m_nSock);
        printf("close socket\r\n");
        m_nSock = -1;
        m_nStatus = 0;
    }
}
// Holds pstEvent, the event buffer that main() allocates and ThreadRun() hands to epoll_wait().
stMapData g_stMapData;
// Lookup table from socket descriptor to its COneEmlutor object; filled and emptied by
// main(), searched by ThreadRun().
MAPSOCKCLIENT g_stMapSockClient;
// Mutex locked around the inserts/cleanup in main() and the lookups in ThreadRun().
pthread_mutex_t g_sockClientMutex;
void* ThreadRun(void* argv)
{
int ret;
printf("ThreadRun run \r\n");
COneEmlutor *pMapTmp = NULL;
while (1)
{
ret = epoll_wait(fd, g_stMapData.pstEvent,g_MaxClientNum, 10000);
if (ret > 0)
{
if (g_stMapSockClient.empty())
{
continue;
}
for (int i = 0; i < ret; ++i)
{
//查找处理
pthread_mutex_lock(&g_sockClientMutex);
MAPSOCKCLIENT::iterator iterMap = g_stMapSockClient.find((long)(g_stMapData.pstEvent[i].data.ptr));
if (iterMap != g_stMapSockClient.end())
{
pMapTmp = (COneEmlutor*)(iterMap->second);
pMapTmp->OnRecv(g_stMapData.pstEvent[i].events);
printf("Recv sock %x\r\n", (long)(g_stMapData.pstEvent[i].data.ptr));
}
else
{
printf("No find sock %x\r\n", (long)(g_stMapData.pstEvent[i].data.ptr));
}
pthread_mutex_unlock(&g_sockClientMutex);
}
//printf("COneEmlutor::RecvMsgCount = %d EventNum = %d\r\n", COneEmlutor::RecvMsgCount, ret);
}
}
return NULL;
}
int main(int argc, char *argv[])
{
pthread_t thread;
struct sockaddr_in des_addr;
char sendmsg[MAXSIZE], recvmsg[MAXSIZE];
COneEmlutor *pCTmp = NULL;
int nSendSock = -1;
pthread_mutex_init((&g_sockClientMutex), NULL);
g_stMapSockClient.clear();
g_stMapData.pstEvent = NULL;
if (2 == argc)
{
g_MaxClientNum = atoi(argv[1]);
}
fd =epoll_create(g_MaxClientNum);
if (-1 == fd)
{
return 0;
}
g_stMapData.pstEvent=(epoll_event_t*)malloc(sizeof(epoll_event_t) * g_MaxClientNum);
if(!g_stMapData.pstEvent)
{
//close(fd);
return -2;
}
pthread_create(&(thread), NULL, ThreadRun, 0);
for (int i = 0; i < g_MaxClientNum; i++)
{
pCTmp = new COneEmlutor();
pCTmp->InitCon();
pthread_mutex_lock(&g_sockClientMutex);
g_stMapSockClient.insert(MAPSOCKCLIENT::value_type(pCTmp->m_nSock, pCTmp));
pthread_mutex_unlock(&g_sockClientMutex);
}
//使用socket[0] 发送
while (1) {
memset(sendmsg, 0, MAXSIZE);
memset(recvmsg, 0, MAXSIZE);
printf("client:\n");
printf("send from sock %d\r\n",pCTmp->m_nSock);
// scanf("%s", sendmsg);
gets(sendmsg);
if (!strcmp(sendmsg, "quit")) {
printf("quit success!\n");
break;
}
//send buf
...
memset(recvmsg, 0, MAXSIZE);
/*
if (recv(sockfd, recvmsg, MAXSIZE, 0) < 0) {//接收信息
printf("\nserver has no reply!\n");
close(sockfd);
}
printf("server:\n%s\n", recvmsg);
*/
}
//清空内存
MAPSOCKCLIENT::iterator iterMap;
pthread_mutex_lock(&g_sockClientMutex);
for (iterMap = g_stMapSockClient.begin(); iterMap != g_stMapSockClient.end(); iterMap++)
{
delete (COneEmlutor*)iterMap->second;
iterMap->second = NULL;
// g_stMapSockClient.erase(iterMap);
}
g_stMapSockClient.clear();
pthread_mutex_unlock(&g_sockClientMutex);
free(g_stMapData.pstEvent);
pthread_mutex_destroy((&g_sockClientMutex));
return 0;
}