// Static entry point used for socket-readable notifications: forwards the
// event to the per-instance handler of the given source. The mask argument
// is ignored.
void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/)
{
    // All of the real work happens in the member function.
    source->networkReadHandler1();
}
// The main job of the function below is to read data from the socket and store it.
// Reads incoming data from the RTP interface into a BufferedPacket, stores the
// packet in the reordering buffer on success, and then tries to deliver any
// queued data to the client via doGetNextFrame1().
//
// If an earlier call left a read unfinished, the same packet
// (fPacketReadInProgress) is resumed instead of obtaining a fresh one.
void MultiFramedRTPSource::networkReadHandler1()
{
    BufferedPacket* bPacket = fPacketReadInProgress;
    if (bPacket == NULL)
    {
        // Normal case: Get a free BufferedPacket descriptor to hold the new network packet:
        bPacket = fReorderingBuffer->getFreePacket(this);
    }
    // Read the network packet, and perform sanity checks on the RTP header:
    Boolean readSuccess = False;
    do
    {
        Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
        // fillInData() wraps reading from the socket; once it returns successfully the
        // data has been stored in bPacket.
        // NOTE(review): packetReadWasIncomplete is presumably taken by reference and
        // updated by fillInData() - confirm against its declaration.
        if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete))
        {
            if (bPacket->bytesAvailable() == 0)
            {
                envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase \"MAX_PACKET_SIZE\"\n";
            }
            // The packet is handed back through freePacket() below, so it must no
            // longer be remembered as a read in progress; otherwise the next call
            // would resume reading into a packet that has already been released.
            fPacketReadInProgress = NULL;
            break;
        }
        if (packetReadWasIncomplete)
        {
            // We need additional read(s) before we can process the incoming packet:
            fPacketReadInProgress = bPacket;
            return;
        } else
        {
            fPacketReadInProgress = NULL;
        }
        // (RTP packet processing omitted here)
