[SYCL] Add dask support for distributed (#10812)
This commit is contained in:
parent
2a37a8880c
commit
d7599e095b
@ -31,6 +31,33 @@ template void InitHist(::sycl::queue qu,
|
|||||||
GHistRow<double, MemoryType::on_device>* hist,
|
GHistRow<double, MemoryType::on_device>* hist,
|
||||||
size_t size, ::sycl::event* event);
|
size_t size, ::sycl::event* event);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Copy histogram from src to dst
|
||||||
|
*/
|
||||||
|
template<typename GradientSumT>
|
||||||
|
void CopyHist(::sycl::queue qu,
|
||||||
|
GHistRow<GradientSumT, MemoryType::on_device>* dst,
|
||||||
|
const GHistRow<GradientSumT, MemoryType::on_device>& src,
|
||||||
|
size_t size) {
|
||||||
|
GradientSumT* pdst = reinterpret_cast<GradientSumT*>(dst->Data());
|
||||||
|
const GradientSumT* psrc = reinterpret_cast<const GradientSumT*>(src.DataConst());
|
||||||
|
|
||||||
|
qu.submit([&](::sycl::handler& cgh) {
|
||||||
|
cgh.parallel_for<>(::sycl::range<1>(2 * size), [=](::sycl::item<1> pid) {
|
||||||
|
const size_t i = pid.get_id(0);
|
||||||
|
pdst[i] = psrc[i];
|
||||||
|
});
|
||||||
|
}).wait();
|
||||||
|
}
|
||||||
|
template void CopyHist(::sycl::queue qu,
|
||||||
|
GHistRow<float, MemoryType::on_device>* dst,
|
||||||
|
const GHistRow<float, MemoryType::on_device>& src,
|
||||||
|
size_t size);
|
||||||
|
template void CopyHist(::sycl::queue qu,
|
||||||
|
GHistRow<double, MemoryType::on_device>* dst,
|
||||||
|
const GHistRow<double, MemoryType::on_device>& src,
|
||||||
|
size_t size);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Compute Subtraction: dst = src1 - src2
|
* \brief Compute Subtraction: dst = src1 - src2
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -36,6 +36,15 @@ void InitHist(::sycl::queue qu,
|
|||||||
GHistRow<GradientSumT, MemoryType::on_device>* hist,
|
GHistRow<GradientSumT, MemoryType::on_device>* hist,
|
||||||
size_t size, ::sycl::event* event);
|
size_t size, ::sycl::event* event);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Copy histogram from src to dst
|
||||||
|
*/
|
||||||
|
template<typename GradientSumT>
|
||||||
|
void CopyHist(::sycl::queue qu,
|
||||||
|
GHistRow<GradientSumT, MemoryType::on_device>* dst,
|
||||||
|
const GHistRow<GradientSumT, MemoryType::on_device>& src,
|
||||||
|
size_t size);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Compute subtraction: dst = src1 - src2
|
* \brief Compute subtraction: dst = src1 - src2
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -39,6 +39,42 @@ class BatchHistRowsAdder: public HistRowsAdder<GradientSumT> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
template <typename GradientSumT>
|
||||||
|
class DistributedHistRowsAdder: public HistRowsAdder<GradientSumT> {
|
||||||
|
public:
|
||||||
|
void AddHistRows(HistUpdater<GradientSumT>* builder,
|
||||||
|
std::vector<int>* sync_ids, RegTree *p_tree) override {
|
||||||
|
builder->builder_monitor_.Start("AddHistRows");
|
||||||
|
const size_t explicit_size = builder->nodes_for_explicit_hist_build_.size();
|
||||||
|
const size_t subtaction_size = builder->nodes_for_subtraction_trick_.size();
|
||||||
|
std::vector<int> merged_node_ids(explicit_size + subtaction_size);
|
||||||
|
for (size_t i = 0; i < explicit_size; ++i) {
|
||||||
|
merged_node_ids[i] = builder->nodes_for_explicit_hist_build_[i].nid;
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < subtaction_size; ++i) {
|
||||||
|
merged_node_ids[explicit_size + i] =
|
||||||
|
builder->nodes_for_subtraction_trick_[i].nid;
|
||||||
|
}
|
||||||
|
std::sort(merged_node_ids.begin(), merged_node_ids.end());
|
||||||
|
sync_ids->clear();
|
||||||
|
for (auto const& nid : merged_node_ids) {
|
||||||
|
if ((*p_tree)[nid].IsLeftChild()) {
|
||||||
|
builder->hist_.AddHistRow(nid);
|
||||||
|
builder->hist_local_worker_.AddHistRow(nid);
|
||||||
|
sync_ids->push_back(nid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (auto const& nid : merged_node_ids) {
|
||||||
|
if (!((*p_tree)[nid].IsLeftChild())) {
|
||||||
|
builder->hist_.AddHistRow(nid);
|
||||||
|
builder->hist_local_worker_.AddHistRow(nid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder->builder_monitor_.Stop("AddHistRows");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace tree
|
} // namespace tree
|
||||||
} // namespace sycl
|
} // namespace sycl
|
||||||
} // namespace xgboost
|
} // namespace xgboost
|
||||||
|
|||||||
@ -61,6 +61,68 @@ class BatchHistSynchronizer: public HistSynchronizer<GradientSumT> {
|
|||||||
std::vector<::sycl::event> hist_sync_events_;
|
std::vector<::sycl::event> hist_sync_events_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template <typename GradientSumT>
|
||||||
|
class DistributedHistSynchronizer: public HistSynchronizer<GradientSumT> {
|
||||||
|
public:
|
||||||
|
void SyncHistograms(HistUpdater<GradientSumT>* builder,
|
||||||
|
const std::vector<int>& sync_ids,
|
||||||
|
RegTree *p_tree) override {
|
||||||
|
builder->builder_monitor_.Start("SyncHistograms");
|
||||||
|
const size_t nbins = builder->hist_builder_.GetNumBins();
|
||||||
|
for (int node = 0; node < builder->nodes_for_explicit_hist_build_.size(); node++) {
|
||||||
|
const auto entry = builder->nodes_for_explicit_hist_build_[node];
|
||||||
|
auto& this_hist = builder->hist_[entry.nid];
|
||||||
|
// // Store posible parent node
|
||||||
|
auto& this_local = builder->hist_local_worker_[entry.nid];
|
||||||
|
common::CopyHist(builder->qu_, &this_local, this_hist, nbins);
|
||||||
|
|
||||||
|
if (!(*p_tree)[entry.nid].IsRoot()) {
|
||||||
|
const size_t parent_id = (*p_tree)[entry.nid].Parent();
|
||||||
|
auto sibling_nid = entry.GetSiblingId(p_tree, parent_id);
|
||||||
|
auto& parent_hist = builder->hist_local_worker_[parent_id];
|
||||||
|
|
||||||
|
auto& sibling_hist = builder->hist_[sibling_nid];
|
||||||
|
common::SubtractionHist(builder->qu_, &sibling_hist, parent_hist,
|
||||||
|
this_hist, nbins, ::sycl::event());
|
||||||
|
builder->qu_.wait_and_throw();
|
||||||
|
// Store posible parent node
|
||||||
|
auto& sibling_local = builder->hist_local_worker_[sibling_nid];
|
||||||
|
common::CopyHist(builder->qu_, &sibling_local, sibling_hist, nbins);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder->ReduceHists(sync_ids, nbins);
|
||||||
|
|
||||||
|
ParallelSubtractionHist(builder, builder->nodes_for_explicit_hist_build_, p_tree);
|
||||||
|
ParallelSubtractionHist(builder, builder->nodes_for_subtraction_trick_, p_tree);
|
||||||
|
|
||||||
|
builder->builder_monitor_.Stop("SyncHistograms");
|
||||||
|
}
|
||||||
|
|
||||||
|
void ParallelSubtractionHist(HistUpdater<GradientSumT>* builder,
|
||||||
|
const std::vector<ExpandEntry>& nodes,
|
||||||
|
const RegTree * p_tree) {
|
||||||
|
const size_t nbins = builder->hist_builder_.GetNumBins();
|
||||||
|
for (int node = 0; node < nodes.size(); node++) {
|
||||||
|
const auto entry = nodes[node];
|
||||||
|
if (!((*p_tree)[entry.nid].IsLeftChild())) {
|
||||||
|
auto& this_hist = builder->hist_[entry.nid];
|
||||||
|
|
||||||
|
if (!(*p_tree)[entry.nid].IsRoot()) {
|
||||||
|
const size_t parent_id = (*p_tree)[entry.nid].Parent();
|
||||||
|
auto& parent_hist = builder->hist_[parent_id];
|
||||||
|
auto& sibling_hist = builder->hist_[entry.GetSiblingId(p_tree, parent_id)];
|
||||||
|
common::SubtractionHist(builder->qu_, &this_hist, parent_hist,
|
||||||
|
sibling_hist, nbins, ::sycl::event());
|
||||||
|
builder->qu_.wait_and_throw();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<::sycl::event> hist_sync_events_;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace tree
|
} // namespace tree
|
||||||
} // namespace sycl
|
} // namespace sycl
|
||||||
} // namespace xgboost
|
} // namespace xgboost
|
||||||
|
|||||||
@ -22,6 +22,30 @@ using ::sycl::ext::oneapi::plus;
|
|||||||
using ::sycl::ext::oneapi::minimum;
|
using ::sycl::ext::oneapi::minimum;
|
||||||
using ::sycl::ext::oneapi::maximum;
|
using ::sycl::ext::oneapi::maximum;
|
||||||
|
|
||||||
|
template <typename GradientSumT>
|
||||||
|
void HistUpdater<GradientSumT>::ReduceHists(const std::vector<int>& sync_ids,
|
||||||
|
size_t nbins) {
|
||||||
|
if (reduce_buffer_.size() < sync_ids.size() * nbins) {
|
||||||
|
reduce_buffer_.resize(sync_ids.size() * nbins);
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < sync_ids.size(); i++) {
|
||||||
|
auto& this_hist = hist_[sync_ids[i]];
|
||||||
|
const GradientPairT* psrc = reinterpret_cast<const GradientPairT*>(this_hist.DataConst());
|
||||||
|
qu_.memcpy(reduce_buffer_.data() + i * nbins, psrc, nbins*sizeof(GradientPairT)).wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto buffer_vec = linalg::MakeVec(reinterpret_cast<GradientSumT*>(reduce_buffer_.data()),
|
||||||
|
2 * nbins * sync_ids.size());
|
||||||
|
auto rc = collective::Allreduce(ctx_, buffer_vec, collective::Op::kSum);
|
||||||
|
SafeColl(rc);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < sync_ids.size(); i++) {
|
||||||
|
auto& this_hist = hist_[sync_ids[i]];
|
||||||
|
GradientPairT* psrc = reinterpret_cast<GradientPairT*>(this_hist.Data());
|
||||||
|
qu_.memcpy(psrc, reduce_buffer_.data() + i * nbins, nbins*sizeof(GradientPairT)).wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
template <typename GradientSumT>
|
template <typename GradientSumT>
|
||||||
void HistUpdater<GradientSumT>::SetHistSynchronizer(
|
void HistUpdater<GradientSumT>::SetHistSynchronizer(
|
||||||
HistSynchronizer<GradientSumT> *sync) {
|
HistSynchronizer<GradientSumT> *sync) {
|
||||||
@ -492,6 +516,7 @@ void HistUpdater<GradientSumT>::InitData(
|
|||||||
// initialize histogram collection
|
// initialize histogram collection
|
||||||
uint32_t nbins = gmat.cut.Ptrs().back();
|
uint32_t nbins = gmat.cut.Ptrs().back();
|
||||||
hist_.Init(qu_, nbins);
|
hist_.Init(qu_, nbins);
|
||||||
|
hist_local_worker_.Init(qu_, nbins);
|
||||||
|
|
||||||
hist_buffer_.Init(qu_, nbins);
|
hist_buffer_.Init(qu_, nbins);
|
||||||
size_t buffer_size = kBufferSize;
|
size_t buffer_size = kBufferSize;
|
||||||
|
|||||||
@ -87,7 +87,10 @@ class HistUpdater {
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
friend class BatchHistSynchronizer<GradientSumT>;
|
friend class BatchHistSynchronizer<GradientSumT>;
|
||||||
|
friend class DistributedHistSynchronizer<GradientSumT>;
|
||||||
|
|
||||||
friend class BatchHistRowsAdder<GradientSumT>;
|
friend class BatchHistRowsAdder<GradientSumT>;
|
||||||
|
friend class DistributedHistRowsAdder<GradientSumT>;
|
||||||
|
|
||||||
struct SplitQuery {
|
struct SplitQuery {
|
||||||
bst_node_t nid;
|
bst_node_t nid;
|
||||||
@ -183,6 +186,8 @@ class HistUpdater {
|
|||||||
RegTree* p_tree,
|
RegTree* p_tree,
|
||||||
const USMVector<GradientPair, MemoryType::on_device>& gpair);
|
const USMVector<GradientPair, MemoryType::on_device>& gpair);
|
||||||
|
|
||||||
|
void ReduceHists(const std::vector<int>& sync_ids, size_t nbins);
|
||||||
|
|
||||||
inline static bool LossGuide(ExpandEntry lhs, ExpandEntry rhs) {
|
inline static bool LossGuide(ExpandEntry lhs, ExpandEntry rhs) {
|
||||||
if (lhs.GetLossChange() == rhs.GetLossChange()) {
|
if (lhs.GetLossChange() == rhs.GetLossChange()) {
|
||||||
return lhs.GetNodeId() > rhs.GetNodeId(); // favor small timestamp
|
return lhs.GetNodeId() > rhs.GetNodeId(); // favor small timestamp
|
||||||
@ -230,6 +235,8 @@ class HistUpdater {
|
|||||||
common::ParallelGHistBuilder<GradientSumT> hist_buffer_;
|
common::ParallelGHistBuilder<GradientSumT> hist_buffer_;
|
||||||
/*! \brief culmulative histogram of gradients. */
|
/*! \brief culmulative histogram of gradients. */
|
||||||
common::HistCollection<GradientSumT, MemoryType::on_device> hist_;
|
common::HistCollection<GradientSumT, MemoryType::on_device> hist_;
|
||||||
|
/*! \brief culmulative local parent histogram of gradients. */
|
||||||
|
common::HistCollection<GradientSumT, MemoryType::on_device> hist_local_worker_;
|
||||||
|
|
||||||
/*! \brief TreeNode Data: statistics for each constructed node */
|
/*! \brief TreeNode Data: statistics for each constructed node */
|
||||||
std::vector<NodeEntry<GradientSumT>> snode_host_;
|
std::vector<NodeEntry<GradientSumT>> snode_host_;
|
||||||
@ -258,6 +265,8 @@ class HistUpdater {
|
|||||||
USMVector<bst_float, MemoryType::on_device> out_preds_buf_;
|
USMVector<bst_float, MemoryType::on_device> out_preds_buf_;
|
||||||
bst_float* out_pred_ptr = nullptr;
|
bst_float* out_pred_ptr = nullptr;
|
||||||
|
|
||||||
|
std::vector<GradientPairT> reduce_buffer_;
|
||||||
|
|
||||||
::sycl::queue qu_;
|
::sycl::queue qu_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -51,7 +51,8 @@ void QuantileHistMaker::SetPimpl(std::unique_ptr<HistUpdater<GradientSumT>>* pim
|
|||||||
param_,
|
param_,
|
||||||
int_constraint_, dmat));
|
int_constraint_, dmat));
|
||||||
if (collective::IsDistributed()) {
|
if (collective::IsDistributed()) {
|
||||||
LOG(FATAL) << "Distributed mode is not yet upstreamed for sycl";
|
(*pimpl)->SetHistSynchronizer(new DistributedHistSynchronizer<GradientSumT>());
|
||||||
|
(*pimpl)->SetHistRowsAdder(new DistributedHistRowsAdder<GradientSumT>());
|
||||||
} else {
|
} else {
|
||||||
(*pimpl)->SetHistSynchronizer(new BatchHistSynchronizer<GradientSumT>());
|
(*pimpl)->SetHistSynchronizer(new BatchHistSynchronizer<GradientSumT>());
|
||||||
(*pimpl)->SetHistRowsAdder(new BatchHistRowsAdder<GradientSumT>());
|
(*pimpl)->SetHistRowsAdder(new BatchHistRowsAdder<GradientSumT>());
|
||||||
|
|||||||
@ -306,11 +306,12 @@ def _check_distributed_params(kwargs: Dict[str, Any]) -> None:
|
|||||||
raise TypeError(msg)
|
raise TypeError(msg)
|
||||||
|
|
||||||
if device and device.find(":") != -1:
|
if device and device.find(":") != -1:
|
||||||
raise ValueError(
|
if device != "sycl:gpu":
|
||||||
"Distributed training doesn't support selecting device ordinal as GPUs are"
|
raise ValueError(
|
||||||
" managed by the distributed frameworks. use `device=cuda` or `device=gpu`"
|
"Distributed training doesn't support selecting device ordinal as GPUs are"
|
||||||
" instead."
|
" managed by the distributed frameworks. use `device=cuda` or `device=gpu`"
|
||||||
)
|
" instead."
|
||||||
|
)
|
||||||
|
|
||||||
if kwargs.get("booster", None) == "gblinear":
|
if kwargs.get("booster", None) == "gblinear":
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
|
|||||||
@ -17,5 +17,6 @@ dependencies:
|
|||||||
- pytest
|
- pytest
|
||||||
- pytest-timeout
|
- pytest-timeout
|
||||||
- pytest-cov
|
- pytest-cov
|
||||||
|
- dask
|
||||||
- dpcpp_linux-64
|
- dpcpp_linux-64
|
||||||
- onedpl-devel
|
- onedpl-devel
|
||||||
|
|||||||
42
tests/python-sycl/test_sycl_simple_dask.py
Normal file
42
tests/python-sycl/test_sycl_simple_dask.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
from xgboost import dask as dxgb
|
||||||
|
from xgboost import testing as tm
|
||||||
|
|
||||||
|
from hypothesis import given, strategies, assume, settings, note
|
||||||
|
|
||||||
|
import dask.array as da
|
||||||
|
import dask.distributed
|
||||||
|
|
||||||
|
|
||||||
|
def train_result(client, param, dtrain, num_rounds):
|
||||||
|
result = dxgb.train(
|
||||||
|
client,
|
||||||
|
param,
|
||||||
|
dtrain,
|
||||||
|
num_rounds,
|
||||||
|
verbose_eval=False,
|
||||||
|
evals=[(dtrain, "train")],
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
class TestSYCLDask:
|
||||||
|
# The simplest test verify only one node training.
|
||||||
|
def test_simple(self):
|
||||||
|
cluster = dask.distributed.LocalCluster(n_workers=1)
|
||||||
|
client = dask.distributed.Client(cluster)
|
||||||
|
|
||||||
|
param = {}
|
||||||
|
param["tree_method"] = "hist"
|
||||||
|
param["device"] = "sycl"
|
||||||
|
param["verbosity"] = 0
|
||||||
|
param["objective"] = "reg:squarederror"
|
||||||
|
|
||||||
|
# X and y must be Dask dataframes or arrays
|
||||||
|
num_obs = 1e4
|
||||||
|
num_features = 20
|
||||||
|
X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
|
||||||
|
y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))
|
||||||
|
dtrain = dxgb.DaskDMatrix(client, X, y)
|
||||||
|
|
||||||
|
result = train_result(client, param, dtrain, 10)
|
||||||
|
assert tm.non_increasing(result["history"]["train"]["rmse"])
|
||||||
Loading…
x
Reference in New Issue
Block a user