Chinaunix首页 | 论坛 | 博客
  • 博客访问: 407111
  • 博文数量: 48
  • 博客积分: 1820
  • 博客等级: 上尉
  • 技术积分: 705
  • 用 户 组: 普通用户
  • 注册时间: 2009-12-28 23:10
文章分类

全部博文(48)

文章存档

2012年(1)

2011年(12)

2010年(34)

2009年(1)

分类: 嵌入式

2010-07-23 23:55:30

#include "rtp_service.h"

// Logging shims that forward to printf. The single macro argument must be a
// complete, parenthesised printf argument list, so call sites use double
// parentheses: logtrace(("value=%d\n", v)) expands to printf ("value=%d\n", v).
#define logtrace(x) printf x
#define logerror(x) printf x


static void *  thread_fn(void *arg){
CRtpService* p = (CRtpService*)arg;
p->process_task();;
return ((void *)0);
}

// Constructs the session wrapper with the destination counter at zero.
BaseRTPSession::BaseRTPSession()
    : num_dest(0)
{
}

// Translates an RTP library status code into text.
// For a negative code, the text from RTPGetErrorString() is printed to
// standard output (prefixed with "ERROR: ") and returned. For any
// non-negative code nothing is printed and an empty string is returned.
string rtp_error(int rtperr)
{
    if (rtperr >= 0)
    {
        return string();
    }

    string message;
    message = RTPGetErrorString(rtperr);
    std::cout << "ERROR: " << message << std::endl;
    //exit(-1);
    return message;
}


// Handles a new-source notification by registering the source's address as a
// destination of this session.
//
// Sources carrying our own SSRC are ignored. The destination is taken from the
// source's RTP data address when known; otherwise it is derived from the RTCP
// address using (RTCP port - 1). If neither address is known nothing happens.
//
// num_dest is only incremented when AddDestination() reports success, so the
// counter cannot drift away from the real destination list (for example when
// the same address is announced twice).
//
// NOTE(review): the casts assume the addresses are RTPIPv4Address instances,
// i.e. that the session uses an IPv4 transmitter - confirm if other
// transports are ever used.
void BaseRTPSession::OnNewSource(RTPSourceData *dat)
{
    if (dat->IsOwnSSRC())
        return;

    uint32_t ip;
    uint16_t port;
    if (dat->GetRTPDataAddress() != 0)
    {
        const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress());
        ip = addr->GetIP();
        port = addr->GetPort();
    }
    else if (dat->GetRTCPDataAddress() != 0)
    {
        const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress());
        ip = addr->GetIP();
        // Only the RTCP address is known: the RTP port is taken to be the one below it.
        port = addr->GetPort()-1;
    }
    else
        return;

    struct in_addr inaddr;
    inaddr.s_addr = htonl(ip);

    RTPIPv4Address dest(ip,port);
    const int status = AddDestination(dest);
    if (status < 0)
    {
        // Do not count a destination the session refused to register.
        std::cout << "Failed to add destination " << std::string(inet_ntoa(inaddr)) << ":" << port
                  << " (status " << status << ")" << std::endl;
        return;
    }

    num_dest ++;
    std::cout << "Adding destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl;
}

// Handles a BYE notification by removing the source's address from this
// session's destinations.
//
// Sources carrying our own SSRC are ignored. The destination is taken from the
// source's RTP data address when known; otherwise it is derived from the RTCP
// address using (RTCP port - 1). If neither address is known nothing happens.
//
// num_dest is only decremented when DeleteDestination() reports success, so a
// failed removal (for example an address that was never registered) cannot
// push the counter below the real number of destinations.
//
// NOTE(review): the casts assume the addresses are RTPIPv4Address instances,
// i.e. that the session uses an IPv4 transmitter - confirm if other
// transports are ever used.
void BaseRTPSession::OnBYEPacket(RTPSourceData *dat)
{
    if (dat->IsOwnSSRC())
        return;

    uint32_t ip;
    uint16_t port;
    if (dat->GetRTPDataAddress() != 0)
    {
        const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress());
        ip = addr->GetIP();
        port = addr->GetPort();
    }
    else if (dat->GetRTCPDataAddress() != 0)
    {
        const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress());
        ip = addr->GetIP();
        // Only the RTCP address is known: the RTP port is taken to be the one below it.
        port = addr->GetPort()-1;
    }
    else
        return;

    struct in_addr inaddr;
    inaddr.s_addr = htonl(ip);

    RTPIPv4Address dest(ip,port);
    const int status = DeleteDestination(dest);
    if (status < 0)
    {
        // Nothing was removed, so the counter must stay as it is.
        std::cout << "Failed to delete destination " << std::string(inet_ntoa(inaddr)) << ":" << port
                  << " (status " << status << ")" << std::endl;
        return;
    }

    num_dest--;
    std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl;
}

