Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3643851
  • 博文数量: 213
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7427
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 18:56
个人简介

将晦涩难懂的技术讲的通俗易懂

文章分类

全部博文(213)

文章存档

2025年(5)

2024年(11)

2023年(9)

2022年(4)

2021年(12)

2020年(8)

2019年(18)

2018年(19)

2017年(9)

2016年(26)

2015年(18)

2014年(54)

2013年(20)

分类: 云计算

2025-02-23 22:22:17

NCCL概述和NCCL-Test分析

——lvyilong316
      NCCLNVIDIA Collective Communications Library)是一种由NVIDIA开发的高性能通信库,专门用于加速多GPU系统中的并行计算工作负载。NCCL旨在优化多GPU之间的数据传输和通信,以实现更快的并行计算速度。

NCCL具备以下通信模式:

l  点对点通信:允许两个特定的GPU之间直接交换数据。

l  群集通信:NCCL还支持集体通信,这些操作涉及多个GPU之间的数据交换

NCCL还具有的其他功能

l  拓扑感知通信:识别多GPU系统的拓扑结构,并优化通信以{BANNED}最佳大程度地减少通信延迟和带宽消耗。

l  异步通信:允许计算操作与通信操作并行执行,从而提高了系统的效率。

那么也就是说我们可以通过一下几个方向来了解什么是NCCL

代码结构

NCCL仓库:NCCL [Github]: git clone

后续我们选择v2.22.3-1这个tag的代码进行源码分析。由于NCCL是一个通信库,类似DPDK一样,不是一个服务或进程,所以它不存在一个main函数。那么我们如何开始着手分析呢?这就不得不提另一个重要的工具nccl-test

nccl-test

nccl-test 工具是 nvidia 开源的一项用于测试 NCCL 集合通信的工具。可以用于检测集合通信是否正常、压测集合通信速率。官方开源地址:。而运行nccl-test又通常需要借助mpirun命令。

mpirun

    mpirun是一个用于启动和管理 MPI(消息传递接口)程序的实用程序。它允许您在单个节点或多个节点上并行运行程序。如下所示,我们有两台机器,使用mpirun在两台机器上分别运行一个pwd命令。

点击(此处)折叠或打开

  1. -> # cat hostfile
  2. 22.22.22.175
  3. 22.22.22.121

  4. -> # /usr/local/openmpi/bin/mpirun --allow-run-as-root -np 2 -hostfile hostfile pwd
  5. /root
  6. /root

    了解上述mpirun的作用,就很容易理解一般测试nccl-test的命令了,一个典型的例子如下所示:

点击(此处)折叠或打开

  1. # 2台机器,16 张 GPU卡,执行 all_reduce_perf 测试
  2.  mpirun -np 16 \
  3.         -H 172.16.2.8:8,172.16.2.13:8 \
  4.         --allow-run-as-root -bind-to none -map-by slot \
  5.         -x NCCL_DEBUG=INFO \
  6.         -x NCCL_IB_GID_INDEX=3 \
  7.         -x NCCL_IB_DISABLE=0 \
  8.         -x NCCL_SOCKET_IFNAME=eth0 \
  9.         -x NCCL_NET_GDR_LEVEL=2 \
  10.         -x NCCL_IB_QPS_PER_CONNECTION=4 \
  11.         -x NCCL_IB_TC=160 \
  12.         -x LD_LIBRARY_PATH -x PATH \
  13.         -mca coll_hcoll_enable 0 -mca pml ob1 -mca btl_tcp_if_include eth0 -mca btl ^openib \
  14.         all_reduce_perf -b 32M -e 1G -i 1000 -f 2 -g 1

可以看出,这个命令是分别在172.16.2.8172.16.2.13两个host上,各启动8个(IP后面的冒号指定每个host上的进程数)all_reduce_perf进程。当然mpirunnccl-test分别有很多属于自己的参数,这个网上有很多说明,这里就不再赘述了。

nccl test源码分析

