Chinaunix首页 | 论坛 | 博客
  • 博客访问: 18336
  • 博文数量: 3
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 20
  • 用 户 组: 普通用户
  • 注册时间: 2011-11-18 16:22
文章分类
文章存档

2019年(3)

我的朋友

分类: LINUX

2019-10-28 15:16:56

原文地址:vhost_user mergeable 特性 作者:lvyilong316

vhost_user mergeable 特性

——lvyilong316

vhost_user在收包时(将数据包发往vm内部)会调用rte_vhost_enqueue_burst函数,这个函数的实现如下:

l  rte_vhost_enqueue_burst


点击(此处)折叠或打开

  1. uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
  2.          struct rte_mbuf **pkts, uint16_t count)
  3. {
  4.          struct virtio_net *dev = get_device(vid);
  5.  
  6.          if (!dev)
  7.                    return 0;
  8.  
  9.          if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
  10.                    return virtio_dev_merge_rx(dev, queue_id, pkts, count);
  11.          else
  12.                    return virtio_dev_rx(dev, queue_id, pkts, count);
  13. }


     我们可以看到根据vhost_user后端设备是否支持VIRTIO_NET_F_MRG_RXBUF特性会调用不同函数,其中不支持时调用的virtio_dev_rx之前已经分析过,这里看下virtio_dev_merge_rx的具体实现。

l  virtio_dev_merge_rx


点击(此处)折叠或打开

  1. static inline uint32_t __attribute__((always_inline))
  2. virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
  3.          struct rte_mbuf **pkts, uint32_t count)
  4. {
  5.          struct vhost_virtqueue *vq;
  6.          uint32_t pkt_idx = 0;
  7.          uint16_t num_buffers;
  8.          struct buf_vector buf_vec[BUF_VECTOR_MAX];
  9.          uint16_t avail_head;
  10.  
  11.     /*获取对应的queue*/
  12.          vq = dev->virtqueue[queue_id];
  13.          if (unlikely(vq->enabled == 0))
  14.                    return 0;
  15.  
  16.          count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
  17.          if (count == 0)
  18.                    return 0;
  19.     /*avail->ring[vq->last_avail_idx & (vq->size - 1)]记录着首个可用的desc index,
  20.           *avail->ring[vq->avail->idx & (vq->size - 1)]记录着最后一个可用的desc index*/
  21.          rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
  22.  
  23.          vq->shadow_used_idx = 0;
  24.          /*avail->ring[avail_head]记录着最后一个可用的desc index*/
  25.          avail_head = *((volatile uint16_t *)&vq->avail->idx);
  26.          /*遍历每一个要发送的数据包*/
  27.          for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
  28.                    uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
  29.         /*预留足够的desc来存放mbuf,使用buf_vec来记录,每个buf_vec对应一个desc,
  30.              所以num_buffers就是存放这个数据包所需的desc chain的个数*/
  31.                    if (unlikely(reserve_avail_buf_mergeable(dev, vq,
  32.                                                         pkt_len, buf_vec, &num_buffers,
  33.                                                         avail_head) < 0)) {
  34.                             LOG_DEBUG(VHOST_DATA,
  35.                                      "(%d) failed to get enough desc from vring\n",
  36.                                      dev->vid);
  37.                             vq->shadow_used_idx -= num_buffers;
  38.                             break;
  39.                    }
  40.  
  41.                    LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
  42.                             dev->vid, vq->last_avail_idx,
  43.                             vq->last_avail_idx + num_buffers);
  44.         /*根据buf_vec中记录的desc信息,将当前数据包(mbuf)拷贝到这些desc中*/
  45.                    if (copy_mbuf_to_desc_mergeable(dev, pkts[pkt_idx],
  46.                                                         buf_vec, num_buffers) < 0) {
  47.                             vq->shadow_used_idx -= num_buffers;
  48.                             break;
  49.                    }
  50.         /*更新last_avail_idx*/
  51.                    vq->last_avail_idx += num_buffers;
  52.          }
  53.  
  54.          if (likely(vq->shadow_used_idx)) {
  55.                    flush_shadow_used_ring(dev, vq);
  56.  
  57.                    /* flush used->idx update before we read avail->flags. */
  58.                    rte_mb();
  59.  
  60.                    /* Kick the guest if necessary. */
  61.                    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
  62.                                      && (vq->callfd >= 0))
  63.                             eventfd_write(vq->callfd, (eventfd_t)1);
  64.          }
  65.  
  66.          return pkt_idx;
  67. }


