Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2633978
  • 博文数量: 333
  • 博客积分: 4817
  • 博客等级: 上校
  • 技术积分: 4413
  • 用 户 组: 普通用户
  • 注册时间: 2011-02-28 10:51
文章分类

全部博文(333)

文章存档

2017年(20)

2016年(57)

2015年(27)

2014年(20)

2013年(21)

2012年(164)

2011年(24)

分类: 嵌入式

2014-02-16 20:19:08

建立多任务模型,并用线程来实现 

  符合POSIX标准的UNIX操作系统提供了线程的控制函数,如:线程的创建和终止、线程之间的互斥、线程之间的同步等。利用这些系统函数可以成功地模拟消息队列,来实现线程间数据共享和同步,以完成多任务的实时性。为成功地描述线程间数据共享和同步,以下列任务模型为例。 

  首先建立消息队列属性数据结构 

   #define MAXQUEUE 30 

   typedef struct mq_attrib { 

   char name[20]; 

   pthread_mutex_t mutex_buff; 

   pthread_mutex_t mutex_cond; 

   pthread cond_t cond; 

   int maxElements; 

   int elementLength; 

   int curElementNum; 

   caddr_t buff; 

  }mq_attrib,mq_attribstruct,?mq_attrib_t; 

   mq_attrib_t msqueue[MAXQUEUE]; 

  数据结构定义了消息队列的名字name,最大消息个数maxElements,单个消息长度elementLength,当前消息个数curElementNum,存放消息的缓冲区buff,保护缓冲区锁mutex_buff,线程同步条件变量cond,保护线程同步条件变量锁mutex_cond。 

  消息队列的创建 

  依据此数据结构进行消息队列的创建,函数为msqueue_create(参数解释:name消息队列名,maxnum消息的最大个数,length单个消息的长度)。 

  int msqueue_create( name, maxnum, length ) 

  char?name; 

  int maxnum,length; 

  { 

   int i; 

   for ( i=0; i 

   if ( msqueue[i]==NULL )break; 

   //如果消息队列全部被分配,返回错 

   if ( i==MAXQUEUE ) return MQERROR; 

   msqueue[i]=malloc(sizeof(mq_attribstruct)); 

   sprintf( msqueue[i]->name, "%s", name); 

   msqueue[i]->maxElements = maxnum; 

   msqueue[i]->elementLength = length; 

   msqueue[i]->curElementNum = 0; 

   msqueue[i]->buff=malloc(maxnum?length); 

   //对保护锁进行初始化 

   pthread_mutex_init(&&msqueue[i] 

  ->mutex_buff, NULL); 

   pthread_mutex_init(&&msqueue[i] 

  ->mutex_cond, NULL); 

   //对线程同步条件变量初始化 

   pthread_cond_init(&&msqueue[i]->cond, NULL); 

   return i; 

  } 

  应用消息队列进行消息的发送和接收 

  发送消息到消息队列: 

  消息队列的发送和接收是在不同的线程中进行的。首先介绍发送消息到消息队列的函数: 

  int msqueue_send ( id, buff, length ) 

  int id, length; 

  caddr_t buff; 

  { 

   int pos; 

   //消息队列id错,返回错 

   if ( id<0 || id >= MAXQUEU ) return MQERROR; 

   //消息长度与创建时的长度不符,返回错 

   if ( length != msqueue[id]->elementLength ) return MQERROR; 

   //消息队列满,不能发送 

   if ( msqueue[id]->curElementNum >= msqueue[id]->maxElements ) 

   return MQERROR; 

   //在对消息队列缓冲区操作前,锁住缓冲区,以免其他线程操作 

   pthread_mutex_lock ( &&msqueue[id]->mutex_buff ); 

   pos = msqueue[id]->curElementNum * msqueue[id]->elementLength; 

   bcopy ( buff, &&msqueue[id]->buff[pos], msqueue[id]->elementLength ); 

   msqueue[id]->curElementNum ++; 

   pthread_mutex_unlock ( &&msqueue[id]->mutex_buff ); 

   //如果插入消息前,消息队列是空的,插入消息后,消息队列为非空,则通知等待从消息队列取消息的线程,条件满足,可以取出消息进行处理 

   if ( msqueue[id]->curElementNum == 1 ) { 

   pthread_mutex_lock ( &&msqueue[id]->mutex_cond ); 

   pthread_cond_broadcast ( &&msqueue[id]->cond ); 

   pthread_mutex_unlock ( &&msqueue[id]->mutex_cond ); 

   } 

   return length; 

  } 

  从消息队列中接收消息: 

  消息队列的接收函数msqueue_receive,其参数:id为消息队列数组的索引号,buff为消息内容,length为消息长度。 

  int msqueue_receive ( id, buff, length ) 

  int id, length; 

  caddr_t buff; 

  { 

   caddr_t temp; 

   int pos; 

   if(id<0||id>=MAXQUEUE)return MQERROR; 

   if(length != msqueue[id]->elementLength) 

  return MQERROR; 

   //如果消息队列为空,则等待,直到消息队列为非空条件满足 

   if ( msqueue[id]->curElementNum == 0){ 

   pthread_mutex_lock ( &&msqueue[id]->mutex_cond ); 

   pthread_cond_wait ( &&msqueue[id]->cond, &&msqueue[id]->mutex_cond ); 

   pthread_mutex_unlock ( &&msqueue[id]->mutex_cond ); 

   } 

   //取消息前,锁住消息队列缓冲区,以免其他线程存放或取消息 

   pthread_mutex_lock ( &&msqueue[id]->mutex_buff ); 

   //为符合消息队列FIFO特性,取出消息后,进行消息队列的调整 

  temp = 

  malloc((msqueue[id]->curElementNum-1) 

  ?msqueue[id]-elementLength ); 

   bcopy ( &&msqueue[id]->buff[0], buff, msqueue[id]->elementLength ); 

   msqueue[id]->curElementNum --; 

   bcopy ( &&msqueue[id]->buff[msqueue[id]->elementLength], temp, 

   msqueue[id]->elementLength 

  ?msqueue[id]->curElementNum); 

   bcopy ( temp, &&msqueue[id]->buff[0], 

   msqueue[id]->elementLength 

  ?msqueue[id]->curElementNum); 

   free ( temp ); 

   //解除缓冲区锁 

   pthread_mutex_unlock ( &&msqueue[id]->mutex_buff ); 

   return length; 

  } 

  多任务模型的实现 

  在讨论完消息队列的创建、删除、发送和接收后,下面讲述消息队列在线程中的应用以实现多任务线程间的数据共享。 

  首先在main主函数中创建消息队列和线程: 

  //定义全局变量 

  Int msqueue_record, msqueue_process; 

  Void main() 

  { 

   pthread_t pthreadID1; 

   //创建消息队列,用于线程间通信 

   msqueue_record = msqueue_create (“record”, 200, 200); 

   msqueue_process = msqueue_create (“process”, 200, 200); 

   //创建数据采集线程 

   pthread_create ( &&pthreadID1, NULL, receiveData, NULL); 

   //创建数据处理线程 

   pthread_create ( &&pthreadID2, NULL, process, NULL); 

   //创建数据记录线程 

   pthread_create ( &&pthreadID1, NULL, record, NULL); 

   //等待进程结束 

   wait_thread_end( ); 

  } 

  数据采集线程: 

  void?receiveData( ) 

  { 

   int count; 

   unsigned char buff[200]; 

   for(;;) { 

  //从数据口采集数据,并将数据放置于buff中 

  //wait_data_from_data_port( buff ) 

  //将数据写入消息队列msqueue_record中 

  msqueue_send ( msqueue_record, buff, 200 ); 

  //将数据写入消息队列msqueue_process中 

  msqueue_send ( msqueue_process, buff, 200 ); 

   } 

  } 

  记录线程函数: 

  void?record ( ) 

  { 

   int num, count; 

  unsigned char buffer[200]; 

   for ( ;; ) { 

  count = msqueue_receive ( msg_record, &&buffer, 200 ); 

  if ( count < 0) { 

  perror ( "msgrcv in record"); 

  continue; 

  } 

   //将取到的消息进行记录处理 

   //record_message_to_lib(); 

   } 

  } 

  数据处理线程函数: 

  int process( ) 

  { 

  int count; 

   unsigned char buffer[200]; 

   for ( ;; ) { 

   count = msqueue_receive ( msg_process, &&buffer, 200 ); 

  if ( count < 0) { 

   perror ( "msgrcv in record"); 

   continue; 

   } 

   //将取到的消息进行处理 

   //process_message_data() 

   } 

  } 

  在实现多任务系统时,作者曾经做过以下三种实现方法的比较:进程间通信采用IPC机制,线程间通信采用进程通信方式IPC,线程间通信采用基于作者开发的消息队列。结果表明:利用用户下的数据区进行线程间通信的速度最快,效率最高,而IPC方式慢。 
阅读(2120) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~