rocm enable for v2.0.1

This commit is contained in:
Hui Liu
2023-10-27 18:50:28 -07:00
447 changed files with 13518 additions and 8719 deletions

View File

@@ -132,16 +132,28 @@ def locate_or_build_libxgboost(
if build_config.use_system_libxgboost:
# Find libxgboost from system prefix
sys_prefix = pathlib.Path(sys.prefix).absolute().resolve()
libxgboost_sys = sys_prefix / "lib" / _lib_name()
if not libxgboost_sys.exists():
raise RuntimeError(
f"use_system_libxgboost was specified but {_lib_name()} is "
f"not found in {libxgboost_sys.parent}"
)
logger.info("Using system XGBoost: %s", str(libxgboost_sys))
return libxgboost_sys
sys_prefix = pathlib.Path(sys.base_prefix)
sys_prefix_candidates = [
sys_prefix / "lib",
# Paths possibly used on Windows
sys_prefix / "bin",
sys_prefix / "Library",
sys_prefix / "Library" / "bin",
sys_prefix / "Library" / "lib",
]
sys_prefix_candidates = [
p.expanduser().resolve() for p in sys_prefix_candidates
]
for candidate_dir in sys_prefix_candidates:
libtreelite_sys = candidate_dir / _lib_name()
if libtreelite_sys.exists():
logger.info("Using system XGBoost: %s", str(libtreelite_sys))
return libtreelite_sys
raise RuntimeError(
f"use_system_libxgboost was specified but {_lib_name()} is "
f"not found. Paths searched (in order): \n"
+ "\n".join([f"* {str(p)}" for p in sys_prefix_candidates])
)
libxgboost = locate_local_libxgboost(toplevel_dir, logger=logger)
if libxgboost is not None:

View File

@@ -7,7 +7,7 @@ build-backend = "packager.pep517"
[project]
name = "xgboost"
version = "2.0.0-dev"
version = "2.0.1"
authors = [
{ name = "Hyunsu Cho", email = "chohyu01@cs.washington.edu" },
{ name = "Jiaming Yuan", email = "jm.yuan@outlook.com" }

View File

@@ -1 +1 @@
2.0.0-dev
2.0.1

View File

@@ -8,7 +8,9 @@ from typing import (
Callable,
Dict,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
@@ -20,8 +22,6 @@ import numpy as np
DataType = Any
# xgboost accepts some other possible types in practice due to historical reason, which is
# lesser tested. For now we encourage users to pass a simple list of string.
FeatureInfo = Sequence[str]
FeatureNames = FeatureInfo
FeatureTypes = FeatureInfo
@@ -97,6 +97,13 @@ else:
ctypes._Pointer,
]
# The second arg is actually Optional[List[cudf.Series]], skipped for easier type check.
# The cudf Series is the obtained cat codes, preserved in the `DataIter` to prevent it
# being freed.
TransformedData = Tuple[
Any, Optional[List], Optional[FeatureNames], Optional[FeatureTypes]
]
# template parameter
_T = TypeVar("_T")
_F = TypeVar("_F", bound=Callable[..., Any])

View File

@@ -134,13 +134,17 @@ class CallbackContainer:
is_cv: bool = False,
) -> None:
self.callbacks = set(callbacks)
if metric is not None:
msg = (
"metric must be callable object for monitoring. For "
+ "builtin metrics, passing them in training parameter"
+ " will invoke monitor automatically."
)
assert callable(metric), msg
for cb in callbacks:
if not isinstance(cb, TrainingCallback):
raise TypeError("callback must be an instance of `TrainingCallback`.")
msg = (
"metric must be callable object for monitoring. For builtin metrics"
", passing them in training parameter invokes monitor automatically."
)
if metric is not None and not callable(metric):
raise TypeError(msg)
self.metric = metric
self.history: TrainingCallback.EvalsLog = collections.OrderedDict()
self._output_margin = output_margin
@@ -170,16 +174,6 @@ class CallbackContainer:
else:
assert isinstance(model, Booster), msg
if not self.is_cv:
if model.attr("best_score") is not None:
model.best_score = float(cast(str, model.attr("best_score")))
model.best_iteration = int(cast(str, model.attr("best_iteration")))
else:
# Due to compatibility with version older than 1.4, these attributes are
# added to Python object even if early stopping is not used.
model.best_iteration = model.num_boosted_rounds() - 1
model.set_attr(best_iteration=str(model.best_iteration))
return model
def before_iteration(
@@ -267,9 +261,14 @@ class LearningRateScheduler(TrainingCallback):
def __init__(
self, learning_rates: Union[Callable[[int], float], Sequence[float]]
) -> None:
assert callable(learning_rates) or isinstance(
if not callable(learning_rates) and not isinstance(
learning_rates, collections.abc.Sequence
)
):
raise TypeError(
"Invalid learning rates, expecting callable or sequence, got: "
f"{type(learning_rates)}"
)
if callable(learning_rates):
self.learning_rates = learning_rates
else:
@@ -302,24 +301,28 @@ class EarlyStopping(TrainingCallback):
save_best :
Whether training should return the best model or the last model.
min_delta :
Minimum absolute change in score to be qualified as an improvement.
.. versionadded:: 1.5.0
.. code-block:: python
Minimum absolute change in score to be qualified as an improvement.
es = xgboost.callback.EarlyStopping(
rounds=2,
min_delta=1e-3,
save_best=True,
maximize=False,
data_name="validation_0",
metric_name="mlogloss",
)
clf = xgboost.XGBClassifier(tree_method="gpu_hist", callbacks=[es])
Examples
--------
X, y = load_digits(return_X_y=True)
clf.fit(X, y, eval_set=[(X, y)])
.. code-block:: python
es = xgboost.callback.EarlyStopping(
rounds=2,
min_delta=1e-3,
save_best=True,
maximize=False,
data_name="validation_0",
metric_name="mlogloss",
)
clf = xgboost.XGBClassifier(tree_method="hist", device="cuda", callbacks=[es])
X, y = load_digits(return_X_y=True)
clf.fit(X, y, eval_set=[(X, y)])
"""
# pylint: disable=too-many-arguments
@@ -363,7 +366,7 @@ class EarlyStopping(TrainingCallback):
return numpy.greater(get_s(new) - self._min_delta, get_s(best))
def minimize(new: _Score, best: _Score) -> bool:
"""New score should be smaller than the old one."""
"""New score should be lesser than the old one."""
return numpy.greater(get_s(best) - self._min_delta, get_s(new))
if self.maximize is None:
@@ -419,38 +422,53 @@ class EarlyStopping(TrainingCallback):
) -> bool:
epoch += self.starting_round # training continuation
msg = "Must have at least 1 validation dataset for early stopping."
assert len(evals_log.keys()) >= 1, msg
data_name = ""
if len(evals_log.keys()) < 1:
raise ValueError(msg)
# Get data name
if self.data:
for d, _ in evals_log.items():
if d == self.data:
data_name = d
if not data_name:
raise ValueError("No dataset named:", self.data)
data_name = self.data
else:
# Use the last one as default.
data_name = list(evals_log.keys())[-1]
assert isinstance(data_name, str) and data_name
if data_name not in evals_log:
raise ValueError(f"No dataset named: {data_name}")
if not isinstance(data_name, str):
raise TypeError(
f"The name of the dataset should be a string. Got: {type(data_name)}"
)
data_log = evals_log[data_name]
# Filter out scores that can not be used for early stopping.
# Get metric name
if self.metric_name:
metric_name = self.metric_name
else:
# Use last metric by default.
assert isinstance(data_log, collections.OrderedDict)
metric_name = list(data_log.keys())[-1]
if metric_name not in data_log:
raise ValueError(f"No metric named: {metric_name}")
# The latest score
score = data_log[metric_name][-1]
return self._update_rounds(score, data_name, metric_name, model, epoch)
def after_training(self, model: _Model) -> _Model:
if not self.save_best:
return model
try:
if self.save_best:
model = model[: int(model.attr("best_iteration")) + 1]
best_iteration = model.best_iteration
best_score = model.best_score
assert best_iteration is not None and best_score is not None
model = model[: best_iteration + 1]
model.best_iteration = best_iteration
model.best_score = best_score
except XGBoostError as e:
raise XGBoostError(
"`save_best` is not applicable to current booster"
"`save_best` is not applicable to the current booster"
) from e
return model
@@ -462,8 +480,6 @@ class EvaluationMonitor(TrainingCallback):
Parameters
----------
metric :
Extra user defined metric.
rank :
Which worker should be used for printing the result.
period :

View File

@@ -88,6 +88,18 @@ def is_cudf_available() -> bool:
return False
def is_cupy_available() -> bool:
"""Check cupy package available or not"""
if importlib.util.find_spec("cupy") is None:
return False
try:
import cupy
return True
except ImportError:
return False
try:
import scipy.sparse as scipy_sparse
from scipy.sparse import csr_matrix as scipy_csr

View File

@@ -3,11 +3,13 @@
"""Core XGBoost Library."""
import copy
import ctypes
import importlib.util
import json
import os
import re
import sys
import warnings
import weakref
from abc import ABC, abstractmethod
from collections.abc import Mapping
from enum import IntEnum, unique
@@ -50,6 +52,7 @@ from ._typing import (
FeatureTypes,
ModelIn,
NumpyOrCupy,
TransformedData,
c_bst_ulong,
)
from .compat import PANDAS_INSTALLED, DataFrame, py_str
@@ -152,7 +155,11 @@ def _expect(expectations: Sequence[Type], got: Type) -> str:
def _log_callback(msg: bytes) -> None:
"""Redirect logs from native library into Python console"""
print(py_str(msg))
smsg = py_str(msg)
if smsg.find("WARNING:") != -1:
warnings.warn(smsg, UserWarning)
return
print(smsg)
def _get_log_callback_func() -> Callable:
@@ -228,8 +235,11 @@ Error message(s): {os_error_list}
def parse(ver: str) -> Tuple[int, int, int]:
"""Avoid dependency on packaging (PEP 440)."""
# 2.0.0-dev or 2.0.0
# 2.0.0-dev, 2.0.0, or 2.0.0rc1
major, minor, patch = ver.split("-")[0].split(".")
rc = patch.find("rc")
if rc != -1:
patch = patch[:rc]
return int(major), int(minor), int(patch)
libver = _lib_version(lib)
@@ -271,6 +281,44 @@ def _check_call(ret: int) -> None:
raise XGBoostError(py_str(_LIB.XGBGetLastError()))
def _check_distributed_params(kwargs: Dict[str, Any]) -> None:
"""Validate parameters in distributed environments."""
device = kwargs.get("device", None)
if device and not isinstance(device, str):
msg = "Invalid type for the `device` parameter"
msg += _expect((str,), type(device))
raise TypeError(msg)
if device and device.find(":") != -1:
raise ValueError(
"Distributed training doesn't support selecting device ordinal as GPUs are"
" managed by the distributed framework. use `device=cuda` or `device=gpu`"
" instead."
)
if kwargs.get("booster", None) == "gblinear":
raise NotImplementedError(
f"booster `{kwargs['booster']}` is not supported for distributed training."
)
def _validate_feature_info(
feature_info: Sequence[str], n_features: int, name: str
) -> List[str]:
if isinstance(feature_info, str) or not isinstance(feature_info, Sequence):
raise TypeError(
f"Expecting a sequence of strings for {name}, got: {type(feature_info)}"
)
feature_info = list(feature_info)
if len(feature_info) != n_features and n_features != 0:
msg = (
f"{name} must have the same length as the number of data columns, ",
f"expected {n_features}, got {len(feature_info)}",
)
raise ValueError(msg)
return feature_info
def build_info() -> dict:
"""Build information of XGBoost. The returned value format is not stable. Also,
please note that build time dependency is not the same as runtime dependency. For
@@ -381,6 +429,54 @@ def c_array(
return (ctype * len(values))(*values)
def from_array_interface(interface: dict) -> NumpyOrCupy:
"""Convert array interface to numpy or cupy array"""
class Array: # pylint: disable=too-few-public-methods
"""Wrapper type for communicating with numpy and cupy."""
_interface: Optional[dict] = None
@property
def __array_interface__(self) -> Optional[dict]:
return self._interface
@__array_interface__.setter
def __array_interface__(self, interface: dict) -> None:
self._interface = copy.copy(interface)
# converts some fields to tuple as required by numpy
self._interface["shape"] = tuple(self._interface["shape"])
self._interface["data"] = tuple(self._interface["data"])
if self._interface.get("strides", None) is not None:
self._interface["strides"] = tuple(self._interface["strides"])
@property
def __cuda_array_interface__(self) -> Optional[dict]:
return self.__array_interface__
@__cuda_array_interface__.setter
def __cuda_array_interface__(self, interface: dict) -> None:
self.__array_interface__ = interface
arr = Array()
if "stream" in interface:
# CUDA stream is presented, this is a __cuda_array_interface__.
spec = importlib.util.find_spec("cupy")
if spec is None:
raise ImportError("`cupy` is required for handling CUDA buffer.")
import cupy as cp # pylint: disable=import-error
arr.__cuda_array_interface__ = interface
out = cp.array(arr, copy=True)
else:
arr.__array_interface__ = interface
out = np.array(arr, copy=True)
return out
def _prediction_output(
shape: CNumericPtr, dims: c_bst_ulong, predts: CFloatPtr, is_cuda: bool
) -> NumpyOrCupy:
@@ -395,7 +491,16 @@ def _prediction_output(
class DataIter(ABC): # pylint: disable=too-many-instance-attributes
"""The interface for user defined data iterator.
"""The interface for user defined data iterator. The iterator facilitates
distributed training, :py:class:`QuantileDMatrix`, and external memory support using
:py:class:`DMatrix`. Most of time, users don't need to interact with this class
directly.
.. note::
The class caches some intermediate results using the `data` input (predictor
`X`) as key. Don't repeat the `X` for multiple batches with different meta data
(like `label`), make a copy if necessary.
Parameters
----------
@@ -419,13 +524,13 @@ class DataIter(ABC): # pylint: disable=too-many-instance-attributes
self._allow_host = True
self._release = release_data
# Stage data in Python until reset or next is called to avoid data being free.
self._temporary_data: Optional[Tuple[Any, Any, Any, Any]] = None
self._input_id: int = 0
self._temporary_data: Optional[TransformedData] = None
self._data_ref: Optional[weakref.ReferenceType] = None
def get_callbacks(
self, allow_host: bool, enable_categorical: bool
) -> Tuple[Callable, Callable]:
"""Get callback functions for iterating in C."""
"""Get callback functions for iterating in C. This is an internal function."""
assert hasattr(self, "cache_prefix"), "__init__ is not called."
self._reset_callback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)(
self._reset_wrapper
@@ -491,8 +596,8 @@ class DataIter(ABC): # pylint: disable=too-many-instance-attributes
@require_keyword_args(True)
def input_data(
data: Any,
*,
data: Any,
feature_names: Optional[FeatureNames] = None,
feature_types: Optional[FeatureTypes] = None,
**kwargs: Any,
@@ -500,7 +605,19 @@ class DataIter(ABC): # pylint: disable=too-many-instance-attributes
from .data import _proxy_transform, dispatch_proxy_set_data
# Reduce the amount of transformation that's needed for QuantileDMatrix.
if self._temporary_data is not None and id(data) == self._input_id:
#
# To construct the QDM, one needs 4 iterations on CPU, or 2 iterations on
# GPU. If the QDM has only one batch of input (most of the cases), we can
# avoid transforming the data repeatly.
try:
ref = weakref.ref(data)
except TypeError:
ref = None
if (
self._temporary_data is not None
and ref is not None
and ref is self._data_ref
):
new, cat_codes, feature_names, feature_types = self._temporary_data
else:
new, cat_codes, feature_names, feature_types = _proxy_transform(
@@ -517,7 +634,7 @@ class DataIter(ABC): # pylint: disable=too-many-instance-attributes
feature_types=feature_types,
**kwargs,
)
self._input_id = id(data)
self._data_ref = ref
# pylint: disable=not-callable
return self._handle_exception(lambda: self.next(input_data), 0)
@@ -593,6 +710,9 @@ def require_keyword_args(
@wraps(func)
def inner_f(*args: Any, **kwargs: Any) -> _T:
extra_args = len(args) - len(all_args)
if not all_args and extra_args > 0: # keyword argument only
raise TypeError("Keyword argument is required.")
if extra_args > 0:
# ignore first 'self' argument for instance methods
args_msg = [
@@ -1040,7 +1160,7 @@ class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-m
testing purposes. If this is a quantized DMatrix then quantized values are
returned instead of input values.
.. versionadded:: 1.7.0
.. versionadded:: 1.7.0
"""
indptr = np.empty(self.num_row() + 1, dtype=np.uint64)
@@ -1060,6 +1180,36 @@ class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-m
)
return ret
def get_quantile_cut(self) -> Tuple[np.ndarray, np.ndarray]:
"""Get quantile cuts for quantization.
.. versionadded:: 2.0.0
"""
n_features = self.num_col()
c_sindptr = ctypes.c_char_p()
c_sdata = ctypes.c_char_p()
config = make_jcargs()
_check_call(
_LIB.XGDMatrixGetQuantileCut(
self.handle, config, ctypes.byref(c_sindptr), ctypes.byref(c_sdata)
)
)
assert c_sindptr.value is not None
assert c_sdata.value is not None
i_indptr = json.loads(c_sindptr.value)
indptr = from_array_interface(i_indptr)
assert indptr.size == n_features + 1
assert indptr.dtype == np.uint64
i_data = json.loads(c_sdata.value)
data = from_array_interface(i_data)
assert data.size == indptr[-1]
assert data.dtype == np.float32
return indptr, data
def num_row(self) -> int:
"""Get the number of rows in the DMatrix."""
ret = c_bst_ulong()
@@ -1117,11 +1267,10 @@ class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-m
@property
def feature_names(self) -> Optional[FeatureNames]:
"""Get feature names (column labels).
"""Labels for features (column labels).
Setting it to ``None`` resets existing feature names.
Returns
-------
feature_names : list or None
"""
length = c_bst_ulong()
sarr = ctypes.POINTER(ctypes.c_char_p)()
@@ -1140,67 +1289,61 @@ class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-m
@feature_names.setter
def feature_names(self, feature_names: Optional[FeatureNames]) -> None:
"""Set feature names (column labels).
Parameters
----------
feature_names : list or None
Labels for features. None will reset existing feature names
"""
if feature_names is not None:
# validate feature name
try:
if not isinstance(feature_names, str):
feature_names = list(feature_names)
else:
feature_names = [feature_names]
except TypeError:
feature_names = [cast(str, feature_names)]
if len(feature_names) != len(set(feature_names)):
raise ValueError("feature_names must be unique")
if len(feature_names) != self.num_col() and self.num_col() != 0:
msg = (
"feature_names must have the same length as data, ",
f"expected {self.num_col()}, got {len(feature_names)}",
)
raise ValueError(msg)
# prohibit to use symbols may affect to parse. e.g. []<
if not all(
isinstance(f, str) and not any(x in f for x in ["[", "]", "<"])
for f in feature_names
):
raise ValueError(
"feature_names must be string, and may not contain [, ] or <"
)
feature_names_bytes = [bytes(f, encoding="utf-8") for f in feature_names]
c_feature_names = (ctypes.c_char_p * len(feature_names_bytes))(
*feature_names_bytes
)
_check_call(
_LIB.XGDMatrixSetStrFeatureInfo(
self.handle,
c_str("feature_name"),
c_feature_names,
c_bst_ulong(len(feature_names)),
)
)
else:
# reset feature_types also
if feature_names is None:
_check_call(
_LIB.XGDMatrixSetStrFeatureInfo(
self.handle, c_str("feature_name"), None, c_bst_ulong(0)
)
)
self.feature_types = None
return
# validate feature name
feature_names = _validate_feature_info(
feature_names, self.num_col(), "feature names"
)
if len(feature_names) != len(set(feature_names)):
values, counts = np.unique(
feature_names,
return_index=False,
return_inverse=False,
return_counts=True,
)
duplicates = [name for name, cnt in zip(values, counts) if cnt > 1]
raise ValueError(
f"feature_names must be unique. Duplicates found: {duplicates}"
)
# prohibit the use symbols that may affect parsing. e.g. []<
if not all(
isinstance(f, str) and not any(x in f for x in ["[", "]", "<"])
for f in feature_names
):
raise ValueError(
"feature_names must be string, and may not contain [, ] or <"
)
feature_names_bytes = [bytes(f, encoding="utf-8") for f in feature_names]
c_feature_names = (ctypes.c_char_p * len(feature_names_bytes))(
*feature_names_bytes
)
_check_call(
_LIB.XGDMatrixSetStrFeatureInfo(
self.handle,
c_str("feature_name"),
c_feature_names,
c_bst_ulong(len(feature_names)),
)
)
@property
def feature_types(self) -> Optional[FeatureTypes]:
"""Get feature types (column types).
"""Type of features (column types).
This is for displaying the results and categorical data support. See
:py:class:`DMatrix` for details.
Setting it to ``None`` resets existing feature types.
Returns
-------
feature_types : list or None
"""
length = c_bst_ulong()
sarr = ctypes.POINTER(ctypes.c_char_p)()
@@ -1218,57 +1361,32 @@ class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-m
return res
@feature_types.setter
def feature_types(self, feature_types: Optional[Union[List[str], str]]) -> None:
"""Set feature types (column types).
This is for displaying the results and categorical data support. See
:py:class:`DMatrix` for details.
Parameters
----------
feature_types :
Labels for features. None will reset existing feature names
"""
# For compatibility reason this function wraps single str input into a list. But
# we should not promote such usage since other than visualization, the field is
# also used for specifying categorical data type.
if feature_types is not None:
if not isinstance(feature_types, (list, str)):
raise TypeError("feature_types must be string or list of strings")
if isinstance(feature_types, str):
# single string will be applied to all columns
feature_types = [feature_types] * self.num_col()
try:
if not isinstance(feature_types, str):
feature_types = list(feature_types)
else:
feature_types = [feature_types]
except TypeError:
feature_types = [cast(str, feature_types)]
feature_types_bytes = [bytes(f, encoding="utf-8") for f in feature_types]
c_feature_types = (ctypes.c_char_p * len(feature_types_bytes))(
*feature_types_bytes
)
_check_call(
_LIB.XGDMatrixSetStrFeatureInfo(
self.handle,
c_str("feature_type"),
c_feature_types,
c_bst_ulong(len(feature_types)),
)
)
if len(feature_types) != self.num_col() and self.num_col() != 0:
msg = "feature_types must have the same length as data"
raise ValueError(msg)
else:
# Reset.
def feature_types(self, feature_types: Optional[FeatureTypes]) -> None:
if feature_types is None:
# Reset
_check_call(
_LIB.XGDMatrixSetStrFeatureInfo(
self.handle, c_str("feature_type"), None, c_bst_ulong(0)
)
)
return
feature_types = _validate_feature_info(
feature_types, self.num_col(), "feature types"
)
feature_types_bytes = [bytes(f, encoding="utf-8") for f in feature_types]
c_feature_types = (ctypes.c_char_p * len(feature_types_bytes))(
*feature_types_bytes
)
_check_call(
_LIB.XGDMatrixSetStrFeatureInfo(
self.handle,
c_str("feature_type"),
c_feature_types,
c_bst_ulong(len(feature_types)),
)
)
class _ProxyDMatrix(DMatrix):
@@ -1318,13 +1436,13 @@ class _ProxyDMatrix(DMatrix):
class QuantileDMatrix(DMatrix):
"""A DMatrix variant that generates quantilized data directly from input for
``hist`` and ``gpu_hist`` tree methods. This DMatrix is primarily designed to save
memory in training by avoiding intermediate storage. Set ``max_bin`` to control the
number of bins during quantisation, which should be consistent with the training
parameter ``max_bin``. When ``QuantileDMatrix`` is used for validation/test dataset,
``ref`` should be another ``QuantileDMatrix``(or ``DMatrix``, but not recommended as
it defeats the purpose of saving memory) constructed from training dataset. See
"""A DMatrix variant that generates quantilized data directly from input for the
``hist`` tree method. This DMatrix is primarily designed to save memory in training
by avoiding intermediate storage. Set ``max_bin`` to control the number of bins
during quantisation, which should be consistent with the training parameter
``max_bin``. When ``QuantileDMatrix`` is used for validation/test dataset, ``ref``
should be another ``QuantileDMatrix``(or ``DMatrix``, but not recommended as it
defeats the purpose of saving memory) constructed from training dataset. See
:py:obj:`xgboost.DMatrix` for documents on meta info.
.. note::
@@ -1372,7 +1490,7 @@ class QuantileDMatrix(DMatrix):
enable_categorical: bool = False,
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> None:
self.max_bin: int = max_bin if max_bin is not None else 256
self.max_bin = max_bin
self.missing = missing if missing is not None else np.nan
self.nthread = nthread if nthread is not None else -1
self._silent = silent # unused, kept for compatibility
@@ -1544,7 +1662,7 @@ class Booster:
)
for d in cache:
# Validate feature only after the feature names are saved into booster.
self._validate_dmatrix_features(d)
self._assign_dmatrix_features(d)
if isinstance(model_file, Booster):
assert self.handle is not None
@@ -1667,6 +1785,11 @@ class Booster:
self.__dict__.update(state)
def __getitem__(self, val: Union[int, tuple, slice]) -> "Booster":
"""Get a slice of the tree-based model.
.. versionadded:: 1.3.0
"""
if isinstance(val, int):
val = slice(val, val + 1)
if isinstance(val, tuple):
@@ -1705,6 +1828,11 @@ class Booster:
return sliced
def __iter__(self) -> Generator["Booster", None, None]:
"""Iterator method for getting individual trees.
.. versionadded:: 2.0.0
"""
for i in range(0, self.num_boosted_rounds()):
yield self[i]
@@ -1795,7 +1923,7 @@ class Booster:
attr_names = from_cstr_to_pystr(sarr, length)
return {n: self.attr(n) for n in attr_names}
def set_attr(self, **kwargs: Optional[str]) -> None:
def set_attr(self, **kwargs: Optional[Any]) -> None:
"""Set the attribute of the Booster.
Parameters
@@ -1915,7 +2043,7 @@ class Booster:
"""
if not isinstance(dtrain, DMatrix):
raise TypeError(f"invalid training matrix: {type(dtrain).__name__}")
self._validate_dmatrix_features(dtrain)
self._assign_dmatrix_features(dtrain)
if fobj is None:
_check_call(
@@ -1947,7 +2075,7 @@ class Booster:
raise ValueError(f"grad / hess length mismatch: {len(grad)} / {len(hess)}")
if not isinstance(dtrain, DMatrix):
raise TypeError(f"invalid training matrix: {type(dtrain).__name__}")
self._validate_dmatrix_features(dtrain)
self._assign_dmatrix_features(dtrain)
_check_call(
_LIB.XGBoosterBoostOneIter(
@@ -1988,7 +2116,7 @@ class Booster:
raise TypeError(f"expected DMatrix, got {type(d[0]).__name__}")
if not isinstance(d[1], str):
raise TypeError(f"expected string, got {type(d[1]).__name__}")
self._validate_dmatrix_features(d[0])
self._assign_dmatrix_features(d[0])
dmats = c_array(ctypes.c_void_p, [d[0].handle for d in evals])
evnames = c_array(ctypes.c_char_p, [c_str(d[1]) for d in evals])
@@ -2040,7 +2168,7 @@ class Booster:
result: str
Evaluation result string.
"""
self._validate_dmatrix_features(data)
self._assign_dmatrix_features(data)
return self.eval_set([(data, name)], iteration)
# pylint: disable=too-many-function-args
@@ -2139,7 +2267,8 @@ class Booster:
if not isinstance(data, DMatrix):
raise TypeError("Expecting data to be a DMatrix object, got: ", type(data))
if validate_features:
self._validate_dmatrix_features(data)
fn = data.feature_names
self._validate_features(fn)
args = {
"type": 0,
"training": training,
@@ -2187,20 +2316,25 @@ class Booster:
base_margin: Any = None,
strict_shape: bool = False,
) -> NumpyOrCupy:
"""Run prediction in-place, Unlike :py:meth:`predict` method, inplace prediction
does not cache the prediction result.
"""Run prediction in-place when possible, Unlike :py:meth:`predict` method,
inplace prediction does not cache the prediction result.
Calling only ``inplace_predict`` in multiple threads is safe and lock
free. But the safety does not hold when used in conjunction with other
methods. E.g. you can't train the booster in one thread and perform
prediction in the other.
.. note::
If the device ordinal of the input data doesn't match the one configured for
the booster, data will be copied to the booster device.
.. code-block:: python
booster.set_param({"predictor": "gpu_predictor"})
booster.set_param({"device": "cuda:0"})
booster.inplace_predict(cupy_array)
booster.set_param({"predictor": "cpu_predictor"})
booster.set_param({"device": "cpu"})
booster.inplace_predict(numpy_array)
.. versionadded:: 1.1.0
@@ -2208,9 +2342,7 @@ class Booster:
Parameters
----------
data :
The input data, must not be a view for numpy array. Set
``predictor`` to ``gpu_predictor`` for running prediction on CuPy
array or CuDF DataFrame.
The input data.
iteration_range :
See :py:meth:`predict` for details.
predict_type :
@@ -2233,8 +2365,8 @@ class Booster:
Returns
-------
prediction : numpy.ndarray/cupy.ndarray
The prediction result. When input data is on GPU, prediction
result is stored in a cupy array.
The prediction result. When input data is on GPU, prediction result is
stored in a cupy array.
"""
preds = ctypes.POINTER(ctypes.c_float)()
@@ -2267,6 +2399,7 @@ class Booster:
_is_cudf_df,
_is_cupy_array,
_is_list,
_is_np_array_like,
_is_pandas_df,
_is_pandas_series,
_is_tuple,
@@ -2296,7 +2429,7 @@ class Booster:
f"got {data.shape[1]}"
)
if isinstance(data, np.ndarray):
if _is_np_array_like(data):
from .data import _ensure_np_dtype
data, _ = _ensure_np_dtype(data, data.dtype)
@@ -2460,10 +2593,35 @@ class Booster:
else:
raise TypeError("Unknown file type: ", fname)
if self.attr("best_iteration") is not None:
self.best_iteration = int(cast(int, self.attr("best_iteration")))
if self.attr("best_score") is not None:
self.best_score = float(cast(float, self.attr("best_score")))
@property
def best_iteration(self) -> int:
"""The best iteration during training."""
best = self.attr("best_iteration")
if best is not None:
return int(best)
raise AttributeError(
"`best_iteration` is only defined when early stopping is used."
)
@best_iteration.setter
def best_iteration(self, iteration: int) -> None:
self.set_attr(best_iteration=iteration)
@property
def best_score(self) -> float:
"""The best evaluation score during training."""
best = self.attr("best_score")
if best is not None:
return float(best)
raise AttributeError(
"`best_score` is only defined when early stopping is used."
)
@best_score.setter
def best_score(self, score: int) -> None:
self.set_attr(best_score=score)
def num_boosted_rounds(self) -> int:
"""Get number of boosted rounds. For gblinear this is reset to 0 after
@@ -2761,14 +2919,13 @@ class Booster:
# pylint: disable=no-member
return df.sort(["Tree", "Node"]).reset_index(drop=True)
def _validate_dmatrix_features(self, data: DMatrix) -> None:
def _assign_dmatrix_features(self, data: DMatrix) -> None:
if data.num_row() == 0:
return
fn = data.feature_names
ft = data.feature_types
# Be consistent with versions before 1.7, "validate" actually modifies the
# booster.
if self.feature_names is None:
self.feature_names = fn
if self.feature_types is None:

View File

@@ -70,6 +70,7 @@ from .core import (
Metric,
Objective,
QuantileDMatrix,
_check_distributed_params,
_deprecate_positional_args,
_expect,
)
@@ -82,6 +83,7 @@ from .sklearn import (
XGBRanker,
XGBRankerMixIn,
XGBRegressorBase,
_can_use_qdm,
_check_rf_callback,
_cls_predict_proba,
_objective_decorator,
@@ -617,14 +619,7 @@ class DaskPartitionIter(DataIter): # pylint: disable=R0902
if self._iter == len(self._data):
# Return 0 when there's no more batch.
return 0
feature_names: Optional[FeatureNames] = None
if self._feature_names:
feature_names = self._feature_names
else:
if hasattr(self.data(), "columns"):
feature_names = self.data().columns.format()
else:
feature_names = None
input_data(
data=self.data(),
label=self._get("_label"),
@@ -634,7 +629,7 @@ class DaskPartitionIter(DataIter): # pylint: disable=R0902
base_margin=self._get("_base_margin"),
label_lower_bound=self._get("_label_lower_bound"),
label_upper_bound=self._get("_label_upper_bound"),
feature_names=feature_names,
feature_names=self._feature_names,
feature_types=self._feature_types,
feature_weights=self._feature_weights,
)
@@ -855,8 +850,6 @@ async def _get_rabit_args(
except Exception: # pylint: disable=broad-except
sched_addr = None
# make sure all workers are online so that we can obtain reliable scheduler_info
await client.wait_for_workers(n_workers) # type: ignore
env = await client.run_on_scheduler(
_start_tracker, n_workers, sched_addr, user_addr
)
@@ -912,6 +905,16 @@ def _filter_empty(
raise ValueError("None of the workers can provide a valid result.")
async def _check_workers_are_alive(
workers: List[str], client: "distributed.Client"
) -> None:
info = await client.scheduler.identity()
current_workers = info["workers"].keys()
missing_workers = set(workers) - current_workers
if missing_workers:
raise RuntimeError(f"Missing required workers: {missing_workers}")
async def _train_async(
client: "distributed.Client",
global_config: Dict[str, Any],
@@ -929,12 +932,9 @@ async def _train_async(
custom_metric: Optional[Metric],
) -> Optional[TrainReturnT]:
workers = _get_workers_from_data(dtrain, evals)
await _check_workers_are_alive(workers, client)
_rabit_args = await _get_rabit_args(len(workers), dconfig, client)
if params.get("booster", None) == "gblinear":
raise NotImplementedError(
f"booster `{params['booster']}` is not yet supported for dask."
)
_check_distributed_params(params)
def dispatched_train(
parameters: Dict,
@@ -1574,7 +1574,7 @@ async def _async_wrap_evaluation_matrices(
"""A switch function for async environment."""
def _dispatch(ref: Optional[DaskDMatrix], **kwargs: Any) -> DaskDMatrix:
if tree_method in ("hist", "gpu_hist"):
if _can_use_qdm(tree_method):
return DaskQuantileDMatrix(
client=client, ref=ref, max_bin=max_bin, **kwargs
)

View File

@@ -5,7 +5,7 @@ import ctypes
import json
import os
import warnings
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, cast
import numpy as np
@@ -17,6 +17,7 @@ from ._typing import (
FloatCompatible,
NumpyDType,
PandasDType,
TransformedData,
c_bst_ulong,
)
from .compat import DataFrame, lazy_isinstance
@@ -163,8 +164,8 @@ def _is_scipy_coo(data: DataType) -> bool:
return isinstance(data, scipy.sparse.coo_matrix)
def _is_numpy_array(data: DataType) -> bool:
return isinstance(data, (np.ndarray, np.matrix))
def _is_np_array_like(data: DataType) -> bool:
return hasattr(data, "__array_interface__")
def _ensure_np_dtype(
@@ -197,6 +198,7 @@ def _from_numpy_array(
nthread: int,
feature_names: Optional[FeatureNames],
feature_types: Optional[FeatureTypes],
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> DispatchedDataBackendReturnType:
"""Initialize data from a 2-D numpy matrix."""
_check_data_shape(data)
@@ -205,7 +207,11 @@ def _from_numpy_array(
_check_call(
_LIB.XGDMatrixCreateFromDense(
_array_interface(data),
make_jcargs(missing=float(missing), nthread=int(nthread)),
make_jcargs(
missing=float(missing),
nthread=int(nthread),
data_split_mode=int(data_split_mode),
),
ctypes.byref(handle),
)
)
@@ -311,7 +317,6 @@ def pandas_feature_info(
) -> Tuple[Optional[FeatureNames], Optional[FeatureTypes]]:
"""Handle feature info for pandas dataframe."""
import pandas as pd
from pandas.api.types import is_categorical_dtype, is_sparse
# handle feature names
if feature_names is None and meta is None:
@@ -326,10 +331,10 @@ def pandas_feature_info(
if feature_types is None and meta is None:
feature_types = []
for dtype in data.dtypes:
if is_sparse(dtype):
if is_pd_sparse_dtype(dtype):
feature_types.append(_pandas_dtype_mapper[dtype.subtype.name])
elif (
is_categorical_dtype(dtype) or is_pa_ext_categorical_dtype(dtype)
is_pd_cat_dtype(dtype) or is_pa_ext_categorical_dtype(dtype)
) and enable_categorical:
feature_types.append(CAT_T)
else:
@@ -339,18 +344,13 @@ def pandas_feature_info(
def is_nullable_dtype(dtype: PandasDType) -> bool:
"""Whether dtype is a pandas nullable type."""
from pandas.api.types import (
is_bool_dtype,
is_categorical_dtype,
is_float_dtype,
is_integer_dtype,
)
from pandas.api.types import is_bool_dtype, is_float_dtype, is_integer_dtype
is_int = is_integer_dtype(dtype) and dtype.name in pandas_nullable_mapper
# np.bool has alias `bool`, while pd.BooleanDtype has `boolean`.
is_bool = is_bool_dtype(dtype) and dtype.name == "boolean"
is_float = is_float_dtype(dtype) and dtype.name in pandas_nullable_mapper
return is_int or is_bool or is_float or is_categorical_dtype(dtype)
return is_int or is_bool or is_float or is_pd_cat_dtype(dtype)
def is_pa_ext_dtype(dtype: Any) -> bool:
@@ -365,17 +365,48 @@ def is_pa_ext_categorical_dtype(dtype: Any) -> bool:
)
def is_pd_cat_dtype(dtype: PandasDType) -> bool:
"""Wrapper for testing pandas category type."""
import pandas as pd
if hasattr(pd.util, "version") and hasattr(pd.util.version, "Version"):
Version = pd.util.version.Version
if Version(pd.__version__) >= Version("2.1.0"):
from pandas import CategoricalDtype
return isinstance(dtype, CategoricalDtype)
from pandas.api.types import is_categorical_dtype
return is_categorical_dtype(dtype)
def is_pd_sparse_dtype(dtype: PandasDType) -> bool:
"""Wrapper for testing pandas sparse type."""
import pandas as pd
if hasattr(pd.util, "version") and hasattr(pd.util.version, "Version"):
Version = pd.util.version.Version
if Version(pd.__version__) >= Version("2.1.0"):
from pandas import SparseDtype
return isinstance(dtype, SparseDtype)
from pandas.api.types import is_sparse
return is_sparse(dtype)
def pandas_cat_null(data: DataFrame) -> DataFrame:
"""Handle categorical dtype and nullable extension types from pandas."""
import pandas as pd
from pandas.api.types import is_categorical_dtype
# handle category codes and nullable.
cat_columns = []
nul_columns = []
# avoid an unnecessary conversion if possible
for col, dtype in zip(data.columns, data.dtypes):
if is_categorical_dtype(dtype):
if is_pd_cat_dtype(dtype):
cat_columns.append(col)
elif is_pa_ext_categorical_dtype(dtype):
raise ValueError(
@@ -392,7 +423,7 @@ def pandas_cat_null(data: DataFrame) -> DataFrame:
transformed = data
def cat_codes(ser: pd.Series) -> pd.Series:
if is_categorical_dtype(ser.dtype):
if is_pd_cat_dtype(ser.dtype):
return ser.cat.codes
assert is_pa_ext_categorical_dtype(ser.dtype)
# Not yet supported, the index is not ordered for some reason. Alternately:
@@ -448,14 +479,12 @@ def _transform_pandas_df(
meta: Optional[str] = None,
meta_type: Optional[NumpyDType] = None,
) -> Tuple[np.ndarray, Optional[FeatureNames], Optional[FeatureTypes]]:
from pandas.api.types import is_categorical_dtype, is_sparse
pyarrow_extension = False
for dtype in data.dtypes:
if not (
(dtype.name in _pandas_dtype_mapper)
or is_sparse(dtype)
or (is_categorical_dtype(dtype) and enable_categorical)
or is_pd_sparse_dtype(dtype)
or (is_pd_cat_dtype(dtype) and enable_categorical)
or is_pa_ext_dtype(dtype)
):
_invalid_dataframe_dtype(data)
@@ -509,9 +538,8 @@ def _meta_from_pandas_series(
) -> None:
"""Help transform pandas series for meta data like labels"""
data = data.values.astype("float")
from pandas.api.types import is_sparse
if is_sparse(data):
if is_pd_sparse_dtype(getattr(data, "dtype", data)):
data = data.to_dense() # type: ignore
assert len(data.shape) == 1 or data.shape[1] == 0 or data.shape[1] == 1
_meta_from_numpy(data, name, dtype, handle)
@@ -533,13 +561,11 @@ def _from_pandas_series(
feature_names: Optional[FeatureNames],
feature_types: Optional[FeatureTypes],
) -> DispatchedDataBackendReturnType:
from pandas.api.types import is_categorical_dtype
if (data.dtype.name not in _pandas_dtype_mapper) and not (
is_categorical_dtype(data.dtype) and enable_categorical
is_pd_cat_dtype(data.dtype) and enable_categorical
):
_invalid_dataframe_dtype(data)
if enable_categorical and is_categorical_dtype(data.dtype):
if enable_categorical and is_pd_cat_dtype(data.dtype):
data = data.cat.codes
return _from_numpy_array(
data.values.reshape(data.shape[0], 1).astype("float"),
@@ -1045,8 +1071,10 @@ def dispatch_data_backend(
return _from_scipy_csr(
data.tocsr(), missing, threads, feature_names, feature_types
)
if _is_numpy_array(data):
return _from_numpy_array(data, missing, threads, feature_names, feature_types)
if _is_np_array_like(data):
return _from_numpy_array(
data, missing, threads, feature_names, feature_types, data_split_mode
)
if _is_uri(data):
return _from_uri(data, missing, feature_names, feature_types, data_split_mode)
if _is_list(data):
@@ -1186,7 +1214,7 @@ def dispatch_meta_backend(
if _is_tuple(data):
_meta_from_tuple(data, name, dtype, handle)
return
if _is_numpy_array(data):
if _is_np_array_like(data):
_meta_from_numpy(data, name, dtype, handle)
return
if _is_pandas_df(data):
@@ -1261,12 +1289,7 @@ def _proxy_transform(
feature_names: Optional[FeatureNames],
feature_types: Optional[FeatureTypes],
enable_categorical: bool,
) -> Tuple[
Union[bool, ctypes.c_void_p, np.ndarray],
Optional[list],
Optional[FeatureNames],
Optional[FeatureTypes],
]:
) -> TransformedData:
if _is_cudf_df(data) or _is_cudf_ser(data):
return _transform_cudf_df(
data, feature_names, feature_types, enable_categorical
@@ -1278,7 +1301,7 @@ def _proxy_transform(
return _transform_dlpack(data), None, feature_names, feature_types
if _is_list(data) or _is_tuple(data):
data = np.array(data)
if _is_numpy_array(data):
if _is_np_array_like(data):
data, _ = _ensure_np_dtype(data, data.dtype)
return data, None, feature_names, feature_types
if _is_scipy_csr(data):
@@ -1328,7 +1351,7 @@ def dispatch_proxy_set_data(
if not allow_host:
raise err
if _is_numpy_array(data):
if _is_np_array_like(data):
_check_data_shape(data)
proxy._set_data_from_array(data) # pylint: disable=W0212
return

View File

@@ -27,20 +27,19 @@ def find_lib_path() -> List[str]:
os.path.join(curr_path, os.path.pardir, os.path.pardir, "lib"),
# use libxgboost from a system prefix, if available. This should be the last
# option.
os.path.join(sys.prefix, "lib"),
os.path.join(sys.base_prefix, "lib"),
]
if sys.platform == "win32":
if platform.architecture()[0] == "64bit":
dll_path.append(os.path.join(curr_path, "../../windows/x64/Release/"))
# hack for pip installation when copy all parent source
# directory here
dll_path.append(os.path.join(curr_path, "./windows/x64/Release/"))
else:
dll_path.append(os.path.join(curr_path, "../../windows/Release/"))
# hack for pip installation when copy all parent source
# directory here
dll_path.append(os.path.join(curr_path, "./windows/Release/"))
# On Windows, Conda may install libs in different paths
dll_path.extend(
[
os.path.join(sys.base_prefix, "bin"),
os.path.join(sys.base_prefix, "Library"),
os.path.join(sys.base_prefix, "Library", "bin"),
os.path.join(sys.base_prefix, "Library", "lib"),
]
)
dll_path = [os.path.join(p, "xgboost.dll") for p in dll_path]
elif sys.platform.startswith(("linux", "freebsd", "emscripten")):
dll_path = [os.path.join(p, "libxgboost.so") for p in dll_path]
@@ -62,8 +61,8 @@ def find_lib_path() -> List[str]:
+ ("\n- ".join(dll_path))
+ "\nXGBoost Python package path: "
+ curr_path
+ "\nsys.prefix: "
+ sys.prefix
+ "\nsys.base_prefix: "
+ sys.base_prefix
+ "\nSee: "
+ link
+ " for installing XGBoost."

View File

@@ -76,6 +76,10 @@ def _check_rf_callback(
)
def _can_use_qdm(tree_method: Optional[str]) -> bool:
return tree_method in ("hist", "gpu_hist", None, "auto")
SklObjective = Optional[
Union[str, Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]]
]
@@ -226,10 +230,10 @@ __model_doc = f"""
subsample : Optional[float]
Subsample ratio of the training instance.
sampling_method :
Sampling method. Used only by `gpu_hist` tree method.
- `uniform`: select random training instances uniformly.
- `gradient_based` select random training instances with higher probability when
the gradient and hessian are larger. (cf. CatBoost)
Sampling method. Used only by the GPU version of ``hist`` tree method.
- ``uniform``: select random training instances uniformly.
- ``gradient_based`` select random training instances with higher probability
when the gradient and hessian are larger. (cf. CatBoost)
colsample_bytree : Optional[float]
Subsample ratio of columns when constructing each tree.
colsample_bylevel : Optional[float]
@@ -273,13 +277,16 @@ __model_doc = f"""
* For linear model, only "weight" is defined and it's the normalized coefficients
without bias.
gpu_id : Optional[int]
Device ordinal.
device : Optional[str]
.. versionadded:: 2.0.0
Device ordinal, available options are `cpu`, `cuda`, and `gpu`.
validate_parameters : Optional[bool]
Give warnings for unknown parameter.
predictor : Optional[str]
Force XGBoost to use specific predictor, available choices are [cpu_predictor,
gpu_predictor].
enable_categorical : bool
.. versionadded:: 1.5.0
@@ -381,17 +388,21 @@ __model_doc = f"""
every **early_stopping_rounds** round(s) to continue training. Requires at
least one item in **eval_set** in :py:meth:`fit`.
- The method returns the model from the last iteration, not the best one, use a
callback :py:class:`xgboost.callback.EarlyStopping` if returning the best
model is preferred.
- If early stopping occurs, the model will have two additional attributes:
:py:attr:`best_score` and :py:attr:`best_iteration`. These are used by the
:py:meth:`predict` and :py:meth:`apply` methods to determine the optimal
number of trees during inference. If users want to access the full model
(including trees built after early stopping), they can specify the
`iteration_range` in these inference methods. In addition, other utilities
like model plotting can also use the entire model.
- If you prefer to discard the trees after `best_iteration`, consider using the
callback function :py:class:`xgboost.callback.EarlyStopping`.
- If there's more than one item in **eval_set**, the last entry will be used for
early stopping. If there's more than one metric in **eval_metric**, the last
metric will be used for early stopping.
- If early stopping occurs, the model will have three additional fields:
:py:attr:`best_score`, :py:attr:`best_iteration`.
.. note::
This parameter replaces `early_stopping_rounds` in :py:meth:`fit` method.
@@ -646,9 +657,8 @@ class XGBModel(XGBModelBase):
monotone_constraints: Optional[Union[Dict[str, int], str]] = None,
interaction_constraints: Optional[Union[str, Sequence[Sequence[str]]]] = None,
importance_type: Optional[str] = None,
gpu_id: Optional[int] = None,
device: Optional[str] = None,
validate_parameters: Optional[bool] = None,
predictor: Optional[str] = None,
enable_categorical: bool = False,
feature_types: Optional[FeatureTypes] = None,
max_cat_to_onehot: Optional[int] = None,
@@ -693,9 +703,8 @@ class XGBModel(XGBModelBase):
self.monotone_constraints = monotone_constraints
self.interaction_constraints = interaction_constraints
self.importance_type = importance_type
self.gpu_id = gpu_id
self.device = device
self.validate_parameters = validate_parameters
self.predictor = predictor
self.enable_categorical = enable_categorical
self.feature_types = feature_types
self.max_cat_to_onehot = max_cat_to_onehot
@@ -931,8 +940,7 @@ class XGBModel(XGBModelBase):
callbacks = self.callbacks if self.callbacks is not None else callbacks
tree_method = params.get("tree_method", None)
cat_support = {"gpu_hist", "approx", "hist"}
if self.enable_categorical and tree_method not in cat_support:
if self.enable_categorical and tree_method == "exact":
raise ValueError(
"Experimental support for categorical data is not implemented for"
" current tree method yet."
@@ -941,7 +949,7 @@ class XGBModel(XGBModelBase):
def _create_dmatrix(self, ref: Optional[DMatrix], **kwargs: Any) -> DMatrix:
# Use `QuantileDMatrix` to save memory.
if self.tree_method in ("hist", "gpu_hist"):
if _can_use_qdm(self.tree_method) and self.booster != "gblinear":
try:
return QuantileDMatrix(
**kwargs, ref=ref, nthread=self.n_jobs, max_bin=self.max_bin
@@ -984,12 +992,12 @@ class XGBModel(XGBModelBase):
X :
Feature matrix. See :ref:`py-data` for a list of supported types.
When the ``tree_method`` is set to ``hist`` or ``gpu_hist``, internally, the
When the ``tree_method`` is set to ``hist``, internally, the
:py:class:`QuantileDMatrix` will be used instead of the :py:class:`DMatrix`
for conserving memory. However, this has performance implications when the
device of input data is not matched with algorithm. For instance, if the
input is a numpy array on CPU but ``gpu_hist`` is used for training, then
the data is first processed on CPU then transferred to GPU.
input is a numpy array on CPU but ``cuda`` is used for training, then the
data is first processed on CPU then transferred to GPU.
y :
Labels
sample_weight :
@@ -1002,13 +1010,17 @@ class XGBModel(XGBModelBase):
Validation metrics will help us track the performance of the model.
eval_metric : str, list of str, or callable, optional
.. deprecated:: 1.6.0
Use `eval_metric` in :py:meth:`__init__` or :py:meth:`set_params` instead.
Use `eval_metric` in :py:meth:`__init__` or :py:meth:`set_params` instead.
early_stopping_rounds : int
.. deprecated:: 1.6.0
Use `early_stopping_rounds` in :py:meth:`__init__` or
:py:meth:`set_params` instead.
Use `early_stopping_rounds` in :py:meth:`__init__` or :py:meth:`set_params`
instead.
verbose :
If `verbose` is True and an evaluation set is used, the evaluation metric
measured on the validation set is printed to stdout at each boosting stage.
@@ -1089,12 +1101,7 @@ class XGBModel(XGBModelBase):
return self
def _can_use_inplace_predict(self) -> bool:
# When predictor is explicitly set, using `inplace_predict` might result into
# error with incompatible data type.
# Inplace predict doesn't handle as many data types as DMatrix, but it's
# sufficient for dask interface where input is simpiler.
predictor = self.get_xgb_params().get("predictor", None)
if predictor in ("auto", None) and self.booster != "gblinear":
if self.booster != "gblinear":
return True
return False
@@ -1120,9 +1127,9 @@ class XGBModel(XGBModelBase):
iteration_range: Optional[Tuple[int, int]] = None,
) -> ArrayLike:
"""Predict with `X`. If the model is trained with early stopping, then
:py:attr:`best_iteration` is used automatically. For tree models, when data is
on GPU, like cupy array or cuDF dataframe and `predictor` is not specified, the
prediction is run on GPU automatically, otherwise it will run on CPU.
:py:attr:`best_iteration` is used automatically. The estimator uses
`inplace_predict` by default and falls back to using :py:class:`DMatrix` if
devices between the data and the estimator don't match.
.. note:: This function is only thread safe for `gbtree` and `dart`.
@@ -1272,19 +1279,10 @@ class XGBModel(XGBModelBase):
)
return np.array(feature_names)
def _early_stopping_attr(self, attr: str) -> Union[float, int]:
booster = self.get_booster()
try:
return getattr(booster, attr)
except AttributeError as e:
raise AttributeError(
f"`{attr}` in only defined when early stopping is used."
) from e
@property
def best_score(self) -> float:
"""The best score obtained by early stopping."""
return float(self._early_stopping_attr("best_score"))
return self.get_booster().best_score
@property
def best_iteration(self) -> int:
@@ -1292,7 +1290,7 @@ class XGBModel(XGBModelBase):
for instance if the best iteration is the first round, then best_iteration is 0.
"""
return int(self._early_stopping_attr("best_iteration"))
return self.get_booster().best_iteration
@property
def feature_importances_(self) -> np.ndarray:
@@ -1584,7 +1582,9 @@ class XGBClassifier(XGBModel, XGBClassifierMixIn, XGBClassifierBase):
) -> np.ndarray:
"""Predict the probability of each `X` example being of a given class. If the
model is trained with early stopping, then :py:attr:`best_iteration` is used
automatically.
automatically. The estimator uses `inplace_predict` by default and falls back to
using :py:class:`DMatrix` if devices between the data and the estimator don't
match.
.. note:: This function is only thread safe for `gbtree` and `dart`.
@@ -1917,12 +1917,12 @@ class XGBRanker(XGBModel, XGBRankerMixIn):
| 1 | :math:`x_{20}` | :math:`x_{21}` |
+-----+----------------+----------------+
When the ``tree_method`` is set to ``hist`` or ``gpu_hist``, internally, the
When the ``tree_method`` is set to ``hist``, internally, the
:py:class:`QuantileDMatrix` will be used instead of the :py:class:`DMatrix`
for conserving memory. However, this has performance implications when the
device of input data is not matched with algorithm. For instance, if the
input is a numpy array on CPU but ``gpu_hist`` is used for training, then
the data is first processed on CPU then transferred to GPU.
input is a numpy array on CPU but ``cuda`` is used for training, then the
data is first processed on CPU then transferred to GPU.
y :
Labels
group :

View File

@@ -1,4 +1,4 @@
"""Xgboost pyspark integration submodule for core code."""
"""XGBoost pyspark integration submodule for core code."""
import base64
# pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name
@@ -22,7 +22,7 @@ from typing import (
import numpy as np
import pandas as pd
from pyspark import SparkContext, cloudpickle
from pyspark import RDD, SparkContext, cloudpickle
from pyspark.ml import Estimator, Model
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.ml.linalg import VectorUDT
@@ -44,6 +44,7 @@ from pyspark.ml.util import (
MLWritable,
MLWriter,
)
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import col, countDistinct, pandas_udf, rand, struct
from pyspark.sql.types import (
@@ -59,11 +60,12 @@ from scipy.special import expit, softmax # pylint: disable=no-name-in-module
import xgboost
from xgboost import XGBClassifier
from xgboost.compat import is_cudf_available
from xgboost.core import Booster
from xgboost.sklearn import DEFAULT_N_ESTIMATORS, XGBModel
from xgboost.compat import is_cudf_available, is_cupy_available
from xgboost.core import Booster, _check_distributed_params
from xgboost.sklearn import DEFAULT_N_ESTIMATORS, XGBModel, _can_use_qdm
from xgboost.training import train as worker_train
from .._typing import ArrayLike
from .data import (
_read_csr_matrix_from_unwrapped_spark_vec,
alias,
@@ -87,11 +89,13 @@ from .utils import (
_get_rabit_args,
_get_spark_session,
_is_local,
_is_standalone_or_localcluster,
deserialize_booster,
deserialize_xgb_model,
get_class_name,
get_logger,
serialize_booster,
use_cuda,
)
# Put pyspark specific params here, they won't be passed to XGBoost.
@@ -108,13 +112,13 @@ _pyspark_specific_params = [
"arbitrary_params_dict",
"force_repartition",
"num_workers",
"use_gpu",
"feature_names",
"features_cols",
"enable_sparse_data_optim",
"qid_col",
"repartition_random_shuffle",
"pred_contrib_col",
"use_gpu",
]
_non_booster_params = ["missing", "n_estimators", "feature_types", "feature_weights"]
@@ -132,7 +136,7 @@ _pyspark_param_alias_map = {
_inverse_pyspark_param_alias_map = {v: k for k, v in _pyspark_param_alias_map.items()}
_unsupported_xgb_params = [
"gpu_id", # we have "use_gpu" pyspark param instead.
"gpu_id", # we have "device" pyspark param instead.
"enable_categorical", # Use feature_types param to specify categorical feature instead
"use_label_encoder",
"n_jobs", # Do not allow user to set it, will use `spark.task.cpus` value instead.
@@ -197,11 +201,24 @@ class _SparkXGBParams(
"The number of XGBoost workers. Each XGBoost worker corresponds to one spark task.",
TypeConverters.toInt,
)
device = Param(
Params._dummy(),
"device",
(
"The device type for XGBoost executors. Available options are `cpu`,`cuda`"
" and `gpu`. Set `device` to `cuda` or `gpu` if the executors are running "
"on GPU instances. Currently, only one GPU per task is supported."
),
TypeConverters.toString,
)
use_gpu = Param(
Params._dummy(),
"use_gpu",
"A boolean variable. Set use_gpu=true if the executors "
+ "are running on GPU instances. Currently, only one GPU per task is supported.",
(
"Deprecated, use `device` instead. A boolean variable. Set use_gpu=true "
"if the executors are running on GPU instances. Currently, only one GPU per"
" task is supported."
),
TypeConverters.toBoolean,
)
force_repartition = Param(
@@ -227,6 +244,13 @@ class _SparkXGBParams(
TypeConverters.toList,
)
def set_device(self, value: str) -> "_SparkXGBParams":
"""Set device, optional value: cpu, cuda, gpu"""
_check_distributed_params({"device": value})
assert value in ("cpu", "cuda", "gpu")
self.set(self.device, value)
return self
@classmethod
def _xgb_cls(cls) -> Type[XGBModel]:
"""
@@ -320,6 +344,54 @@ class _SparkXGBParams(
predict_params[param.name] = self.getOrDefault(param)
return predict_params
def _validate_gpu_params(self) -> None:
"""Validate the gpu parameters and gpu configurations"""
if use_cuda(self.getOrDefault(self.device)) or self.getOrDefault(self.use_gpu):
ss = _get_spark_session()
sc = ss.sparkContext
if _is_local(sc):
# Support GPU training in Spark local mode is just for debugging
# purposes, so it's okay for printing the below warning instead of
# checking the real gpu numbers and raising the exception.
get_logger(self.__class__.__name__).warning(
"You have enabled GPU in spark local mode. Please make sure your"
" local node has at least %d GPUs",
self.getOrDefault(self.num_workers),
)
else:
executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
if executor_gpus is None:
raise ValueError(
"The `spark.executor.resource.gpu.amount` is required for training"
" on GPU."
)
if not (ss.version >= "3.4.0" and _is_standalone_or_localcluster(sc)):
# We will enable stage-level scheduling in spark 3.4.0+ which doesn't
# require spark.task.resource.gpu.amount to be set explicitly
gpu_per_task = sc.getConf().get("spark.task.resource.gpu.amount")
if gpu_per_task is not None:
if float(gpu_per_task) < 1.0:
raise ValueError(
"XGBoost doesn't support GPU fractional configurations. "
"Please set `spark.task.resource.gpu.amount=spark.executor"
".resource.gpu.amount`"
)
if float(gpu_per_task) > 1.0:
get_logger(self.__class__.__name__).warning(
"%s GPUs for each Spark task is configured, but each "
"XGBoost training task uses only 1 GPU.",
gpu_per_task,
)
else:
raise ValueError(
"The `spark.task.resource.gpu.amount` is required for training"
" on GPU."
)
def _validate_params(self) -> None:
# pylint: disable=too-many-branches
init_model = self.getOrDefault("xgb_model")
@@ -335,10 +407,18 @@ class _SparkXGBParams(
f"It cannot be less than 1 [Default is 1]"
)
tree_method = self.getOrDefault(self.getParam("tree_method"))
if tree_method == "exact":
raise ValueError(
"The `exact` tree method is not supported for distributed systems."
)
if self.getOrDefault(self.features_cols):
if not self.getOrDefault(self.use_gpu):
if not use_cuda(self.getOrDefault(self.device)) and not self.getOrDefault(
self.use_gpu
):
raise ValueError(
"features_col param with list value requires enabling use_gpu."
"features_col param with list value requires `device=cuda`."
)
if self.getOrDefault("objective") is not None:
@@ -391,57 +471,7 @@ class _SparkXGBParams(
"`pyspark.ml.linalg.Vector` type."
)
if self.getOrDefault(self.use_gpu):
tree_method = self.getParam("tree_method")
if (
self.getOrDefault(tree_method) is not None
and self.getOrDefault(tree_method) != "gpu_hist"
):
raise ValueError(
f"tree_method should be 'gpu_hist' or None when use_gpu is True,"
f"found {self.getOrDefault(tree_method)}."
)
gpu_per_task = (
_get_spark_session()
.sparkContext.getConf()
.get("spark.task.resource.gpu.amount")
)
is_local = _is_local(_get_spark_session().sparkContext)
if is_local:
# checking spark local mode.
if gpu_per_task:
raise RuntimeError(
"The spark cluster does not support gpu configuration for local mode. "
"Please delete spark.executor.resource.gpu.amount and "
"spark.task.resource.gpu.amount"
)
# Support GPU training in Spark local mode is just for debugging purposes,
# so it's okay for printing the below warning instead of checking the real
# gpu numbers and raising the exception.
get_logger(self.__class__.__name__).warning(
"You enabled use_gpu in spark local mode. Please make sure your local node "
"has at least %d GPUs",
self.getOrDefault(self.num_workers),
)
else:
# checking spark non-local mode.
if not gpu_per_task or int(gpu_per_task) < 1:
raise RuntimeError(
"The spark cluster does not have the necessary GPU"
+ "configuration for the spark task. Therefore, we cannot"
+ "run xgboost training using GPU."
)
if int(gpu_per_task) > 1:
get_logger(self.__class__.__name__).warning(
"You configured %s GPU cores for each spark task, but in "
"XGBoost training, every Spark task will only use one GPU core.",
gpu_per_task,
)
self._validate_gpu_params()
def _validate_and_convert_feature_col_as_float_col_list(
@@ -557,6 +587,7 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
# they are added in `setParams`.
self._setDefault(
num_workers=1,
device="cpu",
use_gpu=False,
force_repartition=False,
repartition_random_shuffle=False,
@@ -565,9 +596,9 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
arbitrary_params_dict={},
)
def setParams(
self, **kwargs: Dict[str, Any]
) -> None: # pylint: disable=invalid-name
self.logger = get_logger(self.__class__.__name__)
def setParams(self, **kwargs: Any) -> None: # pylint: disable=invalid-name
"""
Set params for the estimator.
"""
@@ -612,6 +643,8 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
)
raise ValueError(err_msg)
_extra_params[k] = v
_check_distributed_params(kwargs)
_existing_extra_params = self.getOrDefault(self.arbitrary_params_dict)
self._set(arbitrary_params_dict={**_existing_extra_params, **_extra_params})
@@ -708,9 +741,6 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
# TODO: support "num_parallel_tree" for random forest
params["num_boost_round"] = self.getOrDefault("n_estimators")
if self.getOrDefault(self.use_gpu):
params["tree_method"] = "gpu_hist"
return params
@classmethod
@@ -870,6 +900,116 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
return booster_params, train_call_kwargs_params, dmatrix_kwargs
def _skip_stage_level_scheduling(self) -> bool:
# pylint: disable=too-many-return-statements
"""Check if stage-level scheduling is not needed,
return true to skip stage-level scheduling"""
if use_cuda(self.getOrDefault(self.device)) or self.getOrDefault(self.use_gpu):
ss = _get_spark_session()
sc = ss.sparkContext
if ss.version < "3.4.0":
self.logger.info(
"Stage-level scheduling in xgboost requires spark version 3.4.0+"
)
return True
if not _is_standalone_or_localcluster(sc):
self.logger.info(
"Stage-level scheduling in xgboost requires spark standalone or "
"local-cluster mode"
)
return True
executor_cores = sc.getConf().get("spark.executor.cores")
executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
if executor_cores is None or executor_gpus is None:
self.logger.info(
"Stage-level scheduling in xgboost requires spark.executor.cores, "
"spark.executor.resource.gpu.amount to be set."
)
return True
if int(executor_cores) == 1:
# there will be only 1 task running at any time.
self.logger.info(
"Stage-level scheduling in xgboost requires spark.executor.cores > 1 "
)
return True
if int(executor_gpus) > 1:
# For spark.executor.resource.gpu.amount > 1, we suppose user knows how to configure
# to make xgboost run successfully.
#
self.logger.info(
"Stage-level scheduling in xgboost will not work "
"when spark.executor.resource.gpu.amount>1"
)
return True
task_gpu_amount = sc.getConf().get("spark.task.resource.gpu.amount")
if task_gpu_amount is None:
# The ETL tasks will not grab a gpu when spark.task.resource.gpu.amount is not set,
# but with stage-level scheduling, we can make training task grab the gpu.
return False
if float(task_gpu_amount) == float(executor_gpus):
# spark.executor.resource.gpu.amount=spark.task.resource.gpu.amount "
# results in only 1 task running at a time, which may cause perf issue.
return True
# We can enable stage-level scheduling
return False
# CPU training doesn't require stage-level scheduling
return True
def _try_stage_level_scheduling(self, rdd: RDD) -> RDD:
"""Try to enable stage-level scheduling"""
if self._skip_stage_level_scheduling():
return rdd
ss = _get_spark_session()
# executor_cores will not be None
executor_cores = ss.sparkContext.getConf().get("spark.executor.cores")
assert executor_cores is not None
# Spark-rapids is a project to leverage GPUs to accelerate spark SQL.
# If spark-rapids is enabled, to avoid GPU OOM, we don't allow other
# ETL gpu tasks running alongside training tasks.
spark_plugins = ss.conf.get("spark.plugins", " ")
assert spark_plugins is not None
spark_rapids_sql_enabled = ss.conf.get("spark.rapids.sql.enabled", "true")
assert spark_rapids_sql_enabled is not None
task_cores = (
int(executor_cores)
if "com.nvidia.spark.SQLPlugin" in spark_plugins
and "true" == spark_rapids_sql_enabled.lower()
else (int(executor_cores) // 2) + 1
)
# Each training task requires cpu cores > total executor cores//2 + 1 which can
# make sure the tasks be sent to different executors.
#
# Please note that we can't use GPU to limit the concurrent tasks because of
# https://issues.apache.org/jira/browse/SPARK-45527.
task_gpus = 1.0
treqs = TaskResourceRequests().cpus(task_cores).resource("gpu", task_gpus)
rp = ResourceProfileBuilder().require(treqs).build
self.logger.info(
"XGBoost training tasks require the resource(cores=%s, gpu=%s).",
task_cores,
task_gpus,
)
return rdd.withResources(rp)
def _fit(self, dataset: DataFrame) -> "_SparkXGBModel":
# pylint: disable=too-many-statements, too-many-locals
self._validate_params()
@@ -882,8 +1022,9 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
dmatrix_kwargs,
) = self._get_xgb_parameters(dataset)
use_gpu = self.getOrDefault(self.use_gpu)
run_on_gpu = use_cuda(self.getOrDefault(self.device)) or self.getOrDefault(
self.use_gpu
)
is_local = _is_local(_get_spark_session().sparkContext)
num_workers = self.getOrDefault(self.num_workers)
@@ -899,34 +1040,30 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
context = BarrierTaskContext.get()
gpu_id = None
use_hist = booster_params.get("tree_method", None) in ("hist", "gpu_hist")
dev_ordinal = None
use_qdm = _can_use_qdm(booster_params.get("tree_method", None))
if use_gpu:
gpu_id = context.partitionId() if is_local else _get_gpu_id(context)
booster_params["gpu_id"] = gpu_id
if run_on_gpu:
dev_ordinal = (
context.partitionId() if is_local else _get_gpu_id(context)
)
booster_params["device"] = "cuda:" + str(dev_ordinal)
# If cuDF is not installed, then using DMatrix instead of QDM,
# because without cuDF, DMatrix performs better than QDM.
# Note: Checking `is_cudf_available` in spark worker side because
# spark worker might has different python environment with driver side.
use_qdm = use_hist and is_cudf_available()
else:
use_qdm = use_hist
use_qdm = use_qdm and is_cudf_available()
get_logger("XGBoost-PySpark").info(
"Leveraging %s to train with QDM: %s",
booster_params["device"],
"on" if use_qdm else "off",
)
if use_qdm and (booster_params.get("max_bin", None) is not None):
dmatrix_kwargs["max_bin"] = booster_params["max_bin"]
_rabit_args = {}
if context.partitionId() == 0:
get_logger("XGBoostPySpark").debug(
"booster params: %s\n"
"train_call_kwargs_params: %s\n"
"dmatrix_kwargs: %s",
booster_params,
train_call_kwargs_params,
dmatrix_kwargs,
)
_rabit_args = _get_rabit_args(context, num_workers)
worker_message = {
@@ -945,7 +1082,7 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
dtrain, dvalid = create_dmatrix_from_partitions(
pandas_df_iter,
feature_prop.features_cols_names,
gpu_id,
dev_ordinal,
use_qdm,
dmatrix_kwargs,
enable_sparse_data_optim=feature_prop.enable_sparse_data_optim,
@@ -973,17 +1110,31 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
)
def _run_job() -> Tuple[str, str]:
ret = (
rdd = (
dataset.mapInPandas(
_train_booster, schema="config string, booster string" # type: ignore
_train_booster, # type: ignore
schema="config string, booster string",
)
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0]
)
rdd_with_resource = self._try_stage_level_scheduling(rdd)
ret = rdd_with_resource.collect()[0]
return ret[0], ret[1]
get_logger("XGBoost-PySpark").info(
"Running xgboost-%s on %s workers with"
"\n\tbooster params: %s"
"\n\ttrain_call_kwargs_params: %s"
"\n\tdmatrix_kwargs: %s",
xgboost._py_version(),
num_workers,
booster_params,
train_call_kwargs_params,
dmatrix_kwargs,
)
(config, booster) = _run_job()
get_logger("XGBoost-PySpark").info("Finished xgboost training!")
result_xgb_model = self._convert_to_sklearn_model(
bytearray(booster, "utf-8"), config
@@ -1092,12 +1243,111 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
)
return features_col, feature_col_names
def _get_pred_contrib_col_name(self) -> Optional[str]:
"""Return the pred_contrib_col col name"""
pred_contrib_col_name = None
if (
self.isDefined(self.pred_contrib_col)
and self.getOrDefault(self.pred_contrib_col) != ""
):
pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
return pred_contrib_col_name
def _out_schema(self) -> Tuple[bool, str]:
"""Return the bool to indicate if it's a single prediction, true is single prediction,
and the returned type of the user-defined function. The value must
be a DDL-formatted type string."""
if self._get_pred_contrib_col_name() is not None:
return False, f"{pred.prediction} double, {pred.pred_contrib} array<double>"
return True, "double"
def _get_predict_func(self) -> Callable:
"""Return the true prediction function which will be running on the executor side"""
predict_params = self._gen_predict_params_dict()
pred_contrib_col_name = self._get_pred_contrib_col_name()
def _predict(
model: XGBModel, X: ArrayLike, base_margin: Optional[ArrayLike]
) -> Union[pd.DataFrame, pd.Series]:
data = {}
preds = model.predict(
X,
base_margin=base_margin,
validate_features=False,
**predict_params,
)
data[pred.prediction] = pd.Series(preds)
if pred_contrib_col_name is not None:
contribs = pred_contribs(model, X, base_margin)
data[pred.pred_contrib] = pd.Series(list(contribs))
return pd.DataFrame(data=data)
return data[pred.prediction]
return _predict
def _post_transform(self, dataset: DataFrame, pred_col: Column) -> DataFrame:
"""Post process of transform"""
prediction_col_name = self.getOrDefault(self.predictionCol)
single_pred, _ = self._out_schema()
if single_pred:
if prediction_col_name:
dataset = dataset.withColumn(prediction_col_name, pred_col)
else:
pred_struct_col = "_prediction_struct"
dataset = dataset.withColumn(pred_struct_col, pred_col)
if prediction_col_name:
dataset = dataset.withColumn(
prediction_col_name, getattr(col(pred_struct_col), pred.prediction)
)
pred_contrib_col_name = self._get_pred_contrib_col_name()
if pred_contrib_col_name is not None:
dataset = dataset.withColumn(
pred_contrib_col_name,
array_to_vector(getattr(col(pred_struct_col), pred.pred_contrib)),
)
dataset = dataset.drop(pred_struct_col)
return dataset
def _gpu_transform(self) -> bool:
"""If gpu is used to do the prediction, true to gpu prediction"""
if _is_local(_get_spark_session().sparkContext):
# if it's local model, we just use the internal "device"
return use_cuda(self.getOrDefault(self.device))
gpu_per_task = (
_get_spark_session()
.sparkContext.getConf()
.get("spark.task.resource.gpu.amount")
)
# User don't set gpu configurations, just use cpu
if gpu_per_task is None:
if use_cuda(self.getOrDefault(self.device)):
get_logger("XGBoost-PySpark").warning(
"Do the prediction on the CPUs since "
"no gpu configurations are set"
)
return False
# User already sets the gpu configurations, we just use the internal "device".
return use_cuda(self.getOrDefault(self.device))
def _transform(self, dataset: DataFrame) -> DataFrame:
# pylint: disable=too-many-statements, too-many-locals
# Save xgb_sklearn_model and predict_params to be local variable
# to avoid the `self` object to be pickled to remote.
xgb_sklearn_model = self._xgb_sklearn_model
predict_params = self._gen_predict_params_dict()
has_base_margin = False
if (
@@ -1112,79 +1362,92 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
features_col, feature_col_names = self._get_feature_col(dataset)
enable_sparse_data_optim = self.getOrDefault(self.enable_sparse_data_optim)
pred_contrib_col_name = None
if (
self.isDefined(self.pred_contrib_col)
and self.getOrDefault(self.pred_contrib_col) != ""
):
pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
predict_func = self._get_predict_func()
single_pred = True
schema = "double"
if pred_contrib_col_name:
single_pred = False
schema = f"{pred.prediction} double, {pred.pred_contrib} array<double>"
_, schema = self._out_schema()
is_local = _is_local(_get_spark_session().sparkContext)
run_on_gpu = self._gpu_transform()
@pandas_udf(schema) # type: ignore
def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
assert xgb_sklearn_model is not None
model = xgb_sklearn_model
from pyspark import TaskContext
context = TaskContext.get()
assert context is not None
dev_ordinal = -1
if is_cudf_available():
if is_local:
if run_on_gpu and is_cupy_available():
import cupy as cp # pylint: disable=import-error
total_gpus = cp.cuda.runtime.getDeviceCount()
if total_gpus > 0:
partition_id = context.partitionId()
# For transform local mode, default the dev_ordinal to
# (partition id) % gpus.
dev_ordinal = partition_id % total_gpus
elif run_on_gpu:
dev_ordinal = _get_gpu_id(context)
if dev_ordinal >= 0:
device = "cuda:" + str(dev_ordinal)
get_logger("XGBoost-PySpark").info(
"Do the inference with device: %s", device
)
model.set_params(device=device)
else:
get_logger("XGBoost-PySpark").info("Do the inference on the CPUs")
else:
msg = (
"CUDF is unavailable, fallback the inference on the CPUs"
if run_on_gpu
else "Do the inference on the CPUs"
)
get_logger("XGBoost-PySpark").info(msg)
def to_gpu_if_possible(data: ArrayLike) -> ArrayLike:
"""Move the data to gpu if possible"""
if dev_ordinal >= 0:
import cudf # pylint: disable=import-error
import cupy as cp # pylint: disable=import-error
# We must set the device after import cudf, which will change the device id to 0
# See https://github.com/rapidsai/cudf/issues/11386
cp.cuda.runtime.setDevice(dev_ordinal) # pylint: disable=I1101
df = cudf.DataFrame(data)
del data
return df
return data
for data in iterator:
if enable_sparse_data_optim:
X = _read_csr_matrix_from_unwrapped_spark_vec(data)
else:
if feature_col_names is not None:
X = data[feature_col_names]
tmp = data[feature_col_names]
else:
X = stack_series(data[alias.data])
tmp = stack_series(data[alias.data])
X = to_gpu_if_possible(tmp)
if has_base_margin:
base_margin = data[alias.margin].to_numpy()
base_margin = to_gpu_if_possible(data[alias.margin])
else:
base_margin = None
data = {}
preds = model.predict(
X,
base_margin=base_margin,
validate_features=False,
**predict_params,
)
data[pred.prediction] = pd.Series(preds)
if pred_contrib_col_name:
contribs = pred_contribs(model, X, base_margin)
data[pred.pred_contrib] = pd.Series(list(contribs))
yield pd.DataFrame(data=data)
else:
yield data[pred.prediction]
yield predict_func(model, X, base_margin)
if has_base_margin:
pred_col = predict_udf(struct(*features_col, base_margin_col))
else:
pred_col = predict_udf(struct(*features_col))
prediction_col_name = self.getOrDefault(self.predictionCol)
if single_pred:
dataset = dataset.withColumn(prediction_col_name, pred_col)
else:
pred_struct_col = "_prediction_struct"
dataset = dataset.withColumn(pred_struct_col, pred_col)
dataset = dataset.withColumn(
prediction_col_name, getattr(col(pred_struct_col), pred.prediction)
)
if pred_contrib_col_name:
dataset = dataset.withColumn(
pred_contrib_col_name,
array_to_vector(getattr(col(pred_struct_col), pred.pred_contrib)),
)
dataset = dataset.drop(pred_struct_col)
return dataset
return self._post_transform(dataset, pred_col)
class _ClassificationModel( # pylint: disable=abstract-method
@@ -1196,22 +1459,21 @@ class _ClassificationModel( # pylint: disable=abstract-method
.. Note:: This API is experimental.
"""
def _transform(self, dataset: DataFrame) -> DataFrame:
# pylint: disable=too-many-statements, too-many-locals
# Save xgb_sklearn_model and predict_params to be local variable
# to avoid the `self` object to be pickled to remote.
xgb_sklearn_model = self._xgb_sklearn_model
predict_params = self._gen_predict_params_dict()
def _out_schema(self) -> Tuple[bool, str]:
schema = (
f"{pred.raw_prediction} array<double>, {pred.prediction} double,"
f" {pred.probability} array<double>"
)
if self._get_pred_contrib_col_name() is not None:
# We will force setting strict_shape to True when predicting contribs,
# So, it will also output 3-D shape result.
schema = f"{schema}, {pred.pred_contrib} array<array<double>>"
has_base_margin = False
if (
self.isDefined(self.base_margin_col)
and self.getOrDefault(self.base_margin_col) != ""
):
has_base_margin = True
base_margin_col = col(self.getOrDefault(self.base_margin_col)).alias(
alias.margin
)
return False, schema
def _get_predict_func(self) -> Callable:
predict_params = self._gen_predict_params_dict()
pred_contrib_col_name = self._get_pred_contrib_col_name()
def transform_margin(margins: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
if margins.ndim == 1:
@@ -1226,76 +1488,38 @@ class _ClassificationModel( # pylint: disable=abstract-method
class_probs = softmax(raw_preds, axis=1)
return raw_preds, class_probs
features_col, feature_col_names = self._get_feature_col(dataset)
enable_sparse_data_optim = self.getOrDefault(self.enable_sparse_data_optim)
def _predict(
model: XGBModel, X: ArrayLike, base_margin: Optional[np.ndarray]
) -> Union[pd.DataFrame, pd.Series]:
margins = model.predict(
X,
base_margin=base_margin,
output_margin=True,
validate_features=False,
**predict_params,
)
raw_preds, class_probs = transform_margin(margins)
pred_contrib_col_name = None
if (
self.isDefined(self.pred_contrib_col)
and self.getOrDefault(self.pred_contrib_col) != ""
):
pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
# It seems that they use argmax of class probs,
# not of margin to get the prediction (Note: scala implementation)
preds = np.argmax(class_probs, axis=1)
result: Dict[str, pd.Series] = {
pred.raw_prediction: pd.Series(list(raw_preds)),
pred.prediction: pd.Series(preds),
pred.probability: pd.Series(list(class_probs)),
}
schema = (
f"{pred.raw_prediction} array<double>, {pred.prediction} double,"
f" {pred.probability} array<double>"
)
if pred_contrib_col_name:
# We will force setting strict_shape to True when predicting contribs,
# So, it will also output 3-D shape result.
schema = f"{schema}, {pred.pred_contrib} array<array<double>>"
if pred_contrib_col_name is not None:
contribs = pred_contribs(model, X, base_margin, strict_shape=True)
result[pred.pred_contrib] = pd.Series(list(contribs.tolist()))
@pandas_udf(schema) # type: ignore
def predict_udf(
iterator: Iterator[Tuple[pd.Series, ...]]
) -> Iterator[pd.DataFrame]:
assert xgb_sklearn_model is not None
model = xgb_sklearn_model
for data in iterator:
if enable_sparse_data_optim:
X = _read_csr_matrix_from_unwrapped_spark_vec(data)
else:
if feature_col_names is not None:
X = data[feature_col_names] # type: ignore
else:
X = stack_series(data[alias.data])
return pd.DataFrame(data=result)
if has_base_margin:
base_margin = stack_series(data[alias.margin])
else:
base_margin = None
margins = model.predict(
X,
base_margin=base_margin,
output_margin=True,
validate_features=False,
**predict_params,
)
raw_preds, class_probs = transform_margin(margins)
# It seems that they use argmax of class probs,
# not of margin to get the prediction (Note: scala implementation)
preds = np.argmax(class_probs, axis=1)
result: Dict[str, pd.Series] = {
pred.raw_prediction: pd.Series(list(raw_preds)),
pred.prediction: pd.Series(preds),
pred.probability: pd.Series(list(class_probs)),
}
if pred_contrib_col_name:
contribs = pred_contribs(model, X, base_margin, strict_shape=True)
result[pred.pred_contrib] = pd.Series(list(contribs.tolist()))
yield pd.DataFrame(data=result)
if has_base_margin:
pred_struct = predict_udf(struct(*features_col, base_margin_col))
else:
pred_struct = predict_udf(struct(*features_col))
return _predict
def _post_transform(self, dataset: DataFrame, pred_col: Column) -> DataFrame:
pred_struct_col = "_prediction_struct"
dataset = dataset.withColumn(pred_struct_col, pred_struct)
dataset = dataset.withColumn(pred_struct_col, pred_col)
raw_prediction_col_name = self.getOrDefault(self.rawPredictionCol)
if raw_prediction_col_name:
@@ -1317,7 +1541,8 @@ class _ClassificationModel( # pylint: disable=abstract-method
array_to_vector(getattr(col(pred_struct_col), pred.probability)),
)
if pred_contrib_col_name:
pred_contrib_col_name = self._get_pred_contrib_col_name()
if pred_contrib_col_name is not None:
dataset = dataset.withColumn(
pred_contrib_col_name,
getattr(col(pred_struct_col), pred.pred_contrib),

View File

@@ -157,7 +157,7 @@ def _read_csr_matrix_from_unwrapped_spark_vec(part: pd.DataFrame) -> csr_matrix:
def make_qdm(
data: Dict[str, List[np.ndarray]],
gpu_id: Optional[int],
dev_ordinal: Optional[int],
meta: Dict[str, Any],
ref: Optional[DMatrix],
params: Dict[str, Any],
@@ -165,7 +165,7 @@ def make_qdm(
"""Handle empty partition for QuantileDMatrix."""
if not data:
return QuantileDMatrix(np.empty((0, 0)), ref=ref)
it = PartIter(data, gpu_id, **meta)
it = PartIter(data, dev_ordinal, **meta)
m = QuantileDMatrix(it, **params, ref=ref)
return m
@@ -173,7 +173,7 @@ def make_qdm(
def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
iterator: Iterator[pd.DataFrame],
feature_cols: Optional[Sequence[str]],
gpu_id: Optional[int],
dev_ordinal: Optional[int],
use_qdm: bool,
kwargs: Dict[str, Any], # use dict to make sure this parameter is passed.
enable_sparse_data_optim: bool,
@@ -187,7 +187,7 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
Pyspark partition iterator.
feature_cols:
A sequence of feature names, used only when rapids plugin is enabled.
gpu_id:
dev_ordinal:
Device ordinal, used when GPU is enabled.
use_qdm :
Whether QuantileDMatrix should be used instead of DMatrix.
@@ -304,13 +304,13 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
if feature_cols is not None and use_qdm:
cache_partitions(iterator, append_fn)
dtrain: DMatrix = make_qdm(train_data, gpu_id, meta, None, params)
dtrain: DMatrix = make_qdm(train_data, dev_ordinal, meta, None, params)
elif feature_cols is not None and not use_qdm:
cache_partitions(iterator, append_fn)
dtrain = make(train_data, kwargs)
elif feature_cols is None and use_qdm:
cache_partitions(iterator, append_fn)
dtrain = make_qdm(train_data, gpu_id, meta, None, params)
dtrain = make_qdm(train_data, dev_ordinal, meta, None, params)
else:
cache_partitions(iterator, append_fn)
dtrain = make(train_data, kwargs)
@@ -324,7 +324,7 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
if has_validation_col:
if use_qdm:
dvalid: Optional[DMatrix] = make_qdm(
valid_data, gpu_id, meta, dtrain, params
valid_data, dev_ordinal, meta, dtrain, params
)
else:
dvalid = make(valid_data, kwargs) if has_validation_col else None

View File

@@ -3,8 +3,8 @@
# pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name
# pylint: disable=unused-argument, too-many-locals
from typing import Any, Dict, List, Optional, Type, Union
import warnings
from typing import Any, List, Optional, Type, Union
import numpy as np
from pyspark import keyword_only
@@ -77,28 +77,35 @@ def _set_pyspark_xgb_cls_param_attrs(
set_param_attrs(name, param_obj)
def _deprecated_use_gpu() -> None:
warnings.warn(
"`use_gpu` is deprecated since 2.0.0, use `device` instead", FutureWarning
)
class SparkXGBRegressor(_SparkXGBEstimator):
"""
SparkXGBRegressor is a PySpark ML estimator. It implements the XGBoost regression
"""SparkXGBRegressor is a PySpark ML estimator. It implements the XGBoost regression
algorithm based on XGBoost python library, and it can be used in PySpark Pipeline
and PySpark ML meta algorithms like :py:class:`~pyspark.ml.tuning.CrossValidator`/
:py:class:`~pyspark.ml.tuning.TrainValidationSplit`/
:py:class:`~pyspark.ml.classification.OneVsRest`
and PySpark ML meta algorithms like
- :py:class:`~pyspark.ml.tuning.CrossValidator`/
- :py:class:`~pyspark.ml.tuning.TrainValidationSplit`/
- :py:class:`~pyspark.ml.classification.OneVsRest`
SparkXGBRegressor automatically supports most of the parameters in
:py:class:`xgboost.XGBRegressor` constructor and most of the parameters used in
:py:meth:`xgboost.XGBRegressor.fit` and :py:meth:`xgboost.XGBRegressor.predict` method.
:py:meth:`xgboost.XGBRegressor.fit` and :py:meth:`xgboost.XGBRegressor.predict`
method.
SparkXGBRegressor doesn't support setting `gpu_id` but support another param `use_gpu`,
see doc below for more details.
To enable GPU support, set `device` to `cuda` or `gpu`.
SparkXGBRegressor doesn't support setting `base_margin` explicitly as well, but support
another param called `base_margin_col`. see doc below for more details.
SparkXGBRegressor doesn't support setting `base_margin` explicitly as well, but
support another param called `base_margin_col`. see doc below for more details.
SparkXGBRegressor doesn't support `validate_features` and `output_margin` param.
SparkXGBRegressor doesn't support setting `nthread` xgboost param, instead, the `nthread`
param for each xgboost worker will be set equal to `spark.task.cpus` config value.
SparkXGBRegressor doesn't support setting `nthread` xgboost param, instead, the
`nthread` param for each xgboost worker will be set equal to `spark.task.cpus`
config value.
Parameters
@@ -134,8 +141,16 @@ class SparkXGBRegressor(_SparkXGBEstimator):
How many XGBoost workers to be used to train.
Each XGBoost worker corresponds to one spark task.
use_gpu:
Boolean value to specify whether the executors are running on GPU
instances.
.. deprecated:: 2.0.0
Use `device` instead.
device:
.. versionadded:: 2.0.0
Device for XGBoost workers, available options are `cpu`, `cuda`, and `gpu`.
force_repartition:
Boolean value to specify if forcing the input dataset to be repartitioned
before XGBoost training.
@@ -194,14 +209,17 @@ class SparkXGBRegressor(_SparkXGBEstimator):
weight_col: Optional[str] = None,
base_margin_col: Optional[str] = None,
num_workers: int = 1,
use_gpu: bool = False,
use_gpu: Optional[bool] = None,
device: Optional[str] = None,
force_repartition: bool = False,
repartition_random_shuffle: bool = False,
enable_sparse_data_optim: bool = False,
**kwargs: Dict[str, Any],
**kwargs: Any,
) -> None:
super().__init__()
input_kwargs = self._input_kwargs
if use_gpu:
_deprecated_use_gpu()
self.setParams(**input_kwargs)
@classmethod
@@ -239,27 +257,29 @@ class SparkXGBClassifier(_SparkXGBEstimator, HasProbabilityCol, HasRawPrediction
"""SparkXGBClassifier is a PySpark ML estimator. It implements the XGBoost
classification algorithm based on XGBoost python library, and it can be used in
PySpark Pipeline and PySpark ML meta algorithms like
:py:class:`~pyspark.ml.tuning.CrossValidator`/
:py:class:`~pyspark.ml.tuning.TrainValidationSplit`/
:py:class:`~pyspark.ml.classification.OneVsRest`
- :py:class:`~pyspark.ml.tuning.CrossValidator`/
- :py:class:`~pyspark.ml.tuning.TrainValidationSplit`/
- :py:class:`~pyspark.ml.classification.OneVsRest`
SparkXGBClassifier automatically supports most of the parameters in
:py:class:`xgboost.XGBClassifier` constructor and most of the parameters used in
:py:meth:`xgboost.XGBClassifier.fit` and :py:meth:`xgboost.XGBClassifier.predict` method.
:py:meth:`xgboost.XGBClassifier.fit` and :py:meth:`xgboost.XGBClassifier.predict`
method.
SparkXGBClassifier doesn't support setting `gpu_id` but support another param `use_gpu`,
see doc below for more details.
To enable GPU support, set `device` to `cuda` or `gpu`.
SparkXGBClassifier doesn't support setting `base_margin` explicitly as well, but support
another param called `base_margin_col`. see doc below for more details.
SparkXGBClassifier doesn't support setting `base_margin` explicitly as well, but
support another param called `base_margin_col`. see doc below for more details.
SparkXGBClassifier doesn't support setting `output_margin`, but we can get output margin
from the raw prediction column. See `raw_prediction_col` param doc below for more details.
SparkXGBClassifier doesn't support setting `output_margin`, but we can get output
margin from the raw prediction column. See `raw_prediction_col` param doc below for
more details.
SparkXGBClassifier doesn't support `validate_features` and `output_margin` param.
SparkXGBClassifier doesn't support setting `nthread` xgboost param, instead, the `nthread`
param for each xgboost worker will be set equal to `spark.task.cpus` config value.
SparkXGBClassifier doesn't support setting `nthread` xgboost param, instead, the
`nthread` param for each xgboost worker will be set equal to `spark.task.cpus`
config value.
Parameters
@@ -301,8 +321,16 @@ class SparkXGBClassifier(_SparkXGBEstimator, HasProbabilityCol, HasRawPrediction
How many XGBoost workers to be used to train.
Each XGBoost worker corresponds to one spark task.
use_gpu:
Boolean value to specify whether the executors are running on GPU
instances.
.. deprecated:: 2.0.0
Use `device` instead.
device:
.. versionadded:: 2.0.0
Device for XGBoost workers, available options are `cpu`, `cuda`, and `gpu`.
force_repartition:
Boolean value to specify if forcing the input dataset to be repartitioned
before XGBoost training.
@@ -361,11 +389,12 @@ class SparkXGBClassifier(_SparkXGBEstimator, HasProbabilityCol, HasRawPrediction
weight_col: Optional[str] = None,
base_margin_col: Optional[str] = None,
num_workers: int = 1,
use_gpu: bool = False,
use_gpu: Optional[bool] = None,
device: Optional[str] = None,
force_repartition: bool = False,
repartition_random_shuffle: bool = False,
enable_sparse_data_optim: bool = False,
**kwargs: Dict[str, Any],
**kwargs: Any,
) -> None:
super().__init__()
# The default 'objective' param value comes from sklearn `XGBClassifier` ctor,
@@ -373,6 +402,8 @@ class SparkXGBClassifier(_SparkXGBEstimator, HasProbabilityCol, HasRawPrediction
# binary or multinomial input dataset, and we need to remove the fixed default
# param value as well to avoid causing ambiguity.
input_kwargs = self._input_kwargs
if use_gpu:
_deprecated_use_gpu()
self.setParams(**input_kwargs)
self._setDefault(objective=None)
@@ -423,19 +454,20 @@ class SparkXGBRanker(_SparkXGBEstimator):
:py:class:`xgboost.XGBRanker` constructor and most of the parameters used in
:py:meth:`xgboost.XGBRanker.fit` and :py:meth:`xgboost.XGBRanker.predict` method.
SparkXGBRanker doesn't support setting `gpu_id` but support another param `use_gpu`,
see doc below for more details.
To enable GPU support, set `device` to `cuda` or `gpu`.
SparkXGBRanker doesn't support setting `base_margin` explicitly as well, but support
another param called `base_margin_col`. see doc below for more details.
SparkXGBRanker doesn't support setting `output_margin`, but we can get output margin
from the raw prediction column. See `raw_prediction_col` param doc below for more details.
from the raw prediction column. See `raw_prediction_col` param doc below for more
details.
SparkXGBRanker doesn't support `validate_features` and `output_margin` param.
SparkXGBRanker doesn't support setting `nthread` xgboost param, instead, the `nthread`
param for each xgboost worker will be set equal to `spark.task.cpus` config value.
SparkXGBRanker doesn't support setting `nthread` xgboost param, instead, the
`nthread` param for each xgboost worker will be set equal to `spark.task.cpus`
config value.
Parameters
@@ -468,13 +500,20 @@ class SparkXGBRanker(_SparkXGBEstimator):
:py:class:`xgboost.XGBRanker` fit method.
qid_col:
Query id column name.
num_workers:
How many XGBoost workers to be used to train.
Each XGBoost worker corresponds to one spark task.
use_gpu:
Boolean value to specify whether the executors are running on GPU
instances.
.. deprecated:: 2.0.0
Use `device` instead.
device:
.. versionadded:: 2.0.0
Device for XGBoost workers, available options are `cpu`, `cuda`, and `gpu`.
force_repartition:
Boolean value to specify if forcing the input dataset to be repartitioned
before XGBoost training.
@@ -539,14 +578,17 @@ class SparkXGBRanker(_SparkXGBEstimator):
base_margin_col: Optional[str] = None,
qid_col: Optional[str] = None,
num_workers: int = 1,
use_gpu: bool = False,
use_gpu: Optional[bool] = None,
device: Optional[str] = None,
force_repartition: bool = False,
repartition_random_shuffle: bool = False,
enable_sparse_data_optim: bool = False,
**kwargs: Dict[str, Any],
**kwargs: Any,
) -> None:
super().__init__()
input_kwargs = self._input_kwargs
if use_gpu:
_deprecated_use_gpu()
self.setParams(**input_kwargs)
@classmethod

View File

@@ -7,10 +7,10 @@ import os
import sys
import uuid
from threading import Thread
from typing import Any, Callable, Dict, Set, Type
from typing import Any, Callable, Dict, Optional, Set, Type
import pyspark
from pyspark import BarrierTaskContext, SparkContext, SparkFiles
from pyspark import BarrierTaskContext, SparkContext, SparkFiles, TaskContext
from pyspark.sql.session import SparkSession
from xgboost import Booster, XGBModel, collective
@@ -104,6 +104,10 @@ def get_logger(name: str, level: str = "INFO") -> logging.Logger:
# If the logger is configured, skip the configure
if not logger.handlers and not logging.getLogger().handlers:
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter(
"%(asctime)s %(levelname)s %(name)s: %(funcName)s %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
@@ -125,7 +129,14 @@ def _is_local(spark_context: SparkContext) -> bool:
return spark_context._jsc.sc().isLocal()
def _get_gpu_id(task_context: BarrierTaskContext) -> int:
def _is_standalone_or_localcluster(spark_context: SparkContext) -> bool:
master = spark_context.getConf().get("spark.master")
return master is not None and (
master.startswith("spark://") or master.startswith("local-cluster")
)
def _get_gpu_id(task_context: TaskContext) -> int:
"""Get the gpu id from the task resources"""
if task_context is None:
# This is a safety check.
@@ -186,3 +197,8 @@ def deserialize_booster(model: str) -> Booster:
f.write(model)
booster.load_model(tmp_file_name)
return booster
def use_cuda(device: Optional[str]) -> bool:
"""Whether xgboost is using CUDA workers."""
return device in ("cuda", "gpu")

View File

@@ -25,6 +25,7 @@ from typing import (
Set,
Tuple,
TypedDict,
TypeVar,
Union,
)
@@ -198,20 +199,20 @@ class IteratorForTest(xgb.core.DataIter):
X: Sequence,
y: Sequence,
w: Optional[Sequence],
cache: Optional[str] = "./",
cache: Optional[str],
) -> None:
assert len(X) == len(y)
self.X = X
self.y = y
self.w = w
self.it = 0
super().__init__(cache)
super().__init__(cache_prefix=cache)
def next(self, input_data: Callable) -> int:
if self.it == len(self.X):
return 0
with pytest.raises(TypeError, match="keyword args"):
with pytest.raises(TypeError, match="Keyword argument"):
input_data(self.X[self.it], self.y[self.it], None)
# Use copy to make sure the iterator doesn't hold a reference to the data.
@@ -229,7 +230,7 @@ class IteratorForTest(xgb.core.DataIter):
def as_arrays(
self,
) -> Tuple[Union[np.ndarray, sparse.csr_matrix], ArrayLike, ArrayLike]:
) -> Tuple[Union[np.ndarray, sparse.csr_matrix], ArrayLike, Optional[ArrayLike]]:
if isinstance(self.X[0], sparse.csr_matrix):
X = sparse.vstack(self.X, format="csr")
else:
@@ -243,7 +244,12 @@ class IteratorForTest(xgb.core.DataIter):
def make_batches(
n_samples_per_batch: int, n_features: int, n_batches: int, use_cupy: bool = False
n_samples_per_batch: int,
n_features: int,
n_batches: int,
use_cupy: bool = False,
*,
vary_size: bool = False,
) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
X = []
y = []
@@ -254,16 +260,25 @@ def make_batches(
rng = cupy.random.RandomState(1994)
else:
rng = np.random.RandomState(1994)
for _ in range(n_batches):
_X = rng.randn(n_samples_per_batch, n_features)
_y = rng.randn(n_samples_per_batch)
_w = rng.uniform(low=0, high=1, size=n_samples_per_batch)
for i in range(n_batches):
n_samples = n_samples_per_batch + i * 10 if vary_size else n_samples_per_batch
_X = rng.randn(n_samples, n_features)
_y = rng.randn(n_samples)
_w = rng.uniform(low=0, high=1, size=n_samples)
X.append(_X)
y.append(_y)
w.append(_w)
return X, y, w
def make_regression(
n_samples: int, n_features: int, use_cupy: bool
) -> Tuple[ArrayLike, ArrayLike, ArrayLike]:
"""Make a simple regression dataset."""
X, y, w = make_batches(n_samples, n_features, 1, use_cupy)
return X[0], y[0], w[0]
def make_batches_sparse(
n_samples_per_batch: int, n_features: int, n_batches: int, sparsity: float
) -> Tuple[List[sparse.csr_matrix], List[np.ndarray], List[np.ndarray]]:
@@ -347,7 +362,9 @@ class TestDataset:
if w is not None:
weight.append(w)
it = IteratorForTest(predictor, response, weight if weight else None)
it = IteratorForTest(
predictor, response, weight if weight else None, cache="cache"
)
return xgb.DMatrix(it)
def __repr__(self) -> str:
@@ -709,6 +726,9 @@ def predictor_equal(lhs: xgb.DMatrix, rhs: xgb.DMatrix) -> bool:
)
M = TypeVar("M", xgb.Booster, xgb.XGBModel)
def eval_error_metric(predt: np.ndarray, dtrain: xgb.DMatrix) -> Tuple[str, np.float64]:
"""Evaluation metric for xgb.train"""
label = dtrain.get_label()

View File

@@ -0,0 +1,34 @@
"""Tests related to the `DataIter` interface."""
import numpy as np
import xgboost
from xgboost import testing as tm
def run_mixed_sparsity(device: str) -> None:
"""Check QDM with mixed batches."""
X_0, y_0, _ = tm.make_regression(128, 16, False)
if device.startswith("cuda"):
X_1, y_1 = tm.make_sparse_regression(256, 16, 0.1, True)
else:
X_1, y_1 = tm.make_sparse_regression(256, 16, 0.1, False)
X_2, y_2 = tm.make_sparse_regression(512, 16, 0.9, True)
X = [X_0, X_1, X_2]
y = [y_0, y_1, y_2]
if device.startswith("cuda"):
import cupy as cp # pylint: disable=import-error
X = [cp.array(batch) for batch in X]
it = tm.IteratorForTest(X, y, None, None)
Xy_0 = xgboost.QuantileDMatrix(it)
X_1, y_1 = tm.make_sparse_regression(256, 16, 0.1, True)
X = [X_0, X_1, X_2]
y = [y_0, y_1, y_2]
X_arr = np.concatenate(X, axis=0)
y_arr = np.concatenate(y, axis=0)
Xy_1 = xgboost.QuantileDMatrix(X_arr, y_arr)
assert tm.predictor_equal(Xy_0, Xy_1)

View File

@@ -41,6 +41,10 @@ hist_parameter_strategy = strategies.fixed_dictionaries(
and (cast(int, x["max_depth"]) > 0 or x["grow_policy"] == "lossguide")
)
hist_cache_strategy = strategies.fixed_dictionaries(
{"max_cached_hist_node": strategies.sampled_from([1, 4, 1024, 2**31])}
)
hist_multi_parameter_strategy = strategies.fixed_dictionaries(
{
"max_depth": strategies.integers(1, 11),

View File

@@ -1,7 +1,7 @@
"""Tests for updaters."""
import json
from functools import partial, update_wrapper
from typing import Dict
from typing import Any, Dict, List
import numpy as np
@@ -159,3 +159,238 @@ def check_quantile_loss(tree_method: str, weighted: bool) -> None:
for i in range(alpha.shape[0]):
np.testing.assert_allclose(predts[:, i], predt_multi[:, i])
def check_cut(
n_entries: int, indptr: np.ndarray, data: np.ndarray, dtypes: Any
) -> None:
"""Check the cut values."""
from pandas.api.types import is_categorical_dtype
assert data.shape[0] == indptr[-1]
assert data.shape[0] == n_entries
assert indptr.dtype == np.uint64
for i in range(1, indptr.size):
beg = int(indptr[i - 1])
end = int(indptr[i])
for j in range(beg + 1, end):
assert data[j] > data[j - 1]
if is_categorical_dtype(dtypes[i - 1]):
assert data[j] == data[j - 1] + 1
def check_get_quantile_cut_device(tree_method: str, use_cupy: bool) -> None:
"""Check with optional cupy."""
from pandas.api.types import is_categorical_dtype
n_samples = 1024
n_features = 14
max_bin = 16
dtypes = [np.float32] * n_features
# numerical
X, y, w = tm.make_regression(n_samples, n_features, use_cupy=use_cupy)
# - qdm
Xyw: xgb.DMatrix = xgb.QuantileDMatrix(X, y, weight=w, max_bin=max_bin)
indptr, data = Xyw.get_quantile_cut()
check_cut((max_bin + 1) * n_features, indptr, data, dtypes)
# - dm
Xyw = xgb.DMatrix(X, y, weight=w)
xgb.train({"tree_method": tree_method, "max_bin": max_bin}, Xyw)
indptr, data = Xyw.get_quantile_cut()
check_cut((max_bin + 1) * n_features, indptr, data, dtypes)
# - ext mem
n_batches = 3
n_samples_per_batch = 256
it = tm.IteratorForTest(
*tm.make_batches(n_samples_per_batch, n_features, n_batches, use_cupy),
cache="cache",
)
Xy: xgb.DMatrix = xgb.DMatrix(it)
xgb.train({"tree_method": tree_method, "max_bin": max_bin}, Xyw)
indptr, data = Xyw.get_quantile_cut()
check_cut((max_bin + 1) * n_features, indptr, data, dtypes)
# categorical
n_categories = 32
X, y = tm.make_categorical(n_samples, n_features, n_categories, False, sparsity=0.8)
if use_cupy:
import cudf # pylint: disable=import-error
import cupy as cp # pylint: disable=import-error
X = cudf.from_pandas(X)
y = cp.array(y)
# - qdm
Xy = xgb.QuantileDMatrix(X, y, max_bin=max_bin, enable_categorical=True)
indptr, data = Xy.get_quantile_cut()
check_cut(n_categories * n_features, indptr, data, X.dtypes)
# - dm
Xy = xgb.DMatrix(X, y, enable_categorical=True)
xgb.train({"tree_method": tree_method, "max_bin": max_bin}, Xy)
indptr, data = Xy.get_quantile_cut()
check_cut(n_categories * n_features, indptr, data, X.dtypes)
# mixed
X, y = tm.make_categorical(
n_samples, n_features, n_categories, False, sparsity=0.8, cat_ratio=0.5
)
n_cat_features = len([0 for dtype in X.dtypes if is_categorical_dtype(dtype)])
n_num_features = n_features - n_cat_features
n_entries = n_categories * n_cat_features + (max_bin + 1) * n_num_features
# - qdm
Xy = xgb.QuantileDMatrix(X, y, max_bin=max_bin, enable_categorical=True)
indptr, data = Xy.get_quantile_cut()
check_cut(n_entries, indptr, data, X.dtypes)
# - dm
Xy = xgb.DMatrix(X, y, enable_categorical=True)
xgb.train({"tree_method": tree_method, "max_bin": max_bin}, Xy)
indptr, data = Xy.get_quantile_cut()
check_cut(n_entries, indptr, data, X.dtypes)
def check_get_quantile_cut(tree_method: str) -> None:
"""Check the quantile cut getter."""
use_cupy = tree_method == "gpu_hist"
check_get_quantile_cut_device(tree_method, False)
if use_cupy:
check_get_quantile_cut_device(tree_method, True)
USE_ONEHOT = np.iinfo(np.int32).max
USE_PART = 1
def check_categorical_ohe( # pylint: disable=too-many-arguments
rows: int, cols: int, rounds: int, cats: int, device: str, tree_method: str
) -> None:
"Test for one-hot encoding with categorical data."
onehot, label = tm.make_categorical(rows, cols, cats, True)
cat, _ = tm.make_categorical(rows, cols, cats, False)
by_etl_results: Dict[str, Dict[str, List[float]]] = {}
by_builtin_results: Dict[str, Dict[str, List[float]]] = {}
parameters: Dict[str, Any] = {
"tree_method": tree_method,
# Use one-hot exclusively
"max_cat_to_onehot": USE_ONEHOT,
"device": device,
}
m = xgb.DMatrix(onehot, label, enable_categorical=False)
xgb.train(
parameters,
m,
num_boost_round=rounds,
evals=[(m, "Train")],
evals_result=by_etl_results,
)
m = xgb.DMatrix(cat, label, enable_categorical=True)
xgb.train(
parameters,
m,
num_boost_round=rounds,
evals=[(m, "Train")],
evals_result=by_builtin_results,
)
# There are guidelines on how to specify tolerance based on considering output
# as random variables. But in here the tree construction is extremely sensitive
# to floating point errors. An 1e-5 error in a histogram bin can lead to an
# entirely different tree. So even though the test is quite lenient, hypothesis
# can still pick up falsifying examples from time to time.
np.testing.assert_allclose(
np.array(by_etl_results["Train"]["rmse"]),
np.array(by_builtin_results["Train"]["rmse"]),
rtol=1e-3,
)
assert tm.non_increasing(by_builtin_results["Train"]["rmse"])
by_grouping: Dict[str, Dict[str, List[float]]] = {}
# switch to partition-based splits
parameters["max_cat_to_onehot"] = USE_PART
parameters["reg_lambda"] = 0
m = xgb.DMatrix(cat, label, enable_categorical=True)
xgb.train(
parameters,
m,
num_boost_round=rounds,
evals=[(m, "Train")],
evals_result=by_grouping,
)
rmse_oh = by_builtin_results["Train"]["rmse"]
rmse_group = by_grouping["Train"]["rmse"]
# always better or equal to onehot when there's no regularization.
for a, b in zip(rmse_oh, rmse_group):
assert a >= b
parameters["reg_lambda"] = 1.0
by_grouping = {}
xgb.train(
parameters,
m,
num_boost_round=32,
evals=[(m, "Train")],
evals_result=by_grouping,
)
assert tm.non_increasing(by_grouping["Train"]["rmse"]), by_grouping
def check_categorical_missing(
rows: int, cols: int, cats: int, device: str, tree_method: str
) -> None:
"""Check categorical data with missing values."""
parameters: Dict[str, Any] = {"tree_method": tree_method, "device": device}
cat, label = tm.make_categorical(
rows, n_features=cols, n_categories=cats, onehot=False, sparsity=0.5
)
Xy = xgb.DMatrix(cat, label, enable_categorical=True)
def run(max_cat_to_onehot: int) -> None:
# Test with onehot splits
parameters["max_cat_to_onehot"] = max_cat_to_onehot
evals_result: Dict[str, Dict] = {}
booster = xgb.train(
parameters,
Xy,
num_boost_round=16,
evals=[(Xy, "Train")],
evals_result=evals_result,
)
assert tm.non_increasing(evals_result["Train"]["rmse"])
y_predt = booster.predict(Xy)
rmse = tm.root_mean_square(label, y_predt)
np.testing.assert_allclose(rmse, evals_result["Train"]["rmse"][-1], rtol=2e-5)
# Test with OHE split
run(USE_ONEHOT)
# Test with partition-based split
run(USE_PART)
def train_result(
param: Dict[str, Any], dmat: xgb.DMatrix, num_rounds: int
) -> Dict[str, Any]:
"""Get training result from parameters and data."""
result: Dict[str, Any] = {}
booster = xgb.train(
param,
dmat,
num_rounds,
evals=[(dmat, "train")],
verbose_eval=False,
evals_result=result,
)
assert booster.num_features() == dmat.num_col()
assert booster.num_boosted_rounds() == num_rounds
assert booster.feature_names == dmat.feature_names
assert booster.feature_types == dmat.feature_types
return result

View File

@@ -28,17 +28,6 @@ from .core import (
_CVFolds = Sequence["CVPack"]
def _assert_new_callback(callbacks: Optional[Sequence[TrainingCallback]]) -> None:
is_new_callback: bool = not callbacks or all(
isinstance(c, TrainingCallback) for c in callbacks
)
if not is_new_callback:
link = "https://xgboost.readthedocs.io/en/latest/python/callbacks.html"
raise ValueError(
f"Old style callback was removed in version 1.6. See: {link}."
)
def _configure_custom_metric(
feval: Optional[Metric], custom_metric: Optional[Metric]
) -> Optional[Metric]:
@@ -170,7 +159,6 @@ def train(
bst = Booster(params, [dtrain] + [d[0] for d in evals], model_file=xgb_model)
start_iteration = 0
_assert_new_callback(callbacks)
if verbose_eval:
verbose_eval = 1 if verbose_eval is True else verbose_eval
callbacks.append(EvaluationMonitor(period=verbose_eval))
@@ -247,7 +235,7 @@ class _PackedBooster:
result = [f.eval(iteration, feval, output_margin) for f in self.cvfolds]
return result
def set_attr(self, **kwargs: Optional[str]) -> Any:
def set_attr(self, **kwargs: Optional[Any]) -> Any:
"""Iterate through folds for setting attributes"""
for f in self.cvfolds:
f.bst.set_attr(**kwargs)
@@ -274,11 +262,20 @@ class _PackedBooster:
"""Get best_iteration"""
return int(cast(int, self.cvfolds[0].bst.attr("best_iteration")))
@best_iteration.setter
def best_iteration(self, iteration: int) -> None:
"""Get best_iteration"""
self.set_attr(best_iteration=iteration)
@property
def best_score(self) -> float:
"""Get best_score."""
return float(cast(float, self.cvfolds[0].bst.attr("best_score")))
@best_score.setter
def best_score(self, score: float) -> None:
self.set_attr(best_score=score)
def groups_to_rows(groups: List[np.ndarray], boundaries: np.ndarray) -> np.ndarray:
"""
@@ -551,7 +548,6 @@ def cv(
# setup callbacks
callbacks = [] if callbacks is None else copy.copy(list(callbacks))
_assert_new_callback(callbacks)
if verbose_eval:
verbose_eval = 1 if verbose_eval is True else verbose_eval