Chinaunix首页 | 论坛 | 博客
  • 博客访问: 163982
  • 博文数量: 84
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1
  • 用 户 组: 普通用户
  • 注册时间: 2014-03-09 10:55
文章分类
文章存档

2014年(84)

我的朋友

分类: LINUX

2014-05-15 15:30:01

摘要:本文主要讲述linux如何处理ARM cortex A9多核处理器的内核同步部分。主要包括信号量介绍。

法律声明LINUX3.0内核源代码分析》系列文章由谢宝友()发表于http://xiebaoyou.blog.chinaunix.net,文章中的LINUX3.0源代码遵循GPL协议。除此以外,文档中的其他内容由作者保留所有版权。谢绝转载。

 

1.1 信号量
1.1.1      经典的信号量

信号量也是一种锁,它与自旋锁的区别在于:当资源不可用时,它会使进程挂起,而自旋锁是让等待者忙等。一般情况下,信号量用于进程上下文,自旋锁用于中断上下文。

信号量的结构如下:

/**

 * 信号量

 */

struct semaphore {

         /**

          * 保护信号量数据结构的信号量

          */

         spinlock_t                  lock;

         /**

          * 可用计数。如果仅仅用于互斥,那么其初值为1;如果用于计数信号量,则大于1.

          */

         unsigned int              count;

         /**

          * 等待获得此信号量的进程列表。

          */

         struct list_head        wait_list;

};

 

可以看到,linux3.0中的信号量数据结构与linux2.6.11相比,有了非常大的变化。总的来说,变得很清晰了。

获得信号量的原语是:

/**

 * 获得信号量,如果信号量不可用,则睡眠。

 * 根据代码注释,不推荐直接使用此函数,而应当使用down_interruptibledown_killable

 * 直接使用此函数会导致进程处于"D"状态,不能被kill掉。

 */

void down(struct semaphore *sem)

{

         unsigned long flags;

 

         /**

          * 获得保护此信号量的自旋锁,并关中断。

          * 这里需要关中断,因为有另外一个down_trylock的原语能够在中断里面使用。如果不关中断,则在中断中调用down_trylock用形成自旋锁死锁。

          */

         spin_lock_irqsave(&sem->lock, flags);

         if (likely(sem->count > 0))/* 自旋锁处于可用状态。 */

                   sem->count--;/* 将信号量可用计数减1后直接返回,表示信号量被当前进程获得的事实 */

         else

                   __down(sem);/* 信号量不可用,调用__down进入慢速处理流程。 */

         spin_unlock_irqrestore(&sem->lock, flags);/* 释放自旋锁 */

}

当信号量不可用时,调用__down将当前进程挂起:

static noinline void __sched __down(struct semaphore *sem)

{

         /**

          * 请注意TASK_UNINTERRUPTIBLE标志,它表示将任务置为不可中断状态。这里down原语不被推荐使用的原因。

          * MAX_SCHEDULE_TIMEOUT表示没有超时,进程会一直等待。

          */

         __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);

}

 

/**

 * 将进程挂入信号量的等待队列。

 */

static inline int __sched __down_common(struct semaphore *sem, long state,

                                                                           long timeout)

{

         struct task_struct *task = current;

         struct semaphore_waiter waiter;

 

         /**

          * 将等待节点挂到信号量的等待队列上。

          */

         list_add_tail(&waiter.list, &sem->wait_list);

         /**

          * 等待节点上的任务是当前任务

          */

         waiter.task = task;

         waiter.up = 0;

 

         for (;;) {/* 这里使用一个死循环,是考虑到等待过程可能被信号打断,导致睡眠时间不足。这主要是用于down_timeout */

                   /**

                    * 如果是TASK_INTERRUPTIBLETASK_WAKEKILL状态,并且有信号将进程唤醒,则将当前进程从等待队列中取出并退出。

                    */

                   if (signal_pending_state(state, task))

                            goto interrupted;

                   if (timeout <= 0)/* 进程不是被信号唤醒的,而是等待的超时时间到达,则向上层返回超时错误。 */

                            goto timed_out;

                   /**

                    * 将进程设置为指定的状态,并调度出去。

                    */

                   __set_task_state(task, state);

                   /**

                    * 在调度出去之前,需要将自旋锁释放。

                    */

                   spin_unlock_irq(&sem->lock);

                   /**

                    * 根据调用者指定的睡眠时间,调用schedule_timeout

                    * 如果由于信号等原因导致进程过早的被唤醒,那么timeout就是还需要继续睡眠的时间。

                    */

                   timeout = schedule_timeout(timeout);

                   /**

                    * 进程被唤醒了,可能是睡眠时间已经到达,或者获得了信号量,或者被信号唤醒了。

                    * 在继续进行判断之前,需要获得信号量的自旋锁。

                    */

                   spin_lock_irq(&sem->lock);

                   if (waiter.up)/* 进程是被其他进程释放信号量而唤醒的,向上层返回成功 */

                            return 0;

         }

 

 timed_out:

         list_del(&waiter.list);

         return -ETIME;

 

 interrupted:

         list_del(&waiter.list);

         return -EINTR;

}

 

