Chinaunix首页 | 论坛 | 博客
  • 博客访问: 568881
  • 博文数量: 117
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 359
  • 用 户 组: 普通用户
  • 注册时间: 2011-03-13 21:58
个人简介

爱上香烟

文章分类

全部博文(117)

文章存档

2018年(3)

2017年(8)

2016年(65)

2015年(41)

我的朋友

分类: LINUX

2016-04-15 10:32:25

原文地址:RPS和RFS实现分析 作者:playmud

本文档的Copyleft归wwwlkk所有,使用GPL发布,可以自由拷贝、转载,转载时请保持文档的完整性,严禁用于任何商业用途。

E-mail:

来源:


(1)单CPU的硬中断

执行流程:

IRQi :进入中断号是i的硬中断处理程序。最先执行的指令是把当前寄存器的内容保存在内核堆栈中。

iret:从硬中断中返回,最后将会恢复寄存器的内容。

中断是可以嵌套执行的:

IRQi

IRQj

iret

iret

注意:中断信号(指令)是可以嵌套执行,但是,每个中断处理程序都是不同的,在中断处理程序中有可能关闭中断,或者使用自旋锁等措施来禁止中断的嵌套执行。

以上的内容在《linux中断分析》中有较详细的分析。

(2)e1000网卡硬件中断处理程序

下面分析一下e1000类型的网卡的硬中断处理程序。

注册并分配中断号:

e1000_open(structnet_device *netdev)

err= e1000_request_irq(adapter);

request_irq(adapter->pdev->irq,handler, irq_flags, netdev->name, netdev);

中断处理程序:

1e1000_intr()

2 ew32(IMC,~0);关闭网卡中断

3 napi_schedule_prep(&adapter->napi)测试NAPI是否已经运行

4 !test_and_set_bit(NAPI_STATE_SCHED,&n->state)测试并设置NAPI运行位

如果NAPI未运行

6 __napi_schedule(&adapter->napi);加入本CPU的软中断处理队列

如果NAPI已经运行

直接返回

1—2的程序段之间有可能产生多个中断信号,有可能出现如下的情况:

IRQEi //收到第一个中断信号

e1000_intr()

1—2之间运行

IRQEi //收到第二个中断信号

e1000_intr()

ew32(IMC,~0);关闭网卡中断

执行napi_schedule_prep(&adapter->napi)

!test_and_set_bit(NAPI_STATE_SCHED,&n->state)设置NAPI运行位

__napi_schedule(&adapter->napi);

iret

第一个中断服务例程继续运行

!test_and_set_bit(NAPI_STATE_SCHED,&n->state)测试NAPI运行位

NAPI已经运行

直接返回//第一个中断信号触发的中断处理例程将会直接返回

iret

从这里得出的结论:

  1. 在关闭中断前,可能已经产生多个网卡中断,但是,最终只在其中一个中断处理例程中执行__napi_schedule(&adapter->napi)

  2. 结论1也同样适用于多个CPU

(3)单CPU的软中断

软中断的入口点是do_softirq(),内核代码中有多个点会进入do_softirq()

1.local_bh_enable()

2.irq_exit()

3.ksoftirq/n内核线程被唤醒时

以上3个是最重要的入口点。

1do_softirq(void)

2 if(in_interrupt())

3 return;如果已经在硬中断或者软中断中,直接返回。

4 local_irq_save(flags);保存IF标志并屏蔽可编程中断

5 __do_softirq();

5.1 pending= local_softirq_pending();

6 __local_bh_disable()软中断计数器加1

6.1 set_softirq_pending(0);

7 local_irq_enable();打开本地CPU可编程中断

8 local_irq_disable();屏蔽本地CPU可编程中断

9 _local_bh_enable()软中断计数器减1

10 local_irq_restore(flags);打开本地CPU可编程中断,并恢复IF标志

从这段代码中可以肯定:

