Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85700
  • 博文数量: 15
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 210
  • 用 户 组: 普通用户
  • 注册时间: 2014-01-05 15:27
文章分类

全部博文(15)

文章存档

2014年(15)

我的朋友

分类: LINUX

2014-04-19 00:03:04

NAPI是网络收包流程上的一种优化机制,简单而言,就是同时利用中断和轮询的优点,在一次中断中尽量多地接收报文,而不是一个中断只接收一个报文。目前主流的网卡驱动都是使用这种机制的,同时内核也保持着对于普通方式的兼容。
对于每一个中断向量,驱动都需要维护一个napi_struct的数据结构:
  1. struct napi_struct {
  2.     /* The poll_list must only be managed by the entity which
  3.      * changes the state of the NAPI_STATE_SCHED bit. This means
  4.      * whoever atomically sets that bit can add this napi_struct
  5.      * to the per-cpu poll_list, and whoever clears that bit
  6.      * can remove from the list right before clearing the bit.
  7.      */
  8.     struct list_head poll_list;

  9.     unsigned long state;
  10.     int weight;
  11.     int (*poll)(struct napi_struct *, int);
  12. #ifdef CONFIG_NETPOLL
  13.     spinlock_t poll_lock;
  14.     int poll_owner;
  15. #endif

  16.     unsigned int gro_count;

  17.     struct net_device *dev;
  18.     struct list_head dev_list;
  19.     struct sk_buff *gro_list;
  20.     struct sk_buff *skb;
  21. };
内核以数据结构net_device表示一个网卡设备,一般的驱动在分配这个数据结构的时候会额外的分配一段空间,作为该设备的private数据,napi_struct一般就包含在这个private数据结构中,以r8169驱动为例:
分配的时候额外分配rtl8169_private数据结构的大小:

  1. static int __devinit
  2. rtl8169_init_one(struct pci_dev *pdev, const struct pci_device_id *ent)
  3. {
  4.     const struct rtl_cfg_info *cfg = rtl_cfg_infos + ent->driver_data;
  5.     const unsigned int region = cfg->region;
  6.     struct rtl8169_private *tp;
  7.     struct mii_if_info *mii;
  8.     struct net_device *dev;
  9.     void __iomem *ioaddr;
  10.     int chipset, i;
  11.     int rc;

  12.     if (netif_msg_drv(&debug)) {
  13.         printk(KERN_INFO "%s Gigabit Ethernet driver %s loaded\n",
  14.                MODULENAME, RTL8169_VERSION);
  15.     }

  16.     dev = alloc_etherdev(sizeof (*tp)); //alloc_etherdev函数本身会包含net_device数据结构的大小
  17. ...
  18. }
  1. struct rtl8169_private {
  2.     void __iomem *mmio_addr; /* memory map physical address */
  3.     struct pci_dev *pci_dev;
  4.     struct net_device *dev;
  5.     struct napi_struct napi;
  6.     spinlock_t lock;
  7.     u32 msg_enable;
  8.     u16 txd_version;
  9.  ...
  10. };
驱动初始化的时候会调用netif_napi_add函数进行注册:
  1. netif_napi_add(dev, &tp->napi, rtl8169_poll, R8169_NAPI_WEIGHT);//限额,默认64
  1. void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
  2.             int (*poll)(struct napi_struct *, int), int weight)
  3. {
  4.     INIT_LIST_HEAD(&napi->poll_list);  //初始化napi_struct的相应分量
  5.     napi->gro_count = 0;
  6.     napi->gro_list = NULL;
  7.     napi->skb = NULL;
  8.     napi->poll = poll;  //回调函数
  9.     napi->weight = weight;
  10.     list_add(&napi->dev_list, &dev->napi_list);
  11.     napi->dev = dev;
  12. #ifdef CONFIG_NETPOLL
  13.     spin_lock_init(&napi->poll_lock);
  14.     napi->poll_owner = -1;
  15. #endif
  16.     set_bit(NAPI_STATE_SCHED, &napi->state);
  17. }
网口打开的时候使能:
  1. napi_enable(&tp->napi);
  1. static inline void napi_enable(struct napi_struct *n)
  2. {
  3.     BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
  4.     smp_mb__before_clear_bit();
  5.     clear_bit(NAPI_STATE_SCHED, &n->state);
  6. }
注意napi_struct数据结构的状态之间的变化
  1. enum {
  2.     NAPI_STATE_SCHED, /* Poll is scheduled */
  3.     NAPI_STATE_DISABLE, /* Disable pending */
  4.     NAPI_STATE_NPSVC, /* Netpoll - don't dequeue from poll_list */
  5. };
