Chinaunix首页 | 论坛 | 博客
  • 博客访问: 7070235
  • 博文数量: 703
  • 博客积分: 10821
  • 博客等级: 上将
  • 技术积分: 12041
  • 用 户 组: 普通用户
  • 注册时间: 2005-12-02 10:41
个人简介

中科院云平台架构师,专注于数字化、智能化,技术方向:云、Linux内核、AI、MES/ERP/CRM/OA、物联网、传感器、大数据、ML、微服务。

文章分类

全部博文(703)

分类: LINUX

2020-05-14 17:56:42

##总览
urcu全称user-space read-copy update即用户态RCU,它提供了与内核RCU相似的功能,使得在多核多线程并发访问共享数据时,reader线程不用阻塞于writer线程的操作,从而使reader线程运行的更快,非常适合于读多写少的场景。
urcu特性
针对不同的应用场景,urcu提供了以下5种不同的flavors
    urcu
    QSBR(quiescent-state-based RCU)
    Memory-barrier-base RCU
    “Bullet-proof” RCU
    Signal-based RCU

其中qsbr是5种flavor中读性能最好的,它也称为显式静默期声明模式。这种模式下,rcu_read_lock()和rcu_read_unlock()为空操作,对于reader来说负担为零(这一点是另外4种flavor不具备的),但这带来的代价是,每个reader线程必须周期性的调用rcu_quiescent_state()来声明自己进入静默期。不过,需要注意的是,并非每次读操作完成后都需要做此声明,考虑到读操作的性能和应用读写操作次数的不平衡性,通常的做法是每进行一定次数(如1024)的读操作之后声明进入一次静默期。还有一点,每个会进入RCU-read-side critical sections的线程都需要事先通过rcu_register_thread()接口进行注册,退出时调用rcu_unregister_thread()接口进行去注册。
实现
全局计数器
RCU机制是用于多核系统中,保持每个核上的线程所看到的全局数据一致性的一种机制,所以需要一种手段可以判断当writer线程进行数据更新后,reader线程看到的数据是否已经最新,为此urcu维护了一个全局的计数器rcu_gp.ctr,每次writer进行同步操作(synchronize),都会使计数器加1,表示数据我已经更新了,reader你需要更新。


  1. struct rcu_gp rcu_gp;
  2. struct rcu_gp {
  3.     unsigned long ctr;
  4.     ...
  5. } __attribute__((aligned(CAA_CACHE_LINE_SIZE)));

每个reader线程也持有一个线程内部的计数器ctx,如果这个ctx与rcu_gp.ctr一致,就表明本reader线程的数据已经最新(ACTIVE_CURRENT),反之则不是最新(ACTIVE_OLD),
  1. struct rcu_reader {
  2.     unsigned long ctr;
  3.     ...
  4. };
  5. DECLARE_URCU_TLS(struct rcu_reader, rcu_reader)
读者 注册(register)\去注册(unregister)\上线(online)\下线(offline)

qsbr rcu的实现中,reader线程必须进行显式注册, 将自己挂接在全局链表registry上,通俗地说就是将自己置于全局管理之下,这样当writer在进行同步(synchronize)时,才能知道哪些线程需要同步(只有注册过的线程才需要)。线程的上线状态分为在线(online)和离线(offline),其中处于offline的线程虽然在registry链表上,但在synchronized时,writer会忽略这些线程。线程注册会默认置于online状态。

void rcu_register_thread(void)
{
    URCU_TLS(rcu_reader).tid = pthread_self();
    mutex_lock(&rcu_registry_lock);
    URCU_TLS(rcu_reader).registered = 1;
    cds_list_add(&URCU_TLS(rcu_reader).node, ?istry);
    _rcu_thread_online();
}

线程上线(online)的本质,就是将rcu_gp.ctr的值存储到本线程的ctr中
  1. static inline void _rcu_thread_online(void)
  2. {
  3.     _CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, CMM_LOAD_SHARED(rcu_gp.ctr));
  4. }
而线程下线(offline),则是将本线程的ctr清零

  1. static inline void _rcu_thread_offline(void)
  2. {
  3.     CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, 0);
  4.     wake_up_gp();
  5. }

写者 同步(synchronize)

