Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1154003
  • 博文数量: 177
  • 博客积分: 761
  • 博客等级: 上士
  • 技术积分: 1518
  • 用 户 组: 普通用户
  • 注册时间: 2011-02-04 22:37
文章分类

全部博文(177)

文章存档

2017年(1)

2016年(3)

2015年(33)

2014年(48)

2013年(60)

2012年(32)

分类: LINUX

2014-02-15 13:54:19

原文地址:tipc编程 作者:qqrilxk

TIPC是爱立信的某个工程师弄出来的,后来开源了。这段时间我琢磨了一下,觉得这个玩意还真不错。TIPC是Transparent Interprocess Communication的缩写,即是进程间通信的一种协议,之所以冠之以Transparent,透明的,因为它发布了一层更为简洁实用的框架,让使用的人不再需要知道某个进程运行在哪一台机子上,就能够和与这个进程通信。TIPC本质上是用socket实现的,而且现在已经成为linux内核的一部分,足以见得是好东西。

  平时我们使用的socket,TCP也好,UDP也好,用来标识一对socket的通信,无非是用两个socket的IP地址和端口号。比如使用UDP的socket,要发一个datagram到另一个socket,需要指定对端的地址,这个地址是由那台机的IP和端口组成。socket是在内核中管理,当内核检测到socket有数据可读时,就会通知拥有这个socket的进程去读取里面的数据。

  这里的不方便之处在于,要指定对端地址,我们必须知道这个socket在哪台机,端口是多少,才能发送数据出去。能不能只提供一些应用层的信息,就可以让内核自己去查到socket的位置,再把消息发过去?TIPC做的就是这样的事。使用TIPC,我们在创建socket的时候在内核中注册自己的服务类型service type,那么在发送端,只需要指定服务类型就可以由内核路由到相应的socket。这个时候,对应用层来说,对端地址仅仅是一个服务类型service type!很显然,内核维护着这么一张TIPC的路由表,即由服务去查找socket。而每台机都有这样的路由表,他们之间信息就像能够共享一样地为整个集群的TIPC socket服务。有了TIPC,这个socket使用了哪个IP,那个端口,我们都不再需要知道,很好很强大。

  TIPC的其他的特性还有:

  1. 有些时候多个进程提供同样的服务,仅仅是为了负载平衡或其他原因,这种情况可以用一个整数变量instance来标识不同socket,但是指定同样的service type。这个时候socket的地址是由service type和instance共同来指定。发送数据时候只需要指定service type和一个instance的值,也可以指定service type和instance的一个区间。对于后者,就是broadcast你的datagram。

  2. 管理前面说的TIPC路由表的是内核当中的一个进程叫做name server。它知晓着集群中所有的TIPC socket。在发送datagram给服务某个service的socket之前,你可以向它请求服务这个service的socket是否已经在工作了,它会告诉你service的状态。并且注册了一个observer,当你关心的socket起来之后发消息通知你,这样就可以避免你把datagram发给一个根本不存在的socket。

 

TIPC安装及配置

 

Ø  编译内核:

步骤:

1.cp tipc-1.7.5.tar /usr/src/linux-2.6.16.21-0.8

2.tar -xvf tipc-1.7.5.tar

3.cd /usr/src/linux-2.6.16.21-0.8

4.进入/usr/src/linux-2.6.16.21-0.8/

5.make menuconfig  

6.make

7.make bzImage

8.make modules

9.make modules_install

10.make install

 

Ø  安装配置:

  步骤:

1.ticp的压缩包放在/usr/src/linux-2.6.16.21-0.8下,

  2. tar -xvf tipc-1.7.5.tar

  3.cd  /usr/src/linux-2.6.16.21-0.8/net/tipc

4. make (现在1.7.5已经提供提供了可以配置的选项,在make之前可以使用make menuconfig进行配置,默认配置也可以用。)

  5.insmod. tipc.ko

  6.编译tipc提供的util配置工具。

  7. ./tipc-config -netid=1234 -a=1.1.5    配置本节点的id1234 地址是1.1.5

  8. ./tipc-config -be=eth:eth0/1.1.0  配置通信方式,通过本机的第一个以太网口进行通信。只向本集群内的节点进行通信。配置成1.1.8标识指向1.1.8节点进行通信,1.0.0表示向本zone内的各个集群都可以通信,0.0.0则表示向任何地方通信。

 

 

2TIPCsocket接口分析:

 

l  socket地址

socket地址是个16字节的结构,如下所示:

struct sockaddr

{
   unsigned short sa_family; /* 地址家族, AF_xxx */
   char sa_data[14]; /*14字节协议地址*/
};

