Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3595734
  • 博文数量: 208
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7375
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 18:56
个人简介

将晦涩难懂的技术讲的通俗易懂

文章分类

全部博文(208)

文章存档

2024年(10)

2023年(9)

2022年(4)

2021年(12)

2020年(8)

2019年(18)

2018年(19)

2017年(9)

2016年(26)

2015年(18)

2014年(54)

2013年(20)

分类: LINUX

2019-04-07 18:49:28

vhost前后端通知机制场景分析

——lvyilong316


这篇文章主要详细的分析一下vhost的通知机制,前面的一些文章陆续也对前后端的通知有所介绍,这里相当于算是一次汇总吧。

所谓前后端通知,必然涉及两个方向:前端通知后端,后端通知前端。而我们知道vhosttxqrxq,对于每种queue都伴随有这两种通知。而通知方式又根据是否支持event_idx有着不同的实现,最后virtio1.1引入的packed ring后,通知相对split ring又有不同。下面我们以txqrxq的两个方向共四种情况来分析前后端的通知实现。其中前端以kernel4.9 virtio_net实现为例分析,后端以dpdk 18.11 vhost_user实现分析。在展开前后端通知分析前,我们先了解两个背景知识:前端中断处理函数的注册和后端vhost_userkick方式。

前端中断处理函数注册

后端对前端的通知,是以中断方式传递到前端的。分析通知的接收处理就少不了要了解这些中断处理函数。所以我们先看一下前端是怎么注册中断处理函数的。这些需要从virtio_net的加载函数virtnet_probe说起,具体如下图所示。

    我们知道virtio设备分为mordenlagecy两种,我们以morden设备为例。对于morden设备,会调用virtio_pci_modern_probe初始化config ops

vp_dev->vdev.config = &virtio_pci_config_ops;

find_vqs函数对应为vp_modern_find_vqsvp_modern_find_vqs 其中主要调用vq_find_vqs函数。vp_find_vqs函数完成队列中断处理函数的初始化,根据设备对中断的支持,分为以下三种情况:

(1)     所有txqrxq以及ctrlq都共享一个中断处理;

(2)     ctrlq单独使用一个中断处理,其他txqrxq共享一个中断处理;

(3)     可以每个queue(包含txqrxq以及ctrlq)各一个中断处理;

l  vp_find_vqs (kernel 4.9)

点击(此处)折叠或打开

  1. /* the config->find_vqs() implementation */
  2. int vp_find_vqs(struct virtio_device *vdev, unsigned nvqs,
  3.         struct virtqueue *vqs[],
  4.         vq_callback_t *callbacks[],
  5.         const char * const names[])
  6. {
  7.     int err;

  8.     /* Try MSI-X with one vector per queue. */
  9.     err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, true, true);
  10.     if (!err)
  11.         return 0;
  12.     /* Fallback: MSI-X with one vector for config, one shared for queues. */
  13.     err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names,
  14.                  true, false);
  15.     if (!err)
  16.         return 0;
  17.     /* Finally fall back to regular interrupts. */
  18.     return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names,
  19.                  false, false);
  20. }


函数首先尝试方式(3)每个queue各一个中断处理,如果失败再尝试方式(2),如果再失败就只能使用方式(1)了。

我们看到每种方式都是调用同一个函数vp_try_to_find_vqs,只是传入的参数不同。那我们就来看下这个函数的主要内容。

l  vp_try_to_find_vqs

点击(此处)折叠或打开

  1. static int vp_try_to_find_vqs(struct virtio_device *vdev, unsigned nvqs,
  2.              struct virtqueue *vqs[],
  3.              vq_callback_t *callbacks[],
  4.              const char * const names[],
  5.              bool use_msix,
  6.              bool per_vq_vectors)
  7. {
  8.     struct virtio_pci_device *vp_dev = to_vp_device(vdev);
  9.     u16 msix_vec;
  10.     int i, err, nvectors, allocated_vectors;

  11.     vp_dev->vqs = kmalloc(nvqs * sizeof *vp_dev->vqs, GFP_KERNEL);
  12.     if (!vp_dev->vqs)
  13.         return -ENOMEM;

  14.     if (!use_msix) {
  15.         /* 方式1,所有txq,rxq以及ctrlq都共享一个中断处理 */
  16.         /* Old style: one normal interrupt for change and all vqs. */
  17.         err = vp_request_intx(vdev);
  18.         if (err)
  19.             goto error_find;
  20.     } else {
  21.         if (per_vq_vectors) {
  22.             /* Best option: one for change interrupt, one per vq. */
  23.          /* 方式3,可以每个queue一个中断处理 */
  24.             nvectors = 1;
  25.             for (i = 0; i < nvqs; ++i)
  26.                 if (callbacks[i])
  27.                     ++nvectors;
  28.         } else {
  29.             /* Second best: one for change, shared for all vqs. */
  30.          /* 方式2,ctrlq一个中断,其他txq和rxq共享一个中断处理 */
  31.             nvectors = 2;
  32.         }

  33.         err = vp_request_msix_vectors(vdev, nvectors, per_vq_vectors);
  34.         if (err)
  35.             goto error_find;
  36.     }

  37.     vp_dev->per_vq_vectors = per_vq_vectors;
  38.     allocated_vectors = vp_dev->msix_used_vectors;
  39.     for (i = 0; i < nvqs; ++i) { /* 方式3的处理 */
  40.         if (!names[i]) {
  41.             vqs[i] = NULL;
  42.             continue;
  43.         } else if (!callbacks[i] || !vp_dev->msix_enabled)
  44.             msix_vec = VIRTIO_MSI_NO_VECTOR;
  45.         else if (vp_dev->per_vq_vectors)
  46.             msix_vec = allocated_vectors++;
  47.         else
  48.             msix_vec = VP_MSIX_VQ_VECTOR;
  49.         vqs[i] = vp_setup_vq(vdev, i, callbacks[i], names[i], msix_vec);
  50.         if (IS_ERR(vqs[i])) {
  51.             err = PTR_ERR(vqs[i]);
  52.             goto error_find;
  53.         }

  54.         if (!vp_dev->per_vq_vectors || msix_vec == VIRTIO_MSI_NO_VECTOR)
  55.             continue;

  56.         /* allocate per-vq irq if available and necessary */
  57.         snprintf(vp_dev->msix_names[msix_vec],
  58.              sizeof *vp_dev->msix_names,
  59.              "%s-%s",
  60.              dev_name(&vp_dev->vdev.dev), names[i]);
  61.         err = request_irq(vp_dev->msix_entries[msix_vec].vector,
  62.                  vring_interrupt, 0,
  63.                  vp_dev->msix_names[msix_vec],
  64.                  vqs[i]);
  65.         if (err) {
  66.             vp_del_vq(vqs[i]);
  67.             goto error_find;
  68.         }
  69.     }
  70.     return 0;

  71. error_find:
  72.     vp_del_vqs(vdev);
  73.     return err;
  74. }

