Linux内核开发
分类: LINUX
2015-12-05 16:44:01
1.futex引入的意义
传统的SYSTEM V IPC机制需要系统调用进入内核态去操作某个内核对象,由内核来仲裁同步,事实上大部分情况下并没有资源竞争即多个申请者不会同时去竞争同步对象,此种情况下仍然进入内核态会显得很浪费,系统开销增加进而造成性能拆扣。
Futex(Fast Userspace Mutex)快速用户态互斥体,它是一种由用户态和内核态共同完成的同步机制。创建时是在用户空间通过mmap申请一片共享内存以便多进程间共同访问此futex,用户程序首先访问用户态的futex,只在判断出存在冲突(如一个进程已经拥有此futex,另一个进程申请访问,此时便存在一个冲突)时才进入内核态进行仲裁同步。
用户空间的访问和冲突判断由glibc库完成,冲突仲裁由内核的futex模块完成。
内核仲裁:将用户态的锁置上等待标志表明有锁的等待者存在,并调用schedule()将锁申请者挂起;当锁的拥有者释放锁(由glibc库完成)时,检查发现该锁有等待者就进入内核将等待者唤醒。
Glibc库中实现有pthread_mutex_lock()/pthread_mutex_unlock()等用户态锁接口,以提供快速的futex机制。
2.优先级继承功能的futex性能比普通的futex差
pthread_mutex_lock()/pthread_mutex_unlock()配有优先级继承(Priority Inherent简称pi)后的有时性能会严重影响了业务的正常运行。
不带优先级继承功能的锁,会调用内核流程futex_wait()/futex_wake(),带有优先级继承功能的锁会调用内核流程futex_lock_pi()/futex_unlock_pi();pi锁导致业务性能下降是实现机制以及业务模型导致。
pthread_mutex_lock
Ø 尝试获取锁,如果失败则进入内核阻塞
Ø 重复上述过程直到成功,并置持有锁标志
pthread_mutex_unlock
Ø 如果有等待者,则进入内核态唤醒
pthread_mutex_lock
Ø 尝试获取锁,如果失败则进入内核阻塞
Ø 从内核返回后(通常表明加锁成功),置持有锁标志
pthread_mutex_unlock
Ø 如果有等待者,则进入内核态唤醒
在加锁阶段,非pi锁可能会在glibc库中多次竞争并多次进入内核态阻塞直到获取锁;而pi锁最多只会进入内核态一次。从此角度可以看出,非pi锁存在竞争的不公平性。
futex_wait
Ø 取__lock的值,与传进来的val参数比较,如果不等,则直接返回;
Ø 将自己加入到等待队列中,然后调用schedule将自己调度出去。
futex_wake
Ø 遍历hash链表,找到对应的futex,调用wake_futex唤醒对应阻塞的线程。因为系统调用传进来的nr_wake参数为1, 实际上只唤醒1个线程就退出,优先唤醒优先级高的。
futex_lock_pi
Ø 使用queue_lock获取spin_lock锁,保证后面对信号量相关的操作都是安全的。
Ø 再次使用cmpxchg_futex_value_locked原子指令试图将__lock字段改为tid,如果能修改成功,表明锁拥有者释放了该锁。加锁成功直接返回。
Ø 将__lock字段的bit 31置1,表明现在开始有线程将阻塞到该锁上。
Ø 从内核维护的相关锁信息pi_state中,找到对应的内核实时信号量;将自己放到用户态信号量等待队列,并调用rt_mutex_timed_lock阻塞到内核的实时信号量上。
Ø 从rt_mutex_timed_lock返回时,可能失败,也可能是真正获取到了信号量;这期间可能会导致pi_state相关信息不一致,如果不一致,则修正。
Ø 必要时对锁拥有者线程进行优先级的提升。
Ø 返回rt_mutex_timed_lock的返回结果。
futex_unlock_pi
Ø 如果__lock中的tid不是自己,返回错误。
Ø 使用cmpxchg_futex_value_locked原子操作,如果__lock等于当前的tid,则将其改为0,然后返回。
Ø 如果用户态信号量的等待队列中还有线程阻塞,则使用wake_futex_pi函数挑选优先级最高的线程为新的owner,修改__lock的tid属性为新owner的tid,并唤醒。
Ø 如果等待队列中没有线程阻塞,则在unlock_futex_pi函数中将__lock值改为0。
整个加锁/解锁流程中,主要有三点区别:
1. pi锁远比非pi锁复杂,并使用rt_mutex内核对象进行线程的管理和唤醒,因此pi锁在内核中的执行时间比非pi锁要长得多;
2. pi锁直接参于锁的管理,非pi锁只是简单的挂起和唤醒线程(在glibc中管理锁,见3.1);
3. pi锁会改变锁当前持有者的优先级(优先级继承,以避免优先级反转)。
Futex通过内核对象rt_mutex实现,本文针对recursive pi类型的futex以及线程间共享锁的情况,讲解内核中的futex_lock_pi()流程,展示所涉及的几个核心数据结构。
全局变量static struct futex_hash_bucket futex_queues[1<
futex_queues做为一个桥梁作用,glibc系统调用进入内核时,第一件事情就是通过它来找到内核的的rt_mutex对象;利用当前task_struct->mm->mmap_sem和用户态lock字段的地址生成key,用此key哈希到此变量数组的某个成员,从而建立起二者的联系。
数据结构查找关系:
futex_lock_pi():
1.uaddr(即用户态lock字段的地址), mmap_sem->key;
2.栈上分配futex_q, 将futex_q挂入futex_queues[hash(key)]中;
3.futex_q->futex_pi_state->rt_mutex,查找或分配futex_pi_state进而获得rt_mutex;
4.栈上分配rt_mutex_waiter,将rt_mutex_waiter挂入当前task和步骤3的rt_mutex中。
futex_unlock_pi():
1.key->futex_queues[hash(key)]->futex_q->futex_pi_state->rt_mutex->rt_mutex_waiter->task,从而找到等待唤醒的任务。
数据结构关系图:
数据结构描述(同种颜色相互对应):
struct task_struct {
spinlock_t pi_lock;
struct plist_head pi_waiters; /* 只链入某rtmutex中优先级最高的rt_mutex_waiter */
/* Deadlock detection and priority inheritance handling */
struct rt_mutex_waiter *pi_blocked_on;
struct list_head pi_state_list;
struct futex_pi_state *pi_state_cache;
}
static struct futex_hash_bucket futex_queues[1<
struct futex_hash_bucket {
spinlock_t lock;
struct plist_head chain;
};
struct futex_q {
struct plist_node list; /* 根据当前线程的nomal_prio值链入 futex_hash_bucket
的chain中:plist_add(&q->list, &hb->chain); */
wait_queue_head_t waiters;
spinlock_t *lock_ptr; /* 指向futex_hash_bucket中的lock */
union futex_key key; /* 选择futex_hash_bucket */
/* For fd, sigio sent using these: */
int fd;
struct file *filp;
/* Optional priority inheritance state: */
struct futex_pi_state *pi_state; /*指向task_struct中第一次预先分配好的pi_state_cache */
struct task_struct *task;
struct rt_mutex_waiter waiter;
};
struct futex_pi_state {
struct list_head list; /* 链入task_struct中的pi_state_list */
struct rt_mutex pi_mutex;
struct task_struct *owner; /* 锁的当前拥有者 */
atomic_t refcount;
union futex_key key; /* 拷贝futex_q中的 key */
};
struct rt_mutex {
spinlock_t wait_lock;
struct plist_head wait_list; /* 所有的该锁的rt_mutex_waiter */
struct task_struct *owner;
}
struct rt_mutex_waiter {
struct plist_node list_entry; /* 链入rt_mutex中的wait_list */
struct plist_node pi_list_entry; /* 链入task_struct中的pi_waiters */
struct task_struct *task;
struct rt_mutex *lock;
}
struct rt_mutex锁的owner的低2位作为如下标志使用:
#define RT_MUTEX_OWNER_PENDING 1UL /* 唤醒时会将实时锁置上此标志,
用于try_to_steal_lock() */
#define RT_MUTEX_HAS_WAITERS 2UL /* 有线程等待实时锁置上此标志 */
#define RT_MUTEX_OWNER_MASKALL 3UL
futex_lock_pi()流程:
栈上分配struct futex_q q;
分配当前线程的pi_state_cache内存;
根据glibc中的用户态futex锁uaddr的地址和task_struct->mm-> mmap_sem生成q. key;
根据key找到hash_bucket, 初始化q.lock_ptr,并上锁;
如果锁的拥有都为当前线程,释放q.lock_ptr锁后退出;
将uaddr置位FUTEX_WAITERS;
正在持有futex锁uaddr的线程p,第一次会以uaddr&0x0fff_ffff为pid找到;
lookup_pi_state():遍历hash_bucket-> chain,根据q.key值找到相应的pi_state;如果找不到则返回之前分配好的当前线程的pi_state_cache,并初始化pi_state:拷贝q.key,owner指向线程p,初始化pi_mutex(初始化wait_lock,将owner指向线程p),将list链入线程p的pi_state_list(注pi_state始终链入锁的拥有者线程中);
__queue_me():初始化q,将q.list链入hash_bucket->chain中,q.task指向当前线程,并释放q.lock_ptr锁;
如果是高优先级的线程则可以偷锁,偷锁成功后会调用fixup_pi_state_owner()对pi_state以及uaddr的值进行修正;
rt_mutex_timed_lock()->rt_mutex_slowlock()->task_blocks_on_rt_mutex(lock, &waiter, detect):
其中waiter为栈上分配的rt_mutex_waiter结构;设定当前线程为TASK_INTERRUPTIBLE状态;初始化waiter,task指向当前线程,lock指向当前rt_mutex锁,并使用当前线程的prio初始化两个plist_node结构;waiter->list_entry链入lock->wait_list, waiter->pi_list_entry链入owner ->pi_waiters(owner为拥有锁的线程即p);当前线程的pi_blocked_on指向waiter(用于检测死锁和PI处理,会在被唤醒时检查并置NULL),最后__rt_mutex_adjust_prio(owner)进行优先级继承操作,如果存在锁链(一个低优先级rtmutex1的拥有者会继承高优先级的waiter的优先级, 谓之PI;如果此线程阻塞在另外的rtmutex2,这个优先级会传播到rtmutex2的拥有者,依次类推传播,默认允许传播1024个;此种情形暂称之为存在锁链),则调用rt_mutex_adjust_prio_chain()进行优先级的传播;涉及到的锁为:lock->wait_lock和task->pi_lock;
schedule();/* unlock pi时会调用wakeup_next_waiter()唤醒此线程 */
正常流程调用try_to_steal_lock()成功返回:当成功后清除RT_MUTEX_OWNER_PENDING标志。
当rt_mutex_timed_lock()调用失败时(如信号唤醒),如果实时锁的拥有者恰好为当前线程则进行rt_mutex_trylock()获取锁;其它情况进行相应参数修正退出;
把q.list从hash_bucket中摘除,根据情况把q.pi_state从task_struct中摘除,把q.pi_state ->pi_mutex的owner置NULL(如果没有等待者),清除pi_state资源等。
注:2.6.21内核存在许多pi futex的BUG。例如,在频繁fork的时候没有处理cmpxchg_futex_value_locked返回-EFAULT的情况,还是用高版本内核为好。