...
...
...
        // fReorderingBuffer is a member of MultiFramedRTPSource that keeps the received
        // packet objects in a linked list; storePacket() adds the packet just read to it.
        if (!fReorderingBuffer->storePacket(bPacket)) break;
        readSuccess = True;
    } while (0);
    // A packet that was not stored is returned to the reordering buffer.
    if (!readSuccess) fReorderingBuffer->freePacket(bPacket);
    doGetNextFrame1();
    // If we didn't get proper data this time, we'll get another chance
}
void MultiFramedRTPSource::doGetNextFrame1()
{
while (fNeedDelivery)
{
// If we already have packet data available, then deliver it now.
Boolean packetLossPrecededThis;
//从fReorderingBuffer对象中取出一个数据包
BufferedPacket* nextPacket
= fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
if (nextPacket == NULL) break;
fNeedDelivery = False;
if (nextPacket->useCount() == 0)
{
// Before using the packet, check whether it has a special header
// that needs to be processed:
unsigned specialHeaderSize;
if (!processSpecialHeader(nextPacket, specialHeaderSize))
{
// Something's wrong with the header; reject the packet:
fReorderingBuffer->releaseUsedPacket(nextPacket);
fNeedDelivery = True;
break;
}
nextPacket->skip(specialHeaderSize);
}
// Check whether we're part of a multi-packet frame, and whether
// there was packet loss that would render this packet unusable:
if (fCurrentPacketBeginsFrame)
{
if (packetLossPrecededThis || fPacketLossInFragmentedFrame)
{
// We didn't get all of the previous frame.
// Forget any data that we used from it:
fTo = fSavedTo; fMaxSize = fSavedMaxSize;
fFrameSize = 0;
}
fPacketLossInFragmentedFrame = False;
} else if (packetLossPrecededThis)
{
// We're in a multi-packet frame, with preceding packet loss
fPacketLossInFragmentedFrame = True;
}
if (fPacketLossInFragmentedFrame)
{
// This packet is unusable; reject it:
fReorderingBuffer->releaseUsedPacket(nextPacket);
fNeedDelivery = True;
break;
}
// The packet is usable. Deliver all or part of it to our caller:
unsigned frameSize;
//将上面取出的数据包拷贝到fTo指针所指向的地址
nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
fCurPacketMarkerBit);
fFrameSize += frameSize;
if (!nextPacket->hasUsableData())
{
// We're completely done with this packet now
fReorderingBuffer->releaseUsedPacket(nextPacket);
}
if (fCurrentPacketCompletesFrame) //如果完整的取出了一帧数据,则可调用需要该帧数据的函数去处理它
{
// We have all the data that the client wants.
if (fNumTruncatedBytes > 0)
{
envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
<< fSavedMaxSize << "). "
<< fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
}
// Call our own 'after getting' function, so that the downstream object can consume the data:
if (fReorderingBuffer->isEmpty())
{
// Common case optimization: There are no more queued incoming packets, so this code will not get
// executed again without having first returned to the event loop. Call our 'after getting' function
// directly, because there's no risk of a long chain of recursion (and thus stack overflow):
afterGetting(this); //调用函数去处理取出的数据帧
} else
{
// Special case: Call our 'after getting' function via the event loop.
nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
(TaskFunc*)FramedSource::afterGetting, this);
}
}
else
{
// This packet contained fragmented data, and does not complete
// the data that the client wants. Keep getting data:
fTo += frameSize; fMaxSize -= frameSize;
fNeedDelivery = True;
}
}
}
// The function below starts invoking the function that needs this frame's data.
// Called once a frame has been obtained for `source`. The visible part clears
// the source's "currently awaiting data" flag so that it can be read again.
// NOTE(review): the rest of this function (including the call to the
// "fAfterFunc" mentioned below and the closing brace) is not shown in this
// excerpt.
void FramedSource::afterGetting(FramedSource* source)
{
source->fIsCurrentlyAwaitingData = False;
// indicates that we can be read again
// Note that this needs to be done here, in case the "fAfterFunc"
// called below tries to read another frame (which it usually will)
// Asks the input source for the next frame of data.
//
// Returns True when a request was issued, or False when there is no source to
// read from. "afterGettingFrame()" is registered as the callback for when the
// frame arrives, and "onSourceClosure()" as the callback for source closure.
Boolean DummySink::continuePlaying()
{
    if (fSource != NULL)
    {
        // Request the next frame into our receive buffer; the callbacks are
        // given this sink as their client data.
        fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
                              afterGettingFrame, this,
                              onSourceClosure, this);
        return True;
    }

    // Sanity check (should not happen): no source is attached.
    return False;
}
// Registers a request for the next frame from this source.
//
// to                      - destination buffer for the frame data
// maxSize                 - capacity of the destination buffer
// afterGettingFunc        - callback stored for when the frame has been obtained
// afterGettingClientData  - opaque pointer stored alongside afterGettingFunc
// onCloseFunc             - callback stored for source closure
// onCloseClientData       - opaque pointer stored alongside onCloseFunc
//
// NOTE(review): the end of this function (closing brace and whatever follows
// the flag update) is not shown in this excerpt.
void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
afterGettingFunc* afterGettingFunc,
void* afterGettingClientData,
onCloseFunc* onCloseFunc,
void* onCloseClientData)
{
// Make sure we're not already being read:
if (fIsCurrentlyAwaitingData)
{
envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
envir().internalError();
}
// Remember where, and how much, the caller wants delivered:
fTo = to;
fMaxSize = maxSize;
fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
// Remember the caller's callbacks and their client data:
fAfterGettingFunc = afterGettingFunc;
fAfterGettingClientData = afterGettingClientData;
fOnCloseFunc = onCloseFunc;
fOnCloseClientData = onCloseClientData;
// Mark a read as outstanding (this flag is what the check at the top tests):
fIsCurrentlyAwaitingData = True;
// Handles a frame that has just arrived in fBuffer: warns about truncation,
// passes the data to addData(), flushes the output file, and requests the
// next frame.
//
// frameSize          - number of bytes delivered into fBuffer
// numTruncatedBytes  - number of trailing bytes that were dropped
// presentationTime   - presentation time associated with the frame
void FileSink::afterGettingFrame(unsigned frameSize,
                                 unsigned numTruncatedBytes,
                                 struct timeval presentationTime)
{
    if (numTruncatedBytes > 0)
    {
        envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size ("
                << fBufferSize << "). "
                << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing the \"bufferSize\" parameter in the \"createNew()\" call to at least "
                << fBufferSize + numTruncatedBytes << "\n";
    }

    addData(fBuffer, frameSize, presentationTime);

    if (fOutFid != NULL && fflush(fOutFid) != EOF)
    {
        // The output file is still usable. When a per-frame file name buffer
        // is in use, close the file after this frame.
        if (fPerFrameFileNameBuffer != NULL && fOutFid != NULL)
        {
            fclose(fOutFid);
            fOutFid = NULL;
        }

        // Then try getting the next frame:
        continuePlaying();
        return;
    }

    // The output file has closed. Handle this the same way as if the
    // input source had closed:
    onSourceClosure(this);
    stopPlaying();
}