关于下载,其实和上传文件很相似。这里我们暂时先不考虑nginx httpd的方式,只通过client api 方式与FastDFS 交互。
首先,我们看fdfs_download_file.c,这个文件是下载文件的客户端示例,直接看71行:
- result = storage_download_file_to_file1( \
- pTrackerServer, NULL, \
- file_id, local_filename, &file_size);
- if (result != 0)
- {
- printf("download file fail, " \
- "error no: %d, error info: %s\n", \
- result, STRERROR(result));
- }
我们到这个函数的内部,storage_client.c 707行:
- int storage_download_file_to_file1(TrackerServerInfo *pTrackerServer, \
- TrackerServerInfo *pStorageServer, \
- const char *file_id, \
- const char *local_filename, int64_t *file_size)
- { /* Download the remote file named by a combined file_id into local_filename.
-      FDFS_SPLIT_GROUP_NAME_AND_FILENAME(file_id) is expanded first; the call
-      below then uses group_name and filename, which presumably are provided
-      by that macro (its body is not shown here - confirm).
-      Returns whatever storage_download_file_to_file returns. */
- FDFS_SPLIT_GROUP_NAME_AND_FILENAME(file_id)
- return storage_download_file_to_file(pTrackerServer, \
- pStorageServer, group_name, filename, \
- local_filename, file_size);
- }
代码分析:
1. 首先从file_id ,解析出group name、file name
2. 调用storage_download_file_to_file
接着往下分析,进入storage_download_file_to_file,storage_client.c 719行:
- int storage_download_file_to_file(TrackerServerInfo *pTrackerServer, \
- TrackerServerInfo *pStorageServer, \
- const char *group_name, const char *remote_filename, \
- const char *local_filename, int64_t *file_size)
- { /* Download group_name/remote_filename into the local file local_filename.
-      Forwards all arguments to storage_do_download_file with the
-      FDFS_DOWNLOAD_TO_FILE mode and a NULL seventh argument, and returns
-      that call's result unchanged. */
- char *pLocalFilename;
- pLocalFilename = (char *)local_filename; /* drops const: the address of this pointer is what gets passed below */
- return storage_do_download_file(pTrackerServer, pStorageServer, \
- FDFS_DOWNLOAD_TO_FILE, group_name, remote_filename, \
- &pLocalFilename, NULL, file_size);
- }
storage_download_file_to_file函数内部,会调用storage_do_download_file下载文件,注意命令FDFS_DOWNLOAD_TO_FILE。
通过分析storage_do_download_file可以看到,它内部先向tracker服务器发送TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE命令,获取合适的storage服务器,然后通过STORAGE_PROTO_CMD_DOWNLOAD_FILE命令从storage下载文件。这部分和上传机制类似,我们不做详细分析。
这里与tracker交互时发送的命令是TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE,其交互流程与upload机制类似,读者可以参考此前关于upload的机制分析。
我们着重分析下与storage 交互的过程。
storage_service.c 6456行:
- int storage_deal_task(struct fast_task_info *pTask)
storage_deal_task 是storage 处理任务的入口,我们直接观察STORAGE_PROTO_CMD_DOWNLOAD_FILE,storage_service.c 6467行:
- case STORAGE_PROTO_CMD_DOWNLOAD_FILE:
- result = storage_server_download_file(pTask);
- break;
接着分析
storage_server_download_file,storage_service.c 5489行:
- return storage_read_from_file(pTask, file_offset, download_bytes, \
- storage_download_file_done_callback, store_path_index);
代码分析:
1. storage_server_download_file 首先解析协议,解析出file相关信息;
2. 然后调用storage_read_from_file,注意回调函数storage_download_file_done_callback
接着往下分析,storage_service.c 5520行:
- static int storage_read_from_file(struct fast_task_info *pTask, \
- const int64_t file_offset, const int64_t download_bytes, \
- FileDealDoneCallback done_callback, \
- const int store_path_index)
- { /* Set up the task's client and file context for reading download_bytes
-      bytes starting at file_offset, write the response header into
-      pTask->data, and hand the task to storage_dio_queue_push.
-      Returns STORAGE_STATUE_DEAL_FILE when the push succeeds, otherwise the
-      non-zero result of storage_dio_queue_push. */
- StorageClientInfo *pClientInfo;
- StorageFileContext *pFileContext;
- TrackerHeader *pHeader;
- int result;
- pClientInfo = (StorageClientInfo *)pTask->arg;
- pFileContext = &(pClientInfo->file_context);
- pClientInfo->deal_func = dio_read_file; /* handler stored for later use; the code that invokes it is not shown here */
- pClientInfo->clean_func = dio_read_finish_clean_up;
- pClientInfo->total_length = sizeof(TrackerHeader) + download_bytes; /* header plus file payload */
- pClientInfo->total_offset = 0;
- pFileContext->op = FDFS_STORAGE_FILE_OP_READ;
- pFileContext->open_flags = O_RDONLY | g_extra_open_file_flags;
- pFileContext->offset = file_offset;
- pFileContext->start = file_offset;
- pFileContext->end = file_offset + download_bytes; /* offset just past the last requested byte */
- pFileContext->dio_thread_index = storage_dio_get_thread_index( \
- pTask, store_path_index, pFileContext->op);
- pFileContext->done_callback = done_callback;
- pTask->length = sizeof(TrackerHeader); /* the buffer holds only the header at this point */
- pHeader = (TrackerHeader *)pTask->data;
- pHeader->status = 0;
- pHeader->cmd = STORAGE_PROTO_CMD_RESP;
- long2buff(download_bytes, pHeader->pkg_len); /* stores download_bytes in the header's pkg_len field */
- if ((result=storage_dio_queue_push(pTask)) != 0)
- {
- if (pFileContext->fd >= 0) /* NOTE(review): fd is never assigned in this function; presumably initialised by the caller - confirm */
- {
- close(pFileContext->fd); /* NOTE(review): fd is not reset to -1 after this close, unlike in dio_read_file */
- }
- pClientInfo->total_length = sizeof(TrackerHeader); /* shrink back to header size on failure */
- return result;
- }
- return STORAGE_STATUE_DEAL_FILE;
- }
代码分析:
1.这里和上传相似,通过队列,实现nio和dio 的转换;
2. 注意 FDFS_STORAGE_FILE_OP_READ,说明操作是读取文件;
3. pClientInfo->deal_func = dio_read_file;
4. 上面的函数指针是dio 执行函数;
5. pFileContext->done_callback = done_callback;完成时的回调函数,也就是之前提到的storage_download_file_done_callback。
storage_dio.c 中 dio_thread_entrance 函数处理和上传机制一样。我们直接分析deal_func函数,这里是dio_read_file。
storage_dio.c 334行:
- int dio_read_file(struct fast_task_info *pTask)
- { /* Read the next chunk of the file described by the task's file context
-      into the free space of pTask->data.
-      After a successful chunk: if bytes remain before pFileContext->end,
-      storage_nio_notify is called; otherwise the fd is closed and
-      done_callback is invoked with 0. The function then returns 0.
-      On open or read failure: the fd is closed (see note below),
-      done_callback is invoked with the error code, and that code is
-      returned. */
- StorageFileContext *pFileContext;
- int result;
- int64_t remain_bytes;
- int capacity_bytes;
- int read_bytes;
- pFileContext = &(((StorageClientInfo *)pTask->arg)->file_context);
- do
- {
- if ((result=dio_open_file(pFileContext)) != 0) /* result stays 0 below unless the read fails */
- {
- break;
- }
- remain_bytes = pFileContext->end - pFileContext->offset; /* bytes still to be read from the file */
- capacity_bytes = pTask->size - pTask->length; /* free space left in the task buffer */
- read_bytes = (capacity_bytes < remain_bytes) ? \
- capacity_bytes : remain_bytes;
- /*
- logInfo("###before dio read bytes: %d, pTask->length=%d, file offset=%ld", \
- read_bytes, pTask->length, pFileContext->offset);
- */
- if (read(pFileContext->fd, pTask->data + pTask->length, \
- read_bytes) != read_bytes) /* NOTE(review): a short read is treated as a failure, not retried */
- {
- result = errno != 0 ? errno : EIO; /* falls back to EIO when errno is 0, e.g. presumably on a short read */
- logError("file: "__FILE__", line: %d, " \
- "read from file: %s fail, " \
- "errno: %d, error info: %s", \
- __LINE__, pFileContext->filename, \
- result, STRERROR(result));
- }
- pthread_mutex_lock(&g_dio_thread_lock); /* read counters are updated under this lock */
- g_storage_stat.total_file_read_count++;
- if (result == 0)
- {
- g_storage_stat.success_file_read_count++;
- }
- pthread_mutex_unlock(&g_dio_thread_lock);
- if (result != 0)
- {
- break;
- }
- pTask->length += read_bytes;
- pFileContext->offset += read_bytes;
- /*
- logInfo("###after dio read bytes: %d, pTask->length=%d, file offset=%ld", \
- read_bytes, pTask->length, pFileContext->offset);
- */
- if (pFileContext->offset < pFileContext->end) /* more of the requested range remains */
- {
- storage_nio_notify(pTask); //notify nio to deal
- }
- else
- {
- /* file read done, close it */
- close(pFileContext->fd);
- pFileContext->fd = -1;
- pFileContext->done_callback(pTask, result);
- }
- return 0;
- } while (0); /* single-pass block: the break statements above jump to the error handling below */
- /* file read error, close it */
- if (pFileContext->fd > 0) /* NOTE(review): tests > 0, so a descriptor equal to 0 would not be closed here */
- {
- close(pFileContext->fd);
- pFileContext->fd = -1;
- }
- pFileContext->done_callback(pTask, result);
- return result;
- }
代码分析:
1. 文件操作,打开文件,读取文件内容;
2.通过storage_nio_notify函数,通知nio 处理;
3.通过nio、dio之间的交互,直到正确读取、发送完文件数据,调用done_callback
我们接着分析 done_callback,也就是storage_download_file_done_callback。storage_service.c 596行:
- static void storage_download_file_done_callback(struct fast_task_info *pTask, \
- const int err_no)
- { /* Completion callback for a download task.
-      err_no != 0: bumps total_download_count and adds the bytes read so far
-      (offset - start) to total_download_bytes under stat_count_thread_lock,
-      then either stores err_no in the header and notifies nio (when the
-      buffer still holds only the header) or calls task_finish_clean_up.
-      err_no == 0: updates the download counters through
-      CHECK_AND_WRITE_TO_STAT_FILE2_WITH_BYTES (macro body not shown here)
-      and notifies nio. */
- StorageFileContext *pFileContext;
- TrackerHeader *pHeader;
- pFileContext = &(((StorageClientInfo *)pTask->arg)->file_context);
- if (err_no != 0)
- {
- pthread_mutex_lock(&stat_count_thread_lock);
- g_storage_stat.total_download_count++;
- g_storage_stat.total_download_bytes += \
- pFileContext->offset - pFileContext->start;
- pthread_mutex_unlock(&stat_count_thread_lock);
- if (pTask->length == sizeof(TrackerHeader)) //never response
- {
- pHeader = (TrackerHeader *)pTask->data;
- pHeader->status = err_no; /* report the error through the header's status field */
- storage_nio_notify(pTask);
- }
- else
- {
- task_finish_clean_up(pTask); /* buffer holds more than the header, so the header is not rewritten */
- }
- }
- else
- {
- CHECK_AND_WRITE_TO_STAT_FILE2_WITH_BYTES( \
- g_storage_stat.total_download_count, \
- g_storage_stat.success_download_count, \
- g_storage_stat.total_download_bytes, \
- g_storage_stat.success_download_bytes, \
- pFileContext->end - pFileContext->start) /* full requested range: end - start */
- storage_nio_notify(pTask);
- }
- }
分析发现,storage_download_file_done_callback 也就是做一些文件下载结尾的动作。
OK,这里下载就完成了。和上传很是相似。通过http方式下载,我们放到后面的章节分析。
欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
阅读(716) | 评论(0) | 转发(0) |