帮lonelycastle做的libutp调研,主要是调研libutp如何使用。uTP是bittorrent引入的一种基于UDP的可靠传输,拥有ACK机制,使用基于窗口的拥塞控制,能够自适应的利用带宽资源,避免网络拥塞。libutp可以用于其他应用场景,未必仅限于bittorrent。
libutp使用的问题
1. libutp完全采用事件驱动方式,内部不维护数据,应用层需要创建一个流式缓冲区保存接受和发送的数据,可以直接使用evbuffer。
2. libutp线程不安全,utp.cpp中有很多全局数据,例如_global_stats, g_rst_infos, g_utp_sockets。因此不建议在多线程程序中使用,如果使用的话,在API使用前加锁进行同步。
3. 发送数据的时候,首先就数据放入每一个Session的发送缓冲区,然后调用UTP_Write。然是UTP_Write不能保证成功,应该在定时器中也进行调用【transmission的UTP_Write就是在定时器中调用的】。每次调用的时候write的字节数,为发送缓冲区中暂存的全部数据长度。
4. utp_on_state_change可以在UTP_Socket状态改变的时候调用UTP_Write。
5. UTP_IsIncomingUTP是UTP报文接收处理入口。如果是server端,UTP握手完成后会自动创建新的UTP_Socket,调用incomging函数,在incoming函数中创建相应的Session就可以了。
6. libutp不进行具体的发送,具体发送需要在UTP_Create的时候注册sendto函数,UTP_Connect和UTP_IsIncomingUTP中进行了设置。
7. 接收到数据后,utp_on_read将数据拷贝到应用层接收缓冲区;发送数据的时候(调用UTP_Write触发),utp_on_write将数据从应用层发送缓冲区拷贝给libutp。
8. 定期调用UTP_CheckTimeouts,进行超时检查。
9. utp_get_rb_size返回当前接受缓冲区的大小,用于libutp调整接收窗口,可以简单的返回0,避免接受缓冲区对libutp的影响。
10. socket资源销毁:
应用主动关闭socket的时候,直接调用UTP_Close,会进入CS_FIN_SENT状态,接收到应答或者是check_timeout检查超时30s后,进入CS_DESTROY状态;
如果对端主动关闭socket的时候,接收到FIN,如果之前就在CS_FIN_SENT,忽略,并按照上面一条策略处理。如果之前不是CS_FIN_SENT,会进入CS_GOT_FIN状态,以UTP_STATE_EOF调用on_state_change函数。
check_timeout函数中检查进入CS_DESTROY或者是CS_RESET状态,其中CS_DESTROY状态在check_timeout后立即调用UTP_Free删除socket资源;CS_RESET状态需要再次调用UTP_Close,设置成CS_DESTROY,最终通过check_timeout进入UTP_Free,并以UTP_STATE_DESTROYING调用on_state_change,并释放UTPSocket资源。
【注意】:示例代码中,关于资源释放的部分,可以参照10进行改进。由于多点进行应用层会话资源回收,建议采用引用计数的方式进行资源管理。
CONNECT_FULL问题
修改libutp的utp_log,将UTP日志输出,发送大量的CONNECT_FULL,这个本来是正常的。但是libutp的实现有些问题。
UTP_Write中有类似如下的逻辑:
- while(is_writable(conn, num_to_send))
- {
- if(num_to_send==0)
- return true;
- bytes = write_outgoing_packet();
- num_to_send -= bytes;
- }
UTP_Write中会使用is_writable()检查UTP_Socket是否可写,如果如果传递给is_writable()的参数为0的话,会导致is_writable()返回false,并设置conn->state为CONNECT_FULL,从而影响后续的发送。可以修改libutp判断如果传入参数为0,直接返回true,避免设置UTP_Socket的状态。
libutp使用示例
libutp由于是事件驱动,使用libevent配合起来就十分的方便,下面就使用libutp和libevent实现一个简单的echo,客户端每次从0开始发送8字节的字符串,每次进行累加;服务器端进行echo。
代码写的比较仓促,大家凑或着看吧;-)
服务器端
- #include <stdio.h>
- #include <sys/types.h> /* See NOTES */
- #include <sys/socket.h>
- #include <string.h>
- #include <unistd.h>
- #include <pthread.h>
- #include <map>
- #include "event2/event.h"
- #include "event2/buffer.h"
- #include "utp.h"
- #ifndef DEF_UTP_TIMEOUT
- #define DEF_UTP_TIMEOUT 1
- #endif
#ifndef DEF_UTP_PORT
#define DEF_UTP_PORT 2345
#endif
#ifndef MAX_BUF_LEN
#define MAX_BUF_LEN 65536
#endif
#ifndef NO_LOG
#define LOG(format,args...) \
do{\
char tmpstr[65536];\
snprintf(tmpstr,65535,format,##args);\
printf("[thread:%u] %s\n", pthread_self(), tmpstr); \
}while(0)
#else
#define LOG(format,args...)
#endif
void utp_on_read(void *userdata, const unsigned char *buf, size_t buf_len);
void utp_on_write(void *userdata, unsigned char *buf, size_t buf_len);
size_t utp_get_rb_size(void *userdata);
void utp_on_state_change(void *userdata, int state);
void utp_on_error(void *userdata, int errcode);
void utp_on_overhead(void *userdata, bool send, size_t count, int type);
void utp_sendto(void *send_to_userdata, const unsigned char *buf, size_t buf_len,
const struct sockaddr *to, socklen_t to_len);
struct UTPFunctionTable utp_func_table =
{
utp_on_read,
utp_on_write,
utp_get_rb_size,
utp_on_state_change,
utp_on_error,
utp_on_overhead
};
int g_count = 0;
////////////////////////////////
//
class Session
{
public:
Session(UTPSocket *conn, void *data) : _conn(conn), _data(data)
{
_in_buf = evbuffer_new();
_out_buf = evbuffer_new();
}
~Session()
{
if(_conn)
{
UTP_Close(_conn);
_conn = NULL;
}
if(_in_buf)
{
evbuffer_free(_in_buf);
_in_buf = NULL;
}
if(_out_buf)
{
evbuffer_free(_out_buf);
_out_buf = NULL;
}
}
void Write()
{
size_t old_size = evbuffer_get_length(_out_buf);
if(old_size>0)
UTP_Write(_conn, old_size);
}
void PushInbuf(const char *buf, int buf_len)
{
evbuffer_add(_in_buf, buf, buf_len);
}
int PopInbuf(char *buf, int buf_len)
{
int _buf_len = evbuffer_get_length(_in_buf);
if(_buf_len>buf_len)
{
evbuffer_remove(_in_buf, buf, buf_len);
return buf_len;
}
else
{
evbuffer_remove(_in_buf, buf, _buf_len);
return _buf_len;
}
}
void PushOutbuf(const char *buf, int buf_len)
{
evbuffer_add(_out_buf, buf, buf_len);
}
int PopOutbuf(char *buf, int buf_len)
{
int _buf_len = evbuffer_get_length(_out_buf);
if(_buf_len>buf_len)
{
evbuffer_remove(_out_buf, buf, buf_len);
return buf_len;
}
else
{
evbuffer_remove(_out_buf, buf, _buf_len);
return _buf_len;
}
}
UTPSocket *Conn() {return _conn;}
public:
UTPSocket *_conn;
struct evbuffer *_in_buf;
struct evbuffer *_out_buf;
void *_data;
};
class Listener
{
public:
Listener(int port) : _sock(-1), _port(port)
{
Init();
}
~Listener()
{
Dispose();
}
static void utp_incoming(void *send_to_userdata, UTPSocket *conn);
static void utp_sendto(void *send_to_userdata, const unsigned char *buf, size_t buf_len,
const struct sockaddr *to, socklen_t to_len);
static void utp_callback(int fd, short events, void *arg);
static void utp_timer_callback(int fd, short events, void *arg);
void Init();
void Dispose();
void Run();
void AddSession(Session *session)
{
int key = (int) session;
_sessions.insert(std::pair(key, session));
}
void DelSession(Session *session)
{
int key = (int) session;
_sessions.erase(key);
delete session;
}
int Sock() {return _sock;}
int Bind();
private:
int _sock;
int _port;
struct event_base* _base;
struct event* _event;
struct event* _timer;
std::map _sessions;
};
int Listener::Bind()
{
int ret = 0;
struct sockaddr_in addr;
_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(_sock ==-1)
{
perror("socket");
goto failed;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(_port);
ret = bind(_sock, (struct sockaddr *)&addr, sizeof(addr));
if(ret==-1)
{
perror("bind");
goto failed;
}
return 0;
failed:
if(_sock)
{
close(_sock);
_sock = -1;
}
return -1;
}
void Listener::Init()
{
//bind
Bind();
//init event
_base = event_base_new();
_event = event_new(_base, _sock, EV_READ|EV_PERSIST, utp_callback, this);
event_add(_event, NULL);
struct timeval tv = {DEF_UTP_TIMEOUT, 0};
_timer = evtimer_new(_base, utp_timer_callback, this);
evtimer_add(_timer, &tv);
}
void Listener::Dispose()
{
if(_event)
{
event_free(_event);
_event = NULL;
}
if(_timer)
{
event_free(_timer);
_timer = NULL;
}
if(_base)
{
event_base_free(_base);
_base = NULL;
}
std::map::iterator it;
for(it=_sessions.begin(); it!=_sessions.end(); it++)
{
Session *ss = it->second;
_sessions.erase(it);
delete ss;
}
if(_sock>0)
{
close(_sock);
_sock = -1;
}
}
void Listener::utp_incoming(void *send_to_userdata, UTPSocket *conn)
{
Listener *listener = (Listener *)send_to_userdata;
LOG("[%s]", __func__);
//XXX:new session
Session *session = new Session(conn, listener);
UTP_SetSockopt(conn, SO_RCVBUF, MAX_BUF_LEN);
UTP_SetSockopt(conn, SO_SNDBUF, MAX_BUF_LEN);
UTP_SetCallbacks(conn, &utp_func_table, session);
listener->AddSession(session);
}
void Listener::utp_sendto(void *send_to_userdata, const unsigned char *buf, size_t buf_len,
const struct sockaddr *to, socklen_t to_len)
{
Listener *listener = (Listener *)send_to_userdata;
int ret = 0;
int socket = listener->_sock;
LOG("[%s] send %s:%d[%d]", __func__, inet_ntoa(((struct sockaddr_in *)to)->sin_addr),
ntohs(((struct sockaddr_in *)to)->sin_port), buf_len);
//XXX:get socket from send_to_userdata
do {
ret = sendto(socket, buf, buf_len, 0, to, to_len);
if(ret<0)
{
perror("sendto");
usleep(2000);
}
} while(ret!=buf_len);
}
void Listener::utp_callback(int fd, short events, void *arg)
{
Listener *listener = (Listener *)arg;
int socket = listener->_sock;
if(events & EV_TIMEOUT)
{
//listener->Write();
UTP_CheckTimeouts();
}
if(events & EV_READ)
{
//XXX: Recv
int ret = 0;
char recv_buf[MAX_BUF_LEN] = {0};
struct sockaddr_in from;
socklen_t from_len = sizeof(struct sockaddr);
ret = recvfrom(socket, (byte*)recv_buf, sizeof(recv_buf)-1, 0, (struct sockaddr*)&from, &from_len);
if(ret<0)
{
perror("recvfrom");
}
LOG("[%s] recvfrom %s:%d %d bytes", __func__, inet_ntoa(from.sin_addr), ntohs(from.sin_port), ret);
//XXX:send_to_userdata MUST contain listen_socket
UTP_IsIncomingUTP(utp_incoming, utp_sendto, listener, (const byte*)recv_buf, ret, (struct sockaddr *)&from, from_len);
}
}
void Listener::utp_timer_callback(int fd, short events, void *arg)
{
Listener *listener = (Listener *)arg;
std::map::iterator it;
for(it=listener->_sessions.begin(); it!=listener->_sessions.end(); it++)
{
it->second->Write();
}
UTP_CheckTimeouts();
struct timeval tv = {DEF_UTP_TIMEOUT, 0};
evtimer_add(listener->_timer, &tv);
}
void Listener::Run()
{
event_base_dispatch(_base);
}
////////////////////////////////
//
void utp_process(Session *ss, size_t buf_len)
{
char buf[MAX_BUF_LEN] = {0};
int len = ss->PopInbuf(buf, MAX_BUF_LEN);
LOG("[%s] recv[%d]: [%s]", __func__, buf_len, buf);
// char input[128]={0};
// sprintf(input, "%08d", ++g_count);
// ss->PushOutbuf((const char *)input, len);
ss->PushOutbuf(buf, len);
ss->Write();
}
/////////////////////////////////
//
void utp_on_read(void *userdata, const unsigned char *buf, size_t buf_len)
{
Session *ss = (Session *)userdata;
LOG("[%s] recv[%d]: [%s]", __func__, buf_len, buf);
ss->PushInbuf((const char *)buf, buf_len);
//XXX: copy and process
utp_process(ss, buf_len);
}
void utp_on_write(void *userdata, unsigned char *buf, size_t buf_len)
{
Session *ss = (Session *)userdata;
ss->PopOutbuf((char *)buf, buf_len);
buf[buf_len] = '\0';
LOG("[%s] write[%d]: [%s]", __func__, buf_len, buf);
}
size_t utp_get_rb_size(void *userdata)
{
Session *ss = (Session *)userdata;
//FIXME:
//int rbsize = MAX_BUF_LEN - evbuffer_get_length(ss->_in_buf);
int rbsize = evbuffer_get_length(ss->_in_buf);
LOG("[%s] : %d", __func__, rbsize);
return rbsize;
}
void utp_on_state_change(void *userdata, int state)
{
Session *ss = (Session *)userdata;
LOG("[%s] [state:%d]", __func__, state);
if(state==UTP_STATE_WRITABLE)
{
ss->Write();
}
}
void utp_on_error(void *userdata, int errcode)
{
Session *ss = (Session *)userdata;
Listener *listener = (Listener *)(ss->_data);
listener->DelSession(ss);
//delete ss;
LOG("[%s] [err:%d]", __func__, errcode);
}
void utp_on_overhead(void *userdata, bool send, size_t count, int type)
{
LOG("[%s] [send:%s] [count:%d] [type:%d]", __func__,
send==true?"true":"false", count, type);
}
int main()
{
Listener listener(DEF_UTP_PORT);
listener.Run();
return 0;
}
阅读(11852) | 评论(3) | 转发(0) |