Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4023373
  • 博文数量: 251
  • 博客积分: 11197
  • 博客等级: 上将
  • 技术积分: 6862
  • 用 户 组: 普通用户
  • 注册时间: 2008-12-05 14:41
个人简介

@HUST张友东 work@taobao zyd_com@126.com

文章分类

全部博文(251)

文章存档

2014年(10)

2013年(20)

2012年(22)

2011年(74)

2010年(98)

2009年(27)

分类: LINUX

2010-03-11 17:04:37

等待队列是linux里的一个重要数据结构,包括之前分析的fuse中也用到等待队列以实现阻塞IO,等待队列实际上是一个循环链表,需要等待某个条件的进程在相应的等待队列上睡眠,但等待条件变为真时,进程被唤醒。

主要的数据结构在include/linux/wait.h中声明及定义:

等待队列头的结构如下:

struct __wait_queue_head {

    spinlock_t lock;   //用于保护双向循环链表

    struct list_head task_list;

};

等待队列元素的结构如下:

struct __wait_queue {

    unsigned int flags;

#define WQ_FLAG_EXCLUSIVE   0x01

    void *private;

    wait_queue_func_t func;

    struct list_head task_list;

};

__wait_queue各个字段的含义如下:

   flags:  101表示互斥进程由内核有选择地唤醒,0表示非互斥
进程总是由内核在事件发生时唤醒。

   private:此字段以前为struct task_struct * task,现在改为void *型,但
其作用不变,指向一个等待进程。

   func:   指向一个函数地址,被用来指定一个在等待队列的睡眠进程如何被
唤醒。

 

    DECLARE_WAIT_QUEUE_HEADDECLARE_WAITQUEUE用于定义及初始化一个等待队列头,等待队列元素结构,或使用init_waitqueue_head对等待队列头显示初始化。   init_waitqueue_entryinit_waitqueue_func_entry实现对等待队列元素的显示初始化。__add_wait_queue __add_wait_queue_tail __remove_wait_queue函数实现向等待队列中添加/删除元素。

 

scull代码中提供了两种睡眠机制实现阻塞操作,阻塞操作的标准语法:

如果一个进程调用 read 但是没有数据可用(尚未), 这个进程必须阻塞. 这个进程在有数据达到时被立刻唤醒, 并且那个数据被返回给调用者, 即便小于在给方法的count 参数中请求的数量.

如果一个进程调用 write 并且在缓冲中没有空间, 这个进程必须阻塞, 并且它必须在一个与用作 read 的不同的等待队列中. 当一些数据被写入硬件设备, 并且在输出缓冲中的空间变空闲, 这个进程被唤醒并且写调用成功, 尽管数据可能只被部分写入如果在缓冲只没有空间给被请求的 count 字节.

两种机制分别在scull_p_readscull_p_write中进行了示范,这里分别称之为简单睡眠和高级睡眠。

 

 

l  简单睡眠:

 

使用如下一组函数来实现在某一条件上睡眠,及唤醒操作。

wait_event(queue, condition)

 //返回值非0意味着睡眠过程被某些信号打断,通常应返回-ERESTARTSYS

wait_event_interruptible(queue, condition)

wait_event_timeout(queue, condition, timeout)

wait_event_interruptible_timeout(queue, condition, timeout)

 

void wake_up(wait_queue_head_t *queue);

void wake_up_interruptible(wait_queue_head_t *queue);

 

scull_p_read分析:

static ssize_t scull_p_read (struct file *filp, char __user *buf, size_t count,

                loff_t *f_pos)

