Hank (http://blog.chinaunix.net/uid/26000296.html)版权所有,尊重他人劳动成果,
转载时请注明作者和原始出处及本声明。
一、简介
crtmpserver(C++ RTMP Server)是高性能的流媒体服务器,支持以下协议(直播或录制):
. 支持Flash(RTMP,RTMPE,RTMPS,RTMPS,RTMPT,RTMPTE)
. 支持嵌入式设备:iPhone,Android
. 支持监控摄像机
. 支持IP-TV(MPEG-tS,RTSP/RTCP/RTP)
此外,crtmpserver可以作为高性能rendes-vous服务器,可以让你做:
. 音视频会议
. 在线游戏
. 在线协作
. 简单/复杂的聊天应用
crtmpserver不同之处
. 支持多种流媒体技术之间的通信(Adobe flash, Apple streaming, Silverlight, etc)
. 高性能,并发几千路连接
. 占用资源少
. 可移植性强,只要GCC支持,crtmpserver可以运行在: IP cameras, Android, ARM or MIPS based systems, SoC, etc
. 依赖少:lua, openssl
如此强大和高性能的流媒体服务器,是如何实现肯定很让人期待,
我前面写了一些详细的如何应用的文章,
本文开始分析代码,剖析其是如何实现,并高性能处理的;
NOTE1: 本文基于linux Redhat运行环境,使用其默认配置文件crtmpserver/crtmpserver.lua;
二、Crtmpserver的框架层
先从main()函数开始,具体的分析时,将会配上相应的数据结构解析;
2.1 main()函数层
源文件: crtmpserver/src/crtmpserver.cpp
代码:
01 int main(int argc, char **argv) {
//1. Pick up the startup parameters and hold them inside the running status
02 Variant::DeserializeFromCmdLineArgs(argc, argv, gRs.commandLine);
03 string configFile = argv[argc - 1];
04 if (configFile.find("--") == 0)
05 configFile = "";
06 NormalizeCommandLine(configFile);
07 do {
//2. Reset the run flag
08 gRs.run = false;
//3. Initialize the running status
09 if (Initialize()) {
10 Run();
11 } else {
12 gRs.run = false;
13 }
//5. Cleanup
14 Cleanup();
15 } while (gRs.run);
//6. We are done
16 return 0;
17 }
分析:
02 : 命令行参数检查
03~06 : 命令行参数的基本处理;
09 : 初始化整个流媒体服务器;
10 : 处理各种事件;
14 : 关闭服务器的各项功能,并清空;
最重要的就是Initialize(),Run()这两个函数,下面具体分析它们;
2.2 Initialize()函数层
源文件: crtmpserver/src/crtmpserver.cpp
代码:
01 bool Initialize() {
02 Logger::Init();
03 gRs.pConfigFile = new ConfigFile(NULL, NULL);
04 string configFilePath = gRs.commandLine["arguments"]["configFile"];
05 string fileName;
06 string extension;
07 splitFileName(configFilePath, fileName, extension);
08 if (lowerCase(extension) == "xml") {
...
09 } else if (lowerCase(extension) == "lua") {
10 gRs.pConfigFile->LoadLuaFile(configFilePath,(bool)gRs.commandLine["arguments"]["--daemon"]);
11 } else {
12 FATAL("Invalid file format: %s", STR(configFilePath));
13 return false;
14 }
15 gRs.pConfigFile->ConfigLogAppenders();
16 INFO("Initialize I/O handlers manager: %s", NETWORK_REACTOR);
17 IOHandlerManager::Initialize();
18 INFO("Configure modules");
19 gRs.pConfigFile->ConfigModules();
20 INFO("Plug in the default protocol factory");
21 gRs.pProtocolFactory = new DefaultProtocolFactory();
22 ProtocolFactoryManager::RegisterProtocolFactory(gRs.pProtocolFactory);
23 INFO("Configure factories");
24 gRs.pConfigFile->ConfigFactories();
25 INFO("Configure acceptors");
26 gRs.pConfigFile->ConfigAcceptors();
27 INFO("Configure instances");
28 gRs.pConfigFile->ConfigInstances();
29 INFO("Start I/O handlers manager: %s", NETWORK_REACTOR);
30 IOHandlerManager::Start();
31 INFO("Configure applications");
32 gRs.pConfigFile->ConfigApplications();
33 INFO("Install the quit signal");
34 installQuitSignal(QuitSignalHandler);
35 return true;
36 }
分析:
03~14: 依据配置文件的后缀(本文使用的是crtmpserver.lua)选择不同的函数来处理配置文件;
Lua 是一个小巧的脚本语言。作者是巴西人。该语言的设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
Lua脚本可以很容易的被 C/C++ 代码调用,也可以反过来调用C/C++的函数,这使得Lua在应用程序中可以被广泛应用。
不仅仅作为扩展脚本,也可以作为普通的配置文件,代替XML,Ini等文件格式,并且更容易理解和维护。
Lua由标准C编写而成,代码简洁优美,几乎在所有操作系统和平台上都可以编译,运行.
15: 配置流媒体服务器日志模块;
17: 初始化类IOHandlerManager的成员变量
IOHandlerManager类定义:
// FILE: thelib/include/netio/epoll/iohandlermanager.h
#define EPOLL_QUERY_SIZE 1024
class IOHandlerManager {
private:
static int32_t _eq;
static map _activeIOHandlers;
static map _deadIOHandlers;
static struct epoll_event _query[EPOLL_QUERY_SIZE];
static vector _tokensVector1;
static vector _tokensVector2;
static vector *_pAvailableTokens;
static vector *_pRecycledTokens;
static TimersManager *_pTimersManager;
static struct epoll_event _dummy;
public:
...
}
IOHandlerManager::Initialize()的函数定义如下:
// FILE: thelib/src/netio/epoll/iohandlermanager.cpp
void IOHandlerManager::Initialize() {
_eq = 0;
_pAvailableTokens = &_tokensVector1;
_pRecycledTokens = &_tokensVector2;
_pTimersManager = new TimersManager(ProcessTimer);
memset(&_dummy, 0, sizeof (_dummy));
}
19: 以dlopen的方式加载各个模块的动态库
ConfigFile类定义:
// FILE: thelib/include/configuration/configfile.h
class DLLEXP ConfigFile {
private:
Variant _configuration;
Variant _logAppenders;
string _rootAppFolder;
Variant _applications;
map _uniqueNames;
GetApplicationFunction_t _staticGetApplicationFunction;
GetFactoryFunction_t _staticGetFactoryFunction;
map _modules;
bool _isOrigin;
public:
...
}
函数调用:
gRs.pConfigFile->ConfigModules()
|--> ConfigFile::ConfigModules
|--> FOR_MAP(_applications, string, Variant, i) {
ConfigFile::ConfigModule(Variant &node)
} |
|
|--> Module::Load()
|--> Module::LoadLibrary(){
libHandler = LOAD_LIBRARY(STR(path), LOAD_LIBRARY_FLAGS);
}
ConfigFile::_modules的数据:
["flvplayback"] => flvplayback/libflvplayback.so
["samplefactory"] => samplefactory/libsamplefactory.so
["vptests"] => vptests/libvptests.so
["admin"] => admin/libadmin.so
["appselector"] => appselector/libappselector.so
["proxypublish"] => proxypublish/libproxypublish.so
["stresstest"] => stresstest/libstresstest.so
["applestreamingclient"] => applestreamingclient/libapplestreamingclient.so
21: 创建类DefaultProtocolFactory的对象;
22: 初始化协议工厂类的对象的成员变量;
类定义如下:
/*!
@class ProtocolFactoryManager
@brief Class that manages protocol factories.
*/
class DLLEXP ProtocolFactoryManager {
private:
static map _factoriesById;
static map _factoriesByProtocolId;
static map _factoriesByChainName;
public:
...
}
代码如下:
bool ProtocolFactoryManager::RegisterProtocolFactory(BaseProtocolFactory *pFactory) {
...
检查协议是否已注册;
//1. Test to see if this factory is already registered
//2. Test to see if the protocol chains exported by this factory are already in use
//3. Test to see if the protocols exported by this factory are already in use
//4. Register everything
FOR_VECTOR(protocolChains, i) {
_factoriesByChainName[protocolChains[i]] = pFactory;
}
FOR_VECTOR(protocols, i) {
_factoriesByProtocolId[protocols[i]] = pFactory;
}
_factoriesById[pFactory->GetId()] = pFactory;
return true;
}
24: 将加载了动态库的模块注册到协议工厂的成员变量中去;
成员函数定义如下:
bool ConfigFile::ConfigFactories() {
FOR_MAP(_modules, string, Module, i) {
if (!MAP_VAL(i).ConfigFactory()) {
FATAL("Unable to configure factory");
return false;
}
}
return true;
}
26: 区分是UDP还是TCP,对所有要处理协议对应的端口进行socket绑定;
并初始化每个协议事件响应函数;
bool ConfigFile::ConfigAcceptors(){
// 对所有的模块进行处理
FOR_MAP(_modules, string, Module, i) {
MAP_VAL(i).BindAcceptors();
|-->bool Module::BindAcceptors()
|--> bool Module::BindAcceptor(Variant &node)
|--> IOHandler::IOHandler
|--> void IOHandlerManager::RegisterIOHandler(IOHandler* pIOHandler)
|--> void IOHandlerManager::SetupToken(IOHandler *pIOHandler)
|-- pResult->pPayload = pIOHandler;
pResult->validPayload = true;
}
}
28: 创建server的实例
30: 调用函数 int epoll_create(int size); 创建epoll句柄;
void IOHandlerManager::Start() {
_eq = epoll_create(EPOLL_QUERY_SIZE);
}
32: 初始化各种应用, 并调用 epoll_ctl()进行epoll事件响应函数的注册;
bool ConfigFile::ConfigApplications()
FOR_MAP(_modules, string, Module, i) {
|--> bool Module::ConfigApplication()
|--> bool BaseClientApplication::ActivateAcceptors
|--> bool BaseClientApplication::ActivateAcceptor(IOHandler *pIOHandler)
|--> bool TCPAcceptor::StartAccept()
|--> bool IOHandlerManager::EnableAcceptConnections(IOHandler *pIOHandler)
evt.events = EPOLLIN;
evt.data.ptr = pIOHandler->GetIOHandlerManagerToken();
epoll_ctl(_eq, EPOLL_CTL_ADD, pIOHandler->GetInboundFd(), &evt) ;
}
}
34: 依据不同的平台, 如linux ,windows, android等安装不同的信号处理机;
linux平台下:调用 sigemptyset() , sigaction(); 进行清空并注册自己的信号处理机;
2.3 Run()函数
函数定义:
01 void Run() {
02 INFO("\n%s", STR(gRs.pConfigFile->GetServicesInfo()));
03 INFO("GO! GO! GO! (%u)", (uint32_t) getpid());
04 while (IOHandlerManager::Pulse()) {
05 IOHandlerManager::DeleteDeadHandlers();
06 ProtocolManager::CleanupDeadProtocols();
07 }
08 }
代码分析:
02: 打印显示流媒体服务器所有开启的应用,以及对应的端口;
03: 显示流媒体服务器的进程ID;
04: 调用 epoll_wait()等待事件发生;
当对应端口的事件发生时,就调用事件处理函数进行处理;
bool IOHandlerManager::Pulse() {
int32_t eventsCount = 0;
if ((eventsCount = epoll_wait(_eq, _query, EPOLL_QUERY_SIZE, 1000)) < 0) {
int32_t err = errno;
FATAL("Unable to execute epoll_wait: (%d) %s", err, strerror(err));
return false;
}
_pTimersManager->TimeElapsed(time(NULL));
for (int32_t i = 0; i < eventsCount; i++) {
//1. Get the token
IOHandlerManagerToken *pToken =
(IOHandlerManagerToken *) _query[i].data.ptr;
//2. Test the fd
if ((_query[i].events & EPOLLERR) != 0) {
if (pToken->validPayload) {
if ((_query[i].events & EPOLLHUP) != 0) {
DEBUG("***Event handler HUP: %p", (IOHandler *) pToken->pPayload);
((IOHandler *) pToken->pPayload)->OnEvent(_query[i]);
} else {
DEBUG("***Event handler ERR: %p", (IOHandler *) pToken->pPayload);
}
IOHandlerManager::EnqueueForDelete((IOHandler *) pToken->pPayload);
}
continue;
}
//3. Do the damage
if (pToken->validPayload) {
if (!((IOHandler *) pToken->pPayload)->OnEvent(_query[i])) {
EnqueueForDelete((IOHandler *) pToken->pPayload);
}
} else {
FATAL("Invalid token");
}
}
if (_tokensVector1.size() > _tokensVector2.size()) {
_pAvailableTokens = &_tokensVector1;
_pRecycledTokens = &_tokensVector2;
} else {
_pAvailableTokens = &_tokensVector2;
_pRecycledTokens = &_tokensVector1;
}
return true;
}
05,06: 事件处理完,对资源进行回收;
阅读(1067) | 评论(0) | 转发(0) |