分类: LINUX
2011-06-30 10:43:35
Linux機制之工作隊列
一.Workqueue創建
由於多核的發展,不得不考慮多核協同工作時的工作,以workqueue為例,如果沒有設定是singlethread,則會在每一個CPU上創建workqueue。關於多核的應用,在這裡我們不看了,僅僅是考慮單核的情況。
常見的workqueue創建有以下幾個:
#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
從上面的宏定義可見,均是調用__create_workqueue函數,不同的是傳入的參數不同而已。
#define __create_workqueue(name, singlethread, freezeable, rt) \
__create_workqueue_key((name), (singlethread), (freezeable), (rt), NULL, NULL)
下面我們均會忽略關於lockdep的應用。
不過在看下面的函數之前需要看一下一個很重要的數據結構:
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq; --針對于每一個CPU的workqueue_struct
struct list_head list;
const char *name;
int singlethread; --是否為一個單獨的進程,如果是單獨進程,使用第一個可能的CPU
int freezeable; --在掛起時是否凍結線程
int rt;
};
每一個CPU的workqueue實例
struct cpu_workqueue_struct {
spinlock_t lock;
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
struct workqueue_struct *wq;
struct task_struct *thread;
int run_depth; /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;
struct workqueue_struct *__create_workqueue_key(const char *name,int singlethread,int freezeable,int rt,struct lock_class_key *key,const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);--為per-cpu開闢cpu_wq,針對于單獨的cpu,則是直接採用kmalloc開闢內存空間。
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
wq->singlethread = singlethread;
wq->freezeable = freezeable;
wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
if (singlethread) {--在第一個可能的cpu上創建workqueue
cwq = init_cpu_workqueue(wq, singlethread_cpu); --初始化cpu_wq,第二個參數為cpu的id號。
err = create_workqueue_thread(cwq, singlethread_cpu);--創建wq線程
start_workqueue_thread(cwq, -1);
} else {
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
for_each_possible_cpu(cpu) {
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu))
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
}
return wq;
}
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
struct workqueue_struct *wq = cwq->wq;
const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;
p = kthread_create(, cwq, fmt, wq->name, cpu);--創建一個新的線程,執行worker_thread函數。不過創建時不會執行
if (cwq->wq->rt)
sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);
cwq->thread = p;
return 0;
}
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
if (cwq->wq->freezeable)
set_freezable();
set_user_nice(current, -5); --設置當前的線程的優先級
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) &&
!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
run_workqueue(cwq);
}
return 0;
}
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
cwq->run_depth++;
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
work_clear_pending(work);
f(work); ---執行work_struct中的fn函數。
spin_lock_irq(&cwq->lock);
cwq->current_work = NULL;
}
cwq->run_depth--;
spin_unlock_irq(&cwq->lock);
}
從上面的函數的實現上可以很明顯的看出,其本質是創建一個新的守護線程,用於執行work_struct中的fn函數。
在創建線程完成后,就是喚醒線程,讓守護程序運行
static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct task_struct *p = cwq->thread;
if (p != NULL) {
if (cpu >= 0)
kthread_bind(p, cpu);
wake_up_process(p); --這個函數名有點不倫不類的,因為從調用該函數的函數名上看是開始wq的線程,而這裡卻是喚醒進程。額,在linux線程和進程分的不是很仔細,各位不要介意。
}
}
二.刪除workqueue
void destroy_workqueue(struct workqueue_struct *wq)
{
const cpumask_t *cpu_map = wq_cpu_map(wq);
int cpu;
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
for_each_cpu_mask_nr(cpu, *cpu_map)
(per_cpu_ptr(wq->cpu_wq, cpu));
free_percpu(wq->cpu_wq);
kfree(wq);
}
static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
if (cwq->thread == NULL)
return;
flush_cpu_workqueue(cwq);--清空wq上所有的work_struct
kthread_stop(cwq->thread);--停止kthread
cwq->thread = NULL;
}
static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
int active;
if (cwq->thread == current) {
run_workqueue(cwq);
active = 1;
} else {
struct wq_barrier barr;
active = 0;
spin_lock_irq(&cwq->lock);
if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
insert_wq_barrier(cwq, &barr, &cwq->worklist);
active = 1;
}
spin_unlock_irq(&cwq->lock);
if (active)
wait_for_completion(&barr.done);
}
return active;
}
從上面的函數很明顯看出一個bug就是沒有確保所有的work_struct被執行,就開始刪除線程。這個的處理辦法不是太好,在2.6.36中內核對work_queue進行了很大的改動。因為由於work_queue的好用性,內核遍佈著work_queue,但是該work_queue的架構有點問題,故在後期中改動很大。關於work_queue的新的特性以後再看。
三.關於work_queue的操作函數
3.1創建wq
create_workqueue(name)
create_rt_workqueue(name)
create_freezeable_workqueue(name)
create_singlethread_workqueue(name)
3.2刪除wq
void destroy_workqueue(struct workqueue_struct *wq);
3.3為wq添加work
Int queue_work(struct workqueue_struct *wq, struct work_struct *work);
int queue_work_on(int cpu, struct workqueue_struct *wq,struct work_struct *work);
int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *work, unsigned long delay);
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,struct delayed_work *work, unsigned long delay);
3.4清除wq的work
void flush_workqueue(struct workqueue_struct *wq);
void flush_scheduled_work(void);
3.5關於keventd_wq操作
int schedule_work(struct work_struct *work);
int schedule_work_on(int cpu, struct work_struct *work);
int schedule_delayed_work(struct delayed_work *work, unsigned long delay);
int schedule_delayed_work_on(int cpu, struct delayed_work *work,unsigned long delay);
四.關於kevent_wq
Kevent_wq其本質還是workqueue,只不過這個wq是在系統初始化時創建時,該函數的存在也為像我們這些懶人創建一個好的平臺就是不需要重新創建一個新的wq才可以使用等待隊列,我們完全可以只創建work_struct就可以使用workqueue了。這個全局workqueue在2.6.36中變的比較複雜了。內核一天比一天複雜,不過整體的趨勢是一天比一天穩定。
五.work_struct
等待隊列中,work_queue負責管理work_struc以及線程信息。不過我們用戶的具體函數實現還是在work_struct中實現。Work_struct才是實體。
六.總結
等待隊列實際是為內核用戶提供一種簡單的線程創建的接口。說的直白一點就是,workqueue是一個線程,想添加什麽函數處理直接就添加一個work_struct就行了。