Chinaunix首页 | 论坛 | 博客
  • 博客访问: 175061
  • 博文数量: 84
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 22
  • 用 户 组: 普通用户
  • 注册时间: 2014-12-26 12:56
文章分类

全部博文(84)

文章存档

2015年(83)

2014年(1)

我的朋友

分类: LINUX

2015-04-16 20:44:13

用互斥量实现的线程安全循环队列。
只是一个示例实现,不大能用于实际编程。
因为当真正多线程时,会因为资源竞争而等待,
导致队列的效率极低。
更有效的方式是使用信号量。

更多的不说了,直接看代码吧。
cir_queue.h
  1. /*
  2.  * \File
  3.  * cir_queue.h
  4.  * \Brief
  5.  * circular queue
  6.  */
  7. #ifndef __CIR_QUEUE_H__
  8. #define __CIR_QUEUE_H__

  9. #define QUE_SIZE 8

  10. typedef int DataType;
  11. typedef struct cir_queue_t
  12. {
  13.   DataType data[QUE_SIZE];
  14.   int front;
  15.   int rear;
  16.   int count;
  17. }cir_queue_t;

  18. extern pthread_mutex_t queue_mutex;

  19. void init_cir_queue(cir_queue_t* q);
  20. int is_empty_cir_queue(cir_queue_t* q);
  21. int is_full_cir_queue(cir_queue_t* q);
  22. void push_cir_queue(cir_queue_t* q, DataType x);
  23. DataType pop_cir_queue(cir_queue_t* q);
  24. DataType top_cir_queue(cir_queue_t* q);
  25. void destroy_cir_queue(cir_queue_t* q);
  26. void print_queue(cir_queue_t* q);

  27. #endif


main.c

  1. /*
  2.  * \File
  3.  * main.c
  4.  * \Breif
  5.  * Thread-safe circular-queue implemented by mutex
  6.  * \Author
  7.  * Hank.yan
  8.  */
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <unistd.h>
  12. #include <string.h>
  13. #include <pthread.h>
  14. #include <semaphore.h>

  15. #include "cir_queue.h"

  16. void* thread_queue(void *arg);

  17.     
  18. /*
  19.  * \Func
  20.  * main
  21.  */
  22. int main(int argc, char* argv[])
  23. {
  24.   int res;
  25.   cir_queue_t cq;
  26.   DataType e;
  27.  
  28.   pthread_t a_thread;
  29.   void* thread_result;

  30.   init_cir_queue(&cq);

  31.   push_cir_queue(&cq, 1);
  32.   push_cir_queue(&cq, 2);
  33.   push_cir_queue(&cq, 3);

  34.   print_queue(&cq);

  35.   res = pthread_create(&a_thread, NULL, thread_queue, (void*)&cq);
  36.   if (res != 0)
  37.   {
  38.     perror("Thread creation failed.");
  39.     exit(EXIT_FAILURE);
  40.   }

  41.   e = pop_cir_queue(&cq);    
  42.   e = pop_cir_queue(&cq);    
  43.   print_queue(&cq);

  44.   push_cir_queue(&cq, 9);
  45.   push_cir_queue(&cq, 100);

  46.   print_queue(&cq);

  47.   e = pop_cir_queue(&cq);    

  48.   push_cir_queue(&cq, 20);
  49.   print_queue(&cq);

  50.   printf("Waiting for thread to finish...\n");
  51.   res = pthread_join(a_thread, &thread_result);
  52.   if (res != 0)
  53.   {
  54.     perror("Thread join failed.");
  55.     exit(EXIT_FAILURE);
  56.   }
  57.   print_queue(&cq);

  58.   destroy_cir_queue(&cq);

  59.   printf("Thread joined, it returned %s\n", (char*)thread_result);
  60.   exit(EXIT_SUCCESS);
  61. }


  62. void *thread_queue(void *cirqueue)
  63. {
  64.   int flag;
  65.   DataType element;

  66.   print_queue((cir_queue_t*)cirqueue);

  67.   flag = is_empty_cir_queue((cir_queue_t*)cirqueue);

  68.   print_queue((cir_queue_t*)cirqueue);
  69.   element = pop_cir_queue((cir_queue_t*)cirqueue);
  70.   element = pop_cir_queue((cir_queue_t*)cirqueue);
  71.   print_queue((cir_queue_t*)cirqueue);

  72.   push_cir_queue((cir_queue_t*)cirqueue, 5);
  73.   print_queue((cir_queue_t*)cirqueue);

  74.   push_cir_queue((cir_queue_t*)cirqueue, 99);
  75.   push_cir_queue((cir_queue_t*)cirqueue, 1000);
  76.   push_cir_queue((cir_queue_t*)cirqueue, 88);

  77.   print_queue((cir_queue_t*)cirqueue);
  78.   
  79.   pthread_exit("Thank you for the cpu time.");
  80. }