如果某个内核路径已经在6—9这段代码中执行(软中断计数器已经加1),此时,其它内核路径,执行到2时将会直接返回。

由于中断上下文(包括软中断)中是禁止进程切换,如果有2个内核路径进入do_softirq(),那么后一个必定是在硬中断处理程序中进入do_softirq(),又由于4—7之间是屏蔽硬中断,那么这段代码是不会被硬中断打断,也就是说,不会被其它内核路径抢占。

4—7之间不会发生抢占(包括进程抢占和硬中断),这段代码内执行了__local_bh_disable()软中断计数器加1,在7—9这段代码内,不会发生进程抢占,但是可能发生硬中断,在这之后执行的硬中断服务程序,可能进入do_softirq(),此时已经可以判断in_interrupt()必定返回1,也就可以断定在7—9这段代码内执行的硬中断服务程序在进入do_softirq()时便会立即返回。也就是说,在一个CPU内不会有两个内核路径在执行4—9之间的代码。

好了,分析了这么多就是为了得出一个关键的结论:在一个CPU上,软中断服务例程必定是串行执行(这里的例程指的是h->action(h))

注意:这里分析的是4—9这段代码,但是,对于具体的h->action(h)内部有具体的实现。

(4)NET_RX_SOFTIRQ软中断服务例程

下面分析一个具体的action-------NET_RX_SOFTIRQ

软中断例程注册流程:

net_dev_init(void)

open_softirq(NET_RX_SOFTIRQ,net_rx_action);

net_rx_action内部流程:

net_rx_action(structsoftirq_action *h)

structsoftnet_data *sd = &__get_cpu_var(softnet_data)

local_irq_disable();

while(!list_empty(&sd->poll_list)) {

structnapi_struct *n;

local_irq_enable();

n= list_first_entry(&sd->poll_list, struct napi_struct,poll_list);

work= n->poll(n, weight);

}

上关键代码出现了一个关键的数据结构—-CPU变量&__get_cpu_var(softnet_data)

CPU变量的分配:

DEFINE_PER_CPU_ALIGNED(structsoftnet_data, softnet_data);

其中第二个参数是数组名

structsoftnet_data {

structQdisc *output_queue;

structQdisc **output_queue_tailp;

structlist_head poll_list; struct napi_struct结构链表

structsk_buff *completion_queue;

structsk_buff_head process_queue;

/*stats */

unsignedint processed;

unsignedint time_squeeze;

unsignedint cpu_collision;

unsignedint received_rps;

#ifdefCONFIG_RPS

structsoftnet_data *rps_ipi_list;

/*Elements below can be accessed between CPUs for RPS */

structcall_single_data csd ____cacheline_aligned_in_smp;

structsoftnet_data *rps_ipi_next;

unsignedint cpu;

unsignedint input_queue_head;

unsignedint input_queue_tail;

#endif

unsigned dropped;

structsk_buff_head input_pkt_queue;

structnapi_struct backlog;

};

structnapi_struct {

structlist_head poll_list;连接structnapi_struct结构链表

unsignedlong state;

int weight;

int (*poll)(structnapi_struct *, int);执行函数

#ifdefCONFIG_NETPOLL

spinlock_t poll_lock;

int poll_owner;

#endif

unsignedint gro_count;

structnet_device *dev;

structlist_head dev_list;

structsk_buff *gro_list;

structsk_buff *skb;

};

net_rx_action内部流程就是执行挂接到poll_list链表的structnapi_struct中的回调函数poll

(5)e1000软中断服务例程

下面分析一下网卡e1000驱动程序如何接收数据包。(使用NAPI)

从上边硬中断已经分析到,e1000硬中断处理函数最终执行__napi_schedule(adapter->napi)

__napi_schedule(structnapi_struct *n)

____napi_schedule(&__get_cpu_var(softnet_data),n);

list_add_tail(&napi->poll_list,&sd->poll_list); 加入轮询队列

__raise_softirq_irqoff(NET_RX_SOFTIRQ);触发rx软中断

