tracker服务器的选主和心跳机制
概述:
(1)每个tracker服务在启动时会调用tracker_relationship_init函数启动一个线程relationship_thread_entrance。该线程会每隔几秒进行各个tracker的关系确认,若leader发生了变动,或宕机之类,会重新选出leader。
(2) 线程中函数的调用关系如下:
relationship_thread_entrance
->relationship_select_leader() //没有选出leader,需要调用该函数选出leader
->relationship_get_tracker_leader() //找出最有可能是leader的tracker服务器状态信息,并返回
->tracker_mem_get_status() //发送64号命令,获取tracker服务器的状态信息
->relationship_notify_leader_changed() //若leader改变,调用该函数
->relationship_notify_next_leader() // 发送66号通知命令
->do_notify_leader_changed(TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER)
->relationship_commit_next_leader() //发送67号命令
->do_notify_leader_changed(TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER)
->relationship_ping_leader() //若已经选出了leader,调用该函数确认leader是否存活
->tracker_connect_server()
->fdfs_ping_leader()
(3) 选主流程
relationship_select_leader()函数
a) 会调用tracker_mem_get_status()函数进行选主的操作。
b) tracker_mem_get_status函数先会遍历所有的tracker服务器结构,向所有的tracker服务器发起tcp连接,并发送
TRACKER_PROTO_CMD_TRACKER_GET_STATUS命令来获取各个tracker服务器的状态,各个tracker服务器接收到该命令后,会发送自己的状态给该函数(包括该tracker服务器是否是leader的信息),该函数把信息保存到tracker服务器对应的数据结构中。
c) 接下来该函数对各个tracker服务器的状态进行排序(qsort,默认是升序排列),先按照是否是leader进行排序,其次根据运行时间,再次间隔时间,端口等。
d) 排序后的数组的最后一个成员就是leader,取出leader的状态。
e) 若leader是自己,用relationship_notify_leader_changed函数通知各个tracker服务器,若不是自己,把leader的值设置成选出的leader的值。
(4) 选出leader后的通知流程
函数调用关系如下:
relationship_notify_leader_changed()
->relationship_notify_next_leader
->relationship_commit_next_leader()
若tracker的leader已经改变了,就要调用relationship_notify_leader_changed函数对各个tracker进行通知,该函数又继续调用relationship_notify_next_leader和relationship_commit_next_leader函数,下面分别看看这两个函数做了什么。
relationship_commit_next_leader():
(1) 调用do_notify_leader_changed函数发送TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER命令给tracker服务器。
tracker服务器端收到给命令后,调用tracker_deal_notify_next_leader()函数处理相应的请求。
relationship_commit_next_leader():
(1) 调用do_notify_leader_changed函数发送TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER命令给tracker服务器。
tracker服务器接收到该命令请求后,调用tracker_deal_commit_next_leader()函数处理相应请求。
1, 数据结构
//tracker服务器信息
typedef struct
{
int sock;
int port;
char ip_addr[IP_ADDRESS_SIZE];
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
} TrackerServerInfo;
//tracker服务器状态信息
typedef struct {
TrackerServerInfo *pTrackerServer;
int running_time; //running seconds, more means higher weight
int restart_interval; //restart interval, less mean higher weight
bool if_leader; //if leader
} TrackerRunningStatus;
2, 各个tracker之间关系的初始化
void main()
{
... ...
if ((result=tracker_relationship_init()) != 0)
{
log_destroy();
return result;
}
... ...
}
进行各个tracker之间关系的初始化操作。
3, tracker关系初始化实现
int tracker_relationship_init()
{
int result;
pthread_t tid;
pthread_attr_t thread_attr;
//设置线程属性:线程栈的大小。
//默认值是512K
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_attr fail, program exit!", __LINE__);
return result;
}
//创建tracker关系处理线程
if ((result=pthread_create(&tid, &thread_attr, \
relationship_thread_entrance, NULL)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
//销毁线程属性变量
pthread_attr_destroy(&thread_attr);
return 0;
}
//tracker关系处理线程,实例函数
static void *relationship_thread_entrance(void* arg)
{
#define MAX_SLEEP_SECONDS 10
int fail_count;
int sleep_seconds;
fail_count = 0;
while (g_continue_flag)
{
sleep_seconds = 1;
//tracker的服务器列表不为空
if (g_tracker_servers.servers != NULL)
{
//若tracker服务器还没有选leader,需要现在就选出leader
if (g_tracker_servers.leader_index < 0)
{
//从tracker中选出leader
if (relationship_select_leader() != 0)
{
sleep_seconds = 1 + (int)((double)rand()
* (double)MAX_SLEEP_SECONDS / RAND_MAX);
}
}
else
{
//若tracker已经选出了leader,ping一下leader看leader是否存活
if (relationship_ping_leader() == 0)
{
fail_count = 0;
}
else
{
//若tracker的leader没有响应,有可能leader已经死掉,
//失败次数大于3次,把tracker的leader编号重设为-1,下一次循环会再次选出leader
fail_count++;
if (fail_count >= 3)
{
g_tracker_servers.leader_index = -1;
}
}
}
}
//最终的tracker服务器不为空
if (g_last_tracker_servers != NULL)
{
tracker_mem_file_lock();
free(g_last_tracker_servers);
g_last_tracker_servers = NULL;
tracker_mem_file_unlock();
}
//睡眠几秒,再次循环,或者和leader通讯,或者继续选leader
sleep(sleep_seconds);
}
return NULL;
}
4, leader的选取
static int relationship_select_leader()
{
int result;
TrackerRunningStatus trackerStatus;
//若服务器数量小于0
if (g_tracker_servers.server_count <= 0)
{
return 0;
}
logInfo("file: "__FILE__", line: %d, " \
"selecting leader...", __LINE__);
//选取leader
if ((result=relationship_get_tracker_leader(&trackerStatus)) != 0)
{
return result;
}
//若选出来的leader和自己本地ip地址和端口相等
//由自己负责通知其他各个tracker服务器,leader是谁
if (trackerStatus.pTrackerServer->port == g_server_port && \
is_local_host_ip(trackerStatus.pTrackerServer->ip_addr))
{
if ((result=relationship_notify_leader_changed( \
trackerStatus.pTrackerServer)) != 0)
{
return result;
}
logInfo("file: "__FILE__", line: %d, " \
"I am the new tracker leader %s:%d", \
__LINE__, trackerStatus.pTrackerServer->ip_addr, \
trackerStatus.pTrackerServer->port);
//寻找和设置trunk服务器
tracker_mem_find_trunk_servers();
}
else //选出的leader不是我
{
//而且已经选出leader了
if (trackerStatus.if_leader)
{
//获取到leader的编号
g_tracker_servers.leader_index = \
trackerStatus.pTrackerServer - \
g_tracker_servers.servers;
//若获取到的leader的编号不正确,则把leader编号设置成-1
if (g_tracker_servers.leader_index < 0 || \
g_tracker_servers.leader_index >= \
g_tracker_servers.server_count)
{
g_tracker_servers.leader_index = -1;
return EINVAL;
}
logInfo("file: "__FILE__", line: %d, " \
"the tracker leader %s:%d", __LINE__, \
trackerStatus.pTrackerServer->ip_addr, \
trackerStatus.pTrackerServer->port);
}
else //否则,等待leader的通知
{
logDebug("file: "__FILE__", line: %d, " \
"waiting for leader notify", __LINE__);
return ENOENT;
}
}
return 0;
}
//该函数遍历每个tracker服务器,并获取每个tracker服务器的状态信息,包括是否是leader的标记。
//并对服务器状态信息进行排序,找出最有可能是leader的状态保存在参数pTrackerStatus中。
static int relationship_get_tracker_leader(TrackerRunningStatus *pTrackerStatus)
{
TrackerServerInfo *pTrackerServer;
TrackerServerInfo *pTrackerEnd;
TrackerRunningStatus *pStatus;
TrackerRunningStatus trackerStatus[FDFS_MAX_TRACKERS];
int count;
int result;
int r;
int i;
memset(pTrackerStatus, 0, sizeof(TrackerRunningStatus));
pStatus = trackerStatus;
result = 0;
pTrackerEnd = g_tracker_servers.servers + g_tracker_servers.server_count;
//遍历所有的tracker服务器
for (pTrackerServer=g_tracker_servers.servers; \
{
//获取到每个tracker服务器的基本信息
pStatus->pTrackerServer = pTrackerServer;
//获取每个tracker服务器的状态信息
r = tracker_mem_get_status(pTrackerServer, pStatus);
if (r == 0)
{
pStatus++;
}
else if (r != ENOENT)
{
result = r;
}
}
//获取tracker服务器的个数
count = pStatus - trackerStatus;
//tracker服务器的个数为0,直接返回结构
if (count == 0)
{
return result == 0 ? ENOENT : result;
}
//多个traker服务器,需要按照升序排列tracker服务器列表
qsort(trackerStatus, count, sizeof(TrackerRunningStatus), \
relationship_cmp_tracker_status);
for (i=0; i
{
logDebug("file: "__FILE__", line: %d, " \
"%s:%d if_leader: %d, running time: %d, " \
"restart interval: %d", __LINE__, \
trackerStatus[i].pTrackerServer->ip_addr, \
trackerStatus[i].pTrackerServer->port, \
trackerStatus[i].if_leader, \
trackerStatus[i].running_time, \
trackerStatus[i].restart_interval);
}
//获取到最后一个tracker服务器的信息,也就是leader的运行状态信息
memcpy(pTrackerStatus, trackerStatus + (count - 1), \
sizeof(TrackerRunningStatus));
return 0;
}
//按照tracker服务器信息排序
static int relationship_cmp_tracker_status(const void *p1, const void *p2)
{
TrackerRunningStatus *pStatus1;
TrackerRunningStatus *pStatus2;
TrackerServerInfo *pTrackerServer1;
TrackerServerInfo *pTrackerServer2;
int sub;
pStatus1 = (TrackerRunningStatus *)p1;
pStatus2 = (TrackerRunningStatus *)p2;
//leader排在最后面
sub = pStatus1->if_leader - pStatus2->if_leader;
if (sub != 0)
{
return sub;
}
//其次是运行时间长的在后面
sub = pStatus1->running_time - pStatus2->running_time;
if (sub != 0)
{
return sub;
}
//其次是重启间隔时间长的在后面
sub = pStatus2->restart_interval - pStatus1->restart_interval;
if (sub != 0)
{
return sub;
}
pTrackerServer1 = pStatus1->pTrackerServer;
pTrackerServer2 = pStatus2->pTrackerServer;
//ip地址不相等的
sub = strcmp(pTrackerServer1->ip_addr, pTrackerServer2->ip_addr);
if (sub != 0)
{
return sub;
}
//端口大的在后面
return pTrackerServer1->port - pTrackerServer2->port;
}
//获取参数pTrackerServer对应的tracker服务器的状态信息,并保存到参数pStatus中。
int tracker_mem_get_status(TrackerServerInfo *pTrackerServer, \
TrackerRunningStatus *pStatus)
{
char in_buff[1 + 2 * FDFS_PROTO_PKG_LEN_SIZE];
TrackerHeader header;
char *pInBuff;
int64_t in_bytes;
int result;
pTrackerServer->sock = -1;
//连接每个tracker服务器
if ((result=tracker_connect_server(pTrackerServer)) != 0)
{
return result;
}
do
{
memset(&header, 0, sizeof(header));
//发送获取tracker服务器状态命令
header.cmd = TRACKER_PROTO_CMD_TRACKER_GET_STATUS;
if ((result=tcpsenddata_nb(pTrackerServer->sock, &header, \
sizeof(header), g_fdfs_network_timeout)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"send data to tracker server %s:%d fail, " \
"errno: %d, error info: %s", __LINE__, \
pTrackerServer->ip_addr, \
pTrackerServer->port, \
result, STRERROR(result));
result = (result == ENOENT ? EACCES : result);
break;
}
pInBuff = in_buff;
//接受服务器的返回信息
result = fdfs_recv_response(pTrackerServer, &pInBuff, \
sizeof(in_buff), &in_bytes);
//有错误,关闭socket,跳出
if (result != 0)
{
break;
}
//返回值的信息大小不正确
if (in_bytes != sizeof(in_buff))
{
logError("file: "__FILE__", line: %d, " \
"tracker server %s:%d response data " \
"length: "INT64_PRINTF_FORMAT" is invalid, " \
"expect length: %d.", __LINE__, \
pTrackerServer->ip_addr, pTrackerServer->port, \
in_bytes, (int)sizeof(in_buff));
result = EINVAL;
break;
}
//获取tracker状态信息
pStatus->if_leader = *in_buff; //是否是leader
pStatus->running_time = buff2long(in_buff + 1); //运行时间
pStatus->restart_interval = buff2long(in_buff + 1 + \
FDFS_PROTO_PKG_LEN_SIZE); //重启间隔时间
} while (0);
//已获取到tracker服务器信息,关闭socket,并重置sock的值
close(pTrackerServer->sock);
pTrackerServer->sock = -1;
return result;
}
//tracker关系通知函数
static int relationship_notify_leader_changed(TrackerServerInfo *pLeader)
{
TrackerServerInfo *pTrackerServer;
TrackerServerInfo *pTrackerEnd;
int result;
bool bConnectFail;
int success_count;
result = ENOENT;
pTrackerEnd = g_tracker_servers.servers + g_tracker_servers.server_count;
success_count = 0;
//遍历每个tracker服务器,发送
// TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER类型的通告
for (pTrackerServer=g_tracker_servers.servers; \
{
if ((result=relationship_notify_next_leader(pTrackerServer, \
pLeader, &bConnectFail)) != 0)
{
if (!bConnectFail)
{
return result;
}
}
else
{
success_count++;
}
}
if (success_count == 0)
{
return result;
}
success_count = 0;
//遍历tracker服务器
//发送TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER类型通告
for (pTrackerServer=g_tracker_servers.servers; \
{
if ((result=relationship_commit_next_leader(pTrackerServer, \
pLeader, &bConnectFail)) != 0)
{
if (!bConnectFail)
{
return result;
}
}
else
{
success_count++;
}
}
if (success_count == 0)
{
return result;
}
return 0;
}
//在选主流程中,查找trunk存储服务器
void tracker_mem_find_trunk_servers()
{
FDFSGroupInfo **ppGroup;
FDFSGroupInfo **ppGroupEnd;
//若自己不是leader或者没有使用trunk文件,直接返回
if (!(g_if_leader_self && g_if_use_trunk_file))
{
return;
}
pthread_mutex_lock(&mem_thread_lock);
ppGroupEnd = g_groups.groups + g_groups.count;
//遍历各个trunk组
for (ppGroup=g_groups.groups; ppGroup {
if ((*ppGroup)->pTrunkServer == NULL)
{
tracker_mem_find_trunk_server(*ppGroup, true);
}
}
pthread_mutex_unlock(&mem_thread_lock);
}
//向tracker服务器发送,leader改变的命令,共有两个命令:66号和67号。
static int do_notify_leader_changed(TrackerServerInfo *pTrackerServer, \
TrackerServerInfo *pLeader, const char cmd, bool *bConnectFail)
{
char out_buff[sizeof(TrackerHeader) + FDFS_PROTO_IP_PORT_SIZE];
char in_buff[1];
TrackerHeader *pHeader;
char *pInBuff;
int64_t in_bytes;
int result;
pTrackerServer->sock = -1;
//和tracker server建立tcp连接
if ((result=tracker_connect_server(pTrackerServer)) != 0)
{
*bConnectFail = true;
return result;
}
*bConnectFail = false;
//tcp建立成功,继续
do
{
//发送缓冲区:tracker头+ip:port
memset(out_buff, 0, sizeof(out_buff));
//tracker头
pHeader = (TrackerHeader *)out_buff;
//填写tracker命令
pHeader->cmd = cmd;
//填写ip和port值,是一个字符串"ip:port"
sprintf(out_buff + sizeof(TrackerHeader), "%s:%d", \
pLeader->ip_addr, pLeader->port);
//填写数据包长度
long2buff(FDFS_PROTO_IP_PORT_SIZE, pHeader->pkg_len);
//发送数据
if ((result=tcpsenddata_nb(pTrackerServer->sock, out_buff, \
sizeof(out_buff), g_fdfs_network_timeout)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"send data to tracker server %s:%d fail, " \
"errno: %d, error info: %s", __LINE__, \
pTrackerServer->ip_addr, \
pTrackerServer->port, \
result, STRERROR(result));
result = (result == ENOENT ? EACCES : result);
break;
}
//接收缓冲区
pInBuff = in_buff;
//获取返回信息,这里buffsize是0
result = fdfs_recv_response(pTrackerServer, &pInBuff, \
0, &in_bytes);
if (result != 0)
{
break;
}
if (in_bytes != 0)
{
logError("file: "__FILE__", line: %d, " \
"tracker server %s:%d response data " \
"length: "INT64_PRINTF_FORMAT" is invalid, " \
"expect length: %d.", __LINE__, \
pTrackerServer->ip_addr, pTrackerServer->port, \
in_bytes, 0);
result = EINVAL;
break;
}
} while (0);
close(pTrackerServer->sock);
pTrackerServer->sock = -1;
return result;
}
5,tracker服务器端接收到这两个命令的处理方式
5.1 命令TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER
处理函数tracker_deal_notify_next_leader
//处理leader改变的消息
//trackerheader + ip:port
static int tracker_deal_notify_next_leader(struct fast_task_info *pTask)
{
TrackerClientInfo *pClientInfo;
char *pIpAndPort;
char *ipAndPort[2];
TrackerServerInfo leader;
int server_index;
pClientInfo = (TrackerClientInfo *)pTask->arg;
//数据包长度不对
if (pTask->length - sizeof(TrackerHeader) != FDFS_PROTO_IP_PORT_SIZE)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip addr: %s, " \
"package size "PKG_LEN_PRINTF_FORMAT" " \
"is not correct, expect length: %d", __LINE__, \
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID, \
pTask->client_ip, pTask->length - \
(int)sizeof(TrackerHeader), FDFS_PROTO_IP_PORT_SIZE);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
//数据包结尾标志置为'\0'
*(pTask->data + pTask->length) = '\0';
//获取leader的port和ip地址信息
pIpAndPort = pTask->data + sizeof(TrackerHeader);
if (splitEx(pIpAndPort, ':', ipAndPort, 2) != 2)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, invalid ip and port: %s", \
__LINE__, pTask->client_ip, pIpAndPort);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
pTask->length = sizeof(TrackerHeader);
strcpy(leader.ip_addr, ipAndPort[0]);
leader.port = atoi(ipAndPort[1]);
//通过ip地址和port获取leader的编号
server_index = fdfs_get_tracker_leader_index_ex(&g_tracker_servers, \
leader.ip_addr, leader.port);
if (server_index < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, leader %s:%d not exist", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return ENOENT;
}
//若原先的leader是自己,但本次的leader的port和ip地址和自己不同。
//说明自己不再是leader了。
if (g_if_leader_self && (leader.port != g_server_port || \
!is_local_host_ip(leader.ip_addr)))
{
//自己已不再是leader重置该标志
g_if_leader_self = false;
//重置leader编号
g_tracker_servers.leader_index = -1;
//leader改变次数+1
g_tracker_leader_chg_count++;
logError("file: "__FILE__", line: %d, " \
"client ip: %s, two leader occur, " \
"new leader is %s:%d", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return EINVAL;
}
//重置leader编号
g_next_leader_index = server_index;
return 0;
}
5.2 命令TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER
处理函数tracker_deal_commit_next_leader
//确认tracker的leader的值,并重置自己的状态
static int tracker_deal_commit_next_leader(struct fast_task_info *pTask)
{
TrackerClientInfo *pClientInfo;
char *pIpAndPort;
char *ipAndPort[2];
TrackerServerInfo leader;
int server_index;
pClientInfo = (TrackerClientInfo *)pTask->arg;
//数据长度不正确
if (pTask->length - sizeof(TrackerHeader) != FDFS_PROTO_IP_PORT_SIZE)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip addr: %s, " \
"package size "PKG_LEN_PRINTF_FORMAT" " \
"is not correct, expect length: %d", __LINE__, \
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID, \
pTask->client_ip, pTask->length - \
(int)sizeof(TrackerHeader), FDFS_PROTO_IP_PORT_SIZE);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
*(pTask->data + pTask->length) = '\0';
pIpAndPort = pTask->data + sizeof(TrackerHeader);
if (splitEx(pIpAndPort, ':', ipAndPort, 2) != 2)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, invalid ip and port: %s", \
__LINE__, pTask->client_ip, pIpAndPort);
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
pTask->length = sizeof(TrackerHeader);
strcpy(leader.ip_addr, ipAndPort[0]);
leader.port = atoi(ipAndPort[1]);
server_index = fdfs_get_tracker_leader_index_ex(&g_tracker_servers, \
leader.ip_addr, leader.port);
if (server_index < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, leader %s:%d not exist", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return ENOENT;
}
//若获取到的server_index和以前记录的不同,说明leader不对
//在接收第一个包时,tracker_deal_notify_next_leader函数中已设置成一样
//而这里不一样,说明有错误。
if (server_index != g_next_leader_index)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, can't commit leader %s:%d", \
__LINE__, pTask->client_ip, \
leader.ip_addr, leader.port);
return EINVAL;
}
//重置server_index
g_tracker_servers.leader_index = server_index;
//若leader是自己
if (leader.port == g_server_port && is_local_host_ip(leader.ip_addr))
{
//设置g_if_leader_self值为true
g_if_leader_self = true;
g_tracker_leader_chg_count++;
}
else
{
logInfo("file: "__FILE__", line: %d, " \
"the tracker leader is %s:%d", __LINE__, \
leader.ip_addr, leader.port);
}
return 0;
}
6, leader已存在的ping消息流
relationship_ping_leader函数
//向tracker的leader发送Ping消息,并获取storage的组信息。
static int fdfs_ping_leader(TrackerServerInfo *pTrackerServer)
{
TrackerHeader header;
int result;
int success_count;
int64_t in_bytes;
char in_buff[(FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE) * \
FDFS_MAX_GROUPS];
char *pInBuff;
char *p;
char *pEnd;
FDFSGroupInfo *pGroup;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char trunk_server_ip[IP_ADDRESS_SIZE];
memset(&header, 0, sizeof(header));
//填写命令关键字
header.cmd = TRACKER_PROTO_CMD_TRACKER_PING_LEADER;
//向leader发送命令
result = tcpsenddata_nb(pTrackerServer->sock, &header, \
sizeof(header), g_fdfs_network_timeout);
if(result != 0)
{
logError("file: "__FILE__", line: %d, " \
"tracker server ip: %s, send data fail, " \
"errno: %d, error info: %s", \
__LINE__, pTrackerServer->ip_addr, \
result, STRERROR(result));
return result;
}
pInBuff = in_buff;
//接受leader的反馈信息
if ((result=fdfs_recv_response(pTrackerServer, &pInBuff, \
sizeof(in_buff), &in_bytes)) != 0)
{
return result;
}
//若没有反馈信息直接返回0
if (in_bytes == 0)
{
return 0;
}
else if (in_bytes % (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE) != 0)
{
logError("file: "__FILE__", line: %d, " \
"tracker server ip: %s, invalid body length: " \
INT64_PRINTF_FORMAT, __LINE__, \
pTrackerServer->ip_addr, in_bytes);
return EINVAL;
}
success_count = 0;
memset(group_name, 0, sizeof(group_name));
memset(trunk_server_ip, 0, sizeof(trunk_server_ip));
pEnd = in_buff + in_bytes;
//leader反馈的是组信息
for (p=in_buff; p
{
//保存组名
memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN);
//保存trunk服务器信息
memcpy(trunk_server_ip, p + FDFS_GROUP_NAME_MAX_LEN, \
IP_ADDRESS_SIZE - 1);
pGroup = tracker_mem_get_group(group_name);
if (pGroup == NULL)
{
logWarning("file: "__FILE__", line: %d, " \
"tracker server ip: %s, group: %s not exists", \
__LINE__, pTrackerServer->ip_addr, group_name);
continue;
}
if (*trunk_server_ip == '\0')
{
*(pGroup->last_trunk_server_ip) = '\0';
pGroup->pTrunkServer = NULL;
success_count++;
continue;
}
//获取trunk服务器信息
pGroup->pTrunkServer = tracker_mem_get_storage(pGroup, \
trunk_server_ip);
if (pGroup->pTrunkServer == NULL)
{
logWarning("file: "__FILE__", line: %d, " \
"tracker server ip: %s, group: %s, " \
"trunk server: %s not exists", \
__LINE__, pTrackerServer->ip_addr, \
group_name, trunk_server_ip);
}
snprintf(pGroup->last_trunk_server_ip, sizeof( \
pGroup->last_trunk_server_ip), "%s", trunk_server_ip);
success_count++;
}
//保存组信息到文件中
if (success_count > 0)
{
tracker_save_groups();
}
return 0;
}
//tracker服务处理TRACKER_PROTO_CMD_TRACKER_PING_LEADER命令的函数
tracker_deal_ping_leader()
static int tracker_deal_ping_leader(struct fast_task_info *pTask)
{
FDFSGroupInfo **ppGroup;
FDFSGroupInfo **ppEnd;
int body_len;
char *p;
TrackerClientInfo *pClientInfo;
pClientInfo = (TrackerClientInfo *)pTask->arg;
//判断数据包长度是否合法
if (pTask->length - sizeof(TrackerHeader) != 0)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
PKG_LEN_PRINTF_FORMAT" is not correct, " \
"expect length 0", __LINE__, \
TRACKER_PROTO_CMD_TRACKER_PING_LEADER, \
pTask->client_ip, \
pTask->length - (int)sizeof(TrackerHeader));
pTask->length = sizeof(TrackerHeader);
return EINVAL;
}
//若leader不是自己,直接返回错误
if (!g_if_leader_self)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, i am not the leader!", \
__LINE__, TRACKER_PROTO_CMD_TRACKER_PING_LEADER, \
pTask->client_ip);
pTask->length = sizeof(TrackerHeader);
return EOPNOTSUPP;
}
if (pClientInfo->chg_count.trunk_server == g_trunk_server_chg_count)
{
pTask->length = sizeof(TrackerHeader);
return 0;
}
body_len = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE) * g_groups.count;
if (body_len + sizeof(TrackerHeader) > pTask->size)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, " \
"exceeds max package size: %d!", \
__LINE__, TRACKER_PROTO_CMD_TRACKER_PING_LEADER, \
pTask->client_ip, pTask->size);
pTask->length = sizeof(TrackerHeader);
return ENOSPC;
}
//p指向数据包的数据部分
p = pTask->data + sizeof(TrackerHeader);
memset(p, 0, body_len);
//准备发送组信息
ppEnd = g_groups.sorted_groups + g_groups.count;
for (ppGroup=g_groups.sorted_groups; ppGroup
{
memcpy(p, (*ppGroup)->group_name, FDFS_GROUP_NAME_MAX_LEN);
p += FDFS_GROUP_NAME_MAX_LEN;
//若该组的trunk服务器不为空
if ((*ppGroup)->pTrunkServer != NULL)
{
//汇报trunk服务器的ip地址
memcpy(p, (*ppGroup)->pTrunkServer->ip_addr, IP_ADDRESS_SIZE);
}
p += IP_ADDRESS_SIZE;
}
pTask->length = p - pTask->data;
pClientInfo->chg_count.trunk_server = g_trunk_server_chg_count;
return 0;
}
阅读(10356) | 评论(0) | 转发(1) |