
Category: LINUX

2019-03-17 00:12:48

virtio 1.1's in-order feature support, seen through the DPDK 18.11 implementation

by lvyilong316

The VIRTIO_F_IN_ORDER feature was introduced in virtio 1.1. The official specification describes it as follows: "Some devices always use descriptors in the same order in which they have been made available. These devices can offer the VIRTIO_F_IN_ORDER feature. If negotiated, this knowledge allows devices to notify the use of a batch of buffers to the driver by only writing out a single used ring entry with the id corresponding to the head entry of the descriptor chain describing the last buffer in the batch."

A few points in this description are worth noting:

1. The in-order feature means the backend consumes descriptors (from the avail ring) and releases them (to the used ring) in the same order.

2. Support for the feature hinges on the behavior of the device (the backend).

3. When the backend behaves this way, it does not need to write one used entry for every consumed avail descriptor chain. Instead, it can consume several avail descriptor chains at once and write only a single used entry, whose id records the head descriptor index of the last chain in the batch.

As the figure below shows: on the left, the avail ring is consumed in the order 1-2-3-4 but the used ring is updated in the order 2-3-4-1; the orders differ, so in-order cannot be enabled. On the right, the avail ring is consumed and the used ring updated in the same order, so the backend only needs to write a single used entry recording the index of the last avail descriptor chain.

Below we analyze how DPDK 18.11 handles the in-order feature.

When can in-order be supported?

By default, DPDK vhost-user updates the used ring in the same order in which it consumes the avail ring, so it is naturally capable of in-order operation. When is in-order not supported? Look at rte_vhost_driver_register in the DPDK code:

    /*
     * Dequeue zero copy can't assure descriptors returned in order.
     * Also, it requires that the guest memory is populated, which is
     * not compatible with postcopy.
     */
    if (vsocket->dequeue_zero_copy) {
        vsocket->supported_features &= ~(1ULL << VIRTIO_F_IN_ORDER);
        vsocket->features &= ~(1ULL << VIRTIO_F_IN_ORDER);

        RTE_LOG(INFO, VHOST_CONFIG,
            "Dequeue zero copy requested, disabling postcopy support\n");
        vsocket->protocol_features &=
            ~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
    }

As you can see, in-order cannot be supported when dequeue zero copy is enabled, because in zero-copy mode the order of used-ring updates depends on the DMA logic of the underlying physical device driver.

What does the in-order feature buy us?

The in-order feature simplifies the packet send/receive logic of both the frontend driver and the backend. In addition, the backend can batch its used-ring updates (this does not reduce the number of kicks to the frontend, since by default it already kicks once per batch). Finally, the simple, ordered logic opens the door to further driver optimizations.

Below we compare the DPDK virtio-net frontend driver implementations to see how the send and receive paths are simplified in the in-order case.

The frontend driver's virtio_dev_configure function contains the following logic:

    if (vtpci_with_feature(hw, VIRTIO_F_IN_ORDER)) {
        hw->use_inorder_tx = 1;
        if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
            hw->use_inorder_rx = 1;
            hw->use_simple_rx = 0;
        } else {
            hw->use_inorder_rx = 0;
        }
    }

Next, look at the set_rxtx_funcs function.

set_rxtx_funcs

    static void
    set_rxtx_funcs(struct rte_eth_dev *eth_dev)
    {
        struct virtio_hw *hw = eth_dev->data->dev_private;

        if (hw->use_simple_rx) {
            eth_dev->rx_pkt_burst = virtio_recv_pkts_vec;
        } else if (hw->use_inorder_rx) {
            eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts_inorder;
        } else if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
            eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts;
        } else {
            eth_dev->rx_pkt_burst = &virtio_recv_pkts;
        }

        if (hw->use_inorder_tx) {
            eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
        } else {
            eth_dev->tx_pkt_burst = virtio_xmit_pkts;
        }
    }

As you can see, different send and receive functions are selected depending on whether in-order is enabled, so comparing their implementations shows the simplification in-order brings. Note also that when the mergeable feature is off, the rx path uses yet another function.

Transmit path comparison

We first look at the normal transmit function, virtio_xmit_pkts. Before that, recall the relationship diagram of the frontend virtio-net data structures:

virtio_xmit_pkts

    uint16_t
    virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
    {
        struct virtnet_tx *txvq = tx_queue;
        struct virtqueue *vq = txvq->vq;
        struct virtio_hw *hw = vq->hw;
        uint16_t hdr_size = hw->vtnet_hdr_size;
        uint16_t nb_used, nb_tx = 0;
        int error;

        if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
            return nb_tx;

        if (unlikely(nb_pkts < 1))
            return nb_pkts;

        PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
        nb_used = VIRTQUEUE_NUSED(vq); /* number of used descs */

        virtio_rmb();
        /* if too many used descs have piled up, reclaim them */
        if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh))
            virtio_xmit_cleanup(vq, nb_used);

        for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
            struct rte_mbuf *txm = tx_pkts[nb_tx];
            int can_push = 0, use_indirect = 0, slots, need;

            /* Do VLAN tag insertion */
            if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
                error = rte_vlan_insert(&txm);
                if (unlikely(error)) {
                    rte_pktmbuf_free(txm);
                    continue;
                }
            }

            /* optimize ring usage */
            /* if the conditions below hold, the virtio hdr can be placed
             * inside the same mbuf and share one desc with the data */
            if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
                 vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
                rte_mbuf_refcnt_read(txm) == 1 &&
                RTE_MBUF_DIRECT(txm) &&
                txm->nb_segs == 1 &&
                rte_pktmbuf_headroom(txm) >= hdr_size &&
                rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
                        __alignof__(struct virtio_net_hdr_mrg_rxbuf)))
                can_push = 1;
            else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
                 txm->nb_segs < VIRTIO_MAX_TX_INDIRECT)
                use_indirect = 1;

            /* How many main ring entries are needed to this Tx?
             * any_layout => number of segments
             * indirect => 1
             * default => number of segments + 1
             */
            /* note: without the VIRTIO_F_ANY_LAYOUT feature, the
             * virtio_hdr must occupy a desc of its own */
            slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
            need = slots - vq->vq_free_cnt;

            /* Positive value indicates it need free vring descriptors */
            /* need > 0: not enough free avail descs, reclaim used descs */
            if (unlikely(need > 0)) {
                nb_used = VIRTQUEUE_NUSED(vq);
                virtio_rmb();
                need = RTE_MIN(need, (int)nb_used);

                virtio_xmit_cleanup(vq, need);
                need = slots - vq->vq_free_cnt;
                if (unlikely(need > 0)) {
                    PMD_TX_LOG(ERR,
                         "No free tx descriptors to transmit");
                    break;
                }
            }

            /* Enqueue Packet buffers */
            virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect,
                can_push, 0);

            txvq->stats.bytes += txm->pkt_len;
            virtio_update_packet_stats(&txvq->stats, txm);
        }

        txvq->stats.packets += nb_tx;

        if (likely(nb_tx)) {
            vq_update_avail_idx(vq);

            if (unlikely(virtqueue_kick_prepare(vq))) {
                virtqueue_notify(vq);
                PMD_TX_LOG(DEBUG, "Notified backend after xmit");
            }
        }

        return nb_tx;
    }

Now let's look at virtio_xmit_pkts_inorder, the function selected when in-order is supported.