释放信号量的函数是up:

/**

 * 释放信号量

 * 注意,信号量不仅可以用于互斥,也可以用于同步。

 * 也就是说,某个进程即使没有调用down,也可以直接调用up唤醒等待的任务。

 * 也可以在中断上下文使用。

 */

void up(struct semaphore *sem)

{

         unsigned long flags;

 

         spin_lock_irqsave(&sem->lock, flags);/* 获得保护信号量的自旋锁。 */

         if (likely(list_empty(&sem->wait_list)))/* 没有进程在等待此信号量 */

                   sem->count++;/* 直接将信号量计数加1.注意,这与vxworks的二进制信号量的不同 */

         else

                   __up(sem);/* 唤醒等待队列上的任务 */

         spin_unlock_irqrestore(&sem->lock, flags);/* 释放自旋锁 */

}

 

/**

 * 唤醒等待队列上的任务。

 * 这个实现已经与linux2.6.11有了非常大的变化。不再考虑一次唤醒多个任务的情况。只唤醒队列头上的任务。

 * 这样的实现简单,但是不支持按优先级排队。影响系统实时性。这是因为实时信号量可以使用rt_mutex

 */

static noinline void __sched __up(struct semaphore *sem)

{

         /**

          * 取等待队列上的第一个任务。

          */

         struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,

                                                        struct semaphore_waiter, list);

         /**

          * 将第一个任务从等待队列上摘除。

          */

         list_del(&waiter->list);

         /**

          * 设置up标志,表示任务是被信号量唤醒的。

          * 而不是被信号或者定时器唤醒的。

          */

         waiter->up = 1;

         /**

          * 唤醒等待任务。

          */

         wake_up_process(waiter->task);

}

 

除了down以外,linux还提供了以下原语供使用:

函数名

功能

备注

down

获取信号量,不可被信号唤醒

不推荐使用

down_interruptible

获取信号量,但可被信号唤醒

 

down_killable

获取信号量,但可被致命信号唤醒

 

down_trylock

获取信号量,如果信号量不可用,则返回失败而不等待

可用于中断上下文

down_timeout

获取信号量,如果在指定的时间内,任务没有被信号量唤醒,则返回失败

 

 

1.1.2      Rt-mutex

根据我们的分析,普通信号量的实现很简单。在信号量不可用时,直接将当前进程挂入信号量的等待队列末尾。而没有按等待任务的优先级进行排除。

如果获得信号量的任务被抢占,导致信号量迟迟不能释放,那么等待的任务就会一直被阻塞。即使等待的任务优先级再高也没有用。这在实时系统中是不可容忍的。这是典型的“优先级反转”。要解决这个问题,需要信号量实现优先级可继承。业界典型的实现有一级优先级可继承和二级优先级可继承。

优先级反转问题曾经引起火星探测的故障。有兴趣的同学可以深入了解一下。

Linux3.0rtmutex.c中实现了另外一种互斥信号量,这种信号量实现了优先级继承协议,仅仅能够用于互斥而不能象信号量一样也用于同步。

详细的文档请参见documentation/rt-mutex.txtdocumentation/rt-mutex-design.txt

1.1.3      completion

linux还实现了另外一种类似于信号量的东东:completion。从代码上来看,它可以用于互斥和同步。但是在linux中,几乎都是将它用于同步。

它的实现方法和信号量非常相似,同时保留两套接口,可能是出于兼容的原因。

 

Completion数据结构的声明是:

struct completion {

         /**

          * 相当于信号量中的计数变量。

          * done大于0时,表示任务已经完成。

          */

         unsigned int done;

         /**

          * 等待事件完成的队列。

          */

         wait_queue_head_t wait;

};

 

等待事件完成的函数是wait_for_completion

void __sched wait_for_completion(struct completion *x)

{

         wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);

}

 

其中wait_for_common是一个通用函数,除wait_for_completion外,其他几个等待completion完成的函数也是对它的封装函数。

/**

 * 等待completion事件完成的通用函数。

 *              completion:               要等待的完成变量

 *              timeout:            超时时间,以tick为单位。MAX_SCHEDULE_TIMEOUT表示永远等待。

 *              state:                          当需要阻塞时,将进程设置为何种状态。

 */