这个函数功能很简单,就是将structnapi_struct加入到每CPU变量softnet_datapoll_list,并唤醒NET_RX_SOFTIRQ软中断。

那么structnapi_struct中的poll将指向什么函数?

poll函数注册流程:

e1000_probe(structpci_dev *pdev, const struct pci_device_id *ent)

netif_napi_add(netdev,&adapter->napi,e1000_clean, 64);

现在可以知道poll指向e1000_clean()

e1000_clean(structnapi_struct *napi, int budget)

adapter->clean_rx(adapter,&adapter->rx_ring[0], &work_done, budget);

根据不同的情况clean_rx指向不同的函数,注册流程:

e1000_configure_rx(structe1000_adapter *adapter)

if(adapter->netdev->mtu > ETH_DATA_LEN) {

rdlen= adapter->rx_ring[0].count *

sizeof(struct e1000_rx_desc);

adapter->clean_rx= e1000_clean_jumbo_rx_irq;

adapter->alloc_rx_buf= e1000_alloc_jumbo_rx_buffers;

}else {

rdlen= adapter->rx_ring[0].count *

sizeof(struct e1000_rx_desc);

adapter->clean_rx= e1000_clean_rx_irq;

adapter->alloc_rx_buf= e1000_alloc_rx_buffers;

}

正常情况下是调用e1000_clean_rx_irq()

e1000_clean_rx_irq()

skb= buffer_info->skb;

buffer_info->skb= NULL;

e1000_receive_skb(adapter,status, rx_desc->special, skb);

netif_receive_skb(skb);

现在看到函数最终调用netif_receive_skb(skb)来接收数据包。

这里还可以看到明确一点,在执行netif_receive_skb(skb)时,这个skb离开了e1000网卡的接收环。


在网卡硬中断处理程序中关闭了网卡中断,在软中断处理例程结束时将会打开网卡中断:

e1000_clean(structnapi_struct *napi, int budget)-->

e1000_irq_enable(adapter)-->

ew32(IMS,IMS_ENABLE_MASK);打开网卡中断

(6)多CPU下的e1000网卡数据包接收流程

上面主要是分析单个CPU的情况,下面开始分析多个CPU下的情况。

先来分析一下中断信号是如何分发的,这里涉及到一个组件:APIC(可编程中断控制器)。

每个CPU都有一个本地的APIC,本地APIC都连接到一个外部的I/OAPIC,设备的IRQ线连接到I/OAPICI/OAPIC接收到中断信号,根据一定的算法,再将中断分配给其中某个本地APIC。也可以通过配置指定某个中断分配给某个CPU

CPU收到中断后就会执行相应的中断处理程序(这部分的内容在前面的《linux中断分析》中有较详细的分析)。

这里我们需要明确一点:任何时刻,每个CPU上运行的进程都是不同的,中断处理程序,包括硬中断和软中断,使用的是本地CPU上运行进程的内核栈,于是可以看到,不同CPU上的中断处理程序使用不同的内核栈,这给并发创造了很好的条件。

注意:如何并发还是要看具体的中断处理程序,也许中断处理程序中加入了一些自旋锁阻止了并发运行。

现在分析一下e1000网卡在多CPU下的NAPI,首先分析一个硬中断处理例程:

1e1000_intr()-->

2 ew32(IMC,~0);关闭网卡中断-->

3 napi_schedule_prep(&adapter->napi)测试NAPI是否已经运行-->

4 !test_and_set_bit(NAPI_STATE_SCHED,&n->state)测试并设置NAPI

如果NAPI未运行-->

6 __napi_schedule(&adapter->napi);加入本地CPU的软中断处理队列

NAPI已经运行

直接返回


在执行了ew32(IMC,~0)之后将不会产生网卡中断,直到软中断处理例程轮询结束的时候,才打开网卡中断。

但是,在未关闭网卡中断前,可能发生了多次的网卡中断,并且可能多个CPU都收到了中断信号,则多个CPU都将进入中断处理程序e1000_intr()