{

    struct scull_pipe *dev = filp->private_data;

    //获取信号量

    if (down_interruptible(&dev->sem))

        return -ERESTARTSYS;

   //如果管道为空,需要睡眠等待至有数据可读

    while (dev->rp == dev->wp) { /* nothing to read */

//释放信号量,否则带着锁去睡眠,不会有写进程能获取锁来唤醒睡眠的进程

        up(&dev->sem); /* release the lock */

        //如果以非阻塞的模式读,立即返回

        if (filp->f_flags & O_NONBLOCK)

            return -EAGAIN;

        PDEBUG("\"%s\" reading: going to sleep\n", current->comm);

        // 以可中断的方式在inq上睡眠,等待条件为dev->rp != dev->wp

        if (wait_event_interruptible(dev->inq, (dev->rp != dev->wp)))

            return -ERESTARTSYS; /* signal: tell the fs layer to handle it */

        /* otherwise loop, but first reacquire the lock */

        //被唤醒后,有数据可读,重新获取信号量

        if (down_interruptible(&dev->sem))

            return -ERESTARTSYS;

    }

    /* ok, data is there, return something */

    if (dev->wp > dev->rp)

        count = min(count, (size_t)(dev->wp - dev->rp));

    else /* the write pointer has wrapped, return data up to dev->end */

        count = min(count, (size_t)(dev->end - dev->rp));

    //copy_to_user过程可能睡眠,但该睡眠过程是合理的,不会造成系统死锁

    if (copy_to_user(buf, dev->rp, count)) {

        up (&dev->sem);

        return -EFAULT;

    }

    dev->rp += count;

    if (dev->rp == dev->end)

        dev->rp = dev->buffer; /* wrapped */

    up (&dev->sem);

 

    /* finally, awake any writers and return */

    // 数据读取完后,唤醒等待写的进程

    wake_up_interruptible(&dev->outq);

    PDEBUG("\"%s\" did read %li bytes\n",current->comm, (long)count);

    return count;

}

 

l  高级睡眠:

 

如何实现睡眠?

使一个进程睡眠的第一步常常是分配和初始化一个 wait_queue_t 结构, 随后将其添加到正确的等待队列. 当所有东西都就位了, 负责唤醒工作的人就可以找到正确的进程.

第二步设置进程的状态为睡眠状态,设置为TASK_INTERRUPTIBLETASK_UNINTERRUPTIBLE,可通过set_current_state实现。

放弃处理器,及调用schedule。通常代码为:

if (!condition)

schedule();

if 测试和可能的调用 schedule (并从其返回)之后, 有些清理工作要做. 因为这个代码不再想睡眠, 它必须保证任务状态被重置为 TASK_RUNNING. 如果代码只是从 schedule 返回, 这一步是不必要的; 那个函数不会返回直到进程处于可运行态. 如果由于不再需要睡眠而对 schedule 的调用被跳过, 进程状态将不正确. 还有必要从等待队列中去除这个进程, 否则它可能被多次唤醒.

  

在以前的linux版本中,程序员需要手动处理好以上所有过程及细节,在新的内核中,linux/sched.h对一些过程进行了封装,使得实现更加容易。

1, 创建,初始化一个等待队列元素。DEFINE_WAIT(my_wait);

2, 添加等待队列元素到相应的等待队列并设置状态。

void prepare_to_wait(wait_queue_head_t *queue, wait_queue_t *wait, int state);

3. 执行清理任务。void finish_wait(wait_queue_head_t *queue, wait_queue_t *wait);

 

以上过程涉及到的代码:

#define DEFINE_WAIT(name)                       \

    wait_queue_t name = {                       \

        .private    = current,              \

        .func       = autoremove_wake_function,     \

        .task_list  = LIST_HEAD_INIT((name).task_list), \

    }

 

void fastcall

prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)

{

    unsigned long flags;

 

    wait->flags &= ~WQ_FLAG_EXCLUSIVE;

    spin_lock_irqsave(&q->lock, flags);

    if (list_empty(&wait->task_list))

        __add_wait_queue(q, wait);

    /*

     * don't alter the task state if this is just going to

     * queue an async wait queue callback

     */

    if (is_sync_wait(wait))

        set_current_state(state);

    spin_unlock_irqrestore(&q->lock, flags);

}

 

void fastcall finish_wait(wait_queue_head_t *q, wait_queue_t *wait)

{

    unsigned long flags;

    __set_current_state(TASK_RUNNING);

 

    if (!list_empty_careful(&wait->task_list)) {

        spin_lock_irqsave(&q->lock, flags);

        list_del_init(&wait->task_list);

        spin_unlock_irqrestore(&q->lock, flags);

    }

}

 

 

scull_p_write代码分析

/* Wait for space for writing; caller must hold device semaphore.  On

 * error the semaphore will be released before returning. */

 

static int scull_getwritespace(struct scull_pipe *dev, struct file *filp)

