The previous article analyzed the netlink-based connector, which is precisely the channel through which configuration commands travel between user space and kernel space. The user invokes a user-space tool with the desired command and arguments, the tool turns them into a message packet, the kernel parses the message to recover the command, translates it into a function call, and finally executes it.
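Before diving into the kernel side, it helps to picture what the user-space half of this exchange looks like. Below is a minimal, hypothetical sketch of a client in the spirit of drbdsetup: it opens a NETLINK_CONNECTOR socket, wraps a struct drbd_nl_cfg_req in a cn_msg addressed to DRBD's connector id, and sends it to the kernel. The struct layout mirrors the DRBD headers; the fallback constants, the hard-coded packet type, and the error handling are simplified assumptions, not the real tool's code.

/* Hypothetical user-space sender: one DRBD command over the netlink
 * connector. Struct layout mirrors the DRBD 8.3 headers; everything
 * else (constant fallbacks, packet type, error handling) is simplified. */
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/connector.h>

#ifndef CN_IDX_DRBD                /* values from include/linux/connector.h */
#define CN_IDX_DRBD 0x8
#define CN_VAL_DRBD 0x1
#endif

struct drbd_nl_cfg_req {           /* same layout the kernel callback casts to */
    int packet_type;
    unsigned int drbd_minor;
    int flags;
    unsigned short tag_list[];     /* command parameters, none appended here */
};

int main(void)
{
    struct sockaddr_nl sa = {
        .nl_family = AF_NETLINK,
        .nl_groups = 1 << (CN_IDX_DRBD - 1),   /* subscribe to DRBD replies */
        .nl_pid    = getpid(),
    };
    int fd = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
    if (fd < 0 || bind(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
        return 1;

    /* one contiguous buffer: nlmsghdr | cn_msg | drbd_nl_cfg_req */
    char buf[NLMSG_SPACE(sizeof(struct cn_msg) + sizeof(struct drbd_nl_cfg_req))];
    memset(buf, 0, sizeof(buf));

    struct nlmsghdr *nlh        = (struct nlmsghdr *)buf;
    struct cn_msg *cn           = (struct cn_msg *)NLMSG_DATA(nlh);
    struct drbd_nl_cfg_req *req = (struct drbd_nl_cfg_req *)cn->data;

    nlh->nlmsg_len  = NLMSG_LENGTH(sizeof(*cn) + sizeof(*req));
    nlh->nlmsg_type = NLMSG_DONE;

    cn->id.idx = CN_IDX_DRBD;      /* routed to drbd_connector_callback() */
    cn->id.val = CN_VAL_DRBD;
    cn->seq    = 1;
    cn->len    = sizeof(*req);

    req->packet_type = 17;         /* P_get_state in this build, see below */
    req->drbd_minor  = 0;          /* /dev/drbd0 */

    return send(fd, buf, sizeof(buf), 0) < 0;
}

Note that the kernel callback below checks CAP_SYS_ADMIN on the sender, so a client like this has to run as root.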
Let us start with a careful look at the receive callback that was registered when the connector was created, as seen in the previous article:
#ifdef KERNEL_HAS_CN_SKB_PARMS
STATIC void drbd_connector_callback(struct cn_msg *req, struct netlink_skb_parms *nsp)
{
#else
STATIC void drbd_connector_callback(void *data)
{
    struct cn_msg *req = data;
#endif
    struct drbd_nl_cfg_req *nlp = (struct drbd_nl_cfg_req *)req->data;
    struct cn_handler_struct *cm;
    struct cn_msg *cn_reply;
    struct drbd_nl_cfg_reply *reply;
    struct drbd_conf *mdev;
    int retcode, rr;
    int reply_size = sizeof(struct cn_msg)
        + sizeof(struct drbd_nl_cfg_reply)
        + sizeof(short int);

    if (!try_module_get(THIS_MODULE)) {
        printk(KERN_ERR "drbd: try_module_get() failed!\n");
        return;
    }

#ifdef KERNEL_HAS_CN_SKB_PARMS
    if (!cap_raised(nsp->eff_cap, CAP_SYS_ADMIN)) {
        retcode = ERR_PERM;
        goto fail;
    }
#endif

    mdev = ensure_mdev(nlp);
    if (!mdev) {
        retcode = ERR_MINOR_INVALID;
        goto fail;
    }

    trace_drbd_netlink(req, 1);

    if (nlp->packet_type >= P_nl_after_last_packet) {
        retcode = ERR_PACKET_NR;
        goto fail;
    }

    printk("packet_type is %d\n", nlp->packet_type);

    cm = cnd_table + nlp->packet_type;

    /* This may happen if packet number is 0: */
    if (cm->function == NULL) {
        retcode = ERR_PACKET_NR;
        goto fail;
    }

    reply_size += cm->reply_body_size;

    /* allocation not in the IO path, cqueue thread context */
    cn_reply = kmalloc(reply_size, GFP_KERNEL);
    if (!cn_reply) {
        retcode = ERR_NOMEM;
        goto fail;
    }
    reply = (struct drbd_nl_cfg_reply *) cn_reply->data;

    reply->packet_type =
        cm->reply_body_size ? nlp->packet_type : P_nl_after_last_packet;
    reply->minor = nlp->drbd_minor;
    reply->ret_code = NO_ERROR; /* Might by modified by cm->function. */
    /* reply->tag_list; might be modified by cm->function. */

    rr = cm->function(mdev, nlp, reply);

    cn_reply->id = req->id;
    cn_reply->seq = req->seq;
    cn_reply->ack = req->ack + 1;
    cn_reply->len = sizeof(struct drbd_nl_cfg_reply) + rr;
    cn_reply->flags = 0;

    trace_drbd_netlink(cn_reply, 0);
    rr = cn_netlink_send(cn_reply, CN_IDX_DRBD, GFP_KERNEL);
    if (rr && rr != -ESRCH)
        printk(KERN_INFO "drbd: cn_netlink_send()=%d\n", rr);

    kfree(cn_reply);

    module_put(THIS_MODULE);
    return;
fail:
    drbd_nl_send_reply(req, retcode);
    module_put(THIS_MODULE);
}
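At the very end of the callback the reply is pushed back to user space with cn_netlink_send(); on the other side the tool simply reads that broadcast from the same netlink socket. Continuing the hypothetical client sketched earlier, the receive path could look roughly like this (again, the struct layout mirrors the DRBD headers; the helper name and everything else is illustrative):

/* Hypothetical receive path for the reply broadcast by cn_netlink_send()
 * in drbd_connector_callback(); pairs with the sender sketch above. */
#include <stdio.h>

struct drbd_nl_cfg_reply {         /* same layout the kernel fills in */
    int packet_type;
    unsigned int minor;
    int ret_code;
    unsigned short tag_list[];
};

static int recv_drbd_reply(int fd)
{
    char buf[4096];
    ssize_t n = recv(fd, buf, sizeof(buf), 0);
    if (n < (ssize_t)NLMSG_SPACE(sizeof(struct cn_msg)))
        return -1;

    struct nlmsghdr *nlh = (struct nlmsghdr *)buf;
    struct cn_msg *cn    = (struct cn_msg *)NLMSG_DATA(nlh);
    struct drbd_nl_cfg_reply *reply =
        (struct drbd_nl_cfg_reply *)cn->data;

    /* cn->ack is req->ack + 1, set by the kernel above: this is how the
     * tool can match a reply to the request it sent */
    printf("minor %u, packet_type %d, ret_code %d (ack %u)\n",
           reply->minor, reply->packet_type, reply->ret_code, cn->ack);
    return reply->ret_code;
}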
Back in the callback itself, one line deserves particular attention:

rr = cm->function(mdev, nlp, reply);

This is effectively a form of polymorphism: which handler function points to is decided by the packet type carried in the message:
cm = cnd_table + nlp->packet_type;
At initialization time the system sets up a global static function table. Identifiers such as P_primary are macros generated at compile time; each one is the index of its element in the table and at the same time corresponds to a packet type.
static struct cn_handler_struct cnd_table[] = {
    [ P_primary ]          = { &drbd_nl_primary,          0 },
    [ P_secondary ]        = { &drbd_nl_secondary,        0 },
    [ P_disk_conf ]        = { &drbd_nl_disk_conf,        0 },
    [ P_detach ]           = { &drbd_nl_detach,           0 },
    [ P_net_conf ]         = { &drbd_nl_net_conf,         0 },
    [ P_disconnect ]       = { &drbd_nl_disconnect,       0 },
    [ P_resize ]           = { &drbd_nl_resize,           0 },
    [ P_syncer_conf ]      = { &drbd_nl_syncer_conf,      0 },
    [ P_invalidate ]       = { &drbd_nl_invalidate,       0 },
    [ P_invalidate_peer ]  = { &drbd_nl_invalidate_peer,  0 },
    [ P_pause_sync ]       = { &drbd_nl_pause_sync,       0 },
    [ P_resume_sync ]      = { &drbd_nl_resume_sync,      0 },
    [ P_suspend_io ]       = { &drbd_nl_suspend_io,       0 },
    [ P_resume_io ]        = { &drbd_nl_resume_io,        0 },
    [ P_outdate ]          = { &drbd_nl_outdate,          0 },
    [ P_get_config ]       = { &drbd_nl_get_config,
                               sizeof(struct syncer_conf_tag_len_struct) +
                               sizeof(struct disk_conf_tag_len_struct) +
                               sizeof(struct net_conf_tag_len_struct) },
    [ P_get_state ]        = { &drbd_nl_get_state,
                               sizeof(struct get_state_tag_len_struct) +
                               sizeof(struct sync_progress_tag_len_struct) },
    [ P_get_uuids ]        = { &drbd_nl_get_uuids,
                               sizeof(struct get_uuids_tag_len_struct) },
    [ P_get_timeout_flag ] = { &drbd_nl_get_timeout_flag,
                               sizeof(struct get_timeout_flag_tag_len_struct) },
    [ P_start_ov ]         = { &drbd_nl_start_ov,         0 },
    [ P_new_c_uuid ]       = { &drbd_nl_new_c_uuid,       0 },
};
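In the real sources, the P_* packet-type constants and the entries of cnd_table are both generated from one shared packet description by macro expansion (the machinery lives in the drbd_nl.h / drbd_tag_magic.h headers). The exact macros are more involved than this, but the underlying "X-macro" technique can be illustrated with the hypothetical, stripped-down example below; all names and values here are invented for illustration only:

/* Hypothetical X-macro illustration: a single packet list drives both the
 * P_* constants and the dispatch table indexed by them. Not DRBD's actual
 * macros, just the technique. */
#define NL_PACKETS(X) \
    X(primary)        \
    X(secondary)      \
    X(get_state)

/* expansion 1: the packet-type constants (cf. P_primary, P_get_state, ...) */
#define AS_ENUM(name) P_ ## name,
enum packet_types {
    P_unused,                       /* packet number 0 has no handler */
    NL_PACKETS(AS_ENUM)
    P_nl_after_last_packet
};

/* expansion 2: one stub handler per packet (the real drbd_nl_* functions) */
#define AS_STUB(name) static int drbd_nl_ ## name(void) { return 0; }
NL_PACKETS(AS_STUB)

/* expansion 3: the dispatch table, indexed by the same constants */
struct cn_handler { int (*function)(void); };
#define AS_ENTRY(name) [P_ ## name] = { drbd_nl_ ## name },
static struct cn_handler cnd_table[] = {
    NL_PACKETS(AS_ENTRY)
};

int main(void)
{
    int pt = P_get_state;           /* as if read from nlp->packet_type */
    if (pt >= P_nl_after_last_packet || !cnd_table[pt].function)
        return 1;                   /* the same checks the callback performs */
    return cnd_table[pt].function();
}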
For example, in one complete exchange between user space and kernel space, user space sends the P_get_state message several times; the packet type code of that message is 17.
Function tables built on structures like cn_handler_struct appear throughout the DRBD code, in kernel space as well as in user space. Such a consistent style should make the code much easier to extend and maintain, and it also makes reading it far less painful: there is always a clear pattern to follow.
DRBD's configuration information, virtual devices, network communication ports, peer information and so on are all sent into kernel space as netlink message packets by the drbdsetup or drbdadm tools.
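The actual parameters of a command (ports, addresses, sizes and so on) travel inside the request as a tag list appended after the fixed drbd_nl_cfg_req header; the *_tag_len_struct sizes in cnd_table above are sized for exactly such lists in the replies. The sketch below only illustrates the general tag/length/value idea, with invented tag numbers and an invented end marker; the real tag definitions are generated from DRBD's shared macro headers and the real code is more careful about alignment:

/* Hypothetical tag/length/value packing for the tag_list that follows the
 * fixed drbd_nl_cfg_req header. Tag numbers and the end marker are invented. */
#include <stdio.h>
#include <string.h>
#include <stddef.h>

enum { T_END = 0, T_MY_PORT = 1, T_MY_ADDR = 2 };   /* invented tags */

/* append one tag: 16-bit tag, 16-bit length, then the raw value bytes */
static unsigned char *tl_add(unsigned char *p, unsigned short tag,
                             const void *val, unsigned short len)
{
    memcpy(p, &tag, sizeof(tag)); p += sizeof(tag);
    memcpy(p, &len, sizeof(len)); p += sizeof(len);
    memcpy(p, val, len);          p += len;
    return p;
}

/* build the payload a tool might append for a net-conf style request */
static size_t build_net_conf_tags(unsigned char *start)
{
    unsigned char *p = start;
    int port = 7788;
    const char addr[] = "192.168.0.1";
    unsigned short end = T_END;

    p = tl_add(p, T_MY_PORT, &port, sizeof(port));
    p = tl_add(p, T_MY_ADDR, addr, sizeof(addr));
    memcpy(p, &end, sizeof(end)); p += sizeof(end);  /* end-of-list marker */
    return (size_t)(p - start);
}

int main(void)
{
    unsigned char buf[64];
    printf("tag list is %zu bytes\n", build_net_conf_tags(buf));
    return 0;
}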
When packet number 5 (P_net_conf) arrives, drbd_nl_net_conf is called. This function starts the worker kernel thread; the thread watches a work queue and, whenever an item arrives, takes it off the queue and processes it:
int drbd_worker(struct drbd_thread *thi)
{
    ...
    w = NULL;
    spin_lock_irq(&mdev->data.work.q_lock);
    ERR_IF(list_empty(&mdev->data.work.q)) {
        /* something terribly wrong in our logic.
         * we were able to down() the semaphore,
         * but the list is empty... doh.
         *
         * what is the best thing to do now?
         * try again from scratch, restarting the receiver,
         * asender, whatnot? could break even more ugly,
         * e.g. when we are primary, but no good local data.
         *
         * I'll try to get away just starting over this loop.
         */
        spin_unlock_irq(&mdev->data.work.q_lock);
        continue;
    }
    w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
    list_del_init(&w->list);
    spin_unlock_irq(&mdev->data.work.q_lock);

    if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
        /* dev_warn(DEV, "worker: a callback failed! \n"); */
        if (mdev->state.conn >= C_CONNECTED)
            drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
    }
    ...
}
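The elided part at the top of this loop is a down() on a counting semaphore in mdev->data.work, which is why the ERR_IF complains if the list nonetheless turns out to be empty. The producer side, modelled loosely on drbd_queue_work() in drbd_int.h, takes the queue spinlock, appends the drbd_work item and bumps that semaphore to wake the worker. Here is a simplified sketch of that pairing; the *_sketch types and names are illustrative stand-ins, not DRBD's exact definitions:

/* Producer side of the worker loop: a simplified sketch modelled on
 * drbd_queue_work(). The *_sketch types are illustrative stand-ins. */
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/semaphore.h>

struct drbd_work_queue_sketch {
    struct list_head q;            /* the list drbd_worker() walks */
    struct semaphore s;            /* counts pending work items */
    spinlock_t q_lock;
};

struct drbd_work_sketch {
    struct list_head list;
    /* same shape as the w->cb(mdev, w, cancel) call in the loop above */
    int (*cb)(void *mdev, struct drbd_work_sketch *w, int cancel);
};

static void queue_work_sketch(struct drbd_work_queue_sketch *q,
                              struct drbd_work_sketch *w)
{
    unsigned long flags;

    spin_lock_irqsave(&q->q_lock, flags);
    list_add_tail(&w->list, &q->q);
    up(&q->s);                     /* wakes the worker blocked in down() */
    spin_unlock_irqrestore(&q->q_lock, flags);
}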
Once the worker thread is running, almost all kernel-side work in DRBD is handed over to this thread.
Returning to drbd_nl_net_conf: after the worker thread has been initialized, execution continues with the following statement:
retcode = _drbd_request_state(mdev, NS(conn, C_UNCONNECTED), CS_VERBOSE);
This is where the node negotiates with its peer to decide which side is currently the primary. The method places a work item on the queue that starts the receiver thread; the receiver thread uses the port and IP address given in the configuration file to set up a TCP listening socket and waits for the peer to connect. If the peer never connects, and our own attempts to connect to it also keep failing, the node waits for the timeout specified in the configuration and then puts itself into the StandAlone state. This is exactly the familiar situation where, when both servers are rebooted at the same time, one of them appears to hang during boot, waiting on DRBD.
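A note on the NS(conn, C_UNCONNECTED) argument: DRBD keeps the whole device state in a packed union of bit-fields, and NS() expands to a mask/value pair so that _drbd_request_state() changes only the connection field while leaving role and disk state untouched. The real union drbd_state and the NS macro are defined in the DRBD headers and carry many more fields; the stand-alone sketch below only illustrates the mask/value idea, with made-up types and enum values:

/* Hypothetical illustration of the mask/value state change behind
 * NS(conn, C_UNCONNECTED): only the bits selected by the mask change. */
#include <stdio.h>

union state_sketch {               /* stand-in for union drbd_state */
    struct {
        unsigned role:2;
        unsigned conn:5;
        unsigned disk:4;
    };
    unsigned int i;
};

enum { C_STANDALONE = 0, C_UNCONNECTED = 2 };   /* illustrative values */

/* compute the new state: keep everything outside the mask, take the
 * masked bits from val -- roughly the formula the state engine applies */
static union state_sketch apply_state(union state_sketch os,
                                      union state_sketch mask,
                                      union state_sketch val)
{
    union state_sketch ns;
    ns.i = (os.i & ~mask.i) | (val.i & mask.i);
    return ns;
}

int main(void)
{
    union state_sketch os   = { .role = 1, .conn = C_STANDALONE, .disk = 8 };
    union state_sketch mask = { .i = 0 };
    union state_sketch val  = { .i = 0 };

    mask.conn = -1;                /* select every bit of the conn field */
    val.conn  = C_UNCONNECTED;     /* ...and request this value for it */

    union state_sketch ns = apply_state(os, mask, val);
    printf("role %d -> %d, conn %d -> %d, disk %d -> %d\n",
           os.role, ns.role, os.conn, ns.conn, os.disk, ns.disk);
    return 0;
}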