其中vp_request_intx函数完成方式1的处理,具体就是通过request_irq注册中断处理函数vp_interruptvp_request_msix_vectors函数完成方式2的处理,其中调用request_irqctrlq注册中断处理函数为vp_config_changed(方式3ctrlq中断处理也是这里注册),调用request_irq给数据queuetxqrxq)注册中断处理函数为vp_vring_interrupt;而方式3的剩余处理在本函数的后半部分,为每个数据queue调用request_irq注册中断处理函数vring_interrupt。如果查看代码会发现方式2数据queue共享的中断处理vp_vring_interrupt函数中也是通过遍历所有queue调用vring_interrupt实现的。所以我们重点关注vring_interrupt函数的实现,这是数据queue中断处理的核心。

l  vring_interrupt

点击(此处)折叠或打开

  1. irqreturn_t vring_interrupt(int irq, void *_vq)
  2. {
  3.     struct vring_virtqueue *vq = to_vvq(_vq);

  4.     if (!more_used(vq)) { /* 如果没有更新uesd desc则不需要特殊处理直接返回 */
  5.         pr_debug("virtqueue interrupt with no work for %p\n", vq);
  6.         return IRQ_NONE;
  7.     }

  8.     if (unlikely(vq->broken))
  9.         return IRQ_HANDLED;

  10.     pr_debug("virtqueue callback for %p (%p)\n", vq, vq->vq.callback);
  11.     if (vq->vq.callback)
  12.         vq->vq.callback(&vq->vq);

  13.     return IRQ_HANDLED;
  14. }

  15. static inline bool more_used(const struct vring_virtqueue *vq)
  16. {
  17.     return vq->last_used_idx != virtio16_to_cpu(vq->vq.vdev, vq->vring.used->idx);
  18. }

    如果more_used返回false表示vq->last_used_idx== vring.used->idx,这说明当前没有uesd desc需要更新处理,所以中断直接返回。否则就调用对应queuecallback函数。而callback函数在之前virtnet_find_vqs的调用中被设置。rxqtxqcallback分别注册为了skb_recv_doneskb_xmit_done

callbacks[rxq2vq(i)] = skb_recv_done;

callbacks[txq2vq(i)] = skb_xmit_done;

    所以接收队列和发送队列的中断处理主要就是分别调用skb_recv_doneskb_xmit_done函数。

后端vhost_userkick方式

下面我们再看下后端vhost_user是如果kick前端的。首先是split ring的情况。

l  vhost_vring_call_split (dpdk 1811)

点击(此处)折叠或打开

  1. static __rte_always_inline void
  2. vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
  3. {
  4.     /* Flush used->idx update before we read avail->flags. */
  5.     rte_smp_mb();

  6.     /* Don't kick guest if we don't reach index specified by guest. */
  7.     if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
  8.         uint16_t old = vq->signalled_used;
  9.         uint16_t new = vq->last_used_idx;

  10.         if (vhost_need_event(vhost_used_event(vq), new, old)
  11.             && (vq->callfd >= 0)) {
  12.             vq->signalled_used = vq->last_used_idx;
  13.             eventfd_write(vq->callfd, (eventfd_t) 1);
  14.         }
  15.     } else {
  16.         /* Kick the guest if necessary. */
  17.         if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
  18.                 && (vq->callfd >= 0))
  19.             eventfd_write(vq->callfd, (eventfd_t)1);
  20.     }
  21. }

split的方式比较简单,当开启event_idx的时候,就根据oldnew以及前端消耗到的位置avail->ring[(vr)->size]来判断是否kick;如果没有开启event_idx时则只要前端没有设置VRING_AVAIL_F_NO_INTERRUPTkick前端,注意不开启event_idx时,后端也不是无脑kick的,只有前端没设置VRING_AVAIL_F_NO_INTERRUPT时才会kick,至于前端什么时候设置VRING_AVAIL_F_NO_INTERRUPT,一会再分析。

下面我们看packed ringkick前端处理方式。在介绍packed处理之前我们先看其相关结构。

点击(此处)折叠或打开

  1. struct vhost_virtqueue {
  2.     union {
  3.         struct vring_desc    *desc;
  4.         struct vring_packed_desc *desc_packed;
  5.     };
  6.     union {
  7.         struct vring_avail    *avail;
  8.         struct vring_packed_desc_event *driver_event;
  9.     };
  10.     union {
  11.         struct vring_used    *used;
  12.         struct vring_packed_desc_event *device_event;
  13.     };
  14. ……
  15. };


