diff --git a/src/collective/in_memory_handler.cc b/src/collective/in_memory_handler.cc index d8a86ec55..a45fe3e7d 100644 --- a/src/collective/in_memory_handler.cc +++ b/src/collective/in_memory_handler.cc @@ -222,15 +222,15 @@ void InMemoryHandler::Handle(char const* input, std::size_t bytes, std::string* std::unique_lock lock(mutex_); - LOG(INFO) << functor.name << " rank " << rank << ": waiting for current sequence number"; + LOG(DEBUG) << functor.name << " rank " << rank << ": waiting for current sequence number"; cv_.wait(lock, [this, sequence_number] { return sequence_number_ == sequence_number; }); - LOG(INFO) << functor.name << " rank " << rank << ": handling request"; + LOG(DEBUG) << functor.name << " rank " << rank << ": handling request"; functor(input, bytes, &buffer_); received_++; if (received_ == world_size_) { - LOG(INFO) << functor.name << " rank " << rank << ": all requests received"; + LOG(DEBUG) << functor.name << " rank " << rank << ": all requests received"; output->assign(buffer_); sent_++; lock.unlock(); @@ -238,15 +238,15 @@ void InMemoryHandler::Handle(char const* input, std::size_t bytes, std::string* return; } - LOG(INFO) << functor.name << " rank " << rank << ": waiting for all clients"; + LOG(DEBUG) << functor.name << " rank " << rank << ": waiting for all clients"; cv_.wait(lock, [this] { return received_ == world_size_; }); - LOG(INFO) << functor.name << " rank " << rank << ": sending reply"; + LOG(DEBUG) << functor.name << " rank " << rank << ": sending reply"; output->assign(buffer_); sent_++; if (sent_ == world_size_) { - LOG(INFO) << functor.name << " rank " << rank << ": all replies sent"; + LOG(DEBUG) << functor.name << " rank " << rank << ": all replies sent"; sent_ = 0; received_ = 0; buffer_.clear(); diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index ab75cf03e..7855ccb18 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -166,7 +166,7 @@ BatchSet 
SimpleDMatrix::GetGradientIndex(Context const* ctx, } if (!gradient_index_ || detail::RegenGHist(batch_param_, param)) { // GIDX page doesn't exist, generate it - LOG(INFO) << "Generating new Gradient Index."; + LOG(DEBUG) << "Generating new Gradient Index."; // These places can ask for a CSR gidx: // - CPU Hist: the ctx must be on CPU. // - IterativeDMatrix::InitFromCPU: The ctx must be on CPU. diff --git a/tests/cpp/collective/test_in_memory_communicator.cc b/tests/cpp/collective/test_in_memory_communicator.cc index 071005717..f36e30e33 100644 --- a/tests/cpp/collective/test_in_memory_communicator.cc +++ b/tests/cpp/collective/test_in_memory_communicator.cc @@ -26,6 +26,60 @@ class InMemoryCommunicatorTest : public ::testing::Test { static void Allgather(int rank) { InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllgather(comm, rank); + } + + static void AllreduceMax(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllreduceMax(comm, rank); + } + + static void AllreduceMin(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllreduceMin(comm, rank); + } + + static void AllreduceSum(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllreduceSum(comm); + } + + static void AllreduceBitwiseAND(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllreduceBitwiseAND(comm, rank); + } + + static void AllreduceBitwiseOR(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllreduceBitwiseOR(comm, rank); + } + + static void AllreduceBitwiseXOR(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyAllreduceBitwiseXOR(comm, rank); + } + + static void Broadcast(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + VerifyBroadcast(comm, rank); + } + + static void Mixture(int rank) { + InMemoryCommunicator comm{kWorldSize, rank}; + for (auto i = 0; i < 5; i++) { + VerifyAllgather(comm, rank); + VerifyAllreduceMax(comm, rank); + VerifyAllreduceMin(comm, rank); + 
VerifyAllreduceSum(comm); + VerifyAllreduceBitwiseAND(comm, rank); + VerifyAllreduceBitwiseOR(comm, rank); + VerifyAllreduceBitwiseXOR(comm, rank); + VerifyBroadcast(comm, rank); + } + } + + protected: + static void VerifyAllgather(InMemoryCommunicator &comm, int rank) { char buffer[kWorldSize] = {'a', 'b', 'c'}; buffer[rank] = '0' + rank; comm.AllGather(buffer, kWorldSize); @@ -34,8 +88,7 @@ class InMemoryCommunicatorTest : public ::testing::Test { } } - static void AllreduceMax(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyAllreduceMax(InMemoryCommunicator &comm, int rank) { int buffer[] = {1 + rank, 2 + rank, 3 + rank, 4 + rank, 5 + rank}; comm.AllReduce(buffer, sizeof(buffer) / sizeof(buffer[0]), DataType::kInt32, Operation::kMax); int expected[] = {3, 4, 5, 6, 7}; @@ -44,8 +97,7 @@ class InMemoryCommunicatorTest : public ::testing::Test { } } - static void AllreduceMin(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyAllreduceMin(InMemoryCommunicator &comm, int rank) { int buffer[] = {1 + rank, 2 + rank, 3 + rank, 4 + rank, 5 + rank}; comm.AllReduce(buffer, sizeof(buffer) / sizeof(buffer[0]), DataType::kInt32, Operation::kMin); int expected[] = {1, 2, 3, 4, 5}; @@ -54,8 +106,7 @@ class InMemoryCommunicatorTest : public ::testing::Test { } } - static void AllreduceSum(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyAllreduceSum(InMemoryCommunicator &comm) { int buffer[] = {1, 2, 3, 4, 5}; comm.AllReduce(buffer, sizeof(buffer) / sizeof(buffer[0]), DataType::kInt32, Operation::kSum); int expected[] = {3, 6, 9, 12, 15}; @@ -64,16 +115,14 @@ class InMemoryCommunicatorTest : public ::testing::Test { } } - static void AllreduceBitwiseAND(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyAllreduceBitwiseAND(InMemoryCommunicator &comm, int rank) { std::bitset<2> original(rank); auto buffer = original.to_ulong(); comm.AllReduce(&buffer, 1, 
DataType::kUInt32, Operation::kBitwiseAND); EXPECT_EQ(buffer, 0UL); } - static void AllreduceBitwiseOR(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyAllreduceBitwiseOR(InMemoryCommunicator &comm, int rank) { std::bitset<2> original(rank); auto buffer = original.to_ulong(); comm.AllReduce(&buffer, 1, DataType::kUInt32, Operation::kBitwiseOR); @@ -82,8 +131,7 @@ class InMemoryCommunicatorTest : public ::testing::Test { EXPECT_EQ(actual, expected); } - static void AllreduceBitwiseXOR(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyAllreduceBitwiseXOR(InMemoryCommunicator &comm, int rank) { std::bitset<3> original(rank * 2); auto buffer = original.to_ulong(); comm.AllReduce(&buffer, 1, DataType::kUInt32, Operation::kBitwiseXOR); @@ -92,8 +140,7 @@ class InMemoryCommunicatorTest : public ::testing::Test { EXPECT_EQ(actual, expected); } - static void Broadcast(int rank) { - InMemoryCommunicator comm{kWorldSize, rank}; + static void VerifyBroadcast(InMemoryCommunicator &comm, int rank) { if (rank == 0) { std::string buffer{"hello"}; comm.Broadcast(&buffer[0], buffer.size(), 0); @@ -105,7 +152,6 @@ class InMemoryCommunicatorTest : public ::testing::Test { } } - protected: static int const kWorldSize{3}; }; @@ -173,5 +219,7 @@ TEST_F(InMemoryCommunicatorTest, AllreduceBitwiseXOR) { Verify(&AllreduceBitwise TEST_F(InMemoryCommunicatorTest, Broadcast) { Verify(&Broadcast); } +TEST_F(InMemoryCommunicatorTest, Mixture) { Verify(&Mixture); } + } // namespace collective } // namespace xgboost diff --git a/tests/cpp/helpers.h b/tests/cpp/helpers.h index 3a56bd27f..4103b34df 100644 --- a/tests/cpp/helpers.h +++ b/tests/cpp/helpers.h @@ -497,23 +497,32 @@ inline std::int32_t AllThreadsForTest() { return Context{}.Threads(); } template <typename Function, typename... Args> void RunWithInMemoryCommunicator(int32_t world_size, Function&& function, Args&&... 
args) { + auto run = [&](auto rank) { + Json config{JsonObject()}; + config["xgboost_communicator"] = String("in-memory"); + config["in_memory_world_size"] = world_size; + config["in_memory_rank"] = rank; + xgboost::collective::Init(config); + + std::forward<Function>(function)(std::forward<Args>(args)...); + + xgboost::collective::Finalize(); + }; +#if defined(_OPENMP) +#pragma omp parallel num_threads(world_size) + { + auto rank = omp_get_thread_num(); + run(rank); + } +#else std::vector<std::thread> threads; for (auto rank = 0; rank < world_size; rank++) { - threads.emplace_back([&, rank]() { - Json config{JsonObject()}; - config["xgboost_communicator"] = String("in-memory"); - config["in_memory_world_size"] = world_size; - config["in_memory_rank"] = rank; - xgboost::collective::Init(config); - - std::forward<Function>(function)(std::forward<Args>(args)...); - - xgboost::collective::Finalize(); - }); + threads.emplace_back(run, rank); } for (auto& thread : threads) { thread.join(); } +#endif } class DeclareUnifiedDistributedTest(MetricTest) : public ::testing::Test { diff --git a/tests/cpp/plugin/helpers.h b/tests/cpp/plugin/helpers.h index 0dbdeeca4..67a7d70e2 100644 --- a/tests/cpp/plugin/helpers.h +++ b/tests/cpp/plugin/helpers.h @@ -3,6 +3,7 @@ */ #pragma once +#include #include #include #include @@ -61,24 +62,33 @@ class BaseFederatedTest : public ::testing::Test { template <typename Function, typename... Args> void RunWithFederatedCommunicator(int32_t world_size, std::string const& server_address, Function&& function, Args&&... 
args) { + auto run = [&](auto rank) { + Json config{JsonObject()}; + config["xgboost_communicator"] = String("federated"); + config["federated_server_address"] = String(server_address); + config["federated_world_size"] = world_size; + config["federated_rank"] = rank; + xgboost::collective::Init(config); + + std::forward<Function>(function)(std::forward<Args>(args)...); + + xgboost::collective::Finalize(); + }; +#if defined(_OPENMP) +#pragma omp parallel num_threads(world_size) + { + auto rank = omp_get_thread_num(); + run(rank); + } +#else std::vector<std::thread> threads; for (auto rank = 0; rank < world_size; rank++) { - threads.emplace_back([&, rank]() { - Json config{JsonObject()}; - config["xgboost_communicator"] = String("federated"); - config["federated_server_address"] = String(server_address); - config["federated_world_size"] = world_size; - config["federated_rank"] = rank; - xgboost::collective::Init(config); - - std::forward<Function>(function)(std::forward<Args>(args)...); - - xgboost::collective::Finalize(); - }); + threads.emplace_back(run, rank); } for (auto& thread : threads) { thread.join(); } +#endif } } // namespace xgboost diff --git a/tests/cpp/tree/test_histmaker.cc b/tests/cpp/tree/test_histmaker.cc index 881de57e1..45308457c 100644 --- a/tests/cpp/tree/test_histmaker.cc +++ b/tests/cpp/tree/test_histmaker.cc @@ -90,13 +90,16 @@ void TestColumnSplit(int32_t rows, bst_feature_t cols, RegTree const& expected_t param.Init(Args{}); updater->Update(&param, p_gradients.get(), sliced.get(), position, {&tree}); - EXPECT_EQ(tree.NumExtraNodes(), 10); - EXPECT_EQ(tree[0].SplitIndex(), 1); + ASSERT_EQ(tree.NumExtraNodes(), 10); + ASSERT_EQ(tree[0].SplitIndex(), 1); - EXPECT_NE(tree[tree[0].LeftChild()].SplitIndex(), 0); - EXPECT_NE(tree[tree[0].RightChild()].SplitIndex(), 0); + ASSERT_NE(tree[tree[0].LeftChild()].SplitIndex(), 0); + ASSERT_NE(tree[tree[0].RightChild()].SplitIndex(), 0); - EXPECT_EQ(tree, expected_tree); + FeatureMap fmap; + auto json = tree.DumpModel(fmap, false, "json"); + auto 
expected_json = expected_tree.DumpModel(fmap, false, "json"); + ASSERT_EQ(json, expected_json); } } // anonymous namespace