Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3612180
  • 博文数量: 211
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7406
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 18:56
个人简介

将晦涩难懂的技术讲的通俗易懂

文章分类

全部博文(211)

文章存档

2025年(2)

2024年(11)

2023年(9)

2022年(4)

2021年(12)

2020年(8)

2019年(18)

2018年(19)

2017年(9)

2016年(26)

2015年(18)

2014年(54)

2013年(20)

分类: LINUX

2024-12-15 23:00:14

NCCL源码解析2——bootstrap网络初始化

——lvyilong316
      我们在前一部分中介绍了root进程生成ncclUniqueId的过程。ncclUniqueId数据结构中包含了root进程的监听地址和端口,用于其它进程与root进程进行连接。同时帮助root进程创建了bootstrapRoot这个线程,用于等待其他rank来传递信息以完成环形网络的建立。下面我们要介绍的ncclCommInitRank就是完成这部分工作的,当前这个函数也会完成nccl通信子的建立。

如果我们注意本系列的{BANNED}最佳开头nccl-testrun函数中,会发现当ncclProc == 1(只有一个rank)时调用的是ncclCommInitAll,否则调用的是ncclCommInitRank。在这一部分我们讨论 ncclCommInitRank 函数,而暂时忽略 ncclCommInitAll 函数。ncclCommInitAll 通过单个函数调用,为其管理的多个 GPU 依次创建通信子,其底层也是通过调用 ncclCommInitRank 函数实现的。此外,PyTorch 也推荐使用单个进程管理单张 GPU 的模式,意味着通过 ncclCommInitRank 函数创建 ncclComm_t 结构的通信子。

ncclCommInitRank 函数使用 ncclUniqueId 数据结构、进程总数和当前进程的 rank 值初始化 newcomm 数据结构。具体流程如下:

l  ncclCommInitRank

点击(此处)折叠或打开

  1. ncclResult_t ncclCommInitRank(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
  2.   // Load the CUDA driver and dlsym hooks (can fail on old drivers)
  3.   (void)ncclCudaLibraryInit();//函数初始化 cuda 驱动,并做一些必要的驱动版本兼容性检查。

  4.   int cudaDev;
  5.   ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  6.   CUDACHECK(cudaGetDevice(&cudaDev));

  7.   NvtxParamsCommInitRank payload{myrank, nranks, cudaDev};
  8.   NVTX3_FUNC_WITH_PARAMS(CommInitRank, CommInitRankSchema, payload)

  9.   NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, commId, myrank, cudaDev, &config));
  10.   return ncclSuccess;
  11. }

ncclCommInitRank 首先通过 ncclCudaLibraryInit 函数初始化 cuda 驱动,并做一些必要的版本兼容性检查。

接着调用 cudaGetDevice 函数获取当前进程管理的GPU并调用 ncclCommInitRankDev 函数来创建通信子。

通信子用ncclComm_t结构表示,其实也就是struct ncclComm,其包含了 NCCL 通信所需要的所有信息,包括 NCCL 配置、通道、拓扑等信息,如下面的代码所示。

点击(此处)折叠或打开

  1. struct ncclComm {
  2.   uint64_t startMagic;
  3.   struct ncclMemoryStack memPermanent, memScoped;
  4.   // List of destructors to run when comm is destructed
  5.   struct ncclDestructor* destructorHead;
  6.   struct ncclSharedResources* sharedRes;
  7.   /* map to top parent ranks. */
  8.   int* topParentRanks;
  9.   int* topParentLocalRanks;
  10.   struct ncclChannel channels[MAXCHANNELS];
  11.   struct ncclPeerInfo* peerInfo;
  12.   struct ncclTopoSystem* topo;
  13.   int netPluginLoaded;
  14.   ncclNet_t* ncclNet;
  15.   ncclNetDeviceType netDeviceType;
  16.   ncclCollNet_t* ncclCollNet;
  17.   void* bootstrap;
  18.   // Bitmasks for ncclTransportP2pSetup
  19.   uint64_t* connectSend;
  20.   uint64_t* connectRecv;
  21.   struct ncclTopoGraph graphs[NCCL_NUM_ALGORITHMS];
  22.   bool initAlgoChannels[NCCL_NUM_ALGORITHMS];
  23.   bool runtimeConn; // if dynamic connection is supported
  24.   int cuMemSupport;
  25.   uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches.
  26.   uint64_t commHash;
  27.   int rank; // my rank in the communicator
  28.   int nRanks; // number of GPUs in communicator
  29.   int cudaDev; // my cuda device index
  30.   int nvmlDev; // my nvml device index
  31.   int compCap; // compute capability of the GPU
  32.   int minCompCap, maxCompCap; // min/max compute capability in the communicator
  33.   int64_t busId; // my PCI bus ID in int format
  34.   cpu_set_t cpuAffinity; // CPU affinity of the GPU
  35.   int cudaArch; // matches __CUDA_ARCH__ of device
  36.   int cpuArch; // architecture - As defined in src/include/graph.h, e.g. x86/arm/ppc/mixed
  37.   int cpuVendor; // vendor - As defined in src/include/graph.h
  38.   int node;
  39.   int nNodes;
  40.   int localRank;
  41.   int localRanks;
  42.   int maxLocalRanks;
  43.   int* rankToNode;
  44.   int* rankToLocalRank;
  45.   int* localRankToRank;
  46.   // localRanks and localRanktoRank for all nodes
  47.   struct ncclNodeRanks* nodeRanks;
  48.   // MNNVL: Multi-Node NVLink
  49.   int MNNVL; // true when MNNVL is available
  50.   struct cliqueInfo clique; // Our MNNVL clique information
  51.   int cliqueRank; // Our rank within the MNNVL clique
  52.   bool checkPointers;
  53.   bool dmaBufSupport;
  54.   // Counter for tracking CUDA launches (P2P and collectives included)
  55.   uint64_t opCount;
  56.   // Channels for collectives
  57.   int nChannels; // connection nChannels
  58.   int collChannels; // enqueue nChannels
  59.   int nvlsChannels; // enqueue nChannels
  60.   ... ...
  61.   // Flag to ask NCCL kernels to abort
  62.   uint32_t* abortFlag;
  63.   uint32_t* abortFlagDev;
  64.   int* abortFlagRefCount;
  65.   uint32_t* childAbortFlag;
  66.   uint32_t* childAbortFlagDev;
  67.   uint32_t destroyFlag;
  68.   ... ...
  69.   uint64_t endMagic;
  70. };