virtio_xmit_pkts_inorder

    uint16_t
    virtio_xmit_pkts_inorder(void *tx_queue,
                struct rte_mbuf **tx_pkts,
                uint16_t nb_pkts)
    {
        struct virtnet_tx *txvq = tx_queue;
        struct virtqueue *vq = txvq->vq;
        struct virtio_hw *hw = vq->hw;
        uint16_t hdr_size = hw->vtnet_hdr_size;
        uint16_t nb_used, nb_avail, nb_tx = 0, nb_inorder_pkts = 0;
        struct rte_mbuf *inorder_pkts[nb_pkts];
        int error;

        if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
            return nb_tx;

        if (unlikely(nb_pkts < 1))
            return nb_pkts;

        VIRTQUEUE_DUMP(vq);
        PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
        nb_used = VIRTQUEUE_NUSED(vq); /* number of used descs */

        virtio_rmb();
        /* if too many used descs have piled up, reclaim them */
        if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh))
            virtio_xmit_cleanup_inorder(vq, nb_used);
        /* if no free avail descs are left, also reclaim used descs */
        if (unlikely(!vq->vq_free_cnt))
            virtio_xmit_cleanup_inorder(vq, nb_used);

        nb_avail = RTE_MIN(vq->vq_free_cnt, nb_pkts);

        for (nb_tx = 0; nb_tx < nb_avail; nb_tx++) {
            struct rte_mbuf *txm = tx_pkts[nb_tx];
            int slots, need;

            /* Do VLAN tag insertion */
            if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
                error = rte_vlan_insert(&txm);
                if (unlikely(error)) {
                    rte_pktmbuf_free(txm);
                    continue;
                }
            }

            /* optimize ring usage */
            /* if the conditions below hold, the virtio hdr can be placed
             * inside the same mbuf and share one desc with the data */
            if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
                 vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
                rte_mbuf_refcnt_read(txm) == 1 &&
                RTE_MBUF_DIRECT(txm) &&
                txm->nb_segs == 1 &&
                rte_pktmbuf_headroom(txm) >= hdr_size &&
                rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
                       __alignof__(struct virtio_net_hdr_mrg_rxbuf))) {
                inorder_pkts[nb_inorder_pkts] = txm;
                /* mbufs that occupy exactly one desc can be batched
                 * in in-order mode */
                nb_inorder_pkts++;

                txvq->stats.bytes += txm->pkt_len;
                virtio_update_packet_stats(&txvq->stats, txm);
                continue;
            }

            /* flush the accumulated batch of single-desc mbufs */
            if (nb_inorder_pkts) {
                virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
                                nb_inorder_pkts);
                nb_inorder_pkts = 0;
            }

            slots = txm->nb_segs + 1;
            /* not enough free avail descs, reclaim used descs */
            need = slots - vq->vq_free_cnt;
            if (unlikely(need > 0)) {
                nb_used = VIRTQUEUE_NUSED(vq);
                virtio_rmb();
                need = RTE_MIN(need, (int)nb_used);

                virtio_xmit_cleanup_inorder(vq, need);

                need = slots - vq->vq_free_cnt;

                if (unlikely(need > 0)) {
                    PMD_TX_LOG(ERR,
                        "No free tx descriptors to transmit");
                    break;
                }
            }
            /* Enqueue Packet buffers */
            /* mbufs that cannot share a single desc (e.g. with segs)
             * go through the normal path */
            virtqueue_enqueue_xmit(txvq, txm, slots, 0, 0, 1);

            txvq->stats.bytes += txm->pkt_len;
            virtio_update_packet_stats(&txvq->stats, txm);
        }

        /* Transmit all inorder packets */
        /* any remaining single-desc mbufs are sent the in-order way */
        if (nb_inorder_pkts)
            virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
                            nb_inorder_pkts);

        txvq->stats.packets += nb_tx;

        if (likely(nb_tx)) {
            vq_update_avail_idx(vq);

            if (unlikely(virtqueue_kick_prepare(vq))) {
                virtqueue_notify(vq);
                PMD_TX_LOG(DEBUG, "Notified backend after xmit");
            }
        }

        VIRTQUEUE_DUMP(vq);

        return nb_tx;
    }

There are two main differences: the logic for reclaiming used descriptors, and a special batched treatment for mbufs that each occupy a single descriptor. Start with the first: the non-in-order path uses virtio_xmit_cleanup, while the in-order path uses virtio_xmit_cleanup_inorder. Let's see how they differ.

virtio_xmit_cleanup

    static void
    virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
    {
        uint16_t i, used_idx, desc_idx;
        for (i = 0; i < num; i++) {
            struct vring_used_elem *uep;
            struct vq_desc_extra *dxp;

            used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
            uep = &vq->vq_ring.used->ring[used_idx];

            desc_idx = (uint16_t) uep->id; /* desc_idx is used->ring[used_idx].id */
            dxp = &vq->vq_descx[desc_idx];
            vq->vq_used_cons_idx++;
            vq_ring_free_chain(vq, desc_idx);

            if (dxp->cookie != NULL) {
                rte_pktmbuf_free(dxp->cookie); /* dxp->cookie holds the mbuf pointer */
                dxp->cookie = NULL;
            }
        }
    }

Starting from vq_used_cons_idx, this function walks the used entries one by one, locates the descriptor corresponding to each used_idx, calls vq_ring_free_chain to free the descriptors, and calls rte_pktmbuf_free to free the corresponding mbuf.

Next, let's see how vq_ring_free_chain frees descriptors.

vq_ring_free_chain

    void
    vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
    {
        struct vring_desc *dp, *dp_tail;
        struct vq_desc_extra *dxp;
        uint16_t desc_idx_last = desc_idx;

        dp = &vq->vq_ring.desc[desc_idx];
        dxp = &vq->vq_descx[desc_idx];
        /* update vq_free_cnt */
        vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
        if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
            while (dp->flags & VRING_DESC_F_NEXT) {
                desc_idx_last = dp->next;
                dp = &vq->vq_ring.desc[dp->next];
            }
        } /* desc_idx_last now points to the tail of the desc chain */
        dxp->ndescs = 0;

        /*
         * We must append the existing free chain, if any, to the end of
         * newly freed chain. If the virtqueue was completely used, then
         * head would be VQ_RING_DESC_CHAIN_END (ASSERTed above).
         */
        if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
            vq->vq_desc_head_idx = desc_idx;
        } else {
            dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
            dp_tail->next = desc_idx;
        }

        /* update vq_desc_tail_idx to the tail of the chain just freed */
        vq->vq_desc_tail_idx = desc_idx_last;
        dp->next = VQ_RING_DESC_CHAIN_END;
    }

