Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1800655
  • 博文数量: 306
  • 博客积分: 3133
  • 博客等级: 中校
  • 技术积分: 3932
  • 用 户 组: 普通用户
  • 注册时间: 2009-04-19 16:50
文章分类

全部博文(306)

文章存档

2018年(7)

2017年(18)

2016年(39)

2015年(35)

2014年(52)

2013年(39)

2012年(22)

2011年(29)

2010年(53)

2009年(12)

分类: LINUX

2015-02-02 10:54:41

原创文章,转载请注明: 转载自

本文链接地址: 

前面我介绍过google对内核协议栈的patch,RPS,它主要是为了软中断的负载均衡,这次继续来介绍google 的对RPS的增强path RFS(receive flow steering),RPS是把软中断map到对应cpu,而这个时候还会有另外的性能影响,那就是如果应用程序所在的cpu和软中断处理的cpu不是同一个,此时对于cpu cache的影响会很大。 这里要注意,在kernel 的2.6.35中 这两个patch已经加入了。

ok,先来描述下它是怎么做的,其实这个补丁很简单,想对于rps来说就是添加了一个cpu的选择,也就是说我们需要根据应用程序的cpu来选择软中断需要被处理的cpu。这里做法是当调用recvmsg的时候,应用程序的cpu会被存储在一个hash table中,而索引是根据socket的rxhash进行计算的。而这个rxhash就是RPS中计算得出的那个skb的hash值.

可是这里会有一个问题,那就是当多个线程或者进程读取相同的socket的时候,此时就会导致cpu id不停的变化,从而导致大量的OOO的数据包(这是因为cpu id变化,导致下面软中断不停的切换到不同的cpu,此时就会导致大量的乱序的包).


而RFS是如何解决这个问题的呢,它做了两个表rps_sock_flow_table和rps_dev_flow_table,其中第一个rps_sock_flow_table是一个全局的hash表,这个表针对socket的,映射了socket对应的cpu,这里的cpu就是应用层期待软中断所在的cpu。

点击(此处)折叠或打开

  1. struct rps_sock_flow_table {
  2.     unsigned int mask;
  3.     //hash表
  4.     u16 ents[0];
  5. };


可以看到它有两个域,其中第一个是掩码,用于来计算hash表的索引,而ents就是保存了对应socket的cpu。

然后是rps_dev_flow_table,这个是针对设备的,每个设备队列都含有一个rps_dev_flow_table(这个表主要是保存了上次处理相同链接上的skb所在的cpu),这个hash表中每一个元素包含了一个cpu id,一个tail queue的计数器,这个值是一个很关键的值,它主要就是用来解决上面大量OOO的数据包的问题的,它保存了当前的dev flow table需要处理的数据包的尾部计数。接下来我们会详细分析这个东西。

点击(此处)折叠或打开

  1. struct netdev_rx_queue {
  2.     struct rps_map *rps_map;
  3.     //每个设备的队列保存了一个rps_dev_flow_table
  4.     struct rps_dev_flow_table *rps_flow_table;
  5.     struct kobject kobj;
  6.     struct netdev_rx_queue *first;
  7.     atomic_t count;
  8. } ____cacheline_aligned_in_smp;
  9.  
  10.  
  11. struct rps_dev_flow_table {
  12.     unsigned int mask;
  13.     struct rcu_head rcu;
  14.     struct work_struct free_work;
  15.     //hash表
  16.     struct rps_dev_flow flows[0];
  17. };
  18.  
  19. struct rps_dev_flow {
  20.     u16 cpu;
  21.     u16 fill;
  22.     //tail计数。
  23.     unsigned int last_qtail;
  24. };


首先我们知道,大量的OOO的数据包的引起是因为多个进程同时请求相同的socket,而此时会导致这个socket对应的cpu id不停的切换,然后软中断如果不做处理,只是简单的调度软中断到不同的cpu,就会导致顺序的数据包被分发到不同的cpu,由于是smp,因此会导致大量的OOO的数据包,而在RFS中是这样解决这个问题的,在soft_net中添加了2个域,input_queue_head和input_queue_tail,然后在设备队列中添加了rps_flow_table,而rps_flow_table中的元素rps_dev_flow包含有一个last_qtail,RFS就通过这3个域来控制乱序的数据包。

