Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5690002
  • 博文数量: 675
  • 博客积分: 20301
  • 博客等级: 上将
  • 技术积分: 7671
  • 用 户 组: 普通用户
  • 注册时间: 2005-12-31 16:15
文章分类

全部博文(675)

文章存档

2012年(1)

2011年(20)

2010年(14)

2009年(63)

2008年(118)

2007年(141)

2006年(318)

分类: C/C++

2011-09-13 16:03:12

帮lonelycastle做的libutp调研,主要是调研libutp如何使用。uTP是bittorrent引入的一种基于UDP的可靠传输,拥有ACK机制,使用基于窗口的拥塞控制,能够自适应的利用带宽资源,避免网络拥塞。libutp可以用于其他应用场景,未必仅限于bittorrent。

libutp使用的问题
1. libutp完全采用事件驱动方式,内部不维护数据,应用层需要创建一个流式缓冲区保存接受和发送的数据,可以直接使用evbuffer。
2. libutp线程不安全,utp.cpp中有很多全局数据,例如_global_stats, g_rst_infos, g_utp_sockets。因此不建议在多线程程序中使用,如果使用的话,在API使用前加锁进行同步。
3. 发送数据的时候,首先就数据放入每一个Session的发送缓冲区,然后调用UTP_Write。然是UTP_Write不能保证成功,应该在定时器中也进行调用【transmission的UTP_Write就是在定时器中调用的】。每次调用的时候write的字节数,为发送缓冲区中暂存的全部数据长度。
4. utp_on_state_change可以在UTP_Socket状态改变的时候调用UTP_Write。
5. UTP_IsIncomingUTP是UTP报文接收处理入口。如果是server端,UTP握手完成后会自动创建新的UTP_Socket,调用incomging函数,在incoming函数中创建相应的Session就可以了。
6. libutp不进行具体的发送,具体发送需要在UTP_Create的时候注册sendto函数,UTP_Connect和UTP_IsIncomingUTP中进行了设置。
7. 接收到数据后,utp_on_read将数据拷贝到应用层接收缓冲区;发送数据的时候(调用UTP_Write触发),utp_on_write将数据从应用层发送缓冲区拷贝给libutp。
8. 定期调用UTP_CheckTimeouts,进行超时检查。
9. utp_get_rb_size返回当前接受缓冲区的大小,用于libutp调整接收窗口,可以简单的返回0,避免接受缓冲区对libutp的影响。
10. socket资源销毁:
    应用主动关闭socket的时候,直接调用UTP_Close,会进入CS_FIN_SENT状态,接收到应答或者是check_timeout检查超时30s后,进入CS_DESTROY状态;
    如果对端主动关闭socket的时候,接收到FIN,如果之前就在CS_FIN_SENT,忽略,并按照上面一条策略处理。如果之前不是CS_FIN_SENT,会进入CS_GOT_FIN状态,以UTP_STATE_EOF调用on_state_change函数。
 
    check_timeout函数中检查进入CS_DESTROY或者是CS_RESET状态,其中CS_DESTROY状态在check_timeout后立即调用UTP_Free删除socket资源;CS_RESET状态需要再次调用UTP_Close,设置成CS_DESTROY,最终通过check_timeout进入UTP_Free,并以UTP_STATE_DESTROYING调用on_state_change,并释放UTPSocket资源。
【注意】:示例代码中,关于资源释放的部分,可以参照10进行改进。由于多点进行应用层会话资源回收,建议采用引用计数的方式进行资源管理。
 
CONNECT_FULL问题
修改libutp的utp_log,将UTP日志输出,发送大量的CONNECT_FULL,这个本来是正常的。但是libutp的实现有些问题。
UTP_Write中有类似如下的逻辑:
  1. while(is_writable(conn, num_to_send))
  2. {
  3.     if(num_to_send==0)
  4.         return true;
  5.     bytes = write_outgoing_packet();
  6.     num_to_send -= bytes;
  7. }
UTP_Write中会使用is_writable()检查UTP_Socket是否可写,如果如果传递给is_writable()的参数为0的话,会导致is_writable()返回false,并设置conn->state为CONNECT_FULL,从而影响后续的发送。可以修改libutp判断如果传入参数为0,直接返回true,避免设置UTP_Socket的状态。

libutp使用示例
libutp由于是事件驱动,使用libevent配合起来就十分的方便,下面就使用libutp和libevent实现一个简单的echo,客户端每次从0开始发送8字节的字符串,每次进行累加;服务器端进行echo。
代码写的比较仓促,大家凑或着看吧;-)

