分类: LINUX
2011-12-14 10:08:51
TIPC是爱立信的某个工程师弄出来的,后来开源了。这段时间我琢磨了一下,觉得这个玩意还真不错。TIPC是Transparent Interprocess Communication的缩写,即是进程间通信的一种协议,之所以冠之以Transparent,透明的,因为它发布了一层更为简洁实用的框架,让使用的人不再需要知道某个进程运行在哪一台机子上,就能够和与这个进程通信。TIPC本质上是用socket实现的,而且现在已经成为linux内核的一部分,足以见得是好东西。
平时我们使用的socket,TCP也好,UDP也好,用来标识一对socket的通信,无非是用两个socket的IP地址和端口号。比如使用UDP的socket,要发一个datagram到另一个socket,需要指定对端的地址,这个地址是由那台机的IP和端口组成。socket是在内核中管理,当内核检测到socket有数据可读时,就会通知拥有这个socket的进程去读取里面的数据。
这里的不方便之处在于,要指定对端地址,我们必须知道这个socket在哪台机,端口是多少,才能发送数据出去。能不能只提供一些应用层的信息,就可以让内核自己去查到socket的位置,再把消息发过去?TIPC做的就是这样的事。使用TIPC,我们在创建socket的时候在内核中注册自己的服务类型service type,那么在发送端,只需要指定服务类型就可以由内核路由到相应的socket。这个时候,对应用层来说,对端地址仅仅是一个服务类型service type!很显然,内核维护着这么一张TIPC的路由表,即由服务去查找socket。而每台机都有这样的路由表,他们之间信息就像能够共享一样地为整个集群的TIPC socket服务。有了TIPC,这个socket使用了哪个IP,那个端口,我们都不再需要知道,很好很强大。
TIPC的其他的特性还有:
1. 有些时候多个进程提供同样的服务,仅仅是为了负载平衡或其他原因,这种情况可以用一个整数变量instance来标识不同socket,但是指定同样的service type。这个时候socket的地址是由service type和instance共同来指定。发送数据时候只需要指定service type和一个instance的值,也可以指定service type和instance的一个区间。对于后者,就是broadcast你的datagram。
2. 管理前面说的TIPC路由表的是内核当中的一个进程叫做name server。它知晓着集群中所有的TIPC socket。在发送datagram给服务某个service的socket之前,你可以向它请求服务这个service的socket是否已经在工作了,它会告诉你service的状态。并且注册了一个observer,当你关心的socket起来之后发消息通知你,这样就可以避免你把datagram发给一个根本不存在的socket。
TIPC安装及配置
Ø 编译内核:
步骤:
1.cp tipc-1.7.5.tar /usr/src/linux-2.6.16.21-0.8
2.tar -xvf tipc-1.7.5.tar
3.cd /usr/src/linux-2.6.16.21-0.8
4.进入/usr/src/linux-2.6.16.21-0.8/
5.make menuconfig
6.make
7.make bzImage
8.make modules
9.make modules_install
10.make install
Ø 安装配置:
步骤:
1.将ticp的压缩包放在/usr/src/linux-2.6.16.21-0.8下,
2. tar -xvf tipc-1.7.5.tar
3.cd /usr/src/linux-2.6.16.21-0.8/net/tipc
4. make (现在1.7.5已经提供提供了可以配置的选项,在make之前可以使用make menuconfig进行配置,默认配置也可以用。)
5.insmod. tipc.ko
6.编译tipc提供的util配置工具。
7. ./tipc-config -netid=1234 -a=1.1.5 配置本节点的id为1234 地址是1.1.5
8. ./tipc-config -be=eth:eth0/1.1.0 配置通信方式,通过本机的第一个以太网口进行通信。只向本集群内的节点进行通信。配置成1.1.8标识指向1.1.8节点进行通信,1.0.0表示向本zone内的各个集群都可以通信,0.0.0则表示向任何地方通信。
2,TIPC的socket接口分析:
l socket地址
socket地址是个16字节的结构,如下所示:
struct sockaddr
{
unsigned short sa_family; /* 地址家族, AF_xxx */
char sa_data[14]; /*14字节协议地址*/
};
为了编程可以更加方便通常我们会针对不同的协议定义不同的并行结构,如tipc的socket地址定义如下:
struct sockaddr_tipc {
unsigned short family; /* AF_TIPC */
unsigned char addrtype; /* TIPC_ADDR_NAME */
signed char scope; /* TIPC_CLUSTER_SCOPE */
union {
struct tipc_portid id;
struct {
unsigned long type;
unsigned long instance;
unsigned long domain; /* 0: own zone */
} name;
} addr;
};
l tipc的地址:
tipc的地址描述如下
当启动tipc服务的时候,确定当前node(可以是一块单板,也可以是一台主机)的
l 创建socket
int socket(int domain, int type, int protocol);这个函数的目的是创建一个socket,然后返回一个socket描述符。Domain在这里是AF_TIPC,type通常有SOCK_STREAM,SOCK_DGRAM两种,表示面向流或面向包。Protocol通常设置为0。
l bind操作。
一旦有一个socket,我们可能要将socket和机器上的某个地址关联起来,这个操作由bind来完成。
Bind和connect,原socket在应用bind的时候我们需要自己指定端口,connect则会自动为我们选定接口,但对于tipc应用层,不存在这种关系。
l getsockname,根据socket描述符获取当前的地址。
l setsockopt
其参数如下:int sockfd, int level, int optname, const void *optval, socklen_t optlen
当level为SOL_TIPC,也就是使用tipc时,optname有如下值可以选择。
1) TIPC_IMPORTANCE
这个值用来标识本socket消息的重要性,设置为重要时本socket在发生拥塞时,消息丢失的可能性很小。 从下面的字面意思可以看出其含义,不再赘述。
TIPC_LOW_IMPORTANCE, 低优先级
TIPC_MEDIUM_IMPORTANCE, 中优先级
TIPC_HIGH_IMPORTANCE 高优先级
TIPC_CRITICAL_IMPORTANCE. 紧急优先级
默认是TIPC_LOW_IMPORTANCE
2)TIPC_SRC_DROPPABLE
同样是作为拥塞控制,如果设置为此值,则在拥塞发生时,tipc会丢弃消息,否则,将吧消息放入队列缓存。
默认情况下: 对SOCK_SEQPACKET, SOCK_STREAM, SOCK_RDM 三种传输方式,也就是可靠链接,则将消息缓存,对SOCK_DGRAM,也就是不可靠链接,将消息丢弃。
3)TIPC_DEST_DROPPABLE
仍然为拥塞控制服务。针对下面三种情况有用,消息不能发送到目的地址,或者目的地址不存在,或者目的地址发生了拥塞。如果使能这个功能,在发生以上三种情况是,消息将被丢弃,否则会将消息返回给发送者。
默认情况下:SOCK_SEQPACKET ,SOCK_STREAM两种传输方式,返回给发送者。
对SOCK_RDM and SOCK_DGRAM则将消息丢弃。这样做的目的是在在使用面向链接的情况下发生通信失败时进行合适的处理,同时不增加面向无链接的情况下通信失败的处理的复杂性。
3)TIPC_CONN_TIMEOUT
设置connect的超时时间,单位是毫秒
用sendto时,这个选项是没有意义的。
编程示例:
service.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define TEST_MSG_SIZE 2048
#define TEST_READ_TIMES 10
#define TEST_MSG_LEN 64*1024
typedef struct tagDmslLogiLinkAddrTipc
{
int uiType;
int uiInstance;
}TIPC_ADDR_STR;
int dmsGetSockAddr(struct sockaddr* SocketAddr, TIPC_ADDR_STR *pstLogilinkAdds)
{
struct sockaddr_tipc* addr = (struct sockaddr_tipc*)SocketAddr;
/*设定监听地址*/
if((0 == addr)||(0 == pstLogilinkAdds))
{
return 10000;
}
memset(addr, 0, sizeof(struct sockaddr_tipc));
addr->family = AF_TIPC;
addr->addrtype = TIPC_ADDR_NAME;
addr->addr.name.name.type = pstLogilinkAdds->uiType;
addr->addr.name.name.instance = pstLogilinkAdds->uiInstance;
addr->addr.name.domain = 0;
addr->scope = 2;
return 0;
}
int main()
{
int uiRet,stSocket,uiNum,n,i;
struct sockaddr location_addr, TagSockAddr;
struct sockaddr_tipc servAddr;
TIPC_ADDR_STR* pstPortName;
void *pMsg;
fd_set fdReadSet;
struct timeval stwait;
socklen_t addrlen = sizeof(struct sockaddr);
static int uiMsgNum = 0;
char Buf[TEST_MSG_SIZE] ;
struct msghdr stMsg;
struct iovec astIovec[1];
pstPortName = malloc(sizeof(TIPC_ADDR_STR));
pstPortName->uiType = 0x80000001;
pstPortName->uiInstance = 130987;
stSocket = socket(AF_TIPC, SOCK_RDM, 0);
if (-1 == stSocket)
{
perror("socket");
return -1;
}
uiRet = dmsGetSockAddr(&location_addr, pstPortName);
if(0 != uiRet)
{
printf("dmsGetSockAddr Failed 0x%x",uiRet);
return -1;
}
uiRet = bind(stSocket, (__CONST_SOCKADDR_ARG)&location_addr, (socklen_t)sizeof(location_addr));
if (0 != uiRet)
{
printf("bind Failed 0x%x",uiRet);
return -1;
}
pMsg = malloc(TEST_MSG_SIZE);
if (NULL == pMsg)
{
printf("malloc Failed");
return -1;
}
while(1)
{
FD_ZERO( &fdReadSet );
FD_SET(stSocket, &fdReadSet);
stwait.tv_sec = 0;
stwait.tv_usec = 10000;
// if(select(stSocket + 1, &fdReadSet, (fd_set*)0, (fd_set*)0, &stwait)>0)
{
// uiNum = 0;
// while (uiNum < TEST_READ_TIMES)
{
n = recvfrom(stSocket, pMsg, TEST_MSG_LEN, 0, ( __SOCKADDR_ARG)(struct sockaddr_tipc *)&servAddr, (unsigned int *)(size_t*)&addrlen);
if (n <= 0)
{
continue;
}
// uiMsgNum += n / TEST_MSG_SIZE;
pstPortName->uiType = 0x70000001;
pstPortName->uiInstance = 130986;
uiRet = dmsGetSockAddr(&TagSockAddr, pstPortName);
if(0 != uiRet)
{
printf("dmsGetSockAddr Failed 0x%x",uiRet);
return -1;
}
// uiNum++;
// for(i = 0;i < uiMsgNum;i++)
{
// pBuf = malloc(TEST_MSG_SIZE);
// memset(pBuf, 0, TEST_MSG_SIZE);
sendto(stSocket, (void*)Buf, TEST_MSG_SIZE, 0,
(struct sockaddr*)&TagSockAddr, sizeof(TagSockAddr));
// printf("send msg num is %d\n",i);
}
}
}
}
return 0;
}
client.c
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define TEST_MSG_LEN 64*1024
#define TEST_MSG_SIZE 2048
typedef struct tagDmslLogiLinkAddrTipc
{
int uiType;
int uiInstance;
}TIPC_LOGI_ADDR_STR;
//typedef void *( * OSAL_LINUX_TASK_ENTRY )( VOS_VOID * );
int stSocket;
int dmsGetSockAddr(struct sockaddr* SocketAddr, TIPC_LOGI_ADDR_STR *pstLogilinkAdds)
{
struct sockaddr_tipc* addr = (struct sockaddr_tipc*)SocketAddr;
/*设定监听地址*/
if((0 == addr)||(0 == pstLogilinkAdds))
{
return -1;
}
memset(addr, 0, sizeof(struct sockaddr_tipc));
addr->family = AF_TIPC;
addr->addrtype = TIPC_ADDR_NAME;
addr->addr.name.name.type = pstLogilinkAdds->uiType;
addr->addr.name.name.instance = pstLogilinkAdds->uiInstance;
addr->addr.name.domain = 0;
addr->scope = 2;
return 0;
}
void RecvProc(void *pThis)
{
fd_set fdReadSet;
struct timeval stwait;
int uiNum,n,uiSecd;
static int uiMsgNum = 0;
static struct timeval stEndtimeFei,stStarttimeFei;
struct sockaddr_tipc servAddr;
socklen_t addrlen = sizeof(struct sockaddr);
void *pMsg;
printf("------------");
pMsg = malloc(TEST_MSG_SIZE);
if (NULL == pMsg)
{
printf("malloc Failed");
return ;
}
while(1)
{
FD_ZERO( &fdReadSet );
FD_SET(stSocket, &fdReadSet);
stwait.tv_sec = 0;
stwait.tv_usec = 10000;
// if(select(stSocket + 1, &fdReadSet, (fd_set*)0, (fd_set*)0, &stwait)>0)
{
// uiNum = 0;
// while (uiNum < TEST_READ_TIMES)
{
n = recvfrom(stSocket, pMsg, TEST_MSG_LEN, 0, ( __SOCKADDR_ARG)(struct sockaddr_tipc *)&servAddr, (unsigned int *)(size_t*)&addrlen);
if (n <= 0)
{
continue;
}
if(uiMsgNum == 0)
{
gettimeofday(&stStarttimeFei,NULL);
}
// uiMsgNum += n / TEST_MSG_SIZE;
uiMsgNum++;
// printf("recv msg num is %d\n",uiMsgNum);
// uiNum++;
if((uiMsgNum%200000) == 0)
{
gettimeofday(&stEndtimeFei,NULL);
uiSecd = (stEndtimeFei.tv_sec - stStarttimeFei.tv_sec)*1000 + (stEndtimeFei.tv_usec - stStarttimeFei.tv_usec)/1000 ;
printf("The flux is %u\n",200000/uiSecd);
gettimeofday(&stStarttimeFei,NULL);
}
}
}
}
}
int main(int ac, char **av)
{
pthread_attr_t attr; /* attributes for thread */
pthread_t tid;
void *pThis;
int uiRet,uiSize,i,j;
struct sockaddr location_addr, TagSockAddr;
TIPC_LOGI_ADDR_STR* pstPortName;
TIPC_LOGI_ADDR_STR* pstDstPortName;
char Buf[TEST_MSG_SIZE] ;
struct msghdr stMsg;
int iNum;
iNum = atoi(av[1])
uiSize = TEST_MSG_SIZE;
pstPortName = malloc(sizeof(TIPC_LOGI_ADDR_STR));
pstDstPortName = malloc(sizeof(TIPC_LOGI_ADDR_STR));
pstPortName->uiType = 0x70000001;
pstPortName->uiInstance = 130986;
stSocket = socket(AF_TIPC, SOCK_RDM, 0);
if (-1 == stSocket)
{
perror("socket");
exit(1);
}
uiRet = dmsGetSockAddr(&location_addr, pstPortName);
if(0 != uiRet)
{
printf("\r\ndmsGetSockAddr Failed 0x%x",uiRet);
return -1;
}
uiRet = bind(stSocket, (__CONST_SOCKADDR_ARG)&location_addr, (socklen_t)sizeof(location_addr));
if (0 != uiRet)
{
perror("bind");
exit(1);
}
int svErrNo = pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
uiRet = pthread_create(&tid, &attr, (void*)&RecvProc, pThis);
printf("\r\n %u",uiRet);
fflush(stdout);
pstDstPortName->uiType = 0x80000001;
pstDstPortName->uiInstance = 130987;
if (0 != dmsGetSockAddr(&TagSockAddr, pstDstPortName))
{
printf("\r\ndmsGetSockAddr Failed 0x%x",uiRet);
return -1;
}
// struct iovec astIovec[1];
for (j= 0; ;j++)
{
if(j%iNum != 0)
{
/* pBuf = malloc(uiSize);
memset(pBuf, 0, uiSize);*/
sendto(stSocket, (void*)Buf, uiSize, 0,
(struct sockaddr*)&TagSockAddr, sizeof(TagSockAddr));
/*当前只使用发送一个分片接口*/
//uiRet = sendto(stSocket, pBuf, uiSize, 0, (__CONST_SOCKADDR_ARG)(struct sockaddr_tipc *)&TagSockAddr, sizeof(TagSockAddr));
if (0>uiRet)
{
perror("sendmsg");
return 10000;
}
// printf("send num is %d",j);
}
else
{
usleep(4000);
}
}
return 0;
}