fix bug in initialization of routing

This commit is contained in:
tqchen 2015-01-14 19:40:41 -08:00
parent 797fe27efe
commit f161d2f1e5
9 changed files with 28 additions and 12 deletions

View File

@ -2,7 +2,8 @@ export CC = gcc
export CXX = g++ export CXX = g++
export MPICXX = mpicxx export MPICXX = mpicxx
export LDFLAGS= -Llib export LDFLAGS= -Llib
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -Iinclude export WARNFLAGS= -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -pedantic
export CFLAGS = -O3 -msse2 -fPIC -Iinclude $(WARNFLAGS)
# build path # build path
BPATH=. BPATH=.
@ -15,7 +16,7 @@ ALIB= lib/librabit.a lib/librabit_mpi.a lib/librabit_empty.a lib/librabit_mock.a
HEADERS=src/*.h include/*.h include/rabit/*.h HEADERS=src/*.h include/*.h include/rabit/*.h
.PHONY: clean all install mpi python .PHONY: clean all install mpi python
all: lib/librabit.a lib/librabit_mock.a $(SLIB) all: lib/librabit.a lib/librabit_mock.a wrapper/librabit_wrapper.so wrapper/librabit_wrapper_mock.so
mpi: lib/librabit_mpi.a wrapper/librabit_wrapper_mpi.so mpi: lib/librabit_mpi.a wrapper/librabit_wrapper_mpi.so
python: wrapper/librabit_wrapper.so wrapper/librabit_wrapper_mock.so python: wrapper/librabit_wrapper.so wrapper/librabit_wrapper_mock.so

View File

@ -242,8 +242,10 @@ class ReduceHandle {
static int TypeSize(const MPI::Datatype &dtype); static int TypeSize(const MPI::Datatype &dtype);
protected: protected:
// handle data field // handle function field
void *handle_; void *handle_;
// reduce function of the reducer
IEngine::ReduceFunction *redfunc_;
// handle to the type field // handle to the type field
void *htype_; void *htype_;
// the created type in 4 bytes // the created type in 4 bytes

View File

@ -201,7 +201,7 @@ inline void ReducerFunc_(const void *src_, void *dst_, int len_, const MPI::Data
const char *psrc = reinterpret_cast<const char*>(src_); const char *psrc = reinterpret_cast<const char*>(src_);
char *pdst = reinterpret_cast<char*>(dst_); char *pdst = reinterpret_cast<char*>(dst_);
DType tdst, tsrc; DType tdst, tsrc;
for (size_t i = 0; i < len_; ++i) { for (int i = 0; i < len_; ++i) {
// use memcpy to avoid alignment issue // use memcpy to avoid alignment issue
std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst)); std::memcpy(&tdst, pdst + i * kUnit, sizeof(tdst));
std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc)); std::memcpy(&tsrc, psrc + i * kUnit, sizeof(tsrc));

View File

@ -27,6 +27,7 @@ AllreduceBase::AllreduceBase(void) {
hadoop_mode = 0; hadoop_mode = 0;
version_number = 0; version_number = 0;
task_id = "NULL"; task_id = "NULL";
err_link = NULL;
this->SetParam("rabit_reduce_buffer", "256MB"); this->SetParam("rabit_reduce_buffer", "256MB");
} }

View File

@ -249,7 +249,9 @@ class AllreduceBase : public IEngine {
// buffer size, in bytes // buffer size, in bytes
size_t buffer_size; size_t buffer_size;
// constructor // constructor
LinkRecord(void) {} LinkRecord(void)
: buffer_head(NULL), buffer_size(0) {
}
// initialize buffer // initialize buffer
inline void InitBuffer(size_t type_nbytes, size_t count, inline void InitBuffer(size_t type_nbytes, size_t count,
size_t reduce_buffer_size) { size_t reduce_buffer_size) {
@ -276,6 +278,7 @@ class AllreduceBase : public IEngine {
* \return the type of reading * \return the type of reading
*/ */
inline ReturnType ReadToRingBuffer(size_t protect_start) { inline ReturnType ReadToRingBuffer(size_t protect_start) {
utils::Assert(buffer_head != NULL, "ReadToRingBuffer: buffer not allocated");
size_t ngap = size_read - protect_start; size_t ngap = size_read - protect_start;
utils::Assert(ngap <= buffer_size, "Allreduce: boundary check"); utils::Assert(ngap <= buffer_size, "Allreduce: boundary check");
size_t offset = size_read % buffer_size; size_t offset = size_read % buffer_size;

View File

@ -27,6 +27,7 @@ AllreduceRobust::AllreduceRobust(void) {
result_buffer_round = 1; result_buffer_round = 1;
global_lazycheck = NULL; global_lazycheck = NULL;
use_local_model = -1; use_local_model = -1;
recover_counter = 0;
} }
void AllreduceRobust::Init(void) { void AllreduceRobust::Init(void) {
AllreduceBase::Init(); AllreduceBase::Init();
@ -421,6 +422,8 @@ AllreduceRobust::ReturnType AllreduceRobust::TryResetLinks(void) {
*/ */
bool AllreduceRobust::CheckAndRecover(ReturnType err_type) { bool AllreduceRobust::CheckAndRecover(ReturnType err_type) {
if (err_type == kSuccess) return true; if (err_type == kSuccess) return true;
utils::Assert(err_link != NULL, "must know the error source");
recover_counter += 1;
{ {
// simple way, shutdown all links // simple way, shutdown all links
for (size_t i = 0; i < all_links.size(); ++i) { for (size_t i = 0; i < all_links.size(); ++i) {
@ -602,6 +605,9 @@ AllreduceRobust::TryRecoverData(RecoverType role,
if (!req_data) return kSuccess; if (!req_data) return kSuccess;
} }
utils::Assert(recv_link >= 0 || role == kHaveData, "recv_link must be active"); utils::Assert(recv_link >= 0 || role == kHaveData, "recv_link must be active");
if (role == kPassData) {
links[recv_link].InitBuffer(1, size, reduce_buffer_size);
}
for (int i = 0; i < nlink; ++i) { for (int i = 0; i < nlink; ++i) {
links[i].ResetSize(); links[i].ResetSize();
} }

View File

@ -531,6 +531,8 @@ o * the input state must exactly one saved state(local state of current node)
int use_local_model; int use_local_model;
// number of replica for global state/model // number of replica for global state/model
int num_global_replica; int num_global_replica;
// number of times recovery happens
int recover_counter;
// --- recovery data structure for local checkpoint // --- recovery data structure for local checkpoint
// there is two version of the data structure, // there is two version of the data structure,
// at one time one version is valid and another is used as temp memory // at one time one version is valid and another is used as temp memory

View File

@ -56,7 +56,8 @@ void Allreduce_(void *sendrecvbuf,
} }
// code for reduce handle // code for reduce handle
ReduceHandle::ReduceHandle(void) : handle_(NULL), htype_(NULL) { ReduceHandle::ReduceHandle(void)
: handle_(NULL), redfunc_(NULL), htype_(NULL) {
} }
ReduceHandle::~ReduceHandle(void) {} ReduceHandle::~ReduceHandle(void) {}
@ -64,17 +65,16 @@ int ReduceHandle::TypeSize(const MPI::Datatype &dtype) {
return static_cast<int>(dtype.type_size); return static_cast<int>(dtype.type_size);
} }
void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) { void ReduceHandle::Init(IEngine::ReduceFunction redfunc, size_t type_nbytes) {
utils::Assert(handle_ == NULL, "cannot initialize reduce handle twice"); utils::Assert(redfunc_ == NULL, "cannot initialize reduce handle twice");
handle_ = reinterpret_cast<void*>(redfunc); redfunc_ = redfunc;
} }
void ReduceHandle::Allreduce(void *sendrecvbuf, void ReduceHandle::Allreduce(void *sendrecvbuf,
size_t type_nbytes, size_t count, size_t type_nbytes, size_t count,
IEngine::PreprocFunction prepare_fun, IEngine::PreprocFunction prepare_fun,
void *prepare_arg) { void *prepare_arg) {
utils::Assert(handle_ != NULL, "must intialize handle to call AllReduce"); utils::Assert(redfunc_ != NULL, "must intialize handle to call AllReduce");
GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count,
reinterpret_cast<IEngine::ReduceFunction*>(handle_), redfunc_, prepare_fun, prepare_arg);
prepare_fun, prepare_arg);
} }
} // namespace engine } // namespace engine
} // namespace rabit } // namespace rabit

View File

@ -137,7 +137,8 @@ void Allreduce_(void *sendrecvbuf,
} }
// code for reduce handle // code for reduce handle
ReduceHandle::ReduceHandle(void) : handle_(NULL), htype_(NULL) { ReduceHandle::ReduceHandle(void)
: handle_(NULL), redfunc_(NULL), htype_(NULL) {
} }
ReduceHandle::~ReduceHandle(void) { ReduceHandle::~ReduceHandle(void) {
if (handle_ != NULL) { if (handle_ != NULL) {