NCCL源码解析7——RDMA建连
——lvyilong316
前文我们分析过在channel setup阶段,设计到proxyService线程还有proxyProgress线程,主线程通过proxyService线程进行RDMA建连接。上一篇上传的图片有些糊了,重新上传一下,如下图所示。
本节我们就以NCCL中的RDMA建连过程来讲一下RDMA的典型建连过程,同时以海思(hns)网卡的verbs实现,大概描述一些建连过程中每次verbs调用都发生了什么。先借用一张图描述RDMA verbs API的调用关系。
NCCL采用的是带外socket建连,与之对应的是CM建连。NCCL RDMA建连本质是两个rank的proxySerivice建连,我们分别以client端的和server的proxySerivice处理过程进行分析。下面开始具体展开分析。
我们以client端为例,首先client端proxySerivice线程收到ncclProxyMsgSetup,然后调用对应Transport的proxySetup,对应net Transport,即为sendProxySetup。而sendProxySetup主要进行如下调用,调用对应netplugin的getProperties。
-
NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props));
当net plugin为IB时,其对应plugin实现如下。
-
ncclNet_t ncclNetIb = {
-
"IB",
-
ncclIbInit,
-
ncclIbDevices,
-
ncclIbGetProperties,
-
ncclIbListen,
-
ncclIbConnect,
-
ncclIbAccept,
-
ncclIbRegMr,
-
ncclIbRegMrDmaBuf,
-
ncclIbDeregMr,
-
ncclIbIsend,
-
ncclIbIrecv,
-
ncclIbIflush,
-
ncclIbTest,
-
ncclIbCloseSend,
-
ncclIbCloseRecv,
-
ncclIbCloseListen,
-
NULL /* getDeviceMr */,
-
NULL /* irecvConsumed */
-
};
其中getProperties对应的是ncclIbGetProperties,主要是获取设备的一些属性,如maxqp,是否支持GDR,以及是否支持DmaBuf等,这里不再展开。
接着,client端proxySerivice线程收到主现程的ncclProxyMsgConnect消息,调用对应Transport的ProxyConnect函数,对于IB就是sendProxyConnect。这个函数的关键是调用IB plugin的connect函数,即ncclIbConnect。
-
ret = proxyState->ncclNet->connect(resources->netDev, req->handle, &resources->netSendComm, &resources->netDeviceHandle);
下面我们对ncclIbConnect分三部分分析。首先{BANNED}中国第一部分如下所示,主要是创建用于带外协商的TCP socket,并和server端建立socket连接,作为后续交互RDMA连接信息的通道。
-
ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNetDeviceHandle_t** /*sendDevComm*/) {
-
struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
-
struct ncclIbCommStage* stage = &handle->stage;
-
struct ncclIbSendComm* comm = (struct ncclIbSendComm*)stage->comm;
-
int ready;
-
*sendComm = NULL;
-
-
if (stage->state == ncclIbCommStateConnect) goto ib_connect_check;
-
if (stage->state == ncclIbCommStateSend) goto ib_send;
-
if (stage->state == ncclIbCommStateConnecting) goto ib_connect;
-
if (stage->state == ncclIbCommStateConnected) goto ib_send_ready;
-
if (stage->state != ncclIbCommStateStart) {
-
WARN("Error: trying to connect already connected sendComm");
-
return ncclInternalError;
-
}
-
-
NCCLCHECK(ncclIbMalloc((void**)&comm, sizeof(struct ncclIbSendComm)));
-
/* 创建用于带外通信的TCP socket */
-
NCCLCHECK(ncclSocketInit(&comm->base.sock, &handle->connectAddr, handle->magic, ncclSocketTypeNetIb, NULL, 1));
-
stage->comm = comm;
-
stage->state = ncclIbCommStateConnect;
-
/* 和server简历带外TCP连接 */
-
NCCLCHECK(ncclSocketConnect(&comm->base.sock));
然后,初始化client RDMA信息,首先是ncclIbInitCommDevBase。
-
ib_connect_check:
-
/* since ncclSocketConnect is async, we must check if connection is complete */
-
NCCLCHECK(ncclSocketReady(&comm->base.sock, &ready));
-
if (!ready) return ncclSuccess;
-
-
// IB Setup
-
struct ncclIbMergedDev* mergedDev;
-
mergedDev = ncclIbMergedDevs + dev;
-
comm->base.ndevs = mergedDev->ndevs;
-
comm->base.nqps = ncclParamIbQpsPerConn() * comm->base.ndevs; // We must have at least 1 qp per-device
-
comm->base.isSend = true;
-
-
// Init PD, Ctx for each IB device
-
comm->ar = 1; // Set to 1 for logic
-
for (int i = 0; i < mergedDev->ndevs; i++) {
-
int ibDevN = mergedDev->devs[i];
-
/* 为每个IB设备申请PD和创建QP */
-
NCCLCHECK(ncclIbInitCommDevBase(ibDevN, &comm->devs[i].base));
-
comm->ar = comm->ar && ncclIbDevs[dev].ar; // ADAPTIVE_ROUTING - if all merged devs have it enabled
-
}
-
-
struct ncclIbConnectionMetadata meta;
-
meta.ndevs = comm->base.ndevs;
ncclIbInitCommDevBase具体包括以下两个方面:
1. 调用ibv_alloc_pd分配一个RDMA PD
struct ibv_pd *ibv_alloc_pd(struct ibv_context *context)
PD(protection domain)的核心是一个数字,称为PD number(PDN),调用参数ibv_context是之前调用ibv_open_device申请的,context就相当于open文件得到的fd。而ibv_alloc_pd的用户态通常实现是向/dev/infiniband/uverbsX写入对应cmd,/dev/infiniband/uverbsX是内核驱动ib_uverbs.ko生成的。写入这个uverbsX{BANNED}最佳终会导致对应网卡RDMA内核驱动调用相关的alloc_pd函数,以hns驱动为例,只是在一个PD专用位图中分配一位,然后将其对应的PDN返回给用户态;
2. 调用ibv_create_cq创建RDMA CQ
CQ必须在创建QP之前创建,因为在调用创建QP的verbs API时,需要把CQN(CQ number)作为参数。CQN作为QP context中的一员,告知硬件在处理完某个QP的WQE后,改往哪个CQ填写CQE。多个QP可以使用一个CQ。同样verbs接口创建CQ也分用户态和内核态两部分。
其中用户态部分具体执行以下工作:
?① 从硬件或宏定义获取CQ的 size,即CQ中{BANNED}最佳多包含的 CQE 的个数。
② 分配存放所有CQE的CQ buffer,它可以看作一段普通的虚拟地址连续的缓存。
③ 以页为单位,为 CQ 的 Doorbell record 分配内存。一个内存页包含多个 Doorbell record,每个 Doorbell record 占 4 字节。所以在分配一个内存页后,后面很多次再为 Doorbell record分配内存时,都只需要在原有内存页中找到一个未使用的地址。
之后/dev/infiniband/uverbsX写入对应cmd触发内核驱动部分,内核具体完成以下工作:
?① 从硬件获取并调整CQ Depth,即 CQ buffer中CQE的个数。
② 因为用户态程序申请的 CQ buffer 是虚拟地址连续的,所以在内核会为其创建 CQ MTR 表,提供给硬件查表以获取 CQ buffer的物理地址。随后,函数中还会锁住(Pin)CQ buffer所在的内存页,防止其被切换到 swap 分区。
③ 获取Doorbell record 的物理地址和虚拟地址,物理地址会在之后传递给硬件,虚拟地址为软件自己所用。
④ 分配 CQC,即 CQ Context,对于hns驱动,其数据结构为 struct hns_roce_v2_cq_context 的对象,此对象表示一个 CQC。接着往CQC中填写信息,包括CQN、CQ buffer前两个内存页的物理地址、Doorbell record 的物理地址和 CQ MTR 表的基地址等。
⑤ 将填充好的 CQC 数据通过 mailbox 传递给硬件,将分配到的 CQN返回给用户态程序。
然后继续看ncclIbConnect逻辑,如下:
-
// Alternate QPs between devices
-
int devIndex;
-
devIndex = 0;
-
for (int q = 0; q < comm->base.nqps; q++) {
-
ncclIbSendCommDev* commDev = comm->devs + devIndex;
-
ncclIbDev* ibDev = ncclIbDevs + commDev->base.ibDevN;
-
NCCLCHECK(ncclIbCreateQp(ibDev->portNum, &commDev->base, IBV_ACCESS_REMOTE_WRITE, comm->base.qps+q));
-
comm->base.qps[q].devIndex = devIndex;
-
meta.qpInfo[q].qpn = comm->base.qps[q].qp->qp_num;
-
meta.qpInfo[q].devIndex = comm->base.qps[q].devIndex;
-
-
// Query ece capabilities (enhanced connection establishment)
-
NCCLCHECK(wrap_ibv_query_ece(comm->base.qps[q].qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported));
-
devIndex = (devIndex + 1) % comm->base.ndevs;
-
}
-
-
for (int i = 0; i < comm->base.ndevs; i++) {
-
ncclIbSendCommDev* commDev = comm->devs + i;
-
ncclIbDev* ibDev = ncclIbDevs + commDev->base.ibDevN;
-
-
// Write to the metadata struct via this pointer
-
ncclIbDevInfo* devInfo = meta.devs + i;
-
devInfo->ib_port = ibDev->portNum;
-
devInfo->mtu = ibDev->portAttr.active_mtu;
-
devInfo->lid = ibDev->portAttr.lid;
-
-
// Prepare my fifo
-
NCCLCHECK(wrap_ibv_reg_mr(&commDev->fifoMr, commDev->base.pd, comm->fifo, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ));
-
devInfo->fifoRkey = commDev->fifoMr->rkey;
-
-
// Pack local GID info
-
devInfo->link_layer = commDev->base.gidInfo.link_layer = ibDev->portAttr.link_layer;
-
NCCLCHECK(ncclIbGetGidIndex(ibDev->context, ibDev->portNum, &ibDev->portAttr, &commDev->base.gidInfo.localGidIndex));
-
NCCLCHECK(wrap_ibv_query_gid(ibDev->context, ibDev->portNum, commDev->base.gidInfo.localGidIndex, &commDev->base.gidInfo.localGid));
-
devInfo->gid.global.subnet_prefix = commDev->base.gidInfo.localGid.global.subnet_prefix;
-
devInfo->gid.global.interface_id = commDev->base.gidInfo.localGid.global.interface_id;
-
-
// info logging
-
if (devInfo->link_layer == IBV_LINK_LAYER_INFINIBAND) { // IB
-
for (int q = 0; q < comm->base.nqps; q++) {
-
// Print just the QPs for this dev
-
if (comm->base.qps[q].devIndex == i)
-
INFO(NCCL_NET,"NET/IB: %s %d IbDev %d Port %d qpn %d mtu %d LID %d subnet-prefix %lu FLID %d fifoRkey=0x%x fifoLkey=0x%x",
-
comm->base.ndevs > 2 ? "NCCL MergedDev" : "NCCL Dev",
-
dev, commDev->base.ibDevN, ibDev->portNum, meta.qpInfo[q].qpn, devInfo->mtu, devInfo->lid,
-
devInfo->gid.global.subnet_prefix, ncclIbExtractFlid(&devInfo->gid), devInfo->fifoRkey, commDev->fifoMr->lkey);
-
}
-
} else { // RoCE
-
for (int q = 0; q < comm->base.nqps; q++) {
-
// Print just the QPs for this dev
-
if (comm->base.qps[q].devIndex == i)
-
INFO(NCCL_NET,"NET/IB: %s %d IbDev %d Port %d qpn %d mtu %d query_ece={supported=%d, vendor_id=0x%x, options=0x%x, comp_mask=0x%x} GID %ld (%lX/%lX) fifoRkey=0x%x fifoLkey=0x%x",
-
comm->base.ndevs > 2 ? "NCCL MergedDev" : "NCCL Dev", dev,
-
commDev->base.ibDevN, ibDev->portNum, meta.qpInfo[q].qpn, devInfo->mtu, meta.qpInfo[q].ece_supported, meta.qpInfo[q].ece.vendor_id, meta.qpInfo[q].ece.options, meta.qpInfo[q].ece.comp_mask, (int64_t)commDev->base.gidInfo.localGidIndex,
-
devInfo->gid.global.subnet_prefix, devInfo->gid.global.interface_id, devInfo->fifoRkey, commDev->fifoMr->lkey);
-
}
-
}
-
}
由完成了几个操作;
1. 创建QP:ibv_create_qp
?建 QP 和创建 CQ 的代码执行流程比较类似,具体操作层面有几个不同点,比如, QP中有两个队列,所以需要分配两个 Doorbell record; QP buffer 中保存了两个队列的 WQE;过程中没有配置 QPC。
说明:这里代码流程图有遗漏。
2. 修改QP:ibv_modify_qp(?INIT 状态)
?所有对 QP 的修改{BANNED}最佳终都体现在此 QP 的 Context 中,希望达到的{BANNED}最佳终效果是:在将来发起 RDMA Write 之类的操作时,软件只需要填写 WQE 和写 Doorbell 寄存器, RDMA 网卡就知道如何获取 WQE、如何封装数据包、将数据包传输到哪里,以及传输完成后向哪个 CQ 中填写 CQE 等。
?在修改 QP 的过程中,应用程序会调用三次Verbs API ibv_modify_qp,依次把 QP 配置为?初始化( INIT)、准备接收( ready to receive, RTR)、准备发送( ready to send, RTS)状态。
?注意,函数 ibv_modify_qp 的第三个参数是一个标记,表示此次要修改 QP 的哪些属性。比如本次要修改 QP 的状态,则必须带上标记 IBV_QP_STATE;如果要配置对端 QPN,必须带上标记 IBV_QP_DEST_QPN。这样内核态驱动程序才能知道要修改 QPC 中的哪些具体属性。
另外,struct ibv_qp_attr 的另一个成员 qp_access_flags 被设置为 0,如果之后执行的数据传输操作为 Send 或 Receive,是没问题的。但如果之后要执行 RDMA Write或 RDMA Read 之类的操作,则需要指明 QP 拥有的对数据缓存的操作权限,可以将qp_access_flags 设置为 IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE。
3. 查询ECE:ibv_query_ece
查询当前设备是否具有ece capabilities (enhanced connection establishment),ECE是一种新的协商方案,用于交换有关QP能力的额外信息,然后在连接建立阶段进行协商。它用于支持各种功能,如RoCE选择性重传和PCC。
4. 注册MR:ibv_reg_mr
对于ncclSend这里是将comm->fifo注册为MR。注册MR主要有的如下3个作用:
? 实现虚拟地址到物理地址的转换,为此需要建立一个MR地址转换表。
? 控制 HCA 访问内存的权限,需要生成和使用本地密钥 L_Key 和远程密钥 R_Key。
? 避免换页,需要锁住(Pin)数据缓存所在的内存页。
5. 获取GID:ibv_query_gid
随后进行一些RDMA基本信息的日志打印。随后我们继续看ncclIbConnect逻辑,进入下一个阶段。
-
meta.fifoAddr = (uint64_t)comm->fifo;
-
strncpy(meta.devName, mergedDev->devName, MAX_MERGED_DEV_NAME);
-
-
stage->state = ncclIbCommStateSend;
-
stage->offset = 0;
-
NCCLCHECK(ncclIbMalloc((void**)&stage->buffer, sizeof(meta)));
-
-
memcpy(stage->buffer, &meta, sizeof(meta));
-
-
ib_send:
-
NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_SEND, &comm->base.sock, stage->buffer, sizeof(meta), &stage->offset));
-
if (stage->offset != sizeof(meta)) return ncclSuccess;
-
-
stage->state = ncclIbCommStateConnecting;
-
stage->offset = 0;
-
// Clear the staging buffer for re-use
-
memset(stage->buffer, 0, sizeof(meta));
-
-
ib_connect:
-
struct ncclIbConnectionMetadata remMeta;
-
NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_RECV, &comm->base.sock, stage->buffer, sizeof(ncclIbConnectionMetadata), &stage->offset));
-
if (stage->offset != sizeof(remMeta)) return ncclSuccess;
-
-
memcpy(&remMeta, stage->buffer, sizeof(ncclIbConnectionMetadata));
-
-
comm->base.nRemDevs = remMeta.ndevs;
-
if (comm->base.nRemDevs != comm->base.ndevs) {
-
mergedDev = ncclIbMergedDevs + dev;
-
WARN("NET/IB : Local mergedDev=%s has a different number of devices=%d as remoteDev=%s nRemDevs=%d",
-
mergedDev->devName, comm->base.ndevs, remMeta.devName, comm->base.nRemDevs);
-
}
-
-
int link_layer;
-
link_layer = remMeta.devs[0].link_layer;
-
for (int i = 1; i < remMeta.ndevs; i++) {
-
if (remMeta.devs[i].link_layer != link_layer) {
-
WARN("NET/IB : Can't merge net devices with different link_layer. i=%d remMeta.ndevs=%d link_layer=%d rem_link_layer=%d",
-
i, remMeta.ndevs, link_layer, remMeta.devs[i].link_layer);
-
return ncclInternalError;
-
}
-
}
可能有些人发现了,{BANNED}最佳开始创建的TCP socket连接还没用到,别急,这里就要用到了。上面这一段代码主要就是调用两次ncclSocketProgress,一次是send操作,将本rank的设备RDMA信息,包括设备名称,设备个数,qp信息等,通过struct ncclIbConnectionMetadata结构发送给对端rank;然后在通过recv操作接收对端rank的ncclIbConnectionMetadata信息,以此来完成两端RDMA信息的互相告知。
然后继续看继续看ncclIbConnect逻辑。下面这段代码主要是根据收到对端的RDMA设备信息,进一步进行处理。
-
int link_layer;
-
link_layer = remMeta.devs[0].link_layer;
-
for (int i = 1; i < remMeta.ndevs; i++) {
-
if (remMeta.devs[i].link_layer != link_layer) {
-
WARN("NET/IB : Can't merge net devices with different link_layer. i=%d remMeta.ndevs=%d link_layer=%d rem_link_layer=%d",
-
i, remMeta.ndevs, link_layer, remMeta.devs[i].link_layer);
-
return ncclInternalError;
-
}
-
}
-
-
// Copy remDevInfo for things like remGidInfo, remFifoAddr, etc.
-
for (int i = 0; i < remMeta.ndevs; i++) {
-
comm->base.remDevs[i] = remMeta.devs[i];
-
comm->base.remDevs[i].remoteGid.global.interface_id = comm->base.remDevs[i].gid.global.interface_id;
-
comm->base.remDevs[i].remoteGid.global.subnet_prefix = comm->base.remDevs[i].gid.global.subnet_prefix;
-
-
// Retain remote sizes fifo info and prepare RDMA ops
-
comm->remSizesFifo.rkeys[i] = remMeta.devs[i].fifoRkey;
-
comm->remSizesFifo.addr = remMeta.fifoAddr;
-
}
-
-
for (int i=0; i < comm->base.ndevs; i++) {
-
NCCLCHECK(wrap_ibv_reg_mr(comm->remSizesFifo.mrs+i, comm->devs[i].base.pd, &comm->remSizesFifo.elems, sizeof(int)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ));
-
}
-
comm->base.nRemDevs = remMeta.ndevs;
-
-
for (int q = 0; q < comm->base.nqps; q++) {
-
struct ncclIbQpInfo* remQpInfo = remMeta.qpInfo + q;
-
struct ncclIbDevInfo* remDevInfo = remMeta.devs + remQpInfo->devIndex;
-
-
// Assign per-QP remDev
-
comm->base.qps[q].remDevIdx = remQpInfo->devIndex;
-
int devIndex = comm->base.qps[q].devIndex;
-
ncclIbSendCommDev* commDev = comm->devs + devIndex;
-
-
struct ibv_qp* qp = comm->base.qps[q].qp;
-
if (remQpInfo->ece_supported)
-
NCCLCHECK(wrap_ibv_set_ece(qp, &remQpInfo->ece, &remQpInfo->ece_supported));
-
-
NCCLCHECK(ncclIbRtrQp(qp, &commDev->base.gidInfo, remQpInfo->qpn, remDevInfo, false));
-
NCCLCHECK(ncclIbRtsQp(qp));
-
}
-
-
if (link_layer == IBV_LINK_LAYER_ETHERNET ) { // RoCE
-
for (int q = 0; q < comm->base.nqps; q++) {
-
struct ncclIbQp* qp = comm->base.qps + q;
-
int ibDevN = comm->devs[qp->devIndex].base.ibDevN;
-
struct ncclIbDev* ibDev = ncclIbDevs + ibDevN;
-
INFO(NCCL_NET,"NET/IB: IbDev %d Port %d qpn %d set_ece={supported=%d, vendor_id=0x%x, options=0x%x, comp_mask=0x%x}",
-
ibDevN, ibDev->portNum, remMeta.qpInfo[q].qpn, remMeta.qpInfo[q].ece_supported, remMeta.qpInfo[q].ece.vendor_id, remMeta.qpInfo[q].ece.options, remMeta.qpInfo[q].ece.comp_mask);
-
}
-
}
-
-
comm->base.ready = 1;
-
stage->state = ncclIbCommStateConnected;
-
stage->offset = 0;
首先,检查远端设备和本地是否一致,然后将对端发来的RDMA设备信息保存到comm->base.remDevs这个用于存储远端设备信息的数组中。然后依次调用:
1. ibv_reg_mr
逐个设备注册用于远端访问remSizesFifo.elems,注意之前注册的是fifo,这里注册的是elems。
2. ibv_set_ece
如果对端设备也支持ECE,则打开本端的ECE能力;
3. ncclIbRtrQp,修改QP:ibv_modify_qp(?RTR状态)
这个函数本质是调用ibv_modify_qp,?把 qp_state 赋值为IBV_QPS_RTR,表示把 QP 状态(从 INIT)修改为 RTR。并且 struct ibv_qp_attr 中还写入了对端 QPN、 RQ PSN、 MTU、对端 LID、对端 GID 以及本地 GID 的索引等信息,相应的标记中也包含了IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER,其中的 IBV_QP_AV 用于配置对端 GID 和本地 GID 的索引等。
4. ncclIbRtsQp,修改QP:ibv_modify_qp(??RTS状态)
这个函数将QP改为RTS状态,即? ready to send。
然后,回到ncclIbConnect函数,再次调用ncclSocketProgress,这次只向对端发送一个int类型,表示本端已经准备好了。
-
ib_send_ready:
-
NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_SEND, &comm->base.sock, &comm->base.ready, sizeof(int), &stage->offset));
-
if (stage->offset != sizeof(int)) return ncclSuccess;
-
-
free(stage->buffer);
-
stage->state = ncclIbCommStateStart;
-
-
*sendComm = comm;
-
return ncclSuccess;
-
}