概念:
循环队列是将队列的末尾和开头连接起来,主要处理了队列的假溢出。
作为一种容器,肯定需要支持基本的插入和删除。但是它只支持在队列末尾插入,在队列开始处取消息。
实际应用:
在实际应用中,常常有多个生产者和消费者同时进行访问,在学习操作系统时的伪代码是采用了互斥量和条件变量来实现的。而我们的程序,可能是单进程多线程或者是多进程来访问循环队列,而现在,在多线程中还可以采用无锁技术来提高访问的效率。
队列实现
在window下,使用临界区和信号量(window vista后才有条件变量)来实现的循环队列可以如下定义:
typedef struct CycleQueue
{
void* buffer;
int ItemLen;
int size;
//front指针和rear指针分开存储,减少cache失效概率。
int front; //指向头,生存者用来放消息。
int used; //如果要使用迭代器,好像还真是得留一个元素不存储?
//LockGuad m_lock;
CRITICAL_SECTION m_lock; //临界区只能在一个进程中使用
int rear;
HANDLE hFull;
HANDLE hEmpty;
}CycleQueue, *pCycleQueue;
buffer、size是动态分配的数组。这里没有使用普通的数组是不希望队列的大小在编译时就固定了——虽然运行程序一般不会去动态改变大小,但是我们可以在运行之前进行配置。
buffer 和ItemLen指明该队列所存储的数据类型。
used:队列使用了多少,它可以用来区分队列的满和空。
front、rear只是队列的开始和结束位置(后面的代码实现将这两个指针和大多数书上的写反了:front为push的位置)。这里将它分开存储是有利于在多核计算时的cache命中(通常情况下、front和rear会被不不同的线程使用——生存者和消费者)
m_lock:多个生存者、消费者互斥的访问队列。
hFull/hEmpty:表示队列中可以消费/生产的数量,使用了这两个信号量后,从队列的实现角度来说,是没有必须要used字段。hFull的值等于used,hEmpy的值size-used
总体来说,该队列需要同步的字段只有front、used、rear。而buffer字段可以去掉,直接将数据存储在队列的末尾——此时队列和数据必须一起分配和释放。
队列的空、满
1.循环队列的满和空的判断条件是一样的:front==rear。
此时可以预留一个元素不使用,这样空还是front==rear,但是满就变成了(front + 1)%size == rear
另一种方法是使用一个变量used来表示使用了多少,很明显used==0为空,used==size为满。这样可能会节省一点空间(ItemLen可能比较大),但是每次push和pop都需要计算used——当然这都是细节,通常情况下都不需要太关系。
2.当队列不能操作时:push时队列已满,pop时队列为空
此时可以直接返回错误信息给用户
也可以将接口设置一个超时时间,在该时间段内可以操作时就可以返回。
下面的实现中,采用无锁队列的实现是直接返回的错误码,采用信号量机制实现时可以等待一段时间。
当然,在某些情况下,队列中存放的消息并不是很重要(需要实时性数据),并且队列的buffer开辟的比较大——一般会结合双循环队列使用,一般在队列满之后可以直接覆盖以前的信息。
限制:
本次操作没有实现一次放入多条消息,或者一次取出多条消息——信号量每次wait都是减一的。
操作方法:
-
inline bool isEmpty(CycleQueue* q) //判断队列是否为空。
-
{
-
return q->front == q->rear;
-
}
-
-
inline bool isFull(CycleQueue* q) //判断队列是否已满
-
{
-
return q->used == q->size;
-
}
-
-
inline void* CycyleQueue_at(CycleQueue* q, int current)
-
{
-
return (unsigned char*)q->buffer + current * q->ItemLen;
-
}
-
-
CycleQueue* CreateCycleQueue(int size, int itemSize);
-
void DestroyCycleQueue(CycleQueue* q);
-
-
void * CycyleQueue_push(CycleQueue* q, int Milliseconds);
-
bool CycyleQueue_pop(CycleQueue* q, void* des, int Milliseconds);
创建队列:
生产者和消费者是等价的,队列应该先被创建,然后再被生存者和消费者使用:
CycleQueue* CreateCycleQueue(int size, int itemSize)
{
CycleQueue* queue = (CycleQueue*)malloc(sizeof(CycleQueue));
if (!queue)
{
return NULL;
}
(queue->buffer) = (void *)malloc(size*itemSize);
if (!queue->buffer)
{
free(queue);
return NULL;
}
queue->size = size;
queue->ItemLen = itemSize;
queue->front = 0;
queue->rear = 0;
queue->used = 0;
InitializeCriticalSection(&queue->m_lock);
queue->hFull = CreateSemaphore(NULL, 0, queue->size, NULL);
queue->hEmpty = CreateSemaphore(NULL, queue->size, queue->size, NULL);
return queue;
}
释放队列:
void DestroyCycleQueue(CycleQueue* q)
{
if (q)
{
if (q->buffer)
{
free(q->buffer);
q->buffer = NULL;
}
CloseHandle(q->hFull);
CloseHandle(q->hEmpty);
DeleteCriticalSection(&q->m_lock);
free(q);
q = NULL;
}
}
向队列中放入消息:
void * CycyleQueue_push(CycleQueue* q, int Milliseconds)
{
void *current = NULL;
if (WAIT_OBJECT_0 != WaitForSingleObject(q->hEmpty, Milliseconds)) //信号量不必被锁!
{
return current;
}
EnterCriticalSection(&q->m_lock);
current = (unsigned char*)q->buffer + q->front * q->ItemLen;
q->front = ++q->front % q->size;
q->used++;
LeaveCriticalSection(&q->m_lock);
ReleaseSemaphore(q->hFull, 1, NULL);
return current;
}
向队列中取消息:
bool CycyleQueue_pop(CycleQueue* q, void* des, int Milliseconds)
{
void *current = NULL;
if (WAIT_OBJECT_0 != WaitForSingleObject(q->hFull, Milliseconds)) //信号量不必被锁!
{
return false;
}
EnterCriticalSection(&q->m_lock);
current = (unsigned char*)q->buffer + q->rear * q->ItemLen;
if (des)
{
memcpy(des, current, q->ItemLen);
}
q->rear = ++q->rear % q->size;
q->used--;
LeaveCriticalSection(&q->m_lock);
ReleaseSemaphore(q->hEmpty, 1, NULL);
return true;
}
其它版本实现: