最近找了一个机会来研究一下futex,但是发现rtmutex是futex的基础,所以就先仔细研究了一下rtmutex.
关于rtmutex的背景,在内核文档rt-mutex-design.txt里面已经有很详细的讲解了https://www.kernel.org/doc/Documentation/locking/rt-mutex-design.txt
,在此不再赘述。
下面来一一解释rtmutex里面用到的三个结构体(基于3.10内核),首先是struct rt_mutex用来描述这个rtmutex锁。
-
21 /**
-
22 * The rt_mutex structure
-
23 *
-
24 * @wait_lock: spinlock to protect the structure
-
25 * @waiters: rbtree root to enqueue waiters in priority order
-
26 * @waiters_leftmost: top waiter
-
27 * @owner: the mutex owner
-
28 */
-
29 struct rt_mutex {
-
30 raw_spinlock_t wait_lock;//用于保护该结构的spinlock
-
31 struct rb_root waiters;/RB树的根节点,用于组织所有属于这个rt_mutex的rt_mutex_waiter
-
32 struct rb_node *waiters_leftmost;//该rt_mutex上的top waiter,即优先级最高的waiter
-
33 struct task_struct *owner;//当前持有改锁的进程。因为系统都是32bit对齐的。所以bit0 用于表示是否有waiters.
-
34 #ifdef CONFIG_DEBUG_RT_MUTEXES
-
35 int save_state;
-
36 const char *name, *file;
-
37 int line;
-
38 void *magic;
-
39 #endif
-
40 };
接下来是struct rt_mutex_waiter,用来表示rtmutex的一个等待者(进程)。
-
40 * This is the control structure for tasks blocked on a rt_mutex,
-
41 * which is allocated on the kernel stack on of the blocked task.
-
42 *
-
43 * @tree_entry: pi node to enqueue into the mutex waiters tree
-
44 * @pi_tree_entry: pi node to enqueue into the mutex owner waiters tree
-
45 * @task: task reference to the blocked task
-
46 */
-
47 struct rt_mutex_waiter {
-
48 struct rb_node tree_entry;//用于链入rt_mutex->waiter所在的RB树
-
49 struct rb_node pi_tree_entry;//用于链入struct task_struct 的pi_waiters所在的RB树。但是只有一个rtmutex的优先级最高的waiter才会被链入。
-
50 struct task_struct *task;//指向持有该rt_mutex_waiter的进程
-
51 struct rt_mutex *lock;//waiter等待的rt_mutex
-
52 #ifdef CONFIG_DEBUG_RT_MUTEXES
-
53 unsigned long ip;
-
54 struct pid *deadlock_task_pid;
-
55 struct rt_mutex *deadlock_lock;
-
56 #endif
-
57 int prio;//waiter的优先级
-
58 };
一个rt_mutex可能会被不同的进程所acquire/release.所以rt_mutex_waiter用于搭建rt_mutex跟不同进程之间的联系。同时在task_struct里面也有rtmutex相关的数据成员。
-
struct task_struct {
-
…..
-
1426 #ifdef CONFIG_RT_MUTEXES
-
1427 /* PI waiters blocked on a rt_mutex held by this task */
-
1428 struct rb_root pi_waiters;//RB树的根节点,用于链入不同的rt_mutex_waiters,即该进程所持有的不同的RTmutex的top waiter。所以并不是每一个rt_mutex_waiter都会被链入某个进程的task_struct。只有每一个lock的top waiter才会被链入。
-
1429 struct rb_node *pi_waiters_leftmost;//RB树的最小节点,即优先级最高的waiter。用于提高查询效率
-
1430 /* Deadlock detection and priority inheritance handling */
-
1431 struct rt_mutex_waiter *pi_blocked_on;//用于表示当前进程正在等待的锁
-
1434 #endif
-
-
….
-
};
这三个结构体之间的关系图大体如下:
对于mutex,我们都知道最基本的三个函数 init, lock和unlcok。init没什么好说的。
-
1075 /**
-
1076 * rt_mutex_lock - lock a rt_mutex
-
1077 *
-
1078 * @lock: the rt_mutex to be locked
-
1079 */
-
1080 void __sched rt_mutex_lock(struct rt_mutex *lock)
-
1081 {
-
1082 might_sleep();
-
1083
-
1084 rt_mutex_fastlock(lock, TASK_UNINTERRUPTIBLE, 0, rt_mutex_slowlock);
-
1085 }
1020 /*
1021 * debug aware fast / slowpath lock,trylock,unlock
1022 *
1023 * The atomic acquire/release ops are compiled away, when either the
1024 * architecture does not support cmpxchg or when debugging is enabled.
1025 */
1026 static inline int
1027 rt_mutex_fastlock(struct rt_mutex *lock, int state,
1028 int detect_deadlock,
1029 int (*slowfn)(struct rt_mutex *lock, int state,
1030 struct hrtimer_sleeper *timeout,
1031 int detect_deadlock))
1032 {
1033 if (!detect_deadlock && likely(rt_mutex_cmpxchg(lock, NULL, current))) {
1034 rt_mutex_deadlock_account_lock(lock, current);
1035 return 0;
1036 } else
1037 return slowfn(lock, state, NULL, detect_deadlock);
1038 }
首先先调用一些CPU支持的基于指令的comare and exchange操作。如果该操作不支持或者该操作失败的话。接着调用
rt_mutex_slowlock函数。
rt_mutex_slowlock这个函数过于庞大,下面的函数调用关系图可以帮助理解。
下面就来详细的分析rt_mutex_slowlock的相关代码
-
871 /*
-
872 * Slow path lock function:
-
873 */
-
874 static int __sched
-
875 rt_mutex_slowlock(struct rt_mutex *lock, int state,
-
876 struct hrtimer_sleeper *timeout,
-
877 int detect_deadlock)
-
878 {
-
879 struct rt_mutex_waiter waiter;//注意这边的waiter是一个局部变量。因为waiter只有在本进程未成功获得锁的情况下才有意义
-
880 int ret = 0;
-
881
-
882 debug_rt_mutex_init_waiter(&waiter);
-
883 RB_CLEAR_NODE(&waiter.pi_tree_entry);//初始化RB树
-
884 RB_CLEAR_NODE(&waiter.tree_entry);
-
885
-
886 raw_spin_lock(&lock->wait_lock);
-
887
-
888 /* Try to acquire the lock again: */
-
889 if (try_to_take_rt_mutex(lock, current, NULL)) { //调用try_to_wake_rt_mutex尝试获得rtmutex。该函数在后面__rt_mutex_slowlock还会被调用
-
890 raw_spin_unlock(&lock->wait_lock);
-
891 return 0;
-
892 }
-
893
-
894 set_current_state(state);
-
895
-
896 /* Setup the timer, when timeout != NULL */
-
897 if (unlikely(timeout)) { //如果设置了timeout值,调用内核high resolution timer来定时,到期唤醒该进程。
-
898 hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
-
899 if (!hrtimer_active(&timeout->timer))
-
900 timeout->task = NULL;
-
901 }
-
902
-
903 ret = task_blocks_on_rt_mutex(lock, &waiter, current, detect_deadlock); //调用task_blocks_on_rt_mutex来准备waiter,并调整PI chain。在这个过程中会多次检查时候能够获得锁
904
905 if (likely(!ret))//函数走到这边,waiter就绪(在lock 以及owner的pi_waiters RB树中)并且PI chain的优先级调整及死锁检查已经完毕。下面准备让当前进程sleep wait
906 ret = __rt_mutex_slowlock(lock, state, timeout, &waiter);
下面是try_to_take_rt_mutex函数。这个函数在rt_mutex_slowtrylock里面也会被调用。我们都知道lock与trylock之间的区别。所以这个try_to_take_rt_mutex 正如名字的含义一样。尝试去获取锁。
-
static int try_to_take_rt_mutex(struct rt_mutex *lock, struct task_struct *task,
-
struct rt_mutex_waiter *waiter)
-
{
-
-
mark_rt_mutex_waiters(lock);
-
-
if (rt_mutex_owner(lock)) //如果当前锁已经有了owner,获取失败
-
return 0;
-
-
if (rt_mutex_has_waiters(lock)) {
-
if (task->prio >= rt_mutex_top_waiter(lock)->prio) { //如果lock没有owner,并且当前进程并不是top waiter。获取失败
-
if (!waiter || waiter != rt_mutex_top_waiter(lock))
-
return 0;
-
}
-
}
-
-
if (waiter || rt_mutex_has_waiters(lock)) {//由于rt_mutex_slowlock和rt_mutex_slowtrylock里面调用该函数时,waiter的输入参数都是NULL。
-
unsigned long flags;
-
struct rt_mutex_waiter *top;
-
-
raw_spin_lock_irqsave(&task->pi_lock, flags);
-
-
/* remove the queued waiter. */
-
if (waiter) {
-
rt_mutex_dequeue(lock, waiter);
-
task->pi_blocked_on = NULL;
-
}
-
if (rt_mutex_has_waiters(lock)) {
-
top = rt_mutex_top_waiter(lock);//将该锁的top waiter插入到当前进程的pi_waiter里面去。为啥? 因为当前进程即将成为该锁的owner。waiter的优先级将影响当前进程的优先级。
-
rt_mutex_enqueue_pi(task, top);
-
}
-
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-
}
-
-
/* We got the lock. */
-
debug_rt_mutex_lock(lock);
-
-
rt_mutex_set_owner(lock, task);
-
-
rt_mutex_deadlock_account_lock(lock, task);
-
-
return 1;
-
}
接下来是task_blocks_on_rt_mutex,这个函数虽然不长,但是有几个子函数比较长。这个函数是RTmutex思想的核心,即优先级继承。
-
static int task_blocks_on_rt_mutex(struct rt_mutex *lock,
-
struct rt_mutex_waiter *waiter,
-
struct task_struct *task,
-
int detect_deadlock)
-
{
-
struct task_struct *owner = rt_mutex_owner(lock);
-
struct rt_mutex_waiter *top_waiter = waiter;
-
struct rt_mutex *next_lock;
-
int chain_walk = 0, res;
-
unsigned long flags;
-
-
if (owner == task) //如果当前进程已经是owner了,死锁。
-
//比如某个二货在代码重复两次lock同一个锁。 rt_mutex_lock(&lock); rt_mutex_lock(&lock);这样的代码。
-
return -EDEADLK;
-
-
raw_spin_lock_irqsave(&task->pi_lock, flags);
-
__rt_mutex_adjust_prio(task);
-
waiter->task = task;
-
waiter->lock = lock;
-
waiter->prio = task->prio;
-
-
/* Get the top priority waiter on the lock */
-
if (rt_mutex_has_waiters(lock))
-
top_waiter = rt_mutex_top_waiter(lock);
-
rt_mutex_enqueue(lock, waiter);//将waiter插入到lock的RB树中
-
-
task->pi_blocked_on = waiter;//设置当前进程被阻塞在waiter上,如果后面获取锁成功的话,这个pi_blocked_on会被重新置为NULL
-
-
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-
-
if (!owner)
-
return 0;
-
-
raw_spin_lock_irqsave(&owner->pi_lock, flags);
-
if (waiter == rt_mutex_top_waiter(lock)) { //如果当前waiter成为lock的top waiter的话,调整owner的PI_waiter.因为进程的pi_waiters里面链入的是lock的top waiter
-
rt_mutex_dequeue_pi(owner, top_waiter);
-
rt_mutex_enqueue_pi(owner, waiter);
-
-
__rt_mutex_adjust_prio(owner);//调整owner的优先级,因为一个高优先级的waiter出现了。
-
if (owner->pi_blocked_on)//如果owner同样被另外一把lock阻塞的话。那么需要遍历chain,调整优先级
-
chain_walk = 1;
-
} else if (debug_rt_mutex_detect_deadlock(waiter, detect_deadlock)) {
-
chain_walk = 1;
-
}
-
/* Store the lock on which owner is blocked or NULL */
-
next_lock = task_blocked_on_lock(owner);//获去阻塞owner的lock。如果next_lock存在的话,那么需要一步一步往下去调整整个pi chain
-
-
raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
-
/*
-
* Even if full deadlock detection is on, if the owner is not
-
* blocked itself, we can avoid finding this out in the chain
-
* walk.
-
*/
-
if (!chain_walk || !next_lock)
-
return 0;
-
-
/*
-
* The owner can't disappear while holding a lock,
-
* so the owner struct is protected by wait_lock.
-
* Gets dropped in rt_mutex_adjust_prio_chain()!
-
*/
-
get_task_struct(owner);
-
-
raw_spin_unlock(&lock->wait_lock);
-
-
res = rt_mutex_adjust_prio_chain(owner, detect_deadlock, lock,
-
next_lock, waiter, task); //遍历PI chain,调整优先级。关于pi chain,建议先看一下rt-mutex-design.txt大体了解一下。
-
-
raw_spin_lock(&lock->wait_lock);
-
-
return res;
-
}
下面重头戏来了,我个人认为这个是rtmutex里面代码部分最难懂的部分rt_mutex_adjust_prio_chain。这段代码中,输入参数之间的关系要搞清楚,因为其中包含了两个
rt_mutex和两个task_struct
-
static int rt_mutex_adjust_prio_chain(struct task_struct *task, //orig_lock的owner
-
int deadlock_detect, //是否检查死锁
-
struct rt_mutex *orig_lock, //当前进程尝试要获取的lock
-
struct rt_mutex *next_lock, //owner尝试要获取的lock
-
struct rt_mutex_waiter *orig_waiter, //当前进程的waiter
-
struct task_struct *top_task) //当前进程
-
{
-
struct rt_mutex *lock;
-
struct rt_mutex_waiter *waiter, *top_waiter = orig_waiter;
-
int detect_deadlock, ret = 0, depth = 0;
-
unsigned long flags;
-
-
detect_deadlock = debug_rt_mutex_detect_deadlock(orig_waiter,
-
deadlock_detect);
-
-
/*
-
* The (de)boosting is a step by step approach with a lot of
-
* pitfalls. We want this to be preemptible and we want hold a
-
* maximum of two locks per step. So we have to check
-
* carefully whether things change under us.
-
*/
-
again:
-
if (++depth > max_lock_depth) {
-
//该函数会遍历PI chain,但也不是无限变量。出于复杂度和性能的考虑,做了1024的限制
-
static int prev_max;
-
-
/*
-
* Print this only once. If the admin changes the limit,
-
* print a new message when reaching the limit again.
-
*/
-
if (prev_max != max_lock_depth) {
-
prev_max = max_lock_depth;
-
printk(KERN_WARNING "Maximum lock depth %d reached "
-
"task: %s (%d)\n", max_lock_depth,
-
top_task->comm, task_pid_nr(top_task));
-
}
-
put_task_struct(task);
-
-
return -EDEADLK;
-
}
-
-
retry:
-
/*
-
* Task can not go away as we did a get_task() before !
-
*/
-
raw_spin_lock_irqsave(&task->pi_lock, flags);
-
-
waiter = task->pi_blocked_on;//获取task的相应waiter,注意这边的task参数其实是orig_waiter的owner。
-
//该函数主要是遍历PI-chain来调整优先级,直到某一个owner并没有被阻塞的话。
-
/*
-
* Check whether the end of the boosting chain has been
-
* reached or the state of the chain has changed while we
-
* dropped the locks.
-
*/
-
if (!waiter)
-
goto out_unlock_pi;
-
-
/*
-
* Check the orig_waiter state. After we dropped the locks,
-
* the previous owner of the lock might have released the lock.
-
*/
-
if (orig_waiter && !rt_mutex_owner(orig_lock))//检查orig_lock的状态,因为可能在我们调整的过程中,orig_lock被释放了。
-
goto out_unlock_pi;
-
-
/*
-
* We dropped all locks after taking a refcount on @task, so
-
* the task might have moved on in the lock chain or even left
-
* the chain completely and blocks now on an unrelated lock or
-
* on @orig_lock.
-
*
-
* We stored the lock on which @task was blocked in @next_lock,
-
* so we can detect the chain change.
-
*/
-
if (next_lock != waiter->lock)//一般情况下,这边的next_lock是等于waiter->lock的。但是如果owner的阻塞从一个锁变成了另外一个锁。那么就没必要继续遍历了。
-
goto out_unlock_pi;
/*
* Drop out, when the task has no waiters. Note,
* top_waiter can be NULL, when we are in the deboosting
* mode!
*/
if (top_waiter) {
if (!task_has_pi_waiters(task))//在遍历的过程中,如果一个task即owner不在阻塞了,那么没有必要继续遍历了。
goto out_unlock_pi;
/*
* If deadlock detection is off, we stop here if we
* are not the top pi waiter of the task.
*/
if (!detect_deadlock && top_waiter != task_top_pi_waiter(task))
goto out_unlock_pi;
}
/*
* When deadlock detection is off then we check, if further
* priority adjustment is necessary.
*/
if (!detect_deadlock && waiter->prio == task->prio)//如果进程的优先级没有发生改变,那么在没有开启死锁检查的情况下,没有必要再继续遍历pi chain了
goto out_unlock_pi;
lock = waiter->lock;
if (!raw_spin_trylock(&lock->wait_lock)) {
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
cpu_relax();
goto retry;
}
/*
* Deadlock detection. If the lock is the same as the original
* lock which caused us to walk the lock chain or if the
* current lock is owned by the task which initiated the chain
* walk, we detected a deadlock.
*/
if (lock == orig_lock || rt_mutex_owner(lock) == top_task) {
debug_rt_mutex_deadlock(deadlock_detect, orig_waiter, lock);
raw_spin_unlock(&lock->wait_lock);
ret = -EDEADLK;
goto out_unlock_pi;
}
-
这边是检查死锁的关键。
-
(1)lock == orig_lock, 注意orig_lock是阻塞当前调用rt_mutex_lock的进程的锁,而lock是pi chain中任意一把锁。如果lock == orig_lock。即代表
-
pi chain的全部或者局部形成了一个环。打个比方
-
rt_mutex A, B ,c ;
-
func_A {
-
rt_mutex_lock ( &A);
-
rt_mutex_lock (&B);
-
};
-
-
func_B {
-
rt_mutex_lock (&B);
-
rt_mutex_lock (&C);
-
};
-
-
func_C {
-
rt_mutex_lock (&C);
-
rt_mutex_lock (&A);
-
};
-
-
上面三个函数分别代表三个进程,用pi chain表示大致如下
-
-
A->func_A->B->func_B->C->fun_C->A ;这样就形成了一个环。
-
-
(2)第二种情况rt_mutex_owner(lock) == top_task ,这个标记其实跟上面形成环的比较是一样的,只是比较的时机跟上面稍有不同而已。
-
func_A->B->func_B->C->fun_C->A ->func_A. 环大概是这样而已。下面继续看代码
-
-
top_waiter = rt_mutex_top_waiter(lock);
-
-
/* Requeue the waiter */// 重新requee waiter,因为优先级发生了改变。一个waiter的优先级是肯定大于等于task的优先级的。上面我们已经处理的等于的情况,这里继续处理大于的情况。
-
rt_mutex_dequeue(lock, waiter);
-
waiter->prio = task->prio;
-
rt_mutex_enqueue(lock, waiter);
-
-
/* Release the task */
-
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-
if (!rt_mutex_owner(lock)) {
-
/*
-
* If the requeue above changed the top waiter, then we need
-
* to wake the new top waiter up to try to get the lock.
-
*/
-
-
-
if (top_waiter != rt_mutex_top_waiter(lock))//如果在处理的过程中,一个lock已经被释放了,那么唤醒这个lock的top waiter
-
wake_up_process(rt_mutex_top_waiter(lock)->task);
-
raw_spin_unlock(&lock->wait_lock);
-
goto out_put_task;
-
}
-
put_task_struct(task);
-
-
/* Grab the next task */
-
task = rt_mutex_owner(lock); //获取下一个owner
-
get_task_struct(task);
-
raw_spin_lock_irqsave(&task->pi_lock, flags);
-
-
-
if (waiter == rt_mutex_top_waiter(lock)) { //因为该lock的top waiter发生了变化。调整owner的pi_waiters。(其实waiter已经变了。。。)
-
/* Boost the owner */
-
rt_mutex_dequeue_pi(task, top_waiter);
-
rt_mutex_enqueue_pi(task, waiter);
-
__rt_mutex_adjust_prio(task);
-
-
-
} else if (top_waiter == waiter) {//该进程的waiter本来是top waiter的,但是很凑巧,这个top waiter获得了锁。那么就需要调整task的优先级(降低)。
-
/* Deboost the owner */
-
rt_mutex_dequeue_pi(task, waiter);
-
waiter = rt_mutex_top_waiter(lock);
-
rt_mutex_enqueue_pi(task, waiter);
-
__rt_mutex_adjust_prio(task);
-
}
-
-
next_lock = task_blocked_on_lock(task);//获得下一把阻塞task的lock
-
-
-
raw_spin_unlock_irqrestore(&task->pi_lock, flags);
-
-
-
top_waiter = rt_mutex_top_waiter(lock);
-
raw_spin_unlock(&lock->wait_lock);
-
-
-
/*
-
* We reached the end of the lock chain. Stop right here. No
-
* point to go back just to figure that out.
-
*/
-
if (!next_lock)
-
goto out_put_task;
-
-
-
if (!detect_deadlock && waiter != top_waiter)
-
goto out_put_task;
-
-
-
goto again;
再次回到rt_mutex_slow函数,在task_blocks_on_rt_mutex执行完之后,如果还是没有获得锁的话,继续执行__rt_mutex_slowlock
static int __sched
-
__rt_mutex_slowlock(struct rt_mutex *lock, int state,
-
struct hrtimer_sleeper *timeout,
-
struct rt_mutex_waiter *waiter)
-
{
-
int ret = 0;
-
-
-
for (;;) {
-
/* Try to acquire the lock: */
-
if (try_to_take_rt_mutex(lock, current, waiter))//上次已经提到了try_to_take_rt_mutex函数,但是注意这边的输入参数waiter不是NULL了。所以实际情况会稍有不同
-
break;
-
-
-
/*
-
* TASK_INTERRUPTIBLE checks for signals and
-
* timeout. Ignored otherwise.
-
*/
-
if (unlikely(state == TASK_INTERRUPTIBLE)) {
-
/* Signal pending? */
-
if (signal_pending(current)) //如果进程状态是TASK_INTERRUPTIBLE的话,即代表可以被信号打断。这边检查是否有未处理的信号。
-
ret = -EINTR;
-
if (timeout && !timeout->task)
-
ret = -ETIMEDOUT;
-
if (ret)
-
break;
-
}
-
-
-
raw_spin_unlock(&lock->wait_lock);
-
-
-
debug_rt_mutex_print_deadlock(waiter);
-
-
-
schedule_rt_mutex(lock);//调用schedule()睡眠
-
-
-
raw_spin_lock(&lock->wait_lock);
-
set_current_state(state);
-
}
-
-
-
return ret;
-
}
到此rt_mutex_lock的过程大体结束。
-
if (unlikely(ret)) {//如果返回值不为0的话,那么就是被信号打断或者timeout时间到了。不是被unlock唤醒的,所以需要在rt_mutex_lock中删除waiter.
-
-
//如果是被unlock唤醒的话,waiter的清理工作由unlock函数完成。
-
remove_waiter(lock, &waiter);
-
rt_mutex_handle_deadlock(ret, detect_deadlock, &waiter);
-
}
-
-
/*
-
* try_to_take_rt_mutex() sets the waiter bit
-
* unconditionally. We might have to fix that up.
-
*/
-
fixup_rt_mutex_waiters(lock);
-
-
-
raw_spin_unlock(&lock->wait_lock);
-
-
-
/* Remove pending timer: */
-
if (unlikely(timeout))
-
hrtimer_cancel(&timeout->timer);
-
-
-
debug_rt_mutex_free_waiter(&waiter);
-
阅读(2810) | 评论(0) | 转发(0) |