下面我们简单分析一下nccl test这个测试框架。nccl test有几个测试文件,如下图

每一个测试文件会单独生成一个测试程序,并以"_perf"为后缀生成可执行的文件。每个测试源码文件会和两个公共文件common.cutimer.cc,以及ncclcudampi这三个lib一起编译生成可执行的文件。

common.cu

  common.cu为公共文件,提供测试测试程序框架,包括main函数、参数解析,并调用每个测试文件提供的公共调用接口,以初始化测试程序和运行测试程序。

下面对run函数的代码进行分析,run函数开始部分,首先获取MPI相关信息,具体信息有:总进程数,当前进程的rank编号,当前主机上所有的进程个数。如果对进程划分了子组(配置了NCCL_TESTS_SPLIT_MASK环境变量),则创建当前进程所在子组的mpi通信器。获取当前子组中的进程总数,获取当前进程在子组中的rank编号环境变量,则默认所有进程在同一个子组中。

点击(此处)折叠或打开

  1. testResult_t run() {
  2.   int totalProcs = 1, proc = 0, ncclProcs = 1, ncclProc = 0, color = 0;
  3.   int localRank = 0;
  4.   char hostname[1024];
  5.   getHostName(hostname, 1024);

  6. #ifdef MPI_SUPPORT
  7.   MPI_Comm_size(MPI_COMM_WORLD, &totalProcs);
  8.   MPI_Comm_rank(MPI_COMM_WORLD, &proc);
  9.   uint64_t hostHashs[totalProcs];
  10.   hostHashs[proc] = getHostHash(hostname);
  11.   MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD);
  12.   for (int p=0; p<totalProcs; p++) {
  13.     if (p == proc) break;
  14.     if (hostHashs[p] == hostHashs[proc]) localRank++;
  15.   }

  16.   char* str = getenv("NCCL_TESTS_SPLIT_MASK");
  17.   uint64_t mask = str ? strtoul(str, NULL, 16) : 0;
  18.   MPI_Comm mpi_comm;
  19.   color = proc & mask;
  20.   MPI_Comm_split(MPI_COMM_WORLD, color, proc, &mpi_comm);
  21.   MPI_Comm_size(mpi_comm, &ncclProcs);
  22.   MPI_Comm_rank(mpi_comm, &ncclProc);
  23. #endif
  24.   is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;

  25.   PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
  26.         nThreads, nGpus, minBytes, maxBytes,
  27.         (stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
  28.         warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
  29.   if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
  30.   if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
  31.   PRINT("#\n");

  32.   PRINT("# Using devices\n");
  33. #define MAX_LINE 2048

紧接着获取{BANNED}最佳大可用的gpu缓存大小(从众多要使用的GPU中,选取缓存{BANNED}最佳小的GPU的缓存值)

l  获取当前进程使用的GPU的{BANNED}最佳大内存

n  一个进程可以有n个线程,每个线程可以使用nGPU

n  选取缓存{BANNED}最佳小的GPU的缓存值,作为一个maxMem

n  需要确定当前进程使用了哪些GPU,才能获取maxMem信息

n  进程分配GPU的规则:

u  一个主机可能有多个进程,每个进程可能使用多个GPU

u  cuda通常情况下,对于主机上可用的GPU已经从0开始做了编号

u  默认情况下,按照进程的rank编号从小到大来分配在GPU

u  默认情况下,分配的GPU的编号是从0开始分配

n  收集所有进程的{BANNED}最佳大可用GPU maxMem信息,并打印

n  使用MPI_Allreduce获取各个进程的maxMem,并使用{BANNED}最佳小的maxMem作为{BANNED}最佳终的maxMem

点击(此处)折叠或打开

  1. char line[MAX_LINE];
  2.   int len = 0;
  3.   size_t maxMem = ~0;
  4.   char* envstr = getenv("NCCL_TESTS_DEVICE");
  5.   int gpu0 = envstr ? atoi(envstr) : -1;
  6.   for (int i=0; i<nThreads*nGpus; i++) {
  7.     int cudaDev = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
  8.     int rank = proc*nThreads*nGpus+i;
  9.     cudaDeviceProp prop;
  10.     CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev));
  11.     len += snprintf(line+len, MAX_LINE-len, "# Rank %2d Group %2d Pid %6d on %10s device %2d [0x%02x] %s\n",
  12.                     rank, color, getpid(), hostname, cudaDev, prop.pciBusID, prop.name);
  13.     maxMem = std::min(maxMem, prop.totalGlobalMem);
  14.   }

  15. #if MPI_SUPPORT
  16.   char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
  17.   // Gather all output in rank order to root (0)
  18.   MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
  19.   if (proc == 0) {
  20.     for (int p = 0; p < totalProcs; p++)
  21.       PRINT("%s", lines+MAX_LINE*p);
  22.     free(lines);
  23.   }
  24.   MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
  25. #else
  26.   PRINT("%s", line);
  27. #endif

