Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1852913
  • 博文数量: 274
  • 博客积分: 2366
  • 博客等级: 大尉
  • 技术积分: 1880
  • 用 户 组: 普通用户
  • 注册时间: 2007-04-22 09:37
文章分类

全部博文(274)

文章存档

2022年(1)

2020年(10)

2019年(7)

2018年(18)

2017年(26)

2016年(32)

2015年(43)

2014年(30)

2013年(44)

2012年(36)

2011年(17)

2010年(10)

分类: LINUX

2016-07-01 10:59:06

http://www.cnblogs.com/shakin/p/3913151.html
live555从RTSP服务器读取数据到使用接收到的数据流程分析
本文在linux环境下编译live555工程,并用cgdb调试工具对live555工程中的testProgs目录下的openRTSP的执行过程进行了跟踪分析,直到将从socket端读取视频数据并保存为对应的视频和音频数据为止。
进入testProgs目录,执行./openRTSP rtsp://xxxx/test.mp4
对于RTSP协议的处理部分,可设置断点在setupStreams函数中,并跟踪即可进行分析。
这里主要分析进入如下的while(1)循环中的代码
void BasicTaskScheduler0::doEventLoop(char* watchVariable)   
{  
  // Repeatedly loop, handling readble sockets and timed events:  
  while (1)   
  {  
    if (watchVariable != NULL && *watchVariable != 0) break;  
    SingleStep();  
  }  
}  
从这里可知,live555在客户端处理数据实际上是单线程的程序,不断执行SingleStep()函数中的代码。通过查看该函数代码里,下面一句代码为重点
(*handler->handlerProc)(handler->clientData, resultConditionSet); 
其中该条代码出现了两次,通过调试跟踪它的执行轨迹,第一次出现调用的函数是为了处理和RTSP服务器的通信协议的商定,而第二次出现调用的函数才是处理真正的视频和音频数据。对于RTSP通信协议的分析我们暂且不讨论,而直接进入第二次调用该函数的部分。
在我们的调试过程中在执行到上面的函数时就直接调用到livemedia目录下的如下函数
void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/)   
{  
  source->networkReadHandler1();  
}  
//下面这个函数实现的主要功能就是从socket端读取数据并存储数据
void MultiFramedRTPSource::networkReadHandler1()   
{  
  BufferedPacket* bPacket = fPacketReadInProgress;  
  if (bPacket == NULL)  
  {  
    // Normal case: Get a free BufferedPacket descriptor to hold the new network packet:  
    //分配一块新的存储空间来存储从socket端读取的数据  
    bPacket = fReorderingBuffer->getFreePacket(this);  
  }  




  // Read the network packet, and perform sanity checks on the RTP header:  
  Boolean readSuccess = False;  
  do   
  {  
    Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;  
    //fillInData()函数封装了从socket端获取数据的过程,到此函数执行完已经将数据保存到了bPacket对象中  
    if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete))   
   {  
      if (bPacket->bytesAvailable() == 0)   
      {  
      envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase \"MAX_PACKET_SIZE\"\n";  
      }  
      break;  
   }  
    if (packetReadWasIncomplete)  
    {  
      // We need additional read(s) before we can process the incoming packet:  
      fPacketReadInProgress = bPacket;  
      return;  
    } else   
    {  
      fPacketReadInProgress = NULL;  
    }  
      
    //省略关于RTP包的处理  
    ...  
    ...  
    ...  
    //fReorderingBuffer为MultiFramedRTPSource类中的对象,该对象建立了一个存储Packet数据包对象的链表  
    //下面的storePacket()函数即将上面获取的数据包存储在链表中  
    if (!fReorderingBuffer->storePacket(bPacket)) break;   
  
    readSuccess = True;  
  } while (0);  
  if (!readSuccess) fReorderingBuffer->freePacket(bPacket);  
  
  doGetNextFrame1();  
  // If we didn't get proper data this time, we'll get another chance  
}  
//下面的这个函数则实现从上面函数中介绍的存储数据包链表的对象(即fReorderingBuffer)中取出数据包并调用相应函数使用它
//代码1.1
void MultiFramedRTPSource::doGetNextFrame1()   
{  
  while (fNeedDelivery)   
  {  
    // If we already have packet data available, then deliver it now.  
    Boolean packetLossPrecededThis;   
    //从fReorderingBuffer对象中取出一个数据包  
    BufferedPacket* nextPacket  
      = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);  
    if (nextPacket == NULL) break;  
  
    fNeedDelivery = False;  
  
    if (nextPacket->useCount() == 0)   
    {  
      // Before using the packet, check whether it has a special header  
      // that needs to be processed:  
      unsigned specialHeaderSize;  
      if (!processSpecialHeader(nextPacket, specialHeaderSize))  
      {  
        // Something's wrong with the header; reject the packet:  
        fReorderingBuffer->releaseUsedPacket(nextPacket);  
        fNeedDelivery = True;  
        break;  
      }  
      nextPacket->skip(specialHeaderSize);  
    }  
  
    // Check whether we're part of a multi-packet frame, and whether  
    // there was packet loss that would render this packet unusable:  
    if (fCurrentPacketBeginsFrame)   
    {  
      if (packetLossPrecededThis || fPacketLossInFragmentedFrame)   
      {  
        // We didn't get all of the previous frame.  
        // Forget any data that we used from it:  
        fTo = fSavedTo; fMaxSize = fSavedMaxSize;  
        fFrameSize = 0;  
      }  
      fPacketLossInFragmentedFrame = False;  
    } else if (packetLossPrecededThis)   
    {  
      // We're in a multi-packet frame, with preceding packet loss  
      fPacketLossInFragmentedFrame = True;  
    }  
    if (fPacketLossInFragmentedFrame)  
    {  
      // This packet is unusable; reject it:  
      fReorderingBuffer->releaseUsedPacket(nextPacket);  
      fNeedDelivery = True;  
      break;  
    }  
  
    // The packet is usable. Deliver all or part of it to our caller:  
    unsigned frameSize;  
    //将上面取出的数据包拷贝到fTo指针所指向的地址  
    nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,  
            fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,  
            fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,  
            fCurPacketMarkerBit);  
    fFrameSize += frameSize;  
  
    if (!nextPacket->hasUsableData())   
    {  
      // We're completely done with this packet now  
      fReorderingBuffer->releaseUsedPacket(nextPacket);  
    }  
  
    if (fCurrentPacketCompletesFrame) //如果完整的取出了一帧数据,则可调用需要该帧数据的函数去处理它  
     {  
      // We have all the data that the client wants.  
      if (fNumTruncatedBytes > 0)   
      {  
    envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("  
        << fSavedMaxSize << ").  "  
        << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";  
      }  
      // Call our own 'after getting' function, so that the downstream object can consume the data:  
      if (fReorderingBuffer->isEmpty())   
      {  
        // Common case optimization: There are no more queued incoming packets, so this code will not get  
        // executed again without having first returned to the event loop.  Call our 'after getting' function  
        // directly, because there's no risk of a long chain of recursion (and thus stack overflow):  
    afterGetting(this);  //调用函数去处理取出的数据帧  
       } else   
      {  
    // Special case: Call our 'after getting' function via the event loop.  
    nextTask() = envir().taskScheduler().scheduleDelayedTask(0,  
                                 (TaskFunc*)FramedSource::afterGetting, this);  
      }  
    }  
    else       
    {  
      // This packet contained fragmented data, and does not complete  
      // the data that the client wants.  Keep getting data:  
      fTo += frameSize; fMaxSize -= frameSize;  
      fNeedDelivery = True;  
    }  
  }  
}  




