Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2764209
  • 博文数量: 505
  • 博客积分: 1552
  • 博客等级: 上尉
  • 技术积分: 2514
  • 用 户 组: 普通用户
  • 注册时间: 2007-09-23 18:24
文章分类

全部博文(505)

文章存档

2019年(12)

2018年(15)

2017年(1)

2016年(17)

2015年(14)

2014年(93)

2013年(233)

2012年(108)

2011年(1)

2009年(11)

分类: LINUX

2014-07-21 08:59:23

Linux下epoll+多线程,使用NoSQL数据库进行数据持久化存储

点击(此处)折叠或打开

  #include <errno.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>

  #include <arpa/inet.h>
  #include <fcntl.h>
  #include <netinet/in.h>
  #include <pthread.h>
  #include <sys/epoll.h>
  #include <sys/socket.h>
  #include <unistd.h>

  #include <tcbdb.h>

  /* TCP port the server listens on. */
  #define PORT 8888
  /* Size hint handed to epoll_create() for every epoll instance. */
  #define MAXFDS 10000
  /* Maximum number of events fetched by a single epoll_wait() call. */
  #define EVENTSIZE 5000
  /* Number of worker threads; each one owns one entry of epfd[]. */
  #define THREADNUM 2
  /* Size in bytes of the per-request receive / reply buffer. */
  #define BUFFERSIZE 4096
  /*
   * Response header sent before every reply body.  Header lines must end in
   * CRLF and the header block must end with an empty line, so the escapes have
   * to be real "\r\n" sequences rather than the literal text "/r/n".
   */
  #define REPLY_BUFFER "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Type: text/html\r\n\r\n"
  18. int epfd[THREADNUM];
  19. int THREADCOUNT;
  20. pthread_mutex_t LOCK;
  21. TCBDB *event_db;
  /*
   * Fields extracted from a single HTTP request.
   * Every member is a fixed-size, NUL-terminated character buffer.
   */
  struct HTTP
  {
      char name[256];     /* "name" parameter; used as the database key       */
      char charset[32];   /* "charset" parameter                              */
      char opt[8];        /* "opt" parameter; compared with "put" and "get"   */
      char data[1024];    /* "data" parameter; stored when there is no body   */
      char pos[16];       /* "pos" parameter                                  */
      char num[16];       /* "num" parameter                                  */
      char content[1024]; /* request body (text after the blank header line)  */
  };
  31. void *serv_epoll(void *p);
  /*
   * Put the descriptor `fd` into non-blocking mode.
   *
   * The current file status flags are read with F_GETFL, O_NONBLOCK is OR-ed
   * in and the result is written back with F_SETFL.  If either fcntl() call
   * fails a diagnostic is printed to stderr and the function returns without
   * changing anything further; no error is reported to the caller.
   */
  void setnonblocking(int fd)
  {
      int flags = fcntl(fd, F_GETFL);

      if (flags < 0) {
          fprintf(stderr, "fcntl failed\n");
          return;
      }
      if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
          fprintf(stderr, "fcntl failed\n");
      }
  }
  /*
   * Copy the value of the "itemname=value" pair found in `buffer` into `item`.
   *
   *   buffer   - NUL-terminated request text to search.
   *   itemname - NUL-terminated parameter name, without the '='.
   *   item     - output buffer; receives the NUL-terminated value.
   *
   * The name must be followed directly by '=' and must start either at the
   * beginning of `buffer` or right after one of '?', '&', ' ', '\r', '\n', so
   * that looking up "name" does not match inside "filename=".  The value ends
   * at the first ' ', '&', '\r' or at the end of the string.
   *
   * When no matching pair exists `item` is left untouched, so callers that
   * want an empty result in that case must clear it beforehand.
   *
   * No size is passed for `item`: the caller must supply a buffer that can
   * hold the longest possible value, i.e. at least strlen(buffer) + 1 bytes.
   */
  void get_itemdata( char *buffer, char *itemname, char *item )
  {
      size_t name_len = strlen(itemname);
      const char *hit = buffer;
      const char *value;
      size_t value_len;

      if (name_len == 0) {
          return;
      }

      /* Walk over every occurrence of the name until one is a real key. */
      while ((hit = strstr(hit, itemname)) != NULL) {
          int at_boundary = (hit == buffer) || (strchr("?& \r\n", hit[-1]) != NULL);

          if (at_boundary && hit[name_len] == '=') {
              break;
          }
          hit++;
      }
      if (hit == NULL) {
          return;
      }

      /* The value starts right after the '=' and is written from item[0]. */
      value = hit + name_len + 1;
      value_len = strcspn(value, " &\r");
      memcpy(item, value, value_len);
      item[value_len] = '\0';
  }
  /*
   * Print the start-up banner to stderr.
   *
   * The text is written with fputs() so that it is never interpreted as a
   * printf format string.  No command-line options are listed because main()
   * does not parse any.
   */
  void show_help(void)
  {
      static const char banner[] =
          "--------------------------------------------------------------------------------------------------\n"
          "Message Queue Service" " (April 6, 2010)\n\n"
          "Author: Kang YuXiang , E-mail: kangyx2010@gmail.com\n"
          "This is free software, and you are welcome to modify and redistribute it under the New BSD License\n"
          "\n"
          "\n"
          "--------------------------------------------------------------------------------------------------\n"
          "\n";

      fputs(banner, stderr);
  }

  83. int main(int argc, char *argv[])
  84. {
  85.     int fd, cfd,opt=1;
  86.     unsigned char i;
  87.     struct epoll_event ev;
  88.     struct sockaddr_in sin, cin;
  89.     socklen_t sin_len = sizeof(struct sockaddr_in);
  90.     pthread_t tid;
  91.     pthread_attr_t attr;
  92.     THREADCOUNT = 0;
  93.     show_help( );
  94.     event_db = tcbdbnew();
  95.     tcbdbtune(event_db, 1024, 2048, 50000000, 8, 10, BDBTLARGE);
  96.     tcbdbsetcache(event_db, 2048, 1024);
  97.     tcbdbsetxmsiz(event_db, 104857600);
  98.     /* open the database */
  99.     if(!tcbdbopen(event_db, "casket.tct", BDBOWRITER | BDBOCREAT)){
  100.       printf( "Attention: Unable to open the database./n/n");
  101.       exit( 1 );
  102.     }
  103.  
  104.     for( i = 0; i < THREADNUM; i++)
  105.     {
  106.     epfd[ i ] = epoll_create(MAXFDS);
  107.     pthread_attr_init(&attr);
  108.     pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
  109.     if (pthread_create(&tid, &attr, serv_epoll, NULL) != 0)
  110.      {
  111.      fprintf(stderr, "pthread_create failed/n");
  112.      return -1;
  113.      }
  114.     }
  115.     
  116.     
  117.     if ((fd = socket(AF_INET, SOCK_STREAM, 0)) <= 0)
  118.       {
  119.     fprintf(stderr, "socket failed/n");
  120.     return -1;
  121.       }
  122.     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));
  123.     
  124.     memset(&sin, 0, sizeof(struct sockaddr_in));
  125.     sin.sin_family = AF_INET;
  126.     sin.sin_port = htons((short)(PORT));
  127.     sin.sin_addr.s_addr = INADDR_ANY;
  128.     if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) != 0)
  129.       {
  130.     fprintf(stderr, "bind failed/n");
  131.     return -1;
  132.       }
  133.     if (listen(fd, 32) != 0)
  134.       {
  135.     fprintf(stderr, "listen failed/n");
  136.     return -1;
  137.       }
  138.     
  139.     i = 0;
  140.     while ((cfd = accept(fd, (struct sockaddr *)&cin, &sin_len)) > 0)
  141.       {
  142.     setnonblocking(cfd);
  143.     ev.data.fd = cfd;
  144.     ev.events = EPOLLIN | EPOLLET;
  145.     epoll_ctl(epfd[i%THREADNUM], EPOLL_CTL_ADD, cfd, &ev);
  146.     i++;
  147.       }
  148.     
  149.     if (fd > 0)
  150.       close(fd);
  151.     return 0;
  152. }
  153. void *serv_epoll(void *p)
  154. {
  155.   int i, ret, cfd, nfds,head,tail;
  156.   struct epoll_event ev,events[EVENTSIZE];
  157.   char buffer[BUFFERSIZE];
  158.   const char* ptr;
  159.   int temp;
  160.   struct HTTP http;
  161.   pthread_mutex_lock( &LOCK );
  162.   temp = THREADCOUNT;
  163.   THREADCOUNT++;
  164.   pthread_mutex_unlock( &LOCK );

  165.   while (1)
  166.   {
  167.       nfds = epoll_wait(epfd[temp], events, EVENTSIZE , -1);
  168.       for (i=0; i<nfds; i++)
  169.     {
  170.      memset( http.charset, '/0', 32 );
  171.      memset( http.data, '/0', 1024 );
  172.      memset( http.content, '/0', 1024 );
  173.      memset( http.opt, '/0', 8 );
  174.      memset( http.name, '/0', 256 );
  175.      memset( http.pos, '/0', 16 );
  176.      memset( http.num, '/0', 16 );
  177.      if(events[i].events & EPOLLIN)
  178.      {
  179.      cfd = events[i].data.fd;
  180.      ret = recv(cfd, buffer, sizeof(buffer),0);
  181.      //printf( "%s/n",buffer );
  182.      head = 0;
  183.      tail = 0;
  184.      /*判断读数据是否成功*/
  185.      if( ret <= 0 )
  186.         {
  187.          ev.data.fd = cfd;
  188.          epoll_ctl(epfd[ temp ], EPOLL_CTL_DEL, cfd, &ev);
  189.          close(cfd);
  190.          return NULL;
  191.         }
  192.      get_itemdata( buffer, "name", http.name );//printf( "http.name:%s/n", http.name );
  193.      get_itemdata( buffer, "opt", http.opt );//printf( "http.opt:%s/n", http.opt );
  194.      get_itemdata( buffer, "charset", http.charset );//printf( "http.charset%s/n", http.charset );
  195.      get_itemdata( buffer, "pos", http.pos );//printf( "http.pos:%s/n", http.pos );
  196.      get_itemdata( buffer, "num", http.num );//printf( "http.num:%s/n", http.num );
  197.      get_itemdata( buffer, "data", http.data );//printf( "http.data:%s/n", http.data );
  198.     
  199.      /*获取post正文信息*/
  200.      ptr = strstr( buffer, "/r/n/r/n" );
  201.      if(strlen( ptr ) > 0 ){
  202.         head = 4;
  203.         while( ptr[ head ] != '/0'){
  204.          http.content[ head-5 ] = ptr[ head ];
  205.          head++;
  206.         }
  207.         http.content[ head ] = '/0';
  208.         //printf( "http.content:%s/n",http.content );
  209.      }
  210.      memset( buffer, '/0', BUFFERSIZE);
  211.      strcpy(buffer, REPLY_BUFFER);
  212.      ret = 0;
  213.      /*根据opt进行相应的入库或者出库操作,并给客户端发送回复信息*/
  214.      if( strcmp( http.opt,"put" ) == 0){
  215.         if( strlen( http.content ) > 0)
  216.          tcbdbput2( event_db, http.name, http.content);
  217.         else
  218.          tcbdbput2( event_db, http.name, http.data);
  219.         strcat( buffer, "OK" );
  220.         ret = send(cfd, buffer, strlen(buffer), 0);
  221.         ev.data.fd = cfd;
  222.         epoll_ctl(epfd[ temp ], EPOLL_CTL_DEL, cfd, &ev);
  223.         close(cfd);
  224.      }
  225.      else if( strcmp(http.opt,"get") == 0 )
  226.         {
  227.          printf( "opt is get/n");
  228.          //printf("http.content:%s/n",tcbdbget2(event_db, http.name));
  229.          ptr = tcbdbget2(event_db, http.name);
  230.          //printf( "ptr:%s/n", ptr );
  231.          if( ptr )
  232.          strcat(buffer ,tcbdbget2(event_db, http.name));
  233.          else
  234.          strcat(buffer ,"no such name");
  235.          if(write( cfd, buffer, sizeof( buffer)) <= 0)
  236.          {
  237.          ev.data.fd = cfd;
  238.          ev.events = EPOLLOUT | EPOLLET;
  239.          epoll_ctl(epfd[ temp ], EPOLL_CTL_MOD, cfd, &ev);
  240.          }
  241.          else{
  242.          ev.data.fd = cfd;
  243.          epoll_ctl(epfd[ temp ], EPOLL_CTL_DEL, cfd, &ev);
  244.          close(cfd);
  245.          }
  246.         }
  247.      }
  248.      else if(events[i].events & EPOLLOUT)
  249.      {
  250.      cfd = events[i].data.fd;
  251.      ret = send(cfd, REPLY_BUFFER, strlen(REPLY_BUFFER), 0);
  252.      //printf("send ret...........= %d/n", ret);
  253.     
  254.      ev.data.fd = cfd;
  255.      epoll_ctl(epfd[ temp ], EPOLL_CTL_DEL, cfd, &ev);
  256.      //shutdown(cfd, 1);
  257.      close(cfd);
  258.      }
  259.     }
  260.   }
  261.   return NULL;
  262. }

阅读(1517) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~