分类: LINUX
2016-07-01 16:23:42
原文地址:消息队列的设计与实现 作者:dongliqiang1985
消息驱动机制是 GUI 系统的基础,消息驱动的底层基础设施之一是消息队列, 它是整个 GUI 系统运转中枢,本文介绍了一个基于环形队列的消息队列实现方 法,给出了它的数据结构、主要操作流程和核心代码。
环行队列是一种首尾相连的队列数据结构,遵循先进先出原则,如下图所示:
ring buffer 示意图 |
在环形队列中用一组连续地址的存储单元依次存放从队列头到队列尾的元素,通 过两个指针 read_pos 和 write_pos 分别指向读取位置和写入位置。
初始化队列时,令 read_pos = write_pos = 0,每当写入一个新元素时, write_pos 增 1;每当读取一个元素时,read_pos 增 1 。若队列已满,不能往 队列写入数据;若队列为空,则不能读取数据。判断对列是否为满的的方法是看 (write_pos + 1)% QUEUE_SIZE == read_pos 是否成立,判断队列是否为空的方 法是看 write_pos == read_pos 是否成立。
鉴于多个线程同时访问环形队列,需要考虑线程之间的互斥和同步问题,拟采用 锁控制多个线程互斥访问环形队列,使用信号量控制线程之间的同步。
一段时间内只能有一个线程获得锁,当它持有锁时,其它线程要访问环形队列必 须等待,直到前者释放锁。由此,锁可以保证多个线程互斥的访问环形队列。
线程从队列对数据前首先判断信号量是否大于 1 ,若是,则从队列读数据;否 则,进入等待状态,直到信号量大于 1 为止;线程往队列写入一个数据后,会 将信号量增 1 ,若有线程在等待,则会被唤醒。由此,信号量实现了多线程同 步访问环形队列。
下图是环形缓冲区的初始化、读数据、写数据的主要流程。
ring buffer 流程图 |
环形队列的数据结构如下所示:
typedef _MSG {
int message;
void* param;
} MSG;
typedef _MSGQUE {
pthread_mutex_t lock;
sem_t wait;
MSG* msg;
int size;
int read_ops;
int write_ops;
} MSGQUEUE;
环形队列包括如下数据:
初始化主要完成三个任务:
/* Create message queue */
_msg_queue = malloc (sizeof (MSGQUEUE));
/* init lock and sem */
pthread_mutex_init (&_msg_queue->lock, NULL);
sem_init (&_msg_queue->wait, 0, 0);
/* allocate message memory */
_msg_queue -> msg = malloc (sizeof(MSG) * nr_msg);
_msg_queue -> size = nr_msg;
如上面的流程图介绍,写操作主要包括如下几步: - 获取锁;
/* lock the message queue */
pthread_mutex_lock (_msg_queue->lock);
/* check if the queue is full. */
if ((_msg_queue->write_pos + 1)% _msg_queue->size == _msg_queue->read_pos) {
/* Message queue is full. */
pthread_mutex_unlock (_msg_queue->lock);
return;
}
/* write a data to write_pos. */
_msg_queue -> msg [write_pos] = *msg;
write_pos ++;
/* check if write_pos if overflow. */
if (_msg_queue->write_pos >= _msg_queue->size)
_msg_queue->write_pos = 0;
/* release lock */
pthread_mutex_unlock (_msg_queue->lock);
sem_post (_msg_queue->wait);
同理,读操作分如下几个步骤:
sem_wait (_msg_queue->wait);
/* lock the message queue */
pthread_mutex_lock (_msg_queue->lock);
/* check if queue is empty */
if (_msg_queue->read_pos != _msg_queue->write_pos) {
msg = _msg_queue->msg + _msg_queue->read_pos;
/* read a data and check if read_pos is overflow */
_msg_queue->read_pos ++;
if (_msg_queue->read_pos >= _msg_queue->size)
_msg_queue->read_pos = 0;
return;
}
/* release lock*/
pthread_mutex_unlock (_msg_queue->lock);