这里为什么需要3个值呢,这是因为每个cpu上的队列的个数input_queue_tail是一直增加的,而设备每一个队列中的flow table对应的skb则是有可能会被调度到另外的cpu,而dev flow table的last_qtail表示当前的flow table所需要处理的数据包队列(backlog queue)的尾部队列计数,也就是说当input_queue_head大于等于它的时候说明当前的flow table可以切换了,否则的话不能切换到进程期待的cpu。

不过这里还要注意就是最好能够绑定进程到指定的cpu(配合rps和rfs的参数设置),这样的话,rfs和rps的效率会更好,所以我认为像erlang这种在rfs和rps下性能应该提高非常大的.
下面就是softnet_data 的结构。

点击(此处)折叠或打开

  1. struct softnet_data {
  2.     struct Qdisc *output_queue;
  3.     struct Qdisc **output_queue_tailp;
  4.     struct list_head poll_list;
  5.     struct sk_buff *completion_queue;
  6.     struct sk_buff_head process_queue;
  7.  
  8.     /* stats */
  9.     unsigned int processed;
  10.     unsigned int time_squeeze;
  11.     unsigned int cpu_collision;
  12.     unsigned int received_rps;
  13.  
  14. #ifdef CONFIG_RPS
  15.     struct softnet_data *rps_ipi_list;
  16.  
  17.     /* Elements below can be accessed between CPUs for RPS */
  18.     struct call_single_data csd ____cacheline_aligned_in_smp;
  19.     struct softnet_data *rps_ipi_next;
  20.     unsigned int cpu;
  21.     //最关键的两个域
  22.     unsigned int input_queue_head;
  23.     unsigned int input_queue_tail;
  24. #endif
  25.     unsigned dropped;
  26.     struct sk_buff_head input_pkt_queue;
  27.     struct napi_struct backlog;
  28. };


接下来我们来看代码,来看内核是如何实现的,先来看inet_recvmsg,也就是调用rcvmsg时,内核会调用的函数,这个函数比较简单,就是多加了一行代码sock_rps_record_flow,这个函数主要是将本socket和cpu设置到rps_sock_flow_table这个hash表中。

首先要提一下,这里这两个flow table的初始化都是放在sys中初始化的,不过sys部分相关的代码我就不分析了,因为具体的逻辑和原理都是在协议栈部分实现的。

点击(此处)折叠或打开

  1. int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
  2.          size_t size, int flags)
  3. {
  4.     struct sock *sk = sock->sk;
  5.     int addr_len = 0;
  6.     int err;
  7.     //设置hash表
  8.     sock_rps_record_flow(sk);
  9.  
  10.     err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
  11.                    flags & ~MSG_DONTWAIT, &addr_len);
  12.     if (err >= 0)
  13.         msg->msg_namelen = addr_len;
  14.     return err;
  15. }


然后就是rps_record_sock_flow,这个函数主要是得到全局的rps_sock_flow_table,然后调用rps_record_sock_flow来对rps_sock_flow_table进行设置,这里会将socket的sk_rxhash传递进去当作hash的索引,而这个sk_rxhash其实就是skb里面的rxhash,skb的rxhash就是rps中设置的hash值,这个值是根据四元组进行hash的。这里用这个当索引一个是为了相同的socket都能落入一个index。而且下面的软中断上下文也比较容易存取这个hash表。

点击(此处)折叠或打开

  1. struct rps_sock_flow_table *rps_sock_flow_table __read_mostly;
  2. static inline void sock_rps_record_flow(const struct sock *sk)
  3. {
  4. #ifdef CONFIG_RPS
  5.     struct rps_sock_flow_table *sock_flow_table;
  6.  
  7.     rcu_read_lock();
  8.     sock_flow_table = rcu_dereference(rps_sock_flow_table);
  9.     //设置hash表
  10.     rps_record_sock_flow(sock_flow_table, sk->sk_rxhash);
  11.     rcu_read_unlock();
  12. #endif
  13. }


其实所有的事情都是rps_record_sock_flow中做的