ncclCudaLibraryInit

ncclCudaLibraryInit 函数通过调用 cudaGetDevice 函数初始化 cuda 驱动。接着获取 cuda 驱动版本,并检查是否满足版本要求。

接着ncclCudaLibraryInit 函数调用 cudaPfnFuncLoader 函数加载 cuda 启动库中的符号/函数,如 cudaDeviceGet 等。

点击(此处)折叠或打开

  1. ncclResult_t ncclCudaLibraryInit() {
  2.   pthread_once(&initOnceControl, initOnceFunc);
  3.   return initResult;
  4. }
  5. static void initOnceFunc() {
  6.   do {
  7.     const char* val = ncclGetEnv("CUDA_LAUNCH_BLOCKING");
  8.     ncclCudaLaunchBlocking = val!=nullptr && val[0]!=0 && !(val[0]=='0' && val[1]==0);
  9.   } while (0);

  10.   ncclResult_t ret = ncclSuccess;
  11.   int cudaDev;
  12.   int driverVersion;
  13.   CUDACHECKGOTO(cudaGetDevice(&cudaDev), ret, error); // Initialize the driver

  14.   CUDACHECKGOTO(cudaDriverGetVersion(&driverVersion), ret, error);
  15.   INFO(NCCL_INIT, "cudaDriverVersion %d", driverVersion);

  16.   if (driverVersion < CUDA_DRIVER_MIN_VERSION) {
  17.     // WARN("CUDA Driver version found is %d. Minimum requirement is %d", driverVersion, CUDA_DRIVER_MIN_VERSION);
  18.     // Silently ignore version check mismatch for backwards compatibility
  19.     goto error;
  20.   }

  21.   #if CUDART_VERSION >= 11030
  22.   if (cudaPfnFuncLoader()) {
  23.     WARN("CUDA some PFN functions not found in the library");
  24.     goto error;
  25.   }
  26.   #endif

  27.   // Determine whether we support the cuMem APIs or not
  28.   ncclCuMemSupported = ncclIsCuMemSupported();

  29.   initResult = ret;
  30.   return;
  31. error:
  32.   initResult = ncclSystemError;
  33.   return;
  34. }

{BANNED}最佳后,调用 ncclIsCuMemSupported 判断当前驱动版本是否支持CUMEMVMM RDMA。整体调用流程图如下。

CUMEM & VMM RDMA

上文提到了CUMEMVMM RDMA,这里也做一下简单介绍。

CUDA 应用程序对快速和高效管理内存的需求日益增长。在 CUDA 10.2 之前,开发者可用的选项仅限于 CUDA 提供的类似 malloc 的函数抽象。CUDA 10.2 引入了一组新的虚拟内存管理 API,构建更高效的动态数据结构,并在应用程序中更好地控制 GPU 内存的使用。

在许多应用中,很难猜测初始分配应该有多大。以下代码是描述可扩展向量的简单C++类。

点击(此处)折叠或打开

  1. class Vector {
  2. private:
  3.   void *d_p;
  4.   size_t alloc_sz, reserve_sz;
  5. public:
  6.   Vector() : d_p(NULL), alloc_sz(0), reserve_sz(0) {}
  7.   // 预留一些额外空间以加速扩展
  8.   CUresult reserve(size_t new_sz);
  9.   // 实际提交额外的内存
  10.   CUresult grow(size_t new_sz);
  11.   // 释放所有关联的资源
  12.   ~Vector();
  13. };

CUDA 10.2 之前,在 CUDA 中实现这个概念的唯一方法是使用 cudaMalloccudaFree cudaMemcpy,或者使用 cudaMallocManaged cudaPrefetchAsync 按需提交内存。

点击(此处)折叠或打开

  1. CUresult Vector::reserve(size_t new_sz) {
  2.   if (new_sz > reserve_sz) {
  3.     void *new_ptr = nullptr;
  4. #ifndef USE_MANAGED_MEMORY
  5.     cudaMalloc(&new_ptr, new_sz);
  6. #else
  7.     cudaMallocManaged(&new_ptr, new_sz);
  8. #endif
  9.     cudaMemcpy(new_ptr, d_p, alloc_sz);
  10.     cudaFree(d_p);
  11.     d_p = new_ptr;
  12.     reserve_sz = new_sz;
  13.   }
  14. }
  15. CUresult Vector::grow(size_t new_sz) {
  16.   Vector::reserve(alloc_sz + new_sz);
  17. #ifdef USE_MANAGED_MEMORY
  18.   cudaPrefetchAsync(d_p + alloc_sz, new_sz, dev);
  19. #endif
  20.   alloc_sz += new_sz;
  21. }

  22. Vector::~Vector() {
  23.   if (d_p) cudaFree(d_p);
  24. }

尽管实现相对直接,但上述实现存在许多性能影响。

1.   cudaMalloc 函数分配的内存多于扩展所需的内存。为了扩展,需要保留旧的分配并分配一个新的内存块,以容纳旧的分配和额外的空间,这大大减少了可扩展的空间。例如,如果设备只有 2GB 的内存,而已经有一个 1GB 的向量,那么无法再扩展它,因为需要 1GB 加上需要扩展的空间。实际上,无法扩展超过一半 GPU 内存大小的向量。

2.   每个分配必须映射到所有对等上下文,即使它们在这些上下文中从未使用。

3.   cudaMemcpy调用增加了扩展请求的延迟,并使用了宝贵的内存带宽来复制数据。这些带宽可以更好地用于其他地方。

4.   cudaFree调用在当前上下文上的所有待处理工作完成后才能继续。

