Chinaunix首页 | 论坛 | 博客
  • 博客访问: 108871
  • 博文数量: 40
  • 博客积分: 2058
  • 博客等级: 大尉
  • 技术积分: 409
  • 用 户 组: 普通用户
  • 注册时间: 2008-10-07 16:49
文章分类

全部博文(40)

文章存档

2011年(3)

2010年(17)

2009年(14)

2008年(6)

我的朋友

分类: LINUX

2010-08-10 09:59:19

文件:bt.tar.bz2
大小:155KB
下载:下载

BT协议分析+源代码阅读


这一周多用来分析了一下《LinuxC编程实战》最后一章的项目实战:BT下载软件的开发,书上详细分析和解释了BT协议。我的学习方法主要结合书上给的BT协议一边理解一边看源代码。

主要分为这么几个步骤:

1.了解了BT(BitTorrent)协议在互联网中的一些应用,发展历史和前景。

2.详细介绍了BT协议。 BitTorrent(简称BT )是一个文件分发协议,每个下载者在下载的同时不断的向其他下载者上传已下载的数据。他的特点是:下载的人越多下载的速度越快,原因在于每个下载者将已下载的数据提供给其他下载者下载,它充分利用了用户的上载宽带。BT协议通过一定的策略保证上传的速度越快,下载的速度也越快。

3.BT协议的文件分发系统的构成:一个web服务器,一个种子文件,一个Tracker服务器,一个原始文件提供者,一个网路浏览器,一个或多个下载者。

4.熟悉B编码的规则。

5.了解种子文件的结构

6.客户端使用http协议与tracer进行通信:一方面是将自己的下载进度告知给tracker进行一些相关的统计,二是获取当前下载同一个共享文件的peerIP地址和端口号。

7.peer之间的通信协议,是一个基于TCP协议的应用层协议。

8.BT协议中一些关键的算法和策略。

9.系统的结构化设计,其结构图如下所示:



其中最主要的就是理解BT协议和系统结构设计。理论上理解完了以后就开始看源代码。刚开始看着感觉还不错,到对后看得有点晕,不过整体思想还是搞清楚了,哈哈……结合源代码对这个协议又有了更深入的了解,用我自己的话来总结:

首先BT协议和我以往看到的socket编程不太一样,虽然也用到了创建套接子和网络链接,但它最大的特点就是虽然也有客户端却没有服务器端,没有绑定,监听,生成新的进程去处理客户短发送来的请求等。每个peer通过解析种子文件得到tracker的地址以后就得到了和要下载该数据有关的peer,每个peer即使上传者又是下载者,每个下载者从服务器下载自己的数据各个下载者之间并没有交互,就不会像FTPHTTP服务一样,由于好多客户端访问服务器就带来了服务器性能优化的问题,由于服务器的处理能力和宽带的限制,影响客户的下载速度。

其次就是揭开了上传和下载文件的面纱,以前一直感觉在网络上传送一个文件之类的东西是一件很神奇的事情,现在才发现其实质只不过就是对磁盘上的文件的读写,然后将读到的数据放到缓冲区中然后经过网络传输到我们的应用程序中然后再写入磁盘。

最后,也是这次看源代码的一些心路历程吧,看代码有好几天看到关键一截了,代码难度也上去了很不好理解,代码看得我恶心,突然就有想放弃的念头,最后还是忍住了。见天总算是看完了。呵呵不过今天看了一个先前以为很难的代码,没想到感觉好读多了,真不知到是看完了难的看简单一点的就感觉好受一点呢还是我的水平有了进一步的提高。

以前只是自己看看书,写写一些小代码没做过什么大项目,所以想通过这次阅读该项目的源代码来了解一下大型程序是怎样开发出来的,同时看一下自己平时写的代码和工业级的代码相差在哪里,通过这些天对源代码的阅读,真的还收获不少。结合自己的理解我给源代码中加上了注释,细节问题书上分析的都很清楚,我就不再赘述有兴趣的可以下载我上传的源代码阅读。以下是对BT种子源代码的阅读后的一些感受:

1.变量的命名规范问题。这是一个不容争议的话题,变量的命名要有见名知意的效果,这样写出来的代码便于自己记忆也利于别人阅读。

2.分模块的程序设计。每一个.c文件都有一个.h文件与其对应,.h文件中都是一些结构体的定义和函数的申明。.c文件则是.h文件中函数的具体实现。全局变量定义在使用最多的.c文件中,别的文件中若果需要使用则需要用extern关键字声明即可。

