Chinaunix首页 | 论坛 | 博客
  • 博客访问: 6660575
  • 博文数量: 1159
  • 博客积分: 12444
  • 博客等级: 上将
  • 技术积分: 12570
  • 用 户 组: 普通用户
  • 注册时间: 2008-03-13 21:34
文章分类

全部博文(1159)

文章存档

2016年(126)

2015年(350)

2014年(56)

2013年(91)

2012年(182)

2011年(193)

2010年(138)

2009年(23)

分类: C/C++

2015-12-04 15:56:52



chat_server.h

点击(此处)折叠或打开

  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <stdlib.h>
  4. #include <unistd.h>
  5. #include <sys/time.h>
  6. #include <sys/socket.h>
  7. #include <sys/types.h>
  8. #include <sys/select.h>
  9. #include <netinet/in.h>
  10. #include <arpa/inet.h>
  11. #include <sys/epoll.h>
  12. #include <fcntl.h>
  13. #include <netinet/tcp.h>
  14. #include <pthread.h>
  15. #include <errno.h>
  16. #include <assert.h>
  17. #define BUFSIZE 65535

  18. //#define PORT 9002
  19. #define SERVER_PORT 11111
  20. #define SERVER_HOST "127.0.0.1"

  21. struct {
  22.     int len;
  23.     char buf[65535];
  24. } TcpPacket;

  25. typedef struct worker {
  26.     void *(*process) (void *arg, char *buf);
  27.     void *arg;
  28.     char buf[255];
  29.     struct worker *next;
  30. } CThread_worker;

  31. typedef struct param {
  32.     int *epoll_fd;
  33.     int *accept_fd;
  34.     int *maxfd;
  35.     struct epoll_event *ev;
  36. } CThread_param;

  37. typedef struct {
  38.     pthread_mutex_t queue_lock;
  39.     pthread_cond_t queue_ready;
  40.     CThread_worker *queue_head;
  41.     int shutdown;
  42.     pthread_t *threadid;
  43.     int max_thread_num;
  44.     int cur_queue_size;
  45. } CThread_pool;

  46. void *myprocess(void *arg, char *buf);
  47. void pool_init(int);
  48. int pool_add_worker(void *(*process) (void *arg, char *buf), void *arg);
  49. void *thread_routine(void *arg);

  50. static CThread_pool *pool = NULL;

  51. int buildConnect();
  52. long int fd_A[5] = { 0, 0, 0, 0 };    //because fd_A maybe flow

  53. int acceptSelectClient(int);

  54. int acceptEpollClient(int);
  55. int set_noblocking(int);
  56. int readn(int, char *, size_t);
  57. int readrec(int, char *, size_t);
  58. //int pthread_test();
  59. //void* thread_fun(void*);
  60. //int reicveData();

  61. //int toQueue();

  62. //int dealData();

  63. //int sendData();


chat_server.c

