Compare commits

..

8 Commits

Author SHA1 Message Date
dependabot[bot]
42d8b06e0a Bump rapids-4-spark_2.12 from 23.04.1 to 23.06.0 in /jvm-packages
Bumps rapids-4-spark_2.12 from 23.04.1 to 23.06.0.

---
updated-dependencies:
- dependency-name: com.nvidia:rapids-4-spark_2.12
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-27 03:02:23 +00:00
Jiaming Yuan
cfa9c42eb4 Fix callback in AFT viz demo. (#9333)
* Fix callback in AFT viz demo.

- Update the callback function.
- Add lint check.
2023-06-26 22:35:02 +08:00
Jiaming Yuan
6efe7c129f [doc] Update reference in R vignettes. (#9323) 2023-06-26 18:32:11 +08:00
Jiaming Yuan
54da4b3185 Cleanup to prepare for using mmap pointer in external memory. (#9317)
- Update SparseDMatrix comment.
- Use a pointer in the bitfield. We will replace the `std::vector<bool>` in `ColumnMatrix` with bitfield.
- Clean up the page source. The timer is removed as it's inaccurate once we swap the mmap pointer into the page.
2023-06-22 06:43:11 +08:00
Jiaming Yuan
4066d68261 [doc] Clarify early stopping. (#9304) 2023-06-20 17:56:47 +08:00
Jiaming Yuan
6d22ea793c Test QDM with sparse data on CPU. (#9316) 2023-06-19 21:27:03 +08:00
Jiaming Yuan
ee6809e642 Use mmap for external memory. (#9282)
- Have basic infrastructure for mmap.
- Release file write handle.
2023-06-19 18:52:55 +08:00
Rong Ou
d8beb517ed Support bitwise allreduce in NCCL communicator (#9300) 2023-06-17 01:56:50 +08:00
41 changed files with 1212 additions and 649 deletions

View File

@@ -18,13 +18,11 @@
publisher={Institute of Mathematical Statistics}
}
@misc{
Bache+Lichman:2013 ,
author = "K. Bache and M. Lichman",
year = "2013",
title = "{UCI} Machine Learning Repository",
url = "http://archive.ics.uci.edu/ml/",
institution = "University of California, Irvine, School of Information and Computer Sciences"
url = "https://archive.ics.uci.edu/",
institution = "University of California, Irvine, School of Information and Computer Sciences"
}

View File

@@ -11,33 +11,43 @@ import numpy as np
import xgboost as xgb
plt.rcParams.update({'font.size': 13})
plt.rcParams.update({"font.size": 13})
# Function to visualize censored labels
def plot_censored_labels(X, y_lower, y_upper):
def replace_inf(x, target_value):
def plot_censored_labels(
X: np.ndarray, y_lower: np.ndarray, y_upper: np.ndarray
) -> None:
def replace_inf(x: np.ndarray, target_value: float) -> np.ndarray:
x[np.isinf(x)] = target_value
return x
plt.plot(X, y_lower, 'o', label='y_lower', color='blue')
plt.plot(X, y_upper, 'o', label='y_upper', color='fuchsia')
plt.vlines(X, ymin=replace_inf(y_lower, 0.01), ymax=replace_inf(y_upper, 1000),
label='Range for y', color='gray')
plt.plot(X, y_lower, "o", label="y_lower", color="blue")
plt.plot(X, y_upper, "o", label="y_upper", color="fuchsia")
plt.vlines(
X,
ymin=replace_inf(y_lower, 0.01),
ymax=replace_inf(y_upper, 1000.0),
label="Range for y",
color="gray",
)
# Toy data
X = np.array([1, 2, 3, 4, 5]).reshape((-1, 1))
INF = np.inf
y_lower = np.array([ 10, 15, -INF, 30, 100])
y_upper = np.array([INF, INF, 20, 50, INF])
y_lower = np.array([10, 15, -INF, 30, 100])
y_upper = np.array([INF, INF, 20, 50, INF])
# Visualize toy data
plt.figure(figsize=(5, 4))
plot_censored_labels(X, y_lower, y_upper)
plt.ylim((6, 200))
plt.legend(loc='lower right')
plt.title('Toy data')
plt.xlabel('Input feature')
plt.ylabel('Label')
plt.yscale('log')
plt.legend(loc="lower right")
plt.title("Toy data")
plt.xlabel("Input feature")
plt.ylabel("Label")
plt.yscale("log")
plt.tight_layout()
plt.show(block=True)
@@ -46,54 +56,83 @@ grid_pts = np.linspace(0.8, 5.2, 1000).reshape((-1, 1))
# Train AFT model using XGBoost
dmat = xgb.DMatrix(X)
dmat.set_float_info('label_lower_bound', y_lower)
dmat.set_float_info('label_upper_bound', y_upper)
params = {'max_depth': 3, 'objective':'survival:aft', 'min_child_weight': 0}
dmat.set_float_info("label_lower_bound", y_lower)
dmat.set_float_info("label_upper_bound", y_upper)
params = {"max_depth": 3, "objective": "survival:aft", "min_child_weight": 0}
accuracy_history = []
def plot_intermediate_model_callback(env):
"""Custom callback to plot intermediate models"""
# Compute y_pred = prediction using the intermediate model, at current boosting iteration
y_pred = env.model.predict(dmat)
# "Accuracy" = the number of data points whose ranged label (y_lower, y_upper) includes
# the corresponding predicted label (y_pred)
acc = np.sum(np.logical_and(y_pred >= y_lower, y_pred <= y_upper)/len(X) * 100)
accuracy_history.append(acc)
# Plot ranged labels as well as predictions by the model
plt.subplot(5, 3, env.iteration + 1)
plot_censored_labels(X, y_lower, y_upper)
y_pred_grid_pts = env.model.predict(xgb.DMatrix(grid_pts))
plt.plot(grid_pts, y_pred_grid_pts, 'r-', label='XGBoost AFT model', linewidth=4)
plt.title('Iteration {}'.format(env.iteration), x=0.5, y=0.8)
plt.xlim((0.8, 5.2))
plt.ylim((1 if np.min(y_pred) < 6 else 6, 200))
plt.yscale('log')
res = {}
plt.figure(figsize=(12,13))
bst = xgb.train(params, dmat, 15, [(dmat, 'train')], evals_result=res,
callbacks=[plot_intermediate_model_callback])
class PlotIntermediateModel(xgb.callback.TrainingCallback):
"""Custom callback to plot intermediate models."""
def __init__(self) -> None:
super().__init__()
def after_iteration(
self,
model: xgb.Booster,
epoch: int,
evals_log: xgb.callback.TrainingCallback.EvalsLog,
) -> bool:
"""Run after training is finished."""
# Compute y_pred = prediction using the intermediate model, at current boosting
# iteration
y_pred = model.predict(dmat)
# "Accuracy" = the number of data points whose ranged label (y_lower, y_upper)
# includes the corresponding predicted label (y_pred)
acc = np.sum(
np.logical_and(y_pred >= y_lower, y_pred <= y_upper) / len(X) * 100
)
accuracy_history.append(acc)
# Plot ranged labels as well as predictions by the model
plt.subplot(5, 3, epoch + 1)
plot_censored_labels(X, y_lower, y_upper)
y_pred_grid_pts = model.predict(xgb.DMatrix(grid_pts))
plt.plot(
grid_pts, y_pred_grid_pts, "r-", label="XGBoost AFT model", linewidth=4
)
plt.title("Iteration {}".format(epoch), x=0.5, y=0.8)
plt.xlim((0.8, 5.2))
plt.ylim((1 if np.min(y_pred) < 6 else 6, 200))
plt.yscale("log")
return False
res: xgb.callback.TrainingCallback.EvalsLog = {}
plt.figure(figsize=(12, 13))
bst = xgb.train(
params,
dmat,
15,
[(dmat, "train")],
evals_result=res,
callbacks=[PlotIntermediateModel()],
)
plt.tight_layout()
plt.legend(loc='lower center', ncol=4,
bbox_to_anchor=(0.5, 0),
bbox_transform=plt.gcf().transFigure)
plt.legend(
loc="lower center",
ncol=4,
bbox_to_anchor=(0.5, 0),
bbox_transform=plt.gcf().transFigure,
)
plt.tight_layout()
# Plot negative log likelihood over boosting iterations
plt.figure(figsize=(8,3))
plt.figure(figsize=(8, 3))
plt.subplot(1, 2, 1)
plt.plot(res['train']['aft-nloglik'], 'b-o', label='aft-nloglik')
plt.xlabel('# Boosting Iterations')
plt.legend(loc='best')
plt.plot(res["train"]["aft-nloglik"], "b-o", label="aft-nloglik")
plt.xlabel("# Boosting Iterations")
plt.legend(loc="best")
# Plot "accuracy" over boosting iterations
# "Accuracy" = the number of data points whose ranged label (y_lower, y_upper) includes
# the corresponding predicted label (y_pred)
plt.subplot(1, 2, 2)
plt.plot(accuracy_history, 'r-o', label='Accuracy (%)')
plt.xlabel('# Boosting Iterations')
plt.legend(loc='best')
plt.plot(accuracy_history, "r-o", label="Accuracy (%)")
plt.xlabel("# Boosting Iterations")
plt.legend(loc="best")
plt.tight_layout()
plt.show()

View File

@@ -82,10 +82,10 @@ def main(tmpdir: str) -> xgboost.Booster:
missing = np.NaN
Xy = xgboost.DMatrix(it, missing=missing, enable_categorical=False)
# Other tree methods including ``hist`` and ``gpu_hist`` also work, see tutorial in
# doc for details.
# Other tree methods including ``approx`` and ``gpu_hist`` are supported. GPU
# behaves differently than CPU tree methods. See tutorial in doc for details.
booster = xgboost.train(
{"tree_method": "approx", "max_depth": 2},
{"tree_method": "hist", "max_depth": 4},
Xy,
evals=[(Xy, "Train")],
num_boost_round=10,

View File

@@ -2,11 +2,25 @@
Using XGBoost External Memory Version
#####################################
XGBoost supports loading data from external memory using builtin data parser. And
starting from version 1.5, users can also define a custom iterator to load data in chunks.
The feature is still experimental and not yet ready for production use. In this tutorial
we will introduce both methods. Please note that training on data from external memory is
not supported by ``exact`` tree method.
When working with large datasets, training XGBoost models can be challenging as the entire
dataset needs to be loaded into memory. This can be costly and sometimes
infeasible. Starting from 1.5, users can define a custom iterator to load data in chunks
for running XGBoost algorithms. External memory can be used for both training and
prediction, but training is the primary use case and it will be our focus in this
tutorial. For prediction and evaluation, users can iterate through the data themselves,
while training requires the full dataset to be loaded into memory.
During training, there are two different modes for external memory support available in
XGBoost, one for CPU-based algorithms like ``hist`` and ``approx``, another one for the
GPU-based training algorithm. We will introduce them in the following sections.
.. note::
Training on data from external memory is not supported by the ``exact`` tree method.
.. note::
The feature is still experimental as of 2.0. The performance is not well optimized.
*************
Data Iterator
@@ -15,8 +29,8 @@ Data Iterator
Starting from XGBoost 1.5, users can define their own data loader using Python or C
interface. There are some examples in the ``demo`` directory for quick start. This is a
generalized version of text input external memory, where users no longer need to prepare a
text file that XGBoost recognizes. To enable the feature, user need to define a data
iterator with 2 class methods ``next`` and ``reset`` then pass it into ``DMatrix``
text file that XGBoost recognizes. To enable the feature, users need to define a data
iterator with 2 class methods: ``next`` and ``reset``, then pass it into the ``DMatrix``
constructor.
.. code-block:: python
@@ -60,20 +74,96 @@ constructor.
# Other tree methods including ``hist`` and ``gpu_hist`` also work, but has some caveats
# as noted in following sections.
booster = xgboost.train({"tree_method": "approx"}, Xy)
booster = xgboost.train({"tree_method": "hist"}, Xy)
The above snippet is a simplified version of ``demo/guide-python/external_memory.py``. For
an example in C, please see ``demo/c-api/external-memory/``.
The above snippet is a simplified version of :ref:`sphx_glr_python_examples_external_memory.py`.
For an example in C, please see ``demo/c-api/external-memory/``. The iterator is the
common interface for using external memory with XGBoost; you can pass the resulting
``DMatrix`` object for training, prediction, and evaluation.
It is important to set the batch size based on the memory available. A good starting point
is to set the batch size to 10GB per batch if you have 64GB of memory. It is *not*
recommended to set small batch sizes like 32 samples per batch, as this can seriously hurt
performance in gradient boosting.
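For reference, a minimal sketch of such a batched iterator is shown below (the batch
contents are generated in memory as a stand-in for reading chunks from disk, and the
class and parameter names are illustrative, not taken from the demo referenced above):

.. code-block:: python

    import os
    from typing import Callable

    import numpy as np
    import xgboost


    class BatchIter(xgboost.DataIter):
        """Yield a fixed number of batches, one at a time."""

        def __init__(self, n_batches: int, batch_rows: int) -> None:
            self._n_batches = n_batches
            self._batch_rows = batch_rows
            self._it = 0
            self._rng = np.random.default_rng(0)
            # XGBoost writes its page cache next to this prefix.
            super().__init__(cache_prefix=os.path.join(".", "cache"))

        def next(self, input_data: Callable) -> int:
            """Feed the next batch to XGBoost; return 0 when the data is exhausted."""
            if self._it == self._n_batches:
                return 0
            # Stand-in for reading one chunk from disk.
            X = self._rng.normal(size=(self._batch_rows, 8))
            y = self._rng.normal(size=self._batch_rows)
            input_data(data=X, label=y)
            self._it += 1
            return 1

        def reset(self) -> None:
            """Rewind to the first batch."""
            self._it = 0


    Xy = xgboost.DMatrix(BatchIter(n_batches=4, batch_rows=1024))
    booster = xgboost.train({"tree_method": "hist"}, Xy, num_boost_round=10)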
***********
CPU Version
***********
In the previous section, we demonstrated how to train a tree-based model using the
``hist`` tree method on a CPU. This method involves iterating through data batches stored
in a cache during tree construction. For optimal performance, we recommend using the
``grow_policy=depthwise`` setting, which allows XGBoost to build an entire layer of tree
nodes with only a few batch iterations. Conversely, using the ``lossguide`` policy
requires XGBoost to iterate over the data set for each tree node, resulting in slower
performance.
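As a concrete illustration of this recommendation, a sketch of a training call with the
depthwise policy might look like the following (it reuses the ``Xy`` DMatrix from the
iterator sketch above; the parameter values are illustrative):

.. code-block:: python

    booster = xgboost.train(
        {
            "tree_method": "hist",
            # Build one full level of the tree per sweep over the cached batches.
            "grow_policy": "depthwise",
            "max_depth": 6,
        },
        Xy,
        num_boost_round=100,
    )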
If external memory is used, the performance of CPU training is limited by IO
(input/output) speed. This means that the disk IO speed primarily determines the training
speed. During benchmarking, we used an NVMe drive connected to a PCIe-4 slot; other types of
storage can be too slow for practical usage. In addition, your system may perform caching
to reduce the overhead of file reading.
**********************************
GPU Version (GPU Hist tree method)
**********************************
External memory is supported by GPU algorithms (i.e. when ``tree_method`` is set to
``gpu_hist``). However, the algorithm used for GPU is different from the one used for
CPU. When training on a CPU, the tree method iterates through all batches from external
memory for each step of the tree construction algorithm. On the other hand, the GPU
algorithm concatenates all batches into one and stores it in GPU memory. To reduce overall
memory usage, users can utilize subsampling. The good news is that the GPU hist tree
method supports gradient-based sampling, enabling users to set a low sampling rate without
compromising accuracy.
.. code-block:: python
param = {
...
'subsample': 0.2,
'sampling_method': 'gradient_based',
}
For more information about the sampling algorithm and its use in external memory training,
see `this paper <https://arxiv.org/abs/2005.09148>`_.
.. warning::
When the GPU runs out of memory during iteration on external memory, users might
receive a segfault instead of an OOM exception.
*******
Remarks
*******
When using external memory with XGBoost, data is divided into smaller chunks so that only
a fraction of it needs to be stored in memory at any given time. It's important to note
that this method only applies to the predictor data (``X``), while other data, like labels
and internal runtime structures are concatenated. This means that memory reduction is most
effective when dealing with wide datasets where ``X`` is larger compared to other data
like ``y``, while it has little impact on slim datasets.
Starting with XGBoost 2.0, the implementation of external memory uses ``mmap``. It is not
yet tested against system errors like disconnected network devices (`SIGBUS`). Also, it's
worth noting that most tests have been conducted on Linux distributions.
Another important point to keep in mind is that creating the initial cache for XGBoost may
take some time. The interface to external memory is through custom iterators, which may or
may not be thread-safe. Therefore, initialization is performed sequentially.
****************
Text File Inputs
****************
There is no big difference between using external memory version and in-memory version.
The only difference is the filename format.
This is the original form of external memory support; users are encouraged to use the custom
data iterator instead. There is no big difference between using the external memory version of
text input and the in-memory version. The only difference is the filename format.
The external memory version takes in the following `URI <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_ format:
The external memory version takes in the following `URI
<https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_ format:
.. code-block:: none
@@ -91,9 +181,8 @@ To load from csv files, use the following syntax:
where ``label_column`` should point to the csv column acting as the label.
To provide a simple example for illustration, extracting the code from
`demo/guide-python/external_memory.py <https://github.com/dmlc/xgboost/blob/master/demo/guide-python/external_memory.py>`_. If
you have a dataset stored in a file similar to ``agaricus.txt.train`` with LIBSVM format, the external memory support can be enabled by:
If you have a dataset stored in a file similar to ``demo/data/agaricus.txt.train`` with LIBSVM
format, the external memory support can be enabled by:
.. code-block:: python
@@ -104,35 +193,3 @@ XGBoost will first load ``agaricus.txt.train`` in, preprocess it, then write to
more notes about text input formats, see :doc:`/tutorials/input_format`.
For CLI version, simply add the cache suffix, e.g. ``"../data/agaricus.txt.train?format=libsvm#dtrain.cache"``.
**********************************
GPU Version (GPU Hist tree method)
**********************************
External memory is supported in GPU algorithms (i.e. when ``tree_method`` is set to ``gpu_hist``).
If you are still getting out-of-memory errors after enabling external memory, try subsampling the
data to further reduce GPU memory usage:
.. code-block:: python
param = {
...
'subsample': 0.1,
'sampling_method': 'gradient_based',
}
For more information, see `this paper <https://arxiv.org/abs/2005.09148>`_. Internally
the tree method still concatenate all the chunks into 1 final histogram index due to
performance reason, but in compressed format. So its scalability has an upper bound but
still has lower memory cost in general.
***********
CPU Version
***********
For CPU histogram based tree methods (``approx``, ``hist``) it's recommended to use
``grow_policy=depthwise`` for performance reason. Iterating over data batches is slow,
with ``depthwise`` policy XGBoost can build a entire layer of tree nodes with a few
iterations, while with ``lossguide`` XGBoost needs to iterate over the data set for each
tree node.

View File

@@ -44,10 +44,10 @@
<log.capi.invocation>OFF</log.capi.invocation>
<use.cuda>OFF</use.cuda>
<cudf.version>23.04.0</cudf.version>
<spark.rapids.version>23.04.1</spark.rapids.version>
<spark.rapids.version>23.06.0</spark.rapids.version>
<cudf.classifier>cuda11</cudf.classifier>
<scalatest.version>3.2.16</scalatest.version>
<scala-collection-compat.version>2.11.0</scala-collection-compat.version>
<scala-collection-compat.version>2.10.0</scala-collection-compat.version>
</properties>
<repositories>
<repository>

View File

@@ -381,17 +381,21 @@ __model_doc = f"""
every **early_stopping_rounds** round(s) to continue training. Requires at
least one item in **eval_set** in :py:meth:`fit`.
- The method returns the model from the last iteration, not the best one, use a
callback :py:class:`xgboost.callback.EarlyStopping` if returning the best
model is preferred.
- If early stopping occurs, the model will have two additional attributes:
:py:attr:`best_score` and :py:attr:`best_iteration`. These are used by the
:py:meth:`predict` and :py:meth:`apply` methods to determine the optimal
number of trees during inference. If users want to access the full model
(including trees built after early stopping), they can specify the
`iteration_range` in these inference methods. In addition, other utilities
like model plotting can also use the entire model.
- If you prefer to discard the trees after `best_iteration`, consider using the
callback function :py:class:`xgboost.callback.EarlyStopping`.
- If there's more than one item in **eval_set**, the last entry will be used for
early stopping. If there's more than one metric in **eval_metric**, the last
metric will be used for early stopping.
- If early stopping occurs, the model will have three additional fields:
:py:attr:`best_score`, :py:attr:`best_iteration`.
.. note::
This parameter replaces `early_stopping_rounds` in :py:meth:`fit` method.

View File

@@ -198,14 +198,14 @@ class IteratorForTest(xgb.core.DataIter):
X: Sequence,
y: Sequence,
w: Optional[Sequence],
cache: Optional[str] = "./",
cache: Optional[str],
) -> None:
assert len(X) == len(y)
self.X = X
self.y = y
self.w = w
self.it = 0
super().__init__(cache)
super().__init__(cache_prefix=cache)
def next(self, input_data: Callable) -> int:
if self.it == len(self.X):
@@ -347,7 +347,9 @@ class TestDataset:
if w is not None:
weight.append(w)
it = IteratorForTest(predictor, response, weight if weight else None)
it = IteratorForTest(
predictor, response, weight if weight else None, cache="cache"
)
return xgb.DMatrix(it)
def __repr__(self) -> str:

View File

@@ -1,18 +1,21 @@
/*!
* Copyright (c) 2014-2019 by Contributors
/**
* Copyright 2014-2023, XGBoost Contributors
* \file io.h
* \brief utilities with different serializable implementations
* \author Tianqi Chen
*/
#ifndef RABIT_INTERNAL_IO_H_
#define RABIT_INTERNAL_IO_H_
#include <cstdio>
#include <vector>
#include <cstring>
#include <string>
#include <algorithm>
#include <numeric>
#include <cstddef> // for size_t
#include <cstdio>
#include <cstring> // for memcpy
#include <limits>
#include <numeric>
#include <string>
#include <vector>
#include "rabit/internal/utils.h"
#include "rabit/serializable.h"
@@ -20,54 +23,61 @@ namespace rabit {
namespace utils {
/*! \brief re-use definition of dmlc::SeekStream */
using SeekStream = dmlc::SeekStream;
/*! \brief fixed size memory buffer */
/**
* @brief Fixed size memory buffer as a stream.
*/
struct MemoryFixSizeBuffer : public SeekStream {
public:
// similar to SEEK_END in libc
static size_t constexpr kSeekEnd = std::numeric_limits<size_t>::max();
static std::size_t constexpr kSeekEnd = std::numeric_limits<std::size_t>::max();
protected:
MemoryFixSizeBuffer() = default;
public:
MemoryFixSizeBuffer(void *p_buffer, size_t buffer_size)
: p_buffer_(reinterpret_cast<char*>(p_buffer)),
buffer_size_(buffer_size) {
curr_ptr_ = 0;
}
/**
* @brief Ctor
*
* @param p_buffer Pointer to the source buffer with size `buffer_size`.
* @param buffer_size Size of the source buffer
*/
MemoryFixSizeBuffer(void *p_buffer, std::size_t buffer_size)
: p_buffer_(reinterpret_cast<char *>(p_buffer)), buffer_size_(buffer_size) {}
~MemoryFixSizeBuffer() override = default;
size_t Read(void *ptr, size_t size) override {
size_t nread = std::min(buffer_size_ - curr_ptr_, size);
std::size_t Read(void *ptr, std::size_t size) override {
std::size_t nread = std::min(buffer_size_ - curr_ptr_, size);
if (nread != 0) std::memcpy(ptr, p_buffer_ + curr_ptr_, nread);
curr_ptr_ += nread;
return nread;
}
void Write(const void *ptr, size_t size) override {
void Write(const void *ptr, std::size_t size) override {
if (size == 0) return;
utils::Assert(curr_ptr_ + size <= buffer_size_,
"write position exceed fixed buffer size");
CHECK_LE(curr_ptr_ + size, buffer_size_);
std::memcpy(p_buffer_ + curr_ptr_, ptr, size);
curr_ptr_ += size;
}
void Seek(size_t pos) override {
void Seek(std::size_t pos) override {
if (pos == kSeekEnd) {
curr_ptr_ = buffer_size_;
} else {
curr_ptr_ = static_cast<size_t>(pos);
curr_ptr_ = static_cast<std::size_t>(pos);
}
}
size_t Tell() override {
return curr_ptr_;
}
virtual bool AtEnd() const {
return curr_ptr_ == buffer_size_;
}
/**
* @brief Current position in the buffer (stream).
*/
std::size_t Tell() override { return curr_ptr_; }
virtual bool AtEnd() const { return curr_ptr_ == buffer_size_; }
private:
protected:
/*! \brief in memory buffer */
char *p_buffer_;
char *p_buffer_{nullptr};
/*! \brief size of the buffer */
size_t buffer_size_;
std::size_t buffer_size_{0};
/*! \brief current pointer */
size_t curr_ptr_;
}; // class MemoryFixSizeBuffer
std::size_t curr_ptr_{0};
};
/*! \brief An in-memory buffer that can be read and written through the stream interface. */
struct MemoryBufferStream : public SeekStream {

View File

@@ -0,0 +1,228 @@
/*!
* Copyright 2023 XGBoost contributors
*/
#if defined(XGBOOST_USE_NCCL)
#include "nccl_device_communicator.cuh"
namespace xgboost {
namespace collective {
NcclDeviceCommunicator::NcclDeviceCommunicator(int device_ordinal, Communicator *communicator)
: device_ordinal_{device_ordinal}, communicator_{communicator} {
if (device_ordinal_ < 0) {
LOG(FATAL) << "Invalid device ordinal: " << device_ordinal_;
}
if (communicator_ == nullptr) {
LOG(FATAL) << "Communicator cannot be null.";
}
int32_t const rank = communicator_->GetRank();
int32_t const world = communicator_->GetWorldSize();
if (world == 1) {
return;
}
std::vector<uint64_t> uuids(world * kUuidLength, 0);
auto s_uuid = xgboost::common::Span<uint64_t>{uuids.data(), uuids.size()};
auto s_this_uuid = s_uuid.subspan(rank * kUuidLength, kUuidLength);
GetCudaUUID(s_this_uuid);
// TODO(rongou): replace this with allgather.
communicator_->AllReduce(uuids.data(), uuids.size(), DataType::kUInt64, Operation::kSum);
std::vector<xgboost::common::Span<uint64_t, kUuidLength>> converted(world);
size_t j = 0;
for (size_t i = 0; i < uuids.size(); i += kUuidLength) {
converted[j] = xgboost::common::Span<uint64_t, kUuidLength>{uuids.data() + i, kUuidLength};
j++;
}
auto iter = std::unique(converted.begin(), converted.end());
auto n_uniques = std::distance(converted.begin(), iter);
CHECK_EQ(n_uniques, world)
<< "Multiple processes within communication group running on same CUDA "
<< "device is not supported. " << PrintUUID(s_this_uuid) << "\n";
nccl_unique_id_ = GetUniqueId();
dh::safe_cuda(cudaSetDevice(device_ordinal_));
dh::safe_nccl(ncclCommInitRank(&nccl_comm_, world, nccl_unique_id_, rank));
dh::safe_cuda(cudaStreamCreate(&cuda_stream_));
}
NcclDeviceCommunicator::~NcclDeviceCommunicator() {
if (communicator_->GetWorldSize() == 1) {
return;
}
if (cuda_stream_) {
dh::safe_cuda(cudaStreamDestroy(cuda_stream_));
}
if (nccl_comm_) {
dh::safe_nccl(ncclCommDestroy(nccl_comm_));
}
if (xgboost::ConsoleLogger::ShouldLog(xgboost::ConsoleLogger::LV::kDebug)) {
LOG(CONSOLE) << "======== NCCL Statistics========";
LOG(CONSOLE) << "AllReduce calls: " << allreduce_calls_;
LOG(CONSOLE) << "AllReduce total MiB communicated: " << allreduce_bytes_ / 1048576;
}
}
namespace {
ncclDataType_t GetNcclDataType(DataType const &data_type) {
ncclDataType_t result{ncclInt8};
switch (data_type) {
case DataType::kInt8:
result = ncclInt8;
break;
case DataType::kUInt8:
result = ncclUint8;
break;
case DataType::kInt32:
result = ncclInt32;
break;
case DataType::kUInt32:
result = ncclUint32;
break;
case DataType::kInt64:
result = ncclInt64;
break;
case DataType::kUInt64:
result = ncclUint64;
break;
case DataType::kFloat:
result = ncclFloat;
break;
case DataType::kDouble:
result = ncclDouble;
break;
default:
LOG(FATAL) << "Unknown data type.";
}
return result;
}
bool IsBitwiseOp(Operation const &op) {
return op == Operation::kBitwiseAND || op == Operation::kBitwiseOR ||
op == Operation::kBitwiseXOR;
}
ncclRedOp_t GetNcclRedOp(Operation const &op) {
ncclRedOp_t result{ncclMax};
switch (op) {
case Operation::kMax:
result = ncclMax;
break;
case Operation::kMin:
result = ncclMin;
break;
case Operation::kSum:
result = ncclSum;
break;
default:
LOG(FATAL) << "Unsupported reduce operation.";
}
return result;
}
template <typename Func>
void RunBitwiseAllreduce(char *out_buffer, char const *device_buffer, Func func, int world_size,
std::size_t size, cudaStream_t stream) {
dh::LaunchN(size, stream, [=] __device__(std::size_t idx) {
out_buffer[idx] = device_buffer[idx];
for (auto rank = 1; rank < world_size; rank++) {
out_buffer[idx] = func(out_buffer[idx], device_buffer[rank * size + idx]);
}
});
}
} // anonymous namespace
void NcclDeviceCommunicator::BitwiseAllReduce(void *send_receive_buffer, std::size_t count,
DataType data_type, Operation op) {
auto const world_size = communicator_->GetWorldSize();
auto const size = count * GetTypeSize(data_type);
dh::caching_device_vector<char> buffer(size * world_size);
auto *device_buffer = buffer.data().get();
// First gather data from all the workers.
dh::safe_nccl(ncclAllGather(send_receive_buffer, device_buffer, count, GetNcclDataType(data_type),
nccl_comm_, cuda_stream_));
// Then reduce locally.
auto *out_buffer = static_cast<char *>(send_receive_buffer);
switch (op) {
case Operation::kBitwiseAND:
RunBitwiseAllreduce(out_buffer, device_buffer, thrust::bit_and<char>(), world_size, size,
cuda_stream_);
break;
case Operation::kBitwiseOR:
RunBitwiseAllreduce(out_buffer, device_buffer, thrust::bit_or<char>(), world_size, size,
cuda_stream_);
break;
case Operation::kBitwiseXOR:
RunBitwiseAllreduce(out_buffer, device_buffer, thrust::bit_xor<char>(), world_size, size,
cuda_stream_);
break;
default:
LOG(FATAL) << "Not a bitwise reduce operation.";
}
}
void NcclDeviceCommunicator::AllReduce(void *send_receive_buffer, std::size_t count,
DataType data_type, Operation op) {
if (communicator_->GetWorldSize() == 1) {
return;
}
dh::safe_cuda(cudaSetDevice(device_ordinal_));
if (IsBitwiseOp(op)) {
BitwiseAllReduce(send_receive_buffer, count, data_type, op);
} else {
dh::safe_nccl(ncclAllReduce(send_receive_buffer, send_receive_buffer, count,
GetNcclDataType(data_type), GetNcclRedOp(op), nccl_comm_,
cuda_stream_));
}
allreduce_bytes_ += count * GetTypeSize(data_type);
allreduce_calls_ += 1;
}
void NcclDeviceCommunicator::AllGatherV(void const *send_buffer, size_t length_bytes,
std::vector<std::size_t> *segments,
dh::caching_device_vector<char> *receive_buffer) {
if (communicator_->GetWorldSize() == 1) {
return;
}
dh::safe_cuda(cudaSetDevice(device_ordinal_));
int const world_size = communicator_->GetWorldSize();
int const rank = communicator_->GetRank();
segments->clear();
segments->resize(world_size, 0);
segments->at(rank) = length_bytes;
communicator_->AllReduce(segments->data(), segments->size(), DataType::kUInt64, Operation::kMax);
auto total_bytes = std::accumulate(segments->cbegin(), segments->cend(), 0UL);
receive_buffer->resize(total_bytes);
size_t offset = 0;
dh::safe_nccl(ncclGroupStart());
for (int32_t i = 0; i < world_size; ++i) {
size_t as_bytes = segments->at(i);
dh::safe_nccl(ncclBroadcast(send_buffer, receive_buffer->data().get() + offset, as_bytes,
ncclChar, i, nccl_comm_, cuda_stream_));
offset += as_bytes;
}
dh::safe_nccl(ncclGroupEnd());
}
void NcclDeviceCommunicator::Synchronize() {
if (communicator_->GetWorldSize() == 1) {
return;
}
dh::safe_cuda(cudaSetDevice(device_ordinal_));
dh::safe_cuda(cudaStreamSynchronize(cuda_stream_));
}
} // namespace collective
} // namespace xgboost
#endif

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2022 XGBoost contributors
* Copyright 2022-2023 XGBoost contributors
*/
#pragma once
@@ -12,116 +12,13 @@ namespace collective {
class NcclDeviceCommunicator : public DeviceCommunicator {
public:
NcclDeviceCommunicator(int device_ordinal, Communicator *communicator)
: device_ordinal_{device_ordinal}, communicator_{communicator} {
if (device_ordinal_ < 0) {
LOG(FATAL) << "Invalid device ordinal: " << device_ordinal_;
}
if (communicator_ == nullptr) {
LOG(FATAL) << "Communicator cannot be null.";
}
int32_t const rank = communicator_->GetRank();
int32_t const world = communicator_->GetWorldSize();
if (world == 1) {
return;
}
std::vector<uint64_t> uuids(world * kUuidLength, 0);
auto s_uuid = xgboost::common::Span<uint64_t>{uuids.data(), uuids.size()};
auto s_this_uuid = s_uuid.subspan(rank * kUuidLength, kUuidLength);
GetCudaUUID(s_this_uuid);
// TODO(rongou): replace this with allgather.
communicator_->AllReduce(uuids.data(), uuids.size(), DataType::kUInt64, Operation::kSum);
std::vector<xgboost::common::Span<uint64_t, kUuidLength>> converted(world);
size_t j = 0;
for (size_t i = 0; i < uuids.size(); i += kUuidLength) {
converted[j] = xgboost::common::Span<uint64_t, kUuidLength>{uuids.data() + i, kUuidLength};
j++;
}
auto iter = std::unique(converted.begin(), converted.end());
auto n_uniques = std::distance(converted.begin(), iter);
CHECK_EQ(n_uniques, world)
<< "Multiple processes within communication group running on same CUDA "
<< "device is not supported. " << PrintUUID(s_this_uuid) << "\n";
nccl_unique_id_ = GetUniqueId();
dh::safe_nccl(ncclCommInitRank(&nccl_comm_, world, nccl_unique_id_, rank));
dh::safe_cuda(cudaStreamCreate(&cuda_stream_));
}
~NcclDeviceCommunicator() override {
if (communicator_->GetWorldSize() == 1) {
return;
}
if (cuda_stream_) {
dh::safe_cuda(cudaStreamDestroy(cuda_stream_));
}
if (nccl_comm_) {
dh::safe_nccl(ncclCommDestroy(nccl_comm_));
}
if (xgboost::ConsoleLogger::ShouldLog(xgboost::ConsoleLogger::LV::kDebug)) {
LOG(CONSOLE) << "======== NCCL Statistics========";
LOG(CONSOLE) << "AllReduce calls: " << allreduce_calls_;
LOG(CONSOLE) << "AllReduce total MiB communicated: " << allreduce_bytes_ / 1048576;
}
}
NcclDeviceCommunicator(int device_ordinal, Communicator *communicator);
~NcclDeviceCommunicator() override;
void AllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
Operation op) override {
if (communicator_->GetWorldSize() == 1) {
return;
}
dh::safe_cuda(cudaSetDevice(device_ordinal_));
dh::safe_nccl(ncclAllReduce(send_receive_buffer, send_receive_buffer, count,
GetNcclDataType(data_type), GetNcclRedOp(op), nccl_comm_,
cuda_stream_));
allreduce_bytes_ += count * GetTypeSize(data_type);
allreduce_calls_ += 1;
}
Operation op) override;
void AllGatherV(void const *send_buffer, size_t length_bytes, std::vector<std::size_t> *segments,
dh::caching_device_vector<char> *receive_buffer) override {
if (communicator_->GetWorldSize() == 1) {
return;
}
dh::safe_cuda(cudaSetDevice(device_ordinal_));
int const world_size = communicator_->GetWorldSize();
int const rank = communicator_->GetRank();
segments->clear();
segments->resize(world_size, 0);
segments->at(rank) = length_bytes;
communicator_->AllReduce(segments->data(), segments->size(), DataType::kUInt64,
Operation::kMax);
auto total_bytes = std::accumulate(segments->cbegin(), segments->cend(), 0UL);
receive_buffer->resize(total_bytes);
size_t offset = 0;
dh::safe_nccl(ncclGroupStart());
for (int32_t i = 0; i < world_size; ++i) {
size_t as_bytes = segments->at(i);
dh::safe_nccl(ncclBroadcast(send_buffer, receive_buffer->data().get() + offset, as_bytes,
ncclChar, i, nccl_comm_, cuda_stream_));
offset += as_bytes;
}
dh::safe_nccl(ncclGroupEnd());
}
void Synchronize() override {
if (communicator_->GetWorldSize() == 1) {
return;
}
dh::safe_cuda(cudaSetDevice(device_ordinal_));
dh::safe_cuda(cudaStreamSynchronize(cuda_stream_));
}
dh::caching_device_vector<char> *receive_buffer) override;
void Synchronize() override;
private:
static constexpr std::size_t kUuidLength =
@@ -160,60 +57,8 @@ class NcclDeviceCommunicator : public DeviceCommunicator {
return id;
}
static ncclDataType_t GetNcclDataType(DataType const &data_type) {
ncclDataType_t result;
switch (data_type) {
case DataType::kInt8:
result = ncclInt8;
break;
case DataType::kUInt8:
result = ncclUint8;
break;
case DataType::kInt32:
result = ncclInt32;
break;
case DataType::kUInt32:
result = ncclUint32;
break;
case DataType::kInt64:
result = ncclInt64;
break;
case DataType::kUInt64:
result = ncclUint64;
break;
case DataType::kFloat:
result = ncclFloat;
break;
case DataType::kDouble:
result = ncclDouble;
break;
default:
LOG(FATAL) << "Unknown data type.";
}
return result;
}
static ncclRedOp_t GetNcclRedOp(Operation const &op) {
ncclRedOp_t result;
switch (op) {
case Operation::kMax:
result = ncclMax;
break;
case Operation::kMin:
result = ncclMin;
break;
case Operation::kSum:
result = ncclSum;
break;
case Operation::kBitwiseAND:
case Operation::kBitwiseOR:
case Operation::kBitwiseXOR:
LOG(FATAL) << "Not implemented yet.";
default:
LOG(FATAL) << "Unknown reduce operation.";
}
return result;
}
void BitwiseAllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
Operation op);
int const device_ordinal_;
Communicator *communicator_;

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 by Contributors
/**
* Copyright 2019-2023, XGBoost Contributors
* \file bitfield.h
*/
#ifndef XGBOOST_COMMON_BITFIELD_H_
@@ -50,14 +50,17 @@ __forceinline__ __device__ BitFieldAtomicType AtomicAnd(BitFieldAtomicType* addr
}
#endif // defined(__CUDACC__)
/*!
* \brief A non-owning type with auxiliary methods defined for manipulating bits.
/**
* @brief A non-owning type with auxiliary methods defined for manipulating bits.
*
* \tparam Direction Whether the bits start from left or from right.
* @tparam VT Underlying value type, must be an unsigned integer.
* @tparam Direction Whether the bits start from left or from right.
* @tparam IsConst Whether the view is const.
*/
template <typename VT, typename Direction, bool IsConst = false>
struct BitFieldContainer {
using value_type = std::conditional_t<IsConst, VT const, VT>; // NOLINT
using size_type = size_t; // NOLINT
using index_type = size_t; // NOLINT
using pointer = value_type*; // NOLINT
@@ -70,8 +73,9 @@ struct BitFieldContainer {
};
private:
common::Span<value_type> bits_;
static_assert(!std::is_signed<VT>::value, "Must use unsiged type as underlying storage.");
value_type* bits_{nullptr};
size_type n_values_{0};
static_assert(!std::is_signed<VT>::value, "Must use an unsigned type as the underlying storage.");
public:
XGBOOST_DEVICE static Pos ToBitPos(index_type pos) {
@@ -86,13 +90,15 @@ struct BitFieldContainer {
public:
BitFieldContainer() = default;
XGBOOST_DEVICE explicit BitFieldContainer(common::Span<value_type> bits) : bits_{bits} {}
XGBOOST_DEVICE BitFieldContainer(BitFieldContainer const& other) : bits_{other.bits_} {}
XGBOOST_DEVICE explicit BitFieldContainer(common::Span<value_type> bits)
: bits_{bits.data()}, n_values_{bits.size()} {}
BitFieldContainer(BitFieldContainer const& other) = default;
BitFieldContainer(BitFieldContainer&& other) = default;
BitFieldContainer &operator=(BitFieldContainer const &that) = default;
BitFieldContainer &operator=(BitFieldContainer &&that) = default;
XGBOOST_DEVICE common::Span<value_type> Bits() { return bits_; }
XGBOOST_DEVICE common::Span<value_type const> Bits() const { return bits_; }
XGBOOST_DEVICE auto Bits() { return common::Span<value_type>{bits_, NumValues()}; }
XGBOOST_DEVICE auto Bits() const { return common::Span<value_type const>{bits_, NumValues()}; }
/*\brief Compute the size of needed memory allocation. The returned value is in terms
* of number of elements with `BitFieldContainer::value_type'.
@@ -103,17 +109,17 @@ struct BitFieldContainer {
#if defined(__CUDA_ARCH__)
__device__ BitFieldContainer& operator|=(BitFieldContainer const& rhs) {
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
size_t min_size = min(bits_.size(), rhs.bits_.size());
size_t min_size = min(NumValues(), rhs.NumValues());
if (tid < min_size) {
bits_[tid] |= rhs.bits_[tid];
Data()[tid] |= rhs.Data()[tid];
}
return *this;
}
#else
BitFieldContainer& operator|=(BitFieldContainer const& rhs) {
size_t min_size = std::min(bits_.size(), rhs.bits_.size());
size_t min_size = std::min(NumValues(), rhs.NumValues());
for (size_t i = 0; i < min_size; ++i) {
bits_[i] |= rhs.bits_[i];
Data()[i] |= rhs.Data()[i];
}
return *this;
}
@@ -121,75 +127,85 @@ struct BitFieldContainer {
#if defined(__CUDA_ARCH__)
__device__ BitFieldContainer& operator&=(BitFieldContainer const& rhs) {
size_t min_size = min(bits_.size(), rhs.bits_.size());
size_t min_size = min(NumValues(), rhs.NumValues());
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid < min_size) {
bits_[tid] &= rhs.bits_[tid];
Data()[tid] &= rhs.Data()[tid];
}
return *this;
}
#else
BitFieldContainer& operator&=(BitFieldContainer const& rhs) {
size_t min_size = std::min(bits_.size(), rhs.bits_.size());
size_t min_size = std::min(NumValues(), rhs.NumValues());
for (size_t i = 0; i < min_size; ++i) {
bits_[i] &= rhs.bits_[i];
Data()[i] &= rhs.Data()[i];
}
return *this;
}
#endif // defined(__CUDA_ARCH__)
#if defined(__CUDA_ARCH__)
__device__ auto Set(index_type pos) {
__device__ auto Set(index_type pos) noexcept(true) {
Pos pos_v = Direction::Shift(ToBitPos(pos));
value_type& value = bits_[pos_v.int_pos];
value_type& value = Data()[pos_v.int_pos];
value_type set_bit = kOne << pos_v.bit_pos;
using Type = typename dh::detail::AtomicDispatcher<sizeof(value_type)>::Type;
atomicOr(reinterpret_cast<Type *>(&value), set_bit);
}
__device__ void Clear(index_type pos) {
__device__ void Clear(index_type pos) noexcept(true) {
Pos pos_v = Direction::Shift(ToBitPos(pos));
value_type& value = bits_[pos_v.int_pos];
value_type& value = Data()[pos_v.int_pos];
value_type clear_bit = ~(kOne << pos_v.bit_pos);
using Type = typename dh::detail::AtomicDispatcher<sizeof(value_type)>::Type;
atomicAnd(reinterpret_cast<Type *>(&value), clear_bit);
}
#else
void Set(index_type pos) {
void Set(index_type pos) noexcept(true) {
Pos pos_v = Direction::Shift(ToBitPos(pos));
value_type& value = bits_[pos_v.int_pos];
value_type& value = Data()[pos_v.int_pos];
value_type set_bit = kOne << pos_v.bit_pos;
value |= set_bit;
}
void Clear(index_type pos) {
void Clear(index_type pos) noexcept(true) {
Pos pos_v = Direction::Shift(ToBitPos(pos));
value_type& value = bits_[pos_v.int_pos];
value_type& value = Data()[pos_v.int_pos];
value_type clear_bit = ~(kOne << pos_v.bit_pos);
value &= clear_bit;
}
#endif // defined(__CUDA_ARCH__)
XGBOOST_DEVICE bool Check(Pos pos_v) const {
XGBOOST_DEVICE bool Check(Pos pos_v) const noexcept(true) {
pos_v = Direction::Shift(pos_v);
SPAN_LT(pos_v.int_pos, bits_.size());
value_type const value = bits_[pos_v.int_pos];
assert(pos_v.int_pos < NumValues());
value_type const value = Data()[pos_v.int_pos];
value_type const test_bit = kOne << pos_v.bit_pos;
value_type result = test_bit & value;
return static_cast<bool>(result);
}
XGBOOST_DEVICE bool Check(index_type pos) const {
[[nodiscard]] XGBOOST_DEVICE bool Check(index_type pos) const noexcept(true) {
Pos pos_v = ToBitPos(pos);
return Check(pos_v);
}
/**
* @brief Returns the total number of bits that can be viewed. This is equal to or
* larger than the actual number of valid bits.
*/
[[nodiscard]] XGBOOST_DEVICE size_type Capacity() const noexcept(true) {
return kValueSize * NumValues();
}
/**
* @brief Number of storage units used in this bit field.
*/
[[nodiscard]] XGBOOST_DEVICE size_type NumValues() const noexcept(true) { return n_values_; }
XGBOOST_DEVICE size_t Size() const { return kValueSize * bits_.size(); }
XGBOOST_DEVICE pointer Data() const noexcept(true) { return bits_; }
XGBOOST_DEVICE pointer Data() const { return bits_.data(); }
inline friend std::ostream &
operator<<(std::ostream &os, BitFieldContainer<VT, Direction, IsConst> field) {
os << "Bits " << "storage size: " << field.bits_.size() << "\n";
for (typename common::Span<value_type>::index_type i = 0; i < field.bits_.size(); ++i) {
std::bitset<BitFieldContainer<VT, Direction, IsConst>::kValueSize> bset(field.bits_[i]);
inline friend std::ostream& operator<<(std::ostream& os,
BitFieldContainer<VT, Direction, IsConst> field) {
os << "Bits "
<< "storage size: " << field.NumValues() << "\n";
for (typename common::Span<value_type>::index_type i = 0; i < field.NumValues(); ++i) {
std::bitset<BitFieldContainer<VT, Direction, IsConst>::kValueSize> bset(field.Data()[i]);
os << bset << "\n";
}
return os;

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2020-2022 by XGBoost Contributors
/**
* Copyright 2020-2023, XGBoost Contributors
* \file categorical.h
*/
#ifndef XGBOOST_COMMON_CATEGORICAL_H_
@@ -10,7 +10,6 @@
#include "bitfield.h"
#include "xgboost/base.h"
#include "xgboost/data.h"
#include "xgboost/parameter.h"
#include "xgboost/span.h"
namespace xgboost {

View File

@@ -84,7 +84,7 @@ class HistogramCuts {
return *this;
}
uint32_t FeatureBins(bst_feature_t feature) const {
[[nodiscard]] bst_bin_t FeatureBins(bst_feature_t feature) const {
return cut_ptrs_.ConstHostVector().at(feature + 1) - cut_ptrs_.ConstHostVector()[feature];
}
@@ -92,8 +92,8 @@ class HistogramCuts {
std::vector<float> const& Values() const { return cut_values_.ConstHostVector(); }
std::vector<float> const& MinValues() const { return min_vals_.ConstHostVector(); }
bool HasCategorical() const { return has_categorical_; }
float MaxCategory() const { return max_cat_; }
[[nodiscard]] bool HasCategorical() const { return has_categorical_; }
[[nodiscard]] float MaxCategory() const { return max_cat_; }
/**
* \brief Set meta info about categorical features.
*
@@ -105,12 +105,13 @@ class HistogramCuts {
max_cat_ = max_cat;
}
size_t TotalBins() const { return cut_ptrs_.ConstHostVector().back(); }
[[nodiscard]] bst_bin_t TotalBins() const { return cut_ptrs_.ConstHostVector().back(); }
// Return the index of a cut point that is strictly greater than the input
// value, or the last available index if none exists
bst_bin_t SearchBin(float value, bst_feature_t column_id, std::vector<uint32_t> const& ptrs,
std::vector<float> const& values) const {
[[nodiscard]] bst_bin_t SearchBin(float value, bst_feature_t column_id,
std::vector<uint32_t> const& ptrs,
std::vector<float> const& values) const {
auto end = ptrs[column_id + 1];
auto beg = ptrs[column_id];
auto it = std::upper_bound(values.cbegin() + beg, values.cbegin() + end, value);
@@ -119,20 +120,20 @@ class HistogramCuts {
return idx;
}
bst_bin_t SearchBin(float value, bst_feature_t column_id) const {
[[nodiscard]] bst_bin_t SearchBin(float value, bst_feature_t column_id) const {
return this->SearchBin(value, column_id, Ptrs(), Values());
}
/**
* \brief Search the bin index for numerical feature.
*/
bst_bin_t SearchBin(Entry const& e) const { return SearchBin(e.fvalue, e.index); }
[[nodiscard]] bst_bin_t SearchBin(Entry const& e) const { return SearchBin(e.fvalue, e.index); }
/**
* \brief Search the bin index for categorical feature.
*/
bst_bin_t SearchCatBin(float value, bst_feature_t fidx, std::vector<uint32_t> const& ptrs,
std::vector<float> const& vals) const {
[[nodiscard]] bst_bin_t SearchCatBin(float value, bst_feature_t fidx,
std::vector<uint32_t> const& ptrs,
std::vector<float> const& vals) const {
auto end = ptrs.at(fidx + 1) + vals.cbegin();
auto beg = ptrs[fidx] + vals.cbegin();
// Truncates the value in case it's not perfectly rounded.
@@ -143,12 +144,14 @@ class HistogramCuts {
}
return bin_idx;
}
bst_bin_t SearchCatBin(float value, bst_feature_t fidx) const {
[[nodiscard]] bst_bin_t SearchCatBin(float value, bst_feature_t fidx) const {
auto const& ptrs = this->Ptrs();
auto const& vals = this->Values();
return this->SearchCatBin(value, fidx, ptrs, vals);
}
bst_bin_t SearchCatBin(Entry const& e) const { return SearchCatBin(e.fvalue, e.index); }
[[nodiscard]] bst_bin_t SearchCatBin(Entry const& e) const {
return SearchCatBin(e.fvalue, e.index);
}
/**
* \brief Return numerical bin value given bin index.

View File

@@ -1,24 +1,47 @@
/*!
* Copyright (c) by XGBoost Contributors 2019-2022
/**
* Copyright 2019-2023, by XGBoost Contributors
*/
#if defined(__unix__)
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#if !defined(NOMINMAX) && defined(_WIN32)
#define NOMINMAX
#endif // !defined(NOMINMAX)
#if !defined(xgboost_IS_WIN)
#if defined(_MSC_VER) || defined(__MINGW32__)
#define xgboost_IS_WIN 1
#endif // defined(_MSC_VER) || defined(__MINGW32__)
#endif // !defined(xgboost_IS_WIN)
#if defined(__unix__) || defined(__APPLE__)
#include <fcntl.h> // for open, O_RDONLY
#include <sys/mman.h> // for mmap, mmap64, munmap
#include <unistd.h> // for close, getpagesize
#elif defined(xgboost_IS_WIN)
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#endif // defined(__unix__)
#include <algorithm>
#include <fstream>
#include <string>
#include <memory>
#include <utility>
#include <cstdio>
#include "xgboost/logging.h"
#include <algorithm> // for copy, transform
#include <cctype> // for tolower
#include <cerrno> // for errno
#include <cstddef> // for size_t
#include <cstdint> // for int32_t, uint32_t
#include <cstring> // for memcpy
#include <fstream> // for ifstream
#include <iterator> // for distance
#include <limits> // for numeric_limits
#include <memory> // for unique_ptr
#include <string> // for string
#include <system_error> // for error_code, system_category
#include <utility> // for move
#include <vector> // for vector
#include "io.h"
#include "xgboost/collective/socket.h" // for LastError
#include "xgboost/logging.h"
namespace xgboost {
namespace common {
namespace xgboost::common {
size_t PeekableInStream::Read(void* dptr, size_t size) {
size_t nbuffer = buffer_.length() - buffer_ptr_;
if (nbuffer == 0) return strm_->Read(dptr, size);
@@ -94,11 +117,32 @@ void FixedSizeStream::Take(std::string* out) {
*out = std::move(buffer_);
}
namespace {
// Get system alignment value for IO with mmap.
std::size_t GetMmapAlignment() {
#if defined(xgboost_IS_WIN)
SYSTEM_INFO sys_info;
GetSystemInfo(&sys_info);
// During testing, `sys_info.dwPageSize` is of size 4096 while `dwAllocationGranularity` is of
// size 65536.
return sys_info.dwAllocationGranularity;
#else
return getpagesize();
#endif
}
auto SystemErrorMsg() {
std::int32_t errsv = system::LastError();
auto err = std::error_code{errsv, std::system_category()};
return err.message();
}
} // anonymous namespace
std::string LoadSequentialFile(std::string uri, bool stream) {
auto OpenErr = [&uri]() {
std::string msg;
msg = "Opening " + uri + " failed: ";
msg += strerror(errno);
msg += SystemErrorMsg();
LOG(FATAL) << msg;
};
@@ -155,5 +199,99 @@ std::string FileExtension(std::string fname, bool lower) {
return "";
}
}
} // namespace common
} // namespace xgboost
struct PrivateMmapConstStream::MMAPFile {
#if defined(xgboost_IS_WIN)
HANDLE fd{INVALID_HANDLE_VALUE};
HANDLE file_map{INVALID_HANDLE_VALUE};
#else
std::int32_t fd{0};
#endif
char* base_ptr{nullptr};
std::size_t base_size{0};
std::string path;
};
char* PrivateMmapConstStream::Open(std::string path, std::size_t offset, std::size_t length) {
if (length == 0) {
return nullptr;
}
#if defined(xgboost_IS_WIN)
HANDLE fd = CreateFile(path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, nullptr);
CHECK_NE(fd, INVALID_HANDLE_VALUE) << "Failed to open:" << path << ". " << SystemErrorMsg();
#else
auto fd = open(path.c_str(), O_RDONLY);
CHECK_GE(fd, 0) << "Failed to open:" << path << ". " << SystemErrorMsg();
#endif
char* ptr{nullptr};
// Round down for alignment.
auto view_start = offset / GetMmapAlignment() * GetMmapAlignment();
auto view_size = length + (offset - view_start);
#if defined(__linux__) || defined(__GLIBC__)
int prot{PROT_READ};
ptr = reinterpret_cast<char*>(mmap64(nullptr, view_size, prot, MAP_PRIVATE, fd, view_start));
CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << path << ". " << SystemErrorMsg();
handle_.reset(new MMAPFile{fd, ptr, view_size, std::move(path)});
#elif defined(xgboost_IS_WIN)
auto file_size = GetFileSize(fd, nullptr);
DWORD access = PAGE_READONLY;
auto map_file = CreateFileMapping(fd, nullptr, access, 0, file_size, nullptr);
access = FILE_MAP_READ;
std::uint32_t loff = static_cast<std::uint32_t>(view_start);
std::uint32_t hoff = view_start >> 32;
CHECK(map_file) << "Failed to map: " << path << ". " << SystemErrorMsg();
ptr = reinterpret_cast<char*>(MapViewOfFile(map_file, access, hoff, loff, view_size));
CHECK_NE(ptr, nullptr) << "Failed to map: " << path << ". " << SystemErrorMsg();
handle_.reset(new MMAPFile{fd, map_file, ptr, view_size, std::move(path)});
#else
CHECK_LE(offset, std::numeric_limits<off_t>::max())
<< "File size has exceeded the limit on the current system.";
int prot{PROT_READ};
ptr = reinterpret_cast<char*>(mmap(nullptr, view_size, prot, MAP_PRIVATE, fd, view_start));
CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << path << ". " << SystemErrorMsg();
handle_.reset(new MMAPFile{fd, ptr, view_size, std::move(path)});
#endif // defined(__linux__)
ptr += (offset - view_start);
return ptr;
}
PrivateMmapConstStream::PrivateMmapConstStream(std::string path, std::size_t offset,
std::size_t length)
: MemoryFixSizeBuffer{}, handle_{nullptr} {
this->p_buffer_ = Open(std::move(path), offset, length);
this->buffer_size_ = length;
}
PrivateMmapConstStream::~PrivateMmapConstStream() {
CHECK(handle_);
#if defined(xgboost_IS_WIN)
if (p_buffer_) {
CHECK(UnmapViewOfFile(handle_->base_ptr)) << "Failed to call UnmapViewOfFile: " << SystemErrorMsg();
}
if (handle_->fd != INVALID_HANDLE_VALUE) {
CHECK(CloseHandle(handle_->fd)) << "Failed to close handle: " << SystemErrorMsg();
}
if (handle_->file_map != INVALID_HANDLE_VALUE) {
CHECK(CloseHandle(handle_->file_map)) << "Failed to close mapping object: " << SystemErrorMsg();
}
#else
if (handle_->base_ptr) {
CHECK_NE(munmap(handle_->base_ptr, handle_->base_size), -1)
<< "Faled to call munmap: " << handle_->path << ". " << SystemErrorMsg();
}
if (handle_->fd != 0) {
CHECK_NE(close(handle_->fd), -1)
<< "Faled to close: " << handle_->path << ". " << SystemErrorMsg();
}
#endif
}
} // namespace xgboost::common
#if defined(xgboost_IS_WIN)
#undef xgboost_IS_WIN
#endif // defined(xgboost_IS_WIN)

View File

@@ -1,5 +1,5 @@
/*!
* Copyright by XGBoost Contributors 2014-2022
/**
* Copyright 2014-2023, XGBoost Contributors
* \file io.h
* \brief general stream interface for serialization, I/O
* \author Tianqi Chen
@@ -10,9 +10,11 @@
#include <dmlc/io.h>
#include <rabit/rabit.h>
#include <string>
#include <cstring>
#include <fstream>
#include <memory> // for unique_ptr
#include <string> // for string
#include "common.h"
@@ -127,6 +129,31 @@ inline std::string ReadAll(std::string const &path) {
return content;
}
/**
* @brief Private mmap file as a read-only stream.
*
* It can calculate alignment automatically based on system page size (or allocation
* granularity on Windows).
*/
class PrivateMmapConstStream : public MemoryFixSizeBuffer {
struct MMAPFile;
std::unique_ptr<MMAPFile> handle_;
char* Open(std::string path, std::size_t offset, std::size_t length);
public:
/**
* @brief Construct a private mmap stream.
*
* @param path File path.
* @param offset See the `offset` parameter of `mmap` for details.
* @param length See the `length` parameter of `mmap` for details.
*/
explicit PrivateMmapConstStream(std::string path, std::size_t offset, std::size_t length);
void Write(void const*, std::size_t) override { LOG(FATAL) << "Read-only stream."; }
~PrivateMmapConstStream() override;
};
} // namespace common
} // namespace xgboost
#endif // XGBOOST_COMMON_IO_H_

View File

@@ -590,7 +590,7 @@ class ArrayInterface {
template <std::int32_t D, typename Fn>
void DispatchDType(ArrayInterface<D> const array, std::int32_t device, Fn fn) {
// Only used for cuDF at the moment.
CHECK_EQ(array.valid.Size(), 0);
CHECK_EQ(array.valid.Capacity(), 0);
auto dispatch = [&](auto t) {
using T = std::remove_const_t<decltype(t)> const;
// Set the data size to max as we don't know the original size of a sliced array:

View File

@@ -416,7 +416,8 @@ void CopyTensorInfoImpl(Context const& ctx, Json arr_interface, linalg::Tensor<T
p_out->Reshape(array.shape);
return;
}
CHECK(array.valid.Size() == 0) << "Meta info like label or weight can not have missing value.";
CHECK_EQ(array.valid.Capacity(), 0)
<< "Meta info like label or weight can not have missing value.";
if (array.is_contiguous && array.type == ToDType<T>::kType) {
// Handle contiguous
p_out->ModifyInplace([&](HostDeviceVector<T>* data, common::Span<size_t, D> shape) {

View File

@@ -33,7 +33,8 @@ void CopyTensorInfoImpl(CUDAContext const* ctx, Json arr_interface, linalg::Tens
p_out->Reshape(array.shape);
return;
}
CHECK(array.valid.Size() == 0) << "Meta info like label or weight can not have missing value.";
CHECK_EQ(array.valid.Capacity(), 0)
<< "Meta info like label or weight can not have missing value.";
auto ptr_device = SetDeviceToPtr(array.data);
p_out->SetDevice(ptr_device);

View File

@@ -5,6 +5,7 @@
#include <thrust/iterator/transform_output_iterator.h>
#include "../common/categorical.h"
#include "../common/cuda_context.cuh"
#include "../common/hist_util.cuh"
#include "../common/random.h"
#include "../common/transform_iterator.h" // MakeIndexTransformIter
@@ -313,7 +314,8 @@ void CopyGHistToEllpack(GHistIndexMatrix const& page, common::Span<size_t const>
auto d_csc_indptr = dh::ToSpan(csc_indptr);
auto bin_type = page.index.GetBinTypeSize();
common::CompressedBufferWriter writer{page.cut.TotalBins() + 1}; // +1 for null value
common::CompressedBufferWriter writer{page.cut.TotalBins() +
static_cast<std::size_t>(1)}; // +1 for null value
dh::LaunchN(row_stride * page.Size(), [=] __device__(size_t idx) mutable {
auto ridx = idx / row_stride;
@@ -357,8 +359,10 @@ EllpackPageImpl::EllpackPageImpl(Context const* ctx, GHistIndexMatrix const& pag
// copy gidx
common::CompressedByteT* d_compressed_buffer = gidx_buffer.DevicePointer();
dh::device_vector<size_t> row_ptr(page.row_ptr);
dh::device_vector<size_t> row_ptr(page.row_ptr.size());
auto d_row_ptr = dh::ToSpan(row_ptr);
dh::safe_cuda(cudaMemcpyAsync(d_row_ptr.data(), page.row_ptr.data(), d_row_ptr.size_bytes(),
cudaMemcpyHostToDevice, ctx->CUDACtx()->Stream()));
auto accessor = this->GetDeviceAccessor(ctx->gpu_id, ft);
auto null = accessor.NullValue();

View File

@@ -7,9 +7,6 @@
#ifndef XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
#define XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
#include <xgboost/data.h>
#include <xgboost/logging.h>
#include <algorithm>
#include <map>
#include <memory>
@@ -20,35 +17,33 @@
#include "ellpack_page_source.h"
#include "gradient_index_page_source.h"
#include "sparse_page_source.h"
#include "xgboost/data.h"
#include "xgboost/logging.h"
namespace xgboost {
namespace data {
namespace xgboost::data {
/**
* \brief DMatrix used for external memory.
*
* The external memory is created for controlling memory usage by splitting up data into
* multiple batches. However that doesn't mean we will actually process exact 1 batch at
* a time, which would be terribly slow considering that we have to loop through the
* whole dataset for every tree split. So we use async pre-fetch and let caller to decide
* how many batches it wants to process by returning data as shared pointer. The caller
* can use async function to process the data or just stage those batches, making the
* decision is out of the scope for sparse page dmatrix. These 2 optimizations might
* defeat the purpose of splitting up dataset since if you load all the batches then the
* memory usage is even worse than using a single batch. Essentially we need to control
* how many batches can be in memory at the same time.
 * multiple batches. However, that doesn't mean we will actually process exactly 1 batch
 * at a time, which would be terribly slow considering that we have to loop through the
 * whole dataset for every tree split. So we use async pre-fetching and let the
 * caller decide how many batches it wants to process by returning data as a shared
 * pointer. The caller can use an async function to process the data or just stage those
 * batches based on its use case. These two optimizations might defeat the purpose of
 * splitting up the dataset, since if you stage all the batches then the memory usage might be
* even worse than using a single batch. As a result, we must control how many batches can
* be in memory at any given time.
*
* Right now the write to the cache is sequential operation and is blocking, reading from
* cache is async but with a hard coded limit of 4 pages as an heuristic. So by sparse
* dmatrix itself there can be only 9 pages in main memory (might be of different types)
* at the same time: 1 page pending for write, 4 pre-fetched sparse pages, 4 pre-fetched
* dependent pages. If the caller stops iteration at the middle and start again, then the
* number of pages in memory can hit 16 due to pre-fetching, but this should be a bug in
* caller's code (XGBoost doesn't discard a large portion of data at the end, there's not
* sampling algo that samples only the first portion of data).
* Right now the write to the cache is a sequential operation and is blocking. Reading
 * from the cache, on the other hand, is async but with a hard coded limit of 3 pages as a
 * heuristic. So for the sparse dmatrix itself there can be only 7 pages in main memory (might
* be of different types) at the same time: 1 page pending for write, 3 pre-fetched sparse
* pages, 3 pre-fetched dependent pages.
*
* Of course if the caller decides to retain some batches to perform parallel processing,
* then we might load all pages in memory, which is also considered as a bug in caller's
 * code. So if the algo supports external memory, it must be careful that the queue for async
 * calls has an upper limit.
*
* Another assumption we make is that the data must be immutable so caller should never
@@ -101,7 +96,7 @@ class SparsePageDMatrix : public DMatrix {
MetaInfo &Info() override;
const MetaInfo &Info() const override;
Context const *Ctx() const override { return &fmat_ctx_; }
// The only DMatrix implementation that returns false.
bool SingleColBlock() const override { return false; }
DMatrix *Slice(common::Span<int32_t const>) override {
LOG(FATAL) << "Slicing DMatrix is not supported for external memory.";
@@ -153,6 +148,5 @@ inline std::string MakeCache(SparsePageDMatrix *ptr, std::string format, std::st
}
return id;
}
} // namespace data
} // namespace xgboost
} // namespace xgboost::data
#endif // XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
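
The comment above describes a bounded, forward-only prefetch. Below is a minimal standalone sketch of that pattern in plain C++ (not XGBoost code; the batch count, prefetch limit, and the `load_page` stand-in are illustrative assumptions) showing how a ring of futures keeps only a fixed number of pages in flight.

#include <algorithm>
#include <cstddef>
#include <future>
#include <iostream>
#include <vector>

int main() {
  constexpr std::size_t kBatches = 8;
  constexpr std::size_t kPrefetch = 3;  // mirrors the hard coded limit mentioned above
  // Stand-in for reading one page from the cache file.
  auto load_page = [](std::size_t i) { return static_cast<int>(i) * 10; };
  // Ring of futures: only a bounded number of reads are in flight at once.
  std::vector<std::future<int>> ring(kBatches);
  for (std::size_t count = 0; count < kBatches; ++count) {
    // Schedule up to kPrefetch reads ahead of the current position.
    for (std::size_t i = 0, it = count; i < std::min(kPrefetch, kBatches); ++i, ++it) {
      it %= kBatches;  // forward-only ring
      if (!ring[it].valid()) {
        ring[it] = std::async(std::launch::async, load_page, it);
      }
    }
    // Block only on the page that is needed right now.
    std::cout << "page " << count << " = " << ring[count].get() << "\n";
  }
  return 0;
}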

View File

@@ -1,45 +1,48 @@
/*!
* Copyright 2014-2022 by XGBoost Contributors
/**
* Copyright 2014-2023, XGBoost Contributors
* \file sparse_page_source.h
*/
#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#include <algorithm> // std::min
#include <string>
#include <utility>
#include <vector>
#include <future>
#include <thread>
#include <algorithm> // for min
#include <future> // for async
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility> // for pair, move
#include <vector>
#include "../common/common.h"
#include "../common/io.h" // for PrivateMmapConstStream
#include "../common/timer.h" // for Monitor, Timer
#include "adapter.h"
#include "dmlc/common.h" // for OMPException
#include "proxy_dmatrix.h" // for DMatrixProxy
#include "sparse_page_writer.h" // for SparsePageFormat
#include "xgboost/base.h"
#include "xgboost/data.h"
#include "adapter.h"
#include "sparse_page_writer.h"
#include "proxy_dmatrix.h"
#include "../common/common.h"
#include "../common/timer.h"
namespace xgboost {
namespace data {
namespace xgboost::data {
inline void TryDeleteCacheFile(const std::string& file) {
if (std::remove(file.c_str()) != 0) {
// Don't throw, this is called in a destructor.
LOG(WARNING) << "Couldn't remove external memory cache file " << file
<< "; you may want to remove it manually";
<< "; you may want to remove it manually";
}
}
/**
* @brief Information about the cache including path and page offsets.
*/
struct Cache {
// whether the write to the cache is complete
bool written;
std::string name;
std::string format;
// offset into binary cache file.
std::vector<size_t> offset;
std::vector<std::uint64_t> offset;
Cache(bool w, std::string n, std::string fmt)
: written{w}, name{std::move(n)}, format{std::move(fmt)} {
@@ -51,11 +54,24 @@ struct Cache {
return name + format;
}
std::string ShardName() {
[[nodiscard]] std::string ShardName() const {
return ShardName(this->name, this->format);
}
// The write is completed.
/**
* @brief Record a page with size of n_bytes.
*/
void Push(std::size_t n_bytes) { offset.push_back(n_bytes); }
/**
* @brief Returns the view start and length for the i^th page.
*/
[[nodiscard]] auto View(std::size_t i) const {
std::uint64_t off = offset.at(i);
std::uint64_t len = offset.at(i + 1) - offset[i];
return std::pair{off, len};
}
/**
* @brief Call this once the write for the cache is complete.
*/
void Commit() {
if (!written) {
std::partial_sum(offset.begin(), offset.end(), offset.begin());
@@ -64,7 +80,7 @@ struct Cache {
}
};
// Prevents multi-threaded call.
// Prevents multi-threaded call to `GetBatches`.
class TryLockGuard {
std::mutex& lock_;
@@ -77,74 +93,87 @@ class TryLockGuard {
}
};
/**
* @brief Base class for all page sources. Handles fetching, writing, and iteration.
*/
template <typename S>
class SparsePageSourceImpl : public BatchIteratorImpl<S> {
protected:
  // Prevents calling this iterator from multiple places (or threads).
std::mutex single_threaded_;
// The current page.
std::shared_ptr<S> page_;
bool at_end_ {false};
float missing_;
int nthreads_;
std::int32_t nthreads_;
bst_feature_t n_features_;
uint32_t count_{0};
uint32_t n_batches_ {0};
// Index to the current page.
std::uint32_t count_{0};
// Total number of batches.
std::uint32_t n_batches_{0};
std::shared_ptr<Cache> cache_info_;
std::unique_ptr<dmlc::Stream> fo_;
using Ring = std::vector<std::future<std::shared_ptr<S>>>;
  // A ring storing futures to data. Since the DMatrix iterator is forward-only, we can
  // pre-fetch data in a ring.
std::unique_ptr<Ring> ring_{new Ring};
  // Catch exceptions in pre-fetch threads to prevent segfaults. This doesn't always work though;
  // an OOM error can be delayed due to lazy commit. On the bright side, if mmap is used then
  // OOM errors should be rare.
dmlc::OMPException exec_;
common::Monitor monitor_;
bool ReadCache() {
CHECK(!at_end_);
if (!cache_info_->written) {
return false;
}
if (fo_) {
fo_.reset(); // flush the data to disk.
if (ring_->empty()) {
ring_->resize(n_batches_);
}
    // A heuristic for the number of pre-fetched batches. We can make it part of BatchParam
    // to let the user adjust the number of pre-fetched batches when needed.
uint32_t constexpr kPreFetch = 4;
uint32_t constexpr kPreFetch = 3;
size_t n_prefetch_batches = std::min(kPreFetch, n_batches_);
CHECK_GT(n_prefetch_batches, 0) << "total batches:" << n_batches_;
size_t fetch_it = count_;
std::size_t fetch_it = count_;
for (size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
exec_.Rethrow();
for (std::size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
fetch_it %= n_batches_; // ring
if (ring_->at(fetch_it).valid()) {
continue;
}
auto const *self = this; // make sure it's const
auto const* self = this; // make sure it's const
CHECK_LT(fetch_it, cache_info_->offset.size());
ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self]() {
common::Timer timer;
timer.Start();
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
auto n = self->cache_info_->ShardName();
size_t offset = self->cache_info_->offset.at(fetch_it);
std::unique_ptr<dmlc::SeekStream> fi{dmlc::SeekStream::CreateForRead(n.c_str())};
fi->Seek(offset);
CHECK_EQ(fi->Tell(), offset);
ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self, this]() {
auto page = std::make_shared<S>();
CHECK(fmt->Read(page.get(), fi.get()));
LOG(INFO) << "Read a page in " << timer.ElapsedSeconds() << " seconds.";
this->exec_.Run([&] {
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
auto name = self->cache_info_->ShardName();
auto [offset, length] = self->cache_info_->View(fetch_it);
auto fi = std::make_unique<common::PrivateMmapConstStream>(name, offset, length);
CHECK(fmt->Read(page.get(), fi.get()));
});
return page;
});
}
CHECK_EQ(std::count_if(ring_->cbegin(), ring_->cend(), [](auto const& f) { return f.valid(); }),
n_prefetch_batches)
<< "Sparse DMatrix assumes forward iteration.";
monitor_.Start("Wait");
page_ = (*ring_)[count_].get();
CHECK(!(*ring_)[count_].valid());
monitor_.Stop("Wait");
exec_.Rethrow();
return true;
}
@@ -153,29 +182,41 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
common::Timer timer;
timer.Start();
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
if (!fo_) {
auto n = cache_info_->ShardName();
fo_.reset(dmlc::Stream::Create(n.c_str(), "w"));
}
auto bytes = fmt->Write(*page_, fo_.get());
timer.Stop();
auto name = cache_info_->ShardName();
std::unique_ptr<dmlc::Stream> fo;
if (this->Iter() == 0) {
fo.reset(dmlc::Stream::Create(name.c_str(), "wb"));
} else {
fo.reset(dmlc::Stream::Create(name.c_str(), "ab"));
}
auto bytes = fmt->Write(*page_, fo.get());
timer.Stop();
    // Not entirely accurate, the kernel doesn't have to flush the data.
LOG(INFO) << static_cast<double>(bytes) / 1024.0 / 1024.0 << " MB written in "
<< timer.ElapsedSeconds() << " seconds.";
cache_info_->offset.push_back(bytes);
cache_info_->Push(bytes);
}
virtual void Fetch() = 0;
public:
SparsePageSourceImpl(float missing, int nthreads, bst_feature_t n_features,
uint32_t n_batches, std::shared_ptr<Cache> cache)
: missing_{missing}, nthreads_{nthreads}, n_features_{n_features},
n_batches_{n_batches}, cache_info_{std::move(cache)} {}
SparsePageSourceImpl(float missing, int nthreads, bst_feature_t n_features, uint32_t n_batches,
std::shared_ptr<Cache> cache)
: missing_{missing},
nthreads_{nthreads},
n_features_{n_features},
n_batches_{n_batches},
cache_info_{std::move(cache)} {
monitor_.Init(typeid(S).name()); // not pretty, but works for basic profiling
}
SparsePageSourceImpl(SparsePageSourceImpl const &that) = delete;
~SparsePageSourceImpl() override {
// Don't orphan the threads.
for (auto& fu : *ring_) {
if (fu.valid()) {
fu.get();
@@ -183,18 +224,18 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
}
}
uint32_t Iter() const { return count_; }
[[nodiscard]] uint32_t Iter() const { return count_; }
const S &operator*() const override {
CHECK(page_);
return *page_;
}
std::shared_ptr<S const> Page() const override {
[[nodiscard]] std::shared_ptr<S const> Page() const override {
return page_;
}
bool AtEnd() const override {
[[nodiscard]] bool AtEnd() const override {
return at_end_;
}
@@ -202,20 +243,23 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
TryLockGuard guard{single_threaded_};
at_end_ = false;
count_ = 0;
// Pre-fetch for the next round of iterations.
this->Fetch();
}
};
#if defined(XGBOOST_USE_CUDA)
// Push data from CUDA.
void DevicePush(DMatrixProxy* proxy, float missing, SparsePage* page);
#else
inline void DevicePush(DMatrixProxy*, float, SparsePage*) { common::AssertGPUSupport(); }
#endif
class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
// This is the source from the user.
DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext> iter_;
DMatrixProxy* proxy_;
size_t base_row_id_ {0};
std::size_t base_row_id_{0};
void Fetch() final {
page_ = std::make_shared<SparsePage>();
@@ -244,7 +288,7 @@ class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
iter_{iter}, proxy_{proxy} {
if (!cache_info_->written) {
iter_.Reset();
CHECK_EQ(iter_.Next(), 1) << "Must have at least 1 batch.";
CHECK(iter_.Next()) << "Must have at least 1 batch.";
}
this->Fetch();
}
@@ -259,6 +303,7 @@ class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
}
if (at_end_) {
CHECK_EQ(cache_info_->offset.size(), n_batches_ + 1);
cache_info_->Commit();
if (n_batches_ != 0) {
CHECK_EQ(count_, n_batches_);
@@ -371,6 +416,5 @@ class SortedCSCPageSource : public PageSourceIncMixIn<SortedCSCPage> {
this->Fetch();
}
};
} // namespace data
} // namespace xgboost
} // namespace xgboost::data
#endif // XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
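
The Cache struct above records per-page byte counts with Push(), turns them into absolute offsets with a partial sum on Commit(), and hands out (offset, length) views for mmap-based reads. A small standalone sketch of that bookkeeping (illustrative page sizes only, not XGBoost code):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <utility>
#include <vector>

int main() {
  std::vector<std::uint64_t> offset{0};  // leading zero, matching the n_batches_ + 1 check above
  for (std::uint64_t n_bytes : {128u, 256u, 64u}) {
    offset.push_back(n_bytes);  // Cache::Push records the size of one written page
  }
  // Cache::Commit: prefix sum turns sizes into absolute offsets into the cache file.
  std::partial_sum(offset.begin(), offset.end(), offset.begin());
  for (std::size_t i = 0; i + 1 < offset.size(); ++i) {
    // Cache::View(i): where the i-th page starts and how many bytes it spans.
    auto [off, len] = std::pair{offset[i], offset[i + 1] - offset[i]};
    std::cout << "page " << i << ": offset=" << off << " length=" << len << "\n";
  }
  return 0;
}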

View File

@@ -439,7 +439,7 @@ struct ShapSplitCondition {
if (isnan(x)) {
return is_missing_branch;
}
if (categories.Size() != 0) {
if (categories.Capacity() != 0) {
auto cat = static_cast<uint32_t>(x);
return categories.Check(cat);
} else {
@@ -454,7 +454,7 @@ struct ShapSplitCondition {
if (l.Data() == r.Data()) {
return l;
}
if (l.Size() > r.Size()) {
if (l.Capacity() > r.Capacity()) {
thrust::swap(l, r);
}
for (size_t i = 0; i < r.Bits().size(); ++i) {
@@ -466,7 +466,7 @@ struct ShapSplitCondition {
// Combine two split conditions on the same feature
XGBOOST_DEVICE void Merge(ShapSplitCondition other) {
// Combine duplicate features
if (categories.Size() != 0 || other.categories.Size() != 0) {
if (categories.Capacity() != 0 || other.categories.Capacity() != 0) {
categories = Intersect(categories, other.categories);
} else {
feature_lower_bound = max(feature_lower_bound, other.feature_lower_bound);

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 XGBoost contributors
/**
* Copyright 2019-2023, XGBoost contributors
*/
#include <thrust/copy.h>
#include <thrust/device_vector.h>
@@ -140,20 +140,20 @@ void FeatureInteractionConstraintDevice::Reset() {
__global__ void ClearBuffersKernel(
LBitField64 result_buffer_output, LBitField64 result_buffer_input) {
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid < result_buffer_output.Size()) {
if (tid < result_buffer_output.Capacity()) {
result_buffer_output.Clear(tid);
}
if (tid < result_buffer_input.Size()) {
if (tid < result_buffer_input.Capacity()) {
result_buffer_input.Clear(tid);
}
}
void FeatureInteractionConstraintDevice::ClearBuffers() {
CHECK_EQ(output_buffer_bits_.Size(), input_buffer_bits_.Size());
CHECK_LE(feature_buffer_.Size(), output_buffer_bits_.Size());
CHECK_EQ(output_buffer_bits_.Capacity(), input_buffer_bits_.Capacity());
CHECK_LE(feature_buffer_.Capacity(), output_buffer_bits_.Capacity());
uint32_t constexpr kBlockThreads = 256;
auto const n_grids = static_cast<uint32_t>(
common::DivRoundUp(input_buffer_bits_.Size(), kBlockThreads));
common::DivRoundUp(input_buffer_bits_.Capacity(), kBlockThreads));
dh::LaunchKernel {n_grids, kBlockThreads} (
ClearBuffersKernel,
output_buffer_bits_, input_buffer_bits_);
@@ -207,11 +207,11 @@ common::Span<bst_feature_t> FeatureInteractionConstraintDevice::Query(
ClearBuffers();
LBitField64 node_constraints = s_node_constraints_[nid];
CHECK_EQ(input_buffer_bits_.Size(), output_buffer_bits_.Size());
CHECK_EQ(input_buffer_bits_.Capacity(), output_buffer_bits_.Capacity());
uint32_t constexpr kBlockThreads = 256;
auto n_grids = static_cast<uint32_t>(
common::DivRoundUp(output_buffer_bits_.Size(), kBlockThreads));
common::DivRoundUp(output_buffer_bits_.Capacity(), kBlockThreads));
dh::LaunchKernel {n_grids, kBlockThreads} (
SetInputBufferKernel,
feature_list, input_buffer_bits_);
@@ -274,13 +274,13 @@ __global__ void InteractionConstraintSplitKernel(LBitField64 feature,
LBitField64 left,
LBitField64 right) {
auto tid = threadIdx.x + blockDim.x * blockIdx.x;
if (tid > node.Size()) {
if (tid > node.Capacity()) {
return;
}
// enable constraints from feature
node |= feature;
// clear the buffer after use
if (tid < feature.Size()) {
if (tid < feature.Capacity()) {
feature.Clear(tid);
}
@@ -323,7 +323,7 @@ void FeatureInteractionConstraintDevice::Split(
s_sets_, s_sets_ptr_);
uint32_t constexpr kBlockThreads = 256;
auto n_grids = static_cast<uint32_t>(common::DivRoundUp(node.Size(), kBlockThreads));
auto n_grids = static_cast<uint32_t>(common::DivRoundUp(node.Capacity(), kBlockThreads));
dh::LaunchKernel {n_grids, kBlockThreads} (
InteractionConstraintSplitKernel,

View File

@@ -146,27 +146,30 @@ class PoissonSampling : public thrust::binary_function<GradientPair, size_t, Gra
CombineGradientPair combine_;
};
NoSampling::NoSampling(EllpackPageImpl const* page) : page_(page) {}
NoSampling::NoSampling(BatchParam batch_param) : batch_param_(std::move(batch_param)) {}
GradientBasedSample NoSampling::Sample(Context const*, common::Span<GradientPair> gpair,
GradientBasedSample NoSampling::Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) {
return {dmat->Info().num_row_, page_, gpair};
auto page = (*dmat->GetBatches<EllpackPage>(ctx, batch_param_).begin()).Impl();
return {dmat->Info().num_row_, page, gpair};
}
ExternalMemoryNoSampling::ExternalMemoryNoSampling(Context const* ctx, EllpackPageImpl const* page,
size_t n_rows, BatchParam batch_param)
: batch_param_{std::move(batch_param)},
page_(new EllpackPageImpl(ctx->gpu_id, page->Cuts(), page->is_dense, page->row_stride,
n_rows)) {}
ExternalMemoryNoSampling::ExternalMemoryNoSampling(BatchParam batch_param)
: batch_param_{std::move(batch_param)} {}
GradientBasedSample ExternalMemoryNoSampling::Sample(Context const* ctx,
common::Span<GradientPair> gpair,
DMatrix* dmat) {
if (!page_concatenated_) {
// Concatenate all the external memory ELLPACK pages into a single in-memory page.
page_.reset(nullptr);
size_t offset = 0;
for (auto& batch : dmat->GetBatches<EllpackPage>(ctx, batch_param_)) {
auto page = batch.Impl();
if (!page_) {
page_ = std::make_unique<EllpackPageImpl>(ctx->gpu_id, page->Cuts(), page->is_dense,
page->row_stride, dmat->Info().num_row_);
}
size_t num_elements = page_->Copy(ctx->gpu_id, page, offset);
offset += num_elements;
}
@@ -175,8 +178,8 @@ GradientBasedSample ExternalMemoryNoSampling::Sample(Context const* ctx,
return {dmat->Info().num_row_, page_.get(), gpair};
}
UniformSampling::UniformSampling(EllpackPageImpl const* page, float subsample)
: page_(page), subsample_(subsample) {}
UniformSampling::UniformSampling(BatchParam batch_param, float subsample)
: batch_param_{std::move(batch_param)}, subsample_(subsample) {}
GradientBasedSample UniformSampling::Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) {
@@ -185,7 +188,8 @@ GradientBasedSample UniformSampling::Sample(Context const* ctx, common::Span<Gra
thrust::replace_if(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair),
thrust::counting_iterator<std::size_t>(0),
BernoulliTrial(common::GlobalRandom()(), subsample_), GradientPair());
return {dmat->Info().num_row_, page_, gpair};
auto page = (*dmat->GetBatches<EllpackPage>(ctx, batch_param_).begin()).Impl();
return {dmat->Info().num_row_, page, gpair};
}
ExternalMemoryUniformSampling::ExternalMemoryUniformSampling(size_t n_rows,
@@ -236,12 +240,10 @@ GradientBasedSample ExternalMemoryUniformSampling::Sample(Context const* ctx,
return {sample_rows, page_.get(), dh::ToSpan(gpair_)};
}
GradientBasedSampling::GradientBasedSampling(EllpackPageImpl const* page,
size_t n_rows,
const BatchParam&,
GradientBasedSampling::GradientBasedSampling(std::size_t n_rows, BatchParam batch_param,
float subsample)
: page_(page),
subsample_(subsample),
: subsample_(subsample),
batch_param_{std::move(batch_param)},
threshold_(n_rows + 1, 0.0f),
grad_sum_(n_rows, 0.0f) {}
@@ -252,18 +254,19 @@ GradientBasedSample GradientBasedSampling::Sample(Context const* ctx,
size_t threshold_index = GradientBasedSampler::CalculateThresholdIndex(
gpair, dh::ToSpan(threshold_), dh::ToSpan(grad_sum_), n_rows * subsample_);
auto page = (*dmat->GetBatches<EllpackPage>(ctx, batch_param_).begin()).Impl();
// Perform Poisson sampling in place.
thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair),
thrust::counting_iterator<size_t>(0), dh::tbegin(gpair),
PoissonSampling(dh::ToSpan(threshold_), threshold_index,
RandomWeight(common::GlobalRandom()())));
return {n_rows, page_, gpair};
return {n_rows, page, gpair};
}
ExternalMemoryGradientBasedSampling::ExternalMemoryGradientBasedSampling(
size_t n_rows,
BatchParam batch_param,
float subsample)
ExternalMemoryGradientBasedSampling::ExternalMemoryGradientBasedSampling(size_t n_rows,
BatchParam batch_param,
float subsample)
: batch_param_(std::move(batch_param)),
subsample_(subsample),
threshold_(n_rows + 1, 0.0f),
@@ -273,16 +276,15 @@ ExternalMemoryGradientBasedSampling::ExternalMemoryGradientBasedSampling(
GradientBasedSample ExternalMemoryGradientBasedSampling::Sample(Context const* ctx,
common::Span<GradientPair> gpair,
DMatrix* dmat) {
size_t n_rows = dmat->Info().num_row_;
auto cuctx = ctx->CUDACtx();
bst_row_t n_rows = dmat->Info().num_row_;
size_t threshold_index = GradientBasedSampler::CalculateThresholdIndex(
gpair, dh::ToSpan(threshold_), dh::ToSpan(grad_sum_), n_rows * subsample_);
// Perform Poisson sampling in place.
thrust::transform(dh::tbegin(gpair), dh::tend(gpair),
thrust::counting_iterator<size_t>(0),
dh::tbegin(gpair),
PoissonSampling(dh::ToSpan(threshold_),
threshold_index,
thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair),
thrust::counting_iterator<size_t>(0), dh::tbegin(gpair),
PoissonSampling(dh::ToSpan(threshold_), threshold_index,
RandomWeight(common::GlobalRandom()())));
// Count the sampled rows.
@@ -290,16 +292,15 @@ GradientBasedSample ExternalMemoryGradientBasedSampling::Sample(Context const* c
// Compact gradient pairs.
gpair_.resize(sample_rows);
thrust::copy_if(dh::tbegin(gpair), dh::tend(gpair), gpair_.begin(), IsNonZero());
thrust::copy_if(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair), gpair_.begin(), IsNonZero());
// Index the sample rows.
thrust::transform(dh::tbegin(gpair), dh::tend(gpair), sample_row_index_.begin(), IsNonZero());
thrust::exclusive_scan(sample_row_index_.begin(), sample_row_index_.end(),
sample_row_index_.begin());
thrust::transform(dh::tbegin(gpair), dh::tend(gpair),
sample_row_index_.begin(),
sample_row_index_.begin(),
ClearEmptyRows());
thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair), sample_row_index_.begin(),
IsNonZero());
thrust::exclusive_scan(cuctx->CTP(), sample_row_index_.begin(), sample_row_index_.end(),
sample_row_index_.begin());
thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair), sample_row_index_.begin(),
sample_row_index_.begin(), ClearEmptyRows());
auto batch_iterator = dmat->GetBatches<EllpackPage>(ctx, batch_param_);
auto first_page = (*batch_iterator.begin()).Impl();
@@ -317,13 +318,13 @@ GradientBasedSample ExternalMemoryGradientBasedSampling::Sample(Context const* c
return {sample_rows, page_.get(), dh::ToSpan(gpair_)};
}
GradientBasedSampler::GradientBasedSampler(Context const* ctx, EllpackPageImpl const* page,
size_t n_rows, const BatchParam& batch_param,
float subsample, int sampling_method) {
GradientBasedSampler::GradientBasedSampler(Context const* /*ctx*/, size_t n_rows,
const BatchParam& batch_param, float subsample,
int sampling_method, bool is_external_memory) {
// The ctx is kept here for future development of stream-based operations.
monitor_.Init("gradient_based_sampler");
bool is_sampling = subsample < 1.0;
bool is_external_memory = page->n_rows != n_rows;
if (is_sampling) {
switch (sampling_method) {
@@ -331,24 +332,24 @@ GradientBasedSampler::GradientBasedSampler(Context const* ctx, EllpackPageImpl c
if (is_external_memory) {
strategy_.reset(new ExternalMemoryUniformSampling(n_rows, batch_param, subsample));
} else {
strategy_.reset(new UniformSampling(page, subsample));
strategy_.reset(new UniformSampling(batch_param, subsample));
}
break;
case TrainParam::kGradientBased:
if (is_external_memory) {
strategy_.reset(
new ExternalMemoryGradientBasedSampling(n_rows, batch_param, subsample));
strategy_.reset(new ExternalMemoryGradientBasedSampling(n_rows, batch_param, subsample));
} else {
strategy_.reset(new GradientBasedSampling(page, n_rows, batch_param, subsample));
strategy_.reset(new GradientBasedSampling(n_rows, batch_param, subsample));
}
break;
default:LOG(FATAL) << "unknown sampling method";
default:
LOG(FATAL) << "unknown sampling method";
}
} else {
if (is_external_memory) {
strategy_.reset(new ExternalMemoryNoSampling(ctx, page, n_rows, batch_param));
strategy_.reset(new ExternalMemoryNoSampling(batch_param));
} else {
strategy_.reset(new NoSampling(page));
strategy_.reset(new NoSampling(batch_param));
}
}
}
@@ -362,11 +363,11 @@ GradientBasedSample GradientBasedSampler::Sample(Context const* ctx,
return sample;
}
size_t GradientBasedSampler::CalculateThresholdIndex(
common::Span<GradientPair> gpair, common::Span<float> threshold,
common::Span<float> grad_sum, size_t sample_rows) {
thrust::fill(dh::tend(threshold) - 1, dh::tend(threshold),
std::numeric_limits<float>::max());
size_t GradientBasedSampler::CalculateThresholdIndex(common::Span<GradientPair> gpair,
common::Span<float> threshold,
common::Span<float> grad_sum,
size_t sample_rows) {
thrust::fill(dh::tend(threshold) - 1, dh::tend(threshold), std::numeric_limits<float>::max());
thrust::transform(dh::tbegin(gpair), dh::tend(gpair), dh::tbegin(threshold),
CombineGradientPair());
thrust::sort(dh::tbegin(threshold), dh::tend(threshold) - 1);
@@ -379,6 +380,5 @@ size_t GradientBasedSampler::CalculateThresholdIndex(
thrust::min_element(dh::tbegin(grad_sum), dh::tend(grad_sum));
return thrust::distance(dh::tbegin(grad_sum), min) + 1;
}
}; // namespace tree
}; // namespace xgboost

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 by XGBoost Contributors
/**
* Copyright 2019-2023, XGBoost Contributors
*/
#pragma once
#include <xgboost/base.h>
@@ -32,37 +32,36 @@ class SamplingStrategy {
/*! \brief No sampling in in-memory mode. */
class NoSampling : public SamplingStrategy {
public:
explicit NoSampling(EllpackPageImpl const* page);
GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) override;
private:
EllpackPageImpl const* page_;
};
/*! \brief No sampling in external memory mode. */
class ExternalMemoryNoSampling : public SamplingStrategy {
public:
ExternalMemoryNoSampling(Context const* ctx, EllpackPageImpl const* page, size_t n_rows,
BatchParam batch_param);
explicit NoSampling(BatchParam batch_param);
GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) override;
private:
BatchParam batch_param_;
std::unique_ptr<EllpackPageImpl> page_;
};
/*! \brief No sampling in external memory mode. */
class ExternalMemoryNoSampling : public SamplingStrategy {
public:
explicit ExternalMemoryNoSampling(BatchParam batch_param);
GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) override;
private:
BatchParam batch_param_;
std::unique_ptr<EllpackPageImpl> page_{nullptr};
bool page_concatenated_{false};
};
/*! \brief Uniform sampling in in-memory mode. */
class UniformSampling : public SamplingStrategy {
public:
UniformSampling(EllpackPageImpl const* page, float subsample);
UniformSampling(BatchParam batch_param, float subsample);
GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) override;
private:
EllpackPageImpl const* page_;
BatchParam batch_param_;
float subsample_;
};
@@ -84,13 +83,12 @@ class ExternalMemoryUniformSampling : public SamplingStrategy {
/*! \brief Gradient-based sampling in in-memory mode. */
class GradientBasedSampling : public SamplingStrategy {
public:
GradientBasedSampling(EllpackPageImpl const* page, size_t n_rows, const BatchParam& batch_param,
float subsample);
GradientBasedSampling(std::size_t n_rows, BatchParam batch_param, float subsample);
GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
DMatrix* dmat) override;
private:
EllpackPageImpl const* page_;
BatchParam batch_param_;
float subsample_;
dh::caching_device_vector<float> threshold_;
dh::caching_device_vector<float> grad_sum_;
@@ -106,11 +104,11 @@ class ExternalMemoryGradientBasedSampling : public SamplingStrategy {
private:
BatchParam batch_param_;
float subsample_;
dh::caching_device_vector<float> threshold_;
dh::caching_device_vector<float> grad_sum_;
dh::device_vector<float> threshold_;
dh::device_vector<float> grad_sum_;
std::unique_ptr<EllpackPageImpl> page_;
dh::device_vector<GradientPair> gpair_;
dh::caching_device_vector<size_t> sample_row_index_;
dh::device_vector<size_t> sample_row_index_;
};
/*! \brief Draw a sample of rows from a DMatrix.
@@ -124,8 +122,8 @@ class ExternalMemoryGradientBasedSampling : public SamplingStrategy {
*/
class GradientBasedSampler {
public:
GradientBasedSampler(Context const* ctx, EllpackPageImpl const* page, size_t n_rows,
const BatchParam& batch_param, float subsample, int sampling_method);
GradientBasedSampler(Context const* ctx, size_t n_rows, const BatchParam& batch_param,
float subsample, int sampling_method, bool is_external_memory);
/*! \brief Sample from a DMatrix based on the given gradient pairs. */
GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair, DMatrix* dmat);

View File

@@ -213,7 +213,7 @@ std::vector<bst_cat_t> GetSplitCategories(RegTree const &tree, int32_t nidx) {
auto split = common::KCatBitField{csr.categories.subspan(seg.beg, seg.size)};
std::vector<bst_cat_t> cats;
for (size_t i = 0; i < split.Size(); ++i) {
for (size_t i = 0; i < split.Capacity(); ++i) {
if (split.Check(i)) {
cats.push_back(static_cast<bst_cat_t>(i));
}
@@ -1004,7 +1004,7 @@ void RegTree::SaveCategoricalSplit(Json* p_out) const {
auto segment = split_categories_segments_[i];
auto node_categories = this->GetSplitCategories().subspan(segment.beg, segment.size);
common::KCatBitField const cat_bits(node_categories);
for (size_t i = 0; i < cat_bits.Size(); ++i) {
for (size_t i = 0; i < cat_bits.Capacity(); ++i) {
if (cat_bits.Check(i)) {
categories.GetArray().emplace_back(i);
}

View File

@@ -176,7 +176,7 @@ struct GPUHistMakerDevice {
Context const* ctx_;
public:
EllpackPageImpl const* page;
EllpackPageImpl const* page{nullptr};
common::Span<FeatureType const> feature_types;
BatchParam batch_param;
@@ -205,41 +205,41 @@ struct GPUHistMakerDevice {
std::unique_ptr<FeatureGroups> feature_groups;
GPUHistMakerDevice(Context const* ctx, EllpackPageImpl const* _page,
common::Span<FeatureType const> _feature_types, bst_uint _n_rows,
GPUHistMakerDevice(Context const* ctx, bool is_external_memory,
common::Span<FeatureType const> _feature_types, bst_row_t _n_rows,
TrainParam _param, uint32_t column_sampler_seed, uint32_t n_features,
BatchParam _batch_param)
: evaluator_{_param, n_features, ctx->gpu_id},
ctx_(ctx),
page(_page),
feature_types{_feature_types},
param(std::move(_param)),
column_sampler(column_sampler_seed),
interaction_constraints(param, n_features),
batch_param(std::move(_batch_param)) {
sampler.reset(new GradientBasedSampler(ctx, page, _n_rows, batch_param, param.subsample,
param.sampling_method));
sampler.reset(new GradientBasedSampler(ctx, _n_rows, batch_param, param.subsample,
param.sampling_method, is_external_memory));
if (!param.monotone_constraints.empty()) {
// Copy assigning an empty vector causes an exception in MSVC debug builds
monotone_constraints = param.monotone_constraints;
}
// Init histogram
hist.Init(ctx_->gpu_id, page->Cuts().TotalBins());
monitor.Init(std::string("GPUHistMakerDevice") + std::to_string(ctx_->gpu_id));
feature_groups.reset(new FeatureGroups(page->Cuts(), page->is_dense,
dh::MaxSharedMemoryOptin(ctx_->gpu_id),
sizeof(GradientSumT)));
}
~GPUHistMakerDevice() { // NOLINT
dh::safe_cuda(cudaSetDevice(ctx_->gpu_id));
}
void InitFeatureGroupsOnce() {
if (!feature_groups) {
CHECK(page);
feature_groups.reset(new FeatureGroups(page->Cuts(), page->is_dense,
dh::MaxSharedMemoryOptin(ctx_->gpu_id),
sizeof(GradientSumT)));
}
}
// Reset values for each update iteration
// Note that the column sampler must be passed by value because it is not
// thread safe
void Reset(HostDeviceVector<GradientPair>* dh_gpair, DMatrix* dmat, int64_t num_columns) {
auto const& info = dmat->Info();
this->column_sampler.Init(ctx_, num_columns, info.feature_weights.HostVector(),
@@ -247,26 +247,30 @@ struct GPUHistMakerDevice {
param.colsample_bytree);
dh::safe_cuda(cudaSetDevice(ctx_->gpu_id));
this->evaluator_.Reset(page->Cuts(), feature_types, dmat->Info().num_col_, param,
ctx_->gpu_id);
this->interaction_constraints.Reset();
if (d_gpair.size() != dh_gpair->Size()) {
d_gpair.resize(dh_gpair->Size());
}
dh::safe_cuda(cudaMemcpyAsync(
d_gpair.data().get(), dh_gpair->ConstDevicePointer(),
dh_gpair->Size() * sizeof(GradientPair), cudaMemcpyDeviceToDevice));
dh::safe_cuda(cudaMemcpyAsync(d_gpair.data().get(), dh_gpair->ConstDevicePointer(),
dh_gpair->Size() * sizeof(GradientPair),
cudaMemcpyDeviceToDevice));
auto sample = sampler->Sample(ctx_, dh::ToSpan(d_gpair), dmat);
page = sample.page;
gpair = sample.gpair;
this->evaluator_.Reset(page->Cuts(), feature_types, dmat->Info().num_col_, param, ctx_->gpu_id);
quantiser.reset(new GradientQuantiser(this->gpair));
row_partitioner.reset(); // Release the device memory first before reallocating
    row_partitioner.reset(new RowPartitioner(ctx_->gpu_id, sample.sample_rows));
// Init histogram
hist.Init(ctx_->gpu_id, page->Cuts().TotalBins());
hist.Reset();
this->InitFeatureGroupsOnce();
}
GPUExpandEntry EvaluateRootSplit(GradientPairInt64 root_sum) {
@@ -808,12 +812,11 @@ class GPUHistMaker : public TreeUpdater {
collective::Broadcast(&column_sampling_seed, sizeof(column_sampling_seed), 0);
auto batch_param = BatchParam{param->max_bin, TrainParam::DftSparseThreshold()};
auto page = (*dmat->GetBatches<EllpackPage>(ctx_, batch_param).begin()).Impl();
dh::safe_cuda(cudaSetDevice(ctx_->gpu_id));
info_->feature_types.SetDevice(ctx_->gpu_id);
maker.reset(new GPUHistMakerDevice<GradientSumT>(
ctx_, page, info_->feature_types.ConstDeviceSpan(), info_->num_row_, *param,
column_sampling_seed, info_->num_col_, batch_param));
ctx_, !dmat->SingleColBlock(), info_->feature_types.ConstDeviceSpan(), info_->num_row_,
*param, column_sampling_seed, info_->num_col_, batch_param));
p_last_fmat_ = dmat;
initialised_ = true;

View File

@@ -37,6 +37,7 @@ class LintersPaths:
"demo/guide-python/quantile_regression.py",
"demo/guide-python/multioutput_regression.py",
"demo/guide-python/learning_to_rank.py",
"demo/aft_survival/aft_survival_viz_demo.py",
# CI
"tests/ci_build/lint_python.py",
"tests/ci_build/test_r_package.py",
@@ -78,6 +79,7 @@ class LintersPaths:
"demo/guide-python/quantile_regression.py",
"demo/guide-python/multioutput_regression.py",
"demo/guide-python/learning_to_rank.py",
"demo/aft_survival/aft_survival_viz_demo.py",
# CI
"tests/ci_build/lint_python.py",
"tests/ci_build/test_r_package.py",
@@ -114,7 +116,13 @@ def run_black(rel_path: str, fix: bool) -> bool:
@cd(PY_PACKAGE)
def run_isort(rel_path: str, fix: bool) -> bool:
# Isort gets confused when trying to find the config file, so specified explicitly.
cmd = ["isort", "--settings-path", PY_PACKAGE, os.path.join(ROOT, rel_path)]
cmd = [
"isort",
"--settings-path",
PY_PACKAGE,
f"--src={PY_PACKAGE}",
os.path.join(ROOT, rel_path),
]
if not fix:
cmd += ["--check"]

View File

@@ -5,10 +5,12 @@
#include <gtest/gtest.h>
#include <bitset>
#include <string> // for string
#include "../../../src/collective/nccl_device_communicator.cuh"
#include "../../../src/collective/communicator-inl.cuh"
#include "../../../src/collective/nccl_device_communicator.cuh"
#include "../helpers.h"
namespace xgboost {
namespace collective {
@@ -31,6 +33,69 @@ TEST(NcclDeviceCommunicatorSimpleTest, SystemError) {
ASSERT_TRUE(str.find("environment variables") != std::string::npos);
}
}
namespace {
void VerifyAllReduceBitwiseAND() {
auto const rank = collective::GetRank();
std::bitset<64> original{};
original[rank] = true;
HostDeviceVector<uint64_t> buffer({original.to_ullong()}, rank);
collective::AllReduce<collective::Operation::kBitwiseAND>(rank, buffer.DevicePointer(), 1);
collective::Synchronize(rank);
EXPECT_EQ(buffer.HostVector()[0], 0ULL);
}
} // anonymous namespace
TEST(NcclDeviceCommunicator, MGPUAllReduceBitwiseAND) {
auto const n_gpus = common::AllVisibleGPUs();
if (n_gpus <= 1) {
GTEST_SKIP() << "Skipping MGPUAllReduceBitwiseAND test with # GPUs = " << n_gpus;
}
RunWithInMemoryCommunicator(n_gpus, VerifyAllReduceBitwiseAND);
}
namespace {
void VerifyAllReduceBitwiseOR() {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::bitset<64> original{};
original[rank] = true;
HostDeviceVector<uint64_t> buffer({original.to_ullong()}, rank);
collective::AllReduce<collective::Operation::kBitwiseOR>(rank, buffer.DevicePointer(), 1);
collective::Synchronize(rank);
EXPECT_EQ(buffer.HostVector()[0], (1ULL << world_size) - 1);
}
} // anonymous namespace
TEST(NcclDeviceCommunicator, MGPUAllReduceBitwiseOR) {
auto const n_gpus = common::AllVisibleGPUs();
if (n_gpus <= 1) {
GTEST_SKIP() << "Skipping MGPUAllReduceBitwiseOR test with # GPUs = " << n_gpus;
}
RunWithInMemoryCommunicator(n_gpus, VerifyAllReduceBitwiseOR);
}
namespace {
void VerifyAllReduceBitwiseXOR() {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::bitset<64> original{~0ULL};
original[rank] = false;
HostDeviceVector<uint64_t> buffer({original.to_ullong()}, rank);
collective::AllReduce<collective::Operation::kBitwiseXOR>(rank, buffer.DevicePointer(), 1);
collective::Synchronize(rank);
EXPECT_EQ(buffer.HostVector()[0], (1ULL << world_size) - 1);
}
} // anonymous namespace
TEST(NcclDeviceCommunicator, MGPUAllReduceBitwiseXOR) {
auto const n_gpus = common::AllVisibleGPUs();
if (n_gpus <= 1) {
GTEST_SKIP() << "Skipping MGPUAllReduceBitwiseXOR test with # GPUs = " << n_gpus;
}
RunWithInMemoryCommunicator(n_gpus, VerifyAllReduceBitwiseXOR);
}
} // namespace collective
} // namespace xgboost
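
For reference, the expected values in these bitwise tests can be reproduced on the host without NCCL. A hedged standalone sketch (the world size of 4 is an arbitrary assumption) showing why the AND of one-hot contributions is 0 and the OR is a low mask of world_size bits:

#include <bitset>
#include <cstdint>
#include <iostream>

int main() {
  int const world_size = 4;  // assumption for illustration
  std::uint64_t and_result = ~0ULL;
  std::uint64_t or_result = 0ULL;
  for (int rank = 0; rank < world_size; ++rank) {
    std::bitset<64> original{};  // same per-rank construction as the tests above
    original[rank] = true;
    and_result &= original.to_ullong();
    or_result |= original.to_ullong();
  }
  std::cout << "AND = " << and_result << "\n";                              // 0
  std::cout << "OR  = " << or_result << " == "
            << ((1ULL << world_size) - 1) << "\n";                          // 15 == 15
  return 0;
}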

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 XGBoost contributors
/**
* Copyright 2019-2023, XGBoost contributors
*/
#include <gtest/gtest.h>
#include "../../../src/common/bitfield.h"
@@ -14,7 +14,7 @@ TEST(BitField, Check) {
static_cast<typename common::Span<LBitField64::value_type>::index_type>(
storage.size())});
size_t true_bit = 190;
for (size_t i = true_bit + 1; i < bits.Size(); ++i) {
for (size_t i = true_bit + 1; i < bits.Capacity(); ++i) {
ASSERT_FALSE(bits.Check(i));
}
ASSERT_TRUE(bits.Check(true_bit));
@@ -34,7 +34,7 @@ TEST(BitField, Check) {
ASSERT_FALSE(bits.Check(i));
}
ASSERT_TRUE(bits.Check(true_bit));
for (size_t i = true_bit + 1; i < bits.Size(); ++i) {
for (size_t i = true_bit + 1; i < bits.Capacity(); ++i) {
ASSERT_FALSE(bits.Check(i));
}
}
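
Throughout this changeset, the Size-to-Capacity rename reads as the number of addressable bits in the underlying storage, which is the bound these loops and kernel launches iterate to. A minimal standalone sketch of that semantic (a hypothetical BitField64, not the actual LBitField64):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct BitField64 {
  std::vector<std::uint64_t> storage;
  explicit BitField64(std::size_t n_words) : storage(n_words, 0) {}
  // Number of addressable bits, regardless of how many are actually set.
  std::size_t Capacity() const { return storage.size() * 64; }
  void Set(std::size_t i) { storage[i / 64] |= (std::uint64_t{1} << (i % 64)); }
  bool Check(std::size_t i) const { return (storage[i / 64] >> (i % 64)) & 1; }
};

int main() {
  BitField64 bits{3};  // 192 addressable bits, mirroring the 190th bit used in the test above
  bits.Set(190);
  std::size_t n_set = 0;
  for (std::size_t i = 0; i < bits.Capacity(); ++i) {
    n_set += bits.Check(i);
  }
  std::cout << "capacity=" << bits.Capacity() << " set=" << n_set << "\n";  // 192 and 1
  return 0;
}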

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 XGBoost contributors
/**
* Copyright 2019-2023, XGBoost contributors
*/
#include <gtest/gtest.h>
#include <thrust/copy.h>
@@ -12,7 +12,7 @@ namespace xgboost {
__global__ void TestSetKernel(LBitField64 bits) {
auto tid = threadIdx.x + blockIdx.x * blockDim.x;
if (tid < bits.Size()) {
if (tid < bits.Capacity()) {
bits.Set(tid);
}
}
@@ -36,20 +36,16 @@ TEST(BitField, GPUSet) {
std::vector<LBitField64::value_type> h_storage(storage.size());
thrust::copy(storage.begin(), storage.end(), h_storage.begin());
LBitField64 outputs {
common::Span<LBitField64::value_type>{h_storage.data(),
h_storage.data() + h_storage.size()}};
LBitField64 outputs{
common::Span<LBitField64::value_type>{h_storage.data(), h_storage.data() + h_storage.size()}};
for (size_t i = 0; i < kBits; ++i) {
ASSERT_TRUE(outputs.Check(i));
}
}
__global__ void TestOrKernel(LBitField64 lhs, LBitField64 rhs) {
lhs |= rhs;
}
TEST(BitField, GPUAnd) {
namespace {
template <bool is_and, typename Op>
void TestGPULogic(Op op) {
uint32_t constexpr kBits = 128;
dh::device_vector<LBitField64::value_type> lhs_storage(kBits);
dh::device_vector<LBitField64::value_type> rhs_storage(kBits);
@@ -57,13 +53,32 @@ TEST(BitField, GPUAnd) {
auto rhs = LBitField64(dh::ToSpan(rhs_storage));
thrust::fill(lhs_storage.begin(), lhs_storage.end(), 0UL);
thrust::fill(rhs_storage.begin(), rhs_storage.end(), ~static_cast<LBitField64::value_type>(0UL));
TestOrKernel<<<1, kBits>>>(lhs, rhs);
dh::LaunchN(kBits, [=] __device__(auto) mutable { op(lhs, rhs); });
std::vector<LBitField64::value_type> h_storage(lhs_storage.size());
thrust::copy(lhs_storage.begin(), lhs_storage.end(), h_storage.begin());
LBitField64 outputs {{h_storage.data(), h_storage.data() + h_storage.size()}};
for (size_t i = 0; i < kBits; ++i) {
ASSERT_TRUE(outputs.Check(i));
LBitField64 outputs{{h_storage.data(), h_storage.data() + h_storage.size()}};
if (is_and) {
for (size_t i = 0; i < kBits; ++i) {
ASSERT_FALSE(outputs.Check(i));
}
} else {
for (size_t i = 0; i < kBits; ++i) {
ASSERT_TRUE(outputs.Check(i));
}
}
}
} // namespace xgboost
void TestGPUAnd() {
TestGPULogic<true>([] XGBOOST_DEVICE(LBitField64 & lhs, LBitField64 const& rhs) { lhs &= rhs; });
}
void TestGPUOr() {
TestGPULogic<false>([] XGBOOST_DEVICE(LBitField64 & lhs, LBitField64 const& rhs) { lhs |= rhs; });
}
} // namespace
TEST(BitField, GPUAnd) { TestGPUAnd(); }
TEST(BitField, GPUOr) { TestGPUOr(); }
} // namespace xgboost

View File

@@ -83,7 +83,9 @@ template <typename BinIdxType>
void CheckColumWithMissingValue(const DenseColumnIter<BinIdxType, true>& col,
const GHistIndexMatrix& gmat) {
for (auto i = 0ull; i < col.Size(); i++) {
if (col.IsMissing(i)) continue;
if (col.IsMissing(i)) {
continue;
}
EXPECT_EQ(gmat.index[gmat.row_ptr[i]], col.GetGlobalBinIdx(i));
}
}

View File

@@ -1,5 +1,5 @@
/*!
* Copyright (c) by XGBoost Contributors 2019
/**
* Copyright 2019-2023, XGBoost Contributors
*/
#include <gtest/gtest.h>
@@ -9,8 +9,7 @@
#include "../helpers.h"
#include "../filesystem.h" // dmlc::TemporaryDirectory
namespace xgboost {
namespace common {
namespace xgboost::common {
TEST(MemoryFixSizeBuffer, Seek) {
size_t constexpr kSize { 64 };
std::vector<int32_t> memory( kSize );
@@ -89,5 +88,54 @@ TEST(IO, LoadSequentialFile) {
ASSERT_THROW(LoadSequentialFile("non-exist", true), dmlc::Error);
}
} // namespace common
} // namespace xgboost
TEST(IO, PrivateMmapStream) {
dmlc::TemporaryDirectory tempdir;
auto path = tempdir.path + "/testfile";
  // The page size on Linux is usually 4096, while the allocation granularity on the
  // Windows machine where this test was written is 65536. The test sizes span all of
  // these.
std::size_t n_batches{64};
std::size_t multiplier{2048};
std::vector<std::vector<std::int32_t>> batches;
std::vector<std::size_t> offset{0ul};
using T = std::int32_t;
{
std::unique_ptr<dmlc::Stream> fo{dmlc::Stream::Create(path.c_str(), "w")};
for (std::size_t i = 0; i < n_batches; ++i) {
std::size_t size = (i + 1) * multiplier;
std::vector<T> data(size, 0);
std::iota(data.begin(), data.end(), i * i);
fo->Write(static_cast<std::uint64_t>(data.size()));
fo->Write(data.data(), data.size() * sizeof(T));
std::size_t bytes = sizeof(std::uint64_t) + data.size() * sizeof(T);
offset.push_back(bytes);
batches.emplace_back(std::move(data));
}
}
  // Turn sizes into offsets.
std::partial_sum(offset.begin(), offset.end(), offset.begin());
for (std::size_t i = 0; i < n_batches; ++i) {
std::size_t off = offset[i];
std::size_t n = offset.at(i + 1) - offset[i];
std::unique_ptr<dmlc::Stream> fi{std::make_unique<PrivateMmapConstStream>(path, off, n)};
std::vector<T> data;
std::uint64_t size{0};
fi->Read(&size);
data.resize(size);
fi->Read(data.data(), size * sizeof(T));
ASSERT_EQ(data, batches[i]);
}
}
} // namespace xgboost::common
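
The PrivateMmapConstStream exercised by this test maps an (offset, length) view of the cache file. A hedged POSIX-only sketch of that idea (plain mmap, not the actual XGBoost class; the ReadRange helper, file path, and sizes are illustrative assumptions):

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <cstddef>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

// Read `length` bytes starting at `offset` through a private, read-only mapping.
std::vector<char> ReadRange(std::string const& path, std::size_t offset, std::size_t length) {
  int fd = open(path.c_str(), O_RDONLY);
  if (fd < 0) {
    return {};
  }
  auto page = static_cast<std::size_t>(sysconf(_SC_PAGESIZE));
  std::size_t aligned = offset - offset % page;  // mmap offsets must be page aligned
  std::size_t delta = offset - aligned;
  void* ptr = mmap(nullptr, length + delta, PROT_READ, MAP_PRIVATE, fd, static_cast<off_t>(aligned));
  std::vector<char> out;
  if (ptr != MAP_FAILED) {
    auto* begin = static_cast<char*>(ptr) + delta;
    out.assign(begin, begin + length);
    munmap(ptr, length + delta);
  }
  close(fd);
  return out;
}

int main() {
  std::string path{"testfile"};
  { std::ofstream{path} << "0123456789abcdef"; }  // write a tiny file to read back
  auto view = ReadRange(path, 10, 6);
  std::cout << std::string(view.begin(), view.end()) << "\n";  // prints "abcdef"
  return 0;
}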

View File

@@ -2,6 +2,10 @@
#include "../../src/data/ellpack_page.cuh"
#endif
#include <xgboost/data.h> // for SparsePage
#include "./helpers.h" // for RandomDataGenerator
namespace xgboost {
#if defined(__CUDACC__)
namespace {

View File

@@ -285,8 +285,6 @@ TEST(GpuHist, PartitionTwoNodes) {
dh::ToSpan(feature_histogram_b)};
thrust::device_vector<GPUExpandEntry> results(2);
evaluator.EvaluateSplits({0, 1}, 1, dh::ToSpan(inputs), shared_inputs, dh::ToSpan(results));
GPUExpandEntry result_a = results[0];
GPUExpandEntry result_b = results[1];
EXPECT_EQ(std::bitset<32>(evaluator.GetHostNodeCats(0)[0]),
std::bitset<32>("10000000000000000000000000000000"));
EXPECT_EQ(std::bitset<32>(evaluator.GetHostNodeCats(1)[0]),

View File

@@ -39,7 +39,8 @@ void VerifySampling(size_t page_size,
EXPECT_NE(page->n_rows, kRows);
}
GradientBasedSampler sampler(&ctx, page, kRows, param, subsample, sampling_method);
GradientBasedSampler sampler(&ctx, kRows, param, subsample, sampling_method,
!fixed_size_sampling);
auto sample = sampler.Sample(&ctx, gpair.DeviceSpan(), dmat.get());
if (fixed_size_sampling) {
@@ -93,7 +94,7 @@ TEST(GradientBasedSampler, NoSamplingExternalMemory) {
auto page = (*dmat->GetBatches<EllpackPage>(&ctx, param).begin()).Impl();
EXPECT_NE(page->n_rows, kRows);
GradientBasedSampler sampler(&ctx, page, kRows, param, kSubsample, TrainParam::kUniform);
GradientBasedSampler sampler(&ctx, kRows, param, kSubsample, TrainParam::kUniform, true);
auto sample = sampler.Sample(&ctx, gpair.DeviceSpan(), dmat.get());
auto sampled_page = sample.page;
EXPECT_EQ(sample.sample_rows, kRows);
@@ -141,7 +142,8 @@ TEST(GradientBasedSampler, GradientBasedSampling) {
constexpr size_t kPageSize = 0;
constexpr float kSubsample = 0.8;
constexpr int kSamplingMethod = TrainParam::kGradientBased;
VerifySampling(kPageSize, kSubsample, kSamplingMethod);
constexpr bool kFixedSizeSampling = true;
VerifySampling(kPageSize, kSubsample, kSamplingMethod, kFixedSizeSampling);
}
TEST(GradientBasedSampler, GradientBasedSamplingExternalMemory) {

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2019 XGBoost contributors
/**
* Copyright 2019-2023, XGBoost contributors
*/
#include <gtest/gtest.h>
#include <thrust/copy.h>
@@ -53,7 +53,7 @@ void CompareBitField(LBitField64 d_field, std::set<uint32_t> positions) {
LBitField64 h_field{ {h_field_storage.data(),
h_field_storage.data() + h_field_storage.size()} };
for (size_t i = 0; i < h_field.Size(); ++i) {
for (size_t i = 0; i < h_field.Capacity(); ++i) {
if (positions.find(i) != positions.cend()) {
ASSERT_TRUE(h_field.Check(i));
} else {
@@ -82,7 +82,7 @@ TEST(GPUFeatureInteractionConstraint, Init) {
{h_node_storage.data(), h_node_storage.data() + h_node_storage.size()}
};
// no feature is attached to node.
for (size_t i = 0; i < h_node.Size(); ++i) {
for (size_t i = 0; i < h_node.Capacity(); ++i) {
ASSERT_FALSE(h_node.Check(i));
}
}

View File

@@ -92,8 +92,8 @@ void TestBuildHist(bool use_shared_memory_histograms) {
auto page = BuildEllpackPage(kNRows, kNCols);
BatchParam batch_param{};
Context ctx{MakeCUDACtx(0)};
GPUHistMakerDevice<GradientSumT> maker(&ctx, page.get(), {}, kNRows, param, kNCols, kNCols,
batch_param);
GPUHistMakerDevice<GradientSumT> maker(&ctx, /*is_external_memory=*/false, {}, kNRows, param,
kNCols, kNCols, batch_param);
xgboost::SimpleLCG gen;
xgboost::SimpleRealUniformDistribution<bst_float> dist(0.0f, 1.0f);
HostDeviceVector<GradientPair> gpair(kNRows);
@@ -106,9 +106,15 @@ void TestBuildHist(bool use_shared_memory_histograms) {
thrust::host_vector<common::CompressedByteT> h_gidx_buffer (page->gidx_buffer.HostVector());
maker.row_partitioner.reset(new RowPartitioner(0, kNRows));
maker.hist.Init(0, page->Cuts().TotalBins());
maker.hist.AllocateHistograms({0});
maker.gpair = gpair.DeviceSpan();
maker.quantiser.reset(new GradientQuantiser(maker.gpair));
maker.page = page.get();
maker.InitFeatureGroupsOnce();
BuildGradientHistogram(ctx.CUDACtx(), page->GetDeviceAccessor(0),
maker.feature_groups->DeviceAccessor(0), gpair.DeviceSpan(),
@@ -126,8 +132,8 @@ void TestBuildHist(bool use_shared_memory_histograms) {
std::vector<GradientPairPrecise> solution = GetHostHistGpair();
for (size_t i = 0; i < h_result.size(); ++i) {
auto result = maker.quantiser->ToFloatingPoint(h_result[i]);
EXPECT_NEAR(result.GetGrad(), solution[i].GetGrad(), 0.01f);
EXPECT_NEAR(result.GetHess(), solution[i].GetHess(), 0.01f);
ASSERT_NEAR(result.GetGrad(), solution[i].GetGrad(), 0.01f);
ASSERT_NEAR(result.GetHess(), solution[i].GetHess(), 0.01f);
}
}

View File

@@ -305,7 +305,7 @@ class IterForDMatrixTest(xgb.core.DataIter):
self._labels = [rng.randn(self.rows)] * self.BATCHES
self.it = 0 # set iterator to 0
super().__init__()
super().__init__(cache_prefix=None)
def as_array(self):
import cudf

View File

@@ -64,7 +64,8 @@ def run_data_iterator(
subsample_rate = 0.8 if subsample else 1.0
it = IteratorForTest(
*make_batches(n_samples_per_batch, n_features, n_batches, use_cupy)
*make_batches(n_samples_per_batch, n_features, n_batches, use_cupy),
cache="cache"
)
if n_batches == 0:
with pytest.raises(ValueError, match="1 batch"):

View File

@@ -253,9 +253,12 @@ class TestQuantileDMatrix:
self.run_ref_dmatrix(rng, "hist", True)
self.run_ref_dmatrix(rng, "hist", False)
def test_predict(self) -> None:
n_samples, n_features = 16, 2
X, y = make_categorical(n_samples, n_features, n_categories=13, onehot=False)
@pytest.mark.parametrize("sparsity", [0.0, 0.5])
def test_predict(self, sparsity: float) -> None:
n_samples, n_features = 256, 4
X, y = make_categorical(
n_samples, n_features, n_categories=13, onehot=False, sparsity=sparsity
)
Xy = xgb.DMatrix(X, y, enable_categorical=True)
booster = xgb.train({"tree_method": "hist"}, Xy)