assoc.h/assoc.c文件实现的是item的hash.
memcached数据管理是以item为单位的,为了便于快速地进行set、get、replace等操作,使用了Hash来进行关联(assoc)管理。assoc.c文件主要封装了将一个item指针链接到Hash表、从Hash表移除等操作。
一. 程序接口
/* associative array */
/* Allocates the initial bucket array; prints an error and exits the process
 * if the allocation fails. */
void assoc_init(void);
/* Returns the item stored under key/nkey, or NULL when no item matches. */
item *assoc_find(const char *key, const size_t nkey);
/* Links item at the head of its bucket. The key must not already be present.
 * Always returns 1. */
int assoc_insert(item *item);
/* Unlinks the item stored under key/nkey from its bucket chain. */
void assoc_delete(const char *key, const size_t nkey);
/* NOTE(review): not defined in this excerpt; presumably migrates the next
 * bucket during an expansion -- confirm against the implementation. */
void do_assoc_move_next_bucket(void);
/* NOTE(review): not defined in this excerpt; presumably start and stop the
 * thread that runs assoc_maintenance_thread -- confirm. */
int start_assoc_maintenance_thread(void);
void stop_assoc_maintenance_thread(void);
|
二. 核心代码注释和分析
1. 相关数据结构
/* Signalled by assoc_expand() when an expansion starts; the maintenance
 * thread waits on it (with cache_lock) whenever no expansion is in progress. */
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
/* NOTE(review): despite the "4-byte" label, unsigned long int is 8 bytes on
 * LP64 platforms; the code below only relies on it being at least 32 bits. */
typedef unsigned long int ub4; /* unsigned 4-byte quantities */
typedef unsigned char ub1; /* unsigned 1-byte quantities */
/* how many powers of 2's worth of buckets we use */
static unsigned int hashpower = 16;
/* Number of buckets in a table of power n, i.e. 2^n. */
#define hashsize(n) ((ub4)1<<(n))
/* Bit mask that reduces a hash value to a bucket index for power n. */
#define hashmask(n) (hashsize(n)-1)
/* Main hash table. This is where we look except during expansion. */
static item** primary_hashtable = 0;
/*
* Previous hash table. During expansion, we look here for keys that haven't
* been moved over to the primary yet.
*/
static item** old_hashtable = 0;
/* Number of items in the hash table. */
static unsigned int hash_items = 0;
/* Flag: Are we in the middle of expanding now? */
static bool expanding = false;
/*
* During expansion we migrate values with bucket granularity; this is how
* far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
*/
static unsigned int expand_bucket = 0;
//注释:
1. 所有item都是链接到一个hash表上的,处理hash冲突的方式为链式Hash,即将映射到同一个Hash bucket上的item组成一个链表。
2. hash表桶的个数刚开始是2^16=65536个,但是后来随着item的增长,需要进行扩容(expand)。这个时候的核心实现是新建一个新的primary_hashtable,原来的hash表变为old_hashtable,然后每次将old_hashtable上一个bucket里的item移动到primary_hashtable上。而进行这个操作的就是 maintenance_thread。
|
2. assoc_init
/*
 * Allocate the initial, zero-filled bucket array (2^hashpower slots; hashpower
 * starts at 16). The table holds only item pointers; per the original author's
 * note the items themselves live in slab chunks, and this function is called
 * from main() in memcached.c.
 *
 * On allocation failure an error is printed to stderr and the process exits
 * with EXIT_FAILURE.
 */
void assoc_init(void)
{
    const size_t nbuckets = hashsize(hashpower);

    primary_hashtable = calloc(nbuckets, sizeof(void *));
    if (primary_hashtable != NULL) {
        return;
    }

    fprintf(stderr, "Failed to init hashtable.\n");
    exit(EXIT_FAILURE);
}
|
3. assoc_expand
/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) { //以成倍的方式增长hash buckets
old_hashtable = primary_hashtable;
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = true; //进入expanding状态,合并了old_hashtable后状态结束
expand_bucket = 0; //当前迁移的bucket的个数
pthread_cond_signal(&maintenance_cond); //向 maintenance_thread发信号
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}
|
4. maintenance_thread
/* Loop flag read by assoc_maintenance_thread(); the thread keeps iterating
 * while it is non-zero. */
static volatile int do_run_maintenance_thread = 1;
#define DEFAULT_HASH_BULK_MOVE 1
/* Upper bound on the number of buckets migrated per cache_lock hold; the
 * default moves one bucket at a time. */
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
/*
 * Thread body that migrates buckets from old_hashtable to primary_hashtable
 * while an expansion is in progress, and otherwise sleeps on maintenance_cond.
 * The arg parameter is not used. Returns NULL once the run flag is cleared.
 */
