From ed1de6df80a426dc82250a99d32ae3e9f0d73776 Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 2 Dec 2014 21:11:48 -0800 Subject: [PATCH] change AllReduce to Allreduce --- src/allreduce_base.cc | 30 +++++++++++------------ src/allreduce_base.h | 28 +++++++++++----------- src/allreduce_robust-inl.h | 6 ++--- src/allreduce_robust.cc | 48 ++++++++++++++++++------------------- src/allreduce_robust.h | 14 +++++------ src/engine.cc | 6 ++--- src/engine.h | 6 ++--- src/engine_mpi.cc | 6 ++--- src/mock.h | 4 ++-- src/rabit-inl.h | 6 ++--- src/rabit.h | 8 +++---- src/tcp_master.py | 2 +- test/test_allreduce.cpp | 4 ++-- test/test_model_recover.cpp | 4 ++-- test/test_recover.cpp | 4 ++-- 15 files changed, 88 insertions(+), 88 deletions(-) diff --git a/src/allreduce_base.cc b/src/allreduce_base.cc index ca63f5c1c..0cfb9fd34 100644 --- a/src/allreduce_base.cc +++ b/src/allreduce_base.cc @@ -13,7 +13,7 @@ namespace rabit { namespace engine { // constructor -AllReduceBase::AllReduceBase(void) { +AllreduceBase::AllreduceBase(void) { master_uri = "NULL"; master_port = 9000; host_uri = ""; @@ -26,7 +26,7 @@ AllReduceBase::AllReduceBase(void) { } // initialization function -void AllReduceBase::Init(void) { +void AllreduceBase::Init(void) { utils::Socket::Startup(); // single node mode if (master_uri == "NULL") return; @@ -68,7 +68,7 @@ void AllReduceBase::Init(void) { utils::Assert(master.RecvAll(&hname[0], len) == static_cast(len), "sync::Init failure 10"); utils::Assert(master.RecvAll(&hport, sizeof(hport)) == sizeof(hport), "sync::Init failure 11"); links[0].sock.Create(); - links[0].sock.Connect(utils::SockAddr(hname.c_str(), hport)); + links[0].sock.Connect(utils::SockAddr(hname.c_str(), hport)); utils::Assert(links[0].sock.SendAll(&magic, sizeof(magic)) == sizeof(magic), "sync::Init failure 12"); utils::Assert(links[0].sock.RecvAll(&magic, sizeof(magic)) == sizeof(magic), "sync::Init failure 13"); utils::Check(magic == kMagic, "sync::Init failure, parent magic number mismatch"); @@ -105,7 +105,7 @@ void AllReduceBase::Init(void) { // done } -void AllReduceBase::Shutdown(void) { +void AllreduceBase::Shutdown(void) { for (size_t i = 0; i < links.size(); ++i) { links[i].sock.Close(); } @@ -117,7 +117,7 @@ void AllReduceBase::Shutdown(void) { * \param name parameter name * \param val parameter value */ -void AllReduceBase::SetParam(const char *name, const char *val) { +void AllreduceBase::SetParam(const char *name, const char *val) { if (!strcmp(name, "master_uri")) master_uri = val; if (!strcmp(name, "master_port")) master_port = atoi(val); if (!strcmp(name, "reduce_buffer")) { @@ -140,10 +140,10 @@ void AllReduceBase::SetParam(const char *name, const char *val) { /*! * \brief perform in-place allreduce, on sendrecvbuf, this function can fail, and will return the cause of failure * - * NOTE on AllReduce: - * The kSuccess TryAllReduce does NOT mean every node have successfully finishes TryAllReduce. - * It only means the current node get the correct result of AllReduce. - * However, it means every node finishes LAST call(instead of this one) of AllReduce/Bcast + * NOTE on Allreduce: + * The kSuccess TryAllreduce does NOT mean every node have successfully finishes TryAllreduce. + * It only means the current node get the correct result of Allreduce. + * However, it means every node finishes LAST call(instead of this one) of Allreduce/Bcast * * \param sendrecvbuf_ buffer for both sending and recving data * \param type_nbytes the unit number of bytes the type have @@ -152,8 +152,8 @@ void AllReduceBase::SetParam(const char *name, const char *val) { * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details * \sa ReturnType */ -AllReduceBase::ReturnType -AllReduceBase::TryAllReduce(void *sendrecvbuf_, +AllreduceBase::ReturnType +AllreduceBase::TryAllreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) { @@ -248,7 +248,7 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_, size_t start = size_up_reduce % buffer_size; // peform read till end of buffer size_t nread = std::min(buffer_size - start, max_reduce - size_up_reduce); - utils::Assert(nread % type_nbytes == 0, "AllReduce: size check"); + utils::Assert(nread % type_nbytes == 0, "Allreduce: size check"); for (int i = 0; i < nlink; ++i) { if (i != parent_index) { reducer(links[i].buffer_head + start, @@ -280,7 +280,7 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_, } if (len != -1) { size_down_in += static_cast(len); - utils::Assert(size_down_in <= size_up_out, "AllReduce: boundary error"); + utils::Assert(size_down_in <= size_up_out, "Allreduce: boundary error"); } else { if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError; } @@ -306,8 +306,8 @@ AllReduceBase::TryAllReduce(void *sendrecvbuf_, * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details * \sa ReturnType */ -AllReduceBase::ReturnType -AllReduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) { +AllreduceBase::ReturnType +AllreduceBase::TryBroadcast(void *sendrecvbuf_, size_t total_size, int root) { if (links.size() == 0 || total_size == 0) return kSuccess; utils::Check(root < world_size, "Broadcast: root should be smaller than world size"); // number of links diff --git a/src/allreduce_base.h b/src/allreduce_base.h index 5ddf27635..578b941f1 100644 --- a/src/allreduce_base.h +++ b/src/allreduce_base.h @@ -27,14 +27,14 @@ class Datatype { } namespace rabit { namespace engine { -/*! \brief implementation of basic AllReduce engine */ -class AllReduceBase : public IEngine { +/*! \brief implementation of basic Allreduce engine */ +class AllreduceBase : public IEngine { public: // magic number to verify server const static int kMagic = 0xff99; // constant one byte out of band message to indicate error happening - AllReduceBase(void); - virtual ~AllReduceBase(void) {} + AllreduceBase(void); + virtual ~AllreduceBase(void) {} // initialize the manager void Init(void); // shutdown the engine @@ -65,12 +65,12 @@ class AllReduceBase : public IEngine { * \param count number of elements to be reduced * \param reducer reduce function */ - virtual void AllReduce(void *sendrecvbuf_, + virtual void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) { - utils::Assert(TryAllReduce(sendrecvbuf_, type_nbytes, count, reducer) == kSuccess, - "AllReduce failed"); + utils::Assert(TryAllreduce(sendrecvbuf_, type_nbytes, count, reducer) == kSuccess, + "Allreduce failed"); } /*! * \brief broadcast data from root to all nodes @@ -80,7 +80,7 @@ class AllReduceBase : public IEngine { */ virtual void Broadcast(void *sendrecvbuf_, size_t total_size, int root) { utils::Assert(TryBroadcast(sendrecvbuf_, total_size, root) == kSuccess, - "AllReduce failed"); + "Allreduce failed"); } /*! * \brief load latest check point @@ -171,7 +171,7 @@ class AllReduceBase : public IEngine { */ inline bool ReadToRingBuffer(size_t protect_start) { size_t ngap = size_read - protect_start; - utils::Assert(ngap <= buffer_size, "AllReduce: boundary check"); + utils::Assert(ngap <= buffer_size, "Allreduce: boundary check"); size_t offset = size_read % buffer_size; size_t nmax = std::min(buffer_size - ngap, buffer_size - offset); if (nmax == 0) return true; @@ -225,10 +225,10 @@ class AllReduceBase : public IEngine { /*! * \brief perform in-place allreduce, on sendrecvbuf, this function can fail, and will return the cause of failure * - * NOTE on AllReduce: - * The kSuccess TryAllReduce does NOT mean every node have successfully finishes TryAllReduce. - * It only means the current node get the correct result of AllReduce. - * However, it means every node finishes LAST call(instead of this one) of AllReduce/Bcast + * NOTE on Allreduce: + * The kSuccess TryAllreduce does NOT mean every node have successfully finishes TryAllreduce. + * It only means the current node get the correct result of Allreduce. + * However, it means every node finishes LAST call(instead of this one) of Allreduce/Bcast * * \param sendrecvbuf_ buffer for both sending and recving data * \param type_nbytes the unit number of bytes the type have @@ -237,7 +237,7 @@ class AllReduceBase : public IEngine { * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details * \sa ReturnType */ - ReturnType TryAllReduce(void *sendrecvbuf_, + ReturnType TryAllreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer); diff --git a/src/allreduce_robust-inl.h b/src/allreduce_robust-inl.h index cc9943282..f1f557593 100644 --- a/src/allreduce_robust-inl.h +++ b/src/allreduce_robust-inl.h @@ -1,6 +1,6 @@ /*! * \file allreduce_robust-inl.h - * \brief implementation of inline template function in AllReduceRobust + * \brief implementation of inline template function in AllreduceRobust * * \author Tianqi Chen */ @@ -29,8 +29,8 @@ namespace engine { * \tparam NodeType type of node value */ template -inline AllReduceRobust::ReturnType -AllReduceRobust::MsgPassing(const NodeType &node_value, +inline AllreduceRobust::ReturnType +AllreduceRobust::MsgPassing(const NodeType &node_value, std::vector *p_edge_in, std::vector *p_edge_out, EdgeType (*func) (const NodeType &node_value, diff --git a/src/allreduce_robust.cc b/src/allreduce_robust.cc index 83b6a5fc8..d2339a3be 100644 --- a/src/allreduce_robust.cc +++ b/src/allreduce_robust.cc @@ -1,6 +1,6 @@ /*! * \file allreduce_robust.cc - * \brief Robust implementation of AllReduce + * \brief Robust implementation of Allreduce * * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ @@ -15,12 +15,12 @@ namespace rabit { namespace engine { -AllReduceRobust::AllReduceRobust(void) { +AllreduceRobust::AllreduceRobust(void) { result_buffer_round = 1; seq_counter = 0; } /*! \brief shutdown the engine */ -void AllReduceRobust::Shutdown(void) { +void AllreduceRobust::Shutdown(void) { // need to sync the exec before we shutdown, do a pesudo check point // execute checkpoint, note: when checkpoint existing, load will not happen utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckPoint, ActionSummary::kMaxSeq), @@ -30,15 +30,15 @@ void AllReduceRobust::Shutdown(void) { // execute check ack step, load happens here utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckAck, ActionSummary::kMaxSeq), "check ack must return true"); - AllReduceBase::Shutdown(); + AllreduceBase::Shutdown(); } /*! * \brief set parameters to the engine * \param name parameter name * \param val parameter value */ -void AllReduceRobust::SetParam(const char *name, const char *val) { - AllReduceBase::SetParam(name, val); +void AllreduceRobust::SetParam(const char *name, const char *val) { + AllreduceBase::SetParam(name, val); if (!strcmp(name, "result_buffer_round")) result_buffer_round = atoi(val); if (!strcmp(name, "result_replicate")) { result_buffer_round = std::max(world_size / atoi(val), 1); @@ -52,7 +52,7 @@ void AllReduceRobust::SetParam(const char *name, const char *val) { * \param count number of elements to be reduced * \param reducer reduce function */ -void AllReduceRobust::AllReduce(void *sendrecvbuf_, +void AllreduceRobust::Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) { @@ -68,7 +68,7 @@ void AllReduceRobust::AllReduce(void *sendrecvbuf_, std::memcpy(temp, sendrecvbuf_, type_nbytes * count); break; } else { std::memcpy(temp, sendrecvbuf_, type_nbytes * count); - if (CheckAndRecover(TryAllReduce(temp, type_nbytes, count, reducer))) { + if (CheckAndRecover(TryAllreduce(temp, type_nbytes, count, reducer))) { std::memcpy(sendrecvbuf_, temp, type_nbytes * count); break; } else { recovered = RecoverExec(sendrecvbuf_, type_nbytes * count, 0, seq_counter); @@ -84,7 +84,7 @@ void AllReduceRobust::AllReduce(void *sendrecvbuf_, * \param size the size of the data to be broadcasted * \param root the root worker id to broadcast the data */ -void AllReduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root) { +void AllreduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root) { bool recovered = RecoverExec(sendrecvbuf_, total_size, 0, seq_counter); // now we are free to remove the last result, if any if (resbuf.LastSeqNo() != -1 && @@ -114,7 +114,7 @@ void AllReduceRobust::Broadcast(void *sendrecvbuf_, size_t total_size, int root) * the p_model is not touched, user should do necessary initialization by themselves * \sa CheckPoint, VersionNumber */ -int AllReduceRobust::LoadCheckPoint(utils::ISerializable *p_model) { +int AllreduceRobust::LoadCheckPoint(utils::ISerializable *p_model) { // check if we succesfll if (RecoverExec(NULL, 0, ActionSummary::kLoadCheck, ActionSummary::kMaxSeq)) { // reset result buffer @@ -142,7 +142,7 @@ int AllReduceRobust::LoadCheckPoint(utils::ISerializable *p_model) { * \param p_model pointer to the model * \sa LoadCheckPoint, VersionNumber */ -void AllReduceRobust::CheckPoint(const utils::ISerializable &model) { +void AllreduceRobust::CheckPoint(const utils::ISerializable &model) { // increase version number version_number += 1; // save model @@ -168,7 +168,7 @@ void AllReduceRobust::CheckPoint(const utils::ISerializable &model) { * when kSockError is returned, it simply means there are bad sockets in the links, * and some link recovery proceduer is needed */ -AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) { +AllreduceRobust::ReturnType AllreduceRobust::TryResetLinks(void) { // number of links const int nlink = static_cast(links.size()); for (int i = 0; i < nlink; ++i) { @@ -285,7 +285,7 @@ AllReduceRobust::ReturnType AllReduceRobust::TryResetLinks(void) { * \brief try to reconnect the broken links * \return this function can kSuccess or kSockError */ -AllReduceRobust::ReturnType AllReduceRobust::TryReConnectLinks(void) { +AllreduceRobust::ReturnType AllreduceRobust::TryReConnectLinks(void) { utils::Error("TryReConnectLinks: not implemented"); return kSuccess; } @@ -296,7 +296,7 @@ AllReduceRobust::ReturnType AllReduceRobust::TryReConnectLinks(void) { * \param err_type the type of error happening in the system * \return true if err_type is kSuccess, false otherwise */ -bool AllReduceRobust::CheckAndRecover(ReturnType err_type) { +bool AllreduceRobust::CheckAndRecover(ReturnType err_type) { if (err_type == kSuccess) return true; while(err_type != kSuccess) { switch(err_type) { @@ -383,8 +383,8 @@ inline char DataRequest(const std::pair &node_value, * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details * \sa ReturnType */ -AllReduceRobust::ReturnType -AllReduceRobust::TryDecideRouting(AllReduceRobust::RecoverType role, +AllreduceRobust::ReturnType +AllreduceRobust::TryDecideRouting(AllreduceRobust::RecoverType role, size_t *p_size, int *p_recvlink, std::vector *p_req_in) { @@ -398,7 +398,7 @@ AllReduceRobust::TryDecideRouting(AllReduceRobust::RecoverType role, for (size_t i = 0; i < dist_in.size(); ++i) { if (dist_in[i].first != std::numeric_limits::max()) { utils::Check(best_link == -2 || *p_size == dist_in[i].second, - "[%d] AllReduce size inconsistent, distin=%lu, size=%lu, reporting=%lu\n", + "[%d] Allreduce size inconsistent, distin=%lu, size=%lu, reporting=%lu\n", rank, dist_in[i].first, *p_size, dist_in[i].second); if (best_link == -2 || dist_in[i].first < dist_in[best_link].first) { best_link = static_cast(i); @@ -444,8 +444,8 @@ AllReduceRobust::TryDecideRouting(AllReduceRobust::RecoverType role, * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details * \sa ReturnType, TryDecideRouting */ -AllReduceRobust::ReturnType -AllReduceRobust::TryRecoverData(RecoverType role, +AllreduceRobust::ReturnType +AllreduceRobust::TryRecoverData(RecoverType role, void *sendrecvbuf_, size_t size, int recv_link, @@ -546,7 +546,7 @@ AllReduceRobust::TryRecoverData(RecoverType role, * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details * \sa ReturnType */ -AllReduceRobust::ReturnType AllReduceRobust::TryLoadCheckPoint(bool requester) { +AllreduceRobust::ReturnType AllreduceRobust::TryLoadCheckPoint(bool requester) { RecoverType role = requester ? kRequestData : kHaveData; size_t size = this->checked_model.length(); int recv_link; @@ -573,8 +573,8 @@ AllReduceRobust::ReturnType AllReduceRobust::TryLoadCheckPoint(bool requester) { * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details * \sa ReturnType */ -AllReduceRobust::ReturnType -AllReduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool requester) { RecoverType role; +AllreduceRobust::ReturnType +AllreduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool requester) { RecoverType role; if (!requester) { sendrecvbuf = resbuf.Query(seqno, &size); role = sendrecvbuf != NULL ? kHaveData : kPassData; @@ -605,7 +605,7 @@ AllReduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool re * result by recovering procedure, the action is complete, no further action is needed * - false means this is the lastest action that has not yet been executed, need to execute the action */ -bool AllReduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { +bool AllreduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { if (flag != 0) { utils::Assert(seqno == ActionSummary::kMaxSeq, "must only set seqno for normal operations"); } @@ -615,7 +615,7 @@ bool AllReduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) { // action ActionSummary act = req; // get the reduced action - if (!CheckAndRecover(TryAllReduce(&act, sizeof(act), 1, ActionSummary::Reducer))) continue; + if (!CheckAndRecover(TryAllreduce(&act, sizeof(act), 1, ActionSummary::Reducer))) continue; if (act.check_ack()) { if (act.check_point()) { // if we also have check_point, do check point first diff --git a/src/allreduce_robust.h b/src/allreduce_robust.h index d9eee6d25..26e45f16c 100644 --- a/src/allreduce_robust.h +++ b/src/allreduce_robust.h @@ -1,6 +1,6 @@ /*! * \file allreduce_robust.h - * \brief Robust implementation of AllReduce + * \brief Robust implementation of Allreduce * using TCP non-block socket and tree-shape reduction. * * This implementation considers the failure of nodes @@ -16,10 +16,10 @@ namespace rabit { namespace engine { /*! \brief implementation of fault tolerant all reduce engine */ -class AllReduceRobust : public AllReduceBase { +class AllreduceRobust : public AllreduceBase { public: - AllReduceRobust(void); - virtual ~AllReduceRobust(void) {} + AllreduceRobust(void); + virtual ~AllreduceRobust(void) {} /*! \brief shutdown the engine */ virtual void Shutdown(void); /*! @@ -36,7 +36,7 @@ class AllReduceRobust : public AllReduceBase { * \param count number of elements to be reduced * \param reducer reduce function */ - virtual void AllReduce(void *sendrecvbuf_, + virtual void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer); @@ -142,7 +142,7 @@ class AllReduceRobust : public AllReduceBase { inline int flag(void) const { return seqcode & 15; } - // reducer for AllReduce, used to get the result ActionSummary from all nodes + // reducer for Allreduce, used to get the result ActionSummary from all nodes inline static void Reducer(const void *src_, void *dst_, int len, const MPI::Datatype &dtype) { const ActionSummary *src = (const ActionSummary*)src_; ActionSummary *dst = (ActionSummary*)dst_; @@ -162,7 +162,7 @@ class AllReduceRobust : public AllReduceBase { // internel sequence code int seqcode; }; - /*! \brief data structure to remember result of Bcast and AllReduce calls */ + /*! \brief data structure to remember result of Bcast and Allreduce calls */ class ResultBuffer { public: // constructor diff --git a/src/engine.cc b/src/engine.cc index 24ab1e588..0512ac503 100644 --- a/src/engine.cc +++ b/src/engine.cc @@ -16,7 +16,7 @@ namespace rabit { namespace engine { // singleton sync manager -AllReduceRobust manager; +AllreduceRobust manager; /*! \brief intiialize the synchronization module */ void Init(int argc, char *argv[]) { @@ -38,13 +38,13 @@ IEngine *GetEngine(void) { return &manager; } // perform in-place allreduce, on sendrecvbuf -void AllReduce_(void *sendrecvbuf, +void Allreduce_(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::ReduceFunction red, mpi::DataType dtype, mpi::OpType op) { - GetEngine()->AllReduce(sendrecvbuf, type_nbytes, count, red); + GetEngine()->Allreduce(sendrecvbuf, type_nbytes, count, red); } } // namespace engine } // namespace rabit diff --git a/src/engine.h b/src/engine.h index 873c02588..6d95fe5dc 100644 --- a/src/engine.h +++ b/src/engine.h @@ -16,7 +16,7 @@ class Datatype; namespace rabit { /*! \brief core interface of engine */ namespace engine { -/*! \brief interface of core AllReduce engine */ +/*! \brief interface of core Allreduce engine */ class IEngine { public: /*! @@ -41,7 +41,7 @@ class IEngine { * \param count number of elements to be reduced * \param reducer reduce function */ - virtual void AllReduce(void *sendrecvbuf_, + virtual void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) = 0; @@ -130,7 +130,7 @@ enum DataType { * \param dtype the data type * \param op the reduce operator type */ -void AllReduce_(void *sendrecvbuf, +void Allreduce_(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::ReduceFunction red, diff --git a/src/engine_mpi.cc b/src/engine_mpi.cc index c2e2a572d..03bd0cb73 100644 --- a/src/engine_mpi.cc +++ b/src/engine_mpi.cc @@ -20,11 +20,11 @@ class MPIEngine : public IEngine { MPIEngine(void) { version_number = 0; } - virtual void AllReduce(void *sendrecvbuf_, + virtual void Allreduce(void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) { - utils::Error("MPIEngine:: AllReduce is not supported, use AllReduce_ instead"); + utils::Error("MPIEngine:: Allreduce is not supported, use Allreduce_ instead"); } virtual void Broadcast(void *sendrecvbuf_, size_t size, int root) { MPI::COMM_WORLD.Bcast(sendrecvbuf_, size, MPI::CHAR, root); @@ -103,7 +103,7 @@ inline MPI::Op GetOp(mpi::OpType otype) { return MPI::MAX; } // perform in-place allreduce, on sendrecvbuf -void AllReduce_(void *sendrecvbuf, +void Allreduce_(void *sendrecvbuf, size_t type_nbytes, size_t count, IEngine::ReduceFunction red, diff --git a/src/mock.h b/src/mock.h index 1dd004c8b..5c85b841f 100644 --- a/src/mock.h +++ b/src/mock.h @@ -25,9 +25,9 @@ public: } template - inline void AllReduce(float *sendrecvbuf, size_t count) { + inline void Allreduce(float *sendrecvbuf, size_t count) { utils::Assert(verify(allReduce), "[%d] error when calling allReduce", rank); - rabit::AllReduce(sendrecvbuf, count); + rabit::Allreduce(sendrecvbuf, count); } inline bool LoadCheckPoint(utils::ISerializable *p_model) { diff --git a/src/rabit-inl.h b/src/rabit-inl.h index bc3c4a4fb..4ea741efe 100644 --- a/src/rabit-inl.h +++ b/src/rabit-inl.h @@ -101,10 +101,10 @@ inline void Bcast(std::string *sendrecv_data, int root) { e->Broadcast(&(*sendrecv_data)[0], len, root); } } -// perform inplace AllReduce +// perform inplace Allreduce template -inline void AllReduce(DType *sendrecvbuf, size_t count) { - engine::AllReduce_(sendrecvbuf, sizeof(DType), count, op::Reducer, +inline void Allreduce(DType *sendrecvbuf, size_t count) { + engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer, engine::mpi::GetType(), OP::kType); } // load latest check point diff --git a/src/rabit.h b/src/rabit.h index 0260ee52b..859b5488a 100644 --- a/src/rabit.h +++ b/src/rabit.h @@ -2,9 +2,9 @@ #define RABIT_RABIT_H /*! * \file rabit.h - * \brief This file defines unified AllReduce/Broadcast interface of rabit + * \brief This file defines unified Allreduce/Broadcast interface of rabit * The actual implementation is redirected to rabit engine - * Code only using this header can also compiled with MPI AllReduce(with no fault recovery), + * Code only using this header can also compiled with MPI Allreduce(with no fault recovery), * * \author Tianqi Chen, Ignacio Cano, Tianyi Zhou */ @@ -54,7 +54,7 @@ inline void Bcast(std::string *sendrecv_data, int root); * Example Usage: the following code gives sum of the result * vector data(10); * ... - * AllReduce(&data[0], data.size()); + * Allreduce(&data[0], data.size()); * ... * \param sendrecvbuf buffer for both sending and recving data * \param count number of elements to be reduced @@ -62,7 +62,7 @@ inline void Bcast(std::string *sendrecv_data, int root); * \tparam DType type of data */ template -inline void AllReduce(DType *sendrecvbuf, size_t count); +inline void Allreduce(DType *sendrecvbuf, size_t count); /*! * \brief load latest check point * \param p_model pointer to the model diff --git a/src/tcp_master.py b/src/tcp_master.py index c0820f14b..015b48784 100644 --- a/src/tcp_master.py +++ b/src/tcp_master.py @@ -68,7 +68,7 @@ class Master: try: magic = slave.recvint() if magic != kMagic: - print 'invalid magic number=%d from %s' % (magic, s_addr[0]) + print 'invalid magic number=%d from %s' % (magic, s_addr[0]) slave.sock.close() continue except socket.error: diff --git a/test/test_allreduce.cpp b/test/test_allreduce.cpp index e0fc9843f..625d9592a 100644 --- a/test/test_allreduce.cpp +++ b/test/test_allreduce.cpp @@ -15,7 +15,7 @@ inline void TestMax(test::Mock &mock, size_t n) { for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % 111; } - mock.AllReduce(&ndata[0], ndata.size()); + mock.Allreduce(&ndata[0], ndata.size()); for (size_t i = 0; i < ndata.size(); ++i) { float rmax = (i * 1) % 111; for (int r = 0; r < nproc; ++r) { @@ -34,7 +34,7 @@ inline void TestSum(test::Mock &mock, size_t n) { for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % z; } - mock.AllReduce(&ndata[0], ndata.size()); + mock.Allreduce(&ndata[0], ndata.size()); for (size_t i = 0; i < ndata.size(); ++i) { float rsum = 0.0f; for (int r = 0; r < nproc; ++r) { diff --git a/test/test_model_recover.cpp b/test/test_model_recover.cpp index c6d2973ce..c482c266c 100644 --- a/test/test_model_recover.cpp +++ b/test/test_model_recover.cpp @@ -39,7 +39,7 @@ inline void TestMax(test::Mock &mock, Model *model, int ntrial, int iter) { for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % z + model->data[i]; } - mock.AllReduce(&ndata[0], ndata.size()); + mock.Allreduce(&ndata[0], ndata.size()); if (ntrial == iter && rank == 3) { throw MockException(); } @@ -62,7 +62,7 @@ inline void TestSum(test::Mock &mock, Model *model, int ntrial, int iter) { for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % z + model->data[i]; } - mock.AllReduce(&ndata[0], ndata.size()); + mock.Allreduce(&ndata[0], ndata.size()); if (ntrial == iter && rank == 0) { throw MockException(); diff --git a/test/test_recover.cpp b/test/test_recover.cpp index 9dfc7f60a..92aa60918 100644 --- a/test/test_recover.cpp +++ b/test/test_recover.cpp @@ -18,7 +18,7 @@ inline void TestMax(test::Mock &mock, size_t n, int ntrial) { for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % 111; } - mock.AllReduce(&ndata[0], ndata.size()); + mock.Allreduce(&ndata[0], ndata.size()); if (ntrial == 0 && rank == 15) throw MockException(); for (size_t i = 0; i < ndata.size(); ++i) { float rmax = (i * 1) % 111; @@ -38,7 +38,7 @@ inline void TestSum(test::Mock &mock, size_t n, int ntrial) { for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % z; } - mock.AllReduce(&ndata[0], ndata.size()); + mock.Allreduce(&ndata[0], ndata.size()); if (ntrial == 0 && rank == 0) throw MockException();