The function frees the descriptor chain that the used entry points to and appends it to the tail of the existing free descriptor chain, if any. (Note that from the frontend transmitter's point of view there is only one free descriptor chain: chains released via the used ring are appended at its tail, and a new chain is split off from it only when descriptors are actually consumed for transmission.) Now let's look at virtio_xmit_cleanup_inorder.

virtio_xmit_cleanup_inorder

    static void
    virtio_xmit_cleanup_inorder(struct virtqueue *vq, uint16_t num)
    {
        uint16_t i, used_idx, desc_idx = 0, last_idx;
        int16_t free_cnt = 0;
        struct vq_desc_extra *dxp = NULL;

        if (unlikely(num == 0))
            return;

        for (i = 0; i < num; i++) {
            struct vring_used_elem *uep;

            used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
            uep = &vq->vq_ring.used->ring[used_idx];
            desc_idx = (uint16_t)uep->id; /* desc_idx is used->ring[used_idx].id */

            dxp = &vq->vq_descx[desc_idx];
            vq->vq_used_cons_idx++;

            if (dxp->cookie != NULL) {
                rte_pktmbuf_free(dxp->cookie);
                dxp->cookie = NULL;
            }
        }

        last_idx = desc_idx + dxp->ndescs - 1;
        free_cnt = last_idx - vq->vq_desc_tail_idx;
        if (free_cnt <= 0)
            free_cnt += vq->vq_nentries;

        vq_ring_free_inorder(vq, last_idx, free_cnt);
    }

The key difference from virtio_xmit_cleanup is that the latter frees each descriptor chain individually via vq_ring_free_chain, whereas virtio_xmit_cleanup_inorder frees all descriptor chains at once with a single call to vq_ring_free_inorder.

vq_ring_free_inorder

    void
    vq_ring_free_inorder(struct virtqueue *vq, uint16_t desc_idx, uint16_t num)
    {
        vq->vq_free_cnt += num;
        vq->vq_desc_tail_idx = desc_idx & (vq->vq_nentries - 1);
    }

So why can't the non-in-order path batch in the same way and free all descriptor chains at once?

Look at the following figure.

The upper part shows the in-order case: since avail descriptors are consumed in order and used entries follow the same order, the tail of the used ring marks the last consumed descriptor, so we can jump straight to it and update vq_desc_tail_idx.

The lower part shows the non-in-order case, where the used order differs from the avail consumption order. If we batched as in the in-order case and treated the descriptor of the last used entry as the tail of everything freed in this round, we would end up with the situation in the figure below.

   

The descriptors preceding the one referenced by the last used entry would never be freed. The root cause is the order mismatch: the last used entry does not necessarily point to the last consumed descriptor.

 

Now for the second difference between the in-order and non-in-order transmit paths: the in-order path uses virtqueue_enqueue_xmit_inorder to batch-transmit the mbufs that each occupy a single descriptor. Let's look at its implementation.

virtqueue_enqueue_xmit_inorder

    static inline void
    virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
                struct rte_mbuf **cookies,
                uint16_t num)
    {
        struct vq_desc_extra *dxp;
        struct virtqueue *vq = txvq->vq;
        struct vring_desc *start_dp;
        struct virtio_net_hdr *hdr;
        uint16_t idx;
        uint16_t head_size = vq->hw->vtnet_hdr_size;
        uint16_t i = 0;

        idx = vq->vq_desc_head_idx; /* idx of the first free desc */
        start_dp = vq->vq_ring.desc; /* start_dp points to the desc ring */

        while (i < num) {
            idx = idx & (vq->vq_nentries - 1);
            dxp = &vq->vq_descx[idx];
            dxp->cookie = (void *)cookies[i]; /* store the mbuf in the matching vq_desc_extra.cookie */
            dxp->ndescs = 1;
            /* prepend the virtio_hdr to the mbuf */
            hdr = (struct virtio_net_hdr *)
                rte_pktmbuf_prepend(cookies[i], head_size);
            cookies[i]->pkt_len -= head_size;

            /* if offload disabled, it is not zeroed below, do it now */
            if (!vq->hw->has_tx_offload) {
                ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
                ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
                ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
                ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
                ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
                ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
            }
            /* initialize the virtio_hdr */
            virtqueue_xmit_offload(hdr, cookies[i],
                    vq->hw->has_tx_offload);
            /* fill the corresponding desc from the mbuf */
            start_dp[idx].addr = VIRTIO_MBUF_DATA_DMA_ADDR(cookies[i], vq);
            start_dp[idx].len = cookies[i]->data_len;
            start_dp[idx].flags = 0;

            vq_update_avail_ring(vq, idx);

            idx++;
            i++;
        };
        /* update the free desc count and the free-chain head_idx */
        vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
        vq->vq_desc_head_idx = idx & (vq->vq_nentries - 1);
    }

