NAPI是网络收包流程上的一种优化机制,简单而言,就是同时利用中断和轮询的优点,在一次中断中尽量多地接收报文,而不是每个中断只接收一个报文。目前主流的网卡驱动都使用这种机制,同时内核也保持着对普通(非NAPI)方式的兼容。
对于每一个中断向量,驱动都需要维护一个napi_struct的数据结构:
-
/*
 * NAPI context. Per the accompanying text, a driver keeps one of these for
 * each interrupt vector; it carries the poll callback, its weight and the
 * list linkage used to queue the instance for polling.
 */
struct napi_struct {
-
/* The poll_list must only be managed by the entity which
-
* changes the state of the NAPI_STATE_SCHED bit. This means
-
* whoever atomically sets that bit can add this napi_struct
-
* to the per-cpu poll_list, and whoever clears that bit
-
* can remove from the list right before clearing the bit.
-
*/
-
struct list_head poll_list;
-
-
unsigned long state; /* NAPI_STATE_* bit flags */
-
int weight; /* per-call quota handed to ->poll() */
-
int (*poll)(struct napi_struct *, int); /* driver poll callback */
-
#ifdef CONFIG_NETPOLL
-
spinlock_t poll_lock;
-
int poll_owner; /* initialised to -1 by netif_napi_add() */
-
#endif
-
-
unsigned int gro_count;
-
-
struct net_device *dev; /* owning device */
-
struct list_head dev_list; /* link in dev->napi_list */
-
struct sk_buff *gro_list;
-
struct sk_buff *skb;
-
};
内核以数据结构net_device表示一个网卡设备,一般的驱动在分配这个数据结构的时候会额外地分配一段空间,作为该设备的private数据,
napi_struct一般就包含在这个private数据结构中。以r8169驱动为例:
分配的时候额外分配rtl8169_private数据结构的大小:
-
/*
 * r8169 device initialisation routine (excerpt; the remainder of the body
 * is elided with "..."). Quoted for the net_device allocation, which asks
 * for extra room sized for the driver-private struct rtl8169_private.
 */
static int __devinit
-
rtl8169_init_one(struct pci_dev *pdev, const struct pci_device_id *ent)
-
{
-
const struct rtl_cfg_info *cfg = rtl_cfg_infos + ent->driver_data;
-
const unsigned int region = cfg->region;
-
struct rtl8169_private *tp;
-
struct mii_if_info *mii;
-
struct net_device *dev;
-
void __iomem *ioaddr;
-
int chipset, i;
-
int rc;
-
-
if (netif_msg_drv(&debug)) {
-
printk(KERN_INFO "%s Gigabit Ethernet driver %s loaded\n",
-
MODULENAME, RTL8169_VERSION);
-
}
-
-
dev = alloc_etherdev(sizeof (*tp)); // alloc_etherdev itself already accounts for the size of struct net_device
-
...
-
}
-
/*
 * Driver-private data of the r8169 driver (excerpt; further members are
 * elided with "..."). Note that the NAPI context is embedded directly.
 */
struct rtl8169_private {
-
void __iomem *mmio_addr; /* memory map physical address */
-
struct pci_dev *pci_dev;
-
struct net_device *dev;
-
struct napi_struct napi; /* recovered via container_of() in rtl8169_poll() */
-
spinlock_t lock;
-
u32 msg_enable;
-
u16 txd_version;
-
...
-
};
驱动初始化的时候会调用netif_napi_add函数进行注册:
-
/* Call site in the r8169 driver: register the embedded NAPI context with rtl8169_poll as its poll callback. */
netif_napi_add(dev, &tp->napi, rtl8169_poll, R8169_NAPI_WEIGHT);// quota (weight), 64 by default
-
/*
 * Register a NAPI context with its device.
 * @dev:    device that owns the context
 * @napi:   context to initialise
 * @poll:   driver poll callback, stored in napi->poll
 * @weight: quota, stored in napi->weight
 *
 * The context is linked into dev->napi_list and is left with
 * NAPI_STATE_SCHED set, so napi_schedule_prep() cannot schedule it until
 * napi_enable() clears that bit.
 */