点击(此处)折叠或打开

  1. #include "chat_server.h"
  2. int main()
  3. {
  4.     int fd;
  5.     pool_init(3);
  6.     fd = buildConnect();
  7.     //acceptSelectClient(fd);
  8.     acceptEpollClient(fd);
  9.     return 0;
  10. }

  11. int buildConnect()
  12. {
  13.     struct sockaddr_in server;
  14.     int listen_fd;
  15.     if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
  16.         return -1;
  17.     }

  18.     server.sin_family = AF_INET;
  19.     server.sin_port = htons(SERVER_PORT);
  20.     server.sin_addr.s_addr = inet_addr(SERVER_HOST);

  21.     if (bind(listen_fd, (struct sockaddr *)&server, sizeof(server)) < 0) {
  22.         return -1;
  23.     }
  24.     listen(listen_fd, 5);
  25.     printf("server start listen....\r\n");
  26.     return listen_fd;

  27. }

  28. int set_nonblocking(int listen_fd)
  29. {
  30.     if (fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFD, 0) | O_NONBLOCK) == -1) {
  31.         return -1;
  32.     }
  33.     return 0;
  34. }

  35. /*
  36. void * thread_fun(void *arg)
  37. {
  38.     pid_t pid;
  39.     pthread_t tid;
  40.     pid = getpid();
  41.     tid = pthread_self();
  42.     printf(" pid %u tid %u (0x%x)\n" ,(unsigned int)pid, (unsigned int )tid , (unsigned int ) tid);    
  43.     return ((void *)1);
  44. }
  45. int pthread_test()
  46. {
  47.     int err;
  48.     pthread_t tid;
  49.     err = pthread_create(&tid, NULL , thread_fun, NULL);
  50.     //err = pthread_join(tid , NULL);
  51.     return 0;

  52. }
  53. */
  54. int readn(int accept_fd, char *buf, size_t len)
  55. {
  56.     int cnt;
  57.     int rc;
  58.     cnt = len;
  59.     while (cnt > 0) {
  60.         rc = recv(accept_fd, buf, cnt, 0);
  61.         if (rc < 0) {
  62.             return -1;
  63.         }
  64.         if (rc == 0)
  65.             return len - cnt;
  66.         buf += rc;
  67.         cnt -= rc;
  68.     }
  69.     return len;
  70. }

  71. int readrec(int accept_fd, char *buf, size_t len)
  72. {
  73.     int reclen;
  74.     int rc;
  75.     rc = readn(accept_fd, (char *)&reclen, sizeof(reclen));
  76.     if (rc != sizeof(reclen))
  77.         return rc < 0 ? -1 : 0;
  78.     reclen = ntohl(reclen);

  79.     if (reclen > len) {
  80.         while (reclen > 0) {
  81.             rc = readn(accept_fd, buf, len);
  82.             if (rc != len)
  83.                 return rc < 0 ? -1 : 0;
  84.             reclen -= len;
  85.             if (reclen < len)
  86.                 len = reclen;
  87.         }
  88.     }
  89.     rc = readn(accept_fd, buf, reclen);
  90.     if (rc != reclen)
  91.         return rc < 0 ? -1 : 0;
  92.     return rc;
  93. }

  94. int acceptEpollClient(int listen_fd)
  95. {
  96.     int epoll_fd;
  97.     int maxfd;
  98.     int ret;
  99.     int accept_fd;
  100.     int i;
  101.     struct sockaddr_in client;
  102.     struct epoll_event ev;
  103.     struct epoll_event events[5];
  104.     //int len = sizeof(client);
  105.     socklen_t len = sizeof(client);
  106.     char buf[255];
  107.     if (set_nonblocking(listen_fd) < 0) {
  108.         printf("fcntl error\r\n");
  109.         return -1;
  110.     }
  111.     epoll_fd = epoll_create(5);
  112.     if (epoll_fd == 0) {
  113.         printf("create error\r\n");
  114.         return -1;
  115.     }
  116.     ev.events = EPOLLIN | EPOLLET;
  117.     ev.data.fd = listen_fd;
  118.     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) {
  119.         printf("epoll add error\r\n");
  120.         return -1;
  121.     } else {
  122.         printf("listen id %d\r\n", (int)listen_fd);
  123.     }
  124.     //set_keep_alive(listen_fd);
  125.     maxfd = 1;
  126.     while (1) {
  127.         ret = epoll_wait(epoll_fd, events, maxfd, -1);

  128.         if (ret == -1) {
  129.             printf("wait\r\n");
  130.             return -1;
  131.         }
  132.         for (i = 0; i < ret; i++) {
  133.             if (events[i].data.fd == listen_fd) {
  134.                 accept_fd = accept(listen_fd, (struct sockaddr *)&client, &len);
  135.                 if (accept_fd < 0) {
  136.                     printf("accetp error\r\n");
  137.                     continue;
  138.                 }
  139.                 set_nonblocking(accept_fd);
  140.                 ev.events = EPOLLIN | EPOLLET;
  141.                 ev.data.fd = accept_fd;
  142.                 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_fd, &ev) < 0) {
  143.                     printf("epoll add error\r\n");
  144.                     return -1;
  145.                 } else {
  146.                     printf("new client %d\r\n", (int)accept_fd);

  147.                     maxfd++;
  148.                 }
  149.             } else if (events[i].events & EPOLLIN) {
  150.                 CThread_param *CPThread_param = (CThread_param *) malloc(sizeof(CThread_param));
  151.                 CPThread_param->epoll_fd = &epoll_fd;
  152.                 CPThread_param->accept_fd = &(events[i].data.fd);
  153.                 CPThread_param->ev = &ev;
  154.                 CPThread_param->maxfd = &maxfd;

  155.                 pool_add_worker(myprocess, CPThread_param);
  156.                 /*
  157.                  while(1)
  158.                  {
  159.                  ret = readrec(events[i].data.fd, buf,sizeof(buf));
  160.                  // ret = recv(events[i].data.fd, buf, sizeof(buf),0);
  161.                  if(ret == -1)
  162.                  {
  163.                  if(errno == EAGAIN)
  164.                  {
  165.                  break;

  166.                  }

  167.                  break;
  168.                  }
  169.                  else if(ret == 0)
  170.                  {
  171.                  printf("client[%d] close\n",i);
  172.                  epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd,&ev);
  173.                  maxfd--;
  174.                  break;

  175.                  }
  176.                  else
  177.                  {

  178.                  buf[ret] = '\0';
  179.                  printf("client send %s\r\n", buf);

  180.                  send(events[i].data.fd,buf, sizeof(buf) , 0);
  181.                  pthread_test();
  182.                  }
  183.                  } */
  184.             }

  185.         }
  186.     }
  187.     close(listen_fd);
  188.     return 0;

  189. }

  190. // select block way
  191. int acceptSelectClient(int listen_fd)
  192. {
  193.     struct sockaddr_in client;
  194.     int accept_fd;
  195.     int maxfd;
  196.     struct timeval tv;
  197.     fd_set select_fd;
  198.     int ret;
  199.     int i;
  200.     //int len = sizeof(client);
  201.     socklen_t len = sizeof(client);
  202.     int amount = 0;
  203.     char buf[256];

  204.     maxfd = listen_fd;

  205.     while (1) {
  206.         FD_ZERO(&select_fd);
  207.         FD_SET(listen_fd, &select_fd);
  208.         tv.tv_sec = 30;
  209.         tv.tv_usec = 0;
  210.         for (i = 0; i < 5; i++) {
  211.             if (fd_A[i] != 0) {
  212.                 FD_SET(fd_A[i], &select_fd);
  213.             }
  214.         }
  215.         ret = select(maxfd + 1, &select_fd, NULL, NULL, &tv);
  216.         if (ret < 0) {
  217.             return -1;
  218.         } else if (ret == 0) {
  219.             printf("timeout\r\n");
  220.             continue;
  221.         }
  222.         for (i = 0; i < amount; i++) {
  223.             if (FD_ISSET(fd_A[i], &select_fd)) {
  224.                 ret = recv(fd_A[i], buf, sizeof(buf), 0);
  225.                 if (ret <= 0) {
  226.                     printf("client[%d] close\n", i);
  227.                     close(fd_A[i]);
  228.                     FD_CLR(fd_A[i], &select_fd);
  229.                     fd_A[i] = 0;

  230.                 } else {
  231.                     buf[ret] = '\0';
  232.                     printf("client send %s\r\n", buf);
  233.                 }
  234.             }
  235.         }
  236.         if (FD_ISSET(listen_fd, &select_fd)) {
  237.             printf("new connect come on \r\n");
  238.             accept_fd = accept(listen_fd, (struct sockaddr *)&client, &len);
  239.             if (accept_fd <= 0) {
  240.                 printf("accept error\r\n");
  241.                 continue;
  242.             }

  243.             if (amount < 5) {
  244.                 fd_A[amount++] = accept_fd;
  245.                 if (accept_fd > maxfd)
  246.                     maxfd = accept_fd;
  247.             } else {
  248.                 printf("max connects arrive\r\n");
  249.             }
  250.         }

  251.     }
  252.     for (i = 0; i < 5; i++) {
  253.         if (fd_A[i] != 0) {
  254.             close(fd_A[i]);
  255.         }
  256.     }
  257.     return 0;
  258. }

  259. void pool_init(int max_thread_num)
  260. {
  261.     pool = (CThread_pool *) malloc(sizeof(CThread_pool));
  262.     pthread_mutex_init(&(pool->queue_lock), NULL);
  263.     pthread_cond_init(&(pool->queue_ready), NULL);
  264.     pool->queue_head = NULL;
  265.     pool->max_thread_num = max_thread_num;
  266.     pool->cur_queue_size = 0;
  267.     pool->shutdown = 0;
  268.     pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
  269.     int i = 0;
  270.     for (i = 0; i < max_thread_num; i++) {
  271.         pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
  272.     }
  273. }

  274. int pool_add_worker(void *(*process) (void *arg, char *buf), void *arg)
  275. {

  276.     CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
  277.     newworker->process = process;
  278.     newworker->arg = arg;
  279.     newworker->next = NULL;
  280.     pthread_mutex_lock(&(pool->queue_lock));

  281.     CThread_worker *member = pool->queue_head;
  282.     if (member != NULL) {
  283.         while (member->next != NULL)
  284.             member = member->next;
  285.         member->next = newworker;
  286.     } else {
  287.         pool->queue_head = newworker;
  288.     }
  289.     assert(pool->queue_head != NULL);
  290.     pool->cur_queue_size++;
  291.     printf("cur_____%d\r\n", pool->cur_queue_size);
  292.     pthread_mutex_unlock(&(pool->queue_lock));

  293.     pthread_cond_signal(&(pool->queue_ready));
  294.     return 0;
  295. }

  296. int pool_destroy()
  297. {
  298.     if (pool->shutdown)
  299.         return -1;
  300.     pool->shutdown = 1;

  301.     pthread_cond_broadcast(&(pool->queue_ready));

  302.     int i;
  303.     for (i = 0; i < pool->max_thread_num; i++)
  304.         pthread_join(pool->threadid[i], NULL);
  305.     free(pool->threadid);

  306.     CThread_worker *head = NULL;
  307.     while (pool->queue_head != NULL) {
  308.         head = pool->queue_head;
  309.         pool->queue_head = pool->queue_head->next;
  310.         free(head);
  311.     }

  312.     pthread_mutex_destroy(&(pool->queue_lock));
  313.     pthread_cond_destroy(&(pool->queue_ready));

  314.     free(pool);

  315.     pool = NULL;
  316.     return 0;
  317. }

  318. void *thread_routine(void *arg)
  319. {
  320.     printf("starting thread 0x%x\n", (unsigned int)pthread_self());
  321.     while (1) {
  322.         pthread_mutex_lock(&(pool->queue_lock));

  323.         while (pool->cur_queue_size == 0 && !pool->shutdown) {
  324.             printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
  325.             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
  326.         }

  327.         if (pool->shutdown) {

  328.             pthread_mutex_unlock(&(pool->queue_lock));
  329.             printf("thread 0x%x will exit\n", (unsigned int)pthread_self());
  330.             pthread_exit(NULL);
  331.         }
  332.         printf("thread 0x%x is starting to work\n", (unsigned int)pthread_self());

  333.         assert(pool->cur_queue_size != 0);
  334.         assert(pool->queue_head != NULL);

  335.         pool->cur_queue_size--;
  336.         CThread_worker *worker = pool->queue_head;
  337.         pool->queue_head = worker->next;
  338.         pthread_mutex_unlock(&(pool->queue_lock));
  339.         printf("start process...\r\n");
  340.         (*(worker->process)) (worker->arg, worker->buf);
  341.         free(worker);
  342.         worker = NULL;
  343.     }

  344.     pthread_exit(NULL);
  345. }

  346. void *myprocess(void *arg, char *buf)
  347. {
  348.     int ret;
  349.     char buf1[255];
  350.     struct param *PP_param;
  351.     PP_param = (CThread_param *) arg;
  352.     printf("myprocess start...\r\n");
  353.     while (1) {
  354.         //printf("fd_%d\r\n",*(PP_param->accept_fd));
  355.         ret = readrec(*(PP_param->accept_fd), buf1, sizeof(buf1));
  356.         // ret = recv(events[i].data.fd, buf, sizeof(buf),0);
  357.         if (ret == -1) {
  358.             if (errno == EAGAIN) {
  359.                 break;

  360.             }

  361.             break;
  362.         } else if (ret == 0) {
  363.             epoll_ctl(*(PP_param->epoll_fd), EPOLL_CTL_DEL, *(PP_param->accept_fd), PP_param->ev);
  364.             *(PP_param->maxfd)--;
  365.             break;

  366.         } else {

  367.             buf1[ret] = '\0';
  368.             printf("client send %s\r\n", buf1);

  369.             printf("threadid is 0x%x\n", (unsigned int)pthread_self());

  370.             send(*(PP_param->accept_fd), buf1, sizeof(buf1), 0);
  371.         }

  372.     }

  373.     return NULL;
  374. }