// Handles a source-removal notification by removing the source's address from
// this session's destinations.
//
// Sources carrying our own SSRC are ignored, and so are sources that already
// sent a BYE (ReceivedBYE()), because OnBYEPacket() removes those itself. The
// destination is taken from the source's RTP data address when known;
// otherwise it is derived from the RTCP address using (RTCP port - 1). If
// neither address is known nothing happens.
//
// num_dest is only decremented when DeleteDestination() reports success, so a
// failed removal cannot push the counter below the real number of
// destinations.
//
// NOTE(review): the casts assume the addresses are RTPIPv4Address instances,
// i.e. that the session uses an IPv4 transmitter - confirm if other
// transports are ever used.
void BaseRTPSession::OnRemoveSource(RTPSourceData *dat)
{
    if (dat->IsOwnSSRC())
        return;
    if (dat->ReceivedBYE())
        return;

    uint32_t ip;
    uint16_t port;
    if (dat->GetRTPDataAddress() != 0)
    {
        const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress());
        ip = addr->GetIP();
        port = addr->GetPort();
    }
    else if (dat->GetRTCPDataAddress() != 0)
    {
        const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress());
        ip = addr->GetIP();
        // Only the RTCP address is known: the RTP port is taken to be the one below it.
        port = addr->GetPort()-1;
    }
    else
        return;

    struct in_addr inaddr;
    inaddr.s_addr = htonl(ip);

    RTPIPv4Address dest(ip,port);
    const int status = DeleteDestination(dest);
    if (status < 0)
    {
        // Nothing was removed, so the counter must stay as it is.
        std::cout << "Failed to delete destination " << std::string(inet_ntoa(inaddr)) << ":" << port
                  << " (status " << status << ")" << std::endl;
        return;
    }

    num_dest--;
    std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl;
}


// Builds a stopped service (started == false, m_timestamp == 0) and
// initialises the mutex and condition variable used around the task queue.
// A failed initialisation is only logged; the constructor then returns early
// without initialising whatever comes after the failing step.
CRtpService::CRtpService()
    : started(false),
      m_timestamp(0)
{
    if (pthread_mutex_init(&m_mutex, NULL) != 0)
    {
        logerror(("CRtpService::CRtpService pthread mutex init fail\n"));
        return;
    }

    if (pthread_cond_init(&m_pthread_cond, NULL) != 0)
    {
        logerror(("CRtpService::CRtpService pthread cond init fail\n"));
        return;
    }

    logtrace(("CRtpService::CRtpService end ok \n"));
}

// Logs the teardown and, if the service is still marked as started, asks the
// worker to stop via stop().
// NOTE(review): the mutex and condition variable are not destroyed here, and
// nothing visible in this block waits for the worker thread to finish.
CRtpService::~CRtpService()
{
    logtrace(("CRtpService::~CRtpService end ok\n"));
    if (started)
    {
        stop();
    }
}