rcu机制的一个典型场景: 全局指针gp_ptr指向内存区域A,writer在申请了一份新的内存区域B后,使全局指针gp_ptr指向B。在多核系统中,writer在更新后并不知道有没有reader正在区域A的数据,所以它需要阻塞等待所有的reader线程已经更新(即reader.ctx等于gp.ctr),这个操作便是同步(synchronize),其简化版实现代码片段如下

  1. void synchronize_rcu(void)
  2. {
  3.     CDS_LIST_HEAD(qsreaders);
  4.     DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING);
  5.     ......
  6.     urcu_wait_add(&gp_waiters, &wait)  /* 将writer自身置于wait状态 */
  7.     ......
  8.     wait_for_readers(?istry, &cur_snap_readers, &qsreaders); /* writer阻塞在这里 */
  9.     .....
  10. }
  11. static void wait_for_readers(struct cds_list_head *input_readers,
  12.                  struct cds_list_head *cur_snap_readers,
  13.                  struct cds_list_head *qsreaders)
  14. {
  15.     unsigned int wait_loops = 0;
  16.     struct rcu_reader *index, *tmp;
  17.     /*
  18.      * Wait for each thread URCU_TLS(rcu_reader).ctr to either
  19.      * indicate quiescence (offline), or for them to observe the
  20.      * current rcu_gp.ctr value.
  21.      */
  22.     for (;;) {          /* 直到所有reader.ctr已经到最新才跳出循环 */
  23.         uatomic_set(&rcu_gp.futex, -1);
  24.         cds_list_for_each_entry(index, input_readers, node) {
  25.         _CMM_STORE_SHARED(index->waiting, 1);
  26.         
  27.         /* 遍历所有输入的reader */
  28.         cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
  29.             switch (rcu_reader_state(&index->ctr)) {
  30.             case RCU_READER_ACTIVE_CURRENT:   /* reader.ctr已经最新 */
  31.             case RCU_READER_INACTIVE:      /* reader处于offline状态 */
  32.                 cds_list_move(&index->node, qsreaders); /* 从遍历列表中移除 */
  33.                 break;
  34.             case RCU_READER_ACTIVE_OLD:     /* reader.ctr不是最新 */
  35.                 break;
  36.             }
  37.         }
  38.         if (cds_list_empty(input_readers)) {
  39.             uatomic_set(&rcu_gp.futex, 0);   /* 列表空了,表示所有reader已更新 跳出循环 */
  40.             break;
  41.         }
  42.     }
  43. }
读者 静默(quiescent)

从上面writer synchronize的过程可知,要使writer结束阻塞状态,reader必须将其ctr更新到最新(除非它处于offline状态),更新到最新是通过reader调用rcu_quiescent_state()接口声明静默期完成的.

  1. static inline void _rcu_quiescent_state(void)
  2. {
  3.     unsigned long gp_ctr;
  4.     if ((gp_ctr = CMM_LOAD_SHARED(rcu_gp.ctr)) == URCU_TLS(rcu_reader).ctr)
  5.         return;
  6.     _rcu_quiescent_state_update_and_wakeup(gp_ctr);
  7. }
  8. static inline void _rcu_quiescent_state_update_and_wakeup(unsigned long gp_ctr)
  9. {
  10.     _CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, gp_ctr); /* 将本线程ctr更新为gp_ctr */
  11.     wake_up_gp();           /* 唤醒writer */
  12. }