chat_client.c

点击(此处)折叠或打开

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <netdb.h>
  6. #include <sys/socket.h>
  7. #include <sys/types.h>
  8. #include <netinet/in.h>
  9. #include <arpa/inet.h>
  10. #include <sys/time.h>

  11. #define BUFSIZE 8192
  12. #define SERVER_PORT 11111
  13. #define SERVER_HOST "127.0.0.1"

  14. struct {
  15.     int len;
  16.     char buf[65535];
  17. } TcpPacket;
  18. enum { CMD_NAME, DST_IP, DST_PORT };

  19. int main(int argc, char *argv[])
  20. {
  21.     struct sockaddr_in server;
  22.     unsigned long dst_ip;
  23.     int port;
  24.     int s;
  25.     int n;
  26.     char buf[BUFSIZE];
  27.     char buf1[BUFSIZE];
  28.     char cmd[BUFSIZE];
  29.     struct timeval tv;
  30.     fd_set readfd;

  31.     port = SERVER_PORT;
  32.     dst_ip = inet_addr(SERVER_HOST);
  33.     if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
  34.         perror("sokcet error");
  35.         exit(-1);
  36.     }
  37.     memset((char *)&server, 0, sizeof(server));
  38.     server.sin_family = AF_INET;
  39.     server.sin_addr.s_addr = dst_ip;
  40.     server.sin_port = htons(port);

  41.     if (connect(s, (struct sockaddr *)&server, sizeof(server)) < 0) {
  42.         perror("connect error");
  43.         exit(-1);
  44.     }
  45.     while (1) {
  46.         tv.tv_sec = 600;
  47.         tv.tv_usec = 0;
  48.         FD_ZERO(&readfd);
  49.         FD_SET(0, &readfd);
  50.         FD_SET(s, &readfd);
  51.         if ((select(s + 1, &readfd, NULL, NULL, &tv)) < 0) {
  52.             perror("timeout");
  53.             break;
  54.         }
  55.         if (FD_ISSET(0, &readfd)) {
  56.             if ((n = read(0, TcpPacket.buf, sizeof(TcpPacket.buf))) <= 0)
  57.                 break;
  58.             n = strlen(TcpPacket.buf);
  59.             // TcpPacket.buf[n-1] = '\0';
  60.             TcpPacket.len = ntohl(n);
  61.             if (send(s, (char *)&TcpPacket, n + sizeof(TcpPacket.len), 0) <= 0)
  62.                 break;
  63.         }
  64.         if (FD_ISSET(s, &readfd)) {
  65.             if ((n = recv(s, buf1, BUFSIZE - 1, 0)) <= 0) {
  66.                 perror("recv error!\r\n");
  67.                 exit(-1);
  68.             }
  69.             buf1[n] = '\0';
  70.             printf("%s", buf1);
  71.             fflush(stdout);
  72.         }
  73.     }
  74.     close(s);
  75.     return 0;
  76. }

g++ -lpthread chat_server.c -o chat_server

g++ -lpthread chat_client.c -o chat_client




阅读(1019) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~