Compare commits

8 Commits (dependabot … dependabot):

- 42d8b06e0a
- cfa9c42eb4
- 6efe7c129f
- 54da4b3185
- 4066d68261
- 6d22ea793c
- ee6809e642
- d8beb517ed
@@ -18,13 +18,11 @@
   publisher={Institute of Mathematical Statistics}
 }
-
-
 @misc{Bache+Lichman:2013,
     author = "K. Bache and M. Lichman",
     year = "2013",
     title = "{UCI} Machine Learning Repository",
-    url = "http://archive.ics.uci.edu/ml/",
-    institution = "University of California, Irvine, School of Information and Computer Sciences"
+    url = "https://archive.ics.uci.edu/",
+    institution = "University of California, Irvine, School of Information and Computer Sciences"
 }
@@ -11,33 +11,43 @@ import numpy as np

 import xgboost as xgb

-plt.rcParams.update({'font.size': 13})
+plt.rcParams.update({"font.size": 13})


 # Function to visualize censored labels
-def plot_censored_labels(X, y_lower, y_upper):
-    def replace_inf(x, target_value):
+def plot_censored_labels(
+    X: np.ndarray, y_lower: np.ndarray, y_upper: np.ndarray
+) -> None:
+    def replace_inf(x: np.ndarray, target_value: float) -> np.ndarray:
         x[np.isinf(x)] = target_value
         return x
-    plt.plot(X, y_lower, 'o', label='y_lower', color='blue')
-    plt.plot(X, y_upper, 'o', label='y_upper', color='fuchsia')
-    plt.vlines(X, ymin=replace_inf(y_lower, 0.01), ymax=replace_inf(y_upper, 1000),
-               label='Range for y', color='gray')
+
+    plt.plot(X, y_lower, "o", label="y_lower", color="blue")
+    plt.plot(X, y_upper, "o", label="y_upper", color="fuchsia")
+    plt.vlines(
+        X,
+        ymin=replace_inf(y_lower, 0.01),
+        ymax=replace_inf(y_upper, 1000.0),
+        label="Range for y",
+        color="gray",
+    )


 # Toy data
 X = np.array([1, 2, 3, 4, 5]).reshape((-1, 1))
 INF = np.inf
-y_lower = np.array([ 10,  15, -INF, 30, 100])
-y_upper = np.array([INF, INF,   20, 50, INF])
+y_lower = np.array([10, 15, -INF, 30, 100])
+y_upper = np.array([INF, INF, 20, 50, INF])

 # Visualize toy data
 plt.figure(figsize=(5, 4))
 plot_censored_labels(X, y_lower, y_upper)
 plt.ylim((6, 200))
-plt.legend(loc='lower right')
-plt.title('Toy data')
-plt.xlabel('Input feature')
-plt.ylabel('Label')
-plt.yscale('log')
+plt.legend(loc="lower right")
+plt.title("Toy data")
+plt.xlabel("Input feature")
+plt.ylabel("Label")
+plt.yscale("log")
 plt.tight_layout()
 plt.show(block=True)
@@ -46,54 +56,83 @@ grid_pts = np.linspace(0.8, 5.2, 1000).reshape((-1, 1))

 # Train AFT model using XGBoost
 dmat = xgb.DMatrix(X)
-dmat.set_float_info('label_lower_bound', y_lower)
-dmat.set_float_info('label_upper_bound', y_upper)
-params = {'max_depth': 3, 'objective':'survival:aft', 'min_child_weight': 0}
+dmat.set_float_info("label_lower_bound", y_lower)
+dmat.set_float_info("label_upper_bound", y_upper)
+params = {"max_depth": 3, "objective": "survival:aft", "min_child_weight": 0}

 accuracy_history = []
-def plot_intermediate_model_callback(env):
-    """Custom callback to plot intermediate models"""
-    # Compute y_pred = prediction using the intermediate model, at current boosting iteration
-    y_pred = env.model.predict(dmat)
-    # "Accuracy" = the number of data points whose ranged label (y_lower, y_upper) includes
-    # the corresponding predicted label (y_pred)
-    acc = np.sum(np.logical_and(y_pred >= y_lower, y_pred <= y_upper)/len(X) * 100)
-    accuracy_history.append(acc)
-
-    # Plot ranged labels as well as predictions by the model
-    plt.subplot(5, 3, env.iteration + 1)
-    plot_censored_labels(X, y_lower, y_upper)
-    y_pred_grid_pts = env.model.predict(xgb.DMatrix(grid_pts))
-    plt.plot(grid_pts, y_pred_grid_pts, 'r-', label='XGBoost AFT model', linewidth=4)
-    plt.title('Iteration {}'.format(env.iteration), x=0.5, y=0.8)
-    plt.xlim((0.8, 5.2))
-    plt.ylim((1 if np.min(y_pred) < 6 else 6, 200))
-    plt.yscale('log')
-
-res = {}
-plt.figure(figsize=(12,13))
-bst = xgb.train(params, dmat, 15, [(dmat, 'train')], evals_result=res,
-                callbacks=[plot_intermediate_model_callback])
-plt.tight_layout()
-plt.legend(loc='lower center', ncol=4,
-           bbox_to_anchor=(0.5, 0),
-           bbox_transform=plt.gcf().transFigure)
+
+
+class PlotIntermediateModel(xgb.callback.TrainingCallback):
+    """Custom callback to plot intermediate models."""
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    def after_iteration(
+        self,
+        model: xgb.Booster,
+        epoch: int,
+        evals_log: xgb.callback.TrainingCallback.EvalsLog,
+    ) -> bool:
+        """Run after each boosting iteration."""
+        # Compute y_pred = prediction using the intermediate model, at current boosting
+        # iteration
+        y_pred = model.predict(dmat)
+        # "Accuracy" = the number of data points whose ranged label (y_lower, y_upper)
+        # includes the corresponding predicted label (y_pred)
+        acc = np.sum(
+            np.logical_and(y_pred >= y_lower, y_pred <= y_upper) / len(X) * 100
+        )
+        accuracy_history.append(acc)
+
+        # Plot ranged labels as well as predictions by the model
+        plt.subplot(5, 3, epoch + 1)
+        plot_censored_labels(X, y_lower, y_upper)
+        y_pred_grid_pts = model.predict(xgb.DMatrix(grid_pts))
+        plt.plot(
+            grid_pts, y_pred_grid_pts, "r-", label="XGBoost AFT model", linewidth=4
+        )
+        plt.title("Iteration {}".format(epoch), x=0.5, y=0.8)
+        plt.xlim((0.8, 5.2))
+        plt.ylim((1 if np.min(y_pred) < 6 else 6, 200))
+        plt.yscale("log")
+        return False
+
+
+res: xgb.callback.TrainingCallback.EvalsLog = {}
+plt.figure(figsize=(12, 13))
+bst = xgb.train(
+    params,
+    dmat,
+    15,
+    [(dmat, "train")],
+    evals_result=res,
+    callbacks=[PlotIntermediateModel()],
+)
+plt.legend(
+    loc="lower center",
+    ncol=4,
+    bbox_to_anchor=(0.5, 0),
+    bbox_transform=plt.gcf().transFigure,
+)
+plt.tight_layout()

 # Plot negative log likelihood over boosting iterations
-plt.figure(figsize=(8,3))
+plt.figure(figsize=(8, 3))
 plt.subplot(1, 2, 1)
-plt.plot(res['train']['aft-nloglik'], 'b-o', label='aft-nloglik')
-plt.xlabel('# Boosting Iterations')
-plt.legend(loc='best')
+plt.plot(res["train"]["aft-nloglik"], "b-o", label="aft-nloglik")
+plt.xlabel("# Boosting Iterations")
+plt.legend(loc="best")

 # Plot "accuracy" over boosting iterations
 # "Accuracy" = the number of data points whose ranged label (y_lower, y_upper) includes
 # the corresponding predicted label (y_pred)
 plt.subplot(1, 2, 2)
-plt.plot(accuracy_history, 'r-o', label='Accuracy (%)')
-plt.xlabel('# Boosting Iterations')
-plt.legend(loc='best')
+plt.plot(accuracy_history, "r-o", label="Accuracy (%)")
+plt.xlabel("# Boosting Iterations")
+plt.legend(loc="best")
 plt.tight_layout()

 plt.show()
@@ -82,10 +82,10 @@ def main(tmpdir: str) -> xgboost.Booster:
     missing = np.NaN
     Xy = xgboost.DMatrix(it, missing=missing, enable_categorical=False)

-    # Other tree methods including ``hist`` and ``gpu_hist`` also work, see tutorial in
-    # doc for details.
+    # Other tree methods including ``approx``, and ``gpu_hist`` are supported. GPU
+    # behaves differently than CPU tree methods. See tutorial in doc for details.
     booster = xgboost.train(
-        {"tree_method": "approx", "max_depth": 2},
+        {"tree_method": "hist", "max_depth": 4},
        Xy,
        evals=[(Xy, "Train")],
        num_boost_round=10,
@@ -2,11 +2,25 @@
 Using XGBoost External Memory Version
 #####################################

-XGBoost supports loading data from external memory using builtin data parser. And
-starting from version 1.5, users can also define a custom iterator to load data in chunks.
-The feature is still experimental and not yet ready for production use. In this tutorial
-we will introduce both methods. Please note that training on data from external memory is
-not supported by ``exact`` tree method.
+When working with large datasets, training XGBoost models can be challenging as the entire
+dataset needs to be loaded into memory. This can be costly and sometimes
+infeasible. Starting from 1.5, users can define a custom iterator to load data in chunks
+for running XGBoost algorithms. External memory can be used for both training and
+prediction, but training is the primary use case and it will be our focus in this
+tutorial. For prediction and evaluation, users can iterate through the data themselves,
+while training requires the full dataset to be loaded into the memory.
+
+During training, there are two different modes for external memory support available in
+XGBoost, one for CPU-based algorithms like ``hist`` and ``approx``, another one for the
+GPU-based training algorithm. We will introduce them in the following sections.
+
+.. note::
+
+  Training on data from external memory is not supported by the ``exact`` tree method.
+
+.. note::
+
+  The feature is still experimental as of 2.0. The performance is not well optimized.

 *************
 Data Iterator
@@ -15,8 +29,8 @@ Data Iterator

 Starting from XGBoost 1.5, users can define their own data loader using Python or C
 interface. There are some examples in the ``demo`` directory for quick start. This is a
 generalized version of text input external memory, where users no longer need to prepare a
-text file that XGBoost recognizes. To enable the feature, user need to define a data
-iterator with 2 class methods ``next`` and ``reset`` then pass it into ``DMatrix``
+text file that XGBoost recognizes. To enable the feature, users need to define a data
+iterator with 2 class methods: ``next`` and ``reset``, then pass it into the ``DMatrix``
 constructor.

 .. code-block:: python
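(The body of this code block is unchanged in the diff and therefore not shown. As rough
orientation only — not part of the commit, with hypothetical names and synthetic data — a
minimal iterator of the shape described above, modeled on the ``IteratorForTest`` class
that appears later in this diff, might look like:)

.. code-block:: python

  import os

  import numpy as np
  import xgboost

  class Iterator(xgboost.DataIter):  # hypothetical example, not from the diff
      """Yield pre-split (X, y) chunks of a dataset, one chunk per batch."""

      def __init__(self, chunks):
          self._chunks = chunks  # list of (X, y) ndarray pairs
          self._it = 0
          # ``cache_prefix`` tells XGBoost where to place the on-disk cache.
          super().__init__(cache_prefix=os.path.join(".", "cache"))

      def next(self, input_data):
          """Feed the next batch to XGBoost; return 0 when exhausted, 1 otherwise."""
          if self._it == len(self._chunks):
              return 0
          X, y = self._chunks[self._it]
          input_data(data=X, label=y)
          self._it += 1
          return 1

      def reset(self):
          """Rewind the iterator to the first batch."""
          self._it = 0

  rng = np.random.RandomState(0)
  chunks = [(rng.randn(1024, 8), rng.randn(1024)) for _ in range(4)]
  Xy = xgboost.DMatrix(Iterator(chunks))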
@@ -60,20 +74,96 @@ constructor.

-# Other tree methods including ``hist`` and ``gpu_hist`` also work, but has some caveats
-# as noted in following sections.
-booster = xgboost.train({"tree_method": "approx"}, Xy)
+booster = xgboost.train({"tree_method": "hist"}, Xy)


-The above snippet is a simplified version of ``demo/guide-python/external_memory.py``. For
-an example in C, please see ``demo/c-api/external-memory/``.
+The above snippet is a simplified version of :ref:`sphx_glr_python_examples_external_memory.py`.
+For an example in C, please see ``demo/c-api/external-memory/``. The iterator is the
+common interface for using external memory with XGBoost; you can pass the resulting
+``DMatrix`` object to training, prediction, and evaluation.
+
+It is important to set the batch size based on the memory available. A good starting point
+is to set the batch size to 10GB per batch if you have 64GB of memory. It is *not*
+recommended to set small batch sizes like 32 samples per batch, as this can seriously hurt
+performance in gradient boosting.
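(Editorial note, not part of the diff: as a worked example of the guidance above, with
float32 data and 100 features a row costs about 100 × 4 = 400 bytes, so a 10GB batch
corresponds to roughly 10e9 / 400 ≈ 25 million rows per call to ``input_data``.)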
+***********
+CPU Version
+***********
+
+In the previous section, we demonstrated how to train a tree-based model using the
+``hist`` tree method on a CPU. This method involves iterating through data batches stored
+in a cache during tree construction. For optimal performance, we recommend using the
+``grow_policy=depthwise`` setting, which allows XGBoost to build an entire layer of tree
+nodes with only a few batch iterations. Conversely, using the ``lossguide`` policy
+requires XGBoost to iterate over the data set for each tree node, resulting in slower
+performance.
+
+If external memory is used, the performance of CPU training is limited by IO
+(input/output) speed. This means that the disk IO speed primarily determines the training
+speed. During benchmarking, we used an NVMe drive connected to a PCIe-4 slot; other types
+of storage can be too slow for practical usage. In addition, your system may perform
+caching to reduce the overhead of file reading.
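(Editorial note, not part of the diff: a minimal sketch of the recommendation above,
reusing the hypothetical ``Xy`` DMatrix from the iterator sketch earlier. ``depthwise``
is the default ``grow_policy``, so this only makes the choice explicit:)

.. code-block:: python

  booster = xgboost.train(
      {"tree_method": "hist", "grow_policy": "depthwise"}, Xy, num_boost_round=10
  )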
+**********************************
+GPU Version (GPU Hist tree method)
+**********************************
+
+External memory is supported by GPU algorithms (i.e. when ``tree_method`` is set to
+``gpu_hist``). However, the algorithm used for GPU is different from the one used for
+CPU. When training on a CPU, the tree method iterates through all batches from external
+memory for each step of the tree construction algorithm. On the other hand, the GPU
+algorithm concatenates all batches into one and stores it in GPU memory. To reduce overall
+memory usage, users can utilize subsampling. The good news is that the GPU hist tree
+method supports gradient-based sampling, enabling users to set a low sampling rate without
+compromising accuracy.
+
+.. code-block:: python
+
+  param = {
+    ...
+    'subsample': 0.2,
+    'sampling_method': 'gradient_based',
+  }
+
+For more information about the sampling algorithm and its use in external memory training,
+see `this paper <https://arxiv.org/abs/2005.09148>`_.
+
+.. warning::
+
+  When the GPU runs out of memory during iteration on external memory, the user might
+  receive a segfault instead of an OOM exception.
+
+*******
+Remarks
+*******
+
+When using external memory with XGBoost, data is divided into smaller chunks so that only
+a fraction of it needs to be stored in memory at any given time. It's important to note
+that this method only applies to the predictor data (``X``), while other data, like labels
+and internal runtime structures, are concatenated. This means that memory reduction is most
+effective when dealing with wide datasets where ``X`` is larger compared to other data
+like ``y``, while it has little impact on slim datasets.
+
+Starting with XGBoost 2.0, the implementation of external memory uses ``mmap``. It is not
+yet tested against system errors like disconnected network devices (`SIGBUS`). Also, it's
+worth noting that most tests have been conducted on Linux distributions.
+
+Another important point to keep in mind is that creating the initial cache for XGBoost may
+take some time. The interface to external memory is through custom iterators, which may or
+may not be thread-safe. Therefore, initialization is performed sequentially.
+

 ****************
 Text File Inputs
 ****************

-There is no big difference between using external memory version and in-memory version.
-The only difference is the filename format.
+This is the original form of external memory support; users are encouraged to use the
+custom data iterator instead. There is no big difference between using the external memory
+version of text input and the in-memory version. The only difference is the filename
+format.

-The external memory version takes in the following `URI <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_ format:
+The external memory version takes in the following `URI
+<https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_ format:

 .. code-block:: none
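(The body of this block is unchanged in the diff and therefore not shown; judging from
the cache-suffix example later on this page, it is the ``filename?format=libsvm#cacheprefix``
form, e.g. a hypothetical ``train.txt?format=libsvm#dtrain.cache``.)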
@@ -91,9 +181,8 @@ To load from csv files, use the following syntax:

 where ``label_column`` should point to the csv column acting as the label.

-To provide a simple example for illustration, extracting the code from
-`demo/guide-python/external_memory.py <https://github.com/dmlc/xgboost/blob/master/demo/guide-python/external_memory.py>`_. If
-you have a dataset stored in a file similar to ``agaricus.txt.train`` with LIBSVM format, the external memory support can be enabled by:
+If you have a dataset stored in a file similar to ``demo/data/agaricus.txt.train`` with LIBSVM
+format, the external memory support can be enabled by:

 .. code-block:: python
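(The body of this block is unchanged in the diff and therefore not shown. Based on the
URI format above and the cache suffix shown in the next hunk, a sketch of the call might
be:)

.. code-block:: python

  import xgboost

  dtrain = xgboost.DMatrix(
      "demo/data/agaricus.txt.train?format=libsvm#dtrain.cache"
  )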
@@ -104,35 +193,3 @@ XGBoost will first load ``agaricus.txt.train`` in, preprocess it, then write to
 more notes about text input formats, see :doc:`/tutorials/input_format`.

 For CLI version, simply add the cache suffix, e.g. ``"../data/agaricus.txt.train?format=libsvm#dtrain.cache"``.
-
-
-**********************************
-GPU Version (GPU Hist tree method)
-**********************************
-External memory is supported in GPU algorithms (i.e. when ``tree_method`` is set to ``gpu_hist``).
-
-If you are still getting out-of-memory errors after enabling external memory, try subsampling the
-data to further reduce GPU memory usage:
-
-.. code-block:: python
-
-  param = {
-    ...
-    'subsample': 0.1,
-    'sampling_method': 'gradient_based',
-  }
-
-For more information, see `this paper <https://arxiv.org/abs/2005.09148>`_. Internally
-the tree method still concatenate all the chunks into 1 final histogram index due to
-performance reason, but in compressed format. So its scalability has an upper bound but
-still has lower memory cost in general.
-
-***********
-CPU Version
-***********
-
-For CPU histogram based tree methods (``approx``, ``hist``) it's recommended to use
-``grow_policy=depthwise`` for performance reason. Iterating over data batches is slow,
-with ``depthwise`` policy XGBoost can build a entire layer of tree nodes with a few
-iterations, while with ``lossguide`` XGBoost needs to iterate over the data set for each
-tree node.
@@ -44,10 +44,10 @@
     <log.capi.invocation>OFF</log.capi.invocation>
     <use.cuda>OFF</use.cuda>
     <cudf.version>23.04.0</cudf.version>
-    <spark.rapids.version>23.04.1</spark.rapids.version>
+    <spark.rapids.version>23.06.0</spark.rapids.version>
     <cudf.classifier>cuda11</cudf.classifier>
     <scalatest.version>3.2.16</scalatest.version>
-    <scala-collection-compat.version>2.11.0</scala-collection-compat.version>
+    <scala-collection-compat.version>2.10.0</scala-collection-compat.version>
   </properties>
   <repositories>
     <repository>
@@ -381,17 +381,21 @@ __model_doc = f"""
         every **early_stopping_rounds** round(s) to continue training.  Requires at
         least one item in **eval_set** in :py:meth:`fit`.

-        - The method returns the model from the last iteration, not the best one, use a
-          callback :py:class:`xgboost.callback.EarlyStopping` if returning the best
-          model is preferred.
+        - If early stopping occurs, the model will have two additional attributes:
+          :py:attr:`best_score` and :py:attr:`best_iteration`. These are used by the
+          :py:meth:`predict` and :py:meth:`apply` methods to determine the optimal
+          number of trees during inference. If users want to access the full model
+          (including trees built after early stopping), they can specify the
+          `iteration_range` in these inference methods. In addition, other utilities
+          like model plotting can also use the entire model.
+
+        - If you prefer to discard the trees after `best_iteration`, consider using the
+          callback function :py:class:`xgboost.callback.EarlyStopping`.

         - If there's more than one item in **eval_set**, the last entry will be used for
           early stopping. If there's more than one metric in **eval_metric**, the last
           metric will be used for early stopping.

-        - If early stopping occurs, the model will have three additional fields:
-          :py:attr:`best_score`, :py:attr:`best_iteration`.

         .. note::

             This parameter replaces `early_stopping_rounds` in :py:meth:`fit` method.
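(Editorial note, not part of the diff: a minimal, self-contained sketch of the behaviour
the new docstring text describes — synthetic data, hypothetical variable names. The fitted
model keeps every tree; ``predict`` defaults to ``best_iteration``, and
``iteration_range`` can select the full model explicitly.)

.. code-block:: python

  import numpy as np
  import xgboost as xgb

  rng = np.random.RandomState(7)
  X = rng.randn(256, 4)
  y = (X[:, 0] > 0).astype(int)
  X_train, X_valid, y_train, y_valid = X[:192], X[192:], y[:192], y[192:]

  clf = xgb.XGBClassifier(n_estimators=100, early_stopping_rounds=3)
  clf.fit(X_train, y_train, eval_set=[(X_valid, y_valid)])
  print(clf.best_score, clf.best_iteration)  # set when early stopping is enabled

  # Access the full model, including trees built after best_iteration:
  n_rounds = clf.get_booster().num_boosted_rounds()
  full_pred = clf.predict(X_valid, iteration_range=(0, n_rounds))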
@@ -198,14 +198,14 @@ class IteratorForTest(xgb.core.DataIter):
         X: Sequence,
         y: Sequence,
         w: Optional[Sequence],
-        cache: Optional[str] = "./",
+        cache: Optional[str],
     ) -> None:
         assert len(X) == len(y)
         self.X = X
         self.y = y
         self.w = w
         self.it = 0
-        super().__init__(cache)
+        super().__init__(cache_prefix=cache)

     def next(self, input_data: Callable) -> int:
         if self.it == len(self.X):
@@ -347,7 +347,9 @@ class TestDataset:
         if w is not None:
             weight.append(w)

-        it = IteratorForTest(predictor, response, weight if weight else None)
+        it = IteratorForTest(
+            predictor, response, weight if weight else None, cache="cache"
+        )
         return xgb.DMatrix(it)

     def __repr__(self) -> str:
@@ -1,18 +1,21 @@
-/*!
- * Copyright (c) 2014-2019 by Contributors
+/**
+ * Copyright 2014-2023, XGBoost Contributors
  * \file io.h
  * \brief utilities with different serializable implementations
  * \author Tianqi Chen
  */
 #ifndef RABIT_INTERNAL_IO_H_
 #define RABIT_INTERNAL_IO_H_
-#include <cstdio>
-#include <vector>
-#include <cstring>
-#include <string>
-
-#include <algorithm>
-#include <numeric>
+#include <algorithm>
+#include <cstddef>  // for size_t
+#include <cstdio>
+#include <cstring>  // for memcpy
+#include <limits>
+#include <numeric>
+#include <string>
+#include <vector>

 #include "rabit/internal/utils.h"
 #include "rabit/serializable.h"
@@ -20,54 +23,61 @@ namespace rabit {
 namespace utils {
 /*! \brief re-use definition of dmlc::SeekStream */
 using SeekStream = dmlc::SeekStream;
-/*! \brief fixed size memory buffer */
+/**
+ * @brief Fixed size memory buffer as a stream.
+ */
 struct MemoryFixSizeBuffer : public SeekStream {
  public:
   // similar to SEEK_END in libc
-  static size_t constexpr kSeekEnd = std::numeric_limits<size_t>::max();
+  static std::size_t constexpr kSeekEnd = std::numeric_limits<std::size_t>::max();
+
+ protected:
+  MemoryFixSizeBuffer() = default;

  public:
-  MemoryFixSizeBuffer(void *p_buffer, size_t buffer_size)
-      : p_buffer_(reinterpret_cast<char*>(p_buffer)),
-        buffer_size_(buffer_size) {
-    curr_ptr_ = 0;
-  }
+  /**
+   * @brief Ctor
+   *
+   * @param p_buffer Pointer to the source buffer with size `buffer_size`.
+   * @param buffer_size Size of the source buffer
+   */
+  MemoryFixSizeBuffer(void *p_buffer, std::size_t buffer_size)
+      : p_buffer_(reinterpret_cast<char *>(p_buffer)), buffer_size_(buffer_size) {}
   ~MemoryFixSizeBuffer() override = default;
-  size_t Read(void *ptr, size_t size) override {
-    size_t nread = std::min(buffer_size_ - curr_ptr_, size);
+
+  std::size_t Read(void *ptr, std::size_t size) override {
+    std::size_t nread = std::min(buffer_size_ - curr_ptr_, size);
     if (nread != 0) std::memcpy(ptr, p_buffer_ + curr_ptr_, nread);
     curr_ptr_ += nread;
     return nread;
   }
-  void Write(const void *ptr, size_t size) override {
+  void Write(const void *ptr, std::size_t size) override {
     if (size == 0) return;
-    utils::Assert(curr_ptr_ + size <= buffer_size_,
-                  "write position exceed fixed buffer size");
+    CHECK_LE(curr_ptr_ + size, buffer_size_);
     std::memcpy(p_buffer_ + curr_ptr_, ptr, size);
     curr_ptr_ += size;
   }
-  void Seek(size_t pos) override {
+  void Seek(std::size_t pos) override {
     if (pos == kSeekEnd) {
       curr_ptr_ = buffer_size_;
     } else {
-      curr_ptr_ = static_cast<size_t>(pos);
+      curr_ptr_ = static_cast<std::size_t>(pos);
     }
   }
-  size_t Tell() override {
-    return curr_ptr_;
-  }
-  virtual bool AtEnd() const {
-    return curr_ptr_ == buffer_size_;
-  }
+  /**
+   * @brief Current position in the buffer (stream).
+   */
+  std::size_t Tell() override { return curr_ptr_; }
+  virtual bool AtEnd() const { return curr_ptr_ == buffer_size_; }

- private:
+ protected:
   /*! \brief in memory buffer */
-  char *p_buffer_;
+  char *p_buffer_{nullptr};
   /*! \brief current pointer */
-  size_t buffer_size_;
+  std::size_t buffer_size_{0};
   /*! \brief current pointer */
-  size_t curr_ptr_;
-};  // class MemoryFixSizeBuffer
+  std::size_t curr_ptr_{0};
+};

 /*! \brief a in memory buffer that can be read and write as stream interface */
 struct MemoryBufferStream : public SeekStream {
src/collective/nccl_device_communicator.cu (new file, 228 lines)
@@ -0,0 +1,228 @@
/*!
 * Copyright 2023 XGBoost contributors
 */
#if defined(XGBOOST_USE_NCCL)
#include "nccl_device_communicator.cuh"

namespace xgboost {
namespace collective {

NcclDeviceCommunicator::NcclDeviceCommunicator(int device_ordinal, Communicator *communicator)
    : device_ordinal_{device_ordinal}, communicator_{communicator} {
  if (device_ordinal_ < 0) {
    LOG(FATAL) << "Invalid device ordinal: " << device_ordinal_;
  }
  if (communicator_ == nullptr) {
    LOG(FATAL) << "Communicator cannot be null.";
  }

  int32_t const rank = communicator_->GetRank();
  int32_t const world = communicator_->GetWorldSize();

  if (world == 1) {
    return;
  }

  std::vector<uint64_t> uuids(world * kUuidLength, 0);
  auto s_uuid = xgboost::common::Span<uint64_t>{uuids.data(), uuids.size()};
  auto s_this_uuid = s_uuid.subspan(rank * kUuidLength, kUuidLength);
  GetCudaUUID(s_this_uuid);

  // TODO(rongou): replace this with allgather.
  communicator_->AllReduce(uuids.data(), uuids.size(), DataType::kUInt64, Operation::kSum);

  std::vector<xgboost::common::Span<uint64_t, kUuidLength>> converted(world);
  size_t j = 0;
  for (size_t i = 0; i < uuids.size(); i += kUuidLength) {
    converted[j] = xgboost::common::Span<uint64_t, kUuidLength>{uuids.data() + i, kUuidLength};
    j++;
  }

  auto iter = std::unique(converted.begin(), converted.end());
  auto n_uniques = std::distance(converted.begin(), iter);

  CHECK_EQ(n_uniques, world)
      << "Multiple processes within communication group running on same CUDA "
      << "device is not supported. " << PrintUUID(s_this_uuid) << "\n";

  nccl_unique_id_ = GetUniqueId();
  dh::safe_cuda(cudaSetDevice(device_ordinal_));
  dh::safe_nccl(ncclCommInitRank(&nccl_comm_, world, nccl_unique_id_, rank));
  dh::safe_cuda(cudaStreamCreate(&cuda_stream_));
}

NcclDeviceCommunicator::~NcclDeviceCommunicator() {
  if (communicator_->GetWorldSize() == 1) {
    return;
  }
  if (cuda_stream_) {
    dh::safe_cuda(cudaStreamDestroy(cuda_stream_));
  }
  if (nccl_comm_) {
    dh::safe_nccl(ncclCommDestroy(nccl_comm_));
  }
  if (xgboost::ConsoleLogger::ShouldLog(xgboost::ConsoleLogger::LV::kDebug)) {
    LOG(CONSOLE) << "======== NCCL Statistics========";
    LOG(CONSOLE) << "AllReduce calls: " << allreduce_calls_;
    LOG(CONSOLE) << "AllReduce total MiB communicated: " << allreduce_bytes_ / 1048576;
  }
}

namespace {
ncclDataType_t GetNcclDataType(DataType const &data_type) {
  ncclDataType_t result{ncclInt8};
  switch (data_type) {
    case DataType::kInt8:
      result = ncclInt8;
      break;
    case DataType::kUInt8:
      result = ncclUint8;
      break;
    case DataType::kInt32:
      result = ncclInt32;
      break;
    case DataType::kUInt32:
      result = ncclUint32;
      break;
    case DataType::kInt64:
      result = ncclInt64;
      break;
    case DataType::kUInt64:
      result = ncclUint64;
      break;
    case DataType::kFloat:
      result = ncclFloat;
      break;
    case DataType::kDouble:
      result = ncclDouble;
      break;
    default:
      LOG(FATAL) << "Unknown data type.";
  }
  return result;
}

bool IsBitwiseOp(Operation const &op) {
  return op == Operation::kBitwiseAND || op == Operation::kBitwiseOR ||
         op == Operation::kBitwiseXOR;
}

ncclRedOp_t GetNcclRedOp(Operation const &op) {
  ncclRedOp_t result{ncclMax};
  switch (op) {
    case Operation::kMax:
      result = ncclMax;
      break;
    case Operation::kMin:
      result = ncclMin;
      break;
    case Operation::kSum:
      result = ncclSum;
      break;
    default:
      LOG(FATAL) << "Unsupported reduce operation.";
  }
  return result;
}

template <typename Func>
void RunBitwiseAllreduce(char *out_buffer, char const *device_buffer, Func func, int world_size,
                         std::size_t size, cudaStream_t stream) {
  dh::LaunchN(size, stream, [=] __device__(std::size_t idx) {
    out_buffer[idx] = device_buffer[idx];
    for (auto rank = 1; rank < world_size; rank++) {
      out_buffer[idx] = func(out_buffer[idx], device_buffer[rank * size + idx]);
    }
  });
}
}  // anonymous namespace

void NcclDeviceCommunicator::BitwiseAllReduce(void *send_receive_buffer, std::size_t count,
                                              DataType data_type, Operation op) {
  auto const world_size = communicator_->GetWorldSize();
  auto const size = count * GetTypeSize(data_type);
  dh::caching_device_vector<char> buffer(size * world_size);
  auto *device_buffer = buffer.data().get();

  // First gather data from all the workers.
  dh::safe_nccl(ncclAllGather(send_receive_buffer, device_buffer, count, GetNcclDataType(data_type),
                              nccl_comm_, cuda_stream_));

  // Then reduce locally.
  auto *out_buffer = static_cast<char *>(send_receive_buffer);
  switch (op) {
    case Operation::kBitwiseAND:
      RunBitwiseAllreduce(out_buffer, device_buffer, thrust::bit_and<char>(), world_size, size,
                          cuda_stream_);
      break;
    case Operation::kBitwiseOR:
      RunBitwiseAllreduce(out_buffer, device_buffer, thrust::bit_or<char>(), world_size, size,
                          cuda_stream_);
      break;
    case Operation::kBitwiseXOR:
      RunBitwiseAllreduce(out_buffer, device_buffer, thrust::bit_xor<char>(), world_size, size,
                          cuda_stream_);
      break;
    default:
      LOG(FATAL) << "Not a bitwise reduce operation.";
  }
}

void NcclDeviceCommunicator::AllReduce(void *send_receive_buffer, std::size_t count,
                                       DataType data_type, Operation op) {
  if (communicator_->GetWorldSize() == 1) {
    return;
  }

  dh::safe_cuda(cudaSetDevice(device_ordinal_));
  if (IsBitwiseOp(op)) {
    BitwiseAllReduce(send_receive_buffer, count, data_type, op);
  } else {
    dh::safe_nccl(ncclAllReduce(send_receive_buffer, send_receive_buffer, count,
                                GetNcclDataType(data_type), GetNcclRedOp(op), nccl_comm_,
                                cuda_stream_));
  }
  allreduce_bytes_ += count * GetTypeSize(data_type);
  allreduce_calls_ += 1;
}

void NcclDeviceCommunicator::AllGatherV(void const *send_buffer, size_t length_bytes,
                                        std::vector<std::size_t> *segments,
                                        dh::caching_device_vector<char> *receive_buffer) {
  if (communicator_->GetWorldSize() == 1) {
    return;
  }

  dh::safe_cuda(cudaSetDevice(device_ordinal_));
  int const world_size = communicator_->GetWorldSize();
  int const rank = communicator_->GetRank();

  segments->clear();
  segments->resize(world_size, 0);
  segments->at(rank) = length_bytes;
  communicator_->AllReduce(segments->data(), segments->size(), DataType::kUInt64, Operation::kMax);
  auto total_bytes = std::accumulate(segments->cbegin(), segments->cend(), 0UL);
  receive_buffer->resize(total_bytes);

  size_t offset = 0;
  dh::safe_nccl(ncclGroupStart());
  for (int32_t i = 0; i < world_size; ++i) {
    size_t as_bytes = segments->at(i);
    dh::safe_nccl(ncclBroadcast(send_buffer, receive_buffer->data().get() + offset, as_bytes,
                                ncclChar, i, nccl_comm_, cuda_stream_));
    offset += as_bytes;
  }
  dh::safe_nccl(ncclGroupEnd());
}

void NcclDeviceCommunicator::Synchronize() {
  if (communicator_->GetWorldSize() == 1) {
    return;
  }
  dh::safe_cuda(cudaSetDevice(device_ordinal_));
  dh::safe_cuda(cudaStreamSynchronize(cuda_stream_));
}

}  // namespace collective
}  // namespace xgboost
#endif
@@ -1,5 +1,5 @@
 /*!
- * Copyright 2022 XGBoost contributors
+ * Copyright 2022-2023 XGBoost contributors
  */
 #pragma once
@@ -12,116 +12,13 @@ namespace collective {

 class NcclDeviceCommunicator : public DeviceCommunicator {
  public:
-  NcclDeviceCommunicator(int device_ordinal, Communicator *communicator)
-      : device_ordinal_{device_ordinal}, communicator_{communicator} {
-    if (device_ordinal_ < 0) {
-      LOG(FATAL) << "Invalid device ordinal: " << device_ordinal_;
-    }
-    if (communicator_ == nullptr) {
-      LOG(FATAL) << "Communicator cannot be null.";
-    }
-
-    int32_t const rank = communicator_->GetRank();
-    int32_t const world = communicator_->GetWorldSize();
-
-    if (world == 1) {
-      return;
-    }
-
-    std::vector<uint64_t> uuids(world * kUuidLength, 0);
-    auto s_uuid = xgboost::common::Span<uint64_t>{uuids.data(), uuids.size()};
-    auto s_this_uuid = s_uuid.subspan(rank * kUuidLength, kUuidLength);
-    GetCudaUUID(s_this_uuid);
-
-    // TODO(rongou): replace this with allgather.
-    communicator_->AllReduce(uuids.data(), uuids.size(), DataType::kUInt64, Operation::kSum);
-
-    std::vector<xgboost::common::Span<uint64_t, kUuidLength>> converted(world);
-    size_t j = 0;
-    for (size_t i = 0; i < uuids.size(); i += kUuidLength) {
-      converted[j] = xgboost::common::Span<uint64_t, kUuidLength>{uuids.data() + i, kUuidLength};
-      j++;
-    }
-
-    auto iter = std::unique(converted.begin(), converted.end());
-    auto n_uniques = std::distance(converted.begin(), iter);
-
-    CHECK_EQ(n_uniques, world)
-        << "Multiple processes within communication group running on same CUDA "
-        << "device is not supported. " << PrintUUID(s_this_uuid) << "\n";
-
-    nccl_unique_id_ = GetUniqueId();
-    dh::safe_nccl(ncclCommInitRank(&nccl_comm_, world, nccl_unique_id_, rank));
-    dh::safe_cuda(cudaStreamCreate(&cuda_stream_));
-  }
-
-  ~NcclDeviceCommunicator() override {
-    if (communicator_->GetWorldSize() == 1) {
-      return;
-    }
-    if (cuda_stream_) {
-      dh::safe_cuda(cudaStreamDestroy(cuda_stream_));
-    }
-    if (nccl_comm_) {
-      dh::safe_nccl(ncclCommDestroy(nccl_comm_));
-    }
-    if (xgboost::ConsoleLogger::ShouldLog(xgboost::ConsoleLogger::LV::kDebug)) {
-      LOG(CONSOLE) << "======== NCCL Statistics========";
-      LOG(CONSOLE) << "AllReduce calls: " << allreduce_calls_;
-      LOG(CONSOLE) << "AllReduce total MiB communicated: " << allreduce_bytes_ / 1048576;
-    }
-  }
-
+  NcclDeviceCommunicator(int device_ordinal, Communicator *communicator);
+  ~NcclDeviceCommunicator() override;
   void AllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
-                 Operation op) override {
-    if (communicator_->GetWorldSize() == 1) {
-      return;
-    }
-
-    dh::safe_cuda(cudaSetDevice(device_ordinal_));
-    dh::safe_nccl(ncclAllReduce(send_receive_buffer, send_receive_buffer, count,
-                                GetNcclDataType(data_type), GetNcclRedOp(op), nccl_comm_,
-                                cuda_stream_));
-    allreduce_bytes_ += count * GetTypeSize(data_type);
-    allreduce_calls_ += 1;
-  }
-
+                 Operation op) override;
   void AllGatherV(void const *send_buffer, size_t length_bytes, std::vector<std::size_t> *segments,
-                  dh::caching_device_vector<char> *receive_buffer) override {
-    if (communicator_->GetWorldSize() == 1) {
-      return;
-    }
-
-    dh::safe_cuda(cudaSetDevice(device_ordinal_));
-    int const world_size = communicator_->GetWorldSize();
-    int const rank = communicator_->GetRank();
-
-    segments->clear();
-    segments->resize(world_size, 0);
-    segments->at(rank) = length_bytes;
-    communicator_->AllReduce(segments->data(), segments->size(), DataType::kUInt64,
-                             Operation::kMax);
-    auto total_bytes = std::accumulate(segments->cbegin(), segments->cend(), 0UL);
-    receive_buffer->resize(total_bytes);
-
-    size_t offset = 0;
-    dh::safe_nccl(ncclGroupStart());
-    for (int32_t i = 0; i < world_size; ++i) {
-      size_t as_bytes = segments->at(i);
-      dh::safe_nccl(ncclBroadcast(send_buffer, receive_buffer->data().get() + offset, as_bytes,
-                                  ncclChar, i, nccl_comm_, cuda_stream_));
-      offset += as_bytes;
-    }
-    dh::safe_nccl(ncclGroupEnd());
-  }
-
-  void Synchronize() override {
-    if (communicator_->GetWorldSize() == 1) {
-      return;
-    }
-    dh::safe_cuda(cudaSetDevice(device_ordinal_));
-    dh::safe_cuda(cudaStreamSynchronize(cuda_stream_));
-  }
+                  dh::caching_device_vector<char> *receive_buffer) override;
+  void Synchronize() override;

  private:
   static constexpr std::size_t kUuidLength =
@@ -160,60 +57,8 @@ class NcclDeviceCommunicator : public DeviceCommunicator {
     return id;
   }

-  static ncclDataType_t GetNcclDataType(DataType const &data_type) {
-    ncclDataType_t result;
-    switch (data_type) {
-      case DataType::kInt8:
-        result = ncclInt8;
-        break;
-      case DataType::kUInt8:
-        result = ncclUint8;
-        break;
-      case DataType::kInt32:
-        result = ncclInt32;
-        break;
-      case DataType::kUInt32:
-        result = ncclUint32;
-        break;
-      case DataType::kInt64:
-        result = ncclInt64;
-        break;
-      case DataType::kUInt64:
-        result = ncclUint64;
-        break;
-      case DataType::kFloat:
-        result = ncclFloat;
-        break;
-      case DataType::kDouble:
-        result = ncclDouble;
-        break;
-      default:
-        LOG(FATAL) << "Unknown data type.";
-    }
-    return result;
-  }
-
-  static ncclRedOp_t GetNcclRedOp(Operation const &op) {
-    ncclRedOp_t result;
-    switch (op) {
-      case Operation::kMax:
-        result = ncclMax;
-        break;
-      case Operation::kMin:
-        result = ncclMin;
-        break;
-      case Operation::kSum:
-        result = ncclSum;
-        break;
-      case Operation::kBitwiseAND:
-      case Operation::kBitwiseOR:
-      case Operation::kBitwiseXOR:
-        LOG(FATAL) << "Not implemented yet.";
-      default:
-        LOG(FATAL) << "Unknown reduce operation.";
-    }
-    return result;
-  }
+  void BitwiseAllReduce(void *send_receive_buffer, std::size_t count, DataType data_type,
+                        Operation op);

   int const device_ordinal_;
   Communicator *communicator_;
@@ -1,5 +1,5 @@
-/*!
- * Copyright 2019 by Contributors
+/**
+ * Copyright 2019-2023, XGBoost Contributors
  * \file bitfield.h
  */
 #ifndef XGBOOST_COMMON_BITFIELD_H_
@@ -50,14 +50,17 @@ __forceinline__ __device__ BitFieldAtomicType AtomicAnd(BitFieldAtomicType* addr
 }
 #endif  // defined(__CUDACC__)

-/*!
- * \brief A non-owning type with auxiliary methods defined for manipulating bits.
+/**
+ * @brief A non-owning type with auxiliary methods defined for manipulating bits.
  *
- * \tparam Direction Whether the bits start from left or from right.
+ * @tparam VT        Underlying value type, must be an unsigned integer.
+ * @tparam Direction Whether the bits start from left or from right.
+ * @tparam IsConst   Whether the view is const.
  */
 template <typename VT, typename Direction, bool IsConst = false>
 struct BitFieldContainer {
   using value_type = std::conditional_t<IsConst, VT const, VT>;  // NOLINT
   using size_type = size_t;     // NOLINT
   using index_type = size_t;    // NOLINT
   using pointer = value_type*;  // NOLINT
@@ -70,8 +73,9 @@ struct BitFieldContainer {
   };

  private:
-  common::Span<value_type> bits_;
-  static_assert(!std::is_signed<VT>::value, "Must use unsiged type as underlying storage.");
+  value_type* bits_{nullptr};
+  size_type n_values_{0};
+  static_assert(!std::is_signed<VT>::value, "Must use an unsigned type as the underlying storage.");

  public:
   XGBOOST_DEVICE static Pos ToBitPos(index_type pos) {
@@ -86,13 +90,15 @@ struct BitFieldContainer {

  public:
   BitFieldContainer() = default;
-  XGBOOST_DEVICE explicit BitFieldContainer(common::Span<value_type> bits) : bits_{bits} {}
-  XGBOOST_DEVICE BitFieldContainer(BitFieldContainer const& other) : bits_{other.bits_} {}
+  XGBOOST_DEVICE explicit BitFieldContainer(common::Span<value_type> bits)
+      : bits_{bits.data()}, n_values_{bits.size()} {}
+  BitFieldContainer(BitFieldContainer const& other) = default;
+  BitFieldContainer(BitFieldContainer&& other) = default;
+  BitFieldContainer& operator=(BitFieldContainer const& that) = default;
+  BitFieldContainer& operator=(BitFieldContainer&& that) = default;

-  XGBOOST_DEVICE common::Span<value_type> Bits() { return bits_; }
-  XGBOOST_DEVICE common::Span<value_type const> Bits() const { return bits_; }
+  XGBOOST_DEVICE auto Bits() { return common::Span<value_type>{bits_, NumValues()}; }
+  XGBOOST_DEVICE auto Bits() const { return common::Span<value_type const>{bits_, NumValues()}; }

   /*\brief Compute the size of needed memory allocation.  The returned value is in terms
    * of number of elements with `BitFieldContainer::value_type'.
@@ -103,17 +109,17 @@ struct BitFieldContainer {
 #if defined(__CUDA_ARCH__)
   __device__ BitFieldContainer& operator|=(BitFieldContainer const& rhs) {
     auto tid = blockIdx.x * blockDim.x + threadIdx.x;
-    size_t min_size = min(bits_.size(), rhs.bits_.size());
+    size_t min_size = min(NumValues(), rhs.NumValues());
     if (tid < min_size) {
-      bits_[tid] |= rhs.bits_[tid];
+      Data()[tid] |= rhs.Data()[tid];
     }
     return *this;
   }
 #else
   BitFieldContainer& operator|=(BitFieldContainer const& rhs) {
-    size_t min_size = std::min(bits_.size(), rhs.bits_.size());
+    size_t min_size = std::min(NumValues(), rhs.NumValues());
     for (size_t i = 0; i < min_size; ++i) {
-      bits_[i] |= rhs.bits_[i];
+      Data()[i] |= rhs.Data()[i];
     }
     return *this;
   }
@@ -121,75 +127,85 @@ struct BitFieldContainer {

 #if defined(__CUDA_ARCH__)
   __device__ BitFieldContainer& operator&=(BitFieldContainer const& rhs) {
-    size_t min_size = min(bits_.size(), rhs.bits_.size());
+    size_t min_size = min(NumValues(), rhs.NumValues());
     auto tid = blockIdx.x * blockDim.x + threadIdx.x;
     if (tid < min_size) {
-      bits_[tid] &= rhs.bits_[tid];
+      Data()[tid] &= rhs.Data()[tid];
     }
     return *this;
   }
 #else
   BitFieldContainer& operator&=(BitFieldContainer const& rhs) {
-    size_t min_size = std::min(bits_.size(), rhs.bits_.size());
+    size_t min_size = std::min(NumValues(), rhs.NumValues());
     for (size_t i = 0; i < min_size; ++i) {
-      bits_[i] &= rhs.bits_[i];
+      Data()[i] &= rhs.Data()[i];
     }
     return *this;
   }
 #endif  // defined(__CUDA_ARCH__)

 #if defined(__CUDA_ARCH__)
-  __device__ auto Set(index_type pos) {
+  __device__ auto Set(index_type pos) noexcept(true) {
     Pos pos_v = Direction::Shift(ToBitPos(pos));
-    value_type& value = bits_[pos_v.int_pos];
+    value_type& value = Data()[pos_v.int_pos];
     value_type set_bit = kOne << pos_v.bit_pos;
     using Type = typename dh::detail::AtomicDispatcher<sizeof(value_type)>::Type;
     atomicOr(reinterpret_cast<Type *>(&value), set_bit);
   }
-  __device__ void Clear(index_type pos) {
+  __device__ void Clear(index_type pos) noexcept(true) {
     Pos pos_v = Direction::Shift(ToBitPos(pos));
-    value_type& value = bits_[pos_v.int_pos];
+    value_type& value = Data()[pos_v.int_pos];
     value_type clear_bit = ~(kOne << pos_v.bit_pos);
     using Type = typename dh::detail::AtomicDispatcher<sizeof(value_type)>::Type;
     atomicAnd(reinterpret_cast<Type *>(&value), clear_bit);
   }
 #else
-  void Set(index_type pos) {
+  void Set(index_type pos) noexcept(true) {
     Pos pos_v = Direction::Shift(ToBitPos(pos));
-    value_type& value = bits_[pos_v.int_pos];
+    value_type& value = Data()[pos_v.int_pos];
     value_type set_bit = kOne << pos_v.bit_pos;
     value |= set_bit;
   }
-  void Clear(index_type pos) {
+  void Clear(index_type pos) noexcept(true) {
     Pos pos_v = Direction::Shift(ToBitPos(pos));
-    value_type& value = bits_[pos_v.int_pos];
+    value_type& value = Data()[pos_v.int_pos];
     value_type clear_bit = ~(kOne << pos_v.bit_pos);
     value &= clear_bit;
   }
 #endif  // defined(__CUDA_ARCH__)

-  XGBOOST_DEVICE bool Check(Pos pos_v) const {
+  XGBOOST_DEVICE bool Check(Pos pos_v) const noexcept(true) {
     pos_v = Direction::Shift(pos_v);
-    SPAN_LT(pos_v.int_pos, bits_.size());
-    value_type const value = bits_[pos_v.int_pos];
+    assert(pos_v.int_pos < NumValues());
+    value_type const value = Data()[pos_v.int_pos];
     value_type const test_bit = kOne << pos_v.bit_pos;
     value_type result = test_bit & value;
     return static_cast<bool>(result);
   }
-  XGBOOST_DEVICE bool Check(index_type pos) const {
+  [[nodiscard]] XGBOOST_DEVICE bool Check(index_type pos) const noexcept(true) {
     Pos pos_v = ToBitPos(pos);
     return Check(pos_v);
   }
+  /**
+   * @brief Returns the total number of bits that can be viewed. This is equal to or
+   *        larger than the actual number of valid bits.
+   */
+  [[nodiscard]] XGBOOST_DEVICE size_type Capacity() const noexcept(true) {
+    return kValueSize * NumValues();
+  }
+  /**
+   * @brief Number of storage units used in this bit field.
+   */
+  [[nodiscard]] XGBOOST_DEVICE size_type NumValues() const noexcept(true) { return n_values_; }

-  XGBOOST_DEVICE size_t Size() const { return kValueSize * bits_.size(); }
+  XGBOOST_DEVICE pointer Data() const noexcept(true) { return bits_; }

-  XGBOOST_DEVICE pointer Data() const { return bits_.data(); }
-
-  inline friend std::ostream &
-  operator<<(std::ostream &os, BitFieldContainer<VT, Direction, IsConst> field) {
-    os << "Bits " << "storage size: " << field.bits_.size() << "\n";
-    for (typename common::Span<value_type>::index_type i = 0; i < field.bits_.size(); ++i) {
-      std::bitset<BitFieldContainer<VT, Direction, IsConst>::kValueSize> bset(field.bits_[i]);
+  inline friend std::ostream& operator<<(std::ostream& os,
+                                         BitFieldContainer<VT, Direction, IsConst> field) {
+    os << "Bits "
+       << "storage size: " << field.NumValues() << "\n";
+    for (typename common::Span<value_type>::index_type i = 0; i < field.NumValues(); ++i) {
+      std::bitset<BitFieldContainer<VT, Direction, IsConst>::kValueSize> bset(field.Data()[i]);
       os << bset << "\n";
     }
     return os;
@@ -1,5 +1,5 @@
-/*!
- * Copyright 2020-2022 by XGBoost Contributors
+/**
+ * Copyright 2020-2023, XGBoost Contributors
  * \file categorical.h
  */
 #ifndef XGBOOST_COMMON_CATEGORICAL_H_
@@ -10,7 +10,6 @@
 #include "bitfield.h"
 #include "xgboost/base.h"
 #include "xgboost/data.h"
-#include "xgboost/parameter.h"
 #include "xgboost/span.h"

 namespace xgboost {
|
||||
return *this;
|
||||
}
|
||||
|
||||
uint32_t FeatureBins(bst_feature_t feature) const {
|
||||
[[nodiscard]] bst_bin_t FeatureBins(bst_feature_t feature) const {
|
||||
return cut_ptrs_.ConstHostVector().at(feature + 1) - cut_ptrs_.ConstHostVector()[feature];
|
||||
}
|
||||
|
||||
@@ -92,8 +92,8 @@ class HistogramCuts {
   std::vector<float> const& Values() const { return cut_values_.ConstHostVector(); }
   std::vector<float> const& MinValues() const { return min_vals_.ConstHostVector(); }

-  bool HasCategorical() const { return has_categorical_; }
-  float MaxCategory() const { return max_cat_; }
+  [[nodiscard]] bool HasCategorical() const { return has_categorical_; }
+  [[nodiscard]] float MaxCategory() const { return max_cat_; }
   /**
    * \brief Set meta info about categorical features.
    *
@@ -105,12 +105,13 @@ class HistogramCuts {
     max_cat_ = max_cat;
   }

-  size_t TotalBins() const { return cut_ptrs_.ConstHostVector().back(); }
+  [[nodiscard]] bst_bin_t TotalBins() const { return cut_ptrs_.ConstHostVector().back(); }

   // Return the index of a cut point that is strictly greater than the input
   // value, or the last available index if none exists
-  bst_bin_t SearchBin(float value, bst_feature_t column_id, std::vector<uint32_t> const& ptrs,
-                      std::vector<float> const& values) const {
+  [[nodiscard]] bst_bin_t SearchBin(float value, bst_feature_t column_id,
+                                    std::vector<uint32_t> const& ptrs,
+                                    std::vector<float> const& values) const {
     auto end = ptrs[column_id + 1];
     auto beg = ptrs[column_id];
     auto it = std::upper_bound(values.cbegin() + beg, values.cbegin() + end, value);
@@ -119,20 +120,20 @@ class HistogramCuts {
     return idx;
   }

-  bst_bin_t SearchBin(float value, bst_feature_t column_id) const {
+  [[nodiscard]] bst_bin_t SearchBin(float value, bst_feature_t column_id) const {
     return this->SearchBin(value, column_id, Ptrs(), Values());
   }

   /**
    * \brief Search the bin index for numerical feature.
    */
-  bst_bin_t SearchBin(Entry const& e) const { return SearchBin(e.fvalue, e.index); }
+  [[nodiscard]] bst_bin_t SearchBin(Entry const& e) const { return SearchBin(e.fvalue, e.index); }

   /**
    * \brief Search the bin index for categorical feature.
    */
-  bst_bin_t SearchCatBin(float value, bst_feature_t fidx, std::vector<uint32_t> const& ptrs,
-                         std::vector<float> const& vals) const {
+  [[nodiscard]] bst_bin_t SearchCatBin(float value, bst_feature_t fidx,
+                                       std::vector<uint32_t> const& ptrs,
+                                       std::vector<float> const& vals) const {
     auto end = ptrs.at(fidx + 1) + vals.cbegin();
     auto beg = ptrs[fidx] + vals.cbegin();
     // Truncates the value in case it's not perfectly rounded.
@@ -143,12 +144,14 @@ class HistogramCuts {
     }
     return bin_idx;
   }
-  bst_bin_t SearchCatBin(float value, bst_feature_t fidx) const {
+  [[nodiscard]] bst_bin_t SearchCatBin(float value, bst_feature_t fidx) const {
     auto const& ptrs = this->Ptrs();
     auto const& vals = this->Values();
     return this->SearchCatBin(value, fidx, ptrs, vals);
   }
-  bst_bin_t SearchCatBin(Entry const& e) const { return SearchCatBin(e.fvalue, e.index); }
+  [[nodiscard]] bst_bin_t SearchCatBin(Entry const& e) const {
+    return SearchCatBin(e.fvalue, e.index);
+  }

   /**
    * \brief Return numerical bin value given bin index.
src/common/io.cc (176 changed lines)
@@ -1,24 +1,47 @@
-/*!
- * Copyright (c) by XGBoost Contributors 2019-2022
+/**
+ * Copyright 2019-2023, by XGBoost Contributors
  */
-#if defined(__unix__)
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <unistd.h>
+#if !defined(NOMINMAX) && defined(_WIN32)
+#define NOMINMAX
+#endif  // !defined(NOMINMAX)
+
+#if !defined(xgboost_IS_WIN)
+
+#if defined(_MSC_VER) || defined(__MINGW32__)
+#define xgboost_IS_WIN 1
+#endif  // defined(_MSC_VER) || defined(__MINGW32__)
+
+#endif  // !defined(xgboost_IS_WIN)
+
+#if defined(__unix__) || defined(__APPLE__)
+#include <fcntl.h>     // for open, O_RDONLY
+#include <sys/mman.h>  // for mmap, mmap64, munmap
+#include <unistd.h>    // for close, getpagesize
+#elif defined(xgboost_IS_WIN)
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
 #endif  // defined(__unix__)
-#include <algorithm>
-#include <fstream>
-#include <string>
-#include <memory>
-#include <utility>
-#include <cstdio>

-#include "xgboost/logging.h"
+#include <algorithm>     // for copy, transform
+#include <cctype>        // for tolower
+#include <cerrno>        // for errno
+#include <cstddef>       // for size_t
+#include <cstdint>       // for int32_t, uint32_t
+#include <cstring>       // for memcpy
+#include <fstream>       // for ifstream
+#include <iterator>      // for distance
+#include <limits>        // for numeric_limits
+#include <memory>        // for unique_ptr
+#include <string>        // for string
+#include <system_error>  // for error_code, system_category
+#include <utility>       // for move
+#include <vector>        // for vector
+
+#include "io.h"
+#include "xgboost/collective/socket.h"  // for LastError
+#include "xgboost/logging.h"

-namespace xgboost {
-namespace common {
-
+namespace xgboost::common {
 size_t PeekableInStream::Read(void* dptr, size_t size) {
   size_t nbuffer = buffer_.length() - buffer_ptr_;
   if (nbuffer == 0) return strm_->Read(dptr, size);
@@ -94,11 +117,32 @@ void FixedSizeStream::Take(std::string* out) {
   *out = std::move(buffer_);
 }

+namespace {
+// Get system alignment value for IO with mmap.
+std::size_t GetMmapAlignment() {
+#if defined(xgboost_IS_WIN)
+  SYSTEM_INFO sys_info;
+  GetSystemInfo(&sys_info);
+  // During testing, `sys_info.dwPageSize` is of size 4096 while `dwAllocationGranularity` is of
+  // size 65536.
+  return sys_info.dwAllocationGranularity;
+#else
+  return getpagesize();
+#endif
+}
+
+auto SystemErrorMsg() {
+  std::int32_t errsv = system::LastError();
+  auto err = std::error_code{errsv, std::system_category()};
+  return err.message();
+}
+}  // anonymous namespace
+
 std::string LoadSequentialFile(std::string uri, bool stream) {
   auto OpenErr = [&uri]() {
     std::string msg;
     msg = "Opening " + uri + " failed: ";
-    msg += strerror(errno);
+    msg += SystemErrorMsg();
     LOG(FATAL) << msg;
   };
@@ -155,5 +199,99 @@ std::string FileExtension(std::string fname, bool lower) {
  return "";
}
}
}  // namespace common
}  // namespace xgboost

struct PrivateMmapConstStream::MMAPFile {
#if defined(xgboost_IS_WIN)
  HANDLE fd{INVALID_HANDLE_VALUE};
  HANDLE file_map{INVALID_HANDLE_VALUE};
#else
  std::int32_t fd{0};
#endif
  char* base_ptr{nullptr};
  std::size_t base_size{0};
  std::string path;
};

char* PrivateMmapConstStream::Open(std::string path, std::size_t offset, std::size_t length) {
  if (length == 0) {
    return nullptr;
  }

#if defined(xgboost_IS_WIN)
  HANDLE fd = CreateFile(path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING,
                         FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, nullptr);
  CHECK_NE(fd, INVALID_HANDLE_VALUE) << "Failed to open:" << path << ". " << SystemErrorMsg();
#else
  auto fd = open(path.c_str(), O_RDONLY);
  CHECK_GE(fd, 0) << "Failed to open:" << path << ". " << SystemErrorMsg();
#endif

  char* ptr{nullptr};
  // Round down for alignment.
  auto view_start = offset / GetMmapAlignment() * GetMmapAlignment();
  auto view_size = length + (offset - view_start);

#if defined(__linux__) || defined(__GLIBC__)
  int prot{PROT_READ};
  ptr = reinterpret_cast<char*>(mmap64(nullptr, view_size, prot, MAP_PRIVATE, fd, view_start));
  CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << path << ". " << SystemErrorMsg();
  handle_.reset(new MMAPFile{fd, ptr, view_size, std::move(path)});
#elif defined(xgboost_IS_WIN)
  auto file_size = GetFileSize(fd, nullptr);
  DWORD access = PAGE_READONLY;
  auto map_file = CreateFileMapping(fd, nullptr, access, 0, file_size, nullptr);
  access = FILE_MAP_READ;
  std::uint32_t loff = static_cast<std::uint32_t>(view_start);
  std::uint32_t hoff = view_start >> 32;
  CHECK(map_file) << "Failed to map: " << path << ". " << SystemErrorMsg();
  ptr = reinterpret_cast<char*>(MapViewOfFile(map_file, access, hoff, loff, view_size));
  CHECK_NE(ptr, nullptr) << "Failed to map: " << path << ". " << SystemErrorMsg();
  handle_.reset(new MMAPFile{fd, map_file, ptr, view_size, std::move(path)});
#else
  CHECK_LE(offset, std::numeric_limits<off_t>::max())
      << "File size has exceeded the limit on the current system.";
  int prot{PROT_READ};
  ptr = reinterpret_cast<char*>(mmap(nullptr, view_size, prot, MAP_PRIVATE, fd, view_start));
  CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << path << ". " << SystemErrorMsg();
  handle_.reset(new MMAPFile{fd, ptr, view_size, std::move(path)});
#endif  // defined(__linux__)

  ptr += (offset - view_start);
  return ptr;
}
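The rounding in `Open` is what makes arbitrary byte offsets mmap-able: `mmap` requires the file offset to be a multiple of the page size (or allocation granularity on Windows), so the requested offset is rounded down and the extra bytes are skipped in the returned pointer. A minimal sketch of that arithmetic, assuming a 4096-byte alignment purely for illustration:

#include <cstddef>
#include <iostream>

int main() {
  std::size_t const alignment = 4096;  // assumed page size
  std::size_t const offset = 10000;    // requested byte offset into the file
  std::size_t const length = 500;      // requested view length

  std::size_t view_start = offset / alignment * alignment;  // 8192: rounded down
  std::size_t view_size = length + (offset - view_start);   // 2308: covers the request
  std::size_t skip = offset - view_start;                   // 1808: pointer adjustment

  std::cout << view_start << " " << view_size << " " << skip << "\n";
}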
PrivateMmapConstStream::PrivateMmapConstStream(std::string path, std::size_t offset,
                                               std::size_t length)
    : MemoryFixSizeBuffer{}, handle_{nullptr} {
  this->p_buffer_ = Open(std::move(path), offset, length);
  this->buffer_size_ = length;
}

PrivateMmapConstStream::~PrivateMmapConstStream() {
  CHECK(handle_);
#if defined(xgboost_IS_WIN)
  if (p_buffer_) {
    CHECK(UnmapViewOfFile(handle_->base_ptr)) << "Failed to call munmap: " << SystemErrorMsg();
  }
  if (handle_->fd != INVALID_HANDLE_VALUE) {
    CHECK(CloseHandle(handle_->fd)) << "Failed to close handle: " << SystemErrorMsg();
  }
  if (handle_->file_map != INVALID_HANDLE_VALUE) {
    CHECK(CloseHandle(handle_->file_map)) << "Failed to close mapping object: " << SystemErrorMsg();
  }
#else
  if (handle_->base_ptr) {
    CHECK_NE(munmap(handle_->base_ptr, handle_->base_size), -1)
        << "Failed to call munmap: " << handle_->path << ". " << SystemErrorMsg();
  }
  if (handle_->fd != 0) {
    CHECK_NE(close(handle_->fd), -1)
        << "Failed to close: " << handle_->path << ". " << SystemErrorMsg();
  }
#endif
}
}  // namespace xgboost::common

#if defined(xgboost_IS_WIN)
#undef xgboost_IS_WIN
#endif  // defined(xgboost_IS_WIN)

@@ -1,5 +1,5 @@
/*!
 * Copyright by XGBoost Contributors 2014-2022
/**
 * Copyright 2014-2023, XGBoost Contributors
 * \file io.h
 * \brief general stream interface for serialization, I/O
 * \author Tianqi Chen
@@ -10,9 +10,11 @@

#include <dmlc/io.h>
#include <rabit/rabit.h>
#include <string>

#include <cstring>
#include <fstream>
#include <memory>  // for unique_ptr
#include <string>  // for string

#include "common.h"

@@ -127,6 +129,31 @@ inline std::string ReadAll(std::string const &path) {
  return content;
}

/**
 * @brief Private mmap file as a read-only stream.
 *
 * It can calculate alignment automatically based on system page size (or allocation
 * granularity on Windows).
 */
class PrivateMmapConstStream : public MemoryFixSizeBuffer {
  struct MMAPFile;
  std::unique_ptr<MMAPFile> handle_;

  char* Open(std::string path, std::size_t offset, std::size_t length);

 public:
  /**
   * @brief Construct a private mmap stream.
   *
   * @param path   File path.
   * @param offset See the `offset` parameter of `mmap` for details.
   * @param length See the `length` parameter of `mmap` for details.
   */
  explicit PrivateMmapConstStream(std::string path, std::size_t offset, std::size_t length);
  void Write(void const*, std::size_t) override { LOG(FATAL) << "Read-only stream."; }

  ~PrivateMmapConstStream() override;
};
}  // namespace common
}  // namespace xgboost
#endif  // XGBOOST_COMMON_IO_H_
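A hypothetical usage sketch of the new stream (the `ReadPage` helper and the include path are illustrative, not from the patch); `Read` is inherited from the underlying dmlc stream interface, and in the patch the offset/length pair comes from the page cache:

#include <cstddef>
#include <string>
#include <utility>
#include <vector>
#include "common/io.h"  // assumed include path for PrivateMmapConstStream

void ReadPage(std::string path, std::size_t offset, std::size_t length) {
  // Map [offset, offset + length) of the cache file as a read-only stream.
  xgboost::common::PrivateMmapConstStream fi{std::move(path), offset, length};
  std::vector<char> buf(length);
  fi.Read(buf.data(), buf.size());  // decoding of the page format would follow
}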
@@ -590,7 +590,7 @@ class ArrayInterface {
template <std::int32_t D, typename Fn>
void DispatchDType(ArrayInterface<D> const array, std::int32_t device, Fn fn) {
  // Only used for cuDF at the moment.
  CHECK_EQ(array.valid.Size(), 0);
  CHECK_EQ(array.valid.Capacity(), 0);
  auto dispatch = [&](auto t) {
    using T = std::remove_const_t<decltype(t)> const;
    // Set the data size to max as we don't know the original size of a sliced array:
@@ -416,7 +416,8 @@ void CopyTensorInfoImpl(Context const& ctx, Json arr_interface, linalg::Tensor<T
    p_out->Reshape(array.shape);
    return;
  }
  CHECK(array.valid.Size() == 0) << "Meta info like label or weight can not have missing value.";
  CHECK_EQ(array.valid.Capacity(), 0)
      << "Meta info like label or weight can not have missing value.";
  if (array.is_contiguous && array.type == ToDType<T>::kType) {
    // Handle contiguous
    p_out->ModifyInplace([&](HostDeviceVector<T>* data, common::Span<size_t, D> shape) {

@@ -33,7 +33,8 @@ void CopyTensorInfoImpl(CUDAContext const* ctx, Json arr_interface, linalg::Tens
    p_out->Reshape(array.shape);
    return;
  }
  CHECK(array.valid.Size() == 0) << "Meta info like label or weight can not have missing value.";
  CHECK_EQ(array.valid.Capacity(), 0)
      << "Meta info like label or weight can not have missing value.";
  auto ptr_device = SetDeviceToPtr(array.data);
  p_out->SetDevice(ptr_device);

@@ -5,6 +5,7 @@
#include <thrust/iterator/transform_output_iterator.h>

#include "../common/categorical.h"
#include "../common/cuda_context.cuh"
#include "../common/hist_util.cuh"
#include "../common/random.h"
#include "../common/transform_iterator.h"  // MakeIndexTransformIter
@@ -313,7 +314,8 @@ void CopyGHistToEllpack(GHistIndexMatrix const& page, common::Span<size_t const>
  auto d_csc_indptr = dh::ToSpan(csc_indptr);

  auto bin_type = page.index.GetBinTypeSize();
  common::CompressedBufferWriter writer{page.cut.TotalBins() + 1};  // +1 for null value
  common::CompressedBufferWriter writer{page.cut.TotalBins() +
                                        static_cast<std::size_t>(1)};  // +1 for null value

  dh::LaunchN(row_stride * page.Size(), [=] __device__(size_t idx) mutable {
    auto ridx = idx / row_stride;
@@ -357,8 +359,10 @@ EllpackPageImpl::EllpackPageImpl(Context const* ctx, GHistIndexMatrix const& pag

  // copy gidx
  common::CompressedByteT* d_compressed_buffer = gidx_buffer.DevicePointer();
  dh::device_vector<size_t> row_ptr(page.row_ptr);
  dh::device_vector<size_t> row_ptr(page.row_ptr.size());
  auto d_row_ptr = dh::ToSpan(row_ptr);
  dh::safe_cuda(cudaMemcpyAsync(d_row_ptr.data(), page.row_ptr.data(), d_row_ptr.size_bytes(),
                                cudaMemcpyHostToDevice, ctx->CUDACtx()->Stream()));

  auto accessor = this->GetDeviceAccessor(ctx->gpu_id, ft);
  auto null = accessor.NullValue();
@@ -7,9 +7,6 @@
#ifndef XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
#define XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_

#include <xgboost/data.h>
#include <xgboost/logging.h>

#include <algorithm>
#include <map>
#include <memory>
@@ -20,35 +17,33 @@
#include "ellpack_page_source.h"
#include "gradient_index_page_source.h"
#include "sparse_page_source.h"
#include "xgboost/data.h"
#include "xgboost/logging.h"

namespace xgboost {
namespace data {
namespace xgboost::data {
/**
 * \brief DMatrix used for external memory.
 *
 * The external memory is created for controlling memory usage by splitting up data into
 * multiple batches. However that doesn't mean we will actually process exact 1 batch at
 * a time, which would be terribly slow considering that we have to loop through the
 * whole dataset for every tree split. So we use async pre-fetch and let caller to decide
 * how many batches it wants to process by returning data as shared pointer. The caller
 * can use async function to process the data or just stage those batches, making the
 * decision is out of the scope for sparse page dmatrix. These 2 optimizations might
 * defeat the purpose of splitting up dataset since if you load all the batches then the
 * memory usage is even worse than using a single batch. Essentially we need to control
 * how many batches can be in memory at the same time.
 * multiple batches. However, that doesn't mean we will actually process exactly 1 batch
 * at a time, which would be terribly slow considering that we have to loop through the
 * whole dataset for every tree split. So we use async to pre-fetch pages and let the
 * caller decide how many batches it wants to process by returning data as a shared
 * pointer. The caller can use async functions to process the data or just stage those
 * batches based on its use cases. These two optimizations might defeat the purpose of
 * splitting up the dataset, since if you stage all the batches then the memory usage
 * might be even worse than using a single batch. As a result, we must control how many
 * batches can be in memory at any given time.
 *
 * Right now the write to the cache is sequential operation and is blocking, reading from
 * cache is async but with a hard coded limit of 4 pages as an heuristic. So by sparse
 * dmatrix itself there can be only 9 pages in main memory (might be of different types)
 * at the same time: 1 page pending for write, 4 pre-fetched sparse pages, 4 pre-fetched
 * dependent pages. If the caller stops iteration at the middle and start again, then the
 * number of pages in memory can hit 16 due to pre-fetching, but this should be a bug in
 * caller's code (XGBoost doesn't discard a large portion of data at the end, there's not
 * sampling algo that samples only the first portion of data).
 * Right now the write to the cache is a sequential operation and is blocking. Reading
 * from the cache, on the other hand, is async but with a hard-coded limit of 3 pages as
 * a heuristic. So by the sparse dmatrix itself there can be only 7 pages in main memory
 * (might be of different types) at the same time: 1 page pending for write, 3 pre-fetched
 * sparse pages, 3 pre-fetched dependent pages.
 *
 * Of course if the caller decides to retain some batches to perform parallel processing,
 * then we might load all pages in memory, which is also considered as a bug in caller's
 * code. So if the algo supports external memory, it must be careful that the queue for
 * async calls must have an upper limit.
 *
 * Another assumption we make is that the data must be immutable so caller should never
@@ -101,7 +96,7 @@ class SparsePageDMatrix : public DMatrix {
  MetaInfo &Info() override;
  const MetaInfo &Info() const override;
  Context const *Ctx() const override { return &fmat_ctx_; }

  // The only DMatrix implementation that returns false.
  bool SingleColBlock() const override { return false; }
  DMatrix *Slice(common::Span<int32_t const>) override {
    LOG(FATAL) << "Slicing DMatrix is not supported for external memory.";
@@ -153,6 +148,5 @@ inline std::string MakeCache(SparsePageDMatrix *ptr, std::string format, std::st
  }
  return id;
}
}  // namespace data
}  // namespace xgboost
}  // namespace xgboost::data
#endif  // XGBOOST_DATA_SPARSE_PAGE_DMATRIX_H_
@@ -1,45 +1,48 @@
/*!
 * Copyright 2014-2022 by XGBoost Contributors
/**
 * Copyright 2014-2023, XGBoost Contributors
 * \file sparse_page_source.h
 */
#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_

#include <algorithm>  // std::min
#include <string>
#include <utility>
#include <vector>
#include <future>
#include <thread>
#include <algorithm>  // for min
#include <future>     // for async
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>  // for pair, move
#include <vector>

#include "../common/common.h"
#include "../common/io.h"     // for PrivateMmapConstStream
#include "../common/timer.h"  // for Monitor, Timer
#include "adapter.h"
#include "dmlc/common.h"         // for OMPException
#include "proxy_dmatrix.h"       // for DMatrixProxy
#include "sparse_page_writer.h"  // for SparsePageFormat
#include "xgboost/base.h"
#include "xgboost/data.h"

#include "adapter.h"
#include "sparse_page_writer.h"
#include "proxy_dmatrix.h"

#include "../common/common.h"
#include "../common/timer.h"

namespace xgboost {
namespace data {
namespace xgboost::data {
inline void TryDeleteCacheFile(const std::string& file) {
  if (std::remove(file.c_str()) != 0) {
    // Don't throw, this is called in a destructor.
    LOG(WARNING) << "Couldn't remove external memory cache file " << file
                 << "; you may want to remove it manually";
  }
}

/**
 * @brief Information about the cache including path and page offsets.
 */
struct Cache {
  // whether the write to the cache is complete
  bool written;
  std::string name;
  std::string format;
  // offset into binary cache file.
  std::vector<size_t> offset;
  std::vector<std::uint64_t> offset;

  Cache(bool w, std::string n, std::string fmt)
      : written{w}, name{std::move(n)}, format{std::move(fmt)} {
@@ -51,11 +54,24 @@ struct Cache {
    return name + format;
  }

  std::string ShardName() {
  [[nodiscard]] std::string ShardName() const {
    return ShardName(this->name, this->format);
  }

  // The write is completed.
  /**
   * @brief Record a page with size of n_bytes.
   */
  void Push(std::size_t n_bytes) { offset.push_back(n_bytes); }
  /**
   * @brief Returns the view start and length for the i^th page.
   */
  [[nodiscard]] auto View(std::size_t i) const {
    std::uint64_t off = offset.at(i);
    std::uint64_t len = offset.at(i + 1) - offset[i];
    return std::pair{off, len};
  }
  /**
   * @brief Call this once the write for the cache is complete.
   */
  void Commit() {
    if (!written) {
      std::partial_sum(offset.begin(), offset.end(), offset.begin());
@@ -64,7 +80,7 @@ struct Cache {
  }
};
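How the new `Push`/`Commit`/`View` trio fits together, as a stand-alone sketch; the constructor is assumed to seed the offset vector with a leading zero, which the `offset.size() == n_batches_ + 1` check later in this file suggests:

#include <cstdint>
#include <numeric>
#include <utility>
#include <vector>

int main() {
  std::vector<std::uint64_t> offset{0};  // assumed seed from Cache's ctor
  for (std::uint64_t n_bytes : {100, 250, 80}) {
    offset.push_back(n_bytes);  // Push(): record each page's byte count
  }
  // Commit(): prefix sum turns sizes into file offsets -> {0, 100, 350, 430}
  std::partial_sum(offset.begin(), offset.end(), offset.begin());
  // View(1): the second page starts at byte 100 and is 250 bytes long.
  auto view = std::pair{offset[1], offset[2] - offset[1]};
  (void)view;
}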

// Prevents multi-threaded call.
// Prevents multi-threaded call to `GetBatches`.
class TryLockGuard {
  std::mutex& lock_;

@@ -77,74 +93,87 @@ class TryLockGuard {
  }
};

/**
 * @brief Base class for all page sources. Handles fetching, writing, and iteration.
 */
template <typename S>
class SparsePageSourceImpl : public BatchIteratorImpl<S> {
 protected:
  // Prevents calling this iterator from multiple places (or threads).
  std::mutex single_threaded_;

  // The current page.
  std::shared_ptr<S> page_;

  bool at_end_ {false};
  float missing_;
  int nthreads_;
  std::int32_t nthreads_;
  bst_feature_t n_features_;

  uint32_t count_{0};

  uint32_t n_batches_ {0};
  // Index to the current page.
  std::uint32_t count_{0};
  // Total number of batches.
  std::uint32_t n_batches_{0};

  std::shared_ptr<Cache> cache_info_;
  std::unique_ptr<dmlc::Stream> fo_;

  using Ring = std::vector<std::future<std::shared_ptr<S>>>;
  // A ring storing futures to data. Since the DMatrix iterator is forward only, we
  // can pre-fetch data in a ring.
  std::unique_ptr<Ring> ring_{new Ring};
  // Catching exceptions in pre-fetch threads to prevent segfaults. This doesn't always
  // work though; an OOM error can be delayed due to lazy commit. On the bright side, if
  // mmap is used then OOM errors should be rare.
  dmlc::OMPException exec_;
  common::Monitor monitor_;

  bool ReadCache() {
    CHECK(!at_end_);
    if (!cache_info_->written) {
      return false;
    }
    if (fo_) {
      fo_.reset();  // flush the data to disk.
    if (ring_->empty()) {
      ring_->resize(n_batches_);
    }
    // A heuristic for the number of pre-fetched batches. We can make it part of
    // BatchParam to let the user adjust the number of pre-fetched batches when needed.
    uint32_t constexpr kPreFetch = 4;
    uint32_t constexpr kPreFetch = 3;

    size_t n_prefetch_batches = std::min(kPreFetch, n_batches_);
    CHECK_GT(n_prefetch_batches, 0) << "total batches:" << n_batches_;
    size_t fetch_it = count_;
    std::size_t fetch_it = count_;

    for (size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
    exec_.Rethrow();

    for (std::size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
      fetch_it %= n_batches_;  // ring
      if (ring_->at(fetch_it).valid()) {
        continue;
      }
      auto const *self = this;  // make sure it's const
      auto const* self = this;  // make sure it's const
      CHECK_LT(fetch_it, cache_info_->offset.size());
      ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self]() {
        common::Timer timer;
        timer.Start();
        std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
        auto n = self->cache_info_->ShardName();
        size_t offset = self->cache_info_->offset.at(fetch_it);
        std::unique_ptr<dmlc::SeekStream> fi{dmlc::SeekStream::CreateForRead(n.c_str())};
        fi->Seek(offset);
        CHECK_EQ(fi->Tell(), offset);
      ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self, this]() {
        auto page = std::make_shared<S>();
        CHECK(fmt->Read(page.get(), fi.get()));
        LOG(INFO) << "Read a page in " << timer.ElapsedSeconds() << " seconds.";
        this->exec_.Run([&] {
          std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
          auto name = self->cache_info_->ShardName();
          auto [offset, length] = self->cache_info_->View(fetch_it);
          auto fi = std::make_unique<common::PrivateMmapConstStream>(name, offset, length);
          CHECK(fmt->Read(page.get(), fi.get()));
        });
        return page;
      });
    }

    CHECK_EQ(std::count_if(ring_->cbegin(), ring_->cend(), [](auto const& f) { return f.valid(); }),
             n_prefetch_batches)
        << "Sparse DMatrix assumes forward iteration.";

    monitor_.Start("Wait");
    page_ = (*ring_)[count_].get();
    CHECK(!(*ring_)[count_].valid());
    monitor_.Stop("Wait");

    exec_.Rethrow();

    return true;
  }
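A minimal, self-contained sketch of the forward-only pre-fetch ring that `ReadCache` implements above; the mmap page read is stubbed out by a hypothetical `LoadPage`, and the numbers are illustrative:

#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

int LoadPage(std::size_t idx) { return static_cast<int>(idx); }  // stand-in for the page read

int main() {
  std::size_t const n_batches = 8, kPreFetch = 3;
  std::vector<std::future<int>> ring(n_batches);
  for (std::size_t count = 0; count < n_batches; ++count) {
    // Launch at most kPreFetch reads ahead of the current position.
    for (std::size_t i = 0, it = count; i < std::min(kPreFetch, n_batches); ++i, ++it) {
      it %= n_batches;  // wrap around: the iterator only moves forward
      if (!ring[it].valid()) {
        ring[it] = std::async(std::launch::async, LoadPage, it);
      }
    }
    int page = ring[count].get();  // blocks until the pre-fetched page is ready
    (void)page;
  }
}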

@@ -153,29 +182,41 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
    common::Timer timer;
    timer.Start();
    std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
    if (!fo_) {
      auto n = cache_info_->ShardName();
      fo_.reset(dmlc::Stream::Create(n.c_str(), "w"));
    }
    auto bytes = fmt->Write(*page_, fo_.get());
    timer.Stop();

    auto name = cache_info_->ShardName();
    std::unique_ptr<dmlc::Stream> fo;
    if (this->Iter() == 0) {
      fo.reset(dmlc::Stream::Create(name.c_str(), "wb"));
    } else {
      fo.reset(dmlc::Stream::Create(name.c_str(), "ab"));
    }

    auto bytes = fmt->Write(*page_, fo.get());

    timer.Stop();
    // Not entirely accurate; the kernel doesn't have to flush the data.
    LOG(INFO) << static_cast<double>(bytes) / 1024.0 / 1024.0 << " MB written in "
              << timer.ElapsedSeconds() << " seconds.";
    cache_info_->offset.push_back(bytes);
    cache_info_->Push(bytes);
  }

  virtual void Fetch() = 0;

 public:
  SparsePageSourceImpl(float missing, int nthreads, bst_feature_t n_features,
                       uint32_t n_batches, std::shared_ptr<Cache> cache)
      : missing_{missing}, nthreads_{nthreads}, n_features_{n_features},
        n_batches_{n_batches}, cache_info_{std::move(cache)} {}
  SparsePageSourceImpl(float missing, int nthreads, bst_feature_t n_features, uint32_t n_batches,
                       std::shared_ptr<Cache> cache)
      : missing_{missing},
        nthreads_{nthreads},
        n_features_{n_features},
        n_batches_{n_batches},
        cache_info_{std::move(cache)} {
    monitor_.Init(typeid(S).name());  // not pretty, but works for basic profiling
  }

  SparsePageSourceImpl(SparsePageSourceImpl const &that) = delete;

  ~SparsePageSourceImpl() override {
    // Don't orphan the threads.
    for (auto& fu : *ring_) {
      if (fu.valid()) {
        fu.get();
@@ -183,18 +224,18 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
      }
    }

  uint32_t Iter() const { return count_; }
  [[nodiscard]] uint32_t Iter() const { return count_; }

  const S &operator*() const override {
    CHECK(page_);
    return *page_;
  }

  std::shared_ptr<S const> Page() const override {
  [[nodiscard]] std::shared_ptr<S const> Page() const override {
    return page_;
  }

  bool AtEnd() const override {
  [[nodiscard]] bool AtEnd() const override {
    return at_end_;
  }

@@ -202,20 +243,23 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
    TryLockGuard guard{single_threaded_};
    at_end_ = false;
    count_ = 0;
    // Pre-fetch for the next round of iterations.
    this->Fetch();
  }
};

#if defined(XGBOOST_USE_CUDA)
// Push data from CUDA.
void DevicePush(DMatrixProxy* proxy, float missing, SparsePage* page);
#else
inline void DevicePush(DMatrixProxy*, float, SparsePage*) { common::AssertGPUSupport(); }
#endif

class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
  // This is the source from the user.
  DataIterProxy<DataIterResetCallback, XGDMatrixCallbackNext> iter_;
  DMatrixProxy* proxy_;
  size_t base_row_id_ {0};
  std::size_t base_row_id_{0};

  void Fetch() final {
    page_ = std::make_shared<SparsePage>();
@@ -244,7 +288,7 @@ class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
      iter_{iter}, proxy_{proxy} {
    if (!cache_info_->written) {
      iter_.Reset();
      CHECK_EQ(iter_.Next(), 1) << "Must have at least 1 batch.";
      CHECK(iter_.Next()) << "Must have at least 1 batch.";
    }
    this->Fetch();
  }
@@ -259,6 +303,7 @@ class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
    }

    if (at_end_) {
      CHECK_EQ(cache_info_->offset.size(), n_batches_ + 1);
      cache_info_->Commit();
      if (n_batches_ != 0) {
        CHECK_EQ(count_, n_batches_);
@@ -371,6 +416,5 @@ class SortedCSCPageSource : public PageSourceIncMixIn<SortedCSCPage> {
    this->Fetch();
  }
};
}  // namespace data
}  // namespace xgboost
}  // namespace xgboost::data
#endif  // XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
@@ -439,7 +439,7 @@ struct ShapSplitCondition {
    if (isnan(x)) {
      return is_missing_branch;
    }
    if (categories.Size() != 0) {
    if (categories.Capacity() != 0) {
      auto cat = static_cast<uint32_t>(x);
      return categories.Check(cat);
    } else {
@@ -454,7 +454,7 @@ struct ShapSplitCondition {
    if (l.Data() == r.Data()) {
      return l;
    }
    if (l.Size() > r.Size()) {
    if (l.Capacity() > r.Capacity()) {
      thrust::swap(l, r);
    }
    for (size_t i = 0; i < r.Bits().size(); ++i) {
@@ -466,7 +466,7 @@ struct ShapSplitCondition {
  // Combine two split conditions on the same feature
  XGBOOST_DEVICE void Merge(ShapSplitCondition other) {
    // Combine duplicate features
    if (categories.Size() != 0 || other.categories.Size() != 0) {
    if (categories.Capacity() != 0 || other.categories.Capacity() != 0) {
      categories = Intersect(categories, other.categories);
    } else {
      feature_lower_bound = max(feature_lower_bound, other.feature_lower_bound);
@@ -1,5 +1,5 @@
/*!
 * Copyright 2019 XGBoost contributors
/**
 * Copyright 2019-2023, XGBoost contributors
 */
#include <thrust/copy.h>
#include <thrust/device_vector.h>
@@ -140,20 +140,20 @@ void FeatureInteractionConstraintDevice::Reset() {
__global__ void ClearBuffersKernel(
    LBitField64 result_buffer_output, LBitField64 result_buffer_input) {
  auto tid = blockIdx.x * blockDim.x + threadIdx.x;
  if (tid < result_buffer_output.Size()) {
  if (tid < result_buffer_output.Capacity()) {
    result_buffer_output.Clear(tid);
  }
  if (tid < result_buffer_input.Size()) {
  if (tid < result_buffer_input.Capacity()) {
    result_buffer_input.Clear(tid);
  }
}

void FeatureInteractionConstraintDevice::ClearBuffers() {
  CHECK_EQ(output_buffer_bits_.Size(), input_buffer_bits_.Size());
  CHECK_LE(feature_buffer_.Size(), output_buffer_bits_.Size());
  CHECK_EQ(output_buffer_bits_.Capacity(), input_buffer_bits_.Capacity());
  CHECK_LE(feature_buffer_.Capacity(), output_buffer_bits_.Capacity());
  uint32_t constexpr kBlockThreads = 256;
  auto const n_grids = static_cast<uint32_t>(
      common::DivRoundUp(input_buffer_bits_.Size(), kBlockThreads));
      common::DivRoundUp(input_buffer_bits_.Capacity(), kBlockThreads));
  dh::LaunchKernel {n_grids, kBlockThreads} (
      ClearBuffersKernel,
      output_buffer_bits_, input_buffer_bits_);
@@ -207,11 +207,11 @@ common::Span<bst_feature_t> FeatureInteractionConstraintDevice::Query(
  ClearBuffers();

  LBitField64 node_constraints = s_node_constraints_[nid];
  CHECK_EQ(input_buffer_bits_.Size(), output_buffer_bits_.Size());
  CHECK_EQ(input_buffer_bits_.Capacity(), output_buffer_bits_.Capacity());

  uint32_t constexpr kBlockThreads = 256;
  auto n_grids = static_cast<uint32_t>(
      common::DivRoundUp(output_buffer_bits_.Size(), kBlockThreads));
      common::DivRoundUp(output_buffer_bits_.Capacity(), kBlockThreads));
  dh::LaunchKernel {n_grids, kBlockThreads} (
      SetInputBufferKernel,
      feature_list, input_buffer_bits_);
@@ -274,13 +274,13 @@ __global__ void InteractionConstraintSplitKernel(LBitField64 feature,
                                                 LBitField64 left,
                                                 LBitField64 right) {
  auto tid = threadIdx.x + blockDim.x * blockIdx.x;
  if (tid > node.Size()) {
  if (tid > node.Capacity()) {
    return;
  }
  // enable constraints from feature
  node |= feature;
  // clear the buffer after use
  if (tid < feature.Size()) {
  if (tid < feature.Capacity()) {
    feature.Clear(tid);
  }

@@ -323,7 +323,7 @@ void FeatureInteractionConstraintDevice::Split(
      s_sets_, s_sets_ptr_);

  uint32_t constexpr kBlockThreads = 256;
  auto n_grids = static_cast<uint32_t>(common::DivRoundUp(node.Size(), kBlockThreads));
  auto n_grids = static_cast<uint32_t>(common::DivRoundUp(node.Capacity(), kBlockThreads));

  dh::LaunchKernel {n_grids, kBlockThreads} (
      InteractionConstraintSplitKernel,
@@ -146,27 +146,30 @@ class PoissonSampling : public thrust::binary_function<GradientPair, size_t, Gra
  CombineGradientPair combine_;
};

NoSampling::NoSampling(EllpackPageImpl const* page) : page_(page) {}
NoSampling::NoSampling(BatchParam batch_param) : batch_param_(std::move(batch_param)) {}

GradientBasedSample NoSampling::Sample(Context const*, common::Span<GradientPair> gpair,
GradientBasedSample NoSampling::Sample(Context const* ctx, common::Span<GradientPair> gpair,
                                       DMatrix* dmat) {
  return {dmat->Info().num_row_, page_, gpair};
  auto page = (*dmat->GetBatches<EllpackPage>(ctx, batch_param_).begin()).Impl();
  return {dmat->Info().num_row_, page, gpair};
}

ExternalMemoryNoSampling::ExternalMemoryNoSampling(Context const* ctx, EllpackPageImpl const* page,
                                                   size_t n_rows, BatchParam batch_param)
    : batch_param_{std::move(batch_param)},
      page_(new EllpackPageImpl(ctx->gpu_id, page->Cuts(), page->is_dense, page->row_stride,
                                n_rows)) {}
ExternalMemoryNoSampling::ExternalMemoryNoSampling(BatchParam batch_param)
    : batch_param_{std::move(batch_param)} {}

GradientBasedSample ExternalMemoryNoSampling::Sample(Context const* ctx,
                                                     common::Span<GradientPair> gpair,
                                                     DMatrix* dmat) {
  if (!page_concatenated_) {
    // Concatenate all the external memory ELLPACK pages into a single in-memory page.
    page_.reset(nullptr);
    size_t offset = 0;
    for (auto& batch : dmat->GetBatches<EllpackPage>(ctx, batch_param_)) {
      auto page = batch.Impl();
      if (!page_) {
        page_ = std::make_unique<EllpackPageImpl>(ctx->gpu_id, page->Cuts(), page->is_dense,
                                                  page->row_stride, dmat->Info().num_row_);
      }
      size_t num_elements = page_->Copy(ctx->gpu_id, page, offset);
      offset += num_elements;
    }
@@ -175,8 +178,8 @@ GradientBasedSample ExternalMemoryNoSampling::Sample(Context const* ctx,
  return {dmat->Info().num_row_, page_.get(), gpair};
}

UniformSampling::UniformSampling(EllpackPageImpl const* page, float subsample)
    : page_(page), subsample_(subsample) {}
UniformSampling::UniformSampling(BatchParam batch_param, float subsample)
    : batch_param_{std::move(batch_param)}, subsample_(subsample) {}

GradientBasedSample UniformSampling::Sample(Context const* ctx, common::Span<GradientPair> gpair,
                                            DMatrix* dmat) {
@@ -185,7 +188,8 @@ GradientBasedSample UniformSampling::Sample(Context const* ctx, common::Span<Gra
  thrust::replace_if(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair),
                     thrust::counting_iterator<std::size_t>(0),
                     BernoulliTrial(common::GlobalRandom()(), subsample_), GradientPair());
  return {dmat->Info().num_row_, page_, gpair};
  auto page = (*dmat->GetBatches<EllpackPage>(ctx, batch_param_).begin()).Impl();
  return {dmat->Info().num_row_, page, gpair};
}

ExternalMemoryUniformSampling::ExternalMemoryUniformSampling(size_t n_rows,
@@ -236,12 +240,10 @@ GradientBasedSample ExternalMemoryUniformSampling::Sample(Context const* ctx,
  return {sample_rows, page_.get(), dh::ToSpan(gpair_)};
}

GradientBasedSampling::GradientBasedSampling(EllpackPageImpl const* page,
                                             size_t n_rows,
                                             const BatchParam&,
GradientBasedSampling::GradientBasedSampling(std::size_t n_rows, BatchParam batch_param,
                                             float subsample)
    : page_(page),
      subsample_(subsample),
    : subsample_(subsample),
      batch_param_{std::move(batch_param)},
      threshold_(n_rows + 1, 0.0f),
      grad_sum_(n_rows, 0.0f) {}

@@ -252,18 +254,19 @@ GradientBasedSample GradientBasedSampling::Sample(Context const* ctx,
  size_t threshold_index = GradientBasedSampler::CalculateThresholdIndex(
      gpair, dh::ToSpan(threshold_), dh::ToSpan(grad_sum_), n_rows * subsample_);

  auto page = (*dmat->GetBatches<EllpackPage>(ctx, batch_param_).begin()).Impl();

  // Perform Poisson sampling in place.
  thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair),
                    thrust::counting_iterator<size_t>(0), dh::tbegin(gpair),
                    PoissonSampling(dh::ToSpan(threshold_), threshold_index,
                                    RandomWeight(common::GlobalRandom()())));
  return {n_rows, page_, gpair};
  return {n_rows, page, gpair};
}
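For intuition, a hedged sketch of the idea behind the Poisson step above: each row survives independently with probability proportional to its combined gradient magnitude, so the expected sample size is `sample_rows`. The exact thresholding and re-weighting in XGBoost (`threshold_index`, `PoissonSampling`) are more involved; this only illustrates the selection rule:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <random>
#include <vector>

int main() {
  std::vector<float> grad_norm{0.1f, 2.0f, 0.5f, 3.0f};  // per-row gradient magnitudes
  float sum = 0.0f;
  for (float g : grad_norm) sum += g;
  std::size_t const sample_rows = 2;  // target expected sample size

  std::mt19937 rng{42};
  std::uniform_real_distribution<float> unif{0.0f, 1.0f};
  for (std::size_t i = 0; i < grad_norm.size(); ++i) {
    float p = std::min(1.0f, sample_rows * grad_norm[i] / sum);  // capped at 1
    bool keep = unif(rng) < p;  // rows with larger gradients are more likely kept
    std::cout << "row " << i << ": p=" << p << " keep=" << keep << "\n";
  }
}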

ExternalMemoryGradientBasedSampling::ExternalMemoryGradientBasedSampling(
    size_t n_rows,
    BatchParam batch_param,
    float subsample)
ExternalMemoryGradientBasedSampling::ExternalMemoryGradientBasedSampling(size_t n_rows,
                                                                         BatchParam batch_param,
                                                                         float subsample)
    : batch_param_(std::move(batch_param)),
      subsample_(subsample),
      threshold_(n_rows + 1, 0.0f),
@@ -273,16 +276,15 @@ ExternalMemoryGradientBasedSampling::ExternalMemoryGradientBasedSampling(
GradientBasedSample ExternalMemoryGradientBasedSampling::Sample(Context const* ctx,
                                                                common::Span<GradientPair> gpair,
                                                                DMatrix* dmat) {
  size_t n_rows = dmat->Info().num_row_;
  auto cuctx = ctx->CUDACtx();
  bst_row_t n_rows = dmat->Info().num_row_;
  size_t threshold_index = GradientBasedSampler::CalculateThresholdIndex(
      gpair, dh::ToSpan(threshold_), dh::ToSpan(grad_sum_), n_rows * subsample_);

  // Perform Poisson sampling in place.
  thrust::transform(dh::tbegin(gpair), dh::tend(gpair),
                    thrust::counting_iterator<size_t>(0),
                    dh::tbegin(gpair),
                    PoissonSampling(dh::ToSpan(threshold_),
                                    threshold_index,
  thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair),
                    thrust::counting_iterator<size_t>(0), dh::tbegin(gpair),
                    PoissonSampling(dh::ToSpan(threshold_), threshold_index,
                                    RandomWeight(common::GlobalRandom()())));

  // Count the sampled rows.
@@ -290,16 +292,15 @@ GradientBasedSample ExternalMemoryGradientBasedSampling::Sample(Context const* c

  // Compact gradient pairs.
  gpair_.resize(sample_rows);
  thrust::copy_if(dh::tbegin(gpair), dh::tend(gpair), gpair_.begin(), IsNonZero());
  thrust::copy_if(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair), gpair_.begin(), IsNonZero());

  // Index the sample rows.
  thrust::transform(dh::tbegin(gpair), dh::tend(gpair), sample_row_index_.begin(), IsNonZero());
  thrust::exclusive_scan(sample_row_index_.begin(), sample_row_index_.end(),
                         sample_row_index_.begin());
  thrust::transform(dh::tbegin(gpair), dh::tend(gpair),
                    sample_row_index_.begin(),
                    sample_row_index_.begin(),
                    ClearEmptyRows());
  thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair), sample_row_index_.begin(),
                    IsNonZero());
  thrust::exclusive_scan(cuctx->CTP(), sample_row_index_.begin(), sample_row_index_.end(),
                         sample_row_index_.begin());
  thrust::transform(cuctx->CTP(), dh::tbegin(gpair), dh::tend(gpair), sample_row_index_.begin(),
                    sample_row_index_.begin(), ClearEmptyRows());

  auto batch_iterator = dmat->GetBatches<EllpackPage>(ctx, batch_param_);
  auto first_page = (*batch_iterator.begin()).Impl();
@@ -317,13 +318,13 @@ GradientBasedSample ExternalMemoryGradientBasedSampling::Sample(Context const* c
  return {sample_rows, page_.get(), dh::ToSpan(gpair_)};
}

GradientBasedSampler::GradientBasedSampler(Context const* ctx, EllpackPageImpl const* page,
                                           size_t n_rows, const BatchParam& batch_param,
                                           float subsample, int sampling_method) {
GradientBasedSampler::GradientBasedSampler(Context const* /*ctx*/, size_t n_rows,
                                           const BatchParam& batch_param, float subsample,
                                           int sampling_method, bool is_external_memory) {
  // The ctx is kept here for future development of stream-based operations.
  monitor_.Init("gradient_based_sampler");

  bool is_sampling = subsample < 1.0;
  bool is_external_memory = page->n_rows != n_rows;

  if (is_sampling) {
    switch (sampling_method) {
@@ -331,24 +332,24 @@ GradientBasedSampler::GradientBasedSampler(Context const* ctx, EllpackPageImpl c
        if (is_external_memory) {
          strategy_.reset(new ExternalMemoryUniformSampling(n_rows, batch_param, subsample));
        } else {
          strategy_.reset(new UniformSampling(page, subsample));
          strategy_.reset(new UniformSampling(batch_param, subsample));
        }
        break;
      case TrainParam::kGradientBased:
        if (is_external_memory) {
          strategy_.reset(
              new ExternalMemoryGradientBasedSampling(n_rows, batch_param, subsample));
          strategy_.reset(new ExternalMemoryGradientBasedSampling(n_rows, batch_param, subsample));
        } else {
          strategy_.reset(new GradientBasedSampling(page, n_rows, batch_param, subsample));
          strategy_.reset(new GradientBasedSampling(n_rows, batch_param, subsample));
        }
        break;
      default:LOG(FATAL) << "unknown sampling method";
      default:
        LOG(FATAL) << "unknown sampling method";
    }
  } else {
    if (is_external_memory) {
      strategy_.reset(new ExternalMemoryNoSampling(ctx, page, n_rows, batch_param));
      strategy_.reset(new ExternalMemoryNoSampling(batch_param));
    } else {
      strategy_.reset(new NoSampling(page));
      strategy_.reset(new NoSampling(batch_param));
    }
  }
}
@@ -362,11 +363,11 @@ GradientBasedSample GradientBasedSampler::Sample(Context const* ctx,
  return sample;
}

size_t GradientBasedSampler::CalculateThresholdIndex(
    common::Span<GradientPair> gpair, common::Span<float> threshold,
    common::Span<float> grad_sum, size_t sample_rows) {
  thrust::fill(dh::tend(threshold) - 1, dh::tend(threshold),
               std::numeric_limits<float>::max());
size_t GradientBasedSampler::CalculateThresholdIndex(common::Span<GradientPair> gpair,
                                                     common::Span<float> threshold,
                                                     common::Span<float> grad_sum,
                                                     size_t sample_rows) {
  thrust::fill(dh::tend(threshold) - 1, dh::tend(threshold), std::numeric_limits<float>::max());
  thrust::transform(dh::tbegin(gpair), dh::tend(gpair), dh::tbegin(threshold),
                    CombineGradientPair());
  thrust::sort(dh::tbegin(threshold), dh::tend(threshold) - 1);
@@ -379,6 +380,5 @@ size_t GradientBasedSampler::CalculateThresholdIndex(
      thrust::min_element(dh::tbegin(grad_sum), dh::tend(grad_sum));
  return thrust::distance(dh::tbegin(grad_sum), min) + 1;
}

};  // namespace tree
};  // namespace xgboost

@@ -1,5 +1,5 @@
/*!
 * Copyright 2019 by XGBoost Contributors
/**
 * Copyright 2019-2023, XGBoost Contributors
 */
#pragma once
#include <xgboost/base.h>
@@ -32,37 +32,36 @@ class SamplingStrategy {
/*! \brief No sampling in in-memory mode. */
class NoSampling : public SamplingStrategy {
 public:
  explicit NoSampling(EllpackPageImpl const* page);
  GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
                             DMatrix* dmat) override;

 private:
  EllpackPageImpl const* page_;
};

/*! \brief No sampling in external memory mode. */
class ExternalMemoryNoSampling : public SamplingStrategy {
 public:
  ExternalMemoryNoSampling(Context const* ctx, EllpackPageImpl const* page, size_t n_rows,
                           BatchParam batch_param);
  explicit NoSampling(BatchParam batch_param);
  GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
                             DMatrix* dmat) override;

 private:
  BatchParam batch_param_;
  std::unique_ptr<EllpackPageImpl> page_;
};

/*! \brief No sampling in external memory mode. */
class ExternalMemoryNoSampling : public SamplingStrategy {
 public:
  explicit ExternalMemoryNoSampling(BatchParam batch_param);
  GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
                             DMatrix* dmat) override;

 private:
  BatchParam batch_param_;
  std::unique_ptr<EllpackPageImpl> page_{nullptr};
  bool page_concatenated_{false};
};

/*! \brief Uniform sampling in in-memory mode. */
class UniformSampling : public SamplingStrategy {
 public:
  UniformSampling(EllpackPageImpl const* page, float subsample);
  UniformSampling(BatchParam batch_param, float subsample);
  GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
                             DMatrix* dmat) override;

 private:
  EllpackPageImpl const* page_;
  BatchParam batch_param_;
  float subsample_;
};

@@ -84,13 +83,12 @@ class ExternalMemoryUniformSampling : public SamplingStrategy {
/*! \brief Gradient-based sampling in in-memory mode. */
class GradientBasedSampling : public SamplingStrategy {
 public:
  GradientBasedSampling(EllpackPageImpl const* page, size_t n_rows, const BatchParam& batch_param,
                        float subsample);
  GradientBasedSampling(std::size_t n_rows, BatchParam batch_param, float subsample);
  GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair,
                             DMatrix* dmat) override;

 private:
  EllpackPageImpl const* page_;
  BatchParam batch_param_;
  float subsample_;
  dh::caching_device_vector<float> threshold_;
  dh::caching_device_vector<float> grad_sum_;
@@ -106,11 +104,11 @@ class ExternalMemoryGradientBasedSampling : public SamplingStrategy {
 private:
  BatchParam batch_param_;
  float subsample_;
  dh::caching_device_vector<float> threshold_;
  dh::caching_device_vector<float> grad_sum_;
  dh::device_vector<float> threshold_;
  dh::device_vector<float> grad_sum_;
  std::unique_ptr<EllpackPageImpl> page_;
  dh::device_vector<GradientPair> gpair_;
  dh::caching_device_vector<size_t> sample_row_index_;
  dh::device_vector<size_t> sample_row_index_;
};

/*! \brief Draw a sample of rows from a DMatrix.
@@ -124,8 +122,8 @@ class ExternalMemoryGradientBasedSampling : public SamplingStrategy {
 */
class GradientBasedSampler {
 public:
  GradientBasedSampler(Context const* ctx, EllpackPageImpl const* page, size_t n_rows,
                       const BatchParam& batch_param, float subsample, int sampling_method);
  GradientBasedSampler(Context const* ctx, size_t n_rows, const BatchParam& batch_param,
                       float subsample, int sampling_method, bool is_external_memory);

  /*! \brief Sample from a DMatrix based on the given gradient pairs. */
  GradientBasedSample Sample(Context const* ctx, common::Span<GradientPair> gpair, DMatrix* dmat);
@@ -213,7 +213,7 @@ std::vector<bst_cat_t> GetSplitCategories(RegTree const &tree, int32_t nidx) {
  auto split = common::KCatBitField{csr.categories.subspan(seg.beg, seg.size)};

  std::vector<bst_cat_t> cats;
  for (size_t i = 0; i < split.Size(); ++i) {
  for (size_t i = 0; i < split.Capacity(); ++i) {
    if (split.Check(i)) {
      cats.push_back(static_cast<bst_cat_t>(i));
    }
@@ -1004,7 +1004,7 @@ void RegTree::SaveCategoricalSplit(Json* p_out) const {
    auto segment = split_categories_segments_[i];
    auto node_categories = this->GetSplitCategories().subspan(segment.beg, segment.size);
    common::KCatBitField const cat_bits(node_categories);
    for (size_t i = 0; i < cat_bits.Size(); ++i) {
    for (size_t i = 0; i < cat_bits.Capacity(); ++i) {
      if (cat_bits.Check(i)) {
        categories.GetArray().emplace_back(i);
      }
@@ -176,7 +176,7 @@ struct GPUHistMakerDevice {
  Context const* ctx_;

 public:
  EllpackPageImpl const* page;
  EllpackPageImpl const* page{nullptr};
  common::Span<FeatureType const> feature_types;
  BatchParam batch_param;

@@ -205,41 +205,41 @@ struct GPUHistMakerDevice {

  std::unique_ptr<FeatureGroups> feature_groups;

  GPUHistMakerDevice(Context const* ctx, EllpackPageImpl const* _page,
                     common::Span<FeatureType const> _feature_types, bst_uint _n_rows,
  GPUHistMakerDevice(Context const* ctx, bool is_external_memory,
                     common::Span<FeatureType const> _feature_types, bst_row_t _n_rows,
                     TrainParam _param, uint32_t column_sampler_seed, uint32_t n_features,
                     BatchParam _batch_param)
      : evaluator_{_param, n_features, ctx->gpu_id},
        ctx_(ctx),
        page(_page),
        feature_types{_feature_types},
        param(std::move(_param)),
        column_sampler(column_sampler_seed),
        interaction_constraints(param, n_features),
        batch_param(std::move(_batch_param)) {
    sampler.reset(new GradientBasedSampler(ctx, page, _n_rows, batch_param, param.subsample,
                                           param.sampling_method));
    sampler.reset(new GradientBasedSampler(ctx, _n_rows, batch_param, param.subsample,
                                           param.sampling_method, is_external_memory));
    if (!param.monotone_constraints.empty()) {
      // Copy assigning an empty vector causes an exception in MSVC debug builds
      monotone_constraints = param.monotone_constraints;
    }

    // Init histogram
    hist.Init(ctx_->gpu_id, page->Cuts().TotalBins());
    monitor.Init(std::string("GPUHistMakerDevice") + std::to_string(ctx_->gpu_id));
    feature_groups.reset(new FeatureGroups(page->Cuts(), page->is_dense,
                                           dh::MaxSharedMemoryOptin(ctx_->gpu_id),
                                           sizeof(GradientSumT)));
  }

  ~GPUHistMakerDevice() {  // NOLINT
    dh::safe_cuda(cudaSetDevice(ctx_->gpu_id));
  }

  void InitFeatureGroupsOnce() {
    if (!feature_groups) {
      CHECK(page);
      feature_groups.reset(new FeatureGroups(page->Cuts(), page->is_dense,
                                             dh::MaxSharedMemoryOptin(ctx_->gpu_id),
                                             sizeof(GradientSumT)));
    }
  }

  // Reset values for each update iteration
  // Note that the column sampler must be passed by value because it is not
  // thread safe
  void Reset(HostDeviceVector<GradientPair>* dh_gpair, DMatrix* dmat, int64_t num_columns) {
    auto const& info = dmat->Info();
    this->column_sampler.Init(ctx_, num_columns, info.feature_weights.HostVector(),
@@ -247,26 +247,30 @@ struct GPUHistMakerDevice {
                              param.colsample_bytree);
    dh::safe_cuda(cudaSetDevice(ctx_->gpu_id));

    this->evaluator_.Reset(page->Cuts(), feature_types, dmat->Info().num_col_, param,
                           ctx_->gpu_id);

    this->interaction_constraints.Reset();

    if (d_gpair.size() != dh_gpair->Size()) {
      d_gpair.resize(dh_gpair->Size());
    }
    dh::safe_cuda(cudaMemcpyAsync(
        d_gpair.data().get(), dh_gpair->ConstDevicePointer(),
        dh_gpair->Size() * sizeof(GradientPair), cudaMemcpyDeviceToDevice));
    dh::safe_cuda(cudaMemcpyAsync(d_gpair.data().get(), dh_gpair->ConstDevicePointer(),
                                  dh_gpair->Size() * sizeof(GradientPair),
                                  cudaMemcpyDeviceToDevice));
    auto sample = sampler->Sample(ctx_, dh::ToSpan(d_gpair), dmat);
    page = sample.page;
    gpair = sample.gpair;

    this->evaluator_.Reset(page->Cuts(), feature_types, dmat->Info().num_col_, param, ctx_->gpu_id);

    quantiser.reset(new GradientQuantiser(this->gpair));

    row_partitioner.reset();  // Release the device memory first before reallocating
    row_partitioner.reset(new RowPartitioner(ctx_->gpu_id, sample.sample_rows));
    row_partitioner.reset(new RowPartitioner(ctx_->gpu_id, sample.sample_rows));

    // Init histogram
    hist.Init(ctx_->gpu_id, page->Cuts().TotalBins());
    hist.Reset();

    this->InitFeatureGroupsOnce();
  }

  GPUExpandEntry EvaluateRootSplit(GradientPairInt64 root_sum) {
@@ -808,12 +812,11 @@ class GPUHistMaker : public TreeUpdater {
    collective::Broadcast(&column_sampling_seed, sizeof(column_sampling_seed), 0);

    auto batch_param = BatchParam{param->max_bin, TrainParam::DftSparseThreshold()};
    auto page = (*dmat->GetBatches<EllpackPage>(ctx_, batch_param).begin()).Impl();
    dh::safe_cuda(cudaSetDevice(ctx_->gpu_id));
    info_->feature_types.SetDevice(ctx_->gpu_id);
    maker.reset(new GPUHistMakerDevice<GradientSumT>(
        ctx_, page, info_->feature_types.ConstDeviceSpan(), info_->num_row_, *param,
        column_sampling_seed, info_->num_col_, batch_param));
        ctx_, !dmat->SingleColBlock(), info_->feature_types.ConstDeviceSpan(), info_->num_row_,
        *param, column_sampling_seed, info_->num_col_, batch_param));

    p_last_fmat_ = dmat;
    initialised_ = true;
@@ -37,6 +37,7 @@ class LintersPaths:
        "demo/guide-python/quantile_regression.py",
        "demo/guide-python/multioutput_regression.py",
        "demo/guide-python/learning_to_rank.py",
        "demo/aft_survival/aft_survival_viz_demo.py",
        # CI
        "tests/ci_build/lint_python.py",
        "tests/ci_build/test_r_package.py",
@@ -78,6 +79,7 @@ class LintersPaths:
        "demo/guide-python/quantile_regression.py",
        "demo/guide-python/multioutput_regression.py",
        "demo/guide-python/learning_to_rank.py",
        "demo/aft_survival/aft_survival_viz_demo.py",
        # CI
        "tests/ci_build/lint_python.py",
        "tests/ci_build/test_r_package.py",
@@ -114,7 +116,13 @@ def run_black(rel_path: str, fix: bool) -> bool:
@cd(PY_PACKAGE)
def run_isort(rel_path: str, fix: bool) -> bool:
    # Isort gets confused when trying to find the config file, so specify it explicitly.
    cmd = ["isort", "--settings-path", PY_PACKAGE, os.path.join(ROOT, rel_path)]
    cmd = [
        "isort",
        "--settings-path",
        PY_PACKAGE,
        f"--src={PY_PACKAGE}",
        os.path.join(ROOT, rel_path),
    ]
    if not fix:
        cmd += ["--check"]

@@ -5,10 +5,12 @@

#include <gtest/gtest.h>

#include <bitset>
#include <string>  // for string

#include "../../../src/collective/nccl_device_communicator.cuh"
#include "../../../src/collective/communicator-inl.cuh"
#include "../../../src/collective/nccl_device_communicator.cuh"
#include "../helpers.h"

namespace xgboost {
namespace collective {
@@ -31,6 +33,69 @@ TEST(NcclDeviceCommunicatorSimpleTest, SystemError) {
    ASSERT_TRUE(str.find("environment variables") != std::string::npos);
  }
}

namespace {
void VerifyAllReduceBitwiseAND() {
  auto const rank = collective::GetRank();
  std::bitset<64> original{};
  original[rank] = true;
  HostDeviceVector<uint64_t> buffer({original.to_ullong()}, rank);
  collective::AllReduce<collective::Operation::kBitwiseAND>(rank, buffer.DevicePointer(), 1);
  collective::Synchronize(rank);
  EXPECT_EQ(buffer.HostVector()[0], 0ULL);
}
}  // anonymous namespace

TEST(NcclDeviceCommunicator, MGPUAllReduceBitwiseAND) {
  auto const n_gpus = common::AllVisibleGPUs();
  if (n_gpus <= 1) {
    GTEST_SKIP() << "Skipping MGPUAllReduceBitwiseAND test with # GPUs = " << n_gpus;
  }
  RunWithInMemoryCommunicator(n_gpus, VerifyAllReduceBitwiseAND);
}

namespace {
void VerifyAllReduceBitwiseOR() {
  auto const world_size = collective::GetWorldSize();
  auto const rank = collective::GetRank();
  std::bitset<64> original{};
  original[rank] = true;
  HostDeviceVector<uint64_t> buffer({original.to_ullong()}, rank);
  collective::AllReduce<collective::Operation::kBitwiseOR>(rank, buffer.DevicePointer(), 1);
  collective::Synchronize(rank);
  EXPECT_EQ(buffer.HostVector()[0], (1ULL << world_size) - 1);
}
}  // anonymous namespace

TEST(NcclDeviceCommunicator, MGPUAllReduceBitwiseOR) {
  auto const n_gpus = common::AllVisibleGPUs();
  if (n_gpus <= 1) {
    GTEST_SKIP() << "Skipping MGPUAllReduceBitwiseOR test with # GPUs = " << n_gpus;
  }
  RunWithInMemoryCommunicator(n_gpus, VerifyAllReduceBitwiseOR);
}

namespace {
void VerifyAllReduceBitwiseXOR() {
  auto const world_size = collective::GetWorldSize();
  auto const rank = collective::GetRank();
  std::bitset<64> original{~0ULL};
  original[rank] = false;
  HostDeviceVector<uint64_t> buffer({original.to_ullong()}, rank);
  collective::AllReduce<collective::Operation::kBitwiseXOR>(rank, buffer.DevicePointer(), 1);
  collective::Synchronize(rank);
  EXPECT_EQ(buffer.HostVector()[0], (1ULL << world_size) - 1);
}
}  // anonymous namespace

TEST(NcclDeviceCommunicator, MGPUAllReduceBitwiseXOR) {
  auto const n_gpus = common::AllVisibleGPUs();
  if (n_gpus <= 1) {
    GTEST_SKIP() << "Skipping MGPUAllReduceBitwiseXOR test with # GPUs = " << n_gpus;
  }
  RunWithInMemoryCommunicator(n_gpus, VerifyAllReduceBitwiseXOR);
}
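Why the OR test above expects `(1ULL << world_size) - 1`: each rank contributes a one-hot word with only its own bit set, so OR across ranks fills exactly the low `world_size` bits, while AND yields 0 because no bit is set on every rank. A tiny host-side check of that arithmetic (illustrative only, no communicator involved):

#include <bitset>
#include <cstdint>
#include <iostream>

int main() {
  int const world_size = 4;
  std::uint64_t or_acc = 0, and_acc = ~0ULL;
  for (int rank = 0; rank < world_size; ++rank) {
    std::bitset<64> original{};
    original[rank] = true;  // one-hot word, as in the tests above
    or_acc |= original.to_ullong();
    and_acc &= original.to_ullong();
  }
  std::cout << or_acc << " == " << ((1ULL << world_size) - 1) << "\n";  // 15 == 15
  std::cout << and_acc << "\n";                                        // 0
}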

}  // namespace collective
}  // namespace xgboost

@@ -1,5 +1,5 @@
/*!
 * Copyright 2019 XGBoost contributors
/**
 * Copyright 2019-2023, XGBoost contributors
 */
#include <gtest/gtest.h>
#include "../../../src/common/bitfield.h"
@@ -14,7 +14,7 @@ TEST(BitField, Check) {
      static_cast<typename common::Span<LBitField64::value_type>::index_type>(
          storage.size())});
  size_t true_bit = 190;
  for (size_t i = true_bit + 1; i < bits.Size(); ++i) {
  for (size_t i = true_bit + 1; i < bits.Capacity(); ++i) {
    ASSERT_FALSE(bits.Check(i));
  }
  ASSERT_TRUE(bits.Check(true_bit));
@@ -34,7 +34,7 @@ TEST(BitField, Check) {
    ASSERT_FALSE(bits.Check(i));
  }
  ASSERT_TRUE(bits.Check(true_bit));
  for (size_t i = true_bit + 1; i < bits.Size(); ++i) {
  for (size_t i = true_bit + 1; i < bits.Capacity(); ++i) {
    ASSERT_FALSE(bits.Check(i));
  }
}
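
The Size() to Capacity() rename in these loops appears to reflect that a word-backed bitfield can address more bits than were logically requested, because storage is rounded up to whole 64-bit words; scanning "every addressable bit" therefore means iterating to the capacity. A minimal sketch of the distinction under that assumption (SimpleBitField is hypothetical, not XGBoost's LBitField64):

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical word-backed bitfield; not XGBoost's LBitField64.
class SimpleBitField {
 public:
  explicit SimpleBitField(std::size_t n_bits)
      : n_bits_{n_bits}, words_((n_bits + 63) / 64, 0ULL) {}

  // Bits logically requested.
  std::size_t Size() const { return n_bits_; }
  // Bits the underlying words can address; always >= Size().
  std::size_t Capacity() const { return words_.size() * 64; }

  void Set(std::size_t i) { words_[i / 64] |= 1ULL << (i % 64); }
  bool Check(std::size_t i) const { return (words_[i / 64] >> (i % 64)) & 1ULL; }

 private:
  std::size_t n_bits_;
  std::vector<std::uint64_t> words_;
};

int main() {
  SimpleBitField bits{190};
  assert(bits.Size() == 190 && bits.Capacity() == 192);
  bits.Set(190);  // addressable even though it lies beyond Size()
  assert(bits.Check(190));
  return 0;
}
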
@@ -1,5 +1,5 @@
/*!
 * Copyright 2019 XGBoost contributors
/**
 * Copyright 2019-2023, XGBoost contributors
 */
#include <gtest/gtest.h>
#include <thrust/copy.h>
@@ -12,7 +12,7 @@ namespace xgboost {

__global__ void TestSetKernel(LBitField64 bits) {
  auto tid = threadIdx.x + blockIdx.x * blockDim.x;
  if (tid < bits.Size()) {
  if (tid < bits.Capacity()) {
    bits.Set(tid);
  }
}
@@ -36,20 +36,16 @@ TEST(BitField, GPUSet) {

  std::vector<LBitField64::value_type> h_storage(storage.size());
  thrust::copy(storage.begin(), storage.end(), h_storage.begin());

  LBitField64 outputs {
    common::Span<LBitField64::value_type>{h_storage.data(),
                                          h_storage.data() + h_storage.size()}};
  LBitField64 outputs{
      common::Span<LBitField64::value_type>{h_storage.data(), h_storage.data() + h_storage.size()}};
  for (size_t i = 0; i < kBits; ++i) {
    ASSERT_TRUE(outputs.Check(i));
  }
}

__global__ void TestOrKernel(LBitField64 lhs, LBitField64 rhs) {
  lhs |= rhs;
}

TEST(BitField, GPUAnd) {
namespace {
template <bool is_and, typename Op>
void TestGPULogic(Op op) {
  uint32_t constexpr kBits = 128;
  dh::device_vector<LBitField64::value_type> lhs_storage(kBits);
  dh::device_vector<LBitField64::value_type> rhs_storage(kBits);
@@ -57,13 +53,32 @@ TEST(BitField, GPUAnd) {
  auto rhs = LBitField64(dh::ToSpan(rhs_storage));
  thrust::fill(lhs_storage.begin(), lhs_storage.end(), 0UL);
  thrust::fill(rhs_storage.begin(), rhs_storage.end(), ~static_cast<LBitField64::value_type>(0UL));
  TestOrKernel<<<1, kBits>>>(lhs, rhs);
  dh::LaunchN(kBits, [=] __device__(auto) mutable { op(lhs, rhs); });

  std::vector<LBitField64::value_type> h_storage(lhs_storage.size());
  thrust::copy(lhs_storage.begin(), lhs_storage.end(), h_storage.begin());
  LBitField64 outputs {{h_storage.data(), h_storage.data() + h_storage.size()}};
  for (size_t i = 0; i < kBits; ++i) {
    ASSERT_TRUE(outputs.Check(i));
  LBitField64 outputs{{h_storage.data(), h_storage.data() + h_storage.size()}};
  if (is_and) {
    for (size_t i = 0; i < kBits; ++i) {
      ASSERT_FALSE(outputs.Check(i));
    }
  } else {
    for (size_t i = 0; i < kBits; ++i) {
      ASSERT_TRUE(outputs.Check(i));
    }
  }
}
} // namespace xgboost

void TestGPUAnd() {
  TestGPULogic<true>([] XGBOOST_DEVICE(LBitField64 & lhs, LBitField64 const& rhs) { lhs &= rhs; });
}

void TestGPUOr() {
  TestGPULogic<false>([] XGBOOST_DEVICE(LBitField64 & lhs, LBitField64 const& rhs) { lhs |= rhs; });
}
} // namespace

TEST(BitField, GPUAnd) { TestGPUAnd(); }

TEST(BitField, GPUOr) { TestGPUOr(); }
} // namespace xgboost
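
This refactor replaces two nearly identical TESTs with one driver, TestGPULogic, parameterized by a compile-time expectation and a device lambda for the operator. The same shape works host-side; a self-contained sketch of the pattern (plain C++, no CUDA, illustrative names):

#include <cassert>
#include <cstdint>

// Host-only sketch of the TestGPULogic refactor: one templated driver,
// parameterized by the expected outcome and a callable for the operator.
template <bool is_and, typename Op>
void TestLogic(Op op) {
  std::uint64_t lhs = 0ULL;   // all bits clear
  std::uint64_t rhs = ~0ULL;  // all bits set
  op(lhs, rhs);               // apply &= or |=
  for (int i = 0; i < 64; ++i) {
    bool bit = (lhs >> i) & 1ULL;
    if (is_and) {
      assert(!bit);  // 0 & 1 == 0 for every bit
    } else {
      assert(bit);   // 0 | 1 == 1 for every bit
    }
  }
}

int main() {
  TestLogic<true>([](std::uint64_t& l, std::uint64_t r) { l &= r; });
  TestLogic<false>([](std::uint64_t& l, std::uint64_t r) { l |= r; });
  return 0;
}
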
@@ -83,7 +83,9 @@ template <typename BinIdxType>
void CheckColumWithMissingValue(const DenseColumnIter<BinIdxType, true>& col,
                                const GHistIndexMatrix& gmat) {
  for (auto i = 0ull; i < col.Size(); i++) {
    if (col.IsMissing(i)) continue;
    if (col.IsMissing(i)) {
      continue;
    }
    EXPECT_EQ(gmat.index[gmat.row_ptr[i]], col.GetGlobalBinIdx(i));
  }
}

@@ -1,5 +1,5 @@
/*!
 * Copyright (c) by XGBoost Contributors 2019
/**
 * Copyright 2019-2023, XGBoost Contributors
 */
#include <gtest/gtest.h>

@@ -9,8 +9,7 @@
#include "../helpers.h"
#include "../filesystem.h" // dmlc::TemporaryDirectory

namespace xgboost {
namespace common {
namespace xgboost::common {
TEST(MemoryFixSizeBuffer, Seek) {
  size_t constexpr kSize { 64 };
  std::vector<int32_t> memory( kSize );
@@ -89,5 +88,54 @@ TEST(IO, LoadSequentialFile) {

  ASSERT_THROW(LoadSequentialFile("non-exist", true), dmlc::Error);
}
} // namespace common
} // namespace xgboost

TEST(IO, PrivateMmapStream) {
  dmlc::TemporaryDirectory tempdir;
  auto path = tempdir.path + "/testfile";

  // The page size on Linux is usually set to 4096, while the allocation granularity on
  // the Windows machine where this test was written is 65536. We size the test to cover
  // both.
  std::size_t n_batches{64};
  std::size_t multiplier{2048};

  std::vector<std::vector<std::int32_t>> batches;
  std::vector<std::size_t> offset{0ul};

  using T = std::int32_t;

  {
    std::unique_ptr<dmlc::Stream> fo{dmlc::Stream::Create(path.c_str(), "w")};
    for (std::size_t i = 0; i < n_batches; ++i) {
      std::size_t size = (i + 1) * multiplier;
      std::vector<T> data(size, 0);
      std::iota(data.begin(), data.end(), i * i);

      fo->Write(static_cast<std::uint64_t>(data.size()));
      fo->Write(data.data(), data.size() * sizeof(T));

      std::size_t bytes = sizeof(std::uint64_t) + data.size() * sizeof(T);
      offset.push_back(bytes);

      batches.emplace_back(std::move(data));
    }
  }

  // Turn sizes into offsets.
  std::partial_sum(offset.begin(), offset.end(), offset.begin());

  for (std::size_t i = 0; i < n_batches; ++i) {
    std::size_t off = offset[i];
    std::size_t n = offset.at(i + 1) - offset[i];
    std::unique_ptr<dmlc::Stream> fi{std::make_unique<PrivateMmapConstStream>(path, off, n)};
    std::vector<T> data;

    std::uint64_t size{0};
    fi->Read(&size);
    data.resize(size);

    fi->Read(data.data(), size * sizeof(T));
    ASSERT_EQ(data, batches[i]);
  }
}
} // namespace xgboost::common
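
One step in this test deserves a note: the writer appends each batch's byte count to offset (seeded with 0), and a single std::partial_sum then converts those counts into absolute file offsets, so batch i occupies [offset[i], offset[i+1]). A standalone sketch of that bookkeeping:

#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

int main() {
  // Byte counts per record, with a leading 0 so the scan yields start offsets.
  std::vector<std::size_t> offset{0, 12, 8, 20};

  // In-place inclusive scan: {0, 12, 8, 20} -> {0, 12, 20, 40}.
  std::partial_sum(offset.begin(), offset.end(), offset.begin());

  // Record i starts at offset[i] and spans offset[i + 1] - offset[i] bytes.
  assert(offset[1] == 12 && offset[2] == 20 && offset[3] == 40);
  assert(offset.at(2) - offset[1] == 8);  // the second record is 8 bytes
  return 0;
}
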
@@ -2,6 +2,10 @@
#include "../../src/data/ellpack_page.cuh"
#endif

#include <xgboost/data.h> // for SparsePage

#include "./helpers.h" // for RandomDataGenerator

namespace xgboost {
#if defined(__CUDACC__)
namespace {

@@ -285,8 +285,6 @@ TEST(GpuHist, PartitionTwoNodes) {
                             dh::ToSpan(feature_histogram_b)};
  thrust::device_vector<GPUExpandEntry> results(2);
  evaluator.EvaluateSplits({0, 1}, 1, dh::ToSpan(inputs), shared_inputs, dh::ToSpan(results));
  GPUExpandEntry result_a = results[0];
  GPUExpandEntry result_b = results[1];
  EXPECT_EQ(std::bitset<32>(evaluator.GetHostNodeCats(0)[0]),
            std::bitset<32>("10000000000000000000000000000000"));
  EXPECT_EQ(std::bitset<32>(evaluator.GetHostNodeCats(1)[0]),

@@ -39,7 +39,8 @@ void VerifySampling(size_t page_size,
    EXPECT_NE(page->n_rows, kRows);
  }

  GradientBasedSampler sampler(&ctx, page, kRows, param, subsample, sampling_method);
  GradientBasedSampler sampler(&ctx, kRows, param, subsample, sampling_method,
                               !fixed_size_sampling);
  auto sample = sampler.Sample(&ctx, gpair.DeviceSpan(), dmat.get());

  if (fixed_size_sampling) {
@@ -93,7 +94,7 @@ TEST(GradientBasedSampler, NoSamplingExternalMemory) {
  auto page = (*dmat->GetBatches<EllpackPage>(&ctx, param).begin()).Impl();
  EXPECT_NE(page->n_rows, kRows);

  GradientBasedSampler sampler(&ctx, page, kRows, param, kSubsample, TrainParam::kUniform);
  GradientBasedSampler sampler(&ctx, kRows, param, kSubsample, TrainParam::kUniform, true);
  auto sample = sampler.Sample(&ctx, gpair.DeviceSpan(), dmat.get());
  auto sampled_page = sample.page;
  EXPECT_EQ(sample.sample_rows, kRows);
@@ -141,7 +142,8 @@ TEST(GradientBasedSampler, GradientBasedSampling) {
  constexpr size_t kPageSize = 0;
  constexpr float kSubsample = 0.8;
  constexpr int kSamplingMethod = TrainParam::kGradientBased;
  VerifySampling(kPageSize, kSubsample, kSamplingMethod);
  constexpr bool kFixedSizeSampling = true;
  VerifySampling(kPageSize, kSubsample, kSamplingMethod, kFixedSizeSampling);
}

TEST(GradientBasedSampler, GradientBasedSamplingExternalMemory) {

@@ -1,5 +1,5 @@
/*!
 * Copyright 2019 XGBoost contributors
/**
 * Copyright 2019-2023, XGBoost contributors
 */
#include <gtest/gtest.h>
#include <thrust/copy.h>
@@ -53,7 +53,7 @@ void CompareBitField(LBitField64 d_field, std::set<uint32_t> positions) {
  LBitField64 h_field{ {h_field_storage.data(),
                        h_field_storage.data() + h_field_storage.size()} };

  for (size_t i = 0; i < h_field.Size(); ++i) {
  for (size_t i = 0; i < h_field.Capacity(); ++i) {
    if (positions.find(i) != positions.cend()) {
      ASSERT_TRUE(h_field.Check(i));
    } else {
@@ -82,7 +82,7 @@ TEST(GPUFeatureInteractionConstraint, Init) {
    {h_node_storage.data(), h_node_storage.data() + h_node_storage.size()}
  };
  // no feature is attached to node.
  for (size_t i = 0; i < h_node.Size(); ++i) {
  for (size_t i = 0; i < h_node.Capacity(); ++i) {
    ASSERT_FALSE(h_node.Check(i));
  }
}

@@ -92,8 +92,8 @@ void TestBuildHist(bool use_shared_memory_histograms) {
  auto page = BuildEllpackPage(kNRows, kNCols);
  BatchParam batch_param{};
  Context ctx{MakeCUDACtx(0)};
  GPUHistMakerDevice<GradientSumT> maker(&ctx, page.get(), {}, kNRows, param, kNCols, kNCols,
                                         batch_param);
  GPUHistMakerDevice<GradientSumT> maker(&ctx, /*is_external_memory=*/false, {}, kNRows, param,
                                         kNCols, kNCols, batch_param);
  xgboost::SimpleLCG gen;
  xgboost::SimpleRealUniformDistribution<bst_float> dist(0.0f, 1.0f);
  HostDeviceVector<GradientPair> gpair(kNRows);
@@ -106,9 +106,15 @@ void TestBuildHist(bool use_shared_memory_histograms) {

  thrust::host_vector<common::CompressedByteT> h_gidx_buffer (page->gidx_buffer.HostVector());
  maker.row_partitioner.reset(new RowPartitioner(0, kNRows));

  maker.hist.Init(0, page->Cuts().TotalBins());
  maker.hist.AllocateHistograms({0});

  maker.gpair = gpair.DeviceSpan();
  maker.quantiser.reset(new GradientQuantiser(maker.gpair));
  maker.page = page.get();

  maker.InitFeatureGroupsOnce();

  BuildGradientHistogram(ctx.CUDACtx(), page->GetDeviceAccessor(0),
                         maker.feature_groups->DeviceAccessor(0), gpair.DeviceSpan(),
@@ -126,8 +132,8 @@ void TestBuildHist(bool use_shared_memory_histograms) {
  std::vector<GradientPairPrecise> solution = GetHostHistGpair();
  for (size_t i = 0; i < h_result.size(); ++i) {
    auto result = maker.quantiser->ToFloatingPoint(h_result[i]);
    EXPECT_NEAR(result.GetGrad(), solution[i].GetGrad(), 0.01f);
    EXPECT_NEAR(result.GetHess(), solution[i].GetHess(), 0.01f);
    ASSERT_NEAR(result.GetGrad(), solution[i].GetGrad(), 0.01f);
    ASSERT_NEAR(result.GetHess(), solution[i].GetHess(), 0.01f);
  }
}
@@ -305,7 +305,7 @@ class IterForDMatrixTest(xgb.core.DataIter):
        self._labels = [rng.randn(self.rows)] * self.BATCHES

        self.it = 0  # set iterator to 0
        super().__init__()
        super().__init__(cache_prefix=None)

    def as_array(self):
        import cudf

@@ -64,7 +64,8 @@ def run_data_iterator(
    subsample_rate = 0.8 if subsample else 1.0

    it = IteratorForTest(
        *make_batches(n_samples_per_batch, n_features, n_batches, use_cupy)
        *make_batches(n_samples_per_batch, n_features, n_batches, use_cupy),
        cache="cache"
    )
    if n_batches == 0:
        with pytest.raises(ValueError, match="1 batch"):

@@ -253,9 +253,12 @@ class TestQuantileDMatrix:
        self.run_ref_dmatrix(rng, "hist", True)
        self.run_ref_dmatrix(rng, "hist", False)

    def test_predict(self) -> None:
        n_samples, n_features = 16, 2
        X, y = make_categorical(n_samples, n_features, n_categories=13, onehot=False)
    @pytest.mark.parametrize("sparsity", [0.0, 0.5])
    def test_predict(self, sparsity: float) -> None:
        n_samples, n_features = 256, 4
        X, y = make_categorical(
            n_samples, n_features, n_categories=13, onehot=False, sparsity=sparsity
        )
        Xy = xgb.DMatrix(X, y, enable_categorical=True)

        booster = xgb.train({"tree_method": "hist"}, Xy)