FastDFS 的文件上传机制涉及 client API、tracker、storage 三个部分,所以这篇文章的思路是从 client 执行 upload 的流程说起,逐步深入到 tracker、storage 的内部。
fdfs_upload_file.c 是一个上传客户端程序,通过调用 client API 将文件上传至 FastDFS。
首先看一下代码内容,fdfs_upload_file.c :
- int main(int argc, char *argv[])
- {
- char *conf_filename;
- char *local_filename;
- char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
- TrackerServerInfo *pTrackerServer;
- int result;
- int store_path_index;
- TrackerServerInfo storageServer;
- char file_id[128];
-
- if (argc < 3)
- {
- printf("Usage: %s <config_file> <local_filename>\n", argv[0]);
- return 1;
- }
- log_init();
- g_log_context.log_level = LOG_ERR;
- conf_filename = argv[1];
- if ((result=fdfs_client_init(conf_filename)) != 0)
- {
- return result;
- }
- pTrackerServer = tracker_get_connection();
- if (pTrackerServer == NULL)
- {
- fdfs_client_destroy();
- return errno != 0 ? errno : ECONNREFUSED;
- }
- if ((result=tracker_query_storage_store(pTrackerServer, \
- &storageServer, &store_path_index)) != 0)
- {
- fdfs_client_destroy();
- fprintf(stderr, "tracker_query_storage fail, " \
- "error no: %d, error info: %s\n", \
- result, STRERROR(result));
- return result;
- }
- strcpy(group_name, "");
- local_filename = argv[2];
- result = storage_upload_by_filename1(pTrackerServer, \
- &storageServer, store_path_index, \
- local_filename, NULL, \
- NULL, 0, group_name, file_id);
- if (result != 0)
- {
- fprintf(stderr, "upload file fail, " \
- "error no: %d, error info: %s\n", \
- result, STRERROR(result));
- if (storageServer.sock >= 0)
- {
- fdfs_quit(&storageServer);
- }
- tracker_disconnect_server(&storageServer);
- fdfs_quit(pTrackerServer);
- tracker_close_all_connections();
- fdfs_client_destroy();
- return result;
- }
- printf("%s\n", file_id);
- fdfs_quit(pTrackerServer);
- tracker_close_all_connections();
- fdfs_client_destroy();
- return 0;
- }
fdfs_upload_file.c 40行:
- conf_filename = argv[1];
- if ((result=fdfs_client_init(conf_filename)) != 0)
- {
- return result;
- }
代码分析:
1. client 调用 fdfs_client_init 读取配置文件;
2. 深入到 fdfs_client_init 的内部,会调用 fdfs_load_tracker_group_ex;
3. 而fdfs_load_tracker_group_ex,会从配置文件中解析出tracker_server字段的内容,获取tracker 服务器地址信息。
fdfs_upload_file.c 46行:
- pTrackerServer = tracker_get_connection();
- if (pTrackerServer == NULL)
- {
- fdfs_client_destroy();
- return errno != 0 ? errno : ECONNREFUSED;
- }
通过跟踪,我们可以进入到 tracker_get_connection() 内部,它调用了 tracker_get_connection_ex,位于 tracker_client.c 65行:
- TrackerServerInfo *tracker_get_connection_ex(TrackerServerGroup *pTrackerGroup)
- {
- TrackerServerInfo *pCurrentServer;
- TrackerServerInfo *pResult;
- TrackerServerInfo *pServer;
- TrackerServerInfo *pEnd;
- int server_index;
- server_index = pTrackerGroup->server_index;
- if (server_index >= pTrackerGroup->server_count)
- {
- server_index = 0;
- }
- pResult = NULL;
- do
- {
- pCurrentServer = pTrackerGroup->servers + server_index;
- if (pCurrentServer->sock >= 0 ||
- tracker_connect_server(pCurrentServer) == 0)
- {
- pResult = pCurrentServer;
- break;
- }
- pEnd = pTrackerGroup->servers + pTrackerGroup->server_count;
- for (pServer=pCurrentServer+1; pServer<pEnd; pServer++)
- {
- if (pServer->sock >= 0 || tracker_connect_server(pServer) == 0)
- {
- pResult = pServer;
- pTrackerGroup->server_index = pServer - \
- pTrackerGroup->servers;
- break;
- }
- }
- if (pResult != NULL)
- {
- break;
- }
- for (pServer=pTrackerGroup->servers; pServer<pCurrentServer; pServer++)
- {
- if (pServer->sock >= 0 || tracker_connect_server(pServer) == 0)
- {
- pResult = pServer;
- pTrackerGroup->server_index = pServer - \
- pTrackerGroup->servers;
- break;
- }
- }
- } while (0);
- pTrackerGroup->server_index++;
- if (pTrackerGroup->server_index >= pTrackerGroup->server_count)
- {
- pTrackerGroup->server_index = 0;
- }
- return pResult;
- }
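观察这段代码,主要的动作都在 do { ... } while (0) 块中(该块只执行一次,用 break 提前跳出):从 server_index 指向的 tracker 开始,先向后、再从头依次检查各个 tracker 服务器,若其 sock 已可用或 tracker_connect_server 连接成功,就返回指向该服务器的 TrackerServerInfo * 指针;如果所有服务器都连接失败,则返回 NULL。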
fdfs_upload_file.c 56行:
- if ((result=tracker_query_storage_store(pTrackerServer, \
- &storageServer, &store_path_index)) != 0)
- {
- fdfs_client_destroy();
- fprintf(stderr, "tracker_query_storage fail, " \
- "error no: %d, error info: %s\n", \
- result, STRERROR(result));
- return result;
- }
代码分析:
1. tracker_query_storage_store 是 tracker_query_storage_store_without_group的宏定义;
我们来看 tracker_query_storage_store_without_group 的实现,位于 tracker_client.c 770行:
- int tracker_query_storage_store_without_group(TrackerServerInfo *pTrackerServer,
- TrackerServerInfo *pStorageServer, int *store_path_index)
- {
- TrackerHeader header;
- char in_buff[sizeof(TrackerHeader) + \
- TRACKER_QUERY_STORAGE_STORE_BODY_LEN];
- char *pInBuff;
- int64_t in_bytes;
- int result;
- if (pTrackerServer->sock < 0)
- {
- if ((result=tracker_connect_server(pTrackerServer)) != 0)
- {
- return result;
- }
- }
- memset(pStorageServer, 0, sizeof(TrackerServerInfo));
- pStorageServer->sock = -1;
- memset(&header, 0, sizeof(header));
- header.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;
- if ((result=tcpsenddata_nb(pTrackerServer->sock, &header, \
- sizeof(header), g_fdfs_network_timeout)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "send data to tracker server %s:%d fail, " \
- "errno: %d, error info: %s", __LINE__, \
- pTrackerServer->ip_addr, \
- pTrackerServer->port, \
- result, STRERROR(result));
- }
- else
- {
- pInBuff = in_buff;
- result = fdfs_recv_response(pTrackerServer, \
- &pInBuff, sizeof(in_buff), &in_bytes);
- }
- if (result != 0)
- {
- close(pTrackerServer->sock);
- pTrackerServer->sock = -1;
- return result;
- }
- if (in_bytes != TRACKER_QUERY_STORAGE_STORE_BODY_LEN)
- {
- logError("file: "__FILE__", line: %d, " \
- "tracker server %s:%d response data " \
- "length: "INT64_PRINTF_FORMAT" is invalid, " \
- "expect length: %d", __LINE__, \
- pTrackerServer->ip_addr, pTrackerServer->port, \
- in_bytes, TRACKER_QUERY_STORAGE_STORE_BODY_LEN);
- return EINVAL;
- }
- memcpy(pStorageServer->group_name, in_buff, \
- FDFS_GROUP_NAME_MAX_LEN);
- memcpy(pStorageServer->ip_addr, in_buff + \
- FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE-1);
- pStorageServer->port = (int)buff2long(in_buff + \
- FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1);
- *store_path_index = *(in_buff + FDFS_GROUP_NAME_MAX_LEN + \
- IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE);
- return 0;
- }
代码分析:
1. client 连接成功 tracker 后,会向tracker 发起 TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE 命令;
2. tracker server 处理完成后,向 client 返回应答信息;
3. client 解析出 group name、storage ip_addr、storage port、store_path_index等信息。
我们接着看client 的动作, fdfs_upload_file.c 64行:
- strcpy(group_name, "");
- local_filename = argv[2];
- result = storage_upload_by_filename1(pTrackerServer, \
- &storageServer, store_path_index, \
- local_filename, NULL, \
- NULL, 0, group_name, file_id);
这里,通过调用storage_upload_by_filename1,完成文件上传。
进入storage_upload_by_filename1,它是storage_upload_by_filename1_ex 的宏定义,注意命令参数是STORAGE_PROTO_CMD_UPLOAD_FILE。这个宏定义在 storage_client1.h 34行:
- #define storage_upload_by_filename1(pTrackerServer, pStorageServer, \
- store_path_index, local_filename, file_ext_name, \
- meta_list, meta_count, group_name, file_id) \
- storage_upload_by_filename1_ex(pTrackerServer, pStorageServer, \
- store_path_index, STORAGE_PROTO_CMD_UPLOAD_FILE, \
- local_filename, file_ext_name, meta_list, meta_count, \
- group_name, file_id)
我们进入到storage_upload_by_filename1_ex 内部实现,它又调用了storage_upload_by_filename_ex,进而调用storage_do_upload_file进行文件上传动作。注意,调用 storage_do_upload_file时有个参数是FDFS_UPLOAD_BY_FILE。
真正执行上传的函数是 storage_client.c 829行:
- int storage_do_upload_file(TrackerServerInfo *pTrackerServer, \
- TrackerServerInfo *pStorageServer, const int store_path_index, \
- const char cmd, const int upload_type, const char *file_buff, \
- void *arg, const int64_t file_size, const char *master_filename, \
- const char *prefix_name, const char *file_ext_name, \
- const FDFSMetaData *meta_list, const int meta_count, \
- char *group_name, char *remote_filename)
在 storage_do_upload_file 内部,client 会与 storage 交互,向其发送 cmd 请求。
注意 storage_client.c 971行:
- if (upload_type == FDFS_UPLOAD_BY_FILE)
- {
- if ((result=tcpsendfile(pStorageServer->sock, file_buff, \
- file_size, g_fdfs_network_timeout, \
- &total_send_bytes)) != 0)
- {
- break;
- }
- }
如果upload_type 是 FDFS_UPLOAD_BY_FILE类型,那么tcpsendfile直接传送文件,这就涉及到网络发送部分,具体的函数实现在sockopt.c 里面。可以理解为网络底层的socket send/recv通信。
fdfs_upload_file.c 88 行:
- printf("%s\n", file_id);
- fdfs_quit(pTrackerServer);
- tracker_close_all_connections();
- fdfs_client_destroy();
client 上传文件成功之后,storage 会返回file_id。 接着,client 关闭socket 连接。
至此,client 流程分析完成。通过分析可知,client 和 tracker 交互,是通过命令 TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;而和 storage 交互,是通过命令 STORAGE_PROTO_CMD_UPLOAD_FILE。接下来,我们逐步进入 tracker、storage 的内部实现。
通过前几篇文章的介绍,我们知道,tracker 网络接收正常后,处理任务命令的入口在 tracker_service.c 3137行,int tracker_deal_task(struct fast_task_info *pTask) 函数;
而 storage 处理任务命令的入口在 storage_service.c 6456行,int storage_deal_task(struct fast_task_info *pTask) 函数。
我们逐步展开对tracker、storage 的分析。
欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
阅读(8302) | 评论(0) | 转发(1) |