initial merge

This commit is contained in:
amdsc21
2023-03-25 04:31:55 +01:00
146 changed files with 6730 additions and 4082 deletions

View File

@@ -10,13 +10,16 @@
#include <cstring>
#include "../collective/communicator-inl.h"
#include "../common/algorithm.h" // StableSort
#include "../common/api_entry.h" // XGBAPIThreadLocalEntry
#include "../collective/communicator.h"
#include "../common/common.h"
#include "../common/algorithm.h" // for StableSort
#include "../common/api_entry.h" // for XGBAPIThreadLocalEntry
#include "../common/error_msg.h" // for InfInData
#include "../common/group_data.h"
#include "../common/io.h"
#include "../common/linalg_op.h"
#include "../common/math.h"
#include "../common/numeric.h" // Iota
#include "../common/numeric.h" // for Iota
#include "../common/threading_utils.h"
#include "../common/version.h"
#include "../data/adapter.h"
@@ -700,6 +703,14 @@ void MetaInfo::Extend(MetaInfo const& that, bool accumulate_rows, bool check_col
}
}
void MetaInfo::SynchronizeNumberOfColumns() {
if (collective::IsFederated() && data_split_mode == DataSplitMode::kCol) {
collective::Allreduce<collective::Operation::kSum>(&num_col_, 1);
} else {
collective::Allreduce<collective::Operation::kMax>(&num_col_, 1);
}
}
void MetaInfo::Validate(std::int32_t device) const {
if (group_ptr_.size() != 0 && weights_.Size() != 0) {
CHECK_EQ(group_ptr_.size(), weights_.Size() + 1)
@@ -867,7 +878,7 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
dmlc::Parser<uint32_t>::Create(fname.c_str(), partid, npart, file_format.c_str()));
data::FileAdapter adapter(parser.get());
dmat = DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(), Context{}.Threads(),
cache_file);
cache_file, data_split_mode);
} else {
data::FileIterator iter{fname, static_cast<uint32_t>(partid), static_cast<uint32_t>(npart),
file_format};
@@ -903,11 +914,6 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
LOG(FATAL) << "Encountered parser error:\n" << e.what();
}
/* sync up number of features after matrix loaded.
* partitioned data will fail the train/val validation check
* since partitioned data not knowing the real number of features. */
collective::Allreduce<collective::Operation::kMax>(&dmat->Info().num_col_, 1);
if (need_split && data_split_mode == DataSplitMode::kCol) {
if (!cache_file.empty()) {
LOG(FATAL) << "Column-wise data split is not support for external memory.";
@@ -917,7 +923,6 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
delete dmat;
return sliced;
} else {
dmat->Info().data_split_mode = data_split_mode;
return dmat;
}
}
@@ -954,39 +959,49 @@ template DMatrix *DMatrix::Create<DataIterHandle, DMatrixHandle,
XGDMatrixCallbackNext *next, float missing, int32_t n_threads, std::string);
template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string&) {
return new data::SimpleDMatrix(adapter, missing, nthread);
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string&,
DataSplitMode data_split_mode) {
return new data::SimpleDMatrix(adapter, missing, nthread, data_split_mode);
}
template DMatrix* DMatrix::Create<data::DenseAdapter>(data::DenseAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::ArrayAdapter>(data::ArrayAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSRAdapter>(data::CSRAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSCAdapter>(data::CSCAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::DataTableAdapter>(data::DataTableAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::FileAdapter>(data::FileAdapter* adapter, float missing,
std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSRArrayAdapter>(data::CSRArrayAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CSCArrayAdapter>(data::CSCArrayAdapter* adapter,
float missing, std::int32_t nthread,
const std::string& cache_prefix);
const std::string& cache_prefix,
DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create(
data::IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>* adapter,
float missing, int nthread, const std::string& cache_prefix);
float missing, int nthread, const std::string& cache_prefix, DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::RecordBatchesIterAdapter>(
data::RecordBatchesIterAdapter* adapter, float missing, int nthread, const std::string&);
data::RecordBatchesIterAdapter* adapter, float missing, int nthread, const std::string&,
DataSplitMode data_split_mode);
SparsePage SparsePage::GetTranspose(int num_columns, int32_t n_threads) const {
SparsePage transpose;
@@ -1048,6 +1063,13 @@ void SparsePage::SortIndices(int32_t n_threads) {
});
}
void SparsePage::Reindex(uint64_t feature_offset, int32_t n_threads) {
auto& h_data = this->data.HostVector();
common::ParallelFor(h_data.size(), n_threads, [&](auto i) {
h_data[i].index += feature_offset;
});
}
void SparsePage::SortRows(int32_t n_threads) {
auto& h_offset = this->offset.HostVector();
auto& h_data = this->data.HostVector();
@@ -1144,7 +1166,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
});
}
exec.Rethrow();
CHECK(valid) << "Input data contains `inf` or `nan`";
CHECK(valid) << error::InfInData();
for (const auto & max : max_columns_vector) {
max_columns = std::max(max_columns, max[0]);
}

View File

@@ -208,17 +208,17 @@ void MetaInfo::SetInfoFromCUDA(Context const& ctx, StringView key, Json array) {
template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix) {
const std::string& cache_prefix, DataSplitMode data_split_mode) {
CHECK_EQ(cache_prefix.size(), 0)
<< "Device memory construction is not currently supported with external "
"memory.";
return new data::SimpleDMatrix(adapter, missing, nthread);
return new data::SimpleDMatrix(adapter, missing, nthread, data_split_mode);
}
template DMatrix* DMatrix::Create<data::CudfAdapter>(
data::CudfAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix);
const std::string& cache_prefix, DataSplitMode data_split_mode);
template DMatrix* DMatrix::Create<data::CupyAdapter>(
data::CupyAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix);
const std::string& cache_prefix, DataSplitMode data_split_mode);
} // namespace xgboost