void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
-
int (*poll)(struct napi_struct *, int), int weight)
-
{
-
INIT_LIST_HEAD(&napi->poll_list); // initialise the fields of the napi_struct
-
napi->gro_count = 0;
-
napi->gro_list = NULL;
-
napi->skb = NULL;
-
napi->poll = poll; // callback invoked from net_rx_action()
-
napi->weight = weight;
-
list_add(&napi->dev_list, &dev->napi_list);
-
napi->dev = dev;
-
#ifdef CONFIG_NETPOLL
-
spin_lock_init(&napi->poll_lock);
-
napi->poll_owner = -1;
-
#endif
-
/* Start in the "scheduled" state; napi_enable() clears it later. */
set_bit(NAPI_STATE_SCHED, &napi->state);
-
}
网口打开的时候使能:
-
/*
 * Enable a NAPI context (per the accompanying text, done when the
 * interface is opened). Expects NAPI_STATE_SCHED to still be set, as left
 * by netif_napi_add(), and clears it so the context can be scheduled.
 */
static inline void napi_enable(struct napi_struct *n)
-
{
-
/* Enabling a context whose SCHED bit is not set is treated as a bug. */
BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
-
smp_mb__before_clear_bit();
-
clear_bit(NAPI_STATE_SCHED, &n->state);
-
}
注意napi_struct数据结构的状态之间的变化
-
/* Bit numbers used in napi_struct.state. */
enum {
-
NAPI_STATE_SCHED, /* Poll is scheduled */
-
NAPI_STATE_DISABLE, /* Disable pending */
-
NAPI_STATE_NPSVC, /* Netpoll - don't dequeue from poll_list */
-
};
报文接收中断发生的时候不再是简单地处理一个报文,而是先关闭中断,然后使用轮询的方式尽量多地处理报文,一般如下调用:
-
/*
 * Fragment of the r8169 interrupt handler: when a NAPI-handled event is
 * pending, mask those events in the chip and hand the work over to the
 * poll routine by scheduling the NAPI context.
 */
if (status & tp->intr_mask & tp->napi_event) {
-
RTL_W16(IntrMask, tp->intr_event & ~tp->napi_event); // mask the corresponding interrupts; this part is hardware specific
-
tp->intr_mask = ~tp->napi_event;
-
-
if (likely(napi_schedule_prep(&tp->napi))) // check the NAPI state
-
__napi_schedule(&tp->napi);
-
else
-
netif_info(tp, intr, dev,
-
"interrupt %04x in poll\n", status);
-
}
napi_schedule_prep判断状态,保证同一时刻只有一个执行流能够继续往下走(去调度该napi):
-
/**
-
* napi_schedule_prep - check if napi can be scheduled
-
* @n: napi context
-
*
-
* Test if NAPI routine is already running, and if not mark
-
* it as running. This is used as a condition variable to
-
* ensure only one NAPI poll instance runs. We also make
-
* sure there is no pending NAPI disable.
-
*
-
* Returns non-zero only when no disable is pending and this caller is
-
* the one that changed NAPI_STATE_SCHED from clear to set.
-
*/
-
static inline int napi_schedule_prep(struct napi_struct *n)
-
{
-
return !napi_disable_pending(n) &&
-
!test_and_set_bit(NAPI_STATE_SCHED, &n->state);
-
}
__napi_schedule涉及一个percpu的变量softnet_data
-
/*
 * Queue a NAPI context for polling on the current CPU.
 * @n: napi context
 *
 * Wraps ____napi_schedule() on this CPU's softnet_data with local
 * interrupts disabled around the call.
 */
void __napi_schedule(struct napi_struct *n)
-
{
-
unsigned long flags;
-
-
local_irq_save(flags);
-
____napi_schedule(&__get_cpu_var(softnet_data), n);
-
local_irq_restore(flags);
-
}
-
/*
 * Append @napi to the poll list of @sd and raise NET_RX_SOFTIRQ.
 * The callers shown in this article invoke it with local interrupts
 * disabled.
 */
static inline void ____napi_schedule(struct softnet_data *sd,
-
struct napi_struct *napi)
-
{
-
list_add_tail(&napi->poll_list, &sd->poll_list);// add this NIC's napi context to the list in the per-cpu structure
-
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
-
}
softnet_data定义如下:DEFINE_PER_CPU_ALIGNED(struct softnet_data, softnet_data);
在net_dev_init函数中初始化:
-
/*
 * Fragment of net_dev_init(): initialise the per-cpu softnet_data of every
 * possible CPU, including the built-in "backlog" NAPI context whose poll
 * callback is process_backlog.
 */