使用托管内存解决了一些问题,但是使用托管内存会带来一些兼容性问题,导致其无法适用于所有应用程序。例如,cudaMallocManaged 内存不能与 CUDA 进程间通信函数一起使用,如 cudaIpc*

CUDA 虚拟内存分配函数与 cudaMalloc 类的高级函数与很大不同。低级虚拟内存分配包括 cuMemCreatecuMemAddressReserve 等。

回到上面 Vector 的示例。使用 CUDA 虚拟内存管理函数,可以将内存提交到虚拟地址空间的区域。如果用完了保留的空间,无需发出 cudaMemcpy 调用,也无需分配比原始请求更多的内存,只需将已有的分配重新映射到新地址即可。


点击(此处)折叠或打开

  1. CUresult Vector::reserve(size_t new_sz) {
  2.   // 尝试在old_ptr末尾保留
  3.   status = cuMemAddressReserve(&new_ptr, (aligned_sz - reserve_sz), 0ULL, old_ptr + reserve_sz, 0ULL);
  4. }

ncclCommInitRankDev

下面继续看ncclCommInitRankDev函数,它是ncclCommInitRank中的关键,负责环形网络的建立和nccl通信子ncclComm_t的初始化,由于流程太多,这里先将前半部分,即环形网络的建立,ncclComm_t通信子的初始化放在下一小结。

ncclCommInitRankDev 针对具体的设备 cudaDev rank 创建通信子 newcomm。相比于 ncclCommInitRank 函数其进一步增加了 cudaDev 参数

点击(此处)折叠或打开

  1. static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev, ncclConfig_t *config) {
  2.   ncclResult_t res = ncclSuccess;
  3.   ncclComm_t comm = NULL;
  4.   struct ncclCommInitRankAsyncJob *job = NULL;
  5.   const char* env = ncclGetEnv("NCCL_COMM_ID");
  6.   if (env && myrank == 0) {
  7.     INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", env);
  8.     NCCLCHECKGOTO(bootstrapCreateRoot((struct ncclBootstrapHandle*)&commId, true), res, fail);
  9.   }

  10.   NCCLCHECKGOTO(ncclInit(), res, fail);
  11.   if (ncclDebugLevel > NCCL_LOG_WARN || (ncclDebugLevel != NCCL_LOG_NONE && myrank == 0)) {
  12.     static pthread_once_t once = PTHREAD_ONCE_INIT;
  13.     pthread_once(&once, showVersion);
  14.   }
  15.   // Make sure the CUDA runtime is initialized.
  16.   CUDACHECKGOTO(cudaFree(NULL), res, fail);

  17.   NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, fail);
  18.   NCCLCHECKGOTO(PtrCheck(config, "CommInitRank", "config"), res, fail);
  19.   if (nranks < 1 || myrank < 0 || myrank >= nranks) {
  20.     WARN("Invalid rank requested : %d/%d", myrank, nranks);
  21.     res = ncclInvalidArgument;
  22.     goto fail;
  23.   }

  24.   NCCLCHECKGOTO(ncclCalloc(&comm, 1), res, fail);
  25.   NCCLCHECKGOTO(ncclCalloc(&comm->abortFlag, 1), res, fail);
  26.   NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->abortFlagDev, 1), res, fail);
  27.   NCCLCHECKGOTO(ncclCalloc(&comm->abortFlagRefCount, 1), res, fail);
  28.   comm->startMagic = comm->endMagic = NCCL_MAGIC; // Used to detect comm corruption.
  29.   *comm->abortFlagRefCount = 1;
  30.   NCCLCHECKGOTO(parseCommConfig(comm, config), res, fail);
  31.   /* start with ncclInternalError and will be changed to ncclSuccess if init succeeds. */
  32.   comm->initState = ncclInternalError;
  33.   *newcomm = comm;

  34.   NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
  35.   job->comm = comm;
  36.   job->nranks = nranks;
  37.   job->commId = commId; // C++ struct assignment
  38.   job->myrank = myrank;
  39.   job->cudaDev = cudaDev;
  40.   NCCLCHECKGOTO(ncclAsyncLaunch(&job->base, ncclCommInitRankFunc, NULL, free, comm), res, fail);

  41. exit:
  42.   return ncclGroupErrCheck(res);
  43. fail:
  44.   if (comm) {
  45.     free(comm->abortFlag);
  46.     if (comm->abortFlagDev) ncclCudaHostFree((void*)comm->abortFlagDev);
  47.     free(comm->abortFlagRefCount);
  48.     free(comm);
  49.   }
  50.   if (newcomm) *newcomm = NULL;
  51.   goto exit;
  52. }

   首先,如果设置了 NCCL_COMM_ID 环境变量,并且当前进程是root 进程的话,则直接调用 bootstrapCreateRoot 函数。我们在{BANNED}中国第一部分中已经介绍了 bootstrapCreateRoot 函数,该函数在 root 进程建立监听端口,并将监听地址和端口信息赋值给 ncclUniqueId 数据结构。此外,该函数调用 bootstrapRoot 线程,等待其他rank通信,并在所有进程间建立通信环。需要说明的是,如果设置了 NCCL_COMM_ID 环境变量,root 进程在调用 ncclGetUniqueId 时会跳过 bootstrapCreateRoot 函数的调用。因此,我们需要在这个地方调用 bootstrapCreateRoot 来构建通信环。

接着ncclCommInitRankDev 函数会调用 ncclInit 函数初始化环境变量和 GdrCopy 功能,并设置 bootstrap 网络使用的网络接口,如下面的代码所示。该函数调用 initOnceFunc 函数,并通过 pthread_once 保证 initOnceFunc 在多线程环境中只执行一次。由于对于 root 进程,已经在调用 ncclGetUniqueId 函数时执行了 ncclInit 函数,因此这里不会重复执行 ncclInit 函数。而对于非root进程,这里会执行ncclInit

点击(此处)折叠或打开

  1. static ncclResult_t ncclInit() {
  2.   pthread_once(&initOnceControl, initOnceFunc);
  3.   return initResult;
  4. }