View File

@@ -4,7 +4,10 @@
*/
#ifndef XGBOOST_DATA_DEVICE_ADAPTER_H_
#define XGBOOST_DATA_DEVICE_ADAPTER_H_
#include <cstddef> // for size_t
#include <thrust/iterator/counting_iterator.h> // for make_counting_iterator
#include <thrust/logical.h> // for none_of
#include <cstddef> // for size_t
#include <limits>
#include <memory>
#include <string>
@@ -240,6 +243,20 @@ size_t GetRowCounts(const AdapterBatchT batch, common::Span<size_t> offset,
return row_stride;
}
/**
* \brief Check there's no inf in data.
*/
template <typename AdapterBatchT>
bool HasInfInData(AdapterBatchT const& batch, IsValidFunctor is_valid) {
auto counting = thrust::make_counting_iterator(0llu);
auto value_iter = dh::MakeTransformIterator<float>(
counting, [=] XGBOOST_DEVICE(std::size_t idx) { return batch.GetElement(idx).value; });
auto valid =
thrust::none_of(value_iter, value_iter + batch.Size(),
[is_valid] XGBOOST_DEVICE(float v) { return is_valid(v) && std::isinf(v); });
return valid;
}
}; // namespace data
} // namespace xgboost
#endif // XGBOOST_DATA_DEVICE_ADAPTER_H_

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019-2022 XGBoost contributors
/**
* Copyright 2019-2023 by XGBoost contributors
*/
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
@@ -9,7 +9,7 @@
#include "../common/random.h"
#include "../common/transform_iterator.h" // MakeIndexTransformIter
#include "./ellpack_page.cuh"
#include "device_adapter.cuh"
#include "device_adapter.cuh" // for HasInfInData
#include "gradient_index.h"
#include "xgboost/data.h"
@@ -203,9 +203,8 @@ struct TupleScanOp {
// Here the data is already correctly ordered and simply needs to be compacted
// to remove missing data
template <typename AdapterBatchT>
void CopyDataToEllpack(const AdapterBatchT &batch,
common::Span<FeatureType const> feature_types,
EllpackPageImpl *dst, int device_idx, float missing) {
void CopyDataToEllpack(const AdapterBatchT& batch, common::Span<FeatureType const> feature_types,
EllpackPageImpl* dst, int device_idx, float missing) {
// Some witchcraft happens here
// The goal is to copy valid elements out of the input to an ELLPACK matrix
// with a given row stride, using no extra working memory Standard stream
@@ -215,6 +214,9 @@ void CopyDataToEllpack(const AdapterBatchT &batch,
// correct output position
auto counting = thrust::make_counting_iterator(0llu);
data::IsValidFunctor is_valid(missing);
bool valid = data::HasInfInData(batch, is_valid);
CHECK(valid) << error::InfInData();
auto key_iter = dh::MakeTransformIterator<size_t>(
counting,
[=] __device__(size_t idx) {
@@ -255,9 +257,9 @@ void CopyDataToEllpack(const AdapterBatchT &batch,
cub::DispatchScan<decltype(key_value_index_iter), decltype(out),
TupleScanOp<Tuple>, cub::NullType, int64_t>;
#if THRUST_MAJOR_VERSION >= 2
DispatchScan::Dispatch(nullptr, temp_storage_bytes, key_value_index_iter, out,
TupleScanOp<Tuple>(), cub::NullType(), batch.Size(),
nullptr);
dh::safe_cuda(DispatchScan::Dispatch(nullptr, temp_storage_bytes, key_value_index_iter, out,
TupleScanOp<Tuple>(), cub::NullType(), batch.Size(),
nullptr));
#else
DispatchScan::Dispatch(nullptr, temp_storage_bytes, key_value_index_iter, out,
TupleScanOp<Tuple>(), cub::NullType(), batch.Size(),
@@ -265,9 +267,9 @@ void CopyDataToEllpack(const AdapterBatchT &batch,
#endif
dh::TemporaryArray<char> temp_storage(temp_storage_bytes);
#if THRUST_MAJOR_VERSION >= 2
DispatchScan::Dispatch(temp_storage.data().get(), temp_storage_bytes,
key_value_index_iter, out, TupleScanOp<Tuple>(),
cub::NullType(), batch.Size(), nullptr);
dh::safe_cuda(DispatchScan::Dispatch(temp_storage.data().get(), temp_storage_bytes,
key_value_index_iter, out, TupleScanOp<Tuple>(),
cub::NullType(), batch.Size(), nullptr));
#else
DispatchScan::Dispatch(temp_storage.data().get(), temp_storage_bytes,
key_value_index_iter, out, TupleScanOp<Tuple>(),

View File

@@ -1,21 +1,23 @@
/*!
* Copyright 2017-2022 by XGBoost Contributors
/**
* Copyright 2017-2023 by XGBoost Contributors
* \brief Data type for fast histogram aggregation.
*/
#ifndef XGBOOST_DATA_GRADIENT_INDEX_H_
#define XGBOOST_DATA_GRADIENT_INDEX_H_
#include <algorithm> // std::min
#include <cinttypes> // std::uint32_t
#include <cstddef> // std::size_t
#include <algorithm> // for min
#include <atomic> // for atomic
#include <cinttypes> // for uint32_t
#include <cstddef> // for size_t
#include <memory>
#include <vector>
#include "../common/categorical.h"
#include "../common/error_msg.h" // for InfInData
#include "../common/hist_util.h"
#include "../common/numeric.h"
#include "../common/threading_utils.h"
#include "../common/transform_iterator.h" // common::MakeIndexTransformIter
#include "../common/transform_iterator.h" // for MakeIndexTransformIter
#include "adapter.h"
#include "proxy_dmatrix.h"
#include "xgboost/base.h"
@@ -62,6 +64,7 @@ class GHistIndexMatrix {
BinIdxType* index_data = index_data_span.data();
auto const& ptrs = cut.Ptrs();
auto const& values = cut.Values();
std::atomic<bool> valid{true};
common::ParallelFor(batch_size, batch_threads, [&](size_t i) {
auto line = batch.GetLine(i);
size_t ibegin = row_ptr[rbegin + i]; // index of first entry for current block
@@ -70,6 +73,9 @@ class GHistIndexMatrix {
for (size_t j = 0; j < line.Size(); ++j) {
data::COOTuple elem = line.GetElement(j);
if (is_valid(elem)) {
if (XGBOOST_EXPECT((std::isinf(elem.value)), false)) {
valid = false;
}
bst_bin_t bin_idx{-1};
if (common::IsCat(ft, elem.column_idx)) {
bin_idx = cut.SearchCatBin(elem.value, elem.column_idx, ptrs, values);
@@ -82,6 +88,8 @@ class GHistIndexMatrix {
}
}
});
CHECK(valid) << error::InfInData();
}
// Gather hit_count from all threads

View File

@@ -190,7 +190,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
// From here on Info() has the correct data shape
Info().num_row_ = accumulated_rows;
Info().num_nonzero_ = nnz;
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
Info().SynchronizeNumberOfColumns();
CHECK(std::none_of(column_sizes.cbegin(), column_sizes.cend(), [&](auto f) {
return f > accumulated_rows;
})) << "Something went wrong during iteration.";
@@ -257,6 +257,7 @@ void IterativeDMatrix::InitFromCPU(DataIterHandle iter_handle, float missing,
}
iter.Reset();
CHECK_EQ(rbegin, Info().num_row_);
CHECK_EQ(this->ghist_->Features(), Info().num_col_);
/**
* Generate column matrix

View File

@@ -195,7 +195,7 @@ void IterativeDMatrix::InitFromCUDA(DataIterHandle iter_handle, float missing,
iter.Reset();
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.SynchronizeNumberOfColumns();
}
BatchSet<EllpackPage> IterativeDMatrix::GetEllpackBatches(BatchParam const& param) {

View File

@@ -1,27 +1,24 @@
/*!
* Copyright 2021 XGBoost contributors
/**
* Copyright 2021-2023 XGBoost contributors
*/
#include <any> // for any, any_cast
#include "device_adapter.cuh"
#include "proxy_dmatrix.h"
namespace xgboost {
namespace data {
namespace xgboost::data {
template <typename Fn>
decltype(auto) Dispatch(DMatrixProxy const* proxy, Fn fn) {
if (proxy->Adapter().type() == typeid(std::shared_ptr<CupyAdapter>)) {
auto value = dmlc::get<std::shared_ptr<CupyAdapter>>(
proxy->Adapter())->Value();
auto value = std::any_cast<std::shared_ptr<CupyAdapter>>(proxy->Adapter())->Value();
return fn(value);
} else if (proxy->Adapter().type() == typeid(std::shared_ptr<CudfAdapter>)) {
auto value = dmlc::get<std::shared_ptr<CudfAdapter>>(
proxy->Adapter())->Value();
auto value = std::any_cast<std::shared_ptr<CudfAdapter>>(proxy->Adapter())->Value();
return fn(value);
} else {
LOG(FATAL) << "Unknown type: " << proxy->Adapter().type().name();
auto value = dmlc::get<std::shared_ptr<CudfAdapter>>(
proxy->Adapter())->Value();
auto value = std::any_cast<std::shared_ptr<CudfAdapter>>(proxy->Adapter())->Value();
return fn(value);
}
}
} // namespace data
} // namespace xgboost
} // namespace xgboost::data

View File

@@ -1,11 +1,10 @@
/*!
* Copyright 2020-2022, XGBoost contributors
/**
* Copyright 2020-2023, XGBoost contributors
*/
#ifndef XGBOOST_DATA_PROXY_DMATRIX_H_
#define XGBOOST_DATA_PROXY_DMATRIX_H_
#include <dmlc/any.h>
#include <any> // for any, any_cast
#include <memory>
#include <string>
#include <utility>
@@ -15,8 +14,7 @@
#include "xgboost/context.h"
#include "xgboost/data.h"
namespace xgboost {
namespace data {
namespace xgboost::data {
/*
* \brief A proxy to external iterator.
*/
@@ -44,7 +42,7 @@ class DataIterProxy {
*/
class DMatrixProxy : public DMatrix {
MetaInfo info_;
dmlc::any batch_;
std::any batch_;
Context ctx_;
#if defined(XGBOOST_USE_CUDA) || defined(XGBOOST_USE_HIP)
@@ -115,9 +113,7 @@ class DMatrixProxy : public DMatrix {
LOG(FATAL) << "Not implemented.";
return BatchSet<ExtSparsePage>(BatchIterator<ExtSparsePage>(nullptr));
}
dmlc::any Adapter() const {
return batch_;
}
std::any Adapter() const { return batch_; }
};
inline DMatrixProxy* MakeProxy(DMatrixHandle proxy) {
@@ -131,15 +127,13 @@ inline DMatrixProxy* MakeProxy(DMatrixHandle proxy) {
template <typename Fn>
decltype(auto) HostAdapterDispatch(DMatrixProxy const* proxy, Fn fn, bool* type_error = nullptr) {
if (proxy->Adapter().type() == typeid(std::shared_ptr<CSRArrayAdapter>)) {
auto value =
dmlc::get<std::shared_ptr<CSRArrayAdapter>>(proxy->Adapter())->Value();
auto value = std::any_cast<std::shared_ptr<CSRArrayAdapter>>(proxy->Adapter())->Value();
if (type_error) {
*type_error = false;
}
return fn(value);
} else if (proxy->Adapter().type() == typeid(std::shared_ptr<ArrayAdapter>)) {
auto value = dmlc::get<std::shared_ptr<ArrayAdapter>>(
proxy->Adapter())->Value();
auto value = std::any_cast<std::shared_ptr<ArrayAdapter>>(proxy->Adapter())->Value();
if (type_error) {
*type_error = false;
}
@@ -154,6 +148,5 @@ decltype(auto) HostAdapterDispatch(DMatrixProxy const* proxy, Fn fn, bool* type_
decltype(std::declval<std::shared_ptr<ArrayAdapter>>()->Value()))>();
}
}
} // namespace data
} // namespace xgboost
} // namespace xgboost::data
#endif // XGBOOST_DATA_PROXY_DMATRIX_H_

View File

@@ -73,6 +73,19 @@ DMatrix* SimpleDMatrix::SliceCol(int num_slices, int slice_id) {
return out;
}
void SimpleDMatrix::ReindexFeatures() {
if (collective::IsFederated() && info_.data_split_mode == DataSplitMode::kCol) {
std::vector<uint64_t> buffer(collective::GetWorldSize());
buffer[collective::GetRank()] = info_.num_col_;
collective::Allgather(buffer.data(), buffer.size() * sizeof(uint64_t));
auto offset = std::accumulate(buffer.cbegin(), buffer.cbegin() + collective::GetRank(), 0);
if (offset == 0) {
return;
}
sparse_page_->Reindex(offset, ctx_.Threads());
}
}
BatchSet<SparsePage> SimpleDMatrix::GetRowBatches() {
// since csr is the default data structure so `source_` is always available.
auto begin_iter = BatchIterator<SparsePage>(
@@ -151,7 +164,8 @@ BatchSet<ExtSparsePage> SimpleDMatrix::GetExtBatches(BatchParam const&) {
}
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
this->ctx_.nthread = nthread;
std::vector<uint64_t> qids;
@@ -217,7 +231,9 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();
if (adapter->NumRows() == kAdapterUnknownSize) {
using IteratorAdapterT
@@ -272,22 +288,31 @@ void SimpleDMatrix::SaveToLocalFile(const std::string& fname) {
fo->Write(sparse_page_->data.HostVector());
}
template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(ArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSRAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSRArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSCArrayAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(CSCAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(DataTableAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread);
template SimpleDMatrix::SimpleDMatrix(DenseAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(ArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSRAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSRArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSCArrayAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CSCAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(DataTableAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(
IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>
*adapter,
float missing, int nthread);
float missing, int nthread, DataSplitMode data_split_mode);
template <>
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread) {
ctx_.nthread = nthread;
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
ctx_.nthread = nthread;
auto& offset_vec = sparse_page_->offset.HostVector();
auto& data_vec = sparse_page_->data.HostVector();
@@ -346,7 +371,10 @@ SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, i
}
// Synchronise worker columns
info_.num_col_ = adapter->NumColumns();
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();
info_.num_row_ = total_batch_size;
info_.num_nonzero_ = data_vec.size();
CHECK_EQ(offset_vec.back(), info_.num_nonzero_);

View File

@@ -15,7 +15,10 @@ namespace data {
// Current implementation assumes a single batch. More batches can
// be supported in future. Does not currently support inferring row/column size
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread*/) {
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread*/,
DataSplitMode data_split_mode) {
CHECK(data_split_mode != DataSplitMode::kCol)
<< "Column-wise data split is currently not supported on the GPU.";
auto device = (adapter->DeviceIdx() < 0 || adapter->NumRows() == 0) ? dh::CurrentDevice()
: adapter->DeviceIdx();
CHECK_GE(device, 0);
@@ -40,12 +43,13 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int32_t /*nthread
info_.num_col_ = adapter->NumColumns();
info_.num_row_ = adapter->NumRows();
// Synchronise worker columns
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.data_split_mode = data_split_mode;
info_.SynchronizeNumberOfColumns();
}
template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing,
int nthread);
int nthread, DataSplitMode data_split_mode);
template SimpleDMatrix::SimpleDMatrix(CupyAdapter* adapter, float missing,
int nthread);
int nthread, DataSplitMode data_split_mode);
} // namespace data
} // namespace xgboost

View File

@@ -1,14 +1,13 @@
/*!
* Copyright 2019-2021 by XGBoost Contributors
/**
* Copyright 2019-2023 by XGBoost Contributors
* \file simple_dmatrix.cuh
*/
#ifndef XGBOOST_DATA_SIMPLE_DMATRIX_CUH_
#define XGBOOST_DATA_SIMPLE_DMATRIX_CUH_
#include <thrust/copy.h>
#include <thrust/scan.h>
#include <thrust/execution_policy.h>
#include "device_adapter.cuh"
#include <thrust/scan.h>
#if defined(XGBOOST_USE_CUDA)
#include "../common/device_helpers.cuh"
@@ -16,8 +15,10 @@
#include "../common/device_helpers.hip.h"
#endif
namespace xgboost {
namespace data {
#include "../common/error_msg.h" // for InfInData
#include "device_adapter.cuh" // for HasInfInData
namespace xgboost::data {
#if defined(XGBOOST_USE_CUDA)
template <typename AdapterBatchT>
@@ -94,7 +95,11 @@ void CountRowOffsets(const AdapterBatchT& batch, common::Span<bst_row_t> offset,
}
template <typename AdapterBatchT>
size_t CopyToSparsePage(AdapterBatchT const& batch, int32_t device, float missing, SparsePage* page) {
size_t CopyToSparsePage(AdapterBatchT const& batch, int32_t device, float missing,
SparsePage* page) {
bool valid = HasInfInData(batch, IsValidFunctor{missing});
CHECK(valid) << error::InfInData();
page->offset.SetDevice(device);
page->data.SetDevice(device);
page->offset.Resize(batch.NumRows() + 1);
@@ -106,6 +111,5 @@ size_t CopyToSparsePage(AdapterBatchT const& batch, int32_t device, float missin
return num_nonzero_;
}
} // namespace data
} // namespace xgboost
} // namespace xgboost::data
#endif // XGBOOST_DATA_SIMPLE_DMATRIX_CUH_

View File

@@ -22,7 +22,8 @@ class SimpleDMatrix : public DMatrix {
public:
SimpleDMatrix() = default;
template <typename AdapterT>
explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread);
explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode = DataSplitMode::kRow);
explicit SimpleDMatrix(dmlc::Stream* in_stream);
~SimpleDMatrix() override = default;
@@ -61,6 +62,15 @@ class SimpleDMatrix : public DMatrix {
bool GHistIndexExists() const override { return static_cast<bool>(gradient_index_); }
bool SparsePageExists() const override { return true; }
/**
* \brief Reindex the features based on a global view.
*
* In some cases (e.g. vertical federated learning), features are loaded locally with indices
* starting from 0. However, all the algorithms assume the features are globally indexed, so we
* reindex the features based on the offset needed to obtain the global view.
*/
void ReindexFeatures();
private:
Context ctx_;
};

View File

@@ -96,7 +96,7 @@ SparsePageDMatrix::SparsePageDMatrix(DataIterHandle iter_handle, DMatrixHandle p
this->info_.num_col_ = n_features;
this->info_.num_nonzero_ = nnz;
collective::Allreduce<collective::Operation::kMax>(&info_.num_col_, 1);
info_.SynchronizeNumberOfColumns();
CHECK_NE(info_.num_col_, 0);
}