Chinaunix首页 | 论坛 | 博客
  • 博客访问: 7926996
  • 博文数量: 701
  • 博客积分: 2150
  • 博客等级: 上尉
  • 技术积分: 13233
  • 用 户 组: 普通用户
  • 注册时间: 2011-06-29 16:28
个人简介

天行健,君子以自强不息!

文章分类

全部博文(701)

文章存档

2019年(2)

2018年(12)

2017年(76)

2016年(120)

2015年(178)

2014年(129)

2013年(123)

2012年(61)

分类: 架构设计与优化

2014-01-27 10:58:42

Hank (http://blog.chinaunix.net/uid/26000296.html)版权所有,尊重他人劳动成果,
转载时请注明作者和原始出处及本声明。


一、简介
crtmpserver(C++ RTMP Server)是高性能的流媒体服务器,支持以下协议(直播或录制):
 . 支持Flash(RTMP,RTMPE,RTMPS,RTMPS,RTMPT,RTMPTE)
 . 支持嵌入式设备:iPhone,Android
 . 支持监控摄像机
 . 支持IP-TV(MPEG-tS,RTSP/RTCP/RTP)


此外,crtmpserver可以作为高性能rendes-vous服务器,可以让你做:
 . 音视频会议
 . 在线游戏
 . 在线协作
 . 简单/复杂的聊天应用


crtmpserver不同之处
 . 支持多种流媒体技术之间的通信(Adobe flash, Apple streaming, Silverlight, etc)
 . 高性能,并发几千路连接
 . 占用资源少
 . 可移植性强,只要GCC支持,crtmpserver可以运行在: IP cameras, Android, ARM or MIPS based systems, SoC, etc
 . 依赖少:lua, openssl


如此强大和高性能的流媒体服务器,是如何实现肯定很让人期待,
我前面写了一些详细的如何应用的文章,
本文开始分析代码,剖析其是如何实现,并高性能处理的;


NOTE1: 本文基于linux Redhat运行环境,使用其默认配置文件crtmpserver/crtmpserver.lua;

二、Crtmpserver的框架层
先从main()函数开始,具体的分析时,将会配上相应的数据结构解析;


2.1 main()函数层
源文件: crtmpserver/src/crtmpserver.cpp 
代码:
01  int main(int argc, char **argv) { 


      //1. Pick up the startup parameters and hold them inside the running status 
02    Variant::DeserializeFromCmdLineArgs(argc, argv, gRs.commandLine);


03    string configFile = argv[argc - 1];
04    if (configFile.find("--") == 0)
05      configFile = "";   
06    NormalizeCommandLine(configFile);  


07    do {
       //2. Reset the run flag
08     gRs.run = false;


        //3. Initialize the running status
09      if (Initialize()) {
10        Run();
11      } else {
12        gRs.run = false;
13      }


        //5. Cleanup
14      Cleanup();
15    } while (gRs.run);


      //6. We are done
16    return 0;
17  }


分析:
02 :  命令行参数检查
03~06 : 命令行参数的基本处理;
09 :  初始化整个流媒体服务器;
10 :  处理各种事件;
14 :  关闭服务器的各项功能,并清空;


最重要的就是Initialize(),Run()这两个函数,下面具体分析它们;


2.2 Initialize()函数层
源文件: crtmpserver/src/crtmpserver.cpp 
代码:
01  bool Initialize() {
02    Logger::Init();


03    gRs.pConfigFile = new ConfigFile(NULL, NULL);
04    string configFilePath = gRs.commandLine["arguments"]["configFile"];
05    string fileName;
06    string extension;
07    splitFileName(configFilePath, fileName, extension);
08    if (lowerCase(extension) == "xml") {
        ...
09    } else if (lowerCase(extension) == "lua") {
10      gRs.pConfigFile->LoadLuaFile(configFilePath,(bool)gRs.commandLine["arguments"]["--daemon"]);
11    } else {
12      FATAL("Invalid file format: %s", STR(configFilePath));
13      return false;
14    }


15    gRs.pConfigFile->ConfigLogAppenders();


16    INFO("Initialize I/O handlers manager: %s", NETWORK_REACTOR);
17    IOHandlerManager::Initialize();


18    INFO("Configure modules");
19    gRs.pConfigFile->ConfigModules();


20    INFO("Plug in the default protocol factory");
21    gRs.pProtocolFactory = new DefaultProtocolFactory();
22    ProtocolFactoryManager::RegisterProtocolFactory(gRs.pProtocolFactory);


23    INFO("Configure factories");
24    gRs.pConfigFile->ConfigFactories();


25    INFO("Configure acceptors");
26    gRs.pConfigFile->ConfigAcceptors();


27    INFO("Configure instances");
28    gRs.pConfigFile->ConfigInstances();


29    INFO("Start I/O handlers manager: %s", NETWORK_REACTOR);
30    IOHandlerManager::Start();


31    INFO("Configure applications");
32    gRs.pConfigFile->ConfigApplications();


33    INFO("Install the quit signal");
34    installQuitSignal(QuitSignalHandler);


35    return true;
36  }


分析:
03~14: 依据配置文件的后缀(本文使用的是crtmpserver.lua)选择不同的函数来处理配置文件;
Lua 是一个小巧的脚本语言。作者是巴西人。该语言的设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
Lua脚本可以很容易的被 C/C++ 代码调用,也可以反过来调用C/C++的函数,这使得Lua在应用程序中可以被广泛应用。
不仅仅作为扩展脚本,也可以作为普通的配置文件,代替XML,Ini等文件格式,并且更容易理解和维护。
Lua由标准C编写而成,代码简洁优美,几乎在所有操作系统和平台上都可以编译,运行.


15: 配置流媒体服务器日志模块;


17: 初始化类IOHandlerManager的成员变量
IOHandlerManager类定义:
// FILE: thelib/include/netio/epoll/iohandlermanager.h
#define EPOLL_QUERY_SIZE 1024


class IOHandlerManager {
private:
  static int32_t _eq; 
  static map _activeIOHandlers;
  static map _deadIOHandlers;


  static struct epoll_event _query[EPOLL_QUERY_SIZE];


  static vector _tokensVector1;
  static vector _tokensVector2;
  static vector *_pAvailableTokens;
  static vector *_pRecycledTokens;


  static TimersManager *_pTimersManager;
  static struct epoll_event _dummy;
public:
  ...
}


IOHandlerManager::Initialize()的函数定义如下:
// FILE: thelib/src/netio/epoll/iohandlermanager.cpp


void IOHandlerManager::Initialize() {
  _eq = 0;
  _pAvailableTokens = &_tokensVector1;
  _pRecycledTokens = &_tokensVector2;
  _pTimersManager = new TimersManager(ProcessTimer);
  memset(&_dummy, 0, sizeof (_dummy));
}


19: 以dlopen的方式加载各个模块的动态库
ConfigFile类定义:
// FILE: thelib/include/configuration/configfile.h
class DLLEXP ConfigFile {
private:
  Variant _configuration;
  Variant _logAppenders;
  string _rootAppFolder;
  Variant _applications;
  map _uniqueNames;
  GetApplicationFunction_t _staticGetApplicationFunction;
  GetFactoryFunction_t _staticGetFactoryFunction;
  map _modules;
  bool _isOrigin;
public:
  ...
}


函数调用:
gRs.pConfigFile->ConfigModules()
 |--> ConfigFile::ConfigModules 
       |--> FOR_MAP(_applications, string, Variant, i) {
              ConfigFile::ConfigModule(Variant &node) 
            }   |
                |
                |--> Module::Load()
                       |--> Module::LoadLibrary(){
                              libHandler = LOAD_LIBRARY(STR(path), LOAD_LIBRARY_FLAGS);
                            }
                              
ConfigFile::_modules的数据:
["flvplayback"]   => flvplayback/libflvplayback.so
["samplefactory"] => samplefactory/libsamplefactory.so
["vptests"]       => vptests/libvptests.so
["admin"]         => admin/libadmin.so
["appselector"]   => appselector/libappselector.so
["proxypublish"]  => proxypublish/libproxypublish.so
["stresstest"]    => stresstest/libstresstest.so
["applestreamingclient"] => applestreamingclient/libapplestreamingclient.so


21: 创建类DefaultProtocolFactory的对象;
22: 初始化协议工厂类的对象的成员变量;
类定义如下:
/*!   
  @class ProtocolFactoryManager
  @brief Class that manages protocol factories.
 */
class DLLEXP ProtocolFactoryManager {
private:
  static map _factoriesById;
  static map _factoriesByProtocolId;
  static map _factoriesByChainName;
public:
  ...
}


代码如下:
bool ProtocolFactoryManager::RegisterProtocolFactory(BaseProtocolFactory *pFactory) {
  ...
  检查协议是否已注册;
  //1. Test to see if this factory is already registered
  //2. Test to see if the protocol chains exported by this factory are already in use
  //3. Test to see if the protocols exported by this factory are already in use


  //4. Register everything
  FOR_VECTOR(protocolChains, i) {
    _factoriesByChainName[protocolChains[i]] = pFactory;
  }


  FOR_VECTOR(protocols, i) {
    _factoriesByProtocolId[protocols[i]] = pFactory;
  }


  _factoriesById[pFactory->GetId()] = pFactory;
  
  return true;
}


24: 将加载了动态库的模块注册到协议工厂的成员变量中去;
成员函数定义如下:
bool ConfigFile::ConfigFactories() {


  FOR_MAP(_modules, string, Module, i) {
    if (!MAP_VAL(i).ConfigFactory()) {
      FATAL("Unable to configure factory");
      return false; 
    } 
  }     
  return true;
}


26: 区分是UDP还是TCP,对所有要处理协议对应的端口进行socket绑定;
    并初始化每个协议事件响应函数;
bool ConfigFile::ConfigAcceptors(){ 
  // 对所有的模块进行处理
  FOR_MAP(_modules, string, Module, i) {
       MAP_VAL(i).BindAcceptors();
         |-->bool Module::BindAcceptors()
               |--> bool Module::BindAcceptor(Variant &node)
                     |--> IOHandler::IOHandler
                           |--> void IOHandlerManager::RegisterIOHandler(IOHandler* pIOHandler)
                                 |--> void IOHandlerManager::SetupToken(IOHandler *pIOHandler)
                                       |--   pResult->pPayload = pIOHandler;
                                               pResult->validPayload = true;
  }
}


28: 创建server的实例
30: 调用函数 int epoll_create(int size); 创建epoll句柄;
void IOHandlerManager::Start() {
  _eq = epoll_create(EPOLL_QUERY_SIZE);
}


32: 初始化各种应用, 并调用 epoll_ctl()进行epoll事件响应函数的注册; 
bool ConfigFile::ConfigApplications()
  FOR_MAP(_modules, string, Module, i) {
          |--> bool Module::ConfigApplication()
                |--> bool BaseClientApplication::ActivateAcceptors
                       |--> bool BaseClientApplication::ActivateAcceptor(IOHandler *pIOHandler) 
                             |--> bool TCPAcceptor::StartAccept() 
                                    |--> bool IOHandlerManager::EnableAcceptConnections(IOHandler *pIOHandler) 
                                           evt.events = EPOLLIN;
                                           evt.data.ptr = pIOHandler->GetIOHandlerManagerToken();
                                           epoll_ctl(_eq, EPOLL_CTL_ADD, pIOHandler->GetInboundFd(), &evt) ;
        
  }
}