然后根据maxMem获取在gpu上申请的buff的{BANNED}最佳大字节数,并使用该值,来修正配置的maxBytes:将maxMem预留1G,剩下的根据需要切成2等份或者3等份(send buffrecv buff必包含,还有可能包含一个expected buff(需要参数指定使能了datacheck))作为申请的buff{BANNED}最佳大值。使用该值来修正maxBytes,即如果配置的maxBytes大于{BANNED}最佳大可设置的buff值,则maxBytes设置为{BANNED}最佳大可设置的buff值。子组中的rank 0的进程,获取一个ncclUniqueId,并使用MPI_Bcast同步给子组的其他进程。

点击(此处)折叠或打开

  1. // We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
  2.   size_t memMaxBytes = (maxMem - (1<<30)) / (datacheck ? 3 : 2);
  3.   if (maxBytes > memMaxBytes) {
  4.     maxBytes = memMaxBytes;
  5.     if (proc == 0) printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
  6.   }

  7.   ncclUniqueId ncclId;
  8.   if (ncclProc == 0) {
  9.     NCCLCHECK(ncclGetUniqueId(&ncclId));
  10.   }
  11. #ifdef MPI_SUPPORT
  12.   MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, mpi_comm);
  13.   MPI_Barrier(MPI_COMM_WORLD); // Ensure Bcast is complete for HCOLL
  14. #endif

然后调用各个测试用例定义的getBuffSize函数,获取sendBytesrecvBytes大小,通常是根据maxBytes来设置。每个测试用例都要定义一个struct testEngine ncclTestEngine全局变量,ncclTestEngine变量中有两个函数指针:getBuffSizerunTest,每个测试用例都要实现这两个函数,以测试自己用例的目标。

点击(此处)折叠或打开

  1.   int gpus[nGpus*nThreads];
  2.   cudaStream_t streams[nGpus*nThreads];
  3.   void* sendbuffs[nGpus*nThreads];
  4.   void* recvbuffs[nGpus*nThreads];
  5.   void* expected[nGpus*nThreads];
  6.   size_t sendBytes, recvBytes;

  7.   ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)ncclProcs*nGpus*nThreads);

接着在当前进程使用的每个GPU上,都申请maxBytes大小的sendbuffrecvbuffexpected buff(如果指定要做datacheck),并根据需要,创建一个cudaStream。申请的buff指针记录在一个数组中。

点击(此处)折叠或打开

  1. envstr = getenv("NCCL_TESTS_DEVICE");
  2.   gpu0 = envstr ? atoi(envstr) : -1;
  3.   for (int i=0; i<nGpus*nThreads; i++) {
  4.     gpus[i] = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
  5.     CUDACHECK(cudaSetDevice(gpus[i]));
  6.     TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
  7.     if (streamnull)
  8.       streams[i] = NULL;
  9.     else
  10.       CUDACHECK(cudaStreamCreateWithFlags(streams+i, cudaStreamNonBlocking));
  11.   }

