Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4518254
  • 博文数量: 252
  • 博客积分: 5347
  • 博客等级: 大校
  • 技术积分: 13838
  • 用 户 组: 普通用户
  • 注册时间: 2009-09-30 10:13
文章分类
文章存档

2022年(12)

2017年(11)

2016年(7)

2015年(14)

2014年(20)

2012年(9)

2011年(20)

2010年(153)

2009年(6)

分类: 网络与安全

2015-02-15 11:30:24

netdev_rx_queue表示对应的接收队列,很多网卡硬件上已经支持多个队列,此时就会有多个netdev_rx_queue队列,这个结构是挂在net_device,初始化接收队列的函数:netif_alloc_rx_queues

netif_alloc_rx_queues

  1. static int netif_alloc_rx_queues(struct net_device *dev)
  2. {
  3. /*获取接收队列的个数*/
  4.     unsigned int i, count = netdev_extended(dev)->rps_data.num_rx_queues;
  5.     struct netdev_rx_queue *rx;

  6.     BUG_ON(count < 1);
  7.  /*分配netdev_rx_queue 空间*/
  8.     rx = kcalloc(count, sizeof(struct netdev_rx_queue), GFP_KERNEL);
  9.     if (!rx) {
  10.         pr_err("netdev: Unable to allocate %u rx queues.\n", count);
  11.         return -ENOMEM;
  12.     }
  13.  /* netdev_rx_queue 和net_device关联起来。*/
  14.     netdev_extended(dev)->rps_data._rx = rx;
  15.  /*对netdev_rx_queue 中net_device进行赋值操作*/
  16.     for (i = 0; i < count; i++)
  17.         rx[i].dev = dev;
  18.     return 0;
  19. }

  20.     struct netdev_rx_queue {
  21.     /*保存当前队列的rps map*/
  22.     struct rps_map *rps_map;
  23. /* //每个设备的队列保存了一个rps_dev_flow_table */    
  24. struct rps_dev_flow_table *rps_flow_table;
  25. //对应的kobject
  26.     struct kobject kobj;
  27. /*所属的net_device*/
  28.     struct net_device *dev;
  29. } ____cacheline_aligned_in_smp;


  30. struct rps_map {
  31. /*CPU的个数,也就是CPU数组的个数*/
  32.     unsigned int len;
  33.     struct rcu_head rcu;
  34. /*保存了CPU的ID*/
  35.     u16 cpus[0];
  36. };

