Chinaunix首页 | 论坛 | 博客
  • 博客访问: 838470
  • 博文数量: 91
  • 博客积分: 2544
  • 博客等级: 少校
  • 技术积分: 1885
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-12 09:08
文章存档

2016年(10)

2014年(2)

2013年(4)

2012年(23)

2011年(23)

2010年(13)

2009年(14)

2007年(2)

分类: LINUX

2012-08-28 07:13:48


storage服务(service)

概述

*storage服务接收和执行客户端命令的流程如下:
(1) storage服务初始化
storage部分会创建g_work_threads(默认是3)个work_thread_entrance线程来执行客户端和trakcer端的命令。每个工作线程对应一个非阻塞管道,该管道用来和主线程进行通信。每个工作线程从该管道的读端读取任务信息。
而主线程通过storage_accept_loop函数监听客户端信息,并把任务信息写入管道,work_thread_entrance线程读取任务信息,并处理任务。
(2) storage服务读取任务信息
读取任务信息是通过storage_recv_notify_read函数来进行的,storage服务会把管道的读端添加到libevent的读事件队列中。
(3) storage的信息读取分为几个状态,开始的时候是FDFS_STORAGE_STAGE_NIO_INIT(\0)状态,该状态下处理的是数据结构的转换,并把该管道的读端再次添加到事件的写和读的监控队列中,处理函数分别是:client_sock_read和client_sock_write。随后把状态更新成FDFS_STORAGE_STAGE_NIO_RECV。处于该状态的fd端,开始处理客户端的命令数据了,处理函数是client_sock_read。在该函数中,调用storage_deal_task函数执行各种任务。
(4) 当数据读取完成后,状态变成FDFS_STORAGE_STAGE_NIO_SEND。该状态要进行善后的处理。

*storage执行各种命令的流程:
(1) 读取命令后,对命令的各种参数进行分析,保存,storage_write_to_file,并进入具体的任务处理函数中,这些函数对任务的处理函数指针进行赋值,然后调用storage_write_to_file函数。
(2) storage_write_to_file函数其实只是调用storage_dio_queue_push函数把任务结构信息保存到任务队列中。并发送一个信号,唤醒dio_thread_entrance线程并解锁,然后调用已被赋值的deal_func函数(一般是dio_write_file函数)指针处理该任务。
(3) 


1, storage相关数据结构
1.1 数据结构
//storage的nio线程信息
struct storage_nio_thread_data
{
        struct event_base *ev_base;  //libevent base pointer
        int pipe_fds[2];   //for notify nio thread to deal task
    GroupArray group_array;  //FastDHT group array
};

typedef struct
{
    ServerArray *groups;
    FDHTServerInfo *servers;
    int group_count;  //group count
    int server_count;
    FDHTServerInfo proxy_server;
    bool use_proxy;
} GroupArray;

typedef struct
{
    FDHTServerInfo **servers;
    int count;  //server count
} ServerArray;

typedef struct
{
    int sock;
    int port;
    char ip_addr[IP_ADDRESS_SIZE];
} FDHTServerInfo;

//fast_task信息
typedef int (*TaskFinishCallBack) (struct fast_task_info *pTask);
            
struct fast_task_info
{       
    char client_ip[IP_ADDRESS_SIZE];      //客户端ip地址
    struct event ev_read;
    struct event ev_write;
    void *arg;          //extra argument pointer
    char *data;     //buffer for write or recv
    int size;           //alloc size
    int length;     //data length
    int offset;     //current offset
    int req_count;     //request count
    struct fast_task_info *next;
    TaskFinishCallBack finish_callback;
};          

struct fast_task_queue
{
struct fast_task_info *head;
struct fast_task_info *tail;
pthread_mutex_t lock;
int max_connections;
int min_buff_size;
int max_buff_size;
int arg_size;
bool malloc_whole_block;
};

1.2 全局变量

//tracker和client信息队列
static struct fast_task_queue g_free_queue;


//storage service的线程数,默认是4个,可以在配置文件中设置
int g_work_threads = DEFAULT_WORK_THREADS;

//storage服务模块全局线程锁
pthread_mutex_t g_storage_thread_lock;
//storage服务线程数
int g_storage_thread_count = 0; 
static int last_stat_change_count = 1;  //for sync to stat file
static int64_t temp_file_sequence = 0; 
//目录
static pthread_mutex_t path_index_thread_lock;
//状态线程锁
static pthread_mutex_t stat_count_thread_lock;

//工作线程数
g_work_threads

2, storage的service初始化
storage服务的初始化是在storage_service_init()函数中完成的。该初始化主要完成以下几件事:
(1) 初始化各个全局线程锁和线程属性值
(2) 初始化任务队列
(3) 创建storage_nio工作线程,默认创建4个工作线程,为每个工作线程创建一个管道,并把管道的可读fd添加到事件监控列表中。
(4) 创建service工作线程,执行的函数是work_thread_entrance()

