storage服务(service)
概述
*storage服务接收和执行客户端命令的流程如下:
(1) storage服务初始化
storage部分会创建g_work_threads(默认是4)个work_thread_entrance线程来执行客户端和tracker端的命令。每个工作线程对应一个管道(管道的读端被设置为非阻塞),该管道用来和主线程进行通信。每个工作线程从该管道的读端读取任务信息。
而主线程通过storage_accept_loop函数监听客户端信息,并把任务信息写入管道,work_thread_entrance线程读取任务信息,并处理任务。
(2) storage服务读取任务信息
读取任务信息是通过storage_recv_notify_read函数来进行的,storage服务会把管道的读端添加到libevent的读事件队列中。
(3) storage的信息读取分为几个状态,开始的时候是FDFS_STORAGE_STAGE_NIO_INIT(\0)状态。该状态下由storage_nio_init进行数据结构的转换,并把客户端连接的socket(而不是管道的读端)注册到本线程的libevent中。读事件和写事件的处理函数分别是client_sock_read和client_sock_write,但此时只把读事件加入了监控队列。随后把状态更新成FDFS_STORAGE_STAGE_NIO_RECV。处于该状态的fd端开始处理客户端的命令数据,处理函数是client_sock_read。在该函数中,收到一个请求的第一块数据后,调用storage_deal_task函数执行各种任务。
(4) 当一个请求的数据全部读取完成后,状态变成FDFS_STORAGE_STAGE_NIO_SEND。该状态进行善后处理,即通过storage_send_add_event把响应发送给客户端。
*storage执行各种命令的流程:
(1) 读取命令后,storage_deal_task根据命令字进入具体的任务处理函数(例如storage_upload_file)。这些函数对命令的各种参数进行分析和保存,对任务的处理函数指针进行赋值,然后调用storage_write_to_file函数。
(2) storage_write_to_file函数先在文件上下文中记录处理函数、完成回调和读写范围,然后调用storage_dio_queue_push函数把任务结构信息保存到dio任务队列中。storage_dio_queue_push会发送一个信号唤醒dio_thread_entrance线程并解锁,由dio线程调用已被赋值的deal_func函数指针(上传时一般是dio_write_file函数)处理该任务。
(3)
1, storage相关数据结构
1.1 数据结构
// Per-thread state of one storage nio worker thread.
// storage_service_init allocates g_work_threads of these in g_nio_thread_data
// and starts one work_thread_entrance thread per element.
struct storage_nio_thread_data
{
// libevent base pointer; created by event_base_new() in storage_service_init
// and freed by work_thread_entrance when the thread exits
struct event_base *ev_base; //libevent base pointer
// pipe_fds[0] (read end, non-blocking) is watched by work_thread_entrance
// with storage_recv_notify_read as callback; pipe_fds[1] is presumably the
// end the accept thread writes task pointers to (not shown here)
int pipe_fds[2]; //for notify nio thread to deal task
// private copy of g_group_array, only filled when g_check_file_duplicate is set
GroupArray group_array; //FastDHT group array
};
// FastDHT server group configuration (type from the FastDHT client code).
// NOTE(review): presumably `groups` has group_count entries and `servers`
// has server_count entries; confirm against fdht_copy_group_array.
typedef struct
{
ServerArray *groups;
FDHTServerInfo *servers;
int group_count; //group count
int server_count;
// proxy server and whether to use it — semantics not shown here
FDHTServerInfo proxy_server;
bool use_proxy;
} GroupArray;
// One FastDHT group: an array of `count` pointers to server descriptors.
typedef struct
{
FDHTServerInfo **servers;
int count; //server count
} ServerArray;
// Address and connection socket of a single FastDHT server.
typedef struct
{
int sock; // connection socket descriptor
int port; // server port
char ip_addr[IP_ADDRESS_SIZE]; // server IP address as a string
} FDHTServerInfo;
// fast_task: per-connection task descriptors used by the nio threads.
//
// Forward declaration so the callback typedef below refers to the real
// struct fast_task_info. Without it, the struct tag inside the parameter
// list has prototype scope only, and the callback type would be
// incompatible with pointers to the struct defined later.
struct fast_task_info;

// Callback type invoked with a task when it finishes.
typedef int (*TaskFinishCallBack) (struct fast_task_info *pTask);
// One task (connection slot) of the free pool built by free_queue_init.
struct fast_task_info
{
char client_ip[IP_ADDRESS_SIZE]; // client IP address
// libevent events on the client socket, set up by storage_nio_init
struct event ev_read;
struct event ev_write;
// per-task extra argument; the storage service stores its
// StorageClientInfo here (arg_size == sizeof(StorageClientInfo))
void *arg; //extra argument pointer
char *data; //buffer for write or recv
int size; //alloc size
int length; //data length
int offset; //current offset
int req_count; //request count
struct fast_task_info *next; // link in g_free_queue
TaskFinishCallBack finish_callback;
};
// Singly linked queue of fast_task_info (linked through ->next).
// free_queue_init fills every field.
struct fast_task_queue
{
struct fast_task_info *head;
struct fast_task_info *tail;
pthread_mutex_t lock;
int max_connections; // number of tasks in the pool
int min_buff_size; // initial size of each task's data buffer
int max_buff_size;
int arg_size; // size of the per-task `arg` area
// true: each task's data buffer lives in the same malloc'ed block as the task
bool malloc_whole_block;
};
1.2 全局变量
// Pool of free task slots (struct fast_task_info), built by free_queue_init()
// and shared by client and tracker connections.
static struct fast_task_queue g_free_queue;

// Number of storage nio worker threads. Defaults to DEFAULT_WORK_THREADS
// (4 according to this document; TODO confirm) and is configurable.
int g_work_threads = DEFAULT_WORK_THREADS;

// Guards g_storage_thread_count.
pthread_mutex_t g_storage_thread_lock;

