Virtio 1.1 as seen from the DPDK 18.11 implementation: in-order (VIRTIO_F_IN_ORDER) feature support
——lvyilong316
The VIRTIO_F_IN_ORDER feature was introduced in virtio 1.1. The specification describes it as follows: "Some devices always use descriptors in the same order in which they have been made available. These devices can offer the VIRTIO_F_IN_ORDER feature. If negotiated, this knowledge allows devices to notify the use of a batch of buffers to the driver by only writing out a single used ring entry with the id corresponding to the head entry of the descriptor chain describing the last buffer in the batch."
A few points in this passage are worth noting:
1. The in-order feature means that the order in which the backend consumes descriptors (takes them off the avail ring) and the order in which it returns them (writes used entries) are the same.
2. Support for this feature hinges entirely on the behaviour of the device (the backend).
3. When the device behaves this way, it does not have to write one used entry for every avail descriptor chain it consumes; it can consume a batch of avail descriptor chains and write only a single used entry, whose id records the head descriptor index of the last chain in that batch.
As shown in the figure below: on the left, the avail ring is consumed in the order 1-2-3-4 but the used ring is updated in the order 2-3-4-1; the orders differ, so in-order cannot be enabled. On the right, the avail ring is consumed and the used ring is updated in the same order, so the backend only needs to write a single used entry recording the head index of the last avail descriptor chain.
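As a rough sketch of the device-side behaviour described in the quoted passage (conceptual code only, with simplified types; it is not taken from any particular backend), a device that has consumed a batch of available chains in order can publish a single used entry whose id is the head descriptor index of the last chain in the batch:

	#include <stdint.h>

	/* Simplified split-ring used-ring structures, for illustration only. */
	struct used_elem {
		uint32_t id;	/* head index of a completed descriptor chain */
		uint32_t len;	/* bytes the device wrote into that chain */
	};

	struct used_ring {
		uint16_t flags;
		uint16_t idx;
		struct used_elem ring[256];
	};

	/* With VIRTIO_F_IN_ORDER negotiated, a device that has consumed a batch
	 * of available chains in order may write just one used entry for the
	 * whole batch; its id is the head descriptor index of the *last* chain.
	 */
	static void
	publish_batch_in_order(struct used_ring *used, uint16_t ring_size,
			       uint32_t last_chain_head_idx, uint32_t last_len)
	{
		uint16_t slot = used->idx & (ring_size - 1);

		used->ring[slot].id = last_chain_head_idx;
		used->ring[slot].len = last_len;
		/* the device then advances the used index and notifies the driver;
		 * that part (and the required memory barriers) is omitted here
		 */
	}

How the used index is advanced for the remainder of the batch is omitted here; the point is simply that one written entry can stand for many completed chains.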
Below we walk through how the in-order feature is handled in DPDK 18.11.
When can in-order be supported?
By default, DPDK vhost-user updates the used ring in the same order in which it consumes the avail ring, so it is inherently capable of supporting in-order. When is it not supported? Look at the implementation of rte_vhost_driver_register in the DPDK code:
	/*
	 * Dequeue zero copy can't assure descriptors returned in order.
	 * Also, it requires that the guest memory is populated, which is
	 * not compatible with postcopy.
	 */
	if (vsocket->dequeue_zero_copy) {
		vsocket->supported_features &= ~(1ULL << VIRTIO_F_IN_ORDER);
		vsocket->features &= ~(1ULL << VIRTIO_F_IN_ORDER);

		RTE_LOG(INFO, VHOST_CONFIG,
			"Dequeue zero copy requested, disabling postcopy support\n");
		vsocket->protocol_features &=
			~(1ULL << VHOST_USER_PROTOCOL_F_PAGEFAULT);
	}
As we can see, in-order cannot be supported when dequeue zero copy is enabled, because in zero-copy mode the order in which used entries are written depends on the DMA completion order of the underlying physical device driver.
What are the benefits of the in-order feature?
The in-order feature simplifies the RX/TX logic of both the frontend driver and the backend. In addition, the backend can batch its used-ring updates (this does not reduce the number of frontend notifications, since by default the frontend is already notified once per batch). Finally, the simple, ordered logic opens the door to further driver optimizations.
Below, using the DPDK virtio-net frontend driver, we compare how the RX/TX processing logic is simplified when in-order is negotiated.
The frontend driver's virtio_dev_configure function contains the following logic:
	if (vtpci_with_feature(hw, VIRTIO_F_IN_ORDER)) {
		hw->use_inorder_tx = 1;
		if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
			hw->use_inorder_rx = 1;
			hw->use_simple_rx = 0;
		} else {
			hw->use_inorder_rx = 0;
		}
	}
Next, look at the set_rxtx_funcs function.
l set_rxtx_funcs
static void
set_rxtx_funcs(struct rte_eth_dev *eth_dev)
{
	struct virtio_hw *hw = eth_dev->data->dev_private;

	if (hw->use_simple_rx) {
		eth_dev->rx_pkt_burst = virtio_recv_pkts_vec;
	} else if (hw->use_inorder_rx) {
		eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts_inorder;
	} else if (vtpci_with_feature(hw, VIRTIO_NET_F_MRG_RXBUF)) {
		eth_dev->rx_pkt_burst = &virtio_recv_mergeable_pkts;
	} else {
		eth_dev->rx_pkt_burst = &virtio_recv_pkts;
	}

	if (hw->use_inorder_tx) {
		eth_dev->tx_pkt_burst = virtio_xmit_pkts_inorder;
	} else {
		eth_dev->tx_pkt_burst = virtio_xmit_pkts;
	}
}
As we can see, different RX and TX burst functions are selected depending on whether in-order is enabled, so comparing their implementations shows where in-order simplifies things. Note also that when the mergeable feature is not negotiated, the RX direction uses a different function as well.
Transmit path comparison
Let us first look at the normal transmit function, virtio_xmit_pkts. Before that, here is a diagram of the data structures involved on the virtio-net frontend:
l virtio_xmit_pkts
uint16_t
virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
{
	struct virtnet_tx *txvq = tx_queue;
	struct virtqueue *vq = txvq->vq;
	struct virtio_hw *hw = vq->hw;
	uint16_t hdr_size = hw->vtnet_hdr_size;
	uint16_t nb_used, nb_tx = 0;
	int error;

	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
		return nb_tx;

	if (unlikely(nb_pkts < 1))
		return nb_pkts;

	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
	nb_used = VIRTQUEUE_NUSED(vq);	/* number of used descriptors */

	virtio_rmb();
	/* if too many descriptors are pending in the used ring, reclaim them */
	if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh))
		virtio_xmit_cleanup(vq, nb_used);

	for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
		struct rte_mbuf *txm = tx_pkts[nb_tx];
		int can_push = 0, use_indirect = 0, slots, need;

		/* Do VLAN tag insertion */
		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
			error = rte_vlan_insert(&txm);
			if (unlikely(error)) {
				rte_pktmbuf_free(txm);
				continue;
			}
		}

		/* optimize ring usage */
		/* if the conditions below hold, the virtio header can be placed
		 * in the mbuf headroom and share a single descriptor with the data
		 */
		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
		     vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
		    rte_mbuf_refcnt_read(txm) == 1 &&
		    RTE_MBUF_DIRECT(txm) &&
		    txm->nb_segs == 1 &&
		    rte_pktmbuf_headroom(txm) >= hdr_size &&
		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
				   __alignof__(struct virtio_net_hdr_mrg_rxbuf)))
			can_push = 1;
		else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
			 txm->nb_segs < VIRTIO_MAX_TX_INDIRECT)
			use_indirect = 1;

		/* How many main ring entries are needed to this Tx?
		 * any_layout => number of segments
		 * indirect => 1
		 * default => number of segments + 1
		 */
		/* note: without VIRTIO_F_ANY_LAYOUT the virtio header must occupy
		 * its own descriptor
		 */
		slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
		need = slots - vq->vq_free_cnt;

		/* Positive value indicates it need free vring descriptors,
		 * i.e. there are not enough free avail descriptors and used
		 * descriptors must be reclaimed first
		 */
		if (unlikely(need > 0)) {
			nb_used = VIRTQUEUE_NUSED(vq);
			virtio_rmb();
			need = RTE_MIN(need, (int)nb_used);

			virtio_xmit_cleanup(vq, need);
			need = slots - vq->vq_free_cnt;
			if (unlikely(need > 0)) {
				PMD_TX_LOG(ERR,
					   "No free tx descriptors to transmit");
				break;
			}
		}

		/* Enqueue Packet buffers */
		virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect,
				       can_push, 0);

		txvq->stats.bytes += txm->pkt_len;
		virtio_update_packet_stats(&txvq->stats, txm);
	}

	txvq->stats.packets += nb_tx;

	if (likely(nb_tx)) {
		vq_update_avail_idx(vq);

		if (unlikely(virtqueue_kick_prepare(vq))) {
			virtqueue_notify(vq);
			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
		}
	}

	return nb_tx;
}
Next, let us look at the function selected when in-order is supported, virtio_xmit_pkts_inorder.
l virtio_xmit_pkts_inorder
uint16_t
virtio_xmit_pkts_inorder(void *tx_queue,
			 struct rte_mbuf **tx_pkts,
			 uint16_t nb_pkts)
{
	struct virtnet_tx *txvq = tx_queue;
	struct virtqueue *vq = txvq->vq;
	struct virtio_hw *hw = vq->hw;
	uint16_t hdr_size = hw->vtnet_hdr_size;
	uint16_t nb_used, nb_avail, nb_tx = 0, nb_inorder_pkts = 0;
	struct rte_mbuf *inorder_pkts[nb_pkts];
	int error;

	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
		return nb_tx;

	if (unlikely(nb_pkts < 1))
		return nb_pkts;

	VIRTQUEUE_DUMP(vq);
	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
	nb_used = VIRTQUEUE_NUSED(vq);	/* number of used descriptors */

	virtio_rmb();
	/* if too many descriptors are pending in the used ring, reclaim them */
	if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh))
		virtio_xmit_cleanup_inorder(vq, nb_used);
	/* if there are no free avail descriptors left, also reclaim used ones */
	if (unlikely(!vq->vq_free_cnt))
		virtio_xmit_cleanup_inorder(vq, nb_used);

	nb_avail = RTE_MIN(vq->vq_free_cnt, nb_pkts);

	for (nb_tx = 0; nb_tx < nb_avail; nb_tx++) {
		struct rte_mbuf *txm = tx_pkts[nb_tx];
		int slots, need;

		/* Do VLAN tag insertion */
		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
			error = rte_vlan_insert(&txm);
			if (unlikely(error)) {
				rte_pktmbuf_free(txm);
				continue;
			}
		}

		/* optimize ring usage */
		/* if the conditions below hold, the virtio header can be placed
		 * in the mbuf headroom and share a single descriptor with the data
		 */
		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
		     vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
		    rte_mbuf_refcnt_read(txm) == 1 &&
		    RTE_MBUF_DIRECT(txm) &&
		    txm->nb_segs == 1 &&
		    rte_pktmbuf_headroom(txm) >= hdr_size &&
		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
				   __alignof__(struct virtio_net_hdr_mrg_rxbuf))) {
			inorder_pkts[nb_inorder_pkts] = txm;
			/* mbufs that need only one descriptor each can be
			 * batched in in-order mode
			 */
			nb_inorder_pkts++;

			txvq->stats.bytes += txm->pkt_len;
			virtio_update_packet_stats(&txvq->stats, txm);
			continue;
		}

		/* flush the pending batch of single-descriptor mbufs in order */
		if (nb_inorder_pkts) {
			virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
						       nb_inorder_pkts);
			nb_inorder_pkts = 0;
		}

		slots = txm->nb_segs + 1;
		need = slots - vq->vq_free_cnt;
		/* a positive value means there are not enough free avail
		 * descriptors, so used descriptors must be reclaimed first
		 */
		if (unlikely(need > 0)) {
			nb_used = VIRTQUEUE_NUSED(vq);
			virtio_rmb();
			need = RTE_MIN(need, (int)nb_used);

			virtio_xmit_cleanup_inorder(vq, need);

			need = slots - vq->vq_free_cnt;

			if (unlikely(need > 0)) {
				PMD_TX_LOG(ERR,
					   "No free tx descriptors to transmit");
				break;
			}
		}
		/* Enqueue Packet buffers */
		/* mbufs that need a descriptor chain (e.g. multi-segment mbufs)
		 * go through the normal enqueue path
		 */
		virtqueue_enqueue_xmit(txvq, txm, slots, 0, 0, 1);

		txvq->stats.bytes += txm->pkt_len;
		virtio_update_packet_stats(&txvq->stats, txm);
	}

	/* Transmit all inorder packets: flush whatever single-descriptor
	 * mbufs are still pending in the batch
	 */
	if (nb_inorder_pkts)
		virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
					       nb_inorder_pkts);

	txvq->stats.packets += nb_tx;

	if (likely(nb_tx)) {
		vq_update_avail_idx(vq);

		if (unlikely(virtqueue_kick_prepare(vq))) {
			virtqueue_notify(vq);
			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
		}
	}

	VIRTQUEUE_DUMP(vq);

	return nb_tx;
}
There are two main differences. The first is the logic for reclaiming used descriptors; the second is that, for mbufs that each occupy a single descriptor, the in-order path uses a dedicated batched enqueue. Let us start with the first one: the non-in-order path uses virtio_xmit_cleanup, whereas the in-order path uses virtio_xmit_cleanup_inorder. Let us look at how they differ.
l virtio_xmit_cleanup
static void
virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
{
	uint16_t i, used_idx, desc_idx;
	for (i = 0; i < num; i++) {
		struct vring_used_elem *uep;
		struct vq_desc_extra *dxp;

		used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
		uep = &vq->vq_ring.used->ring[used_idx];

		desc_idx = (uint16_t) uep->id;	/* desc_idx is used->ring[used_idx].id */
		dxp = &vq->vq_descx[desc_idx];
		vq->vq_used_cons_idx++;
		vq_ring_free_chain(vq, desc_idx);

		if (dxp->cookie != NULL) {
			rte_pktmbuf_free(dxp->cookie);	/* dxp->cookie holds the mbuf pointer */
			dxp->cookie = NULL;
		}
	}
}
Starting from vq_used_cons_idx, this function walks the used ring entry by entry, locates the descriptor identified by each used_idx, calls vq_ring_free_chain to free the descriptor chain, and calls rte_pktmbuf_free to free the corresponding mbuf. Next, let us see how vq_ring_free_chain frees the descriptors.
l vq_ring_free_chain
void
vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
{
	struct vring_desc *dp, *dp_tail;
	struct vq_desc_extra *dxp;
	uint16_t desc_idx_last = desc_idx;

	dp = &vq->vq_ring.desc[desc_idx];
	dxp = &vq->vq_descx[desc_idx];
	/* update vq_free_cnt */
	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
	if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
		while (dp->flags & VRING_DESC_F_NEXT) {
			desc_idx_last = dp->next;
			dp = &vq->vq_ring.desc[dp->next];
		}
	}
	/* desc_idx_last now points at the tail of the descriptor chain */
	dxp->ndescs = 0;

	/*
	 * We must append the existing free chain, if any, to the end of
	 * newly freed chain. If the virtqueue was completely used, then
	 * head would be VQ_RING_DESC_CHAIN_END (ASSERTed above).
	 */
	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
		vq->vq_desc_head_idx = desc_idx;
	} else {
		dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
		dp_tail->next = desc_idx;
	}

	/* update vq_desc_tail_idx to the tail of the chain just freed */
	vq->vq_desc_tail_idx = desc_idx_last;
	dp->next = VQ_RING_DESC_CHAIN_END;
}
In short, the descriptor chain referenced by each used entry is freed and linked to the tail of the existing free descriptor chain (if there is one). Note that, from the point of view of the frontend transmit side, there is only a single free descriptor chain: chains reclaimed through the used ring are appended at its tail, and a new chain is split off its head only when descriptors are actually consumed for transmission, as the sketch below illustrates.
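To make this head/tail bookkeeping concrete, here is a minimal, self-contained sketch of an index-linked free list over a descriptor array. It is an illustration using simplified, hypothetical types (sdesc, sring), not DPDK code: freed chains are appended at the tail, and new chains are split off the head.

	#include <stdint.h>

	#define CHAIN_END 0xffff	/* "no descriptor" sentinel, cf. VQ_RING_DESC_CHAIN_END */

	/* Hypothetical, simplified descriptor: only the next link matters here. */
	struct sdesc {
		uint16_t next;
	};

	struct sring {
		struct sdesc desc[256];
		uint16_t free_head;	/* head of the free chain (cf. vq_desc_head_idx) */
		uint16_t free_tail;	/* tail of the free chain (cf. vq_desc_tail_idx) */
		uint16_t free_cnt;
	};

	/* Return a freed chain [head .. tail] of n descriptors to the free list,
	 * mirroring what vq_ring_free_chain() does: hook it after the current
	 * tail (or make it the new head if the list was empty) and terminate it.
	 */
	static void
	free_chain(struct sring *r, uint16_t head, uint16_t tail, uint16_t n)
	{
		if (r->free_tail == CHAIN_END)
			r->free_head = head;
		else
			r->desc[r->free_tail].next = head;

		r->free_tail = tail;
		r->desc[tail].next = CHAIN_END;
		r->free_cnt = (uint16_t)(r->free_cnt + n);
	}

	/* Split n descriptors off the head of the free list for a new TX chain;
	 * returns the head index of that chain, or CHAIN_END if there is not
	 * enough room.
	 */
	static uint16_t
	alloc_chain(struct sring *r, uint16_t n)
	{
		uint16_t head = r->free_head;
		uint16_t idx = head;
		uint16_t i;

		if (n == 0 || n > r->free_cnt)
			return CHAIN_END;

		for (i = 1; i < n; i++)
			idx = r->desc[idx].next;	/* walk to the last descriptor taken */

		r->free_head = r->desc[idx].next;	/* the free list now starts after it */
		if (r->free_head == CHAIN_END)
			r->free_tail = CHAIN_END;	/* the list became empty */
		r->desc[idx].next = CHAIN_END;		/* terminate the allocated chain */
		r->free_cnt = (uint16_t)(r->free_cnt - n);
		return head;
	}

With this structure in mind, let us now look at the implementation of virtio_xmit_cleanup_inorder.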
l virtio_xmit_cleanup_inorder
static void
virtio_xmit_cleanup_inorder(struct virtqueue *vq, uint16_t num)
{
	uint16_t i, used_idx, desc_idx = 0, last_idx;
	int16_t free_cnt = 0;
	struct vq_desc_extra *dxp = NULL;

	if (unlikely(num == 0))
		return;

	for (i = 0; i < num; i++) {
		struct vring_used_elem *uep;

		used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
		uep = &vq->vq_ring.used->ring[used_idx];
		desc_idx = (uint16_t)uep->id;	/* desc_idx is used->ring[used_idx].id */

		dxp = &vq->vq_descx[desc_idx];
		vq->vq_used_cons_idx++;

		if (dxp->cookie != NULL) {
			rte_pktmbuf_free(dxp->cookie);
			dxp->cookie = NULL;
		}
	}

	last_idx = desc_idx + dxp->ndescs - 1;
	free_cnt = last_idx - vq->vq_desc_tail_idx;
	if (free_cnt <= 0)
		free_cnt += vq->vq_nentries;

	vq_ring_free_inorder(vq, last_idx, free_cnt);
}
The main difference from virtio_xmit_cleanup is that the latter frees descriptor chains one at a time via vq_ring_free_chain, whereas virtio_xmit_cleanup_inorder calls vq_ring_free_inorder once to release all of the descriptor chains in a single step.
l vq_ring_free_inorder
void
vq_ring_free_inorder(struct virtqueue *vq, uint16_t desc_idx, uint16_t num)
{
	vq->vq_free_cnt += num;
	vq->vq_desc_tail_idx = desc_idx & (vq->vq_nentries - 1);
}
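As a concrete illustration of the wrap-around arithmetic performed by virtio_xmit_cleanup_inorder before it calls vq_ring_free_inorder, here is a small standalone example; the numbers are made up for illustration (a 256-entry ring):

	#include <stdint.h>
	#include <stdio.h>

	int main(void)
	{
		/* Hypothetical numbers: a 256-entry ring whose free list currently
		 * ends at descriptor 250, while the last reclaimed chain ends at
		 * descriptor 5, i.e. the reclaimed batch wrapped around the ring.
		 */
		uint16_t vq_nentries = 256;
		uint16_t vq_desc_tail_idx = 250;	/* current tail of the free list */
		uint16_t last_idx = 5;			/* tail of the last freed chain */

		int16_t free_cnt = (int16_t)(last_idx - vq_desc_tail_idx);	/* 5 - 250 = -245 */
		if (free_cnt <= 0)
			free_cnt = (int16_t)(free_cnt + vq_nentries);		/* -245 + 256 = 11 */

		/* descriptors 251..255 and 0..5, eleven in total, are returned in
		 * one shot: vq_free_cnt += 11 and vq_desc_tail_idx becomes 5
		 */
		printf("descriptors freed in one batch: %d\n", free_cnt);
		return 0;
	}

In other words, the whole batch of reclaimed descriptors, including the wrap around the end of the ring, is accounted for with a single subtraction and a single tail update.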
So why can't the non-in-order mode batch in the same way and release all descriptor chains at once?
Consider the following figure.
The upper part shows the in-order case: since avail descriptors are consumed in order and used entries are written in the same order, the tail of the used ring is guaranteed to correspond to the last descriptor consumed, so we can jump straight to the end and update vq_desc_tail_idx.
The lower part shows the non-in-order case, where the used order differs from the order in which the avail descriptors were consumed. If we batched as in the in-order case and simply took the descriptor of the last used entry as the tail of everything freed in this round, we would end up with the situation shown in the next figure. Descriptors preceding the one referenced by the last used entry would never be reclaimed. The root cause is that, with mismatched ordering, the last used entry does not necessarily point to the last descriptor consumed.
Now for the second difference between the in-order and non-in-order transmit paths: the in-order path uses virtqueue_enqueue_xmit_inorder to transmit, as a batch, the mbufs that each occupy a single descriptor. Let us look at its implementation.
l virtqueue_enqueue_xmit_inorder
static inline void
virtqueue_enqueue_xmit_inorder(struct virtnet_tx *txvq,
			       struct rte_mbuf **cookies,
			       uint16_t num)
{
	struct vq_desc_extra *dxp;
	struct virtqueue *vq = txvq->vq;
	struct vring_desc *start_dp;
	struct virtio_net_hdr *hdr;
	uint16_t idx;
	uint16_t head_size = vq->hw->vtnet_hdr_size;
	uint16_t i = 0;

	idx = vq->vq_desc_head_idx;	/* index of the first free descriptor */
	start_dp = vq->vq_ring.desc;	/* start_dp points at the descriptor ring */

	while (i < num) {
		idx = idx & (vq->vq_nentries - 1);
		dxp = &vq->vq_descx[idx];
		dxp->cookie = (void *)cookies[i];	/* record the mbuf in vq_desc_extra.cookie */
		dxp->ndescs = 1;

		/* prepend the virtio header to the mbuf */
		hdr = (struct virtio_net_hdr *)
			rte_pktmbuf_prepend(cookies[i], head_size);
		cookies[i]->pkt_len -= head_size;

		/* if offload disabled, it is not zeroed below, do it now */
		if (!vq->hw->has_tx_offload) {
			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
		}

		/* fill in the virtio header */
		virtqueue_xmit_offload(hdr, cookies[i],
				       vq->hw->has_tx_offload);

		/* set up the descriptor from the mbuf */
		start_dp[idx].addr = VIRTIO_MBUF_DATA_DMA_ADDR(cookies[i], vq);
		start_dp[idx].len = cookies[i]->data_len;
		start_dp[idx].flags = 0;

		vq_update_avail_ring(vq, idx);

		idx++;
		i++;
	};

	/* update the free descriptor count and the free-list head index */
	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
	vq->vq_desc_head_idx = idx & (vq->vq_nentries - 1);
}
The procedure is straightforward: for each mbuf, prepend and fill in the virtio header, then fill in the descriptors one after another in sequence, and finally update the free descriptor count and the free-list head index. The "in sequence" part is crucial, and it is exactly what the non-in-order mode cannot do. Why? Going back to the earlier figure of used-descriptor reclamation in non-in-order mode, freeing used descriptors one by one eventually produces the result shown below. As you can see, the descriptor chain now jumps around: desc[3].next is 6 rather than 4, so the chain is out of order. This is one of the key effects of in-order: it keeps the avail descriptor chain contiguous.
For the cases where a single mbuf occupies several descriptors (for example when the virtio header needs its own descriptor, or when the mbuf has multiple segments), a descriptor chain is required, so the normal transmit path is used.
l virtqueue_enqueue_xmit
static inline void
virtqueue_enqueue_xmit(struct virtnet_tx *txvq, struct rte_mbuf *cookie,
		       uint16_t needed, int use_indirect, int can_push,
		       int in_order)
{
	struct virtio_tx_region *txr = txvq->virtio_net_hdr_mz->addr;
	struct vq_desc_extra *dxp;
	struct virtqueue *vq = txvq->vq;
	struct vring_desc *start_dp;
	uint16_t seg_num = cookie->nb_segs;
	uint16_t head_idx, idx;
	uint16_t head_size = vq->hw->vtnet_hdr_size;
	struct virtio_net_hdr *hdr;

	head_idx = vq->vq_desc_head_idx;
	idx = head_idx;
	dxp = &vq->vq_descx[idx];
	dxp->cookie = (void *)cookie;
	dxp->ndescs = needed;

	start_dp = vq->vq_ring.desc;

	if (can_push) {
		/* the virtio header can sit in the mbuf headroom and share a
		 * descriptor with the data
		 */
		/* prepend cannot fail, checked by caller */
		hdr = (struct virtio_net_hdr *)
			rte_pktmbuf_prepend(cookie, head_size);
		/* rte_pktmbuf_prepend() counts the hdr size to the pkt length,
		 * which is wrong. Below subtract restores correct pkt size.
		 */
		cookie->pkt_len -= head_size;

		/* if offload disabled, it is not zeroed below, do it now */
		if (!vq->hw->has_tx_offload) {
			ASSIGN_UNLESS_EQUAL(hdr->csum_start, 0);
			ASSIGN_UNLESS_EQUAL(hdr->csum_offset, 0);
			ASSIGN_UNLESS_EQUAL(hdr->flags, 0);
			ASSIGN_UNLESS_EQUAL(hdr->gso_type, 0);
			ASSIGN_UNLESS_EQUAL(hdr->gso_size, 0);
			ASSIGN_UNLESS_EQUAL(hdr->hdr_len, 0);
		}
	} else if (use_indirect) {
		/* setup tx ring slot to point to indirect
		 * descriptor list stored in reserved region.
		 *
		 * the first slot in indirect ring is already preset
		 * to point to the header in reserved region
		 */
		start_dp[idx].addr = txvq->virtio_net_hdr_mem +
			RTE_PTR_DIFF(&txr[idx].tx_indir, txr);
		start_dp[idx].len = (seg_num + 1) * sizeof(struct vring_desc);
		start_dp[idx].flags = VRING_DESC_F_INDIRECT;
		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;

		/* loop below will fill in rest of the indirect elements */
		start_dp = txr[idx].tx_indir;
		idx = 1;
	} else {
		/* setup first tx ring slot to point to header
		 * stored in reserved region.
		 */
		start_dp[idx].addr = txvq->virtio_net_hdr_mem +
			RTE_PTR_DIFF(&txr[idx].tx_hdr, txr);
		start_dp[idx].len = vq->hw->vtnet_hdr_size;
		start_dp[idx].flags = VRING_DESC_F_NEXT;
		hdr = (struct virtio_net_hdr *)&txr[idx].tx_hdr;

		idx = start_dp[idx].next;
	}

	virtqueue_xmit_offload(hdr, cookie, vq->hw->has_tx_offload);

	do {
		start_dp[idx].addr = VIRTIO_MBUF_DATA_DMA_ADDR(cookie, vq);
		start_dp[idx].len = cookie->data_len;
		/* this is where a descriptor chain is split off the free chain */
		start_dp[idx].flags = cookie->next ? VRING_DESC_F_NEXT : 0;
		idx = start_dp[idx].next;
	} while ((cookie = cookie->next) != NULL);

	if (use_indirect)
		idx = vq->vq_ring.desc[head_idx].next;

	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);

	vq->vq_desc_head_idx = idx;
	vq_update_avail_ring(vq, head_idx);

	if (!in_order) {
		if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
			vq->vq_desc_tail_idx = idx;
	}
}
One thing worth noting in the code above is the process by which the descriptor chains seen by the backend (in its dequeue direction) are produced.
The optimization in the receive direction is analogous, so we will not expand on it here.
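For completeness, here is a minimal conceptual sketch of the receive-side idea, again with simplified, hypothetical types (rxq, rxq_refill_inorder) rather than the actual DPDK structures: because the device returns buffers in the order they were made available, the driver can refill the RX ring strictly sequentially from the free-list head, one descriptor per buffer, with no chain bookkeeping at all.

	#include <stdint.h>

	#define DESC_F_WRITE 2	/* device-writable buffer (cf. VRING_DESC_F_WRITE) */

	/* Hypothetical, simplified structures for illustration only. */
	struct rx_desc {
		uint64_t addr;
		uint32_t len;
		uint16_t flags;
		uint16_t next;
	};

	struct rxq {
		struct rx_desc *ring;	/* descriptor ring */
		void **cookie;		/* per-descriptor buffer pointers */
		uint16_t nentries;	/* ring size, power of two */
		uint16_t head;		/* next free descriptor (free-list head) */
		uint16_t free_cnt;	/* number of free descriptors */
	};

	/* In-order RX refill sketch: descriptors are reused strictly sequentially
	 * starting at the free-list head, one per buffer, because the device will
	 * hand them back in exactly this order.
	 */
	static uint16_t
	rxq_refill_inorder(struct rxq *q, void **bufs, const uint64_t *bufs_iova,
			   uint32_t buf_len, uint16_t num)
	{
		uint16_t i, idx;

		if (num > q->free_cnt)
			num = q->free_cnt;

		for (i = 0; i < num; i++) {
			idx = (uint16_t)((q->head + i) & (q->nentries - 1));
			q->cookie[idx] = bufs[i];		/* remember the buffer for completion */
			q->ring[idx].addr = bufs_iova[i];	/* IOVA of the receive buffer */
			q->ring[idx].len = buf_len;
			q->ring[idx].flags = DESC_F_WRITE;	/* device writes the packet here */
		}

		q->head = (uint16_t)((q->head + num) & (q->nentries - 1));
		q->free_cnt -= num;

		/* the caller then publishes num avail-ring entries and kicks the device */
		return num;
	}

The actual DPDK 18.11 receive path selected earlier, virtio_recv_mergeable_pkts_inorder, builds on the same idea, combined with the extra handling required for mergeable receive buffers.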