报文接收中断发生的时候不再是简单的处理一个报文,而是先关闭中断,然后使用轮询的方式尽量多的处理报文,一般如下调用:
  1. if (status & tp->intr_mask & tp->napi_event) {
  2.             RTL_W16(IntrMask, tp->intr_event & ~tp->napi_event); //关闭相应的中断,不同硬件不一样
  3.             tp->intr_mask = ~tp->napi_event;
  4.             
  5.             if (likely(napi_schedule_prep(&tp->napi))) //判断napi的状态
  6.                 __napi_schedule(&tp->napi);
  7.             else
  8.                 netif_info(tp, intr, dev,
  9.                        "interrupt %04x in poll\n", status);
  10.         }
napi_schedule_prep判断状态,保证只有一个可以往下走:
  1. /**
  2.  * napi_schedule_prep - check if napi can be scheduled
  3.  * @n: napi context
  4.  *
  5.  * Test if NAPI routine is already running, and if not mark
  6.  * it as running. This is used as a condition variable
  7.  * insure only one NAPI poll instance runs. We also make
  8.  * sure there is no pending NAPI disable.
  9.  */
  10. static inline int napi_schedule_prep(struct napi_struct *n)
  11. {
  12.     return !napi_disable_pending(n) &&
  13.         !test_and_set_bit(NAPI_STATE_SCHED, &n->state);
  14. }
__napi_schedule涉及一个percpu的变量softnet_data
  1. void __napi_schedule(struct napi_struct *n)
  2. {
  3.     unsigned long flags;

  4.     local_irq_save(flags);
  5.     ____napi_schedule(&__get_cpu_var(softnet_data), n);
  6.     local_irq_restore(flags);
  7. }
  1. static inline void ____napi_schedule(struct softnet_data *sd,
  2.                      struct napi_struct *napi)
  3. {
  4.     list_add_tail(&napi->poll_list, &sd->poll_list);//把当前网卡对应napi添加到percpu的数据结构的链表中
  5.     __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  6. }
softnet_data定义如下:DEFINE_PER_CPU_ALIGNED(struct softnet_data, softnet_data);
在net_dev_init函数中初始化:
  1. for_each_possible_cpu(i) {
  2.         struct softnet_data *sd = &per_cpu(softnet_data, i);

  3.         memset(sd, 0, sizeof(*sd));
  4.         skb_queue_head_init(&sd->input_pkt_queue);
  5.         skb_queue_head_init(&sd->process_queue);
  6.         sd->completion_queue = NULL;
  7.         INIT_LIST_HEAD(&sd->poll_list);
  8.         sd->output_queue = NULL;
  9.         sd->output_queue_tailp = &sd->output_queue;
  10. #ifdef CONFIG_RPS
  11.         sd->csd.func = rps_trigger_softirq;
  12.         sd->csd.info = sd;
  13.         sd->csd.flags = 0;
  14.         sd->cpu = i;
  15. #endif

  16.         sd->backlog.poll = process_backlog; //no napi驱动共用该处理函数
  17.         sd->backlog.weight = weight_p;  //默认64
  18.         sd->backlog.gro_list = NULL;
  19.         sd->backlog.gro_count = 0;
  20.     }
看一下网卡接收软中断的流程:
static void net_rx_action(struct softirq_action *h)
  1. {
  2.     struct softnet_data *sd = &__get_cpu_var(softnet_data);
  3.     unsigned long time_limit = jiffies + 2; //每次软中断处理2个jiffies,防止一个软中断占用太长时间
  4.     int budget = netdev_budget;  //默认300
  5.     void *have;

  6.     local_irq_disable();

  7.     while (!list_empty(&sd->poll_list)) { //所有网卡对应的napi数据结构都在这个链表中
  8.         struct napi_struct *n;
  9.         int work, weight;

  10.         /* If softirq window is exhuasted then punt.
  11.          * Allow this to run for 2 jiffies since which will allow
  12.          * an average latency of 1.5/HZ.
  13.          */
  14.         if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))
  15.             goto softnet_break;

  16.         local_irq_enable();

  17.         /* Even though interrupts have been re-enabled, this
  18.          * access is safe because interrupts can only add new
  19.          * entries to the tail of this list, and only ->poll()
  20.          * calls can remove this head entry from the list.
  21.          */
  22.         n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);

  23.         have = netpoll_poll_lock(n);

  24.         weight = n->weight;

  25.         /* This NAPI_STATE_SCHED test is for avoiding a race
  26.          * with netpoll's poll_napi(). Only the entity which
  27.          * obtains the lock and sees NAPI_STATE_SCHED set will
  28.          * actually make the ->poll() call. Therefore we avoid
  29.          * accidentally calling ->poll() when NAPI is not scheduled.
  30.          */
  31.         work = 0;
  32.         if (test_bit(NAPI_STATE_SCHED, &n->state)) {
  33.             work = n->poll(n, weight); //调用各个网卡驱动在函数netif_napi_add中注册的poll函数,
  34.             trace_napi_poll(n);
  35.         }

  37.         WARN_ON_ONCE(work > weight);

  38.         budget -= work;

  39.         local_irq_disable();

  40.         /* Drivers must not modify the NAPI state if they
  41.          * consume the entire weight. In such cases this code
  42.          * still "owns" the NAPI instance and therefore can
  43.          * move the instance around on the list at-will.
  44.          */
  45.         if (unlikely(work == weight)) {
  46.             if (unlikely(napi_disable_pending(n))) {
  47.                 local_irq_enable();
  48.                 napi_complete(n);
  49.                 local_irq_disable();
  50.             } else  //网卡驱动消耗了所有的份额,意味着可能还没有处理完,驱动只有在没有完全消耗完份额的情况下才会删除poll_list链表
  51.                 list_move_tail(&n->poll_list, &sd->poll_list);
  52.         }

  53.         netpoll_poll_unlock(have);
  54.     }
  55. out:
  56.     net_rps_action_and_irq_enable(sd);

  57. #ifdef CONFIG_NET_DMA
  58.     /*
  59.      * There may not be any more sk_buffs coming right now, so push
  60.      * any pending DMA copies to hardware
  61.      */
  62.     dma_issue_pending_all();
  63. #endif

  64.     return;

  65. softnet_break:
  66.     sd->time_squeeze++;
  67.     __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  68.     goto out;
  69. }
