December 17, 2007

This article was contributed by Paul McKenney

[Editor's note: this is the first in a three-part series on how the read-copy-update mechanism works. Many thanks to Paul McKenney and Jonathan Walpole for allowing us to publish these articles. The remaining two sections will appear in future weeks.]

Part 1 of 3 of What is RCU, Really?

Paul E. McKenney, IBM Linux Technology Center
Jonathan Walpole, Portland State University Department of Computer Science

Introduction

Read-copy update (RCU) is a synchronization mechanism that was added to the Linux kernel in October of 2002. RCU achieves scalability improvements by allowing reads to occur concurrently with updates. In contrast with conventional locking primitives that ensure mutual exclusion among concurrent threads regardless of whether they be readers or updaters, or with reader-writer locks that allow concurrent reads but not in the presence of updates, RCU supports concurrency between a single updater and multiple readers. RCU ensures that reads are coherent by maintaining multiple versions of objects and ensuring that they are not freed up until all pre-existing read-side critical sections complete. RCU defines and uses efficient and scalable mechanisms for publishing and reading new versions of an object, and also for deferring the collection of old versions. These mechanisms distribute the work among read and update paths in such a way as to make read paths extremely fast. In some cases (non-preemptable kernels), RCU's read-side primitives have zero overhead.

Quick Quiz 1: But doesn't seqlock also permit readers and updaters to get work done concurrently?

This leads to the question "what exactly is RCU?", and perhaps also to the question "how can RCU possibly work?" (or, not infrequently, the assertion that RCU cannot possibly work). This document addresses these questions from a fundamental viewpoint; later installments look at them from usage and from API viewpoints. This last installment also includes a list of references.

RCU is made up of three fundamental mechanisms, the first being used for insertion, the second being used for deletion, and the third being used to allow readers to tolerate concurrent insertions and deletions. These mechanisms are described in the following sections, which focus on applying RCU to linked lists:

  1. Publish-Subscribe Mechanism (for insertion)
  2. Wait For Pre-Existing RCU Readers to Complete (for deletion)
  3. Maintain Multiple Versions of Recently Updated Objects (for readers)

These sections are followed by a discussion and the answers to the Quick Quizzes.

Publish-Subscribe Mechanism

One key attribute of RCU is the ability to safely scan data, even though that data is being modified concurrently. To provide this ability for concurrent insertion, RCU uses what can be thought of as a publish-subscribe mechanism. For example, consider an initially NULL global pointer gp that is to be modified to point to a newly allocated and initialized data structure. The following code fragment (with the addition of appropriate locking) might be used for this purpose:

  1 struct foo {

  2   int a;

  3   int b;

  4   int c;

  5 };

  6 struct foo *gp = NULL;

  7

  8 /* . . . */

  9

 10 p = kmalloc(sizeof(*p), GFP_KERNEL);

 11 p->a = 1;

 12 p->b = 2;

 13 p->c = 3;

 14 gp = p;

Unfortunately, there is nothing forcing the compiler and CPU to execute the last four assignment statements in order. If the assignment to gp happens before the initialization of p's fields, then concurrent readers could see the uninitialized values. Memory barriers are required to keep things ordered, but memory barriers are notoriously difficult to use. We therefore encapsulate them into a primitive rcu_assign_pointer() that has publication semantics. The last four lines would then be as follows:

  1 p->a = 1;

  2 p->b = 2;

  3 p->c = 3;

  4 rcu_assign_pointer(gp, p);

The rcu_assign_pointer() would publish the new structure, forcing both the compiler and the CPU to execute the assignment to gp after the assignments to the fields referenced by p.
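To make the publication semantics concrete, here is a minimal conceptual sketch of what rcu_assign_pointer() boils down to on most architectures; it is not the kernel's exact definition (the real 2.6.26 macro, quoted in the implementation analysis later in this post, also skips the barrier when a compile-time constant NULL is assigned), and the my_ prefix is used to make that clear:

/*
 * Conceptual sketch only -- not the kernel's exact definition.
 * A write memory barrier orders the initialization of the fields
 * before the store that makes the structure globally visible.
 */
#define my_rcu_assign_pointer(p, v) \
    ({ \
        smp_wmb();      /* order prior stores before publication */ \
        (p) = (v); \
    })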

However, it is not sufficient to only enforce ordering at the updater, as the reader must enforce proper ordering as well. Consider for example the following code fragment:

  1 p = gp;

  2 if (p != NULL) {

  3   do_something_with(p->a, p->b, p->c);

  4 }

Although this code fragment might well seem immune to misordering, unfortunately, the DEC Alpha CPU and value-speculation compiler optimizations can, believe it or not, cause the values of p->a, p->b, and p->c to be fetched before the value of p! This is perhaps easiest to see in the case of value-speculation compiler optimizations, where the compiler guesses the value of p, fetches p->a, p->b, and p->c, then fetches the actual value of p in order to check whether its guess was correct. This sort of optimization is quite aggressive, perhaps insanely so, but does actually occur in the context of profile-driven optimization.

Clearly, we need to prevent this sort of skullduggery on the part of both the compiler and the CPU. The rcu_dereference() primitive uses whatever memory-barrier instructions and compiler directives are required for this purpose:

  1 rcu_read_lock();

  2 p = rcu_dereference(gp);

  3 if (p != NULL) {

  4   do_something_with(p->a, p->b, p->c);

  5 }

  6 rcu_read_unlock();

The rcu_dereference() primitive can thus be thought of as subscribing to a given value of the specified pointer, guaranteeing that subsequent dereference operations will see any initialization that occurred before the corresponding publish (rcu_assign_pointer()) operation. The rcu_read_lock() and rcu_read_unlock() calls are absolutely required: they define the extent of the RCU read-side critical section. Their purpose is explained in the section on waiting for pre-existing RCU readers; however, they never spin or block, nor do they prevent the list_add_rcu() from executing concurrently. In fact, in non-CONFIG_PREEMPT kernels, they generate absolutely no code.
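As with the publish side, a rough conceptual sketch of rcu_dereference() may help; the actual 2.6.26 macro, quoted later in this post, uses ACCESS_ONCE() plus smp_read_barrier_depends(), which is a no-op on everything except DEC Alpha. The my_ prefix again marks this as an illustrative sketch rather than the kernel's definition:

/*
 * Conceptual sketch only -- not the kernel's exact definition.
 * The volatile access keeps the compiler from refetching or
 * speculating the pointer; the data-dependency barrier keeps
 * Alpha-style CPUs from reordering the dependent loads.
 */
#define my_rcu_dereference(p) \
    ({ \
        typeof(p) _p1 = ACCESS_ONCE(p); \
        smp_read_barrier_depends(); \
        (_p1); \
    })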

Although rcu_assign_pointer() and rcu_dereference() can in theory be used to construct any conceivable RCU-protected data structure, in practice it is often better to use higher-level constructs. Therefore, the rcu_assign_pointer() and rcu_dereference() primitives have been embedded in special RCU variants of Linux's list-manipulation API. Linux has two variants of doubly linked list, the circular struct list_head and the linear struct hlist_head/struct hlist_node pair. The former is laid out as follows, where the green boxes represent the list header and the blue boxes represent the elements in the list.




Adapting the pointer-publish example for the linked list gives the following:

  1 struct foo {

  2   struct list_head list;

  3   int a;

  4   int b;

  5   int c;

  6 };

  7 LIST_HEAD(head);

  8

  9 /* . . . */

 10

 11 p = kmalloc(sizeof(*p), GFP_KERNEL);

 12 p->a = 1;

 13 p->b = 2;

 14 p->c = 3;

 15 list_add_rcu(&p->list, &head);

Line 15 must be protected by some synchronization mechanism (most commonly some sort of lock) to prevent multiple list_add() instances from executing concurrently. However, such synchronization does not prevent this list_add() from executing concurrently with RCU readers.

Subscribing to an RCU-protected list is straightforward:

  1 rcu_read_lock();

  2 list_for_each_entry_rcu(p, head, list) {

  3   do_something_with(p->a, p->b, p->c);

  4 }

  5 rcu_read_unlock();

The list_add_rcu() primitive publishes an entry into the specified list, guaranteeing that the corresponding list_for_each_entry_rcu() invocation will properly subscribe to this same entry.
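The following is a hedged sketch of how list_add_rcu() can be built on top of the publish primitive; it is written for clarity rather than copied from any particular kernel version (the real code open-codes the barrier), and the sketch_ name is illustrative:

/*
 * Sketch: insert "new" between "prev" and "next" so that a concurrent
 * reader following ->next pointers either sees the fully initialized
 * new element or does not see it at all.
 */
static inline void sketch_list_add_rcu(struct list_head *new,
                                       struct list_head *prev,
                                       struct list_head *next)
{
    new->next = next;                    /* initialize the new element first */
    new->prev = prev;
    rcu_assign_pointer(prev->next, new); /* publish: readers may now see it */
    next->prev = new;                    /* readers never follow ->prev here */
}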

Quick Quiz 2: What prevents the list_for_each_entry_rcu() from getting a segfault if it happens to execute at exactly the same time as the list_add_rcu()?

Linux's other doubly linked list, the hlist, is a linear list, which means that it needs only one pointer for the header rather than the two required for the circular list. Thus, use of hlist can halve the memory consumption for the hash-bucket arrays of large hash tables.



Publishing a new element to an RCU-protected hlist is quite similar to doing so for the circular list:

  1 struct foo {

  2   struct hlist_node *list;

  3   int a;

  4   int b;

  5   int c;

  6 };

  7 HLIST_HEAD(head);

  8

  9 /* . . . */

 10

 11 p = kmalloc(sizeof(*p), GFP_KERNEL);

 12 p->a = 1;

 13 p->b = 2;

 14 p->c = 3;

 15 hlist_add_head_rcu(&p->list, &head);

As before, line 15 must be protected by some sort of synchronization mechanism, for example, a lock.

Subscribing to an RCU-protected hlist is also similar to the circular list:

  1 rcu_read_lock();

  2 hlist_for_each_entry_rcu(p, q, head, list) {

  3   do_something_with(p->a, p->b, p->c);

  4 }

  5 rcu_read_unlock();

Quick Quiz 3: Why do we need to pass two pointers into hlist_for_each_entry_rcu() when only one is needed for list_for_each_entry_rcu()?

The set of RCU publish and subscribe primitives are shown in the following table, along with additional primitives to "unpublish", or retract:

Category    Publish                        Retract                          Subscribe

Pointers    rcu_assign_pointer()           rcu_assign_pointer(..., NULL)    rcu_dereference()

Lists       list_add_rcu()                 list_del_rcu()                   list_for_each_entry_rcu()
            list_add_tail_rcu()
            list_replace_rcu()

Hlists      hlist_add_after_rcu()          hlist_del_rcu()                  hlist_for_each_entry_rcu()
            hlist_add_before_rcu()
            hlist_add_head_rcu()
            hlist_replace_rcu()

Note that the list_replace_rcu(), list_del_rcu(), hlist_replace_rcu(), and hlist_del_rcu() APIs add a complication. When is it safe to free up the data element that was replaced or removed? In particular, how can we possibly know when all the readers have released their references to that data element?

These questions are addressed in the following section.

Wait For Pre-Existing RCU Readers to Complete

In its most basic form, RCU is a way of waiting for things to finish. Of course, there are a great many other ways of waiting for things to finish, including reference counts, reader-writer locks, events, and so on. The great advantage of RCU is that it can wait for each of (say) 20,000 different things without having to explicitly track each and every one of them, and without having to worry about the performance degradation, scalability limitations, complex deadlock scenarios, and memory-leak hazards that are inherent in schemes using explicit tracking.

In RCU's case, the things waited on are called "RCU read-side critical sections". An RCU read-side critical section starts with an rcu_read_lock() primitive, and ends with a corresponding rcu_read_unlock() primitive. RCU read-side critical sections can be nested, and may contain pretty much any code, as long as that code does not explicitly block or sleep (although a special form of RCU called "SRCU" does permit general sleeping in SRCU read-side critical sections). If you abide by these conventions, you can use RCU to wait for any desired piece of code to complete.

RCU accomplishes this feat by indirectly determining when these other things have finished, as has been described elsewhere for both Classic RCU and realtime RCU.

In particular, as shown in the following figure, RCU is a way of waiting for pre-existing RCU read-side critical sections to completely finish, including memory operations executed by those critical sections.


However, note that RCU read-side critical sections that begin after the beginning of a given grace period can and will extend beyond the end of that grace period.

The following pseudocode shows the basic form of algorithms that use RCU to wait for readers:

  1. Make a change, for example, replace an element in a linked list.
  2. Wait for all pre-existing RCU read-side critical sections to completely finish (for example, by using the synchronize_rcu() primitive). The key observation here is that subsequent RCU read-side critical sections have no way to gain a reference to the newly removed element.
  3. Clean up, for example, free the element that was replaced above.

The following code fragment, adapted from those in the publish-subscribe section above, demonstrates this process, with field a being the search key:

  1 struct foo {

  2   struct list_head list;

  3   int a;

  4   int b;

  5   int c;

  6 };

  7 LIST_HEAD(head);

  8

  9 /* . . . */

 10

 11 p = search(head, key);

 12 if (p == NULL) {

 13   /* Take appropriate action, unlock, and return. */

 14 }

 15 q = kmalloc(sizeof(*p), GFP_KERNEL);

 16 *q = *p;

 17 q->b = 2;

 18 q->c = 3;

 19 list_replace_rcu(&p->list, &q->list);

 20 synchronize_rcu();

 21 kfree(p);

Lines 19, 20, and 21 implement the three steps called out above. Lines 16-19 give RCU ("read-copy update") its name: while permitting concurrent reads, line 16 copies and lines 17-19 do an update.

The synchronize_rcu() primitive might seem a bit mysterious at first. After all, it must wait for all RCU read-side critical sections to complete, and, as we saw earlier, the rcu_read_lock() and rcu_read_unlock() primitives that delimit RCU read-side critical sections don't even generate any code in non-CONFIG_PREEMPT kernels!

There is a trick, and the trick is that RCU Classic read-side critical sections delimited by rcu_read_lock() and rcu_read_unlock() are not permitted to block or sleep. Therefore, when a given CPU executes a context switch, we are guaranteed that any prior RCU read-side critical sections will have completed. This means that as soon as each CPU has executed at least one context switch, all prior RCU read-side critical sections are guaranteed to have completed, meaning that synchronize_rcu() can safely return.

Thus, RCU Classic's synchronize_rcu() can conceptually be as simple as the following:

  1 for_each_online_cpu(cpu)

  2   run_on(cpu);

Here, run_on() switches the current thread to the specified CPU, which forces a context switch on that CPU. The for_each_online_cpu() loop therefore forces a context switch on each CPU, thereby guaranteeing that all prior RCU read-side critical sections have completed, as required. Although this simple approach works for kernels in which preemption is disabled across RCU read-side critical sections, in other words, for non-CONFIG_PREEMPT and CONFIG_PREEMPT kernels, it does not work for CONFIG_PREEMPT_RT realtime (-rt) kernels. Therefore, realtime RCU uses a different approach based loosely on reference counters.
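As a purely illustrative and deliberately simplified sketch of that "reference counter" idea: readers could bump a counter associated with the current grace period, and the grace-period machinery could flip to a fresh counter and wait for the old one to drain. All names below are made up, and the real realtime implementation must additionally cope with races and memory ordering that this toy ignores:

/* Toy two-counter sketch of a counter-based read side.
 * Not the kernel's realtime RCU implementation. */
static atomic_t toy_readers[2] = { ATOMIC_INIT(0), ATOMIC_INIT(0) };
static int toy_idx;                  /* which counter new readers use */

static int toy_read_lock(void)
{
    int i = toy_idx;                 /* racy in this toy; the real code
                                      * handles readers caught mid-flip */
    atomic_inc(&toy_readers[i]);
    return i;                        /* reader remembers its "generation" */
}

static void toy_read_unlock(int i)
{
    atomic_dec(&toy_readers[i]);
}

static void toy_synchronize(void)
{
    int old = toy_idx;

    toy_idx = !old;                  /* new readers use the other counter */
    smp_mb();
    while (atomic_read(&toy_readers[old]) != 0)
        schedule_timeout_uninterruptible(1);   /* wait for old readers */
}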

Of course, the actual implementation in the Linux kernel is much more complex, as it is required to handle interrupts, NMIs, CPU hotplug, and other hazards of production-capable kernels, but while also maintaining good performance and scalability. Realtime implementations of RCU must additionally help provide good realtime response, which rules out implementations (like the simple two-liner above) that rely on disabling preemption.

Although it is good to know that there is a simple conceptual implementation of synchronize_rcu(), other questions remain. For example, what exactly do RCU readers see when traversing a concurrently updated list? This question is addressed in the following section.

Maintain Multiple Versions of Recently Updated Objects

This section demonstrates how RCU maintains multiple versions of lists to accommodate synchronization-free readers. Two examples are presented showing how an element that might be referenced by a given reader must remain intact while that reader remains in its RCU read-side critical section. The first example demonstrates deletion of a list element, and the second example demonstrates replacement of an element.

To start the "deletion" example, we will modify lines 11-21 in the as follows:

  1 p = search(head, key);

  2 if (p != NULL) {

  3   list_del_rcu(&p->list);

  4   synchronize_rcu();

  5   kfree(p);

  6 }

The initial state of the list, including the pointer p, is as follows.


The triples in each element represent the values of fields a, b, and c, respectively. The red borders on each element indicate that readers might be holding references to them, and because readers do not synchronize directly with updaters, readers might run concurrently with this entire replacement process. Please note that we have omitted the backwards pointers and the link from the tail of the list to the head for clarity.

After the list_del_rcu() on line 3 has completed, the 5,6,7 element has been removed from the list, as shown below. Since readers do not synchronize directly with updaters, readers might be concurrently scanning this list. These concurrent readers might or might not see the newly removed element, depending on timing. However, readers that were delayed (e.g., due to interrupts, ECC memory errors, or, in CONFIG_PREEMPT_RT kernels, preemption) just after fetching a pointer to the newly removed element might see the old version of the list for quite some time after the removal. Therefore, we now have two versions of the list, one with element 5,6,7 and one without. The border of the 5,6,7 element is still red, indicating that readers might be referencing it.


Please note that readers are not permitted to maintain references to element 5,6,7 after exiting from their RCU read-side critical sections. Therefore, once the synchronize_rcu() on line 4 completes, so that all pre-existing readers are guaranteed to have completed, there can be no more readers referencing this element, as indicated by its black border below. We are thus back to a single version of the list.


At this point, the 5,6,7 element may safely be freed, as shown below:


At this point, we have completed the deletion of element 5,6,7. The following section covers replacement.

Example 2: Maintaining Multiple Versions During Replacement

To start the replacement example, here are the last few lines of the example shown earlier:

  1 q = kmalloc(sizeof(*p), GFP_KERNEL);

  2 *q = *p;

  3 q->b = 2;

  4 q->c = 3;

  5 list_replace_rcu(&p->list, &q->list);

  6 synchronize_rcu();

  7 kfree(p);

The initial state of the list, including the pointer p, is the same as for the deletion example:


As before, the triples in each element represent the values of fields a, b, and c, respectively. The red borders on each element indicate that readers might be holding references to them, and because readers do not synchronize directly with updaters, readers might run concurrently with this entire replacement process. Please note that we again omit the backwards pointers and the link from the tail of the list to the head for clarity.

Line 1 kmalloc()s a replacement element, as follows:


Line 2 copies the old element to the new one:


Line 3 updates q->b to the value "2":


Line 4 updates q->c to the value "3":


Now, line 5 does the replacement, so that the new element is finally visible to readers. At this point, as shown below, we have two versions of the list. Pre-existing readers might see the 5,6,7 element, but new readers will instead see the 5,2,3 element. But any given reader is guaranteed to see some well-defined list.


After the synchronize_rcu() on line 6 returns, a grace period will have elapsed, and so all reads that started before the list_replace_rcu() will have completed. In particular, any readers that might have been holding references to the 5,6,7 element are guaranteed to have exited their RCU read-side critical sections, and are thus prohibited from continuing to hold a reference. Therefore, there can no longer be any readers holding references to the old element, as indicated by the thin black border around the 5,6,7 element below. As far as the readers are concerned, we are back to having a single version of the list, but with the new element in place of the old.


After the kfree() on line 7 completes, the list will appear as follows:


Despite the fact that RCU was named after the replacement case, the vast majority of RCU usage within the Linux kernel relies on the simple deletion case shown in the preceding section.

Discussion

These examples assumed that a mutex was held across the entire update operation, which would mean that there could be at most two versions of the list active at a given time.

Quick Quiz 4: How would you modify the deletion example to permit more than two versions of the list to be active?

Quick Quiz 5: How many RCU versions of a given list can be active at any given time?

This sequence of events shows how RCU updates use multiple versions to safely carry out changes in the presence of concurrent readers. Of course, some algorithms cannot gracefully handle multiple versions. There are techniques for adapting such algorithms to RCU, but these are beyond the scope of this article.

This article has described the three fundamental components of RCU-based algorithms:

  1. a publish-subscribe mechanism for adding new data,
  2. a way of waiting for pre-existing RCU readers to finish, and
  3. a discipline of maintaining multiple versions to permit change without harming or unduly delaying concurrent RCU readers.

Quick Quiz 6: How can RCU updaters possibly delay RCU readers, given that the rcu_read_lock() and rcu_read_unlock() primitives neither spin nor block?

These three RCU components allow data to be updated in the face of concurrent readers, and can be combined in different ways to implement a surprising variety of different types of RCU-based algorithms, some of which will be the topic of the next installment in this "What is RCU, Really?" series.
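To tie the three components together, here is a hedged sketch of a reader and an updater for an RCU-protected list of the kind used throughout this article; struct foo, the lock name, and the helper names are illustrative only:

struct foo {
    struct list_head list;
    int a, b, c;
};
static LIST_HEAD(head);
static DEFINE_SPINLOCK(update_lock);

/* Reader: subscribes to the current version of the list (component 1). */
static int sum_of_a(void)
{
    struct foo *p;
    int sum = 0;

    rcu_read_lock();
    list_for_each_entry_rcu(p, &head, list)
        sum += p->a;
    rcu_read_unlock();
    return sum;
}

/* Updater: unlinks an element (creating a second version, component 3),
 * waits for pre-existing readers (component 2), then frees it. */
static void delete_foo(struct foo *victim)
{
    spin_lock(&update_lock);
    list_del_rcu(&victim->list);
    spin_unlock(&update_lock);
    synchronize_rcu();
    kfree(victim);
}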

Acknowledgements

We are all indebted to Andy Whitcroft, Gautham Shenoy, and Mike Fulton, whose review of an early draft of this document greatly improved it. We owe thanks to the members of the Relativistic Programming project and to members of PNW TEC for many valuable discussions. We are grateful to Dan Frye for his support of this effort. Finally, this material is based upon work supported by the National Science Foundation under Grant No. CNS-0719851.

This work represents the view of the authors and does not necessarily represent the view of IBM or of Portland State University.

Linux is a registered trademark of Linus Torvalds.

Other company, product, and service names may be trademarks or service marks of others.

Quick Quiz 1: But doesn't seqlock also permit readers and updaters to get work done concurrently?

Answer: Yes and no. Although seqlock readers can run concurrently with seqlock writers, whenever this happens, the read_seqretry() primitive will force the reader to retry. This means that any work done by a seqlock reader running concurrently with a seqlock updater will be discarded and redone. So seqlock readers can run concurrently with updaters, but they cannot actually get any work done in this case.

In contrast, RCU readers can perform useful work even in the presence of concurrent RCU updaters.

Quick Quiz 2: What prevents the list_for_each_entry_rcu() from getting a segfault if it happens to execute at exactly the same time as the list_add_rcu()?

Answer: On all systems running Linux, loads from and stores to pointers are atomic, that is, if a store to a pointer occurs at the same time as a load from that same pointer, the load will return either the initial value or the value stored, never some bitwise mashup of the two. In addition, the list_for_each_entry_rcu() always proceeds forward through the list, never looking back. Therefore, the list_for_each_entry_rcu() will either see the element being added by list_add_rcu(), or it will not, but either way, it will see a valid well-formed list.

Quick Quiz 3: Why do we need to pass two pointers into hlist_for_each_entry_rcu() when only one is needed for list_for_each_entry_rcu()?

Answer: Because in an hlist it is necessary to check for NULL rather than for encountering the head. (Try coding up a single-pointer hlist_for_each_entry_rcu(). If you come up with a nice solution, it would be a very good thing!)

Quick Quiz 4: How would you modify the deletion example to permit more than two versions of the list to be active?

Answer: One way of accomplishing this is as follows:

spin_lock(&mylock);

p = search(head, key);

if (p == NULL)

        spin_unlock(&mylock);

else {

        list_del_rcu(&p->list);

        spin_unlock(&mylock);

        synchronize_rcu();

        kfree(p);

}

Note that this means that multiple concurrent deletions might be waiting in synchronize_rcu().

Quick Quiz 5: How many RCU versions of a given list can be active at any given time?

Answer: That depends on the synchronization design. If a semaphore protecting the update is held across the grace period, then there can be at most two versions, the old and the new.

However, if only the search, the update, and the list_replace_rcu() were protected by a lock, then there could be an arbitrary number of versions active, limited only by memory and by how many updates could be completed within a grace period. But please note that data structures that are updated so frequently probably are not good candidates for RCU. That said, RCU can handle high update rates when necessary.

Quick Quiz 6: How can RCU updaters possibly delay RCU readers, given that the rcu_read_lock() and rcu_read_unlock() primitives neither spin nor block?

Answer: The modifications undertaken by a given RCU updater will cause the corresponding CPU to invalidate cache lines containing the data, forcing the CPUs running concurrent RCU readers to incur expensive cache misses. (Can you design an algorithm that changes a data structure without inflicting expensive cache misses on concurrent readers? On subsequent readers?)


1: Preface
The RCU mechanism appeared quite early, but it was only adopted in the Linux kernel starting with the 2.5 series. This article will not spend much time introducing RCU itself; there are plenty of documents online describing RCU and how to use it. Instead, we analyze how RCU is implemented, from the perspective of the Linux kernel source.
Before discussing the implementation, a few points are worth restating:
1: RCU is intended for workloads with many readers and few writers. RCU resembles a reader-writer lock, but RCU readers take their "lock" with essentially no overhead. Writers must synchronize with one another, and a writer must wait until all readers that preceded it have exited before it can release the old resource.
2: RCU protects pointers. This point is especially important: a pointer assignment is a single instruction, i.e. an atomic operation, so changing what the pointer points to requires no extra synchronization; only cache effects and ordering need to be considered.
3: Readers can nest; that is, rcu_read_lock() may be called recursively.
4: While a reader holds rcu_read_lock(), no process context switch may occur; otherwise, since the writer has to wait for readers to finish, the writer process would remain blocked.
The code below is based on Linux kernel 2.6.26.
2: An example of using RCU
The Linux kernel ships with detailed documentation introducing RCU, located under linux-2.6.26.3/Documentation/RCU; these documents are well worth the time to read carefully.
We take the example from whatisRCU.txt as the starting point of today's analysis:
struct foo {
        int a;
        char b;
        long c;
};
DEFINE_SPINLOCK(foo_mutex);
 
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
        struct foo *new_fp;
        struct foo *old_fp;
 
        new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
        spin_lock(&foo_mutex);
        old_fp = gbl_foo;
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
        spin_unlock(&foo_mutex);
        synchronize_rcu();
        kfree(old_fp);
}
 
int foo_get_a(void)
{
        int retval;
 
        rcu_read_lock();
        retval = rcu_dereference(gbl_foo)->a;
        rcu_read_unlock();
        return retval;
}
As the code above shows, RCU is used to protect the global pointer struct foo *gbl_foo. foo_get_a() fetches a value from the RCU-protected structure, while foo_update_a() updates the RCU-protected gbl_foo.
Also, consider this: why does foo_update_a() need the spinlock foo_mutex?
Suppose the spinlock were not used; foo_update_a() would then look like this:
 
void foo_update_a(int new_a)
{
        struct foo *new_fp;
        struct foo *old_fp;
 
        new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
       
        old_fp = gbl_foo;
        1:-------------------------    
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
       
        synchronize_rcu();
        kfree(old_fp);
}
Suppose process A is preempted by process B at the ---- mark above, and B also executes foo_update_a(). When B finishes and execution switches back to A, the old_fp that A holds has in fact already been freed by B, so any subsequent use of old_fp by A is invalid.
 
In the code above we have also met several of RCU's core APIs. They are:
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()
rcu_read_lock() and rcu_read_unlock() delimit a reader's RCU critical section; no context switch may occur inside that critical section.
rcu_dereference(): called by a reader to obtain an RCU-protected pointer.
rcu_assign_pointer(): used by a writer to assign a new value to an RCU-protected pointer, so that the change is conveyed safely from the writer to the readers. The macro evaluates to the newly assigned value.
 
3: Analysis of the RCU API implementation
rcu_read_lock() and rcu_read_unlock() are implemented as follows:
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()
 
#define __rcu_read_lock() \
    do { \
        preempt_disable(); \
        __acquire(RCU); \
        rcu_read_acquire(); \
    } while (0)
#define __rcu_read_unlock() \
    do { \
        rcu_read_release(); \
        __release(RCU); \
        preempt_enable(); \
    } while (0)
Here __acquire(), rcu_read_acquire(), __release() and rcu_read_release() are annotations that compile away and can be ignored. So rcu_read_lock() and rcu_read_unlock() merely disable and re-enable preemption, because no context switch is allowed inside a read-side critical section.
 
rcu_dereference() and rcu_assign_pointer() are implemented as follows:
#define rcu_dereference(p)     ({ \
                typeof(p) _________p1 = ACCESS_ONCE(p); \
                smp_read_barrier_depends(); \
                (_________p1); \
                })
#define rcu_assign_pointer(p, v) \
    ({ \
        if (!__builtin_constant_p(v) || \
            ((v) != NULL)) \
            smp_wmb(); \
        (p) = (v); \
    })
Their implementations are also simple: the pointer accesses themselves are atomic, and the memory barriers are inserted purely for ordering/cache-coherence reasons, so that other readers and writers observe the most recent value of the protected pointer together with the data it points to.
 
synchronize_rcu() is the most central function in RCU: it waits for all pre-existing readers to exit, and most of the analysis that follows revolves around it. Its implementation is:
void synchronize_rcu(void)
{
    struct rcu_synchronize rcu;
 
    init_completion(&rcu.completion);
    /* Will wake me after RCU finished */
    call_rcu(&rcu.head, wakeme_after_rcu);
 
    /* Wait for it */
    wait_for_completion(&rcu.completion);
}
As we can see, it initializes a local variable of type struct rcu_synchronize, calls call_rcu(), and then waits for the completion rcu.completion to be signalled.
 
Here we meet another core RCU API, call_rcu(). Its declaration is:
void call_rcu(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
 
It arranges for the function func to be called once all pre-existing readers have finished.
As we can see in synchronize_rcu(), the function called once the readers are done is wakeme_after_rcu().
Note also that call_rcu() is meant for contexts that cannot sleep, such as interrupt context or regions with preemption disabled, whereas synchronize_rcu() is for contexts that may sleep. Let's first trace wakeme_after_rcu():
static void wakeme_after_rcu(struct rcu_head  *head)
{
    struct rcu_synchronize *rcu;
 
    rcu = container_of(head, struct rcu_synchronize, head);
    complete(&rcu->completion);
}
As we can see, this function signals the completion, waking up the process waiting on it.
 
Now look at call_rcu():
void call_rcu(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;
 
    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
    if (unlikely(++rdp->qlen > qhimark)) {
        rdp->blimit = INT_MAX;
        force_quiescent_state(rdp, &rcu_ctrlblk);
    }
    local_irq_restore(flags);
}
This function is also simple: it appends head to the callback list of the per-CPU variable rcu_data, via its nxttail pointer.
rcu_data is defined as follows:
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
From this we can see that every CPU has its own rcu_data, and the rcu_head representing each call_rcu()/synchronize_rcu() caller is queued on that CPU's rcu_data callback list.
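As an aside, the typical way a caller uses call_rcu() is to embed a struct rcu_head in the protected object and free the object from the callback once the grace period has passed. The sketch below uses illustrative names; it shows the common pattern rather than code taken from the kernel source:

struct foo {
    struct list_head list;
    struct rcu_head rcu;     /* queued on the per-CPU rcu_data list */
    int a;
};

static void foo_reclaim(struct rcu_head *head)
{
    struct foo *fp = container_of(head, struct foo, rcu);
    kfree(fp);               /* runs after the grace period */
}

static void foo_delete(struct foo *fp)
{
    /* caller holds the update-side lock */
    list_del_rcu(&fp->list);
    call_rcu(&fp->rcu, foo_reclaim);   /* does not block; usable where
                                        * synchronize_rcu() cannot be */
}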
 
So how exactly do we determine that the pre-existing readers have all finished? Recall from earlier that readers disable preemption when they call rcu_read_lock(); therefore, once every CPU has gone through at least one context switch, all of those readers must have exited.
The developerWorks article "The new locking mechanism in the Linux 2.6 kernel -- RCU" (http://www.ibm.com/developerworks/cn/linux/l-rcu/) describes this process as follows:
"The period spent waiting for the appropriate moment is called the grace period, and a CPU undergoing a context switch is said to pass through a quiescent state. The grace period is the time needed for every CPU to pass through at least one quiescent state. After the grace period, the garbage collector invokes the callbacks registered by writers to perform the actual data modification or data release."
 
To get completely clear on this, we have to start from RCU's initialization.
4: Starting from RCU initialization
RCU is initialized in start_kernel() -> rcu_init(). The code is:
void __init rcu_init(void)
{
    __rcu_init();
}
 
void __init __rcu_init(void)
{
    rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
            (void *)(long)smp_processor_id());
    /* Register notifier for non-boot CPUs */
    register_cpu_notifier(&rcu_nb);
}
register_cpu_notifier() deals with the CPU notifier chain and can be ignored here.
Following into rcu_cpu_notify():
static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                unsigned long action, void *hcpu)
{
    long cpu = (long)hcpu;
 
    switch (action) {
    case CPU_UP_PREPARE:
    case CPU_UP_PREPARE_FROZEN:
        rcu_online_cpu(cpu);
        break;
    case CPU_DEAD:
    case CPU_DEAD_FROZEN:
        rcu_offline_cpu(cpu);
        break;
    default:
        break;
    }
    return NOTIFY_OK;
}
Note that __rcu_init() calls this function with CPU_UP_PREPARE, so control flows into rcu_online_cpu():
static void __cpuinit rcu_online_cpu(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
    struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
 
    rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
    rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
    open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}
Here we meet yet another per-CPU variable, rcu_bh_data. The bh parts will be analyzed later; we skip them for now.
rcu_init_percpu_data() looks like this:
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
                        struct rcu_data *rdp)
{
    memset(rdp, 0, sizeof(*rdp));
    rdp->curtail = &rdp->curlist;
    rdp->nxttail = &rdp->nxtlist;
    rdp->donetail = &rdp->donelist;
    rdp->quiescbatch = rcp->completed;
    rdp->qs_pending = 0;
    rdp->cpu = cpu;
    rdp->blimit = blimit;
}
The second argument passed to this function is the global variable rcu_ctrlblk, defined as:
static struct rcu_ctrlblk rcu_ctrlblk = {
    .cur = -300,
    .completed = -300,
    .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
    .cpumask = CPU_MASK_NONE,
};
static struct rcu_ctrlblk rcu_bh_ctrlblk = {
    .cur = -300,
    .completed = -300,
    .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
    .cpumask = CPU_MASK_NONE,
};
rcu_init_percpu_data() initializes the three callback lists, nxtlist, curlist and donelist. It also sets rdp->quiescbatch to rcp->completed, which is an important step.
rdp->quiescbatch holds the number of the grace period (also called a batch in the code) that this rcu_data has already dealt with, while rcp->completed in the global rcu_ctrlblk is the number of the grace period that has completed system-wide. Setting rdp->quiescbatch = rcp->completed means this CPU is not currently waiting for any grace period.
 
Back in rcu_online_cpu():
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
This registers the handler for the RCU_SOFTIRQ softirq. When this softirq is actually raised will be analyzed later.
From then on, each CPU's bring-up goes through the same initialization path, so every CPU ends up with its RCU-related structures initialized.
 
5: Waiting for RCU readers to complete
Having covered RCU's initialization, we can now look at how RCU decides that the current RCU readers have exited.
On every process switch, rcu_qsctr_inc() is called, as the following fragment shows:
asmlinkage void __sched schedule(void)
{
    ......
    ......
    rcu_qsctr_inc(cpu);
    ......
}
rcu_qsctr_inc() is:
static inline void rcu_qsctr_inc(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
    rdp->passed_quiesc = 1;
}
This function sets the passed_quiesc member of that CPU's rcu_data to 1.
As you may have noticed, this is precisely what marks the CPU as having passed through a quiescent state. Exactly. :-)
 
In addition, the timer interrupt does the following:
void update_process_times(int user_tick)
{
    ......
    ......
 
    if (rcu_pending(cpu))
        rcu_check_callbacks(cpu, user_tick);
    ......
    ......
}
On every timer tick, the kernel checks whether there is pending RCU work; if there is, it calls rcu_check_callbacks() to deal with it.
 
rcu_pending() is:
int rcu_pending(int cpu)
{
    return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
        __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}
As before, we ignore the bh part.
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    /* This cpu has pending rcu entries and the grace period
     * for them has completed.
     */
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
        return 1;
 
    /* This cpu has no pending entries, but there are new entries */
    if (!rdp->curlist && rdp->nxtlist)
        return 1;
 
    /* This cpu has finished callbacks to invoke */
    if (rdp->donelist)
        return 1;
 
    /* The rcu core waits for a quiescent state from the cpu */
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;
 
    /* nothing to do */
    return 0;
}
The function returns 1 in four cases, corresponding to:
1: This CPU has callbacks waiting to be processed, and the batch (grace period) they were waiting for has completed; rdp->batch is the batch number this rdp is waiting on.
2: The previous batch has been fully processed, and new callbacks have been registered since.
3: A grace period has completed, but its callbacks have not yet been invoked.
4: The RCU core is still waiting for a quiescent state from this CPU.
The meaning of the members of the rcp and rdp structures will be explained as they are used (a sketch follows below).
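For orientation, here is a simplified sketch of the rcu_ctrlblk and rcu_data fields that matter for this article; the actual 2.6.26 definitions (in the classic RCU header) contain additional members, so treat the comments below as a reading aid rather than authoritative definitions:

/* Simplified sketch -- not the full 2.6.26 definitions. */
struct rcu_ctrlblk {
    long       cur;           /* number of the current grace period (batch) */
    long       completed;     /* number of the last completed grace period */
    int        next_pending;  /* a further grace period has been requested */
    spinlock_t lock;
    cpumask_t  cpumask;       /* CPUs that still owe a quiescent state */
};

struct rcu_data {
    long   quiescbatch;       /* grace period this CPU is tracking */
    int    passed_quiesc;     /* this CPU passed a quiescent state */
    int    qs_pending;        /* RCU core still waits on this CPU */
    long   batch;             /* grace period curlist is waiting for */
    struct rcu_head *nxtlist, **nxttail;   /* newly registered callbacks */
    struct rcu_head *curlist, **curtail;   /* callbacks waiting for "batch" */
    struct rcu_head *donelist, **donetail; /* callbacks ready to invoke */
    long   qlen;              /* total queued callbacks on this CPU */
    int    blimit;            /* max callbacks invoked per softirq pass */
    int    cpu;
};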
 
If rcu_pending() returns 1, rcu_check_callbacks() is entered. The code is:
void rcu_check_callbacks(int cpu, int user)
{
    if (user ||
        (idle_cpu(cpu) && !in_softirq() &&
                hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
        rcu_qsctr_inc(cpu);
        rcu_bh_qsctr_inc(cpu);
    } else if (!in_softirq())
        rcu_bh_qsctr_inc(cpu);
    raise_rcu_softirq();
}
If the CPU was running a user-space process when the tick arrived, or it was idle and not in interrupt context, then it has effectively gone through a switch as well. Note that RCU can only be used in kernel space.
Finally, raise_rcu_softirq() raises the RCU softirq, which causes RCU's softirq handler to run. From the initialization flow analyzed above, that handler is rcu_process_callbacks().
The code is:
static void rcu_process_callbacks(struct softirq_action *unused)
{
    __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
    __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
}
 
Before reading __rcu_process_callbacks(), it helps to understand the role of the lists in rdp:
Newly registered callbacks are linked onto rdp->nxtlist.
Callbacks currently waiting for a grace period to complete are linked onto rdp->curlist.
When the awaited grace period arrives, the entries on curlist are moved to donelist.
After a grace period has passed, the entries on nxtlist are moved onto rdp->curlist, and callbacks registered after that point again go onto rdp->nxtlist.
 
__rcu_process_callbacks() is analyzed piece by piece below:
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
        *rdp->donetail = rdp->curlist;
        rdp->donetail = rdp->curtail;
        rdp->curlist = NULL;
        rdp->curtail = &rdp->curlist;
    }
 
If there are callbacks waiting to be processed and their grace period has already completed, the entries on curlist are moved onto donelist.
Here rcp->completed is the number of the grace period that has completed, and rdp->batch is the grace period number this CPU is waiting for.
 
    if (rdp->nxtlist && !rdp->curlist) {
        local_irq_disable();
        rdp->curlist = rdp->nxtlist;
        rdp->curtail = rdp->nxttail;
        rdp->nxtlist = NULL;
        rdp->nxttail = &rdp->nxtlist;
        local_irq_enable();
 
        /*
         * start the next batch of callbacks
         */
 
        /* determine batch number */
        rdp->batch = rcp->cur + 1;
        /* see the comment and corresponding wmb() in
         * the rcu_start_batch()
         */
        smp_rmb();
 
        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    }
If the previously awaited callbacks have been processed and new callbacks have since been registered, the entries on nxtlist are moved onto curlist and a new grace period wait is started.
Note the assignments involved:
rdp->batch = rcp->cur + 1 sets the grace period this CPU waits for to the one following the grace period currently in progress.
When a new grace period is requested, rcp->next_pending is set to 1 before rcu_start_batch() is called (and cleared again inside it); this flag mainly prevents races when several CPUs try to start a new grace period at the same time.
 
    // update the quiescent-state bookkeeping
    rcu_check_quiescent_state(rcp, rdp);
    // invoke callbacks whose grace period has completed
    if (rdp->donelist)
        rcu_do_batch(rdp);
}
Next, the bookkeeping is updated: for example, whether this CPU has passed through a quiescent state, and whether the grace period has completed.
Finally, the callbacks hanging on rdp->donelist are processed.
 
Several helper functions here deserve careful analysis.
The first to analyze is rcu_start_batch():
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
    if (rcp->next_pending &&
            rcp->completed == rcp->cur) {
        rcp->next_pending = 0;
        smp_wmb();
        rcp->cur++;
        smp_mb();
        cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
 
        rcp->signaled = 0;
    }
}
Although this function looks simple, quite a lot is hidden in it.
Each time a new grace period is started, rcp->cur is incremented and the bit of every online CPU is set in rcp->cpumask.
The if statement requires two conditions to hold:
First: rcp->next_pending must be 1. Put this function back into the larger context of __rcu_process_callbacks():
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    ......
    ......
    if (rdp->nxtlist && !rdp->curlist) {
        ......
        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    }
}
First, a new grace period is requested only when rcp->next_pending is 0: the caller sets rcp->next_pending to 1 and then calls rcu_start_batch(), with rcp->lock held around the whole sequence. Inside rcu_start_batch(), next_pending is checked again before the remaining work is done, and is then cleared back to 0.
Why is this double check needed? If another CPU is already starting a new grace period, there is no need to start yet another one; we can simply return.
 
Second: the other condition the if statement in rcu_start_batch() must satisfy is rcp->completed == rcp->cur, i.e. all previous grace periods have completed. Each time a new grace period is started, rcp->cur is incremented; each time one completes, rcp->completed is set equal to rcp->cur.
 
The second function to analyze is rcu_check_quiescent_state():
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    if (rdp->quiescbatch != rcp->cur) {
        /* start new grace period: */
        rdp->qs_pending = 1;
        rdp->passed_quiesc = 0;
        rdp->quiescbatch = rcp->cur;
        return;
    }
 
    /* Grace period already completed for this cpu?
     * qs_pending is checked instead of the actual bitmap to avoid
     * cacheline trashing.
     */
    if (!rdp->qs_pending)
        return;
 
    /*
     * Was there a quiescent state since the beginning of the grace
     * period? If no, then exit and wait for the next call.
     */
    if (!rdp->passed_quiesc)
        return;
    rdp->qs_pending = 0;
 
    spin_lock(&rcp->lock);
    /*
     * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
     * during cpu startup. Ignore the quiescent state.
     */
    if (likely(rdp->quiescbatch == rcp->cur))
        cpu_quiet(rdp->cpu, rcp);
 
    spin_unlock(&rcp->lock);
}
First, if rdp->quiescbatch != rcp->cur, a new grace period has been started, so this CPU must begin tracking it: rdp->quiescbatch is updated to rcp->cur, rdp->qs_pending is set to 1 to indicate that a grace period needs handling, and passed_quiesc is cleared to 0.
Next, rdp->passed_quiesc is checked. Recall from the earlier analysis that rcu_qsctr_inc() is called on every process switch (and from rcu_check_callbacks() on suitable timer ticks), and that it sets rdp->passed_quiesc to 1.
So checking this value here detects whether a context switch has occurred on this CPU.
After that comes a region protected by rcp->lock. If the grace period being tracked is still the current one, cpu_quiet(rdp->cpu, rcp) is called to clear this CPU's bit. If a new grace period has started in the meantime, there is nothing to clear, because whether this CPU has passed a quiescent state must be re-evaluated for the new grace period.
 
cpu_quiet() is:
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
    cpu_clear(cpu, rcp->cpumask);
    if (cpus_empty(rcp->cpumask)) {
        /* batch completed ! */
        rcp->completed = rcp->cur;
        rcu_start_batch(rcp);
    }
}
It clears the bit for the given CPU; if cpumask then becomes empty, every CPU has gone through a context switch, so rcp->completed is set to rcp->cur, and a further grace period is started if one is pending.
 
The last function to analyze is rcu_do_batch(), which does the tail-end cleanup: once the grace period has completed, the entries queued on donelist must be processed. The code is:
static void rcu_do_batch(struct rcu_data *rdp)
{
    struct rcu_head *next, *list;
    int count = 0;
 
    list = rdp->donelist;
    while (list) {
        next = list->next;
        prefetch(next);
        list->func(list);
        list = next;
        if (++count >= rdp->blimit)
            break;
    }
    rdp->donelist = list;
 
    local_irq_disable();
    rdp->qlen -= count;
    local_irq_enable();
    if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
        rdp->blimit = blimit;
 
    if (!rdp->donelist)
        rdp->donetail = &rdp->donelist;
    else
        raise_rcu_softirq();
}
It walks the list and invokes the queued callbacks. Note that the number of callbacks invoked in one pass is capped (rdp->blimit); this mainly prevents a single pass from invoking so many callbacks that it causes unnecessary system load. If donelist still contains unprocessed entries, the RCU softirq is raised again and processing continues when the next softirq runs.
 
6: Analysis of several RCU scenarios
1: Suppose a process on CPU 1 calls rcu_read_lock() to enter a critical section and then leaves it, a process switch occurs, and the new process enters a critical section via rcu_read_lock() again. Since the RCU softirq only checks that one context switch has occurred, a process may still be inside an RCU read-side critical section when the callbacks are invoked. Is that a problem?
It is not. Consider the earlier example again:
        spin_lock(&foo_mutex);
        old_fp = gbl_foo;
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
        spin_unlock(&foo_mutex);
        synchronize_rcu();
        kfree(old_fp);
 
synchronize_rcu() only needs to wait until the processes holding old_fp (that is, the gbl_foo value from before the rcu_assign_pointer() update) have exited; it does not need to wait for all readers to finish. Any read performed after the rcu_assign_pointer() obtains the protected pointer's already-updated new value.
 
2: The analysis above seems to concern CPUs that have callbacks queued. How are CPUs that only call rcu_read_lock() handled?
First, starting a new grace period always updates rcp->cur, so the following test in rcu_pending() will be satisfied:
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;
so the RCU softirq will run on those CPUs as well. In the softirq handler, along the path
rcu_process_callbacks() -> __rcu_process_callbacks() -> rcu_check_quiescent_state()
the CPU mask in rcp is updated (this CPU's bit is cleared) once that CPU has gone through a process switch.
 
3: What happens if a CPU calls synchronize_rcu() or call_rcu() repeatedly?
If a grace period is already being waited for, newly submitted callbacks are queued on nxtlist until the previous wait completes; then the nxtlist entries are moved to curlist and a new grace period is started. In other words, all requests submitted during one grace period are handled together: they jointly wait for every CPU to go through a context switch.
For example:
Suppose a grace period takes 12ms, and within those 12ms processes A, B and C submit requests one after another.
After the current wait has been processed, A, B and C are moved into curlist together, and a new grace period wait is started for them.
 
7: About rcu_read_lock_bh()/rcu_read_unlock_bh()/call_rcu_bh()
In the code above we repeatedly ran into the _bh variants of RCU. Let's now look at what these bh flavors are.
#define rcu_read_lock_bh() __rcu_read_lock_bh()
#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
 
#define __rcu_read_lock_bh() \
    do { \
        local_bh_disable(); \
        __acquire(RCU_BH); \
        rcu_read_acquire(); \
    } while (0)
#define __rcu_read_unlock_bh() \
    do { \
        rcu_read_release(); \
        __release(RCU_BH); \
        local_bh_enable(); \
    } while (0)
From the above: compared with ordinary RCU, the difference is that ordinary RCU disables kernel preemption, whereas bh RCU disables bottom halves.
In fact, the bh flavor is generally used in softirq context, and its quiescent state is not a context switch but the completion of a softirq handler, as we will confirm below.
 
call_rcu_bh() is:
void call_rcu_bh(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;
 
    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_bh_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
 
    if (unlikely(++rdp->qlen > qhimark)) {
        rdp->blimit = INT_MAX;
        force_quiescent_state(rdp, &rcu_bh_ctrlblk);
    }
 
    local_irq_restore(flags);
}
The difference from call_rcu() is that ordinary RCU uses the per-CPU variable rcu_data and the global rcu_ctrlblk, whereas bh RCU uses rcu_bh_data and rcu_bh_ctrlblk. Their types are identical; keeping them separate simply keeps the bh grace periods distinct from the ordinary RCU ones.
 
As for rcu_bh_qsctr_inc():
static inline void rcu_bh_qsctr_inc(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
    rdp->passed_quiesc = 1;
}
It works just like rcu_qsctr_inc(), setting the corresponding member, only on rcu_bh_data.
 
What differs is where rcu_bh_qsctr_inc() is called from:
asmlinkage void __do_softirq(void)
{
    ......
        do {
        if (pending & 1) {
            h->action(h);
            rcu_bh_qsctr_inc(cpu);
        }
        h++;
        pending >>= 1;
    } while (pending);
    ......
}
In other words, for bh RCU a quiescent state is counted only when a softirq handler has been executed.
 
8: Summary
Overall, RCU's code structure is not difficult; the hard part is analyzing its corner cases. The authors' reasoning is remarkably meticulous, and the code is well worth the time to study carefully.