example
这个example节选自urcu官方代码的测试例程test_urcu_qsbr
测试例程根据用户输入创建若干个writer和reader,writer不断申请释放内存资源,并用全局指针test_rcu_pointer记录资源,reader不断读取test_rcu_pointer指向资源的值,并且每1024次声明静默期,最后统计reader和writer的次数。

  1. void *thr_writer(void *_count)
  2. {
  3.     unsigned long long *count = _count;
  4.     int *new, *old;
  5.     for (;;) {
  6.         new = malloc(sizeof(int));
  7.         assert(new);
  8.         *new = 8;
  9.         old = rcu_xchg_pointer(&test_rcu_pointer, new);
  10.         synchronize_rcu();
  11.         if (old)
  12.             *old = 0;
  13.         free(old);
  14.         URCU_TLS(nr_writes)++;
  15.     }
  16.     printf_verbose("thread_end %s, tid %lu\n",
  17.             "writer", urcu_get_thread_id());
  18.     *count = URCU_TLS(nr_writes);
  19.     return ((void*)2);
  20. }
  21. void *thr_reader(void *_count)
  22. {
  23.     unsigned long long *count = _count;
  24.     int *local_ptr;
  25.     
  26.     rcu_register_thread();
  27.     rcu_thread_offline();
  28.     rcu_thread_online();
  29.     for (;;) {
  30.         rcu_read_lock();
  31.         local_ptr = rcu_dereference(test_rcu_pointer);
  32.         if (local_ptr)
  33.             assert(*local_ptr == 8);
  34.         rcu_read_unlock();
  35.         URCU_TLS(nr_reads)++;
  36.         /* QS each 1024 reads */
  37.         if (caa_unlikely((URCU_TLS(nr_reads) & ((1 << 10) - 1)) == 0))
  38.             rcu_quiescent_state();
  39.     }
  40.     rcu_unregister_thread();
  41.     *count = URCU_TLS(nr_reads);
  42.     printf_verbose("thread_end %s, tid %lu\n",
  43.             "reader", urcu_get_thread_id());
  44.     return ((void*)1);
  45. }
perfomance



flavor     total read     totol write
urcu     7826237468     91
qsbr     10427746859     1746176
memory-barrir     365980233     10333212
bullet-proof     568170476     5226213
signal-based     9522616041     2329
call rcu


前面writer的例子中,当writer进行数据更新后需要释放旧资源,而这要在synchronize_rcu()接触阻塞后才能进行(否则reader还在使用呐),但还有的时候,我们希望提高writer的效率,‘释放’过程不要阻塞,再reader进行了更新后,再进行资源释放,urcu提供了call_rcu()接口来完成这一功能struct rcu_head {

  1.     struct cds_wfcq_node next;
  2.     void (*func)(struct rcu_head *head);
  3. };
  4. void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *head);

一般得,将要延迟释放的数据结构内嵌一个rcu_head结构,在需要延迟释放时调用

  1. struct global_foo {
  2.     struct rcu_head rcu_head;
  3.     ......
  4. };
  5. struct global_foo g_foo;



在writer更新后,需要释放旧的资源时时,调用call_rcu(),之后当所有reader都更新完成后,设置的回调函数free_func被自动调用

  1. call_rcu(&g_foo.rcu_head, free_func);

那么urcu是如何实现这个功能的呢?

既然不能阻塞将writer阻塞在synchronize_rcu(),那总得有一个线程阻塞在synchronize_rcu()等待所有reader更新,于是urcu内部创建一个线程,称为call_rcu_thread,这个线程专门用于writer call_rcu() (这个线程只会在第一次call_rcu()被创建,之后的call_rcu()均使用这个线程),以下是call_rcu_thread创建时的代码片段

  1. /* 第一次call_rcu()会调用到 call_rcu_data_init() */
  2. static void call_rcu_data_init(struct call_rcu_data **crdpp,unsigned long flags,int cpu_affinity)
  3. {
  4.     struct call_rcu_data *crdp;
  5.     int ret;
  6.     crdp = malloc(sizeof(*crdp));
  7.     if (crdp == NULL)
  8.         urcu_die(errno);
  9.     memset(crdp, '\0', sizeof(*crdp));
  10.     cds_wfcq_init(&crdp->cbs_head, &crdp->cbs_tail);
  11.     ......
  12.     ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp); /* 创建call_rcu_thread */
  13.     if (ret)
  14.         urcu_die(ret);
  15. }
  16. static void *call_rcu_thread(void *arg)
  17. {
  18.     struct call_rcu_data *crdp = (struct call_rcu_data *) arg;
  19.     rcu_register_thread();
  20.     URCU_TLS(thread_call_rcu_data) = crdp;
  21.     for (;;) {
  22.             ......
  23.             synchronize_rcu();  /* 在这里完成同步 */
  24.             rhp->func(rhp);    /* 执行回调 */
  25.             ......
  26.     }
  27.     rcu_unregister_thread();
  28.     return NULL;
  29. }

原文链接:https://blog.csdn.net/chenmo187J3X1/java/article/details/80992945

阅读(3247) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~