看r8169的poll函数:
  1. static int rtl8169_poll(struct napi_struct *napi, int budget)
  2. {
  3.     struct rtl8169_private *tp = container_of(napi, struct rtl8169_private, napi);
  4.     struct net_device *dev = tp->dev;
  5.     void __iomem *ioaddr = tp->mmio_addr;
  6.     int work_done;

  7.     work_done = rtl8169_rx_interrupt(dev, tp, ioaddr, (u32) budget); //napi_gro_receive(&tp->napi, skb);
  8.     rtl8169_tx_interrupt(dev, tp, ioaddr);

  9.     if (work_done < budget) {
  10.         napi_complete(napi); //一次操作结束

  11.         /* We need for force the visibility of tp->intr_mask
  12.          * for other CPUs, as we can loose an MSI interrupt
  13.          * and potentially wait for a retransmit timeout if we don't.
  14.          * The posted write to IntrMask is safe, as it will
  15.          * eventually make it to the chip and we won't loose anything
  16.          * until it does.
  17.          */
  18.         tp->intr_mask = 0xffff;
  19.         wmb();
  20.         RTL_W16(IntrMask, tp->intr_event); //重新使能中断
  21.     }

  22.     return work_done;
  23. }
  1. void __napi_complete(struct napi_struct *n)
  2. {
  3.     BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
  4.     BUG_ON(n->gro_list);

  5.     list_del(&n->poll_list);  //删除链表
  6.     smp_mb__before_clear_bit();
  7.     clear_bit(NAPI_STATE_SCHED, &n->state); //清除标志,一次操作结束
  8. }
  9. void napi_complete(struct napi_struct *n)
  10. {
  11.     unsigned long flags;
  12.     
  13.     /*
  14.      * don't let napi dequeue from the cpu poll list
  15.      * just in case its running on a different cpu
  16.      */
  17.     if (unlikely(test_bit(NAPI_STATE_NPSVC, &n->state)))
  18.         return;
  19.         
  20.     napi_gro_flush(n);  //gro相关
  21.     local_irq_save(flags);
  22.     __napi_complete(n);
  23.     local_irq_restore(flags);
  24. }
总结:
1)使用NAPI机制的驱动会维护一个napi_struct的数据结构,初始化对应的poll函数和份额
2)中断发生的时候,在中断处理程序中把当前网卡对应的napi_struct数据结构添加到percpu的变量中,触发软中断
3)在接收软中断中遍历percpu的链表,触发各个网卡对应的napi_struct数据结构中的poll函数
4)网卡驱动注册的poll函数中完成skb的初始化,然后调用napi_gro_receive上发协议栈