// Number of storage nio threads currently running.
int g_storage_thread_count = 0;

static int last_stat_change_count = 1; //for sync to stat file
static int64_t temp_file_sequence = 0;

// Store-path index lock (initialized in storage_service_init; its users are
// not shown in this document).
static pthread_mutex_t path_index_thread_lock;

// Guards the g_storage_stat counters.
static pthread_mutex_t stat_count_thread_lock;

// The worker thread count is g_work_threads, declared above.
2, storage的service初始化
storage服务的初始化是在storage_service_init()函数中完成的。该初始化主要完成以下几件事:
(1) 初始化各个全局线程锁和线程属性值
(2) 初始化任务队列
(3) 为g_work_threads(默认4)个storage_nio工作线程分配storage_nio_thread_data结构,为每个工作线程创建libevent的event_base和一个管道,并把管道的读端设置为非阻塞。
(4) 为每个storage_nio_thread_data创建service工作线程,执行的函数是work_thread_entrance()。由该函数把管道的可读fd添加到事件监控列表中。
//storage服务初始化
int storage_service_init()
{
int result;
struct storage_nio_thread_data *pThreadData;
struct storage_nio_thread_data *pDataEnd;
pthread_t tid;
pthread_attr_t thread_attr;
//初始化storage任务线程锁
if ((result=init_pthread_lock(&g_storage_thread_lock)) != 0)
{
return result;
}
//初始化路径索引线程锁
if ((result=init_pthread_lock(&path_index_thread_lock)) != 0)
{
return result;
}
//初始化状态线程锁
if ((result=init_pthread_lock(&stat_count_thread_lock)) != 0)
{
return result;
}
//初始化线程堆栈的大小为512k
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_attr fail, program exit!", __LINE__);
return result;
}
//初始化释放队列
// 其中的g_max_connections全局变量,是从配置文件的max_connections项中读取的
if ((result=free_queue_init(g_max_connections, g_buff_size, \
g_buff_size, sizeof(StorageClientInfo))) != 0)
{
return result;
}
//分配4(默认初始化)个nio_thread_data结构内存
//g_work_threads是service工作线程数,默认值是4
g_nio_thread_data = (struct storage_nio_thread_data *)malloc(sizeof( \
struct storage_nio_thread_data) * g_work_threads);
if (g_nio_thread_data == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, error info: %s", \
__LINE__, (int)sizeof(struct storage_nio_thread_data) * \
g_work_threads, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
g_storage_thread_count = 0;
//获取nio_thread_data结构数组的尾指针
pDataEnd = g_nio_thread_data + g_work_threads;
//遍历nio_thread_data数组,创建io的工作线程
for (pThreadData=g_nio_thread_data; pThreadData {
//创建libevent基础结构
pThreadData->ev_base = event_base_new();
if (pThreadData->ev_base == NULL)
{
result = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, " \
"event_base_new fail.", __LINE__);
return result;
}
//建立管道,并把两个fd保存到storage_nio_thread_data的pipe_fds变量中
if (pipe(pThreadData->pipe_fds) != 0)
{
result = errno != 0 ? errno : EPERM;
logError("file: "__FILE__", line: %d, " \
"call pipe fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
break;
}
//把管道的可读fd设置成非阻塞的
if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)
{
break;
}
//创建工作线程
if ((result=pthread_create(&tid, &thread_attr, \
work_thread_entrance, pThreadData)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, startup threads: %d, " \
"errno: %d, error info: %s", \
__LINE__, g_storage_thread_count, \
result, STRERROR(result));
break;
}
else
{
//创建失败
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
g_storage_thread_count++;
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
}
}
//销毁线程属性值
pthread_attr_destroy(&thread_attr);
last_stat_change_count = g_stat_change_count;
//DO NOT support direct IO !!!
//g_extra_open_file_flags = g_disk_rw_direct ? O_DIRECT : 0;
return result;
}
2.1, 空闲任务队列(tracker和客户端连接共用)初始化
int free_queue_init(const int max_connections, const int min_buff_size, \
const int max_buff_size, const int arg_size)
{
struct fast_task_info *pTask;
char *p;
char *pCharEnd;
int block_size;
int alloc_size;
int64_t total_size;
int result;
//初始化g_free_queue线程锁
if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_lock fail, errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
//计算每个块分配的内存空间,结构如下[sizeof(fast_task_info)][sizeof(StorageClientInfo)]
block_size = sizeof(struct fast_task_info) + arg_size;
//实际要分配的内存大小:最大连接数*每块内存大小
alloc_size = block_size * max_connections;
//max_buff_size也就是g_buff_size是从配置文件的buff_size项中读取的,其默认大小是64k
//min_buff_size是sizeof(StorageClientInfo)也就是一个客服端信息结构的大小
if (max_buff_size > min_buff_size) //若g_buff_size大小大于min_buff_size大小
{
total_size = alloc_size;
g_free_queue.malloc_whole_block = false;
}
else
{
struct rlimit rlimit_data;
rlim_t max_data_size;
if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0)
{
logError("file: "__FILE__", line: %d, " \
"call getrlimit fail, " \
"errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EPERM;
}
if (rlimit_data.rlim_cur == RLIM_INFINITY) //系统没有限制内存
{
max_data_size = 512 * 1024 * 1024; //内存使用最大数设置成512M
}
else
{
max_data_size = rlimit_data.rlim_cur;
if (max_data_size > 512 * 1024 * 1024)
{
max_data_size = 512 * 1024 * 1024;
}
}
total_size = alloc_size+(int64_t)min_buff_size*max_connections;
if (total_size <= max_data_size)
{
g_free_queue.malloc_whole_block = true;
block_size += min_buff_size;
}
else
{
g_free_queue.malloc_whole_block = false;
total_size = alloc_size;
}
}
g_mpool = (struct fast_task_info *)malloc(total_size);
if (g_mpool == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc "INT64_PRINTF_FORMAT" bytes fail, " \
"errno: %d, error info: %s", \
__LINE__, total_size, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
memset(g_mpool, 0, total_size);
//分配task_info + storageclientinfo结构内存池
pCharEnd = ((char *)g_mpool) + total_size;
for (p=(char *)g_mpool; p {
pTask = (struct fast_task_info *)p;
pTask->size = min_buff_size;
pTask->arg = p + sizeof(struct fast_task_info);
if (g_free_queue.malloc_whole_block)
{
pTask->data = (char *)pTask->arg + arg_size;
}
else
{
pTask->data = (char *)malloc(pTask->size);
if (pTask->data == NULL)
{
free_queue_destroy();
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, " \
"errno: %d, error info: %s", \
__LINE__, pTask->size, \
errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
}
}
g_free_queue.tail = (struct fast_task_info *)(pCharEnd - block_size);
for (p=(char *)g_mpool; p<(char *)g_free_queue.tail; p += block_size)
{
pTask = (struct fast_task_info *)p;
pTask->next = (struct fast_task_info *)(p + block_size);
}
g_free_queue.max_connections = max_connections;
g_free_queue.min_buff_size = min_buff_size;
g_free_queue.max_buff_size = max_buff_size;
g_free_queue.arg_size = arg_size;
g_free_queue.head = g_mpool;
g_free_queue.tail->next = NULL;
return 0;
}
3, 工作线程函数work_thread_entrance()
/*
 * Entry point of one storage nio worker thread.
 *
 * @param arg this thread's struct storage_nio_thread_data
 *            (one element of g_nio_thread_data)
 * @return always NULL
 *
 * When g_check_file_duplicate is set, the thread first takes a private copy
 * of g_group_array. If that copy fails, it decrements
 * g_storage_thread_count and exits.
 *
 * It then registers the read end of its notify pipe (pipe_fds[0]) on its own
 * event base, with storage_recv_notify_read as a persistent read callback.
 * It runs the event loop for as long as g_continue_flag is set.
 *
 * On exit it frees the event base and the FastDHT group copy, disconnecting
 * the FastDHT servers when g_keep_alive is set. It then decrements
 * g_storage_thread_count under g_storage_thread_lock.
 */
static void *work_thread_entrance(void* arg)
{
	int result;
	int thread_count;
	struct storage_nio_thread_data *pThreadData;
	struct event ev_notify;

	pThreadData = (struct storage_nio_thread_data *)arg;

	/* duplicate-file checking needs a per-thread FastDHT group copy */
	if (g_check_file_duplicate)
	{
		if ((result=fdht_copy_group_array(&(pThreadData->group_array),\
			&g_group_array)) != 0)
		{
			pthread_mutex_lock(&g_storage_thread_lock);
			g_storage_thread_count--;
			pthread_mutex_unlock(&g_storage_thread_lock);
			return NULL;
		}
	}

	do {
		/* tasks are handed over as pointers written into the pipe;
		 * storage_recv_notify_read drains and dispatches them */
		event_set(&ev_notify, pThreadData->pipe_fds[0], \
			EV_READ | EV_PERSIST, storage_recv_notify_read, NULL);
		if ((result=event_base_set(pThreadData->ev_base, &ev_notify)) != 0)
		{
			logCrit("file: "__FILE__", line: %d, " \
				"event_base_set fail.", __LINE__);
			break;
		}
		if ((result=event_add(&ev_notify, NULL)) != 0)
		{
			logCrit("file: "__FILE__", line: %d, " \
				"event_add fail.", __LINE__);
			break;
		}

		/* re-enter the event loop whenever it returns (e.g. after
		 * event_base_loopexit) until the service is told to stop */
		while (g_continue_flag)
		{
			event_base_loop(pThreadData->ev_base, 0);
		}
	} while (0);

	event_base_free(pThreadData->ev_base);

	if (g_check_file_duplicate)
	{
		if (g_keep_alive)
		{
			fdht_disconnect_all_servers(&(pThreadData->group_array));
		}
		fdht_free_group_array(&(pThreadData->group_array));
	}

	if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"call pthread_mutex_lock fail, " \
			"errno: %d, error info: %s", \
			__LINE__, result, STRERROR(result));
	}
	g_storage_thread_count--;
	/* snapshot under the lock; reading it later would race other threads */
	thread_count = g_storage_thread_count;
	if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"call pthread_mutex_unlock fail, " \
			"errno: %d, error info: %s", \
			__LINE__, result, STRERROR(result));
	}

	logDebug("file: "__FILE__", line: %d, " \
		"nio thread exited, thread count: %d", \
		__LINE__, thread_count);
	return NULL;
}
/*
 * libevent callback for the read end of an nio thread's notify pipe
 * (registered by work_thread_entrance with EV_READ | EV_PERSIST).
 *
 * The pipe carries struct fast_task_info pointers, each written as a long.
 * The callback drains the pipe until read() would block. For each task:
 *  - a negative pClientInfo->sock is a quit request: the owning thread's
 *    event loop is asked to exit after 1 second, and the callback returns;
 *  - otherwise the task is dispatched on pClientInfo->stage:
 *      NIO_INIT -> storage_nio_init
 *      NIO_RECV -> client_sock_read (after sizing pTask->length)
 *      NIO_SEND -> storage_send_add_event
 * A task whose dispatch returns non-zero is passed to task_finish_clean_up.
 *
 * @param sock  pipe read descriptor (non-blocking)
 * @param event libevent event flags (unused)
 * @param arg   unused (registered as NULL)
 */
void storage_recv_notify_read(int sock, short event, void *arg)
{
	struct fast_task_info *pTask;
	StorageClientInfo *pClientInfo;
	long task_addr;
	int64_t remain_bytes;
	int bytes;
	int result;

	while (1)
	{
		if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
		{
			/* EAGAIN / EWOULDBLOCK just means the pipe is drained */
			if (!(errno == EAGAIN || errno == EWOULDBLOCK))
			{
				logError("file: "__FILE__", line: %d, " \
					"call read failed, " \
					"errno: %d, error info: %s", \
					__LINE__, errno, STRERROR(errno));
			}
			break;
		}
		else if (bytes == 0)
		{
			logError("file: "__FILE__", line: %d, " \
				"call read failed, end of file", __LINE__);
			break;
		}
		else if (bytes != (int)sizeof(task_addr))
		{
			/* a short read leaves a truncated pointer value:
			 * never dereference it */
			logError("file: "__FILE__", line: %d, " \
				"read %d bytes from pipe, expect %d bytes", \
				__LINE__, bytes, (int)sizeof(task_addr));
			break;
		}

		/* the pipe carries the task's address */
		pTask = (struct fast_task_info *)task_addr;
		pClientInfo = (StorageClientInfo *)pTask->arg;

		/* a negative sock is the quit flag: stop this thread's loop */
		if (pClientInfo->sock < 0)
		{
			struct storage_nio_thread_data *pThreadData;
			struct timeval tv;

			pThreadData = g_nio_thread_data + \
				pClientInfo->nio_thread_index;
			tv.tv_sec = 1;
			tv.tv_usec = 0;
			event_base_loopexit(pThreadData->ev_base, &tv);
			return;
		}

		switch (pClientInfo->stage)
		{
			/* connection freshly accepted: register its socket events */
			case FDFS_STORAGE_STAGE_NIO_INIT:
				result = storage_nio_init(pTask);
				break;
			/* (re)start receiving into the task buffer */
			case FDFS_STORAGE_STAGE_NIO_RECV:
				pTask->offset = 0;
				remain_bytes = pClientInfo->total_length - \
					pClientInfo->total_offset;
				/* length = min(remaining bytes, buffer size) */
				if (remain_bytes > pTask->size)
				{
					pTask->length = pTask->size;
				}
				else
				{
					pTask->length = remain_bytes;
				}
				client_sock_read(pClientInfo->sock, EV_READ, pTask);
				result = 0;
				break;
			case FDFS_STORAGE_STAGE_NIO_SEND:
				result = storage_send_add_event(pTask);
				break;
			default:
				logError("file: "__FILE__", line: %d, " \
					"invalid stage: %d", __LINE__, \
					pClientInfo->stage);
				result = EINVAL;
				break;
		}

		if (result != 0)
		{
			task_finish_clean_up(pTask);
		}
	}
}
//storage_nio_init函数
该初始化函数把客户端和storage服务端之间已建立成功的socket描述符注册到所属nio线程的libevent中,用来接收并处理客户端的请求。
它同时设置读、写事件的处理函数(client_sock_read和client_sock_write),但只把读事件加入监控,并把状态改为FDFS_STORAGE_STAGE_NIO_RECV。
/*
 * Attach a newly accepted client connection to the event base of the nio
 * thread it was assigned to (pClientInfo->nio_thread_index).
 *
 * ev_read (-> client_sock_read) and ev_write (-> client_sock_write) are both
 * bound to the client socket. Only the read event is armed here, with
 * g_network_tv as its timeout. The stage switches to
 * FDFS_STORAGE_STAGE_NIO_RECV right before arming.
 *
 * Returns 0 on success, or the non-zero code from event_base_set/event_add.
 */
static int storage_nio_init(struct fast_task_info *pTask)
{
	StorageClientInfo *client = (StorageClientInfo *)pTask->arg;
	struct storage_nio_thread_data *owner =
		g_nio_thread_data + client->nio_thread_index;
	int rc;

	/* read side */
	event_set(&pTask->ev_read, client->sock, EV_READ,
		client_sock_read, pTask);
	rc = event_base_set(owner->ev_base, &pTask->ev_read);
	if (rc != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"event_base_set fail.", __LINE__);
		return rc;
	}

	/* write side: prepared now, armed later when a response is sent */
	event_set(&pTask->ev_write, client->sock, EV_WRITE,
		client_sock_write, pTask);
	rc = event_base_set(owner->ev_base, &pTask->ev_write);
	if (rc != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"event_base_set fail.", __LINE__);
		return rc;
	}

	client->stage = FDFS_STORAGE_STAGE_NIO_RECV;

	rc = event_add(&pTask->ev_read, &g_network_tv);
	if (rc != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"event_add fail.", __LINE__);
	}
	return rc;
}
/*
 * libevent read callback for a client connection. It is also called directly
 * by storage_recv_notify_read when a task re-enters
 * FDFS_STORAGE_STAGE_NIO_RECV.
 *
 * While total_length is 0, it receives the TrackerHeader. total_length then
 * becomes the header's pkg_len plus the header size, and pTask->length
 * becomes min(total_length, pTask->size). After that it receives up to
 * pTask->length bytes into pTask->data.
 *
 * When the current buffer is full:
 *  - if the whole request has arrived, the stage moves to
 *    FDFS_STORAGE_STAGE_NIO_SEND and req_count is incremented;
 *  - the first buffer of a request goes to storage_deal_task, and later
 *    buffers go to storage_dio_queue_push to continue writing the file.
 *
 * On EV_TIMEOUT, an idle connection (nothing received yet, at least one
 * request already served) is re-armed. A timeout in the middle of a request
 * ends the task.
 *
 * If the socket has no data yet, ev_read is re-armed with g_network_tv.
 * Errors and disconnects end with task_finish_clean_up. pTask is never
 * touched after that call.
 */