为了编程可以更加方便通常我们会针对不同的协议定义不同的并行结构,如tipcsocket地址定义如下:

struct sockaddr_tipc {

    unsigned short family;          /* AF_TIPC */

unsigned char  addrtype;        /* TIPC_ADDR_NAME */

    signed   char  scope;           /* TIPC_CLUSTER_SCOPE */

    union {

        struct tipc_portid id;

        struct {

            unsigned long type;

            unsigned long instance;

            unsigned long domain; /* 0: own zone */

        } name;

    } addr;

};

l  tipc的地址:

tipc的地址描述如下:portZ表示zone, C表示cluster,N表示nodePort可以看成是一个socket

当启动tipc服务的时候,确定当前node(可以是一块单板,也可以是一台主机)的地址。我们创建一个socket就创建了一个portTipc提供名字服务,在sockaddr_tipc中指定name,把sockaddr_tipcport绑定起来,向指定的名字发送消息就会发到对应的socket,而下层的则不需要用户去感知。名字和端口支持1对多的关系,一个名字对应多个端口

 

l  创建socket

int socket(int domain, int type, int protocol);这个函数的目的是创建一个socket,然后返回一个socket描述符。Domain在这里是AF_TIPCtype通常有SOCK_STREAMSOCK_DGRAM两种,表示面向流或面向包。Protocol通常设置为0

 

l  bind操作。

一旦有一个socket,我们可能要将socket和机器上的某个地址关联起来,这个操作由bind来完成。

Bindconnect,原socket在应用bind的时候我们需要自己指定端口,connect则会自动为我们选定接口,但对于tipc应用层,不存在这种关系。

 

l  getsockname,根据socket描述符获取当前的地址。

 

l  setsockopt

其参数如下:int sockfd, int level, int optname, const void *optval, socklen_t optlen

levelSOL_TIPC,也就是使用tipc时,optname有如下值可以选择。

1)      TIPC_IMPORTANCE

          这个值用来标识本socket消息的重要性,设置为重要时本socket在发生拥塞时,消息丢失的可能性很小。 从下面的字面意思可以看出其含义,不再赘述。

TIPC_LOW_IMPORTANCE,      低优先级

TIPC_MEDIUM_IMPORTANCE,  中优先级

TIPC_HIGH_IMPORTANCE      高优先级

TIPC_CRITICAL_IMPORTANCE. 紧急优先级

 

默认是TIPC_LOW_IMPORTANCE

 

2TIPC_SRC_DROPPABLE

      同样是作为拥塞控制,如果设置为此值,则在拥塞发生时,tipc会丢弃消息,否则,将吧消息放入队列缓存。

   

 默认情况下: SOCK_SEQPACKET, SOCK_STREAM, SOCK_RDM 三种传输方式,也就是可靠链接,则将消息缓存,对SOCK_DGRAM,也就是不可靠链接,将消息丢弃。

3TIPC_DEST_DROPPABLE

     仍然为拥塞控制服务。针对下面三种情况有用,消息不能发送到目的地址,或者目的地址不存在,或者目的地址发生了拥塞。如果使能这个功能,在发生以上三种情况是,消息将被丢弃,否则会将消息返回给发送者。

      默认情况下:SOCK_SEQPACKET SOCK_STREAM两种传输方式,返回给发送者。

  SOCK_RDM and SOCK_DGRAM则将消息丢弃。这样做的目的是在在使用面向链接的情况下发生通信失败时进行合适的处理,同时不增加面向无链接的情况下通信失败的处理的复杂性。

3TIPC_CONN_TIMEOUT

设置connect的超时时间,单位是毫秒

sendto时,这个选项是没有意义的。

 

编程示例:

service.c

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define TEST_MSG_SIZE 2048
#define TEST_READ_TIMES 10
#define TEST_MSG_LEN 64*1024
typedef struct tagDmslLogiLinkAddrTipc
{
    int  uiType;
    int  uiInstance;
}TIPC_ADDR_STR;

int dmsGetSockAddr(struct sockaddr* SocketAddr, TIPC_ADDR_STR *pstLogilinkAdds)
{
    struct sockaddr_tipc* addr = (struct sockaddr_tipc*)SocketAddr;

    /*设定监听地址*/
    if((0 == addr)||(0 == pstLogilinkAdds))
    {
        return 10000;
    }


    memset(addr, 0, sizeof(struct sockaddr_tipc));

    addr->family   = AF_TIPC;
    addr->addrtype = TIPC_ADDR_NAME;
    addr->addr.name.name.type = pstLogilinkAdds->uiType;
    addr->addr.name.name.instance = pstLogilinkAdds->uiInstance;
    addr->addr.name.domain = 0;
    addr->scope = 2;

    return 0;
}

 

