Chinaunix首页 | 论坛 | 博客
  • 博客访问: 67881
  • 博文数量: 24
  • 博客积分: 25
  • 博客等级: 民兵
  • 技术积分: 80
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-07 00:11
文章分类

全部博文(24)

文章存档

2014年(10)

2013年(7)

2012年(7)

我的朋友

分类: LINUX

2014-10-23 09:41:46


tracker服务器的选主和心跳机制

概述:
(1)每个tracker服务在启动时会调用tracker_relationship_init函数启动一个线程relationship_thread_entrance。该线程会每隔几秒进行各个tracker的关系确认,若leader发生了变动,或宕机之类,会重新选出leader。

(2) 线程中函数的调用关系如下:
relationship_thread_entrance
    ->relationship_select_leader()                     //没有选出leader,需要调用该函数选出leader
        ->relationship_get_tracker_leader()        //找出最有可能是leader的tracker服务器状态信息,并返回
            ->tracker_mem_get_status()              //发送64号命令,获取tracker服务器的状态信息
        ->relationship_notify_leader_changed() //若leader改变,调用该函数
            ->relationship_notify_next_leader()   // 发送66号通知命令
                ->do_notify_leader_changed(TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER)
            ->relationship_commit_next_leader()  //发送67号命令
                ->do_notify_leader_changed(TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER)
    ->relationship_ping_leader()                       //若已经选出了leader,调用该函数确认leader是否存活
        ->tracker_connect_server()
        ->fdfs_ping_leader()

(3)  选主流程
relationship_select_leader()函数
a) 会调用tracker_mem_get_status()函数进行选主的操作。
b) tracker_mem_get_status函数先会遍历所有的tracker服务器结构,向所有的tracker服务器发起tcp连接,并发送
TRACKER_PROTO_CMD_TRACKER_GET_STATUS命令来获取各个tracker服务器的状态,各个tracker服务器接收到该命令后,会发送自己的状态给该函数(包括该tracker服务器是否是leader的信息),该函数把信息保存到tracker服务器对应的数据结构中。
c) 接下来该函数对各个tracker服务器的状态进行排序(qsort,默认是升序排列),先按照是否是leader进行排序,其次根据运行时间,再次间隔时间,端口等。
d) 排序后的数组的最后一个成员就是leader,取出leader的状态。
e) 若leader是自己,用relationship_notify_leader_changed函数通知各个tracker服务器,若不是自己,把leader的值设置成选出的leader的值。

(4) 选出leader后的通知流程
函数调用关系如下:
relationship_notify_leader_changed()
   ->relationship_notify_next_leader
   ->relationship_commit_next_leader()

若tracker的leader已经改变了,就要调用relationship_notify_leader_changed函数对各个tracker进行通知,该函数又继续调用relationship_notify_next_leader和relationship_commit_next_leader函数,下面分别看看这两个函数做了什么。

relationship_commit_next_leader(): 
(1) 调用do_notify_leader_changed函数发送TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER命令给tracker服务器。
tracker服务器端收到给命令后,调用tracker_deal_notify_next_leader()函数处理相应的请求。

relationship_commit_next_leader():
(1) 调用do_notify_leader_changed函数发送TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER命令给tracker服务器。
tracker服务器接收到该命令请求后,调用tracker_deal_commit_next_leader()函数处理相应请求。


1, 数据结构
//tracker服务器信息
typedef struct
{
int sock;
int port;
char ip_addr[IP_ADDRESS_SIZE];
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
} TrackerServerInfo;

//tracker服务器状态信息
typedef struct {
TrackerServerInfo *pTrackerServer;
int running_time;     //running seconds, more means higher weight
int restart_interval; //restart interval, less mean higher weight
bool if_leader;       //if leader
} TrackerRunningStatus;



2, 各个tracker之间关系的初始化

void main()
{
    ... ...

    if ((result=tracker_relationship_init()) != 0)
    {   
        log_destroy();
        return result;
    }
    ... ...   
}
进行各个tracker之间关系的初始化操作。


3, tracker关系初始化实现

