Quantile DMatrix for CPU. (#8130)

- Add a new `QuantileDMatrix` that works for both CPU and GPU.
- Deprecate `DeviceQuantileDMatrix`.
This commit is contained in:
Jiaming Yuan
2022-08-02 15:51:23 +08:00
committed by GitHub
parent 2cba1d9fcc
commit d87f69215e
14 changed files with 521 additions and 117 deletions

View File

@@ -6,6 +6,7 @@ Contributors: https://github.com/dmlc/xgboost/blob/master/CONTRIBUTORS.md
from .core import (
DMatrix,
DeviceQuantileDMatrix,
QuantileDMatrix,
Booster,
DataIter,
build_info,
@@ -33,6 +34,7 @@ __all__ = [
# core
"DMatrix",
"DeviceQuantileDMatrix",
"QuantileDMatrix",
"Booster",
"DataIter",
"train",

View File

@@ -1146,7 +1146,7 @@ class DMatrix: # pylint: disable=too-many-instance-attributes
Parameters
----------
feature_types : list or None
feature_types :
Labels for features. None will reset existing feature names
"""
@@ -1189,7 +1189,7 @@ class DMatrix: # pylint: disable=too-many-instance-attributes
class _ProxyDMatrix(DMatrix):
"""A placeholder class when DMatrix cannot be constructed (DeviceQuantileDMatrix,
"""A placeholder class when DMatrix cannot be constructed (QuantileDMatrix,
inplace_predict).
"""
@@ -1234,17 +1234,35 @@ class _ProxyDMatrix(DMatrix):
)
class DeviceQuantileDMatrix(DMatrix):
"""Device memory Data Matrix used in XGBoost for training with tree_method='gpu_hist'. Do
not use this for test/validation tasks as some information may be lost in
quantisation. This DMatrix is primarily designed to save memory in training from
device memory inputs by avoiding intermediate storage. Set max_bin to control the
number of bins during quantisation. See doc string in :py:obj:`xgboost.DMatrix` for
documents on meta info.
class QuantileDMatrix(DMatrix):
"""A DMatrix variant that generates quantilized data directly from input for
``hist`` and ``gpu_hist`` tree methods. This DMatrix is primarily designed to save
memory in training by avoiding intermediate storage. Set ``max_bin`` to control the
number of bins during quantisation, which should be consistent with the training
parameter ``max_bin``. When ``QuantileDMatrix`` is used for validation/test dataset,
``ref`` should be another ``QuantileDMatrix`` (or ``DMatrix``, but not recommended as
it defeats the purpose of saving memory) constructed from training dataset. See
:py:obj:`xgboost.DMatrix` for documents on meta info.
You can construct DeviceQuantileDMatrix from cupy/cudf/dlpack.
.. note::
.. versionadded:: 1.1.0
Do not use ``QuantileDMatrix`` as validation/test dataset without supplying a
reference (the training dataset) ``QuantileDMatrix`` using ``ref`` as some
information may be lost in quantisation.
.. versionadded:: 2.0.0
Parameters
----------
max_bin :
The number of histogram bins; should be consistent with the training parameter
``max_bin``.
ref :
The training dataset that provides quantile information, needed when creating
validation/test dataset with ``QuantileDMatrix``. Supplying the training DMatrix
as a reference means that the same quantisation applied to the training data is
applied to the validation/test data.
"""
@@ -1261,7 +1279,8 @@ class DeviceQuantileDMatrix(DMatrix):
feature_names: Optional[FeatureNames] = None,
feature_types: Optional[FeatureTypes] = None,
nthread: Optional[int] = None,
max_bin: int = 256,
max_bin: Optional[int] = None,
ref: Optional[DMatrix] = None,
group: Optional[ArrayLike] = None,
qid: Optional[ArrayLike] = None,
label_lower_bound: Optional[ArrayLike] = None,
@@ -1269,9 +1288,9 @@ class DeviceQuantileDMatrix(DMatrix):
feature_weights: Optional[ArrayLike] = None,
enable_categorical: bool = False,
) -> None:
self.max_bin = max_bin
self.max_bin: int = max_bin if max_bin is not None else 256
self.missing = missing if missing is not None else np.nan
self.nthread = nthread if nthread is not None else 1
self.nthread = nthread if nthread is not None else -1
self._silent = silent # unused, kept for compatibility
if isinstance(data, ctypes.c_void_p):
@@ -1280,12 +1299,13 @@ class DeviceQuantileDMatrix(DMatrix):
if qid is not None and group is not None:
raise ValueError(
'Only one of the eval_qid or eval_group for each evaluation '
'dataset should be provided.'
"Only one of the eval_qid or eval_group for each evaluation "
"dataset should be provided."
)
self._init(
data,
ref=ref,
label=label,
weight=weight,
base_margin=base_margin,
@@ -1299,7 +1319,13 @@ class DeviceQuantileDMatrix(DMatrix):
enable_categorical=enable_categorical,
)
def _init(self, data: DataType, enable_categorical: bool, **meta: Any) -> None:
def _init(
self,
data: DataType,
ref: Optional[DMatrix],
enable_categorical: bool,
**meta: Any,
) -> None:
from .data import (
_is_dlpack,
_transform_dlpack,
@@ -1317,20 +1343,26 @@ class DeviceQuantileDMatrix(DMatrix):
it = SingleBatchInternalIter(data=data, **meta)
handle = ctypes.c_void_p()
reset_callback, next_callback = it.get_callbacks(False, enable_categorical)
reset_callback, next_callback = it.get_callbacks(True, enable_categorical)
if it.cache_prefix is not None:
raise ValueError(
"DeviceQuantileDMatrix doesn't cache data, remove the cache_prefix "
"QuantileDMatrix doesn't cache data, remove the cache_prefix "
"in iterator to fix this error."
)
ret = _LIB.XGDeviceQuantileDMatrixCreateFromCallback(
args = {
"nthread": self.nthread,
"missing": self.missing,
"max_bin": self.max_bin,
}
config = from_pystr_to_cstr(json.dumps(args))
ret = _LIB.XGQuantileDMatrixCreateFromCallback(
None,
it.proxy.handle,
ref.handle if ref is not None else ref,
reset_callback,
next_callback,
ctypes.c_float(self.missing),
ctypes.c_int(self.nthread),
ctypes.c_int(self.max_bin),
config,
ctypes.byref(handle),
)
it.reraise()
@@ -1339,6 +1371,20 @@ class DeviceQuantileDMatrix(DMatrix):
self.handle = handle
class DeviceQuantileDMatrix(QuantileDMatrix):
    """Use :py:obj:`QuantileDMatrix` instead.

    .. deprecated:: 2.0.0

    .. versionadded:: 1.1.0

    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # stacklevel=2 attributes the FutureWarning to the caller's code
        # instead of this shim, so users can find the deprecated usage.
        warnings.warn(
            "Please use `QuantileDMatrix` instead.", FutureWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)
Objective = Callable[[np.ndarray, DMatrix], Tuple[np.ndarray, np.ndarray]]
Metric = Callable[[np.ndarray, DMatrix], Tuple[str, float]]

View File

@@ -35,6 +35,7 @@ import collections
import logging
import platform
import socket
import warnings
from collections import defaultdict
from contextlib import contextmanager
from functools import partial, update_wrapper
@@ -64,10 +65,10 @@ from .compat import DataFrame, LazyLoader, concat, lazy_isinstance
from .core import (
Booster,
DataIter,
DeviceQuantileDMatrix,
DMatrix,
Metric,
Objective,
QuantileDMatrix,
_deprecate_positional_args,
_expect,
_has_categorical,
@@ -495,7 +496,7 @@ async def map_worker_partitions(
client: Optional["distributed.Client"],
func: Callable[..., _MapRetT],
*refs: Any,
workers: List[str],
workers: Sequence[str],
) -> List[_MapRetT]:
"""Map a function onto partitions of each worker."""
# Note for function purity:
@@ -628,22 +629,7 @@ class DaskPartitionIter(DataIter): # pylint: disable=R0902
return 1
class DaskDeviceQuantileDMatrix(DaskDMatrix):
"""Specialized data type for `gpu_hist` tree method. This class is used to reduce
the memory usage by eliminating data copies. Internally all the partitions/chunks
of data are merged by weighted GK sketching. So the number of partitions from dask
may affect training accuracy as GK generates bounded error for each merge. See doc
string for :py:obj:`xgboost.DeviceQuantileDMatrix` and :py:obj:`xgboost.DMatrix` for
other parameters.
.. versionadded:: 1.2.0
Parameters
----------
max_bin : Number of bins for histogram construction.
"""
class DaskQuantileDMatrix(DaskDMatrix):
@_deprecate_positional_args
def __init__(
self,
@@ -657,7 +643,8 @@ class DaskDeviceQuantileDMatrix(DaskDMatrix):
silent: bool = False, # disable=unused-argument
feature_names: Optional[FeatureNames] = None,
feature_types: Optional[Union[Any, List[Any]]] = None,
max_bin: int = 256,
max_bin: Optional[int] = None,
ref: Optional[DMatrix] = None,
group: Optional[_DaskCollection] = None,
qid: Optional[_DaskCollection] = None,
label_lower_bound: Optional[_DaskCollection] = None,
@@ -684,14 +671,31 @@ class DaskDeviceQuantileDMatrix(DaskDMatrix):
)
self.max_bin = max_bin
self.is_quantile = True
self._ref: Optional[int] = id(ref) if ref is not None else None
def _create_fn_args(self, worker_addr: str) -> Dict[str, Any]:
args = super()._create_fn_args(worker_addr)
args["max_bin"] = self.max_bin
if self._ref is not None:
args["ref"] = self._ref
return args
def _create_device_quantile_dmatrix(
class DaskDeviceQuantileDMatrix(DaskQuantileDMatrix):
    """Use :py:obj:`DaskQuantileDMatrix` instead.

    .. deprecated:: 2.0.0

    .. versionadded:: 1.2.0

    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # stacklevel=2 attributes the FutureWarning to the caller's code
        # instead of this shim, so users can find the deprecated usage.
        warnings.warn(
            "Please use `DaskQuantileDMatrix` instead.", FutureWarning, stacklevel=2
        )
        super().__init__(*args, **kwargs)
def _create_quantile_dmatrix(
feature_names: Optional[FeatureNames],
feature_types: Optional[Union[Any, List[Any]]],
feature_weights: Optional[Any],
@@ -700,18 +704,20 @@ def _create_device_quantile_dmatrix(
parts: Optional[_DataParts],
max_bin: int,
enable_categorical: bool,
) -> DeviceQuantileDMatrix:
ref: Optional[DMatrix] = None,
) -> QuantileDMatrix:
worker = distributed.get_worker()
if parts is None:
msg = f"worker {worker.address} has an empty DMatrix."
LOGGER.warning(msg)
import cupy
d = DeviceQuantileDMatrix(
d = QuantileDMatrix(
cupy.zeros((0, 0)),
feature_names=feature_names,
feature_types=feature_types,
max_bin=max_bin,
ref=ref,
enable_categorical=enable_categorical,
)
return d
@@ -719,13 +725,14 @@ def _create_device_quantile_dmatrix(
unzipped_dict = _get_worker_parts(parts)
it = DaskPartitionIter(**unzipped_dict)
dmatrix = DeviceQuantileDMatrix(
dmatrix = QuantileDMatrix(
it,
missing=missing,
feature_names=feature_names,
feature_types=feature_types,
nthread=nthread,
max_bin=max_bin,
ref=ref,
enable_categorical=enable_categorical,
)
dmatrix.set_info(feature_weights=feature_weights)
@@ -786,11 +793,9 @@ def _create_dmatrix(
return dmatrix
def _dmatrix_from_list_of_parts(
is_quantile: bool, **kwargs: Any
) -> Union[DMatrix, DeviceQuantileDMatrix]:
def _dmatrix_from_list_of_parts(is_quantile: bool, **kwargs: Any) -> DMatrix:
if is_quantile:
return _create_device_quantile_dmatrix(**kwargs)
return _create_quantile_dmatrix(**kwargs)
return _create_dmatrix(**kwargs)
@@ -921,7 +926,18 @@ async def _train_async(
if evals_id[i] == train_id:
evals.append((Xy, evals_name[i]))
continue
eval_Xy = _dmatrix_from_list_of_parts(**ref, nthread=n_threads)
if ref.get("ref", None) is not None:
if ref["ref"] != train_id:
raise ValueError(
"The training DMatrix should be used as a reference"
" to evaluation `QuantileDMatrix`."
)
del ref["ref"]
eval_Xy = _dmatrix_from_list_of_parts(
**ref, nthread=n_threads, ref=Xy
)
else:
eval_Xy = _dmatrix_from_list_of_parts(**ref, nthread=n_threads)
evals.append((eval_Xy, evals_name[i]))
booster = worker_train(
@@ -960,12 +976,14 @@ async def _train_async(
results = await map_worker_partitions(
client,
dispatched_train,
# extra function parameters
params,
_rabit_args,
id(dtrain),
evals_name,
evals_id,
*([dtrain] + evals_data),
# workers to be used for training
workers=workers,
)
return list(filter(lambda ret: ret is not None, results))[0]

View File

@@ -1167,6 +1167,7 @@ def _proxy_transform(
if _is_dlpack(data):
return _transform_dlpack(data), None, feature_names, feature_types
if _is_numpy_array(data):
data, _ = _ensure_np_dtype(data, data.dtype)
return data, None, feature_names, feature_types
if _is_scipy_csr(data):
return data, None, feature_names, feature_types