然后ncclCommInitRankDev ncclComm_t 类型的通信子分配空间,并初始化其中的一些域。此外,调用 parseCommConfig 函数使用 config 参数初始化通信子的配置信息,主要是设置为默认值。

下面给出 config 的数据结构,用户可以通过更改 config 中的属性值更改通信子的行为。

点击(此处)折叠或打开

  1. typedef struct ncclConfig_v21700 {
  2.   /* attributes that users should never touch. */
  3.   size_t size;
  4.   unsigned int magic;
  5.   unsigned int version;
  6.   /* attributes that users are able to customize. */
  7.   int blocking;
  8.   int cgaClusterSize;
  9.   int minCTAs;
  10.   int maxCTAs;
  11.   const char *netName;
  12.   int splitShare;
  13. } ncclConfig_t;

    可见,这些配置信息包括 sizemagicversionblockingcgaClusterSizeminCTAsmaxCTAsnetName splitShare 等。其中,size 表示该结构体的字节大小,magic 是默认为 0xcafebeef 的魔法值,version 表示当前 NCCL 的版本。对于这几个字段,由 NCCL 自动赋值,用户不要更改。此外,其它几个字段的解释如下:

l  netName 标识使用的网络名称,由 NCCL_NET 环境变量控制,其值必须与所使用的 NCCL 网络的名称完全匹配且不区分大小写。内部网络名称为 "IB" "Socket" (TCP/IP sockets)。外部网络插件定义它们自己的名称。默认值未定义。

l  blocking 控制是否允许 NCCL 调用阻塞,这包括对 NCCL 的所有调用,包括初始化/终止函数以及由于发送/接收调用的连接懒初始化而可能阻塞的通信函数,由 NCCL_COMM_BLOCKING 环境变量控制。默认地,通信子是阻塞的

l  cgaClusterSize 设置 CUDA Cooperative Group Array (CGA) 集群大小,由 NCCL_CGA_CLUSTER_SIZE 环境变量控制。将此值设置为非零会使 NCCL 启动通信核时相应地设置集群维度属性。默认地,NCCL 会将此值设置为 4

l  minCTAs 设置 NCCL 应使用的{BANNED}最佳小 CTA 数量,由 NCCL_MIN_CTAS 环境变量控制。默认地,NCCL 将此值设置为 1

l  maxCTAs 设置 NCCL 应使用的{BANNED}最佳大 CTA 数量,由 NCCL_MAX_CTAS 环境变量控制。默认地,NCCL 将此值设置为{BANNED}最佳大通道值 32

l  split 控制分割共享资源,由 COMM_SPLIT_SHARE_RESOURCES 环境变量控制。默认地,NCCL 将此值设置为 0,表示不分割共享资源。

随后,检查环境变量是否包含这些配置,如果包括则用环境变量中的值覆盖它们。ncclCommInitRankDev 函数还会在 root 进程上显示 NCCL 版本信息。

{BANNED}最佳后,异步调用 ncclCommInitRankFunc 函数继续完成剩余的初始化操作。下面看一下ncclCommInitRankDev 的完整流程。

l  ncclCommInitRankFunc

下面看一下ncclCommInitRankFunc函数,ncclCommInitRankFunc 函数继续完成剩余的通信子初始化操作。