{

    while (spacefree(dev) == 0) { /* full */

        DEFINE_WAIT(wait);

       

        up(&dev->sem);

        if (filp->f_flags & O_NONBLOCK)

            return -EAGAIN;

        PDEBUG("\"%s\" writing: going to sleep\n",current->comm);

        prepare_to_wait(&dev->outq, &wait, TASK_INTERRUPTIBLE);

        if (spacefree(dev) == 0)

            schedule();

        finish_wait(&dev->outq, &wait);

        if (signal_pending(current)) //如果有信号处理??

            return -ERESTARTSYS; /* signal: tell the fs layer to handle it */

        if (down_interruptible(&dev->sem))

            return -ERESTARTSYS;

    }

    return 0;

}  

 

/* How much space is free? */

//环形缓冲区,rp==wp时,缓冲区空;rp == wp +1)% bufsize时缓冲区满;

static int spacefree(struct scull_pipe *dev)

{

    if (dev->rp == dev->wp)

        return dev->buffersize - 1;

    return ((dev->rp + dev->buffersize - dev->wp) % dev->buffersize) - 1;

}

 

static ssize_t scull_p_write(struct file *filp, const char __user *buf, size_t count, loff_t *f_pos)

{

    struct scull_pipe *dev = filp->private_data;

    int result;

 

    if (down_interruptible(&dev- >sem))

        return -ERESTARTSYS;

 

    /* Make sure there's space to write */

    result = scull_getwritespace(dev, filp);

    if (result)

        return result; /* scull_getwritespace called up(&dev->sem) */

 

    /* ok, space is there, accept something */

    count = min(count, (size_t)spacefree(dev));

    if (dev->wp >= dev->rp)

        count = min(count, (size_t)(dev->end - dev->wp)); /* to end-of-buf */

    else /* the write pointer has wrapped, fill up to rp-1 */

        count = min(count, (size_t)(dev->rp - dev->wp - 1));

    PDEBUG("Going to accept %li bytes to %p from %p\n", (long)count, dev->wp, buf);

    if (copy_from_user(dev->wp, buf, count)) {

        up (&dev->sem);

        return -EFAULT;

    }

    dev->wp += count;

    if (dev->wp == dev->end)

        dev->wp = dev->buffer; /* wrapped */

    up(&dev->sem);

 

    /* finally, awake any reader */

    //唤醒inq等待队列上的进程

    wake_up_interruptible(&dev->inq);  /* blocked in read() and select() */

 

    /* and signal asynchronous readers, explained late in chapter 5 */

    if (dev->async_queue)

        kill_fasync(&dev->async_queue, SIGIO, POLL_IN);

    PDEBUG("\"%s\" did write %li bytes\n",current->comm, (long)count);

    return count;

}

 

l  互斥等待

当多个进程等待互斥资源时,wake_up唤醒这些进程后也只能有1或多个进程(跟互斥资源数有关)能投入运行,效率较低。通过进程加入等待队列时指定其标志(flag字段)为WQ_FLAG_EXCLUSEVE,它被添加到等待队列的尾部. 没有这个标志的入口项, 相反, 添加到开始。当 wake_up 被在一个等待队列上调用, 它在唤醒第一个有 WQ_FLAG_EXCLUSIVE 标志的进程后停止。

 

l  wake_up系列函数:

 

wake_up(wait_queue_head_t *queue);

wake_up_interruptible(wait_queue_head_t *queue);

wake_up唤醒队列中的每个不是在互斥等待中的进程, 并且唤醒第一个互斥等待者, 如果它存在.wake_up_interruptible跳过处于不可中断睡眠的进程.

 

wake_up_nr(wait_queue_head_t *queue, int nr);

wake_up_interruptible_nr(wait_queue_head_t *queue, int nr);

唤醒多达 nr 个互斥等待者, 而不只是一个,其他与wake_up. 注意传递 0 被解释为请求所有的互斥等待者都被唤醒.

 

wake_up_all(wait_queue_head_t *queue);

wake_up_interruptible_all(wait_queue_head_t *queue);

唤醒所有的进程, 不管它们是否进行互斥等待,后者跳过不可中断的条目

 

wake_up_interruptible_sync(struct wait_queue *q);

在返回前不重新调度CPU

 

 

 

阅读(2102) | 评论(0) | 转发(1) |
0

上一篇:编程调试技巧

下一篇:iSCSI学习总结

给主人留下些什么吧!~~