这里主要引入了一个struct buf_vector的数组,这个数组和desc是一一对应的,其关系如下所示:


    那么为什么要引入这个buf_vector数组呢?首先我们知道所谓merge_rx的功能,主要是为了vm接收大包(实现LRO的功能)。一般来说,一个mbuf会对应转换成为一个desc,但是当mbuf稍微大点,一个desc无法装下怎么办呢?

这时不要忘了desc中的next成员,即desc也是可以形成chain的,如果当前desc无法装下mbuf,那么就用vq->desc[desc->next]来继续存放。如下图:


desc1desc3desc5形成一个chainmbufdesc1desc3存放。那么如果mbuf再大点呢?达到desc1desc3desc5三个desc都无法存放呢?这时如果没有打开VIRTIO_NET_F_MRG_RXBUF调用virtio_dev_rx就会接收出错了(尽管还有其他可用的desc,但由于不在一个chain,所以也不会使用)。但是如果打开了VIRTIO_NET_F_MRG_RXBUF,则会尝试找其他其他的chain

还以上图为例,desc数组中共有两个chain1desc1->desc3->desc5; chain2:desc2->desc4->desc6,那么当chain1无法存下这个mbuf数据时,mbuf剩下的数据将由chain2存放。主要:一个mbuf绝不可能按照desc1àdesc2的顺序存放,因为desc1desc2属于不同的chain,只有当前chain使用完才能使用另一个chain

由于desc数组的顺序不一定是按照chain的顺序组织的(如上图),所以为了方便后续的mbuf到各个desc的拷贝操作,我们增加了buf_vector这个数组,用它来记录拷贝mbufdesc顺序。如下图所示:


    这样我们拷贝mbuf时就可以按照buf_vector1buf_vector6依次拷贝到对应的desc中了。

另外我们注意一点,在函数的最后会对last_avail_idx进行更新:vq->last_avail_idx += num_buffers; 注意这里的num_buffers不是这个mbuf使用的desc个数,而是使用的desc chain个数。还以上面mbuf使用两个chain为例,last_avail_idx需要加2,如下图所示,如果之前last_avail_idx1的话,这里就要更新为3了。


     这里容易混淆的一点是:avail->ring中存放的index不是和desc一一对应的,而是和desc chain一一对应的,即只存放desc chain headerindex。所以avail->idx-last_avail_id也不是可用desc的个数,而是可用desc chain的个数

 

有了以上背景,我们再看reserve_avail_buf_mergeable这个函数就容易理解多了。这个函数的作用就是:预留足够的desc来存放mbuf,同时使用buf_vec来记录,每个buf_vec对应一个desc,如果当前所有可用的desc都无法装得下这个mbuf则返错。

l  reserve_avail_buf_mergeable


