Chinaunix首页 | 论坛 | 博客
  • 博客访问: 18339
  • 博文数量: 3
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 20
  • 用 户 组: 普通用户
  • 注册时间: 2011-11-18 16:22
文章分类
文章存档

2019年(3)

我的朋友

分类: LINUX

2019-10-29 10:55:34

dpdk1811virtio1.1 的实现—packed ring

——lvyilong316

    virtio1.1已经在新的kerneldpdk pmd中陆续支持,但是网上关于这一块的介绍却比较少,唯一描述多一点的就是这个ppt 但是看ppt这东西总觉得还是不过瘾的。只是模糊的大概理解,但要想看清其本质还是要看代码。这篇文章主要是基于dpdk18.11中的vhost_user来分析virtio1.1具有哪些新特性,已经具体是如何工作的。

    virtio1.1 关键的最大改动点就是引入了packed queue,也就是将virtio1.0中的desc ringavail ringused ring三个ring打包成一个desc ring了。向对应的,我们将virtio 1.0这种实现方式称之为split ring。我们以vm的接收处理逻辑(vhost_user的发送逻辑)为例分析一下split packed方式的区别。

    在virtio_dev_rx中有如下实现:

点击(此处)折叠或打开

  1. if (vq_is_packed(dev))
  2.         nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
  3.     else
  4.         nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);

  根据后端设备是否支持VIRTIO_F_RING_PACKED这个feature,分别调用packedsplit处理函数。我们先回顾了解下我们看下其分别实现的流程。

split方式处理

l  virtio_dev_rx_split

点击(此处)折叠或打开

  1. static __rte_always_inline uint32_t
  2. virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.     struct rte_mbuf **pkts, uint32_t count)
  4. {
  5.     uint32_t pkt_idx = 0;
  6.     uint16_t num_buffers;
  7.     struct buf_vector buf_vec[BUF_VECTOR_MAX];
  8.     uint16_t avail_head;

  9.     rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
  10.     avail_head = *((volatile uint16_t *)&vq->avail->idx);

  11.     for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
  12.         uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
  13.         uint16_t nr_vec = 0;
  14.         /* 为拷贝当前mbuf后续预留avail desc */
  15.         if (unlikely(reserve_avail_buf_split(dev, vq,
  16.                         pkt_len, buf_vec, &num_buffers,
  17.                         avail_head, &nr_vec) < 0)) {
  18.             vq->shadow_used_idx -= num_buffers;
  19.             break;
  20.         }
  21.         /* 拷贝mbuf到avail desc */
  22.         if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
  23.                         buf_vec, nr_vec,
  24.                         num_buffers) < 0) {
  25.             vq->shadow_used_idx -= num_buffers;
  26.             break;
  27.         }
  28.         /* 更新last_avail_idx */
  29.         vq->last_avail_idx += num_buffers;
  30.     }
  31.     /* 小包的批处理拷贝 */
  32.     do_data_copy_enqueue(dev, vq);

  33.     if (likely(vq->shadow_used_idx)) {
  34.         flush_shadow_used_ring_split(dev, vq); /* 更新used ring */
  35.         vhost_vring_call_split(dev, vq); /* 通知前端 */
  36.     }

  37.     return pkt_idx;
  38. }

    其中涉及三处会和packed方式处理不同的地方,从函数名字我们也能看出,就是带有split的函数,对应一定有packed函数。split收包处理流程这里不再具体展开,下图描述了split相关数据结构。下面重点以这个图为背景大致描述一下guset接收流程。

    图中黄色部分表示guest内存,绿色部分表示host内存(非共享内存)。可以看到共享内存主要由三部分也就是三个ring构成:desc ringavail ringused ring。这三个ring也是split模式的核心构成。

    首先是desc ring,有多个desc chain构成,用来指向存放数据的地址。desc中有个flag,主要有以下几个取值:

点击(此处)折叠或打开

  1. /* This marks a buffer as continuing via the next field. */
  2. #define VRING_DESC_F_NEXT 1
  3. /* This marks a buffer as write-only (otherwise read-only). */
  4. #define VRING_DESC_F_WRITE 2
  5. /* This means the buffer contains a list of buffer descriptors. */
  6. #define VRING_DESC_F_INDIRECT 4

    其次是avail ring,注意avail->idx 不是desc ringidx,而是avail->ringidx,对应的avail->ring[idx]最后一个后端可用的(对前端来说是下一个可用)desc chainheader idxlast_avail_idx也不是desc ringidx,记录的也是avail->ringidx,对应的avail->ring[idx]表示上一轮拷贝用到的最后一个desc chainheader idxavail->ring[idx+1]为本轮拷贝可用的第一个desc chainheader idx)。avail ring中也有一个flag,目前只会取值VRING_AVAIL_F_NO_INTERRUPT,其作用是让guest通过设置这个来告诉后端(如果更新了uesd ring)暂时不用kick前端,作为前端的一个优化

    最后是uesd ring,再讲used ring前要提一下shadow_used ringshadow_used ringvhost user为了提高性能分配的一个ring,它和其他三个ring不同,它是host内存,guest是不感知的。仔细观察上图就可以看出shadow_used ringuesd->ring指向的结构是完全一样的。因为shadow_used ring正是uesd ring的一个暂存的buff。当后端将数据从对应desc chain记录的内存拷贝出之后,这些desc chainidxlen就需要先暂时记录在shadow_used ring中,等这一批mbuf拷贝完后,再将shadow_used ring一次性拷贝到uesd->ring的对应位置。同样last_used_idx记录的也不是desc ringidx,而是used->ringidx,对应used->ring[idx]记录的是上一次后端已经处理好可以给前端释放(对于guest rx来说)的desc chainheader idxused ring中也有一个flag,目前只会取值VRING_USED_F_NO_NOTIFY,其作用是让host使用这个值告诉前端,当用可用的avail ring时不要kick hostdpdk vhost_user默认会设置这个flag,因为后端采用的是polling模式

    另外值得一提的是,整个guest rx涉及到两次位置的向guest内存拷贝的动作,一个是将mbuf中的数据拷贝到desc中(对应函数copy_mbuf_to_desc),另一处是将shadow_uesd ring拷贝到uesd ring的过程中(对应函数flush_shadow_used_ring_split),所以如果在guest热迁移过程中这两处都会涉及到log_page的相关操作

 

关于split ring的其他一些注释:

1. 每个virtqueue由三部分组成:

  1 Descriptor Table

  2 Available Ring

  3 Used Ring

2. Legacy Interfaces

  1vq需要严格的按照以下顺序和pad 布局


点击(此处)折叠或打开

  1. struct virtq {
  2. // The actual descriptors (16 bytes each)
  3. struct virtq_desc desc[ Queue Size ];
  4. // A ring of available descriptor heads with free-running index.
  5. struct virtq_avail avail;
  6. // Padding to the next Queue Align boundary.
  7. u8 pad[ Padding ];
  8. // A ring of used descriptor heads with free-running index.
  9. struct virtq_used used;
  10. };


3. avail desc中的ring存放的是desc chainheader desc iddescid表示的是下一个可用(对于前端)的desc id

packed方式处理

    前面回顾分析完split的处理方式后下面重点要分析packed的处理方式,这是virtio1.1改变的重点。packed的关键变化是desc ring的变化,为了更好的利用cache和硬件的亲和性(方便硬件实现virtio),将split方式中的三个ringdescavailused)打包成一个packed desc ring

我们首先看些相对split desc来说packed desc有什么不同。


点击(此处)折叠或打开

  1. /*split desc*/
  2. struct vring_desc {
  3.     uint64_t addr; /* Address (guest-physical). */
  4.     uint32_t len; /* Length. */
  5.     uint16_t flags; /* The flags as indicated above. */
  6.     uint16_t next; /* We chain unused descriptors via this. */
  7. };
  8. /*packed desc*/
  9. struct vring_packed_desc {
  10.     uint64_t addr;
  11.     uint32_t len;
  12.     uint16_t id;
  13.     uint16_t flags;
  14. };


    我们看到addrlen名字和含义保持不变,flags看起来也没有变化,实际上其取值多了几种。下面我们具体分析其变化的原因,以及每个变化字段的含义。

(1)     相对split desc去掉了next字段:我们知道在split descnext字段是记录一个desc chain中的下一个desc idx使用的,通常配合flags这样使用:

           if ((descs[idx].flags & VRING_DESC_F_NEXT) == 1)

               nextdesc = descs[ descs[idx].next];

    但是在packed desc ring中一个desc chain一定是相邻的(可以理解为链表变为了数组),所以next字段就用不上了,上面获取nextdesc的方式可以转化为如下方式:

    if ((descs[idx].flags & VRING_DESC_F_NEXT) == 1)

        nextdesc = descs[++idx];

