6. 使用chain作为收发数据的缓存。收取时,chain达到长度才开始处理;发送包时,缓存到chain,然后启动异步发送即可。
整个机制虽然有所简化,比起同步的调用,要复杂至少10倍。
首先,下载和解压nginx:
tar xf nginx-1.5.0.tar.gz && tar xf nginx-rtmp-module-1.0.4.tar.gz
cd nginx-1.5.0 && ./configure --add-module=/home/winlin/nginx-rtmp-module-1.0.4 --with-http_ssl_module --prefix=`pwd`/release
修改编译参数,将优化去掉:
sed -i "s/-O /-O0 /g" objs/Makefile
编译:
make && make install
配置RTMP:
vi conf/nginx.conf
添加如下:
daemon off;
master_process off;
rtmp {
server {
listen 19352;
application live {
live on;
allow publish all;
allow play all;
}
}
}
配置可以在nginx.c中看到:
{ ngx_string("daemon"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
0,
offsetof(ngx_core_conf_t, daemon),
NULL },
跳到函数:ngx_conf_set_flag_slot,可以看到应该配置为on或off。
开始调试。
ultimate IDE的project如附件:
目录结构如下:
E:\Chnvideo\research\nginx
main
nginx-1.5.0
nginx-rtmp-module-1.0.4
只要将nginx和rtmp模块解压到这个目录,然后将当前目录(E:\Chnvideo\research\nginx)在IDE打开即可。
IDE=>New assembly => package nests 输入:E:\Chnvideo\research\nginx
Rtmp模块初始化
rtmp模块初始化函数调用如下:
(gdb) bt
#0 ngx_rtmp_optimize_servers (cf=0x7fffffffe1d0, ports=0x7fffffffddb0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:605
#1 0x00000000004ae362 in ngx_rtmp_block (cf=0x7fffffffe1d0, cmd=0x711680, conf=0xa4da78) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:312
#2 0x000000000041f173 in ngx_conf_handler (cf=0x7fffffffe1d0, last=1) at src/core/ngx_conf_file.c:387
#3 0x000000000041ed1e in ngx_conf_parse (cf=0x7fffffffe1d0, filename=0xa4ce00) at src/core/ngx_conf_file.c:243
#4 0x000000000041b9cc in ngx_init_cycle (old_cycle=0x7fffffffe310) at src/core/ngx_cycle.c:268
#5 0x0000000000406230 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:333
(gdb) f
#0 ngx_rtmp_optimize_servers (cf=0x7fffffffe1d0, ports=0x7fffffffddb0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:605
605 ls->handler = ngx_rtmp_init_connection;
默认将accept之后的处理函数设置为了ngx_rtmp_init_connection。
Nginx的回调函数
在ngx_rtmp_init_connection(ngx_rtmp_init.c:132)中设置断点,可以看到接受连接和处理handshake的过程。
#0 ngx_rtmp_init_connection (c=0x7ffff7fad190) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_init.c:31
#1 0x000000000042befe in ngx_event_accept (ev=0xa7b370) at src/event/ngx_event_accept.c:357
#2 0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=18446744073709551615, flags=1) at src/event/modules/ngx_epoll_module.c:683
#3 0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#4 0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#5 0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
在ngx_event_accept(ngx_event_accept.c:206)中,接受到连接后进行处理,其中设置了recvchain:
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
这两个值是宏定义,在ngx_event.h:458:
extern ngx_os_io_t ngx_io;
#define ngx_recv_chain ngx_io.recv_chain
#define ngx_send_chain ngx_io.send_chain
这个ngx_io是个extern的变量,grep搜索,发现定义在:
[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_io\;"
src/core/ngx_connection.c:13:ngx_os_io_t ngx_io;
[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_io ="|grep epoll
src/event/modules/ngx_epoll_module.c:329: ngx_io = ngx_os_io;
所以会在epoll这个模块被重置,可以设置断点。
b ngx_epoll_module.c:329
Breakpoint 6, ngx_epoll_init (cycle=0xa4cca0, timer=0) at src/event/modules/ngx_epoll_module.c:329
329 ngx_io = ngx_os_io;
(gdb) p &ngx_io
$32 = (ngx_os_io_t *) 0xa222a0
(gdb) p &ngx_os_io
$33 = (ngx_os_io_t *) 0x703440
(gdb) p ngx_os_io
$34 = {recv = 0x42f8b0 , recv_chain = 0x42f9c8 , udp_recv = 0x42fc74 , send = 0x42fd34 , send_chain = 0x435ee4 , flags = 1}
(gdb) p ngx_io
$35 = {recv = 0, recv_chain = 0, udp_recv = 0, send = 0, send_chain = 0, flags = 0}
在connection.c中的初始化只是默认初始化为0,只有在ngx_os_io这个里面才是真正用到的。
这个ngx_os_io是在这里定义的:
[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_os_io \="|grep linux
src/os/unix/ngx_linux_init.c:75: ngx_os_io = ngx_linux_io;
这个结构体定义为:
static ngx_os_io_t ngx_linux_io = {
ngx_unix_recv,
ngx_readv_chain,
ngx_unix_send,
ngx_linux_sendfile_chain,
};
函数定义在ngx_readv_chain.c:19和ngx_linux_sendfile_chain.c:38(对于rtmp还是用的writev,不是用的sendfile)。
处理函数的改变
ngx_rtmp_init_connection里面调用了ngx_rtmp_init_session和ngx_rtmp_handshake。
ngx_rtmp_init_session先调用ngx_rtmp_set_chunk_size,然后调用ngx_rtmp_fire_event激发了事件NGX_RTMP_CONNECT。
ngx_rtmp_set_chunk_size,第一次将chunk-size设为128(NGX_RTMP_DEFAULT_CHUNK_SIZE),不需要发包。
ngx_rtmp_fire_event的handler可以搜索:NGX_RTMP_CONNECT,定义在:ngx_rtmp_limit_postconfiguration
h = ngx_array_push(&cmcf->events[NGX_RTMP_CONNECT]);
*h = ngx_rtmp_limit_connect;
所以NGX_RTMP_CONNECT的handler应该是ngx_rtmp_limit_connect,这个函数检查一个共享的变量,然后判断是否超过连接数,返回错误或者OK:
rc = n > (ngx_uint_t) lmcf->max_conn ? NGX_ERROR : NGX_OK;
若可以接受这个连接,则开始握手,调用函数ngx_rtmp_handshake如下:
#0 ngx_rtmp_handshake (s=0xa665b0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c:576
#1 0x00000000004af3ab in ngx_rtmp_init_connection (c=0x7ffff7fad190) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_init.c:132
#2 0x000000000042befe in ngx_event_accept (ev=0xa7b370) at src/event/ngx_event_accept.c:357
#3 0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=18446744073709551615, flags=1) at src/event/modules/ngx_epoll_module.c:683
#4 0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#5 0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#6 0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
在ngx_rtmp_handshake中,初始化rtmp的读写函数为:
c->read->handler = ngx_rtmp_handshake_recv;
c->write->handler = ngx_rtmp_handshake_send;
然后,设置状态机:
s->hs_stage = NGX_RTMP_HANDSHAKE_SERVER_RECV_CHALLENGE;
并直接进入下一个状态:
ngx_rtmp_handshake_recv(c->read);
可见,还是使用了状态机,只是将状态组合成了几个大的阶段,通过设置不同的c->read/write->hander来处理。
在handshake完毕的函数ngx_rtmp_cycle(s)里面,将handler设置成了ngx_rtmp_recv/ngx_rtmp_send。
HandshakeBuffer
设置断点,看状态如何改变:
(gdb) b ngx_rtmp_handshake_recv
Breakpoint 15 at 0x4b0526: file /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c, line 375.
(gdb) b ngx_rtmp_handshake_send
Breakpoint 16 at 0x4b09a6: file /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c, line 490.
ngx_rtmp_handshake_recv,接受消息,处理时,先接受指定长度的数据,然后再处理。
b = s->hs_buf;
while (b->last != b->end) {
n = c->recv(c, b->last, b->end - b->last);
if (n == NGX_AGAIN) {
ngx_add_timer(rev, s->timeout);
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
}
return;
}
b->last += n;
}
第一次时,会收1537字节然后处理,:
(gdb) p b->end-b->last
$76 = 1537
所以这个s->hs_buf应该初始化为1537字节,即handshake的字节。在调用handshake时初始化为NGX_RTMP_HANDSHAKE_BUFSIZE长度。
获取时间:
ngx_time_update这个函数会更新全局变量ngx_current_msec,调用的是ngx_gettimeofday,如果过多频繁的调用会有性能问题。
所以应该不会随意调用它,很多时候只是直接用,相当于cache了这个时间。
timer的处理,超时:
假设收取失败,返回的是NGX_AGAIN(-2),则会使用定时器:ngx_add_timer设置超时,然后调用ngx_handle_read_event侦听READ事件。
ngx_add_timer会添加到rbtree里面去:
timer = {key = 1380262079704, left = 0x719700, right = 0x719700, parent = 0x0, color = 0 '\000', data = 0 '\000'}
NGX_EAGAIN返回后,会调用ngx_event_expire_timers函数,这个会取全局的rbtree的root:ngx_event_timer_rbtree
(gdb) p ngx_event_timer_rbtree
$98 = {root = 0xa7b468, sentinel = 0x719700, insert = 0x414ded }
(gdb) p *ngx_event_timer_rbtree.root
$101 = {key = 1380262079704, left = 0x719700, right = 0x719700, parent = 0x0, color = 0 '\000', data = 0 '\000'}
这个root就是ngx_add_timer添加的那个选项,rev就是handshake的那个结构。
rbtree的key是超时时间,所以最小的超时时间会放在前面,ngx_event_add_timer函数里面:
key = ngx_current_msec + timer;
rbtree的节点是直接指向evt对象的,所以ngx_event_expire_timers可以直接从rbtree的node获取evt:
ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
然后设置为timeout后,直接就调用handler:
ev->timedout = 1;
ev->handler(ev); // ngx_rtmp_handshake_recv
因为timeout,所以就关掉连接了。
EAGAIN的处理:
假若没有timeout,则进入到EAGAIN的处理。
ngx_event_find_timer会返回一个timeout,共epoll_wait来处理超时事件。
timer = ngx_event_find_timer(); //60000,handshake的超时为60秒。
events = epoll_wait(ep, event_list, (int) nevents, timer);
若fd可读,则调用read的handler:
if ((revents & EPOLLIN) && rev->active) {
rev = c->read;
rev->handler(rev);
(gdb) p *c->read->handler
$125 = {void (ngx_event_t *)} 0x4b0519
所以直接就调用到了这个处理函数ngx_rtmp_handshake_recv,先清除timer,然后读取到handshake需要的长度的数据。
(gdb) p b->end-b->last
$126 = 1437
还需要读取这些数据。若EAGAIN,还是在可读时调用这个函数。
可见nginx来设置不同的handler,相当于将大状态机切割成小状态机,小状态机里面直接switch。
若c++用类表示大状态机,用switch表示小状态机,是可以简化的。设置不同的handler,就设置不同的状态子类表示。
ATM模型:
ATM就是读-写-读这种分离的模型,避免又读又写。即同一时刻,只能读或者写,做完了才能做下一个。
ngx_rtmp_handshake_recv中,会先读取,若EAGAIN则侦听读事件,若读取完毕则删除读事件。
// start read, if not completed, focus read event.
n = c->recv(c, b->last, b->end - b->last);
if (n == NGX_AGAIN) {
ngx_handle_read_event(c->read, 0);
return;
}
// read completed, donot focus read event.
if (rev->active) {
ngx_del_event(rev, NGX_READ_EVENT, 0);
}
若读取C0C1成功,则发送S0S1S2。调用的是:
ngx_rtmp_handshake_send(c->write);
这个函数会尝试发送,也是ATM模型:
// start write, if not completed, focus write event.
n = c->send(c, b->pos, b->last - b->pos);
if (n == NGX_AGAIN || n == 0) {
ngx_handle_write_event(c->write, 0)
return;
}
// write completed, donot focus write event.
if (wev->active) {
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
}
发送S0S1完毕,还是要收C2.等C0C1C2和S0S1S2都处理完毕,则进入ngx_rtmp_handshake_done,调用ngx_rtmp_cycle函数,进入包处理逻辑。
RTMP收包
这个应该是状态巨多的一个场景,每个包都是一个大状态。
首先把handler设置成收发的函数。
c->read->handler = ngx_rtmp_recv;
c->write->handler = ngx_rtmp_send;
并先开始收数据并处理:
ngx_rtmp_recv(c->read);
ngx_rtmp_recv的主要逻辑就是解析chunk为RTMP包,并调用处理包的函数。
ngx_rtmp_session_t* s中有个成员是ngx_rtmp_stream_t* in_streams,这个就是chunk streams。
st = &s->in_streams[s->in_csid]; // 获取或创建一个chunk_stream,直接将数组转换为map,即预先开辟in_streams的空间。
其中:
typedef struct {
ngx_rtmp_header_t hdr;
uint32_t dtime;
uint32_t len; /* current fragment length */
uint8_t ext;
ngx_chain_t *in;
} ngx_rtmp_stream_t;
在每个ngx_rtmp_stream_t中都定义了一个chain:in。每个chain其实就是一个缓存,就是说给每个chunk_stream新建了一个可以写的buffer。
n = c->recv(c, b->last, b->end - b->last); // 先收146字节的数据。调试时可以改为10,看如何解析。
b的初始化在ngx_rtmp_init_session,设置了in_streams里面:
s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) * cscf->max_streams);
收到数据后,开始解析chunk。若只读取了10字节,那么basic_header当fmt为0时需要12字节,会在这个地方再读取:
if (fmt == 0) {
if (b->last - p < 4)
continue;
(gdb) p b->last - p
$173 = 2
再次循环调用recv和解析:
n = c->recv(c, b->last, b->end - b->last); // b->end - b->last=136,读取了10字节。
解析时,还是会重头开始解析一遍。除非header解析完了,才会改变pos位置:
/* header done */
b->pos = p; // p-b->start=12, fmt为0时的12字节basic header.
解析完的头如下:
(gdb) p st->hdr
$189 = {csid = 3, timestamp = 0, mlen = 161, type = 20 '\024', msid = 0}
有个地方验证了消息的最大长度,不能超过这个:cscf->max_message,默认1048576即1M,这个对大码率视频可能需要配置。
解析完头,就得看body是否足够了:
size = b->last - b->pos; // buffer的数据,134
fsize = h->mlen - st->len; // 消息还没有下载的长度。
这个时候,还需要更多的数据:
if (fsize > s->in_chunk_size) {
st->len += s->in_chunk_size;
b->last = b->pos + s->in_chunk_size;
这样就算出了还需要收多少数据:
(gdb) p b->end-b->last
$198 = 6
然后会接着收,一直收完为止。
RTMP处理包
ngx_rtmp_recv收到一个包后,调用ngx_rtmp_receive_message处理,这个函数根据消息的类型来找到对应的handler:
evhs = &cmcf->events[h->type];
evh = evhs->elts;
(*evh)(s, h, in)
消息处理的钩子函数的初始化是在ngx_rtmp_init_event_handlers,譬如AMF0的消息处理在:
static size_t amf_events[] = {
NGX_RTMP_MSG_AMF_CMD,
NGX_RTMP_MSG_AMF_META,
NGX_RTMP_MSG_AMF_SHARED,
NGX_RTMP_MSG_AMF3_CMD,
NGX_RTMP_MSG_AMF3_META,
NGX_RTMP_MSG_AMF3_SHARED
};
/* init amf events */
for(n = 0; n < sizeof(amf_events) / sizeof(amf_events[0]); ++n) {
eh = ngx_array_push(&cmcf->events[amf_events[n]]);
*eh = ngx_rtmp_amf_message_handler;
}
在amf的处理函数中,还得解析body内容后再调用其他钩子,来处理特定的amf0函数:
ph = ch->elts;
(*ph)(s, h, in) // ngx_rtmp_cmd_connect_init
也就是说,这个处理实际上是最难的,虽然一个消息能被一个处理函数处理,可是这个函数不知道很多状态信息。
不过nginx-rtmp这样还是处理的很漂亮,非常Nice。
其实同步的逻辑是这样写:
MessagePacket* packet = NULL;
if((ret = ctx_core->rtmp->RecvMessage(&packet)) != ErrorCode::Success){
return ret;
}
if(packet is play){
return process_play();
}
if(packet is publish){
return process_publish();
}
纯异步的方式是超级复杂,特别是加上了回源pull和push两种方式后。
能简化纯异步的技术有:
1. 将收包和解析处理分开,确认消息收到了才开始解析。
2. 异步的ATM模型,读写分开,读完才写,写完才读。
3. 处理时使用handler,譬如handshake有handler,amf0有handler,amf0-connect有handler。
同步方式是要简单很多。
Edge-relay-pull模式
nginx-rtmp的边缘叫做relay,即中继模式。有pull和push,即上行推流和下行播放。
实际上FMS是不区分这两个的,中继时只需要指定上游服务器地址即可。
收到请求前的调用:
ngx_rtmp_relay_create_app_conf,初始化app的配置。
ngx_rtmp_relay_push_pull,解析pull的命令和参数。
ngx_rtmp_relay_merge_app_conf
ngx_rtmp_relay_postconfiguration
ngx_rtmp_relay_init_process
连接上游的调用:
ngx_rtmp_relay_pull
ngx_rtmp_relay_create_local_ctx
ngx_rtmp_relay_create_remote_ctx
ngx_rtmp_relay_create_connection
ngx_event_connect_peer(堆栈如下)
#0 ngx_event_connect_peer (pc=0xa53ed8) at src/event/ngx_event_connect.c:25
#1 0x00000000004cb815 in ngx_rtmp_relay_create_connection (cctx=0x7fffffffdd60, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:468
#2 0x00000000004cba2a in ngx_rtmp_relay_create_remote_ctx (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:527
#3 0x00000000004cbce3 in ngx_rtmp_relay_create (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0, create_publish_ctx=0x4cb9d5 , create_play_ctx=0x4cba2c )
at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:607
#4 0x00000000004cbde9 in ngx_rtmp_relay_pull (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:630
#5 0x00000000004cc226 in ngx_rtmp_relay_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:738
#6 0x00000000004d0df5 in ngx_rtmp_enotify_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_enotify_module.c:412
#7 0x00000000004d4045 in ngx_rtmp_notify_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_notify_module.c:1407
#8 0x00000000004d5b6d in ngx_rtmp_log_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_log_module.c:844
#9 0x00000000004b8f7c in ngx_rtmp_cmd_play_init (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_cmd_module.c:569
#10 0x00000000004b6d2b in ngx_rtmp_amf_message_handler (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_receive.c:436
#11 0x00000000004b28f3 in ngx_rtmp_receive_message (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:791
#12 0x00000000004b1d19 in ngx_rtmp_recv (rev=0xa7b440) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:456
#13 0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=59762, flags=1) at src/event/modules/ngx_epoll_module.c:683
#14 0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#15 0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#16 0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
连接后,进行握手:
ngx_rtmp_relay_create_connection
ngx_event_connect_peer
ngx_rtmp_init_session
ngx_rtmp_client_handshake
握手完成时,回调relay的函数:
ngx_epoll_process_events
ngx_rtmp_handshake_recv
ngx_rtmp_handshake_recv
ngx_rtmp_handshake_send
ngx_rtmp_handshake_done
ngx_rtmp_fire_event
ngx_rtmp_relay_handshake_done
ngx_rtmp_relay_send_connect
正常的握手是没有回调的,就进入了ngx_rtmp_cycle,相当于relay在这个地方进行了hook:
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE)
ngx_rtmp_cycle(s);
异步发送多个包
回源连接connect时,连续发送了几个包,对于异步的socket,如何做到的呢?
ngx_rtmp_relay_send_connect
return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
|| ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
|| ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
? NGX_ERROR
: NGX_OK;
分析一个:ngx_rtmp_send_chunk_size
ngx_rtmp_relay_send_connect
ngx_rtmp_send_chunk_size
ngx_rtmp_send_shared_packet
ngx_rtmp_send_message
ngx_rtmp_send
这个函数是先将包写到buffer:
ngx_int_t
ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_chunk_size(s, chunk_size));
}
ngx_chain_t *
ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
其中,ngx_rtmp_create_chunk_size就是将包写到ngx_chain_t*,调用ngx_rtmp_prepare_message。
发送多包的逻辑是:
if (!s->connection->write->active) {
ngx_rtmp_send(s->connection->write);
}
其中,ngx_rtmp_send就是一个没有返回值的函数,它主要是启动ATM到写模式,若写不完,在可写时会自动回调这个函数。
所以发送多包,实际上就是把包发送到ngx_rtmp_send(ngx_event_t *wev)的wev的chain里面,相当于缓冲区,它会保证将它发送完。
至于ngx_rtmp_send_message,根本就不会处理这个发送错误,它打交道的是缓冲区。
不过这样比起同步来,也足够麻烦了。
简化的地方:将发送和组包分离,组包到chain,可以将多个包进入队列,发送只负责发就好了。
同样,发送成功后,需要调用特殊的回调函数。
譬如处理amf的_result消息时:
ngx_rtmp_relay_postconfiguration
ch = ngx_array_push(&cmcf->amf);
ngx_str_set(&ch->name, "_result");
ch->handler = ngx_rtmp_relay_on_result;
这样可以回调:
ngx_rtmp_recv
ngx_rtmp_receive_message
ngx_rtmp_amf_message_handler
ngx_rtmp_relay_on_result
ngx_rtmp_relay_send_create_stream
整个异步的状态,无处不在。
边缘数据
边缘处理数据包的调用是:
ngx_epoll_process_events
ngx_rtmp_recv
ngx_rtmp_receive_message
ngx_rtmp_codec_av
回调是在ngx_rtmp_codec_postconfiguration中设置:
h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);
*h = ngx_rtmp_codec_av;
h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
*h = ngx_rtmp_codec_av;
其中,回源连接的ngx_rtmp_receive_message会调用的handler包括:
ngx_rtmp_codec_av //解码av。
ngx_rtmp_live_av // 转发给所有客户端
ngx_rtmp_live_av的调用是:
#0 ngx_rtmp_send (wev=0xa95450) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:492
#1 0x00000000004b283f in ngx_rtmp_send_message (s=0xa665b0, out=0xab33e4, priority=0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:736
#2 0x00000000004bef33 in ngx_rtmp_live_av (s=0xa53f88, h=0xaaf4a0, in=0xab1400) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_live_module.c:957
#3 0x00000000004b28f3 in ngx_rtmp_receive_message (s=0xa53f88, h=0xaaf4a0, in=0xab1400) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:791
#4 0x00000000004b1d19 in ngx_rtmp_recv (rev=0xa7b4a8) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:456
#5 0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=49151, flags=1) at src/event/modules/ngx_epoll_module.c:683
#6 0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249
#7 0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315
#8 0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409
其中ngx_rtmp_live_av,也是调用ngx_rtmp_send_message,发送到缓冲区,然后发起发送请求而已,不必等到所有数据都发出去了。
ngx_rtmp_live_av
/* broadcast to all subscribers */
是在live模块中实现的这个转发,即subscribe模式。
也就是说,这个回源连接收到数据后,同时转发给了其他socket(准确讲是转发给了各个客户端的chain,然后异步发送)。