packed 方式中,由于uesd ringavail ring不再需要,取而代之的是两个desc_event结构。在split方式中我们知道前后端控制是否相互通知是通过avail->flaguesd->flag的设置来完成的,但packed中分别是通过driver_eventdevice_event来完成的。

其中driver_event是后端只读的,是前端控制后端更新uesd desc时是否发送通知的,对应于split 方式的avail->flag

device_event是前端只读的,是后端控制前端更新avail desc时是否发送通知的,对应于split方式的uesd->flag

driver_eventdevice_event都是vring_packed_desc_event结构,其具体结构如下。

点击(此处)折叠或打开

  1. struct vring_packed_desc_event {
  2.     uint16_t off_wrap;
  3.     uint16_t flags;
  4. };

其中flag可以取三个值分别为:

#define VRING_EVENT_F_ENABLE 0x0

#define VRING_EVENT_F_DISABLE 0x1

#define VRING_EVENT_F_DESC 0x2

driver_event取值VRING_EVENT_F_DISABLE相当于split方式avail->flag设置VRING_AVAIL_F_NO_INTERRUPTdevice_event取值VRING_EVENT_F_DISABLE相当于uesd->flag设置VRING_USED_F_NO_NOTIFY

剩下关键的就是最后这个VRING_EVENT_F_DESC flag了,官方spec是这么解释这个flag的: Enable events for a specific descriptor(as specified by Descriptor Ring Change Event Offset/Wrap Counter),Only valid if VIRTIO_F_RING_EVENT_IDX has been negotiated. 可以看出这个flag的作用是指定某一个desc发生变化后触发通知,而这个flag生效的前提就是开启了event_idx。这个解释似乎还是不太直观,其实我们可以对比split event_idx处理方式来理解。我们知道split方式中,如果开启了event_idx,则前端需要通过avail->ring的最后一个desc告诉后端前端的uesd desc处理到哪里了,后端根据这个值来决定是否需要kick前端;而后端则使用uesd->ring的最后一个desc告诉前端后端的avail desc处理到哪里了,前端根据这个来决定是否来通知后端。但是在packed方式中没有了avail ringuesd ring,那如果开启了event_idx前端后端如何才能告知对方自己处理到什么位置了呢?答案就是通过这里的off_wrap成员,可以看到这个成员是一个uint16_t,其中后15位指定了前后端处理到什么位置了,而最高位是为了解决翻转的Wrap Counter,而整个off_wrap字段有意义的前提就是flag被设置为了VRING_EVENT_F_DESC

下面我们对比packedsplit方式后端是否开启通知,来了解下vring_packed_desc_event这三个flag的作用。首先看split的开关中断处理,即vhost_enable_notify_split

l  vhost_enable_notify_split(dpdk 1811)

点击(此处)折叠或打开

  1. static inline void
  2. vhost_enable_notify_split(struct virtio_net *dev,
  3.         struct vhost_virtqueue *vq, int enable)
  4. {
  5.     /* 没有开启EVENT_IDX以vq->used->flags为准 */
  6.     if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
  7.         if (enable)
  8.             vq->used->flags &= ~VRING_USED_F_NO_NOTIFY;
  9.         else
  10.             vq->used->flags |= VRING_USED_F_NO_NOTIFY;
  11.     } else {/* 开启EVENT_IDX后不再使用used->flags */
  12.         if (enable)
  13.             vhost_avail_event(vq) = vq->last_avail_idx;
  14.     }
  15. }


   可以看到在没有开启EVENT_IDX时,控制guest是否通知后端是用过控制vq->used->flags设置VRING_USED_F_NO_NOTIFY来实现的,但是如果开启了EVENT_IDX就不再使用used->flags,而是使用EVENT_IDX特有中断限速方式

   下面再看packedguest通知开启关闭方式实现vhost_enable_notify_packed

l  vhost_enable_notify_packed(dpdk 1811)

点击(此处)折叠或打开

  1. static inline void
  2. vhost_enable_notify_packed(struct virtio_net *dev,
  3.         struct vhost_virtqueue *vq, int enable)
  4. {
  5.     uint16_t flags;

  6.     if (!enable) {
  7.         vq->device_event->flags = VRING_EVENT_F_DISABLE;
  8.         return;
  9.     }

  10.     flags = VRING_EVENT_F_ENABLE;
  11.     if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
  12.         flags = VRING_EVENT_F_DESC;
  13.         vq->device_event->off_wrap = vq->last_avail_idx |
  14.             vq->avail_wrap_counter << 15;
  15.     }

  16.     rte_smp_wmb();

  17.     vq->device_event->flags = flags;
  18. }

    当不开启event_idx时,packed 方式使用device_event->flags是否设置VRING_EVENT_F_DISABLE代替split 方式设置的VRING_USED_F_NO_NOTIFY如果开启了event_idxdevice_event->flags一定会被设置为VRING_EVENT_F_DESC,另外注意device_event->off_wrap的初始设置,低15位被设置为vq->last_avail_idx,高位被设置为vq->avail_wrap_counter

    最后我们看一下packed方式下后端是如何决定是否通知前端的,即vhost_vring_call_packed函数的实现。

l  vhost_vring_call_packed(dpdk 1811)