初始化nccl通讯器ncclComm_t comms,每个GPU需要一个。可以在主线程中初始化所有的线程的GPUcomms,也可以每个线程运行时,线程初始化自己使用的GPU的通讯器(指定参数:-p,--parallel_init <0/1>),这样速度更快一些。如果当前nccl子组中只有一个进程。则直接调用ncclCommInitAll来初始化commsncclCommInitAllnccl提供的用于创建单个进程的一组通信器。如果nccl子组有多个进程,则自己依次对每个GPU调用ncclCommInitRank,初始化通信器。关于通讯器ncclComm_t的创建在后续NCCL代码分析中再进行展开。

点击(此处)折叠或打开

  1. //if parallel init is not selected, use main thread to initialize NCCL
  2.   ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus);
  3. #if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
  4.   void **sendRegHandles = NULL;
  5.   void **recvRegHandles = NULL;
  6. #endif
  7.   if (!parallel_init) {
  8.      if (ncclProcs == 1) {
  9.        NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpus));
  10.      } else {
  11.        NCCLCHECK(ncclGroupStart());
  12.        for (int i=0; i<nGpus*nThreads; i++) {
  13.          CUDACHECK(cudaSetDevice(gpus[i]));
  14.          NCCLCHECK(ncclCommInitRank(comms+i, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+i));
  15.        }
  16.        NCCLCHECK(ncclGroupEnd());
  17.      }

通信器创建好了之后,调用ncclCommRegister,将buff注册到通信器中。用于zero-copy通信。每个GPU的通信器注册自己的GPU缓存中申请的buff。注册好了需要记录一个buff句柄,这个句柄用于后续解注册buff使用(ncclCommDeregister)expected buff不需要注册,可能是因为不需要同步这个buff

点击(此处)折叠或打开

  1. #if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
  2.      sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*nThreads*nGpus) : NULL;
  3.      recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*nThreads*nGpus) : NULL;
  4.      for (int i=0; i<nGpus*nThreads; i++) {
  5.        if (local_register) NCCLCHECK(ncclCommRegister(comms[i], sendbuffs[i], maxBytes, &sendRegHandles[i]));
  6.        if (local_register) NCCLCHECK(ncclCommRegister(comms[i], recvbuffs[i], maxBytes, &recvRegHandles[i]));
  7.      }
  8. #endif
  9.   }

然后,创建线程,并等待线程运行结束。每个线程调用每个具体的测试用例文件提供的runTest函数(ncclTestEngine.runTest)