for_each_possible_cpu(i) {
-
struct softnet_data *sd = &per_cpu(softnet_data, i);
-
-
memset(sd, 0, sizeof(*sd));
-
skb_queue_head_init(&sd->input_pkt_queue);
-
skb_queue_head_init(&sd->process_queue);
-
sd->completion_queue = NULL;
-
INIT_LIST_HEAD(&sd->poll_list);
-
sd->output_queue = NULL;
-
sd->output_queue_tailp = &sd->output_queue;
-
#ifdef CONFIG_RPS
-
sd->csd.func = rps_trigger_softirq;
-
sd->csd.info = sd;
-
sd->csd.flags = 0;
-
sd->cpu = i;
-
#endif
-
-
sd->backlog.poll = process_backlog; // poll handler shared by all non-NAPI drivers
-
sd->backlog.weight = weight_p; // 64 by default
-
sd->backlog.gro_list = NULL;
-
sd->backlog.gro_count = 0;
-
}
看一下网卡接收软中断的流程:
/*
 * NET_RX softirq handler.
 * @h: softirq action descriptor (not used in the code shown)
 *
 * Walks the current CPU's softnet_data poll_list and calls the ->poll()
 * callback of each queued NAPI context with that context's weight. The
 * run is bounded by a packet budget (netdev_budget) and a time limit of
 * two jiffies; when either is exceeded the softirq is raised again and
 * the remaining work is left for the next run.
 */
static void net_rx_action(struct softirq_action *h)
-
{
-
struct softnet_data *sd = &__get_cpu_var(softnet_data);
-
unsigned long time_limit = jiffies + 2; // each softirq run lasts at most 2 jiffies so a single run cannot hog the CPU
-
int budget = netdev_budget; // 300 by default
-
void *have;
-
-
local_irq_disable();
-
-
while (!list_empty(&sd->poll_list)) { // the napi contexts of all scheduled NICs are on this list
-
struct napi_struct *n;
-
int work, weight;
-
-
/* If softirq window is exhausted then punt.
-
* Allow this to run for 2 jiffies since which will allow
-
* an average latency of 1.5/HZ.
-
*/
-
if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))
-
goto softnet_break;
-
-
local_irq_enable();
-
-
/* Even though interrupts have been re-enabled, this
-
* access is safe because interrupts can only add new
-
* entries to the tail of this list, and only ->poll()
-
* calls can remove this head entry from the list.
-
*/
-
n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
-
-
have = netpoll_poll_lock(n);
-
-
weight = n->weight;
-
-
/* This NAPI_STATE_SCHED test is for avoiding a race
-
* with netpoll's poll_napi(). Only the entity which
-
* obtains the lock and sees NAPI_STATE_SCHED set will
-
* actually make the ->poll() call. Therefore we avoid
-
* accidentally calling ->poll() when NAPI is not scheduled.
-
*/
-
work = 0;
-
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
-
work = n->poll(n, weight); // call the poll function the NIC driver registered via netif_napi_add()
-
trace_napi_poll(n);
-
}
-
-
WARN_ON_ONCE(work > weight);
-
-
budget -= work;
-
-
local_irq_disable();
-
-
/* Drivers must not modify the NAPI state if they
-
* consume the entire weight. In such cases this code
-
* still "owns" the NAPI instance and therefore can
-
* move the instance around on the list at-will.
-
*/
-
if (unlikely(work == weight)) {
-
if (unlikely(napi_disable_pending(n))) {
-
local_irq_enable();
-
napi_complete(n);
-
local_irq_disable();
-
} else // the driver used up its whole quota, so more work may be pending; a driver only removes itself from poll_list when it did not use up the quota
-
list_move_tail(&n->poll_list, &sd->poll_list);
-
}
-
-
netpoll_poll_unlock(have);
-
}
-
out:
-
net_rps_action_and_irq_enable(sd);
-
-
#ifdef CONFIG_NET_DMA
-
/*
-
* There may not be any more sk_buffs coming right now, so push
-
* any pending DMA copies to hardware
-
*/
-
dma_issue_pending_all();
-
#endif
-
-
return;
-
-
softnet_break:
-
/* Budget or time window exhausted: account it and re-raise the softirq. */
sd->time_squeeze++;
-
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
-
goto out;
-
}
看r8169的poll函数:
-
/*
 * NAPI poll callback of the r8169 driver.
 * @napi:   NAPI context embedded in struct rtl8169_private
 * @budget: quota passed in by the caller
 *
 * Runs the driver's RX and TX handlers. If the RX handler reports less
 * work than @budget, polling is completed and the chip interrupts are
 * unmasked again. Returns the RX handler's work count.
 */