//storage服务初始化
int storage_service_init()
{
int result;
struct storage_nio_thread_data *pThreadData;
struct storage_nio_thread_data *pDataEnd;
pthread_t tid;
pthread_attr_t thread_attr;

        //初始化storage任务线程锁
if ((result=init_pthread_lock(&g_storage_thread_lock)) != 0)
{
return result;
}
        //初始化路径索引线程锁
if ((result=init_pthread_lock(&path_index_thread_lock)) != 0)
{
return result;
}
        //初始化状态线程锁
if ((result=init_pthread_lock(&stat_count_thread_lock)) != 0)
{
return result;
}
        //初始化线程堆栈的大小为512k
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_attr fail, program exit!", __LINE__);
return result;
}
        //初始化释放队列
        // 其中的g_max_connections全局变量,是从配置文件的max_connections项中读取的
if ((result=free_queue_init(g_max_connections, g_buff_size, \
                g_buff_size, sizeof(StorageClientInfo))) != 0)
{
return result;
}
        //分配4(默认初始化)个nio_thread_data结构内存
        //g_work_threads是service工作线程数,默认值是4
g_nio_thread_data = (struct storage_nio_thread_data *)malloc(sizeof( \
struct storage_nio_thread_data) * g_work_threads);
if (g_nio_thread_data == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, error info: %s", \
__LINE__, (int)sizeof(struct storage_nio_thread_data) * \
g_work_threads, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}

g_storage_thread_count = 0;
        //获取nio_thread_data结构数组的尾指针
pDataEnd = g_nio_thread_data + g_work_threads;
        //遍历nio_thread_data数组,创建io的工作线程
for (pThreadData=g_nio_thread_data; pThreadData
{
                //创建libevent基础结构
pThreadData->ev_base = event_base_new();
if (pThreadData->ev_base == NULL)
{
result = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, " \
"event_base_new fail.", __LINE__);
return result;
}
                //建立管道,并把两个fd保存到storage_nio_thread_data的pipe_fds变量中
if (pipe(pThreadData->pipe_fds) != 0)
{
result = errno != 0 ? errno : EPERM;
logError("file: "__FILE__", line: %d, " \
"call pipe fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
break;
}
                //把管道的可读fd设置成非阻塞的
if ((result=set_nonblock(pThreadData->pipe_fds[0])) != 0)
{
break;
}
                //创建工作线程
if ((result=pthread_create(&tid, &thread_attr, \
work_thread_entrance, pThreadData)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, startup threads: %d, " \
"errno: %d, error info: %s", \
__LINE__, g_storage_thread_count, \
result, STRERROR(result));
break;
}
else
{
                        //创建失败
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
g_storage_thread_count++;
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
}
}
        //销毁线程属性值
pthread_attr_destroy(&thread_attr);
last_stat_change_count = g_stat_change_count;
//DO NOT support direct IO !!!
//g_extra_open_file_flags = g_disk_rw_direct ? O_DIRECT : 0;
return result;
}


2,tracker信息和客服端信息 队列初始化

int free_queue_init(const int max_connections, const int min_buff_size, \
const int max_buff_size, const int arg_size)
{
struct fast_task_info *pTask;
char *p;
char *pCharEnd;
int block_size;
int alloc_size;
int64_t total_size;
int result;

        //初始化g_free_queue线程锁
if ((result=init_pthread_lock(&(g_free_queue.lock))) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_lock fail, errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
        //计算每个块分配的内存空间,结构如下[sizeof(fast_task_info)][sizeof(StorageClientInfo)]
block_size = sizeof(struct fast_task_info) + arg_size;
        //实际要分配的内存大小:最大连接数*每块内存大小
alloc_size = block_size * max_connections;

        //max_buff_size也就是g_buff_size是从配置文件的buff_size项中读取的,其默认大小是64k
        //min_buff_size是sizeof(StorageClientInfo)也就是一个客服端信息结构的大小

if (max_buff_size > min_buff_size) //若g_buff_size大小大于min_buff_size大小
{
total_size = alloc_size;
g_free_queue.malloc_whole_block = false;
}
else
{
struct rlimit rlimit_data;
rlim_t max_data_size;

if (getrlimit(RLIMIT_DATA, &rlimit_data) < 0)
{
logError("file: "__FILE__", line: %d, " \
"call getrlimit fail, " \
"errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno != 0 ? errno : EPERM;
}
if (rlimit_data.rlim_cur == RLIM_INFINITY) //系统没有限制内存
{
max_data_size = 512 * 1024 * 1024;  //内存使用最大数设置成512M
}
else
{
max_data_size = rlimit_data.rlim_cur;
if (max_data_size > 512 * 1024 * 1024)
{
max_data_size = 512 * 1024 * 1024;
}
}

total_size = alloc_size+(int64_t)min_buff_size*max_connections;
if (total_size <= max_data_size)
{
g_free_queue.malloc_whole_block = true;
block_size += min_buff_size;
}
else
{
g_free_queue.malloc_whole_block = false;
total_size = alloc_size;
}
}

g_mpool = (struct fast_task_info *)malloc(total_size);
if (g_mpool == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc "INT64_PRINTF_FORMAT" bytes fail, " \
"errno: %d, error info: %s", \
__LINE__, total_size, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
memset(g_mpool, 0, total_size);

        //分配task_info + storageclientinfo结构内存池
pCharEnd = ((char *)g_mpool) + total_size;
for (p=(char *)g_mpool; p
{
pTask = (struct fast_task_info *)p;
pTask->size = min_buff_size;

pTask->arg = p + sizeof(struct fast_task_info);
if (g_free_queue.malloc_whole_block)
{
pTask->data = (char *)pTask->arg + arg_size;
}
else
{
pTask->data = (char *)malloc(pTask->size);
if (pTask->data == NULL)
{
free_queue_destroy();

logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, " \
"errno: %d, error info: %s", \
__LINE__, pTask->size, \
errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
}
}

g_free_queue.tail = (struct fast_task_info *)(pCharEnd - block_size);
for (p=(char *)g_mpool; p<(char *)g_free_queue.tail; p += block_size)
{
pTask = (struct fast_task_info *)p;
pTask->next = (struct fast_task_info *)(p + block_size);
}

g_free_queue.max_connections = max_connections;
g_free_queue.min_buff_size = min_buff_size;
g_free_queue.max_buff_size = max_buff_size;
g_free_queue.arg_size = arg_size;
g_free_queue.head = g_mpool;
g_free_queue.tail->next = NULL;

return 0;
}


3, 工作线程函数work_thread_entrance()

static void *work_thread_entrance(void* arg) 
{
    int result;
    struct storage_nio_thread_data *pThreadData;
    struct event ev_notify;

    //pThreadData 指向全局变量g_nio_thread_data的每一个storage_nio_thread_data结构的起始地址
    //共有g_work_threads个工作线程,所以有这么多个storage_nio_thread_data结构

    pThreadData = (struct storage_nio_thread_data *)arg;
    //若要检查重复文件
    if (g_check_file_duplicate)  {    
        if ((result=fdht_copy_group_array(&(pThreadData->group_array),\
                &g_group_array)) != 0)
        {    
            pthread_mutex_lock(&g_storage_thread_lock);
            g_storage_thread_count--;
            pthread_mutex_unlock(&g_storage_thread_lock);
            return NULL;
        }    
    }

    do  {    
        //每个工作线程分别监控属于自己的storage_nio_thread_data结构中的可读描述符:pipe_fds[0]
        //通过libevent监控管道可读端是否可读,若管道可读,则使用storage_recv_notify_read函数
        //读取管道里的信息,并对管道信息进行处理。
        event_set(&ev_notify, pThreadData->pipe_fds[0], \
            EV_READ | EV_PERSIST, storage_recv_notify_read, NULL);
        if ((result=event_base_set(pThreadData->ev_base, &ev_notify)) != 0)
        {
            logCrit("file: "__FILE__", line: %d, " \
                "event_base_set fail.", __LINE__);
            break;
        }
        //添加到libevent事件监控队列
        if ((result=event_add(&ev_notify, NULL)) != 0)
        {
            logCrit("file: "__FILE__", line: %d, " \
                "event_add fail.", __LINE__);
            break;
        }
        //若继续,再次添加到事件监控队列中 
        while (g_continue_flag)
        {
            event_base_loop(pThreadData->ev_base, 0);
        }
    } while (0);

    event_base_free(pThreadData->ev_base);

    if (g_check_file_duplicate)
    {
        if (g_keep_alive)
        {
            fdht_disconnect_all_servers(&(pThreadData->group_array));
        }

        fdht_free_group_array(&(pThreadData->group_array));
    }
    //要对全局变量操作了,加线程锁
    if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_mutex_lock fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
    }
    //storage线程个数减1
    g_storage_thread_count--;
    //解锁
    if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "call pthread_mutex_lock fail, " \
            "errno: %d, error info: %s", \
            __LINE__, result, STRERROR(result));
    }

    logDebug("file: "__FILE__", line: %d, " \
        "nio thread exited, thread count: %d", \
        __LINE__, g_storage_thread_count);

    return NULL;
}


//处理g_nio_thread_data队列中storage_nio_thread_data结构的管道读描述符pipe_fds[0]的可读信息
void storage_recv_notify_read(int sock, short event, void *arg)
{   
    struct fast_task_info *pTask;
    StorageClientInfo *pClientInfo;
    long task_addr;
    int64_t remain_bytes;
    int bytes;
    int result; 
        
    //进入死循环,不断处理客户端或tracker来的请求,并处理请求
    while (1)
    {       
        //从可读管道描述符中读取信息
        if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
        {   
            //出错,跳出循环
            if (!(errno == EAGAIN || errno == EWOULDBLOCK))
            {
                logError("file: "__FILE__", line: %d, " \
                    "call read failed, " \
                    "errno: %d, error info: %s", \
                    __LINE__, errno, STRERROR(errno));
            }
        
            break;  
        }   
        else if (bytes == 0)     //读到的信息长度为0
        {   
            logError("file: "__FILE__", line: %d, " \
                "call read failed, end of file", __LINE__);
            break;
        }   
    
        //管道写入的是一个fast_task_info 结构的信息
        //先把信息格式化一下
        pTask = (struct fast_task_info *)task_addr;
        //对应连接的客户端信息
        pClientInfo = (StorageClientInfo *)pTask->arg;
        //sock已经小于0了,有错误发生,直接返回,结束读取
        if (pClientInfo->sock < 0)  //quit flag
        {
            struct storage_nio_thread_data *pThreadData;
            struct timeval tv;

            pThreadData = g_nio_thread_data + \
                    pClientInfo->nio_thread_index;
                        tv.tv_sec = 1;
                        tv.tv_usec = 0;
            event_base_loopexit(pThreadData->ev_base, &tv);
            return;
        }
        
        /* //logInfo("=====thread index: %d, pClientInfo->sock=%d", \
            pClientInfo->nio_thread_index, pClientInfo->sock);
        */
        //根据客户端的状态进行处理
        switch (pClientInfo->stage)
        {         
            //初始化状态,此状态是由在storage_accept_loop函数中和客户端建立tcp连接后,初始化的。
            case FDFS_STORAGE_STAGE_NIO_INIT:
                result = storage_nio_init(pTask);
                break;
            //完成FDFS_STORAGE_STAGE_NIO_INIT这个阶段后,进入接收阶段
            case FDFS_STORAGE_STAGE_NIO_RECV:
                //把目前的偏移量设置为0
                pTask->offset = 0;
                //接收的总长度是total_length-total_offset总长度减去总偏移量
                remain_bytes = pClientInfo->total_length - \
                           pClientInfo->total_offset;
                //pTask->length 是数据长度,为剩余的字节数和pTask->size中的两者中最大的数据
                if (remain_bytes > pTask->size)
                {
                    pTask->length = pTask->size;
                }
                else
                {
                    pTask->length = remain_bytes;
                }
                //从pClientInfo->sock中读取数据并处理相应的命令
                client_sock_read(pClientInfo->sock, EV_READ, pTask);
                result = 0;
                /*
                if ((result=event_add(&pTask->ev_read, \
                        &g_network_tv)) != 0)
                {
                    logError("file: "__FILE__", line: %d, " \
                        "event_add fail.", __LINE__);
                }
                */
                break;
            case FDFS_STORAGE_STAGE_NIO_SEND:
                result = storage_send_add_event(pTask);
                break;

            default:
                logError("file: "__FILE__", line: %d, " \
                    "invalid stage: %d", __LINE__, \
                    pClientInfo->stage);
                result = EINVAL;
                break;
        }

        if (result != 0)
        {
            task_finish_clean_up(pTask);
        }
    }
}


//storage_nio_init函数
该初始化函数主要是把客户端和storage服务端的已建立成功的socket描述符添加到事件监控队列中。
用来接收并处理客户端的请求。并设置读、写事件处理函数。

static int storage_nio_init(struct fast_task_info *pTask)
{
    int result;
    StorageClientInfo *pClientInfo;
    struct storage_nio_thread_data *pThreadData;

    pClientInfo = (StorageClientInfo *)pTask->arg;
    pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;

    //把客户端和storage服务端建立成功的socket添加到libevent的读和写事件监控队列中。
    //并设置读和写事件的处理函数。
    event_set(&pTask->ev_read, pClientInfo->sock, EV_READ, \
            client_sock_read, pTask);
    if ((result=event_base_set(pThreadData->ev_base, &pTask->ev_read)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "event_base_set fail.", __LINE__);
        return result;
    }
    //向pClientInfo->sock写数据事件,写事件的处理函数是client_sock_write
    event_set(&pTask->ev_write, pClientInfo->sock, EV_WRITE, \
            client_sock_write, pTask);
    if ((result=event_base_set(pThreadData->ev_base, \
                    &pTask->ev_write)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "event_base_set fail.", __LINE__);
        return result;
    }

    //修改客户端的stage变量的状态为RECV
    pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
    if ((result=event_add(&pTask->ev_read, &g_network_tv)) != 0)
    {
        logError("file: "__FILE__", line: %d, " \
            "event_add fail.", __LINE__);
        return result;
    }
    return 0;
}

//读客户端信息,并进行相应处理
static void client_sock_read(int sock, short event, void *arg)
{
int bytes;
int recv_bytes;
struct fast_task_info *pTask;
        StorageClientInfo *pClientInfo;

pTask = (struct fast_task_info *)arg;
        pClientInfo = (StorageClientInfo *)pTask->arg;
        
        //读数据超时,触发超时事件
        //若超时直接返回,由
if (event == EV_TIMEOUT)
{
if (pClientInfo->total_offset == 0 && pTask->req_count > 0)
{
if (event_add(&pTask->ev_read, &g_network_tv) != 0)
{
task_finish_clean_up(pTask);

logError("file: "__FILE__", line: %d, " \
"event_add fail.", __LINE__);
}
}
else
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, recv timeout, " \
"recv offset: %d, expect length: %d", \
__LINE__, pTask->client_ip, \
pTask->offset, pTask->length);

task_finish_clean_up(pTask);
}

return;
}
        //死循环
while (1) {
                //若total_length 为0,代表此次发送的是头数据
if (pClientInfo->total_length == 0)     //recv header
{    
                        //若发送的是协议头,接收的字节数为,traker头减去偏移量
recv_bytes = sizeof(TrackerHeader) - pTask->offset;
} else {
                        //接收的是数据,接收长度是length-offset
recv_bytes = pTask->length - pTask->offset;
}

/*
logInfo("total_length="INT64_PRINTF_FORMAT", recv_bytes=%d, "
"pTask->length=%d, pTask->offset=%d",
pClientInfo->total_length, recv_bytes, 
pTask->length, pTask->offset);
*/

bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
                //recv到的长度小于0,表示有错误发生
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if(event_add(&pTask->ev_read, &g_network_tv)!=0) {
task_finish_clean_up(pTask);

logError("file: "__FILE__", line: %d, "\
"event_add fail.", __LINE__);
}
} else {
logError("file: "__FILE__", line: %d, " \
"client ip: %s, recv failed, " \
"errno: %d, error info: %s", \
__LINE__, pTask->client_ip, \
errno, STRERROR(errno));

task_finish_clean_up(pTask);
}
                        //接收时发生错误,直接返回,重新初始化
return;
}
else if (bytes == 0)
{
logDebug("file: "__FILE__", line: %d, " \
"client ip: %s, recv failed, " \
"connection disconnected.", \
__LINE__, pTask->client_ip);

task_finish_clean_up(pTask);
return;
}
                //total_length 为0,表示读取的是数据包头
if (pClientInfo->total_length == 0) //header
{
if (pTask->offset + bytes < sizeof(TrackerHeader))
{
if (event_add(&pTask->ev_read, &g_network_tv)!=0)
{
task_finish_clean_up(pTask);

logError("file: "__FILE__", line: %d, "\
"event_add fail.", __LINE__);
}

pTask->offset += bytes;
return;
}

pClientInfo->total_length=buff2long(((TrackerHeader *) \
pTask->data)->pkg_len);
if (pClientInfo->total_length < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, pkg length: " \
INT64_PRINTF_FORMAT" < 0", \
__LINE__, pTask->client_ip, \
pClientInfo->total_length);

task_finish_clean_up(pTask);
return;
}

pClientInfo->total_length += sizeof(TrackerHeader);
if (pClientInfo->total_length > pTask->size)
{
pTask->length = pTask->size;
}
else
{
pTask->length = pClientInfo->total_length;
}
}

pTask->offset += bytes;
if (pTask->offset >= pTask->length) //recv current pkg done
{
if (pClientInfo->total_offset + pTask->length >= \
pClientInfo->total_length)
{
/* current req recv done */
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
pTask->req_count++;
}

if (pClientInfo->total_offset == 0)
{
pClientInfo->total_offset = pTask->length;
storage_deal_task(pTask);
}
else
{
pClientInfo->total_offset += pTask->length;

/* continue write to file */
storage_dio_queue_push(pTask);
}

return;
}
}

return;
}


//处理客户端命令
int storage_deal_task(struct fast_task_info *pTask)
{
TrackerHeader *pHeader;
StorageClientInfo *pClientInfo;
int result;

pClientInfo = (StorageClientInfo *)pTask->arg;
pHeader = (TrackerHeader *)pTask->data;
        //解析请求命令
switch(pHeader->cmd)
{
case STORAGE_PROTO_CMD_DOWNLOAD_FILE:  //下载文件
result = storage_server_download_file(pTask);
break;
case STORAGE_PROTO_CMD_GET_METADATA: //获取元数据
result = storage_server_get_metadata(pTask);
break;
case STORAGE_PROTO_CMD_UPLOAD_FILE: //上传文件
result = storage_upload_file(pTask, false);
break;
case STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE: //上传文件并添加到文件末端
result = storage_upload_file(pTask, true);
break;
case STORAGE_PROTO_CMD_APPEND_FILE:
result = storage_append_file(pTask);
break;
case STORAGE_PROTO_CMD_MODIFY_FILE: //修改文件
result = storage_modify_file(pTask);
break;
case STORAGE_PROTO_CMD_TRUNCATE_FILE: //截断文件内容
result = storage_do_truncate_file(pTask);
break;
case STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE:
result = storage_upload_slave_file(pTask);
break;
case STORAGE_PROTO_CMD_DELETE_FILE: //删除文件
result = storage_server_delete_file(pTask);
break;
case STORAGE_PROTO_CMD_CREATE_LINK: //创建连接
result = storage_create_link(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_CREATE_FILE:
result = storage_sync_copy_file(pTask, pHeader->cmd);
break;
case STORAGE_PROTO_CMD_SYNC_DELETE_FILE: //删除文件同步 
result = storage_sync_delete_file(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_UPDATE_FILE:
result = storage_sync_copy_file(pTask, pHeader->cmd);
break;
case STORAGE_PROTO_CMD_SYNC_APPEND_FILE:
result = storage_sync_append_file(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_MODIFY_FILE:
result = storage_sync_modify_file(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE:
result = storage_sync_truncate_file(pTask);
break;
case STORAGE_PROTO_CMD_SYNC_CREATE_LINK:
result = storage_sync_link_file(pTask);
break;
case STORAGE_PROTO_CMD_SET_METADATA:
result = storage_server_set_metadata(pTask);
break;
case STORAGE_PROTO_CMD_QUERY_FILE_INFO:
result = storage_server_query_file_info(pTask);
break;
case STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG:
result = storage_server_fetch_one_path_binlog(pTask);
break;
case FDFS_PROTO_CMD_QUIT:
task_finish_clean_up(pTask);
return 0;
case FDFS_PROTO_CMD_ACTIVE_TEST:
result = storage_deal_active_test(pTask);
break;
case STORAGE_PROTO_CMD_REPORT_CLIENT_IP:
result = storage_server_report_client_ip(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE:
result = storage_server_trunk_alloc_space(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_ALLOC_CONFIRM:
result = storage_server_trunk_alloc_confirm(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_FREE_SPACE:
result = storage_server_trunk_free_space(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_SYNC_BINLOG:
result = storage_server_trunk_sync_binlog(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_GET_BINLOG_SIZE:
result = storage_server_trunk_get_binlog_size(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS:
result = storage_server_trunk_delete_binlog_marks(pTask);
break;
case STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE:
result = storage_server_trunk_truncate_binlog_file(pTask);
break;
default:
logError("file: "__FILE__", line: %d, "  \
"client ip: %s, unkown cmd: %d", \
__LINE__, pTask->client_ip, \
pHeader->cmd);
result = EINVAL;
break;
}

if (result != STORAGE_STATUE_DEAL_FILE)
{
pClientInfo->total_offset = 0;
pTask->length = pClientInfo->total_length;

pHeader = (TrackerHeader *)pTask->data;
pHeader->status = result;
pHeader->cmd = STORAGE_PROTO_CMD_RESP;
long2buff(pClientInfo->total_length - sizeof(TrackerHeader), \
pHeader->pkg_len);
storage_send_add_event(pTask);
}

return result;
}


//接收到任务后的最终会调用该函数对任务进行处理
static int storage_write_to_file(struct fast_task_info *pTask, \
const int64_t file_offset, const int64_t upload_bytes, \
const int buff_offset, TaskDealFunc deal_func, \
FileDealDoneCallback done_callback, \
DisconnectCleanFunc clean_func, const int store_path_index)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
int result;

pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext =  &(pClientInfo->file_context);

pClientInfo->deal_func = deal_func;
pClientInfo->clean_func = clean_func;

pFileContext->fd = -1;
pFileContext->buff_offset = buff_offset;
pFileContext->offset = file_offset;
pFileContext->start = file_offset;
pFileContext->end = file_offset + upload_bytes;
pFileContext->dio_thread_index = storage_dio_get_thread_index( \
pTask, store_path_index, pFileContext->op);
pFileContext->done_callback = done_callback;

if (pFileContext->calc_crc32)
{
pFileContext->crc32 = CRC32_XINIT;
}

if (pFileContext->calc_file_hash)
{
INIT_HASH_CODES4(pFileContext->file_hash_codes)
}

        //把任务放到io任务队列中,并发送信号,通知io处理函数进行处理。
        //io队列接收到通知后,会调用io处理函数进行处理。
if ((result=storage_dio_queue_push(pTask)) != 0)
{
pClientInfo->total_length = sizeof(TrackerHeader);
return result;
}

return STORAGE_STATUE_DEAL_FILE;
}


/**
1 byte: store path index
8 bytes: file size 
FDFS_FILE_EXT_NAME_MAX_LEN bytes: file ext name, do not include dot (.)
file size bytes: file content

上传文件
**/
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
DisconnectCleanFunc clean_func;
char *p;
char filename[128];
char file_ext_name[FDFS_FILE_PREFIX_MAX_LEN + 1];
int64_t nInPackLen;
int64_t file_offset;
int64_t file_bytes;
int crc32;
int store_path_index;
int result;
int filename_len;

pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext =  &(pClientInfo->file_context);

nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);

if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE + 
FDFS_FILE_EXT_NAME_MAX_LEN)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
INT64_PRINTF_FORMAT" is not correct, " \
"expect length >= %d", __LINE__, \
STORAGE_PROTO_CMD_UPLOAD_FILE, \
pTask->client_ip,  nInPackLen, \
1 + FDFS_PROTO_PKG_LEN_SIZE + \
FDFS_FILE_EXT_NAME_MAX_LEN);
pClientInfo->total_length = sizeof(TrackerHeader);
return EINVAL;
}
        //文件路径索引号
p = pTask->data + sizeof(TrackerHeader);
store_path_index = *p++;
if (store_path_index < 0 || store_path_index >= g_fdfs_path_count)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, store_path_index: %d " \
"is invalid", __LINE__, \
pTask->client_ip, store_path_index);
pClientInfo->total_length = sizeof(TrackerHeader);
return EINVAL;
}
        //获取文件大小
file_bytes = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
if (file_bytes < 0 || file_bytes != nInPackLen - \
(1 + FDFS_PROTO_PKG_LEN_SIZE + \
 FDFS_FILE_EXT_NAME_MAX_LEN))
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, pkg length is not correct, " \
"invalid file bytes: "INT64_PRINTF_FORMAT \
", total body length: "INT64_PRINTF_FORMAT, \
__LINE__, pTask->client_ip, file_bytes, nInPackLen);
pClientInfo->total_length = sizeof(TrackerHeader);
return EINVAL;
}
        //获取文件名
memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
*(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
p += FDFS_FILE_EXT_NAME_MAX_LEN;

pFileContext->calc_crc32 = true;
pFileContext->calc_file_hash = g_check_file_duplicate;
pFileContext->extra_info.upload.start_time = time(NULL);

strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
storage_format_ext_name(file_ext_name, \
pFileContext->extra_info.upload.formatted_ext_name);

pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
if (bAppenderFile)
{
pFileContext->extra_info.upload.file_type |= \
_FILE_TYPE_APPENDER;
}
else
{
if (g_if_use_trunk_file && trunk_check_size( \
TRUNK_CALC_SIZE(file_bytes)))
{
pFileContext->extra_info.upload.file_type |= \
_FILE_TYPE_TRUNK;
}
}

        //若使用trunk文件保存
if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
{
FDFSTrunkFullInfo *pTrunkInfo;

pFileContext->extra_info.upload.if_sub_path_alloced = true;
pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
                //为trunk文件名分配空间,并添加到缓存中
if ((result=trunk_client_trunk_alloc_space( \
TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
{
pClientInfo->total_length = sizeof(TrackerHeader);
return result;
}

clean_func = dio_trunk_write_finish_clean_up;
file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
         pFileContext->extra_info.upload.if_gen_filename = true;
trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \
sizeof(pFileContext->filename));
                //初始化trunk文件,并对trunk文件做初始化,trunk文件一般是
pFileContext->extra_info.upload.before_open_callback = \
dio_check_trunk_file;
pFileContext->extra_info.upload.before_close_callback = \
dio_write_chunk_header;
pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
}
else
{
if (g_path_free_mbs[store_path_index] - (file_bytes/FDFS_ONE_MB)
<= g_avg_storage_reserved_mb)
{
logError("file: "__FILE__", line: %d, " \
"no space to upload file, "
"free space: %d MB is too small, file bytes: " \
INT64_PRINTF_FORMAT", reserved space: %d MB", \
__LINE__, g_path_free_mbs[store_path_index], \
file_bytes, g_avg_storage_reserved_mb);
pClientInfo->total_length = sizeof(TrackerHeader);
return ENOSPC;
}

crc32 = rand();
*filename = '\0';
filename_len = 0;
pFileContext->extra_info.upload.if_sub_path_alloced = false;
pFileContext->extra_info.upload.trunk_info.path. \
store_path_index = store_path_index;
if ((result=storage_get_filename(pClientInfo, \
pFileContext->extra_info.upload.start_time, \
file_bytes, crc32, pFileContext->extra_info.upload.\
formatted_ext_name, filename, &filename_len, \
pFileContext->filename)) != 0)
{
pClientInfo->total_length = sizeof(TrackerHeader);
return result;
}

clean_func = dio_write_finish_clean_up;
file_offset = 0;
         pFileContext->extra_info.upload.if_gen_filename = true;
pFileContext->extra_info.upload.before_open_callback = NULL;
pFileContext->extra_info.upload.before_close_callback = NULL;
pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC \
| g_extra_open_file_flags;
}

  return storage_write_to_file(pTask, file_offset, file_bytes, \
p - pTask->data, dio_write_file, \
storage_upload_file_done_callback, \
clean_func, store_path_index);
}



/**
pkg format:
Header
8 bytes: file offset
8 bytes: download file bytes
FDFS_GROUP_NAME_MAX_LEN bytes: group_name
filename
传输的数据包格式:
值:
|tracker header|file offset|download file bytes|group_name|
长度:
|tracker Header|8B|8B|FDFS_GROUP_NAME_MAX_LEN|
**/
//从storage服务器下载文件
static int storage_server_download_file(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
char *p;
int result;
int store_path_index;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char true_filename[128];
char *filename;
int filename_len;
int64_t file_offset;
int64_t download_bytes;
int64_t file_bytes;
struct stat stat_buf;
int64_t nInPackLen;
FDFSTrunkFullInfo trunkInfo;
FDFSTrunkHeader trunkHeader;

pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext =  &(pClientInfo->file_context);

        //减去trackerheader的长度,得到数据包长度
nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader); 
pClientInfo->total_length = sizeof(TrackerHeader);

        //包中实际的数据长度错误
if (nInPackLen <= 16 + FDFS_GROUP_NAME_MAX_LEN)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
INT64_PRINTF_FORMAT" is not correct, " \
"expect length > %d", __LINE__, \
STORAGE_PROTO_CMD_UPLOAD_FILE, \
pTask->client_ip,  \
nInPackLen, 16 + FDFS_GROUP_NAME_MAX_LEN);
return EINVAL;
}

        //数据包长度错误
if (nInPackLen >= pTask->size)
{
logError("file: "__FILE__", line: %d, " \
"cmd=%d, client ip: %s, package size " \
INT64_PRINTF_FORMAT" is too large, " \
"expect length should < %d", __LINE__, \
STORAGE_PROTO_CMD_UPLOAD_FILE, \
pTask->client_ip,  \
nInPackLen, pTask->size);
return EINVAL;
}

        //获得实际数据的起始地址
p = pTask->data + sizeof(TrackerHeader);

        //得到文件偏移量
file_offset = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
        //下一个字段是下载文件的bytes数
download_bytes = buff2long(p);
p += FDFS_PROTO_PKG_LEN_SIZE;
        //若文件偏移量小于0,错误,直接返回
if (file_offset < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, invalid file offset: " \
INT64_PRINTF_FORMAT,  __LINE__, \
pTask->client_ip, file_offset);
return EINVAL;
}
        //若下载的文件大小小于0,错误
if (download_bytes < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, invalid download file bytes: " \
INT64_PRINTF_FORMAT,  __LINE__, \
pTask->client_ip, download_bytes);
return EINVAL;
}

        //下一个字段是组名
memcpy(group_name, p, FDFS_GROUP_NAME_MAX_LEN);
*(group_name + FDFS_GROUP_NAME_MAX_LEN) = '\0';
p += FDFS_GROUP_NAME_MAX_LEN;
        //和客户端发送的group命比较,是否相等,若不等说明连接错误,直接返回
if (strcmp(group_name, g_group_name) != 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, group_name: %s " \
"not correct, should be: %s", \
__LINE__, pTask->client_ip, \
group_name, g_group_name);
return EINVAL;
}

        //获取到文件名
filename = p;
filename_len = nInPackLen - (16 + FDFS_GROUP_NAME_MAX_LEN);
*(filename + filename_len) = '\0';

        //到此,文件名和下载量都已知道了
        //分解文件名,并获取真正的可以直接读取的文件名
if ((result=storage_split_filename_ex(filename, \
&filename_len, true_filename, &store_path_index)) != 0)
{
return result;
}
        //检查文件名字符串是否正确
if ((result=fdfs_check_data_filename(true_filename, filename_len)) != 0)
{
return result;
}

pFileContext->fd = -1;
result = trunk_file_stat_ex(store_path_index, \
true_filename, filename_len, &stat_buf, \
&trunkInfo, &trunkHeader, &pFileContext->fd);
if (IS_TRUNK_FILE_BY_ID(trunkInfo))
{
pthread_mutex_lock(&stat_count_thread_lock);
g_storage_stat.total_file_open_count++;
pthread_mutex_unlock(&stat_count_thread_lock);
}
if (result == 0)
{
if (!S_ISREG(stat_buf.st_mode))
{
logError("file: "__FILE__", line: %d, " \
"logic file %s is not a regular file", \
__LINE__, filename);
return EISDIR;
}

file_bytes = stat_buf.st_size;
}
else
{
file_bytes = 0;
logError("file: "__FILE__", line: %d, " \
"call stat fail, logic file: %s, "\
"error no: %d, error info: %s", \
__LINE__, filename, result, STRERROR(result));
return result;
}

if (IS_TRUNK_FILE_BY_ID(trunkInfo))
{
pthread_mutex_lock(&stat_count_thread_lock);
g_storage_stat.success_file_open_count++;
pthread_mutex_unlock(&stat_count_thread_lock);
}

        //检查下载文件的字节数是否正确
if (download_bytes == 0)
{
download_bytes  = file_bytes - file_offset;
}
else if (download_bytes > file_bytes - file_offset)
{
logError("file: "__FILE__", line: %d, " \
"client ip:%s, invalid download file bytes: " \
INT64_PRINTF_FORMAT" > file remain bytes: " \
INT64_PRINTF_FORMAT,  __LINE__, \
pTask->client_ip, download_bytes, \
file_bytes - file_offset);
if (pFileContext->fd >= 0)
{
close(pFileContext->fd);
}
return EINVAL;
}

if (IS_TRUNK_FILE_BY_ID(trunkInfo))
{
trunk_get_full_filename((&trunkInfo), pFileContext->filename, \
sizeof(pFileContext->filename));
file_offset += TRUNK_FILE_START_OFFSET(trunkInfo);
}
else
{
sprintf(pFileContext->filename, "%s/data/%s", \
g_fdfs_store_paths[store_path_index], true_filename);
}

return storage_read_from_file(pTask, file_offset, download_bytes, \
storage_download_file_done_callback, store_path_index);
}



阅读(10329) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~