@HUST张友东 work@taobao zyd_com@126.com
分类: LINUX
2010-03-11 17:04:37
等待队列是linux里的一个重要数据结构,包括之前分析的fuse中也用到等待队列以实现阻塞IO,等待队列实际上是一个循环链表,需要等待某个条件的进程在相应的等待队列上睡眠,但等待条件变为真时,进程被唤醒。
主要的数据结构在include/linux/wait.h中声明及定义:
等待队列头的结构如下:
struct __wait_queue_head {
spinlock_t lock; //用于保护双向循环链表
struct list_head task_list;
};
等待队列元素的结构如下:
struct __wait_queue {
unsigned int flags;
#define WQ_FLAG_EXCLUSIVE 0x01
void *private;
wait_queue_func_t func;
struct list_head task_list;
};
__wait_queue各个字段的含义如下:
flags: 置1或0,1表示互斥进程由内核有选择地唤醒,0表示非互斥
进程总是由内核在事件发生时唤醒。
private:此字段以前为struct task_struct * task,现在改为void *型,但
其作用不变,指向一个等待进程。
func: 指向一个函数地址,被用来指定一个在等待队列的睡眠进程如何被
唤醒。
DECLARE_WAIT_QUEUE_HEAD,DECLARE_WAITQUEUE用于定义及初始化一个等待队列头,等待队列元素结构,或使用init_waitqueue_head对等待队列头显示初始化。 init_waitqueue_entry,init_waitqueue_func_entry实现对等待队列元素的显示初始化。__add_wait_queue ,__add_wait_queue_tail ,__remove_wait_queue函数实现向等待队列中添加/删除元素。
scull代码中提供了两种睡眠机制实现阻塞操作,阻塞操作的标准语法:
如果一个进程调用 read 但是没有数据可用(尚未), 这个进程必须阻塞. 这个进程在有数据达到时被立刻唤醒, 并且那个数据被返回给调用者, 即便小于在给方法的count 参数中请求的数量.
如果一个进程调用 write 并且在缓冲中没有空间, 这个进程必须阻塞, 并且它必须在一个与用作 read 的不同的等待队列中. 当一些数据被写入硬件设备, 并且在输出缓冲中的空间变空闲, 这个进程被唤醒并且写调用成功, 尽管数据可能只被部分写入如果在缓冲只没有空间给被请求的 count 字节.
两种机制分别在scull_p_read和scull_p_write中进行了示范,这里分别称之为简单睡眠和高级睡眠。
l 简单睡眠:
使用如下一组函数来实现在某一条件上睡眠,及唤醒操作。
wait_event(queue, condition)
//返回值非0意味着睡眠过程被某些信号打断,通常应返回-ERESTARTSYS
wait_event_interruptible(queue, condition)
wait_event_timeout(queue, condition, timeout)
wait_event_interruptible_timeout(queue, condition, timeout)
void wake_up(wait_queue_head_t *queue);
void wake_up_interruptible(wait_queue_head_t *queue);
scull_p_read分析:
static ssize_t scull_p_read (struct file *filp, char __user *buf, size_t count,
loff_t *f_pos)
{
struct scull_pipe *dev = filp->private_data;
//获取信号量
if (down_interruptible(&dev->sem))
return -ERESTARTSYS;
//如果管道为空,需要睡眠等待至有数据可读
while (dev->rp == dev->wp) { /* nothing to read */
//释放信号量,否则带着锁去睡眠,不会有写进程能获取锁来唤醒睡眠的进程
up(&dev->sem); /* release the lock */
//如果以非阻塞的模式读,立即返回
if (filp->f_flags & O_NONBLOCK)
return -EAGAIN;
PDEBUG("\"%s\" reading: going to sleep\n", current->comm);
// 以可中断的方式在inq上睡眠,等待条件为dev->rp != dev->wp
if (wait_event_interruptible(dev->inq, (dev->rp != dev->wp)))
return -ERESTARTSYS; /* signal: tell the fs layer to handle it */
/* otherwise loop, but first reacquire the lock */
//被唤醒后,有数据可读,重新获取信号量
if (down_interruptible(&dev->sem))
return -ERESTARTSYS;
}
/* ok, data is there, return something */
if (dev->wp > dev->rp)
count = min(count, (size_t)(dev->wp - dev->rp));
else /* the write pointer has wrapped, return data up to dev->end */
count = min(count, (size_t)(dev->end - dev->rp));
//copy_to_user过程可能睡眠,但该睡眠过程是合理的,不会造成系统死锁
if (copy_to_user(buf, dev->rp, count)) {
up (&dev->sem);
return -EFAULT;
}
dev->rp += count;
if (dev->rp == dev->end)
dev->rp = dev->buffer; /* wrapped */
up (&dev->sem);
/* finally, awake any writers and return */
// 数据读取完后,唤醒等待写的进程
wake_up_interruptible(&dev->outq);
PDEBUG("\"%s\" did read %li bytes\n",current->comm, (long)count);
return count;
}
l 高级睡眠:
如何实现睡眠?
使一个进程睡眠的第一步常常是分配和初始化一个 wait_queue_t 结构, 随后将其添加到正确的等待队列. 当所有东西都就位了, 负责唤醒工作的人就可以找到正确的进程.
第二步设置进程的状态为睡眠状态,设置为TASK_INTERRUPTIBLE或TASK_UNINTERRUPTIBLE,可通过set_current_state实现。
放弃处理器,及调用schedule。通常代码为:
if (!condition)
schedule();
在 if 测试和可能的调用 schedule (并从其返回)之后, 有些清理工作要做. 因为这个代码不再想睡眠, 它必须保证任务状态被重置为 TASK_RUNNING. 如果代码只是从 schedule 返回, 这一步是不必要的; 那个函数不会返回直到进程处于可运行态. 如果由于不再需要睡眠而对 schedule 的调用被跳过, 进程状态将不正确. 还有必要从等待队列中去除这个进程, 否则它可能被多次唤醒.
在以前的linux版本中,程序员需要手动处理好以上所有过程及细节,在新的内核中,linux/sched.h对一些过程进行了封装,使得实现更加容易。
1, 创建,初始化一个等待队列元素。DEFINE_WAIT(my_wait);
2, 添加等待队列元素到相应的等待队列并设置状态。
void prepare_to_wait(wait_queue_head_t *queue, wait_queue_t *wait, int state);
3. 执行清理任务。void finish_wait(wait_queue_head_t *queue, wait_queue_t *wait);
以上过程涉及到的代码:
#define DEFINE_WAIT(name) \
wait_queue_t name = { \
.private = current, \
.func = autoremove_wake_function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
void fastcall
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
/*
* don't alter the task state if this is just going to
* queue an async wait queue callback
*/
if (is_sync_wait(wait))
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
void fastcall finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;
__set_current_state(TASK_RUNNING);
if (!list_empty_careful(&wait->task_list)) {
spin_lock_irqsave(&q->lock, flags);
list_del_init(&wait->task_list);
spin_unlock_irqrestore(&q->lock, flags);
}
}
scull_p_write代码分析
/* Wait for space for writing; caller must hold device semaphore. On
* error the semaphore will be released before returning. */
static int scull_getwritespace(struct scull_pipe *dev, struct file *filp)
{
while (spacefree(dev) == 0) { /* full */
DEFINE_WAIT(wait);
up(&dev->sem);
if (filp->f_flags & O_NONBLOCK)
return -EAGAIN;
PDEBUG("\"%s\" writing: going to sleep\n",current->comm);
prepare_to_wait(&dev->outq, &wait, TASK_INTERRUPTIBLE);
if (spacefree(dev) == 0)
schedule();
finish_wait(&dev->outq, &wait);
if (signal_pending(current)) //如果有信号处理??
return -ERESTARTSYS; /* signal: tell the fs layer to handle it */
if (down_interruptible(&dev->sem))
return -ERESTARTSYS;
}
return 0;
}
/* How much space is free? */
//环形缓冲区,rp==wp时,缓冲区空;rp == (wp +1)% bufsize时缓冲区满;
static int spacefree(struct scull_pipe *dev)
{
if (dev->rp == dev->wp)
return dev->buffersize - 1;
return ((dev->rp + dev->buffersize - dev->wp) % dev->buffersize) - 1;
}
static ssize_t scull_p_write(struct file *filp, const char __user *buf, size_t count, loff_t *f_pos)
{
struct scull_pipe *dev = filp->private_data;
int result;
if (down_interruptible(&dev- >sem))
return -ERESTARTSYS;
/* Make sure there's space to write */
result = scull_getwritespace(dev, filp);
if (result)
return result; /* scull_getwritespace called up(&dev->sem) */
/* ok, space is there, accept something */
count = min(count, (size_t)spacefree(dev));
if (dev->wp >= dev->rp)
count = min(count, (size_t)(dev->end - dev->wp)); /* to end-of-buf */
else /* the write pointer has wrapped, fill up to rp-1 */
count = min(count, (size_t)(dev->rp - dev->wp - 1));
PDEBUG("Going to accept %li bytes to %p from %p\n", (long)count, dev->wp, buf);
if (copy_from_user(dev->wp, buf, count)) {
up (&dev->sem);
return -EFAULT;
}
dev->wp += count;
if (dev->wp == dev->end)
dev->wp = dev->buffer; /* wrapped */
up(&dev->sem);
/* finally, awake any reader */
//唤醒inq等待队列上的进程
wake_up_interruptible(&dev->inq); /* blocked in read() and select() */
/* and signal asynchronous readers, explained late in chapter 5 */
if (dev->async_queue)
kill_fasync(&dev->async_queue, SIGIO, POLL_IN);
PDEBUG("\"%s\" did write %li bytes\n",current->comm, (long)count);
return count;
}
l 互斥等待
当多个进程等待互斥资源时,wake_up唤醒这些进程后也只能有1或多个进程(跟互斥资源数有关)能投入运行,效率较低。通过进程加入等待队列时指定其标志(flag字段)为WQ_FLAG_EXCLUSEVE,它被添加到等待队列的尾部. 没有这个标志的入口项, 相反, 添加到开始。当 wake_up 被在一个等待队列上调用, 它在唤醒第一个有 WQ_FLAG_EXCLUSIVE 标志的进程后停止。
l wake_up系列函数:
wake_up(wait_queue_head_t *queue);
wake_up_interruptible(wait_queue_head_t *queue);
wake_up唤醒队列中的每个不是在互斥等待中的进程, 并且唤醒第一个互斥等待者, 如果它存在.wake_up_interruptible跳过处于不可中断睡眠的进程.
wake_up_nr(wait_queue_head_t *queue, int nr);
wake_up_interruptible_nr(wait_queue_head_t *queue, int nr);
唤醒多达 nr 个互斥等待者, 而不只是一个,其他与wake_up同. 注意传递 0 被解释为请求所有的互斥等待者都被唤醒.
wake_up_all(wait_queue_head_t *queue);
wake_up_interruptible_all(wait_queue_head_t *queue);
唤醒所有的进程, 不管它们是否进行互斥等待,后者跳过不可中断的条目
wake_up_interruptible_sync(struct wait_queue *q);
在返回前不重新调度CPU