点击(此处)折叠或打开

  1. int errors[nThreads];
  2.   double bw[nThreads];
  3.   double* delta;
  4.   CUDACHECK(cudaHostAlloc(&delta, sizeof(double)*nThreads*NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped));
  5.   int bw_count[nThreads];
  6.   for (int t=0; t<nThreads; t++) {
  7.     bw[t] = 0.0;
  8.     errors[t] = bw_count[t] = 0;
  9.   }

  10.   fflush(stdout);

  11.   const char* timeStr = report_cputime ? "cputime" : "time";
  12.   PRINT("#\n");
  13.   PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
  14.   PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
  15.       timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong");
  16.   PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
  17.       "(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");

  18.   struct testThread threads[nThreads];
  19.   memset(threads, 0, sizeof(struct testThread)*nThreads);

  20.   for (int t=nThreads-1; t>=0; t--) {
  21.     threads[t].args.minbytes=minBytes; //{BANNED}最佳小字节数minbytes
  22.     threads[t].args.maxbytes=maxBytes; //{BANNED}最佳大字节数maxbytes
  23.     threads[t].args.stepbytes=stepBytes; //递增的字节数stepbytes
  24.     threads[t].args.stepfactor=stepFactor; //递增倍数stepfactor(和递增的字节数二选1)
  25.     threads[t].args.localRank = localRank; //本进程在当前主机的localRank编号
  26.     threads[t].args.totalProcs=totalProcs; //总的进程数totalProcs
  27.     threads[t].args.nProcs=ncclProcs; //nccl子组的进程个数nProcs
  28.     threads[t].args.proc=ncclProc; //当前进程在nccl子组中的rank编号proc
  29.     threads[t].args.nThreads=nThreads; //当前进程的线程个数nThreads
  30.     threads[t].args.thread=t; //当前线程的编号thread
  31.     threads[t].args.nGpus=nGpus; //当前线程使用的gpu个数nGpus
  32.     threads[t].args.gpus=gpus+t*nGpus; //保存当前线程使用的gpu id的数组gpus[]
  33.     threads[t].args.sendbuffs = sendbuffs+t*nGpus;
  34.     threads[t].args.recvbuffs = recvbuffs+t*nGpus;
  35.     threads[t].args.expected = expected+t*nGpus;
  36.     threads[t].args.ncclId = ncclId; //当前nccl子组的ncclId(当前nccl子组的rank 0进程申请的id,并同步给其他进程)
  37.     threads[t].args.comms=comms+t*nGpus; //保存当前线程使用的nccl通信器的数组comms[]
  38.     threads[t].args.streams=streams+t*nGpus; //保存当前线程使用的streams的数组streams[]
  39.     //存储运行结果的统计信息指针:errors、bw、bw_count
  40.     threads[t].args.errors=errors+t;
  41.     threads[t].args.bw=bw+t;
  42.     threads[t].args.bw_count=bw_count+t;
  43.     //是否做做datacheck,即报告错误
  44.     threads[t].args.reportErrors = datacheck;

  45.     threads[t].func = parallel_init ? threadInit : threadRunTests;
  46.     if (t)
  47.       TESTCHECK(threadLaunch(threads+t));
  48.     else
  49.       TESTCHECK(threads[t].func(&threads[t].args));
  50.   }

每个线程调用每个具体的测试用例文件提供的runTest函数(ncclTestEngine.runTest),函数的包括上arg参数,指定nccl子组的root gpurank(配置参数-r,--root指定,默认为0),测试的数据的类型(配置参数-d,--datatype,默认为ncclFloat),及其类型名字(字符串),数据操作(配置参数-o,--op ,默认为ncclSum),及其操作名字(字符串)

测试函数调用关系

测试函数调用关系具体如下图所示:

all_reduce.cu

    我们以allreduce为例,其实现在all_reduce.cu文件中。这个用例主要提供如下两个函数,一个是获取测试用例需要的缓冲区的大小,一个是测试用例的运行函数。

点击(此处)折叠或打开

  1. struct testEngine allReduceEngine = {
  2.   AllReduceGetBuffSize,
  3.   AllReduceRunTest
  4. };

获取的缓冲区的大小跟传入的缓冲区{BANNED}最佳大值相等。Runtest函数{BANNED}最佳终调用的是TimeTest函数来运行。run_typesrun_ops有配置参数决定,可以各自指定为-1(参数为all),会遍历各种data typeop type的组合,来调用TimeTest函数。

点击(此处)折叠或打开

  1. testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
  2.   args->collTest = &allReduceTest;
  3.   ncclDataType_t *run_types;
  4.   ncclRedOp_t *run_ops;
  5.   const char **run_typenames, **run_opnames;
  6.   int type_count, op_count;

  7.   if ((int)type != -1) {
  8.     type_count = 1;
  9.     run_types = &type;
  10.     run_typenames = &typeName;
  11.   } else {
  12.     type_count = test_typenum;
  13.     run_types = test_types;
  14.     run_typenames = test_typenames;
  15.   }

  16.   if ((int)op != -1) {
  17.     op_count = 1;
  18.     run_ops = &op;
  19.     run_opnames = &opName;
  20.   } else {
  21.     op_count = test_opnum;
  22.     run_ops = test_ops;
  23.     run_opnames = test_opnames;
  24.   }

  25.   for (int i=0; i<type_count; i++) {
  26.     for (int j=0; j<op_count; j++) {
  27.       TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], -1));
  28.     }
  29.   }
  30.   return testSuccess;
  31. }