//下面这个函数即开始调用执行需要该帧数据的函数
void FramedSource::afterGetting(FramedSource* source)   
{  
  source->fIsCurrentlyAwaitingData = False;  
      // indicates that we can be read again  
      // Note that this needs to be done here, in case the "fAfterFunc"  
      // called below tries to read another frame (which it usually will)  
  
  if (source->fAfterGettingFunc != NULL)     
  {  
    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,  
                   source->fFrameSize, source->fNumTruncatedBytes,  
                   source->fPresentationTime,  
                   source->fDurationInMicroseconds);  
  }  
}  
上面的fAfterGettingFunc为我们自己注册的函数,如果运行的是testProgs中的openRTSP实例,则该函数指向下列代码中通过调用getNextFrame()注册的afterGettingFrame()函数
Boolean FileSink::continuePlaying()  
{  
  if (fSource == NULL) return False;  
  
  fSource->getNextFrame(fBuffer, fBufferSize,  
            afterGettingFrame, this,  
            onSourceClosure, this);  
  
  return True;  
}  
如果运行的是testProgs中的testRTSPClient中的实例,则该函数指向这里注册的afterGettingFrame()函数
Boolean DummySink::continuePlaying()  
{  
  if (fSource == NULL) return False; // sanity check (should not happen)  
  
  // Request the next frame of data from our input source.  "afterGettingFrame()" will get called later, when it arrives:  
  fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,  
                        afterGettingFrame, this,  
                        onSourceClosure, this);  
  return True;  
}  
从上面的代码中可以看到getNextFrame()函数的第一个参数为分别在各自类中定义的buffer,我们继续以openRTSP为运行程序来分析,fBuffer为FileSink类里定义的指针:unsigned char* fBuffer;
这里我们先绕一个弯,看看getNextFrame()函数里做了什么
void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,  
                afterGettingFunc* afterGettingFunc,  
                void* afterGettingClientData,  
                onCloseFunc* onCloseFunc,  
                void* onCloseClientData)   
{  
  // Make sure we're not already being read:  
  if (fIsCurrentlyAwaitingData)     
  {  
    envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";  
    envir().internalError();  
  }  
  
  fTo = to;  
  fMaxSize = maxSize;  
  fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()  
  fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()  
  fAfterGettingFunc = afterGettingFunc;  
  fAfterGettingClientData = afterGettingClientData;  
  fOnCloseFunc = onCloseFunc;  
  fOnCloseClientData = onCloseClientData;  
  fIsCurrentlyAwaitingData = True;  
  
  doGetNextFrame();  
}  




