From ff26cd321233c831ba2e5b9a6619a991c9f88cd6 Mon Sep 17 00:00:00 2001
From: Rong Ou
Date: Tue, 28 Mar 2023 01:40:26 -0700
Subject: [PATCH 1/3] More tests for column split and vertical federated learning (#8985)

Added some more tests for the learner and fit_stump, for both column-wise
distributed learning and vertical federated learning. Also moved the
`IsRowSplit` and `IsColumnSplit` methods from the `DMatrix` to the `MetaInfo`
since in some places we only have access to the `MetaInfo`. Added a new
convenience method `IsVerticalFederated`. Some refactoring of the testing
fixtures.
---
 include/xgboost/data.h                     | 26 +++++---
 src/common/hist_util.cc                    |  4 +-
 src/data/data.cc                           |  6 +-
 src/data/iterative_dmatrix.cc              |  2 +-
 src/data/simple_dmatrix.cc                 |  2 +-
 src/learner.cc                             |  6 +-
 src/predictor/cpu_predictor.cc             |  2 +-
 src/tree/fit_stump.cc                      |  3 +-
 src/tree/hist/evaluate_splits.h            |  2 +-
 src/tree/updater_approx.cc                 |  7 +-
 src/tree/updater_quantile_hist.cc          |  9 +--
 tests/cpp/helpers.cc                       | 12 +---
 tests/cpp/helpers.h                        |  5 +-
 tests/cpp/plugin/helpers.h                 | 33 ++++++---
 tests/cpp/plugin/test_federated_data.cc    | 66 ++++++++----------
 tests/cpp/plugin/test_federated_learner.cc | 78 ++++++++++++++++++++++
 tests/cpp/test_learner.cc                  | 32 +++++++--
 tests/cpp/tree/test_fit_stump.cc           | 11 ++-
 18 files changed, 212 insertions(+), 94 deletions(-)
 create mode 100644 tests/cpp/plugin/test_federated_learner.cc

diff --git a/include/xgboost/data.h b/include/xgboost/data.h
index 57f8a0e36..4af306859 100644
--- a/include/xgboost/data.h
+++ b/include/xgboost/data.h
@@ -180,6 +180,22 @@ class MetaInfo {
    */
   void SynchronizeNumberOfColumns();
 
+  /*! \brief Whether the data is split row-wise. */
+  bool IsRowSplit() const {
+    return data_split_mode == DataSplitMode::kRow;
+  }
+
+  /*! \brief Whether the data is split column-wise. */
+  bool IsColumnSplit() const {
+    return data_split_mode == DataSplitMode::kCol;
+  }
+
+  /*!
+   * \brief A convenient method to check if we are doing vertical federated learning, which requires
+   * some special processing.
+   */
+  bool IsVerticalFederated() const;
+
  private:
   void SetInfoFromHost(Context const& ctx, StringView key, Json arr);
   void SetInfoFromCUDA(Context const& ctx, StringView key, Json arr);
@@ -542,16 +558,6 @@ class DMatrix {
     return Info().num_nonzero_ == Info().num_row_ * Info().num_col_;
   }
 
-  /*! \brief Whether the data is split row-wise. */
-  bool IsRowSplit() const {
-    return Info().data_split_mode == DataSplitMode::kRow;
-  }
-
-  /*! \brief Whether the data is split column-wise. */
-  bool IsColumnSplit() const {
-    return Info().data_split_mode == DataSplitMode::kCol;
-  }
-
   /*!
    * \brief Load DMatrix from URI.
    * \param uri The URI of input.
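Illustrative sketch (not part of the change set): in a column-wise split each worker keeps every row but only a slice of the features, so the total feature count is the sum of the per-worker widths, and a worker's local column ids are shifted by the widths of the workers before it. The sketch below assumes the layout the federated data test later in this patch uses (three workers, worker r holding 8 + r columns), which gives the offsets {0, 8, 17} that test checks.

import numpy as np

# Hypothetical layout mirroring the test: worker r holds 8 + r feature columns.
local_widths = [8, 9, 10]
global_num_col = sum(local_widths)            # 27 columns in the logical global matrix
offsets = np.cumsum([0] + local_widths[:-1])  # [0, 8, 17]: shift applied to local column ids

for rank, (width, offset) in enumerate(zip(local_widths, offsets)):
    local_cols = np.arange(width)             # column ids as seen inside one worker
    global_cols = local_cols + offset         # ids after re-indexing against the global matrix
    print(rank, global_cols[0], global_cols[-1])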
diff --git a/src/common/hist_util.cc b/src/common/hist_util.cc index 6e83c084e..a99ed4f10 100644 --- a/src/common/hist_util.cc +++ b/src/common/hist_util.cc @@ -46,7 +46,7 @@ HistogramCuts SketchOnDMatrix(DMatrix *m, int32_t max_bins, int32_t n_threads, b if (!use_sorted) { HostSketchContainer container(max_bins, m->Info().feature_types.ConstHostSpan(), reduced, HostSketchContainer::UseGroup(info), - m->IsColumnSplit(), n_threads); + m->Info().IsColumnSplit(), n_threads); for (auto const& page : m->GetBatches()) { container.PushRowPage(page, info, hessian); } @@ -54,7 +54,7 @@ HistogramCuts SketchOnDMatrix(DMatrix *m, int32_t max_bins, int32_t n_threads, b } else { SortedSketchContainer container{max_bins, m->Info().feature_types.ConstHostSpan(), reduced, HostSketchContainer::UseGroup(info), - m->IsColumnSplit(), n_threads}; + m->Info().IsColumnSplit(), n_threads}; for (auto const& page : m->GetBatches()) { container.PushColPage(page, info, hessian); } diff --git a/src/data/data.cc b/src/data/data.cc index 6f5d52817..694bc48b9 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -704,7 +704,7 @@ void MetaInfo::Extend(MetaInfo const& that, bool accumulate_rows, bool check_col } void MetaInfo::SynchronizeNumberOfColumns() { - if (collective::IsFederated() && data_split_mode == DataSplitMode::kCol) { + if (IsVerticalFederated()) { collective::Allreduce(&num_col_, 1); } else { collective::Allreduce(&num_col_, 1); @@ -770,6 +770,10 @@ void MetaInfo::Validate(std::int32_t device) const { void MetaInfo::SetInfoFromCUDA(Context const&, StringView, Json) { common::AssertGPUSupport(); } #endif // !defined(XGBOOST_USE_CUDA) +bool MetaInfo::IsVerticalFederated() const { + return collective::IsFederated() && IsColumnSplit(); +} + using DMatrixThreadLocal = dmlc::ThreadLocalStore>; diff --git a/src/data/iterative_dmatrix.cc b/src/data/iterative_dmatrix.cc index dc6fb55e8..1bf755915 100644 --- a/src/data/iterative_dmatrix.cc +++ b/src/data/iterative_dmatrix.cc @@ -213,7 +213,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing, SyncFeatureType(&h_ft); p_sketch.reset(new common::HostSketchContainer{ batch_param_.max_bin, h_ft, column_sizes, !proxy->Info().group_ptr_.empty(), - proxy->IsColumnSplit(), ctx_.Threads()}); + proxy->Info().IsColumnSplit(), ctx_.Threads()}); } HostAdapterDispatch(proxy, [&](auto const& batch) { proxy->Info().num_nonzero_ = batch_nnz[i]; diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index 098c3c4f2..e916311a5 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -74,7 +74,7 @@ DMatrix* SimpleDMatrix::SliceCol(int num_slices, int slice_id) { } void SimpleDMatrix::ReindexFeatures() { - if (collective::IsFederated() && info_.data_split_mode == DataSplitMode::kCol) { + if (info_.IsVerticalFederated()) { std::vector buffer(collective::GetWorldSize()); buffer[collective::GetRank()] = info_.num_col_; collective::Allgather(buffer.data(), buffer.size() * sizeof(uint64_t)); diff --git a/src/learner.cc b/src/learner.cc index 8808c3392..1150a2355 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -860,9 +860,9 @@ class LearnerConfiguration : public Learner { void InitEstimation(MetaInfo const& info, linalg::Tensor* base_score) { // Special handling for vertical federated learning. 
- if (collective::IsFederated() && info.data_split_mode == DataSplitMode::kCol) { + if (info.IsVerticalFederated()) { // We assume labels are only available on worker 0, so the estimation is calculated there - // and added to other workers. + // and broadcast to other workers. if (collective::GetRank() == 0) { UsePtr(obj_)->InitEstimation(info, base_score); collective::Broadcast(base_score->Data()->HostPointer(), @@ -1487,7 +1487,7 @@ class LearnerImpl : public LearnerIO { void GetGradient(HostDeviceVector const& preds, MetaInfo const& info, int iteration, HostDeviceVector* out_gpair) { // Special handling for vertical federated learning. - if (collective::IsFederated() && info.data_split_mode == DataSplitMode::kCol) { + if (info.IsVerticalFederated()) { // We assume labels are only available on worker 0, so the gradients are calculated there // and broadcast to other workers. if (collective::GetRank() == 0) { diff --git a/src/predictor/cpu_predictor.cc b/src/predictor/cpu_predictor.cc index fe6fea02f..2b7a96d9c 100644 --- a/src/predictor/cpu_predictor.cc +++ b/src/predictor/cpu_predictor.cc @@ -605,7 +605,7 @@ class CPUPredictor : public Predictor { protected: void PredictDMatrix(DMatrix *p_fmat, std::vector *out_preds, gbm::GBTreeModel const &model, int32_t tree_begin, int32_t tree_end) const { - if (p_fmat->IsColumnSplit()) { + if (p_fmat->Info().IsColumnSplit()) { ColumnSplitHelper helper(this->ctx_->Threads(), model, tree_begin, tree_end); helper.PredictDMatrix(p_fmat, out_preds); return; diff --git a/src/tree/fit_stump.cc b/src/tree/fit_stump.cc index 5131f9284..55f23b329 100644 --- a/src/tree/fit_stump.cc +++ b/src/tree/fit_stump.cc @@ -45,8 +45,7 @@ void FitStump(Context const* ctx, MetaInfo const& info, } CHECK(h_sum.CContiguous()); - // In vertical federated learning, only worker 0 needs to call this, no need to do an allreduce. 
- if (!collective::IsFederated() || info.data_split_mode != DataSplitMode::kCol) { + if (info.IsRowSplit()) { collective::Allreduce( reinterpret_cast(h_sum.Values().data()), h_sum.Size() * 2); } diff --git a/src/tree/hist/evaluate_splits.h b/src/tree/hist/evaluate_splits.h index 0a79fbebc..8d13a48af 100644 --- a/src/tree/hist/evaluate_splits.h +++ b/src/tree/hist/evaluate_splits.h @@ -449,7 +449,7 @@ class HistEvaluator { param_{param}, column_sampler_{std::move(sampler)}, tree_evaluator_{*param, static_cast(info.num_col_), Context::kCpuId}, - is_col_split_{info.data_split_mode == DataSplitMode::kCol} { + is_col_split_{info.IsColumnSplit()} { interaction_constraints_.Configure(*param, info.num_col_); column_sampler_->Init(ctx, info.num_col_, info.feature_weights.HostVector(), param_->colsample_bynode, param_->colsample_bylevel, diff --git a/src/tree/updater_approx.cc b/src/tree/updater_approx.cc index d6bc23f44..d22e8f679 100644 --- a/src/tree/updater_approx.cc +++ b/src/tree/updater_approx.cc @@ -72,12 +72,13 @@ class GloablApproxBuilder { } else { CHECK_EQ(n_total_bins, page.cut.TotalBins()); } - partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid, p_fmat->IsColumnSplit()); + partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid, + p_fmat->Info().IsColumnSplit()); n_batches_++; } histogram_builder_.Reset(n_total_bins, BatchSpec(*param_, hess), ctx_->Threads(), n_batches_, - collective::IsDistributed(), p_fmat->IsColumnSplit()); + collective::IsDistributed(), p_fmat->Info().IsColumnSplit()); monitor_->Stop(__func__); } @@ -91,7 +92,7 @@ class GloablApproxBuilder { for (auto const &g : gpair) { root_sum.Add(g); } - if (p_fmat->IsRowSplit()) { + if (p_fmat->Info().IsRowSplit()) { collective::Allreduce(reinterpret_cast(&root_sum), 2); } std::vector nodes{best}; diff --git a/src/tree/updater_quantile_hist.cc b/src/tree/updater_quantile_hist.cc index 8387177aa..4906a21b7 100644 --- a/src/tree/updater_quantile_hist.cc +++ b/src/tree/updater_quantile_hist.cc @@ -158,7 +158,7 @@ class MultiTargetHistBuilder { } else { CHECK_EQ(n_total_bins, page.cut.TotalBins()); } - partitioner_.emplace_back(ctx_, page.Size(), page.base_rowid, p_fmat->IsColumnSplit()); + partitioner_.emplace_back(ctx_, page.Size(), page.base_rowid, p_fmat->Info().IsColumnSplit()); page_id++; } @@ -167,7 +167,7 @@ class MultiTargetHistBuilder { for (std::size_t i = 0; i < n_targets; ++i) { histogram_builder_.emplace_back(); histogram_builder_.back().Reset(n_total_bins, HistBatch(param_), ctx_->Threads(), page_id, - collective::IsDistributed(), p_fmat->IsColumnSplit()); + collective::IsDistributed(), p_fmat->Info().IsColumnSplit()); } evaluator_ = std::make_unique(ctx_, p_fmat->Info(), param_, col_sampler_); @@ -388,11 +388,12 @@ class HistBuilder { } else { CHECK_EQ(n_total_bins, page.cut.TotalBins()); } - partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid, fmat->IsColumnSplit()); + partitioner_.emplace_back(this->ctx_, page.Size(), page.base_rowid, + fmat->Info().IsColumnSplit()); ++page_id; } histogram_builder_->Reset(n_total_bins, HistBatch(param_), ctx_->Threads(), page_id, - collective::IsDistributed(), fmat->IsColumnSplit()); + collective::IsDistributed(), fmat->Info().IsColumnSplit()); evaluator_ = std::make_unique>(ctx_, this->param_, fmat->Info(), col_sampler_); p_last_tree_ = p_tree; diff --git a/tests/cpp/helpers.cc b/tests/cpp/helpers.cc index 27742bf6b..0c0c9fc9f 100644 --- a/tests/cpp/helpers.cc +++ b/tests/cpp/helpers.cc @@ -191,15 +191,9 @@ double 
GetMultiMetricEval(xgboost::Metric* metric, } namespace xgboost { -bool IsNear(std::vector::const_iterator _beg1, - std::vector::const_iterator _end1, - std::vector::const_iterator _beg2) { - for (auto iter1 = _beg1, iter2 = _beg2; iter1 != _end1; ++iter1, ++iter2) { - if (std::abs(*iter1 - *iter2) > xgboost::kRtEps){ - return false; - } - } - return true; + +float GetBaseScore(Json const &config) { + return std::stof(get(config["learner"]["learner_model_param"]["base_score"])); } SimpleLCG::StateType SimpleLCG::operator()() { diff --git a/tests/cpp/helpers.h b/tests/cpp/helpers.h index 9d820e4b3..025feae3e 100644 --- a/tests/cpp/helpers.h +++ b/tests/cpp/helpers.h @@ -101,9 +101,8 @@ double GetMultiMetricEval(xgboost::Metric* metric, std::vector groups = {}); namespace xgboost { -bool IsNear(std::vector::const_iterator _beg1, - std::vector::const_iterator _end1, - std::vector::const_iterator _beg2); + +float GetBaseScore(Json const &config); /*! * \brief Linear congruential generator. diff --git a/tests/cpp/plugin/helpers.h b/tests/cpp/plugin/helpers.h index 0ac6746f8..7edfc5efc 100644 --- a/tests/cpp/plugin/helpers.h +++ b/tests/cpp/plugin/helpers.h @@ -52,18 +52,33 @@ class BaseFederatedTest : public ::testing::Test { server_thread_->join(); } - void InitCommunicator(int rank) { - Json config{JsonObject()}; - config["xgboost_communicator"] = String("federated"); - config["federated_server_address"] = String(server_address_); - config["federated_world_size"] = kWorldSize; - config["federated_rank"] = rank; - xgboost::collective::Init(config); - } - static int const kWorldSize{3}; std::string server_address_; std::unique_ptr server_thread_; std::unique_ptr server_; }; + +template +void RunWithFederatedCommunicator(int32_t world_size, std::string const& server_address, + Function&& function, Args&&... args) { + std::vector threads; + for (auto rank = 0; rank < world_size; rank++) { + threads.emplace_back([&, rank]() { + Json config{JsonObject()}; + config["xgboost_communicator"] = String("federated"); + config["federated_server_address"] = String(server_address); + config["federated_world_size"] = world_size; + config["federated_rank"] = rank; + xgboost::collective::Init(config); + + std::forward(function)(std::forward(args)...); + + xgboost::collective::Finalize(); + }); + } + for (auto& thread : threads) { + thread.join(); + } +} + } // namespace xgboost diff --git a/tests/cpp/plugin/test_federated_data.cc b/tests/cpp/plugin/test_federated_data.cc index 8ac89e887..ed877131e 100644 --- a/tests/cpp/plugin/test_federated_data.cc +++ b/tests/cpp/plugin/test_federated_data.cc @@ -1,12 +1,9 @@ /*! 
* Copyright 2023 XGBoost contributors */ -#include #include #include -#include -#include #include #include "../../../plugin/federated/federated_server.h" @@ -17,49 +14,40 @@ namespace xgboost { -class FederatedDataTest : public BaseFederatedTest { - public: - void VerifyLoadUri(int rank) { - InitCommunicator(rank); +class FederatedDataTest : public BaseFederatedTest {}; - size_t constexpr kRows{16}; - size_t const kCols = 8 + rank; +void VerifyLoadUri() { + auto const rank = collective::GetRank(); - dmlc::TemporaryDirectory tmpdir; - std::string path = tmpdir.path + "/small" + std::to_string(rank) + ".csv"; - CreateTestCSV(path, kRows, kCols); + size_t constexpr kRows{16}; + size_t const kCols = 8 + rank; - std::unique_ptr dmat; - std::string uri = path + "?format=csv"; - dmat.reset(DMatrix::Load(uri, false, DataSplitMode::kCol)); + dmlc::TemporaryDirectory tmpdir; + std::string path = tmpdir.path + "/small" + std::to_string(rank) + ".csv"; + CreateTestCSV(path, kRows, kCols); - ASSERT_EQ(dmat->Info().num_col_, 8 * kWorldSize + 3); - ASSERT_EQ(dmat->Info().num_row_, kRows); + std::unique_ptr dmat; + std::string uri = path + "?format=csv"; + dmat.reset(DMatrix::Load(uri, false, DataSplitMode::kCol)); - for (auto const& page : dmat->GetBatches()) { - auto entries = page.GetView().data; - auto index = 0; - int offsets[] = {0, 8, 17}; - int offset = offsets[rank]; - for (auto row = 0; row < kRows; row++) { - for (auto col = 0; col < kCols; col++) { - EXPECT_EQ(entries[index].index, col + offset); - index++; - } + ASSERT_EQ(dmat->Info().num_col_, 8 * collective::GetWorldSize() + 3); + ASSERT_EQ(dmat->Info().num_row_, kRows); + + for (auto const& page : dmat->GetBatches()) { + auto entries = page.GetView().data; + auto index = 0; + int offsets[] = {0, 8, 17}; + int offset = offsets[rank]; + for (auto row = 0; row < kRows; row++) { + for (auto col = 0; col < kCols; col++) { + EXPECT_EQ(entries[index].index, col + offset); + index++; } } - - xgboost::collective::Finalize(); - } -}; - -TEST_F(FederatedDataTest, LoadUri) { - std::vector threads; - for (auto rank = 0; rank < kWorldSize; rank++) { - threads.emplace_back(&FederatedDataTest_LoadUri_Test::VerifyLoadUri, this, rank); - } - for (auto& thread : threads) { - thread.join(); } } + +TEST_F(FederatedDataTest, LoadUri) { + RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyLoadUri); +} } // namespace xgboost diff --git a/tests/cpp/plugin/test_federated_learner.cc b/tests/cpp/plugin/test_federated_learner.cc new file mode 100644 index 000000000..67e322323 --- /dev/null +++ b/tests/cpp/plugin/test_federated_learner.cc @@ -0,0 +1,78 @@ +/*! 
+ * Copyright 2023 XGBoost contributors + */ +#include +#include +#include +#include + +#include "../../../plugin/federated/federated_server.h" +#include "../../../src/collective/communicator-inl.h" +#include "../helpers.h" +#include "helpers.h" + +namespace xgboost { + +class FederatedLearnerTest : public BaseFederatedTest { + protected: + static auto constexpr kRows{16}; + static auto constexpr kCols{16}; +}; + +void VerifyBaseScore(size_t rows, size_t cols, float expected_base_score) { + auto const world_size = collective::GetWorldSize(); + auto const rank = collective::GetRank(); + std::shared_ptr Xy_{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)}; + std::shared_ptr sliced{Xy_->SliceCol(world_size, rank)}; + std::unique_ptr learner{Learner::Create({sliced})}; + learner->SetParam("tree_method", "approx"); + learner->SetParam("objective", "binary:logistic"); + learner->UpdateOneIter(0, sliced); + Json config{Object{}}; + learner->SaveConfig(&config); + auto base_score = GetBaseScore(config); + ASSERT_EQ(base_score, expected_base_score); +} + +void VerifyModel(size_t rows, size_t cols, Json const& expected_model) { + auto const world_size = collective::GetWorldSize(); + auto const rank = collective::GetRank(); + std::shared_ptr Xy_{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)}; + std::shared_ptr sliced{Xy_->SliceCol(world_size, rank)}; + std::unique_ptr learner{Learner::Create({sliced})}; + learner->SetParam("tree_method", "approx"); + learner->SetParam("objective", "binary:logistic"); + learner->UpdateOneIter(0, sliced); + Json model{Object{}}; + learner->SaveModel(&model); + ASSERT_EQ(model, expected_model); +} + +TEST_F(FederatedLearnerTest, BaseScore) { + std::shared_ptr Xy_{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)}; + std::unique_ptr learner{Learner::Create({Xy_})}; + learner->SetParam("tree_method", "approx"); + learner->SetParam("objective", "binary:logistic"); + learner->UpdateOneIter(0, Xy_); + Json config{Object{}}; + learner->SaveConfig(&config); + auto base_score = GetBaseScore(config); + ASSERT_NE(base_score, ObjFunction::DefaultBaseScore()); + + RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyBaseScore, kRows, kCols, + base_score); +} + +TEST_F(FederatedLearnerTest, Model) { + std::shared_ptr Xy_{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)}; + std::unique_ptr learner{Learner::Create({Xy_})}; + learner->SetParam("tree_method", "approx"); + learner->SetParam("objective", "binary:logistic"); + learner->UpdateOneIter(0, Xy_); + Json model{Object{}}; + learner->SaveModel(&model); + + RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyModel, kRows, kCols, + std::cref(model)); +} +} // namespace xgboost diff --git a/tests/cpp/test_learner.cc b/tests/cpp/test_learner.cc index 27bfbf21e..e4313125d 100644 --- a/tests/cpp/test_learner.cc +++ b/tests/cpp/test_learner.cc @@ -460,10 +460,6 @@ class InitBaseScore : public ::testing::Test { void SetUp() override { Xy_ = RandomDataGenerator{10, Cols(), 0}.GenerateDMatrix(true); } - static float GetBaseScore(Json const &config) { - return std::stof(get(config["learner"]["learner_model_param"]["base_score"])); - } - public: void TestUpdateConfig() { std::unique_ptr learner{Learner::Create({Xy_})}; @@ -611,4 +607,32 @@ TEST_F(InitBaseScore, InitAfterLoad) { this->TestInitAfterLoad(); } TEST_F(InitBaseScore, InitWithPredict) { this->TestInitWithPredt(); } TEST_F(InitBaseScore, UpdateProcess) { this->TestUpdateProcess(); } + +void 
TestColumnSplitBaseScore(std::shared_ptr Xy_, float expected_base_score) { + auto const world_size = collective::GetWorldSize(); + auto const rank = collective::GetRank(); + std::shared_ptr sliced{Xy_->SliceCol(world_size, rank)}; + std::unique_ptr learner{Learner::Create({sliced})}; + learner->SetParam("tree_method", "approx"); + learner->SetParam("objective", "binary:logistic"); + learner->UpdateOneIter(0, sliced); + Json config{Object{}}; + learner->SaveConfig(&config); + auto base_score = GetBaseScore(config); + ASSERT_EQ(base_score, expected_base_score); +} + +TEST_F(InitBaseScore, ColumnSplit) { + std::unique_ptr learner{Learner::Create({Xy_})}; + learner->SetParam("tree_method", "approx"); + learner->SetParam("objective", "binary:logistic"); + learner->UpdateOneIter(0, Xy_); + Json config{Object{}}; + learner->SaveConfig(&config); + auto base_score = GetBaseScore(config); + ASSERT_NE(base_score, ObjFunction::DefaultBaseScore()); + + auto constexpr kWorldSize{3}; + RunWithInMemoryCommunicator(kWorldSize, &TestColumnSplitBaseScore, Xy_, base_score); +} } // namespace xgboost diff --git a/tests/cpp/tree/test_fit_stump.cc b/tests/cpp/tree/test_fit_stump.cc index 35a6af994..18511c3a0 100644 --- a/tests/cpp/tree/test_fit_stump.cc +++ b/tests/cpp/tree/test_fit_stump.cc @@ -6,11 +6,12 @@ #include "../../src/common/linalg_op.h" #include "../../src/tree/fit_stump.h" +#include "../helpers.h" namespace xgboost { namespace tree { namespace { -void TestFitStump(Context const *ctx) { +void TestFitStump(Context const *ctx, DataSplitMode split = DataSplitMode::kRow) { std::size_t constexpr kRows = 16, kTargets = 2; HostDeviceVector gpair; auto &h_gpair = gpair.HostVector(); @@ -22,6 +23,7 @@ void TestFitStump(Context const *ctx) { } linalg::Vector out; MetaInfo info; + info.data_split_mode = split; FitStump(ctx, info, gpair, kTargets, &out); auto h_out = out.HostView(); for (auto it = linalg::cbegin(h_out); it != linalg::cend(h_out); ++it) { @@ -45,5 +47,12 @@ TEST(InitEstimation, GPUFitStump) { TestFitStump(&ctx); } #endif // defined(XGBOOST_USE_CUDA) + +TEST(InitEstimation, FitStumpColumnSplit) { + Context ctx; + auto constexpr kWorldSize{3}; + RunWithInMemoryCommunicator(kWorldSize, &TestFitStump, &ctx, DataSplitMode::kCol); +} + } // namespace tree } // namespace xgboost From 6676c28cbcc870af2639bd0591ac854168d8086c Mon Sep 17 00:00:00 2001 From: Philip Hyunsu Cho Date: Tue, 28 Mar 2023 21:32:54 -0700 Subject: [PATCH 2/3] [CI] Fix Windows wheel to be compatible with Poetry (#8991) * [CI] Fix Windows wheel to be compatible with Poetry * Typo * Eagerly scan globs to avoid patching same file twice --- tests/ci_build/insert_vcomp140.py | 109 ++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 13 deletions(-) diff --git a/tests/ci_build/insert_vcomp140.py b/tests/ci_build/insert_vcomp140.py index 938817415..cfa8d792d 100644 --- a/tests/ci_build/insert_vcomp140.py +++ b/tests/ci_build/insert_vcomp140.py @@ -1,19 +1,102 @@ +import argparse +import base64 import glob +import hashlib +import os +import pathlib import re -import sys -import zipfile +import shutil +import tempfile -if len(sys.argv) != 2: - print('Usage: {} [wheel]'.format(sys.argv[0])) - sys.exit(1) +VCOMP140_PATH = "C:\\Windows\\System32\\vcomp140.dll" -vcomp140_path = 'C:\\Windows\\System32\\vcomp140.dll' -for wheel_path in sorted(glob.glob(sys.argv[1])): - m = re.search(r'xgboost-(.*)-py3', wheel_path) - assert m, f'wheel_path = {wheel_path}' - version = m.group(1) +def get_sha256sum(path): + return ( + 
base64.urlsafe_b64encode(hashlib.sha256(open(path, "rb").read()).digest()) + .decode("latin1") + .rstrip("=") + ) - print(f"Inserting vcomp140.dll into {wheel_path}...") - with zipfile.ZipFile(wheel_path, 'a') as f: - f.write(vcomp140_path, 'xgboost-{}.data/data/xgboost/vcomp140.dll'.format(version)) + +def update_record(*, wheel_content_dir, xgboost_version): + vcomp140_size = os.path.getsize(VCOMP140_PATH) + vcomp140_hash = get_sha256sum(VCOMP140_PATH) + + record_path = wheel_content_dir / pathlib.Path( + f"xgboost-{xgboost_version}.dist-info/RECORD" + ) + with open(record_path, "r") as f: + record_content = f.read() + record_content += f"xgboost-{xgboost_version}.data/data/xgboost/vcomp140.dll," + record_content += f"sha256={vcomp140_hash},{vcomp140_size}\n" + with open(record_path, "w") as f: + f.write(record_content) + + +def main(args): + candidates = list(sorted(glob.glob(args.wheel_path))) + for wheel_path in candidates: + print(f"Processing wheel {wheel_path}") + m = re.search(r"xgboost-(.*)\+.*-py3", wheel_path) + if not m: + raise ValueError(f"Wheel {wheel_path} has unexpected name") + version = m.group(1) + print(f" Detected version for {wheel_path}: {version}") + print(f" Inserting vcomp140.dll into {wheel_path}...") + with tempfile.TemporaryDirectory() as tempdir: + wheel_content_dir = pathlib.Path(tempdir) / "wheel_content" + print(f" Extract {wheel_path} into {wheel_content_dir}") + shutil.unpack_archive( + wheel_path, extract_dir=wheel_content_dir, format="zip" + ) + data_dir = wheel_content_dir / pathlib.Path( + f"xgboost-{version}.data/data/xgboost" + ) + data_dir.mkdir(parents=True, exist_ok=True) + + print(f" Copy {VCOMP140_PATH} -> {data_dir}") + shutil.copy(VCOMP140_PATH, data_dir) + + print(f" Update RECORD") + update_record(wheel_content_dir=wheel_content_dir, xgboost_version=version) + + print(f" Content of {wheel_content_dir}:") + for e in sorted(wheel_content_dir.rglob("*")): + if e.is_file(): + r = e.relative_to(wheel_content_dir) + print(f" {r}") + + print(f" Create new wheel...") + new_wheel_tmp_path = pathlib.Path(tempdir) / "new_wheel" + shutil.make_archive( + str(new_wheel_tmp_path.resolve()), + format="zip", + root_dir=wheel_content_dir, + ) + new_wheel_tmp_path = new_wheel_tmp_path.resolve().with_suffix(".zip") + new_wheel_tmp_path = new_wheel_tmp_path.rename( + new_wheel_tmp_path.with_suffix(".whl") + ) + print(f" Created new wheel {new_wheel_tmp_path}") + + # Rename the old wheel with suffix .bak + # The new wheel takes the name of the old wheel + wheel_path_obj = pathlib.Path(wheel_path).resolve() + backup_path = wheel_path_obj.with_suffix(".whl.bak") + print(f" Rename {wheel_path_obj} -> {backup_path}") + wheel_path_obj.replace(backup_path) + print(f" Rename {new_wheel_tmp_path} -> {wheel_path_obj}") + new_wheel_tmp_path.replace(wheel_path_obj) + + shutil.rmtree(wheel_content_dir) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "wheel_path", type=str, help="Path to wheel (wildcard permitted)" + ) + args = parser.parse_args() + + main(args) From a58055075b5fa7c432a3c0a2a177ffecb6889b9f Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Thu, 30 Mar 2023 03:16:18 +0800 Subject: [PATCH 3/3] [dask] Return the first valid booster instead of all valid ones. (#8993) * [dask] Return the first valid booster instead of all valid ones. - Reduce memory footprint of the returned model. * mypy error. * lint. * duplicated. 
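To spell out the selection rule the new `_filter_empty` helper implements: every worker reports whether its shard is non-empty, the flags are all-reduced, and only the lowest-ranked worker with data returns the `{"booster", "history"}` dict while the others return None. A standalone sketch of that rule follows (plain Python, with the all-reduced flags passed in as a list rather than obtained from the collective):

from typing import Dict, List, Optional

def keep_result(rank: int, non_empty: List[bool], result: Dict) -> Optional[Dict]:
    """Return `result` only on the first rank that has data, None on the others."""
    for i, has_data in enumerate(non_empty):
        if has_data and i == rank:
            return result  # this rank is the first valid worker: it keeps the booster
        if has_data:
            return None    # an earlier rank already returned the booster
    raise ValueError("None of the workers can provide a valid result.")

# Example: worker 0 received an empty shard, workers 1 and 2 have rows.
flags = [False, True, True]
assert keep_result(0, flags, {"booster": "b"}) is None
assert keep_result(1, flags, {"booster": "b"}) == {"booster": "b"}
assert keep_result(2, flags, {"booster": "b"}) is None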
---
 python-package/xgboost/dask.py | 35 ++++++++++++++++++++++++++--------
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py
index a17fbad70..8c679b75b 100644
--- a/python-package/xgboost/dask.py
+++ b/python-package/xgboost/dask.py
@@ -888,6 +888,29 @@ def _get_workers_from_data(
     return list(X_worker_map)
 
 
+def _filter_empty(
+    booster: Booster, local_history: TrainingCallback.EvalsLog, is_valid: bool
+) -> Optional[TrainReturnT]:
+    n_workers = collective.get_world_size()
+    non_empty = numpy.zeros(shape=(n_workers,), dtype=numpy.int32)
+    rank = collective.get_rank()
+    non_empty[rank] = int(is_valid)
+    non_empty = collective.allreduce(non_empty, collective.Op.SUM)
+    non_empty = non_empty.astype(bool)
+    ret: Optional[TrainReturnT] = {
+        "booster": booster,
+        "history": local_history,
+    }
+    for i in range(non_empty.size):
+        # This is the first valid worker
+        if non_empty[i] and i == rank:
+            return ret
+        if non_empty[i]:
+            return None
+
+    raise ValueError("None of the workers can provide a valid result.")
+
+
 async def _train_async(
     client: "distributed.Client",
     global_config: Dict[str, Any],
@@ -973,14 +996,10 @@ async def _train_async(
                 xgb_model=xgb_model,
                 callbacks=callbacks,
             )
-            if Xy.num_row() != 0:
-                ret: Optional[TrainReturnT] = {
-                    "booster": booster,
-                    "history": local_history,
-                }
-            else:
-                ret = None
-            return ret
+            # Don't return the boosters from empty workers. It's quite difficult to
+            # guarantee everything is in sync in the presence of empty workers,
+            # especially with complex objectives like quantile.
+            return _filter_empty(booster, local_history, Xy.num_row() != 0)
 
     async with distributed.MultiLock(workers, client):
         if evals is not None:
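For context, a minimal client-side sketch (assuming a local Dask cluster and synthetic data; the parameter values are placeholders) of how the dict assembled above is consumed: `xgboost.dask.train` still returns a single `{"booster", "history"}` mapping to the caller, now backed by the booster from the first non-empty worker.

import dask.array as da
import xgboost as xgb
from distributed import Client, LocalCluster

if __name__ == "__main__":
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster, Client(cluster) as client:
        X = da.random.random((1000, 10), chunks=(250, 10))
        y = da.random.randint(0, 2, size=(1000,), chunks=(250,))
        dtrain = xgb.dask.DaskDMatrix(client, X, y)
        output = xgb.dask.train(
            client,
            {"objective": "binary:logistic", "tree_method": "hist"},
            dtrain,
            num_boost_round=10,
            evals=[(dtrain, "train")],
        )
        booster = output["booster"]  # the single Booster selected by _filter_empty
        history = output["history"]  # evaluation history, e.g. history["train"]["logloss"]
        print(booster.num_boosted_rounds(), history)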