int main()
{
    int uiRet,stSocket,uiNum,n,i;
    struct sockaddr location_addr, TagSockAddr;
    struct sockaddr_tipc servAddr;
    TIPC_ADDR_STR* pstPortName;
    void *pMsg;
    fd_set  fdReadSet;
    struct timeval  stwait;
    socklen_t addrlen = sizeof(struct sockaddr);
    static  int uiMsgNum = 0;
    char Buf[TEST_MSG_SIZE] ;
    struct msghdr stMsg;
    struct iovec  astIovec[1];
   
    pstPortName = malloc(sizeof(TIPC_ADDR_STR));
    pstPortName->uiType = 0x80000001;
    pstPortName->uiInstance = 130987;

    stSocket = socket(AF_TIPC, SOCK_RDM, 0);
    if (-1 == stSocket)
    {
        perror("socket");
        return -1;
    }

    uiRet = dmsGetSockAddr(&location_addr, pstPortName);
    if(0 != uiRet)
    {
        printf("dmsGetSockAddr Failed 0x%x",uiRet);
        return -1;
    }

    uiRet = bind(stSocket, (__CONST_SOCKADDR_ARG)&location_addr, (socklen_t)sizeof(location_addr));
    if (0 != uiRet)
    {
        printf("bind Failed 0x%x",uiRet);
        return -1;
    }

    pMsg = malloc(TEST_MSG_SIZE);
    if (NULL == pMsg)
    {
        printf("malloc Failed");
        return -1;
    }

    while(1)
    {
        FD_ZERO( &fdReadSet );
        FD_SET(stSocket, &fdReadSet);
        stwait.tv_sec = 0;
        stwait.tv_usec = 10000;

 //       if(select(stSocket + 1, &fdReadSet, (fd_set*)0, (fd_set*)0, &stwait)>0)
        {
//            uiNum = 0;
//            while (uiNum < TEST_READ_TIMES)
            {
                n = recvfrom(stSocket, pMsg, TEST_MSG_LEN, 0, ( __SOCKADDR_ARG)(struct sockaddr_tipc *)&servAddr, (unsigned int *)(size_t*)&addrlen);
                if (n <= 0)
                {
                    continue;
                }
//                uiMsgNum += n / TEST_MSG_SIZE;
                pstPortName->uiType = 0x70000001;
                pstPortName->uiInstance = 130986;
                uiRet = dmsGetSockAddr(&TagSockAddr, pstPortName);
                if(0 != uiRet)
                {
                    printf("dmsGetSockAddr Failed 0x%x",uiRet);
                    return -1;
                }
//                uiNum++;
//                for(i = 0;i < uiMsgNum;i++)
                {
//                    pBuf = malloc(TEST_MSG_SIZE);
//                    memset(pBuf, 0, TEST_MSG_SIZE);
                    sendto(stSocket, (void*)Buf, TEST_MSG_SIZE, 0,
                        (struct sockaddr*)&TagSockAddr, sizeof(TagSockAddr));
//                    printf("send msg num is %d\n",i);
                }

            }
        }
    }
   
    return 0;
}

 

 

client.c

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


#define TEST_MSG_LEN 64*1024
#define TEST_MSG_SIZE 2048
typedef struct tagDmslLogiLinkAddrTipc
{
    int  uiType;
    int  uiInstance;
}TIPC_LOGI_ADDR_STR;

//typedef void *( * OSAL_LINUX_TASK_ENTRY )( VOS_VOID * );

int stSocket;

int dmsGetSockAddr(struct sockaddr* SocketAddr, TIPC_LOGI_ADDR_STR *pstLogilinkAdds)
{
    struct sockaddr_tipc* addr = (struct sockaddr_tipc*)SocketAddr;

    /*设定监听地址*/
    if((0 == addr)||(0 == pstLogilinkAdds))
    {
        return -1;
    }


    memset(addr, 0, sizeof(struct sockaddr_tipc));

    addr->family   = AF_TIPC;
    addr->addrtype = TIPC_ADDR_NAME;
    addr->addr.name.name.type = pstLogilinkAdds->uiType;
    addr->addr.name.name.instance = pstLogilinkAdds->uiInstance;
    addr->addr.name.domain = 0;
    addr->scope = 2;

    return 0;
}

