

GNU Pth is a well-known user-level thread library; NGPT is an M:N-model thread library built on top of Pth.
Its documentation explains a few concepts particularly clearly; excerpts follow.
o reentrant, thread-safe and asynchronous-safe functions
A reentrant function is one that behaves correctly if it is called simultaneously by several threads and then also executes simultaneously. Functions that access global state, such as memory or files, of course, need to be carefully designed in order to be reentrant. Two traditional approaches to solve these problems are caller-supplied states and thread-specific data.
Thread-safety is the avoidance of data races, i.e., situations in which data is set to either correct or incorrect value depending upon the (unpredictable) order in which multiple threads access and modify the data. So a function is thread-safe when it still behaves semantically correct when called simultaneously by several threads (it is not required that the functions also execute simultaneously). The traditional approach to achieve thread-safety is to wrap a function body with an internal mutual exclusion lock (aka 'mutex'). As you should recognize, reentrant is a stronger attribute than thread-safe, because it is harder to achieve and results especially in no run-time contention between threads. So, a reentrant function is always thread-safe, but not vice versa.
Additionally there is a related attribute for functions named asynchronous-safe, which comes into play in conjunction with signal handlers. This is very related to the problem of reentrant functions. An asynchronous-safe function is one that can be called safe and without side-effects from within a signal handler context. Usually very few functions are of this type, because an application is very restricted in what it can perform from within a signal handler (especially what system functions it is allowed to call). The reason mainly is, because only a few system functions are officially declared by POSIX as guaranteed to be asynchronous-safe. Asynchronous-safe functions usually have to be already reentrant.
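To make the distinction concrete, here is a small illustrative sketch of my own (not taken from the Pth manual): the first function is merely thread-safe, protecting shared state with an internal mutex, while the second is reentrant because the caller supplies all the state, so there is no contention at all.

#include <pthread.h>

/* Illustrative only: internal locking vs. caller-supplied state. */
static int global_counter = 0;
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

/* Thread-safe but not reentrant: correct under concurrent calls,
 * yet every caller serializes on the same global state. */
int next_id_locked(void)
{
    pthread_mutex_lock(&counter_lock);
    int id = ++global_counter;
    pthread_mutex_unlock(&counter_lock);
    return id;
}

/* Reentrant (and therefore thread-safe): touches only caller-owned
 * memory, so several threads can execute it simultaneously. */
int next_id_reentrant(int *state)
{
    return ++(*state);
}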

Clearly, the scenario described below is still the minority case:
o Pth increases the responsiveness and concurrency of an event-driven application, but NOT the concurrency of number-crunching applications.
The reason is the non-preemptive scheduling. Number-crunching applications usually require preemptive scheduling to achieve concurrency because of their long CPU bursts. For them, non-preemptive scheduling (even together with explicit yielding) provides only the old concept of 'coroutines'. On the other hand, event driven applications benefit greatly from non-preemptive scheduling. They have only short CPU bursts and lots of events to wait on, and this way run faster under non-preemptive scheduling because no unnecessary context switching occurs, as it is the case for preemptive scheduling. That's why Pth is mainly intended for server type applications, although there is no technical restriction.
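A minimal sketch of what that means in practice, using the public Pth calls pth_init/pth_spawn/pth_yield/pth_join (the computation itself is just a placeholder): a CPU-bound loop must yield explicitly, otherwise no other thread ever gets to run under the non-preemptive scheduler.

#include <pth.h>

/* Placeholder number-cruncher: without the explicit pth_yield()
 * the other thread would never run until this loop finishes. */
static void *cruncher(void *arg)
{
    for (long i = 0; i < 10000000L; i++) {
        if (i % 10000 == 0)
            pth_yield(NULL);   /* cooperative scheduling point */
    }
    return NULL;
}

int main(void)
{
    pth_init();
    pth_t a = pth_spawn(PTH_ATTR_DEFAULT, cruncher, NULL);
    pth_t b = pth_spawn(PTH_ATTR_DEFAULT, cruncher, NULL);
    pth_join(a, NULL);
    pth_join(b, NULL);
    return 0;
}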

The purpose of the NEW queue has to do with the fact that in Pth a thread never directly switches to another thread. A thread always yields execution to the scheduler and the scheduler dispatches to the next thread. So a freshly spawned thread has to be kept somewhere until the scheduler gets a chance to pick it up for scheduling. That is what the NEW queue is for.
The purpose of the DEAD queue is to support thread joining. When a thread is marked to be unjoinable, it is directly kicked out of the system after it terminated. But when it is joinable, it enters the DEAD queue. There it remains until another thread joins it.
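In API terms, whether a terminated thread ends up parked on the DEAD queue is controlled by the joinable attribute; a minimal sketch (worker() is just a placeholder):

#include <pth.h>
#include <stdio.h>

static void *worker(void *arg)
{
    printf("worker %s finished\n", (const char *)arg);
    return arg;
}

int main(void)
{
    pth_init();

    /* Joinable (the default): stays on the DEAD queue after it
     * terminates until somebody joins it. */
    pth_t joinable = pth_spawn(PTH_ATTR_DEFAULT, worker, "joinable");

    /* Unjoinable: kicked out of the system right after termination. */
    pth_attr_t attr = pth_attr_new();
    pth_attr_set(attr, PTH_ATTR_JOINABLE, FALSE);
    pth_spawn(attr, worker, "detached");

    void *result = NULL;
    pth_join(joinable, &result);   /* removes it from the DEAD queue */
    pth_attr_destroy(attr);
    return 0;
}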
Code from a later version:




NGPT is the user-level portion of a POSIX pthreads library which provides non-preemptive priority-based scheduling for multiple threads of execution (aka ``multithreading'') inside event-driven applications. All threads run in the same address space of the server application, but each thread has its own individual program-counter, run-time stack, signal mask and errno variable.
On SMP machines, this library will use an M:N threading model if enabled resulting in significantly improved performance.

The related mailing list touches on very few implementation details:
http://blog.gmane.org/gmane.linux.ngpt.user/month=20030301
According to a notice posted on the official NGPT site in March 2003, given the increasingly wide acceptance of NPTL, and to avoid the confusion of competing thread-library versions, NGPT would see no further development; only supporting maintenance work would continue.


Slides by the project developers, outline-style, with no implementation details.


(important)

How the M:N model shows up concretely
NGPT 2.2.1, in the file pth_lib.c, contains the following code:
intern  int  pth_max_native_threads;    /*ibm*/
intern  int  pth_number_of_natives;    /*ibm*/
intern  int  pth_threads_per_native;    /*ibm*/

pth_max_native_threads is the maximum number of kernel threads (the "N" in M:N, "max number of natives"); it normally does not exceed the number of physical CPUs. pth_number_of_natives is the number of kernel threads the process currently owns ("current number of natives"). pth_threads_per_native is effectively the M:N ratio ("number of user threads per native"); see the pth_init code below.

The following code in pth_init initializes these variables:
    /*begin ibm*/
    pth_threads_per_native  = 1;
    pth_max_native_threads  = 0;
    pth_number_of_natives   = 1;

    /* determine the number of native threads per cpu. */
    c_ratio = getenv("MAXTHREADPERCPU");
    if (c_ratio != NULL) {
        long ratio = strtol(c_ratio, (char **)NULL, 10);
        if (errno != ERANGE)
            pth_threads_per_native = (int)ratio;
    }

    /*
     * See if the MAXNATIVETHREADS environment variable is set.
     * We'll use this instead of the number of cpus if this
     * is set since the user wants to override the default behavior
     * which is based on the number of CPUs in the host.
     */
    c_numcpus = getenv("MAXNATIVETHREADS");
    if (c_numcpus != NULL) {
        long numcpus = strtol(c_numcpus, (char **)NULL, 10);
        if (errno != ERANGE)
            pth_max_native_threads = (int)numcpus;
    }

    /*
     * We check to see if we've gotten an override...
     *    If not, we'll base it off of CPU and set a
     *    max number of threads per cpu to 1.
     */
    /* My notes: sysconf(_SC_NPROCESSORS_CONF) returns the number of CPUs.
     * Judging from the code below, if MAXNATIVETHREADS is not set then
     * pth_threads_per_native stays 1, which looks like a 1:1 model -- but is
     * it really?  See the notes on pth_spawn_cb below. */
    if (pth_max_native_threads == 0) {
        pth_max_native_threads = sysconf(_SC_NPROCESSORS_CONF);
        pth_threads_per_native = 1;
        cpu_based = 1;
    }

    if (pth_max_native_threads > 1) {
        pth_main->boundnative = &pth_first_native;
        pth_max_native_threads++;
    }
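So the M:N shape is tunable from the environment. A hedged sketch of how one could pin it down before initialization (the numbers are arbitrary examples; MAXNATIVETHREADS and MAXTHREADPERCPU are the variables read by the pth_init code above):

#include <stdlib.h>
#include <pth.h>

int main(void)
{
    /* Arbitrary example values, read by pth_init() as shown above. */
    setenv("MAXNATIVETHREADS", "4", 1);   /* cap the number of native threads */
    setenv("MAXTHREADPERCPU", "16", 1);   /* pth_threads_per_native = 16 */
    pth_init();
    /* ... spawn PTH_SCOPE_PROCESS threads as usual ... */
    return 0;
}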


pth_spawn_cb is the function that actually creates a thread. It always creates a user-level thread; whether it also creates a kernel-level (native) thread is decided by the code below, from which we can conclude:
1. The M:N model applies only to PTH_SCOPE_PROCESS threads, not to PTH_SCOPE_SYSTEM threads. That is why pth_max_native_threads is bumped by 1 when a PTH_SCOPE_SYSTEM native thread is created.
2. Judging from (pth_active_threads % pth_threads_per_native) == 0, a new native thread is created for every pth_threads_per_native user threads, even when pth_threads_per_native is 1; creation is further bounded by the condition pth_number_of_natives < pth_max_native_threads (see the small simulation after the code below).
3. pth_new_native is what actually allocates the kernel thread.


if (scope == PTH_SCOPE_PROCESS) {
    /*
     * Check to see if we're allowed to create additional native
     * threads and we've reached the threshold...
     */
    if (pth_max_native_threads > 1 && (pth_active_threads > 1) &&
        (((pth_active_threads % pth_threads_per_native) == 0)
         || (pth_active_threads-1 == 1))) {
        /*
         * We are, now check to see if we've reached the max number of natives an'
         * we've reached the threshold...
         */
        if ((pth_number_of_natives < pth_max_native_threads) &&
            (pth_number_of_natives < pth_active_threads)) {
            /*
             * We're not yet at the maximum number of natives so it's time
             * to create another native thread and start scheduling on it.
             */
            if (pth_new_native(scope, 0) == NULL) {
                pth_tcb_free(thread);
                return NULL;
            }
        }
    }
} else {
    if ((thread->boundnative = pth_new_native(scope, thread)) == NULL) {
        pth_tcb_free(thread);
        return NULL;
    }
    pth_CQ = &(thread->boundnative->ready_queue);
    pth_max_native_threads++;
}
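To see when the threshold actually fires, the following standalone sketch simply replays the PTH_SCOPE_PROCESS condition above for the first few spawns, assuming the CPU-based defaults from pth_init (pth_threads_per_native = 1 and, on a 4-CPU box, pth_max_native_threads = 4 + 1 = 5):

#include <stdio.h>

int main(void)
{
    /* Assumed defaults, see pth_init() above. */
    int pth_threads_per_native = 1;
    int pth_max_native_threads = 5;   /* 4 CPUs + 1 */
    int pth_number_of_natives  = 1;

    for (int pth_active_threads = 1; pth_active_threads <= 8; pth_active_threads++) {
        int threshold = pth_max_native_threads > 1 && pth_active_threads > 1 &&
                        ((pth_active_threads % pth_threads_per_native) == 0 ||
                         pth_active_threads - 1 == 1);
        int allowed   = pth_number_of_natives < pth_max_native_threads &&
                        pth_number_of_natives < pth_active_threads;
        if (threshold && allowed)
            pth_number_of_natives++;   /* this is where pth_new_native() would run */
        printf("active=%d -> natives=%d\n", pth_active_threads, pth_number_of_natives);
    }
    return 0;
}

Each additional user thread brings up one more native until pth_max_native_threads is reached, which matches conclusion 2 above.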
pth_descr_st and pth_st: the former describes a kernel (native) thread, the latter a user-level thread.
User-level threads are allocated when pth_spawn_cb calls pth_tcb_alloc, and released with pth_tcb_free. When a user-level thread finishes it calls pth_exit, which first sets the state:

dying_thread->state = PTH_STATE_EXIT;

and then switches to the scheduler thread, which takes care of releasing it:
pth_mctx_switch(&dying_thread->mctx, &descr->sched->mctx);

Next, the pth_scheduler thread looks at the state of the current thread and calls pth_tcb_free to release the thread structure:
if (current != NULL && current->state == PTH_STATE_EXIT) {
    if (!current->joinable) {
        pth_tcb_free(current);
    } else {

The interesting part here is that because the scheduler is itself a thread, every switch necessarily follows scheduler -> current -> scheduler, so current never changes underneath a running thread. In the Linux kernel, by contrast, a context switch is a function call and chains like A -> B -> C are common, which is why switch_to takes three parameters.

Kernel threads are allocated by pth_new_native; the storage comes from a static, fixed-size array:
struct pth_descr_st pth_native_list[PTH_MAX_NATIVE_THREADS];

The code of pth_new_native looks like this:
if (scope == PTH_SCOPE_SYSTEM) {
    /* Re-use the native thread, if available */
    int slot = 1;
    while (pth_native_list[slot].is_used) {
        descr = &pth_native_list[slot++];
        if (descr->is_bounded && !descr->bounded_thread) {
            pth_release_lock(&pth_native_lock);
            return descr;
        }
    }
}
if ((descr = pth_alloc_native(TRUE, FALSE)) == NULL) {

pth_descr_t pth_alloc_native(int create_stack, int is_watchdog)
{
    pth_descr_t    descr = (is_watchdog) ? &(pth_watchdog_descr) : &(pth_native_list[pth_number_of_natives++]);

From the above we can see that a PTH_SCOPE_SYSTEM thread may reuse a previously allocated native thread whose bound thread no longer exists. For PTH_SCOPE_PROCESS threads, pth_number_of_natives only ever increments (pth_number_of_natives++), up to the limit PTH_MAX_NATIVE_THREADS.

Once allocated, a kernel thread is never released mid-run; natives are only dropped when the process terminates and pth_drop_natives() is called.

So it appears pth_threads_per_native is not really enforced. Consider this scenario: spawn pth_threads_per_native threads, let them all exit, and repeat. pth_number_of_natives can easily climb to its maximum even though the number of live user threads never actually grows.
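That scenario, expressed with the public API (worker(), BATCH and ROUNDS are placeholders of mine): each round spawns and joins a small batch of PTH_SCOPE_PROCESS threads, yet any natives created along the way are never handed back, so the effective user-to-native ratio can end up far below pth_threads_per_native.

#include <pth.h>

#define BATCH  4     /* placeholder batch size */
#define ROUNDS 16    /* placeholder number of rounds */

static void *worker(void *arg)
{
    return arg;      /* terminate immediately */
}

int main(void)
{
    pth_init();
    for (int r = 0; r < ROUNDS; r++) {
        pth_t tids[BATCH];
        for (int i = 0; i < BATCH; i++)
            tids[i] = pth_spawn(PTH_ATTR_DEFAULT, worker, NULL);
        for (int i = 0; i < BATCH; i++)
            pth_join(tids[i], NULL);   /* user threads gone, natives stay */
    }
    return 0;
}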
pth_new_scheduler and pth_scheduler: every native thread has a scheduler of its own, which schedules the user threads multiplexed on top of it.

The function pth_new_native, which creates kernel threads, contains:
native_pid = clone(pth_new_scheduler, descr->stack_top, pth_clone_flags, descr);
As you can see, the kernel thread runs the function pth_new_scheduler.

However, after pth_new_scheduler initializes the descr structure and spawns a user thread running pth_scheduler, it switches straight to that user thread via pth_mctx_restore and never comes back.

    descr->sched = PTH_SPAWN(t_attr, pth_scheduler, NULL);

    /* Make the scheduler the current thread for this native... */
    descr->current = descr->sched;
    descr->is_running = TRUE;

    /* switch context to this new scheduler... */
    pth_mctx_restore(&descr->sched->mctx);

One puzzling point here: the stack that pth_new_scheduler runs on is apparently never freed. Could we instead have called clone(pth_scheduler, descr->stack_top, pth_clone_flags, descr) directly and skipped pth_new_scheduler? If that does not work, the likely reason is that pth_scheduler could then no longer be the user thread described by descr->sched.
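For reference, here is a minimal standalone sketch of the clone(2) pattern used by pth_new_native: start a kernel thread that shares the address space and runs a function on a private stack. The flag set and stack size are simplified assumptions of mine, not NGPT's actual pth_clone_flags.

#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>

#define STACK_SIZE (64 * 1024)   /* assumed stack size */

static int native_main(void *arg)
{
    printf("native thread running, arg=%p\n", arg);
    return 0;   /* a real scheduler would loop here forever */
}

int main(void)
{
    char *stack = malloc(STACK_SIZE);
    if (stack == NULL)
        return 1;
    /* clone() wants the *top* of the stack, since it grows downward. */
    int pid = clone(native_main, stack + STACK_SIZE,
                    CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | SIGCHLD,
                    NULL);
    if (pid < 0)
        return 1;
    waitpid(pid, NULL, 0);   /* reap the cloned thread */
    free(stack);
    return 0;
}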

pth_scheduler() calls pth_sched_eventmanager() in order to process any events which have occurred and to move those threads which are waiting on those events from the WAITING queue to the READY queue. If the READY queue is not empty, then pth_sched_eventmanager() is called with a flag which indicates that it should only poll for any events which may have occurred, rather than waiting for one to occur. So far so good.

Thread organization and queues: candidate user threads sit in queues. NGPT is cooperative, so when a thread blocks or voluntarily yields, control switches to the pth_scheduler thread, which picks the next thread from the appropriate queue.
The relevant pth_scheduler code:
if (descr->is_bounded || native_is_group_leader(descr)) {
    NQ = &descr->new_queue;
    RQ = &descr->ready_queue;
} else {
    NQ = &pth_NQ;
    RQ = &pth_RQ;
}
A bounded thread is effectively 1:1: it uses the new_queue and ready_queue of the native thread it is bound to. PTH_SCOPE_PROCESS threads instead share the global queues pth_NQ and pth_RQ, and this is exactly where the M:N multiplexing lives: each native thread's pth_scheduler picks user threads out of the shared pth_NQ and pth_RQ.