int tracker_relationship_init()
{   
    int result;
    pthread_t tid;
    pthread_attr_t thread_attr;

    //设置线程属性:线程栈的大小。
    //默认值是512K
    if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "init_pthread_attr fail, program exit!", __LINE__);
        return result;
    }
    
    //创建tracker关系处理线程
    if ((result=pthread_create(&tid, &thread_attr, \
            relationship_thread_entrance, NULL)) != 0)
    {   
        logError("file: "__FILE__", line: %d, " \
            "create thread failed, errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
        return result;
    }
    //销毁线程属性变量
    pthread_attr_destroy(&thread_attr);

    return 0;
}

//tracker关系处理线程,实例函数
static void *relationship_thread_entrance(void* arg)
{
#define MAX_SLEEP_SECONDS  10

    int fail_count;
    int sleep_seconds;

    fail_count = 0;
    while (g_continue_flag)
    {
        sleep_seconds = 1;
        //tracker的服务器列表不为空
        if (g_tracker_servers.servers != NULL)
        {
            //若tracker服务器还没有选leader,需要现在就选出leader
            if (g_tracker_servers.leader_index < 0)
            {
                //从tracker中选出leader
                if (relationship_select_leader() != 0)
                {
                    sleep_seconds = 1 + (int)((double)rand()
                    * (double)MAX_SLEEP_SECONDS / RAND_MAX);
                }
            }
            else
            {
                //若tracker已经选出了leader,ping一下leader看leader是否存活
                if (relationship_ping_leader() == 0)
                {
                    fail_count = 0;
                }
                else
                {
                    //若tracker的leader没有响应,有可能leader已经死掉,
                    //失败次数大于3次,把tracker的leader编号重设为-1,下一次循环会再次选出leader
                    fail_count++;
                    if (fail_count >= 3)
                    {
                        g_tracker_servers.leader_index = -1;
                    }
                }
            }
        }

        //最终的tracker服务器不为空
        if (g_last_tracker_servers != NULL)
        {
            tracker_mem_file_lock();

            free(g_last_tracker_servers);
            g_last_tracker_servers = NULL;

            tracker_mem_file_unlock();
        }

        //睡眠几秒,再次循环,或者和leader通讯,或者继续选leader
        sleep(sleep_seconds);
    }

    return NULL;
}


4, leader的选取

static int relationship_select_leader()
{
int result;
TrackerRunningStatus trackerStatus;

        //若服务器数量小于0
if (g_tracker_servers.server_count <= 0)
{
return 0;
}

logInfo("file: "__FILE__", line: %d, " \
"selecting leader...", __LINE__);
        
        //选取leader
if ((result=relationship_get_tracker_leader(&trackerStatus)) != 0)
{
return result;
}

        //若选出来的leader和自己本地ip地址和端口相等
        //由自己负责通知其他各个tracker服务器,leader是谁
if (trackerStatus.pTrackerServer->port == g_server_port && \
is_local_host_ip(trackerStatus.pTrackerServer->ip_addr))
{
if ((result=relationship_notify_leader_changed( \
trackerStatus.pTrackerServer)) != 0)
{
return result;
}

logInfo("file: "__FILE__", line: %d, " \
"I am the new tracker leader %s:%d", \
__LINE__, trackerStatus.pTrackerServer->ip_addr, \
trackerStatus.pTrackerServer->port);
                //寻找和设置trunk服务器
tracker_mem_find_trunk_servers();
}
else  //选出的leader不是我
{
                //而且已经选出leader了
if (trackerStatus.if_leader)
{
                        //获取到leader的编号
g_tracker_servers.leader_index = \
trackerStatus.pTrackerServer - \
g_tracker_servers.servers;
                        //若获取到的leader的编号不正确,则把leader编号设置成-1
if (g_tracker_servers.leader_index < 0 || \
g_tracker_servers.leader_index >= \
g_tracker_servers.server_count)
{
g_tracker_servers.leader_index = -1;
return EINVAL;
}

logInfo("file: "__FILE__", line: %d, " \
"the tracker leader %s:%d", __LINE__, \
trackerStatus.pTrackerServer->ip_addr, \
trackerStatus.pTrackerServer->port);
}
else    //否则,等待leader的通知
{
logDebug("file: "__FILE__", line: %d, " \
"waiting for leader notify", __LINE__);
return ENOENT;
}
}

return 0;
}

