Chinaunix首页 | 论坛 | 博客
  • 博客访问: 712480
  • 博文数量: 31
  • 博客积分: 330
  • 博客等级: 一等列兵
  • 技术积分: 3004
  • 用 户 组: 普通用户
  • 注册时间: 2012-09-05 22:38
个人简介

java开发工程师,专注于内核源码,算法,数据结构。 qq:630501400

文章分类
文章存档

2014年(2)

2013年(22)

2012年(7)

分类: C/C++

2012-11-05 23:49:54

memcached中存在一个大的hash表,采用队列解决hash冲突,在hash表中的item数量超过hash表的1.5倍时候,会通过控制信号量,激活assoc_maintenance_thread线程,开始hashtable的扩容工作。扩容工作使用两个hashtable,一个是一直使用的primary_table,一个是在扩容时使用的old_table,在扩容时会使用pthread_mutex_trylock来锁定一个锁,使得其他线程不能对hash表进行插入操作。

hash表插入item代码如下:items.c,在这之前会获得控制item的锁,cache_lock是LRU的锁

  1. 291 int do_item_link(item *it, const uint32_t hv) {
  2. 292 MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
  3. 293 assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
  4. 294 mutex_lock(&cache_lock);//插入item之前就要获得LRU锁
  5. 295 it->it_flags |= ITEM_LINKED
  6. 296 it->time = current_time;
  7. 297
  8. 298 STATS_LOCK();
  9. 299 stats.curr_bytes += ITEM_ntotal(it);
  10. 300 stats.curr_items += 1;
  11. 301 stats.total_items += 1;
  12. 302 STATS_UNLOCK();
  13. 303
  14. 304 /* Allocate a new CAS ID on link. */
  15. 305 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
  16. 306 assoc_insert(it, hv); //插入item
  17. 307 item_link_q(it);
  18. 308 refcount_incr(&it->refcount);
  19. 309 mutex_unlock(&cache_lock);//插入完毕就释放LRU锁
  20. 310
  21. 311 return 1;
  22. 312 }

在插入到hash表后会判断hash表中item的数量,如果超过1.5倍的hash桶的数量,就开始扩容