static void client_sock_read(int sock, short event, void *arg)
{
	int bytes;
	int recv_bytes;
	struct fast_task_info *pTask;
	StorageClientInfo *pClientInfo;

	pTask = (struct fast_task_info *)arg;
	pClientInfo = (StorageClientInfo *)pTask->arg;

	if (event == EV_TIMEOUT)
	{
		if (pClientInfo->total_offset == 0 && pTask->req_count > 0)
		{
			/* idle keep-alive connection: keep waiting */
			if (event_add(&pTask->ev_read, &g_network_tv) != 0)
			{
				task_finish_clean_up(pTask);
				logError("file: "__FILE__", line: %d, " \
					"event_add fail.", __LINE__);
			}
		}
		else
		{
			logError("file: "__FILE__", line: %d, " \
				"client ip: %s, recv timeout, " \
				"recv offset: %d, expect length: %d", \
				__LINE__, pTask->client_ip, \
				pTask->offset, pTask->length);
			task_finish_clean_up(pTask);
		}
		return;
	}

	while (1)
	{
		if (pClientInfo->total_length == 0) //recv header
		{
			recv_bytes = sizeof(TrackerHeader) - pTask->offset;
		}
		else
		{
			recv_bytes = pTask->length - pTask->offset;
		}

		bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
		if (bytes < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				/* no more data for now: wait for the next event */
				if (event_add(&pTask->ev_read, &g_network_tv) != 0)
				{
					task_finish_clean_up(pTask);
					logError("file: "__FILE__", line: %d, "\
						"event_add fail.", __LINE__);
				}
			}
			else
			{
				logError("file: "__FILE__", line: %d, " \
					"client ip: %s, recv failed, " \
					"errno: %d, error info: %s", \
					__LINE__, pTask->client_ip, \
					errno, STRERROR(errno));
				task_finish_clean_up(pTask);
			}
			return;
		}
		else if (bytes == 0)
		{
			logDebug("file: "__FILE__", line: %d, " \
				"client ip: %s, recv failed, " \
				"connection disconnected.", \
				__LINE__, pTask->client_ip);
			task_finish_clean_up(pTask);
			return;
		}

		if (pClientInfo->total_length == 0) //header
		{
			if (pTask->offset + bytes < sizeof(TrackerHeader))
			{
				/* header still incomplete. Advance the offset
				 * BEFORE re-arming: on failure
				 * task_finish_clean_up releases the task */
				pTask->offset += bytes;
				if (event_add(&pTask->ev_read, &g_network_tv)!=0)
				{
					task_finish_clean_up(pTask);
					logError("file: "__FILE__", line: %d, "\
						"event_add fail.", __LINE__);
				}
				return;
			}

			pClientInfo->total_length=buff2long(((TrackerHeader *) \
				pTask->data)->pkg_len);
			if (pClientInfo->total_length < 0)
			{
				logError("file: "__FILE__", line: %d, " \
					"client ip: %s, pkg length: " \
					INT64_PRINTF_FORMAT" < 0", \
					__LINE__, pTask->client_ip, \
					pClientInfo->total_length);
				task_finish_clean_up(pTask);
				return;
			}

			/* total_length covers header + body */
			pClientInfo->total_length += sizeof(TrackerHeader);
			if (pClientInfo->total_length > pTask->size)
			{
				pTask->length = pTask->size;
			}
			else
			{
				pTask->length = pClientInfo->total_length;
			}
		}

		pTask->offset += bytes;
		if (pTask->offset >= pTask->length) //recv current pkg done
		{
			if (pClientInfo->total_offset + pTask->length >= \
				pClientInfo->total_length)
			{
				/* current req recv done */
				pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
				pTask->req_count++;
			}

			if (pClientInfo->total_offset == 0)
			{
				/* first buffer of the request: dispatch the command */
				pClientInfo->total_offset = pTask->length;
				storage_deal_task(pTask);
			}
			else
			{
				pClientInfo->total_offset += pTask->length;
				/* continue write to file */
				storage_dio_queue_push(pTask);
			}
			return;
		}
	}
}
/*
 * Dispatch one client/tracker request on the command byte of its
 * TrackerHeader (at the start of pTask->data) to the matching handler.
 *
 * Handlers that queue disk I/O return STORAGE_STATUE_DEAL_FILE, and the
 * response is produced later. For any other result, the response header is
 * built in place: status = result, cmd = STORAGE_PROTO_CMD_RESP, and
 * pkg_len = total_length - sizeof(TrackerHeader). It is then sent with
 * storage_send_add_event, and pTask->length is set to total_length.
 *
 * FDFS_PROTO_CMD_QUIT releases the task and returns 0 without a response.
 *
 * @return the handler's result code
 */
