diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py
index 11f92112c..0e5e0d28e 100644
--- a/python-package/xgboost/dask.py
+++ b/python-package/xgboost/dask.py
@@ -641,6 +641,8 @@ class DaskPartitionIter(DataIter):  # pylint: disable=R0902


 class DaskQuantileDMatrix(DaskDMatrix):
+    """A dask version of :py:class:`QuantileDMatrix`."""
+
     @_deprecate_positional_args
     def __init__(
         self,
diff --git a/src/common/quantile.h b/src/common/quantile.h
index 751fb773f..a9955d2a0 100644
--- a/src/common/quantile.h
+++ b/src/common/quantile.h
@@ -1,5 +1,5 @@
-/*!
- * Copyright 2014-2022 by XGBoost Contributors
+/**
+ * Copyright 2014-2023 by XGBoost Contributors
  * \file quantile.h
  * \brief util to compute quantiles
  * \author Tianqi Chen
@@ -7,7 +7,6 @@
 #ifndef XGBOOST_COMMON_QUANTILE_H_
 #define XGBOOST_COMMON_QUANTILE_H_

-#include 
 #include 
 #include 
diff --git a/src/data/iterative_dmatrix.cc b/src/data/iterative_dmatrix.cc
index 8aacca48e..067a60bd3 100644
--- a/src/data/iterative_dmatrix.cc
+++ b/src/data/iterative_dmatrix.cc
@@ -3,15 +3,20 @@
  */
 #include "iterative_dmatrix.h"

-#include <algorithm>  // std::copy
+#include <algorithm>    // std::copy
+#include <cstddef>      // std::size_t
+#include <type_traits>  // std::underlying_type_t
+#include <vector>       // std::vector

 #include "../collective/communicator-inl.h"
 #include "../common/categorical.h"  // common::IsCat
 #include "../common/column_matrix.h"
-#include "../tree/param.h"  // FIXME(jiamingy): Find a better way to share this parameter.
+#include "../tree/param.h"          // FIXME(jiamingy): Find a better way to share this parameter.
 #include "gradient_index.h"
 #include "proxy_dmatrix.h"
 #include "simple_batch_iterator.h"
+#include "xgboost/data.h"  // FeatureType
+#include "xgboost/logging.h"

 namespace xgboost {
 namespace data {
@@ -79,6 +84,27 @@ void GetCutsFromRef(std::shared_ptr<DMatrix> ref_, bst_feature_t n_features, Bat
       << "Invalid ref DMatrix, different number of features.";
 }

+namespace {
+// Synchronize feature type in case of empty DMatrix
+void SyncFeatureType(std::vector<FeatureType>* p_h_ft) {
+  if (!collective::IsDistributed()) {
+    return;
+  }
+  auto& h_ft = *p_h_ft;
+  auto n_ft = h_ft.size();
+  collective::Allreduce<collective::Operation::kMax>(&n_ft, 1);
+  if (!h_ft.empty()) {
+    // Check correct size if this is not an empty DMatrix.
+    CHECK_EQ(h_ft.size(), n_ft);
+  }
+  if (n_ft > 0) {
+    h_ft.resize(n_ft);
+    auto ptr = reinterpret_cast<std::underlying_type_t<FeatureType>*>(h_ft.data());
+    collective::Allreduce<collective::Operation::kMax>(ptr, h_ft.size());
+  }
+}
+}  // anonymous namespace
+
 void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
                                    std::shared_ptr<DMatrix> ref) {
   DMatrixProxy* proxy = MakeProxy(proxy_);
@@ -96,13 +122,14 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     return HostAdapterDispatch(proxy, [](auto const& value) { return value.NumCols(); });
   };

-  std::vector<size_t> column_sizes;
+  std::vector<std::size_t> column_sizes;
   auto const is_valid = data::IsValidFunctor{missing};
   auto nnz_cnt = [&]() {
     return HostAdapterDispatch(proxy, [&](auto const& value) {
       size_t n_threads = ctx_.Threads();
       size_t n_features = column_sizes.size();
-      linalg::Tensor<size_t, 2> column_sizes_tloc({n_threads, n_features}, Context::kCpuId);
+      linalg::Tensor<std::size_t, 2> column_sizes_tloc({n_threads, n_features}, Context::kCpuId);
+      column_sizes_tloc.Data()->Fill(0ul);
       auto view = column_sizes_tloc.HostView();
       common::ParallelFor(value.Size(), n_threads, common::Sched::Static(256), [&](auto i) {
         auto const& line = value.GetLine(i);
@@ -139,7 +166,8 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     if (n_features == 0) {
       n_features = num_cols();
       collective::Allreduce<collective::Operation::kMax>(&n_features, 1);
-      column_sizes.resize(n_features);
+      column_sizes.clear();
+      column_sizes.resize(n_features, 0);
       info_.num_col_ = n_features;
     } else {
       CHECK_EQ(n_features, num_cols()) << "Inconsistent number of columns.";
@@ -166,14 +194,18 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
    * Generate quantiles
    */
   accumulated_rows = 0;
+  std::vector<FeatureType> h_ft;
   if (ref) {
     GetCutsFromRef(ref, Info().num_col_, batch_param_, &cuts);
+    h_ft = ref->Info().feature_types.HostVector();
   } else {
     size_t i = 0;
     while (iter.Next()) {
       if (!p_sketch) {
+        h_ft = proxy->Info().feature_types.ConstHostVector();
+        SyncFeatureType(&h_ft);
         p_sketch.reset(new common::HostSketchContainer{
-            batch_param_.max_bin, proxy->Info().feature_types.ConstHostSpan(), column_sizes, false,
+            batch_param_.max_bin, h_ft, column_sizes, false,
             proxy->Info().data_split_mode == DataSplitMode::kCol, ctx_.Threads()});
       }
       HostAdapterDispatch(proxy, [&](auto const& batch) {
@@ -191,6 +223,9 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     CHECK(p_sketch);
     p_sketch->MakeCuts(&cuts);
   }
+  if (!h_ft.empty()) {
+    CHECK_EQ(h_ft.size(), n_features);
+  }

   /**
    * Generate gradient index.
@@ -202,8 +237,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
   while (iter.Next()) {
     HostAdapterDispatch(proxy, [&](auto const& batch) {
       proxy->Info().num_nonzero_ = batch_nnz[i];
-      this->ghist_->PushAdapterBatch(&ctx_, rbegin, prev_sum, batch, missing,
-                                     proxy->Info().feature_types.ConstHostSpan(),
+      this->ghist_->PushAdapterBatch(&ctx_, rbegin, prev_sum, batch, missing, h_ft,
                                      batch_param_.sparse_thresh, Info().num_row_);
     });
     if (n_batches != 1) {
@@ -236,6 +270,8 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     this->info_.num_col_ = n_features;  // proxy might be empty.
     CHECK_EQ(proxy->Info().labels.Size(), 0);
   }
+
+  Info().feature_types.HostVector() = h_ft;
 }

 BatchSet<GHistIndexMatrix> IterativeDMatrix::GetGradientIndex(BatchParam const& param) {
diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
index 4e61d9023..cf36e92b2 100644
--- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
+++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
@@ -265,6 +265,27 @@ class TestDistributedGPU:
     ) -> None:
         run_gpu_hist(params, num_rounds, dataset, dmatrix_type, local_cuda_client)

+    def test_empty_quantile_dmatrix(self, local_cuda_client: Client) -> None:
+        client = local_cuda_client
+        X, y = make_categorical(client, 1, 30, 13)
+        X_valid, y_valid = make_categorical(client, 10000, 30, 13)
+
+        Xy = xgb.dask.DaskQuantileDMatrix(client, X, y, enable_categorical=True)
+        Xy_valid = xgb.dask.DaskQuantileDMatrix(
+            client, X_valid, y_valid, ref=Xy, enable_categorical=True
+        )
+        result = xgb.dask.train(
+            client,
+            {"tree_method": "gpu_hist"},
+            Xy,
+            num_boost_round=10,
+            evals=[(Xy_valid, "Valid")],
+        )
+        predt = xgb.dask.inplace_predict(client, result["booster"], X).compute()
+        np.testing.assert_allclose(y.compute(), predt)
+        rmse = result["history"]["Valid"]["rmse"][-1]
+        assert rmse < 32.0
+
     @pytest.mark.skipif(**tm.no_cupy())
     def test_dask_array(self, local_cuda_client: Client) -> None:
         run_with_dask_array(dxgb.DaskDMatrix, local_cuda_client)
diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py
index 03f3a3e46..ba76c04db 100644
--- a/tests/test_distributed/test_with_dask/test_with_dask.py
+++ b/tests/test_distributed/test_with_dask/test_with_dask.py
@@ -96,6 +96,9 @@ def make_categorical(
         l_n_samples = min(
             n_samples // n_workers, n_samples - i * (n_samples // n_workers)
         )
+        # make sure there's at least one sample for testing empty DMatrix
+        if n_samples == 1 and i == 0:
+            l_n_samples = 1
         future = client.submit(
             pack,
             n_samples=l_n_samples,
@@ -1480,6 +1483,27 @@ class TestWithDask:
             quantile_hist["Valid"]["rmse"], dmatrix_hist["Valid"]["rmse"]
         )

+    def test_empty_quantile_dmatrix(self, client: Client) -> None:
+        X, y = make_categorical(client, 2, 30, 13)
+        X_valid, y_valid = make_categorical(client, 10000, 30, 13)
+        X_valid, y_valid, _ = deterministic_repartition(client, X_valid, y_valid, None)
+
+        Xy = xgb.dask.DaskQuantileDMatrix(client, X, y, enable_categorical=True)
+        Xy_valid = xgb.dask.DaskQuantileDMatrix(
+            client, X_valid, y_valid, ref=Xy, enable_categorical=True
+        )
+        result = xgb.dask.train(
+            client,
+            {"tree_method": "hist"},
+            Xy,
+            num_boost_round=10,
+            evals=[(Xy_valid, "Valid")],
+        )
+        predt = xgb.dask.inplace_predict(client, result["booster"], X).compute()
+        np.testing.assert_allclose(y.compute(), predt)
+        rmse = result["history"]["Valid"]["rmse"][-1]
+        assert rmse < 32.0
+
     @given(params=hist_parameter_strategy, dataset=tm.dataset_strategy)
     @settings(
         deadline=None, max_examples=10, suppress_health_check=suppress, print_blob=True
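
For reference, a minimal standalone sketch of the pattern the new tests exercise: a tiny training DaskQuantileDMatrix (so some workers may end up holding no rows) plus a larger validation DaskQuantileDMatrix built against it via ref=. The local cluster setup and synthetic data below are illustrative assumptions, not part of the patch; only the DaskQuantileDMatrix/train/inplace_predict calls mirror the tests above.

import dask.array as da
import xgboost as xgb
from dask.distributed import Client, LocalCluster


def main() -> None:
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            rng = da.random.RandomState(1994)
            # Tiny training set: with two workers, one may receive an empty
            # partition, which is the situation this patch handles.
            X = rng.random_sample((2, 30), chunks=(1, 30))
            y = rng.random_sample(2, chunks=1)
            X_valid = rng.random_sample((256, 30), chunks=(64, 30))
            y_valid = rng.random_sample(256, chunks=64)

            Xy = xgb.dask.DaskQuantileDMatrix(client, X, y)
            # Reuse the training quantile cuts for the validation matrix.
            Xy_valid = xgb.dask.DaskQuantileDMatrix(client, X_valid, y_valid, ref=Xy)
            result = xgb.dask.train(
                client,
                {"tree_method": "hist"},
                Xy,
                num_boost_round=10,
                evals=[(Xy_valid, "Valid")],
            )
            predt = xgb.dask.inplace_predict(client, result["booster"], X).compute()
            print(result["history"]["Valid"]["rmse"][-1], predt.shape)


if __name__ == "__main__":
    main()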