static void *assoc_maintenance_thread(void *arg) {
while (do_run_maintenance_thread) {
int ii = 0;
/* Lock the cache, and bulk move multiple buckets to the new
* hash table. */
pthread_mutex_lock(&cache_lock);
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { /* hash_bulk_move defaults to 1 */
item *it, *next;
int bucket;
/* Move every item pointer on this old bucket's chain, one by one. */
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
/* Save the successor first: h_next is overwritten below. */
next = it->h_next;
/* Re-hash with the new (larger) mask and push onto the head of the
 * matching bucket in the new table. */
bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
old_hashtable[expand_bucket] = NULL;
expand_bucket++; /* advance to the next old bucket index */
/* hashsize(hashpower - 1) is the size of the old table, so reaching it
 * means every old bucket has been migrated. */
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
/* NOTE(review): old_hashtable is not reset to NULL after the free; the
 * readers visible here only touch it while expanding is true. */
free(old_hashtable);
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}
}
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
pthread_cond_wait(&maintenance_cond, &cache_lock);
}
pthread_mutex_unlock(&cache_lock);
}
return NULL;
}
|
5. assoc_find
/*
 * Look up the item stored under key/nkey.
 *
 * While an expansion is in progress the key's bucket may not have been
 * migrated yet: if its index in the old table is at or beyond expand_bucket,
 * the chain is still in old_hashtable; otherwise it is in primary_hashtable.
 *
 * Returns the matching item, or NULL when the key is not present.
 */
item *assoc_find(const char *key, const size_t nkey)
{
    const uint32_t hv = hash(key, nkey, 0);
    item *cursor;

    if (expanding) {
        const unsigned int oldbucket = hv & hashmask(hashpower - 1);

        if (oldbucket >= expand_bucket) {
            /* Bucket not migrated yet: search the old table. */
            cursor = old_hashtable[oldbucket];
        } else {
            cursor = primary_hashtable[hv & hashmask(hashpower)];
        }
    } else {
        cursor = primary_hashtable[hv & hashmask(hashpower)];
    }

    /* Walk the chain comparing keys; depth counts the non-matching items
     * visited before the search stopped. */
    int depth = 0;
    for (; cursor != NULL; cursor = cursor->h_next, ++depth) {
        if (nkey == cursor->nkey && memcmp(key, ITEM_key(cursor), nkey) == 0) {
            break;
        }
    }

    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return cursor;
}
|
6. assoc_insert
/*
 * Link an item at the head of its bucket chain.
 *
 * Note: this isn't an assoc_update. The key must not already exist in the
 * table when this is called (checked by an assert).
 *
 * During an expansion the item goes into old_hashtable when its old bucket
 * has not been migrated yet, and into primary_hashtable otherwise. After the
 * insert, an expansion is started if none is running and the item count
 * exceeds 3/2 of the current bucket count.
 *
 * Always returns 1.
 */
int assoc_insert(item *it)
{
    /* shouldn't have duplicately named things defined */
    assert(assoc_find(ITEM_key(it), it->nkey) == 0);

    const uint32_t hv = hash(ITEM_key(it), it->nkey, 0);

    /* Pick the slot holding the head of the chain this item belongs to. */
    item **head = &primary_hashtable[hv & hashmask(hashpower)];
    if (expanding) {
        const unsigned int oldbucket = hv & hashmask(hashpower - 1);

        if (oldbucket >= expand_bucket) {
            head = &old_hashtable[oldbucket];
        }
    }

    it->h_next = *head;
    *head = it;

    hash_items++;
    /* Expansion trigger: item count above 1.5x the number of buckets. */
    if (!expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_expand();
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}
|
7. assoc_delete
/*
 * Unlink the item stored under key/nkey from its bucket chain and decrement
 * the item count.
 *
 * _hashitem_before() yields the address of the link (a bucket head or some
 * item's h_next) through which the matching item is reached, so *link is the
 * item itself and storing its successor there removes it from the chain.
 */
void assoc_delete(const char *key, const size_t nkey)
{
    item **before = _hashitem_before(key, nkey);

    if (*before == NULL) {
        /* Note: we never actually get here. the callers don't delete things
           they can't find. */
        assert(*before != 0);
        return;
    }

    hash_items--;
    /* The DTrace probe cannot be triggered as the last instruction
     * due to possible tail-optimization by the compiler
     */
    MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);

    item *successor = (*before)->h_next;
    (*before)->h_next = 0; /* probably pointless, but whatever. */
    *before = successor;   /* splice the item out of the chain */
}
|
阅读(954) | 评论(0) | 转发(0) |