点击(此处)折叠或打开

  1. static __rte_always_inline void
  2. vhost_vring_call_packed(struct virtio_net *dev, struct vhost_virtqueue *vq)
  3. {
  4.     uint16_t old, new, off, off_wrap;
  5.     bool signalled_used_valid, kick = false;

  6.     /* Flush used desc update. */
  7.     rte_smp_mb();
  8.     /* 如果没有开启EVENT_IDX, 则以dev->driver_event->flags是否设置VRING_EVENT_F_DISABLE为准确定是否通知前端 */
  9.     if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) {
  10.         if (vq->driver_event->flags !=
  11.                 VRING_EVENT_F_DISABLE)
  12.             kick = true;
  13.         goto kick;
  14.     }
  15.     /* old 表示上一次通知前端时的used idx,new表示当前的uesd idx */
  16.     old = vq->signalled_used;
  17.     new = vq->last_used_idx;
  18.     vq->signalled_used = new; //注意和split的区别,split是kick前端后才进行signalled_used赋值,而这里是直接赋值
  19.     signalled_used_valid = vq->signalled_used_valid;
  20.     vq->signalled_used_valid = true;
  21.     /* 如果开启了event_idx但是driver_event->flags没有设置VRING_EVENT_F_DESC(正常情况是不存在的)则按照不开启event_idx时处理,及根据VRING_EVENT_F_DISABLE决定是否kick */
  22.     if (vq->driver_event->flags != VRING_EVENT_F_DESC) {
  23.         if (vq->driver_event->flags != VRING_EVENT_F_DISABLE)
  24.             kick = true;
  25.         goto kick;
  26.     }

  27.     if (unlikely(!signalled_used_valid)) {
  28.         kick = true;
  29.         goto kick;
  30.     }

  31.     rte_smp_rmb();
  32.  
  33.     off_wrap = vq->driver_event->off_wrap;
  34.     off = off_wrap & ~(1 << 15); /* 从低15位获取desc的idx */

  35.     if (new <= old)
  36.         old -= vq->size;
  37.     /* 根据最高位的warp counter决定是否翻转 */
  38.     if (vq->used_wrap_counter != off_wrap >> 15)
  39.         off -= vq->size;
  40.     /* off表示前端当前处理的位置,根据off,new,old决定是否kick前端 */
  41.     if (vhost_need_event(off, new, old))
  42.         kick = true;
  43. kick:
  44.     if (kick)
  45.         eventfd_write(vq->callfd, (eventfd_t)1);
  46. }

   在不开启event_idx的时候,和开启event_idx但是driver_event->flags != VRING_EVENT_F_DESC时(正常情况不存在)都是按照前端是否设置VRING_EVENT_F_DISABLE来决定的。

下面一点和split略有不同,就是vq->signalled_used赋值的位置,splitkick前端后条件满足时才进行signalled_used赋值,而这里是直接赋值,并且引入了一个signalled_used_valid变量。首先明确一下,这个变化和packed本身无关,其实split在后续patch也采用了类似处理(详见:http://mails.dpdk.org/archives/dev/2019-March/126684.html )。其中vq->signalled_used_valid的引入是为了在热迁移后以及前端驱动reload后,将vq->signalled_used的值标记为无效,正常情况vq->signalled_used记录的是上次通知前端时后端处理到的uesd idx,但是当热迁移发生或者前端驱动reload后这个值将不再有意义。signalled_used_valid是用在第一次更新used ringvirtio-net driver reloadlive migration之后,所以在ring初始化的时候、guest发送VHOST_USER_GET_VRING_BASE的时候才赋值为false

另外将vq->signalled_used赋值位置前移,而不是只有满足kick条件才赋值,这个改动是为了一个优化。之前产生interrupt的条件是: last_used_idx event_idx <= last_used_idx signalled_used event_idx为前端更新到的uesd idx)。以前的实现是产生interrupt的时候才会更新signalled_used,而现在是每次更新uesd_ring之后,不管是否有interrupt都更新它,现在的做法和kernel vhost_net的实现是一样的。这样做的好处是可以减少不必要的kick事件,以vm接收方向为例,如果guest kernelNAPI,那么在guestpoll的方式收包的时候会停止更新event_idx。假设NAPI在Δt时间内都会poll(也就是不更新event_idx),那么对原来的实现而言,last_used_idx就是一直增长,而event_idxsignalled_used都是不变的;在这种情况下,产生Interrupt只有两种情况,一是guest NAPI停止poll、更新event_idx,另一种是last_used_idx超过2^16,比如(last_used_idx=13, signaled_used=65535, event_idx=10 )。而对现在的实现而言,last_used_idx也一直增长,event_idx也是固定不变的,不过(last_used_idx signalled_used)是一个相对固定的值(其值等于这一次更新的used ringdescriptor数,我们称它为Δf);因此在这种情况下guest NAPI结束后,更新event_idx,但(last_used_idx event_idx)的值未必小于等于Δf,所以不一定需要产生interrupt。所以,在guest如果没有NAPI,现在的实现与以前的相比可以减小(last_used_idx signaled_used)的值,这样可以减小(last_used_idx event_idx)小于等于这个值的的概率,从而减小产生interrupt的概率。

最后一点就是packed方式是如何获取前端当前的消耗位置的,也就是event_idx的,是通过off_wrap的低15位以及高位的warp counter来完成的。

有了以上背景知识,我们就开始分别分析前后端通知的四种情况。

发送队列通前端知后端

我们看发送队列(txq)通知后端的情况。首先明确对于发送队列,前端通知(kick)后端的作用是什么?发送队列kick后端就是为了告诉后端前端已经将数据放入了共享ring(具体就是avail desc)中,后端可以来取数据了

所以我们看前端virtio_net驱动的代码实现,在发送函数start_xmit的结尾有如下调用:

                   if (kick || netif_xmit_stopped(txq))

                                      virtqueue_kick(sq->vq);

我们来看virtqueue_kick的实现。

l  virtqueue_kick (kernel4.9 )

点击(此处)折叠或打开

  1. bool virtqueue_kick(struct virtqueue *vq)
  2. {
  3.     if (virtqueue_kick_prepare(vq))
  4.         return virtqueue_notify(vq);
  5.     return true;
  6. }

可以看到真正发送kick通知的函数是virtqueue_notify,其调用的条件是virtqueue_kick_prepare,只有其返回true的时候才会kick后端。我们看下virtqueue_kick_prepare的实现。

l  virtqueue_kick_prepare (kernel4.9 )