get_rps_cpu

  1. static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
  2.          struct rps_dev_flow **rflowp)
  3. {
  4.     struct ipv6hdr *ip6;
  5.     struct iphdr *ip;
  6.     struct netdev_rx_queue *rxqueue;
  7.     struct rps_map *map;
  8.     struct rps_dev_flow_table *flow_table;
  9.     struct rps_sock_flow_table *sock_flow_table;
  10.     struct netdev_rps_info *rpinfo = &netdev_extended(dev)->rps_data;
  11.     int cpu = -1;
  12.     int tcpu;
  13.     u8 ip_proto;
  14.     u32 addr1, addr2, ports, ihl;

  15.     rcu_read_lock();

  16.     if (skb_rx_queue_recorded(skb)) {
  17. /*获取设备对应的rx队列。*/
  18.         u16 index = skb_get_rx_queue(skb);
  19.         if (unlikely(index >= rpinfo->num_rx_queues)) {
  20.             WARN_ONCE(rpinfo->num_rx_queues > 1, "%s received packet "
  21.                 "on queue %u, but number of RX queues is %u\n",
  22.                 dev->name, index, rpinfo->num_rx_queues);
  23.             goto done;
  24.         }
  25.         rxqueue = rpinfo->_rx + index;
  26.     } else
  27.         rxqueue = rpinfo->_rx;

  28.     if (!rxqueue->rps_map && !rxqueue->rps_flow_table)
  29.         goto done;

  30.     if (skb->rxhash) //如果硬件已经计算过,则直接跳过,不需要计算HASH值
  31.         goto got_hash; /* Skip hash computation on packet header */

  32.     switch (skb->protocol) { /*根据不同的IP协议获取源IP和目的IP*/
  33.     case __constant_htons(ETH_P_IP):
  34.         if (!pskb_may_pull(skb, sizeof(*ip)))
  35.             goto done;

  36.         ip = (struct iphdr *) skb->data;
  37.         ip_proto = ip->protocol;
  38.         addr1 = ip->saddr;
  39.         addr2 = ip->daddr;
  40.         ihl = ip->ihl;
  41.         break;
  42.     case __constant_htons(ETH_P_IPV6):
  43.         if (!pskb_may_pull(skb, sizeof(*ip6)))
  44.             goto done;

  45.         ip6 = (struct ipv6hdr *) skb->data;
  46.         ip_proto = ip6->nexthdr;
  47.         addr1 = ip6->saddr.s6_addr32[3];
  48.         addr2 = ip6->daddr.s6_addr32[3];
  49.         ihl = (40 >> 2);
  50.         break;
  51.     default:
  52.         goto done;
  53.     }
  54.     ports = 0;
  55.     switch (ip_proto) {
  56.     case IPPROTO_TCP:
  57.     case IPPROTO_UDP:
  58.     case IPPROTO_DCCP:
  59.     case IPPROTO_ESP:
  60.     case IPPROTO_AH:
  61.     case IPPROTO_SCTP:
  62.     case IPPROTO_UDPLITE:
  63.         if (pskb_may_pull(skb, (ihl * 4) + 4))
  64.             ports = *((u32 *) (skb->data + (ihl * 4))); /*获取四层协议的端口号,tcp头的前4个字节就是源和目的端口,因此这里跳过ip头得到tcp头的前4个字节*/
  65.         break;

  66.     default:
  67.         break;
  68.     }
  69.  /*根据获取到的SIP和DIP,PORT计算HSAH值,*/
  70.     skb->rxhash = jhash_3words(addr1, addr2, ports, hashrnd) >> 16;
  71.     if (!skb->rxhash)
  72.         skb->rxhash = 1;

  73. got_hash:
  74. /* rps_sock_flow_table和rps_dev_flow_table 是为了解决RFS而添加的两张表,rps_sock_flow_table是一个全局的hash表,这个表针对socket的,映射了socket对应的cpu,这里的cpu就是应用层期待软中断所在的cpu ,rps_dev_flow_table,这个是针对设备的,每个设备队列都含有一个rps_dev_flow_table(这个表主要是保存了上次处理相同链接上的skb所在的cpu),这个hash表中每一个元素包含了一个cpu id,一个tail queue的计数器*/
  75.     flow_table = rcu_dereference(rxqueue->rps_flow_table);
  76.     sock_flow_table = rcu_dereference(rps_sock_flow_table);
  77.     if (flow_table && sock_flow_table) {
  78.         u16 next_cpu;
  79.         struct rps_dev_flow *rflow;

  80.         rflow = &flow_table->flows[skb->rxhash & flow_table->mask];
  81.         tcpu = rflow->cpu;

  82.         next_cpu = sock_flow_table->ents[skb->rxhash &
  83.          sock_flow_table->mask];

  84.         /*首先会得到两个flow table,一个是sock_flow_table,另一个是设备的rps_flow_table(skb对应的设备队列中对应的flow table),这里的逻辑是这样子的取出来两个cpu,一个是根据rps计算数据包前一次被调度过的cpu(tcpu),一个是应用程序期望的cpu(next_cpu),然后比较这两个值,如果 1 tcpu未设置(等于RPS_NO_CPU) 2 tcpu是离线的 3 tcpu的input_queue_head大于rps_flow_table中的last_qtail 的话就调度这个skb到next_cpu.而这里第三点input_queue_head大于rps_flow_table则说明在当前的dev flow table中的数据包已经发送完毕,否则的话为了避免乱序就还是继续使用tcpu
  85.          * If the desired CPU (where last recvmsg was done) is
  86.          * different from current CPU (one in the rx-queue flow
  87.          * table entry), switch if one of the following holds:
  88.          * - Current CPU is unset (equal to RPS_NO_CPU).
  89.          * - Current CPU is offline.
  90.          * - The current CPU's queue tail has advanced beyond the
  91.          * last packet that was enqueued using this table entry.
  92.          * This guarantees that all previous packets for the flow
  93.          * have been dequeued, thus preserving in order delivery.
  94.          */
  95.         if (unlikely(tcpu != next_cpu) &&
  96.          (tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
  97.          ((int)(per_cpu(softnet_data, tcpu).input_queue_head -
  98.          rflow->last_qtail)) >= 0)) {
  99.             tcpu = rflow->cpu = next_cpu;
  100.             if (tcpu != RPS_NO_CPU)
  101.                 rflow->last_qtail = per_cpu(softnet_data,
  102.                  tcpu).input_queue_head;
  103.         }
  104.         if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
  105.             *rflowp = rflow;
  106.  /*设置返回的CPU*/
  107.             cpu = tcpu;
  108.             goto done;
  109.         }
  110.     }
  111. /*当第一次进来时tcpu是RPS_NO_CPU,并且next_cpu也是RPS_NO_CPU,此时会导致跳过rfs处理,而是直接使用rps的处理, */
  112.     map = rcu_dereference(rxqueue->rps_map);
  113.     if (map) {
  114.         tcpu = map->cpus[((u32) (skb->rxhash * map->len)) >> 16];
  115. /*如果cpu是online的,则返回计算出的这个cpu */
  116.         if (cpu_online(tcpu)) {
  117.             cpu = tcpu;
  118.             goto done;
  119.         }
  120.     }

  121. done:
  122.     rcu_read_unlock();
  123.     return cpu;
  124. }

  125. /*将skb挂在到对应cpu的input queue上的, enqueue_to_backlog接受一个skb和cpu为参数,通过cpu来判断skb如何处理。要么加入所属的input_pkt_queue中,要么schecule 软中断*/