run_types表示数据类型(ncclDataType),在common.cu中定义有如下ncclDataType

点击(此处)折叠或打开

  1. ncclDataType_t test_types[ncclNumTypes] = {ncclChar, ncclInt, ncclHalf, ncclFloat, ncclDouble, ncclInt64, ncclUint64};

run_ops表示操作类型(ncclRedOp),在common.cu中定义有如下ncclRedOp

点击(此处)折叠或打开

  1. ncclRedOp_t test_ops[] = {ncclSum, ncclProd, ncclMax, ncclMin};

{BANNED}最佳终根据不同ncclDataTypencclRedOp调用TimeTest函数。此外调用TimeTest的传参还包括args参数,args除了包括之前的各种参数外,还包括一个collTest回调,对应testColl结构定义如下:

点击(此处)折叠或打开

  1. struct testColl {
  2.   const char name[20];
  3.   void (*getCollByteCount)(
  4.       size_t *sendcount, size_t *recvcount, size_t *paramcount,
  5.       size_t *sendInplaceOffset, size_t *recvInplaceOffset,
  6.       size_t count, int nranks);
  7.   testResult_t (*initData)(struct threadArgs* args, ncclDataType_t type,
  8.       ncclRedOp_t op, int root, int rep, int in_place);
  9.   void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
  10.   testResult_t (*runColl)(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type,
  11.       ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream);
  12. };

每个测试用例都有自己的testColl实现,all_reduce.cu实现如下:

点击(此处)折叠或打开

  1. struct testColl allReduceTest = {
  2.   "AllReduce",
  3.   AllReduceGetCollByteCount,
  4.   AllReduceInitData,
  5.   AllReduceGetBw,
  6.   AllReduceRunColl
  7. };

   接下来我们重点看TimeTest的执行过程。

1.  先是调用 Barrier(args),同步所有线程。该函数的作用为让所有线程在该函数中等待,然后所有线程执行一遍test。之后下一轮执行test,还要回到该函数中等待,等待所有线程都执行完上一轮的test,然后才执行新一轮的test

2.  分别执行几次{BANNED}最佳大数据量的测试操作和执行几次{BANNED}最佳小数据量的测试操作。具体执行次数由-w,--warmup_iters配置参数决定,默认值为5。不需要check结果。其作用为warm-up设备。

3.  从{BANNED}最佳小数据到{BANNED}最佳大数据,按照配置的步长间隔(两个间隔方式,一个是按照固定大小递增,一个是从{BANNED}最佳小值开始,按照固定的倍数递增),依次测试每个大小的数据。默认测试一轮,可以通过配置参数"-N,--run_cycles "指定测试多少轮。0表示无限次。每一个大小数据的测试会测试两种情况,一个是in_place、另外一个不是in_placein_place表示发送数据和接收数据是同一个缓冲区。具体的测试是调用BenchTime函数实现。

点击(此处)折叠或打开

  1. testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root) {
  2.   // Sync to avoid first-call timeout
  3.   Barrier(args);

  4.   // Warm-up for large size
  5.   setupArgs(args->maxbytes, type, args);
  6.   for (int iter = 0; iter < warmup_iters; iter++) {
  7.     TESTCHECK(startColl(args, type, op, root, 0, iter));
  8.   }
  9.   TESTCHECK(completeColl(args));

  10.   // Warm-up for small size
  11.   setupArgs(args->minbytes, type, args);
  12.   for (int iter = 0; iter < warmup_iters; iter++) {
  13.     TESTCHECK(startColl(args, type, op, root, 0, iter));
  14.   }
  15.   TESTCHECK(completeColl(args));

  16.   // Benchmark
  17.   long repeat = run_cycles;
  18.   do {
  19.     for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) {
  20.       setupArgs(size, type, args);
  21.       char rootName[100];
  22.       sprintf(rootName, "%6i", root);
  23.       PRINT("%12li %12li %8s %6s %6s", max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName);
  24.       TESTCHECK(BenchTime(args, type, op, root, 0));
  25.       TESTCHECK(BenchTime(args, type, op, root, 1));
  26.       PRINT("\n");
  27.     }
  28.   } while (--repeat);

  29.   return testSuccess;
  30. }

