From e6cd74ead3cee7a8187eb3b47541488e8472c648 Mon Sep 17 00:00:00 2001 From: FelixYBW <47296334+FelixYBW@users.noreply.github.com> Date: Sat, 25 Jul 2020 12:46:45 -0700 Subject: [PATCH] Set a minimal reducer size and parent_down size (#139) * set a minimal reducer msg size. Receive the same data size from parent each time. * When parent read from a child, check it receive minimal reduce size. fix bug. Rewrite the minimal reducer size check, make sure it's 1~N times of minimal reduce size Assume the minimal reduce size is X, the logic here is 1: each child upload total_size of message 2: each parent receive X message at least, up to total_size 3: parent reduce X or NxX or total_size message 4: parent sends X or NxX or total_size message to its parent 4: parent's parent receive X message at least, up to total_size. Then reduce X or NxX or total_size message 6: parent's parent sends X or NxX or total_size message to its children 7: parent receives X or NxX or total_size message, sends to its children 8: child receive X or NxN or total_size message. During the whole process, each transfer is (1~N)xX Byte message or up to total_size. if X is larger than total_size, then allreduce allways reduce the whole messages and pass down. * Follow style check rule * fix the cpplint check * fix allreduce_base header seq Co-authored-by: Philip Hyunsu Cho --- src/allreduce_base.cc | 68 +++++++++++++++++++++++++++++++------------ src/allreduce_base.h | 2 ++ src/engine_mpi.cc | 3 +- 3 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc index 179a11f75..ca02771a4 100644 --- a/src/allreduce_base.cc +++ b/src/allreduce_base.cc @@ -6,11 +6,11 @@ * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ #define NOMINMAX +#include "allreduce_base.h" #include #include #include #include -#include "allreduce_base.h" namespace rabit { @@ -33,6 +33,8 @@ AllreduceBase::AllreduceBase(void) { version_number = 0; // 32 K items reduce_ring_mincount = 32 << 10; + // 1M reducer size each time + tree_reduce_minsize = 1 << 20; // tracker URL task_id = "NULL"; err_link = NULL; @@ -187,6 +189,7 @@ void AllreduceBase::SetParam(const char *name, const char *val) { if (!strcmp(name, "DMLC_ROLE")) dmlc_role = val; if (!strcmp(name, "rabit_world_size")) world_size = atoi(val); if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = utils::StringToBool(val); + if (!strcmp(name, "rabit_tree_reduce_minsize")) tree_reduce_minsize = atoi(val); if (!strcmp(name, "rabit_reduce_ring_mincount")) { reduce_ring_mincount = atoi(val); utils::Assert(reduce_ring_mincount > 0, "rabit_reduce_ring_mincount should be greater than 0"); @@ -504,6 +507,9 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_, size_t size_up_out = 0; // size of message we received, and send in the down pass size_t size_down_in = 0; + // minimal size of each reducer + const size_t eachreduce = (tree_reduce_minsize / type_nbytes * type_nbytes); + // initialize the link ring-buffer and pointer for (int i = 0; i < nlink; ++i) { if (i != parent_index) { @@ -560,9 +566,14 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_, // read data from childs for (int i = 0; i < nlink; ++i) { if (i != parent_index && watcher.CheckRead(links[i].sock)) { - ReturnType ret = links[i].ReadToRingBuffer(size_up_out, total_size); - if (ret != kSuccess) { - return ReportError(&links[i], ret); + // make sure to receive minimal reducer size + // since each child reduce and sends the minimal reducer size + while (links[i].size_read < total_size + && links[i].size_read - size_up_reduce < eachreduce) { + ReturnType ret = links[i].ReadToRingBuffer(size_up_out, total_size); + if (ret != kSuccess) { + return ReportError(&links[i], ret); + } } } } @@ -582,6 +593,12 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_, utils::Assert(buffer_size != 0, "must assign buffer_size"); // round to type_n4bytes max_reduce = (max_reduce / type_nbytes * type_nbytes); + + // if max reduce is less than total size, we reduce multiple times of + // eachreduce size + if (max_reduce < total_size) + max_reduce = max_reduce - max_reduce % eachreduce; + // peform reduce, can be at most two rounds while (size_up_reduce < max_reduce) { // start position @@ -605,7 +622,7 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_, // pass message up to parent, can pass data that are already been reduced if (size_up_out < size_up_reduce) { ssize_t len = links[parent_index].sock. - Send(sendrecvbuf + size_up_out, size_up_reduce - size_up_out); + Send(sendrecvbuf + size_up_out, size_up_reduce - size_up_out); if (len != -1) { size_up_out += static_cast(len); } else { @@ -618,20 +635,33 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_, // read data from parent if (watcher.CheckRead(links[parent_index].sock) && total_size > size_down_in) { - ssize_t len = links[parent_index].sock. - Recv(sendrecvbuf + size_down_in, total_size - size_down_in); - if (len == 0) { - links[parent_index].sock.Close(); - return ReportError(&links[parent_index], kRecvZeroLen); - } - if (len != -1) { - size_down_in += static_cast(len); - utils::Assert(size_down_in <= size_up_out, - "Allreduce: boundary error"); - } else { - ReturnType ret = Errno2Return(); - if (ret != kSuccess) { - return ReportError(&links[parent_index], ret); + size_t left_size = total_size-size_down_in; + size_t reduce_size_min = std::min(left_size, eachreduce); + size_t recved = 0; + while (recved < reduce_size_min) { + ssize_t len = links[parent_index].sock. + Recv(sendrecvbuf + size_down_in, total_size - size_down_in); + + if (len == 0) { + links[parent_index].sock.Close(); + return ReportError(&links[parent_index], kRecvZeroLen); + } + if (len != -1) { + size_down_in += static_cast(len); + utils::Assert(size_down_in <= size_up_out, + "Allreduce: boundary error"); + recved+=len; + + // if it receives more data than each reduce, it means the next block is sent. + // we double the reduce_size_min or add to left_size + while (recved > reduce_size_min) { + reduce_size_min += std::min(left_size-reduce_size_min, eachreduce); + } + } else { + ReturnType ret = Errno2Return(); + if (ret != kSuccess) { + return ReportError(&links[parent_index], ret); + } } } } diff --git a/src/allreduce_base.h b/src/allreduce_base.h index d1d333308..c7ef638ff 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -565,6 +565,8 @@ class AllreduceBase : public IEngine { int reduce_method; // mininum count of cells to use ring based method size_t reduce_ring_mincount; + // minimul block size per tree reduce + size_t tree_reduce_minsize; // current rank int rank; // world size diff --git a/src/engine_mpi.cc b/src/engine_mpi.cc index 23f7bc530..107baf0a3 100644 --- a/src/engine_mpi.cc +++ b/src/engine_mpi.cc @@ -7,9 +7,10 @@ * \author Tianqi Chen */ #define NOMINMAX -#include #include +#include #include +#include #include "rabit/internal/engine.h" #include "rabit/internal/utils.h"