enqueue_to_backlog

  1. static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
  2.              unsigned int *qtail)
  3. {
  4.     struct softnet_data *queue;
  5.     unsigned long flags;
  6.    /*根据传递过来的CPU,获取softnet_data结构体*/
  7.     queue = &per_cpu(softnet_data, cpu);

  8.     local_irq_save(flags);
  9.     __get_cpu_var(netdev_rx_stat).total++;

  10.     spin_lock(&queue->input_pkt_queue.lock);
  11.     if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
  12.         if (queue->input_pkt_queue.qlen) {
  13. enqueue:/*将数据包添加到input_pkt_queue队列中*/
  14.             __skb_queue_tail(&queue->input_pkt_queue, skb);
  15.             *qtail = queue->input_queue_head +
  16.                  queue->input_pkt_queue.qlen;

  17.             spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
  18.              flags);
  19.             return NET_RX_SUCCESS;
  20.         }

  21.         /* Schedule NAPI for backlog device 可以调度软中断*/
  22.         if (napi_schedule_prep(&queue->backlog)) {
  23.             if (cpu != smp_processor_id()) {/*判断该SKB是否该CPU处理*/
  24.                 struct rps_remote_softirq_cpus *rcpus =
  25.                  &__get_cpu_var(rps_remote_softirq_cpus);

  26.                 cpu_set(cpu, rcpus->mask[rcpus->select]);
  27.                 __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  28.             } else
  29. /*应该当前cpu处理,则直接schedule 软中断,这里可以看到传递进去的是backlog */
  30.                 ____napi_schedule(queue, &queue->backlog);
  31.         }
  32.         goto enqueue;
  33.     }

  34.     spin_unlock(&queue->input_pkt_queue.lock);

  35.     __get_cpu_var(netdev_rx_stat).dropped++;
  36.     local_irq_restore(flags);

  37.     kfree_skb(skb);
  38.     return NET_RX_DROP;
  39. }