下面看BenchTime函数的实现

点击(此处)折叠或打开

  1. testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) {
  2.   size_t count = args->nbytes / wordSize(type);
  3.   //如果指定要做datacheck,则调用每个用例自己的initData函数,构造senddata和expect data
  4.   if (datacheck) {
  5.     // Initialize sendbuffs, recvbuffs and expected
  6.     TESTCHECK(args->collTest->initData(args, type, op, root, 99, in_place));
  7.   }

  8.   // Sync
  9.   //执行一次测试操作,但是并不记录测试数据(消耗时间、带宽等操作,检查数据正确性)
  10.   TESTCHECK(startColl(args, type, op, root, in_place, 0));
  11.   TESTCHECK(completeColl(args));

  12.   Barrier(args);

开始正式的性能测试时,需要注意两个配置参数:

-n,--iters ,迭代次数,默认为20。即执行多少组nccl操作,作为一次测试操作。并取平均值作为测试结果(单次测试结果可能会不准确);

-m,--agg_iters ,每次迭代中要聚合在一起的操作次数。默认值为:1。每一次迭代中,要再执行几次聚合操作(其实和普通nccl操作一样);

上面两个参数的乘积,就是具体执行nccl操作的次数。但是每次聚合迭代的操作需要额外进行nccl的同步一下。每次测试操作,会根据迭代次数(迭代次数和聚合次数的乘积),选择一个gpubuff地址进行操作。gpu一开始按照{BANNED}最佳大数据量,申请了一块{BANNED}最佳大的buff。而正常测试的数据大小,是从大到小开始测试的,因此一开始的数据量可能比较小。

再每一次迭代测试的时候,期望使用的gpu buff地址尽量不一样。因此底层的测试执行函数(startColl),会根据当前的迭代次数,计算一个当前数据在buff中的位置。具体做法是将申请的gpu的{BANNED}最佳大buff按照当前测试数据大小,切分成多个块,从第0块开始,每次迭代测试依次往后选择一个buff块。跳到{BANNED}最佳后一个块后,会循环从头开始选择。

点击(此处)折叠或打开

  1. // Performance Benchmark:开始正式的性能测试
  2.   timer tim;
  3.   for (int iter = 0; iter < iters; iter++) {
  4.     if (agg_iters>1) NCCLCHECK(ncclGroupStart());
  5.     for (int aiter = 0; aiter < agg_iters; aiter++) {
  6.       TESTCHECK(startColl(args, type, op, root, in_place, iter*agg_iters+aiter));
  7.     }
  8.     if (agg_iters>1) NCCLCHECK(ncclGroupEnd());
  9.   }

在进行正式的性能测试之前,会记录下当前时间,测试结束之后,也会记录下当前时间,这里会计算几种耗时:

l  当前线程执行完后的耗时,即CPU执行时间。

l  GPUstream执行完成时的耗时。CPU处理+GPU处理的耗时。再求出单个测试操作的平均耗时。

然后通过startColl调用args->collTest->runColl,进而调研各个测试用例的自定义测试函数。而all_reduce定义的runColl函数就是ncclncclAllReduce函数,这个函数后续在NCCL源码分析中再写。

之后会对所有线程的执行时间进行一个汇总。汇总方式由配置参数执行:-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>,默认值为1,即取平均值。0表示,需要rank 0的统计数据,本意上是要0号进程的0号线程的耗时,但是实际上目前都是使用本进程的0号线程的耗时。其他几个都比较好理解,分别为取所有线程(所有进程的所有线程)的平均值、{BANNED}最佳小值、{BANNED}最佳大值。获取统计值的具体实现函数为:void Allreduce(struct threadArgs* args, T* value, int average)。其实现方法为:先是利用线程同步,统计本进程中各个线程的汇总数据,作为进程的数据;然后利用MPI allreduce操作,汇总各个进程的数据。

    接下来,根据计算的平均时间,计算带宽。这个计算方式由具体的测试用例提供。有两种带宽。

l  一种是算法带宽algBw。根据当前测试的数据大小/所有GPU执行结束的平均时间得到。单位为G

l  另外一个是总线带宽busBw。即当前测试的算法,实际在总线上传输的平均数据量(每张卡在总线上实际传输的数据量),单位为G

点击(此处)折叠或打开

  1. double algBw, busBw;
  2.   args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus);

  3.   Barrier(args);

  4.   int64_t wrongElts = 0;
  5.   static __thread int rep = 0;
  6.   rep++;
  7.   for (int c = 0; c < datacheck; c++) {
  8.       // Initialize sendbuffs, recvbuffs and expected
  9.       TESTCHECK(args->collTest->initData(args, type, op, root, rep, in_place));
  10.       //test validation in single itertion, should ideally be included into the multi-iteration run
  11.       TESTCHECK(startColl(args, type, op, root, in_place, 0));

  12.       TESTCHECK(completeColl(args));

接下来,测试操作数据的正确性,这里不会测试性能,只测试正确性。测试的次数由配置参数”-c,--check 决定,默认只测试一次。

点击(此处)折叠或打开

  1. TESTCHECK(CheckData(args, type, op, root, in_place, &wrongElts));

  2.       //aggregate delta from all threads and procs
  3.       long long wrongElts1 = wrongElts;
  4.       //if (wrongElts) fprintf(stderr, "\nERROR: Data corruption : rank %d size %ld wrongElts %ld\n", args->proc, args->expectedBytes, wrongElts);
  5.       Allreduce(args, &wrongElts1, /*sum*/4);
  6.       wrongElts = wrongElts1;
  7.       if (wrongElts) break;

{BANNED}最佳后,打印测试结果。具体显示的结果信息有:

l  time,根据配置的参数”-C,--report_cputime <0/1>“来决定显示的是CPU的执行时间还是GPU的执行时间,默认是要显示GPU的执行时间

l  algBw,算法带宽

l  busBw,总线带宽

l  wrongdatacheck错误的数量

如果NCCL版本为2.9及以上,且cuda版本为11.3及以上。那么BenchTime在每次进行测试操作的时候,会根据配置参数”-G,--cudagraph 来决定是否使用cudaGraph来加速测试,以及重复该加速测试的次数,默认是不启用的。CUDA Graph的作用主要用于快速执行一组固定的动作,减少cpu显示的调用这一组动作的开销。适用于重复执行一组固定动作的场景。具体介绍参考。在BenchTime函数中,如果启用了cudagraph,那么会使用cudagraph捕捉之前性能测试的动作,然后再快速启动之前性能测试的动作(多次的迭代测试)。因为启动了cudagraph capture,因此之前的显示调用的操作不会执行,因此不会统计之前的性能测试结果,而是统计用cudagraph快速启动的新的测试的性能。

点击(此处)折叠或打开

  1. double timeUsec = (report_cputime ? cputimeSec : deltaSec)*1.0E6;
  2.   char timeStr[100];
  3.   if (timeUsec >= 10000.0) {
  4.     sprintf(timeStr, "%7.0f", timeUsec);
  5.   } else if (timeUsec >= 100.0) {
  6.     sprintf(timeStr, "%7.1f", timeUsec);
  7.   } else {
  8.     sprintf(timeStr, "%7.2f", timeUsec);
  9.   }
  10.   if (args->reportErrors) {
  11.     PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
  12.   } else {
  13.     PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
  14.   }

  15.   args->bw[0] += busBw;
  16.   args->bw_count[0]++;
  17.   return testSuccess;
  18. }

阅读(61) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~