(2)     flags字段的变化:相对split descflags字段仍然保留,但是其取值增加了,因为要把三个ring合一,每个desc就需要更多的信息表明身份(是used还是avail)。在原有flags的基础上增加了两个flag

    #define VRING_DESC_F_AVAIL          (1ULL << 7)

#define VRING_DESC_F_USED (1ULL << 15)

关于这两个flag如何使用后面再分析。

(3)     相对split desc增加了id字段:这个id比较特殊,他是buffer id,注意不是desc的下标idx。那么这个buffer又是个什么含义呢?其实可用理解为前端guest维护的一个mbuf数组,这个buffer就是这个数组的idx,用来发送或接受数据。为了更准确描述buffer id的来历,我们看下前端是如果将一个avail buffer关联到一个desc的:

对每个(foreach)将要发送的buffer, b:

1.desc ring中获取到下一个可用的descd

2.获取下一个可用的buffer id

3.设置d.addr的值为b的数据起始物理地址;

4.设置d.len的值为b的数据长度;

5.设置d.idbuffer id

6.采用如下方式生成descflag:

(a)如果b是后端可写的,则设置VIRTQ_DESC_F_WRITE,否则不设置;

(b)按照avail ringWrap Counter值设置VIRTQ_DESC_F_AVAIL

(c)按照avail ring Wrap Counter值取反设置VIRTQ_DESC_F_USED

7. 调用一下memory barrier确保desc已经被初始化;

8.设置d.flags为刚刚生成的flag

9.如果davail ring的最后一个desc,则对Wrap Counter进行翻转;

10.否则增加d指向下一个desc

附上伪代码实现:


点击(此处)折叠或打开

  1. /* Note: vq->avail_wrap_count is initialized to 1 */
  2. /* Note: vq->sgs is an array same size as the ring */
  3. id = alloc_id(vq);
  4. first = vq->next_avail;
  5. sgs = 0;
  6. for (each buffer element b) {
  7. sgs++;
  8. vq->ids[vq->next_avail] = -1;
  9. vq->desc[vq->next_avail].address = get_addr(b);
  10. vq->desc[vq->next_avail].len = get_len(b);
  11. avail = vq->avail_wrap_count ? VIRTQ_DESC_F_AVAIL : 0;
  12. used = !vq->avail_wrap_count ? VIRTQ_DESC_F_USED : 0;
  13. f = get_flags(b) | avail | used;
  14. if (b is not the last buffer element) {
  15.     f |= VIRTQ_DESC_F_NEXT;
  16. }
  17. /* Don't mark the 1st descriptor available until all of them are ready. */
  18. if (vq->next_avail == first) {
  19.     flags = f;
  20. } else {
  21.     vq->desc[vq->next_avail].flags = f;
  22. }
  23. last = vq->next_avail;
  24. vq->next_avail++;
  25. if (vq->next_avail >= vq->size) {
  26.     vq->next_avail = 0;
  27.     vq->avail_wrap_count \^= 1;
  28.     }
  29. }
  30. vq->sgs[id] = sgs;
  31. /* ID included in the last descriptor in the list */
  32. vq->desc[last].id = id;
  33. write_memory_barrier();
  34. vq->desc[first].flags = flags;
  35. memory_barrier();
  36. if (vq->device_event.flags != RING_EVENT_FLAGS_DISABLE) {
  37.     notify_device(vq);
  38. }


       注意上面实现的一个细节:当需要传递多个buffer的时候,第一个descflag是延时到最后更新的,这样可以减少memory_barrier调用次数,一次调用确保之后的desc都已经正常初始化了(为什么最后更新flag,以及要调用memory_barrier呢?因为后端是以flag判断desc是否可以使用,所以需要确保flag设置时其他字段以及被正确设置写入内存)。最后再附一张desc ring的图。

    另外注意,由于avail uesd都统一到了desc中,但是并不是每个字段都是必须的。avail ringused ring是如何体现的?

      packed把三个ring进行了整合,但virtio的本质思想并没有变化,整个数据传输还是availused共同作用完成的,所以三ring合一仅仅是形式的变化,avail ringused ring并没有消失。那么自然就有一个问题:avail ringused ring是如何体现的?

    回答这个问题前,我们先看一下virtiovq为了支持packed发生的一些变化。