enqueue_to_backlog

  1. static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
  2.              unsigned int *qtail)
  3. {
  4.     struct softnet_data *queue;
  5.     unsigned long flags;
  6.    /*根据传递过来的CPU,获取softnet_data结构体*/
  7.     queue = &per_cpu(softnet_data, cpu);

  8.     local_irq_save(flags);
  9.     __get_cpu_var(netdev_rx_stat).total++;

  10.     spin_lock(&queue->input_pkt_queue.lock);
  11.     if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
  12.         if (queue->input_pkt_queue.qlen) {
  13. enqueue:/*将数据包添加到input_pkt_queue队列中*/
  14.             __skb_queue_tail(&queue->input_pkt_queue, skb);
  15.             *qtail = queue->input_queue_head +
  16.                  queue->input_pkt_queue.qlen;

  17.             spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
  18.              flags);
  19.             return NET_RX_SUCCESS;
  20.         }

  21.         /* Schedule NAPI for backlog device 可以调度软中断*/
  22.         if (napi_schedule_prep(&queue->backlog)) {
  23.             if (cpu != smp_processor_id()) {/*判断该SKB是否该CPU处理*/
  24.                 struct rps_remote_softirq_cpus *rcpus =
  25.                  &__get_cpu_var(rps_remote_softirq_cpus);

  26.                 cpu_set(cpu, rcpus->mask[rcpus->select]);
  27.                 __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  28.             } else
  29. /*应该当前cpu处理,则直接schedule 软中断,这里可以看到传递进去的是backlog */
  30.                 ____napi_schedule(queue, &queue->backlog);
  31.         }
  32.         goto enqueue;
  33.     }

  34.     spin_unlock(&queue->input_pkt_queue.lock);

  35.     __get_cpu_var(netdev_rx_stat).dropped++;
  36.     local_irq_restore(flags);

  37.     kfree_skb(skb);
  38.     return NET_RX_DROP;
  39. }

inet_recvmsg

  1. int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
  2.          size_t size, int flags)
  3. {
  4.     struct sock *sk = sock->sk;
  5.     int addr_len = 0;
  6.     int err;

  7.     inet_rps_record_flow(sk);//设置HASH表

  8.     err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
  9.                  flags & ~MSG_DONTWAIT, &addr_len);
  10.     if (err >= 0)
  11.         msg->msg_namelen = addr_len;
  12.     return err;
  13. }

这个函数主要是得到全局的rps_sock_flow_table,然后调用rps_record_sock_flow来对rps_sock_flow_table进行设置,这里会将socket的sk_rxhash传递进去当作hash的索引,而这个sk_rxhash其实就是skb里面的rxhash,skb的rxhash就是rps中设置的hash值,这个值是根据四元组进行hash的。这里用这个当索引一个是为了相同的socket都能落入一个index。而且下面的软中断上下文也比较容易存取这个hash表

inet_rps_record_flow

点击(此处)折叠或打开

  1. static inline void inet_rps_record_flow(struct sock *sk)
  2. {
  3.     struct rps_sock_flow_table *sock_flow_table;

  4.     rcu_read_lock();
  5.     sock_flow_table = rcu_dereference(rps_sock_flow_table);
  6.     rps_record_sock_flow(sock_flow_table, inet_sk_rxhash(sk));
  7.     rcu_read_unlock();
  8. }

rps_record_sock_flow

点击(此处)折叠或打开

  1. static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
  2.                     u32 hash)
  3. {
  4.     if (table && hash) {
  5. /*获取索引*/
  6.         unsigned int cpu, index = hash & table->mask;

  7.         /* We only give a hint, preemption can change cpu under us 获取CPU */
  8.         cpu = raw_smp_processor_id();
  9.  /*保存对应的cpu,如果等于当前cpu,则说明已经设置过了*/
  10.         if (table->ents[index] != cpu)
  11.             table->ents[index] = cpu;
  12.     }
  13. }


    图:内核代码流程
阅读(6704) | 评论(0) | 转发(4) |
给主人留下些什么吧!~~