3.对形如 char *buff;的变量 都要对其空间进行动态分配和释放,同时使用一个标识记录该字符串的真实长度(这样便于确定字符串的长度,能简化程序的设计)。

4.释放内存空间的时候一定要先释放主结构体中为成员所分配的内存,再释放主结构体中动态申请的内存。如下结构:



typedef struct _Peer {
    int socket; // 通过该socket与peer进行通信

    char ip[16]; // peer的ip地址
    unsigned short port; // peer的端口号
    char id[21]; // peer的id
    int state; // 当前所处的状态
    int am_choking; // 是否将peer阻塞
    int am_interested; // 是否对peer感兴趣
    int peer_choking; // 是否被peer阻塞
    int peer_interested; // 是否被peer感兴趣
    Bitmap bitmap; // 存放peer的位图
 
  char *in_buff; // 存放从peer处获取的消息
    int buff_len; // 缓存区in_buff的长度
    char *out_msg; // 存放将发送给peer的消息
    int msg_len; // 缓冲区out_msg的长度:
    char *out_msg_copy; // out_msg的副本,发送时使用该缓冲区
    int msg_copy_len; // 缓冲区out_msg_copy的长度
    int msg_copy_index; // 下一次要发送的数据的偏移量
    Request_piece *Request_piece_head; // 向peer请求数据的队列
    Request_piece *Requested_piece_head; // 被peer请求数据的队列
    unsigned int down_total; // 从该peer下载的数据的总和
    unsigned int up_total; // 向该peer上传的数据的总和
   
time_t start_timestamp; // 最近一次接收到peer消息的时间
   
time_t recet_timestamp; // 最近一次发送消息给peer的时间
    time_t last_down_timestamp; // 最近下载数据的开始时间
    time_t last_up_timestamp; // 最近上传数据的开始时间
    long long down_count; // 本计时周期从peer下载的数据的字节数
    long long up_count; // 本计时周期向peer上传的数据的字节数
    float down_rate; // 本计时周期从peer处下载数据的速度
    float up_rate; // 本计时周期向peer处上传数据的速度
    struct _Peer *next; // 指向下一个Peer结构体
} Peer;

这个结构体在使用完之后释放的时候必须按照如下顺序,否则将会出现不可预料的错误。



int del_peer_node(Peer *peer)
{/*从peer链表中删除一个结点*/
    Peer *p = peer_head, *q;

    if(peer == NULL) return -1;

    while(p != NULL) {
        if( p == peer ) {
            if(p == peer_head) peer_head = p->next;
            else q->next = p->next;
            free_peer_node(p); // 可能存在问题

            return 0;
        } else {
            q = p;
            p = p->next;
        }
    }

    return -1;
}

int cancel_request_list(Peer *node)
{    // 撤消当前请求队列

    Request_piece *p;

    p = node->Request_piece_head;
    while(p != NULL) {/*从链表的头结点开始删除*/
        node->Request_piece_head = node->Request_piece_head->next;
        free(p);
        p = node->Request_piece_head;
    }

    return 0;
}

int cancel_requested_list(Peer *node)
{    // 撤消当前被请求队列

    Request_piece *p;
    
    p = node->Requested_piece_head;
    while(p != NULL) {
        node->Requested_piece_head = node->Requested_piece_head->next;
        free(p);
        p = node->Requested_piece_head;
    }
    
    return 0;
}

void free_peer_node(Peer *node)
{    /*释放一个peer 结点的内存*/
    if(node == NULL) return;
    if(node->bitmap.bitfield != NULL) {
        free(node->bitmap.bitfield);
        node->bitmap.bitfield = NULL;
    }
    if(node->in_buff != NULL) {
        free(node->in_buff);
        node->in_buff = NULL;
    }
    if(node->out_msg != NULL) {
        free(node->out_msg);
        node->out_msg = NULL;
    }
    if(node->out_msg_copy != NULL) {
        free(node->out_msg_copy);
        node->out_msg_copy = NULL;
    }

    cancel_request_list(node);/*撤销当前请求队列*/
    cancel_requested_list(node);/*撤销当前被请求队列*/

    // 释放完peer成员的内存后,再释放peer所占的内存

    free(node);
}

void release_memory_in_peer()
{/*释放peer管理模块中动态申请的内存,即释放peer 链表所所有的内存空间*/
    Peer *p;

    if(peer_head == NULL) return;

    p = peer_head;
    while(p != NULL) {
        peer_head = peer_head->next;
        free_peer_node(p);
        p = peer_head;
    }
}