点击(此处)折叠或打开

  1. bool virtqueue_kick_prepare(struct virtqueue *_vq)
  2. {
  3.     struct vring_virtqueue *vq = to_vvq(_vq);
  4.     u16 new, old;
  5.     bool needs_kick;

  6.     START_USE(vq);
  7.     /* We need to expose available array entries before checking avail
  8.      * event. */
  9.     virtio_mb(vq->weak_barriers);

  10.     old = vq->avail_idx_shadow - vq->num_added; /*上次通知时的avail_idx*/
  11.     new = vq->avail_idx_shadow; /* 本次发送报文后的avail_idx */
  12.     vq->num_added = 0;

  13.     if (vq->event) { /* 如果支持event_idx */
  14.         needs_kick = vring_need_event(virtio16_to_cpu(_vq->vdev, vring_avail_event(&vq->vring)),
  15.                      new, old);
  16.     } else {
  17.         needs_kick = !(vq->vring.used->flags & cpu_to_virtio16(_vq->vdev, VRING_USED_F_NO_NOTIFY));
  18.     }
  19.     END_USE(vq);
  20.     return needs_kick;
  21. }

    可以看到如果支持event_idx就更加oldnew以及used->ring[(vr)->num]的范围来通知后端,详细过程可以参考之前写的event_idx相关文章。如果不支持event_idx就看used->flags是否设置了VRING_USED_F_NO_NOTIFY,如果没有设置就通知后端,否则就不通知。这里我们也可以看到,当开启event_idx后,VRING_USED_F_NO_NOTIFY也就失去了作用此外VRING_USED_F_NO_NOTIFY这个flag是后端设置的,对前端是只读的,用来告诉前端是否需要通知后端。

    然后我们看后端处理,这里我们因为使用的是dpdk,而dpdk一般采用的polling模式,设置VRING_USED_F_NO_NOTIFY,所以前端是不会kick后端的,但是如果开启了event_idx呢?我们知道开启event_idx时,前端kick后端需要后端通过used->ring[(vr)->num]告诉前端当前avail desc消耗到什么位置了,但是当前dpdk vhost_user并没有这个处理,因此当前如果打开event_idx,后端dpdk依然是无法收到中断的

发送队列后端通知前端

    我们再看发送队列后端通知前端的过程。发送队列为什么要通知前端呢?因为后端将前端放入avail ring中的数据取出后需要告诉前端对应的数据已经被取走了,你可以把相关数据buffer释放了。而究竟释放那些buffer是取决于uesd ring的,所以通知前端本质上是为了告诉前端uesd ring有更新了

    但是有一点要注意,我们知道uesd ring是前后端共享的,所以如果后端更新了uesd ring,即使不通知前端,前端应该也是可以感知到的。所以前端释放buffer不一定要依赖后端kick。事实上也的确如此。

     我们还看发送start_xmit

l  start_xmit (kernel4.9 )

点击(此处)折叠或打开

  1. static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev)
  2. {
  3.     struct virtnet_info *vi = netdev_priv(dev);
  4.     int qnum = skb_get_queue_mapping(skb);
  5.     struct send_queue *sq = &vi->sq[qnum];
  6.     int err;
  7.     struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum);
  8.     bool kick = !skb->xmit_more;

  9.     /* Free up any pending old buffers before queueing new ones. */
  10.     free_old_xmit_skbs(sq);
  11. …….
  12. }

    其中开头部分就首先调用free_old_xmit_skbs根据uesd ring将之前的buffer释放掉。具体流程如下图所示。

    其中detach_buf负责更加uesd ring来将对应的desc释放掉,并将对应地址dma unmmap。另外要注意的是virtqueue_get_buf函数的最后有如下调用:

l  virtqueue_get_buf (kernel4.9 )

点击(此处)折叠或打开

  1. void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
  2. {
  3.     struct vring_virtqueue *vq = to_vvq(_vq);
  4.     void *ret;
  5.     unsigned int i;
  6.     u16 last_used;

  7.     last_used = (vq->last_used_idx & (vq->vring.num - 1));
  8.     i = virtio32_to_cpu(_vq->vdev, vq->vring.used->ring[last_used].id);
  9.     *len = virtio32_to_cpu(_vq->vdev, vq->vring.used->ring[last_used].len);
  10.     ……
  11.     /* detach_buf clears data, so grab it now. */
  12.     ret = vq->desc_state[i].data;
  13.     detach_buf(vq, i);
  14.     vq->last_used_idx++;
  15.     /* If we expect an interrupt for the next entry, tell host
  16.      * by writing event index and flush out the write before
  17.      * the read in the next get_buf call. */
  18.     if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT))
  19.         virtio_store_mb(vq->weak_barriers,
  20.                 &vring_used_event(&vq->vring),
  21.                 cpu_to_virtio16(_vq->vdev, vq->last_used_idx));

  22.     END_USE(vq);
  23.     return ret;
  24. }

当前端没有设置VRING_AVAIL_F_NO_INTERRUPT时,会更新avail->ring[(vr)->num],这也是后端开启event_idxkick前端的条件。VRING_AVAIL_F_NO_INTERRUPT是前端设置,后端只读的。为什么要更新avail->ring[(vr)->num]呢?avail->ring[(vr)->num]中记录的前端已经处理到那个uesd idx了,因为可以及时告诉后端前端处理到什么位置了,后端来根据情况决定是否需要kick前端。

那现在问题又回来了,对于txq既然前端不需要后端kick也能释放buffer,那后端kick有什么用呢?我们再回头看一下start_xmit发送函数,有一下逻辑。

l  start_xmit  (kernel4.9 )