点击(此处)折叠或打开

  1. static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
  2.                     u32 hash)
  3. {
  4.     if (table && hash) {
  5.         //获取索引。
  6.         unsigned int cpu, index = hash & table->mask;
  7.  
  8.         /* We only give a hint, preemption can change cpu under us */
  9.         //获取cpu
  10.         cpu = raw_smp_processor_id();
  11.         //保存对应的cpu,如果等于当前cpu,则说明已经设置过了。
  12.         if (table->ents[index] != cpu)
  13.             //否则设置cpu
  14.             table->ents[index] = cpu;
  15.     }
  16. }


上面是进程上下文做的事情,也就是设置对应的进程所期待的cpu,它用的是rps_sock_flow_table,而接下来就是软中断上下文了,rfs这个patch主要的工作都是在软中断上下文做的。不过看这里的代码之前最好能够了解下RPS补丁,因为RFS就是对rps做了一点小改动。

主要是两个函数,第一个是enqueue_to_backlog,这个函数我们知道是用来将skb挂在到对应cpu的input queue上的,这里我们就关注他的一个函数就是input_queue_tail_incr_save,他就是更新设备的input_queue_tail以及softnet_data的input_queue_tail。

点击(此处)折叠或打开

  1. if (skb_queue_len(&sd->input_pkt_queue)) {
  2. enqueue:
  3.             __skb_queue_tail(&sd->input_pkt_queue, skb);
  4.             //这个函数更新对应设备的rps_dev_flow_table中的input_queue_tail以及dev flow table的last_qtail
  5.             input_queue_tail_incr_save(sd, qtail);
  6.             rps_unlock(sd);
  7.             local_irq_restore(flags);
  8.             return NET_RX_SUCCESS;
  9.         }


第二个是get_rps_cpu,这个函数我们知道就是得到软中断应该运行的cpu,这里我们就看RFS添加的部分,这里它是这样计算的,首先会得到两个flow table,一个是sock_flow_table,另一个是设备的rps_flow_table(skb对应的设备队列中对应的flow table),这里的逻辑是这样子的取出来两个cpu,一个是根据rps计算数据包前一次被调度过的cpu(tcpu),一个是应用程序期望的cpu(next_cpu),然后比较这两个值,如果 1 tcpu未设置(等于RPS_NO_CPU) 2 tcpu是离线的 3 tcpu的input_queue_head大于rps_flow_table中的last_qtail 的话就调度这个skb到next_cpu.
而这里第三点input_queue_head大于rps_flow_table则说明在当前的dev flow table中的数据包已经发送完毕,否则的话为了避免乱序就还是继续使用tcpu.

点击(此处)折叠或打开

  1. got_hash:
  2.     flow_table = rcu_dereference(rxqueue->rps_flow_table);
  3.     sock_flow_table = rcu_dereference(rps_sock_flow_table);
  4.     if (flow_table && sock_flow_table) {
  5.         u16 next_cpu;
  6.         struct rps_dev_flow *rflow;
  7.         //得到flow table
  8.         rflow = &flow_table->flows[skb->rxhash & flow_table->mask];
  9.         tcpu = rflow->cpu;
  10.         //得到next_cpu
  11.         next_cpu = sock_flow_table->ents[skb->rxhash &
  12.             sock_flow_table->mask];
  13.  
  14.         //条件
  15.         if (unlikely(tcpu != next_cpu) &&
  16.             (tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
  17.              ((int)(per_cpu(softnet_data, tcpu).input_queue_head -
  18.               rflow->last_qtail)) >= 0)) {
  19.             //设置tcpu
  20.             tcpu = rflow->cpu = next_cpu;
  21.             if (tcpu != RPS_NO_CPU)
  22.                 //更新last_qtail
  23.                 rflow->last_qtail = per_cpu(softnet_data,
  24.                     tcpu).input_queue_head;
  25.         }
  26.         if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
  27.             *rflowp = rflow;
  28.             //设置返回cpu,以供软中断重新调度
  29.             cpu = tcpu;
  30.             goto done;
  31.         }
  32.     }
  33.     ....................................


最后我们来分析下第一次数据包到达协议栈而应用程序还没有调用rcvmsg读取数据包,此时会发生什么问题,当第一次进来时tcpu是RPS_NO_CPU,并且next_cpu也是RPS_NO_CPU,此时会导致跳过rfs处理,而是直接使用rps的处理,也就是上面代码的紧接着的部分,下面这段代码前面rps时已经分析过了,这里就不分析了。

阅读(1199) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~