static int rtl8169_poll(struct napi_struct *napi, int budget)
-
{
-
struct rtl8169_private *tp = container_of(napi, struct rtl8169_private, napi);
-
struct net_device *dev = tp->dev;
-
void __iomem *ioaddr = tp->mmio_addr;
-
int work_done;
-
-
work_done = rtl8169_rx_interrupt(dev, tp, ioaddr, (u32) budget); // per the article, this hands packets up via napi_gro_receive(&tp->napi, skb)
-
rtl8169_tx_interrupt(dev, tp, ioaddr);
-
-
if (work_done < budget) {
-
napi_complete(napi); // this polling round is finished
-
-
/* We need for force the visibility of tp->intr_mask
-
* for other CPUs, as we can loose an MSI interrupt
-
* and potentially wait for a retransmit timeout if we don't.
-
* The posted write to IntrMask is safe, as it will
-
* eventually make it to the chip and we won't loose anything
-
* until it does.
-
*/
-
tp->intr_mask = 0xffff;
-
wmb();
-
RTL_W16(IntrMask, tp->intr_event); // re-enable the interrupts
-
}
-
-
return work_done;
-
}
-
/*
 * Finish a polling round for @n: unlink it from the poll list and clear
 * NAPI_STATE_SCHED. Treats a context that is not scheduled, or that still
 * has a non-empty gro_list, as a bug. napi_complete() calls this with
 * local interrupts disabled.
 */
void __napi_complete(struct napi_struct *n)
-
{
-
BUG_ON(!test_bit(NAPI_STATE_SCHED, &n->state));
-
BUG_ON(n->gro_list);
-
-
list_del(&n->poll_list); // remove from the poll list
-
smp_mb__before_clear_bit();
-
clear_bit(NAPI_STATE_SCHED, &n->state); // clear the flag; this polling round is finished
-
}
-
/*
 * Complete polling of @n: flush GRO state, then run __napi_complete()
 * with local interrupts disabled. Does nothing when NAPI_STATE_NPSVC is
 * set on the context.
 */
void napi_complete(struct napi_struct *n)
-
{
-
unsigned long flags;
-
-
/*
-
* don't let napi dequeue from the cpu poll list
-
* just in case its running on a different cpu
-
*/
-
if (unlikely(test_bit(NAPI_STATE_NPSVC, &n->state)))
-
return;
-
-
napi_gro_flush(n); // GRO related
-
local_irq_save(flags);
-
__napi_complete(n);
-
local_irq_restore(flags);
-
}
总结:
1)使用NAPI机制的驱动会维护一个napi_struct的数据结构,初始化对应的poll函数和份额
2)中断发生的时候,在中断处理程序中把当前网卡对应的napi_struct数据结构添加到percpu变量softnet_data的poll_list链表中,并触发接收软中断
3)在接收软中断中遍历percpu的链表,触发各个网卡对应的napi_struct数据结构中的poll函数
4)网卡驱动注册的poll函数中完成skb的初始化,然后调用napi_gro_receive上发协议栈
目前绝大部分的网卡驱动都使用NAPI的机制,但是还是有一些驱动没有使用,内核尽量做到兼容。
以loopback为例,接收时网卡驱动调用netif_rx函数:
-
/*
 * Receive entry point used by non-NAPI drivers (the article uses loopback
 * as the example).
 * @skb: received buffer
 *
 * Queues @skb on a CPU's backlog via enqueue_to_backlog() and returns its
 * result, or NET_RX_DROP when netpoll consumes the buffer.
 */
