惊群现象是在多进程或者多线程场景下,多个进程或者多个线程在同一条件下睡眠,当唤醒条件发生的时候,会同时唤醒这些睡眠的进程或者线程,但是只有一个是可以成功执行的,而其他的进程或者线程被唤醒后存在着执行开销的浪费。
Linux中惊群的触发
多个进程或者线程在获取同一把锁的时候睡眠
多个进程或者线程同时进行accept
多个进程在同一个epoll上竞争
多个进程在多个epoll上对于同一个监听的socket进行accept
Linux中的解决方案
通用解决方案
Linux提供了wake_up_process方法,用于唤醒一个指定的进程。
int wake_up_process(struct task_struct *p)
{
return try_to_wake_up(p, TASK_ALL, 0);
}
Linux系统中进程唤醒的流程为,当有多个进程同时睡眠的时候,Linux通过工作队列的方式维护这些睡眠的进程。在进程睡眠之前,会先进行一个操作:
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue(q, wait);
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
prepare_to_wait函数主要是将当前的flags加上了WQ_FLAG_EXCLUSIVE标志,该标志意味着进程想要被独占地唤醒,然后放入到工作队列的尾部(如果含有独占标志的进程并不位于队列尾部,将导致其后的不含有该标志的进程无法执行),最后设置相应状态,如TASK_INTERUPTIBLE表示可以被wake_up唤醒。当需要唤醒进程的时候,Linux提供了__wake_up_common方法唤醒工作队列中的进程:
/*
* The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
* wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
* number) then we wake all the non-exclusive tasks and one exclusive task.
*
* There are circumstances in which we can try to wake a task which has already
* started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
* zero in this (rare) case, and we handle it by continuing to scan the queue.
*/
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
__wake_up_common函数会遍历工作队列,寻找flags中含有WQ_FLAG_EXCLUSIVE标志的进程,如果没有设置独占标志,则根据mode唤醒每个睡眠的进程。nr_exclusiv表示需要唤醒的设置了独占标志进程的数目,当nr_exclusive减为零的时候,就会跳出循环,它在wake_up中设置为1,表明当处理了一个含有WQ_FLAG_EXCLUSIVE标志进程后,将不再处理,所以只能唤醒nr_exclusive个进程。
其中回调函数curr->func的回调函数是通过DEFINE_WAIT(wait)宏定义的:
#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
通过上面的代码可以发现回调函数为autoremove_wake_function:
//kernel/wait.c
int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int ret = default_wake_function(wait, mode, sync, key);
if (ret)
list_del_init(&wait->task_list);
return ret;
}
//kernel/sched.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
void *key)
{
return try_to_wake_up(curr->private, mode, wake_flags);
}
autoremove_wake_function最终通过default_wake_function调用try_to_wake_up实现唤醒指定进程。
socket accept场景下的惊群方案
在Linux中当对服务器监听的socket进行accept操作时,假如没有新的accept事件则会进行睡眠。sys_accept调用最终会在TCP层执行inet_csk_accept函数:
struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
struct inet_connection_sock *icsk = inet_csk(sk);
struct sock *newsk;
int error;
lock_sock(sk);
...
//阻塞等待直到有全连接建立。如果用户设置了等待超时时间,超时后会退出
error = inet_csk_wait_for_connect(sk, timeo);
...
out:
release_sock(sk);
...
}
其中inet_csk_accept在等待accept连接到来的时候,会执行inet_csk_wait_for_connect:
/*
* Wait for an incoming connection, avoid race conditions. This must be called
* with the socket locked.
*/
static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
struct inet_connection_sock *icsk = inet_csk(sk);
DEFINE_WAIT(wait);
...
for (;;) {
prepare_to_wait_exclusive(sk_sleep(sk), &wait,
TASK_INTERRUPTIBLE);
...
if (reqsk_queue_empty(&icsk->icsk_accept_queue))
timeo = schedule_timeout(timeo);
...
}
...
}
其中函数prepare_to_wait_exclusive的作用与之前的例子一样,此处的DEFINE_WAIT(wait)将当前的进程包装成wait_queue_t结构放入到监听socket的等待队列的尾部,然后通过schedule_timeout函数让当前进程睡眠timeo个时间。
该进程被唤醒只有:
1. 睡眠timeo后被timer定时器唤醒
2. accept事件到来
第二种唤醒场景是通过网络层的代码触发。在TCP V4协议中执行过程为:tcp_v4->tcp_v4_do_rcv->tcp_child_process,而在tcp_child_process方法中会调用父socket,也就是监听socket的parent->sk_data_ready(parent)方法,在sock_init_data的时候其定义为:sk->sk_data_ready = sock_def_readable。
static void sock_def_readable(struct sock *sk, int len)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
sock_def_readable首先判断是否在等待队列中有睡眠进程,然后通过wake_up_interruptible_sync_poll进行唤醒。
//include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
//kernel/sched.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
int wake_flags = WF_SYNC;
if (unlikely(!q))
return;
if (unlikely(!nr_exclusive))
wake_flags = 0;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
spin_unlock_irqrestore(&q->lock, flags);
}
可以看出最终还是通过__wake_up_common执行唤醒操作,并且nr_exclusive = 1说明一次只会唤醒一个进程,不会发生惊群。
其实在inet_csk_accept时候,先进行了lock_sock(sk)操作,理论上锁住了监听的socket,每次只有一个进程可以accept。但是实际上lock_sock(sk)的时候要是没有拿到锁也会进行睡眠,加入不做特殊处理也有可能惊群。
epoll的惊群解决方案
在之前的文章linux源码角度看epoll中介绍了,在使用epoll的时候会在注册事件之后会调用epoll_wait系统调用,该方法会调用ep_poll方法:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
...
if (list_empty(&ep->rdllist)) {
//没有事件,需要睡眠。当有事件到来时,睡眠会被ep_poll_callback函数唤醒
//将current初始化为等待队列项wait后,放入ep->wg的等待队列中
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;//linux 3.13.1已经导入,应该还在更早的版本就导入了
__add_wait_queue(&ep->wq, &wait);
for (;;) {
//执行ep_poll_callback()时应当将当前进程唤醒,所以当前进程状态应该为“可唤醒”TASK_INTERRUPTIBLE
set_current_state(TASK_INTERRUPTIBLE);
//如果就绪队列不为空或者已超时则退出循环
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
//如果当前进程收到信号则退出循环,返回EINTR错误
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
//主动让出处理器,等待ep_poll_callback()将当前进程唤醒或者超时,返回值是剩余的时间。
//从这里开始当前进程会进入睡眠状态,直到某些文件的状态就绪或者超时。
//当文件状态就绪时,eventpoll的回调函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程。
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
...
}
其中在没有事件需要睡眠时,通过函数__add_wait_queue将当前进程放入等待队列的对头中。此外会先将WQ_FLAG_EXCLUSIVE赋给flags,表示该睡眠进程是一个互斥进程。睡眠的当前进程会被回调函数ep_poll_callback唤醒:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
...
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
...
}
#define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL)
void __wake_up_locked(wait_queue_head_t *q, unsigned int mode)
{
__wake_up_common(q, mode, 1, 0, NULL);
}
ep_poll_callback最终通过__wake_up_common来唤醒等待队列中的互斥进程。
form:jacktang816.github.io/post/jingqun/
阅读(621) | 评论(0) | 转发(0) |