vhost_user mergeable 特性
——lvyilong316
vhost_user在收包时(将数据包发往vm内部)会调用rte_vhost_enqueue_burst函数,这个函数的实现如下:
l rte_vhost_enqueue_burst
-
uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
-
struct rte_mbuf **pkts, uint16_t count)
-
{
-
struct virtio_net *dev = get_device(vid);
-
-
if (!dev)
-
return 0;
-
-
if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
-
return virtio_dev_merge_rx(dev, queue_id, pkts, count);
-
else
-
return virtio_dev_rx(dev, queue_id, pkts, count);
-
}
我们可以看到根据vhost_user后端设备是否支持VIRTIO_NET_F_MRG_RXBUF特性会调用不同函数,其中不支持时调用的virtio_dev_rx之前已经分析过,这里看下virtio_dev_merge_rx的具体实现。
l virtio_dev_merge_rx
-
static inline uint32_t __attribute__((always_inline))
-
virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
-
struct rte_mbuf **pkts, uint32_t count)
-
{
-
struct vhost_virtqueue *vq;
-
uint32_t pkt_idx = 0;
-
uint16_t num_buffers;
-
struct buf_vector buf_vec[BUF_VECTOR_MAX];
-
uint16_t avail_head;
-
-
/*获取对应的queue*/
-
vq = dev->virtqueue[queue_id];
-
if (unlikely(vq->enabled == 0))
-
return 0;
-
-
count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
if (count == 0)
-
return 0;
-
/*avail->ring[vq->last_avail_idx & (vq->size - 1)]记录着首个可用的desc index,
-
*avail->ring[vq->avail->idx & (vq->size - 1)]记录着最后一个可用的desc index*/
-
rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
-
-
vq->shadow_used_idx = 0;
-
/*avail->ring[avail_head]记录着最后一个可用的desc index*/
-
avail_head = *((volatile uint16_t *)&vq->avail->idx);
-
/*遍历每一个要发送的数据包*/
-
for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
-
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
-
/*预留足够的desc来存放mbuf,使用buf_vec来记录,每个buf_vec对应一个desc,
-
所以num_buffers就是存放这个数据包所需的desc chain的个数*/
-
if (unlikely(reserve_avail_buf_mergeable(dev, vq,
-
pkt_len, buf_vec, &num_buffers,
-
avail_head) < 0)) {
-
LOG_DEBUG(VHOST_DATA,
-
"(%d) failed to get enough desc from vring\n",
-
dev->vid);
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
-
LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n",
-
dev->vid, vq->last_avail_idx,
-
vq->last_avail_idx + num_buffers);
-
/*根据buf_vec中记录的desc信息,将当前数据包(mbuf)拷贝到这些desc中*/
-
if (copy_mbuf_to_desc_mergeable(dev, pkts[pkt_idx],
-
buf_vec, num_buffers) < 0) {
-
vq->shadow_used_idx -= num_buffers;
-
break;
-
}
-
/*更新last_avail_idx*/
-
vq->last_avail_idx += num_buffers;
-
}
-
-
if (likely(vq->shadow_used_idx)) {
-
flush_shadow_used_ring(dev, vq);
-
-
/* flush used->idx update before we read avail->flags. */
-
rte_mb();
-
-
/* Kick the guest if necessary. */
-
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
-
&& (vq->callfd >= 0))
-
eventfd_write(vq->callfd, (eventfd_t)1);
-
}
-
-
return pkt_idx;
-
}
这里主要引入了一个struct
buf_vector的数组,这个数组和desc是一一对应的,其关系如下所示:
那么为什么要引入这个buf_vector数组呢?首先我们知道所谓merge_rx的功能,主要是为了vm接收大包(实现LRO的功能)。一般来说,一个mbuf会对应转换成为一个desc,但是当mbuf稍微大点,一个desc无法装下怎么办呢?
这时不要忘了desc中的next成员,即desc也是可以形成chain的,如果当前desc无法装下mbuf,那么就用vq->desc[desc->next]来继续存放。如下图:
desc1,desc3,desc5形成一个chain,mbuf由desc1和desc3存放。那么如果mbuf再大点呢?达到desc1,desc3,desc5三个desc都无法存放呢?这时如果没有打开VIRTIO_NET_F_MRG_RXBUF调用virtio_dev_rx就会接收出错了(尽管还有其他可用的desc,但由于不在一个chain,所以也不会使用)。但是如果打开了VIRTIO_NET_F_MRG_RXBUF,则会尝试找其他其他的chain。
还以上图为例,desc数组中共有两个chain1:desc1->desc3->desc5;
chain2:desc2->desc4->desc6,那么当chain1无法存下这个mbuf数据时,mbuf剩下的数据将由chain2存放。主要:一个mbuf绝不可能按照desc1àdesc2的顺序存放,因为desc1和desc2属于不同的chain,只有当前chain使用完才能使用另一个chain。
由于desc数组的顺序不一定是按照chain的顺序组织的(如上图),所以为了方便后续的mbuf到各个desc的拷贝操作,我们增加了buf_vector这个数组,用它来记录拷贝mbuf的desc顺序。如下图所示:
这样我们拷贝mbuf时就可以按照buf_vector1到buf_vector6依次拷贝到对应的desc中了。
另外我们注意一点,在函数的最后会对last_avail_idx进行更新:vq->last_avail_idx += num_buffers; 注意这里的num_buffers不是这个mbuf使用的desc个数,而是使用的desc chain个数。还以上面mbuf使用两个chain为例,last_avail_idx需要加2,如下图所示,如果之前last_avail_idx为1的话,这里就要更新为3了。
这里容易混淆的一点是:avail->ring中存放的index不是和desc一一对应的,而是和desc chain一一对应的,即只存放desc
chain header的index。所以avail->idx-last_avail_id也不是可用desc的个数,而是可用desc
chain的个数。
有了以上背景,我们再看reserve_avail_buf_mergeable这个函数就容易理解多了。这个函数的作用就是:预留足够的desc来存放mbuf,同时使用buf_vec来记录,每个buf_vec对应一个desc,如果当前所有可用的desc都无法装得下这个mbuf则返错。
l reserve_avail_buf_mergeable
-
static inline int
-
reserve_avail_buf_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
uint32_t size, struct buf_vector *buf_vec,
-
uint16_t *num_buffers, uint16_t avail_head)
-
{
-
uint16_t cur_idx;
-
uint32_t vec_idx = 0;
-
uint16_t tries = 0;
-
-
uint16_t head_idx = 0;
-
uint16_t len = 0;
-
/*num_buffers为要使用的desc chain的个数*/
-
*num_buffers = 0;
-
cur_idx = vq->last_avail_idx;
-
/*每次遍历一个desc chain,遍历过的chain加起来可以存放下这个mbuf*/
-
while (size > 0) {
-
if (unlikely(cur_idx == avail_head))
-
return -1;
-
/*fill_vec_buf的作用遍历下一个desc chain,用来存放mbuf,然后buf_vec记录这些desc的信息*/
-
if (unlikely(fill_vec_buf(dev, vq, cur_idx, &vec_idx, buf_vec,
-
&head_idx, &len) < 0))
-
return -1;
-
len = RTE_MIN(len, size);
-
update_shadow_used_ring(vq, head_idx, len);
-
size -= len;
-
-
cur_idx++;
-
tries++;
-
*num_buffers += 1;
-
-
/*
-
* if we tried all available ring items, and still
-
* can't get enough buf, it means something abnormal
-
* happened.
-
*/
-
/*当尝试次数大于了vq->size,说明所有可用的desc都被扫描过了,
-
即所有可用的desc加起来都无法满足这个mbuf*/
-
if (unlikely(tries >= vq->size))
-
return -1;
-
}
-
-
return 0;
-
}
这里需要注意的一点就是num_buffers不是使用desc的个数,而是使用的desc
chain个数。而fill_vec_buf负责每次挑选一个desc chain填入对应的buf_vector,注意传入参数head_idx每次随着被desc填充而被修改。下面看fill_vec_buf。
l fill_vec_buf
-
static inline int __attribute__((always_inline))
-
fill_vec_buf(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
uint32_t avail_idx, uint32_t *vec_idx,
-
struct buf_vector *buf_vec, uint16_t *desc_chain_head,
-
uint16_t *desc_chain_len)
-
{
-
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
-
uint32_t vec_id = *vec_idx;
-
uint32_t len = 0;
-
struct vring_desc *descs = vq->desc;
-
-
*desc_chain_head = idx;
-
/* VRING_DESC_F_INDIRECT处理 */
-
if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) {
-
descs = (struct vring_desc *)(uintptr_t)
-
gpa_to_vva(dev, vq->desc[idx].addr);
-
if (unlikely(!descs))
-
return -1;
-
-
idx = 0;
-
}
-
/*遍历这个desc chain知道结束,将这个chain中的desc信息记录到buf_vec中,
-
*chain能存放的数据长度赋值给desc_chain_len*/
-
while (1) {
-
if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size))
-
return -1;
-
-
len += descs[idx].len;
-
buf_vec[vec_id].buf_addr = descs[idx].addr;
-
buf_vec[vec_id].buf_len = descs[idx].len;
-
buf_vec[vec_id].desc_idx = idx;
-
vec_id++;
-
-
if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0)
-
break;
-
-
idx = descs[idx].next;
-
}
-
-
*desc_chain_len = len;
-
*vec_idx = vec_id;
-
-
return 0;
-
}
这个函数比较简单,就是遍历一个desc
chain到结束为止,然后将这个chain的信息存放在buf_vec中,将这个chain能存放的数据长度信息返回,以供上层判断是否还需要再找下一个chain填充。
这里有一个VRING_DESC_F_INDIRECT 的desc特性的判断,这里顺便说下direct desc和indirect desc的区别。通常的desc->addr指向的是存放数据的page,这样的desc叫做direct
desc。但是indirect desc的desc->addr指向的是一个direct
desc数组,如下图所示。(主要indirect
desc指向的只能是direct desc,即不能再继续级联下去)
最后在开启mergeable后virtio_dev_merge_rx的调用和普通模式virtio_dev_rx的调用还有点不同。在virtio_dev_rx中,每次将mbuf中的数据存放在一个desc后都会更新vq->used->ring和vq->last_used_idx,即告诉前端那些desc中已经存放了数据。但在virtio_dev_merge_rx中却没有看到这个过程。其实这个过程是有的,我们注意到在reserve_avail_buf_mergeable中每次调用完fill_vec_buf就会调用一下update_shadow_used_ring,我们看一下其实现。
l update_shadow_used_ring
-
static inline void __attribute__((always_inline))
-
update_shadow_used_ring(struct vhost_virtqueue *vq,
-
uint16_t desc_idx, uint16_t len)
-
{
-
uint16_t i = vq->shadow_used_idx++;
-
-
vq->shadow_used_ring[i].id = desc_idx;
-
vq->shadow_used_ring[i].len = len;
-
}
这里会更新shadow_used_idx和shadow_used_ring。这里引入的shadow_used_idx和shadow_used_ring其实是为了最后批处理更新vq->used->ring,通常更新vq->used->ring要先找到对应的idx,在更新vq->used->ring[idx]。如果对于要使用多个desc
chain的情况,这样每次更新就会造成较大的访存开销。那我们看看使用shadow_used_idx和shadow_used_ring会怎么更新。在virtio_dev_merge_rx中拷贝完mbuf数据后,最后会调用flush_shadow_used_ring函数。
l flush_shadow_used_ring
-
static inline void __attribute__((always_inline))
-
flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq)
-
{
-
uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
/*vq->shadow_used_idx存放的是本次使用的desc chain个数*/
-
/*如果used_idx + vq->shadow_used_idx没有产生环形队列回绕*/
-
if (used_idx + vq->shadow_used_idx <= vq->size) {
-
do_flush_shadow_used_ring(dev, vq, used_idx, 0,
-
vq->shadow_used_idx);
-
} else {
-
uint16_t size;
-
/*used_idx + vq->shadow_used_idx产生了回绕,需要分首尾两部分更新*/
-
/* update used ring interval [used_idx, vq->size] */
-
size = vq->size - used_idx;
-
do_flush_shadow_used_ring(dev, vq, used_idx, 0, size);
-
-
/* update the left half used ring interval [0, left_size] */
-
do_flush_shadow_used_ring(dev, vq, 0, size,
-
vq->shadow_used_idx - size);
-
}
-
vq->last_used_idx += vq->shadow_used_idx;
-
-
rte_smp_wmb();
-
/*更新vq->used->idx*/
-
*(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx;
-
/*更新热迁移位图*/
-
vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
-
sizeof(vq->used->idx));
-
}
我们再看下do_flush_shadow_used_ring的处理。
l do_flush_shadow_used_ring
-
static inline void __attribute__((always_inline))
-
do_flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
-
uint16_t to, uint16_t from, uint16_t size)
-
{
-
rte_memcpy(&vq->used->ring[to],
-
&vq->shadow_used_ring[from],
-
size * sizeof(struct vring_used_elem));
-
vhost_log_used_vring(dev, vq,
-
offsetof(struct vring_used, ring[to]),
-
size * sizeof(struct vring_used_elem));
-
}
可以看到由于之前shadow_used_ring中有相关记录,所以这里可以一次性拷贝到uesd_ring中,减少了内存访问次数。
阅读(4238) | 评论(0) | 转发(0) |