diff --git a/src/allreduce_robust.cc b/src/allreduce_robust.cc index e88e9db30..f2f75c19e 100644 --- a/src/allreduce_robust.cc +++ b/src/allreduce_robust.cc @@ -141,12 +141,17 @@ int AllreduceRobust::LoadCheckPoint(utils::ISerializable *global_model, } // check if we succesful if (RecoverExec(NULL, 0, ActionSummary::kLoadCheck, ActionSummary::kSpecialOp)) { + int nlocal = std::max(static_cast(local_rptr[local_chkpt_version].size()) - 1, 0); if (local_model != NULL) { - // load in local model - utils::MemoryFixSizeBuffer fs(BeginPtr(local_chkpt[local_chkpt_version]), - local_rptr[local_chkpt_version][1]); - local_model->Load(fs); - } + if (nlocal == num_local_replica + 1) { + // load in local model + utils::MemoryFixSizeBuffer fs(BeginPtr(local_chkpt[local_chkpt_version]), + local_rptr[local_chkpt_version][1]); + local_model->Load(fs); + } else { + utils::Assert(nlocal == 0, "[%d] local model inconsistent, nlocal=%d", rank, nlocal); + } + } // reset result buffer resbuf.Clear(); seq_counter = 0; // load from buffer @@ -156,6 +161,8 @@ int AllreduceRobust::LoadCheckPoint(utils::ISerializable *global_model, } else { utils::Assert(fs.Read(&version_number, sizeof(version_number)) != 0, "read in version number"); global_model->Load(fs); + utils::Assert(local_model == NULL || nlocal == num_local_replica + 1, + "local model inconsistent, nlocal=%d", nlocal); } // run another phase of check ack, if recovered from data utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckAck, ActionSummary::kSpecialOp), diff --git a/test/test_local_recover.cpp b/test/test_local_recover.cpp index 27d4541a4..106e04ef9 100644 --- a/test/test_local_recover.cpp +++ b/test/test_local_recover.cpp @@ -26,6 +26,7 @@ class Model : public rabit::utils::ISerializable { fo.Write(data); } virtual void InitModel(size_t n, float v) { + data.clear(); data.resize(n, v); } }; @@ -34,13 +35,13 @@ inline void TestMax(test::Mock &mock, Model *model, Model *local, int ntrial, in int rank = rabit::GetRank(); int nproc = rabit::GetWorldSize(); const int z = iter + 111; - + std::vector ndata(model->data.size()); for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % z + local->data[i]; } mock.Allreduce(&ndata[0], ndata.size()); - if (ntrial == iter && rank == 3) { + if (ntrial == iter && rank == 1) { throw MockException(); } for (size_t i = 0; i < ndata.size(); ++i) { @@ -66,11 +67,10 @@ inline void TestSum(test::Mock &mock, Model *model, Model *local, int ntrial, in for (size_t i = 0; i < ndata.size(); ++i) { ndata[i] = (i * (rank+1)) % z + local->data[i]; } - mock.Allreduce(&ndata[0], ndata.size()); - if (ntrial == iter && rank == 0) { - exit(-1); + throw MockException(); } + mock.Allreduce(&ndata[0], ndata.size()); for (size_t i = 0; i < ndata.size(); ++i) { float rsum = 0.0f; @@ -135,9 +135,9 @@ int main(int argc, char *argv[]) { utils::LogPrintf("[%d] !!!TestMax pass, iter=%d\n", rank, r); int step = std::max(nproc / 3, 1); for (int i = 0; i < nproc; i += step) { - TestBcast(mock, n, i, ntrial); + //TestBcast(mock, n, i, ntrial); } - utils::LogPrintf("[%d] !!!TestBcast pass, iter=%d\n", rank, r); + //utils::LogPrintf("[%d] !!!TestBcast pass, iter=%d\n", rank, r); TestSum(mock, &model, &local, ntrial, r); utils::LogPrintf("[%d] !!!TestSum pass, iter=%d\n", rank, r); rabit::CheckPoint(&model, &local);