memcached中存在一个大的hash表,采用队列解决hash冲突,在hash表中的item数量超过hash表的1.5倍时候,会通过控制信号量,激活assoc_maintenance_thread线程,开始hashtable的扩容工作。扩容工作使用两个hashtable,一个是一直使用的primary_table,一个是在扩容时使用的old_table,在扩容时会使用pthread_mutex_trylock来锁定一个锁,使得其他线程不能对hash表进行插入操作。
对hash表插入item代码如下:items.c,在这之前会获得控制item的锁,cache_lock是LRU的锁
- 291 int do_item_link(item *it, const uint32_t hv) {
- 292 MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
- 293 assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
- 294 mutex_lock(&cache_lock);//插入item之前就要获得LRU锁
- 295 it->it_flags |= ITEM_LINKED;
- 296 it->time = current_time;
- 297
- 298 STATS_LOCK();
- 299 stats.curr_bytes += ITEM_ntotal(it);
- 300 stats.curr_items += 1;
- 301 stats.total_items += 1;
- 302 STATS_UNLOCK();
- 303
- 304 /* Allocate a new CAS ID on link. */
- 305 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
- 306 assoc_insert(it, hv); //插入item
- 307 item_link_q(it);
- 308 refcount_incr(&it->refcount);
- 309 mutex_unlock(&cache_lock);//插入完毕就释放LRU锁
- 310
- 311 return 1;
- 312 }
在插入到hash表后会判断hash表中item的数量,如果超过1.5倍的hash桶的数量,就开始扩容
assoc.c
- 154 int assoc_insert(item *it, const uint32_t hv) {
- 155 unsigned int oldbucket;
- 156
- 157 // assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */
- 158
- 159 if (expanding &&
- 160 (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
- //可以看出,在扩容过程中epanding=true,并且item的hash值大于expand_bucket,这个说明复制操作还没有到达这个hash桶,所以这时必须先放到oldbucket中,注意不能按照新的hashpower,直接插入到primary_hashtable中,因为直接插入,会插入到桶链表中的第一个元素,破坏了属于这个桶链表的元素顺序,这个顺序是不可预估的,因为是老的元素按照新的hashpower进行重新分配。
- 161 {
- 162 it->h_next = old_hashtable[oldbucket];
- 163 old_hashtable[oldbucket] = it;
- 164 } else {
- //如果已经处理过的hash桶,就可以直接插入到primary_hashtable中了。
- 165 it->h_next = primary_hashtable[hv & hashmask(hashpower)];
- 166 primary_hashtable[hv & hashmask(hashpower)] = it;
- 167 }
- 168
- 169 hash_items++;
- //全局hash表item的数量
- 170 if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
- //超过了1.5倍开始扩容
- 171 assoc_start_expand();
- 172 }
- 173
- 174 MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
- 175 return 1;
- 176 }
assoc_start_expand()函数就是释放信号量激发assoc_maintenance_thread线程
assoc.c
- 146 static void assoc_start_expand(void) {
- 147 if (started_expanding)
- 148 return;
- 149 started_expanding = true;
- 150 pthread_cond_signal(&maintenance_cond);
- 151 }
在memcached启动时会通过环境变量确定每次通过锁操作mutex_lock(&cache_lock),来移动hash桶的数量。
assoc.c
- 266 int start_assoc_maintenance_thread() {
- 267 int ret;
- 268 char *env = getenv("MEMCACHED_HASH_BULK_MOVE"); //通过环境来确定每次锁操作移动hash桶的数量,默认是1个。
- 269 if (env != NULL) {
- 270 hash_bulk_move = atoi(env);
- 271 if (hash_bulk_move == 0) {
- 272 hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
- 273 }
- 274 }
- 275 if ((ret = pthread_create(&maintenance_tid, NULL,
- 276 assoc_maintenance_thread, NULL)) != 0) {
- 277 fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
- 278 return -1;
- 279 }
- 280 return 0;
- 281 }
assoc_maintenance_thread线程代码:
- 204 static void *assoc_maintenance_thread(void *arg) {
- 205
- 206 while (do_run_maintenance_thread) { //线程开始无限循环
- 207 int ii = 0;
- 208
- 209 /* Lock the cache, and bulk move multiple buckets to the new
- 210 * hash table. */
- 211 item_lock_global(); //获得控制item的锁
- 212 mutex_lock(&cache_lock); //开始锁定LRU的cache_lock
- 213 //开始expanding为false,不执行扩容操作,因为没有达到hash容量最大值
- 214 for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
- // hash_bulk_move默认是1,可以看出默认这个for循环执行1次就退出
- 215 item *it, *next;
- 216 int bucket;
- 217 //处理hash每个桶中每个链表,重新按照新的hashpower,计算它属于哪个桶
- 218 for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
- 219 next = it->h_next;
- 220 //把item放到primary中的合适的位置
- 221 bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
- 222 it->h_next = primary_hashtable[bucket];
- 223 primary_hashtable[bucket] = it;
- 224 }
- 225
- 226 old_hashtable[expand_bucket] = NULL;
- 227
- 228 expand_bucket++;
- //每次处理完成一个桶,桶的index计数加一
- //如果到达最后,都处理迁移完桶中的数据了,重新把expanding标志置为false,释放old_table
- 229 if (expand_bucket == hashsize(hashpower - 1)) {
- 230 expanding = false;
- 231 free(old_hashtable);
- 232 STATS_LOCK();
- 233 stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
- 234 stats.hash_is_expanding = 0;
- 235 STATS_UNLOCK();
- 236 if (settings.verbose > 1)
- 237 fprintf(stderr, "Hash table expansion done\n");
- 238 }
- 239 }
- 241 mutex_unlock(&cache_lock);
- 242 item_unlock_global();//每次迁移完hash_bulk_move个桶就释放了锁,使得对hashtable的操作继续执行。
- 243 //扩容完毕后,使得控制slab平衡的一个线程继续执行,这个线程的分析后面会讲。
- 244 if (!expanding) {
- 245 /* finished expanding. tell all threads to use fine-grained locks */
- 246 switch_item_lock_type(ITEM_LOCK_GRANULAR);
- 247 slabs_rebalancer_resume();
- 248 /* We are done expanding.. just wait for next invocation */
- 249 mutex_lock(&cache_lock);
- 250 started_expanding = false;
- 251 pthread_cond_wait(&maintenance_cond, &cache_lock);
- //使得cachelock在maintenace条件上继续等待,这个不影响其他加锁写item数据的线程的执行
- 252 /* Before doing anything, tell threads to use a global lock */
- 253 mutex_unlock(&cache_lock);
- 254 slabs_rebalancer_pause();
- 255 switch_item_lock_type(ITEM_LOCK_GLOBAL);
- 256 mutex_lock(&cache_lock);
- 257 assoc_expand();
- 258 mutex_unlock(&cache_lock);
- 259 }
- 260 }
- 261 return NULL;
- 262 }
通过上面的代码,个人认为在插入item后,确定容量超出值域就开始扩容,
hash_bulk_move变量的作用就是每次加锁,在拥有锁的过程中,移动复制hash桶的数量,可以试想在写入操作十分频繁的过程中,如果跳高hash_bulk_move这个变量,在复制操作的过程中,
assoc_maintenance_thread线程占有锁的时间就会变大,写入item的时间就会变长。反之如果hash_bulk_move变量的很小,在写入频繁的条件下,写入操作不会受到锁的影响(assoc_maintenance_thread占有锁时间短),但是如果写入的item恰好都是大于expand_bucket的,会导致old_table表很大,后面的复制过程copy
old_table到primary_hashtable的时间也会增长。
阅读(785) | 评论(0) | 转发(0) |