点击(此处)折叠或打开

  1. struct vhost_virtqueue {

  2.     bool            used_wrap_counter;
  3.     bool            avail_wrap_counter;

  4. }


    这两个wrap_counter分别对应avail ringused ringpacked方式正式通过这两个bool型变量以及前面提到的packed desc新增的两个flag完成availuesd的区分的。

    首先这两个wrap_counter在初始化队列的时候都被初始化为1

    对于avail ring,当使用了最后一个desc时则将avail_wrap_counter进行翻转(0变为1,1变为0),然后再从第一个开始;对于uesd ring,当使用了最后一个desc时将used_wrap_counter进行翻转,然后再从第一个开始。

    有了上面的前提就可以说明avail descused desc是如果表示的了:

    avail desc:当desc flags关于VRING_DESC_F_AVAIL的设置和avail_wrap_counter同步,且VRING_DESC_F_USED的设置和avail_wrap_counter相反时,表示descavail desc。例如avail_wrap_counter1时,flags应该设置VRING_DESC_F_AVAIL|~VRING_DESC_F_USED,当avail_wrap_counter0时,flags应该设置~VRING_DESC_F_AVAIL|VRING_DESC_F_USED

    used desc:当desc flags关于VRING_DESC_F_USED的设置和used_wrap_counter同步,且VRING_DESC_F_AVAIL的设置也和used_wrap_counter同步时,表示descused desc。例如used_wrap_counter1时,flags应该设置VRING_DESC_F_AVAIL|VRING_DESC_F_USED,当used_wrap_counter0时,flags应该设置~VRING_DESC_F_AVAIL|~VRING_DESC_F_USED

    综上可以看出,avail desc的两个flag总是相反的(只能设置一个),而used desc的两个flag总是相同的,要么都设置,要么都不设置。

    看到这里可能有人会奇怪,为什么要搞得这么麻烦呢?仅仅通过两个flags应该也可以区分出是uesd还是avail吧。那我们看下面这个图,以avail desc为例,假如仅仅靠VRING_DESC_F_AVAIL|~VRING_DESC_F_USED就表示avail desc

    图中情况表示当前avail ring满了,没有uesddesc,这个时候如果后端处理完最后一个avail desc,回绕到第一个avail desc时,就无法区分这个avail desc是新的avail desc还是已经处理过的desc。而如果结合avail_wrap_counter就很好处理了,假如本轮其值为1,则遍历到最后一个avail descavail_wrap_counter要被置零了,再继续遍历到第一个desc时判断是avail desc的标准就变为了~USED|AVAIL,所以第一个desc就不满足条件了。

    所以我们看出引入wrap_counter的作用主要是为了解决desc ring回绕问题。在split方式中,由于对于avail ringavail->idx存放当前最后一个可用avail desc的位置,对于uesd ringused->idx存放最后一个可用的uesd desc位置,而packed方式中三ring合一,不再有这样一个变量表示ring的结束位置,所以才引入了这么个机制

 

关于packed ring的其他一些注释:

1. Packed virtqueues支持2^15 entries;

2. 每个packed virtqueue 有三部分构成:

   1Descriptor Ring

   2Driver Event Suppression:后端(device)只读,用来控制后端向前端(driver)的通知(used notifications

   3Device Event Suppression:前端(driver)只读,用来控制前端向后端(device)的通知(avail notifications

3. Write FlagVIRTQ_DESC_F_WRITE

   1)对于avail desc这个flag用来标记其关联的buffer是只读的还是只写的;

   2)对于used desc这个flag用来表示去关联的buffer是否有被后端(device)写入数据;

4. desc中的len

   1)对于avail desclen表示desc关联的buffer中被写入的数据长度;

   2)对于uesd desc,当VIRTQ_DESC_F_WRITE被设置时,len表示后端(device)写入数据的长度,当VIRTQ_DESC_F_WRITE没有被设置时,len没有意义;

5. Descriptor Chain

   buffer id包含在desc chain的最后一个desc中,另外,VIRTQ_DESC_F_NEXTused desc中是没有意义的。

 

    好了,说了这么多我们大概对packed的实现原理清楚了,那么接下来就看下具体实现,还是以vm收包方向的后端处理逻辑为例。