点击(此处)折叠或打开

  1. static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev)
  2. {
  3.     struct virtnet_info *vi = netdev_priv(dev);
  4.     int qnum = skb_get_queue_mapping(skb);
  5.     struct send_queue *sq = &vi->sq[qnum];
  6.     int err;
  7.     struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum);
  8.     bool kick = !skb->xmit_more;
  9. ……
  10.     /* If running out of space, stop queue to avoid getting packets that we
  11.      * are then unable to transmit.
  12.      * An alternative would be to force queuing layer to requeue the skb by
  13.      * returning NETDEV_TX_BUSY. However, NETDEV_TX_BUSY should not be
  14.      * returned in a normal path of operation: it means that driver is not
  15.      * maintaining the TX queue stop/start state properly, and causes
  16.      * the stack to do a non-trivial amount of useless work.
  17.      * Since most packets only take 1 or 2 ring slots, stopping the queue
  18.      * early means 16 slots are typically wasted.
  19.      */
  20.     if (sq->vq->num_free < 2+MAX_SKB_FRAGS) {
  21.         netif_stop_subqueue(dev, qnum);
  22.         if (unlikely(!virtqueue_enable_cb_delayed(sq->vq))) {
  23.             /* More just got used, free them then recheck. */
  24.             free_old_xmit_skbs(sq);
  25.             if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) {
  26.                 netif_start_subqueue(dev, qnum);
  27.                 virtqueue_disable_cb(sq->vq);
  28.             }
  29.         }
  30.     }
  31. ……
  32. }

当前端发送速率过快,从而vq->num_free较少时会调用netif_stop_subqueue(将队列状态设置为__QUEUE_STATE_DRV_XOFF),这样队列的start_xmit函数下次在__dev_queue_xmit中就不会被调用。要想打破这样一个状态,就需要后端的kick了。这里还有一个十分关键的函数,就这在stop_queue之后调用的virtqueue_enable_cb_delayed函数。这个函数中有着至关重要的一个操作,如下:

l  virtqueue_enable_cb_delayed

点击(此处)折叠或打开

  1. bool virtqueue_enable_cb_delayed(struct virtqueue *_vq)
  2. {
  3.     struct vring_virtqueue *vq = to_vvq(_vq);
  4.     u16 bufs;

  5.     START_USE(vq);

  6.     /* We optimistically turn back on interrupts, then check if there was
  7.      * more to do. */
  8.     /* Depending on the VIRTIO_RING_F_USED_EVENT_IDX feature, we need to
  9.      * either clear the flags bit or point the event index at the next
  10.      * entry. Always update the event index to keep code simple. */
  11.     /* 取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来 */
  12.     if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) {
  13.         vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT;
  14.         if (!vq->event)
  15.             vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
  16.     }
  17.     /* TODO: tune this threshold */
  18.     bufs = (u16)(vq->avail_idx_shadow - vq->last_used_idx) * 3 / 4;
  19.     /* 更新avail->ring[(vr)->num]以供event_idx使用 */
  20.     virtio_store_mb(vq->weak_barriers,
  21.             &vring_used_event(&vq->vring),
  22.             cpu_to_virtio16(_vq->vdev, vq->last_used_idx + bufs));
  23.     if (unlikely((u16)(virtio16_to_cpu(_vq->vdev, vq->vring.used->idx) - vq->last_used_idx) > bufs)) {
  24.         END_USE(vq);
  25.         return false;
  26.     }

  27.     END_USE(vq);
  28.     return true;
  29. }

首先这个函数做了一个关键操作,就是取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来。那么VRING_AVAIL_F_NO_INTERRUPT这个flag是什么时候被置上的呢?这个我们后面回答。另外一点就是会更新更新avail->ring[(vr)->num]告诉前端uesd desc当前消耗到哪里了,但注意这里不是更新为vq->last_used_idx,而是还加了一个bufs,为的就是让后端延时一会再kick前端。

下面我们看一下dpdk后端kick前端的时机,我们以split方式为例说明。

l  virtio_dev_tx_split(dpdk18.11)

点击(此处)折叠或打开

  1. static __rte_always_inline uint16_t
  2. virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.     struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
  4. {
  5.     uint16_t i;
  6.     uint16_t free_entries;

  7.     if (unlikely(dev->dequeue_zero_copy)) {
  8.         struct zcopy_mbuf *zmbuf, *next;

  9.         for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
  10.          zmbuf != NULL; zmbuf = next) {
  11.             next = TAILQ_NEXT(zmbuf, next);

  12.             if (mbuf_is_consumed(zmbuf->mbuf)) {
  13.                 update_shadow_used_ring_split(vq,
  14.                         zmbuf->desc_idx, 0);
  15.                 TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
  16.                 restore_mbuf(zmbuf->mbuf);
  17.                 rte_pktmbuf_free(zmbuf->mbuf);
  18.                 put_zmbuf(zmbuf);
  19.                 vq->nr_zmbuf -= 1;
  20.             }
  21.         }

  22.         if (likely(vq->shadow_used_idx)) {
  23.             /*如果是零拷贝方式,则每次接收前检查之前已经dma完成的报文,更新uesd ring,kick前端*/
  24.             flush_shadow_used_ring_split(dev, vq);
  25.             vhost_vring_call_split(dev, vq);
  26.         }
  27.     }
  28.     ......
  29.     for (i = 0; i < count; i++) {
  30.         ......
  31.         err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
  32.                 mbuf_pool);
  33.         ......
  34.     }
  35.     vq->last_avail_idx += i;

  36.     if (likely(dev->dequeue_zero_copy == 0)) {
  37.         do_data_copy_dequeue(vq);
  38.         if (unlikely(i < count))
  39.             vq->shadow_used_idx = i;
  40.         if (likely(vq->shadow_used_idx)) {
  41.             /* 更新used ring,kick前端 */
  42.             flush_shadow_used_ring_split(dev, vq);
  43.             vhost_vring_call_split(dev, vq);
  44.         }
  45.     }

  46.     return i;
  47. }