The process is simple: for each mbuf, prepend and initialize a virtio_hdr, then fill in the descriptors strictly in sequence, and finally update the free descriptor count and the head index of the free chain. The "in sequence" part is essential; the non-in-order path cannot do this. Why not? Go back to the earlier figure of used-descriptor reclamation in non-in-order mode: freeing the used descriptors there one by one ends up as follows.

   
      
As you can see, the descriptor chain now jumps around: desc[3].next is 6 rather than 4, so the chain is out of order. This highlights a key effect of in-order operation: it keeps the avail descriptor chain contiguous.

For an mbuf that occupies multiple descriptors (e.g. when the virtio_hdr needs a descriptor of its own, or the mbuf carries multiple segments), a descriptor chain is required, so the normal transmit path is used.

virtqueue_enqueue_xmit

    static inline void
    virtqueue_enqueue_xmit(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
                uint16_t needed, int use_indirect, int can_push,
                int in_order)
    {
        struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
        struct vq_desc_extra *dxp;
        struct virtqueue *vq = txvq->vq;
        struct vring_desc *start_dp;
        uint16_t seg_num = cookie->nb_segs;
        uint16_t head_idx, idx;
        uint16_t head_size = vq->hw->vtnet_hdr_size;
        struct virtio_net_hdr *hdr;

        head_idx = vq->vq_desc_head_idx;
        idx = head_idx;
        dxp = &vq->vq_descx[idx];
        dxp->cookie = (void *)cookie;
        dxp->ndescs = needed;

        start_dp = vq->vq_ring.desc;

        /* case where the virtio_hdr sits in front of the mbuf data and
         * shares a single desc with it */
        if (can_push) {
            /* prepend cannot fail, checked by caller */
            hdr = (struct virtio_net_hdr *)
                rte_pktmbuf_prepend(cookie, head_size);
            /* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
             * which is wrong. Below subtract restores correct pkt size.
             */
            cookie->pkt_len -= head_size;

            /* if offload disabled, it is not zeroed below, do it now */
            if (!vq->hw->has_tx_offload) {
                ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
                ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
                ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
                ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
                ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
                ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
            }
        } else if (use_indirect) {
            /* setup tx ring slot to point to indirect
             * descriptor list stored in reserved region.
             *
             * the first slot in indirect ring is already preset
             * to point to the header in reserved region
             */
            start_dp[idx].addr = txvq->virtio_net_hdr_mem +
                RTE_PTR_DIFF(&txr[idx].tx_indir, txr);
            start_dp[idx].len = (seg_num + 1) * sizeof(struct vring_desc);
            start_dp[idx].flags = VRING_DESC_F_INDIRECT;
            hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;

            /* loop below will fill in rest of the indirect elements */
            start_dp = txr[idx].tx_indir;
            idx = 1;
        } else {
            /* setup first tx ring slot to point to header
             * stored in reserved region.
             */
            start_dp[idx].addr = txvq->virtio_net_hdr_mem +
                RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
            start_dp[idx].len = vq->hw->vtnet_hdr_size;
            start_dp[idx].flags = VRING_DESC_F_NEXT;
            hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;

            idx = start_dp[idx].next;
        }

        virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);

        do {
            start_dp[idx].addr = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
            start_dp[idx].len = cookie->data_len;
            /* this is where a desc chain is split off from the free
             * desc chain */
            start_dp[idx].flags = cookie->next ? VRING_DESC_F_NEXT : 0;
            idx = start_dp[idx].next;
        } while ((cookie = cookie->next) != NULL);

        if (use_indirect)
            idx = vq->vq_ring.desc[head_idx].next;

        vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);

        vq->vq_desc_head_idx = idx;
        vq_update_avail_ring(vq, head_idx);

        if (!in_order) {
            if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
                vq->vq_desc_tail_idx = idx;
        }
    }

The point worth noting above is how the descriptor chain seen by the receiving backend is produced.

The receive-side optimization is analogous, so we will not expand on it here.