注意到4行的原子位测试和设置函数test_and_set_bit(NAPI_STATE_SCHED,&n->state),我们可以看到,只能有一个CPU测试到NAPI未运行,其它CPU将测试到NAPI已经运行,也就是说,只能有一个CPU运行__napi_schedule(&adapter->napi),在这个CPU的软中断中将会执行数据包的接收和发送例程e1000_clean()


e1000_clean(structnapi_struct *napi, int budget)

napi_complete(napi);

__napi_complete(n);

clear_bit(NAPI_STATE_SCHED,&n->state);

从上面这段流程可以看出,直到软中断轮询结束后才会清零NAPI_STATE_SCHED标志位,其它CPU才可能执行__napi_schedule(&adapter->napi)


到这里可以得到一个重要的结论:e1000网卡NAPI轮询函数e1000_clean()只能在一个CPU上执行。

这样就没法充分利用多个CPU,由于e1000网卡只有一个接收队列(接收环),同一时刻也只能有一个CPU操作这个接收队列,所以在驱动程序层面上是不好实现并发执行。

(7)RPS实现分析

但是,注意到数据包的接收,转发等工作都是在中断上下文中(包括硬中断和软中断),e1000网卡的软中断处理例程最终是调用netif_receive_skb(skb)接收数据包:

e1000_clean_rx_irq()-->

e1000_receive_skb(adapter,status, rx_desc->special, skb);-->

netif_receive_skb(skb);

netif_receive_skb()之后还有很多处理工作,也是在软中断上下文中,而且处理工作是和进程无关的。这样就可以把后续的处理工作移到其它的CPU上执行,这也是网络协议栈的软中断的负载均衡的基本思想(分析到这里总算进入了网络协议栈的软中断的负载均衡问题了)。

下面开始分析RPS的实现流程。

netif_receive_skb(structsk_buff *skb)

cpu= get_rps_cpu(skb->dev, skb, &rflow);根据数据包选择一个CPU

enqueue_to_backlog(skb,cpu, &rflow->last_qtail);

sd= &per_cpu(softnet_data, cpu);得到每CPU变量softnet_data

__skb_queue_tail(&sd->input_pkt_queue,skb);将数据包加入接收队列

____napi_schedule(sd,&sd->backlog);启动backlog,结构是napi_struct

这里的backlog指向process_backlog,注册流程如下:

net_dev_init(void)-->

structsoftnet_data *sd = &per_cpu(softnet_data,i);初始化所有的CPU的变量softnet_data-->

sd->backlog.poll= process_backlog;

函数流程如下:

process_backlog(structnapi_struct *napi, int quota)

skb_queue_splice_tail_init(&sd->input_pkt_queue,&sd->process_queue);

while((skb = __skb_dequeue(&sd->process_queue))){

local_irq_enable();

__netif_receive_skb(skb);处理后续工作

local_irq_disable();

input_queue_head_incr(sd);

if(++work >= quota) {

local_irq_enable();

returnwork;

}

}

现在总结RPS的实现流程:将数据包加入其它CPU的接收队列sd->input_pkt_queue,并激活其它CPUNAPI结构sd->backlog,则其它CPU将会在自己的软中断中执行process_backlogprocess_backlog将会接收sd->input_pkt_queue队列中的所有数据包,并调用__netif_receive_skb()执行后续工作。

(8)防止CPU的cache频繁刷新

不能随意选择CPU,为了降低CPU硬件高速缓存的刷新频率,需要把特征相似的数据包分配给同一个CPU处理。

首先分析一下CPU硬件缓存(cache)的实现原理,实现原理并不复杂,如图1所示:

1内存与直接映射cache的映射

这是最简单的一种cache方式----------直接映射cache

