Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1004290
  • 博文数量: 96
  • 博客积分: 1553
  • 博客等级: 上尉
  • 技术积分: 1871
  • 用 户 组: 普通用户
  • 注册时间: 2011-12-25 14:50
个人简介

专注点,细心点,耐心点 知行合一

文章分类

全部博文(96)

文章存档

2018年(1)

2014年(4)

2013年(31)

2012年(56)

2011年(4)

分类: C/C++

2012-02-16 22:15:03

在多线程情况下,每个线程都监听同一个fd,当有数据来的时候,是否会有惊群现象呢?验证下

server端代码
编译: g++ testlibevent.cpp -lpthread -levent -o server
  1. #include <iostream>
  2. #include <pthread.h>
  3. #include <event.h>
  4. #include <sys/types.h>
  5. #include <sys/socket.h>
  6. #include <sys/time.h>
  7. #include <netinet/in.h>
  8. #include <sys/types.h>
  9. #include <sys/socket.h>
  10. #include <arpa/inet.h>

  11. int init_count = 0;
  12. pthread_mutex_t init_lock;
  13. pthread_cond_t init_cond;

  14. using namespace std;
  15. typedef struct {
  16.     pthread_t thread_id; /* unique ID of this thread */
  17.     struct event_base *base; /* libevent handle this thread uses */
  18.     struct event notify_event; /* listen event for notify pipe */
  19. } mythread;

  20. void *worker_libevent(void *arg)
  21. {
  22.     mythread *p = (mythread *)arg;
  23.     pthread_mutex_lock(&init_lock);
  24.     init_count++;
  25.     pthread_cond_signal(&init_cond);
  26.     pthread_mutex_unlock(&init_lock);
  27.     event_base_loop(p->base, 0);
  28. }

  29. int create_worker(void*(*func)(void *), void *arg)
  30. {
  31.     mythread *p = (mythread *)arg;
  32.     pthread_t tid;
  33.     pthread_attr_t attr;
  34.     
  35.     pthread_attr_init(&attr);
  36.     pthread_create(&tid, &attr, func, arg);
  37.     p->thread_id = tid;
  38.     return 0;
  39. }

  40. void process(int fd, short which, void *arg)
  41. {
  42.     mythread *p = (mythread *)arg;
  43.     cout << "I am in the thread: " << p->thread_id << endl;
  44.     
  45.     char buffer[100];
  46.     memset(buffer, 0, 100);
  47.     
  48.     int ilen = read(fd, buffer, 100);
  49.     cout << "read num is :" << ilen << endl;
  50.     cout << "the buffer: " << buffer;
  51. }

  52. int setup_thread(mythread *p, int fd)
  53. {
  54.     p->base = event_init();
  55.     event_set(&p->notify_event, fd, EV_READ|EV_PERSIST, process, p);
  56.     event_base_set(p->base, &p->notify_event);
  57.     event_add(&p->notify_event, 0);
  58.     return 0;
  59. }

  60. int main()
  61. {
  62.     struct sockaddr_in in;
  63.     int fd;
  64.     
  65.     fd = socket(AF_INET, SOCK_DGRAM, 0);
  66.     
  67.     struct in_addr s;
  68.     bzero(&in, sizeof(in));
  69.     in.sin_family = AF_INET;
  70.     inet_pton(AF_INET, "192.168.217.128", (void *)&s);
  71.     in.sin_addr.s_addr = s.s_addr;
  72.     in.sin_port = htons(19870);
  73.     
  74.     bind(fd, (struct sockaddr*)&in, sizeof(in));
  75.     int threadnum = 10;
  76.     int i;
  77.     
  78.     pthread_mutex_init(&init_lock, NULL);
  79.     pthread_cond_init(&init_cond, NULL);
  80.     mythread *g_thread;
  81.     g_thread = (mythread *)malloc(sizeof(mythread)*10);
  82.     for(i=0; i<threadnum; i++)
  83.     {
  84.         setup_thread(&g_thread[i], fd);
  85.     }
  86.     
  87.     for(i=0; i<threadnum; i++)
  88.     {
  89.         create_worker(worker_libevent, &g_thread[i]);
  90.     }
  91.     
  92.     pthread_mutex_lock(&init_lock);
  93.     while(init_count < threadnum)
  94.     {
  95.         pthread_cond_wait(&init_cond, &init_lock);
  96.     }
  97.     pthread_mutex_unlock(&init_lock);
  98.     
  99.     
  100.     cout << "IN THE MAIN LOOP" << endl;

  101.     string test = "I am michael";
  102.     write(fd, test.c_str(), test.size());
  103.     
  104.     while(1)
  105.     {
  106.         sleep(1);
  107.     }
  108.     
  109.     free(g_thread);
  110.     return 0;
  111.     
  112. }

client端代码
编译:  g++ udpclient.cpp -o client
  1. #include <iostream>
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <sys/time.h>
  5. #include <netinet/in.h>
  6. #include <sys/types.h>
  7. #include <sys/socket.h>
  8. #include <arpa/inet.h>

  9. using namespace std;
  10. int main()
  11. {
  12.     struct sockaddr_in in;
  13.     int fd;
  14.     
  15.     fd = socket(AF_INET, SOCK_DGRAM, 0);
  16.     
  17.     struct in_addr s;
  18.     bzero(&in, sizeof(in));
  19.     in.sin_family = AF_INET;
  20.     inet_pton(AF_INET, "192.168.217.128", (void *)&s);
  21.     in.sin_addr.s_addr = s.s_addr;
  22.     in.sin_port = htons(19870);

  23.     string str = "I am Michael";
  24.     sendto(fd, str.c_str(), str.size(), 0, (struct sockaddr *)&in, sizeof(struct sockaddr_in));
  25.     
  26.     return 0;
  27. }
.
先启动server, 后启动client.在server端显示如下信息:
IN THE MAIN LOOP
I am in the thread: I am in the thread: I am in the thread: I am in the thread: I am in the thread: I am in the thread: I am in the thread: I am in the thread: I am in the thread: I am in the thread: 3058039696
read num is :12
the buffer: I am Michael3049646992
3083217808
3024468880
3007683472
3016076176
3032861584
3041254288
3066432400
3074825104

结果: 当有数据到来时,每个线程都被唤醒了,但是只有一个线程可以读到数据.



阅读(6401) | 评论(2) | 转发(2) |
给主人留下些什么吧!~~

nanye19842013-03-09 14:43:15

在楼主实验的这种场景下,还有一种场景,就是每个工作线程的处理速度很快的情况下,大部分的时间都浪费在了惊群的状态切换上,这样是很浪费服务器的资源的,
当时做url分布式查询服务器的时候就遇到了这样的问题(框架用redis的信号驱动io),导致我们的并发量不高,最后在此基础上又添加了线程轮发式的分发任务方法。