点击(此处)折叠或打开

  1. static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
  2.   struct ncclCommInitRankAsyncJob* job = (struct ncclCommInitRankAsyncJob*)job_;
  3.   ncclComm_t comm = job->comm;
  4.   ncclResult_t res = ncclSuccess;
  5.   int archMajor, archMinor;
  6.   size_t maxLocalSizeBytes = 0;
  7.   int cudaDev = job->cudaDev;
  8.   int* parentRanks = NULL;
  9.   int cudaArch;
  10.   uint64_t timers[TIMERS_INIT_COUNT];

  11.   timers[TIMER_INIT_TOTAL] = clockNano();
  12.   CUDACHECKGOTO(cudaSetDevice(cudaDev), res, fail);
  13.   CUDACHECKGOTO(cudaDeviceGetAttribute(&archMajor, cudaDevAttrComputeCapabilityMajor, cudaDev), res, fail);
  14.   CUDACHECKGOTO(cudaDeviceGetAttribute(&archMinor, cudaDevAttrComputeCapabilityMinor, cudaDev), res, fail);
  15.   cudaArch = 100*archMajor + 10*archMinor;

  16.   timers[TIMER_INIT_KERNELS] = clockNano();
  17.   NCCLCHECK(ncclInitKernelsForDevice(cudaArch, &maxLocalSizeBytes));
  18.   // Set the maximum kernel stack size of all kernels to avoid
  19.   // a CUDA memory reconfig on load (c.f. NVSHMEM issue)
  20.   if (maxLocalSizeBytes > 0 && ncclParamSetStackSize() == 1) {
  21.     TRACE(NCCL_INIT, "Setting cudaLimitStackSize to %zu", maxLocalSizeBytes);
  22.     CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, maxLocalSizeBytes));
  23.   }
  24.   timers[TIMER_INIT_KERNELS] = clockNano() - timers[TIMER_INIT_KERNELS];

  25.   timers[TIMER_INIT_BOOTSTRAP] = clockNano();
  26.   if (job->parent) { //当前的配置下 job->parent=NULL,所以跳过if直接看else
  27.     NCCLCHECKGOTO(ncclCalloc(&parentRanks, job->parent->nRanks), res, fail);
  28.     NCCLCHECKGOTO(commGetSplitInfo(comm, job->parent, job->color, job->key, &job->nranks, &job->myrank, parentRanks), res, fail);
  29.     // Negative color does not create a new comm object. We needed to take part in the allgather, but we're done now.
  30.     if (job->color == NCCL_SPLIT_NOCOLOR) goto exit;
  31.     snprintf((char*)&job->commId, sizeof(job->commId), "%016lx-%d", job->parent->commHash, job->color);
  32.     NCCLCHECKGOTO(commAlloc(comm, job->parent, job->nranks, job->myrank), res, fail);
  33.     NCCLCHECKGOTO(bootstrapSplit((struct ncclBootstrapHandle*)&job->commId, comm, job->parent, job->color, job->key, parentRanks), res, fail);
  34.   } else {
  35.     NCCLCHECKGOTO(commAlloc(comm, NULL, job->nranks, job->myrank), res, fail);
  36.     NCCLCHECKGOTO(bootstrapInit((struct ncclBootstrapHandle*)&job->commId, comm), res, fail);
  37.   }
  38.   timers[TIMER_INIT_BOOTSTRAP] = clockNano() - timers[TIMER_INIT_BOOTSTRAP];

  39.   comm->cudaArch = cudaArch;
  40.   comm->commHash = getHash(job->commId.internal, NCCL_UNIQUE_ID_BYTES);

  41.   if (job->parent) { //当前的配置下 job->parent=NULL,所以跳过if直接看else
  42.     INFO(NCCL_INIT,"ncclCommSplit comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p color %d key %d commId 0x%llx - Init START",
  43.     comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->color, job->key, (unsigned long long)hashUniqueId(job->commId));
  44.   } else {
  45.     INFO(NCCL_INIT,"ncclCommInitRank comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init START",
  46.     comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, (unsigned long long)hashUniqueId(job->commId));
  47.   }

  48.   NCCLCHECKGOTO(initTransportsRank(comm, job->parent, timers), res, fail);

  49.   NCCLCHECKGOTO(ncclTunerPluginLoad(comm), res, fail);
  50.   if (comm->tuner) {
  51.     NCCLCHECK(comm->tuner->init(comm->nRanks, comm->nNodes, ncclDebugLog, &comm->tunerContext));
  52.   }

  53.   // update communicator state
  54.   comm->initState = ncclSuccess;
  55.   timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL];

  56.   // Trace this call for replay tool
  57.   if (job->parent) {
  58.     /* unlink child abort flag. */
  59.     __atomic_store_n(&job->parent->childAbortFlag, NULL, __ATOMIC_RELEASE);
  60.     TRACE_CALL("ncclCommSplit(%p, %d, %d, %p, %d, %d)",
  61.                 job->parent, job->color, job->key, comm, comm->rank, comm->nRanks);
  62.   } else {
  63.     TRACE_CALL("ncclCommInitRank(%p, %d, 0x%llx, %d, %d)",
  64.                 comm, comm->nRanks, (unsigned long long)hashUniqueId(job->commId), comm->rank, comm->cudaDev);
  65.   }

  66.   if (job->parent) {
  67.     INFO(NCCL_INIT,"ncclCommSplit comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p color %d key %d commId 0x%llx - Init COMPLETE",
  68.     comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->color, job->key, (unsigned long long)hashUniqueId(job->commId));
  69.   } else {
  70.     INFO(NCCL_INIT,"ncclCommInitRank comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init COMPLETE",
  71.     comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, (unsigned long long)hashUniqueId(job->commId));
  72.   }
  73.   INFO(NCCL_INIT|NCCL_PROFILE,"Init timings: rank %d nranks %d total %.2f (kernels %.2f, bootstrap %.2f, allgathers %.2f, topo %.2f, graphs %.2f, connections %.2f, rest %.2f)", comm->rank, comm->nRanks, timers[TIMER_INIT_TOTAL]/1e9,
  74.     timers[TIMER_INIT_KERNELS]/1e9, timers[TIMER_INIT_BOOTSTRAP]/1e9, timers[TIMER_INIT_ALLGATHER]/1e9, timers[TIMER_INIT_TOPO]/1e9, timers[TIMER_INIT_GRAPHS]/1e9, timers[TIMER_INIT_CONNECT]/1e9,
  75.     (timers[TIMER_INIT_TOTAL]-timers[TIMER_INIT_KERNELS]-timers[TIMER_INIT_BOOTSTRAP]-timers[TIMER_INIT_ALLGATHER]-timers[TIMER_INIT_TOPO]-timers[TIMER_INIT_GRAPHS]-timers[TIMER_INIT_CONNECT])/1e9);
  76. exit:
  77.   if (job->newcomm) {
  78.     /* assign it to user pointer. */
  79.     __atomic_store_n(job->newcomm, comm, __ATOMIC_RELEASE);
  80.   }
  81.   free(parentRanks);
  82.   return res;
  83. fail:
  84.   comm->initState = res;
  85.   goto exit;
  86. }

   其对应处理流程图如下所示

首先,调用 cudaSetDevice 设置当前设备,获取当前设备的算力和版本信息。

接着,调用 ncclInitKernelsForDevice 获取所有 CUDA kernels 中{BANNED}最佳大栈大小,并在设置环境变量 NCCL_SET_STACK_SIZE 时调用 cudaDeviceSetLimit 函数设置{BANNED}最佳大栈大小,以避免由于 CUDA 显存重配置导致的 hang 问题。这里,CUDA kernels 包含 NCCL 支持的所有算子,包括 Broadcast/Reduce/AllGather/ReduceScatter/AllReduce 等。

当前的配置下 job->parent=NULL ,所以我们先跳过相关代码。

然后,调用 commAlloc 继续初始化通信子结构体信息,包括初始化 memPermanent memScoped 数据结构、本设备的 rank、设备总数和当前设备等。

l  commAlloc

