NCCL Source Code Analysis 6: Channel Setup

by lvyilong316
Continuing with the NCCL source code analysis. The previous article covered the intra-node and inter-node channel search. A channel only describes a set of relationships among ranks that can communicate with one another. When the channel search finishes, those relationships have merely been determined and recorded in comm->channels[c].ring.prev (the current rank's predecessor in channel c), comm->rank (the current rank), and comm->channels[c].ring.next (the current rank's successor in channel c). The ranks have not actually established communication connections (such as RDMA connections), so they cannot communicate yet. This article therefore looks at how the ranks in a channel are set up.
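
For example (a hypothetical illustration, not NCCL code), for a ring in rank order 0 -> 1 -> 2 -> 3 -> 0, all that the search leaves behind on each rank is a pair of neighbor indices:

// Hypothetical illustration (not NCCL code): after the channel search on a
// 4-rank ring 0 -> 1 -> 2 -> 3 -> 0, each rank only holds its neighbors'
// indices; no actual connection (RDMA/P2P) exists yet.
#include <cstdio>

struct RingSlot { int prev, next; };

int main() {
  const int nRanks = 4, rank = 1;
  struct RingSlot slot;
  slot.prev = (rank - 1 + nRanks) % nRanks;  // like comm->channels[c].ring.prev -> 0
  slot.next = (rank + 1) % nRanks;           // like comm->channels[c].ring.next -> 2
  printf("rank %d: prev=%d next=%d\n", rank, slot.prev, slot.next);
  return 0;
}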

Continuing with the initTransportsRank function, the next call is ncclTopoComputeP2pChannels, which computes the number of P2P channels. A P2P channel is simply a channel for point-to-point communication between two ranks.

ncclResult_t ncclTopoComputeP2pChannels(struct ncclComm* comm) {
  /* here we already honor comm->max/minCTAs for p2pnChannels. */
  if (comm->sharedRes->owner != comm) {
    comm->p2pnChannels = std::min(comm->nChannels, (int)ncclParamMaxP2pNChannels());
    comm->p2pnChannels = std::min(std::max(comm->p2pnChannels, (int)ncclParamMinP2pNChannels()), comm->sharedRes->tpP2pNChannels);
  } else {
    comm->p2pnChannels = std::min(comm->nChannels, (int)ncclParamMaxP2pNChannels());
    comm->p2pnChannels = std::max(comm->p2pnChannels, (int)ncclParamMinP2pNChannels());
  }

  int minChannels = comm->p2pnChannels;
  // We need to loop through all local GPUs to have a global picture
  for (int g=0; g<comm->topo->nodes[GPU].count; g++) {
    for (int r=0; r<comm->nRanks; r++) {
      int nChannels;
      NCCLCHECK(ncclTopoGetNchannels(comm, g, r, &nChannels));
      if (nChannels >= 0) minChannels = std::min(minChannels, nChannels);
    }
  }

  // Make nChannelsPerPeer and nChannels powers of 2. This is relied on when
  // mapping p2p peers to channels.
  comm->p2pnChannelsPerPeer = pow2Up(minChannels);
  comm->p2pnChannels = pow2Up(comm->p2pnChannels);

  comm->p2pnChannels = std::min(comm->p2pnChannels, pow2Down(ncclDevMaxChannelsForArgsBytes(ncclParamWorkArgsBytes())));
  comm->p2pnChannelsPerPeer = std::min(comm->p2pnChannelsPerPeer, comm->p2pnChannels);

  // Init channels that weren't used so far
  for (int c=comm->nChannels; c<comm->p2pnChannels; c++) NCCLCHECK(initChannel(comm, c));

  return ncclSuccess;
}

Earlier, when the ringGraph was built, a series of rings was searched out and channels were created from them. Suppose there are nChannels channels in total while P2P needs p2pnChannels channels: if p2pnChannels is greater than nChannels, an additional p2pnChannels - nChannels channels are created and initialized, and the rest are reused; otherwise the existing channels are simply reused. For each send/recv operation, p2pnChannelsPerPeer channels are used to send/receive in parallel.
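
Both counts are rounded to powers of two so that mapping a P2P peer to a channel can be done with a cheap bitmask. A minimal sketch of the pow2Up/pow2Down semantics used above (my own illustration, not NCCL's implementation):

// Minimal sketch (not NCCL's implementation) of the power-of-2 rounding above.
// With p2pnChannels a power of 2, "x & (p2pnChannels - 1)" replaces "x % p2pnChannels".
#include <cassert>

static int pow2Up(int x)   { int p = 1; while (p < x) p <<= 1; return p; }      // smallest pow2 >= x
static int pow2Down(int x) { int p = 1; while (p * 2 <= x) p <<= 1; return p; } // largest pow2 <= x

int main() {
  assert(pow2Up(5) == 8);
  assert(pow2Down(5) == 4);
  assert(pow2Up(8) == 8 && pow2Down(8) == 8);
  return 0;
}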

initTransportsRank then calls ncclProxyCreate to create the proxyService thread.

// Launch proxy service thread, after this, the proxy calls can be used.
if (parent && parent->config.splitShare) {
  comm->proxyState = parent->sharedRes->proxyState;
  ncclAtomicRefCountIncrement(&parent->sharedRes->proxyState->refCount);
} else {
  NCCLCHECKGOTO(ncclProxyCreate(comm), ret, fail);
}

The concrete thread handler is ncclProxyService; in addition, if UDS is supported, a corresponding UDS thread is created.

ncclResult_t ncclProxyCreate(struct ncclComm* comm) {
  /* proxyState is shared among parent comm and split comms. comm->proxyState->thread is
   * pthread_join()'d by commFree() in init.cc when the refCount reduces down to 0. */
  // ...
    pthread_create(&comm->proxyState->thread, NULL, ncclProxyService, comm->proxyState);
    ncclSetThreadName(comm->proxyState->thread, "NCCL Service %2d", comm->cudaDev);

    // UDS support
    INFO(NCCL_PROXY, "UDS: Creating service thread comm %p rank %d", comm, comm->rank);
    pthread_create(&comm->proxyState->threadUDS, NULL, ncclProxyServiceUDS, comm->proxyState);
    ncclSetThreadName(comm->proxyState->threadUDS, "NCCL UDS Service %2d", comm->cudaDev);
  }
  return ncclSuccess;
}

The proxy's listening port and socket were created back in chapter 2, where the bootstrap network was discussed; there, bootstrapAllGather was used to collect the proxy listening ports and UDS information of all processes (ranks).

In NCCL, proxy threads take part both in the intra-node case, where a P2P transport is built between GPUs, and in the inter-node case, where a NET transport is built over the network. There are in fact two kinds of proxy thread. One is the proxyService thread, one per GPU on each node, which mainly maintains connection establishment and serves the setup and connect phases of a transport. The other is the proxyProgress thread, also one per GPU on each node, which mainly handles the data exchange between the kernel and the IB NIC during inter-node communication. The one discussed here is the proxyService thread.

Next, the p2pSchedule data structure is built. It holds the sending rank and receiving rank for each round of point-to-point (P2P) communication; in this way, communication among all ranks can be achieved.

do { // Build p2p schedule
  int node = comm->node;
  int nNodes = comm->nNodes;
  int nRanks = comm->nRanks;
  int local = comm->localRank;
  int nLocals = comm->maxLocalRanks;
  struct ncclNodeRanks* nodeRanks = comm->nodeRanks;
  bool flat = false;
  for (int node = 0; node < nNodes; node++) {
    if (nodeRanks[node].localRanks != nLocals) {
      flat = true;
      nNodes = 1; node = 0;
      nLocals = nRanks; local = rank;
      break;
    }
  }
  int nNodesPow2 = pow2Up(nNodes);
  int nLocalsPow2 = pow2Up(nLocals);
  comm->p2pSchedule = ncclMemoryStackAlloc<ncclComm::P2pSchedulePair>(&comm->memPermanent, nRanks);
  comm->planner.peers = ncclMemoryStackAlloc<ncclKernelPlanner::Peer>(&comm->memPermanent, nRanks);
  uint32_t nodeRound = 0;
  uint32_t nodeDelta = 0;
  int round = 0;
  // When enumerating peer deltas we use the quadratic formula (x*x+x)/2 mod N.
  // Since that formula only produces valid permutations when N is a pow of 2,
  // we let N = pow2Up(n) and filter out results greater-eq to n.
  // Example sequence for 16 ranks: 0, 1, 3, 6, 10, 15, 5, 12, 4, 13, 7, 2, 14, 11, 9, 8
  do {
    if (nodeDelta < nNodes) { // Filter nonsensical node deltas
      int sendNode = (node + nodeDelta) % nNodes;
      int recvNode = (node - nodeDelta + nNodes) % nNodes;
      uint32_t localRound = 0;
      uint32_t localDelta = 0;
      do {
        if (localDelta < nLocals) { // Filter nonsensical node-local deltas
          int sendLocal = (local + localDelta) % nLocals;
          int recvLocal = (local - localDelta + nLocals) % nLocals;
          comm->p2pSchedule[round].sendRank = flat ? sendLocal : nodeRanks[sendNode].localRankToRank[sendLocal];
          comm->p2pSchedule[round].recvRank = flat ? recvLocal : nodeRanks[recvNode].localRankToRank[recvLocal];
          round += 1;
        }
        localRound += 1;
        localDelta = (localDelta + localRound) & (nLocalsPow2 - 1); // Quadratic update
      } while (localRound != nLocalsPow2);
    }
    nodeRound += 1;
    nodeDelta = (nodeDelta + nodeRound) & (nNodesPow2 - 1); // Quadratic update
  } while (nodeRound != nNodesPow2);

  if (round != nRanks) {
    WARN("P2p schedule creation has bugs.");
    ret = ncclInternalError;
    goto fail;
  }
} while (0);
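
The comment in the code refers to the quadratic formula (x*x+x)/2 mod N. A standalone sketch (not NCCL code) reproduces the resulting delta sequence for a power-of-two N:

// Sketch (not NCCL code): for N a power of 2, the quadratic update
// delta = (delta + round) & (N - 1) visits every value in [0, N) exactly once.
#include <cstdio>

int main() {
  const unsigned N = 16;   // must be a power of 2
  unsigned round = 0, delta = 0;
  do {
    printf("%u ", delta);  // prints: 0 1 3 6 10 15 5 12 4 13 7 2 14 11 9 8
    round += 1;
    delta = (delta + round) & (N - 1);
  } while (round != N);
  printf("\n");
  return 0;
}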

Subsequently, setupChannel is called to set up the communication channels. This function mainly computes the distance, within the ring, between the current rank's position and rank 0's position, and assigns it to ring->index. In addition, setupChannel reorders all ranks in the ring so that the current rank becomes the ring's starting point, and stores the reordered ring in ring->userRanks. After that, ncclTransportRingConnect and related functions are called to connect the corresponding channels; ncclTransportRingConnect is covered in detail further below.
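
A minimal sketch of the computation just described, with a made-up ring order (an illustration, not setupChannel itself):

// Minimal sketch (not setupChannel itself) of what the text describes:
// ring->index is our distance from rank 0's position in the ring, and
// ring->userRanks is the ring rotated so the current rank comes first.
#include <cstdio>

int main() {
  const int nranks = 4;
  int ringRanks[nranks] = {2, 0, 3, 1};  // hypothetical ring order from the search
  int rank = 3;

  int ixZero = 0, ixRank = 0;
  for (int i = 0; i < nranks; i++) {
    if (ringRanks[i] == 0) ixZero = i;
    if (ringRanks[i] == rank) ixRank = i;
  }
  int index = (ixRank - ixZero + nranks) % nranks;  // like ring->index: 1

  int userRanks[nranks];
  for (int i = 0; i < nranks; i++)
    userRanks[i] = ringRanks[(ixRank + i) % nranks];  // like ring->userRanks: 3 1 2 0

  printf("index=%d userRanks:", index);
  for (int i = 0; i < nranks; i++) printf(" %d", userRanks[i]);
  printf("\n");
  return 0;
}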

comm->runtimeConn = comm->cuMemSupport && ncclParamRuntimeConnect();
if (comm->runtimeConn) {
  for (int c=0; c<comm->nChannels; c++) {
    NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, fail);
  }
  // Setup NVLS
  NCCLCHECKGOTO(ncclNvlsSetup(comm, parent), ret, fail);
  // Check if we can setup CollNet
  if (comm->collNetSupport > 0) ncclCollNetSetup(comm, parent, graphs);
} else {
  for (int c=0; c<comm->nChannels; c++) {
    NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, fail);
  }
  NCCLCHECKGOTO(ncclTransportRingConnect(comm), ret, fail);

  // Connect Trees
  NCCLCHECKGOTO(ncclTransportTreeConnect(comm), ret, fail);

  // Setup NVLS
  NCCLCHECKGOTO(ncclNvlsSetup(comm, parent), ret, fail);
  NCCLCHECKGOTO(ncclNvlsBufferSetup(comm), ret, fail);

  // And NVLS trees if needed
  NCCLCHECKGOTO(ncclNvlsTreeConnect(comm), ret, fail);

  // Check if we can setup CollNet
  if (comm->collNetSupport > 0) {
    ncclCollNetSetup(comm, parent, graphs);
    NCCLCHECKGOTO(ncclCollNetChainBufferSetup(comm), ret, fail);
    NCCLCHECKGOTO(ncclCollNetDirectBufferSetup(comm), ret, fail);
  }

  // Connect to local net proxy
  tpProxyRank = comm->topParentRanks[comm->rank];
  NCCLCHECKGOTO(ncclProxyConnect(comm, TRANSPORT_NET, 1, tpProxyRank, &proxyConn), ret, fail);
  NCCLCHECKGOTO(ncclProxyCallBlocking(comm, &proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0), ret, fail);

  // Then to remote ones when using PXN
  if (ncclPxnDisable(comm) == 0) {
    int nranks;
    NCCLCHECKGOTO(ncclTopoGetPxnRanks(comm, &pxnPeers, &nranks), ret, fail);
    for (int r=0; r<nranks; r++) {
      tpProxyRank = comm->topParentRanks[pxnPeers[r]];
      NCCLCHECKGOTO(ncclProxyConnect(comm, TRANSPORT_NET, 1, tpProxyRank, &proxyConn), ret, fail);
      NCCLCHECKGOTO(ncclProxyCallBlocking(comm, &proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0), ret, fail);
    }
  }

  if (ncclParamNvbPreconnect()) {
    // Connect p2p when using NVB path
    int nvbNpeers;
    NCCLCHECKGOTO(ncclTopoGetNvbGpus(comm->topo, comm->rank, &nvbNpeers, &nvbPeers), ret, fail);
    for (int r=0; r<nvbNpeers; r++) {
      int peer = nvbPeers[r];
      int sendRound=0, recvRound=0;
      while (comm->p2pSchedule[sendRound].sendRank != peer) sendRound++;
      while (comm->p2pSchedule[recvRound].recvRank != peer) recvRound++;
      uint8_t sendBase = ncclP2pChannelBaseForRound(comm, sendRound);
      uint8_t recvBase = ncclP2pChannelBaseForRound(comm, recvRound);
      for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
        int channelId;
        channelId = ncclP2pChannelForPart(comm->p2pnChannels, sendBase, c);
        if (comm->channels[channelId].peers[peer]->send[1].connected == 0) {
          comm->connectSend[peer] |= (1UL<<channelId);
        }
        channelId = ncclP2pChannelForPart(comm->p2pnChannels, recvBase, c);
        if (comm->channels[channelId].peers[peer]->recv[1].connected == 0) {
          comm->connectRecv[peer] |= (1UL<<channelId);
        }
      }
    }

    NCCLCHECKGOTO(ncclTransportP2pSetup(comm, NULL, 1), ret, fail);
  }
}

TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels);

Finally, the last of the initialization work is done.

// Compute time models for algorithm and protocol combinations
NCCLCHECKGOTO(ncclTopoTuneModel(comm, comm->minCompCap, comm->maxCompCap, graphs), ret, fail);

INFO(NCCL_INIT, "%d coll channels, %d collnet channels, %d nvls channels, %d p2p channels, %d p2p channels per peer", comm->nChannels, comm->nChannels, comm->nvlsChannels, comm->p2pnChannels, comm->p2pnChannelsPerPeer);

if (comm->intraRank == 0) { // Load ncclParamLaunchMode
  const char* str = ncclGetEnv("NCCL_LAUNCH_MODE");
  enum ncclLaunchMode mode, modeOld;
  if (str && strcasecmp(str, "GROUP") == 0) {
    mode = ncclLaunchModeGroup;
  } else {
    mode = ncclLaunchModeParallel;
  }
  // In theory we could be racing with other communicators not associated with
  // this one if the user is connecting to multiple ncclUniqueId's concurrently.
  modeOld = __atomic_exchange_n(&ncclParamLaunchMode, mode, __ATOMIC_RELAXED);
  if (modeOld == ncclLaunchModeInvalid && str && str[0]!='\0') {
    INFO(NCCL_ENV, "NCCL_LAUNCH_MODE set by environment to %s", mode == ncclLaunchModeParallel ? "PARALLEL" : "GROUP");
  }
}

// Call devCommSetup before the last barrier, making sure we don't have a thread running in front and starting to
// launch NCCL kernels before all cuda mem allocation is complete. That could cause a deadlock.
NCCLCHECKGOTO(devCommSetup(comm), ret, fail);
timers[TIMER_INIT_CONNECT] = clockNano() - timers[TIMER_INIT_CONNECT];

/* Local intra-node barrier */
NCCLCHECKGOTO(bootstrapIntraNodeBarrier(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, comm->localRankToRank[0]), ret, fail);

// We should have allocated all buffers, collective fifos, ... we can
// restore the affinity.
TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);

With that, initTransportsRank is finally finished. Before introducing setupChannel and ncclTransportRingConnect, let's first clarify the two kinds of NCCL proxy threads mentioned earlier.

NCCL creates one proxyService thread per rank; its handler function is ncclProxyService, and the threads on each rank are named NCCL Service 0, NCCL Service 1, NCCL Service 2, NCCL Service 3 (with 4 GPUs). NCCL also creates one proxyProgress thread per rank, with handler function ncclProxyProgress; these threads all share the same name, NCCL Progress 4 (with 4 GPUs).

Now let's look at the logic of ncclTransportRingConnect, shown below.

ncclResult_t ncclTransportRingConnect(struct ncclComm* comm) {
  ncclResult_t ret = ncclSuccess;
  if (comm && comm->nRanks > 1) {
    for (int c = 0; c < comm->nChannels; c++) {
      struct ncclChannel* channel = comm->channels + c;
      NCCLCHECKGOTO(ncclTransportP2pConnect(comm, c, 1, &channel->ring.prev, 1, &channel->ring.next, 0), ret, fail);
    }
    NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &comm->graphs[NCCL_ALGO_RING], 0), ret, fail);
    INFO(NCCL_INIT, "Connected all rings");
  }
exit:
  return ret;
fail:
  goto exit;
}

It mainly calls ncclTransportP2pConnect and ncclTransportP2pSetup. Note that ncclTransportP2pConnect does not actually connect the two ranks of a P2P channel. Instead, based on the current rank's prev and next in each channel, it sets the corresponding per-rank bit masks; when the transports are actually created later, these masks decide between which ranks transports need to be created, and how many.

ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
  TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
  struct ncclChannel* channel = &comm->channels[channelId];
  uint64_t mask = 1UL << channel->id;
  for (int i=0; i<nrecv; i++) {
    int peer = peerRecv[i];
    if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer]->recv[connIndex].connected) continue;
    comm->connectRecv[peer] |= mask;
  }
  for (int i=0; i<nsend; i++) {
    int peer = peerSend[i];
    if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer]->send[connIndex].connected) continue;
    comm->connectSend[peer] |= mask;
  }
  return ncclSuccess;
}

The most critical function is ncclTransportP2pSetup; it is the core of transport creation.

ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/) {
  // Stream used during transport setup; need for P2P pre-connect + CUDA Graph
  ncclResult_t ret = ncclSuccess;
  int highestType = TRANSPORT_UNDEFINED; // track highest transport type
  struct ncclConnect** data; // Store intermediate send/recvData structs for connect
  struct ncclConnect** recvData; // Points to entries inside data for given recv connection within a channel
  struct ncclConnect** sendData; // Points to entries inside data for given send connection within a channel
  int done = 0;

  int maxPeers = ncclParamConnectRoundMaxPeers();
  NCCLCHECK(ncclCalloc(&data, maxPeers));
  NCCLCHECK(ncclCalloc(&recvData, maxPeers));
  NCCLCHECK(ncclCalloc(&sendData, maxPeers));

  struct timeval timeStart, timeLast;
  gettimeofday(&timeStart, NULL);
  timeLast = timeStart; // struct copy
  bool timeReported = false;

  NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail);
  // First time initialization
  for (int i=1; i<comm->nRanks; i++) {
    int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
    int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
    int sendPeer = (comm->rank + i) % comm->nRanks;
    uint64_t recvMask = comm->connectRecv[recvPeer];
    uint64_t sendMask = comm->connectSend[sendPeer];

    /*
     * data[i] holds all the information for the send and recv connections with a
     * given peer. It is organized by the number of send channels (sendChannels)
     * and recv channels (recvChannels) connected to that peer: the first N
     * entries store the recv connection info (recvData), and the next M entries
     * store the send connection info (sendData). The total number of
     * connections, and the specific send/recv counts, may differ per data entry.
     */
    int p = i-(done+1);
    if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
    recvData[p] = data[p];
    int sendChannels = 0, recvChannels = 0;
    int type;
    TIME_START(0);
    for (int c=0; c<MAXCHANNELS; c++) {
      if (recvMask & (1UL<<c)) {
        NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail);
        if (type > highestType) highestType = type;
      }
    }
    TIME_STOP(0);
    TIME_START(1);
    sendData[p] = recvData[p]+recvChannels;
    for (int c=0; c<MAXCHANNELS; c++) {
      if (sendMask & (1UL<<c)) {
        NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail);
        if (type > highestType) highestType = type;
      }
    }
    TIME_STOP(1);
First, a word on structure: ncclTransportP2pConnect above was called in a loop over every channel, and ncclTransportP2pSetup itself also begins with a for loop, which walks over the ranks adjacent to the current rank at increasing distance i (the i-th previous and the i-th next rank). Based on the channel masks recorded earlier, it determines in which cases a send transport or a recv transport must be created, and how many, and then creates them.
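
Seen in isolation, the peer enumeration at the top of that loop works as follows (a sketch, not NCCL code): at step i the current rank sends to rank+i and receives from rank-i, modulo nRanks, so after nRanks-1 steps every peer has been paired once in each direction.

// Sketch (not NCCL code) of the peer pairing used by ncclTransportP2pSetup's loop.
#include <cstdio>

int main() {
  const int nRanks = 4, rank = 0;
  for (int i = 1; i < nRanks; i++) {
    int recvPeer = (rank - i + nRanks) % nRanks;
    int sendPeer = (rank + i) % nRanks;
    printf("step %d: send -> %d, recv <- %d\n", i, sendPeer, recvPeer);
    // step 1: send -> 1, recv <- 3
    // step 2: send -> 2, recv <- 2
    // step 3: send -> 3, recv <- 1
  }
  return 0;
}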

Next, note the call to the template function selectTransport, shown below. From the code, type 0 means recv and type 1 means send.

template <int type>
static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclConnect* connect, int channelId, int peer, int connIndex, int* transportType) {
  struct ncclPeerInfo* myInfo = comm->peerInfo+comm->rank;
  struct ncclPeerInfo* peerInfo = comm->peerInfo+peer;
  struct ncclConnector* connector = (type == 1) ? comm->channels[channelId].peers[peer]->send + connIndex :
                                                  comm->channels[channelId].peers[peer]->recv + connIndex;
  for (int t=0; t<NTRANSPORTS; t++) {
    struct ncclTransport *transport = ncclTransports[t];
    struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
    int ret = 0;
    NCCLCHECK(transport->canConnect(&ret, comm->topo, graph, myInfo, peerInfo));
    if (ret) {
      connector->transportComm = transportComm;
      NCCLCHECK(transportComm->setup(comm, graph, myInfo, peerInfo, connect, connector, channelId, connIndex));
      if (transportType) *transportType = t;
      return ncclSuccess;
    }
  }
  WARN("No transport found for rank %d[%lx] -> rank %d[%lx]", myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId);
  return ncclSystemError;
}

selectTransport then iterates over all the transports defined in the NCCL library. The version analyzed here (v2.22.3-1) defines the following four transports; later we will mainly use netTransport as the example.

struct ncclTransport* ncclTransports[NTRANSPORTS] = {
  &p2pTransport,
  &shmTransport,
  &netTransport,
  &collNetTransport
};

Each transport's canConnect is tried in turn, and the first transport that succeeds has its setup function called to configure the transport. Priority is therefore given by the definition order: for example, if a P2P transport can be created between two intra-node ranks, no netTransport will be created for them.

netTransport is defined as follows:

struct ncclTransport netTransport = {
  "NET",
  canConnect,
  { sendSetup, sendConnect, sendFree, proxySharedInit, sendProxySetup, sendProxyConnect, sendProxyFree, sendProxyProgress, NULL },
  { recvSetup, recvConnect, recvFree, proxySharedInit, recvProxySetup, recvProxyConnect, recvProxyFree, recvProxyProgress, NULL }
};

transportsetup函数主要就是和自己的proxyService线程通信,通过proxyService线程为创建用于传送数据的transport通道做时代的准备,以netTransport为例就是sendSetuprecvSetupsendSetuprecvSetup具体就不再展开,其中会通过ncclProxyConnect连接proxyService,并调用ncclProxyCallBlockingproxyService发送对于命令,如ncclProxyMsgInitncclProxyMsgSetup等,而proxyService收到命令后就会执行相应的函数。

After proxyService receives ncclProxyMsgInit, it calls proxyConnInit. If the current transport defines a proxyProgress function, proxyConnInit calls proxyProgressInit to create the proxyProgress thread. proxyProgress mainly handles the data exchange between the kernel and the IB NIC during inter-node communication; it is covered in detail in a later chapter.

static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclProxyConnectionPool* connectionPool, struct ncclProxyState* proxyState, ncclProxyInitReq* req, ncclProxyInitResp* resp, struct ncclProxyConnection** connection) {
  // ...
  (*connection)->tcomm = (*connection)->send ? &ncclTransports[(*connection)->transport]->send : &ncclTransports[(*connection)->transport]->recv;
  // If we need proxy progress, let's allocate ops and start the thread
  if ((*connection)->tcomm->proxyProgress) {
    NCCLCHECK(proxyProgressInit(proxyState));
    struct ncclProxyProgressState* state = &proxyState->progressState;
    strncpy(resp->devShmPath, state->opsPoolShmSuffix, sizeof(resp->devShmPath));
  }
  // ...
}

Returning to ncclTransportP2pSetup, the logic continues as shown below. The setup phase above completed the necessary transport preparation, such as memory registration; now the sending and receiving sides both start calling the transport's connect function.

if (i-done == maxPeers || i == comm->nRanks-1) {
      // Loop until all channels with all ranks have been connected
      bool allChannelsConnected;
      allChannelsConnected = false;
      while (!allChannelsConnected) {
        allChannelsConnected = true;
        for (int j=done+1; j<=i; j++) {
          int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks;
          int sendPeer = (comm->rank + j) % comm->nRanks;
          uint64_t recvMask = comm->connectRecv[recvPeer];
          uint64_t sendMask = comm->connectSend[sendPeer];

          int p = j-(done+1);
          int sendDataOffset = 0;
          int recvDataOffset = 0;
          for (int c=0; c<MAXCHANNELS; c++) {
            TIME_START(3);
            if (sendMask & (1UL<<c)) {
              struct ncclConnector* conn = comm->channels[c].peers[sendPeer]->send + connIndex;
              // This connector hasn't completed connection yet
              if (conn->connected == 0) {
                NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[p] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
                if (ret == ncclSuccess) {
                  conn->connected = 1;
                  /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */
                  CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
                } else if (ret == ncclInProgress) {
                  allChannelsConnected = false;
                }
              }
            }
            TIME_STOP(3);

            // Start with recv channels
            TIME_START(4);
            if (recvMask & (1UL<<c)) {
              struct ncclConnector* conn = comm->channels[c].peers[recvPeer]->recv + connIndex;
              // This connector hasn't completed connection yet
              if (conn->connected == 0) {
                NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[p] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
                if (ret == ncclSuccess) {
                  conn->connected = 1;
                  /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */
                  CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
                } else if (ret == ncclInProgress) {
                  allChannelsConnected = false;
                }
              }
            }
            TIME_STOP(4);
          }
          if (sendMask || recvMask) {
            free(data[p]);
            data[p] = NULL;
          }
        }

The calling logic above actually involves multiple threads of a single rank. To make it easier to follow, a diagram of the call relationships is shown below:

[Figure: call-relationship diagram of the setup flow across a rank's threads]
