Fix empty DMatrix with categorical features. (#8739)
This commit is contained in:
parent 7214a45e83
commit a2e433a089
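Summary of the change, as read from the hunks below: in distributed training a worker may receive an empty DMatrix, in which case its proxy carries an empty feature-type vector and the quantile sketch built on that worker loses the categorical-feature information held by its peers. The fix threads a single synchronized feature-type vector (h_ft) through IterativeDMatrix::InitFromCPU: a new SyncFeatureType helper max-allreduces first the vector length and then the type codes, the synchronized vector is passed to the sketch container and the gradient-index builder, and it is finally persisted on the resulting DMatrix. New Dask tests exercise the empty-partition case for both hist and gpu_hist.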
@@ -641,6 +641,8 @@ class DaskPartitionIter(DataIter):  # pylint: disable=R0902


 class DaskQuantileDMatrix(DaskDMatrix):
+    """A dask version of :py:class:`QuantileDMatrix`."""
+
     @_deprecate_positional_args
     def __init__(
         self,
@@ -1,5 +1,5 @@
-/*!
- * Copyright 2014-2022 by XGBoost Contributors
+/**
+ * Copyright 2014-2023 by XGBoost Contributors
  * \file quantile.h
  * \brief util to compute quantiles
  * \author Tianqi Chen
@@ -7,7 +7,6 @@
 #ifndef XGBOOST_COMMON_QUANTILE_H_
 #define XGBOOST_COMMON_QUANTILE_H_

-#include <dmlc/base.h>
 #include <xgboost/data.h>
 #include <xgboost/logging.h>

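The two quantile.h hunks above are incidental cleanup riding along with the fix: a Doxygen comment-style change, a copyright-year bump, and removal of an apparently unused dmlc/base.h include.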
@@ -3,15 +3,20 @@
  */
 #include "iterative_dmatrix.h"

 #include <algorithm>    // std::copy
+#include <cstddef>      // std::size_t
+#include <type_traits>  // std::underlying_type_t
+#include <vector>       // std::vector

 #include "../collective/communicator-inl.h"
 #include "../common/categorical.h"  // common::IsCat
 #include "../common/column_matrix.h"
 #include "../tree/param.h"  // FIXME(jiamingy): Find a better way to share this parameter.
 #include "gradient_index.h"
 #include "proxy_dmatrix.h"
 #include "simple_batch_iterator.h"
+#include "xgboost/data.h"  // FeatureType
+#include "xgboost/logging.h"

 namespace xgboost {
 namespace data {
@@ -79,6 +84,27 @@ void GetCutsFromRef(std::shared_ptr<DMatrix> ref_, bst_feature_t n_features, Bat
       << "Invalid ref DMatrix, different number of features.";
 }

+namespace {
+// Synchronize feature type in case of empty DMatrix
+void SyncFeatureType(std::vector<FeatureType>* p_h_ft) {
+  if (!collective::IsDistributed()) {
+    return;
+  }
+  auto& h_ft = *p_h_ft;
+  auto n_ft = h_ft.size();
+  collective::Allreduce<collective::Operation::kMax>(&n_ft, 1);
+  if (!h_ft.empty()) {
+    // Check correct size if this is not an empty DMatrix.
+    CHECK_EQ(h_ft.size(), n_ft);
+  }
+  if (n_ft > 0) {
+    h_ft.resize(n_ft);
+    auto ptr = reinterpret_cast<std::underlying_type_t<FeatureType>*>(h_ft.data());
+    collective::Allreduce<collective::Operation::kMax>(ptr, h_ft.size());
+  }
+}
+}  // anonymous namespace
+
 void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
                                    std::shared_ptr<DMatrix> ref) {
   DMatrixProxy* proxy = MakeProxy(proxy_);
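To make the reduction concrete, here is a minimal Python sketch of the idea behind SyncFeatureType above. It is not the C++ implementation; allreduce_max and sync_feature_types are hypothetical stand-ins for collective::Allreduce<Operation::kMax>. The trick is that the type codes are small non-negative integers (numerical = 0, categorical = 1), so an element-wise max over workers recovers the categorical flags, and a max over the vector lengths tells empty workers how many features exist.

    from enum import IntEnum
    from typing import List

    class FeatureType(IntEnum):
        # Mirrors XGBoost's FeatureType codes.
        kNumerical = 0
        kCategorical = 1

    def allreduce_max(per_worker: List[List[int]]) -> List[int]:
        # Stand-in for collective::Allreduce<kMax>: element-wise max across workers.
        return [max(col) for col in zip(*per_worker)]

    def sync_feature_types(per_worker: List[List[FeatureType]]) -> List[FeatureType]:
        # Step 1: max-reduce the lengths so workers holding an empty DMatrix
        # learn the true number of features.
        n_ft = max(len(ft) for ft in per_worker)
        # Step 2: pad empty vectors with zeros (kNumerical) and max-reduce the
        # integer codes; non-empty workers already agree, empty ones contribute 0.
        padded = [[int(t) for t in ft] if ft else [0] * n_ft for ft in per_worker]
        return [FeatureType(c) for c in allreduce_max(padded)]

    # Worker 0 saw no data at all; worker 1 saw one categorical, one numerical feature.
    workers = [[], [FeatureType.kCategorical, FeatureType.kNumerical]]
    print(sync_feature_types(workers))  # [kCategorical, kNumerical] on every worker

kMax is a natural choice of reduction here because an empty worker can only contribute zeros, so it can never overwrite a peer's categorical flag.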
@@ -96,13 +122,14 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     return HostAdapterDispatch(proxy, [](auto const& value) { return value.NumCols(); });
   };

-  std::vector<size_t> column_sizes;
+  std::vector<std::size_t> column_sizes;
   auto const is_valid = data::IsValidFunctor{missing};
   auto nnz_cnt = [&]() {
     return HostAdapterDispatch(proxy, [&](auto const& value) {
       size_t n_threads = ctx_.Threads();
       size_t n_features = column_sizes.size();
-      linalg::Tensor<size_t, 2> column_sizes_tloc({n_threads, n_features}, Context::kCpuId);
+      linalg::Tensor<std::size_t, 2> column_sizes_tloc({n_threads, n_features}, Context::kCpuId);
+      column_sizes_tloc.Data()->Fill(0ul);
       auto view = column_sizes_tloc.HostView();
       common::ParallelFor(value.Size(), n_threads, common::Sched::Static(256), [&](auto i) {
         auto const& line = value.GetLine(i);
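One subtle change in this hunk: the thread-local column counters are now zeroed explicitly with Fill(0ul) before the parallel count. Reading the change, this looks like a defensive fix against reading uninitialized counters, rather than relying on the freshly constructed tensor's buffer happening to be zero; that motivation is an inference, not stated in the commit.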
@@ -139,7 +166,8 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     if (n_features == 0) {
       n_features = num_cols();
       collective::Allreduce<collective::Operation::kMax>(&n_features, 1);
-      column_sizes.resize(n_features);
+      column_sizes.clear();
+      column_sizes.resize(n_features, 0);
       info_.num_col_ = n_features;
     } else {
       CHECK_EQ(n_features, num_cols()) << "Inconsistent number of columns.";
@@ -166,14 +194,18 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
    * Generate quantiles
    */
   accumulated_rows = 0;
+  std::vector<FeatureType> h_ft;
   if (ref) {
     GetCutsFromRef(ref, Info().num_col_, batch_param_, &cuts);
+    h_ft = ref->Info().feature_types.HostVector();
   } else {
     size_t i = 0;
     while (iter.Next()) {
       if (!p_sketch) {
+        h_ft = proxy->Info().feature_types.ConstHostVector();
+        SyncFeatureType(&h_ft);
         p_sketch.reset(new common::HostSketchContainer{
-            batch_param_.max_bin, proxy->Info().feature_types.ConstHostSpan(), column_sizes, false,
+            batch_param_.max_bin, h_ft, column_sizes, false,
             proxy->Info().data_split_mode == DataSplitMode::kCol, ctx_.Threads()});
       }
       HostAdapterDispatch(proxy, [&](auto const& batch) {
@@ -191,6 +223,9 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     CHECK(p_sketch);
     p_sketch->MakeCuts(&cuts);
   }
+  if (!h_ft.empty()) {
+    CHECK_EQ(h_ft.size(), n_features);
+  }

   /**
    * Generate gradient index.
@@ -202,8 +237,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
   while (iter.Next()) {
     HostAdapterDispatch(proxy, [&](auto const& batch) {
       proxy->Info().num_nonzero_ = batch_nnz[i];
-      this->ghist_->PushAdapterBatch(&ctx_, rbegin, prev_sum, batch, missing,
-                                     proxy->Info().feature_types.ConstHostSpan(),
+      this->ghist_->PushAdapterBatch(&ctx_, rbegin, prev_sum, batch, missing, h_ft,
                                      batch_param_.sparse_thresh, Info().num_row_);
     });
     if (n_batches != 1) {
@@ -236,6 +270,8 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
     this->info_.num_col_ = n_features;  // proxy might be empty.
     CHECK_EQ(proxy->Info().labels.Size(), 0);
   }
+
+  Info().feature_types.HostVector() = h_ft;
 }

 BatchSet<GHistIndexMatrix> IterativeDMatrix::GetGradientIndex(BatchParam const& param) {
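Taken together, the synchronized h_ft now replaces every direct read of proxy->Info().feature_types in this function: it seeds the HostSketchContainer, it is handed to PushAdapterBatch when building the gradient index, and the final assignment Info().feature_types.HostVector() = h_ft persists the agreed-upon types on the resulting DMatrix, so every worker exposes the same metadata.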
@@ -265,6 +265,27 @@ class TestDistributedGPU:
     ) -> None:
         run_gpu_hist(params, num_rounds, dataset, dmatrix_type, local_cuda_client)

+    def test_empty_quantile_dmatrix(self, local_cuda_client: Client) -> None:
+        client = local_cuda_client
+        X, y = make_categorical(client, 1, 30, 13)
+        X_valid, y_valid = make_categorical(client, 10000, 30, 13)
+
+        Xy = xgb.dask.DaskQuantileDMatrix(client, X, y, enable_categorical=True)
+        Xy_valid = xgb.dask.DaskQuantileDMatrix(
+            client, X_valid, y_valid, ref=Xy, enable_categorical=True
+        )
+        result = xgb.dask.train(
+            client,
+            {"tree_method": "gpu_hist"},
+            Xy,
+            num_boost_round=10,
+            evals=[(Xy_valid, "Valid")],
+        )
+        predt = xgb.dask.inplace_predict(client, result["booster"], X).compute()
+        np.testing.assert_allclose(y.compute(), predt)
+        rmse = result["history"]["Valid"]["rmse"][-1]
+        assert rmse < 32.0
+
     @pytest.mark.skipif(**tm.no_cupy())
     def test_dask_array(self, local_cuda_client: Client) -> None:
         run_with_dask_array(dxgb.DaskDMatrix, local_cuda_client)
@@ -96,6 +96,9 @@ def make_categorical(
         l_n_samples = min(
             n_samples // n_workers, n_samples - i * (n_samples // n_workers)
         )
+        # make sure there's at least one sample for testing empty DMatrix
+        if n_samples == 1 and i == 0:
+            l_n_samples = 1
         future = client.submit(
             pack,
             n_samples=l_n_samples,
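The new guard is what makes the empty-DMatrix tests possible: with a single sample and several workers, the integer division hands every worker zero rows, so the branch pins the one sample to the first worker and leaves the rest genuinely empty. A small plain-Python sketch of the arithmetic (standalone, hypothetical variable names):

    n_samples, n_workers = 1, 4
    sizes = []
    for i in range(n_workers):
        # Same formula as make_categorical above.
        l_n_samples = min(n_samples // n_workers, n_samples - i * (n_samples // n_workers))
        if n_samples == 1 and i == 0:
            l_n_samples = 1  # the added guard: keep at least one non-empty part
        sizes.append(l_n_samples)
    print(sizes)  # -> [1, 0, 0, 0]

This is the cluster shape the new test_empty_quantile_dmatrix tests rely on.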
@@ -1480,6 +1483,27 @@ class TestWithDask:
             quantile_hist["Valid"]["rmse"], dmatrix_hist["Valid"]["rmse"]
         )

+    def test_empty_quantile_dmatrix(self, client: Client) -> None:
+        X, y = make_categorical(client, 2, 30, 13)
+        X_valid, y_valid = make_categorical(client, 10000, 30, 13)
+        X_valid, y_valid, _ = deterministic_repartition(client, X_valid, y_valid, None)
+
+        Xy = xgb.dask.DaskQuantileDMatrix(client, X, y, enable_categorical=True)
+        Xy_valid = xgb.dask.DaskQuantileDMatrix(
+            client, X_valid, y_valid, ref=Xy, enable_categorical=True
+        )
+        result = xgb.dask.train(
+            client,
+            {"tree_method": "hist"},
+            Xy,
+            num_boost_round=10,
+            evals=[(Xy_valid, "Valid")],
+        )
+        predt = xgb.dask.inplace_predict(client, result["booster"], X).compute()
+        np.testing.assert_allclose(y.compute(), predt)
+        rmse = result["history"]["Valid"]["rmse"][-1]
+        assert rmse < 32.0
+
     @given(params=hist_parameter_strategy, dataset=tm.dataset_strategy)
     @settings(
         deadline=None, max_examples=10, suppress_health_check=suppress, print_blob=True