点击(此处)折叠或打开

  1. static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, int ndev, int rank) {
  2.   if (ndev < 1) {
  3.     WARN("invalid device count (%d) requested", ndev);
  4.     return ncclInvalidArgument;
  5.   }
  6.   if (rank >= ndev || rank < 0) {
  7.     WARN("rank %d exceeds ndev=%d", rank, ndev);
  8.     return ncclInvalidArgument;
  9.   }

  10.   ncclMemoryStackConstruct(&comm->memPermanent);
  11.   ncclMemoryStackConstruct(&comm->memScoped);
  12.   comm->destructorHead = nullptr;
  13.   comm->rank = rank;
  14.   comm->nRanks = ndev;

  15.   NCCLCHECK(ncclNetPluginLoad(comm));
  16.   NCCLCHECK(ncclNetInit(comm));
  17.   INFO(NCCL_INIT, "Using network %s", comm->ncclNet->name);

  18.   if (parent && parent->config.splitShare) {
  19.     if (parent->ncclNet != comm->ncclNet) {
  20.       WARN("Split shares resources, but parent comm netName %s is different from child comm netName %s", parent->ncclNet->name, comm->ncclNet->name);
  21.       return ncclInvalidUsage;
  22.     }
  23.   }
  24.   // Try to create a CUDA object right away. If there is something wrong with
  25.   // the device we're on (failure cause #1) , better know it early.
  26.   CUDACHECK(cudaGetDevice(&comm->cudaDev));

  27.   NCCLCHECK(getBusId(comm->cudaDev, &comm->busId));
  28.   nvmlDevice_t nvmlDev;
  29.   char busId[NVML_DEVICE_PCI_BUS_ID_BUFFER_SIZE];
  30.   NCCLCHECK(int64ToBusId(comm->busId, busId));
  31.   NCCLCHECK(ncclNvmlDeviceGetHandleByPciBusId(busId, &nvmlDev));
  32.   NCCLCHECK(ncclNvmlDeviceGetIndex(nvmlDev, (unsigned int*)&comm->nvmlDev));

  33.   comm->compCap = ncclCudaCompCap();
  34.   TRACE(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx compCap %d", comm, rank, ndev, comm->cudaDev, comm->busId, comm->compCap);

  35.   comm->checkPointers = ncclParamCheckPointers() == 1 ? true : false;
  36.   comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;

  37.   comm->collNetSupport = 0;
  38.   memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));

  39.   ncclMemoryPoolConstruct(&comm->memPool_ncclKernelPlan);
  40.   ncclMemoryPoolConstruct(&comm->memPool_ncclProxyOp);

  41.   comm->groupNext = reinterpret_cast<struct ncclComm*>(0x1);
  42.   comm->preconnectNext = reinterpret_cast<struct ncclComm*>(0x1);

  43.   static_assert(MAXCHANNELS <= sizeof(*comm->connectSend)*8, "comm->connectSend must have enough bits for all channels");
  44.   static_assert(MAXCHANNELS <= sizeof(*comm->connectRecv)*8, "comm->connectRecv must have enough bits for all channels");
  45.   NCCLCHECK(ncclCalloc(&comm->connectSend, comm->nRanks));
  46.   NCCLCHECK(ncclCalloc(&comm->connectRecv, comm->nRanks));

  47.   // Mark channels as non initialized.
  48.   for (int c=0; c < MAXCHANNELS; c++) comm->channels[c].id = -1;

  49.   if (parent == NULL || !parent->config.splitShare) {
  50.     struct ncclSharedResources* sharedRes = NULL;
  51.     NCCLCHECK(ncclCalloc(&sharedRes, 1));
  52.     /* most of attributes are assigned later in initTransportsRank(). */
  53.     sharedRes->owner = comm;
  54.     sharedRes->tpNRanks = comm->nRanks;
  55.     NCCLCHECK(ncclCalloc(&sharedRes->tpRankToLocalRank, comm->nRanks));
  56.     NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->deviceStream));
  57.     NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->hostStream));
  58.     comm->sharedRes = sharedRes;
  59.     sharedRes->refCount = 1;
  60.   } else {
  61.     comm->sharedRes = parent->sharedRes;
  62.     ncclAtomicRefCountIncrement(&parent->sharedRes->refCount);
  63.   }

  64.   if (comm->topParentRanks == NULL) {
  65.     NCCLCHECK(ncclCalloc(&comm->topParentRanks, comm->nRanks));
  66.     for (int i = 0; i < comm->nRanks; ++i)
  67.       comm->topParentRanks[i] = i;
  68.   }

  69.   ncclIntruQueueMpscConstruct(&comm->callbackQueue);

  70.   comm->regCache.pageSize = sysconf(_SC_PAGESIZE);
  71.   return ncclSuccess;
  72. }

commAlloc 还尝试调用 ncclNetPluginLoad ncclNetInit 函数加载 NetPlugin 模块,以初始化 ncclNets ncclCollNets

l  ncclNetPluginLoad

点击(此处)折叠或打开

  1. ncclResult_t ncclNetPluginLoad(struct ncclComm* comm) {
  2.   netPluginLib = openNetPluginLib(couldNotFindNames, MAX_PLUGIN_LOAD * PATH_MAX);
  3.   if (netPluginLib == nullptr) {
  4.     goto fail;
  5.   }
  6.   ncclNets[0] = (ncclNet_v8_t*)dlsym(netPluginLib, "ncclNetPlugin_v8");
  7.   if (ncclNets[0] == nullptr) {
  8.     // Try v7 plugin
  9.     ncclNet_v7 = (ncclNet_v7_t*)dlsym(netPluginLib, "ncclNetPlugin_v7");
  10.     ... ...
  11.   }
  12.   // Check for CollNet
  13.   ncclCollNets[0] = (ncclCollNet_v8_t*)dlsym(netPluginLib, "ncclCollNetPlugin_v8");
  14.   if (ncclCollNets[0] == nullptr) {
  15.     ncclCollNet_v7 = (ncclCollNet_v7_t*)dlsym(netPluginLib, "ncclCollNetPlugin_v7");
  16.     ... ...
  17.   }
  18.   ++netPluginRefCount;
  19.   netPluginStatus = netPluginLoadSuccess;
  20.   comm->netPluginLoaded = 1;
  21. exit:
  22.   pthread_mutex_unlock(&netPluginLock);
  23.   return ncclSuccess;
  24. }

l  ncclNetInit