int netif_rx(struct sk_buff *skb)
-
{
-
int ret;
-
-
/* if netpoll wants it, pretend we never saw it */
-
if (netpoll_rx(skb))
-
return NET_RX_DROP;
-
-
if (netdev_tstamp_prequeue)
-
net_timestamp_check(skb);
-
-
trace_netif_rx(skb);
-
#ifdef CONFIG_RPS // the RPS mechanism is not covered here
-
{
-
struct rps_dev_flow voidflow, *rflow = &voidflow;
-
int cpu;
-
-
preempt_disable();
-
rcu_read_lock();
-
-
cpu = get_rps_cpu(skb->dev, skb, &rflow);
-
/* A negative result falls back to the current CPU. */
if (cpu < 0)
-
cpu = smp_processor_id();
-
-
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
-
-
rcu_read_unlock();
-
preempt_enable();
-
}
-
#else
-
{
-
unsigned int qtail;
-
ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
-
put_cpu();
-
}
-
#endif
-
return ret;
-
}
-
/*
 * Queue @skb on the input_pkt_queue of @cpu's softnet_data.
 * @skb:   buffer to queue
 * @cpu:   CPU whose softnet_data receives the buffer
 * @qtail: passed to input_queue_tail_incr_save() (RPS related)
 *
 * If the queue was empty, the backlog NAPI context is scheduled first.
 * Returns NET_RX_SUCCESS when queued; when the queue length exceeds
 * netdev_max_backlog the buffer is freed, the drop counters are bumped
 * and NET_RX_DROP is returned.
 */
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
-
unsigned int *qtail)
-
{
-
struct softnet_data *sd;
-
unsigned long flags;
-
-
sd = &per_cpu(softnet_data, cpu); // NAPI drivers use their own napi_struct; lo and the other non-NAPI drivers share the per-cpu one
-
-
local_irq_save(flags);
-
-
rps_lock(sd);
-
if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) { // 1000 by default
-
if (skb_queue_len(&sd->input_pkt_queue)) { // queue not empty: the previous batch is still pending, so no need to trigger polling again
-
enqueue:
-
__skb_queue_tail(&sd->input_pkt_queue, skb); // put this skb on the softnet_data queue
-
input_queue_tail_incr_save(sd, qtail);// RPS related
-
rps_unlock(sd);
-
local_irq_restore(flags);
-
return NET_RX_SUCCESS;
-
}
-
-
/* Schedule NAPI for backlog device
-
* We can use non atomic operation since we own the queue lock
-
*/
-
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
-
if (!rps_ipi_queued(sd))
-
____napi_schedule(sd, &sd->backlog); // add the napi_struct inside softnet_data to the poll list and raise the softirq
-
}
-
goto enqueue;
-
}
-
-
/* Queue is over the limit: drop the packet. */
sd->dropped++;
-
rps_unlock(sd);
-
-
local_irq_restore(flags);
-
-
atomic_long_inc(&skb->dev->rx_dropped);
-
kfree_skb(skb);
-
return NET_RX_DROP;
-
}
非NAPI驱动共用的poll函数为process_backlog,这个在初始化percpu变量softnet_data的时候赋值:
-
/*
 * Poll callback of the per-cpu backlog NAPI context shared by non-NAPI
 * drivers.
 * @napi:  the backlog context embedded in struct softnet_data
 * @quota: maximum number of packets to process in this call
 *
 * Delivers packets from process_queue to __netif_receive_skb(), refilling
 * process_queue from input_pkt_queue as needed. Returns the number of
 * packets processed. When the remaining input fits in the quota, the
 * context removes itself from the poll list and clears its state.
 */
static int process_backlog(struct napi_struct *napi, int quota)
-
{
-
int work = 0;
-
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
-
-
#ifdef CONFIG_RPS
-
/* Check if we have pending ipi, its better to send them now,
-
* not waiting net_rx_action() end.
-
*/
-
if (sd->rps_ipi_list) {
-
local_irq_disable();
-
net_rps_action_and_irq_enable(sd);
-
}
-
#endif
-
napi->weight = weight_p;
-
local_irq_disable();
-
while (work < quota) {
-
struct sk_buff *skb;
-
unsigned int qlen;
-
-
while ((skb = __skb_dequeue(&sd->process_queue))) { // take one skb off the list and process it
-
local_irq_enable();
-
__netif_receive_skb(skb); // hand it up to the protocol stack
-
local_irq_disable();
-
input_queue_head_incr(sd);
-
if (++work >= quota) { // cannot finish in one go: return and let the softirq handler reschedule us
-
local_irq_enable();
-
return work;
-
}
-
}
-
-
rps_lock(sd);
-
qlen = skb_queue_len(&sd->input_pkt_queue);
-
if (qlen)
-
skb_queue_splice_tail_init(&sd->input_pkt_queue, // move the skbs on input_pkt_queue over to process_queue for processing
-
&sd->process_queue); // with two queues, processing and enqueueing can run concurrently; the kernel uses this optimisation in many places
-
-
if (qlen < quota - work) { // everything fits in this round, so the poll function itself must leave poll_list
-
/*
-
* Inline a custom version of __napi_complete().
-
* only current cpu owns and manipulates this napi,
-
* and NAPI_STATE_SCHED is the only possible flag set on backlog.
-
* we can use a plain write instead of clear_bit(),
-
* and we dont need an smp_mb() memory barrier.
-
*/
-
list_del(&napi->poll_list);
-
napi->state = 0;
-
-
quota = work + qlen;
-
}
-
rps_unlock(sd);
-
}
-
local_irq_enable();
-
-
return work;
-
}
总结:
1)对于非NAPI的驱动,内核做了一些适配,使其可以和NAPI的驱动尽量兼容
2)共用percpu变量softnet_data内包含的napi_struct,使用同一个poll函数
3)调用机制和NAPI流程基本一样
阅读(2399) | 评论(0) | 转发(0) |