merge latest changes

amdsc21 2023-06-15 21:39:14 +02:00
commit 5ca7daaa13
18 changed files with 284 additions and 151 deletions


@@ -11,7 +11,7 @@ General Development Process
 ---------------------------
 Everyone in the community is welcomed to send patches, documents, and propose new directions to the project. The key guideline here is to enable everyone in the community to get involved and participate the decision and development. When major changes are proposed, an RFC should be sent to allow discussion by the community. We encourage public discussion, archivable channels such as issues and discuss forum, so that everyone in the community can participate and review the process later.
-Code reviews are one of the key ways to ensure the quality of the code. High-quality code reviews prevent technical debt for long-term and are crucial to the success of the project. A pull request needs to be reviewed before it gets merged. A committer who has the expertise of the corresponding area would moderate the pull request and the merge the code when it is ready. The corresponding committer could request multiple reviewers who are familiar with the area of the code. We encourage contributors to request code reviews themselves and help review each other's code -- remember everyone is volunteering their time to the community, high-quality code review itself costs as much as the actual code contribution, you could get your code quickly reviewed if you do others the same favor.
+Code reviews are one of the key ways to ensure the quality of the code. High-quality code reviews prevent technical debt for long-term and are crucial to the success of the project. A pull request needs to be reviewed before it gets merged. A committer who has the expertise of the corresponding area would moderate the pull request and then merge the code when it is ready. The corresponding committer could request multiple reviewers who are familiar with the area of the code. We encourage contributors to request code reviews themselves and help review each other's code -- remember everyone is volunteering their time to the community, high-quality code review itself costs as much as the actual code contribution, you could get your code quickly reviewed if you do others the same favor.
 The community should strive to reach a consensus on technical decisions through discussion. We expect committers and PMCs to moderate technical discussions in a diplomatic way, and provide suggestions with clear technical reasoning when necessary.
@@ -25,11 +25,11 @@ Committers are individuals who are granted the write access to the project. A co
 - Quality of contributions: High-quality, readable code contributions indicated by pull requests that can be merged without a substantial code review. History of creating clean, maintainable code and including good test cases. Informative code reviews to help other contributors that adhere to a good standard.
 - Community involvement: active participation in the discussion forum, promote the projects via tutorials, talks and outreach. We encourage committers to collaborate broadly, e.g. do code reviews and discuss designs with community members that they do not interact physically.
-The Project Management Committee(PMC) consists group of active committers that moderate the discussion, manage the project release, and proposes new committer/PMC members. Potential candidates are usually proposed via an internal discussion among PMCs, followed by a consensus approval, i.e. least 3 +1 votes, and no vetoes. Any veto must be accompanied by reasoning. PMCs should serve the community by upholding the community practices and guidelines XGBoost a better community for everyone. PMCs should strive to only nominate new candidates outside of their own organization.
+The Project Management Committee(PMC) consists of a group of active committers that moderate the discussion, manage the project release, and proposes new committer/PMC members. Potential candidates are usually proposed via an internal discussion among PMCs, followed by a consensus approval, i.e. least 3 +1 votes, and no vetoes. Any veto must be accompanied by reasoning. PMCs should serve the community by upholding the community practices and guidelines in order to make XGBoost a better community for everyone. PMCs should strive to only nominate new candidates outside of their own organization.
 The PMC is in charge of the project's `continuous integration (CI) <https://en.wikipedia.org/wiki/Continuous_integration>`_ and testing infrastructure. Currently, we host our own Jenkins server at https://xgboost-ci.net. The PMC shall appoint committer(s) to manage the CI infrastructure. The PMC may accept 3rd-party donations and sponsorships that would defray the cost of the CI infrastructure. See :ref:`donation_policy`.
 Reviewers
 ---------
-Reviewers are individuals who actively contributed to the project and are willing to participate in the code review of new contributions. We identify reviewers from active contributors. The committers should explicitly solicit reviews from reviewers. High-quality code reviews prevent technical debt for long-term and are crucial to the success of the project. A pull request to the project has to be reviewed by at least one reviewer in order to be merged.
+Reviewers are individuals who actively contributed to the project and are willing to participate in the code review of new contributions. We identify reviewers from active contributors. The committers should explicitly solicit reviews from reviewers. High-quality code reviews prevent technical debt for the long-term and are crucial to the success of the project. A pull request to the project has to be reviewed by at least one reviewer in order to be merged.


@@ -8,23 +8,83 @@ Documentation and Examples
   :backlinks: none
   :local:
-*********
-Documents
-*********
+*************
+Documentation
+*************
 * Python and C documentation is built using `Sphinx <http://www.sphinx-doc.org/en/master/>`_.
 * Each document is written in `reStructuredText <http://www.sphinx-doc.org/en/master/usage/restructuredtext/basics.html>`_.
-* You can build document locally to see the effect, by running
+* The documentation is the ``doc/`` directory.
+* You can build it locally using ``make html`` command.

 .. code-block:: bash

   make html

-inside the ``doc/`` directory. The online document is hosted by `Read the Docs <https://readthedocs.org/>`__ where the imported project is managed by `Hyunsu Cho <https://github.com/hcho3>`__ and `Jiaming Yuan <https://github.com/trivialfis>`__.
+Run ``make help`` to learn about the other commands.
+
+The online document is hosted by `Read the Docs <https://readthedocs.org/>`__ where the imported project is managed by `Hyunsu Cho <https://github.com/hcho3>`__ and `Jiaming Yuan <https://github.com/trivialfis>`__.
+
+=========================================
+Build the Python Docs using pip and Conda
+=========================================
+
+#. Create a conda environment.
+
+   .. code-block:: bash
+
+     conda create -n xgboost-docs --yes python=3.10
+
+   .. note:: Python 3.10 is required by `xgboost_ray <https://github.com/ray-project/xgboost_ray>`__ package.
+
+#. Activate the environment
+
+   .. code-block:: bash
+
+     conda activate xgboost-docs
+
+#. Install required packages (in the current environment) using ``pip`` command.
+
+   .. code-block:: bash
+
+     pip install -r requirements.txt
+
+   .. note::
+     It is currently not possible to install the required packages using ``conda``
+     due to ``xgboost_ray`` being unavailable in conda channels.
+
+     .. code-block:: bash
+
+       conda install --file requirements.txt --yes -c conda-forge
+
+#. (optional) Install `graphviz <https://www.graphviz.org/>`__
+
+   .. code-block:: bash
+
+     conda install graphviz --yes
+
+#. Eventually, build the docs.
+
+   .. code-block:: bash
+
+     make html
+
+   You should see the following messages in the console:
+
+   .. code-block:: console
+
+     $ make html
+     sphinx-build -b html -d _build/doctrees . _build/html
+     Running Sphinx v6.2.1
+     ...
+     The HTML pages are in _build/html.
+     Build finished. The HTML pages are in _build/html.

 ********
 Examples
 ********
-* Use cases and examples will be in `demo <https://github.com/dmlc/xgboost/tree/master/demo>`_.
+* Use cases and examples are in `demo <https://github.com/dmlc/xgboost/tree/master/demo>`_ directory.
 * We are super excited to hear about your story. If you have blog posts,
   tutorials, or code solutions using XGBoost, please tell us, and we will add
   a link in the example pages.


@@ -23,8 +23,8 @@ Installation
 :local:
 :backlinks: none
-Checkout the :doc:`Installation Guide </install>` for how to install jvm package, or
-:doc:`Building from Source </build>` on how to build it form source.
+Checkout the :doc:`Installation Guide </install>` for how to install the jvm package, or
+:doc:`Building from Source </build>` on how to build it from the sources.
 ********
 Contents


@@ -0,0 +1,81 @@
/**
 * Copyright 2023 by XGBoost contributors
 */
#pragma once

#include <string>
#include <vector>

#include "communicator.h"
#include "device_communicator.cuh"

namespace xgboost {
namespace collective {

/**
 * @brief Reduce values from all processes and distribute the result back to all processes.
 * @param device ID of the device.
 * @param send_receive_buffer Buffer storing the data.
 * @param count Number of elements in the buffer.
 */
template <Operation op>
inline void AllReduce(int device, std::int8_t *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kInt8, op);
}

template <Operation op>
inline void AllReduce(int device, std::uint8_t *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kUInt8, op);
}

template <Operation op>
inline void AllReduce(int device, std::int32_t *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kInt32, op);
}

template <Operation op>
inline void AllReduce(int device, std::uint32_t *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kUInt32, op);
}

template <Operation op>
inline void AllReduce(int device, std::int64_t *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kInt64, op);
}

template <Operation op>
inline void AllReduce(int device, std::uint64_t *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kUInt64, op);
}

template <Operation op>
inline void AllReduce(int device, float *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kFloat, op);
}

template <Operation op>
inline void AllReduce(int device, double *send_receive_buffer, size_t count) {
  Communicator::GetDevice(device)->AllReduce(send_receive_buffer, count, DataType::kDouble, op);
}

/**
 * @brief Gather variable-length values from all processes.
 * @param device ID of the device.
 * @param send_buffer Buffer storing the input data.
 * @param length_bytes Length in bytes of the input data.
 * @param segments Size of each segment.
 * @param receive_buffer Buffer storing the output data.
 */
inline void AllGatherV(int device, void const *send_buffer, size_t length_bytes,
                       std::vector<size_t> *segments,
                       dh::caching_device_vector<char> *receive_buffer) {
  Communicator::GetDevice(device)->AllGatherV(send_buffer, length_bytes, segments, receive_buffer);
}

/**
 * @brief Synchronize device operations.
 * @param device ID of the device.
 */
inline void Synchronize(int device) { Communicator::GetDevice(device)->Synchronize(); }

}  // namespace collective
}  // namespace xgboost
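The callers updated later in this commit (quantile.cu, auc.cu, fit_stump.cu) use these wrappers directly. A minimal sketch of that calling pattern follows, assuming a CUDA build with the collective communicator already initialised; the function and variable names are hypothetical.

#include <cstddef>

#include "../collective/communicator-inl.cuh"

// Sketch only: element-wise sum of a device buffer across all workers, then
// wait for the collective to complete on this device. `device` is the GPU
// ordinal, `d_values` a device pointer, `n` the element count.
void SumAcrossWorkers(int device, double *d_values, std::size_t n) {
  xgboost::collective::AllReduce<xgboost::collective::Operation::kSum>(device, d_values, n);
  xgboost::collective::Synchronize(device);
}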


@@ -0,0 +1,7 @@
/*!
 * Copyright 2022 XGBoost contributors
 */
#pragma once

#include "communicator-inl.cuh"


@@ -17,32 +17,15 @@ class DeviceCommunicator {
   virtual ~DeviceCommunicator() = default;
   /**
-   * @brief Sum values from all processes and distribute the result back to all processes.
+   * @brief Combines values from all processes and distributes the result back to all processes.
+   *
    * @param send_receive_buffer Buffer storing the data.
    * @param count Number of elements in the buffer.
+   * @param data_type Data type stored in the buffer.
+   * @param op The operation to perform.
    */
-  virtual void AllReduceSum(float *send_receive_buffer, size_t count) = 0;
-  /**
-   * @brief Sum values from all processes and distribute the result back to all processes.
-   * @param send_receive_buffer Buffer storing the data.
-   * @param count Number of elements in the buffer.
-   */
-  virtual void AllReduceSum(double *send_receive_buffer, size_t count) = 0;
-  /**
-   * @brief Sum values from all processes and distribute the result back to all processes.
-   * @param send_receive_buffer Buffer storing the data.
-   * @param count Number of elements in the buffer.
-   */
-  virtual void AllReduceSum(int64_t *send_receive_buffer, size_t count) = 0;
-  /**
-   * @brief Sum values from all processes and distribute the result back to all processes.
-   * @param send_receive_buffer Buffer storing the data.
-   * @param count Number of elements in the buffer.
-   */
-  virtual void AllReduceSum(uint64_t *send_receive_buffer, size_t count) = 0;
+  virtual void AllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
+                         Operation op) = 0;
   /**
    * @brief Gather variable-length values from all processes.
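In the callers and tests touched later in this commit, the migration away from the removed per-type virtuals looks roughly like the hedged sketch below; `communicator`, `buffer` and `count` are placeholder names, and the data type and op arguments are chosen to reproduce the old sum-of-doubles behaviour.

#include <cstddef>

#include "device_communicator.cuh"

// Sketch only: the old call communicator->AllReduceSum(buffer, count) becomes a
// single type-erased call with the element type and reduction passed explicitly.
void SumDoubles(xgboost::collective::DeviceCommunicator *communicator, double *buffer,
                std::size_t count) {
  communicator->AllReduce(buffer, count, xgboost::collective::DataType::kDouble,
                          xgboost::collective::Operation::kSum);
}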


@@ -23,20 +23,28 @@ class DeviceCommunicatorAdapter : public DeviceCommunicator {
   ~DeviceCommunicatorAdapter() override = default;
-  void AllReduceSum(float *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<collective::DataType::kFloat>(send_receive_buffer, count);
-  }
-  void AllReduceSum(double *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<collective::DataType::kDouble>(send_receive_buffer, count);
-  }
-  void AllReduceSum(int64_t *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<collective::DataType::kInt64>(send_receive_buffer, count);
-  }
-  void AllReduceSum(uint64_t *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<collective::DataType::kUInt64>(send_receive_buffer, count);
+  void AllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
+                 Operation op) override {
+    if (communicator_->GetWorldSize() == 1) {
+      return;
+    }
+#if defined(XGBOOST_USE_CUDA)
+    dh::safe_cuda(cudaSetDevice(device_ordinal_));
+#elif defined(XGBOOST_USE_HIP)
+    dh::safe_cuda(hipSetDevice(device_ordinal_));
+#endif
+    auto size = count * GetTypeSize(data_type);
+    host_buffer_.reserve(size);
+#if defined(XGBOOST_USE_CUDA)
+    dh::safe_cuda(cudaMemcpy(host_buffer_.data(), send_receive_buffer, size, cudaMemcpyDefault));
+    communicator_->AllReduce(host_buffer_.data(), count, data_type, op);
+    dh::safe_cuda(cudaMemcpy(send_receive_buffer, host_buffer_.data(), size, cudaMemcpyDefault));
+#elif defined(XGBOOST_USE_HIP)
+    dh::safe_cuda(hipMemcpy(host_buffer_.data(), send_receive_buffer, size, hipMemcpyDefault));
+    communicator_->AllReduce(host_buffer_.data(), count, data_type, op);
+    dh::safe_cuda(hipMemcpy(send_receive_buffer, host_buffer_.data(), size, hipMemcpyDefault));
+#endif
   }
   void AllGatherV(void const *send_buffer, size_t length_bytes, std::vector<std::size_t> *segments,
@@ -93,32 +101,6 @@ class DeviceCommunicatorAdapter : public DeviceCommunicator {
   }
  private:
-  template <collective::DataType data_type, typename T>
-  void DoAllReduceSum(T *send_receive_buffer, size_t count) {
-    if (communicator_->GetWorldSize() == 1) {
-      return;
-    }
-#if defined(XGBOOST_USE_HIP)
-    dh::safe_cuda(hipSetDevice(device_ordinal_));
-#else
-    dh::safe_cuda(cudaSetDevice(device_ordinal_));
-#endif
-    auto size = count * sizeof(T);
-    host_buffer_.reserve(size);
-#if defined(XGBOOST_USE_HIP)
-    dh::safe_cuda(hipMemcpy(host_buffer_.data(), send_receive_buffer, size, hipMemcpyDefault));
-    communicator_->AllReduce(host_buffer_.data(), count, data_type, collective::Operation::kSum);
-    dh::safe_cuda(hipMemcpy(send_receive_buffer, host_buffer_.data(), size, hipMemcpyDefault));
-#else
-    dh::safe_cuda(cudaMemcpy(host_buffer_.data(), send_receive_buffer, size, cudaMemcpyDefault));
-    communicator_->AllReduce(host_buffer_.data(), count, data_type, collective::Operation::kSum);
-    dh::safe_cuda(cudaMemcpy(send_receive_buffer, host_buffer_.data(), size, cudaMemcpyDefault));
-#endif
-  }
   int const device_ordinal_;
   Communicator *communicator_;
   /// Host buffer used to call communicator functions.


@@ -81,20 +81,18 @@ class NcclDeviceCommunicator : public DeviceCommunicator {
     }
   }
-  void AllReduceSum(float *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<ncclFloat>(send_receive_buffer, count);
-  }
-  void AllReduceSum(double *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<ncclDouble>(send_receive_buffer, count);
-  }
-  void AllReduceSum(int64_t *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<ncclInt64>(send_receive_buffer, count);
-  }
-  void AllReduceSum(uint64_t *send_receive_buffer, size_t count) override {
-    DoAllReduceSum<ncclUint64>(send_receive_buffer, count);
+  void AllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
+                 Operation op) override {
+    if (communicator_->GetWorldSize() == 1) {
+      return;
+    }
+    dh::safe_cuda(cudaSetDevice(device_ordinal_));
+    dh::safe_nccl(ncclAllReduce(send_receive_buffer, send_receive_buffer, count,
+                                GetNcclDataType(data_type), GetNcclRedOp(op), nccl_comm_,
+                                cuda_stream_));
+    allreduce_bytes_ += count * GetTypeSize(data_type);
+    allreduce_calls_ += 1;
   }
   void AllGatherV(void const *send_buffer, size_t length_bytes, std::vector<std::size_t> *segments,
@@ -192,22 +190,59 @@ class NcclDeviceCommunicator : public DeviceCommunicator {
     return id;
   }
-  template <ncclDataType_t data_type, typename T>
-  void DoAllReduceSum(T *send_receive_buffer, size_t count) {
-    if (communicator_->GetWorldSize() == 1) {
-      return;
-    }
-#if defined(XGBOOST_USE_HIP)
-    dh::safe_cuda(hipSetDevice(device_ordinal_));
-#else
-    dh::safe_cuda(cudaSetDevice(device_ordinal_));
-#endif
-    dh::safe_nccl(ncclAllReduce(send_receive_buffer, send_receive_buffer, count, data_type, ncclSum,
-                                nccl_comm_, cuda_stream_));
-    allreduce_bytes_ += count * sizeof(T);
-    allreduce_calls_ += 1;
-  }
+  static ncclDataType_t GetNcclDataType(DataType const &data_type) {
+    ncclDataType_t result;
+    switch (data_type) {
+      case DataType::kInt8:
+        result = ncclInt8;
+        break;
+      case DataType::kUInt8:
+        result = ncclUint8;
+        break;
+      case DataType::kInt32:
+        result = ncclInt32;
+        break;
+      case DataType::kUInt32:
+        result = ncclUint32;
+        break;
+      case DataType::kInt64:
+        result = ncclInt64;
+        break;
+      case DataType::kUInt64:
+        result = ncclUint64;
+        break;
+      case DataType::kFloat:
+        result = ncclFloat;
+        break;
+      case DataType::kDouble:
+        result = ncclDouble;
+        break;
+      default:
+        LOG(FATAL) << "Unknown data type.";
+    }
+    return result;
+  }
+
+  static ncclRedOp_t GetNcclRedOp(Operation const &op) {
+    ncclRedOp_t result;
+    switch (op) {
+      case Operation::kMax:
+        result = ncclMax;
+        break;
+      case Operation::kMin:
+        result = ncclMin;
+        break;
+      case Operation::kSum:
+        result = ncclSum;
+        break;
+      case Operation::kBitwiseAND:
+      case Operation::kBitwiseOR:
+      case Operation::kBitwiseXOR:
+        LOG(FATAL) << "Not implemented yet.";
+      default:
+        LOG(FATAL) << "Unknown reduce operation.";
+    }
+    return result;
+  }
   int const device_ordinal_;
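Because the reduction op is now an argument, reductions other than sum become reachable through the same path; GetNcclRedOp() maps them to NCCL, while the bitwise ops still hit the LOG(FATAL) branch. A hedged sketch of a global maximum via the free functions from communicator-inl.cuh above, with hypothetical names:

#include <cstddef>

#include "communicator-inl.cuh"

// Sketch only: element-wise maximum across workers; with the NCCL backend this
// reaches ncclAllReduce with ncclMax via GetNcclRedOp(). `device`, `d_scores`
// and `n` are placeholder names.
void MaxAcrossWorkers(int device, float *d_scores, std::size_t n) {
  xgboost::collective::AllReduce<xgboost::collective::Operation::kMax>(device, d_scores, n);
  xgboost::collective::Synchronize(device);
}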


@@ -12,8 +12,7 @@
 #include <memory>
 #include <utility>
-#include "../collective/communicator.h"
-#include "../collective/device_communicator.cuh"
+#include "../collective/communicator-inl.cuh"
 #include "categorical.h"
 #include "common.h"
 #include "device_helpers.cuh"
@@ -600,7 +599,6 @@ void SketchContainer::AllReduce() {
   }
   timer_.Start(__func__);
-  auto* communicator = collective::Communicator::GetDevice(device_);
   // Reduce the overhead on syncing.
   size_t global_sum_rows = num_rows_;
   collective::Allreduce<collective::Operation::kSum>(&global_sum_rows, 1);
@@ -621,14 +619,15 @@ void SketchContainer::AllReduce() {
   auto offset = rank * d_columns_ptr.size();
   thrust::copy(thrust::device, d_columns_ptr.data(), d_columns_ptr.data() + d_columns_ptr.size(),
                gathered_ptrs.begin() + offset);
-  communicator->AllReduceSum(gathered_ptrs.data().get(), gathered_ptrs.size());
+  collective::AllReduce<collective::Operation::kSum>(device_, gathered_ptrs.data().get(),
+                                                     gathered_ptrs.size());
   // Get the data from all workers.
   std::vector<size_t> recv_lengths;
   dh::caching_device_vector<char> recvbuf;
-  communicator->AllGatherV(this->Current().data().get(), dh::ToSpan(this->Current()).size_bytes(),
-                           &recv_lengths, &recvbuf);
-  communicator->Synchronize();
+  collective::AllGatherV(device_, this->Current().data().get(),
+                         dh::ToSpan(this->Current()).size_bytes(), &recv_lengths, &recvbuf);
+  collective::Synchronize(device_);
   // Segment the received data.
   auto s_recvbuf = dh::ToSpan(recvbuf);


@@ -17,7 +17,7 @@
 #include <tuple>
 #include <utility>
-#include "../collective/device_communicator.cuh"
+#include "../collective/communicator-inl.cuh"
 #include "../common/algorithm.cuh"        // SegmentedArgSort
 #include "../common/optional_weight.h"    // OptionalWeights
 #include "../common/threading_utils.cuh"  // UnravelTrapeziodIdx,SegmentedTrapezoidThreads
@@ -231,8 +231,8 @@ double ScaleClasses(common::Span<double> results, common::Span<double> local_are
   if (collective::IsDistributed()) {
     int32_t device = dh::CurrentDevice();
     CHECK_EQ(dh::CudaGetPointerDevice(results.data()), device);
-    auto* communicator = collective::Communicator::GetDevice(device);
-    communicator->AllReduceSum(results.data(), results.size());
+    collective::AllReduce<collective::Operation::kSum>(device, results.data(), results.size());
   }
   auto reduce_in = dh::MakeTransformIterator<Pair>(
       thrust::make_counting_iterator(0), [=] XGBOOST_DEVICE(size_t i) {


@@ -11,7 +11,7 @@
 #include <cstddef>  // std::size_t
-#include "../collective/device_communicator.cuh"  // DeviceCommunicator
+#include "../collective/communicator-inl.cuh"
 #include "../common/device_helpers.cuh"  // dh::MakeTransformIterator
 #include "fit_stump.h"
 #include "xgboost/base.h"  // GradientPairPrecise, GradientPair, XGBOOST_DEVICE
@@ -55,8 +55,8 @@ void FitStump(Context const* ctx, linalg::TensorView<GradientPair const, 2> gpai
   thrust::reduce_by_key(policy, key_it, key_it + gpair.Size(), grad_it,
                         thrust::make_discard_iterator(), dh::tbegin(d_sum.Values()));
-  collective::DeviceCommunicator* communicator = collective::Communicator::GetDevice(ctx->gpu_id);
-  communicator->AllReduceSum(reinterpret_cast<double*>(d_sum.Values().data()), d_sum.Size() * 2);
+  collective::AllReduce<collective::Operation::kSum>(
+      ctx->gpu_id, reinterpret_cast<double*>(d_sum.Values().data()), d_sum.Size() * 2);
   thrust::for_each_n(policy, thrust::make_counting_iterator(0ul), n_targets,
                      [=] XGBOOST_DEVICE(std::size_t i) mutable {


@@ -42,12 +42,6 @@ struct GPUExpandEntry {
     return true;
   }
-  static bool ChildIsValid(const TrainParam& param, int depth, int num_leaves) {
-    if (param.max_depth > 0 && depth >= param.max_depth) return false;
-    if (param.max_leaves > 0 && num_leaves >= param.max_leaves) return false;
-    return true;
-  }
   bst_float GetLossChange() const {
     return split.loss_chg;
   }

View File

@@ -26,12 +26,6 @@ struct ExpandEntryImpl {
   }
   [[nodiscard]] bst_node_t GetNodeId() const { return nid; }
-  static bool ChildIsValid(TrainParam const& param, bst_node_t depth, bst_node_t num_leaves) {
-    if (param.max_depth > 0 && depth >= param.max_depth) return false;
-    if (param.max_leaves > 0 && num_leaves >= param.max_leaves) return false;
-    return true;
-  }
   [[nodiscard]] bool IsValid(TrainParam const& param, bst_node_t num_leaves) const {
     return static_cast<Impl const*>(this)->IsValidImpl(param, num_leaves);
   }


@@ -12,7 +12,7 @@
 #include <utility>
 #include <vector>
-#include "../collective/device_communicator.cuh"
+#include "../collective/communicator-inl.cuh"
 #include "../common/bitfield.h"
 #include "../common/categorical.h"
@@ -605,12 +605,13 @@ struct GPUHistMakerDevice {
   }
   // num histograms is the number of contiguous histograms in memory to reduce over
-  void AllReduceHist(int nidx, collective::DeviceCommunicator* communicator, int num_histograms) {
+  void AllReduceHist(int nidx, int num_histograms) {
     monitor.Start("AllReduce");
     auto d_node_hist = hist.GetNodeHistogram(nidx).data();
     using ReduceT = typename std::remove_pointer<decltype(d_node_hist)>::type::ValueT;
-    communicator->AllReduceSum(reinterpret_cast<ReduceT*>(d_node_hist),
-                               page->Cuts().TotalBins() * 2 * num_histograms);
+    collective::AllReduce<collective::Operation::kSum>(
+        ctx_->gpu_id, reinterpret_cast<ReduceT*>(d_node_hist),
+        page->Cuts().TotalBins() * 2 * num_histograms);
     monitor.Stop("AllReduce");
   }
@@ -618,8 +619,7 @@ struct GPUHistMakerDevice {
   /**
   * \brief Build GPU local histograms for the left and right child of some parent node
   */
-  void BuildHistLeftRight(std::vector<GPUExpandEntry> const& candidates,
-                          collective::DeviceCommunicator* communicator, const RegTree& tree) {
+  void BuildHistLeftRight(std::vector<GPUExpandEntry> const& candidates, const RegTree& tree) {
     if (candidates.empty()) return;
     // Some nodes we will manually compute histograms
     // others we will do by subtraction
@@ -650,7 +650,7 @@ struct GPUHistMakerDevice {
     // Reduce all in one go
     // This gives much better latency in a distributed setting
     // when processing a large batch
-    this->AllReduceHist(hist_nidx.at(0), communicator, hist_nidx.size());
+    this->AllReduceHist(hist_nidx.at(0), hist_nidx.size());
     for (size_t i = 0; i < subtraction_nidx.size(); i++) {
       auto build_hist_nidx = hist_nidx.at(i);
@@ -660,7 +660,7 @@ struct GPUHistMakerDevice {
       if (!this->SubtractionTrick(parent_nidx, build_hist_nidx, subtraction_trick_nidx)) {
         // Calculate other histogram manually
         this->BuildHist(subtraction_trick_nidx);
-        this->AllReduceHist(subtraction_trick_nidx, communicator, 1);
+        this->AllReduceHist(subtraction_trick_nidx, 1);
       }
     }
   }
@@ -718,7 +718,7 @@ struct GPUHistMakerDevice {
                     parent.RightChild());
   }
-  GPUExpandEntry InitRoot(RegTree* p_tree, collective::DeviceCommunicator* communicator) {
+  GPUExpandEntry InitRoot(RegTree* p_tree) {
     constexpr bst_node_t kRootNIdx = 0;
     dh::XGBCachingDeviceAllocator<char> alloc;
     auto quantiser = *this->quantiser;
@@ -735,7 +735,7 @@ struct GPUHistMakerDevice {
     hist.AllocateHistograms({kRootNIdx});
     this->BuildHist(kRootNIdx);
-    this->AllReduceHist(kRootNIdx, communicator, 1);
+    this->AllReduceHist(kRootNIdx, 1);
     // Remember root stats
     auto root_sum = quantiser.ToFloatingPoint(root_sum_quantised);
@@ -751,7 +751,6 @@ struct GPUHistMakerDevice {
   void UpdateTree(HostDeviceVector<GradientPair>* gpair_all, DMatrix* p_fmat,
                   ObjInfo const* task, RegTree* p_tree,
-                  collective::DeviceCommunicator* communicator,
                   HostDeviceVector<bst_node_t>* p_out_position) {
     auto& tree = *p_tree;
     // Process maximum 32 nodes at a time
@@ -762,7 +761,7 @@ struct GPUHistMakerDevice {
     monitor.Stop("Reset");
     monitor.Start("InitRoot");
-    driver.Push({ this->InitRoot(p_tree, communicator) });
+    driver.Push({this->InitRoot(p_tree)});
     monitor.Stop("InitRoot");
     // The set of leaves that can be expanded asynchronously
@@ -789,7 +788,7 @@ struct GPUHistMakerDevice {
       monitor.Stop("UpdatePosition");
       monitor.Start("BuildHist");
-      this->BuildHistLeftRight(filtered_expand_set, communicator, tree);
+      this->BuildHistLeftRight(filtered_expand_set, tree);
       monitor.Stop("BuildHist");
       monitor.Start("EvaluateSplits");
@@ -920,8 +919,7 @@ class GPUHistMaker : public TreeUpdater {
     monitor_.Stop("InitData");
     gpair->SetDevice(ctx_->gpu_id);
-    auto* communicator = collective::Communicator::GetDevice(ctx_->gpu_id);
-    maker->UpdateTree(gpair, p_fmat, task_, p_tree, communicator, p_out_position);
+    maker->UpdateTree(gpair, p_fmat, task_, p_tree, p_out_position);
   }
   bool UpdatePredictionCache(const DMatrix* data,


@@ -80,8 +80,8 @@ STACK_PARAMS = {
     "InstanceOperatingSystem": "linux",
     "InstanceType": "t3a.micro",
     "AgentsPerInstance": "1",
-    "MinSize": "1",
-    "MaxSize": "1",
+    "MinSize": "2",
+    "MaxSize": "2",
     "OnDemandPercentage": "100",
     "ScaleOutFactor": "1.0",
     "ScaleInIdlePeriod": "60",  # in seconds


@@ -9,8 +9,10 @@
 #if defined(XGBOOST_USE_NCCL)
 #include "../../../src/collective/nccl_device_communicator.cuh"
+#include "../../../src/collective/communicator-inl.cuh"
 #elif defined(XGBOOST_USE_RCCL)
 #include "../../../src/collective/nccl_device_communicator.hip.h"
+#include "../../../src/collective/communicator-inl.hip.h"
 #endif
 namespace xgboost {


@@ -2,11 +2,11 @@
 #include "test_quantile.h"
 #include "../helpers.h"
 #if defined(XGBOOST_USE_CUDA)
-#include "../../../src/collective/device_communicator.cuh"
+#include "../../../src/collective/communicator-inl.cuh"
 #include "../../../src/common/hist_util.cuh"
 #include "../../../src/common/quantile.cuh"
 #elif defined(XGBOOST_USE_HIP)
-#include "../../../src/collective/device_communicator.hip.h"
+#include "../../../src/collective/communicator-inl.hip.h"
 #include "../../../src/common/hist_util.hip.h"
 #include "../../../src/common/quantile.hip.h"
 #endif
@@ -474,10 +474,9 @@ void TestSameOnAllWorkers(std::int32_t n_gpus) {
     thrust::copy(thrust::device, local_data.data(),
                  local_data.data() + local_data.size(),
                  all_workers.begin() + local_data.size() * rank);
-    collective::DeviceCommunicator* communicator = collective::Communicator::GetDevice(device);
-    communicator->AllReduceSum(all_workers.data().get(), all_workers.size());
-    communicator->Synchronize();
+    collective::AllReduce<collective::Operation::kSum>(device, all_workers.data().get(),
+                                                       all_workers.size());
+    collective::Synchronize(device);
     auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float);
     std::vector<float> h_base_line(base_line.size());


@@ -36,7 +36,7 @@ TEST_F(FederatedAdapterTest, DeviceAllReduceSum) {
   int count = 3;
   thrust::device_vector<double> buffer(count, 0);
   thrust::sequence(buffer.begin(), buffer.end());
-  adapter.AllReduceSum(buffer.data().get(), count);
+  adapter.AllReduce(buffer.data().get(), count, DataType::kDouble, Operation::kSum);
   thrust::host_vector<double> host_buffer = buffer;
   EXPECT_EQ(host_buffer.size(), count);
   for (auto i = 0; i < count; i++) {