Set a minimal reducer size and parent_down size (#139)

* set a minimal reducer msg size. Receive the same data size from parent each time.

* When parent read from a child, check it receive minimal reduce size.
 fix bug. Rewrite the minimal reducer size check, make sure it's 1~N times of minimal reduce size

 Assume the minimal reduce size is X, the logic here is
 1: each child upload total_size of message
 2: each parent receive X message at least, up to total_size
 3: parent reduce X or NxX or total_size message
 4: parent sends X or NxX or total_size message to its parent
 4: parent's parent receive X message at least, up to total_size. Then reduce X or NxX or total_size message
 6: parent's parent sends X or NxX or total_size message to its children
 7: parent receives X or NxX or total_size message, sends to its children
 8: child receive X or NxN or total_size message.

 During the whole process, each transfer is (1~N)xX Byte message or up to total_size.

 if X is larger than total_size, then allreduce allways reduce the whole messages and pass down.

* Follow style check rule

* fix the cpplint check

* fix allreduce_base header seq

Co-authored-by: Philip Hyunsu Cho <chohyu01@cs.washington.edu>
This commit is contained in:
FelixYBW 2020-07-25 12:46:45 -07:00 committed by GitHub
parent 74bf00a5ab
commit e6cd74ead3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 20 deletions

View File

@ -6,11 +6,11 @@
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/ */
#define NOMINMAX #define NOMINMAX
#include "allreduce_base.h"
#include <rabit/base.h> #include <rabit/base.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <cstring> #include <cstring>
#include <map> #include <map>
#include "allreduce_base.h"
namespace rabit { namespace rabit {
@ -33,6 +33,8 @@ AllreduceBase::AllreduceBase(void) {
version_number = 0; version_number = 0;
// 32 K items // 32 K items
reduce_ring_mincount = 32 << 10; reduce_ring_mincount = 32 << 10;
// 1M reducer size each time
tree_reduce_minsize = 1 << 20;
// tracker URL // tracker URL
task_id = "NULL"; task_id = "NULL";
err_link = NULL; err_link = NULL;
@ -187,6 +189,7 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
if (!strcmp(name, "DMLC_ROLE")) dmlc_role = val; if (!strcmp(name, "DMLC_ROLE")) dmlc_role = val;
if (!strcmp(name, "rabit_world_size")) world_size = atoi(val); if (!strcmp(name, "rabit_world_size")) world_size = atoi(val);
if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = utils::StringToBool(val); if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = utils::StringToBool(val);
if (!strcmp(name, "rabit_tree_reduce_minsize")) tree_reduce_minsize = atoi(val);
if (!strcmp(name, "rabit_reduce_ring_mincount")) { if (!strcmp(name, "rabit_reduce_ring_mincount")) {
reduce_ring_mincount = atoi(val); reduce_ring_mincount = atoi(val);
utils::Assert(reduce_ring_mincount > 0, "rabit_reduce_ring_mincount should be greater than 0"); utils::Assert(reduce_ring_mincount > 0, "rabit_reduce_ring_mincount should be greater than 0");
@ -504,6 +507,9 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_,
size_t size_up_out = 0; size_t size_up_out = 0;
// size of message we received, and send in the down pass // size of message we received, and send in the down pass
size_t size_down_in = 0; size_t size_down_in = 0;
// minimal size of each reducer
const size_t eachreduce = (tree_reduce_minsize / type_nbytes * type_nbytes);
// initialize the link ring-buffer and pointer // initialize the link ring-buffer and pointer
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
if (i != parent_index) { if (i != parent_index) {
@ -560,9 +566,14 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_,
// read data from childs // read data from childs
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
if (i != parent_index && watcher.CheckRead(links[i].sock)) { if (i != parent_index && watcher.CheckRead(links[i].sock)) {
ReturnType ret = links[i].ReadToRingBuffer(size_up_out, total_size); // make sure to receive minimal reducer size
if (ret != kSuccess) { // since each child reduce and sends the minimal reducer size
return ReportError(&links[i], ret); while (links[i].size_read < total_size
&& links[i].size_read - size_up_reduce < eachreduce) {
ReturnType ret = links[i].ReadToRingBuffer(size_up_out, total_size);
if (ret != kSuccess) {
return ReportError(&links[i], ret);
}
} }
} }
} }
@ -582,6 +593,12 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_,
utils::Assert(buffer_size != 0, "must assign buffer_size"); utils::Assert(buffer_size != 0, "must assign buffer_size");
// round to type_n4bytes // round to type_n4bytes
max_reduce = (max_reduce / type_nbytes * type_nbytes); max_reduce = (max_reduce / type_nbytes * type_nbytes);
// if max reduce is less than total size, we reduce multiple times of
// eachreduce size
if (max_reduce < total_size)
max_reduce = max_reduce - max_reduce % eachreduce;
// peform reduce, can be at most two rounds // peform reduce, can be at most two rounds
while (size_up_reduce < max_reduce) { while (size_up_reduce < max_reduce) {
// start position // start position
@ -605,7 +622,7 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_,
// pass message up to parent, can pass data that are already been reduced // pass message up to parent, can pass data that are already been reduced
if (size_up_out < size_up_reduce) { if (size_up_out < size_up_reduce) {
ssize_t len = links[parent_index].sock. ssize_t len = links[parent_index].sock.
Send(sendrecvbuf + size_up_out, size_up_reduce - size_up_out); Send(sendrecvbuf + size_up_out, size_up_reduce - size_up_out);
if (len != -1) { if (len != -1) {
size_up_out += static_cast<size_t>(len); size_up_out += static_cast<size_t>(len);
} else { } else {
@ -618,20 +635,33 @@ AllreduceBase::TryAllreduceTree(void *sendrecvbuf_,
// read data from parent // read data from parent
if (watcher.CheckRead(links[parent_index].sock) && if (watcher.CheckRead(links[parent_index].sock) &&
total_size > size_down_in) { total_size > size_down_in) {
ssize_t len = links[parent_index].sock. size_t left_size = total_size-size_down_in;
Recv(sendrecvbuf + size_down_in, total_size - size_down_in); size_t reduce_size_min = std::min(left_size, eachreduce);
if (len == 0) { size_t recved = 0;
links[parent_index].sock.Close(); while (recved < reduce_size_min) {
return ReportError(&links[parent_index], kRecvZeroLen); ssize_t len = links[parent_index].sock.
} Recv(sendrecvbuf + size_down_in, total_size - size_down_in);
if (len != -1) {
size_down_in += static_cast<size_t>(len); if (len == 0) {
utils::Assert(size_down_in <= size_up_out, links[parent_index].sock.Close();
"Allreduce: boundary error"); return ReportError(&links[parent_index], kRecvZeroLen);
} else { }
ReturnType ret = Errno2Return(); if (len != -1) {
if (ret != kSuccess) { size_down_in += static_cast<size_t>(len);
return ReportError(&links[parent_index], ret); utils::Assert(size_down_in <= size_up_out,
"Allreduce: boundary error");
recved+=len;
// if it receives more data than each reduce, it means the next block is sent.
// we double the reduce_size_min or add to left_size
while (recved > reduce_size_min) {
reduce_size_min += std::min(left_size-reduce_size_min, eachreduce);
}
} else {
ReturnType ret = Errno2Return();
if (ret != kSuccess) {
return ReportError(&links[parent_index], ret);
}
} }
} }
} }

View File

@ -565,6 +565,8 @@ class AllreduceBase : public IEngine {
int reduce_method; int reduce_method;
// mininum count of cells to use ring based method // mininum count of cells to use ring based method
size_t reduce_ring_mincount; size_t reduce_ring_mincount;
// minimul block size per tree reduce
size_t tree_reduce_minsize;
// current rank // current rank
int rank; int rank;
// world size // world size

View File

@ -7,9 +7,10 @@
* \author Tianqi Chen * \author Tianqi Chen
*/ */
#define NOMINMAX #define NOMINMAX
#include <rabit/base.h>
#include <mpi.h> #include <mpi.h>
#include <rabit/base.h>
#include <cstdio> #include <cstdio>
#include <string>
#include "rabit/internal/engine.h" #include "rabit/internal/engine.h"
#include "rabit/internal/utils.h" #include "rabit/internal/utils.h"