l  virtio_dev_rx_packed


点击(此处)折叠或打开

  1. static __rte_always_inline uint32_t
  2. virtio_dev_rx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.     struct rte_mbuf **pkts, uint32_t count)
  4. {
  5.     uint32_t pkt_idx = 0;
  6.     uint16_t num_buffers;
  7.     struct buf_vector buf_vec[BUF_VECTOR_MAX];

  8.     for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
  9.         uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
  10.         uint16_t nr_vec = 0;
  11.         uint16_t nr_descs = 0;
  12.          /* 为拷贝当前mbuf后续预留avail desc */
  13.         if (unlikely(reserve_avail_buf_packed(dev, vq,
  14.                         pkt_len, buf_vec, &nr_vec,
  15.                         &num_buffers, &nr_descs) < 0)) {
  16.             vq->shadow_used_idx -= num_buffers;
  17.             break;
  18.         }
  19.         rte_prefetch0((void *)(uintptr_t)buf_vec[0].buf_addr);

  20.          /* 拷贝mbuf到avail desc */
  21.         if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
  22.                         buf_vec, nr_vec,
  23.                         num_buffers) < 0) {
  24.             vq->shadow_used_idx -= num_buffers;
  25.             break;
  26.         }

  27.         vq->last_avail_idx += nr_descs;
  28.         if (vq->last_avail_idx >= vq->size) {
  29.             vq->last_avail_idx -= vq->size;
  30.             vq->avail_wrap_counter ^= 1;
  31.         }
  32.     }
  33.     /* 小包的批处理拷贝 */
  34.     do_data_copy_enqueue(dev, vq);

  35.     if (likely(vq->shadow_used_idx)) {
  36.         /* 更新used ring */
  37.         flush_shadow_used_ring_packed(dev, vq);
  38.         /* kick 前端 */
  39.         vhost_vring_call_packed(dev, vq);
  40.     }

  41.     return pkt_idx;
  42. }


    函数中带有packed后缀的都是packed方式的特有处理实现。我们先看reserve_avail_buf_packed,这个函数为拷贝当前mbuf后续预留avail desc

l  reserve_avail_buf_packed


点击(此处)折叠或打开

  1. static inline int
  2. reserve_avail_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.                 uint32_t size, struct buf_vector *buf_vec,
  4.                 uint16_t *nr_vec, uint16_t *num_buffers,
  5.                 uint16_t *nr_descs)
  6. {
  7.     uint16_t avail_idx;
  8.     uint16_t vec_idx = 0;
  9.     uint16_t max_tries, tries = 0;

  10.     uint16_t buf_id = 0;
  11.     uint32_t len = 0;
  12.     uint16_t desc_count;

  13.     *num_buffers = 0;
  14.     avail_idx = vq->last_avail_idx;
  15.     /* 如果支持mergeable特性,则一个mbuf可用使用多个desc chain */
  16.     if (rxvq_is_mergeable(dev))
  17.         max_tries = vq->size - 1;
  18.     else
  19.         max_tries = 1;

  20.     while (size > 0) {
  21.         if (unlikely(++tries > max_tries))
  22.             return -1;
  23.         /* 尝试填充一个desc chain */
  24.         if (unlikely(fill_vec_buf_packed(dev, vq,
  25.                         avail_idx, &desc_count,
  26.                         buf_vec, &vec_idx,
  27.                         &buf_id, &len,
  28.                         VHOST_ACCESS_RW) < 0))
  29.             return -1;

  30.         len = RTE_MIN(len, size);
  31.         /* 将当前使用的desc chian信息同步到shadow_used_packed ring 中 */
  32.         update_shadow_used_ring_packed(vq, buf_id, len, desc_count);
  33.         size -= len;

  34.         avail_idx += desc_count;
  35.         if (avail_idx >= vq->size)
  36.             avail_idx -= vq->size;

  37.         *nr_descs += desc_count;
  38.         *num_buffers += 1;
  39.     }

  40.     *nr_vec = vec_idx;

  41.     return 0;
  42. }


     下面看fill_vec_buf_packed,这个函数是将mbuf填充到当前desc chain中(如果mbuf过大,不保证填完,只负责填充当前desc chian)。

l  fill_vec_buf_packed