点击(此处)折叠或打开

  1. static inline int
  2. reserve_avail_buf_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.                                      uint32_t size, struct buf_vector *buf_vec,
  4.                                      uint16_t *num_buffers, uint16_t avail_head)
  5. {
  6.          uint16_t cur_idx;
  7.          uint32_t vec_idx = 0;
  8.          uint16_t tries = 0;
  9.  
  10.          uint16_t head_idx = 0;
  11.          uint16_t len = 0;
  12.     /*num_buffers为要使用的desc chain的个数*/
  13.          *num_buffers = 0;
  14.          cur_idx = vq->last_avail_idx;
  15.     /*每次遍历一个desc chain,遍历过的chain加起来可以存放下这个mbuf*/
  16.          while (size > 0) {
  17.                   if (unlikely(cur_idx == avail_head))
  18.                             return -1;
  19.         /*fill_vec_buf的作用遍历下一个desc chain,用来存放mbuf,然后buf_vec记录这些desc的信息*/
  20.                    if (unlikely(fill_vec_buf(dev, vq, cur_idx, &vec_idx, buf_vec,
  21.                                                         &head_idx, &len) < 0))
  22.                             return -1;
  23.                    len = RTE_MIN(len, size);
  24.                    update_shadow_used_ring(vq, head_idx, len);
  25.                    size -= len;
  26.  
  27.                    cur_idx++;
  28.                    tries++;
  29.                    *num_buffers += 1;
  30.  
  31.                    /*
  32.                     * if we tried all available ring items, and still
  33.                     * can't get enough buf, it means something abnormal
  34.                     * happened.
  35.                     */
  36.                     /*当尝试次数大于了vq->size,说明所有可用的desc都被扫描过了,
  37.                     即所有可用的desc加起来都无法满足这个mbuf*/
  38.                    if (unlikely(tries >= vq->size))
  39.                             return -1;
  40.          }
  41.  
  42.          return 0;
  43. }


这里需要注意的一点就是num_buffers不是使用desc的个数,而是使用的desc chain个数。而fill_vec_buf负责每次挑选一个desc chain填入对应的buf_vector,注意传入参数head_idx每次随着被desc填充而被修改。下面看fill_vec_buf

l  fill_vec_buf


点击(此处)折叠或打开

  1. static inline int __attribute__((always_inline))
  2. fill_vec_buf(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.                              uint32_t avail_idx, uint32_t *vec_idx,
  4.                              struct buf_vector *buf_vec, uint16_t *desc_chain_head,
  5.                              uint16_t *desc_chain_len)
  6. {
  7.          uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
  8.          uint32_t vec_id = *vec_idx;
  9.          uint32_t len = 0;
  10.          struct vring_desc *descs = vq->desc;
  11.  
  12.          *desc_chain_head = idx;
  13.     /* VRING_DESC_F_INDIRECT处理 */
  14.          if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
  15.                    descs = (struct vring_desc *)(uintptr_t)
  16.                                                gpa_to_vva(dev, vq->desc[idx].addr);
  17.                    if (unlikely(!descs))
  18.                             return -1;
  19.  
  20.                    idx = 0;
  21.          }
  22.     /*遍历这个desc chain知道结束,将这个chain中的desc信息记录到buf_vec中,
  23.      *chain能存放的数据长度赋值给desc_chain_len*/
  24.          while (1) {
  25.                    if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size))
  26.                             return -1;
  27.  
  28.                    len += descs[idx].len;
  29.                    buf_vec[vec_id].buf_addr = descs[idx].addr;
  30.                    buf_vec[vec_id].buf_len = descs[idx].len;
  31.                    buf_vec[vec_id].desc_idx = idx;
  32.                    vec_id++;
  33.  
  34.                    if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
  35.                             break;
  36.  
  37.                    idx = descs[idx].next;
  38.          }
  39.  
  40.          *desc_chain_len = len;
  41.          *vec_idx = vec_id;
  42.  
  43.          return 0;
  44. }


这个函数比较简单,就是遍历一个desc chain到结束为止,然后将这个chain的信息存放在buf_vec中,将这个chain能存放的数据长度信息返回,以供上层判断是否还需要再找下一个chain填充。

这里有一个VRING_DESC_F_INDIRECT desc特性的判断,这里顺便说下direct descindirect desc的区别。通常的desc->addr指向的是存放数据的page,这样的desc叫做direct desc。但是indirect descdesc->addr指向的是一个direct desc数组,如下图所示。(主要indirect desc指向的只能是direct desc,即不能再继续级联下去)