5.添加注释的重要性。代码是自己写的,当然任何函数和变量的意义和命名规则自己都知道,但是别人读你的代码将会不知所云。因而读别人写的程序,从另一个方面来说也让我们能更好的清楚要在哪里添加注释才能使我们的程序变得容易阅读,添加怎样的注释就更加重要了。

6.复杂问题就是最基本问题的组合。这是老师的一个研究生告诉我们的,顺便就记了下来觉得很有道理!

7.看源代码是你的目标不同你的侧重点就会不一样。如果你只是想了解程序的大体框架,你直接看每个.h 文件,main.c文件和各个函数间的相互调用就好了。如果想了解程序的细节到底如何实现,则要仔细的揣摩函数实现的算法,等等。目标不同,阅读的方式也不同。我想有了这个程序做铺垫,我看内核源代码应该就会有一定的方向和经验了。

8.读源代码时一定要认准数据结构,各个函数之间都是有一些重要的变量相关连,那么理顺这些数据的结构和组织方式在读代码的时候也起到了至关重要的作用。

9.定义形如 char *s;这中字符串变量的时候一定要注意内存空间的动态分配和回收。

10.写程序想要实现特定的功能倒不是一件难事,难就难在对细节的处理上,对于他会出现怎样的错误,和错误出现后应该如何处理才是关键,否则小则产生bug,打则造成程序崩溃。

比如说这一段函数(虽然有点长,呵呵):


// 负责与所有Peer收发数据、交换消息