assoc.c

  1. 154 int assoc_insert(item *it, const uint32_t hv) {
  2. 155 unsigned int oldbucket;
  3. 156
  4. 157 // assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */
  5. 158
  6. 159 if (expanding &&
  7. 160 (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
  8. //可以看出,在扩容过程中epanding=true,并且item的hash值大于expand_bucket,这个说明复制操作还没有到达这个hash桶,所以这时必须先放到oldbucket中,注意不能按照新的hashpower,直接插入到primary_hashtable中,因为直接插入,会插入到桶链表中的第一个元素,破坏了属于这个桶链表的元素顺序,这个顺序是不可预估的,因为是老的元素按照新的hashpower进行重新分配。
  9. 161 {
  10. 162 it->h_next = old_hashtable[oldbucket];
  11. 163 old_hashtable[oldbucket] = it;
  12. 164 } else {
  13.    //如果已经处理过的hash桶,就可以直接插入到primary_hashtable中了。
  14. 165 it->h_next = primary_hashtable[hv & hashmask(hashpower)];
  15. 166 primary_hashtable[hv & hashmask(hashpower)] = it;
  16. 167 }
  17. 168
  18. 169 hash_items++;
  19. //全局hash表item的数量
  20. 170 if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
  21. //超过了1.5倍开始扩容
  22. 171 assoc_start_expand();
  23. 172 }
  24. 173
  25. 174 MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
  26. 175 return 1;
  27. 176 }

assoc_start_expand()函数就是释放信号量激发assoc_maintenance_thread线程

assoc.c

  1. 146 static void assoc_start_expand(void) {
  2. 147 if (started_expanding)
  3. 148 return;
  4. 149 started_expanding = true;
  5. 150 pthread_cond_signal(&maintenance_cond);
  6. 151 }

memcached启动时会通过环境变量确定每次通过锁操作mutex_lock(&cache_lock),来移动hash桶的数量。

assoc.c

  1. 266 int start_assoc_maintenance_thread() {
  2. 267 int ret;
  3. 268 char *env = getenv("MEMCACHED_HASH_BULK_MOVE"); //通过环境来确定每次锁操作移动hash桶的数量,默认是1个。
  4. 269 if (env != NULL) {
  5. 270 hash_bulk_move = atoi(env);
  6. 271 if (hash_bulk_move == 0) {
  7. 272 hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
  8. 273 }
  9. 274 }
  10. 275 if ((ret = pthread_create(&maintenance_tid, NULL,
  11. 276 assoc_maintenance_thread, NULL)) != 0) {
  12. 277 fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
  13. 278 return -1;
  14. 279 }
  15. 280 return 0;
  16. 281 }

assoc_maintenance_thread线程代码:


  1. 204 static void *assoc_maintenance_thread(void *arg) {
  2. 205
  3. 206 while (do_run_maintenance_thread) { //线程开始无限循环
  4. 207 int ii = 0;
  5. 208
  6. 209 /* Lock the cache, and bulk move multiple buckets to the new
  7. 210 * hash table. */
  8. 211 item_lock_global();                 //获得控制item的锁
  9. 212 mutex_lock(&cache_lock);            //开始锁定LRU的cache_lock
  10. 213 //开始expanding为false,不执行扩容操作,因为没有达到hash容量最大值
  11. 214 for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
  12.     // hash_bulk_move默认是1,可以看出默认这个for循环执行1次就退出
  13. 215 item *it, *next;
  14. 216 int bucket;
  15. 217 //处理hash每个桶中每个链表,重新按照新的hashpower,计算它属于哪个桶
  16. 218 for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
  17. 219 next = it->h_next;
  18. 220 //把item放到primary中的合适的位置
  19. 221 bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
  20. 222 it->h_next = primary_hashtable[bucket];
  21. 223 primary_hashtable[bucket] = it;
  22. 224 }
  23. 225
  24. 226 old_hashtable[expand_bucket] = NULL;
  25. 227
  26. 228 expand_bucket++;
  27.   //每次处理完成一个桶,桶的index计数加一
  28.   //如果到达最后,都处理迁移完桶中的数据了,重新把expanding标志置为false,释放old_table
  29. 229 if (expand_bucket == hashsize(hashpower - 1)) {
  30. 230 expanding = false;
  31. 231 free(old_hashtable);
  32. 232 STATS_LOCK();
  33. 233 stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
  34. 234 stats.hash_is_expanding = 0;
  35. 235 STATS_UNLOCK();
  36. 236 if (settings.verbose > 1)
  37. 237 fprintf(stderr, "Hash table expansion done\n");
  38. 238 }
  39. 239 }
  40. 241 mutex_unlock(&cache_lock); 
  41. 242 item_unlock_global();//每次迁移完hash_bulk_move个桶就释放了锁,使得对hashtable的操作继续执行。
  42. 243 //扩容完毕后,使得控制slab平衡的一个线程继续执行,这个线程的分析后面会讲。
  43. 244 if (!expanding) {
  44. 245 /* finished expanding. tell all threads to use fine-grained locks */
  45. 246 switch_item_lock_type(ITEM_LOCK_GRANULAR);
  46. 247 slabs_rebalancer_resume();
  47. 248 /* We are done expanding.. just wait for next invocation */
  48. 249 mutex_lock(&cache_lock);
  49. 250 started_expanding = false;
  50. 251 pthread_cond_wait(&maintenance_cond, &cache_lock);
  51.  //使得cachelock在maintenace条件上继续等待,这个不影响其他加锁写item数据的线程的执行
  52. 252 /* Before doing anything, tell threads to use a global lock */
  53. 253 mutex_unlock(&cache_lock);
  54. 254 slabs_rebalancer_pause();
  55. 255 switch_item_lock_type(ITEM_LOCK_GLOBAL);
  56. 256 mutex_lock(&cache_lock);
  57. 257 assoc_expand();
  58. 258 mutex_unlock(&cache_lock);
  59. 259 }
  60. 260 }
  61. 261 return NULL;
  62. 262 }

通过上面的代码,个人认为在插入item后,确定容量超出值域就开始扩容, hash_bulk_move变量的作用就是每次加锁,在拥有锁的过程中,移动复制hash桶的数量,可以试想在写入操作十分频繁的过程中,如果跳高hash_bulk_move这个变量,在复制操作的过程中, assoc_maintenance_thread线程占有锁的时间就会变大,写入item的时间就会变长。反之如果hash_bulk_move变量的很小,在写入频繁的条件下,写入操作不会受到锁的影响(assoc_maintenance_thread占有锁时间短),但是如果写入的item恰好都是大于expand_bucket的,会导致old_table表很大,后面的复制过程copy old_tableprimary_hashtable的时间也会增长。

阅读(8054) | 评论(0) | 转发(2) |
给主人留下些什么吧!~~