Chinaunix首页 | 论坛 | 博客
  • 博客访问: 41273
  • 博文数量: 24
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 130
  • 用 户 组: 普通用户
  • 注册时间: 2011-08-03 19:07
文章分类
文章存档

2012年(21)

2011年(3)

分类:

2012-05-27 18:30:01

FastDFS 文件上传机制涉及 client API、tracker、storage 三个部分,所以这篇文章的思路是从 client 执行 upload 的流程说起,逐步深入到 tracker、storage 的内部。
fdfs_upload_file.c 是一个上传客户端程序,通过调用 client API 将文件上传至 FastDFS。

首先看一下代码内容,fdfs_upload_file.c :

  1. int main(int argc, char *argv[])
  2. {
  3.     char *conf_filename;
  4.     char *local_filename;
  5.     char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
  6.     TrackerServerInfo *pTrackerServer;
  7.     int result;
  8.     int store_path_index;
  9.     TrackerServerInfo storageServer;
  10.     char file_id[128];
  11.     
  12.     if (argc < 3)
  13.     {
  14.         printf("Usage: %s <config_file> <local_filename>\n", argv[0]);
  15.         return 1;
  16.     }

  17.     log_init();
  18.     g_log_context.log_level = LOG_ERR;

  19.     conf_filename = argv[1];
  20.     if ((result=fdfs_client_init(conf_filename)) != 0)
  21.     {
  22.         return result;
  23.     }

  24.     pTrackerServer = tracker_get_connection();
  25.     if (pTrackerServer == NULL)
  26.     {
  27.         fdfs_client_destroy();
  28.         return errno != 0 ? errno : ECONNREFUSED;
  29.     }


  30.     if ((result=tracker_query_storage_store(pTrackerServer, \
  31.      &storageServer, &store_path_index)) != 0)
  32.     {
  33.         fdfs_client_destroy();
  34.         fprintf(stderr, "tracker_query_storage fail, " \
  35.             "error no: %d, error info: %s\n", \
  36.             result, STRERROR(result));
  37.         return result;
  38.     }

  39.     strcpy(group_name, "");
  40.     local_filename = argv[2];
  41.     result = storage_upload_by_filename1(pTrackerServer, \
  42.             &storageServer, store_path_index, \
  43.             local_filename, NULL, \
  44.             NULL, 0, group_name, file_id);
  45.     if (result != 0)
  46.     {
  47.         fprintf(stderr, "upload file fail, " \
  48.             "error no: %d, error info: %s\n", \
  49.             result, STRERROR(result));

  50.         if (storageServer.sock >= 0)
  51.         {
  52.             fdfs_quit(&storageServer);
  53.         }
  54.         tracker_disconnect_server(&storageServer);

  55.         fdfs_quit(pTrackerServer);
  56.         tracker_close_all_connections();
  57.         fdfs_client_destroy();
  58.         return result;
  59.     }

  60.     printf("%s\n", file_id);

  61.     fdfs_quit(pTrackerServer);
  62.     tracker_close_all_connections();
  63.     fdfs_client_destroy();

  64.     return 0;
  65. }
fdfs_upload_file.c 40行:

  1.     conf_filename = argv[1];
  2.     if ((result=fdfs_client_init(conf_filename)) != 0)
  3.     {
  4.         return result;
  5.     }
代码分析:
1. client 调用 fdfs_client_init 读取配置文件;
2. 深入到 fdfs_client_init 的内部,会调用 fdfs_load_tracker_group_ex;
3. 而fdfs_load_tracker_group_ex,会从配置文件中解析出tracker_server字段的内容,获取tracker 服务器地址信息。

fdfs_upload_file.c 46行: 
  1.     pTrackerServer = tracker_get_connection();
  2.     if (pTrackerServer == NULL)
  3.     {
  4.         fdfs_client_destroy();
  5.         return errno != 0 ? errno : ECONNREFUSED;
  6.     }
