A Detailed Walkthrough of the vhost Frontend/Backend (vhost_net/virtio_net) Forwarding Path
by lvyilong316
Note: this series was written some years ago while reading the Linux kernel 3.10 source. It is recorded here for easy reference and shared for anyone who may find it useful.
The purpose of vhost-net is to avoid an extra round of qemu scheduling on the host and thereby improve performance: the VM's packets are sent out directly from the host kernel.
vhost_net initialization
In the kernel, vhost-net exists as a miscdevice. Linux groups devices that do not fit any other category as "miscellaneous" devices, described by struct miscdevice. All miscdevices share the major number MISC_MAJOR (10) and are distinguished by minor number. They are kept on a linked list; when a miscdevice is accessed, the kernel looks it up by minor number and dispatches through the file_operations registered for it. For vhost-net the file_operations is:
static const struct file_operations vhost_net_fops = {
    .owner          = THIS_MODULE,
    .release        = vhost_net_release,
    .unlocked_ioctl = vhost_net_ioctl,
#ifdef CONFIG_COMPAT
    .compat_ioctl   = vhost_net_compat_ioctl,
#endif
    .open           = vhost_net_open,
    .llseek         = noop_llseek,
};
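The registration of this miscdevice happens in the module init of drivers/vhost/net.c and is an ordinary misc_register() call; roughly (paraphrased, not the verbatim 3.10 source):

/* Sketch of the vhost-net miscdevice registration (paraphrased) */
static struct miscdevice vhost_net_misc = {
    .minor = VHOST_NET_MINOR,   /* fixed minor; older kernels used MISC_DYNAMIC_MINOR */
    .name  = "vhost-net",
    .fops  = &vhost_net_fops,
};

static int __init vhost_net_module_init(void)
{
    return misc_register(&vhost_net_misc);   /* creates /dev/vhost-net */
}
module_init(vhost_net_module_init);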
On the other side, when qemu creates the tap device it ends up in net_init_tap(), which checks whether the vhost=on option was given. If so, QEMU's vhost_net_init() is called: it opens the vhost-net driver with open("/dev/vhost-net", O_RDWR) and performs a series of ioctl(vhost_fd, ...) calls to initialize it. That open("/dev/vhost-net", O_RDWR) lands in the driver's vhost_net_fops->open, i.e. vhost_net_open.
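The QEMU side is userspace code, but the kernel-facing sequence it performs can be sketched as follows. This is only an illustration (the helper name vhost_open_and_setup is made up here); the ioctl commands themselves are the real ones from <linux/vhost.h>:

#include <fcntl.h>
#include <sys/ioctl.h>
#include <linux/vhost.h>

/* Simplified illustration of what QEMU's vhost setup boils down to. */
int vhost_open_and_setup(int tap_fd)
{
    int vhost_fd = open("/dev/vhost-net", O_RDWR);    /* ends up in vhost_net_open() */

    ioctl(vhost_fd, VHOST_SET_OWNER, NULL);           /* spawns the vhost-<pid> kthread */
    /* ... VHOST_SET_MEM_TABLE, VHOST_SET_VRING_NUM/ADDR/BASE,
     *     VHOST_SET_VRING_KICK, VHOST_SET_VRING_CALL ... */

    struct vhost_vring_file backend = { .index = 0 /* RX queue */, .fd = tap_fd };
    ioctl(vhost_fd, VHOST_NET_SET_BACKEND, &backend); /* handled by vhost_net_set_backend() */

    return vhost_fd;
}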
vhost_net_open
static int vhost_net_open(struct inode *inode, struct file *f)
{
    struct vhost_net *n = kmalloc(sizeof *n, GFP_KERNEL);
    struct vhost_dev *dev;
    struct vhost_virtqueue **vqs;
    int r, i;

    if (!n)
        return -ENOMEM;
    vqs = kmalloc(VHOST_NET_VQ_MAX * sizeof(*vqs), GFP_KERNEL);
    if (!vqs) {
        kfree(n);
        return -ENOMEM;
    }

    dev = &n->dev;
    /* Initialize the virtqueues */
    vqs[VHOST_NET_VQ_TX] = &n->vqs[VHOST_NET_VQ_TX].vq;
    vqs[VHOST_NET_VQ_RX] = &n->vqs[VHOST_NET_VQ_RX].vq;
    n->vqs[VHOST_NET_VQ_TX].vq.handle_kick = handle_tx_kick;
    n->vqs[VHOST_NET_VQ_RX].vq.handle_kick = handle_rx_kick;
    for (i = 0; i < VHOST_NET_VQ_MAX; i++) {
        n->vqs[i].ubufs = NULL;
        n->vqs[i].ubuf_info = NULL;
        n->vqs[i].upend_idx = 0;
        n->vqs[i].done_idx = 0;
        n->vqs[i].vhost_hlen = 0;
        n->vqs[i].sock_hlen = 0;
    }
    /* Initialize the vhost_dev member of vhost_net */
    r = vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
    if (r < 0) {
        kfree(n);
        kfree(vqs);
        return r;
    }
    /* Initialize vhost_net's own vhost_poll structures: handle_tx_net and
     * handle_rx_net are registered as vhost_net->poll[...].work.fn */
    vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT, dev);
    vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN, dev);
    /* Stash the vhost_net in the private_data of the file obtained by opening /dev/vhost-net */
    f->private_data = n;

    return 0;
}
The overall call path of vhost_net_open is shown in the figure below; the function mainly allocates a vhost_net structure and initializes its members.
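Since the figure may not be available here, the structures involved look roughly like this (paraphrased from drivers/vhost/net.c in 3.10; unrelated members omitted):

struct vhost_net_virtqueue {
    struct vhost_virtqueue vq;        /* the generic vhost virtqueue */
    size_t vhost_hlen;                /* virtio-net header length seen by the guest */
    size_t sock_hlen;                 /* header length expected by the backend socket */
    int upend_idx;                    /* zero-copy TX bookkeeping */
    int done_idx;
    struct ubuf_info *ubuf_info;
    struct vhost_net_ubuf_ref *ubufs;
};

struct vhost_net {
    struct vhost_dev dev;                              /* the generic vhost device */
    struct vhost_net_virtqueue vqs[VHOST_NET_VQ_MAX];  /* [0] = RX, [1] = TX */
    struct vhost_poll poll[VHOST_NET_VQ_MAX];          /* polls the backend tap socket */
    unsigned tx_packets;                               /* zero-copy heuristics */
    unsigned tx_zcopy_err;
    bool tx_flush;
};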
Let's go through these functions one by one. vhost_dev_init is responsible for initializing the vhost_dev member of vhost_net.
vhost_dev_init
long vhost_dev_init(struct vhost_dev *dev,
                    struct vhost_virtqueue **vqs, int nvqs)
{
    int i;

    /* Attach the virtqueues to the vhost_dev */
    dev->vqs = vqs;
    dev->nvqs = nvqs;    /* number of virtqueues (2) */
    mutex_init(&dev->mutex);
    dev->log_ctx = NULL;
    dev->log_file = NULL;
    dev->memory = NULL;
    dev->mm = NULL;
    spin_lock_init(&dev->work_lock);
    INIT_LIST_HEAD(&dev->work_list);
    dev->worker = NULL;

    for (i = 0; i < dev->nvqs; ++i) {
        dev->vqs[i]->log = NULL;
        dev->vqs[i]->indirect = NULL;
        dev->vqs[i]->heads = NULL;
        dev->vqs[i]->dev = dev;
        mutex_init(&dev->vqs[i]->mutex);
        vhost_vq_reset(dev, dev->vqs[i]);
        if (dev->vqs[i]->handle_kick)
            /* Initialize the vhost_poll embedded in each vhost_virtqueue */
            vhost_poll_init(&dev->vqs[i]->poll,
                            dev->vqs[i]->handle_kick, POLLIN, dev);
    }

    return 0;
}
It in turn calls vhost_poll_init to initialize the vhost_poll embedded in each struct vhost_virtqueue.
vhost_poll_init
void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn,
                     unsigned long mask, struct vhost_dev *dev)
{
    /* Register vhost_poll_wakeup as poll->wait.func */
    init_waitqueue_func_entry(&poll->wait, vhost_poll_wakeup);
    init_poll_funcptr(&poll->table, vhost_poll_func);
    poll->mask = mask;
    poll->dev = dev;
    poll->wqh = NULL;
    /* Register the passed-in fn (handle_tx_kick or handle_rx_kick here) as poll->work.fn */
    vhost_work_init(&poll->work, fn);
}
Now back to vhost_net_open, which next calls vhost_poll_init twice:
vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT, dev);
vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN, dev);
These two calls initialize the two vhost_poll members of vhost_net itself, one for receive and one for transmit. Because the same vhost_poll_init is used, the setup is identical to that of the vhost_poll inside each vhost_virtqueue; only the arguments differ, so vhost_poll->work.fn becomes handle_rx_net/handle_tx_net here rather than handle_rx_kick/handle_tx_kick.
The figure below shows the data-structure relationships after initialization completes. (Green and purple mark the receive and transmit queues respectively; for space reasons only the receive-queue side is drawn in full.)
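Again in place of the figure, the two helper structures from drivers/vhost/vhost.h (3.10) are roughly:

typedef void (*vhost_work_fn_t)(struct vhost_work *work);

struct vhost_work {
    struct list_head   node;      /* linked into vhost_dev->work_list */
    vhost_work_fn_t    fn;        /* handle_rx_net / handle_tx_net / handle_rx_kick / handle_tx_kick */
    wait_queue_head_t  done;      /* used by vhost_poll_flush */
    int                flushing;
    unsigned           queue_seq;
    unsigned           done_seq;
};

struct vhost_poll {
    poll_table          table;    /* its _qproc is vhost_poll_func */
    wait_queue_head_t  *wqh;      /* the wait queue we hooked into (tap socket or eventfd) */
    wait_queue_t        wait;     /* its func is vhost_poll_wakeup */
    struct vhost_work   work;     /* queued onto the vhost worker when woken */
    unsigned long       mask;     /* POLLIN / POLLOUT */
    struct vhost_dev   *dev;
};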
The data path
Now let's look at how vhost moves data. We analyze the two directions in turn: first the VM receive direction, then the VM transmit direction.
Host → Guest
Delivery of a packet from the host into the VM can be understood through the following three call paths. All three are analyzed against the data-structure relationship diagram below.
The upper half of the figure is the vhost device, i.e. the result of opening /dev/vhost-net, and the lower half is the associated tap device.
Path 1
Let's start with the first call path.
After QEMU has created the backend, i.e. after it has opened vhost-net, it issues the VHOST_NET_SET_BACKEND ioctl. On the vhost side this is handled by vhost_net_set_backend.
vhost_net_set_backend
/* fd here is the file descriptor of the backend tap device passed in by qemu */
static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
{
    struct socket *sock, *oldsock;
    struct vhost_virtqueue *vq;
    struct vhost_net_virtqueue *nvq;
    struct vhost_net_ubuf_ref *ubufs, *oldubufs = NULL;
    int r;

    mutex_lock(&n->dev.mutex);
    r = vhost_dev_check_owner(&n->dev);
    if (r)
        goto err;

    if (index >= VHOST_NET_VQ_MAX) {
        r = -ENOBUFS;
        goto err;
    }
    vq = &n->vqs[index].vq;
    nvq = &n->vqs[index];
    mutex_lock(&vq->mutex);

    /* Verify that ring has been setup correctly. */
    if (!vhost_vq_access_ok(vq)) {
        r = -EFAULT;
        goto err_vq;
    }
    sock = get_socket(fd);
    if (IS_ERR(sock)) {
        r = PTR_ERR(sock);
        goto err_vq;
    }

    /* start polling new socket */
    oldsock = rcu_dereference_protected(vq->private_data,
                                        lockdep_is_held(&vq->mutex));
    if (sock != oldsock) {    /* the sock previously attached to the vq is not the tap device's current sock */
        ubufs = vhost_net_ubuf_alloc(vq,
                                     sock && vhost_sock_zcopy(sock));
        if (IS_ERR(ubufs)) {
            r = PTR_ERR(ubufs);
            goto err_ubufs;
        }
        /* Stop receive processing for this vq, i.e. remove vhost_net->poll->wait from its wait queue */
        vhost_net_disable_vq(n, vq);
        /* Attach the new tap device's sock to the vq's private_data */
        rcu_assign_pointer(vq->private_data, sock);
        r = vhost_init_used(vq);
        if (r)
            goto err_used;
        r = vhost_net_enable_vq(n, vq);    /* enable the vq, analyzed below */
        if (r)
            goto err_used;

        oldubufs = nvq->ubufs;
        nvq->ubufs = ubufs;

        n->tx_packets = 0;
        n->tx_zcopy_err = 0;
        n->tx_flush = false;
    }

    mutex_unlock(&vq->mutex);

    if (oldubufs) {
        vhost_net_ubuf_put_wait_and_free(oldubufs);
        mutex_lock(&vq->mutex);
        vhost_zerocopy_signal_used(n, vq);
        mutex_unlock(&vq->mutex);
    }

    if (oldsock) {
        vhost_net_flush_vq(n, index);
        fput(oldsock->file);
    }

    mutex_unlock(&n->dev.mutex);
    return 0;
    /* ... error-handling labels (err_used/err_ubufs/err_vq/err) omitted ... */
}
Now let's see how vhost_net_enable_vq enables the vq.
vhost_net_enable_vq
static int vhost_net_enable_vq(struct vhost_net *n,
                               struct vhost_virtqueue *vq)
{
    struct vhost_net_virtqueue *nvq =
        container_of(vq, struct vhost_net_virtqueue, vq);
    /* Find the vhost_poll of vhost_net that corresponds to this vq */
    struct vhost_poll *poll = n->poll + (nvq - n->vqs);
    struct socket *sock;

    sock = rcu_dereference_protected(vq->private_data,
                                     lockdep_is_held(&vq->mutex));
    if (!sock)
        return 0;

    /* Start polling the tap socket's file */
    return vhost_poll_start(poll, sock->file);
}
The key step is the final vhost_poll_start(poll, sock->file) call:
vhost_poll_start
/* Start polling a file. We add ourselves to file's wait queue. The caller must
 * keep a reference to a file until after vhost_poll_stop is called. */
int vhost_poll_start(struct vhost_poll *poll, struct file *file)
{
    unsigned long mask;
    int ret = 0;

    if (poll->wqh)
        return 0;
    /* The poll callback here is the tap device's poll function */
    mask = file->f_op->poll(file, &poll->table);
    if (mask)
        vhost_poll_wakeup(&poll->wait, 0, 0, (void *)mask);
    if (mask & POLLERR) {
        if (poll->wqh)
            remove_wait_queue(poll->wqh, &poll->wait);
        ret = -EINVAL;
    }

    return ret;
}
The tap device's poll function is tun_chr_poll, whose core is the call poll_wait(file, &tfile->wq.wait, wait).
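A condensed sketch of tun_chr_poll (paraphrased from drivers/net/tun.c; error handling and the writability check are trimmed) makes the hook-up visible:

static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
{
    struct tun_file *tfile = file->private_data;
    struct sock *sk = tfile->socket.sk;
    unsigned int mask = 0;

    /* This hooks the caller (here: vhost's poll_table) into tfile->wq.wait */
    poll_wait(file, &tfile->wq.wait, wait);

    if (!skb_queue_empty(&sk->sk_receive_queue))
        mask |= POLLIN | POLLRDNORM;    /* packets are already queued */

    /* (POLLOUT / writability handling omitted) */
    return mask;
}

poll_wait itself is simply: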
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}
It calls the _qproc callback of the poll_table that was passed in. For vhost_poll's poll_table this callback is vhost_poll_func.
vhost_poll_func
static void vhost_poll_func(struct file *file, wait_queue_head_t *wqh,
                            poll_table *pt)
{
    struct vhost_poll *poll;

    poll = container_of(pt, struct vhost_poll, table);
    poll->wqh = wqh;
    add_wait_queue(wqh, &poll->wait);
}
As you can see, this function hooks vhost_net->poll->wait onto the wait queue of the tap device's struct tun_file.
Path 2
Now for the second call path, shown below.
When the tap device transmits a packet it calls the net_device's ndo_start_xmit, which for a tap device is tun_net_xmit.
tun_net_xmit
/* Net device start xmit */
static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
{
    struct tun_struct *tun = netdev_priv(dev);
    int txq = skb->queue_mapping;
    struct tun_file *tfile;

    rcu_read_lock();
    tfile = rcu_dereference(tun->tfiles[txq]);

    /* Drop packet if interface is not attached */
    if (txq >= tun->numqueues)
        goto drop;

    tun_debug(KERN_INFO, tun, "tun_net_xmit %d\n", skb->len);

    BUG_ON(!tfile);

    /* Drop if the filter does not like it.
     * This is a noop if the filter is disabled.
     * Filter can be enabled only for the TAP devices. */
    if (!check_filter(&tun->txflt, skb))
        goto drop;

    if (tfile->socket.sk->sk_filter &&
        sk_filter(tfile->socket.sk, skb))
        goto drop;

    /* Limit the number of packets queued by dividing txq length with the
     * number of queues.
     */
    if (skb_queue_len(&tfile->socket.sk->sk_receive_queue)
                      >= dev->tx_queue_len / tun->numqueues)
        goto drop;

    /* Orphan the skb - required as we might hang on to it
     * for indefinite time. */
    if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
        goto drop;
    skb_orphan(skb);

    nf_reset(skb);

    /* Enqueue packet: put the skb on the queue associated with the tap device */
    skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);

    /* Notify and wake up reader process */
    if (tfile->flags & TUN_FASYNC)
        kill_fasync(&tfile->fasync, SIGIO, POLL_IN);
    /* Wake up whoever is waiting on the tap device's wait queue */
    wake_up_interruptible_poll(&tfile->wq.wait, POLLIN |
                               POLLRDNORM | POLLRDBAND);

    rcu_read_unlock();
    return NETDEV_TX_OK;

drop:
    dev->stats.tx_dropped++;
    skb_tx_error(skb);
    kfree_skb(skb);
    rcu_read_unlock();
    return NETDEV_TX_OK;
}
wake_up_interruptible_poll in turn invokes the func callback of each wait_queue_t node on that wait queue. The node here is vhost_poll->wait, whose func was set to vhost_poll_wakeup during vhost_net initialization.
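For reference, the generic wakeup helper that ends up invoking those callbacks looks roughly like this (paraphrased from kernel/sched/core.c in 3.10):

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
                             int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    /* Walk every waiter and call its func; for vhost this is vhost_poll_wakeup,
     * and key carries the POLL* bits passed to wake_up_interruptible_poll(). */
    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
            (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}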
vhost_poll_wakeup
static int vhost_poll_wakeup(wait_queue_t *wait, unsigned mode, int sync,
                             void *key)
{
    struct vhost_poll *poll = container_of(wait, struct vhost_poll, wait);

    if (!((unsigned long)key & poll->mask))
        return 0;

    vhost_poll_queue(poll);
    return 0;
}
It in turn calls vhost_poll_queue.
vhost_poll_queue
void vhost_poll_queue(struct vhost_poll *poll)
{
    vhost_work_queue(poll->dev, &poll->work);
}
vhost_work_queue
void vhost_work_queue(struct vhost_dev *dev, struct vhost_work *work)
{
    unsigned long flags;

    spin_lock_irqsave(&dev->work_lock, flags);
    if (list_empty(&work->node)) {
        /* Add vhost_poll->work to vhost_dev->work_list */
        list_add_tail(&work->node, &dev->work_list);
        work->queue_seq++;
        wake_up_process(dev->worker);
    }
    spin_unlock_irqrestore(&dev->work_lock, flags);
}
So here vhost_poll->work, i.e. a vhost_work structure, is added to vhost_dev->work_list.
Path 3
Finally, the third call path.
After opening /dev/vhost-net, qemu also issues the VHOST_SET_OWNER ioctl, whose main job is to create the backend vhost kernel thread. The corresponding kernel handler is vhost_net_set_owner, which in turn calls vhost_dev_set_owner.
vhost_dev_set_owner
long vhost_dev_set_owner(struct vhost_dev *dev)
{
    struct task_struct *worker;
    int err;

    /* Is there an owner already? */
    if (vhost_dev_has_owner(dev)) {
        err = -EBUSY;
        goto err_mm;
    }

    /* No owner, become one */
    dev->mm = get_task_mm(current);
    worker = kthread_create(vhost_worker, dev, "vhost-%d", current->pid);
    if (IS_ERR(worker)) {
        err = PTR_ERR(worker);
        goto err_worker;
    }

    dev->worker = worker;
    wake_up_process(worker);    /* avoid contributing to loadavg */

    err = vhost_attach_cgroups(dev);
    if (err)
        goto err_cgroup;

    err = vhost_dev_alloc_iovecs(dev);
    if (err)
        goto err_cgroup;

    return 0;
}
vhost_dev_set_owner mainly creates a kthread named vhost-<pid>, whose thread function is vhost_worker.
vhost_worker
static int vhost_worker(void *data)
{
    struct vhost_dev *dev = data;
    struct vhost_work *work = NULL;
    unsigned uninitialized_var(seq);
    mm_segment_t oldfs = get_fs();

    set_fs(USER_DS);
    use_mm(dev->mm);

    for (;;) {
        /* mb paired w/ kthread_stop */
        set_current_state(TASK_INTERRUPTIBLE);

        spin_lock_irq(&dev->work_lock);
        if (work) {
            work->done_seq = seq;
            if (work->flushing)
                wake_up_all(&work->done);
        }

        if (kthread_should_stop()) {
            spin_unlock_irq(&dev->work_lock);
            __set_current_state(TASK_RUNNING);
            break;
        }
        /* Check whether dev->work_list is empty */
        if (!list_empty(&dev->work_list)) {
            work = list_first_entry(&dev->work_list,
                                    struct vhost_work, node);
            list_del_init(&work->node);
            seq = work->queue_seq;
        } else
            work = NULL;
        spin_unlock_irq(&dev->work_lock);

        if (work) {
            __set_current_state(TASK_RUNNING);
            work->fn(work);    /* run the vhost_work taken off dev->work_list */
            if (need_resched())
                schedule();
        } else
            schedule();

    }
    unuse_mm(dev->mm);
    set_fs(oldfs);
    return 0;
}
As we saw in "Path 2", vhost_work_queue adds vhost_poll->work to vhost_dev->work_list. Receive and transmit use different vhost_poll->work instances; for the receive side, the registered fn is handle_rx_net.
handle_rx_net
static void handle_rx_net(struct vhost_work *work)
{
    struct vhost_net *net = container_of(work, struct vhost_net,
                                         poll[VHOST_NET_VQ_RX].work);
    handle_rx(net);
}
which simply calls handle_rx.
handle_rx
static void handle_rx(struct vhost_net *net)
{
    struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_RX];
    struct vhost_virtqueue *vq = &nvq->vq;
    unsigned uninitialized_var(in), log;
    struct vhost_log *vq_log;
    struct msghdr msg = {
        .msg_name = NULL,
        .msg_namelen = 0,
        .msg_control = NULL, /* FIXME: get and handle RX aux data. */
        .msg_controllen = 0,
        .msg_iov = vq->iov,
        .msg_flags = MSG_DONTWAIT,
    };
    struct virtio_net_hdr_mrg_rxbuf hdr = {
        .hdr.flags = 0,
        .hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE
    };
    size_t total_len = 0;
    int err, mergeable;
    s16 headcount;
    size_t vhost_hlen, sock_hlen;
    size_t vhost_len, sock_len;
    /* TODO: check that we are running from vhost_worker? */
    /* Get the sock attached to this vq */
    struct socket *sock = rcu_dereference_check(vq->private_data, 1);

    if (!sock)
        return;

    mutex_lock(&vq->mutex);
    vhost_disable_notify(&net->dev, vq);
    vhost_hlen = nvq->vhost_hlen;
    sock_hlen = nvq->sock_hlen;

    vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
        vq->log : NULL;
    mergeable = vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF);

    while ((sock_len = peek_head_len(sock->sk))) {
        sock_len += sock_hlen;
        vhost_len = sock_len + vhost_hlen;
        headcount = get_rx_bufs(vq, vq->heads, vhost_len,
                                &in, vq_log, &log,
                                likely(mergeable) ? UIO_MAXIOV : 1);
        /* On error, stop handling until the next kick. */
        if (unlikely(headcount < 0))
            break;
        /* On overrun, truncate and discard */
        if (unlikely(headcount > UIO_MAXIOV)) {
            msg.msg_iovlen = 1;
            /* sock here is the tap device's sock, so its recvmsg is tun_recvmsg */
            err = sock->ops->recvmsg(NULL, sock, &msg,
                                     1, MSG_DONTWAIT | MSG_TRUNC);
            pr_debug("Discarded rx packet: len %zd\n", sock_len);
            continue;
        }
        /* OK, now we need to know about added descriptors. */
        if (!headcount) {
            if (unlikely(vhost_enable_notify(&net->dev, vq))) {
                /* They have slipped one in as we were
                 * doing that: check again. */
                vhost_disable_notify(&net->dev, vq);
                continue;
            }
            /* Nothing new? Wait for eventfd to tell us
             * they refilled. */
            break;
        }
        /* We don't need to be notified again. */
        if (unlikely((vhost_hlen)))
            /* Skip header. TODO: support TSO. */
            move_iovec_hdr(vq->iov, nvq->hdr, vhost_hlen, in);
        else
            /* Copy the header for use in VIRTIO_NET_F_MRG_RXBUF:
             * needed because recvmsg can modify msg_iov. */
            copy_iovec_hdr(vq->iov, nvq->hdr, sock_hlen, in);
        msg.msg_iovlen = in;
        err = sock->ops->recvmsg(NULL, sock, &msg,
                                 sock_len, MSG_DONTWAIT | MSG_TRUNC);
        /* Userspace might have consumed the packet meanwhile:
         * it's not supposed to do this usually, but might be hard
         * to prevent. Discard data we got (if any) and keep going. */
        if (unlikely(err != sock_len)) {
            pr_debug("Discarded rx packet: "
                     " len %d, expected %zd\n", err, sock_len);
            vhost_discard_vq_desc(vq, headcount);
            continue;
        }
        if (unlikely(vhost_hlen) &&
            memcpy_toiovecend(nvq->hdr, (unsigned char *)&hdr, 0,
                              vhost_hlen)) {
            vq_err(vq, "Unable to write vnet_hdr at addr %p\n",
                   vq->iov->iov_base);
            break;
        }
        /* TODO: Should check and handle checksum. */
        if (likely(mergeable) &&
            memcpy_toiovecend(nvq->hdr, (unsigned char *)&headcount,
                              offsetof(typeof(hdr), num_buffers),
                              sizeof hdr.num_buffers)) {
            vq_err(vq, "Failed num_buffers write");
            vhost_discard_vq_desc(vq, headcount);
            break;
        }
        /* Update the used ring and kick the guest */
        vhost_add_used_and_signal_n(&net->dev, vq, vq->heads,
                                    headcount);
        if (unlikely(vq_log))
            vhost_log_write(vq, vq_log, log, vhost_len);
        total_len += vhost_len;
        /* If this run has hit the weight limit, requeue the vhost_poll
         * and let the worker thread pick it up again later */
        if (unlikely(total_len >= VHOST_NET_WEIGHT)) {
            vhost_poll_queue(&vq->poll);
            break;
        }
    }

    mutex_unlock(&vq->mutex);
}
After the packets have been copied into the virtio queue, vhost_add_used_and_signal_n updates the used ring and kicks the guest.
vhost_add_used_and_signal_n
void vhost_add_used_and_signal_n(struct vhost_dev *dev,
                                 struct vhost_virtqueue *vq,
                                 struct vring_used_elem *heads, unsigned count)
{
    vhost_add_used_n(vq, heads, count);
    vhost_signal(dev, vq);
}
vhost_add_used_n updates the used ring of the virtio queue; the details are deferred to the later discussion of vhost-user. vhost_signal notifies the guest, which in practice goes through the kernel KVM module; that, too, is covered separately later.
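As a rough sketch (paraphrased and simplified, not the verbatim 3.10 code), the "signal" is just a write to the call eventfd that qemu registered with VHOST_SET_VRING_CALL; KVM's irqfd machinery then turns that eventfd event into a guest interrupt:

/* Simplified sketch of drivers/vhost/vhost.c */
void vhost_signal(struct vhost_dev *dev, struct vhost_virtqueue *vq)
{
    /* vhost_notify() checks VRING_AVAIL_F_NO_INTERRUPT / the used event index
     * to decide whether the guest actually wants an interrupt right now. */
    if (vq->call_ctx && vhost_notify(dev, vq))
        eventfd_signal(vq->call_ctx, 1);   /* KVM's irqfd injects the vring interrupt */
}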
Receiving data inside the guest
This section focuses on how the guest picks up packets, i.e. the virtio-net implementation. The code lives mainly in virtio_net.c; we again use kernel 3.10.
virtio_net implements the usual PCI device callbacks; virtnet_probe is invoked when a virtio-net device is discovered on the PCI bus. That was covered in the "virtio-net initialization" section, so here we start from the post-initialization data-structure diagram.
When the backend has finished processing packets and placed them into the virtio ring shared with the frontend, it "calls" the frontend, which raises an interrupt inside the guest. Each vq's registered interrupt handler is vp_interrupt (see "virtio-net initialization"), so that handler is invoked.
vp_interrupt
static irqreturn_t vp_interrupt(int irq, void *opaque)
{
    struct virtio_pci_device *vp_dev = opaque;
    u8 isr;

    /* reading the ISR has the effect of also clearing it so it's very
     * important to save off the value. */
    /* Read the interrupt status (ISR) register */
    isr = ioread8(vp_dev->ioaddr + VIRTIO_PCI_ISR);

    /* It's definitely not us if the ISR was not high */
    if (!isr)
        return IRQ_NONE;

    /* If it is a configuration interrupt, handle it via vp_config_changed */
    /* Configuration change? Tell driver if it wants to know. */
    if (isr & VIRTIO_PCI_ISR_CONFIG)
        vp_config_changed(irq, opaque);

    /* Otherwise it is a data interrupt, handled by vp_vring_interrupt */
    return vp_vring_interrupt(irq, opaque);
}
If the interrupt is not a configuration interrupt, vp_vring_interrupt is called.
vp_vring_interrupt
/* Notify all virtqueues on an interrupt. */
static irqreturn_t vp_vring_interrupt(int irq, void *opaque)
{
    struct virtio_pci_device *vp_dev = opaque;
    struct virtio_pci_vq_info *info;
    irqreturn_t ret = IRQ_NONE;
    unsigned long flags;

    spin_lock_irqsave(&vp_dev->lock, flags);
    /* IRQ_HANDLED from a vq handler means the interrupt was recognized and processed */
    list_for_each_entry(info, &vp_dev->virtqueues, node) {
        if (vring_interrupt(irq, info->vq) == IRQ_HANDLED)
            ret = IRQ_HANDLED;
    }
    spin_unlock_irqrestore(&vp_dev->lock, flags);

    return ret;
}
It tries each queue's interrupt handler in turn. A handler can return two values: IRQ_NONE means the interrupt signal did not come from the interrupt source this handler was registered for; IRQ_HANDLED means the interrupt was recognized and correctly handled.
vring_interrupt
irqreturn_t vring_interrupt(int irq, void *_vq)
{
    struct vring_virtqueue *vq = to_vvq(_vq);

    if (!more_used(vq)) {
        pr_debug("virtqueue interrupt with no work for %p\n", vq);
        return IRQ_NONE;
    }

    if (unlikely(vq->broken))
        return IRQ_HANDLED;

    pr_debug("virtqueue callback for %p (%p)\n", vq, vq->vq.callback);
    if (vq->vq.callback)
        vq->vq.callback(&vq->vq);

    return IRQ_HANDLED;
}
Its main work is calling vq->vq.callback; for a receive queue the callback was initialized to skb_recv_done.
skb_recv_done
static void skb_recv_done(struct virtqueue *rvq)
{
    struct virtnet_info *vi = rvq->vdev->priv;
    struct receive_queue *rq = &vi->rq[vq2rxq(rvq)];

    /* Schedule NAPI, Suppress further interrupts if successful. */
    if (napi_schedule_prep(&rq->napi)) {    /* marks rq->napi as NAPI_STATE_SCHED */
        virtqueue_disable_cb(rvq);          /* sets vq->vring.avail->flags |= VRING_AVAIL_F_NO_INTERRUPT */
        __napi_schedule(&rq->napi);
    }
}
__napi_schedule
void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    local_irq_save(flags);
    /* Add the receive_queue's napi to the per-CPU softnet_data->poll_list */
    ____napi_schedule(&__get_cpu_var(softnet_data), n);
    local_irq_restore(flags);
}
____napi_schedule
static inline void ____napi_schedule(struct softnet_data *sd,
                                     struct napi_struct *napi)
{
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}
____napi_schedule adds receive_queue->napi to the per-CPU softnet_data->poll_list and raises the NET_RX_SOFTIRQ softirq.
At this point the receive interrupt handling, i.e. the top half, is done. What follows is the softirq, i.e. the bottom half.
Because the NET_RX_SOFTIRQ handler was registered as net_rx_action, raising NET_RX_SOFTIRQ eventually runs net_rx_action:
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
net_rx_action
static void net_rx_action(struct softirq_action *h)
{
    struct softnet_data *sd = &__get_cpu_var(softnet_data);
    unsigned long time_limit = jiffies + 2;
    int budget = netdev_budget;
    void *have;

    local_irq_disable();
    /* Walk every napi structure on this CPU's softnet_data->poll_list */
    while (!list_empty(&sd->poll_list)) {
        struct napi_struct *n;
        int work, weight;

        if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
            goto softnet_break;

        local_irq_enable();
        n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
        have = netpoll_poll_lock(n);
        weight = n->weight;

        work = 0;
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            work = n->poll(n, weight);    /* call the napi's poll function */
            trace_napi_poll(n);
        }

        WARN_ON_ONCE(work > weight);
        budget -= work;
        local_irq_disable();
        if (unlikely(work == weight)) {    /* a full weight's worth of skbs was polled */
            if (unlikely(napi_disable_pending(n))) {    /* napi is being disabled: complete it so it leaves softnet_data->poll_list */
                local_irq_enable();
                napi_complete(n);
                local_irq_disable();
            } else {
                if (n->gro_list) {    /* a non-empty gro_list means some skbs are still awaiting reassembly */
                    /* flush too old packets
                     * If HZ < 1000, flush all packets.
                     */
                    local_irq_enable();
                    napi_gro_flush(n, HZ >= 1000);    /* push the gro_list skbs up the stack without waiting for reassembly */
                    local_irq_disable();
                }
                list_move_tail(&n->poll_list, &sd->poll_list);    /* rotate this napi to the tail of softnet_data->poll_list */
            }
        }

        netpoll_poll_unlock(have);
    }
out:
    net_rps_action_and_irq_enable(sd);
    return;

softnet_break:
    sd->time_squeeze++;
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    goto out;
}
Note that each call to napi->poll processes at most napi->weight skbs, and for virtio this value is set to napi_weight when the virtqueue is initialized:
netif_napi_add(vi->dev, &vi->rq[i].napi, virtnet_poll, napi_weight);
napi_weight defaults to 64; since it is a module parameter, it can be overridden at module load time:
static int napi_weight = NAPI_POLL_WEIGHT;    /* 64 */
module_param(napi_weight, int, 0444);
Back to the receive logic: as the netif_napi_add call above shows, virtio-net initializes receive_queue's napi->poll to virtnet_poll, so that is what gets called next.
virtnet_poll
static int virtnet_poll(struct napi_struct *napi, int budget)
{
    struct receive_queue *rq =
        container_of(napi, struct receive_queue, napi);
    struct virtnet_info *vi = rq->vq->vdev->priv;
    void *buf;
    unsigned int r, len, received = 0;

again:
    while (received < budget &&    /* virtqueue_get_buf pulls out the next buffer to receive */
           (buf = virtqueue_get_buf(rq->vq, &len)) != NULL) {
        receive_buf(rq, buf, len);    /* the real receive processing; ends in netif_receive_skb */
        --rq->num;
        received++;
    }

    if (rq->num < rq->max / 2) {    /* the receive queue is running low on free buffers: refill it */
        if (!try_fill_recv(rq, GFP_ATOMIC))
            schedule_delayed_work(&vi->refill, 0);    /* could not allocate right now; defer to the refill delayed work */
    }

    /* Fewer skbs than the budget were received, possibly because free buffers
     * ran out. Complete NAPI (flushing gro_list), then re-check the ring; if
     * more work shows up, reschedule NAPI and go around again. */
    /* Out of packets? */
    if (received < budget) {
        r = virtqueue_enable_cb_prepare(rq->vq);
        napi_complete(napi);
        if (unlikely(virtqueue_poll(rq->vq, r)) &&
            napi_schedule_prep(napi)) {
            virtqueue_disable_cb(rq->vq);
            __napi_schedule(napi);
            goto again;
        }
    }

    return received;
}
Three calls in this function matter most:
virtqueue_get_buf(rq->vq, &len): uses last_used_idx to fetch the next skb to be received and frees the vring descriptors associated with it;
receive_buf(rq, buf, len): processes the skb and pushes it up the protocol stack;
try_fill_recv(rq, GFP_ATOMIC): adds fresh, unused buffers to the queue for future receives.
Let's look at them one by one.
virtqueue_get_buf
void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
{
    struct vring_virtqueue *vq = to_vvq(_vq);
    void *ret;
    unsigned int i;
    u16 last_used;

    START_USE(vq);

    /* Only get used array entries after they have been exposed by host. */
    virtio_rmb(vq->weak_barriers);
    /* Index of the used_elem entry to consume this time */
    last_used = (vq->last_used_idx & (vq->vring.num - 1));
    i = vq->vring.used->ring[last_used].id;    /* index into data[] for this skb, and also the index of its first desc */
    *len = vq->vring.used->ring[last_used].len;    /* length of the buffer to receive */

    if (unlikely(i >= vq->vring.num)) {
        BAD_RING(vq, "id %u out of range\n", i);
        return NULL;
    }
    if (unlikely(!vq->data[i])) {
        BAD_RING(vq, "id %u is not a head!\n", i);
        return NULL;
    }

    /* detach_buf clears data, so grab it now. */
    /* Take out the skb to be received */
    ret = vq->data[i];
    /* Free the descriptors backing this skb */
    detach_buf(vq, i);
    vq->last_used_idx++;
    /* If we expect an interrupt for the next entry, tell host
     * by writing event index and flush out the write before
     * the read in the next get_buf call. */
    if (!(vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
        vring_used_event(&vq->vring) = vq->last_used_idx;
        virtio_mb(vq->weak_barriers);
    }

    END_USE(vq);
    return ret;
}
To follow this function you need the receive-ring data-structure diagram below. vring.used->ring[last_used].id ties the skb in data[] to its descriptors in the vring: that id is both the index of the pending skb in the data array and the index of the skb's first descriptor. Once the skb has been retrieved, the descriptors backing it must be released, which is the job of detach_buf.
detach_buf
static void detach_buf(struct vring_virtqueue *vq, unsigned int head)
{
    unsigned int i;

    /* Clear data ptr. */
    vq->data[head] = NULL;

    /* Put back on free list: find end */
    i = head;

    /* Free the indirect table */
    if (vq->vring.desc[i].flags & VRING_DESC_F_INDIRECT)
        kfree(phys_to_virt(vq->vring.desc[i].addr));
    /* This skb is about to be consumed, so release every desc that backs it */
    while (vq->vring.desc[i].flags & VRING_DESC_F_NEXT) {
        i = vq->vring.desc[i].next;
        vq->vq.num_free++;
    }

    vq->vring.desc[i].next = vq->free_head;
    vq->free_head = head;
    /* Plus final descriptor */
    vq->vq.num_free++;
}
Next is receive_buf, which actually delivers the skb to the protocol stack.
receive_buf
static void receive_buf(struct receive_queue *rq, void *buf, unsigned int len)
{
    struct virtnet_info *vi = rq->vq->vdev->priv;
    struct net_device *dev = vi->dev;
    struct virtnet_stats *stats = this_cpu_ptr(vi->stats);
    struct sk_buff *skb;
    struct skb_vnet_hdr *hdr;
    ......
    if (vi->mergeable_rx_bufs)
        skb = receive_mergeable(dev, rq, buf, len);
    else if (vi->big_packets)
        skb = receive_big(dev, rq, buf);
    else
        skb = receive_small(buf, len);

    if (unlikely(!skb))
        return;

    hdr = skb_vnet_hdr(skb);

    u64_stats_update_begin(&stats->rx_syncp);
    stats->rx_bytes += skb->len;
    stats->rx_packets++;
    u64_stats_update_end(&stats->rx_syncp);

    if (hdr->hdr.flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) {
        pr_debug("Needs csum!\n");
        if (!skb_partial_csum_set(skb,
                                  hdr->hdr.csum_start,
                                  hdr->hdr.csum_offset))
            goto frame_err;
    } else if (hdr->hdr.flags & VIRTIO_NET_HDR_F_DATA_VALID) {
        skb->ip_summed = CHECKSUM_UNNECESSARY;
    }

    skb->protocol = eth_type_trans(skb, dev);
    pr_debug("Receiving skb proto 0x%04x len %i type %i\n",
             ntohs(skb->protocol), skb->len, skb->pkt_type);
    /* Based on what the backend filled into virtio_net_hdr, set the GSO-related
     * fields; this indicates we received an oversized (aggregated) packet */
    if (hdr->hdr.gso_type != VIRTIO_NET_HDR_GSO_NONE) {
        pr_debug("GSO!\n");
        switch (hdr->hdr.gso_type & ~VIRTIO_NET_HDR_GSO_ECN) {
        case VIRTIO_NET_HDR_GSO_TCPV4:
            skb_shinfo(skb)->gso_type = SKB_GSO_TCPV4;
            break;
        case VIRTIO_NET_HDR_GSO_UDP:
            skb_shinfo(skb)->gso_type = SKB_GSO_UDP;
            break;
        case VIRTIO_NET_HDR_GSO_TCPV6:
            skb_shinfo(skb)->gso_type = SKB_GSO_TCPV6;
            break;
        default:
            net_warn_ratelimited("%s: bad gso type %u.\n",
                                 dev->name, hdr->hdr.gso_type);
            goto frame_err;
        }

        if (hdr->hdr.gso_type & VIRTIO_NET_HDR_GSO_ECN)
            skb_shinfo(skb)->gso_type |= SKB_GSO_TCP_ECN;

        skb_shinfo(skb)->gso_size = hdr->hdr.gso_size;
        if (skb_shinfo(skb)->gso_size == 0) {
            net_warn_ratelimited("%s: zero gso size.\n", dev->name);
            goto frame_err;
        }

        /* Header must be checked, and gso_segs computed. */
        skb_shinfo(skb)->gso_type |= SKB_GSO_DODGY;
        skb_shinfo(skb)->gso_segs = 0;
    }
    /* Hand the skb to the protocol stack */
    netif_receive_skb(skb);
    return;

frame_err:
    dev->stats.rx_frame_errors++;
    dev_kfree_skb(skb);
}
Depending on the features negotiated at initialization, receive_small, receive_big, or receive_mergeable builds the skb from the buffers; the GSO-related fields are then set. Note that GSO here is not about transmission: it is from the backend's perspective and means the backend merged the data into a large packet. Finally netif_receive_skb pushes the skb into the protocol stack. This version (Linux 3.10) of virtio-net does not yet support GRO; by Linux 4.2 the netif_receive_skb call had been replaced with napi_gro_receive, which does.
Last comes try_fill_recv, which, when data[] runs low on usable skbs, allocates and posts new receive buffers and notifies the backend of the update.
try_fill_recv
static bool try_fill_recv(struct receive_queue *rq, gfp_t gfp)
{
    struct virtnet_info *vi = rq->vq->vdev->priv;
    int err;
    bool oom;

    do {
        if (vi->mergeable_rx_bufs)
            err = add_recvbuf_mergeable(rq, gfp);    /* backend supports VIRTIO_NET_F_MRG_RXBUF */
        else if (vi->big_packets)
            err = add_recvbuf_big(rq, gfp);    /* backend supports GUEST_GSO/GUEST_TSO, roughly LRO */
        else
            err = add_recvbuf_small(rq, gfp);

        oom = err == -ENOMEM;
        if (err)
            break;
        ++rq->num;
    } while (rq->vq->num_free);
    if (unlikely(rq->num > rq->max))
        rq->max = rq->num;
    virtqueue_kick(rq->vq);    /* tell the backend the avail ring has been updated */
    return !oom;
}
Whether it is add_recvbuf_small or add_recvbuf_big, the job is the same: allocate skbs and convert them into descriptors, then call virtqueue_kick to notify the backend so it can keep depositing received data.
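For the simple (neither mergeable nor big-packet) case, add_recvbuf_small looks roughly like this (paraphrased from drivers/net/virtio_net.c in 3.10; error paths simplified):

static int add_recvbuf_small(struct receive_queue *rq, gfp_t gfp)
{
    struct virtnet_info *vi = rq->vq->vdev->priv;
    struct sk_buff *skb;
    struct skb_vnet_hdr *hdr;
    int err;

    /* One fresh skb per receive buffer */
    skb = __netdev_alloc_skb_ip_align(vi->dev, MAX_PACKET_LEN, gfp);
    if (unlikely(!skb))
        return -ENOMEM;
    skb_put(skb, MAX_PACKET_LEN);

    /* sg[0] = virtio-net header, sg[1..] = packet data, mirroring the TX path */
    hdr = skb_vnet_hdr(skb);
    sg_set_buf(rq->sg, &hdr->hdr, sizeof hdr->hdr);
    skb_to_sgvec(skb, rq->sg + 1, 0, skb->len);

    /* Post it as a device-writable (IN) buffer; the skb pointer is the token
     * that virtqueue_get_buf() later hands back to virtnet_poll(). */
    err = virtqueue_add_inbuf(rq->vq, rq->sg, 2, skb, gfp);
    if (err < 0)
        dev_kfree_skb(skb);
    return err;
}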
The overall virtio-net receive flow is shown below. Yellow marks the interrupt (top-half) processing and green the softirq (bottom-half) processing: the network interrupt itself only puts the napi on a list, while all the real receive work is done in the softirq.
Sending data from inside the guest
The guest transmit path starts from the ndo_start_xmit that the virtio_net driver registers on its netdevice (see the "virtio-net initialization" section for background). virtio-net sets the netdevice's netdev_ops to virtnet_netdev, so the ndo_start_xmit callback is start_xmit:
static const struct net_device_ops virtnet_netdev = {
    .ndo_open       = virtnet_open,
    .ndo_stop       = virtnet_close,
    .ndo_start_xmit = start_xmit,
    ......
};
Let's first look at the overall transmit flow:
start_xmit
static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev)
{
    struct virtnet_info *vi = netdev_priv(dev);
    /* Pick the queue index from skb->queue_mapping */
    int qnum = skb_get_queue_mapping(skb);
    struct send_queue *sq = &vi->sq[qnum];
    int err;

    /* Free up any pending old buffers before queueing new ones. */
    /* Release skbs left in the send queue from earlier transmissions */
    free_old_xmit_skbs(sq);

    /* Try to transmit */
    err = xmit_skb(sq, skb);

    /* This should not happen */
    if (unlikely(err)) {
        dev->stats.tx_fifo_errors++;
        if (net_ratelimit())
            dev_warn(&dev->dev,
                     "Unexpected TXQ (%d) queue failure: %d\n", qnum, err);
        dev->stats.tx_dropped++;
        kfree_skb(skb);
        return NETDEV_TX_OK;
    }
    /* Notify the backend to pick up the packet */
    virtqueue_kick(sq->vq);

    /* Don't wait up for transmitted skbs to be freed. */
    skb_orphan(skb);
    nf_reset(skb);

    /* Apparently nice girls don't return TX_BUSY; stop the queue
     * before it gets out of hand. Naturally, this wastes entries. */
    if (sq->vq->num_free < 2+MAX_SKB_FRAGS) {
        netif_stop_subqueue(dev, qnum);
        if (unlikely(!virtqueue_enable_cb_delayed(sq->vq))) {
            /* More just got used, free them then recheck. */
            free_old_xmit_skbs(sq);
            if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) {
                netif_start_subqueue(dev, qnum);
                virtqueue_disable_cb(sq->vq);
            }
        }
    }

    return NETDEV_TX_OK;
}
The actual transmit work is done by xmit_skb, after which virtqueue_kick(sq->vq) notifies the backend to pull the packet. Let's first look at virtqueue_kick.
virtqueue_kick
void virtqueue_kick(struct virtqueue *vq)
{
    if (virtqueue_kick_prepare(vq))
        virtqueue_notify(vq);
}
It first calls virtqueue_kick_prepare to decide whether a kick is really needed.
virtqueue_kick_prepare
bool virtqueue_kick_prepare(struct virtqueue *_vq)
{
    struct vring_virtqueue *vq = to_vvq(_vq);
    u16 new, old;
    bool needs_kick;

    START_USE(vq);
    /* We need to expose available array entries before checking avail
     * event. */
    virtio_mb(vq->weak_barriers);
    /* old is avail.idx before the buffers were added (add_sg) */
    old = vq->vring.avail->idx - vq->num_added;
    /* new is the current avail.idx */
    new = vq->vring.avail->idx;
    vq->num_added = 0;

#ifdef DEBUG
    if (vq->last_add_time_valid) {
        WARN_ON(ktime_to_ms(ktime_sub(ktime_get(),
                                      vq->last_add_time)) > 100);
    }
    vq->last_add_time_valid = false;
#endif

    if (vq->event) {
        needs_kick = vring_need_event(vring_avail_event(&vq->vring),
                                      new, old);
    } else {
        needs_kick = !(vq->vring.used->flags & VRING_USED_F_NO_NOTIFY);
    }
    END_USE(vq);
    return needs_kick;
}
Several values are involved: old is avail.idx as it was before the buffers were added, new is the current avail.idx, and the third is vring_avail_event(&vq->vring). Its definition is:
#define vring_avail_event(vr) (*(__u16 *)&(vr)->used->ring[(vr)->num])
i.e. the slot just past the last entry of the used ring. Before popping an element from the virtqueue, the backend writes there the next index it is going to consume for that queue, i.e. its last_avail_idx.
Now look at vring_need_event:
vring_need_event
static inline int vring_need_event(__u16 event_idx, __u16 new_idx, __u16 old)
{
    return (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old);
}
The frontend decides whether to notify the backend by evaluating (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old): a kick is sent only if the index the backend said it cares about (event_idx) falls within the range of entries added since the last kick.
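A small worked example (values chosen purely for illustration) makes the condition concrete:

/* Suppose the driver just added two buffers, so old = 10, new = 12.
 *
 * Case 1: the device advertised event_idx = 10, i.e. "wake me once you
 * publish entry 10":  (12 - 10 - 1) = 1  <  (12 - 10) = 2  -> kick.
 *
 * Case 2: the device advertised event_idx = 14, i.e. it is still busy and
 * does not need a wakeup yet: (12 - 14 - 1) = 0xFFFD is NOT < 2 -> no kick.
 *
 * The __u16 casts make the comparison wrap-safe when the indices overflow.
 */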
Now back to the actual data transfer, i.e. xmit_skb.
xmit_skb
static int xmit_skb(struct send_queue *sq, struct sk_buff *skb)
{
    struct skb_vnet_hdr *hdr = skb_vnet_hdr(skb);    /* the skb_vnet_hdr lives in the skb's cb */
    const unsigned char *dest = ((struct ethhdr *)skb->data)->h_dest;
    struct virtnet_info *vi = sq->vq->vdev->priv;
    unsigned num_sg;

    pr_debug("%s: xmit %p %pM\n", vi->dev->name, skb, dest);

    if (skb->ip_summed == CHECKSUM_PARTIAL) {    /* checksum is left to "hardware", i.e. to the backend vhost-net or vhost-user */
        hdr->hdr.flags = VIRTIO_NET_HDR_F_NEEDS_CSUM;
        hdr->hdr.csum_start = skb_checksum_start_offset(skb);    /* record where checksumming starts and the checksum field offset */
        hdr->hdr.csum_offset = skb->csum_offset;
    } else {
        hdr->hdr.flags = 0;
        hdr->hdr.csum_offset = hdr->hdr.csum_start = 0;
    }

    if (skb_is_gso(skb)) {    /* this really tests TSO/UFO: pure software GSO would have segmented before reaching the driver */
        hdr->hdr.hdr_len = skb_headlen(skb);    /* record the TSO/UFO parameters so the backend can interpret the packet */
        hdr->hdr.gso_size = skb_shinfo(skb)->gso_size;
        if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV4)
            hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_TCPV4;    /* TSO */
        else if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6)
            hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_TCPV6;    /* TSO */
        else if (skb_shinfo(skb)->gso_type & SKB_GSO_UDP)
            hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_UDP;    /* UFO */
        else
            BUG();
        if (skb_shinfo(skb)->gso_type & SKB_GSO_TCP_ECN)
            hdr->hdr.gso_type |= VIRTIO_NET_HDR_GSO_ECN;
    } else {    /* no hardware GSO (TSO/UFO) */
        hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE;
        hdr->hdr.gso_size = hdr->hdr.hdr_len = 0;
    }

    hdr->mhdr.num_buffers = 0;

    /* Encode metadata header at front. */
    /* Map the skb_vnet_hdr stored in cb into the first sg entry */
    if (vi->mergeable_rx_bufs)
        sg_set_buf(sq->sg, &hdr->mhdr, sizeof hdr->mhdr);
    else
        sg_set_buf(sq->sg, &hdr->hdr, sizeof hdr->hdr);
    /* Map the skb data into the remaining sg entries of the send queue */
    num_sg = skb_to_sgvec(skb, sq->sg + 1, 0, skb->len) + 1;
    return virtqueue_add_outbuf(sq->vq, sq->sg, num_sg, skb, GFP_ATOMIC);
}
A word about the data-structure conversions along the transmit path. Everything to be sent (both the data in the skb and the virtio header in skb->cb) is first described by struct scatterlist entries ("sg") on the send_queue, and those sg entries are then converted into struct vring_desc entries ("desc") on the vring, after which the backend is notified to pick up the packet.
In the code above, sg_set_buf maps the virtio header in skb->cb to an sg entry, skb_to_sgvec maps the skb data to sg entries, and virtqueue_add_outbuf finally converts those sg entries into descriptors. Let's go through them one by one.
sg_set_buf
/**
 * sg_set_buf - Set sg entry to point at given data
 * @sg:     SG entry
 * @buf:    Data
 * @buflen: Data length
 *
 **/
static inline void sg_set_buf(struct scatterlist *sg, const void *buf,
                              unsigned int buflen)
{
#ifdef CONFIG_DEBUG_SG
    BUG_ON(!virt_addr_valid(buf));
#endif
    /* virt_to_page finds the page containing buf; offset_in_page gives buf's offset within that page */
    sg_set_page(sg, virt_to_page(buf), buflen, offset_in_page(buf));
}
Here buf and buflen are the skb_vnet_hdr in cb and its length. sg_set_page then associates the sg entry with the page backing the skb_vnet_hdr (ultimately all data lives in pages).
sg_set_page
static inline void sg_set_page(struct scatterlist *sg, struct page *page,
                               unsigned int len, unsigned int offset)
{
    sg_assign_page(sg, page);
    sg->offset = offset;
    sg->length = len;
}
sg_assign_page stores the page's address in sg->page_link, but the assignment is a little special.
sg_assign_page
static inline void sg_assign_page(struct scatterlist *sg, struct page *page)
{
    unsigned long page_link = sg->page_link & 0x3;

    /*
     * In order for the low bit stealing approach to work, pages
     * must be aligned at a 32-bit boundary as a minimum.
     */
    BUG_ON((unsigned long) page & 0x03);    /* the low two bits are reserved for markers such as end-of-list */
#ifdef CONFIG_DEBUG_SG
    BUG_ON(sg->sg_magic != SG_MAGIC);
    BUG_ON(sg_is_chain(sg));
#endif
    sg->page_link = page_link | (unsigned long) page;
}
A struct page pointer is always at least 4-byte aligned, so its low two bits are guaranteed to be zero. When sg->page_link stores that pointer, the low two bits are therefore free to carry extra information; the sg code uses them to mark the end of an sg list.
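The corresponding helpers in include/linux/scatterlist.h make that bit usage explicit (shown roughly as they appear in 3.10):

/* Bit 0 of page_link marks a chain entry, bit 1 marks the last entry */
#define sg_is_chain(sg)     ((sg)->page_link & 0x01)
#define sg_is_last(sg)      ((sg)->page_link & 0x02)
#define sg_chain_ptr(sg)    \
    ((struct scatterlist *) ((sg)->page_link & ~0x03))

static inline struct page *sg_page(struct scatterlist *sg)
{
    /* Mask off the two marker bits to recover the real page pointer */
    return (struct page *)((sg)->page_link & ~0x3);
}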
That converts the skb_vnet_hdr in cb into its sg entry. Next, how the skb data is converted to sg entries:
num_sg = skb_to_sgvec(skb, sq->sg + 1, 0, skb->len) + 1;
Note the sq->sg + 1 argument: sq->sg[0] is already used for the header information from cb. The return value is the number of sg entries used for skb->data plus 1 (for that header entry), i.e. the total number of sg entries this skb occupies.
skb_to_sgvec
int skb_to_sgvec(struct sk_buff *skb, struct scatterlist *sg, int offset, int len)
{
    int nsg = __skb_to_sgvec(skb, sg, offset, len);

    sg_mark_end(&sg[nsg - 1]);

    return nsg;
}
The conversion itself is done by __skb_to_sgvec; sg_mark_end just marks the last sg entry. Let's first see how it marks it.
sg_mark_end
static inline void sg_mark_end(struct scatterlist *sg)
{
#ifdef CONFIG_DEBUG_SG
    BUG_ON(sg->sg_magic != SG_MAGIC);
#endif
    /*
     * Set termination bit, clear potential chain bit
     */
    sg->page_link |= 0x02;    /* the low bits are special: 0x02 marks the end of the list */
    sg->page_link &= ~0x01;
}
As described above, sg uses the low two bits of sg->page_link (an address) to mark the last sg entry. Now for __skb_to_sgvec.
__skb_to_sgvec
Note the arguments: offset is 0 and len is skb->len.
static int
__skb_to_sgvec(struct sk_buff *skb, struct scatterlist *sg, int offset, int len)
{
    int start = skb_headlen(skb);    /* length of the linear area */
    int i, copy = start - offset;
    struct sk_buff *frag_iter;
    int elt = 0;

    if (copy > 0) {
        if (copy > len)    /* the whole packet fits in the linear area */
            copy = len;
        sg_set_buf(sg, skb->data + offset, copy);    /* the linear area takes one scatterlist entry */
        elt++;
        if ((len -= copy) == 0)
            return elt;
        offset += copy;
    }
    /* Handle the non-linear area, one frag at a time */
    for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
        int end;

        WARN_ON(start > offset + len);

        end = start + skb_frag_size(&skb_shinfo(skb)->frags[i]);
        if ((copy = end - offset) > 0) {
            skb_frag_t *frag = &skb_shinfo(skb)->frags[i];

            if (copy > len)    /* copy is the length of one frag */
                copy = len;
            sg_set_page(&sg[elt], skb_frag_page(frag), copy,    /* each frag takes one scatterlist entry */
                        frag->page_offset + offset - start);
            elt++;
            if (!(len -= copy))
                return elt;
            offset += copy;    /* offset is the amount already mapped */
        }
        start = end;
    }

    /* Handle skb_shinfo(skb)->frag_list */
    skb_walk_frags(skb, frag_iter) {
        int end;

        WARN_ON(start > offset + len);

        end = start + frag_iter->len;
        if ((copy = end - offset) > 0) {
            if (copy > len)
                copy = len;
            /* Recurse into each skb on the list and map it to scatterlist entries */
            elt += __skb_to_sgvec(frag_iter, sg + elt, offset - start,
                                  copy);
            if ((len -= copy) == 0)
                return elt;
            offset += copy;
        }
        start = end;
    }
    BUG_ON(len);
    return elt;
}
The details are covered by the inline comments above, so we will not repeat them. Finally comes the sg-to-desc conversion, i.e. the implementation of virtqueue_add_outbuf(sq->vq, sq->sg, num_sg, skb, GFP_ATOMIC).
virtqueue_add_outbuf
int virtqueue_add_outbuf(struct virtqueue *vq,
                         struct scatterlist sg[], unsigned int num,
                         void *data,
                         gfp_t gfp)
{
    return virtqueue_add(vq, &sg, sg_next_arr, num, 0, 1, 0, data, gfp);
}
Its main work is delegated to virtqueue_add.
virtqueue_add
static inline int virtqueue_add(struct virtqueue *_vq,        /* the target virtqueue */
                                struct scatterlist *sgs[],    /* the scatterlists to add */
                                struct scatterlist *(*next)   /* callback returning the next scatterlist entry */
                                    (struct scatterlist *, unsigned int *),
                                unsigned int total_out,       /* number of out (readable) sg entries, i.e. the entries this skb was mapped to */
                                unsigned int total_in,        /* number of in (writable) sg entries; 0 on the transmit path */
                                unsigned int out_sgs,         /* number of out scatterlists; one list per buffer, so 1 on the transmit path */
                                unsigned int in_sgs,          /* number of in scatterlists; 0 on the transmit path */
                                void *data,                   /* data is the skb pointer */
                                gfp_t gfp)
{
    struct vring_virtqueue *vq = to_vvq(_vq);
    struct scatterlist *sg;
    unsigned int i, n, avail, uninitialized_var(prev), total_sg;
    int head;

    START_USE(vq);

    BUG_ON(data == NULL);

    total_sg = total_in + total_out;

    /* If the host supports indirect descriptor tables, and we have multiple
     * buffers, then go indirect. FIXME: tune this threshold */
    if (vq->indirect && total_sg > 1 && vq->vq.num_free) {
        head = vring_add_indirect(vq, sgs, next, total_sg, total_out,
                                  total_in,
                                  out_sgs, in_sgs, gfp);
        if (likely(head >= 0))
            goto add_head;
    }

    BUG_ON(total_sg > vq->vring.num);
    BUG_ON(total_sg == 0);

    if (vq->vq.num_free < total_sg) {
        pr_debug("Can't add buf len %i - avail = %i\n",
                 total_sg, vq->vq.num_free);
        /* FIXME: for historical reasons, we force a notify here if
         * there are outgoing parts to the buffer. Presumably the
         * host should service the ring ASAP. */
        if (out_sgs)
            vq->notify(&vq->vq);
        END_USE(vq);
        return -ENOSPC;
    }

    /* We're about to use some buffers from the free list. */
    vq->vq.num_free -= total_sg;    /* total_sg is the total number of sg entries for this skb */
    /* One desc per scatterlist entry */
    head = i = vq->free_head;
    for (n = 0; n < out_sgs; n++) {
        for (sg = sgs[n]; sg; sg = next(sg, &total_out)) {
            vq->vring.desc[i].flags = VRING_DESC_F_NEXT;
            vq->vring.desc[i].addr = sg_phys(sg);
            vq->vring.desc[i].len = sg->length;
            prev = i;
            i = vq->vring.desc[i].next;
        }
    }
    for (; n < (out_sgs + in_sgs); n++) {    /* there are no in_sgs on the transmit path */
        for (sg = sgs[n]; sg; sg = next(sg, &total_in)) {
            vq->vring.desc[i].flags = VRING_DESC_F_NEXT|VRING_DESC_F_WRITE;
            vq->vring.desc[i].addr = sg_phys(sg);
            vq->vring.desc[i].len = sg->length;
            prev = i;
            i = vq->vring.desc[i].next;
        }
    }
    /* Last one doesn't continue. */
    vq->vring.desc[prev].flags &= ~VRING_DESC_F_NEXT;

    /* Update free pointer */
    vq->free_head = i;

add_head:
    /* Set token: remember which skb this submission belongs to */
    vq->data[head] = data;    /* head is the index of this skb's first desc; data is the skb pointer */

    /* Put entry in available array (but don't update avail->idx until they
     * do sync). */
    /* Update the avail ring */
    avail = (vq->vring.avail->idx & (vq->vring.num-1));
    vq->vring.avail->ring[avail] = head;    /* record the first desc index of this buffer in avail->ring[avail->idx] */

    /* Descriptors and available array need to be set before we expose the
     * new available array entries. */
    virtio_wmb(vq->weak_barriers);
    vq->vring.avail->idx++;
    vq->num_added++;

    /* This is very unlikely, but theoretically possible. Kick
     * just in case. */
    /* If an enormous number of entries has been added without a kick, kick the
     * backend now; in practice this hardly ever happens */
    if (unlikely(vq->num_added == (1 << 16) - 1))
        virtqueue_kick(_vq);

    pr_debug("Added buffer head %i to %p\n", head, vq);
    END_USE(vq);

    return 0;
}
The data-structure relationships involved in the whole skb -> sg -> desc conversion are shown in the figure below.
Guest → Host
Now let's see how the backend vhost receives a packet sent by the guest. First, some preparation.
vhost_net_open initializes the vhost_poll structures of both the vhost_virtqueues and the vhost_net itself, shown in green and orange respectively in the figure below.
Our starting point is the VHOST_SET_VRING_KICK ioctl issued by qemu, which is handled on the vhost-net side by vhost_vring_ioctl.
long vhost_vring_ioctl(struct vhost_dev *d, int ioctl, void __user *argp)
{
    struct file *eventfp, *filep = NULL;
    bool pollstart = false, pollstop = false;
    struct eventfd_ctx *ctx = NULL;
    ......

    switch (ioctl) {
    ......
    case VHOST_SET_VRING_KICK:
        if (copy_from_user(&f, argp, sizeof f)) {
            r = -EFAULT;
            break;
        }
        eventfp = f.fd == -1 ? NULL : eventfd_fget(f.fd);
        if (IS_ERR(eventfp)) {
            r = PTR_ERR(eventfp);
            break;
        }
        if (eventfp != vq->kick) {
            pollstop = (filep = vq->kick) != NULL;
            pollstart = (vq->kick = eventfp) != NULL;    /* record the kick fd in vq->kick and set pollstart */
        } else
            filep = eventfp;
        break;
    case VHOST_SET_VRING_CALL:
        ......
        break;
    default:
        r = -ENOIOCTLCMD;
    }

    if (pollstop && vq->handle_kick)
        vhost_poll_stop(&vq->poll);

    if (ctx)
        eventfd_ctx_put(ctx);
    if (filep)
        fput(filep);

    if (pollstart && vq->handle_kick)    /* a new kick fd was installed */
        r = vhost_poll_start(&vq->poll, vq->kick);

    mutex_unlock(&vq->mutex);

    if (pollstop && vq->handle_kick)
        vhost_poll_flush(&vq->poll);
    return r;
}
So handling VHOST_SET_VRING_KICK not only records vq->kick but also calls vhost_poll_start.
vhost_poll_start
Note: the arguments here are vq->poll and vq->kick, i.e. the struct file of the kick eventfd.
int vhost_poll_start(struct vhost_poll *poll, struct file *file)
{
    unsigned long mask;
    int ret = 0;

    if (poll->wqh)
        return 0;
    /* This time file is the kick eventfd, so f_op->poll is the eventfd's poll */
    mask = file->f_op->poll(file, &poll->table);
    if (mask)
        vhost_poll_wakeup(&poll->wait, 0, 0, (void *)mask);
    if (mask & POLLERR) {
        if (poll->wqh)
            remove_wait_queue(poll->wqh, &poll->wait);
        ret = -EINVAL;
    }

    return ret;
}
The file->f_op->poll call is therefore the poll of the eventfd's file, and since the eventfd's file_operations is eventfd_fops:
static const struct file_operations eventfd_fops = {
    ......
    .poll = eventfd_poll,
    ......
};
the poll function called here is eventfd_poll.
eventfd_poll
Here file is the eventfd's file structure.
static unsigned int eventfd_poll(struct file *file, poll_table *wait)
{
    struct eventfd_ctx *ctx = file->private_data;
    unsigned int events = 0;
    unsigned long flags;

    /* wait here is vhost_virtqueue.poll.table */
    poll_wait(file, &ctx->wqh, wait);

    spin_lock_irqsave(&ctx->wqh.lock, flags);
    if (ctx->count > 0)
        events |= POLLIN;
    if (ctx->count == ULLONG_MAX)
        events |= POLLERR;
    if (ULLONG_MAX - 1 > ctx->count)
        events |= POLLOUT;
    spin_unlock_irqrestore(&ctx->wqh.lock, flags);

    return events;
}
poll_wait
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    /* vhost_virtqueue.poll.table._qproc was initialized to vhost_poll_func,
     * so this actually calls vhost_poll_func */
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}
vhost_poll_func
static void vhost_poll_func(struct file *file, wait_queue_head_t *wqh,
                            poll_table *pt)
{
    struct vhost_poll *poll;

    poll = container_of(pt, struct vhost_poll, table);
    poll->wqh = wqh;
    add_wait_queue(wqh, &poll->wait);
}
This adds vhost_virtqueue.poll.wait to the wait queue inside the eventfd_ctx held in the eventfd file's private data.
That is all the groundwork for vhost's handling of guest transmissions. Now consider what happens after the guest sends a packet: the guest's kick ultimately causes the kernel KVM module to write to the kick eventfd (we will analyze that path separately and not expand on it here). Let's look at eventfd's write handler, eventfd_write.
eventfd_write
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
                             loff_t *ppos)
{
    struct eventfd_ctx *ctx = file->private_data;
    ssize_t res;
    ......
    if (likely(res > 0)) {
        ctx->count += ucnt;
        if (waitqueue_active(&ctx->wqh))    /* if the ctx wait queue is not empty */
            wake_up_locked_poll(&ctx->wqh, POLLIN);    /* call the func of every node on the wait queue */
    }
    spin_unlock_irq(&ctx->wqh.lock);

    return res;
}
Writing to the kick eventfd therefore invokes the func registered by every node on the eventfd_ctx wait queue. As analyzed earlier, vhost_poll_start put vhost_virtqueue.poll.wait on that queue, and its func was initialized to vhost_poll_wakeup, so vhost_poll_wakeup runs here.
vhost_poll_wakeup
static int vhost_poll_wakeup(wait_queue_t *wait, unsigned mode, int sync,
                             void *key)
{
    struct vhost_poll *poll = container_of(wait, struct vhost_poll, wait);

    if (!((unsigned long)key & poll->mask))
        return 0;

    vhost_poll_queue(poll);
    return 0;
}
It again calls vhost_poll_queue.
vhost_poll_queue
void vhost_poll_queue(struct vhost_poll *poll)
{
    vhost_work_queue(poll->dev, &poll->work);
}
which in turn calls vhost_work_queue.
vhost_work_queue
void vhost_work_queue(struct vhost_dev *dev, struct vhost_work *work)
{
    unsigned long flags;

    spin_lock_irqsave(&dev->work_lock, flags);
    if (list_empty(&work->node)) {
        /* Add vhost_poll->work to vhost_dev->work_list */
        list_add_tail(&work->node, &dev->work_list);
        work->queue_seq++;
        /* Wake up the vhost worker thread */
        wake_up_process(dev->worker);
    }
    spin_unlock_irqrestore(&dev->work_lock, flags);
}
After adding vhost_poll->work to vhost_dev->work_list, vhost_work_queue wakes up the vhost thread, whose loop function is vhost_worker.
vhost_worker
static int vhost_worker(void *data)
{
    struct vhost_dev *dev = data;
    ......
    for (;;) {
        ......
        /* Check whether dev->work_list is empty */
        if (!list_empty(&dev->work_list)) {
            work = list_first_entry(&dev->work_list,
                                    struct vhost_work, node);
            list_del_init(&work->node);
            seq = work->queue_seq;
        } else
            work = NULL;
        spin_unlock_irq(&dev->work_lock);

        if (work) {
            __set_current_state(TASK_RUNNING);
            work->fn(work);    /* run the vhost_work taken off dev->work_list */
            if (need_resched())
                schedule();
        } else
            schedule();

    }
    ......
    return 0;
}
vhost_worker loops forever, checking whether dev->work_list is non-empty; if it is, the worker takes the vhost_work off the list and calls its registered fn. As analyzed above, vhost_work_queue puts vhost_poll->work onto vhost_dev->work_list, and for the TX virtqueue the registered fn of that work is handle_tx_kick.
handle_tx_kick
static void handle_tx_kick(struct vhost_work *work)
{
    struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
                                              poll.work);
    struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev);

    handle_tx(net);
}
which mainly calls handle_tx.
handle_tx
static void handle_tx(struct vhost_net *net)
{
    struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
    struct vhost_virtqueue *vq = &nvq->vq;
    unsigned out, in, s;
    int head;
    struct msghdr msg = {
        .msg_name = NULL,
        .msg_namelen = 0,
        .msg_control = NULL,
        .msg_controllen = 0,
        .msg_iov = vq->iov,
        .msg_flags = MSG_DONTWAIT,
    };
    size_t len, total_len = 0;
    int err;
    size_t hdr_size;
    struct socket *sock;
    struct vhost_net_ubuf_ref *uninitialized_var(ubufs);
    bool zcopy, zcopy_used;

    /* TODO: check that we are running from vhost_worker? */
    /* The sock attached to the tap device */
    sock = rcu_dereference_check(vq->private_data, 1);
    if (!sock)
        return;

    mutex_lock(&vq->mutex);
    /* Disable virtqueue notifications via the VRING_USED_F_NO_NOTIFY flag */
    vhost_disable_notify(&net->dev, vq);

    hdr_size = nvq->vhost_hlen;
    zcopy = nvq->ubufs;

    for (;;) {
        /* Release DMAs done buffers first */
        if (zcopy)
            vhost_zerocopy_signal_used(net, vq);
        /* Convert descriptors into iovecs, following vq->avail_idx */
        head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
                                 ARRAY_SIZE(vq->iov),
                                 &out, &in,
                                 NULL, NULL);
        /* On error, stop handling until the next kick. */
        if (unlikely(head < 0))
            break;
        /* Nothing new? Wait for eventfd to tell us they refilled. */
        if (head == vq->num) {
            int num_pends;

            /* If more outstanding DMAs, queue the work.
             * Handle upend_idx wrap around
             */
            num_pends = likely(nvq->upend_idx >= nvq->done_idx) ?
                        (nvq->upend_idx - nvq->done_idx) :
                        (nvq->upend_idx + UIO_MAXIOV -
                         nvq->done_idx);
            if (unlikely(num_pends > VHOST_MAX_PEND))
                break;
            if (unlikely(vhost_enable_notify(&net->dev, vq))) {
                vhost_disable_notify(&net->dev, vq);
                continue;
            }
            break;
        }
        if (in) {
            vq_err(vq, "Unexpected descriptor format for TX: "
                   "out %d, int %d\n", out, in);
            break;
        }
        /* Skip header. TODO: support TSO. */
        s = move_iovec_hdr(vq->iov, nvq->hdr, hdr_size, out);
        msg.msg_iovlen = out;
        len = iov_length(vq->iov, out);
        /* Sanity check */
        if (!len) {
            vq_err(vq, "Unexpected header len for TX: "
                   "%zd expected %zd\n",
                   iov_length(nvq->hdr, s), hdr_size);
            break;
        }
        zcopy_used = zcopy && (len >= VHOST_GOODCOPY_LEN ||
                               nvq->upend_idx != nvq->done_idx);

        /* use msg_control to pass vhost zerocopy ubuf info to skb */
        if (zcopy_used) {
            vq->heads[nvq->upend_idx].id = head;
            if (!vhost_net_tx_select_zcopy(net) ||
                len < VHOST_GOODCOPY_LEN) {
                /* copy don't need to wait for DMA done */
                vq->heads[nvq->upend_idx].len =
                            VHOST_DMA_DONE_LEN;
                msg.msg_control = NULL;
                msg.msg_controllen = 0;
                ubufs = NULL;
            } else {
                struct ubuf_info *ubuf;
                ubuf = nvq->ubuf_info + nvq->upend_idx;

                vq->heads[nvq->upend_idx].len =
                    VHOST_DMA_IN_PROGRESS;
                ubuf->callback = vhost_zerocopy_callback;
                ubuf->ctx = nvq->ubufs;
                ubuf->desc = nvq->upend_idx;
                msg.msg_control = ubuf;
                msg.msg_controllen = sizeof(ubuf);
                ubufs = nvq->ubufs;
                kref_get(&ubufs->kref);
            }
            nvq->upend_idx = (nvq->upend_idx + 1) % UIO_MAXIOV;
        } else
            msg.msg_control = NULL;
        /* TODO: Check specific error and bomb out unless ENOBUFS? */
        /* Call the sendmsg of the tap device's sock, i.e. tun_sendmsg */
        err = sock->ops->sendmsg(NULL, sock, &msg, len);
        if (unlikely(err < 0)) {
            if (zcopy_used) {
                if (ubufs)
                    vhost_net_ubuf_put(ubufs);
                nvq->upend_idx = ((unsigned)nvq->upend_idx - 1)
                    % UIO_MAXIOV;
            }
            vhost_discard_vq_desc(vq, 1);
            break;
        }
        if (err != len)
            pr_debug("Truncated TX packet: "
                     " len %d != %zd\n", err, len);
        /* Update the used ring */
        if (!zcopy_used)
            vhost_add_used_and_signal(&net->dev, vq, head, 0);
        else
            vhost_zerocopy_signal_used(net, vq);
        total_len += len;
        vhost_net_tx_packet(net);
        /* Quota exceeded: requeue the work and wait to be scheduled again */
        if (unlikely(total_len >= VHOST_NET_WEIGHT)) {
            vhost_poll_queue(&vq->poll);
            break;
        }
    }

    mutex_unlock(&vq->mutex);
}
handle_tx calls the sendmsg of the tap device's sock, namely tun_sendmsg.
tun_sendmsg
static int tun_sendmsg(struct kiocb *iocb, struct socket *sock,
                       struct msghdr *m, size_t total_len)
{
    int ret;
    struct tun_file *tfile = container_of(sock, struct tun_file, socket);
    struct tun_struct *tun = __tun_get(tfile);

    if (!tun)
        return -EBADFD;
    ret = tun_get_user(tun, tfile, m->msg_control, m->msg_iov, total_len,
                       m->msg_iovlen, m->msg_flags & MSG_DONTWAIT);
    tun_put(tun);
    return ret;
}
tun_get_user
tun_get_user mainly calls tun_alloc_skb to allocate an skb, copies the packet out of the virtio ring (via the iovecs) into that skb, and finally calls netif_rx_ni to inject it into the protocol stack.
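A heavily condensed sketch of that flow is shown below. This is illustrative only: the helper name tun_get_user_sketch and the trimmed signature are ours, and the real tun_get_user also handles the tun_pi/virtio-net header, the zero-copy ubuf_info passed via msg_control, and GSO metadata:

/* Illustrative only: the real tun_get_user() in drivers/net/tun.c does much more. */
static ssize_t tun_get_user_sketch(struct tun_struct *tun, struct tun_file *tfile,
                                   const struct iovec *iv, size_t total_len,
                                   int noblock)
{
    struct sk_buff *skb;

    /* Allocate an skb large enough for the packet (may sleep unless noblock) */
    skb = tun_alloc_skb(tfile, NET_IP_ALIGN, total_len, 0, noblock);
    if (IS_ERR(skb))
        return PTR_ERR(skb);

    /* Copy the guest's data out of the iovec array that vhost built from the vring */
    if (skb_copy_datagram_from_iovec(skb, 0, iv, 0, total_len)) {
        kfree_skb(skb);
        return -EFAULT;
    }

    skb->protocol = eth_type_trans(skb, tun->dev);    /* TAP mode: a full Ethernet frame */
    netif_rx_ni(skb);                                 /* hand it to the host protocol stack */
    return total_len;
}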
The data structures and call path of this vhost transmit direction are marked with red lines in the figure below; the whole transmit path runs partly in process context (the vhost kernel thread) and partly in softirq context.