int storage_deal_task(struct fast_task_info *pTask)
{
	TrackerHeader *pHeader;
	StorageClientInfo *pClientInfo;
	int result;

	pClientInfo = (StorageClientInfo *)pTask->arg;
	pHeader = (TrackerHeader *)pTask->data;

	switch(pHeader->cmd)
	{
		case STORAGE_PROTO_CMD_DOWNLOAD_FILE:
			result = storage_server_download_file(pTask);
			break;
		case STORAGE_PROTO_CMD_GET_METADATA:
			result = storage_server_get_metadata(pTask);
			break;
		case STORAGE_PROTO_CMD_UPLOAD_FILE:
			result = storage_upload_file(pTask, false);
			break;
		case STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE:
			/* upload a file that can later be appended to */
			result = storage_upload_file(pTask, true);
			break;
		case STORAGE_PROTO_CMD_APPEND_FILE:
			result = storage_append_file(pTask);
			break;
		case STORAGE_PROTO_CMD_MODIFY_FILE:
			result = storage_modify_file(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNCATE_FILE:
			result = storage_do_truncate_file(pTask);
			break;
		case STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE:
			result = storage_upload_slave_file(pTask);
			break;
		case STORAGE_PROTO_CMD_DELETE_FILE:
			result = storage_server_delete_file(pTask);
			break;
		case STORAGE_PROTO_CMD_CREATE_LINK:
			result = storage_create_link(pTask);
			break;
		case STORAGE_PROTO_CMD_SYNC_CREATE_FILE:
			result = storage_sync_copy_file(pTask, pHeader->cmd);
			break;
		case STORAGE_PROTO_CMD_SYNC_DELETE_FILE:
			result = storage_sync_delete_file(pTask);
			break;
		case STORAGE_PROTO_CMD_SYNC_UPDATE_FILE:
			result = storage_sync_copy_file(pTask, pHeader->cmd);
			break;
		case STORAGE_PROTO_CMD_SYNC_APPEND_FILE:
			result = storage_sync_append_file(pTask);
			break;
		case STORAGE_PROTO_CMD_SYNC_MODIFY_FILE:
			result = storage_sync_modify_file(pTask);
			break;
		case STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE:
			result = storage_sync_truncate_file(pTask);
			break;
		case STORAGE_PROTO_CMD_SYNC_CREATE_LINK:
			result = storage_sync_link_file(pTask);
			break;
		case STORAGE_PROTO_CMD_SET_METADATA:
			result = storage_server_set_metadata(pTask);
			break;
		case STORAGE_PROTO_CMD_QUERY_FILE_INFO:
			result = storage_server_query_file_info(pTask);
			break;
		case STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG:
			result = storage_server_fetch_one_path_binlog(pTask);
			break;
		case FDFS_PROTO_CMD_QUIT:
			task_finish_clean_up(pTask);
			return 0;
		case FDFS_PROTO_CMD_ACTIVE_TEST:
			result = storage_deal_active_test(pTask);
			break;
		case STORAGE_PROTO_CMD_REPORT_CLIENT_IP:
			result = storage_server_report_client_ip(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE:
			result = storage_server_trunk_alloc_space(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_ALLOC_CONFIRM:
			result = storage_server_trunk_alloc_confirm(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_FREE_SPACE:
			result = storage_server_trunk_free_space(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_SYNC_BINLOG:
			result = storage_server_trunk_sync_binlog(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_GET_BINLOG_SIZE:
			result = storage_server_trunk_get_binlog_size(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS:
			result = storage_server_trunk_delete_binlog_marks(pTask);
			break;
		case STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE:
			result = storage_server_trunk_truncate_binlog_file(pTask);
			break;
		default:
			logError("file: "__FILE__", line: %d, " \
				"client ip: %s, unkown cmd: %d", \
				__LINE__, pTask->client_ip, \
				pHeader->cmd);
			/* reply with a bare header. total_length still holds the
			 * request size, which may exceed the task buffer */
			pClientInfo->total_length = sizeof(TrackerHeader);
			result = EINVAL;
			break;
	}

	if (result != STORAGE_STATUE_DEAL_FILE)
	{
		/* build the response header in place and send it */
		pClientInfo->total_offset = 0;
		pTask->length = pClientInfo->total_length;
		pHeader = (TrackerHeader *)pTask->data;
		pHeader->status = result;
		pHeader->cmd = STORAGE_PROTO_CMD_RESP;
		long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \
			pHeader->pkg_len);
		storage_send_add_event(pTask);
	}

	return result;
}
/*
 * Common tail of the write-type command handlers (e.g. storage_upload_file).
 *
 * Records the deal/clean callbacks and the target byte range
 * [file_offset, file_offset + upload_bytes) in the client's file context.
 * It resets the fd to -1, picks the dio thread, initializes the CRC32 / file
 * hash state when requested, and hands the task to the disk-I/O queue.
 *
 * Returns STORAGE_STATUE_DEAL_FILE once the task is queued (the response is
 * sent later). Otherwise it returns the error from storage_dio_queue_push,
 * after shrinking total_length so that only a bare header is sent back.
 */
static int storage_write_to_file(struct fast_task_info *pTask, \
	const int64_t file_offset, const int64_t upload_bytes, \
	const int buff_offset, TaskDealFunc deal_func, \
	FileDealDoneCallback done_callback, \
	DisconnectCleanFunc clean_func, const int store_path_index)
{
	StorageClientInfo *client = (StorageClientInfo *)pTask->arg;
	StorageFileContext *ctx = &client->file_context;
	int rc;

	client->deal_func = deal_func;
	client->clean_func = clean_func;

	ctx->fd = -1;
	ctx->buff_offset = buff_offset;
	ctx->offset = file_offset;
	ctx->start = file_offset;
	ctx->end = file_offset + upload_bytes;
	ctx->dio_thread_index = storage_dio_get_thread_index(pTask,
		store_path_index, ctx->op);
	ctx->done_callback = done_callback;

	if (ctx->calc_crc32)
	{
		ctx->crc32 = CRC32_XINIT;
	}
	if (ctx->calc_file_hash)
	{
		INIT_HASH_CODES4(ctx->file_hash_codes)
	}

	rc = storage_dio_queue_push(pTask);
	if (rc == 0)
	{
		return STORAGE_STATUE_DEAL_FILE;
	}

	client->total_length = sizeof(TrackerHeader);
	return rc;
}
/**
1 byte: store path index
8 bytes: file size
FDFS_FILE_EXT_NAME_MAX_LEN bytes: file ext name, do not include dot (.)
file size bytes: file content
上传文件
**/
/*
 * Handle STORAGE_PROTO_CMD_UPLOAD_FILE and
 * STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE.
 *
 * Request body (after the TrackerHeader):
 *   1 byte                            store path index
 *   8 bytes                           file size
 *   FDFS_FILE_EXT_NAME_MAX_LEN bytes  file ext name, without the dot
 *   file size bytes                   file content
 *
 * Validates the request and fills pClientInfo->file_context. The target is
 * either a trunk file slot (allocated through trunk_client_trunk_alloc_space)
 * or a newly named regular file. The write is then queued through
 * storage_write_to_file.
 *
 * @param bAppenderFile true for an appender file (never placed in a trunk)
 * @return STORAGE_STATUE_DEAL_FILE when the write was queued; otherwise an
 *         error code, with total_length reset to a bare header
 */
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
{
	StorageClientInfo *pClientInfo;
	StorageFileContext *pFileContext;
	DisconnectCleanFunc clean_func;
	char *p;
	char filename[128];
	/* sized by the length actually copied and terminated below */
	char file_ext_name[FDFS_FILE_EXT_NAME_MAX_LEN + 1];
	int64_t nInPackLen;
	int64_t file_offset;
	int64_t file_bytes;
	int crc32;
	int store_path_index;
	int result;
	int filename_len;

	pClientInfo = (StorageClientInfo *)pTask->arg;
	pFileContext = &(pClientInfo->file_context);
	nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);

	if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE +
		FDFS_FILE_EXT_NAME_MAX_LEN)
	{
		/* report the actual command (plain or appender upload) */
		logError("file: "__FILE__", line: %d, " \
			"cmd=%d, client ip: %s, package size " \
			INT64_PRINTF_FORMAT" is not correct, " \
			"expect length >= %d", __LINE__, \
			((TrackerHeader *)pTask->data)->cmd, \
			pTask->client_ip, nInPackLen, \
			1 + FDFS_PROTO_PKG_LEN_SIZE + \
			FDFS_FILE_EXT_NAME_MAX_LEN);
		pClientInfo->total_length = sizeof(TrackerHeader);
		return EINVAL;
	}

	/* store path index */
	p = pTask->data + sizeof(TrackerHeader);
	store_path_index = *p++;
	if (store_path_index < 0 || store_path_index >= g_fdfs_path_count)
	{
		logError("file: "__FILE__", line: %d, " \
			"client ip: %s, store_path_index: %d " \
			"is invalid", __LINE__, \
			pTask->client_ip, store_path_index);
		pClientInfo->total_length = sizeof(TrackerHeader);
		return EINVAL;
	}

	/* file size must match the remaining package length */
	file_bytes = buff2long(p);
	p += FDFS_PROTO_PKG_LEN_SIZE;
	if (file_bytes < 0 || file_bytes != nInPackLen - \
		(1 + FDFS_PROTO_PKG_LEN_SIZE + \
		FDFS_FILE_EXT_NAME_MAX_LEN))
	{
		logError("file: "__FILE__", line: %d, " \
			"client ip: %s, pkg length is not correct, " \
			"invalid file bytes: "INT64_PRINTF_FORMAT \
			", total body length: "INT64_PRINTF_FORMAT, \
			__LINE__, pTask->client_ip, file_bytes, nInPackLen);
		pClientInfo->total_length = sizeof(TrackerHeader);
		return EINVAL;
	}

	/* file extension name (fixed-width field, not NUL-terminated on wire) */
	memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
	*(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
	p += FDFS_FILE_EXT_NAME_MAX_LEN;

	pFileContext->calc_crc32 = true;
	pFileContext->calc_file_hash = g_check_file_duplicate;
	pFileContext->extra_info.upload.start_time = time(NULL);

	strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
	storage_format_ext_name(file_ext_name, \
		pFileContext->extra_info.upload.formatted_ext_name);
	pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
	pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
	pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
	pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;

	if (bAppenderFile)
	{
		pFileContext->extra_info.upload.file_type |= \
			_FILE_TYPE_APPENDER;
	}
	else
	{
		if (g_if_use_trunk_file && trunk_check_size( \
			TRUNK_CALC_SIZE(file_bytes)))
		{
			pFileContext->extra_info.upload.file_type |= \
				_FILE_TYPE_TRUNK;
		}
	}

	if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
	{
		/* store inside a trunk file: allocate a slot for it */
		FDFSTrunkFullInfo *pTrunkInfo;

		pFileContext->extra_info.upload.if_sub_path_alloced = true;
		pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
		if ((result=trunk_client_trunk_alloc_space( \
			TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
		{
			pClientInfo->total_length = sizeof(TrackerHeader);
			return result;
		}

		clean_func = dio_trunk_write_finish_clean_up;
		file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
		pFileContext->extra_info.upload.if_gen_filename = true;
		trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \
			sizeof(pFileContext->filename));
		/* check the trunk file before opening it and write the chunk
		 * header before closing it */
		pFileContext->extra_info.upload.before_open_callback = \
			dio_check_trunk_file;
		pFileContext->extra_info.upload.before_close_callback = \
			dio_write_chunk_header;
		pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
	}
	else
	{
		/* regular file: make sure the store path keeps its reserve */
		if (g_path_free_mbs[store_path_index] - (file_bytes/FDFS_ONE_MB)
			<= g_avg_storage_reserved_mb)
		{
			logError("file: "__FILE__", line: %d, " \
				"no space to upload file, "
				"free space: %d MB is too small, file bytes: " \
				INT64_PRINTF_FORMAT", reserved space: %d MB", \
				__LINE__, g_path_free_mbs[store_path_index], \
				file_bytes, g_avg_storage_reserved_mb);
			pClientInfo->total_length = sizeof(TrackerHeader);
			return ENOSPC;
		}

		crc32 = rand();
		*filename = '\0';
		filename_len = 0;
		pFileContext->extra_info.upload.if_sub_path_alloced = false;
		pFileContext->extra_info.upload.trunk_info.path.store_path_index = \
			store_path_index;
		if ((result=storage_get_filename(pClientInfo, \
			pFileContext->extra_info.upload.start_time, \
			file_bytes, crc32, pFileContext->extra_info.upload.\
			formatted_ext_name, filename, &filename_len, \
			pFileContext->filename)) != 0)
		{
			pClientInfo->total_length = sizeof(TrackerHeader);
			return result;
		}

		clean_func = dio_write_finish_clean_up;
		file_offset = 0;
		pFileContext->extra_info.upload.if_gen_filename = true;
		pFileContext->extra_info.upload.before_open_callback = NULL;
		pFileContext->extra_info.upload.before_close_callback = NULL;
		pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC \
			| g_extra_open_file_flags;
	}

	/* the file content starts right after the parsed fields */
	return storage_write_to_file(pTask, file_offset, file_bytes, \
		p - pTask->data, dio_write_file, \
		storage_upload_file_done_callback, \
		clean_func, store_path_index);
}
/**
pkg format:
Header
8 bytes: file offset
8 bytes: download file bytes
FDFS_GROUP_NAME_MAX_LEN bytes: group_name
filename
传输的数据包格式:
值:
|tracker header|file offset|download file bytes|group_name|filename|
长度:
|tracker Header|8B|8B|FDFS_GROUP_NAME_MAX_LEN|包长-16-FDFS_GROUP_NAME_MAX_LEN|
**/
//从storage服务器下载文件
static int storage_server_download_file(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
char *p;
int result;
int store_path_index;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char true_filename[128];
char *filename;
int filename_len;
int64_t file_offset;
int64_t download_bytes;
int64_t file_bytes;
struct stat stat_buf;
int64_t nInPackLen;
FDFSTrunkFullInfo trunkInfo;
FDFSTrunkHeader trunkHeader;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
//减去trackerheader的长度,得到数据包长度
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
pClientInfo->total_length = sizeof(TrackerHeader);
//包中实际的数据长度错误
if (nInPackLen <= 16 + FDFS_GROUP_NAME_MAX_LEN)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
INT64_PRINTF_FORMAT" is not correct, " \
"expect length > %d", __LINE__, \
STORAGE_PROTO_CMD_UPLOAD_FILE, \
pTask->client_ip, \
nInPackLen, 16 + FDFS_GROUP_NAME_MAX_LEN);
return EINVAL;
}
//数据包长度错误
if (nInPackLen >= pTask->size)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
INT64_PRINTF_FORMAT" is too large, " \
"expect length should < %d", __LINE__, \
STORAGE_PROTO_CMD_UPLOAD_FILE, \
pTask->client_ip, \
nInPackLen, pTask->size);
return EINVAL;
}
//获得实际数据的起始地址
p = pTask->data + sizeof(TrackerHeader);
//得到文件偏移量
file_offset = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
//下一个字段是下载文件的bytes数
download_bytes = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
//若文件偏移量小于0,错误,直接返回
if (file_offset < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, invalid file offset: " \
INT64_PRINTF_FORMAT, __LINE__, \
pTask->client_ip, file_offset);
return EINVAL;
}
//若下载的文件大小小于0,错误
if (download_bytes < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, invalid download file bytes: " \
INT64_PRINTF_FORMAT, __LINE__, \
pTask->client_ip, download_bytes);
return EINVAL;
}
//下一个字段是组名
memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN);
*(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
p += FDFS_GROUP_NAME_MAX_LEN;
//和客户端发送的group命比较,是否相等,若不等说明连接错误,直接返回
if (strcmp(group_name, g_group_name) != 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, group_name: %s " \
"not correct, should be: %s", \
__LINE__, pTask->client_ip, \
group_name, g_group_name);
return EINVAL;
}
//获取到文件名
filename = p;
filename_len = nInPackLen - (16 + FDFS_GROUP_NAME_MAX_LEN);
*(filename + filename_len) = '\0';
//到此,文件名和下载量都已知道了
//分解文件名,并获取真正的可以直接读取的文件名
if ((result=storage_split_filename_ex(filename, \
&filename_len, true_filename, &store_path_index)) != 0)
{
return result;
}
//检查文件名字符串是否正确
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}
pFileContext->fd = -1;
result = trunk_file_stat_ex(store_path_index, \
true_filename, filename_len, &stat_buf, \
&trunkInfo, &trunkHeader, &pFileContext->fd);
if (IS_TRUNK_FILE_BY_ID(trunkInfo))
{
pthread_mutex_lock(&stat_count_thread_lock);
g_storage_stat.total_file_open_count++;
pthread_mutex_unlock(&stat_count_thread_lock);
}
if (result == 0)
{
if (!S_ISREG(stat_buf.st_mode))
{
logError("file: "__FILE__", line: %d, " \
"logic file %s is not a regular file", \
__LINE__, filename);
return EISDIR;
}
file_bytes = stat_buf.st_size;
}
else
{
file_bytes = 0;
logError("file: "__FILE__", line: %d, " \
"call stat fail, logic file: %s, "\
"error no: %d, error info: %s", \
__LINE__, filename, result, STRERROR(result));
return result;
}
if (IS_TRUNK_FILE_BY_ID(trunkInfo))
{
pthread_mutex_lock(&stat_count_thread_lock);
g_storage_stat.success_file_open_count++;
pthread_mutex_unlock(&stat_count_thread_lock);
}
//检查下载文件的字节数是否正确
if (download_bytes == 0)
{
download_bytes = file_bytes - file_offset;
}
else if (download_bytes > file_bytes - file_offset)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, invalid download file bytes: " \
INT64_PRINTF_FORMAT" > file remain bytes: " \
INT64_PRINTF_FORMAT, __LINE__, \
pTask->client_ip, download_bytes, \
file_bytes - file_offset);
if (pFileContext->fd >= 0)
{
close(pFileContext->fd);
}
return EINVAL;
}
if (IS_TRUNK_FILE_BY_ID(trunkInfo))
{
trunk_get_full_filename((&trunkInfo), pFileContext->filename, \
sizeof(pFileContext->filename));
file_offset += TRUNK_FILE_START_OFFSET(trunkInfo);
}
else
{
sprintf(pFileContext->filename, "%s/data/%s", \
g_fdfs_store_paths[store_path_index], true_filename);
}
return storage_read_from_file(pTask, file_offset, download_bytes, \
storage_download_file_done_callback, store_path_index);
}