static long __sched

wait_for_common(struct completion *x, long timeout, int state)

{

         might_sleep();/* 这个调试函数用于通知调用者,本函数可能睡眠,如果在不可睡眠的上下文(如中断上下文或关抢占的上下文) 调用了此函数,则系统会产生警告 */

 

         /**

          * 在关中断状态下,是不应当调用本函数的。

          * 因此,可以确保此时系统是开中断的,因此也就可用spin_lock_irq来强开强关中断了。

          */

         spin_lock_irq(&x->wait.lock);

         /**

          * 如果事件还没有完成,那么就将当前任务挂到等待队列并挂起。

          */

         timeout = do_wait_for_common(x, timeout, state);

         /**

          * 释放自旋锁并强开中断。

          */

         spin_unlock_irq(&x->wait.lock);

         return timeout;

}

 

/**

 * 等待事件完成。

 * 需要注意本函数的返回值:0表示超时,大于0表示成功。小于0表示需要重新执行系统调用。

 */

static inline long __sched

do_wait_for_common(struct completion *x, long timeout, int state)

{

         if (!x->done) {/* 事件还没有完成,进入等待流程 */

                   DECLARE_WAITQUEUE(wait, current);/* 定义一个等待节点,并将当前任务设置为等待任务 */

 

                   /**

                    * 将当前任务加入到等待队列中。

                    */

                   __add_wait_queue_tail_exclusive(&x->wait, &wait);

                   do {/* 循环等待,直到事件完成或者被信号唤醒或者超时 */

                            if (signal_pending_state(state, current)) {/* 被信号唤醒了 */

                                     timeout = -ERESTARTSYS;/* 返回ERESTARTSYS,表示需要重新执行系统调用。 */

                                     break;

                            }

                            /* 设置任务状态,必须在开中断之前执行,否则可能会使中断中唤醒任务的操作失效。请读者在熟悉调度以后慢慢领会。 */

                            __set_current_state(state);

                            /**

                             * 释放自旋锁并开中断。

                             */

                            spin_unlock_irq(&x->wait.lock);

                            /**

                             * 调度出去,等待被唤醒。

                             */

                            timeout = schedule_timeout(timeout);

                            /**

                             * 被唤醒后,需要对completion的状态进行判断,必须再次获得自旋锁。

                             */

                            spin_lock_irq(&x->wait.lock);

                   } while (!x->done && timeout);/* 如果被事件唤醒或者超时时间到,则退出 */

                   /**

                    * 无论如何,都需要将任务从等待队列中取出。

                    */

                   __remove_wait_queue(&x->wait, &wait);

                   if (!x->done)/* 事件还没有完成,如果是被信号打断,则返回ERESTARTSYS,否则返回0表示超时。 */

                            return timeout;

         }

         /**

          * 运行到这里,说明事件已经完成,则递减done

          */

         x->done--;

         /**

          * 请注意返回值,这里确保不会返回0值。因为0值表示超时。

          */

         return timeout ?: 1;

}

 

当事件完成时,调用complete通知等待队列上的任务。

/**

 * 完成事件,并唤醒等待的任务。

 */

void complete(struct completion *x)

{

         unsigned long flags;

 

         /**

          * 获得等待队列的自旋锁并关中断

          * 注意,这里不是强关强开中断。因为可以在关中断上下文(如果中断上下文)中调用此函数。不能简单的强开中断。

          */

         spin_lock_irqsave(&x->wait.lock, flags);

         x->done++;/* 增加计数,表示事件已经完成。 */

         __wake_up_common(&x->wait, TASK_NORMAL, 1, 0, NULL);/* 唤醒等待队列上的任务。请注意:只唤醒了第一个任务 */

         spin_unlock_irqrestore(&x->wait.lock, flags);

}

 

系统还提供以下相关函数:

函数名

作用

备注

complete_all

唤醒等待队列上的所有任务

 

wait_for_completion_timeout

等待事件完成,直到预定的时间到达

 

wait_for_completion_interruptible

等待事件完成,可以被信号打断

 

wait_for_completion_interruptible_timeout

等待事件完成,直到预定的时间到达,或者被信号打断

 

wait_for_completion_killable

等待事件完成,可以被致命信号打断

 

wait_for_completion_killable_timeout

等待事件完成,直到预定的时间到达,或者被致命信号打断

 

try_wait_for_completion

等待事件完成,如果事件没有完成,则返回错误信息并直接返回,不等待。

 

completion_done

判断事件是否完成

 

阅读(663) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~