如果是零拷贝,则在后端接受逻辑的开始,判断上一次接受的报文是否dma完成,如果完成则更新uesd ring,调用vhost_vring_call_split kick前端。如果不是零拷贝模式,则在将报文从desc拷贝出来后,更新uesd ring,调用vhost_vring_call_split kick前端。关于vhost_vring_call_split的实现在前面已经介绍过了,这里不再重复。

下面看前端收到后端的中断是如何处理的。前面已经介绍过了,对于发送队列,其注册的中断回调函数中会调用skb_xmit_done

l  skb_xmit_done(kernel 4.9)

点击(此处)折叠或打开

  1. static void skb_xmit_done(struct virtqueue *vq)
  2. {
  3.     struct virtnet_info *vi = vq->vdev->priv;

  4.     /* Suppress further interrupts. */
  5.     virtqueue_disable_cb(vq);

  6.     /* We were probably waiting for more output buffers. */
  7.     netif_wake_subqueue(vi->dev, vq2txq(vq));
  8. }

其中virtqueue_disable_cb会将vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT,这样后端就不会发生kick到前端了(不开启event_idx的情况),然后调用netif_wake_subqueue唤醒被stopqueue(清除__QUEUE_STATE_DRV_XOFF state)。

l  virtqueue_disable_cb(kernel)

点击(此处)折叠或打开

  1. void virtqueue_disable_cb(struct virtqueue *_vq)
  2. {
  3.     struct vring_virtqueue *vq = to_vvq(_vq);

  4.     if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT)) {
  5.         vq->avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
  6.         if (!vq->event)
  7.             vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
  8.     }
  9. }

这里又有个疑问,唤醒stop 状态的queue容易理解,可以设置上了VRING_AVAIL_F_NO_INTERRUPT后,什么时候取消呢?其实前面我们已经介绍过了,就是下次stop queue时调用virtqueue_enable_cb_delayed的处理中。所以我们可以看到,即使不开启event_idx,也不是每次uesd 变化都会kick前端的,而是只有queue stop后才会kick,正常状态前端是会设置VRING_AVAIL_F_NO_INTERRUPT不让后端kick

接收队列后端通知前端

对于接收队列,后端会在将mbuf数据拷贝到avail desc中后,更新uesd ring,然后kick前端。kick前端的目的就是告诉前端,我有数据发送给你了(更新了uesd ring),你可以来取数据了。我们以split方式的接收方向为例。其接受逻辑在virtio_dev_rx_split中实现。

l  virtio_dev_rx_split(dpdk 18.11)

点击(此处)折叠或打开

  1. static __rte_always_inline uint32_t
  2. virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.     struct rte_mbuf **pkts, uint32_t count)
  4. {
  5.     uint32_t pkt_idx = 0;
  6.     uint16_t num_buffers;
  7.     struct buf_vector buf_vec[BUF_VECTOR_MAX];
  8.     uint16_t avail_head;

  9.     rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
  10.     avail_head = *((volatile uint16_t *)&vq->avail->idx);

  11.     for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
  12.         uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
  13.         uint16_t nr_vec = 0;
  14.         /* 为拷贝当前mbuf后续预留avail desc */
  15.         if (unlikely(reserve_avail_buf_split(dev, vq,
  16.                         pkt_len, buf_vec, &num_buffers,
  17.                         avail_head, &nr_vec) < 0)) {
  18.             VHOST_LOG_DEBUG(VHOST_DATA,
  19.                 "(%d) failed to get enough desc from vring\n",
  20.                 dev->vid);
  21.             vq->shadow_used_idx -= num_buffers;
  22.             break;
  23.         }

  24.         rte_prefetch0((void *)(uintptr_t)buf_vec[0].buf_addr);

  25.         VHOST_LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
  26.             dev->vid, vq->last_avail_idx,
  27.             vq->last_avail_idx + num_buffers);
  28.         /* 拷贝mbuf到avail desc */
  29.         if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
  30.                         buf_vec, nr_vec,
  31.                         num_buffers) < 0) {
  32.             vq->shadow_used_idx -= num_buffers;
  33.             break;
  34.         }
  35.         /* 更新last_avail_idx */
  36.         vq->last_avail_idx += num_buffers;
  37.     }
  38.     /* 小包的批处理拷贝 */
  39.     do_data_copy_enqueue(dev, vq);

  40.     if (likely(vq->shadow_used_idx)) {
  41.         flush_shadow_used_ring_split(dev, vq); /* 更新used ring */
  42.         vhost_vring_call_split(dev, vq); /* 通知前端 */
  43.     }

  44.     return pkt_idx;
  45. }

其中kick前端的处理在最后的vhost_vring_call_split函数中,这个我们在“后端vhost_userkick方式”中已经介绍过,这里就不再重复了。

然后我们看前端的通知处理。根据前面的介绍,接受队列前端注册的中断处理函数最终会调用到skb_recv_done

l  skb_recv_done(kernel 4.9)

点击(此处)折叠或打开

  1. static void skb_recv_done(struct virtqueue *rvq)
  2. {
  3.     struct virtnet_info *vi = rvq->vdev->priv;
  4.     struct receive_queue *rq = &vi->rq[vq2rxq(rvq)];

  5.     /* Schedule NAPI, Suppress further interrupts if successful. */
  6.     if (napi_schedule_prep(&rq->napi)) {
  7.         virtqueue_disable_cb(rvq);
  8.         __napi_schedule(&rq->napi);
  9.     }
  10. }

我们看到这个函数主要工作就是调用virtqueue_disable_cbvring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT从而禁止后端发送中断(不开启event_idx的情况),然后唤起NAPI处理。所以在NAPI的情况后端通知是被关闭的。那么这个flag什么时候会被打开呢?答案就是在virtio_netNAPI处理逻辑中,即virtnet_poll函数。