最后在开启mergeablevirtio_dev_merge_rx的调用和普通模式virtio_dev_rx的调用还有点不同。在virtio_dev_rx中,每次将mbuf中的数据存放在一个desc后都会更新vq->used->ringvq->last_used_idx,即告诉前端那些desc中已经存放了数据。但在virtio_dev_merge_rx中却没有看到这个过程。其实这个过程是有的,我们注意到在reserve_avail_buf_mergeable中每次调用完fill_vec_buf就会调用一下update_shadow_used_ring,我们看一下其实现。

l  update_shadow_used_ring


点击(此处)折叠或打开

  1. static inline void __attribute__((always_inline))
  2. update_shadow_used_ring(struct vhost_virtqueue *vq,
  3.                              uint16_t desc_idx, uint16_t len)
  4. {
  5.          uint16_t i = vq->shadow_used_idx++;
  6.  
  7.          vq->shadow_used_ring[i].id = desc_idx;
  8.          vq->shadow_used_ring[i].len = len;
  9. }


这里会更新shadow_used_idxshadow_used_ring。这里引入的shadow_used_idxshadow_used_ring其实是为了最后批处理更新vq->used->ring,通常更新vq->used->ring要先找到对应的idx,在更新vq->used->ring[idx]。如果对于要使用多个desc chain的情况,这样每次更新就会造成较大的访存开销。那我们看看使用shadow_used_idxshadow_used_ring会怎么更新。在virtio_dev_merge_rx中拷贝完mbuf数据后,最后会调用flush_shadow_used_ring函数。

l  flush_shadow_used_ring

点击(此处)折叠或打开

  1. static inline void __attribute__((always_inline))
  2. flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq)
  3. {
  4.          uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
  5.          /*vq->shadow_used_idx存放的是本次使用的desc chain个数*/
  6.     /*如果used_idx + vq->shadow_used_idx没有产生环形队列回绕*/
  7.          if (used_idx + vq->shadow_used_idx <= vq->size) {
  8.                    do_flush_shadow_used_ring(dev, vq, used_idx, 0,
  9.                                                  vq->shadow_used_idx);
  10.          } else {
  11.                    uint16_t size;
  12.         /*used_idx + vq->shadow_used_idx产生了回绕,需要分首尾两部分更新*/
  13.                    /* update used ring interval [used_idx, vq->size] */
  14.                    size = vq->size - used_idx;
  15.                    do_flush_shadow_used_ring(dev, vq, used_idx, 0, size);
  16.  
  17.                    /* update the left half used ring interval [0, left_size] */
  18.                    do_flush_shadow_used_ring(dev, vq, 0, size,
  19.                                                  vq->shadow_used_idx - size);
  20.          }
  21.          vq->last_used_idx += vq->shadow_used_idx;
  22.  
  23.          rte_smp_wmb();
  24.     /*更新vq->used->idx*/
  25.          *(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx;
  26.     /*更新热迁移位图*/
  27.          vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
  28.                    sizeof(vq->used->idx));
  29. }

我们再看下do_flush_shadow_used_ring的处理。

l  do_flush_shadow_used_ring

点击(此处)折叠或打开

  1. static inline void __attribute__((always_inline))
  2. do_flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
  3.                               uint16_t to, uint16_t from, uint16_t size)
  4. {
  5.          rte_memcpy(&vq->used->ring[to],
  6.                             &vq->shadow_used_ring[from],
  7.                             size * sizeof(struct vring_used_elem));
  8.          vhost_log_used_vring(dev, vq,
  9.                             offsetof(struct vring_used, ring[to]),
  10.                             size * sizeof(struct vring_used_elem));
  11. }

可以看到由于之前shadow_used_ring中有相关记录,所以这里可以一次性拷贝到uesd_ring中,减少了内存访问次数。

阅读(4238) | 评论(0) | 转发(0) |
0

上一篇:没有了

下一篇:从dpdk1811看virtio1.1 的实现—packed ring

给主人留下些什么吧!~~