通过跟踪,我们可以进入到 tracker_get_connection() 内部,它调用了 tracker_get_connection_ex(tracker_client.c 65行):

  1. TrackerServerInfo *tracker_get_connection_ex(TrackerServerGroup *pTrackerGroup)
  2. {
  3.     TrackerServerInfo *pCurrentServer;
  4.     TrackerServerInfo *pResult;
  5.     TrackerServerInfo *pServer;
  6.     TrackerServerInfo *pEnd;
  7.     int server_index;

  8.     server_index = pTrackerGroup->server_index;
  9.     if (server_index >= pTrackerGroup->server_count)
  10.     {
  11.         server_index = 0;
  12.     }

  13.     pResult = NULL;

  14.     do
  15.     {
  16.     pCurrentServer = pTrackerGroup->servers + server_index;
  17.     if (pCurrentServer->sock >= 0 ||
  18.         tracker_connect_server(pCurrentServer) == 0)
  19.     {
  20.         pResult = pCurrentServer;
  21.         break;
  22.     }

  23.     pEnd = pTrackerGroup->servers + pTrackerGroup->server_count;
  24.     for (pServer=pCurrentServer+1; pServer<pEnd; pServer++)
  25.     {
  26.         if (pServer->sock >= 0 || tracker_connect_server(pServer) == 0)
  27.         {
  28.             pResult = pServer;
  29.             pTrackerGroup->server_index = pServer - \
  30.                             pTrackerGroup->servers;
  31.             break;
  32.         }
  33.     }

  34.     if (pResult != NULL)
  35.     {
  36.         break;
  37.     }

  38.     for (pServer=pTrackerGroup->servers; pServer<pCurrentServer; pServer++)
  39.     {
  40.         if (pServer->sock >= 0 || tracker_connect_server(pServer) == 0)
  41.         {
  42.             pResult = pServer;
  43.             pTrackerGroup->server_index = pServer - \
  44.                             pTrackerGroup->servers;
  45.             break;
  46.         }
  47.     }
  48.     } while (0);

  49.     pTrackerGroup->server_index++;
  50.     if (pTrackerGroup->server_index >= pTrackerGroup->server_count)
  51.     {
  52.         pTrackerGroup->server_index = 0;
  53.     }

  54.     return pResult;
  55. }
观察这段代码,主要的动作在 do-while 块中:从 server_index 指向的 tracker 开始,依次尝试各个 tracker 服务器,已有连接(sock >= 0)的直接复用,否则调用 tracker_connect_server 建立连接。一旦有一台连接成功,就返回指向该 tracker 的 TrackerServerInfo * 指针;如果全部连接失败,则返回 NULL。

fdfs_upload_file.c 56行: 
  1.    if ((result=tracker_query_storage_store(pTrackerServer, \
  2.      &storageServer, &store_path_index)) != 0)
  3.     {
  4.         fdfs_client_destroy();
  5.         fprintf(stderr, "tracker_query_storage fail, " \
  6.             "error no: %d, error info: %s\n", \
  7.             result, STRERROR(result));
  8.         return result;
  9.     }
代码分析:
1. tracker_query_storage_store 是 tracker_query_storage_store_without_group的宏定义;

我们来看 tracker_query_storage_store_without_group 的实现,tracker_client.c 770行:

  1. int tracker_query_storage_store_without_group(TrackerServerInfo *pTrackerServer,
  2.         TrackerServerInfo *pStorageServer, int *store_path_index)
  3. {
  4.     TrackerHeader header;
  5.     char in_buff[sizeof(TrackerHeader) + \
  6.         TRACKER_QUERY_STORAGE_STORE_BODY_LEN];
  7.     char *pInBuff;
  8.     int64_t in_bytes;
  9.     int result;

  10.     if (pTrackerServer->sock < 0)
  11.     {
  12.         if ((result=tracker_connect_server(pTrackerServer)) != 0)
  13.         {
  14.             return result;
  15.         }
  16.     }

  17.     memset(pStorageServer, 0, sizeof(TrackerServerInfo));
  18.     pStorageServer->sock = -1;

  19.     memset(&header, 0, sizeof(header));
  20.     header.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;
  21.     if ((result=tcpsenddata_nb(pTrackerServer->sock, &header, \
  22.             sizeof(header), g_fdfs_network_timeout)) != 0)
  23.     {
  24.         logError("file: "__FILE__", line: %d, " \
  25.             "send data to tracker server %s:%d fail, " \
  26.             "errno: %d, error info: %s", __LINE__, \
  27.             pTrackerServer->ip_addr, \
  28.             pTrackerServer->port, \
  29.             result, STRERROR(result));
  30.     }
  31.     else
  32.     {
  33.         pInBuff = in_buff;
  34.         result = fdfs_recv_response(pTrackerServer, \
  35.                 &pInBuff, sizeof(in_buff), &in_bytes);
  36.     }

  37.     if (result != 0)
  38.     {
  39.         close(pTrackerServer->sock);
  40.         pTrackerServer->sock = -1;

  41.         return result;
  42.     }

  43.     if (in_bytes != TRACKER_QUERY_STORAGE_STORE_BODY_LEN)
  44.     {
  45.         logError("file: "__FILE__", line: %d, " \
  46.             "tracker server %s:%d response data " \
  47.             "length: "INT64_PRINTF_FORMAT" is invalid, " \
  48.             "expect length: %d", __LINE__, \
  49.             pTrackerServer->ip_addr, pTrackerServer->port, \
  50.             in_bytes, TRACKER_QUERY_STORAGE_STORE_BODY_LEN);
  51.         return EINVAL;
  52.     }

  53.     memcpy(pStorageServer->group_name, in_buff, \
  54.             FDFS_GROUP_NAME_MAX_LEN);
  55.     memcpy(pStorageServer->ip_addr, in_buff + \
  56.             FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE-1);
  57.     pStorageServer->port = (int)buff2long(in_buff + \
  58.                 FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1);
  59.     *store_path_index = *(in_buff + FDFS_GROUP_NAME_MAX_LEN + \
  60.              IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE);

  61.     return 0;
  62. }
