diff --git a/src/common/quantile.cc b/src/common/quantile.cc index 4712a6938..a7789cb3f 100644 --- a/src/common/quantile.cc +++ b/src/common/quantile.cc @@ -1,13 +1,14 @@ /*! * Copyright 2020-2022 by XGBoost Contributors */ +#include "quantile.h" + #include #include -#include "rabit/rabit.h" -#include "quantile.h" -#include "hist_util.h" #include "categorical.h" +#include "hist_util.h" +#include "rabit/rabit.h" namespace xgboost { namespace common { @@ -15,7 +16,7 @@ namespace common { template SketchContainerImpl::SketchContainerImpl(std::vector columns_size, int32_t max_bins, - common::Span feature_types, + Span feature_types, bool use_group, int32_t n_threads) : feature_types_(feature_types.cbegin(), feature_types.cend()), columns_size_{std::move(columns_size)}, @@ -67,7 +68,7 @@ std::vector SketchContainerImpl::LoadBalance(SparsePage */ auto page = batch.GetView(); size_t const total_entries = page.data.size(); - size_t const entries_per_thread = common::DivRoundUp(total_entries, nthreads); + size_t const entries_per_thread = DivRoundUp(total_entries, nthreads); std::vector> column_sizes(nthreads); for (auto& column : column_sizes) { @@ -213,53 +214,162 @@ void SketchContainerImpl::PushRowPage(SparsePage const &page, MetaInfo monitor_.Stop(__func__); } +namespace { +/** + * \brief A view over gathered sketch values. + */ +template +struct QuantileAllreduce { + common::Span global_values; + common::Span worker_indptr; + common::Span feature_indptr; + size_t n_features{0}; + /** + * \brief Get sketch values of the a feature from a worker. + * + * \param rank rank of target worker + * \param fidx feature idx + */ + auto Values(int32_t rank, bst_feature_t fidx) const { + // get span for worker + auto wsize = worker_indptr[rank + 1] - worker_indptr[rank]; + auto worker_values = global_values.subspan(worker_indptr[rank], wsize); + auto psize = n_features + 1; + auto worker_feat_indptr = feature_indptr.subspan(psize * rank, psize); + // get span for feature + auto feat_beg = worker_feat_indptr[fidx]; + auto feat_size = worker_feat_indptr[fidx + 1] - feat_beg; + return worker_values.subspan(feat_beg, feat_size); + } +}; + +/** + * \brief Merge all categories from other workers. 
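+ *
+ * Each worker flattens its categories into one buffer along with a per-feature
+ * indptr.  The buffers and indptrs of all workers are then gathered with a sum
+ * allreduce (every worker fills only its own slots of a zero-initialised
+ * buffer), after which each worker merges the categories received from its
+ * peers into its local category sets.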
+ */ +void AllreduceCategories(Span feature_types, int32_t n_threads, + std::vector> *p_categories) { + auto &categories = *p_categories; + auto world_size = rabit::GetWorldSize(); + auto rank = rabit::GetRank(); + if (world_size == 1) { + return; + } + + // CSC indptr to each feature + std::vector feature_ptr(categories.size() + 1, 0); + for (size_t i = 0; i < categories.size(); ++i) { + auto const &feat = categories[i]; + feature_ptr[i + 1] = feat.size(); + } + std::partial_sum(feature_ptr.begin(), feature_ptr.end(), feature_ptr.begin()); + CHECK_EQ(feature_ptr.front(), 0); + + // gather all feature ptrs from workers + std::vector global_feat_ptrs(feature_ptr.size() * world_size, 0); + size_t feat_begin = rank * feature_ptr.size(); // pointer to current worker + std::copy(feature_ptr.begin(), feature_ptr.end(), global_feat_ptrs.begin() + feat_begin); + rabit::Allreduce(global_feat_ptrs.data(), global_feat_ptrs.size()); + + // move all categories into a flatten vector to prepare for allreduce + size_t total = feature_ptr.back(); + std::vector flatten(total, 0); + auto cursor{flatten.begin()}; + for (auto const &feat : categories) { + cursor = std::copy(feat.cbegin(), feat.cend(), cursor); + } + + // indptr for indexing workers + std::vector global_worker_ptr(world_size + 1, 0); + global_worker_ptr[rank + 1] = total; // shift 1 to right for constructing the indptr + rabit::Allreduce(global_worker_ptr.data(), global_worker_ptr.size()); + std::partial_sum(global_worker_ptr.cbegin(), global_worker_ptr.cend(), global_worker_ptr.begin()); + // total number of categories in all workers with all features + auto gtotal = global_worker_ptr.back(); + + // categories in all workers with all features. + std::vector global_categories(gtotal, 0); + auto rank_begin = global_worker_ptr[rank]; + auto rank_size = global_worker_ptr[rank + 1] - rank_begin; + CHECK_EQ(rank_size, total); + std::copy(flatten.cbegin(), flatten.cend(), global_categories.begin() + rank_begin); + // gather values from all workers. + rabit::Allreduce(global_categories.data(), global_categories.size()); + QuantileAllreduce allreduce_result{global_categories, global_worker_ptr, + global_feat_ptrs, categories.size()}; + ParallelFor(categories.size(), n_threads, [&](auto fidx) { + if (!IsCat(feature_types, fidx)) { + return; + } + for (int32_t r = 0; r < world_size; ++r) { + if (r == rank) { + // continue if it's current worker. + continue; + } + // 1 feature of 1 worker + auto worker_feature = allreduce_result.Values(r, fidx); + for (auto c : worker_feature) { + categories[fidx].emplace(c); + } + } + }); +} +} // anonymous namespace + template void SketchContainerImpl::GatherSketchInfo( std::vector const &reduced, - std::vector *p_worker_segments, - std::vector *p_sketches_scan, + std::vector *p_worker_segments, std::vector *p_sketches_scan, std::vector *p_global_sketches) { - auto& worker_segments = *p_worker_segments; + auto &worker_segments = *p_worker_segments; worker_segments.resize(1, 0); auto world = rabit::GetWorldSize(); auto rank = rabit::GetRank(); auto n_columns = sketches_.size(); + // get the size of each feature. 
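+  // Categorical features are synchronized separately in AllreduceCategories,
+  // so their sketch size is recorded as 0 here.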
std::vector sketch_size; - for (auto const& sketch : reduced) { - sketch_size.push_back(sketch.size); + for (size_t i = 0; i < reduced.size(); ++i) { + if (IsCat(feature_types_, i)) { + sketch_size.push_back(0); + } else { + sketch_size.push_back(reduced[i].size); + } } - std::vector& sketches_scan = *p_sketches_scan; + // turn the size into CSC indptr + std::vector &sketches_scan = *p_sketches_scan; sketches_scan.resize((n_columns + 1) * world, 0); - size_t beg_scan = rank * (n_columns + 1); - std::partial_sum(sketch_size.cbegin(), sketch_size.cend(), - sketches_scan.begin() + beg_scan + 1); + size_t beg_scan = rank * (n_columns + 1); // starting storage for current worker. + std::partial_sum(sketch_size.cbegin(), sketch_size.cend(), sketches_scan.begin() + beg_scan + 1); + // Gather all column pointers rabit::Allreduce(sketches_scan.data(), sketches_scan.size()); - for (int32_t i = 0; i < world; ++i) { size_t back = (i + 1) * (n_columns + 1) - 1; auto n_entries = sketches_scan.at(back); worker_segments.push_back(n_entries); } // Offset of sketch from each worker. - std::partial_sum(worker_segments.begin(), worker_segments.end(), - worker_segments.begin()); + std::partial_sum(worker_segments.begin(), worker_segments.end(), worker_segments.begin()); CHECK_GE(worker_segments.size(), 1); auto total = worker_segments.back(); - auto& global_sketches = *p_global_sketches; - global_sketches.resize(total, typename WQSketch::Entry{0, 0, 0, 0}); + auto &global_sketches = *p_global_sketches; + global_sketches.resize(total, typename WQSketch::Entry{0, 0, 0, 0}); auto worker_sketch = Span{global_sketches}.subspan( worker_segments[rank], worker_segments[rank + 1] - worker_segments[rank]); - size_t cursor = 0; - for (auto const &sketch : reduced) { - std::copy(sketch.data, sketch.data + sketch.size, - worker_sketch.begin() + cursor); - cursor += sketch.size; + auto cursor{worker_sketch.begin()}; + for (size_t fidx = 0; fidx < reduced.size(); ++fidx) { + auto const &sketch = reduced[fidx]; + if (IsCat(feature_types_, fidx)) { + // nothing to do if it's categorical feature, size is 0 so no need to change cursor + continue; + } else { + cursor = std::copy(sketch.data, sketch.data + sketch.size, cursor); + } } - static_assert(sizeof(typename WQSketch::Entry) / 4 == sizeof(float), ""); + static_assert(sizeof(typename WQSketch::Entry) / 4 == sizeof(float), + "Unexpected size of sketch entry."); rabit::Allreduce( reinterpret_cast(global_sketches.data()), global_sketches.size() * sizeof(typename WQSketch::Entry) / sizeof(float)); @@ -270,6 +380,13 @@ void SketchContainerImpl::AllReduce( std::vector *p_reduced, std::vector* p_num_cuts) { monitor_.Start(__func__); + + size_t n_columns = sketches_.size(); + rabit::Allreduce(&n_columns, 1); + CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers"; + + AllreduceCategories(feature_types_, n_threads_, &categories_); + auto& num_cuts = *p_num_cuts; CHECK_EQ(num_cuts.size(), 0); num_cuts.resize(sketches_.size()); @@ -277,19 +394,19 @@ void SketchContainerImpl::AllReduce( auto &reduced = *p_reduced; reduced.resize(sketches_.size()); - size_t n_columns = sketches_.size(); - rabit::Allreduce(&n_columns, 1); - CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers"; - // Prune the intermediate num cuts for synchronization. 
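+  // For categorical features the gathered category count is used as the number
+  // of cuts directly; numerical sketches are pruned to the intermediate size.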
std::vector global_column_size(columns_size_); rabit::Allreduce(global_column_size.data(), global_column_size.size()); ParallelFor(sketches_.size(), n_threads_, [&](size_t i) { int32_t intermediate_num_cuts = static_cast( - std::min(global_column_size[i], - static_cast(max_bins_ * WQSketch::kFactor))); - if (global_column_size[i] != 0) { + std::min(global_column_size[i], static_cast(max_bins_ * WQSketch::kFactor))); + if (global_column_size[i] == 0) { + return; + } + if (IsCat(feature_types_, i)) { + intermediate_num_cuts = categories_[i].size(); + } else { typename WQSketch::SummaryContainer out; sketches_[i].GetSummary(&out); reduced[i].Reserve(intermediate_num_cuts); @@ -309,25 +426,21 @@ void SketchContainerImpl::AllReduce( std::vector sketches_scan((n_columns + 1) * world, 0); std::vector global_sketches; - this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan, - &global_sketches); + this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan, &global_sketches); std::vector final_sketches(n_columns); + QuantileAllreduce allreduce_result{global_sketches, worker_segments, + sketches_scan, n_columns}; ParallelFor(n_columns, n_threads_, [&](auto fidx) { int32_t intermediate_num_cuts = num_cuts[fidx]; - auto nbytes = - WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts); + auto nbytes = WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts); + if (IsCat(feature_types_, fidx)) { + return; + } - for (int32_t i = 1; i < world + 1; ++i) { - auto size = worker_segments.at(i) - worker_segments[i - 1]; - auto worker_sketches = - Span{global_sketches}.subspan(worker_segments[i - 1], size); - auto worker_scan = - Span(sketches_scan) - .subspan((i - 1) * (n_columns + 1), (n_columns + 1)); - - auto worker_feature = worker_sketches.subspan( - worker_scan[fidx], worker_scan[fidx + 1] - worker_scan[fidx]); + for (int32_t r = 0; r < world; ++r) { + // 1 feature of 1 worker + auto worker_feature = allreduce_result.Values(r, fidx); CHECK(worker_feature.data()); typename WQSketch::Summary summary(worker_feature.data(), worker_feature.size()); auto &out = final_sketches.at(fidx); diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc index 5fd67f46d..a079fdee0 100644 --- a/tests/cpp/common/test_quantile.cc +++ b/tests/cpp/common/test_quantile.cc @@ -58,10 +58,17 @@ void TestDistributedQuantile(size_t rows, size_t cols) { // Generate cuts for distributed environment. auto sparsity = 0.5f; auto rank = rabit::GetRank(); + std::vector ft(cols); + for (size_t i = 0; i < ft.size(); ++i) { + ft[i] = (i % 2 == 0) ? FeatureType::kNumerical : FeatureType::kCategorical; + } + auto m = RandomDataGenerator{rows, cols, sparsity} .Seed(rank) .Lower(.0f) .Upper(1.0f) + .Type(ft) + .MaxCategory(13) .GenerateDMatrix(); std::vector hessian(rows, 1.0); @@ -95,6 +102,8 @@ void TestDistributedQuantile(size_t rows, size_t cols) { for (auto rank = 0; rank < world; ++rank) { auto m = RandomDataGenerator{rows, cols, sparsity} .Seed(rank) + .Type(ft) + .MaxCategory(13) .Lower(.0f) .Upper(1.0f) .GenerateDMatrix(); @@ -181,8 +190,15 @@ TEST(Quantile, SameOnAllWorkers) { kRows, [=](int32_t seed, size_t n_bins, MetaInfo const &info) { auto rank = rabit::GetRank(); HostDeviceVector storage; + std::vector ft(kCols); + for (size_t i = 0; i < ft.size(); ++i) { + ft[i] = (i % 2 == 0) ? 
FeatureType::kNumerical : FeatureType::kCategorical; + } + auto m = RandomDataGenerator{kRows, kCols, 0} .Device(0) + .Type(ft) + .MaxCategory(17) .Seed(rank + seed) .GenerateDMatrix(); auto cuts = SketchOnDMatrix(m.get(), n_bins); diff --git a/tests/cpp/helpers.cc b/tests/cpp/helpers.cc index 3a74197d8..eb39faef5 100644 --- a/tests/cpp/helpers.cc +++ b/tests/cpp/helpers.cc @@ -183,7 +183,7 @@ bool IsNear(std::vector::const_iterator _beg1, } SimpleLCG::StateType SimpleLCG::operator()() { - state_ = (alpha_ * state_) % mod_; + state_ = (alpha_ * state_ + (state_ == 0 ? kDefaultInit : 0)) % mod_; return state_; } SimpleLCG::StateType SimpleLCG::Min() const { return min(); } diff --git a/tests/python-gpu/test_gpu_with_dask.py b/tests/python-gpu/test_gpu_with_dask.py index 119a02e57..a412badf3 100644 --- a/tests/python-gpu/test_gpu_with_dask.py +++ b/tests/python-gpu/test_gpu_with_dask.py @@ -1,3 +1,4 @@ +"""Copyright 2019-2022 XGBoost contributors""" import sys import os from typing import Type, TypeVar, Any, Dict, List, Tuple @@ -36,7 +37,8 @@ from test_with_dask import generate_array # noqa from test_with_dask import kCols as random_cols # noqa from test_with_dask import suppress # noqa from test_with_dask import run_tree_stats # noqa - +from test_with_dask import run_categorical # noqa +from test_with_dask import make_categorical # noqa try: @@ -51,49 +53,6 @@ except ImportError: pass -def make_categorical( - client: Client, - n_samples: int, - n_features: int, - n_categories: int, - onehot: bool = False, -) -> Tuple[dd.DataFrame, dd.Series]: - workers = _get_client_workers(client) - n_workers = len(workers) - dfs = [] - - def pack(**kwargs: Any) -> dd.DataFrame: - X, y = tm.make_categorical(**kwargs) - X["label"] = y - return X - - meta = pack( - n_samples=1, n_features=n_features, n_categories=n_categories, onehot=False - ) - - for i, worker in enumerate(workers): - l_n_samples = min( - n_samples // n_workers, n_samples - i * (n_samples // n_workers) - ) - future = client.submit( - pack, - n_samples=l_n_samples, - n_features=n_features, - n_categories=n_categories, - onehot=False, - workers=[worker], - ) - dfs.append(future) - - df = dd.from_delayed(dfs, meta=meta) - y = df["label"] - X = df[df.columns.difference(["label"])] - - if onehot: - return dd.get_dummies(X), y - return X, y - - def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None: import cupy as cp cp.cuda.runtime.setDevice(0) @@ -184,69 +143,12 @@ def test_categorical(local_cuda_cluster: LocalCUDACluster) -> None: with Client(local_cuda_cluster) as client: import dask_cudf - rounds = 10 X, y = make_categorical(client, 10000, 30, 13) X = dask_cudf.from_dask_dataframe(X) X_onehot, _ = make_categorical(client, 10000, 30, 13, True) X_onehot = dask_cudf.from_dask_dataframe(X_onehot) - - parameters = {"tree_method": "gpu_hist"} - - m = dxgb.DaskDMatrix(client, X_onehot, y, enable_categorical=True) - by_etl_results = dxgb.train( - client, - parameters, - m, - num_boost_round=rounds, - evals=[(m, "Train")], - )["history"] - - m = dxgb.DaskDMatrix(client, X, y, enable_categorical=True) - output = dxgb.train( - client, - parameters, - m, - num_boost_round=rounds, - evals=[(m, "Train")], - ) - by_builtin_results = output["history"] - - np.testing.assert_allclose( - np.array(by_etl_results["Train"]["rmse"]), - np.array(by_builtin_results["Train"]["rmse"]), - rtol=1e-3, - ) - assert tm.non_increasing(by_builtin_results["Train"]["rmse"]) - - def check_model_output(model: dxgb.Booster) -> None: - with 
tempfile.TemporaryDirectory() as tempdir: - path = os.path.join(tempdir, "model.json") - model.save_model(path) - with open(path, "r") as fd: - categorical = json.load(fd) - - categories_sizes = np.array( - categorical["learner"]["gradient_booster"]["model"]["trees"][-1][ - "categories_sizes" - ] - ) - assert categories_sizes.shape[0] != 0 - np.testing.assert_allclose(categories_sizes, 1) - - check_model_output(output["booster"]) - reg = dxgb.DaskXGBRegressor( - enable_categorical=True, n_estimators=10, tree_method="gpu_hist" - ) - reg.fit(X, y) - - check_model_output(reg.get_booster()) - - reg = dxgb.DaskXGBRegressor( - enable_categorical=True, n_estimators=10 - ) - with pytest.raises(ValueError): - reg.fit(X, y) + run_categorical(client, "gpu_hist", X, X_onehot, y) def to_cp(x: Any, DMatrixT: Type) -> Any: diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py index 31e006244..48c7a2f3c 100644 --- a/tests/python/test_with_dask.py +++ b/tests/python/test_with_dask.py @@ -1,3 +1,4 @@ +"""Copyright 2019-2022 XGBoost contributors""" from pathlib import Path import pickle import testing as tm @@ -64,6 +65,49 @@ def _get_client_workers(client: "Client") -> List[str]: return list(workers.keys()) +def make_categorical( + client: Client, + n_samples: int, + n_features: int, + n_categories: int, + onehot: bool = False, +) -> Tuple[dd.DataFrame, dd.Series]: + workers = _get_client_workers(client) + n_workers = len(workers) + dfs = [] + + def pack(**kwargs: Any) -> dd.DataFrame: + X, y = tm.make_categorical(**kwargs) + X["label"] = y + return X + + meta = pack( + n_samples=1, n_features=n_features, n_categories=n_categories, onehot=False + ) + + for i, worker in enumerate(workers): + l_n_samples = min( + n_samples // n_workers, n_samples - i * (n_samples // n_workers) + ) + future = client.submit( + pack, + n_samples=l_n_samples, + n_features=n_features, + n_categories=n_categories, + onehot=False, + workers=[worker], + ) + dfs.append(future) + + df = dd.from_delayed(dfs, meta=meta) + y = df["label"] + X = df[df.columns.difference(["label"])] + + if onehot: + return dd.get_dummies(X), y + return X, y + + def generate_array( with_weights: bool = False ) -> Tuple[xgb.dask._DaskCollection, xgb.dask._DaskCollection, @@ -173,6 +217,80 @@ def test_dask_sparse(client: "Client") -> None: ) +def run_categorical(client: "Client", tree_method: str, X, X_onehot, y) -> None: + parameters = {"tree_method": tree_method, "max_cat_to_onehot": 9999} # force onehot + rounds = 10 + m = xgb.dask.DaskDMatrix(client, X_onehot, y, enable_categorical=True) + by_etl_results = xgb.dask.train( + client, + parameters, + m, + num_boost_round=rounds, + evals=[(m, "Train")], + )["history"] + + m = xgb.dask.DaskDMatrix(client, X, y, enable_categorical=True) + output = xgb.dask.train( + client, + parameters, + m, + num_boost_round=rounds, + evals=[(m, "Train")], + ) + by_builtin_results = output["history"] + + np.testing.assert_allclose( + np.array(by_etl_results["Train"]["rmse"]), + np.array(by_builtin_results["Train"]["rmse"]), + rtol=1e-3, + ) + assert tm.non_increasing(by_builtin_results["Train"]["rmse"]) + + def check_model_output(model: xgb.dask.Booster) -> None: + with tempfile.TemporaryDirectory() as tempdir: + path = os.path.join(tempdir, "model.json") + model.save_model(path) + with open(path, "r") as fd: + categorical = json.load(fd) + + categories_sizes = np.array( + categorical["learner"]["gradient_booster"]["model"]["trees"][-1][ + "categories_sizes" + ] + ) + assert categories_sizes.shape[0] != 0 
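+            # one-hot splits are forced, so each split should carry exactly one category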
+ np.testing.assert_allclose(categories_sizes, 1) + + check_model_output(output["booster"]) + reg = xgb.dask.DaskXGBRegressor( + enable_categorical=True, + n_estimators=10, + tree_method=tree_method, + # force onehot + max_cat_to_onehot=9999 + ) + reg.fit(X, y) + + check_model_output(reg.get_booster()) + + reg = xgb.dask.DaskXGBRegressor( + enable_categorical=True, n_estimators=10 + ) + with pytest.raises(ValueError): + reg.fit(X, y) + # check partition based + reg = xgb.dask.DaskXGBRegressor( + enable_categorical=True, n_estimators=10, tree_method=tree_method + ) + reg.fit(X, y, eval_set=[(X, y)]) + assert tm.non_increasing(reg.evals_result()["validation_0"]["rmse"]) + +def test_categorical(client: "Client") -> None: + X, y = make_categorical(client, 10000, 30, 13) + X_onehot, _ = make_categorical(client, 10000, 30, 13, True) + run_categorical(client, "approx", X, X_onehot, y) + + def test_dask_predict_shape_infer(client: "Client") -> None: X, y = make_classification(n_samples=1000, n_informative=5, n_classes=3) X_ = dd.from_array(X, chunksize=100) @@ -1167,7 +1285,7 @@ class TestWithDask: exe: Optional[str] = None for possible_path in {'./testxgboost', './build/testxgboost', - '../build/testxgboost', + '../build/cpubuild/testxgboost', '../cpu-build/testxgboost'}: if os.path.exists(possible_path): exe = possible_path
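
Note on the gathering scheme: QuantileAllreduce, AllreduceCategories and GatherSketchInfo above all follow the same pattern, which the following standalone sketch illustrates. It is illustrative only: NumPy concatenation stands in for rabit's sum-allreduce (each worker writes only its own slot of a zero-initialised buffer, so summing across workers is equivalent to a gather), and the worker/category data below is made up.

"""Illustrative sketch (not part of the patch): how per-worker values are
gathered with indptrs and re-indexed, mirroring QuantileAllreduce::Values."""
import numpy as np

n_features = 2
# made-up categories per feature on each of three workers
workers = [
    [[0.0, 1.0], [2.0]],
    [[1.0], [2.0, 3.0]],
    [[0.0], []],
]
world_size = len(workers)

# per-worker flattened values plus a per-feature indptr (CSC style)
feature_indptr = np.zeros((world_size, n_features + 1), dtype=np.int64)
values = []
for r, cats in enumerate(workers):
    feature_indptr[r, 1:] = np.cumsum([len(c) for c in cats])
    values.append(np.concatenate([np.asarray(c, dtype=np.float64) for c in cats]))

# indptr over workers into the gathered buffer
worker_indptr = np.zeros(world_size + 1, dtype=np.int64)
worker_indptr[1:] = np.cumsum([v.size for v in values])

# stand-in for the allreduce: every worker ends up with the same global buffers
global_values = np.concatenate(values)
global_feature_indptr = feature_indptr.reshape(-1)


def values_of(rank: int, fidx: int) -> np.ndarray:
    """Slice one feature of one worker, as QuantileAllreduce::Values does."""
    worker = global_values[worker_indptr[rank]:worker_indptr[rank + 1]]
    ptr = global_feature_indptr[rank * (n_features + 1):(rank + 1) * (n_features + 1)]
    return worker[ptr[fidx]:ptr[fidx + 1]]


# merge categories from all workers, as AllreduceCategories does
merged = [set() for _ in range(n_features)]
for r in range(world_size):
    for f in range(n_features):
        merged[f].update(values_of(r, f).tolist())
print(merged)  # [{0.0, 1.0}, {2.0, 3.0}]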