先看一下本例中cache存储器的布局:

  1. 一共可以存储4KB的主存空间,每一行存储4个字的主存空间(16字节)。

  2. 每一行的标签字段对应物理地址的1231位。

  3. 索引字段对应物理地址的411位,第一行的索引号是0x000,第二行的索引号是0x010,第n行的索引号是0xn0

  4. 每一行对应16字节的连续主存空间。

  5. 每个内存地址都唯一对应cache存储器的一个行,事实上主存中的每个字节都唯一对应cache存储器中的一个字节。

  6. 多个主存地址会同时对应一个cache行。


处理器如何使用cache存储器:

处理器要访问一个内存块,首先判断内存块是否已经在cache存储器中,如果存在则直接从cache存储器中获得数据,如果不存在则从内存中获得数据,并刷新对应的cache行。

如何确定是否已经在cache存储器:

  1. 首先根据物理地址的411位获得cache行(行号是0xn0其中n411位的值)。

  2. 判断cache行标签值是否和物理地址的1231位相同,如果相同,命中,如果不相同,不命中。

  3. 如果命中,根据物理地址的03位,在cache行中获得对应的数据。

例如,物理地址0x0000824,根据41182,获得cache行索引820,判断820cache行的标签是否等于0x0000,如果相等,根据03位的值4,起始位置是第5个字节,获得数据word1(每个word4个字节)。

这里注意到,如果某个物理地址在cache存储器中不命中,那么会刷新整个cache行的数据(一共16个字节),这样就降低了CPU的性能,所以在程序里面要尽量减少cache不命中的几率。

注意:cache行的v标志:标记当前cache行包含最近从主存中获取的数据。cache行的d标志:该cache行与主存中的内容不一致。


当命中一个cache行时,高速缓存控制器会进行不同的操作,具体取决于存取的类型。如果是读取操作,控制器从cache行中选择数据并送到CPU寄存器中,不需要访问主存。如果是写操作,控制器可能使用两种不同的策略,分别称为通写和回写。

通写:控制器总是既写主存又写cache行。

回写:只更新cache行,不更新主存,只有当CPU执行一条要求要刷新cache行的指令时,或者一个FLUSH硬件信号产生时,才将cache行中的数据写回主存。

上面的cache实现原理是最简单的一种,还有很多复杂的实现方式,但是基本原理应该是相同的。


多处理器系统的每个处理器都有一个单独的硬件高速缓存,如果其中一个CPU修改了自己的硬件高速缓存,它就必须检查同样的数据是否包含在其它CPU的硬件高速缓存,如果存在,它必须通知其它CPU更新硬件高速缓存。当然,这一切都是由硬件来处理的。


数据包的处理有这样的特点:两个特征(ip,端口等)相似的数据包,在处理过程中,访问相同的主存区域的交集应该越大,这样就可以减少硬件高速缓存的刷新频率,所以在RPS会根据数据包的特征选择CPU,具体的实现函数是:

staticint get_rps_cpu(struct net_device *dev, struct sk_buff *skb, structrps_dev_flow **rflowp)

未使用RFS(需要通过sysctl设置RFS)时的选择算法:

skb->rxhash= jhash_3words(addr1, addr2, ports.v32, hashrnd);

if(!skb->rxhash)

skb->rxhash= 1;

map= rcu_dereference(rxqueue->rps_map);

if(map) {

tcpu= map->cpus[((u64) skb->rxhash * map->len) >> 32];

if(cpu_online(tcpu)) {

cpu= tcpu;

gotodone;

}

}

其中map结构如图2所示,这个函数将会在下面的RFS中做较详细的分析。


不仅由于cache缓存的问题,还有一些其它的原因需要把特征相同的数据包分配给同一个CPU处理,例如:

TCPIP包的分段重组问题,一旦乱序就要重传,这种情况下,一个linux主机如果只是作为一台路由器的话,那么进入系统的一个TCP包的不同分段如果被不同的cpu处理并向一个网卡转发了,那么同步问题会很麻烦的,如果你不做同步处理,那么很可能后面的段被一个cpu先发出去了,那么在真正的接收方接收到乱序的包后就会请求重发,这是不希望的,因此还是一个cpu串行处理好。