代码分析:
1. client 连接成功 tracker 后,会向tracker 发起 TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE 命令;
2. tracker server 处理完成后,返回client 应答信息;
3. client 解析出 group name、storage ip_addr、storage port、store_path_index等信息。

我们接着看client 的动作, fdfs_upload_file.c 64行: 

  1.     strcpy(group_name, "");
  2.     local_filename = argv[2];
  3.     result = storage_upload_by_filename1(pTrackerServer, \
  4.             &storageServer, store_path_index, \
  5.             local_filename, NULL, \
  6.             NULL, 0, group_name, file_id);
这里,通过调用storage_upload_by_filename1,完成文件上传。

进入storage_upload_by_filename1,它是storage_upload_by_filename1_ex 的宏定义,注意命令参数是STORAGE_PROTO_CMD_UPLOAD_FILE。这个宏定义在 storage_client1.h 34行:

  1. #define storage_upload_by_filename1(pTrackerServer, pStorageServer, \
  2.         store_path_index, local_filename, file_ext_name, \
  3.         meta_list, meta_count, group_name, file_id) \
  4.     storage_upload_by_filename1_ex(pTrackerServer, pStorageServer, \
  5.         store_path_index, STORAGE_PROTO_CMD_UPLOAD_FILE, \
  6.         local_filename, file_ext_name, meta_list, meta_count, \
  7.         group_name, file_id)
我们进入到storage_upload_by_filename1_ex 内部实现,它又调用了storage_upload_by_filename_ex,进而调用storage_do_upload_file进行文件上传动作。注意,调用 storage_do_upload_file时有个参数是FDFS_UPLOAD_BY_FILE。

真正执行上传的函数是 storage_client.c 829行:

  1. int storage_do_upload_file(TrackerServerInfo *pTrackerServer, \
  2.     TrackerServerInfo *pStorageServer, const int store_path_index, \
  3.     const char cmd, const int upload_type, const char *file_buff, \
  4.     void *arg, const int64_t file_size, const char *master_filename, \
  5.     const char *prefix_name, const char *file_ext_name, \
  6.     const FDFSMetaData *meta_list, const int meta_count, \
  7.     char *group_name, char *remote_filename)
在 storage_do_upload_file 内部,client 会与 storage 交互,发送 cmd 请求。
注意 storage_client.c 971行:

  1. if (upload_type == FDFS_UPLOAD_BY_FILE)
  2.     {
  3.         if ((result=tcpsendfile(pStorageServer->sock, file_buff, \
  4.             file_size, g_fdfs_network_timeout, \
  5.             &total_send_bytes)) != 0)
  6.         {
  7.             break;
  8.         }
  9.     }
如果 upload_type 是 FDFS_UPLOAD_BY_FILE 类型,那么直接调用 tcpsendfile 发送文件内容。这就涉及到网络发送部分,具体的函数实现在 sockopt.c 里面,可以理解为网络底层的 socket send/recv 通信。

fdfs_upload_file.c 88 行: 
  1.     printf("%s\n", file_id);

  2.     fdfs_quit(pTrackerServer);
  3.     tracker_close_all_connections();
  4.     fdfs_client_destroy();
client 上传文件成功之后,storage 会返回file_id。 接着,client 关闭socket 连接。

至此,client 流程分析完成。通过分析,client 和 tracker 交互,是通过命令TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE ,而和storage 交互,是通过命令 STORAGE_PROTO_CMD_UPLOAD_FILE 。接下来,我们逐步进入tracker 、 storage 的内部实现。

通过前几篇文章的介绍,我们知道,tracker 网络接收正常后,处理任务命令的入口在 tracker_service.c 3137行的 int tracker_deal_task(struct fast_task_info *pTask) 函数;
而 storage 处理任务命令的入口在 storage_service.c 6456行的 int storage_deal_task(struct fast_task_info *pTask) 函数。
我们逐步展开对tracker、storage 的分析。


欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842



阅读(1110) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~