34: 依据不同的平台, 如linux ,windows, android等安装不同的信号处理机;
linux平台下:调用 sigemptyset() , sigaction(); 进行清空并注册自己的信号处理机;


2.3 Run()函数
函数定义:
01  void Run() {


02    INFO("\n%s", STR(gRs.pConfigFile->GetServicesInfo()));
03    INFO("GO! GO! GO! (%u)", (uint32_t) getpid());


04    while (IOHandlerManager::Pulse()) {
05      IOHandlerManager::DeleteDeadHandlers();
06      ProtocolManager::CleanupDeadProtocols();
07    }
08  }


代码分析:
02: 打印显示流媒体服务器所有开启的应用,以及对应的端口;
03: 显示流媒体服务器的进程ID;
04: 调用 epoll_wait()等待事件发生;
    当对应端口的事件发生时,就调用事件处理函数进行处理;


bool IOHandlerManager::Pulse() {
  int32_t eventsCount = 0;
  if ((eventsCount = epoll_wait(_eq, _query, EPOLL_QUERY_SIZE, 1000)) < 0) {
    int32_t err = errno;
    FATAL("Unable to execute epoll_wait: (%d) %s", err, strerror(err));
    return false;
  }
  
  _pTimersManager->TimeElapsed(time(NULL));
    
  for (int32_t i = 0; i < eventsCount; i++) {
    //1. Get the token
    IOHandlerManagerToken *pToken =
        (IOHandlerManagerToken *) _query[i].data.ptr;
  
    //2. Test the fd
    if ((_query[i].events & EPOLLERR) != 0) {
      if (pToken->validPayload) {
        if ((_query[i].events & EPOLLHUP) != 0) {
          DEBUG("***Event handler HUP: %p", (IOHandler *) pToken->pPayload);
          ((IOHandler *) pToken->pPayload)->OnEvent(_query[i]);
        } else {
          DEBUG("***Event handler ERR: %p", (IOHandler *) pToken->pPayload);
        }
        IOHandlerManager::EnqueueForDelete((IOHandler *) pToken->pPayload);
      }
      continue;
    }


    //3. Do the damage
    if (pToken->validPayload) {
      if (!((IOHandler *) pToken->pPayload)->OnEvent(_query[i])) {
        EnqueueForDelete((IOHandler *) pToken->pPayload);
      }
    } else {
      FATAL("Invalid token");
    }
  }


  if (_tokensVector1.size() > _tokensVector2.size()) {
    _pAvailableTokens = &_tokensVector1;
    _pRecycledTokens = &_tokensVector2;
  } else {
    _pAvailableTokens = &_tokensVector2;
    _pRecycledTokens = &_tokensVector1;
  }


  return true;
}

05,06: 事件处理完,对资源进行回收;
阅读(11218) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~