int download_upload_with_peers()
{
    Peer *p;
    int ret, max_sockfd, i;

    int connect_tracker, connecting_tracker;
    int connect_peer, connecting_peer;
    time_t last_time[3], now_time;

    time_t start_connect_tracker; // 开始连接tracker的时间

    time_t start_connect_peer; // 开始连接peer的时间

    fd_set rset, wset; // select要监视的描述符集合

    struct timeval tmval; // select函数的超时时间


    
    now_time = time(NULL);
    last_time[0] = now_time; // 上一次选择非阻塞peer的时间

    last_time[1] = now_time; // 上一次选择优化非阻塞peer的时间

    last_time[2] = now_time; // 上一次连接tracker服务器的时间

    connect_tracker = 1; // 是否需要连接tracker

    connecting_tracker = 0; // 是否正在连接tracker

    connect_peer = 0; // 是否需要连接peer

    connecting_peer = 0; // 是否正在连接peer


    for(;;) {
        max_sockfd = 0;
        now_time = time(NULL);
        
        // 每隔10秒重新选择非阻塞peer

        if(now_time-last_time[0] >= 10) {
            if(download_piece_num > 0 && peer_head != NULL) {
                compute_rate(); // 计算各个peer的下载、上传速度

                select_unchoke_peer(); // 选择非阻塞的peer

                last_time[0] = now_time;
            }
        }
        
        // 每隔30秒重新选择优化非阻塞peer

        if(now_time-last_time[1] >= 30) {
            if(download_piece_num > 0 && peer_head != NULL) {
                select_optunchoke_peer();
                last_time[1] = now_time;
            }
        }
        
        // 每隔5分钟连接一次tracker,如果当前peer数为0也连接tracker

        if((now_time-last_time[2] >= 300 || connect_tracker == 1) &&
            connecting_tracker != 1 && connect_peer != 1 && connecting_peer != 1) {
            // 由tracker的URL获取tracker的IP地址和端口号

            ret = prepare_connect_tracker(&max_sockfd);
            if(ret < 0) { printf("prepare_connect_tracker\n"); return -1; }

            connect_tracker = 0;
            connecting_tracker = 1;
            start_connect_tracker = now_time;
        }
        
        // 如果要连接新的peer

        if(connect_peer == 1) {
            // 创建套接字,向peer发出连接请求

            ret = prepare_connect_peer(&max_sockfd);
            if(ret < 0) { printf("prepare_connect_peer\n"); return -1; }

            connect_peer = 0;
            connecting_peer = 1;
            start_connect_peer = now_time;
        }

        FD_ZERO(&rset);
        FD_ZERO(&wset);

        // 将连接tracker的socket加入到待监视的集合中

        if(connecting_tracker == 1) {
            int flag = 1;
            // 如果连接tracker超过10秒,则终止连接tracker

            if(now_time-start_connect_tracker > 10) {
                for(i = 0; i < tracker_count; i++)
                    if(valid[i] != 0) close(sock[i]);
            } else {
                for(i = 0; i < tracker_count; i++) {
                    if(valid[i] != 0 && sock[i] > max_sockfd)
                        max_sockfd = sock[i]; // valid[i]值为-1、1、2时要监视

                    if(valid[i] == -1) {
                        FD_SET(sock[i],&rset);
                        FD_SET(sock[i],&wset);
                        if(flag == 1) flag = 0;
                    } else if(valid[i] == 1) {
                        FD_SET(sock[i],&wset);
                        if(flag == 1) flag = 0;
                    } else if(valid[i] == 2) {
                        FD_SET(sock[i],&rset);
                        if(flag == 1) flag = 0;
                    }
                }
            }
            // 说明连接tracker结束,开始与peer建立连接

            if(flag == 1) {
                connecting_tracker = 0;
                last_time[2] = now_time;
                clear_connect_tracker();
                clear_tracker_response();
                if(peer_addr_head != NULL) {
                    connect_tracker = 0;
                    connect_peer = 1;
                } else {
                    connect_tracker = 1;
                }
                continue;
            }
        }

        // 将正在连接peer的socket加入到待监视的集合中

        if(connecting_peer == 1) {
            int flag = 1;
            // 如果连接peer超过10秒,则终止连接peer        

            if(now_time-start_connect_peer > 10) {
                for(i = 0; i < peer_count; i++) {
                    if(peer_valid[i] != 1) close(peer_sock[i]); //不为1说明连接失败

                }
            } else {
                for(i = 0; i < peer_count; i++) {
                    if(peer_valid[i] == -1) {
                        if(peer_sock[i] > max_sockfd)
                            max_sockfd = peer_sock[i];
                        FD_SET(peer_sock[i],&rset);
                        FD_SET(peer_sock[i],&wset);
                        if(flag == 1) flag = 0;
                    }
                }
            }

            if(flag == 1) {
                connecting_peer = 0;
                clear_connect_peer();
                if(peer_head == NULL) connect_tracker = 1;
                continue;
            }
        }

        // 将peer的socket成员加入到待监视的集合中

        connect_tracker = 1;
        p = peer_head;
        while(p != NULL) {
            if(p->state != CLOSING && p->socket > 0) {
                FD_SET(p->socket,&rset);
                FD_SET(p->socket,&wset);
                if(p->socket > max_sockfd) max_sockfd = p->socket;
                connect_tracker = 0;
            }
            p = p->next;
        }
        if(peer_head==NULL && (connecting_tracker==1 || connecting_peer==1))
            connect_tracker = 0;
        if(connect_tracker == 1) continue;

        tmval.tv_sec = 2;
        tmval.tv_usec = 0;
        ret = select(max_sockfd+1,&rset,&wset,NULL,&tmval);
        if(ret < 0) {
            printf("%s:%d error\n",__FILE__,__LINE__);
            perror("select error");
            break;
        }
        if(ret == 0) continue;

        // 添加have消息,have消息要发送给每一个peer,放在此处是为了方便处理

        prepare_send_have_msg();

        // 对于每个peer,接收或发送消息,接收一条完整的消息就进行处理

        p = peer_head;
        while(p != NULL) {
            if( p->state != CLOSING && FD_ISSET(p->socket,&rset) ) {
                ret = recv(p->socket,p->in_buff+p->buff_len,MSG_SIZE-p->buff_len,0);
                if(ret <= 0) { // recv返回0说明对方关闭连接,返回负数说明出错

                    //if(ret < 0) perror("recv error");

                    p->state = CLOSING;
                    // 通过设置套接字选项来丢弃发送缓冲区中的数据

                    discard_send_buffer(p);
                    clear_btcache_before_peer_close(p);
                    close(p->socket);
                } else {
                    int completed, ok_len;
                    p->buff_len += ret;
                    completed = is_complete_message(p->in_buff,p->buff_len,&ok_len);
                    if (completed == 1) parse_response(p);
                    else if(p->buff_len >= threshold) {
                        parse_response_uncomplete_msg(p,ok_len);
                    } else {
                        p->start_timestamp = time(NULL);
                    }
                }
            }
            if( p->state != CLOSING && FD_ISSET(p->socket,&wset) ) {
                if( p->msg_copy_len == 0) {
                    // 创建待发送的消息,并把生成的消息拷贝到发送缓冲区并发送

                    create_response_message(p);
                    if(p->msg_len > 0) {
                        memcpy(p->out_msg_copy,p->out_msg,p->msg_len);
                        p->msg_copy_len = p->msg_len;
                        p->msg_len = 0; // 消息长度赋0,使p->out_msg所存消息清空

                    }    
                }        
                if(p->msg_copy_len > 1024) {
                    send(p->socket,p->out_msg_copy+p->msg_copy_index,1024,0);
                    p->msg_copy_len = p->msg_copy_len - 1024;
                    p->msg_copy_index = p->msg_copy_index + 1024;
                    p->recet_timestamp = time(NULL); // 记录最近一次发送消息给peer的时间

                }
                else if(p->msg_copy_len <= 1024 && p->msg_copy_len > 0 ) {
                    send(p->socket,p->out_msg_copy+p->msg_copy_index,p->msg_copy_len,0);
                    p->msg_copy_len = 0;
                    p->msg_copy_index = 0;
                    p->recet_timestamp = time(NULL); // 记录最近一次发送消息给peer的时间

                }
            }
            p = p->next;
        }

        if(connecting_tracker == 1) {
            for(i = 0; i < tracker_count; i++) {
                if(valid[i] == -1) {
                    // 如果某个套接字可写且未发生错误,说明连接建立成功

                    if(FD_ISSET(sock[i],&wset)) {
                        int error, len;
                        error = 0;
                        len = sizeof(error);
                        if(getsockopt(sock[i],SOL_SOCKET,SO_ERROR,&error,&len) < 0) {
                            valid[i] = 0;
                            close(sock[i]);
                        }
                        if(error) { valid[i] = 0; close(sock[i]); }
                        else { valid[i] = 1; }
                    }
                }
                if(valid[i] == 1 && FD_ISSET(sock[i],&wset) ) {
                    char request[1024];
                    unsigned short listen_port = 33550; // 本程序并未实现监听某端口

                    unsigned long down = total_down;
                    unsigned long up = total_up;
                    unsigned long left;
                    left = (pieces_length/20-download_piece_num)*piece_length;
                    
                    int num = i;
                    Announce_list *anouce = announce_list_head;
                    while(num > 0) {
                        anouce = anouce->next;
                        num--;
                    }
                    create_request(request,1024,anouce,listen_port,down,up,left,200);
                    write(sock[i], request, strlen(request));
                    valid[i] = 2;
                }
                if(valid[i] == 2 && FD_ISSET(sock[i],&rset)) {
                    char buffer[2048];
                    char redirection[128];
                    ret = read(sock[i], buffer, sizeof(buffer));
                    if(ret > 0) {
                        if(response_len != 0) {
                            memcpy(tracker_response+response_index,buffer,ret);
                            response_index += ret;
                            if(response_index == response_len) {
                                parse_tracker_response2(tracker_response,response_len);
                                clear_tracker_response();
                                valid[i] = 0;
                                close(sock[i]);
                                last_time[2] = time(NULL);
                            }
                        } else if(get_response_type(buffer,ret,&response_len) == 1) {
                            tracker_response = (char *)malloc(response_len);
                            if(tracker_response == NULL) printf("malloc error\n");
                            memcpy(tracker_response,buffer,ret);
                            response_index = ret;
                        } else {
                            ret = parse_tracker_response1(buffer,ret,redirection,128);
                            if(ret == 1) add_an_announce(redirection);
                            valid[i] = 0;
                            close(sock[i]);
                            last_time[2] = time(NULL);
                        }
                    }
                }
            }
        }

        if(connecting_peer == 1) {
            for(i = 0; i < peer_count; i++) {
                if(peer_valid[i] == -1 && FD_ISSET(peer_sock[i],&wset)) {
                    int error, len;
                    error = 0;
                    len = sizeof(error);
                    if(getsockopt(peer_sock[i],SOL_SOCKET,SO_ERROR,&error,&len) < 0) {
                        peer_valid[i] = 0;
                    }
                    if(error == 0) {
                        peer_valid[i] = 1;
                        add_peer_node_to_peerlist(&peer_sock[i],peer_addr[i]);
                    }
                } // end if

            } // end for

        } // end if

        
        // 对处于CLOSING状态的peer,将其从peer队列中删除

        // 此处应当非常小心,处理不当非常容易使程序崩溃

        p = peer_head;
        while(p != NULL) {
            if(p->state == CLOSING) {
                del_peer_node(p);
                p = peer_head;
            } else {
                p = p->next;
            }
        }

        // 判断是否已经下载完毕

        if(download_piece_num == pieces_length/20) {
            printf("++++++ All Files Downloaded Successfully +++++\n");
            break;
        }
    }

    return 0;
}


11.写程序的基本步骤还是一样的。只要掌握了程序的设计思想后,读别人的代码就和自己写的一样了,应为遵循的规则都是相同的。你只要了解该程序的设计思想和流程,读懂他们的结构和组织方式,细节问题则可以不必过多理会,这样对程序的大致思想也就抓住了。


阅读(4676) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~