点击(此处)折叠或打开

  1. ncclResult_t ncclNetInit(struct ncclComm* comm) {
  2.   // Initialize main communication network
  3.   const char* netName;
  4.   bool ok = false;
  5.   netName = comm->config.netName;
  6.   for (int i=0; i<3; i++) {
  7.     if (ncclNets[i] == nullptr) continue;
  8.     enum ncclNetState state;
  9.     NCCLCHECK(netGetState(i, &state));
  10.     if (state != ncclNetStateEnabled) continue;
  11.     if (netName && strcasecmp(netName, ncclNets[i]->name) != 0) continue;
  12.     if (ncclSuccess != ncclNetCheckDeviceVersion(comm, ncclNets[i], 0)) {
  13.       // Mismatched device plugin version
  14.       continue;
  15.     }
  16.     comm->ncclNet = ncclNets[i];
  17.     ok = true;
  18.     if (ncclCollNets[i]) {
  19.       NCCLCHECK(collNetGetState(i, &state));
  20.       if (state == ncclNetStateEnabled) {
  21.         comm->ncclCollNet = ncclCollNets[i];
  22.       }
  23.     }
  24.     break;
  25.   }

  26.   return ncclSuccess;
  27. }

这一过程描述如下:

1.   如果设置了 NCCL_NET_PLUGIN环境变量,则尝试加载该环境为名称的动态库;

2.   如果设置了 NCCL_NET_PLUGIN 环境变量但前述过程失败,则尝试加载 libnccl-net-.so

3.   如果没有设置 NCCL_NET_PLUGIN 环境变量,尝试加载 libnccl-net.so 库;

4.   如果找不到 plugin,则使用内部网络 plugin

例如,如果设置 NCCL_NET_PLUGIN=aws 那么 NCCL 会尝试加载 aws。如果无法找到 aws 库,则尝试加载 libnccl-net-aws.so

注意到上述代码使用到了ncclCollNets[0],在nccl中有如下全局初始化:

ncclNet_t* ncclNets[3] = { nullptr, &ncclNetIb, &ncclNetSocket };

ncclCollNet_t* ncclCollNets[3] = { nullptr, nullptr, nullptr };

其中ncclNets表示nccl的网络插件,如果指定了外部插件,就会赋值给ncclCollNets[0]优先使用,否则就会使用ncclNetIbncclNetSocket这两个内部插件

ncclCollNets则表示SHARP插件。

然后,ncclCommInitRankFunc 函数调用 bootstrapInit 函数初始化 bootstrap 网络。关于 bootstrapInit 的具体实现,我们放在后面一个小节介绍。

{BANNED}最佳后,ncclCommInitRankFunc 调用 initTransportsRank 函数探索网络拓扑,并调用 ncclTunerPluginLoad 加载和和初始化 libnccl-tuner.so

下面首先看前面提到的bootstrapInit函数。

l  bootstrapInit

bootstrapInit函数真正完成了和前面root rankbootstrapRoot线程配合完成了环形网络的建立。

点击(此处)折叠或打开

  1. ncclResult_t bootstrapInit(struct ncclBootstrapHandle* handle, struct ncclComm* comm) {
  2.   int rank = comm->rank;
  3.   int nranks = comm->nRanks;
  4.   struct bootstrapState* state;
  5.   struct ncclSocket* proxySocket;
  6.   ncclSocketAddress nextAddr;
  7.   struct ncclSocket sock, listenSockRoot;
  8.   struct extInfo info = { 0 };

  9.   NCCLCHECK(ncclCalloc(&state, 1));
  10.   state->rank = rank;
  11.   state->nranks = nranks;
  12.   state->abortFlag = comm->abortFlag;
  13.   comm->bootstrap = state;
  14.   comm->magic = state->magic = handle->magic;

  15.   TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);

  16.   info.rank = rank;
  17.   info.nranks = nranks;
  18.   // Create socket for other ranks to contact me
  19.   NCCLCHECK(ncclSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  20.   NCCLCHECK(ncclSocketListen(&state->listenSock));
  21.   NCCLCHECK(ncclSocketGetAddr(&state->listenSock, &info.extAddressListen));

  22.   // Create socket for root to contact me
  23.   NCCLCHECK(ncclSocketInit(&listenSockRoot, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  24.   NCCLCHECK(ncclSocketListen(&listenSockRoot));
  25.   NCCLCHECK(ncclSocketGetAddr(&listenSockRoot, &info.extAddressListenRoot));

  26.   // stagger connection times to avoid an overload of the root
  27.   if (nranks > 128) {
  28.     long msec = rank;
  29.     struct timespec tv;
  30.     tv.tv_sec = msec / 1000;
  31.     tv.tv_nsec = 1000000 * (msec % 1000);
  32.     TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
  33.     (void) nanosleep(&tv, NULL);
  34.   }

  35.   // send info on my listening socket to root
  36.   /* rank=i 的进程(当前进程)通过 ncclUniqueId 中的bootstrapNetIfAddr创建socket,并通过这个socket将将extAddressListenRoot 和 extAddressListen 发送到 root 进程。这样,root 进程就获取了所有进程的监听端信息 */
  37.   NCCLCHECK(ncclSocketInit(&sock, &handle->addr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  38.   NCCLCHECK(ncclSocketConnect(&sock));
  39.   NCCLCHECK(bootstrapNetSend(&sock, &info, sizeof(info)));
  40.   NCCLCHECK(ncclSocketClose(&sock));

  41.   // get info on my "next" rank in the bootstrap ring from root
  42.   /* rank=i 的进程在与 root 连接的监听端口上等待 root 进程发送的 rank=i+1 进程的监听端信息。这样,就可以构建通信环。接着进程关闭与 root 连接的监听端口。这样,每个进程上只剩下与其它进程进行连接的监听端 */
  43.   NCCLCHECK(ncclSocketInit(&sock));
  44.   NCCLCHECK(ncclSocketAccept(&sock, &listenSockRoot));
  45.   NCCLCHECK(bootstrapNetRecv(&sock, &nextAddr, sizeof(union ncclSocketAddress)));
  46.   NCCLCHECK(ncclSocketClose(&sock));
  47.   NCCLCHECK(ncclSocketClose(&listenSockRoot));
  48.   /* 进程在 ringSendSocket 上连接其后继进程,并在 ringRecvSocket 接收来自其前序进程的连接。
  49.      这样,进程就可以和其相邻的进程上进行通信。对于 rank=i 的进程,其相邻的进程指的是后继 rank=(i+1)%nranks 和前序 rank=(i-1)%nranks 的进程。*/
  50.   NCCLCHECK(ncclSocketInit(&state->ringSendSocket, &nextAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  51.   NCCLCHECK(ncclSocketConnect(&state->ringSendSocket));
  52.   // Accept the connect request from the previous rank in the AllGather ring
  53.   NCCLCHECK(ncclSocketInit(&state->ringRecvSocket));
  54.   NCCLCHECK(ncclSocketAccept(&state->ringRecvSocket, &state->listenSock));

  55.   // AllGather all listen handlers
  56.   /* 以 Ring 的方式实现进程监听地址的 AllGather。这样,每个进程上就获得了所有进程的监听地址。也就是说,每个进程都可以和其它任意进程通信。 */
  57.   NCCLCHECK(ncclCalloc(&state->peerCommAddresses, nranks));
  58.   NCCLCHECK(ncclSocketGetAddr(&state->listenSock, state->peerCommAddresses+rank));
  59.   NCCLCHECK(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union ncclSocketAddress)));
  60.   
  61.   // Create the service proxy
  62.   /* 建立 proxy 监听端口,并采用 bootstrapAllGather 方法获取所有进程的 proxy 监听端口和所有进程的 UDS 信息。其中,UDS 是由于一个随机数和进程号哈希值组成的 64 比特值。 */
  63.   NCCLCHECK(ncclCalloc(&state->peerProxyAddresses, nranks));
  64.   NCCLCHECK(ncclCalloc(&state->peerProxyAddressesUDS, nranks));

  65.   // proxy is aborted through a message; don't set abortFlag
  66.   NCCLCHECK(ncclCalloc(&proxySocket, 1));
  67.   NCCLCHECK(ncclSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeProxy, comm->abortFlag));
  68.   NCCLCHECK(ncclSocketListen(proxySocket));
  69.   NCCLCHECK(ncclSocketGetAddr(proxySocket, state->peerProxyAddresses+rank));
  70.   NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union ncclSocketAddress)));
  71.   // cuMem UDS support
  72.   // Make sure we create a unique UDS socket name
  73.   uint64_t randId;
  74.   NCCLCHECK(getRandomData(&randId, sizeof(randId)));
  75.   state->peerProxyAddressesUDS[rank] = getPidHash()+randId;
  76.   NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddressesUDS, sizeof(*state->peerProxyAddressesUDS)));
  77.   /* 调用 ncclProxyInit 函数执行初始化 */
  78.   NCCLCHECK(ncclProxyInit(comm, proxySocket, state->peerProxyAddresses, state->peerProxyAddressesUDS));

  79.   TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);

  80.   return ncclSuccess;
  81. }

