struct AioContext {
    GSource source;

    /* Protects all fields from multi-threaded access */
    RFifoLock lock;

    /* The list of registered AIO handlers */
    QLIST_HEAD(, AioHandler) aio_handlers;

    /* This is a simple lock used to protect the aio_handlers list.
     * Specifically, it's used to ensure that no callbacks are removed while
     * we're walking and dispatching callbacks.
     */
    int walking_handlers;

    /* lock to protect between bh's adders and deleter */
    QemuMutex bh_lock;

    /* Anchor of the list of Bottom Halves belonging to the context */
    struct QEMUBH *first_bh;

    /* A simple lock used to protect the first_bh list, and ensure that
     * no callbacks are removed while we're walking and dispatching callbacks.
     */
    int walking_bh;

    /* Used for aio_notify. */
    EventNotifier notifier;

    /* GPollFDs for aio_poll() */
    GArray *pollfds;

    /* Thread pool for performing work and receiving completion callbacks */
    struct ThreadPool *thread_pool;

    /* TimerLists for calling timers - one per clock type */
    QEMUTimerListGroup tlg;
};

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};

struct BlockDriverAIOCB {
    const AIOCBInfo *aiocb_info;
    BlockDriverState *bs;
    BlockDriverCompletionFunc *cb;
    void *opaque;
};

struct ThreadPoolElement {
    BlockDriverAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock. After
     * that, only the worker thread can write to it. Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret; /* return value of the IO operation */

    /* Access to this list is protected by lock. */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex. */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    EventNotifier notifier;
    AioContext *ctx;
    QemuMutex lock;
    QemuCond check_cancel;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock. */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int pending_cancellations; /* whether we need a cond_broadcast */
    bool stopping;
};

IO thread:

worker_thread(pool)
  do_spawn_thread(pool);
  req = QTAILQ_FIRST(&pool->request_list);
  QTAILQ_REMOVE(&pool->request_list, req, reqs); /* take req off the pool's request queue and change its state */
  req->state = THREAD_ACTIVE;
  ret = req->func(req->arg); /* func == aio_worker; req->arg is the RawPosixAIOData *acb built in paio_submit */
    ret = handle_aiocb_rw(aiocb);
      return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
        len = pwrite(aiocb->aio_fildes,
                     (const char *)buf + offset,
                     aiocb->aio_nbytes - offset,
                     aiocb->aio_offset + offset);
        offset += len;
        return offset;
    if (ret == aiocb->aio_nbytes) ret = 0; /* check here that the IO completed in full */
  req->state = THREAD_DONE; /* mark the request done */
  req->ret = ret;           /* record whether the IO succeeded */
  event_notifier_set(&pool->notifier);
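
handle_aiocb_rw_linear does not issue a single pwrite: it loops, advancing offset past short writes, until the whole buffer has been transferred or a real error occurs, which is why the ret == aiocb->aio_nbytes check above catches partial IO. A minimal sketch of that retry loop, with a hypothetical full_pwrite helper (the EINTR handling and the exact error return convention are simplifications, not QEMU's exact code):

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper mirroring the loop in handle_aiocb_rw_linear:
 * keep writing until 'count' bytes are transferred, a real error
 * occurs, or no progress is possible. Returns bytes written or -errno. */
static ssize_t full_pwrite(int fd, const char *buf, size_t count, off_t start)
{
    size_t offset = 0;

    while (offset < count) {
        ssize_t len = pwrite(fd, buf + offset, count - offset, start + offset);
        if (len < 0) {
            if (errno == EINTR) {
                continue;        /* interrupted by a signal: retry */
            }
            return -errno;       /* real error */
        }
        if (len == 0) {
            break;               /* no progress possible */
        }
        offset += len;           /* short write: advance and retry */
    }
    return offset;               /* caller checks offset == count */
}

int main(void)
{
    int fd = open("/tmp/full_pwrite_demo", O_CREAT | O_WRONLY | O_TRUNC, 0600);
    const char msg[] = "hello, qemu thread pool";
    ssize_t n = full_pwrite(fd, msg, sizeof(msg) - 1, 0);
    printf("wrote %zd of %zu bytes\n", n, sizeof(msg) - 1);
    close(fd);
    return 0;
}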
Main thread:

main_loop()
  last_io = main_loop_wait(nonblocking);
    ret = os_host_main_loop_wait(timeout_ns);
      glib_pollfds_poll();
        g_main_context_check(context, max_priority, pfds, glib_n_poll_fds);
        g_main_context_dispatch(context); /* runs aio_ctx_dispatch */
          aio_poll(ctx, false);
            aio_bh_poll(ctx)
              for (bh = ctx->first_bh; bh; bh = next)
                bh->cb(bh->opaque);
                  /* cb == spawn_thread_bh_fn(ctx): */
                  do_spawn_thread(pool);
                    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
                      err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
                      err = pthread_create(&thread->thread, &attr, start_routine, arg);
                  /* or cb == bdrv_co_em_bh(acb), which invokes virtio_blk_rw_complete, virtio's completion callback */
            aio_dispatch(ctx)
              node = QLIST_FIRST(&ctx->aio_handlers);
              while (node)
                node->io_read(node->opaque); /* io_read == event_notifier_ready(pool) */
                  QLIST_FOREACH_SAFE(elem, &pool->head, all, next)
                    if (elem->state == THREAD_DONE && elem->common.cb)
                      elem->common.cb(elem->common.opaque, elem->ret); /* cb == bdrv_co_io_em_complete, the coroutine's completion callback */
    qemu_iohandler_poll(gpollfds, ret);
      ioh->fd_read(ioh->opaque); /* fd_read == virtio_queue_host_notifier_read */
        virtio_queue_notify_vq(vq);
          vq->handle_output(vdev, vq); /* handle_output == virtio_blk_handle_output */
            virtio_submit_multiwrite(s->bs, &mrb);
              ret = bdrv_aio_multiwrite(bs, mrb->blkreq, mrb->num_writes);
                num_reqs = multiwrite_merge(bs, reqs, num_reqs, mcb);
                for (i = 0; i < num_reqs; i++) {
                    bdrv_co_aio_rw_vector(bs, reqs[i].sector, reqs[i].qiov,
                                          reqs[i].nb_sectors, reqs[i].flags,
                                          multiwrite_cb, mcb, true);
                }
                /* callback == multiwrite_cb, which fans in to the per-request virtio_blk_rw_complete: */
                  virtio_blk_handle_rw_error /* taken when req->ret != 0 */
                    action = bdrv_get_error_action(req->dev->bs, is_read, error); /* pick the error-handling action */
                    virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR); /* report VIRTIO_BLK_S_IOERR */
                    bdrv_error_action(s->bs, action, is_read, error);
                      bdrv_emit_qmp_error_event(bs, QEVENT_BLOCK_IO_ERROR, action, is_read);
                  virtio_blk_req_complete(req, VIRTIO_BLK_S_OK); /* when ret == 0, report VIRTIO_BLK_S_OK */
                return 0;
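
multiwrite_merge coalesces queued writes whose sector ranges touch, so one coroutine request can cover several guest requests. A standalone sketch of that merging idea over a simplified request array follows; the Req type and merge_reqs are hypothetical, and QEMU's real multiwrite_merge additionally merges the qiov scatter-gather lists and keeps the per-request callbacks in the mcb.

#include <stdio.h>

/* Hypothetical, simplified request: QEMU's BlockRequest also carries
 * a QEMUIOVector and a completion callback. */
typedef struct Req {
    long sector;
    int nb_sectors;
} Req;

/* Merge adjacent requests in place; assumes reqs[] is sorted by sector.
 * Returns the new number of requests. */
static int merge_reqs(Req *reqs, int num_reqs)
{
    int out = 0;
    for (int i = 1; i < num_reqs; i++) {
        if (reqs[out].sector + reqs[out].nb_sectors == reqs[i].sector) {
            reqs[out].nb_sectors += reqs[i].nb_sectors;  /* contiguous: extend */
        } else {
            reqs[++out] = reqs[i];                       /* gap: keep separate */
        }
    }
    return num_reqs ? out + 1 : 0;
}

int main(void)
{
    Req reqs[] = { {0, 8}, {8, 8}, {32, 16} };
    int n = merge_reqs(reqs, 3);  /* -> 2 requests: [0,16) and [32,48) */
    for (int i = 0; i < n; i++) {
        printf("req %d: sector %ld, %d sectors\n", i, reqs[i].sector, reqs[i].nb_sectors);
    }
    return 0;
}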
bdrv_co_aio_rw_vector
  /* the BlockDriverAIOCBCoroutine *acb is built here */
  co = qemu_coroutine_create(bdrv_co_do_rw);
  qemu_coroutine_enter(co, acb);
    bdrv_co_do_rw
      acb->req.error = bdrv_co_do_writev(bs, acb->req.sector,
                                         acb->req.nb_sectors, acb->req.qiov, acb->req.flags);
        return bdrv_co_do_pwritev(bs, sector_num << BDRV_SECTOR_BITS,
                                  nb_sectors << BDRV_SECTOR_BITS, qiov, flags);
          bdrv_io_limits_intercept(bs, bytes, true);
          ret = bdrv_aligned_pwritev(bs, &req, offset, bytes,
                                     use_local_qiov ? &local_qiov : qiov, flags);
            ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov); /* == bdrv_co_writev_em */
              bdrv_co_writev_em
                return bdrv_co_io_em(bs, sector_num, nb_sectors, iov, true);
                  acb = bs->drv->bdrv_aio_writev(bs, sector_num, iov, nb_sectors,
                                                 bdrv_co_io_em_complete, &co); /* == raw_aio_writev */
                    return raw_aio_submit(bs, sector_num, qiov, nb_sectors,
                                          cb, opaque, QEMU_AIO_WRITE); /* cb == bdrv_co_io_em_complete */
                      return paio_submit(bs, s->fd, sector_num, qiov, nb_sectors,
                                         cb, opaque, type);
                        pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
                        /* pool is the thread_pool of the global AioContext, qemu_aio_context */
                        return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
                          /* the ThreadPoolElement *req is built here */
                          QLIST_INSERT_HEAD(&pool->head, req, all); /* link req into the pool's list of all requests */
                          if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads)
                              spawn_thread(pool);
                                if (!pool->pending_threads)
                                    qemu_bh_schedule(pool->new_thread_bh);
                                    /* Schedule the thread-spawning BH onto the context's BH list.
                                     * new_thread_bh was created as
                                     * "pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);"
                                     * i.e. a BH dedicated to spawning worker threads; its cb is spawn_thread_bh_fn. */
                          QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); /* queue req for the workers */
                          return &req->common;
                  qemu_coroutine_yield();
                  /* bdrv_co_io_em_complete re-enters the coroutine here */
                  return co.ret;
            bdrv_set_dirty(bs, sector_num, nb_sectors);
      acb->bh = qemu_bh_new(bdrv_co_em_bh, acb);
        return aio_bh_new(qemu_aio_context, cb, opaque);
          /* the BH is built here */
          bh->ctx = ctx;            /* qemu_aio_context */
          bh->next = ctx->first_bh; /* link this bh into the ctx's bh list */
          ctx->first_bh = bh;
          return bh;
      qemu_bh_schedule(acb->bh);
        aio_notify(bh->ctx);
          event_notifier_set(&ctx->notifier);
            ret = write(e->wfd, &value, sizeof(value));
  return &acb->common;
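
The key trick above is qemu_coroutine_yield(): bdrv_co_io_em parks the coroutine after handing the request to the thread pool, and bdrv_co_io_em_complete later re-enters it with the result, letting blocking-style code run on top of the event loop. Below is a standalone sketch of that enter/yield/re-enter shape using POSIX ucontext; all names are hypothetical, and QEMU uses its own, faster coroutine backends rather than ucontext swaps on every switch.

#include <stdio.h>
#include <ucontext.h>

static ucontext_t main_ctx, co_ctx;
static char co_stack[64 * 1024];
static int co_ret; /* plays the role of co.ret */

static void coroutine_yield(void)
{
    swapcontext(&co_ctx, &main_ctx);   /* park the coroutine, resume main */
}

static void coroutine_fn(void)
{
    printf("co: submit IO, then yield\n");
    coroutine_yield();                 /* like qemu_coroutine_yield() */
    printf("co: resumed, ret = %d\n", co_ret);
}

int main(void)
{
    getcontext(&co_ctx);
    co_ctx.uc_stack.ss_sp = co_stack;
    co_ctx.uc_stack.ss_size = sizeof(co_stack);
    co_ctx.uc_link = &main_ctx;        /* return to main when co finishes */
    makecontext(&co_ctx, coroutine_fn, 0);

    swapcontext(&main_ctx, &co_ctx);   /* first enter, like qemu_coroutine_enter */
    printf("main: co yielded; pretend the worker finished\n");
    co_ret = 0;                        /* like bdrv_co_io_em_complete storing ret */
    swapcontext(&main_ctx, &co_ctx);   /* re-enter: coroutine resumes after yield */
    printf("main: co finished\n");
    return 0;
}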
Guest virtio driver -> host KVM -> host QEMU

GUEST (kernel 2.6.32):

virtio_blk.c:

static void do_virtblk_request(struct request_queue *q)
{
    struct virtio_blk *vblk = q->queuedata;
    struct request *req;
    unsigned int issued = 0;

    while ((req = blk_peek_request(q)) != NULL) {
        BUG_ON(req->nr_phys_segments + 2 > vblk->sg_elems);

        /* If this request fails, stop queue and wait for something to
           finish to restart it. */
        if (!do_req(q, vblk, req)) {
            blk_stop_queue(q);
            break;
        }
        blk_start_request(req);
        issued++;
    }

    if (issued)
        vblk->vq->vq_ops->kick(vblk->vq);
}
virtio_pci.c:

/* signature shown for reference */
static struct virtqueue *setup_vq(struct virtio_device *vdev, unsigned index,
                                  void (*callback)(struct virtqueue *vq),
                                  const char *name,
                                  u16 msix_vec)

static void vp_notify(struct virtqueue *vq)
{
    struct virtio_pci_device *vp_dev = to_vp_device(vq->vdev);
    struct virtio_pci_vq_info *info = vq->priv;

    /* we write the queue's selector into the notification register to
     * signal the other end */
    iowrite16(info->queue_index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NOTIFY);
}
HOST:

The device initialization path includes:

virtio_pci_start_ioeventfd
  virtio_pci_set_host_notifier_internal
    virtio_queue_set_host_notifier_fd_handler(vq, true, set_handler);
    /* add the notifier fd to the iohandler list; the handler that fires on it
     * is virtio_queue_host_notifier_read */
      event_notifier_set_handler(&vq->host_notifier,
                                 virtio_queue_host_notifier_read);
        qemu_set_fd_handler
          qemu_set_fd_handler2(fd, NULL, fd_read, fd_write, opaque);
    memory_region_add_eventfd(&proxy->bar, VIRTIO_PCI_QUEUE_NOTIFY, 2,
                              true, n, notifier);
    /* register the eventfd with KVM so that guest IO to the
     * VIRTIO_PCI_QUEUE_NOTIFY address is handled in the kernel
     * and signals the notifier */
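
qemu_set_fd_handler2 boils down to a table of {fd, read handler, opaque} entries that the main loop polls and dispatches, which is how the kick on the host notifier ends up in virtio_queue_host_notifier_read. A minimal standalone sketch of that registration-and-dispatch pattern follows; the names are hypothetical, a single handler slot stands in for QEMU's handler list, and a pipe stands in for the eventfd.

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

typedef void IOReadHandler(void *opaque);

/* Hypothetical single-entry handler table; QEMU keeps a list of these. */
static struct {
    int fd;
    IOReadHandler *fd_read;
    void *opaque;
} handler;

static void set_fd_handler(int fd, IOReadHandler *fd_read, void *opaque)
{
    handler.fd = fd;
    handler.fd_read = fd_read;
    handler.opaque = opaque;
}

static void on_kick(void *opaque)
{
    char c;
    ssize_t n = read(handler.fd, &c, 1);  /* drain the notification */
    (void)n;
    printf("kick received: %s\n", (const char *)opaque);
}

int main(void)
{
    int fds[2];
    if (pipe(fds) < 0) {
        return 1;
    }
    set_fd_handler(fds[0], on_kick, "handle the virtqueue now");

    ssize_t n = write(fds[1], "k", 1);    /* stands in for the guest's notify */
    (void)n;

    /* One iteration of the dispatch loop (qemu_iohandler_poll). */
    struct pollfd pfd = { .fd = handler.fd, .events = POLLIN };
    if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN)) {
        handler.fd_read(handler.opaque);
    }
    close(fds[0]);
    close(fds[1]);
    return 0;
}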