写完memcached集群轻客户端有一段时间了,使用了ketama的第3方一致性hash算法库。这里分析一下它的实现。
1,简介
若我们在后台使用NoSQL集群,必然会涉及到key的分配问题,集群中某台机器宕机时如何key又该如何分配的问题。
若我们用一种简单的方法,n = hash( key)%N来选择n号服务器,一切都运行正常,若再考虑如下的两种情况;
(1) 一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m 从 cache 中移除,这时候 cache 是 N-1 台,映射公式变成了 hash(object)%(N-1) ;
(2) 由于访问加重,需要添加 cache ,这时候 cache 是 N+1 台,映射公式变成了 hash(object)%(N+1) ;
1 和 2 意味着什么?这意味着突然之间几乎所有的 cache 都失效了。对于服务器而言,这是一场灾难,洪水般的访问都会直接冲向后台服务器;
(3) 再来考虑一个问题,由于硬件能力越来越强,你可能想让后面添加的节点多做点活,显然上面的 hash 算法也做不到。
以上三个问题,可以用一致性hash算法来解决。关于一致性hash算法的理论网上很多,这里分析几种一致性hash算法的实现。
2,ketama实现分析
2.1 实现流程介绍
ketama对一致性hash算法的实现思路是:
(1) 通过配置文件,建立一个服务器列表,其形式如:(1.1.1.1:11211, 2.2.2.2:11211,9.8.7.6:11211...)
(2) 对每个服务器列表中的字符串,通过Hash算法,hash成几个无符号型整数。
注意:如何通过hash算法来计算呢?
(3) 把这几个无符号型整数放到一个环上,这个换被称为continuum。(我们可以想象,一个从0到2^32的钟表)
(4) 可以建立一个数据结构,把每个数和服务器的ip地址对应在一起,这样,每个服务器就出现在这个环上的这几个位置上。
注意:这几个数,不能随着服务器的增加和删除而变化,这样才能保证集群增加/删除机器后,以前的那些key都映射到同样的ip地址上。后面将会详细说明怎么做。
(5) 为了把一个key映射到一个服务器上,先要对key做hash,形成一个无符号型整数un,然后在环continuum上查找大于un的下一个数值。若找到,就把key保存到这台服务器上。
(6) 若你的hash(key)值超过continuum上的最大整数值,就直接回饶到continuum环的开始位置。
这样,添加或删除集群中的结点,就只会影响一少部分key的分布。
注意:这里说的会影响一部分key是相对的。其实影响的key的多少,由该ip地址占的权重大小决定的。在ketama的配置文件中,需要指定每个ip地址的权重。权重大的在环上占的点就多。
2.2 源码分析
在github上下载源码后,解压,进入ketama-master/libketama目录。一致性hash算法的实现是在ketama.c文件中。
在该文件中,还用到了共享内存,这里不分析这一部分,只分析一致性hash算法的核心实现部分。
2.2.1 数据结构
// 服务器信息,主要记录服务器的ip地址和权重值
typedef struct
{
char addr[22]; //服务器ip地址
unsigned long memory; // 权重值
} serverinfo;
// 以下数据结构就是continuum环上的结点,换上的每个点其实代表了一个ip地址,该结构把点和ip地址一一对应起来。
// 环上的结点
typedef struct
{
unsigned int point; //在环上的点,数组下标值
char ip[22]; // 对应的ip地址
} mcs;
2.2.2 一致性hash环的创建
该函数是创建continuum的核心函数,它先从配置文件中读取集群服务器ip和端口,以及权重信息。创建continuum环,并把这些服务器信息和环上的数组下标对应起来。
// 其中key是为了访问共享内存而设定的,在使用时可以把共享内存部分去掉。
static int
ketama_create_continuum( key_t key, char* filename )
{
// 若不使用共享内存,可以不管
if (shm_ids == NULL) {
init_shm_id_tracker();
}
// 共享内存相关,用不着时,可以去掉
if (shm_data == NULL) {
init_shm_data_tracker();
}
int shmid;
int* data; /* Pointer to shmem location */
// 该变量来记录共从配置文件中共读取了多少个服务器
unsigned int numservers = 0;
// 该变量是配置文件中所有服务器权重值得总和
unsigned long memory;
// 从配置文件中读取到的服务器信息,包括ip地址,端口,权重值
serverinfo* slist;
// 从配置文件filename中读取服务器信息,把服务器总数保存到变量numservers中,把所有服务器的权重值保存到memory中。
slist = read_server_definitions( filename, &numservers, &memory );
/* Check numservers first; if it is zero then there is no error message
* and we need to set one. */
// 以下几行是检查配置文件内容是否正确
// 若总服务器数量小于1,错误。
if ( numservers < 1 )
{
sprintf( k_error, "No valid server definitions in file %s", filename );
return 0;
}
else if ( slist == 0 ) // 若服务器信息数组为空,错误
{
/* read_server_definitions must've set error message. */
return 0;
}
// 以下代码开始构建continuum环
/* Continuum will hold one mcs for each point on the circle: */
// 平均每台服务器要在这个环上布160个点,这个数组的元素个数就是服务器个数*160。
// 具体多少个点,需要根据事情的服务器权重值进行计算得到。
// 为什么要选择160个点呢?主要是通过md5计算出来的是16个整数,把这个整数分成4等分,每份是4位整数。
// 而每进行一次hash计算,我们可以获得4个点。
mcs continuum[ numservers * 160 ];
unsigned int i, k, cont = 0;
// 遍历所有服务器开始在环上部点
for( i = 0; i < numservers; i++ )
{
// 计算服务器i在所有服务器权重的占比
float pct = (float)slist[i].memory / (float)memory;
// 由于计算一次可以得到4个点,所有对每一台机器来说,总的计算只需要计算40*numservers次。
// 按权重占比进行划分,就是以下的计算得到的次数
unsigned int ks = floorf( pct * 40.0 * (float)numservers );
#ifdef DEBUG
int hpct = floorf( pct * 100.0 );
syslog( LOG_INFO, "Server no. %d: %s (mem: %lu = %u%% or %d of %d)\n",
i, slist[i].addr, slist[i].memory, hpct, ks, numservers * 40 );
#endif
// 计算出总次数,每次可以得到4个点
for( k = 0; k < ks; k++ )
{
/* 40 hashes, 4 numbers per hash = 160 points per server */
char ss[30];
unsigned char digest[16];
// 通过计算hash值来得到下标值,该hash值是字符串:"-n",其中的n是通过权重计算出来的该主机应该部点的总数/4。
sprintf( ss, "%s-%d", slist[i].addr, k );
// 计算其字符串的md5值,该值计算出来后是一个unsigned char [16]的数组,也就是可以保存16个字节
ketama_md5_digest( ss, digest );
/* Use successive 4-bytes from hash as numbers for the points on the circle: */
// 通过对16个字节的每组4个字节进行移位,得到一个0到2^32之间的整数,这样环上的一个结点就准备好了。
int h;
// 共有16个字节,可以处理4次,得到4个点的值
for( h = 0; h < 4; h++ )
{
// 把计算出来的连续4位的数字,进行移位。
// 把第一个数字一道一个整数的最高8位,后面的一次移动次高8位,后面一次补零,这样就得到了一个32位的整数值。移动后
continuum[cont].point = ( digest[3+h*4] << 24 )
| ( digest[2+h*4] << 16 )
| ( digest[1+h*4] << 8 )
| digest[h*4];
// 复制对应的ip地址到该点上
memcpy( continuum[cont].ip, slist[i].addr, 22 );
cont++;
}
}
}
free( slist );
// 以下代码对计算出来的环上点的值进行排序,方便进行查找
// 这里要注意:排序是按照point的值(计算出来的整数值)进行的,也就是说原来的数组下标顺序被打乱了。
/* Sorts in ascending order of "point" */
qsort( (void*) &continuum, cont, sizeof( mcs ), (compfn)ketama_compare );
// 到这里算法的实现就结束了,环上的点(0^32整数范围内)都已经建立起来,每个点都是0到2^32的一个整数和ip地址的结构。
// 这样查找的时候,只是需要hash(key),并在环上找到对应的数的位置,取得该节点的ip地址即可。
... ...
2.2.3 在环上查找元素
* 计算key的hash值的实现
unsigned int ketama_hashi( char* inString )
{
unsigned char digest[16];
// 对key的值做md5计算,得到一个有16个元素的unsigned char数组
ketama_md5_digest( inString, digest );
// 取数组中的前4个字符,并移位,形成一个整数作为hash得到的值返回
return (unsigned int)(( digest[3] << 24 )
| ( digest[2] << 16 )
| ( digest[1] << 8 )
| digest[0] );
}
* 在环上查找相应的结点
mcs* ketama_get_server( char* key, ketama_continuum cont )
{
// 计算key的hash值,并保存到变量h中
unsigned int h = ketama_hashi( key );
// 该变量cont->numpoints是总的数组埋点数
int highp = cont->numpoints;
// 数组结点的值
mcs (*mcsarr)[cont->numpoints] = cont->array;
int lowp = 0, midp;
unsigned int midval, midval1;
// divide and conquer array search to find server with next biggest
// point after what this key hashes to
while ( 1 )
{
// 从数组的中间位置开始找
// 注意此时的数组是按照point的值排好序了
midp = (int)( ( lowp+highp ) / 2 );
// 若中间位置等于最大点数,直接绕回到0位置
if ( midp == cont->numpoints )
return &( (*mcsarr)[0] ); // if at the end, roll back to zeroth
// 取的中间位置的point值
midval = (*mcsarr)[midp].point;
// 再取一个值:若中间位置下标为0,直接返回0,若中间位置的下标不为0,直接返回上一个结点的point值
midval1 = midp == 0 ? 0 : (*mcsarr)[midp-1].point;
// 把h的值和取的两个值point值进行比较,若在这两个point值之间说明h值应该放在较大的那个point值的下标对应的ip地址上
if ( h <= midval && h > midval1 )
return &( (*mcsarr)[midp] );
// 否则继续2分
if ( midval < h )
lowp = midp + 1;
else
highp = midp - 1;
// 若没有找到,直接返回0位置的值,这种情况应该很少
if ( lowp > highp )
return &( (*mcsarr)[0] );
}
}
2.2.4 添加删除机器时会怎样
先说明一下删除机器的情况。机器m1被删除后,以前分配到m1的key需要重新分配,而且最好是均匀分配到现存的机器上。
我们来看看,ketama是否能够做到?
当m1机器宕机后,continuum环需要重构,需要把m1的ip对应的点从continuum环中去掉。
我们来回顾一下环的创建过程:
按每个ip平均160个点,可以计算出总数t。按每个ip的权重值占比和总数t的乘积得到该ip应该在该环上部的点数。若一台机器宕机,那么每台机器的权重占比增加,在该环上部的点数也就相应的增加,当然这个增加也是按每台机器的占比来的,占比多的增加的点数就多,占比少的增加的点数就少。但,每个ip的点数一定是增加的。
创建环上的点值的过程是:
先计算hash值:
for( k = 0; k < ks; k++ ) { //其中ks是每个ip地址对应的总点数
...
sprintf( ss, "%s-%d", slist[i].addr, k );
ketama_md5_digest( ss, digest );
...
}
循环移位hash值: continuum[cont].point = ( digest[3+h*4] << 24 )
| ( digest[2+h*4] << 16 )
| ( digest[1+h*4] << 8 )
| digest[h*4];
由于此时每个ip的占比增加,ks就增加了:
float pct = (float)slist[i].memory / (float)memory; // 此时这个值增加
unsigned int ks = floorf( pct * 40.0 * (float)numservers ); //该值也增加
这样,每个ip地址对应的point值就多了,但以前的point值不会变。依然在这个环上相同的点值上。也就是说把影响平均分摊到现有的各台机器上。
当然,删除的情况和添加的情况相似,都是把影响平均分摊到现有的各个机器上了。
小结:
(1) 环上的点是通过对ip地址加一个整数(形如:-N)作为一个字符串做hash,然后移位得到4个点数。
(2) 排序后,通过2分查找进行查询,效率较高。
(3) 这样,添加ip时,环上以前部的点不会变化,而且把影响分摊到现有的各个ip上。
问题:
这里我也对该算法提出了两点疑问,
问题1:创建环和在环上查找,都是使用的hash值4位取数的办法,那么是否存在查找某个key时,计算的值在环上不存在?当然这里也做了处理(找不到直接返回0号位置的ip地址: return &( (*mcsarr)[0] );),但若这种情况比较多时,误差可能比较大。
通过测试发现,这种情况出现的概率并不大,几乎没有。
问题2:其实当ip地址有变动时,还是又可能使原来的key对应的ip地址有变化,只是这种情况概率比较小?那么能不能使得原来的key对应的ip地址不变化?还有待改进。
代码来源: