有了tracker 分析的基础,我们直接进入storage 的任务处理函数 int storage_deal_task(struct fast_task_info *pTask);
storage_service.c 6473行:
- case STORAGE_PROTO_CMD_UPLOAD_FILE:
- result = storage_upload_file(pTask, false);
- break;
如果命令是上传文件,会调用storage_upload_file函数处理,我们进入这个函数。
storage_service.c 3735行,是storage_upload_file 的入口,
storage_service.c 3771行 :
- p = pTask->data + sizeof(TrackerHeader);
- store_path_index = *p++;
- if (store_path_index < 0 || store_path_index >= g_fdfs_path_count)
- {
- logError("file: "__FILE__", line: %d, " \
- "client ip: %s, store_path_index: %d " \
- "is invalid", __LINE__, \
- pTask->client_ip, store_path_index);
- pClientInfo->total_length = sizeof(TrackerHeader);
- return EINVAL;
- }
- file_bytes = buff2long(p);
- p += FDFS_PROTO_PKG_LEN_SIZE;
- if (file_bytes < 0 || file_bytes != nInPackLen - \
- (1 + FDFS_PROTO_PKG_LEN_SIZE + \
- FDFS_FILE_EXT_NAME_MAX_LEN))
- {
- logError("file: "__FILE__", line: %d, " \
- "client ip: %s, pkg length is not correct, " \
- "invalid file bytes: "INT64_PRINTF_FORMAT \
- ", total body length: "INT64_PRINTF_FORMAT, \
- __LINE__, pTask->client_ip, file_bytes, nInPackLen);
- pClientInfo->total_length = sizeof(TrackerHeader);
- return EINVAL;
- }
- memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
- *(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
- p += FDFS_FILE_EXT_NAME_MAX_LEN;
- pFileContext->calc_crc32 = true;
- pFileContext->calc_file_hash = g_check_file_duplicate;
- pFileContext->extra_info.upload.start_time = time(NULL);
代码分析:
1. 首先解析出store_path_index;
2. 解析出file_bytes;
3. 解析出file_ext_name;
4. 将相关的属性赋值给pFileContext
storage_service.c 3810行 :
- pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
- pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
- pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
- pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
- if (bAppenderFile)
- {
- pFileContext->extra_info.upload.file_type |= \
- _FILE_TYPE_APPENDER;
- }
- else
- {
- if (g_if_use_trunk_file && trunk_check_size( \
- TRUNK_CALC_SIZE(file_bytes)))
- {
- pFileContext->extra_info.upload.file_type |= \
- _FILE_TYPE_TRUNK;
- }
- }
注意,这里把pFileContext->op 置为 FDFS_STORAGE_FILE_OP_WRITE;说明要执行的是写操作。
为了简化分析,我们暂时不考虑trunk 这种方式,以后会单独详细分析。
接着往下执行,storage_service.c 3893行 :
- return storage_write_to_file(pTask, file_offset, file_bytes, \
- p - pTask->data, dio_write_file, \
- storage_upload_file_done_callback, \
- clean_func, store_path_index);
开始真正的写文件操作了。
我们进入storage_write_to_file,storage_service.c 5567行 :
- static int storage_write_to_file(struct fast_task_info *pTask, \
- const int64_t file_offset, const int64_t upload_bytes, \
- const int buff_offset, TaskDealFunc deal_func, \
- FileDealDoneCallback done_callback, \
- DisconnectCleanFunc clean_func, const int store_path_index)
- {
- StorageClientInfo *pClientInfo;
- StorageFileContext *pFileContext;
- int result;
- pClientInfo = (StorageClientInfo *)pTask->arg;
- pFileContext = &(pClientInfo->file_context);
- pClientInfo->deal_func = deal_func;
- pClientInfo->clean_func = clean_func;
- pFileContext->fd = -1;
- pFileContext->buff_offset = buff_offset;
- pFileContext->offset = file_offset;
- pFileContext->start = file_offset;
- pFileContext->end = file_offset + upload_bytes;
- pFileContext->dio_thread_index = storage_dio_get_thread_index( \
- pTask, store_path_index, pFileContext->op);
- pFileContext->done_callback = done_callback;
- if (pFileContext->calc_crc32)
- {
- pFileContext->crc32 = CRC32_XINIT;
- }
- if (pFileContext->calc_file_hash)
- {
- INIT_HASH_CODES4(pFileContext->file_hash_codes)
- }
- if ((result=storage_dio_queue_push(pTask)) != 0)
- {
- pClientInfo->total_length = sizeof(TrackerHeader);
- return result;
- }
- return STORAGE_STATUE_DEAL_FILE;
- }
代码分析:
1. 注意参数的回调函数,这里含义是写动作执行完成后,会主动调用;
2.storage_dio_queue_push 函数,把pTask push 到dio 队列中;
3. 前面在分析storage 入口函数的时候,在main函数init 时,已经完成了对dio 线程的初始化动作;
4. 这里是往dio队列中push pTask;
5. storage_dio_queue_push 内部 使用的task_queue_push,同时,pthread_cond_signal 通知队列另端有数据到来。
6.特别需要注意 pClientInfo->deal_func = deal_func; 下面会用到。
既然已经放入队列,那么,另端就应该从队列中取出任务执行。这里,我们看一下dio线程执行函数。
storage_dio.c 646行:
- static void *dio_thread_entrance(void* arg)
- {
- int result;
- struct storage_dio_context *pContext;
- struct fast_task_info *pTask;
- pContext = (struct storage_dio_context *)arg;
- pthread_mutex_lock(&(pContext->lock));
- while (g_continue_flag)
- {
- if ((result=pthread_cond_wait(&(pContext->cond), \
- &(pContext->lock))) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_cond_wait fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- while ((pTask=task_queue_pop(&(pContext->queue))) != NULL)
- {
- ((StorageClientInfo *)pTask->arg)->deal_func(pTask);
- }
- }
- pthread_mutex_unlock(&(pContext->lock));
- if ((result=pthread_mutex_lock(&g_dio_thread_lock)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_mutex_lock fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- g_dio_thread_count--;
- if ((result=pthread_mutex_unlock(&g_dio_thread_lock)) != 0)
- {
- logError("file: "__FILE__", line: %d, " \
- "call pthread_mutex_lock fail, " \
- "errno: %d, error info: %s", \
- __LINE__, result, STRERROR(result));
- }
- logDebug("file: "__FILE__", line: %d, " \
- "dio thread exited, thread count: %d", \
- __LINE__, g_dio_thread_count);
- return NULL;
- }
代码分析:
1. 当队列有数据,取出数据,执行之;
2. ((StorageClientInfo *)pTask->arg)->deal_func(pTask); 这里通过回调函数完成任务的处理。
结合前面的分析,deal_func 函数指针,其实是函数 dio_write_file;
我们接着分析 dio_write_file,这个函数在 storage_dio.c 421行。
storage_dio.c 454行:
- if (write(pFileContext->fd, pDataBuff, write_bytes) != write_bytes)
- {
- result = errno != 0 ? errno : EIO;
- logError("file: "__FILE__", line: %d, " \
- "write to file: %s fail, fd=%d, write_bytes=%d, " \
- "errno: %d, error info: %s", \
- __LINE__, pFileContext->filename, \
- pFileContext->fd, write_bytes, \
- result, STRERROR(result));
- }
调用write 写数据了。
接着分析,
storage_dio.c 478行:
- if (pFileContext->calc_crc32)
- {
- pFileContext->crc32 = CRC32_ex(pDataBuff, write_bytes, \
- pFileContext->crc32);
- }
storage 通过crc32 保证数据的完整性。
接着分析, storage_dio.c 495行:
- pFileContext->offset += write_bytes;
- if (pFileContext->offset < pFileContext->end)
- {
- pFileContext->buff_offset = 0;
- storage_nio_notify(pTask); //notify nio to deal
- }
- else
- {
- if (pFileContext->calc_crc32)
- {
- pFileContext->crc32 = CRC32_FINAL( \
- pFileContext->crc32);
- }
- if (pFileContext->calc_file_hash)
- {
- FINISH_HASH_CODES4(pFileContext->file_hash_codes)
- }
- if (pFileContext->extra_info.upload.before_close_callback != NULL)
- {
- result = pFileContext->extra_info.upload. \
- before_close_callback(pTask);
- }
- /* file write done, close it */
- close(pFileContext->fd);
- pFileContext->fd = -1;
- if (pFileContext->done_callback != NULL)
- {
- pFileContext->done_callback(pTask, result);
- }
- }
代码分析:
1. 如果文件数据长度没有接收完,会storage_nio_notify 继续从nio(网络io) 读取文件内容;
2. 写文件完成后,得到文件 crc32 的值;
3. 执行回调函数,pFileContext->done_callback(pTask, result);
4. 而这个done_callback 实际上是 storage_upload_file_done_callback。
接着分析storage_upload_file_done_callback, storage_dio.c 1004行:
- static void storage_upload_file_done_callback(struct fast_task_info *pTask, \
- const int err_no)
接着分析, 先跳过trunk文件的处理,trunk 后面分析,storage_dio.c 1029行:
- if (result == 0)
- {
- result = storage_service_upload_file_done(pTask);
- if (result == 0)
- {
- if (pFileContext->create_flag & STORAGE_CREATE_FLAG_FILE)
- {
- result = storage_binlog_write(\
- pFileContext->timestamp2log, \
- STORAGE_OP_TYPE_SOURCE_CREATE_FILE, \
- pFileContext->fname2log);
- }
- }
- }
代码分析:
1. 调用storage_service_upload_file_done,完成接收文件后的操作。目前暂时先不展开分析,因为这里涉及到slave、link、fastdht 等机制,后面专门详细介绍。
2. 文件上传结束后,会调用 storage_binlog_write 写入binlog。
接着往下分析, storage_dio.c 1085行:
- pHeader = (TrackerHeader *)pTask->data;
- pHeader->status = result;
- pHeader->cmd = STORAGE_PROTO_CMD_RESP;
- long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \
- pHeader->pkg_len);
- storage_nio_notify(pTask);
构造应答的数据包,storage_nio_notify发送至客户端。
至此,我们简要分析了storage 接收文件的处理流程。大概的调用脉络上还是清晰的。像slave、trunk、link、fastdht 等机制,比较复杂,后面慢慢分析。本章节主要解释的就是storage对文件上传的大概处理流程。
欢迎感兴趣的朋友一起交流研究,提出意见。
FastDFS技术交流群:164684842
阅读(876) | 评论(0) | 转发(0) |