// Creates the RTP session and launches the detached worker thread that runs
// process_task().
//
// port: decimal text of the port base; converted with atoi() and passed to
//       SetPortbase().
// Returns 0 on success or when the service is already started, -1 on failure.
//
// The `started` flag is raised only once the session exists, and is lowered
// again (and the session destroyed) if the worker thread cannot be created.
// That way a failed start() leaves the service in a state from which start()
// can be retried, instead of being reported as already running.
int CRtpService::start(string port)
{
    logtrace(("CRtpService::start begin\n"));
    if(started)
        return 0;

    RTPUDPv4TransmissionParams transparams;
    RTPSessionParams sessparams;
    sessparams.SetOwnTimestampUnit(1.0/10.0);
    sessparams.SetAcceptOwnPackets(true);
    transparams.SetPortbase(atoi(port.c_str()));
    int status = m_rtp_session.Create(sessparams,&transparams);
    if(status)
    {
        string ret = rtp_error(status);
        logerror(("CRtpService::start sess create fail=<%s>\n",ret.c_str()));
        return -1;
    }

    // The worker reads `started`, so it has to be true before the thread exists.
    started = true;
    int err = pthread_create(&m_thread_id,NULL,thread_fn,this);
    if(err != 0){
        logerror(("can't create thread: %s\n",strerror(err)));
        // Roll back: no worker is running, so release the session and allow a retry.
        started = false;
        m_rtp_session.Destroy();
        return -1;
    }

    err = pthread_detach( m_thread_id );
    if(err != 0){
        // The worker is already running at this point, so `started` stays true
        // and stop() can still shut it down; only the failure is reported.
        logerror(("thread detach fail: %s\n",strerror(err)));
        return -1;
    }
    logtrace(("CRtpService::start end ok \n"));
    return 0;
}

int CRtpService::stop()
{
logtrace(("CRtpService::stop begin\n"));
if(!started)
{
return 0;
}
started = false;
pthread_mutex_lock(&m_mutex); 
pthread_cond_signal(&m_pthread_cond);
pthread_mutex_unlock(&m_mutex); 
logtrace(("CRtpService::stop end ok \n"));
return 0;
}


int CRtpService::send_rtp_packet(const string& data)
{
//logtrace(("CRtpService::send_rtp_packet begin data size=<%u>\n",data.size()));
pthread_mutex_lock(&m_mutex); 
m_task_queue.push(data);
pthread_cond_signal(&m_pthread_cond); 
pthread_mutex_unlock(&m_mutex); 

//logtrace(("CRtpService::send_rtp_packet end ok \n"));
return 0;
}

int CRtpService::process_task()
{
logtrace(("CRtpService::process_task begin\n"));
string data;
int status;
bool flag_begin = true;
while(1)
{
m_rtp_session.BeginDataAccess();
//logtrace(("CRtpService::process_task BeginDataAccess\n"));
// check incoming packets
if (m_rtp_session.GotoFirstSourceWithData())
{
do
{
RTPPacket *pack;
while ((pack = m_rtp_session.GetNextPacket()) != NULL)
{
// You can examine the data here
//logtrace(("Got packet !\n"));
// we don't longer need the packet, so
// we'll delete it
m_rtp_session.DeletePacket(pack);
}
} while (m_rtp_session.GotoNextSourceWithData());
}
m_rtp_session.EndDataAccess();
status = m_rtp_session.Poll();
if(status)
{
logerror(("process_task::Poll error<%s>",rtp_error(status).c_str()));
}
if( data.empty() ){
m_timestamp = 1;
flag_begin = true;
pthread_mutex_lock(&m_mutex);
while(m_task_queue.empty()){
   pthread_cond_wait(&m_pthread_cond, &m_mutex);  
   if(!started)
   {
    m_rtp_session.BYEDestroy(RTPTime(10,0),0,0);
    pthread_mutex_unlock (&m_mutex);
    return 0;
   }
}//! end while
if(!m_task_queue.empty()){
   data =  m_task_queue.front();
   m_task_queue.pop();   
   
}
pthread_mutex_unlock (&m_mutex);
}//! end if( data.empty() )
else{
m_timestamp = 0;
flag_begin = false;
}
m_timestamp=10;
do{
int size = data.size(); 
if(size > 1000)
{
size = 1000;
}
//logtrace(("process_task::size=<%d> m_timestamp=<%ld>\n",size,m_timestamp));
status = m_rtp_session.SendPacket(data.data(),size,0,false,m_timestamp);
if(status)
{
logerror(("process_task::SendPacket error<%s>",rtp_error(status).c_str()));
continue;
}
data = data.substr(size);
m_timestamp = 0;
usleep(1000);
}while(!data.empty());
}
logtrace(("CRtpService::process_task end ok \n"));
return 0;
}







阅读(1557) | 评论(0) | 转发(3) |
给主人留下些什么吧!~~