l  virtnet_poll(kernel 4.9)

点击(此处)折叠或打开

  1. static int virtnet_poll(struct napi_struct *napi, int budget)
  2. {
  3.     struct receive_queue *rq =
  4.         container_of(napi, struct receive_queue, napi);
  5.     unsigned int r, received;

  6.     received = virtnet_receive(rq, budget);

  7.     /* Out of packets? */
  8.     if (received < budget) {
  9.         r = virtqueue_enable_cb_prepare(rq->vq);
  10.         napi_complete_done(napi, received);
  11.         if (unlikely(virtqueue_poll(rq->vq, r)) &&
  12.          napi_schedule_prep(napi)) {
  13.             virtqueue_disable_cb(rq->vq);
  14.             __napi_schedule(napi);
  15.         }
  16.     }

  17.     return received;
  18. }

NAPI处理流程中如果received < budget,证明本轮数据接收已经比较少了,NAPI过程可能要退出了,这时调用virtqueue_enable_cb_prepare将之前的VRING_AVAIL_F_NO_INTERRUPT取消,从NAPI模式进入中断模式。

l  virtqueue_enable_cb_prepare(kernel 4.9)

点击(此处)折叠或打开

  1. unsigned virtqueue_enable_cb_prepare(struct virtqueue *_vq)
  2. {
  3.     struct vring_virtqueue *vq = to_vvq(_vq);
  4.     u16 last_used_idx;

  5.     START_USE(vq);

  6.     /* We optimistically turn back on interrupts, then check if there was
  7.      * more to do. */
  8.     /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to
  9.      * either clear the flags bit or point the event index at the next
  10.      * entry. Always do both to keep code simple. */
  11.     if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) {
  12.         vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT;
  13.         if (!vq->event)
  14.             vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
  15.     }
  16.     vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, last_used_idx = vq->last_used_idx);
  17.     END_USE(vq);
  18.     return last_used_idx;
  19. }

    注意这个函数除了取消VRING_AVAIL_F_NO_INTERRUPT设置之外,还会更新avail->ring[(vr)->num],以供开启event_idx时后端使用。

接收队列前端通知后端

    下面看接收方向前端通知的过程。首先要清除接收方向前端通知后端的目的,那就是告诉后端avail ring已经更新了(有了更多空buffer),你可以继续放入更多数据了从这里我们也可以看出,前端通知后端,无论发送还是接收方向,都是告诉后端有了更多的avail desc,而后端通知前端,都是告诉前端有了更多的uesd desc。然后我们看前端通知后端的时机,要想知道前端再何时通知后端,我们需要对前端的数据接收流程有个清晰的认识。下面这个图描述了前端vhost_net接收数据的过程。

而通知后端的时机就在try_fill_recv函数调用中。

l  try_fill_recv(kernel 4.9)


点击(此处)折叠或打开

  1. static bool try_fill_recv(struct virtnet_info *vi, struct receive_queue *rq,
  2.              gfp_t gfp)
  3. {
  4.     int err;
  5.     bool oom;

  6.     gfp |= __GFP_COLD;
  7.     /* 针对三种情况分别给desc注入对应的buffer,并更新avail idx */
  8.     do {
  9.         if (vi->mergeable_rx_bufs)
  10.             err = add_recvbuf_mergeable(rq, gfp);
  11.         else if (vi->big_packets)
  12.             err = add_recvbuf_big(vi, rq, gfp);
  13.         else
  14.             err = add_recvbuf_small(vi, rq, gfp);

  15.         oom = err == -ENOMEM;
  16.         if (err)
  17.             break;
  18.     } while (rq->vq->num_free);
  19.     virtqueue_kick(rq->vq); /* kick 后端 */
  20.     return !oom;
  21. }


这个函数首先会根据是否支持mergeable已经是否支持收大包来向avail desc注入对应的空buffer。每种情况下注入buffer产生的desc chain长度是不同的。这里不是重点,我们就不展开了。下面看关键的virtqueue_kick函数。

l  virtqueue_kick(kernel 4.9)


点击(此处)折叠或打开

  1. bool virtqueue_kick(struct virtqueue *vq)
  2. {
  3.     if (virtqueue_kick_prepare(vq))
  4.         return virtqueue_notify(vq);
  5.     return true;
  6. }


其中真正向后端发送通知的是virtqueue_notify,但是首先要通过virtqueue_kick_prepare的判断。virtqueue_kick_prepare这个函数我们之前已经介绍过了,如果不开启event_idx时,会根据vring.used->flags是否设置VRING_USED_F_NO_NOTIFY来决定是否kick后端,而开启event_idx时,则会根据后端填入(vr)->used->ring[(vr)->num]中的后端消耗位置来决定是否kick

同样如果后端使用的是dpdk vhost_user,那么当前后端是不会写(vr)->used->ring[(vr)->num]告诉前端自己的avail使用位置的,如果开启了event_idx后端还是无法收到中断的。那如果dpdk使用中断模式,这里收不到中断是否会有问题呢?我们换位思考一下,对比一下“发送队列后端通知前端”的场景,在“发送队列后端通知前端”中,如果后端不通知前端,那么前端一旦感觉到没有可用的avail desc后就会stop queue,之后就无法被唤醒了。而这里的场景,vhost_user后端没有类似stop queue的操作,所以即使收不到前端的中断也没有问题。但是在“发送队列前端通知后端”的场景中,如果dpdk采用中断模式切开启了event_idx,那么vhost_user就会因为收不到中断而无法取出前端发送的报文。所以对应想使用中断模式,且开启event_idx的场景,vhost_user需要添加对(vr)->used->ring[(vr)->num]的处理。

阅读(5857) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~