//该函数遍历每个tracker服务器,并获取每个tracker服务器的状态信息,包括是否是leader的标记。
//并对服务器状态信息进行排序,找出最有可能是leader的状态保存在参数pTrackerStatus中。
static int relationship_get_tracker_leader(TrackerRunningStatus *pTrackerStatus)
{
TrackerServerInfo *pTrackerServer;
TrackerServerInfo *pTrackerEnd;
TrackerRunningStatus *pStatus;
TrackerRunningStatus trackerStatus[FDFS_MAX_TRACKERS];
int count;
int result;
int r;
int i;

memset(pTrackerStatus, 0, sizeof(TrackerRunningStatus));
pStatus = trackerStatus;
result = 0;
pTrackerEnd = g_tracker_servers.servers + g_tracker_servers.server_count;
        //遍历所有的tracker服务器
for (pTrackerServer=g_tracker_servers.servers; \
pTrackerServer
{
                //获取到每个tracker服务器的基本信息
pStatus->pTrackerServer = pTrackerServer;
                //获取每个tracker服务器的状态信息
r = tracker_mem_get_status(pTrackerServer, pStatus);
if (r == 0)
{
pStatus++;
}
else if (r != ENOENT)
{
result = r;
}
}

        //获取tracker服务器的个数
count = pStatus - trackerStatus;
        //tracker服务器的个数为0,直接返回结构
if (count == 0)
{
return result == 0 ? ENOENT : result;
}

        //多个traker服务器,需要按照升序排列tracker服务器列表
qsort(trackerStatus, count, sizeof(TrackerRunningStatus), \
relationship_cmp_tracker_status);

for (i=0; i
{
logDebug("file: "__FILE__", line: %d, " \
"%s:%d if_leader: %d, running time: %d, " \
"restart interval: %d", __LINE__, \
trackerStatus[i].pTrackerServer->ip_addr, \
trackerStatus[i].pTrackerServer->port, \
trackerStatus[i].if_leader, \
trackerStatus[i].running_time, \
trackerStatus[i].restart_interval);
}
        //获取到最后一个tracker服务器的信息,也就是leader的运行状态信息
memcpy(pTrackerStatus, trackerStatus + (count - 1), \
sizeof(TrackerRunningStatus));
return 0;
}


//按照tracker服务器信息排序
static int relationship_cmp_tracker_status(const void *p1, const void *p2)
{
TrackerRunningStatus *pStatus1;
TrackerRunningStatus *pStatus2;
TrackerServerInfo *pTrackerServer1;
TrackerServerInfo *pTrackerServer2;
int sub;

pStatus1 = (TrackerRunningStatus *)p1;
pStatus2 = (TrackerRunningStatus *)p2;
        //leader排在最后面
sub = pStatus1->if_leader - pStatus2->if_leader;
if (sub != 0)
{
return sub;
}
        //其次是运行时间长的在后面
sub = pStatus1->running_time - pStatus2->running_time;
if (sub != 0)
{
return sub;
}
        //其次是重启间隔时间长的在后面
sub = pStatus2->restart_interval - pStatus1->restart_interval;
if (sub != 0)
{
return sub;
}

pTrackerServer1 = pStatus1->pTrackerServer;
pTrackerServer2 = pStatus2->pTrackerServer;
        //ip地址不相等的
sub = strcmp(pTrackerServer1->ip_addr, pTrackerServer2->ip_addr);
if (sub != 0)
{
return sub;
}
        //端口大的在后面
return pTrackerServer1->port - pTrackerServer2->port;
}

//获取参数pTrackerServer对应的tracker服务器的状态信息,并保存到参数pStatus中。
int tracker_mem_get_status(TrackerServerInfo *pTrackerServer, \
TrackerRunningStatus *pStatus)
{
char in_buff[1 + 2 * FDFS_PROTO_PKG_LEN_SIZE];
TrackerHeader header;
char *pInBuff;
int64_t in_bytes;
int result;

pTrackerServer->sock = -1;
        //连接每个tracker服务器
if ((result=tracker_connect_server(pTrackerServer)) != 0)
{
return result;
}

do
{
memset(&header, 0, sizeof(header));
        //发送获取tracker服务器状态命令
header.cmd = TRACKER_PROTO_CMD_TRACKER_GET_STATUS;
if ((result=tcpsenddata_nb(pTrackerServer->sock, &header, \
sizeof(header), g_fdfs_network_timeout)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"send data to tracker server %s:%d fail, " \
"errno: %d, error info: %s", __LINE__, \
pTrackerServer->ip_addr, \
pTrackerServer->port, \
result, STRERROR(result));

result = (result == ENOENT ? EACCES : result);
break;
}

pInBuff = in_buff;
        //接受服务器的返回信息
result = fdfs_recv_response(pTrackerServer, &pInBuff, \
sizeof(in_buff), &in_bytes);
        //有错误,关闭socket,跳出
if (result != 0)
{
break;
}

        //返回值的信息大小不正确
if (in_bytes != sizeof(in_buff))
{
logError("file: "__FILE__", line: %d, " \
"tracker server %s:%d response data " \
"length: "INT64_PRINTF_FORMAT" is invalid, " \
"expect length: %d.", __LINE__, \
pTrackerServer->ip_addr, pTrackerServer->port, \
in_bytes, (int)sizeof(in_buff));
result = EINVAL;
break;
}

        //获取tracker状态信息
pStatus->if_leader = *in_buff;  //是否是leader
pStatus->running_time = buff2long(in_buff + 1); //运行时间
pStatus->restart_interval = buff2long(in_buff + 1 + \
FDFS_PROTO_PKG_LEN_SIZE);   //重启间隔时间

} while (0);

        //已获取到tracker服务器信息,关闭socket,并重置sock的值
close(pTrackerServer->sock);
pTrackerServer->sock = -1;

return result;
}

//tracker关系通知函数
static int relationship_notify_leader_changed(TrackerServerInfo *pLeader)
{
TrackerServerInfo *pTrackerServer;
TrackerServerInfo *pTrackerEnd;
int result;
bool bConnectFail;
int success_count;

result = ENOENT;
pTrackerEnd = g_tracker_servers.servers + g_tracker_servers.server_count;
success_count = 0;
        //遍历每个tracker服务器,发送
        // TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER类型的通告
for (pTrackerServer=g_tracker_servers.servers; \
pTrackerServer
{
if ((result=relationship_notify_next_leader(pTrackerServer, \
pLeader, &bConnectFail)) != 0)
{
if (!bConnectFail)
{
return result;
}
}
else
{
success_count++;
}
}

if (success_count == 0)
{
return result;
}

success_count = 0;
        //遍历tracker服务器
        //发送TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER类型通告
for (pTrackerServer=g_tracker_servers.servers; \
pTrackerServer
{
if ((result=relationship_commit_next_leader(pTrackerServer, \
pLeader, &bConnectFail)) != 0)
{
if (!bConnectFail)
{
return result;
}
}
else
{
success_count++;
}
}
if (success_count == 0)
{
return result;
}

return 0;
}


//在选主流程中,查找trunk存储服务器
void tracker_mem_find_trunk_servers()
{
FDFSGroupInfo **ppGroup;
FDFSGroupInfo **ppGroupEnd;

        //若自己不是leader或者没有使用trunk文件,直接返回
if (!(g_if_leader_self && g_if_use_trunk_file))
{
return;
}

pthread_mutex_lock(&mem_thread_lock);
ppGroupEnd = g_groups.groups + g_groups.count;
        //遍历各个trunk组
for (ppGroup=g_groups.groups; ppGroup
{
if ((*ppGroup)->pTrunkServer == NULL)
{
tracker_mem_find_trunk_server(*ppGroup, true);
}
}
pthread_mutex_unlock(&mem_thread_lock);
}


//向tracker服务器发送,leader改变的命令,共有两个命令:66号和67号。
static int do_notify_leader_changed(TrackerServerInfo *pTrackerServer, \
TrackerServerInfo *pLeader, const char cmd, bool *bConnectFail)
{
char out_buff[sizeof(TrackerHeader) + FDFS_PROTO_IP_PORT_SIZE];
char in_buff[1];
TrackerHeader *pHeader;
char *pInBuff;
int64_t in_bytes;
int result;

pTrackerServer->sock = -1;
//和tracker server建立tcp连接
if ((result=tracker_connect_server(pTrackerServer)) != 0)
{
*bConnectFail = true;
return result;
}
*bConnectFail = false;

//tcp建立成功,继续
do
{
//发送缓冲区:tracker头+ip:port
memset(out_buff, 0, sizeof(out_buff));
//tracker头
pHeader = (TrackerHeader *)out_buff;
//填写tracker命令
pHeader->cmd = cmd;
//填写ip和port值,是一个字符串"ip:port"
sprintf(out_buff + sizeof(TrackerHeader), "%s:%d", \
pLeader->ip_addr, pLeader->port);
//填写数据包长度
long2buff(FDFS_PROTO_IP_PORT_SIZE, pHeader->pkg_len);
//发送数据
if ((result=tcpsenddata_nb(pTrackerServer->sock, out_buff, \
sizeof(out_buff), g_fdfs_network_timeout)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"send data to tracker server %s:%d fail, " \
"errno: %d, error info: %s", __LINE__, \
pTrackerServer->ip_addr, \
pTrackerServer->port, \
result, STRERROR(result));

result = (result == ENOENT ? EACCES : result);
break;
}

//接收缓冲区
pInBuff = in_buff;
        //获取返回信息,这里buffsize是0
result = fdfs_recv_response(pTrackerServer, &pInBuff, \
0, &in_bytes); 
if (result != 0)
{
break;
}

if (in_bytes != 0)
{
logError("file: "__FILE__", line: %d, " \
"tracker server %s:%d response data " \
"length: "INT64_PRINTF_FORMAT" is invalid, " \
"expect length: %d.", __LINE__, \
pTrackerServer->ip_addr, pTrackerServer->port, \
in_bytes, 0);
result = EINVAL;
break;
}
} while (0);

close(pTrackerServer->sock);
pTrackerServer->sock = -1;

return result;
}


5,tracker服务器端接收到这两个命令的处理方式
5.1 命令TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER
    处理函数tracker_deal_notify_next_leader

//处理leader改变的消息
//trackerheader + ip:port
static int tracker_deal_notify_next_leader(struct fast_task_info *pTask)
{
TrackerClientInfo *pClientInfo;
char *pIpAndPort;
char *ipAndPort[2];
TrackerServerInfo leader;
int server_index;
pClientInfo = (TrackerClientInfo *)pTask->arg;
//数据包长度不对
if (pTask->length - sizeof(TrackerHeader) != FDFS_PROTO_IP_PORT_SIZE)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip addr: %s, " \
"package size "PKG_LEN_PRINTF_FORMAT" " \
"is not correct, expect length: %d", __LINE__, \
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID, \
pTask->client_ip, pTask->length - \
(int)sizeof(TrackerHeader), FDFS_PROTO_IP_PORT_SIZE);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}

//数据包结尾标志置为'\0'
*(pTask->data + pTask->length) = '\0';
//获取leader的port和ip地址信息
pIpAndPort = pTask->data + sizeof(TrackerHeader);
if (splitEx(pIpAndPort, ':', ipAndPort, 2) != 2)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, invalid ip and port: %s", \
__LINE__, pTask->client_ip, pIpAndPort);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}

pTask->length = sizeof(TrackerHeader);
strcpy(leader.ip_addr, ipAndPort[0]);
leader.port = atoi(ipAndPort[1]);
//通过ip地址和port获取leader的编号
server_index = fdfs_get_tracker_leader_index_ex(&g_tracker_servers, \
leader.ip_addr, leader.port);
if (server_index < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, leader %s:%d not exist", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return ENOENT;
}

//若原先的leader是自己,但本次的leader的port和ip地址和自己不同。
//说明自己不再是leader了。
if (g_if_leader_self && (leader.port != g_server_port || \
!is_local_host_ip(leader.ip_addr)))
{
//自己已不再是leader重置该标志
g_if_leader_self = false;
//重置leader编号
g_tracker_servers.leader_index = -1;
//leader改变次数+1
g_tracker_leader_chg_count++;

logError("file: "__FILE__", line: %d, " \
"client ip: %s, two leader occur, " \
"new leader is %s:%d", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return EINVAL;
}

//重置leader编号
g_next_leader_index = server_index;
return 0;
}

5.2 命令TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER
处理函数tracker_deal_commit_next_leader

//确认tracker的leader的值,并重置自己的状态
static int tracker_deal_commit_next_leader(struct fast_task_info *pTask)
{
TrackerClientInfo *pClientInfo;
char *pIpAndPort;
char *ipAndPort[2];
TrackerServerInfo leader;
int server_index;
pClientInfo = (TrackerClientInfo *)pTask->arg;
//数据长度不正确
if (pTask->length - sizeof(TrackerHeader) != FDFS_PROTO_IP_PORT_SIZE)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip addr: %s, " \
"package size "PKG_LEN_PRINTF_FORMAT" " \
"is not correct, expect length: %d", __LINE__, \
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID, \
pTask->client_ip, pTask->length - \
(int)sizeof(TrackerHeader), FDFS_PROTO_IP_PORT_SIZE);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}

*(pTask->data + pTask->length) = '\0';
pIpAndPort = pTask->data + sizeof(TrackerHeader);
if (splitEx(pIpAndPort, ':', ipAndPort, 2) != 2)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, invalid ip and port: %s", \
__LINE__, pTask->client_ip, pIpAndPort);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}

pTask->length = sizeof(TrackerHeader);
strcpy(leader.ip_addr, ipAndPort[0]);
leader.port = atoi(ipAndPort[1]);
server_index = fdfs_get_tracker_leader_index_ex(&g_tracker_servers, \
leader.ip_addr, leader.port);
if (server_index < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, leader %s:%d not exist", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return ENOENT;
}
//若获取到的server_index和以前记录的不同,说明leader不对
//在接收第一个包时,tracker_deal_notify_next_leader函数中已设置成一样
//而这里不一样,说明有错误。
if (server_index != g_next_leader_index)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, can't commit leader %s:%d", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return EINVAL;
}

//重置server_index
g_tracker_servers.leader_index = server_index;
//若leader是自己
if (leader.port == g_server_port && is_local_host_ip(leader.ip_addr))
{
//设置g_if_leader_self值为true
g_if_leader_self = true;
g_tracker_leader_chg_count++;
}
else
{
logInfo("file: "__FILE__", line: %d, " \
"the tracker leader is %s:%d", __LINE__, \
leader.ip_addr, leader.port);
}

return 0;
}


6, leader已存在的ping消息流
relationship_ping_leader函数

//向tracker的leader发送Ping消息,并获取storage的组信息。
static int fdfs_ping_leader(TrackerServerInfo *pTrackerServer)
{
TrackerHeader header;
int result;
int success_count;
int64_t in_bytes;
char in_buff[(FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE) * \
FDFS_MAX_GROUPS];
char *pInBuff;
char *p;
char *pEnd;
FDFSGroupInfo *pGroup;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char trunk_server_ip[IP_ADDRESS_SIZE];

memset(&header, 0, sizeof(header));
//填写命令关键字
header.cmd = TRACKER_PROTO_CMD_TRACKER_PING_LEADER;
//向leader发送命令
result = tcpsenddata_nb(pTrackerServer->sock, &header, \
sizeof(header), g_fdfs_network_timeout);
if(result != 0)
{
logError("file: "__FILE__", line: %d, " \
"tracker server ip: %s, send data fail, " \
"errno: %d, error info: %s", \
__LINE__, pTrackerServer->ip_addr, \
result, STRERROR(result));
return result;
}

pInBuff = in_buff;
//接受leader的反馈信息
if ((result=fdfs_recv_response(pTrackerServer, &pInBuff, \
sizeof(in_buff), &in_bytes)) != 0)
{
return result;
}

//若没有反馈信息直接返回0
if (in_bytes == 0)
{
return 0;
}
else if (in_bytes % (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE) != 0)
{
logError("file: "__FILE__", line: %d, " \
"tracker server ip: %s, invalid body length: " \
INT64_PRINTF_FORMAT, __LINE__, \
pTrackerServer->ip_addr, in_bytes);
return EINVAL;
}

success_count = 0;
memset(group_name, 0, sizeof(group_name));
memset(trunk_server_ip, 0, sizeof(trunk_server_ip));

pEnd = in_buff + in_bytes;
//leader反馈的是组信息
for (p=in_buff; p
{
//保存组名
memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN);
//保存trunk服务器信息
memcpy(trunk_server_ip, p + FDFS_GROUP_NAME_MAX_LEN, \
IP_ADDRESS_SIZE - 1);

pGroup = tracker_mem_get_group(group_name);
if (pGroup == NULL)
{
logWarning("file: "__FILE__", line: %d, " \
"tracker server ip: %s, group: %s not exists", \
__LINE__, pTrackerServer->ip_addr, group_name);
continue;
}

if (*trunk_server_ip == '\0')
{
*(pGroup->last_trunk_server_ip) = '\0';
pGroup->pTrunkServer = NULL;
success_count++;
continue;
}
//获取trunk服务器信息
pGroup->pTrunkServer = tracker_mem_get_storage(pGroup, \
trunk_server_ip);
if (pGroup->pTrunkServer == NULL)
{
logWarning("file: "__FILE__", line: %d, " \
"tracker server ip: %s, group: %s, " \
"trunk server: %s not exists", \
__LINE__, pTrackerServer->ip_addr, \
group_name, trunk_server_ip);
}
snprintf(pGroup->last_trunk_server_ip, sizeof( \
pGroup->last_trunk_server_ip), "%s", trunk_server_ip);
success_count++;
}
//保存组信息到文件中
if (success_count > 0)
{
tracker_save_groups();

}

return 0;
}

//tracker服务处理TRACKER_PROTO_CMD_TRACKER_PING_LEADER命令的函数
tracker_deal_ping_leader()

static int tracker_deal_ping_leader(struct fast_task_info *pTask)
{
FDFSGroupInfo **ppGroup;
FDFSGroupInfo **ppEnd;
int body_len;
char *p;
TrackerClientInfo *pClientInfo;
pClientInfo = (TrackerClientInfo *)pTask->arg;
//判断数据包长度是否合法
if (pTask->length - sizeof(TrackerHeader) != 0)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
PKG_LEN_PRINTF_FORMAT" is not correct, " \
"expect length 0", __LINE__, \
TRACKER_PROTO_CMD_TRACKER_PING_LEADER, \
pTask->client_ip, \
pTask->length - (int)sizeof(TrackerHeader));
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
//若leader不是自己,直接返回错误
if (!g_if_leader_self)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, i am not the leader!", \
__LINE__, TRACKER_PROTO_CMD_TRACKER_PING_LEADER, \
pTask->client_ip);
pTask->length = sizeof(TrackerHeader);
return EOPNOTSUPP;
}

if (pClientInfo->chg_count.trunk_server == g_trunk_server_chg_count)
{
pTask->length = sizeof(TrackerHeader);
return 0;
}

body_len = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE) * g_groups.count;
if (body_len + sizeof(TrackerHeader) > pTask->size)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, " \
"exceeds max package size: %d!", \
__LINE__, TRACKER_PROTO_CMD_TRACKER_PING_LEADER, \
pTask->client_ip, pTask->size);
pTask->length = sizeof(TrackerHeader);
return ENOSPC;
}
//p指向数据包的数据部分
p = pTask->data + sizeof(TrackerHeader);
memset(p, 0, body_len);
        //准备发送组信息
ppEnd = g_groups.sorted_groups + g_groups.count;
for (ppGroup=g_groups.sorted_groups; ppGroup
{
memcpy(p, (*ppGroup)->group_name, FDFS_GROUP_NAME_MAX_LEN);
p += FDFS_GROUP_NAME_MAX_LEN;
//若该组的trunk服务器不为空
if ((*ppGroup)->pTrunkServer != NULL)
{
//汇报trunk服务器的ip地址
memcpy(p, (*ppGroup)->pTrunkServer->ip_addr, IP_ADDRESS_SIZE);
}
p += IP_ADDRESS_SIZE;
}

pTask->length = p - pTask->data;
pClientInfo->chg_count.trunk_server = g_trunk_server_chg_count;

return 0;
}


阅读(1798) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~