主要倒数第三个参数是返回的buffer id


点击(此处)折叠或打开

  1. static __rte_always_inline int
  2. fill_vec_buf_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.                 uint16_t avail_idx, uint16_t *desc_count,
  4.                 struct buf_vector *buf_vec, uint16_t *vec_idx,
  5.                 uint16_t *buf_id, uint32_t *len, uint8_t perm)
  6. {
  7.     bool wrap_counter = vq->avail_wrap_counter;
  8.     struct vring_packed_desc *descs = vq->desc_packed;
  9.     uint16_t vec_id = *vec_idx;
  10.     /* 如果avail idx发送了回绕,则wrap_counter要进行翻转 */
  11.     if (avail_idx < vq->last_avail_idx)
  12.         wrap_counter ^= 1;
  13.     /* 判断是否是avail desc */
  14.     if (unlikely(!desc_is_avail(&descs[avail_idx], wrap_counter)))
  15.         return -1;

  16.     *desc_count = 0;
  17.     *len = 0;
  18.     while (1) {
  19.         if (unlikely(vec_id >= BUF_VECTOR_MAX))
  20.             return -1;

  21.         *desc_count += 1;
  22.         *buf_id = descs[avail_idx].id; /* buf_id记录的是使用的最后一个avail desc的id */
  23.         if (descs[avail_idx].flags & VRING_DESC_F_INDIRECT) {
  24.             if (unlikely(fill_vec_buf_packed_indirect(dev, vq,
  25.                             &descs[avail_idx],
  26.                             &vec_id, buf_vec,
  27.                             len, perm) < 0))
  28.                 return -1;
  29.         } else {
  30.             *len += descs[avail_idx].len;

  31.             if (unlikely(map_one_desc(dev, vq, buf_vec, &vec_id,
  32.                             descs[avail_idx].addr,
  33.                             descs[avail_idx].len,
  34.                             perm)))
  35.                 return -1;
  36.         }
  37.         if ((descs[avail_idx].flags & VRING_DESC_F_NEXT) == 0)
  38.             break;
  39.         if (++avail_idx >= vq->size) {
  40.             avail_idx -= vq->size;
  41.             wrap_counter ^= 1;
  42.         }
  43.     }

  44.     *vec_idx = vec_id;
  45.     return 0;
  46. }


    其中需要注意的有三点,首先,buf_id记录的是使用的最后一个avail descbuffer id,这个id会在shadow_uesd中使用,然后是当avail ring出现翻转的时候,同步翻转对应的wrap_counter。再一点就是desc_is_avail函数,用来判断当前desc是否是avail desc

l  desc_is_avail


点击(此处)折叠或打开

  1. static inline bool
  2. desc_is_avail(struct vring_packed_desc *desc, bool wrap_counter)
  3. {
  4.     /* VRING_DESC_F_AVAIL的设置和wrap_counter一致,且VRING_DESC_F_USED的设置和wrap_counter相反时表示设avail desc */
  5.     return wrap_counter == !!(desc->flags & VRING_DESC_F_AVAIL) &&
  6.         wrap_counter != !!(desc->flags & VRING_DESC_F_USED);
  7. }


    我们看到这个判断逻辑原理和之前我们讲的packed方式中availuesd desc是如果区分的相同。即avail desc需要VRING_DESC_F_AVAIL这个flag的设置和avail wrap_counter一致,且VRING_DESC_F_USED的设置和avail wrap_counter相反。

    下面回头看update_shadow_used_ring_packed函数,这个函数将当前使用的desc chian信息同步到shadow_used_packed ring 中。

l  update_shadow_used_ring_packed


点击(此处)折叠或打开

  1. static __rte_always_inline void
  2. update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
  3.              uint16_t desc_idx, uint32_t len, uint16_t count)
  4. {
  5.     uint16_t i = vq->shadow_used_idx++;

  6.     vq->shadow_used_packed[i].id = desc_idx; /*desc chain最后一个avail desc的buffer id*/
  7.     vq->shadow_used_packed[i].len = len;
  8.     vq->shadow_used_packed[i].count = count;
  9. }


    注意这里的count是当前desc chain中使用的desc个数,desc_idx是当前desc chain使用的最后一个descbuffer idxsplit方式shadow_usedid记录的是当前desc chain头部descid

    另外一个关键的地方,shadow_used_packed相对shadow_used_split 多了一个count字段,用来记录当前desc chain中使用的desc个数。这个作用我们后面马上分析。

 

    flush_shadow_used_ring_packed函数用来根据shadow_used ring的信息更新uesd ring。我们看其具体实现。