cir_queue.c
  1. /*
  2.  * \File
  3.  * cir_queue.c
  4.  */
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <unistd.h>
  8. #include <string.h>
  9. #include <pthread.h>
  10. #include <semaphore.h>

  11. #include "cir_queue.h"

  12. pthread_mutex_t queue_mutex;
  13. /*
  14.  * \Func
  15.  *
  16.  */
  17. void init_cir_queue(cir_queue_t *q)
  18. {    
  19.   int res;

  20.   res= pthread_mutex_init(&queue_mutex, NULL);
  21.   if (res != 0)
  22.   {
  23.     perror("Mutex init failed.\n");
  24.     exit(EXIT_FAILURE);
  25.   }
  26.   memset(q->data, 0, QUE_SIZE*sizeof(DataType));

  27.   q->front = q->rear = 0;
  28.   q->count = 0;
  29. }

  30. /*
  31.  * \Func
  32.  *
  33.  */
  34. int is_empty_cir_queue(cir_queue_t *q)
  35. {
  36.   int empty_flag;

  37.   pthread_mutex_lock(&queue_mutex);

  38.   empty_flag = q->front == q->rear;

  39.   pthread_mutex_unlock(&queue_mutex);

  40.   return empty_flag;
  41. }

  42. /*
  43.  * \Func
  44.  *
  45.  */
  46. int is_full_cir_queue(cir_queue_t *q)
  47. {
  48.   int full_flag;

  49.   pthread_mutex_lock(&queue_mutex);

  50.   full_flag = q->rear == QUE_SIZE - 1 + q->front;

  51.   pthread_mutex_unlock(&queue_mutex);

  52.   return full_flag;
  53. }

  54. /*
  55.  * \Func
  56.  *
  57.  */
  58. void push_cir_queue(cir_queue_t *q, DataType x)
  59. {

  60.   if (is_full_cir_queue(q))
  61.   {
  62.     printf("queue overflow.\n");
  63.     return ;
  64.   }

  65.   pthread_mutex_lock(&queue_mutex);

  66.   q->count++;
  67.   q->data[q->rear] = x;
  68.   q->rear = (q->rear+1) % QUE_SIZE;

  69.   pthread_mutex_unlock(&queue_mutex);
  70. }

  71. /*
  72.  * \Func
  73.  *
  74.  */
  75. DataType pop_cir_queue(cir_queue_t *q)
  76. {
  77.   DataType temp;

  78.   if (is_empty_cir_queue(q))
  79.   {
  80.     printf("queue empty.\n");
  81.     return 0;
  82.   }

  83.   pthread_mutex_lock(&queue_mutex);

  84.   temp = q->data[q->front];
  85.   q->data[q->front] = 0;
  86.   q->count--;
  87.   q->front = (q->front+1) % QUE_SIZE;

  88.   pthread_mutex_unlock(&queue_mutex);

  89.   return temp;
  90. }

  91. /*
  92.  * \Func
  93.  *
  94.  */
  95. DataType top_cir_queue(cir_queue_t *q)
  96. {
  97.   DataType x;

  98.   if (is_empty_cir_queue(q))
  99.   {
  100.     printf("queue is empty.\n");
  101.     return 0;
  102.   }

  103.   pthread_mutex_lock(&queue_mutex);

  104.   x = q->data[q->front];

  105.   pthread_mutex_unlock(&queue_mutex);

  106.   return x;
  107. }

  108. void destroy_cir_queue(cir_queue_t *q)
  109. {
  110.   pthread_mutex_destroy(&queue_mutex);

  111.   return;    
  112. }

  113. void print_queue(cir_queue_t* q)
  114. {
  115.   int index;
  116.   if (is_empty_cir_queue(q))
  117.   {
  118.     printf("queue is empty.\n");
  119.     return;
  120.   }

  121.   pthread_mutex_lock(&queue_mutex);
  122.   printf("QUEUE: ");
  123.   for (index = 0; index < QUE_SIZE; index++)
  124.   {
  125.     printf(" %d ", q->data[index]);
  126.   }
  127.   printf("\n");
  128.   pthread_mutex_unlock(&queue_mutex);

  129.   return;
  130. }

makefile
  1. OBJECTS = main.o cir_queue.o
  2. CC = gcc
  3. CFLAGS = -D_REENTRANT -lpthread -g -Wall


  4. thrd_safe_queue: $(OBJECTS)
  5.   $(CC) $(CFLAGS) -o thrd_safe_queue $(OBJECTS)

  6. main.o: cir_queue.h
  7. cir_queue.o: cir_queue.h


  8. .PHONY:clean
  9. clean:
  10.   rm thrd_safe_queue $(OBJECTS)


阅读(637) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~