从代码可以知道上面getNextFrame()中传入的第一个参数fBuffer指向了指针fTo,而我们在前面分析代码1.1中的void MultiFramedRTPSource::doGetNextFrame1()函数中有下面一段代码:




//将上面取出的数据包拷贝到fTo指针所指向的地址  
 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,  
   fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,  
   fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,  
   fCurPacketMarkerBit);  




实际上现在应该明白了,从getNextFrame()函数中传入的第一个参数fBuffer最终存储的即是从数据包链表对象中取出的数据,并且在调用上面的use()函数后就可以使用了。
而在void MultiFramedRTPSource::doGetNextFrame1()函数中代码显示的最终调用我们注册的void FileSink::afterGettingFrame()正好是在use()函数调用之后的afterGetting(this)中调用。我们再看看afterGettingFrame()做了什么处理:
void FileSink::afterGettingFrame(void* clientData, unsigned frameSize,  
                 unsigned numTruncatedBytes,  
                 struct timeval presentationTime,  
                 unsigned /*durationInMicroseconds*/)  
{  
  FileSink* sink = (FileSink*)clientData;  
  sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime);  
}  
  
void FileSink::afterGettingFrame(unsigned frameSize,  
                 unsigned numTruncatedBytes,  
                 struct timeval presentationTime)   
{  
  if (numTruncatedBytes > 0)     
  {  
    envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size ("  
        << fBufferSize << ").  "  
            << numTruncatedBytes << " bytes of trailing data was dropped!  Correct this by increasing the \"bufferSize\" parameter in the \"createNew()\" call to at least "  
            << fBufferSize + numTruncatedBytes << "\n";  
  }  
  addData(fBuffer, frameSize, presentationTime);  
  
  if (fOutFid == NULL || fflush(fOutFid) == EOF)     
  {  
    // The output file has closed.  Handle this the same way as if the  
    // input source had closed:  
    onSourceClosure(this);  
  
    stopPlaying();  
    return;  
  }  
  
  if (fPerFrameFileNameBuffer != NULL)     
  {  
    if (fOutFid != NULL) { fclose(fOutFid); fOutFid = NULL; }  
  }  
  
  // Then try getting the next frame:  
  continuePlaying();  
}  




从上面代码可以看到调用了addData()函数将数据保存到文件中,然后继续continuePlaying()又去获取下一帧数据然后处理,直到遇到循环结束然后依次退出调用函数。最后看看addData()函数的实现即可知:
void FileSink::addData(unsigned char const* data, unsigned dataSize,  
               struct timeval presentationTime)   
{  
  if (fPerFrameFileNameBuffer != NULL)     
  {  
    // Special case: Open a new file on-the-fly for this frame  
    sprintf(fPerFrameFileNameBuffer, "%s-%lu.%06lu", fPerFrameFileNamePrefix,  
        presentationTime.tv_sec, presentationTime.tv_usec);  
    fOutFid = OpenOutputFile(envir(), fPerFrameFileNameBuffer);  
  }  
  
  // Write to our file:  
#ifdef TEST_LOSS  
  static unsigned const framesPerPacket = 10;  
  static unsigned const frameCount = 0;  
  static Boolean const packetIsLost;  
  if ((frameCount++)%framesPerPacket == 0)     
  {  
    packetIsLost = (our_random()%10 == 0); // simulate 10% packet loss #####  
  }  
  
  if (!packetIsLost)  
#endif  
  if (fOutFid != NULL && data != NULL)    
  {  
    fwrite(data, 1, dataSize, fOutFid);  
  }  
}  
最后调用系统函数fwrite()实现写入文件功能。
总结:从上面的分析可知,如果要取得从RTSP服务器端接收并保存的数据帧,我们只需要定义一个类并实现如下格式两个的函数,并声明一个指针地址buffer用于指向数据帧,再在continuePlaying()函数中调用getNextFrame(buffer,...)即可。
typedef void (afterGettingFunc)(void* clientData, unsigned frameSize,  
          unsigned numTruncatedBytes,  
          struct timeval presentationTime,  
          unsigned durationInMicroseconds);  
typedef void (onCloseFunc)(void* clientData);  




然后再在afterGettingFunc的函数中即可使用buffer。.










































阅读(1553) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~