vhost前后端通知机制场景分析
——lvyilong316
这篇文章主要详细的分析一下vhost的通知机制,前面的一些文章陆续也对前后端的通知有所介绍,这里相当于算是一次汇总吧。
所谓前后端通知,必然涉及两个方向:前端通知后端,后端通知前端。而我们知道vhost有txq和rxq,对于每种queue都伴随有这两种通知。而通知方式又根据是否支持event_idx有着不同的实现,最后virtio1.1引入的packed ring后,通知相对split ring又有不同。下面我们以txq,rxq的两个方向共四种情况来分析前后端的通知实现。其中前端以kernel4.9 virtio_net实现为例分析,后端以dpdk
18.11 vhost_user实现分析。在展开前后端通知分析前,我们先了解两个背景知识:前端中断处理函数的注册和后端vhost_user的kick方式。
前端中断处理函数注册
后端对前端的通知,是以中断方式传递到前端的。分析通知的接收处理就少不了要了解这些中断处理函数。所以我们先看一下前端是怎么注册中断处理函数的。这些需要从virtio_net的加载函数virtnet_probe说起,具体如下图所示。
我们知道virtio设备分为morden和lagecy两种,我们以morden设备为例。对于morden设备,会调用virtio_pci_modern_probe初始化config
ops:
vp_dev->vdev.config =
&virtio_pci_config_ops;
其find_vqs函数对应为vp_modern_find_vqs。vp_modern_find_vqs 其中主要调用vq_find_vqs函数。vp_find_vqs函数完成队列中断处理函数的初始化,根据设备对中断的支持,分为以下三种情况:
(1) 所有txq,rxq以及ctrlq都共享一个中断处理;
(2) ctrlq单独使用一个中断处理,其他txq和rxq共享一个中断处理;
(3) 可以每个queue(包含txq,rxq以及ctrlq)各一个中断处理;
l vp_find_vqs (kernel 4.9)
-
/* the config->find_vqs() implementation */
-
int vp_find_vqs(struct virtio_device *vdev, unsigned nvqs,
-
struct virtqueue *vqs[],
-
vq_callback_t *callbacks[],
-
const char * const names[])
-
{
-
int err;
-
-
/* Try MSI-X with one vector per queue. */
-
err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, true, true);
-
if (!err)
-
return 0;
-
/* Fallback: MSI-X with one vector for config, one shared for queues. */
-
err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names,
-
true, false);
-
if (!err)
-
return 0;
-
/* Finally fall back to regular interrupts. */
-
return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names,
-
false, false);
-
}
函数首先尝试方式(3)每个queue各一个中断处理,如果失败再尝试方式(2),如果再失败就只能使用方式(1)了。
我们看到每种方式都是调用同一个函数vp_try_to_find_vqs,只是传入的参数不同。那我们就来看下这个函数的主要内容。
l vp_try_to_find_vqs
-
static int vp_try_to_find_vqs(struct virtio_device *vdev, unsigned nvqs,
-
struct virtqueue *vqs[],
-
vq_callback_t *callbacks[],
-
const char * const names[],
-
bool use_msix,
-
bool per_vq_vectors)
-
{
-
struct virtio_pci_device *vp_dev = to_vp_device(vdev);
-
u16 msix_vec;
-
int i, err, nvectors, allocated_vectors;
-
-
vp_dev->vqs = kmalloc(nvqs * sizeof *vp_dev->vqs, GFP_KERNEL);
-
if (!vp_dev->vqs)
-
return -ENOMEM;
-
-
if (!use_msix) {
-
/* 方式1,所有txq,rxq以及ctrlq都共享一个中断处理 */
-
/* Old style: one normal interrupt for change and all vqs. */
-
err = vp_request_intx(vdev);
-
if (err)
-
goto error_find;
-
} else {
-
if (per_vq_vectors) {
-
/* Best option: one for change interrupt, one per vq. */
-
/* 方式3,可以每个queue一个中断处理 */
-
nvectors = 1;
-
for (i = 0; i < nvqs; ++i)
-
if (callbacks[i])
-
++nvectors;
-
} else {
-
/* Second best: one for change, shared for all vqs. */
-
/* 方式2,ctrlq一个中断,其他txq和rxq共享一个中断处理 */
-
nvectors = 2;
-
}
-
-
err = vp_request_msix_vectors(vdev, nvectors, per_vq_vectors);
-
if (err)
-
goto error_find;
-
}
-
-
vp_dev->per_vq_vectors = per_vq_vectors;
-
allocated_vectors = vp_dev->msix_used_vectors;
-
for (i = 0; i < nvqs; ++i) { /* 方式3的处理 */
-
if (!names[i]) {
-
vqs[i] = NULL;
-
continue;
-
} else if (!callbacks[i] || !vp_dev->msix_enabled)
-
msix_vec = VIRTIO_MSI_NO_VECTOR;
-
else if (vp_dev->per_vq_vectors)
-
msix_vec = allocated_vectors++;
-
else
-
msix_vec = VP_MSIX_VQ_VECTOR;
-
vqs[i] = vp_setup_vq(vdev, i, callbacks[i], names[i], msix_vec);
-
if (IS_ERR(vqs[i])) {
-
err = PTR_ERR(vqs[i]);
-
goto error_find;
-
}
-
-
if (!vp_dev->per_vq_vectors || msix_vec == VIRTIO_MSI_NO_VECTOR)
-
continue;
-
-
/* allocate per-vq irq if available and necessary */
-
snprintf(vp_dev->msix_names[msix_vec],
-
sizeof *vp_dev->msix_names,
-
"%s-%s",
-
dev_name(&vp_dev->vdev.dev), names[i]);
-
err = request_irq(vp_dev->msix_entries[msix_vec].vector,
-
vring_interrupt, 0,
-
vp_dev->msix_names[msix_vec],
-
vqs[i]);
-
if (err) {
-
vp_del_vq(vqs[i]);
-
goto error_find;
-
}
-
}
-
return 0;
-
-
error_find:
-
vp_del_vqs(vdev);
-
return err;
-
}
其中vp_request_intx函数完成方式1的处理,具体就是通过request_irq注册中断处理函数vp_interrupt;vp_request_msix_vectors函数完成方式2的处理,其中调用request_irq给ctrlq注册中断处理函数为vp_config_changed(方式3的ctrlq中断处理也是这里注册),调用request_irq给数据queue(txq,rxq)注册中断处理函数为vp_vring_interrupt;而方式3的剩余处理在本函数的后半部分,为每个数据queue调用request_irq注册中断处理函数vring_interrupt。如果查看代码会发现方式2数据queue共享的中断处理vp_vring_interrupt函数中也是通过遍历所有queue调用vring_interrupt实现的。所以我们重点关注vring_interrupt函数的实现,这是数据queue中断处理的核心。
l vring_interrupt
-
irqreturn_t vring_interrupt(int irq, void *_vq)
-
{
-
struct vring_virtqueue *vq = to_vvq(_vq);
-
-
if (!more_used(vq)) { /* 如果没有更新uesd desc则不需要特殊处理直接返回 */
-
pr_debug("virtqueue interrupt with no work for %p\n", vq);
-
return IRQ_NONE;
-
}
-
-
if (unlikely(vq->broken))
-
return IRQ_HANDLED;
-
-
pr_debug("virtqueue callback for %p (%p)\n", vq, vq->vq.callback);
-
if (vq->vq.callback)
-
vq->vq.callback(&vq->vq);
-
-
return IRQ_HANDLED;
-
}
-
-
static inline bool more_used(const struct vring_virtqueue *vq)
-
{
-
return vq->last_used_idx != virtio16_to_cpu(vq->vq.vdev, vq->vring.used->idx);
-
}
如果more_used返回false表示vq->last_used_idx==
vring.used->idx,这说明当前没有uesd desc需要更新处理,所以中断直接返回。否则就调用对应queue的callback函数。而callback函数在之前virtnet_find_vqs的调用中被设置。rxq和txq的callback分别注册为了skb_recv_done和skb_xmit_done。
callbacks[rxq2vq(i)] = skb_recv_done;
callbacks[txq2vq(i)] = skb_xmit_done;
所以接收队列和发送队列的中断处理主要就是分别调用skb_recv_done和skb_xmit_done函数。
后端vhost_user的kick方式
下面我们再看下后端vhost_user是如果kick前端的。首先是split ring的情况。
l vhost_vring_call_split (dpdk 1811)
-
static __rte_always_inline void
-
vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
-
{
-
/* Flush used->idx update before we read avail->flags. */
-
rte_smp_mb();
-
-
/* Don't kick guest if we don't reach index specified by guest. */
-
if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
-
uint16_t old = vq->signalled_used;
-
uint16_t new = vq->last_used_idx;
-
-
if (vhost_need_event(vhost_used_event(vq), new, old)
-
&& (vq->callfd >= 0)) {
-
vq->signalled_used = vq->last_used_idx;
-
eventfd_write(vq->callfd, (eventfd_t) 1);
-
}
-
} else {
-
/* Kick the guest if necessary. */
-
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
-
&& (vq->callfd >= 0))
-
eventfd_write(vq->callfd, (eventfd_t)1);
-
}
-
}
split的方式比较简单,当开启event_idx的时候,就根据old,new以及前端消耗到的位置avail->ring[(vr)->size]来判断是否kick;如果没有开启event_idx时则只要前端没有设置VRING_AVAIL_F_NO_INTERRUPT就kick前端,注意不开启event_idx时,后端也不是无脑kick的,只有前端没设置VRING_AVAIL_F_NO_INTERRUPT时才会kick,至于前端什么时候设置VRING_AVAIL_F_NO_INTERRUPT,一会再分析。
下面我们看packed ring的kick前端处理方式。在介绍packed处理之前我们先看其相关结构。
-
struct vhost_virtqueue {
-
union {
-
struct vring_desc *desc;
-
struct vring_packed_desc *desc_packed;
-
};
-
union {
-
struct vring_avail *avail;
-
struct vring_packed_desc_event *driver_event;
-
};
-
union {
-
struct vring_used *used;
-
struct vring_packed_desc_event *device_event;
-
};
-
……
-
};
在packed 方式中,由于uesd ring和avail ring不再需要,取而代之的是两个desc_event结构。在split方式中我们知道前后端控制是否相互通知是通过avail->flag和uesd->flag的设置来完成的,但packed中分别是通过driver_event和device_event来完成的。
其中driver_event是后端只读的,是前端控制后端更新uesd desc时是否发送通知的,对应于split 方式的avail->flag;
device_event是前端只读的,是后端控制前端更新avail desc时是否发送通知的,对应于split方式的uesd->flag。
driver_event和device_event都是vring_packed_desc_event结构,其具体结构如下。
-
struct vring_packed_desc_event {
-
uint16_t off_wrap;
-
uint16_t flags;
-
};
其中flag可以取三个值分别为:
#define VRING_EVENT_F_ENABLE 0x0
#define VRING_EVENT_F_DISABLE 0x1
#define VRING_EVENT_F_DESC 0x2
driver_event取值VRING_EVENT_F_DISABLE相当于split方式avail->flag设置VRING_AVAIL_F_NO_INTERRUPT,device_event取值VRING_EVENT_F_DISABLE相当于uesd->flag设置VRING_USED_F_NO_NOTIFY。
剩下关键的就是最后这个VRING_EVENT_F_DESC
flag了,官方spec是这么解释这个flag的: Enable events for a specific descriptor,(as specified by
Descriptor Ring Change Event Offset/Wrap Counter),Only valid if
VIRTIO_F_RING_EVENT_IDX has been negotiated. 可以看出这个flag的作用是指定某一个desc发生变化后触发通知,而这个flag生效的前提就是开启了event_idx。这个解释似乎还是不太直观,其实我们可以对比split 的event_idx处理方式来理解。我们知道split方式中,如果开启了event_idx,则前端需要通过avail->ring的最后一个desc告诉后端前端的uesd desc处理到哪里了,后端根据这个值来决定是否需要kick前端;而后端则使用uesd->ring的最后一个desc告诉前端后端的avail desc处理到哪里了,前端根据这个来决定是否来通知后端。但是在packed方式中没有了avail ring和uesd ring,那如果开启了event_idx前端后端如何才能告知对方自己处理到什么位置了呢?答案就是通过这里的off_wrap成员,可以看到这个成员是一个uint16_t,其中后15位指定了前后端处理到什么位置了,而最高位是为了解决翻转的Wrap Counter,而整个off_wrap字段有意义的前提就是flag被设置为了VRING_EVENT_F_DESC。
下面我们对比packed和split方式后端是否开启通知,来了解下vring_packed_desc_event这三个flag的作用。首先看split的开关中断处理,即vhost_enable_notify_split :
l vhost_enable_notify_split(dpdk 1811)
-
static inline void
-
vhost_enable_notify_split(struct virtio_net *dev,
-
struct vhost_virtqueue *vq, int enable)
-
{
-
/* 没有开启EVENT_IDX以vq->used->flags为准 */
-
if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
-
if (enable)
-
vq->used->flags &= ~VRING_USED_F_NO_NOTIFY;
-
else
-
vq->used->flags |= VRING_USED_F_NO_NOTIFY;
-
} else {/* 开启EVENT_IDX后不再使用used->flags */
-
if (enable)
-
vhost_avail_event(vq) = vq->last_avail_idx;
-
}
-
}
可以看到在没有开启EVENT_IDX时,控制guest是否通知后端是用过控制vq->used->flags设置VRING_USED_F_NO_NOTIFY来实现的,但是如果开启了EVENT_IDX就不再使用used->flags,而是使用EVENT_IDX特有中断限速方式。
下面再看packed的guest通知开启关闭方式实现vhost_enable_notify_packed。
l vhost_enable_notify_packed(dpdk 1811)
-
static inline void
-
vhost_enable_notify_packed(struct virtio_net *dev,
-
struct vhost_virtqueue *vq, int enable)
-
{
-
uint16_t flags;
-
-
if (!enable) {
-
vq->device_event->flags = VRING_EVENT_F_DISABLE;
-
return;
-
}
-
-
flags = VRING_EVENT_F_ENABLE;
-
if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
-
flags = VRING_EVENT_F_DESC;
-
vq->device_event->off_wrap = vq->last_avail_idx |
-
vq->avail_wrap_counter << 15;
-
}
-
-
rte_smp_wmb();
-
-
vq->device_event->flags = flags;
-
}
当不开启event_idx时,packed 方式使用device_event->flags是否设置VRING_EVENT_F_DISABLE代替split 方式设置的VRING_USED_F_NO_NOTIFY。如果开启了event_idx则device_event->flags一定会被设置为VRING_EVENT_F_DESC的,另外注意device_event->off_wrap的初始设置,低15位被设置为vq->last_avail_idx,高位被设置为vq->avail_wrap_counter。
最后我们看一下packed方式下后端是如何决定是否通知前端的,即vhost_vring_call_packed函数的实现。
l vhost_vring_call_packed(dpdk 1811)
-
static __rte_always_inline void
-
vhost_vring_call_packed(struct virtio_net *dev, struct vhost_virtqueue *vq)
-
{
-
uint16_t old, new, off, off_wrap;
-
bool signalled_used_valid, kick = false;
-
-
/* Flush used desc update. */
-
rte_smp_mb();
-
/* 如果没有开启EVENT_IDX, 则以dev->driver_event->flags是否设置VRING_EVENT_F_DISABLE为准确定是否通知前端 */
-
if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
-
if (vq->driver_event->flags !=
-
VRING_EVENT_F_DISABLE)
-
kick = true;
-
goto kick;
-
}
-
/* old 表示上一次通知前端时的used idx,new表示当前的uesd idx */
-
old = vq->signalled_used;
-
new = vq->last_used_idx;
-
vq->signalled_used = new; //注意和split的区别,split是kick前端后才进行signalled_used赋值,而这里是直接赋值
-
signalled_used_valid = vq->signalled_used_valid;
-
vq->signalled_used_valid = true;
-
/* 如果开启了event_idx但是driver_event->flags没有设置VRING_EVENT_F_DESC(正常情况是不存在的)则按照不开启event_idx时处理,及根据VRING_EVENT_F_DISABLE决定是否kick */
-
if (vq->driver_event->flags != VRING_EVENT_F_DESC) {
-
if (vq->driver_event->flags != VRING_EVENT_F_DISABLE)
-
kick = true;
-
goto kick;
-
}
-
-
if (unlikely(!signalled_used_valid)) {
-
kick = true;
-
goto kick;
-
}
-
-
rte_smp_rmb();
-
-
off_wrap = vq->driver_event->off_wrap;
-
off = off_wrap & ~(1 << 15); /* 从低15位获取desc的idx */
-
-
if (new <= old)
-
old -= vq->size;
-
/* 根据最高位的warp counter决定是否翻转 */
-
if (vq->used_wrap_counter != off_wrap >> 15)
-
off -= vq->size;
-
/* off表示前端当前处理的位置,根据off,new,old决定是否kick前端 */
-
if (vhost_need_event(off, new, old))
-
kick = true;
-
kick:
-
if (kick)
-
eventfd_write(vq->callfd, (eventfd_t)1);
-
}
在不开启event_idx的时候,和开启event_idx但是driver_event->flags !=
VRING_EVENT_F_DESC时(正常情况不存在)都是按照前端是否设置VRING_EVENT_F_DISABLE来决定的。
下面一点和split略有不同,就是vq->signalled_used赋值的位置,split是kick前端后条件满足时才进行signalled_used赋值,而这里是直接赋值,并且引入了一个signalled_used_valid变量。首先明确一下,这个变化和packed本身无关,其实split在后续patch也采用了类似处理(详见:http://mails.dpdk.org/archives/dev/2019-March/126684.html )。其中vq->signalled_used_valid的引入是为了在热迁移后以及前端驱动reload后,将vq->signalled_used的值标记为无效,正常情况vq->signalled_used记录的是上次通知前端时后端处理到的uesd idx,但是当热迁移发生或者前端驱动reload后这个值将不再有意义。signalled_used_valid是用在第一次更新used ring、virtio-net driver
reload和live migration之后,所以在ring初始化的时候、guest发送VHOST_USER_GET_VRING_BASE的时候才赋值为false。
另外将vq->signalled_used赋值位置前移,而不是只有满足kick条件才赋值,这个改动是为了一个优化。之前产生interrupt的条件是: last_used_idx – event_idx <= last_used_idx – signalled_used (event_idx为前端更新到的uesd idx)。以前的实现是产生interrupt的时候才会更新signalled_used,而现在是每次更新uesd_ring之后,不管是否有interrupt都更新它,现在的做法和kernel
vhost_net的实现是一样的。这样做的好处是可以减少不必要的kick事件,以vm接收方向为例,如果guest kernel有NAPI,那么在guest以poll的方式收包的时候会停止更新event_idx。假设NAPI在Δt时间内都会poll(也就是不更新event_idx),那么对原来的实现而言,last_used_idx就是一直增长,而event_idx和signalled_used都是不变的;在这种情况下,产生Interrupt只有两种情况,一是guest NAPI停止poll、更新event_idx,另一种是last_used_idx超过2^16,比如(last_used_idx=13,
signaled_used=65535, event_idx=10 )。而对现在的实现而言,last_used_idx也一直增长,event_idx也是固定不变的,不过(last_used_idx – signalled_used)是一个相对固定的值(其值等于这一次更新的used ring的descriptor数,我们称它为Δf);因此在这种情况下guest NAPI结束后,更新event_idx,但(last_used_idx – event_idx)的值未必小于等于Δf,所以不一定需要产生interrupt。所以,在guest如果没有NAPI,现在的实现与以前的相比可以减小(last_used_idx – signaled_used)的值,这样可以减小(last_used_idx – event_idx)小于等于这个值的的概率,从而减小产生interrupt的概率。
最后一点就是packed方式是如何获取前端当前的消耗位置的,也就是event_idx的,是通过off_wrap的低15位以及高位的warp counter来完成的。
有了以上背景知识,我们就开始分别分析前后端通知的四种情况。
发送队列通前端知后端
我们看发送队列(txq)通知后端的情况。首先明确对于发送队列,前端通知(kick)后端的作用是什么?发送队列kick后端就是为了告诉后端前端已经将数据放入了共享ring(具体就是avail desc)中,后端可以来取数据了。
所以我们看前端virtio_net驱动的代码实现,在发送函数start_xmit的结尾有如下调用:
if
(kick || netif_xmit_stopped(txq))
virtqueue_kick(sq->vq);
我们来看virtqueue_kick的实现。
l virtqueue_kick (kernel4.9
)
-
bool virtqueue_kick(struct virtqueue *vq)
-
{
-
if (virtqueue_kick_prepare(vq))
-
return virtqueue_notify(vq);
-
return true;
-
}
可以看到真正发送kick通知的函数是virtqueue_notify,其调用的条件是virtqueue_kick_prepare,只有其返回true的时候才会kick后端。我们看下virtqueue_kick_prepare的实现。
l virtqueue_kick_prepare
(kernel4.9 )
-
bool virtqueue_kick_prepare(struct virtqueue *_vq)
-
{
-
struct vring_virtqueue *vq = to_vvq(_vq);
-
u16 new, old;
-
bool needs_kick;
-
-
START_USE(vq);
-
/* We need to expose available array entries before checking avail
-
* event. */
-
virtio_mb(vq->weak_barriers);
-
-
old = vq->avail_idx_shadow - vq->num_added; /*上次通知时的avail_idx*/
-
new = vq->avail_idx_shadow; /* 本次发送报文后的avail_idx */
-
vq->num_added = 0;
-
-
if (vq->event) { /* 如果支持event_idx */
-
needs_kick = vring_need_event(virtio16_to_cpu(_vq->vdev, vring_avail_event(&vq->vring)),
-
new, old);
-
} else {
-
needs_kick = !(vq->vring.used->flags & cpu_to_virtio16(_vq->vdev, VRING_USED_F_NO_NOTIFY));
-
}
-
END_USE(vq);
-
return needs_kick;
-
}
可以看到如果支持event_idx就更加old和new以及used->ring[(vr)->num]的范围来通知后端,详细过程可以参考之前写的event_idx相关文章。如果不支持event_idx就看used->flags是否设置了VRING_USED_F_NO_NOTIFY,如果没有设置就通知后端,否则就不通知。这里我们也可以看到,当开启event_idx后,VRING_USED_F_NO_NOTIFY也就失去了作用。此外VRING_USED_F_NO_NOTIFY这个flag是后端设置的,对前端是只读的,用来告诉前端是否需要通知后端。
然后我们看后端处理,这里我们因为使用的是dpdk,而dpdk一般采用的polling模式,设置VRING_USED_F_NO_NOTIFY,所以前端是不会kick后端的,但是如果开启了event_idx呢?我们知道开启event_idx时,前端kick后端需要后端通过used->ring[(vr)->num]告诉前端当前avail desc消耗到什么位置了,但是当前dpdk
vhost_user并没有这个处理,因此当前如果打开event_idx,后端dpdk依然是无法收到中断的。
发送队列后端通知前端
我们再看发送队列后端通知前端的过程。发送队列为什么要通知前端呢?因为后端将前端放入avail ring中的数据取出后需要告诉前端对应的数据已经被取走了,你可以把相关数据buffer释放了。而究竟释放那些buffer是取决于uesd ring的,所以通知前端本质上是为了告诉前端uesd
ring有更新了。
但是有一点要注意,我们知道uesd ring是前后端共享的,所以如果后端更新了uesd ring,即使不通知前端,前端应该也是可以感知到的。所以前端释放buffer不一定要依赖后端kick。事实上也的确如此。
我们还看发送start_xmit,
l start_xmit (kernel4.9 )
-
static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev)
-
{
-
struct virtnet_info *vi = netdev_priv(dev);
-
int qnum = skb_get_queue_mapping(skb);
-
struct send_queue *sq = &vi->sq[qnum];
-
int err;
-
struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum);
-
bool kick = !skb->xmit_more;
-
-
/* Free up any pending old buffers before queueing new ones. */
-
free_old_xmit_skbs(sq);
-
…….
-
}
其中开头部分就首先调用free_old_xmit_skbs根据uesd ring将之前的buffer释放掉。具体流程如下图所示。
其中detach_buf负责更加uesd ring来将对应的desc释放掉,并将对应地址dma unmmap。另外要注意的是virtqueue_get_buf函数的最后有如下调用:
l virtqueue_get_buf (kernel4.9 )
-
void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
-
{
-
struct vring_virtqueue *vq = to_vvq(_vq);
-
void *ret;
-
unsigned int i;
-
u16 last_used;
-
-
last_used = (vq->last_used_idx & (vq->vring.num - 1));
-
i = virtio32_to_cpu(_vq->vdev, vq->vring.used->ring[last_used].id);
-
*len = virtio32_to_cpu(_vq->vdev, vq->vring.used->ring[last_used].len);
-
……
-
/* detach_buf clears data, so grab it now. */
-
ret = vq->desc_state[i].data;
-
detach_buf(vq, i);
-
vq->last_used_idx++;
-
/* If we expect an interrupt for the next entry, tell host
-
* by writing event index and flush out the write before
-
* the read in the next get_buf call. */
-
if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT))
-
virtio_store_mb(vq->weak_barriers,
-
&vring_used_event(&vq->vring),
-
cpu_to_virtio16(_vq->vdev, vq->last_used_idx));
-
-
END_USE(vq);
-
return ret;
-
}
当前端没有设置VRING_AVAIL_F_NO_INTERRUPT时,会更新avail->ring[(vr)->num],这也是后端开启event_idx时kick前端的条件。VRING_AVAIL_F_NO_INTERRUPT是前端设置,后端只读的。为什么要更新avail->ring[(vr)->num]呢?avail->ring[(vr)->num]中记录的前端已经处理到那个uesd idx了,因为可以及时告诉后端前端处理到什么位置了,后端来根据情况决定是否需要kick前端。
那现在问题又回来了,对于txq既然前端不需要后端kick也能释放buffer,那后端kick有什么用呢?我们再回头看一下start_xmit发送函数,有一下逻辑。
l start_xmit (kernel4.9
)
-
static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev)
-
{
-
struct virtnet_info *vi = netdev_priv(dev);
-
int qnum = skb_get_queue_mapping(skb);
-
struct send_queue *sq = &vi->sq[qnum];
-
int err;
-
struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum);
-
bool kick = !skb->xmit_more;
-
……
-
/* If running out of space, stop queue to avoid getting packets that we
-
* are then unable to transmit.
-
* An alternative would be to force queuing layer to requeue the skb by
-
* returning NETDEV_TX_BUSY. However, NETDEV_TX_BUSY should not be
-
* returned in a normal path of operation: it means that driver is not
-
* maintaining the TX queue stop/start state properly, and causes
-
* the stack to do a non-trivial amount of useless work.
-
* Since most packets only take 1 or 2 ring slots, stopping the queue
-
* early means 16 slots are typically wasted.
-
*/
-
if (sq->vq->num_free < 2+MAX_SKB_FRAGS) {
-
netif_stop_subqueue(dev, qnum);
-
if (unlikely(!virtqueue_enable_cb_delayed(sq->vq))) {
-
/* More just got used, free them then recheck. */
-
free_old_xmit_skbs(sq);
-
if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) {
-
netif_start_subqueue(dev, qnum);
-
virtqueue_disable_cb(sq->vq);
-
}
-
}
-
}
-
……
-
}
当前端发送速率过快,从而vq->num_free较少时会调用netif_stop_subqueue(将队列状态设置为__QUEUE_STATE_DRV_XOFF),这样队列的start_xmit函数下次在__dev_queue_xmit中就不会被调用。要想打破这样一个状态,就需要后端的kick了。这里还有一个十分关键的函数,就这在stop_queue之后调用的virtqueue_enable_cb_delayed函数。这个函数中有着至关重要的一个操作,如下:
l virtqueue_enable_cb_delayed
-
bool virtqueue_enable_cb_delayed(struct virtqueue *_vq)
-
{
-
struct vring_virtqueue *vq = to_vvq(_vq);
-
u16 bufs;
-
-
START_USE(vq);
-
-
/* We optimistically turn back on interrupts, then check if there was
-
* more to do. */
-
/* Depending on the VIRTIO_RING_F_USED_EVENT_IDX feature, we need to
-
* either clear the flags bit or point the event index at the next
-
* entry. Always update the event index to keep code simple. */
-
/* 取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来 */
-
if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) {
-
vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT;
-
if (!vq->event)
-
vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
-
}
-
/* TODO: tune this threshold */
-
bufs = (u16)(vq->avail_idx_shadow - vq->last_used_idx) * 3 / 4;
-
/* 更新avail->ring[(vr)->num]以供event_idx使用 */
-
virtio_store_mb(vq->weak_barriers,
-
&vring_used_event(&vq->vring),
-
cpu_to_virtio16(_vq->vdev, vq->last_used_idx + bufs));
-
if (unlikely((u16)(virtio16_to_cpu(_vq->vdev, vq->vring.used->idx) - vq->last_used_idx) > bufs)) {
-
END_USE(vq);
-
return false;
-
}
-
-
END_USE(vq);
-
return true;
-
}
首先这个函数做了一个关键操作,就是取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来。那么VRING_AVAIL_F_NO_INTERRUPT这个flag是什么时候被置上的呢?这个我们后面回答。另外一点就是会更新更新avail->ring[(vr)->num]告诉前端uesd desc当前消耗到哪里了,但注意这里不是更新为vq->last_used_idx,而是还加了一个bufs,为的就是让后端延时一会再kick前端。
下面我们看一下dpdk后端kick前端的时机,我们以split方式为例说明。
l virtio_dev_tx_split(dpdk18.11)
-
static __rte_always_inline uint16_t
-
virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
-
{
-
uint16_t i;
-
uint16_t free_entries;
-
-
if (unlikely(dev->dequeue_zero_copy)) {
-
struct zcopy_mbuf *zmbuf, *next;
-
-
for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
-
zmbuf != NULL; zmbuf = next) {
-
next = TAILQ_NEXT(zmbuf, next);
-
-
if (mbuf_is_consumed(zmbuf->mbuf)) {
-
update_shadow_used_ring_split(vq,
-
zmbuf->desc_idx, 0);
-
TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
-
restore_mbuf(zmbuf->mbuf);
-
rte_pktmbuf_free(zmbuf->mbuf);
-
put_zmbuf(zmbuf);
-
vq->nr_zmbuf -= 1;
-
}
-
}
-
-
if (likely(vq->shadow_used_idx)) {
-
/*如果是零拷贝方式,则每次接收前检查之前已经dma完成的报文,更新uesd ring,kick前端*/
-
flush_shadow_used_ring_split(dev, vq);
-
vhost_vring_call_split(dev, vq);
-
}
-
}
-
......
-
for (i = 0; i < count; i++) {
-
......
-
err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
-
mbuf_pool);
-
......
-
}
-
vq->last_avail_idx += i;
-
-
if (likely(dev->dequeue_zero_copy == 0)) {
-
do_data_copy_dequeue(vq);
-
if (unlikely(i < count))
-
vq->shadow_used_idx = i;
-
if (likely(vq->shadow_used_idx)) {
-
/* 更新used ring,kick前端 */
-
flush_shadow_used_ring_split(dev, vq);
-
vhost_vring_call_split(dev, vq);
-
}
-
}
-
-
return i;
-
}
如果是零拷贝,则在后端接受逻辑的开始,判断上一次接受的报文是否dma完成,如果完成则更新uesd ring,调用vhost_vring_call_split
kick前端。如果不是零拷贝模式,则在将报文从desc拷贝出来后,更新uesd ring,调用vhost_vring_call_split
kick前端。关于vhost_vring_call_split的实现在前面已经介绍过了,这里不再重复。
下面看前端收到后端的中断是如何处理的。前面已经介绍过了,对于发送队列,其注册的中断回调函数中会调用skb_xmit_done。
l skb_xmit_done(kernel 4.9)
-
static void skb_xmit_done(struct virtqueue *vq)
-
{
-
struct virtnet_info *vi = vq->vdev->priv;
-
-
/* Suppress further interrupts. */
-
virtqueue_disable_cb(vq);
-
-
/* We were probably waiting for more output buffers. */
-
netif_wake_subqueue(vi->dev, vq2txq(vq));
-
}
其中virtqueue_disable_cb会将vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT,这样后端就不会发生kick到前端了(不开启event_idx的情况),然后调用netif_wake_subqueue唤醒被stop的queue(清除__QUEUE_STATE_DRV_XOFF 的state)。
l virtqueue_disable_cb(kernel)
-
void virtqueue_disable_cb(struct virtqueue *_vq)
-
{
-
struct vring_virtqueue *vq = to_vvq(_vq);
-
-
if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT)) {
-
vq->avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
-
if (!vq->event)
-
vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
-
}
-
}
这里又有个疑问,唤醒stop 状态的queue容易理解,可以设置上了VRING_AVAIL_F_NO_INTERRUPT后,什么时候取消呢?其实前面我们已经介绍过了,就是下次stop queue时调用virtqueue_enable_cb_delayed的处理中。所以我们可以看到,即使不开启event_idx,也不是每次uesd 变化都会kick前端的,而是只有queue stop后才会kick,正常状态前端是会设置VRING_AVAIL_F_NO_INTERRUPT不让后端kick的。
接收队列后端通知前端
对于接收队列,后端会在将mbuf数据拷贝到avail desc中后,更新uesd ring,然后kick前端。kick前端的目的就是告诉前端,我有数据发送给你了(更新了uesd ring),你可以来取数据了。我们以split方式的接收方向为例。其接受逻辑在virtio_dev_rx_split中实现。
l virtio_dev_rx_split(dpdk
18.11)
-
static __rte_always_inline uint32_t
-
virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
struct rte_mbuf **pkts, uint32_t count)
-
{
-
uint32_t pkt_idx = 0;
-
uint16_t num_buffers;
-
struct buf_vector buf_vec[BUF_VECTOR_MAX];
-
uint16_t avail_head;
-
-
rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
-
avail_head = *((volatile uint16_t *)&vq->avail->idx);
-
-
for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
-
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
-
uint16_t nr_vec = 0;
-
/* 为拷贝当前mbuf后续预留avail desc */
-
if (unlikely(reserve_avail_buf_split(dev, vq,
-
pkt_len, buf_vec, &num_buffers,
-
avail_head, &nr_vec) < 0)) {
-
VHOST_LOG_DEBUG(VHOST_DATA,
-
"(%d) failed to get enough desc from vring\n",
-
dev->vid);
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
-
rte_prefetch0((void *)(uintptr_t)buf_vec[0].buf_addr);
-
-
VHOST_LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
-
dev->vid, vq->last_avail_idx,
-
vq->last_avail_idx + num_buffers);
-
/* 拷贝mbuf到avail desc */
-
if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
-
buf_vec, nr_vec,
-
num_buffers) < 0) {
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
/* 更新last_avail_idx */
-
vq->last_avail_idx += num_buffers;
-
}
-
/* 小包的批处理拷贝 */
-
do_data_copy_enqueue(dev, vq);
-
-
if (likely(vq->shadow_used_idx)) {
-
flush_shadow_used_ring_split(dev, vq); /* 更新used ring */
-
vhost_vring_call_split(dev, vq); /* 通知前端 */
-
}
-
-
return pkt_idx;
-
}
其中kick前端的处理在最后的vhost_vring_call_split函数中,这个我们在“后端vhost_user的kick方式”中已经介绍过,这里就不再重复了。
然后我们看前端的通知处理。根据前面的介绍,接受队列前端注册的中断处理函数最终会调用到skb_recv_done。
l skb_recv_done(kernel 4.9)
-
static void skb_recv_done(struct virtqueue *rvq)
-
{
-
struct virtnet_info *vi = rvq->vdev->priv;
-
struct receive_queue *rq = &vi->rq[vq2rxq(rvq)];
-
-
/* Schedule NAPI, Suppress further interrupts if successful. */
-
if (napi_schedule_prep(&rq->napi)) {
-
virtqueue_disable_cb(rvq);
-
__napi_schedule(&rq->napi);
-
}
-
}
我们看到这个函数主要工作就是调用virtqueue_disable_cb给vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT从而禁止后端发送中断(不开启event_idx的情况),然后唤起NAPI处理。所以在NAPI的情况后端通知是被关闭的。那么这个flag什么时候会被打开呢?答案就是在virtio_net的NAPI处理逻辑中,即virtnet_poll函数。
l virtnet_poll(kernel 4.9)
-
static int virtnet_poll(struct napi_struct *napi, int budget)
-
{
-
struct receive_queue *rq =
-
container_of(napi, struct receive_queue, napi);
-
unsigned int r, received;
-
-
received = virtnet_receive(rq, budget);
-
-
/* Out of packets? */
-
if (received < budget) {
-
r = virtqueue_enable_cb_prepare(rq->vq);
-
napi_complete_done(napi, received);
-
if (unlikely(virtqueue_poll(rq->vq, r)) &&
-
napi_schedule_prep(napi)) {
-
virtqueue_disable_cb(rq->vq);
-
__napi_schedule(napi);
-
}
-
}
-
-
return received;
-
}
在NAPI处理流程中如果received <
budget,证明本轮数据接收已经比较少了,NAPI过程可能要退出了,这时调用virtqueue_enable_cb_prepare将之前的VRING_AVAIL_F_NO_INTERRUPT取消,从NAPI模式进入中断模式。
l virtqueue_enable_cb_prepare(kernel
4.9)
-
unsigned virtqueue_enable_cb_prepare(struct virtqueue *_vq)
-
{
-
struct vring_virtqueue *vq = to_vvq(_vq);
-
u16 last_used_idx;
-
-
START_USE(vq);
-
-
/* We optimistically turn back on interrupts, then check if there was
-
* more to do. */
-
/* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to
-
* either clear the flags bit or point the event index at the next
-
* entry. Always do both to keep code simple. */
-
if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) {
-
vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT;
-
if (!vq->event)
-
vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
-
}
-
vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, last_used_idx = vq->last_used_idx);
-
END_USE(vq);
-
return last_used_idx;
-
}
注意这个函数除了取消VRING_AVAIL_F_NO_INTERRUPT设置之外,还会更新avail->ring[(vr)->num],以供开启event_idx时后端使用。
接收队列前端通知后端
下面看接收方向前端通知的过程。首先要清除接收方向前端通知后端的目的,那就是告诉后端avail ring已经更新了(有了更多空buffer),你可以继续放入更多数据了。从这里我们也可以看出,前端通知后端,无论发送还是接收方向,都是告诉后端有了更多的avail desc,而后端通知前端,都是告诉前端有了更多的uesd 的desc。然后我们看前端通知后端的时机,要想知道前端再何时通知后端,我们需要对前端的数据接收流程有个清晰的认识。下面这个图描述了前端vhost_net接收数据的过程。
而通知后端的时机就在try_fill_recv函数调用中。
l try_fill_recv(kernel 4.9)
-
static bool try_fill_recv(struct virtnet_info *vi, struct receive_queue *rq,
-
gfp_t gfp)
-
{
-
int err;
-
bool oom;
-
-
gfp |= __GFP_COLD;
-
/* 针对三种情况分别给desc注入对应的buffer,并更新avail idx */
-
do {
-
if (vi->mergeable_rx_bufs)
-
err = add_recvbuf_mergeable(rq, gfp);
-
else if (vi->big_packets)
-
err = add_recvbuf_big(vi, rq, gfp);
-
else
-
err = add_recvbuf_small(vi, rq, gfp);
-
-
oom = err == -ENOMEM;
-
if (err)
-
break;
-
} while (rq->vq->num_free);
-
virtqueue_kick(rq->vq); /* kick 后端 */
-
return !oom;
-
}
这个函数首先会根据是否支持mergeable已经是否支持收大包来向avail desc注入对应的空buffer。每种情况下注入buffer产生的desc
chain长度是不同的。这里不是重点,我们就不展开了。下面看关键的virtqueue_kick函数。
l virtqueue_kick(kernel 4.9)
-
bool virtqueue_kick(struct virtqueue *vq)
-
{
-
if (virtqueue_kick_prepare(vq))
-
return virtqueue_notify(vq);
-
return true;
-
}
其中真正向后端发送通知的是virtqueue_notify,但是首先要通过virtqueue_kick_prepare的判断。virtqueue_kick_prepare这个函数我们之前已经介绍过了,如果不开启event_idx时,会根据vring.used->flags是否设置VRING_USED_F_NO_NOTIFY来决定是否kick后端,而开启event_idx时,则会根据后端填入(vr)->used->ring[(vr)->num]中的后端消耗位置来决定是否kick。
同样如果后端使用的是dpdk
vhost_user,那么当前后端是不会写(vr)->used->ring[(vr)->num]告诉前端自己的avail使用位置的,如果开启了event_idx后端还是无法收到中断的。那如果dpdk使用中断模式,这里收不到中断是否会有问题呢?我们换位思考一下,对比一下“发送队列后端通知前端”的场景,在“发送队列后端通知前端”中,如果后端不通知前端,那么前端一旦感觉到没有可用的avail desc后就会stop
queue,之后就无法被唤醒了。而这里的场景,vhost_user后端没有类似stop queue的操作,所以即使收不到前端的中断也没有问题。但是在“发送队列前端通知后端”的场景中,如果dpdk采用中断模式切开启了event_idx,那么vhost_user就会因为收不到中断而无法取出前端发送的报文。所以对应想使用中断模式,且开启event_idx的场景,vhost_user需要添加对(vr)->used->ring[(vr)->num]的处理。