其对应处理流程图如下:

bootstrapInit 函数在 bootstrap 网卡上创建两个监听端,分别用于连接 root 进程和非 root 进程,并分别将它们的 sockaddr 数据结构存储在 extAddressListenRoot extAddressListen 结构体中。此外,当进程总数超过 128 时,为了避免 root 过载,会延迟连接 root 的时间。延迟时间为 rank/1000+rank%1000 毫秒。

我们在{BANNED}中国第一部分中介绍 bootstrapCreateRoot 时介绍了通信环的构建过程。简单来说,root 进程等待所有进程连接自己,并将它们的监听端口信息发送给 root 进程。然后,root 进程给所有进程发送它们后继进程的监听端口信息。这里bootstrapInit所做的工作就是配合root进程完成上述工作的。其时序和调用关系如下图所示。

 

l  bootstrapRingAllGather

下面我们看一下其中bootstrapRingAllGather的实现,在 RingAllGather 算法的第 s 步,rank=i 的进程从其前序 rank=i-1 进程接收 rank=rank-s-1 的监听地址,并将第 s-1 步获取的 rank-s 的监听地址发送到后继 rank=i+1。具体过程如下所示。

点击(此处)折叠或打开

  1. ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
  2.   struct bootstrapState* state = (struct bootstrapState*)commState;
  3.   int rank = state->rank;
  4.   int nranks = state->nranks;

  5.   TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);

  6.   NCCLCHECK(bootstrapRingAllGather(&state->ringRecvSocket, &state->ringSendSocket, rank, nranks, (char*)allData, size));

  7.   TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  8.   return ncclSuccess;
  9. }

我们以下图为例说明 RingAllGather 的过程。在初始状态下,每个进程只有自己的监听地址信息。进程首先将自己的地址信息发送到其后继进程。在后续每一步骤中,进程将其在上一步骤中接收到的监听地址信息发送到后继进程。这样,经过整个流程后,每个进程都得到了所有进程监听地址。

{BANNED}最佳后我们看一下ncclProxyInit的实现,ncclProxyInit 函数初始化 proxy 信息,并调用 ncclIpcSocketInit 函数创建一个 Unix 域的监听端 ipcSock。这里,UDS Unix Domain Socket 的简称。

l  ncclProxyInit

点击(此处)折叠或打开

  1. ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union ncclSocketAddress* peerAddresses, uint64_t *peerAddressesUDS) {
  2.   assert(comm->sharedRes->proxyState == NULL);
  3.   NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1));
  4.   comm->proxyState = comm->sharedRes->proxyState;
  5.   comm->proxyState->refCount = 1;
  6.   comm->proxyState->listenSock = sock;
  7.   comm->proxyState->peerAddresses = peerAddresses;
  8.   comm->proxyState->peerAddressesUDS = peerAddressesUDS;

  9.   // UDS support
  10.   NCCLCHECK(ncclIpcSocketInit(&comm->proxyState->ipcSock, comm->rank, peerAddressesUDS[comm->rank], comm->abortFlag));
  11.   return ncclSuccess;
  12. }

小结

在本部分,我们讨论了 NCCL 如何初始化 bootstrap 网络和通信子。我们将复杂的拓扑检测和建立通信通道部分留在下一部分介绍。我们本小结介绍的流程都是ncclCommInitRank内部的逻辑,整体流程如下,其中{BANNED}最佳后的initTransportsRankncclTunerPluginLoad我们下个小结再介绍。

阅读(176) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~