(9)Receiveflow steering (RFS)分析

RFS是在RPS上的改进,通过RPS已经可以把同一流的数据包分发给同一个CPU核来处理了,但是有可能出现这样的情况,即给该数据流分发的CPU核和执行处理该数据流的应用程序的CPU核不是同一个。

不仅要把同一流的数据包分发给同一个CPU核来处理,还要分发给其‘被期望’的CPU核来处理就是RFS需要解决的问题。

RFS会创建两个与数据包相关信息的CPU核映射hash表:

1.一个用于表示期望处理具有该类相关信息数据包的CPU核映射,通过recvmsg()sendmsg()等系统调用信息来创建该hash表(称之为期望CPU表)。比如运行于CPU0核上的某应用程序调用了recvmsg()从远程机器host1上获取数据,那么NIC对从host1上发过来的数据包的分发期望CPU核就是CPU0

2.一个用于表示最近处理过具有该类相关信息数据包的CPU核映射,称这种表为当前CPU表。该表的存在是因为有多线程的情况,比如运行在两个CPU核上的多线程程序(每个核运行一个线程)交替调用recvmsg()系统函数从同一个socket上获取远程机器host1上的数据会导致期望CPU表频繁更改。如果数据包的分发仅由期望CPU表决定则会导致数据包交替分发到这两个CPU核上,很明显,这不是我们想要的效果。

使用以下算法选择CPU

1.如果当前CPU表对应表项未设置或者当前CPU表对应表项映射的CPU核处于离线状态,那么使用期望CPU表对应表项映射的CPU核。

2.如果当前CPU表对应表项映射的CPU核和期望CPU表对应表项映射的CPU核为同一个,那么好办,就使用这一个核。

3.如果当前CPU表对应表项映射的CPU核和期望CPU表对应表项映射的CPU核不为同一个,那么:

a)如果同一流的前一段数据包未处理完,那么必须使用当前CPU表对应表项映射的CPU核,以避免乱序。

b)如果同一流的前一段数据包已经处理完,那么则可以使用期望CPU表对应表项映射的CPU核。


下面分析一下内核是如何实现以上的CPU选择算法。

当前CPU表:flow_table= rcu_dereference(rxqueue->rps_flow_table);

期望CPU表:sock_flow_table= rcu_dereference(rps_sock_flow_table);

每个网卡设备都有一个当前CPU表,结构如图2所示:

2当前CPU

整个系统只有一个期望CPU表,结构如图3所示:

3期望CPU

两个表的结构虽然不同,但是使用的方法是相同的:

  1. 计算数据包的哈希值skb->rxhash= jhash_3words(addr1, addr2, ports.v32, hashrnd);

  2. 如果是当前CPU表,选择的CPUflows[skb->rxhash& flow_table->mask].cpu

  3. 如果是期望CPU表,选择的CPUents[skb->rxhash& sock_flow_table->mask]

下面是选择算法的实现,其中tcpu是当前CPUnext_cpu是期望CPU

if(unlikely(tcpu != next_cpu) && // 当前CPU和期望CPU不相同

(tcpu == RPS_NO_CPU || !cpu_online(tcpu)//当前CPU表未设置或者当前CPU处于离线状态

((int)(per_cpu(softnet_data, tcpu).input_queue_head-

rflow->last_qtail))>= 0)) { //或者前一段数据包已经处理完毕

tcpu= rflow->cpu = next_cpu;//则选择期望的CPU

if(tcpu != RPS_NO_CPU)

rflow->last_qtail= per_cpu(softnet_data,

tcpu).input_queue_head;

}

if(tcpu != RPS_NO_CPU && cpu_online(tcpu)) {

*rflowp= rflow;

cpu= tcpu;

gotodone;

}

这段代码涉及到两个统计量input_queue_headlast_qtail,其意义如下:

process_backlog(structnapi_struct *napi, int quota)

__netif_receive_skb(skb);

input_queue_head_incr(sd);

sd->input_queue_head++;

由以上流程可知统计量input_queue_head表示已经处理完的的数据包个数。


intnetif_receive_skb(struct sk_buff *skb)

cpu= get_rps_cpu(skb->dev, skb, &rflow);

enqueue_to_backlog(skb,cpu, &rflow->last_qtail);

input_queue_tail_incr_save(sd,qtail);

*qtail= ++sd->input_queue_tail;

由以上流程可知统计量last_qtail表示一共要处理多少个数据包。


那么input_queue_head>= last_qtail则表示前面接收到的数据包已经处理完毕了。


现在可以知道rps_sock_flow_table表就是为了让内核知道应用程序是在哪个CPU上运行,那么内核和应用程序是如何进行通信的,下面分析这个过程。

首先明确两个流程:

内核接收数据包流程判断是否存在对应的sock,如果存在,将数据包skb加入sock的接收队列中,到此内核处理完毕,整个处理过程都在软中断中进行。

应用程序接收数据包的流程:使用read()或者其它函数,陷入内核态,执行对应的系统调用服务例程,系统调用服务例程从sock的接收队列中读取数据,并返回,如果sock中没有数据包,进程可能被挂起。

(以上的两个过程在实现分析>中有详细的讲解)


下面这段是内核的一个接收流程:

tcp_v4_rcv(structsk_buff *skb)

tcp_v4_do_rcv(sk,skb);

sock_rps_save_rxhash(sk,skb->rxhash);修改sock的哈希值

其中staticinline void sock_rps_save_rxhash(struct sock *sk, u32 rxhash)

{

if(unlikely(sk->sk_rxhash != rxhash)){如果哈希值不相同

sock_rps_reset_flow(sk);

sk->sk_rxhash= rxhash; 赋予相同的哈希值

}

}

在这个流程中sock->sk_rxhash将会被修改为最新的值,以上的流程与进程无关的,在协议栈软中断中进行。


下面是应用程序的一个接收流程:

read()陷入内核态,并执行相应的系统调用服务例程。

inet_recvmsg()

sock_rps_record_flow()

sock_flow_table= rcu_dereference(rps_sock_flow_table);

rps_record_sock_flow(sock_flow_table,sk->sk_rxhash);

其中staticinline void rps_record_sock_flow(structrps_sock_flow_table *table,u32 hash)

{

if(table && hash) {

unsignedint cpu, index = hash & table->mask;

cpu= raw_smp_processor_id();//获取当前CPU

if(table->ents[index] != cpu)

table->ents[index]= cpu;注册CPU

}

}

每次用户程序读取数据包都会更新rps_sock_flow_table表,保证其中的CPU号是最新的。


第一个流程保证应用程序和内核会得到相同的哈希值。

第二个流程保证内核会得到最新的CPU号。


需要这样流程,原因有以下2点:

  1. 某些网络协议在通信的过程中数据包对应的哈希值有可能改变,那么应用程序就可以通过sock->sk_rxhash获得最新的哈希值。

  2. 应用程序在哪个CPU上运行也不是固定的,如果应用程序的运行CPU改变了,可以通过更新rps_sock_flow_table表来告知内核。


到现在RPSRFS的实现基本分析完毕,当然还有很多东西没有分析:

比如:

  1. rxqueue->rps_flow_table表是如何初始化的,这就涉及到sysfs文件系统,在《用户空间和内核通信》中有提到,但是没有详细的分析。

  2. rps_sock_flow_table表默认是空的,需要通过sysctl进行配置,sysctl的实现在《用户空间和内核通信》有较详细的分析。

  3. 在多CPU下进程是如何调度的,还没分析过,但是,网络协议栈的软中断处理是和进程调度无关的,在软中断处理例程中将会禁止本地CPU的进程调度。

阅读(1553) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~