diff --git a/src/collective/comm.cc b/src/collective/comm.cc index 543ece639..565443fbc 100644 --- a/src/collective/comm.cc +++ b/src/collective/comm.cc @@ -372,7 +372,7 @@ RabitComm::~RabitComm() noexcept(false) { // Tell the error hanlding thread that we are shutting down. TCPSocket err_client; - return Success() << [&] { + auto rc = Success() << [&] { return ConnectTrackerImpl(tracker_, timeout_, retry_, task_id_, &tracker, Rank(), World()); } << [&] { return this->Block(); @@ -403,6 +403,10 @@ RabitComm::~RabitComm() noexcept(false) { // the previous more important steps. return proto::Error{}.SignalShutdown(&err_client); }; + if (!rc.OK()) { + return Fail("Failed to shutdown.", std::move(rc)); + } + return rc; } [[nodiscard]] Result RabitComm::LogTracker(std::string msg) const { diff --git a/tests/cpp/collective/test_worker.h b/tests/cpp/collective/test_worker.h index 243091190..631a225b3 100644 --- a/tests/cpp/collective/test_worker.h +++ b/tests/cpp/collective/test_worker.h @@ -186,6 +186,12 @@ void TestDistributedGlobal(std::int32_t n_workers, WorkerFn worker_fn, bool need system::SocketFinalize(); } +inline std::int32_t GetWorkerLocalThreads(std::int32_t n_workers) { + std::int32_t n_total_threads = std::thread::hardware_concurrency(); + auto n_threads = std::max(n_total_threads / n_workers, 1); + return n_threads; +} + class BaseMGPUTest : public ::testing::Test { public: /** diff --git a/tests/cpp/predictor/test_cpu_predictor.cc b/tests/cpp/predictor/test_cpu_predictor.cc index 637b77b25..c0d2c8e28 100644 --- a/tests/cpp/predictor/test_cpu_predictor.cc +++ b/tests/cpp/predictor/test_cpu_predictor.cc @@ -1,12 +1,9 @@ /** - * Copyright 2017-2023 by XGBoost contributors + * Copyright 2017-2024, XGBoost contributors */ #include #include -#include -#include - #include "../../../src/collective/communicator-inl.h" #include "../../../src/data/adapter.h" #include "../../../src/data/proxy_dmatrix.h" diff --git a/tests/cpp/predictor/test_predictor.cc b/tests/cpp/predictor/test_predictor.cc index fde0e480b..b79b75012 100644 --- a/tests/cpp/predictor/test_predictor.cc +++ b/tests/cpp/predictor/test_predictor.cc @@ -511,15 +511,20 @@ void VerifyIterationRangeColumnSplit(bool use_gpu, Json const &ranged_model, if (use_gpu) { ctx = MakeCUDACtx(common::AllVisibleGPUs() == 1 ? 0 : rank); } + auto n_threads = collective::GetWorkerLocalThreads(world_size); + ctx.UpdateAllowUnknown( + Args{{"nthread", std::to_string(n_threads)}, {"device", ctx.DeviceName()}}); + auto dmat = RandomDataGenerator(rows, cols, 0).GenerateDMatrix(true, true, classes); std::shared_ptr Xy{dmat->SliceCol(world_size, rank)}; std::unique_ptr learner{Learner::Create({Xy})}; - learner->SetParam("device", ctx.DeviceName()); + auto args = Args{{"device", ctx.DeviceName()}, {"nthread", std::to_string(ctx.Threads())}}; + learner->SetParams(args); learner->LoadModel(ranged_model); std::unique_ptr sliced{Learner::Create({Xy})}; - sliced->SetParam("device", ctx.DeviceName()); + sliced->SetParams(args); sliced->LoadModel(sliced_model); HostDeviceVector out_predt_sliced; diff --git a/tests/cpp/test_learner.cc b/tests/cpp/test_learner.cc index c25f684a4..be11a2a76 100644 --- a/tests/cpp/test_learner.cc +++ b/tests/cpp/test_learner.cc @@ -662,13 +662,15 @@ TEST_F(InitBaseScore, UpdateProcess) { this->TestUpdateProcess(); } class TestColumnSplit : public ::testing::TestWithParam { void TestBaseScore(std::string objective, float expected_base_score, Json expected_model) { auto const world_size = collective::GetWorldSize(); + auto n_threads = collective::GetWorkerLocalThreads(world_size); auto const rank = collective::GetRank(); auto p_fmat = MakeFmatForObjTest(objective, 10, 10); std::shared_ptr sliced{p_fmat->SliceCol(world_size, rank)}; std::unique_ptr learner{Learner::Create({sliced})}; - learner->SetParam("tree_method", "approx"); - learner->SetParam("objective", objective); + learner->SetParams(Args{{"nthread", std::to_string(n_threads)}, + {"tree_method", "approx"}, + {"objective", objective}}); if (objective.find("quantile") != std::string::npos) { learner->SetParam("quantile_alpha", "0.5"); } @@ -707,7 +709,9 @@ class TestColumnSplit : public ::testing::TestWithParam { learner->SaveModel(&model); auto constexpr kWorldSize{3}; - auto call = [this, &objective](auto&... args) { TestBaseScore(objective, args...); }; + auto call = [this, &objective](auto&... args) { + this->TestBaseScore(objective, args...); + }; auto score = GetBaseScore(config); collective::TestDistributedGlobal(kWorldSize, [&] { call(score, model); @@ -730,8 +734,10 @@ namespace { Json GetModelWithArgs(std::shared_ptr dmat, std::string const& tree_method, std::string const& device, Args const& args) { std::unique_ptr learner{Learner::Create({dmat})}; + auto n_threads = collective::GetWorkerLocalThreads(collective::GetWorldSize()); learner->SetParam("tree_method", tree_method); learner->SetParam("device", device); + learner->SetParam("nthread", std::to_string(n_threads)); learner->SetParam("objective", "reg:logistic"); learner->SetParams(args); learner->UpdateOneIter(0, dmat); diff --git a/tests/cpp/tree/test_approx.cc b/tests/cpp/tree/test_approx.cc index 83e9243a2..4b45d80c5 100644 --- a/tests/cpp/tree/test_approx.cc +++ b/tests/cpp/tree/test_approx.cc @@ -2,6 +2,11 @@ * Copyright 2021-2024, XGBoost contributors. */ #include +#include // for TreeUpdater + +#include // for transform +#include // for unique_ptr +#include // for vector #include "../../../src/tree/common_row_partitioner.h" #include "../../../src/tree/param.h" // for TrainParam diff --git a/tests/cpp/tree/test_column_split.cc b/tests/cpp/tree/test_column_split.cc new file mode 100644 index 000000000..4b98c6f76 --- /dev/null +++ b/tests/cpp/tree/test_column_split.cc @@ -0,0 +1,67 @@ +/** + * Copyright 2024, XGBoost Contributors + */ +#include "test_column_split.h" + +#include +#include // for RegTree +#include // for TreeUpdater + +#include // for hardware_concurrency +#include // for vector + +#include "../../../src/tree/param.h" // for TrainParam +#include "../collective/test_worker.h" // for TestDistributedGlobal + +namespace xgboost::tree { +void TestColumnSplit(bst_target_t n_targets, bool categorical, std::string name, float sparsity) { + auto constexpr kRows = 32; + auto constexpr kCols = 16; + + RegTree expected_tree{n_targets, static_cast(kCols)}; + ObjInfo task{ObjInfo::kRegression}; + Context ctx; + { + auto p_dmat = GenerateCatDMatrix(kRows, kCols, sparsity, categorical); + auto gpair = GenerateRandomGradients(&ctx, kRows, n_targets); + std::unique_ptr updater{TreeUpdater::Create(name, &ctx, &task)}; + std::vector> position(1); + TrainParam param; + param.Init(Args{}); + updater->Configure(Args{}); + updater->Update(¶m, &gpair, p_dmat.get(), position, {&expected_tree}); + } + + auto constexpr kWorldSize = 2; + + auto verify = [&] { + Context ctx; + ctx.UpdateAllowUnknown( + Args{{"nthread", std::to_string(collective::GetWorkerLocalThreads(kWorldSize))}}); + + auto p_dmat = GenerateCatDMatrix(kRows, kCols, sparsity, categorical); + auto gpair = GenerateRandomGradients(&ctx, kRows, n_targets); + + ObjInfo task{ObjInfo::kRegression}; + std::unique_ptr updater{TreeUpdater::Create(name, &ctx, &task)}; + std::vector> position(1); + + std::unique_ptr sliced{ + p_dmat->SliceCol(collective::GetWorldSize(), collective::GetRank())}; + + RegTree tree{n_targets, static_cast(kCols)}; + TrainParam param; + param.Init(Args{}); + updater->Configure(Args{}); + updater->Update(¶m, &gpair, sliced.get(), position, {&tree}); + + Json json{Object{}}; + tree.SaveModel(&json); + Json expected_json{Object{}}; + expected_tree.SaveModel(&expected_json); + ASSERT_EQ(json, expected_json); + }; + + collective::TestDistributedGlobal(kWorldSize, [&] { verify(); }); +} +} // namespace xgboost::tree diff --git a/tests/cpp/tree/test_column_split.h b/tests/cpp/tree/test_column_split.h index eba452a15..c1edbc811 100644 --- a/tests/cpp/tree/test_column_split.h +++ b/tests/cpp/tree/test_column_split.h @@ -4,15 +4,11 @@ #pragma once #include // for FeatureType, DMatrix -#include // for RegTree -#include // for TreeUpdater #include // for size_t #include // for shared_ptr #include // for vector -#include "../../../src/tree/param.h" // for TrainParam -#include "../collective/test_worker.h" // for TestDistributedGlobal #include "../helpers.h" // for RandomDataGenerator namespace xgboost::tree { @@ -33,51 +29,5 @@ inline std::shared_ptr GenerateCatDMatrix(std::size_t rows, std::size_t } } -inline void TestColumnSplit(bst_target_t n_targets, bool categorical, std::string name, - float sparsity) { - auto constexpr kRows = 32; - auto constexpr kCols = 16; - - RegTree expected_tree{n_targets, static_cast(kCols)}; - ObjInfo task{ObjInfo::kRegression}; - Context ctx; - { - auto p_dmat = GenerateCatDMatrix(kRows, kCols, sparsity, categorical); - auto gpair = GenerateRandomGradients(&ctx, kRows, n_targets); - std::unique_ptr updater{TreeUpdater::Create(name, &ctx, &task)}; - std::vector> position(1); - TrainParam param; - param.Init(Args{}); - updater->Configure(Args{}); - updater->Update(¶m, &gpair, p_dmat.get(), position, {&expected_tree}); - } - - auto verify = [&] { - Context ctx; - auto p_dmat = GenerateCatDMatrix(kRows, kCols, sparsity, categorical); - auto gpair = GenerateRandomGradients(&ctx, kRows, n_targets); - - ObjInfo task{ObjInfo::kRegression}; - std::unique_ptr updater{TreeUpdater::Create(name, &ctx, &task)}; - std::vector> position(1); - - std::unique_ptr sliced{ - p_dmat->SliceCol(collective::GetWorldSize(), collective::GetRank())}; - - RegTree tree{n_targets, static_cast(kCols)}; - TrainParam param; - param.Init(Args{}); - updater->Configure(Args{}); - updater->Update(¶m, &gpair, sliced.get(), position, {&tree}); - - Json json{Object{}}; - tree.SaveModel(&json); - Json expected_json{Object{}}; - expected_tree.SaveModel(&expected_json); - ASSERT_EQ(json, expected_json); - }; - - auto constexpr kWorldSize = 2; - collective::TestDistributedGlobal(kWorldSize, [&] { verify(); }); -} +void TestColumnSplit(bst_target_t n_targets, bool categorical, std::string name, float sparsity); } // namespace xgboost::tree