目前绝大部分的网卡驱动都使用NAPI的机制,但是还是有一些驱动没有使用,内核尽量做到兼容。
以loopback为例,接收时网卡驱动调用netif_rx函数:
  1. int netif_rx(struct sk_buff *skb)
  2. {
  3.     int ret;

  4.     /* if netpoll wants it, pretend we never saw it */
  5.     if (netpoll_rx(skb))
  6.         return NET_RX_DROP;

  7.     if (netdev_tstamp_prequeue)
  8.         net_timestamp_check(skb);

  9.     trace_netif_rx(skb);
  10. #ifdef CONFIG_RPS  //先略过RPS机制
  11.     {
  12.         struct rps_dev_flow voidflow, *rflow = &voidflow;
  13.         int cpu;

  14.         preempt_disable();
  15.         rcu_read_lock();

  16.         cpu = get_rps_cpu(skb->dev, skb, &rflow);
  17.         if (cpu < 0)
  18.             cpu = smp_processor_id();

  19.         ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

  20.         rcu_read_unlock();
  21.         preempt_enable();
  22.     }
  23. #else
  24.     {
  25.         unsigned int qtail;
  26.         ret = enqueue_to_backlog(skb, get_cpu(), &qtail);  
  27.         put_cpu();
  28.     }
  29. #endif
  30.     return ret;
  31. }
  1. static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
  2.                   unsigned int *qtail)
  3. {
  4.     struct softnet_data *sd;
  5.     unsigned long flags;

  6.     sd = &per_cpu(softnet_data, cpu); //NAPI机制的驱动使用独立的napi_struct结构,lo与其他驱动共用percpu变量

  7.     local_irq_save(flags);

  8.     rps_lock(sd);
  9.     if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) {  //默认为1000
  10.         if (skb_queue_len(&sd->input_pkt_queue)) {  //如果队列中还有数据,意味着上次还没有处理完,不需要再次触发
  11. enqueue:
  12.             __skb_queue_tail(&sd->input_pkt_queue, skb); //把当前skb放入softnet_data对应的队列中
  13.             input_queue_tail_incr_save(sd, qtail);//RPS相关
  14.             rps_unlock(sd);
  15.             local_irq_restore(flags);
  16.             return NET_RX_SUCCESS;
  17.         }

  18.         /* Schedule NAPI for backlog device
  19.          * We can use non atomic operation since we own the queue lock
  20.          */
  21.         if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
  22.             if (!rps_ipi_queued(sd))
  23.                 ____napi_schedule(sd, &sd->backlog); //把softnet_data中的napi_struct加入到链表中,触发软中断
  24.         }
  25.         goto enqueue;
  26.     }

  27.     sd->dropped++;
  28.     rps_unlock(sd);

  29.     local_irq_restore(flags);

  30.     atomic_long_inc(&skb->dev->rx_dropped);
  31.     kfree_skb(skb);
  32.     return NET_RX_DROP;
  33. }
非NAPI驱动共用的poll函数为process_backlog,这个在初始化percpu变量softnet_data的时候赋值:
  1. static int process_backlog(struct napi_struct *napi, int quota)
  2. {
  3.     int work = 0;
  4.     struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

  5. #ifdef CONFIG_RPS
  6.     /* Check if we have pending ipi, its better to send them now,
  7.      * not waiting net_rx_action() end.
  8.      */
  9.     if (sd->rps_ipi_list) {
  10.         local_irq_disable();
  11.         net_rps_action_and_irq_enable(sd);
  12.     }
  13. #endif
  14.     napi->weight = weight_p;
  15.     local_irq_disable();
  16.     while (work < quota) {
  17.         struct sk_buff *skb;
  18.         unsigned int qlen;

  19.         while ((skb = __skb_dequeue(&sd->process_queue))) { //从链表中取出一个skb进行处理
  20.             local_irq_enable();
  21.             __netif_receive_skb(skb);  //上传给协议栈
  22.             local_irq_disable();
  23.             input_queue_head_incr(sd);
  24.             if (++work >= quota) {  //一次处理不完,先返回,让软中断处理函数进行处理,那边会再次进行调度
  25.                 local_irq_enable();
  26.                 return work;
  27.             }
  28.         }

  29.         rps_lock(sd);
  30.         qlen = skb_queue_len(&sd->input_pkt_queue);
  31.         if (qlen)
  32.             skb_queue_splice_tail_init(&sd->input_pkt_queue, //把input_pkt_queue队列上的skb移到process_queue准备处理
  33.                            &sd->process_queue);              //用两个队列的好处是处理和增加可以并发,内核很多地方都使用了这种优化

  34.         if (qlen < quota - work) { //如果一次能够处理完,需要把napi从poll_list链表中删除(这里内联了__napi_complete的逻辑)
  35.             /*
  36.              * Inline a custom version of __napi_complete().
  37.              * only current cpu owns and manipulates this napi,
  38.              * and NAPI_STATE_SCHED is the only possible flag set on backlog.
  39.              * we can use a plain write instead of clear_bit(),
  40.              * and we dont need an smp_mb() memory barrier.
  41.              */
  42.             list_del(&napi->poll_list);  
  43.             napi->state = 0;

  44.             quota = work + qlen;
  45.         }
  46.         rps_unlock(sd);
  47.     }
  48.     local_irq_enable();
  49.                                                 
  50.     return work;
  51. }
总结:
1)对于非NAPI的驱动,内核做了一些适应使得可以和NAPI的驱动尽量兼容
2)共用percpu变量softnet_data内包含的napi_struct,使用同一个poll函数
3)调用机制和NAPI流程基本一样













阅读(2399) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~