NCCL概述和NCCL-Test分析
NCCL(NVIDIA Collective Communications Library)是一种由NVIDIA开发的高性能通信库,专门用于加速多GPU系统中的并行计算工作负载。NCCL旨在优化多GPU之间的数据传输和通信,以实现更快的并行计算速度。
NCCL具备以下通信模式:
l 点对点通信:允许两个特定的GPU之间直接交换数据。
l 群集通信:NCCL还支持集体通信,这些操作涉及多个GPU之间的数据交换
NCCL还具有的其他功能
l 拓扑感知通信:识别多GPU系统的拓扑结构,并优化通信以{BANNED}最佳大程度地减少通信延迟和带宽消耗。
l 异步通信:允许计算操作与通信操作并行执行,从而提高了系统的效率。
那么也就是说我们可以通过一下几个方向来了解什么是NCCL
代码结构
NCCL仓库:NCCL [Github]: git clone
后续我们选择v2.22.3-1这个tag的代码进行源码分析。由于NCCL是一个通信库,类似DPDK一样,不是一个服务或进程,所以它不存在一个main函数。那么我们如何开始着手分析呢?这就不得不提另一个重要的工具nccl-test。
nccl-test
nccl-test 工具是 nvidia 开源的一项用于测试 NCCL 集合通信的工具。可以用于检测集合通信是否正常、压测集合通信速率。官方开源地址:。而运行nccl-test又通常需要借助mpirun命令。
mpirun
mpirun是一个用于启动和管理 MPI(消息传递接口)程序的实用程序。它允许您在单个节点或多个节点上并行运行程序。如下所示,我们有两台机器,使用mpirun在两台机器上分别运行一个pwd命令。
-
-> # cat hostfile
-
22.22.22.175
-
22.22.22.121
-
-
-> # /usr/local/openmpi/bin/mpirun --allow-run-as-root -np 2 -hostfile hostfile pwd
-
/root
-
/root
了解上述mpirun的作用,就很容易理解一般测试nccl-test的命令了,一个典型的例子如下所示:
-
# 2台机器,16 张 GPU卡,执行 all_reduce_perf 测试
-
mpirun -np 16 \
-
-H 172.16.2.8:8,172.16.2.13:8 \
-
--allow-run-as-root -bind-to none -map-by slot \
-
-x NCCL_DEBUG=INFO \
-
-x NCCL_IB_GID_INDEX=3 \
-
-x NCCL_IB_DISABLE=0 \
-
-x NCCL_SOCKET_IFNAME=eth0 \
-
-x NCCL_NET_GDR_LEVEL=2 \
-
-x NCCL_IB_QPS_PER_CONNECTION=4 \
-
-x NCCL_IB_TC=160 \
-
-x LD_LIBRARY_PATH -x PATH \
-
-mca coll_hcoll_enable 0 -mca pml ob1 -mca btl_tcp_if_include eth0 -mca btl ^openib \
-
all_reduce_perf -b 32M -e 1G -i 1000 -f 2 -g 1
可以看出,这个命令是分别在172.16.2.8,172.16.2.13两个host上,各启动8个(IP后面的冒号指定每个host上的进程数)all_reduce_perf进程。当然mpirun和nccl-test分别有很多属于自己的参数,这个网上有很多说明,这里就不再赘述了。
nccl test源码分析
下面我们简单分析一下nccl test这个测试框架。nccl test有几个测试文件,如下图
每一个测试文件会单独生成一个测试程序,并以"_perf"为后缀生成可执行的文件。每个测试源码文件会和两个公共文件common.cu和timer.cc,以及nccl、cuda、mpi这三个lib一起编译生成可执行的文件。
common.cu
common.cu为公共文件,提供测试测试程序框架,包括main函数、参数解析,并调用每个测试文件提供的公共调用接口,以初始化测试程序和运行测试程序。
下面对run函数的代码进行分析,run函数开始部分,首先获取MPI相关信息,具体信息有:总进程数,当前进程的rank编号,当前主机上所有的进程个数。如果对进程划分了子组(配置了NCCL_TESTS_SPLIT_MASK环境变量),则创建当前进程所在子组的mpi通信器。获取当前子组中的进程总数,获取当前进程在子组中的rank编号环境变量,则默认所有进程在同一个子组中。
-
testResult_t run() {
-
int totalProcs = 1, proc = 0, ncclProcs = 1, ncclProc = 0, color = 0;
-
int localRank = 0;
-
char hostname[1024];
-
getHostName(hostname, 1024);
-
-
#ifdef MPI_SUPPORT
-
MPI_Comm_size(MPI_COMM_WORLD, &totalProcs);
-
MPI_Comm_rank(MPI_COMM_WORLD, &proc);
-
uint64_t hostHashs[totalProcs];
-
hostHashs[proc] = getHostHash(hostname);
-
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD);
-
for (int p=0; p<totalProcs; p++) {
-
if (p == proc) break;
-
if (hostHashs[p] == hostHashs[proc]) localRank++;
-
}
-
-
char* str = getenv("NCCL_TESTS_SPLIT_MASK");
-
uint64_t mask = str ? strtoul(str, NULL, 16) : 0;
-
MPI_Comm mpi_comm;
-
color = proc & mask;
-
MPI_Comm_split(MPI_COMM_WORLD, color, proc, &mpi_comm);
-
MPI_Comm_size(mpi_comm, &ncclProcs);
-
MPI_Comm_rank(mpi_comm, &ncclProc);
-
#endif
-
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
-
-
PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
-
nThreads, nGpus, minBytes, maxBytes,
-
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
-
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
-
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
-
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
-
PRINT("#\n");
-
-
PRINT("# Using devices\n");
-
#define MAX_LINE 2048
紧接着获取{BANNED}最佳大可用的gpu缓存大小(从众多要使用的GPU中,选取缓存{BANNED}最佳小的GPU的缓存值)
l 获取当前进程使用的GPU的{BANNED}最佳大内存
n 一个进程可以有n个线程,每个线程可以使用n个GPU
n 选取缓存{BANNED}最佳小的GPU的缓存值,作为一个maxMem
n 需要确定当前进程使用了哪些GPU,才能获取maxMem信息
n 进程分配GPU的规则:
u 一个主机可能有多个进程,每个进程可能使用多个GPU
u cuda通常情况下,对于主机上可用的GPU已经从0开始做了编号
u 默认情况下,按照进程的rank编号从小到大来分配在GPU
u 默认情况下,分配的GPU的编号是从0开始分配
n 收集所有进程的{BANNED}最佳大可用GPU maxMem信息,并打印
n 使用MPI_Allreduce获取各个进程的maxMem,并使用{BANNED}最佳小的maxMem作为{BANNED}最佳终的maxMem
-
char line[MAX_LINE];
-
int len = 0;
-
size_t maxMem = ~0;
-
char* envstr = getenv("NCCL_TESTS_DEVICE");
-
int gpu0 = envstr ? atoi(envstr) : -1;
-
for (int i=0; i<nThreads*nGpus; i++) {
-
int cudaDev = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
-
int rank = proc*nThreads*nGpus+i;
-
cudaDeviceProp prop;
-
CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev));
-
len += snprintf(line+len, MAX_LINE-len, "# Rank %2d Group %2d Pid %6d on %10s device %2d [0x%02x] %s\n",
-
rank, color, getpid(), hostname, cudaDev, prop.pciBusID, prop.name);
-
maxMem = std::min(maxMem, prop.totalGlobalMem);
-
}
-
-
#if MPI_SUPPORT
-
char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
-
// Gather all output in rank order to root (0)
-
MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
-
if (proc == 0) {
-
for (int p = 0; p < totalProcs; p++)
-
PRINT("%s", lines+MAX_LINE*p);
-
free(lines);
-
}
-
MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
-
#else
-
PRINT("%s", line);
-
#endif
然后根据maxMem获取在gpu上申请的buff的{BANNED}最佳大字节数,并使用该值,来修正配置的maxBytes:将maxMem预留1G,剩下的根据需要切成2等份或者3等份(send buff和recv buff必包含,还有可能包含一个expected buff(需要参数指定使能了datacheck))作为申请的buff{BANNED}最佳大值。使用该值来修正maxBytes,即如果配置的maxBytes大于{BANNED}最佳大可设置的buff值,则maxBytes设置为{BANNED}最佳大可设置的buff值。子组中的rank 0的进程,获取一个ncclUniqueId,并使用MPI_Bcast同步给子组的其他进程。
-
// We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
-
size_t memMaxBytes = (maxMem - (1<<30)) / (datacheck ? 3 : 2);
-
if (maxBytes > memMaxBytes) {
-
maxBytes = memMaxBytes;
-
if (proc == 0) printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
-
}
-
-
ncclUniqueId ncclId;
-
if (ncclProc == 0) {
-
NCCLCHECK(ncclGetUniqueId(&ncclId));
-
}
-
#ifdef MPI_SUPPORT
-
MPI_Bcast(&ncclId, sizeof(ncclId), MPI_BYTE, 0, mpi_comm);
-
MPI_Barrier(MPI_COMM_WORLD); // Ensure Bcast is complete for HCOLL
-
#endif
然后调用各个测试用例定义的getBuffSize函数,获取sendBytes和recvBytes大小,通常是根据maxBytes来设置。每个测试用例都要定义一个struct testEngine ncclTestEngine全局变量,ncclTestEngine变量中有两个函数指针:getBuffSize和runTest,每个测试用例都要实现这两个函数,以测试自己用例的目标。
-
int gpus[nGpus*nThreads];
-
cudaStream_t streams[nGpus*nThreads];
-
void* sendbuffs[nGpus*nThreads];
-
void* recvbuffs[nGpus*nThreads];
-
void* expected[nGpus*nThreads];
-
size_t sendBytes, recvBytes;
-
-
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)ncclProcs*nGpus*nThreads);
接着在当前进程使用的每个GPU上,都申请maxBytes大小的sendbuff、recvbuff和expected buff(如果指定要做datacheck),并根据需要,创建一个cudaStream。申请的buff指针记录在一个数组中。
-
envstr = getenv("NCCL_TESTS_DEVICE");
-
gpu0 = envstr ? atoi(envstr) : -1;
-
for (int i=0; i<nGpus*nThreads; i++) {
-
gpus[i] = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
-
CUDACHECK(cudaSetDevice(gpus[i]));
-
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
-
if (streamnull)
-
streams[i] = NULL;
-
else
-
CUDACHECK(cudaStreamCreateWithFlags(streams+i, cudaStreamNonBlocking));
-
}
初始化nccl通讯器ncclComm_t comms,每个GPU需要一个。可以在主线程中初始化所有的线程的GPU的comms,也可以每个线程运行时,线程初始化自己使用的GPU的通讯器(指定参数:-p,--parallel_init <0/1>),这样速度更快一些。如果当前nccl子组中只有一个进程。则直接调用ncclCommInitAll来初始化comms。ncclCommInitAll是nccl提供的用于创建单个进程的一组通信器。如果nccl子组有多个进程,则自己依次对每个GPU调用ncclCommInitRank,初始化通信器。关于通讯器ncclComm_t的创建在后续NCCL代码分析中再进行展开。
-
//if parallel init is not selected, use main thread to initialize NCCL
-
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus);
-
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
-
void **sendRegHandles = NULL;
-
void **recvRegHandles = NULL;
-
#endif
-
if (!parallel_init) {
-
if (ncclProcs == 1) {
-
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpus));
-
} else {
-
NCCLCHECK(ncclGroupStart());
-
for (int i=0; i<nGpus*nThreads; i++) {
-
CUDACHECK(cudaSetDevice(gpus[i]));
-
NCCLCHECK(ncclCommInitRank(comms+i, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+i));
-
}
-
NCCLCHECK(ncclGroupEnd());
-
}
通信器创建好了之后,调用ncclCommRegister,将buff注册到通信器中。用于zero-copy通信。每个GPU的通信器注册自己的GPU缓存中申请的buff。注册好了需要记录一个buff句柄,这个句柄用于后续解注册buff使用(ncclCommDeregister)。expected buff不需要注册,可能是因为不需要同步这个buff。
-
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
-
sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*nThreads*nGpus) : NULL;
-
recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*nThreads*nGpus) : NULL;
-
for (int i=0; i<nGpus*nThreads; i++) {
-
if (local_register) NCCLCHECK(ncclCommRegister(comms[i], sendbuffs[i], maxBytes, &sendRegHandles[i]));
-
if (local_register) NCCLCHECK(ncclCommRegister(comms[i], recvbuffs[i], maxBytes, &recvRegHandles[i]));
-
}
-
#endif
-
}
然后,创建线程,并等待线程运行结束。每个线程调用每个具体的测试用例文件提供的runTest函数(ncclTestEngine.runTest)
-
int errors[nThreads];
-
double bw[nThreads];
-
double* delta;
-
CUDACHECK(cudaHostAlloc(&delta, sizeof(double)*nThreads*NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped));
-
int bw_count[nThreads];
-
for (int t=0; t<nThreads; t++) {
-
bw[t] = 0.0;
-
errors[t] = bw_count[t] = 0;
-
}
-
-
fflush(stdout);
-
-
const char* timeStr = report_cputime ? "cputime" : "time";
-
PRINT("#\n");
-
PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
-
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
-
timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong");
-
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
-
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
-
-
struct testThread threads[nThreads];
-
memset(threads, 0, sizeof(struct testThread)*nThreads);
-
-
for (int t=nThreads-1; t>=0; t--) {
-
threads[t].args.minbytes=minBytes; //{BANNED}最佳小字节数minbytes
-
threads[t].args.maxbytes=maxBytes; //{BANNED}最佳大字节数maxbytes
-
threads[t].args.stepbytes=stepBytes; //递增的字节数stepbytes
-
threads[t].args.stepfactor=stepFactor; //递增倍数stepfactor(和递增的字节数二选1)
-
threads[t].args.localRank = localRank; //本进程在当前主机的localRank编号
-
threads[t].args.totalProcs=totalProcs; //总的进程数totalProcs
-
threads[t].args.nProcs=ncclProcs; //nccl子组的进程个数nProcs
-
threads[t].args.proc=ncclProc; //当前进程在nccl子组中的rank编号proc
-
threads[t].args.nThreads=nThreads; //当前进程的线程个数nThreads
-
threads[t].args.thread=t; //当前线程的编号thread
-
threads[t].args.nGpus=nGpus; //当前线程使用的gpu个数nGpus
-
threads[t].args.gpus=gpus+t*nGpus; //保存当前线程使用的gpu id的数组gpus[]
-
threads[t].args.sendbuffs = sendbuffs+t*nGpus;
-
threads[t].args.recvbuffs = recvbuffs+t*nGpus;
-
threads[t].args.expected = expected+t*nGpus;
-
threads[t].args.ncclId = ncclId; //当前nccl子组的ncclId(当前nccl子组的rank 0进程申请的id,并同步给其他进程)
-
threads[t].args.comms=comms+t*nGpus; //保存当前线程使用的nccl通信器的数组comms[]
-
threads[t].args.streams=streams+t*nGpus; //保存当前线程使用的streams的数组streams[]
-
//存储运行结果的统计信息指针:errors、bw、bw_count
-
threads[t].args.errors=errors+t;
-
threads[t].args.bw=bw+t;
-
threads[t].args.bw_count=bw_count+t;
-
//是否做做datacheck,即报告错误
-
threads[t].args.reportErrors = datacheck;
-
-
threads[t].func = parallel_init ? threadInit : threadRunTests;
-
if (t)
-
TESTCHECK(threadLaunch(threads+t));
-
else
-
TESTCHECK(threads[t].func(&threads[t].args));
-
}
每个线程调用每个具体的测试用例文件提供的runTest函数(ncclTestEngine.runTest),函数的包括上arg参数,指定nccl子组的root gpu的rank号(配置参数-r,--root指定,默认为0),测试的数据的类型(配置参数-d,--datatype,默认为ncclFloat),及其类型名字(字符串),数据操作(配置参数-o,--op ,默认为ncclSum),及其操作名字(字符串)。
测试函数调用关系
测试函数调用关系具体如下图所示:
all_reduce.cu
我们以allreduce为例,其实现在all_reduce.cu文件中。这个用例主要提供如下两个函数,一个是获取测试用例需要的缓冲区的大小,一个是测试用例的运行函数。
-
struct testEngine allReduceEngine = {
-
AllReduceGetBuffSize,
-
AllReduceRunTest
-
};
获取的缓冲区的大小跟传入的缓冲区{BANNED}最佳大值相等。Runtest函数{BANNED}最佳终调用的是TimeTest函数来运行。run_types和run_ops有配置参数决定,可以各自指定为-1(参数为all),会遍历各种data type和op type的组合,来调用TimeTest函数。
-
testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName) {
-
args->collTest = &allReduceTest;
-
ncclDataType_t *run_types;
-
ncclRedOp_t *run_ops;
-
const char **run_typenames, **run_opnames;
-
int type_count, op_count;
-
-
if ((int)type != -1) {
-
type_count = 1;
-
run_types = &type;
-
run_typenames = &typeName;
-
} else {
-
type_count = test_typenum;
-
run_types = test_types;
-
run_typenames = test_typenames;
-
}
-
-
if ((int)op != -1) {
-
op_count = 1;
-
run_ops = &op;
-
run_opnames = &opName;
-
} else {
-
op_count = test_opnum;
-
run_ops = test_ops;
-
run_opnames = test_opnames;
-
}
-
-
for (int i=0; i<type_count; i++) {
-
for (int j=0; j<op_count; j++) {
-
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], -1));
-
}
-
}
-
return testSuccess;
-
}
run_types表示数据类型(ncclDataType),在common.cu中定义有如下ncclDataType:
-
ncclDataType_t test_types[ncclNumTypes] = {ncclChar, ncclInt, ncclHalf, ncclFloat, ncclDouble, ncclInt64, ncclUint64};
run_ops表示操作类型(ncclRedOp),在common.cu中定义有如下ncclRedOp:
-
ncclRedOp_t test_ops[] = {ncclSum, ncclProd, ncclMax, ncclMin};
{BANNED}最佳终根据不同ncclDataType和ncclRedOp调用TimeTest函数。此外调用TimeTest的传参还包括args参数,args除了包括之前的各种参数外,还包括一个collTest回调,对应testColl结构定义如下:
-
struct testColl {
-
const char name[20];
-
void (*getCollByteCount)(
-
size_t *sendcount, size_t *recvcount, size_t *paramcount,
-
size_t *sendInplaceOffset, size_t *recvInplaceOffset,
-
size_t count, int nranks);
-
testResult_t (*initData)(struct threadArgs* args, ncclDataType_t type,
-
ncclRedOp_t op, int root, int rep, int in_place);
-
void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
-
testResult_t (*runColl)(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type,
-
ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream);
-
};
每个测试用例都有自己的testColl实现,all_reduce.cu实现如下:
-
struct testColl allReduceTest = {
-
"AllReduce",
-
AllReduceGetCollByteCount,
-
AllReduceInitData,
-
AllReduceGetBw,
-
AllReduceRunColl
-
};
接下来我们重点看TimeTest的执行过程。
1. 先是调用 Barrier(args),同步所有线程。该函数的作用为让所有线程在该函数中等待,然后所有线程执行一遍test。之后下一轮执行test,还要回到该函数中等待,等待所有线程都执行完上一轮的test,然后才执行新一轮的test。
2. 分别执行几次{BANNED}最佳大数据量的测试操作和执行几次{BANNED}最佳小数据量的测试操作。具体执行次数由-w,--warmup_iters配置参数决定,默认值为5。不需要check结果。其作用为warm-up设备。
3. 从{BANNED}最佳小数据到{BANNED}最佳大数据,按照配置的步长间隔(两个间隔方式,一个是按照固定大小递增,一个是从{BANNED}最佳小值开始,按照固定的倍数递增),依次测试每个大小的数据。默认测试一轮,可以通过配置参数"-N,--run_cycles "指定测试多少轮。0表示无限次。每一个大小数据的测试会测试两种情况,一个是in_place、另外一个不是in_place。in_place表示发送数据和接收数据是同一个缓冲区。具体的测试是调用BenchTime函数实现。
-
testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root) {
-
// Sync to avoid first-call timeout
-
Barrier(args);
-
-
// Warm-up for large size
-
setupArgs(args->maxbytes, type, args);
-
for (int iter = 0; iter < warmup_iters; iter++) {
-
TESTCHECK(startColl(args, type, op, root, 0, iter));
-
}
-
TESTCHECK(completeColl(args));
-
-
// Warm-up for small size
-
setupArgs(args->minbytes, type, args);
-
for (int iter = 0; iter < warmup_iters; iter++) {
-
TESTCHECK(startColl(args, type, op, root, 0, iter));
-
}
-
TESTCHECK(completeColl(args));
-
-
// Benchmark
-
long repeat = run_cycles;
-
do {
-
for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) {
-
setupArgs(size, type, args);
-
char rootName[100];
-
sprintf(rootName, "%6i", root);
-
PRINT("%12li %12li %8s %6s %6s", max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName);
-
TESTCHECK(BenchTime(args, type, op, root, 0));
-
TESTCHECK(BenchTime(args, type, op, root, 1));
-
PRINT("\n");
-
}
-
} while (--repeat);
-
-
return testSuccess;
-
}
下面看BenchTime函数的实现
-
testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place) {
-
size_t count = args->nbytes / wordSize(type);
-
//如果指定要做datacheck,则调用每个用例自己的initData函数,构造senddata和expect data
-
if (datacheck) {
-
// Initialize sendbuffs, recvbuffs and expected
-
TESTCHECK(args->collTest->initData(args, type, op, root, 99, in_place));
-
}
-
-
// Sync
-
//执行一次测试操作,但是并不记录测试数据(消耗时间、带宽等操作,检查数据正确性)
-
TESTCHECK(startColl(args, type, op, root, in_place, 0));
-
TESTCHECK(completeColl(args));
-
-
Barrier(args);
开始正式的性能测试时,需要注意两个配置参数:
-n,--iters ,迭代次数,默认为20。即执行多少组nccl操作,作为一次测试操作。并取平均值作为测试结果(单次测试结果可能会不准确);
-m,--agg_iters ,每次迭代中要聚合在一起的操作次数。默认值为:1。每一次迭代中,要再执行几次聚合操作(其实和普通nccl操作一样);
上面两个参数的乘积,就是具体执行nccl操作的次数。但是每次聚合迭代的操作需要额外进行nccl的同步一下。每次测试操作,会根据迭代次数(迭代次数和聚合次数的乘积),选择一个gpu的buff地址进行操作。gpu一开始按照{BANNED}最佳大数据量,申请了一块{BANNED}最佳大的buff。而正常测试的数据大小,是从大到小开始测试的,因此一开始的数据量可能比较小。
再每一次迭代测试的时候,期望使用的gpu buff地址尽量不一样。因此底层的测试执行函数(startColl),会根据当前的迭代次数,计算一个当前数据在buff中的位置。具体做法是将申请的gpu的{BANNED}最佳大buff按照当前测试数据大小,切分成多个块,从第0块开始,每次迭代测试依次往后选择一个buff块。跳到{BANNED}最佳后一个块后,会循环从头开始选择。
-
// Performance Benchmark:开始正式的性能测试
-
timer tim;
-
for (int iter = 0; iter < iters; iter++) {
-
if (agg_iters>1) NCCLCHECK(ncclGroupStart());
-
for (int aiter = 0; aiter < agg_iters; aiter++) {
-
TESTCHECK(startColl(args, type, op, root, in_place, iter*agg_iters+aiter));
-
}
-
if (agg_iters>1) NCCLCHECK(ncclGroupEnd());
-
}
在进行正式的性能测试之前,会记录下当前时间,测试结束之后,也会记录下当前时间,这里会计算几种耗时:
l 当前线程执行完后的耗时,即CPU执行时间。
l GPU的stream执行完成时的耗时。CPU处理+GPU处理的耗时。再求出单个测试操作的平均耗时。
然后通过startColl调用args->collTest->runColl,进而调研各个测试用例的自定义测试函数。而all_reduce定义的runColl函数就是nccl的ncclAllReduce函数,这个函数后续在NCCL源码分析中再写。
之后会对所有线程的执行时间进行一个汇总。汇总方式由配置参数执行:-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>,默认值为1,即取平均值。0表示,需要rank 0的统计数据,本意上是要0号进程的0号线程的耗时,但是实际上目前都是使用本进程的0号线程的耗时。其他几个都比较好理解,分别为取所有线程(所有进程的所有线程)的平均值、{BANNED}最佳小值、{BANNED}最佳大值。获取统计值的具体实现函数为:void Allreduce(struct threadArgs* args, T* value, int average)。其实现方法为:先是利用线程同步,统计本进程中各个线程的汇总数据,作为进程的数据;然后利用MPI allreduce操作,汇总各个进程的数据。
接下来,根据计算的平均时间,计算带宽。这个计算方式由具体的测试用例提供。有两种带宽。
l 一种是算法带宽algBw。根据”当前测试的数据大小/所有GPU执行结束的平均时间“得到。单位为G。
l 另外一个是总线带宽busBw。即当前测试的算法,实际在总线上传输的平均数据量(每张卡在总线上实际传输的数据量),单位为G。
-
double algBw, busBw;
-
args->collTest->getBw(count, wordSize(type), deltaSec, &algBw, &busBw, args->nProcs*args->nThreads*args->nGpus);
-
-
Barrier(args);
-
-
int64_t wrongElts = 0;
-
static __thread int rep = 0;
-
rep++;
-
for (int c = 0; c < datacheck; c++) {
-
// Initialize sendbuffs, recvbuffs and expected
-
TESTCHECK(args->collTest->initData(args, type, op, root, rep, in_place));
-
//test validation in single itertion, should ideally be included into the multi-iteration run
-
TESTCHECK(startColl(args, type, op, root, in_place, 0));
-
-
TESTCHECK(completeColl(args));
接下来,测试操作数据的正确性,这里不会测试性能,只测试正确性。测试的次数由配置参数”-c,--check “决定,默认只测试一次。
-
TESTCHECK(CheckData(args, type, op, root, in_place, &wrongElts));
-
-
//aggregate delta from all threads and procs
-
long long wrongElts1 = wrongElts;
-
//if (wrongElts) fprintf(stderr, "\nERROR: Data corruption : rank %d size %ld wrongElts %ld\n", args->proc, args->expectedBytes, wrongElts);
-
Allreduce(args, &wrongElts1, /*sum*/4);
-
wrongElts = wrongElts1;
-
if (wrongElts) break;
{BANNED}最佳后,打印测试结果。具体显示的结果信息有:
l time,根据配置的参数”-C,--report_cputime <0/1>“来决定显示的是CPU的执行时间还是GPU的执行时间,默认是要显示GPU的执行时间
l algBw,算法带宽
l busBw,总线带宽
l wrong,datacheck错误的数量
如果NCCL版本为2.9及以上,且cuda版本为11.3及以上。那么BenchTime在每次进行测试操作的时候,会根据配置参数”-G,--cudagraph “来决定是否使用cudaGraph来加速测试,以及重复该加速测试的次数,默认是不启用的。CUDA Graph的作用主要用于快速执行一组固定的动作,减少cpu显示的调用这一组动作的开销。适用于重复执行一组固定动作的场景。具体介绍参考。在BenchTime函数中,如果启用了cudagraph,那么会使用cudagraph捕捉之前性能测试的动作,然后再快速启动之前性能测试的动作(多次的迭代测试)。因为启动了cudagraph capture,因此之前的显示调用的操作不会执行,因此不会统计之前的性能测试结果,而是统计用cudagraph快速启动的新的测试的性能。
-
double timeUsec = (report_cputime ? cputimeSec : deltaSec)*1.0E6;
-
char timeStr[100];
-
if (timeUsec >= 10000.0) {
-
sprintf(timeStr, "%7.0f", timeUsec);
-
} else if (timeUsec >= 100.0) {
-
sprintf(timeStr, "%7.1f", timeUsec);
-
} else {
-
sprintf(timeStr, "%7.2f", timeUsec);
-
}
-
if (args->reportErrors) {
-
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
-
} else {
-
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
-
}
-
-
args->bw[0] += busBw;
-
args->bw_count[0]++;
-
return testSuccess;
-
}