void  RecvProc(void *pThis)
{
    fd_set  fdReadSet;
    struct timeval  stwait;
    int uiNum,n,uiSecd;
    static  int uiMsgNum = 0;
    static struct timeval stEndtimeFei,stStarttimeFei;
    struct sockaddr_tipc servAddr;
    socklen_t addrlen = sizeof(struct sockaddr);
    void *pMsg;
    printf("------------");
    pMsg = malloc(TEST_MSG_SIZE);
    if (NULL == pMsg)
    {
        printf("malloc Failed");
        return ;
    }

    while(1)
    {
        FD_ZERO( &fdReadSet );
        FD_SET(stSocket, &fdReadSet);
        stwait.tv_sec = 0;
        stwait.tv_usec = 10000;

//        if(select(stSocket + 1, &fdReadSet, (fd_set*)0, (fd_set*)0, &stwait)>0)
        {
 //           uiNum = 0;
 //           while (uiNum < TEST_READ_TIMES)
            {
                n = recvfrom(stSocket, pMsg, TEST_MSG_LEN, 0, ( __SOCKADDR_ARG)(struct sockaddr_tipc *)&servAddr, (unsigned int *)(size_t*)&addrlen);
                if (n <= 0)
                {
                    continue;
                }
                if(uiMsgNum == 0)
                {
                    gettimeofday(&stStarttimeFei,NULL);
                }               
//                uiMsgNum += n / TEST_MSG_SIZE;
                uiMsgNum++;
//                printf("recv msg num is %d\n",uiMsgNum);
//                uiNum++;
                if((uiMsgNum%200000) == 0)
                {
                    gettimeofday(&stEndtimeFei,NULL);
                    uiSecd = (stEndtimeFei.tv_sec - stStarttimeFei.tv_sec)*1000 + (stEndtimeFei.tv_usec - stStarttimeFei.tv_usec)/1000 ;
                    printf("The flux is %u\n",200000/uiSecd);
                    gettimeofday(&stStarttimeFei,NULL);
                }
            }
        }
    }

}

int main(int ac, char **av)
{
    pthread_attr_t attr;    /* attributes for thread */
    pthread_t tid;
    void *pThis;
    int uiRet,uiSize,i,j;
    struct sockaddr location_addr, TagSockAddr;
    TIPC_LOGI_ADDR_STR* pstPortName;
    TIPC_LOGI_ADDR_STR* pstDstPortName;
    char Buf[TEST_MSG_SIZE] ;
    struct msghdr stMsg;
    int iNum;

    iNum = atoi(av[1])

    uiSize = TEST_MSG_SIZE;

   
    pstPortName = malloc(sizeof(TIPC_LOGI_ADDR_STR));
    pstDstPortName = malloc(sizeof(TIPC_LOGI_ADDR_STR));
    pstPortName->uiType = 0x70000001;
    pstPortName->uiInstance = 130986;

    stSocket = socket(AF_TIPC, SOCK_RDM, 0);
    if (-1 == stSocket)
    {
        perror("socket");
        exit(1);
    }

    uiRet = dmsGetSockAddr(&location_addr, pstPortName);
    if(0 != uiRet)
    {
        printf("\r\ndmsGetSockAddr Failed 0x%x",uiRet);
        return -1;
    }

    uiRet = bind(stSocket, (__CONST_SOCKADDR_ARG)&location_addr, (socklen_t)sizeof(location_addr));
    if (0 != uiRet)
    {
        perror("bind");
        exit(1);
    }
    int svErrNo = pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);  

    uiRet = pthread_create(&tid, &attr, (void*)&RecvProc, pThis);
    printf("\r\n %u",uiRet);
    fflush(stdout);

    pstDstPortName->uiType = 0x80000001;
    pstDstPortName->uiInstance = 130987;
    if (0 != dmsGetSockAddr(&TagSockAddr, pstDstPortName))
    {       
        printf("\r\ndmsGetSockAddr Failed 0x%x",uiRet);
        return -1;
    }
//    struct iovec  astIovec[1];
    for (j= 0; ;j++)
    {
        if(j%iNum != 0)
        {
/*            pBuf = malloc(uiSize);
            memset(pBuf, 0, uiSize);*/
            sendto(stSocket, (void*)Buf, uiSize, 0,
                        (struct sockaddr*)&TagSockAddr, sizeof(TagSockAddr));

            /*当前只使用发送一个分片接口*/
            //uiRet = sendto(stSocket, pBuf, uiSize, 0, (__CONST_SOCKADDR_ARG)(struct sockaddr_tipc *)&TagSockAddr, sizeof(TagSockAddr));
            if (0>uiRet)
            {
                perror("sendmsg");
                return 10000;
            }
//            printf("send num is %d",j);
        }
        else
        {
            usleep(4000);
        }
    }
    return 0;
}

 

阅读(6456) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~