服务器端
  1. #include <stdio.h>
  2. #include <sys/types.h> /* See NOTES */
  3. #include <sys/socket.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. #include <pthread.h>

  7. #include <map>
  8. #include "event2/event.h"
  9. #include "event2/buffer.h"

  10. #include "utp.h"

  11. #ifndef DEF_UTP_TIMEOUT
  12. #define DEF_UTP_TIMEOUT 1
  13. #endif
  14. #ifndef DEF_UTP_PORT
    #define DEF_UTP_PORT 2345
    #endif
    #ifndef MAX_BUF_LEN
    #define MAX_BUF_LEN 65536
    #endif
    #ifndef NO_LOG
    #define LOG(format,args...) \
    do{\
        char tmpstr[65536];\
            snprintf(tmpstr,65535,format,##args);\
            printf("[thread:%u] %s\n", pthread_self(), tmpstr); \
    }while(0)
    #else
    #define LOG(format,args...)
    #endif
    void utp_on_read(void *userdata, const unsigned char *buf, size_t buf_len);
    void utp_on_write(void *userdata, unsigned char *buf, size_t buf_len);
    size_t utp_get_rb_size(void *userdata);
    void utp_on_state_change(void *userdata, int state);
    void utp_on_error(void *userdata, int errcode);
    void utp_on_overhead(void *userdata, bool send, size_t count, int type);
    void utp_sendto(void *send_to_userdata, const unsigned char *buf, size_t buf_len,
            const struct sockaddr *to, socklen_t to_len);
    struct UTPFunctionTable utp_func_table =
    {
        utp_on_read,
        utp_on_write,
        utp_get_rb_size,
        utp_on_state_change,
        utp_on_error,
        utp_on_overhead
    };
    int g_count = 0;
    ////////////////////////////////
    //
    class Session
    {
    public:
        Session(UTPSocket *conn, void *data) : _conn(conn), _data(data)
        {
                    _in_buf = evbuffer_new();
                    _out_buf = evbuffer_new();
        }
        ~Session()
        {
                    if(_conn)
                    {
                            UTP_Close(_conn);
                            _conn = NULL;
                    }
                    if(_in_buf)
                    {
                            evbuffer_free(_in_buf);
                            _in_buf = NULL;
                    }
                    if(_out_buf)
                    {
                            evbuffer_free(_out_buf);
                            _out_buf = NULL;
                    }
        }
            void Write()
            {
                    size_t old_size = evbuffer_get_length(_out_buf);
                    if(old_size>0)
                            UTP_Write(_conn, old_size);
            }
            void PushInbuf(const char *buf, int buf_len)
            {
                    evbuffer_add(_in_buf, buf, buf_len);
            }
            int PopInbuf(char *buf, int buf_len)
            {
                    int _buf_len = evbuffer_get_length(_in_buf);
                    if(_buf_len>buf_len)
                    {
                            evbuffer_remove(_in_buf, buf, buf_len);
                            return buf_len;
                    }
                    else
                    {
                            evbuffer_remove(_in_buf, buf, _buf_len);
                            return _buf_len;
                    }
            }
            void PushOutbuf(const char *buf, int buf_len)
            {
                    evbuffer_add(_out_buf, buf, buf_len);
            }
            int PopOutbuf(char *buf, int buf_len)
            {
                    int _buf_len = evbuffer_get_length(_out_buf);
                    if(_buf_len>buf_len)
                    {
                            evbuffer_remove(_out_buf, buf, buf_len);
                            return buf_len;
                    }
                    else
                    {
                            evbuffer_remove(_out_buf, buf, _buf_len);
                            return _buf_len;
                    }
            }
        UTPSocket *Conn() {return _conn;}
    public:
        UTPSocket *_conn;
            struct evbuffer *_in_buf;
            struct evbuffer *_out_buf;
            void *_data;
    };
    class Listener
    {
    public:
        Listener(int port) : _sock(-1), _port(port)
        {
                    Init();
        }
        ~Listener()
        {
                    Dispose();
        }
            static void utp_incoming(void *send_to_userdata, UTPSocket *conn);
            static void utp_sendto(void *send_to_userdata, const unsigned char *buf, size_t buf_len,
            const struct sockaddr *to, socklen_t to_len);
            static void utp_callback(int fd, short events, void *arg);
            static void utp_timer_callback(int fd, short events, void *arg);
            void Init();
            void Dispose();
            void Run();
            void AddSession(Session *session)
            {
                    int key = (int) session;
                    _sessions.insert(std::pair(key, session));
            }
            void DelSession(Session *session)
            {
                    int key = (int) session;
                    _sessions.erase(key);
                    delete session;
            }
        int Sock() {return _sock;}
        int Bind();
    private:
        int _sock;
        int _port;
            struct event_base* _base;
            struct event* _event;
            struct event* _timer;
            std::map _sessions;
    };
    int Listener::Bind()
    {
        int ret = 0;
        struct sockaddr_in addr;
        _sock = socket(AF_INET, SOCK_DGRAM, 0);
        if(_sock ==-1)
        {
            perror("socket");
            goto failed;
        }
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_port = htons(_port);
        ret = bind(_sock, (struct sockaddr *)&addr, sizeof(addr));
        if(ret==-1)
        {
            perror("bind");
            goto failed;
        }
        return 0;
    failed:
        if(_sock)
        {
            close(_sock);
            _sock = -1;
        }
        return -1;
    }
    void Listener::Init()
    {
            //bind
            Bind();
            //init event
            _base = event_base_new();
            _event = event_new(_base, _sock, EV_READ|EV_PERSIST, utp_callback, this);
            event_add(_event, NULL);
            struct timeval tv = {DEF_UTP_TIMEOUT, 0};
            _timer = evtimer_new(_base, utp_timer_callback, this);
            evtimer_add(_timer, &tv);
    }
    void Listener::Dispose()
    {
            if(_event)
            {
                    event_free(_event);
                    _event = NULL;
            }
            if(_timer)
            {
                    event_free(_timer);
                    _timer = NULL;
            }
            if(_base)
            {
                    event_base_free(_base);
                    _base = NULL;
            }
            std::map::iterator it;
            for(it=_sessions.begin(); it!=_sessions.end(); it++)
            {
                    Session *ss = it->second;
                    _sessions.erase(it);
                    delete ss;
            }
            if(_sock>0)
            {
                    close(_sock);
                    _sock = -1;
            }
    }
    void Listener::utp_incoming(void *send_to_userdata, UTPSocket *conn)
    {
        Listener *listener = (Listener *)send_to_userdata;
        LOG("[%s]", __func__);
        //XXX:new session
        Session *session = new Session(conn, listener);
        UTP_SetSockopt(conn, SO_RCVBUF, MAX_BUF_LEN);
        UTP_SetSockopt(conn, SO_SNDBUF, MAX_BUF_LEN);
        UTP_SetCallbacks(conn, &utp_func_table, session);
            listener->AddSession(session);
    }
    void Listener::utp_sendto(void *send_to_userdata, const unsigned char *buf, size_t buf_len,
            const struct sockaddr *to, socklen_t to_len)
    {
        Listener *listener = (Listener *)send_to_userdata;
        int ret = 0;
        int socket = listener->_sock;
        LOG("[%s] send %s:%d[%d]", __func__, inet_ntoa(((struct sockaddr_in *)to)->sin_addr),
                ntohs(((struct sockaddr_in *)to)->sin_port), buf_len);
        //XXX:get socket from send_to_userdata
       
        do {
            ret = sendto(socket, buf, buf_len, 0, to, to_len);
                    if(ret<0)
                    {
                            perror("sendto");
                            usleep(2000);
                    }
        } while(ret!=buf_len);
    }
    void Listener::utp_callback(int fd, short events, void *arg)
    {
            Listener *listener = (Listener *)arg;
            int socket = listener->_sock;
            if(events & EV_TIMEOUT)
            {
                    //listener->Write();
                    UTP_CheckTimeouts();
            }
            if(events & EV_READ)
            {
                    //XXX: Recv
                    int ret = 0;
                    char recv_buf[MAX_BUF_LEN] = {0};
                    struct sockaddr_in from;
                    socklen_t from_len = sizeof(struct sockaddr);
                    ret = recvfrom(socket, (byte*)recv_buf, sizeof(recv_buf)-1, 0, (struct sockaddr*)&from, &from_len);
                    if(ret<0)
                    {
                            perror("recvfrom");
                    }
                    LOG("[%s] recvfrom %s:%d %d bytes", __func__, inet_ntoa(from.sin_addr), ntohs(from.sin_port), ret);
                    //XXX:send_to_userdata MUST contain listen_socket
                    UTP_IsIncomingUTP(utp_incoming, utp_sendto, listener, (const byte*)recv_buf, ret, (struct sockaddr *)&from, from_len);
            }
    }
    void Listener::utp_timer_callback(int fd, short events, void *arg)
    {
            Listener *listener = (Listener *)arg;
            std::map::iterator it;
            for(it=listener->_sessions.begin(); it!=listener->_sessions.end(); it++)
            {
                    it->second->Write();
            }
            UTP_CheckTimeouts();
           
            struct timeval tv = {DEF_UTP_TIMEOUT, 0};
            evtimer_add(listener->_timer, &tv);
    }
    void Listener::Run()
    {
            event_base_dispatch(_base);
    }
    ////////////////////////////////
    //
    void utp_process(Session *ss, size_t buf_len)
    {
            char buf[MAX_BUF_LEN] = {0};
            int len = ss->PopInbuf(buf, MAX_BUF_LEN);
        LOG("[%s] recv[%d]: [%s]", __func__, buf_len, buf);
    //      char input[128]={0};
    //      sprintf(input, "%08d", ++g_count);
    //      ss->PushOutbuf((const char *)input, len);
            ss->PushOutbuf(buf, len);
            ss->Write();
    }
    /////////////////////////////////
    //
    void utp_on_read(void *userdata, const unsigned char *buf, size_t buf_len)
    {
        Session *ss = (Session *)userdata;
        LOG("[%s] recv[%d]: [%s]", __func__, buf_len, buf);
            ss->PushInbuf((const char *)buf, buf_len);
        //XXX: copy and process
        utp_process(ss, buf_len);
    }
    void utp_on_write(void *userdata, unsigned char *buf, size_t buf_len)
    {
        Session *ss = (Session *)userdata;
            ss->PopOutbuf((char *)buf, buf_len);
            buf[buf_len] = '\0';
        LOG("[%s] write[%d]: [%s]", __func__, buf_len, buf);
    }
    size_t utp_get_rb_size(void *userdata)
    {
        Session *ss = (Session *)userdata;
            //FIXME:
        //int rbsize = MAX_BUF_LEN - evbuffer_get_length(ss->_in_buf);
        int rbsize = evbuffer_get_length(ss->_in_buf);
        LOG("[%s] : %d", __func__, rbsize);
        return rbsize;
    }
    void utp_on_state_change(void *userdata, int state)
    {
        Session *ss = (Session *)userdata;
        LOG("[%s] [state:%d]", __func__, state);
        if(state==UTP_STATE_WRITABLE)
        {
                    ss->Write();
        }
    }
    void utp_on_error(void *userdata, int errcode)
    {
        Session *ss = (Session *)userdata;
            Listener *listener = (Listener *)(ss->_data);
            listener->DelSession(ss);
        //delete ss;
        LOG("[%s] [err:%d]", __func__, errcode);
    }
    void utp_on_overhead(void *userdata, bool send, size_t count, int type)
    {
        LOG("[%s] [send:%s] [count:%d] [type:%d]", __func__,
                send==true?"true":"false", count, type);
    }

    int main()
    {
            Listener listener(DEF_UTP_PORT);

            listener.Run();
            return 0;
    }
    阅读(11678) | 评论(3) | 转发(0) |
    给主人留下些什么吧!~~

    CUDev2011-11-21 16:07:22

    wangxuefan1220: 对于你所的CONNECT_FULL问题,我觉得是不存在的。
    当is_writable(conn, num_to_send)的返回值为0时,conn的状态是会修改为CONNECT_FULL,但是utp在内部进行周期.....
    呵呵,希望这篇文章能够对你有帮助

    CUDev2011-11-21 16:06:45

    wangxuefan1220: 对于你所的CONNECT_FULL问题,我觉得是不存在的。
    当is_writable(conn, num_to_send)的返回值为0时,conn的状态是会修改为CONNECT_FULL,但是utp在内部进行周期.....
    其实我的意思是避免在0的时候进入CONNECT_FULL,这样还要到check_timeout的时候才可能修改状态

    wangxuefan12202011-11-14 10:11:14

    对于你所的CONNECT_FULL问题,我觉得是不存在的。
    当is_writable(conn, num_to_send)的返回值为0时,conn的状态是会修改为CONNECT_FULL,但是utp在内部进行周期性事务处理的时候,如果发现conn又变为可写时,会重新启动发送数据,这样并不会导致数据传输的问题。

    以上是我对utp协议和你不同的理解,希望能就这个问题与你再探讨。
    Email:wangxuefan1220@126.com