l  update_shadow_used_ring_split


点击(此处)折叠或打开

  1. static __rte_always_inline void
  2. update_shadow_used_ring_split(struct vhost_virtqueue *vq,
  3.              uint16_t desc_idx, uint32_t len)
  4. {
  5.     uint16_t i = vq->shadow_used_idx++;

  6.     vq->shadow_used_split[i].id = desc_idx;
  7.     vq->shadow_used_split[i].len = len;
  8. }

  9. static __rte_always_inline void
  10. flush_shadow_used_ring_packed(struct virtio_net *dev,
  11.             struct vhost_virtqueue *vq)
  12. {
  13.     int i;
  14.     uint16_t used_idx = vq->last_used_idx;

  15.     /* Split loop in two to save memory barriers */
  16.     for (i = 0; i < vq->shadow_used_idx; i++) {
  17.         vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
  18.         vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
  19.         /* count的作用就一个是用来判断desc ring发送回绕 */
  20.         used_idx += vq->shadow_used_packed[i].count;
  21.         if (used_idx >= vq->size)
  22.             used_idx -= vq->size;
  23.     }

  24.     rte_smp_wmb();
  25.     /* 将desc 标记为uesd desc */
  26.     for (i = 0; i < vq->shadow_used_idx; i++) {
  27.         uint16_t flags;

  28.         if (vq->shadow_used_packed[i].len)
  29.             flags = VRING_DESC_F_WRITE;
  30.         else
  31.             flags = 0;

  32.         if (vq->used_wrap_counter) {
  33.             flags |= VRING_DESC_F_USED;
  34.             flags |= VRING_DESC_F_AVAIL;
  35.         } else {
  36.             flags &= ~VRING_DESC_F_USED;
  37.             flags &= ~VRING_DESC_F_AVAIL;
  38.         }

  39.         vq->desc_packed[vq->last_used_idx].flags = flags;
  40.         /* log page 更新热迁移bitmap*/
  41.         vhost_log_cache_used_vring(dev, vq,
  42.                     vq->last_used_idx *
  43.                     sizeof(struct vring_packed_desc),
  44.                     sizeof(struct vring_packed_desc));
  45.          /* count的作用另一个是用来更新last_used_idx */
  46.         vq->last_used_idx += vq->shadow_used_packed[i].count;
  47.         if (vq->last_used_idx >= vq->size) {
  48.             vq->used_wrap_counter ^= 1;
  49.             vq->last_used_idx -= vq->size;
  50.         }
  51.     }

  52.     rte_smp_wmb();
  53.     vq->shadow_used_idx = 0;
  54.     vhost_log_cache_sync(dev, vq);
  55. }


      注意为什么先更新uesd desc的其他字段,最后才一起更新flag,而不是第一次循环就一起吧flag更新了,这个原因其实我们前面讲“buffer id”的时候已经说明了。对端(前端)是更加desc flag判断desc是否可用的,所以在更新flag需要有memory barrier,确保其他字段以及正确初始化到内存,为了减少memory barrier的调用,所以单独进行flag更新。

    这个函数需要关注的地方还是比较多的。首先我们看到shadow_used_packed count字段的作用,其一是用来判断desc ring发送回绕,可以看到packed方式uesd descdesc中不是连续的,而是会跳隔:used_idx += vq->shadow_used_packed[i].count,这个在split中是不存在的,因为splituesd有单独的ring,所以uesd是连续的,直接就可以使用起始位置和要拷贝的uesd desc长度就可以判断used ring回绕了(注意一个是判断desc ring回绕,一个是判断uesd ring回绕,packed没有单独的uesd ring)。另外一个作用是用来更新last_used_idxpackedlast_used_idx表示的是使用的desc chain的最后的desc idx,而split方式中表示的是使用的desc chainheader idx

    然后就是标记descuesd desc,我们之前已经讲过,uesd desc需要VRING_DESC_F_USEDVRING_DESC_F_AVAIL一致且和used_wrap_counter一致。

    关于通知前端的逻辑vhost_vring_call_packed我们下一次再分析。

阅读(6619) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~