[breaking] Add prediction function for DMatrix and use inplace predict for dask. (#6668)

* Add a new API function for predicting on `DMatrix`.  This function aligns
with the rest of the `XGBoosterPredictFrom*` functions in the semantics of its
arguments.
* Purge `ntree_limit` from libxgboost; use iteration-based ranges instead.
* [dask] Use `inplace_predict` by default for dask sklearn models.
* [dask] Run prediction shape inference on worker instead of client.

The breaking change is in the Python sklearn `apply` function, which is now
consistent with the other prediction functions: `best_iteration` is used by
default.
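A minimal sketch of the new calling convention (the data here is random noise, purely illustrative):

```python
import numpy as np
import xgboost as xgb

X, y = np.random.randn(100, 10), np.random.randint(0, 2, 100)
dtrain = xgb.DMatrix(X, label=y)
booster = xgb.train({"objective": "binary:logistic"}, dtrain, num_boost_round=20)

# Old style, now deprecated (emits a warning):
#   booster.predict(dtrain, ntree_limit=10)
# New style: the half-open interval [0, 10) selects the first 10 rounds.
preds = booster.predict(dtrain, iteration_range=(0, 10))
```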
Jiaming Yuan, 2021-02-08 18:26:32 +08:00 (committed by GitHub)
parent dbb5208a0a · commit 4656b09d5d
29 changed files with 1134 additions and 604 deletions

python-package/xgboost/core.py

@@ -96,6 +96,24 @@ def from_cstr_to_pystr(data, length):
return res
def _convert_ntree_limit(booster, ntree_limit, iteration_range):
if ntree_limit is not None and ntree_limit != 0:
warnings.warn(
"ntree_limit is deprecated, use `iteration_range` or model "
"slicing instead.",
UserWarning
)
if iteration_range is not None and iteration_range[1] != 0:
raise ValueError(
"Only one of `iteration_range` and `ntree_limit` can be non zero."
)
num_parallel_tree, num_groups = _get_booster_layer_trees(booster)
num_parallel_tree = max([num_parallel_tree, 1])
num_groups = max([num_groups, 1])
iteration_range = (0, ntree_limit // num_parallel_tree)
return iteration_range
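A worked example of the arithmetic above, assuming (hypothetically) that `booster` is a random forest trained with `num_parallel_tree=4`:

```python
# Limiting prediction to the first 8 trees is the same as using the first
# 8 // 4 == 2 boosting iterations, expressed as a half-open range.
iteration_range = _convert_ntree_limit(booster, ntree_limit=8, iteration_range=None)
assert iteration_range == (0, 2)  # a deprecation UserWarning also fires
```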
def _expect(expectations, got):
"""Translate input error into string.
@@ -1111,6 +1129,34 @@ Objective = Callable[[np.ndarray, DMatrix], Tuple[np.ndarray, np.ndarray]]
Metric = Callable[[np.ndarray, DMatrix], Tuple[str, float]]
def _get_booster_layer_trees(model: "Booster") -> Tuple[int, int]:
"""Get number of trees added to booster per-iteration. This function will be removed
once `best_ntree_limit` is dropped in favor of `best_iteration`. Returns
`num_parallel_tree` and `num_groups`.
"""
config = json.loads(model.save_config())
booster = config["learner"]["gradient_booster"]["name"]
if booster == "gblinear":
num_parallel_tree = 0
elif booster == "dart":
num_parallel_tree = int(
config["learner"]["gradient_booster"]["gbtree"]["gbtree_train_param"][
"num_parallel_tree"
]
)
elif booster == "gbtree":
num_parallel_tree = int(
config["learner"]["gradient_booster"]["gbtree_train_param"][
"num_parallel_tree"
]
)
else:
raise ValueError(f"Unknown booster: {booster}")
num_groups = int(config["learner"]["learner_model_param"]["num_class"])
return num_parallel_tree, num_groups
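A quick way to see the fields the helper above reads; `booster` is assumed to be an existing gbtree `Booster`, and the printed values are typical, not guaranteed:

```python
import json

config = json.loads(booster.save_config())
print(config["learner"]["gradient_booster"]["name"])          # e.g. "gbtree"
print(config["learner"]["learner_model_param"]["num_class"])  # "0" unless multi-class
```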
class Booster(object):
# pylint: disable=too-many-public-methods
"""A Booster of XGBoost.
@@ -1497,16 +1543,20 @@ class Booster(object):
return self.eval_set([(data, name)], iteration)
# pylint: disable=too-many-function-args
def predict(self,
data,
output_margin=False,
ntree_limit=0,
pred_leaf=False,
pred_contribs=False,
approx_contribs=False,
pred_interactions=False,
validate_features=True,
training=False):
def predict(
self,
data: DMatrix,
output_margin: bool = False,
ntree_limit: int = 0,
pred_leaf: bool = False,
pred_contribs: bool = False,
approx_contribs: bool = False,
pred_interactions: bool = False,
validate_features: bool = True,
training: bool = False,
iteration_range: Tuple[int, int] = (0, 0),
strict_shape: bool = False,
) -> np.ndarray:
"""Predict with data.
.. note:: This function is not thread safe except for ``gbtree`` booster.
@@ -1518,33 +1568,32 @@ class Booster(object):
Parameters
----------
data : DMatrix
data :
The dmatrix storing the input.
output_margin : bool
output_margin :
Whether to output the raw untransformed margin value.
ntree_limit : int
Limit number of trees in the prediction; defaults to 0 (use all
trees).
ntree_limit :
Deprecated, use `iteration_range` instead.
pred_leaf : bool
pred_leaf :
When this option is on, the output will be a matrix of (nsample,
ntrees) with each record indicating the predicted leaf index of
each sample in each tree. Note that the leaf index of a tree is
unique per tree, so you may find leaf 1 in both tree 1 and tree 0.
pred_contribs : bool
pred_contribs :
When this is True the output will be a matrix of size (nsample,
nfeats + 1) with each record indicating the feature contributions
(SHAP values) for that prediction. The sum of all feature
contributions is equal to the raw untransformed margin value of the
prediction. Note the final column is the bias term.
approx_contribs : bool
approx_contribs :
Approximate the contributions of each feature
pred_interactions : bool
pred_interactions :
When this is True the output will be a matrix of size (nsample,
nfeats + 1, nfeats + 1) indicating the SHAP interaction values for
each pair of features. The sum of each row (or column) of the
@@ -1553,17 +1602,33 @@ class Booster(object):
untransformed margin value of the prediction. Note the last row and
column correspond to the bias term.
validate_features : bool
validate_features :
When this is True, validate that the Booster's and data's
feature_names are identical. Otherwise, it is assumed that the
feature_names are the same.
training : bool
training :
Whether the prediction value is used for training. This can affect the
`dart` booster, which performs dropouts during training iterations.
.. versionadded:: 1.0.0
iteration_range :
Specifies which layer of trees is used in prediction. For example, if a
random forest is trained with 100 rounds, specifying `iteration_range=(10,
20)` means only the forests built during rounds [10, 20) (half-open set) are
used in this prediction.
.. versionadded:: 1.4.0
strict_shape :
When set to True, the output shape is invariant to whether classification is
used. For both value and margin prediction, the output shape is (n_samples,
n_groups), with n_groups == 1 when multi-class is not used. Defaults to False,
in which case the output shape can be (n_samples,) if multi-class is not used.
.. versionadded:: 1.4.0
.. note:: Using ``predict()`` with DART booster
If the booster object is DART type, ``predict()`` will not perform
@@ -1575,64 +1640,50 @@ class Booster(object):
prediction : numpy array
"""
option_mask = 0x00
if output_margin:
option_mask |= 0x01
if pred_leaf:
option_mask |= 0x02
if pred_contribs:
option_mask |= 0x04
if approx_contribs:
option_mask |= 0x08
if pred_interactions:
option_mask |= 0x10
if not isinstance(data, DMatrix):
raise TypeError('Expecting data to be a DMatrix object, got: ',
type(data))
raise TypeError('Expecting data to be a DMatrix object, got: ', type(data))
if validate_features:
self._validate_features(data)
iteration_range = _convert_ntree_limit(self, ntree_limit, iteration_range)
args = {
"type": 0,
"training": training,
"iteration_begin": iteration_range[0],
"iteration_end": iteration_range[1],
"strict_shape": strict_shape,
}
length = c_bst_ulong()
preds = ctypes.POINTER(ctypes.c_float)()
_check_call(_LIB.XGBoosterPredict(self.handle, data.handle,
ctypes.c_int(option_mask),
ctypes.c_uint(ntree_limit),
ctypes.c_int(training),
ctypes.byref(length),
ctypes.byref(preds)))
preds = ctypes2numpy(preds, length.value, np.float32)
def assign_type(t: int) -> None:
if args["type"] != 0:
raise ValueError("One type of prediction at a time.")
args["type"] = t
if output_margin:
assign_type(1)
if pred_contribs:
assign_type(2 if not approx_contribs else 3)
if pred_interactions:
assign_type(4)
if pred_leaf:
preds = preds.astype(np.int32, copy=False)
nrow = data.num_row()
if preds.size != nrow and preds.size % nrow == 0:
chunk_size = int(preds.size / nrow)
if pred_interactions:
ngroup = int(chunk_size / ((data.num_col() + 1) *
(data.num_col() + 1)))
if ngroup == 1:
preds = preds.reshape(nrow,
data.num_col() + 1,
data.num_col() + 1)
else:
preds = preds.reshape(nrow, ngroup,
data.num_col() + 1,
data.num_col() + 1)
elif pred_contribs:
ngroup = int(chunk_size / (data.num_col() + 1))
if ngroup == 1:
preds = preds.reshape(nrow, data.num_col() + 1)
else:
preds = preds.reshape(nrow, ngroup, data.num_col() + 1)
else:
preds = preds.reshape(nrow, chunk_size)
return preds
assign_type(5)
preds = ctypes.POINTER(ctypes.c_float)()
shape = ctypes.POINTER(c_bst_ulong)()
dims = c_bst_ulong()
_check_call(
_LIB.XGBoosterPredictFromDMatrix(
self.handle,
data.handle,
from_pystr_to_cstr(json.dumps(args)),
ctypes.byref(shape),
ctypes.byref(dims),
ctypes.byref(preds)
)
)
return _prediction_output(shape, dims, preds, False)
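A sketch of what `strict_shape` changes, with illustrative shapes for a binary classifier over 100 rows:

```python
preds = booster.predict(dtrain)                     # shape (100,)
preds = booster.predict(dtrain, strict_shape=True)  # shape (100, 1)
```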
def inplace_predict(
self,
data,
data: Any,
iteration_range: Tuple[int, int] = (0, 0),
predict_type: str = "value",
missing: float = np.nan,
@@ -1665,26 +1716,24 @@ class Booster(object):
The input data; must not be a view of a numpy array. Set
``predictor`` to ``gpu_predictor`` to run prediction on a CuPy
array or cuDF DataFrame.
iteration_range : tuple
Specifies which layer of trees are used in prediction. For
example, if a random forest is trained with 100 rounds. Specifying
`iteration_range=(10, 20)`, then only the forests built during [10,
20) (open set) rounds are used in this prediction.
predict_type : str
iteration_range :
See :py:meth:`xgboost.Booster.predict` for details.
predict_type :
* `value` Output model prediction values.
* `margin` Output the raw untransformed margin value.
missing : float
Value in the input data which needs to be present as a missing
value.
missing :
See :py:obj:`xgboost.DMatrix` for details.
validate_features:
See :py:meth:`xgboost.Booster.predict` for details.
base_margin:
See :py:obj:`xgboost.DMatrix` for details.
.. versionadded:: 1.4.0
strict_shape:
When set to True, the output shape is invariant to whether classification is
used. For both value and margin prediction, the output shape is (n_samples,
n_groups), with n_groups == 1 when multi-class is not used. Defaults to False,
in which case the output shape can be (n_samples,) if multi-class is not used.
See :py:meth:`xgboost.Booster.predict` for details.
.. versionadded:: 1.4.0
Returns
-------
@@ -1772,7 +1821,7 @@ class Booster(object):
interface["mask"] = interface["mask"].__cuda_array_interface__
interface_str = bytes(json.dumps(interface, indent=2), "utf-8")
_check_call(
_LIB.XGBoosterPredictFromArrayInterface(
_LIB.XGBoosterPredictFromCudaArray(
self.handle,
interface_str,
from_pystr_to_cstr(json.dumps(args)),
@@ -1788,7 +1837,7 @@ class Booster(object):
interfaces_str = _cudf_array_interfaces(data)
_check_call(
_LIB.XGBoosterPredictFromArrayInterfaceColumns(
_LIB.XGBoosterPredictFromCudaColumnar(
self.handle,
interfaces_str,
from_pystr_to_cstr(json.dumps(args)),

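A minimal sketch of the in-place path, assuming a trained `booster` and row-major float32 input; `inplace_predict` skips `DMatrix` construction entirely:

```python
import numpy as np

X = np.random.randn(50, 10).astype(np.float32)
margin = booster.inplace_predict(X, predict_type="margin", iteration_range=(0, 10))
```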
python-package/xgboost/dask.py

@@ -254,6 +254,7 @@ class DaskDMatrix:
raise TypeError(_expect((dd.DataFrame, da.Array, dd.Series), type(label)))
self._n_cols = data.shape[1]
assert isinstance(self._n_cols, int)
self.worker_map: Dict[str, "distributed.Future"] = defaultdict(list)
self.is_quantile: bool = False
@@ -881,7 +882,7 @@ async def _train_async(
return list(filter(lambda ret: ret is not None, results))[0]
def train(
def train( # pylint: disable=unused-argument
client: "distributed.Client",
params: Dict[str, Any],
dtrain: DaskDMatrix,
@@ -892,16 +893,17 @@ def train(
early_stopping_rounds: Optional[int] = None,
xgb_model: Optional[Booster] = None,
verbose_eval: Union[int, bool] = True,
callbacks: Optional[List[TrainingCallback]] = None
callbacks: Optional[List[TrainingCallback]] = None,
) -> Any:
'''Train XGBoost model.
"""Train XGBoost model.
.. versionadded:: 1.0.0
.. note::
Other parameters are the same as `xgboost.train` except for `evals_result`, which
is returned as part of function return value instead of argument.
Other parameters are the same as :py:func:`xgboost.train` except for
`evals_result`, which is returned as part of function return value instead of
argument.
Parameters
----------
@@ -920,29 +922,17 @@ def train(
{'booster': xgboost.Booster,
'history': {'train': {'logloss': ['0.48253', '0.35953']},
'eval': {'logloss': ['0.480385', '0.357756']}}}
'''
"""
_assert_dask_support()
client = _xgb_get_client(client)
# Get global configuration before transferring computation to another thread or
# process.
global_config = config.get_config()
return client.sync(_train_async,
client=client,
global_config=global_config,
num_boost_round=num_boost_round,
obj=obj,
feval=feval,
params=params,
dtrain=dtrain,
evals=evals,
early_stopping_rounds=early_stopping_rounds,
verbose_eval=verbose_eval,
xgb_model=xgb_model,
callbacks=callbacks)
return client.sync(_train_async, global_config=config.get_config(), **locals())
def _can_output_df(data: _DaskCollection, output_shape: Tuple) -> bool:
return isinstance(data, dd.DataFrame) and len(output_shape) <= 2
def _can_output_df(is_df: bool, output_shape: Tuple) -> bool:
return is_df and len(output_shape) <= 2
async def _direct_predict_impl(
@@ -954,8 +944,9 @@ async def _direct_predict_impl(
meta: Dict[int, str],
) -> _DaskCollection:
columns = list(meta.keys())
if _can_output_df(data, output_shape):
if _can_output_df(isinstance(data, dd.DataFrame), output_shape):
if base_margin is not None and isinstance(base_margin, da.Array):
# Easier for map_partitions
base_margin_df: Optional[dd.DataFrame] = base_margin.to_dask_dataframe()
else:
base_margin_df = base_margin
@@ -975,17 +966,21 @@ async def _direct_predict_impl(
if base_margin is not None and isinstance(
base_margin, (dd.Series, dd.DataFrame)
):
# Easier for map_blocks
base_margin_array: Optional[da.Array] = base_margin.to_dask_array()
else:
base_margin_array = base_margin
# Input data is 2-dim array, output can be 1(reg, binary)/2(multi-class,
# contrib)/3(contrib)/4(interaction) dims.
# contrib)/3(contrib, interaction)/4(interaction) dims.
if len(output_shape) == 1:
drop_axis: Union[int, List[int]] = [1] # drop from 2 to 1 dim.
new_axis: Union[int, List[int]] = []
else:
drop_axis = []
new_axis = [i + 2 for i in range(len(output_shape) - 2)]
if isinstance(data, dd.DataFrame):
new_axis = list(range(len(output_shape) - 2))
else:
new_axis = [i + 2 for i in range(len(output_shape) - 2)]
predictions = da.map_blocks(
mapped_predict,
booster,
@@ -1001,28 +996,21 @@ async def _direct_predict_impl(
def _infer_predict_output(
booster: Booster,
data: Union[DaskDMatrix, _DaskCollection],
inplace: bool,
**kwargs: Any
booster: Booster, features: int, is_df: bool, inplace: bool, **kwargs: Any
) -> Tuple[Tuple[int, ...], Dict[int, str]]:
"""Create a dummy test sample to infer output shape for prediction."""
if isinstance(data, DaskDMatrix):
features = data.num_col()
else:
features = data.shape[1]
assert isinstance(features, int)
rng = numpy.random.RandomState(1994)
test_sample = rng.randn(1, features)
if inplace:
# clear the state to avoid gpu_id, gpu_predictor
booster = Booster(model_file=booster.save_raw())
test_predt = booster.inplace_predict(test_sample, **kwargs)
else:
m = DMatrix(test_sample)
test_predt = booster.predict(m, **kwargs)
kwargs = kwargs.copy()
if kwargs.pop("predict_type") == "margin":
kwargs["output_margin"] = True
m = DMatrix(test_sample)
test_predt = booster.predict(m, validate_features=False, **kwargs)
n_columns = test_predt.shape[1] if len(test_predt.shape) > 1 else 1
meta: Dict[int, str] = {}
if _can_output_df(data, test_predt.shape):
if _can_output_df(is_df, test_predt.shape):
for i in range(n_columns):
meta[i] = "f4"
return test_predt.shape, meta
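The shape-inference trick in isolation, with `booster` and `n_features` assumed to exist: predict on a single random row to learn the output dimensionality without materializing any real, possibly lazy, partition.

```python
import numpy
from xgboost import DMatrix

rng = numpy.random.RandomState(1994)
test_predt = booster.predict(
    DMatrix(rng.randn(1, n_features)), validate_features=False
)
output_shape = test_predt.shape  # e.g. (1,) for regression, (1, n_classes) for softprob
```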
@@ -1034,7 +1022,7 @@ async def _get_model_future(
if isinstance(model, Booster):
booster = await client.scatter(model, broadcast=True)
elif isinstance(model, dict):
booster = await client.scatter(model["booster"])
booster = await client.scatter(model["booster"], broadcast=True)
elif isinstance(model, distributed.Future):
booster = model
if booster.type is not Booster:
@@ -1059,6 +1047,8 @@ async def _predict_async(
approx_contribs: bool,
pred_interactions: bool,
validate_features: bool,
iteration_range: Tuple[int, int],
strict_shape: bool,
) -> _DaskCollection:
_booster = await _get_model_future(client, model)
if not isinstance(data, (DaskDMatrix, da.Array, dd.DataFrame)):
@@ -1077,43 +1067,51 @@ async def _predict_async(
approx_contribs=approx_contribs,
pred_interactions=pred_interactions,
validate_features=validate_features,
iteration_range=iteration_range,
strict_shape=strict_shape,
)
if is_df and len(predt.shape) <= 2:
if _can_output_df(is_df, predt.shape):
if lazy_isinstance(partition, "cudf", "core.dataframe.DataFrame"):
import cudf
predt = cudf.DataFrame(predt, columns=columns)
predt = cudf.DataFrame(predt, columns=columns, dtype=numpy.float32)
else:
predt = DataFrame(predt, columns=columns)
predt = DataFrame(predt, columns=columns, dtype=numpy.float32)
return predt
# Predict on dask collection directly.
if isinstance(data, (da.Array, dd.DataFrame)):
_output_shape, meta = _infer_predict_output(
await _booster.result(),
data,
_output_shape, meta = await client.compute(
client.submit(
_infer_predict_output,
_booster,
features=data.shape[1],
is_df=isinstance(data, dd.DataFrame),
inplace=False,
output_margin=output_margin,
pred_leaf=pred_leaf,
pred_contribs=pred_contribs,
approx_contribs=approx_contribs,
pred_interactions=pred_interactions,
)
)
return await _direct_predict_impl(
mapped_predict, _booster, data, None, _output_shape, meta
)
output_shape, _ = await client.compute(
client.submit(
_infer_predict_output,
booster=_booster,
features=data.num_col(),
is_df=False,
inplace=False,
output_margin=output_margin,
pred_leaf=pred_leaf,
pred_contribs=pred_contribs,
approx_contribs=approx_contribs,
pred_interactions=pred_interactions,
validate_features=False,
)
return await _direct_predict_impl(
mapped_predict, _booster, data, None, _output_shape, meta
)
output_shape, _ = _infer_predict_output(
booster=await _booster.result(),
data=data,
inplace=False,
output_margin=output_margin,
pred_leaf=pred_leaf,
pred_contribs=pred_contribs,
approx_contribs=approx_contribs,
pred_interactions=pred_interactions,
validate_features=False,
)
# Prediction on dask DMatrix.
partition_order = data.partition_order
@@ -1180,7 +1178,7 @@ async def _predict_async(
futures[i], shape=(rows,) + output_shape[1:], dtype=numpy.float32
)
)
predictions = await da.concatenate(arrays, axis=0)
predictions = da.concatenate(arrays, axis=0)
return predictions
@@ -1194,15 +1192,19 @@ def predict( # pylint: disable=unused-argument
pred_contribs: bool = False,
approx_contribs: bool = False,
pred_interactions: bool = False,
validate_features: bool = True
validate_features: bool = True,
iteration_range: Tuple[int, int] = (0, 0),
strict_shape: bool = False,
) -> Any:
'''Run prediction with a trained booster.
.. note::
Using ``inplace_predict `` might be faster when meta information like
``base_margin`` is not needed. For other parameters, please see
``Booster.predict``.
Using ``inplace_predict`` might be faster when some features are not needed. See
:py:meth:`xgboost.Booster.predict` for details on various parameters. When using
``pred_interactions`` with a multi-class model, the input should be ``da.Array`` or
``DaskDMatrix`` due to a limitation in ``da.map_blocks``.
.. versionadded:: 1.0.0
@@ -1232,69 +1234,83 @@ def predict( # pylint: disable=unused-argument
'''
_assert_dask_support()
client = _xgb_get_client(client)
return client.sync(
_predict_async, global_config=config.get_config(), **locals()
)
return client.sync(_predict_async, global_config=config.get_config(), **locals())
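An end-to-end sketch of the dask entry points; `client`, `X_dask`, and `y_dask` are assumed to exist. `train()` returns a dict holding the booster, and `predict()` returns a lazy dask collection whose output shape was inferred on a worker:

```python
from xgboost import dask as dxgb

dtrain = dxgb.DaskDMatrix(client, X_dask, y_dask)
output = dxgb.train(client, {"tree_method": "hist"}, dtrain, num_boost_round=10)
preds = dxgb.predict(client, output, dtrain)  # a da.Array / dd.DataFrame also works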
async def _inplace_predict_async(
async def _inplace_predict_async( # pylint: disable=too-many-branches
client: "distributed.Client",
global_config: Dict[str, Any],
model: Union[Booster, Dict, "distributed.Future"],
data: _DaskCollection,
iteration_range: Tuple[int, int] = (0, 0),
predict_type: str = 'value',
missing: float = numpy.nan
iteration_range: Tuple[int, int],
predict_type: str,
missing: float,
validate_features: bool,
base_margin: Optional[_DaskCollection],
strict_shape: bool,
) -> _DaskCollection:
client = _xgb_get_client(client)
booster = await _get_model_future(client, model)
if not isinstance(data, (da.Array, dd.DataFrame)):
raise TypeError(_expect([da.Array, dd.DataFrame], type(data)))
if base_margin is not None and not isinstance(
data, (da.Array, dd.DataFrame, dd.Series)
):
raise TypeError(_expect([da.Array, dd.DataFrame, dd.Series], type(base_margin)))
def mapped_predict(
booster: Booster, data: Any, is_df: bool, columns: List[int], _: Any
booster: Booster, data: Any, is_df: bool, columns: List[int], base_margin: Any
) -> Any:
with config.config_context(**global_config):
prediction = booster.inplace_predict(
data,
iteration_range=iteration_range,
predict_type=predict_type,
missing=missing
missing=missing,
base_margin=base_margin,
validate_features=validate_features,
strict_shape=strict_shape,
)
if is_df and len(prediction.shape) <= 2:
if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
if _can_output_df(is_df, prediction.shape):
if lazy_isinstance(data, "cudf.core.dataframe", "DataFrame"):
import cudf
prediction = cudf.DataFrame(
prediction, columns=columns, dtype=numpy.float32
)
else:
# If it's from pandas, the partition is a numpy array
prediction = DataFrame(
prediction, columns=columns, dtype=numpy.float32
)
# If it's from pandas, the partition is a numpy array
prediction = DataFrame(prediction, columns=columns, dtype=numpy.float32)
return prediction
shape, meta = _infer_predict_output(
await booster.result(),
data,
True,
predict_type=predict_type,
iteration_range=iteration_range
# await turns future into value.
shape, meta = await client.compute(
client.submit(
_infer_predict_output,
booster,
features=data.shape[1],
is_df=isinstance(data, dd.DataFrame),
inplace=True,
predict_type=predict_type,
iteration_range=iteration_range,
)
)
return await _direct_predict_impl(
mapped_predict, booster, data, None, shape, meta
mapped_predict, booster, data, base_margin, shape, meta
)
def inplace_predict( # pylint: disable=unused-argument
def inplace_predict( # pylint: disable=unused-argument
client: "distributed.Client",
model: Union[TrainReturnT, Booster, "distributed.Future"],
data: _DaskCollection,
iteration_range: Tuple[int, int] = (0, 0),
predict_type: str = 'value',
missing: float = numpy.nan
predict_type: str = "value",
missing: float = numpy.nan,
validate_features: bool = True,
base_margin: Optional[_DaskCollection] = None,
strict_shape: bool = False,
) -> Any:
'''Inplace prediction.
"""Inplace prediction. See doc in :py:meth:`xgboost.Booster.inplace_predict` for details.
.. versionadded:: 1.1.0
@@ -1304,16 +1320,27 @@ def inplace_predict( # pylint: disable=unused-argument
Specify the dask client used for training. Use default client
returned from dask if it's set to None.
model:
The trained model. It can be a distributed.Future so user can
pre-scatter it onto all workers.
See :py:func:`xgboost.dask.predict` for details.
data :
dask collection.
iteration_range:
Specify the range of trees used for prediction.
See :py:meth:`xgboost.Booster.predict` for details.
predict_type:
* 'value': Normal prediction result.
* 'margin': Output the raw untransformed margin value.
See :py:meth:`xgboost.Booster.inplace_predict` for details.
missing:
Value in the input data which needs to be present as a missing
value. If None, defaults to np.nan.
base_margin:
See :py:obj:`xgboost.DMatrix` for details. Right now the classifier is not well
supported with base_margin, as it requires the size of the base margin to be
`n_classes * n_samples`.
.. versionadded:: 1.4.0
strict_shape:
See :py:meth:`xgboost.Booster.predict` for details.
.. versionadded:: 1.4.0
Returns
-------
@@ -1322,7 +1349,7 @@ def inplace_predict( # pylint: disable=unused-argument
data is ``dask.dataframe.DataFrame``, return value can be
``dask.dataframe.Series``, ``dask.dataframe.DataFrame`` or ``dask.array.Array``,
depending on the output shape.
'''
"""
_assert_dask_support()
client = _xgb_get_client(client)
return client.sync(
@@ -1334,9 +1361,11 @@ async def _async_wrap_evaluation_matrices(
client: "distributed.Client", **kwargs: Any
) -> Tuple[DaskDMatrix, Optional[List[Tuple[DaskDMatrix, str]]]]:
"""A switch function for async environment."""
def _inner(**kwargs: Any) -> DaskDMatrix:
m = DaskDMatrix(client=client, **kwargs)
return m
train_dmatrix, evals = _wrap_evaluation_matrices(create_dmatrix=_inner, **kwargs)
train_dmatrix = await train_dmatrix
if evals is None:
@@ -1351,25 +1380,45 @@ async def _async_wrap_evaluation_matrices(
class DaskScikitLearnBase(XGBModel):
'''Base class for implementing scikit-learn interface with Dask'''
"""Base class for implementing scikit-learn interface with Dask"""
_client = None
async def _predict_async(
self, data: _DaskCollection,
output_margin: bool = False,
validate_features: bool = True,
base_margin: Optional[_DaskCollection] = None
self,
data: _DaskCollection,
output_margin: bool,
validate_features: bool,
base_margin: Optional[_DaskCollection],
iteration_range: Optional[Tuple[int, int]],
) -> Any:
test_dmatrix = await DaskDMatrix(
client=self.client, data=data, base_margin=base_margin,
missing=self.missing
)
pred_probs = await predict(client=self.client,
model=self.get_booster(), data=test_dmatrix,
output_margin=output_margin,
validate_features=validate_features)
return pred_probs
iteration_range = self._get_iteration_range(iteration_range)
if self._can_use_inplace_predict():
predts = await inplace_predict(
client=self.client,
model=self.get_booster(),
data=data,
iteration_range=iteration_range,
predict_type="margin" if output_margin else "value",
missing=self.missing,
base_margin=base_margin,
validate_features=validate_features,
)
if isinstance(predts, dd.DataFrame):
predts = predts.to_dask_array()
else:
test_dmatrix = await DaskDMatrix(
self.client, data=data, base_margin=base_margin, missing=self.missing
)
predts = await predict(
self.client,
model=self.get_booster(),
data=test_dmatrix,
output_margin=output_margin,
validate_features=validate_features,
iteration_range=iteration_range,
)
return predts
def predict(
self,
@@ -1377,26 +1426,56 @@ class DaskScikitLearnBase(XGBModel):
output_margin: bool = False,
ntree_limit: Optional[int] = None,
validate_features: bool = True,
base_margin: Optional[_DaskCollection] = None
base_margin: Optional[_DaskCollection] = None,
iteration_range: Optional[Tuple[int, int]] = None,
) -> Any:
_assert_dask_support()
msg = '`ntree_limit` is not supported on dask, use model slicing instead.'
msg = "`ntree_limit` is not supported on dask, use `iteration_range` instead."
assert ntree_limit is None, msg
return self.client.sync(
self._predict_async,
X,
output_margin=output_margin,
validate_features=validate_features,
base_margin=base_margin
base_margin=base_margin,
iteration_range=iteration_range,
)
async def _apply_async(
self,
X: _DaskCollection,
iteration_range: Optional[Tuple[int, int]] = None,
) -> Any:
iteration_range = self._get_iteration_range(iteration_range)
test_dmatrix = await DaskDMatrix(self.client, data=X, missing=self.missing)
predts = await predict(
self.client,
model=self.get_booster(),
data=test_dmatrix,
pred_leaf=True,
iteration_range=iteration_range,
)
return predts
def apply(
self,
X: _DaskCollection,
ntree_limit: Optional[int] = None,
iteration_range: Optional[Tuple[int, int]] = None,
) -> Any:
_assert_dask_support()
msg = "`ntree_limit` is not supported on dask, use `iteration_range` instead."
assert ntree_limit is None, msg
return self.client.sync(self._apply_async, X, iteration_range=iteration_range)
def __await__(self) -> Awaitable[Any]:
# Generate a coroutine wrapper to make this class awaitable.
async def _() -> Awaitable[Any]:
return self
return self.client.sync(_).__await__()
def __getstate__(self):
def __getstate__(self) -> Dict:
this = self.__dict__.copy()
if "_client" in this.keys():
del this["_client"]
@@ -1404,7 +1483,7 @@ class DaskScikitLearnBase(XGBModel):
@property
def client(self) -> "distributed.Client":
'''The dask client used in this model.'''
"""The dask client used in this model."""
client = _xgb_get_client(self._client)
return client
@@ -1494,7 +1573,7 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
sample_weight_eval_set: Optional[List[_DaskCollection]] = None,
base_margin_eval_set: Optional[List[_DaskCollection]] = None,
feature_weights: Optional[_DaskCollection] = None,
callbacks: Optional[List[TrainingCallback]] = None
callbacks: Optional[List[TrainingCallback]] = None,
) -> "DaskXGBRegressor":
_assert_dask_support()
args = {k: v for k, v in locals().items() if k != "self"}
@@ -1556,9 +1635,7 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
else:
obj = None
model, metric, params = self._configure_fit(
booster=xgb_model,
eval_metric=eval_metric,
params=params
booster=xgb_model, eval_metric=eval_metric, params=params
)
results = await train(
client=self.client,
@@ -1610,18 +1687,19 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
X: _DaskCollection,
validate_features: bool,
output_margin: bool,
base_margin: Optional[_DaskCollection]
base_margin: Optional[_DaskCollection],
iteration_range: Optional[Tuple[int, int]],
) -> _DaskCollection:
test_dmatrix = await DaskDMatrix(
client=self.client, data=X, base_margin=base_margin,
missing=self.missing
if iteration_range is None:
iteration_range = (0, 0)
predts = await super()._predict_async(
data=X,
output_margin=output_margin,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
)
pred_probs = await predict(client=self.client,
model=self.get_booster(),
data=test_dmatrix,
validate_features=validate_features,
output_margin=output_margin)
return _cls_predict_proba(self.objective, pred_probs, da.vstack)
return _cls_predict_proba(self.objective, predts, da.vstack)
# pylint: disable=missing-function-docstring
def predict_proba(
@@ -1630,37 +1708,49 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
ntree_limit: Optional[int] = None,
validate_features: bool = True,
output_margin: bool = False,
base_margin: Optional[_DaskCollection] = None
base_margin: Optional[_DaskCollection] = None,
iteration_range: Optional[Tuple[int, int]] = None,
) -> Any:
_assert_dask_support()
msg = '`ntree_limit` is not supported on dask, use model slicing instead.'
msg = "`ntree_limit` is not supported on dask, use `iteration_range` instead."
assert ntree_limit is None, msg
return self.client.sync(
self._predict_proba_async,
X=X,
validate_features=validate_features,
output_margin=output_margin,
base_margin=base_margin
base_margin=base_margin,
iteration_range=iteration_range,
)
predict_proba.__doc__ = XGBClassifier.predict_proba.__doc__
async def _predict_async(
self, data: _DaskCollection,
output_margin: bool = False,
validate_features: bool = True,
base_margin: Optional[_DaskCollection] = None
self,
data: _DaskCollection,
output_margin: bool,
validate_features: bool,
base_margin: Optional[_DaskCollection],
iteration_range: Optional[Tuple[int, int]],
) -> _DaskCollection:
pred_probs = await super()._predict_async(
data, output_margin, validate_features, base_margin
data, output_margin, validate_features, base_margin, iteration_range
)
if output_margin:
return pred_probs
if self.n_classes_ == 2:
if len(pred_probs.shape) == 1:
preds = (pred_probs > 0.5).astype(int)
else:
preds = da.argmax(pred_probs, axis=1)
assert len(pred_probs.shape) == 2
assert isinstance(pred_probs, da.Array)
# when using da.argmax directly, dask will construct a numpy based return
# array, which runs into error when computing GPU based prediction.
def _argmax(x: Any) -> Any:
return x.argmax(axis=1)
preds = da.map_blocks(_argmax, pred_probs, drop_axis=1)
return preds
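The per-block argmax keeps each block on its original backend (NumPy or CuPy), unlike a bare `da.argmax`, which is what the comment above is working around. A sketch, assuming `pred_probs` is a 2-dim dask array of per-class probabilities:

```python
import dask.array as da

def _argmax(x):
    return x.argmax(axis=1)

labels = da.map_blocks(_argmax, pred_probs, drop_axis=1)
```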
@@ -1770,7 +1860,7 @@ class DaskXGBRanker(DaskScikitLearnBase, XGBRankerMixIn):
callbacks: Optional[List[TrainingCallback]] = None
) -> "DaskXGBRanker":
_assert_dask_support()
args = {k: v for k, v in locals().items() if k != 'self'}
args = {k: v for k, v in locals().items() if k != "self"}
return self.client.sync(self._fit_async, **args)
# FIXME(trivialfis): arguments differ due to additional parameters like group and qid.

python-package/xgboost/sklearn.py

@@ -6,7 +6,8 @@ import warnings
import json
from typing import Union, Optional, List, Dict, Callable, Tuple, Any
import numpy as np
from .core import Booster, DMatrix, XGBoostError, _deprecate_positional_args
from .core import Booster, DMatrix, XGBoostError
from .core import _deprecate_positional_args, _convert_ntree_limit
from .core import Metric
from .training import train
from .data import _is_cudf_df, _is_cudf_ser, _is_cupy_array
@@ -413,8 +414,8 @@ class XGBModel(XGBModelBase):
# Simple optimization to gain speed (inspect is slow)
return self
# this concatenates kwargs into paraemters, enabling `get_params` for
# obtaining parameters from keyword paraemters.
# this concatenates kwargs into parameters, enabling `get_params` for
# obtaining parameters from keyword parameters.
for key, value in params.items():
if hasattr(self, key):
setattr(self, key, value)
@@ -747,26 +748,45 @@ class XGBModel(XGBModelBase):
self._set_evaluation_result(evals_result)
return self
def _can_use_inplace_predict(self) -> bool:
# When predictor is explicitly set, using `inplace_predict` might result into
# error with incompatible data type.
# Inplace predict doesn't handle as many data types as DMatrix, but it's
# sufficient for the dask interface, where the input is simpler.
params = self.get_params()
booster = self.booster
if params.get("predictor", None) is None and (
booster is None or booster == "gbtree"
):
return True
return False
def _get_iteration_range(
self, iteration_range: Optional[Tuple[int, int]]
) -> Tuple[int, int]:
if (iteration_range is None or iteration_range[1] == 0):
# Use best_iteration if defined.
try:
iteration_range = (0, self.best_iteration + 1)
except AttributeError:
iteration_range = (0, 0)
if self.booster == "gblinear":
iteration_range = (0, 0)
return iteration_range
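A hypothetical illustration of the default above (`X`, `y`, `X_valid`, `y_valid` assumed; the private helper is called here only to show the mapping):

```python
import xgboost as xgb

clf = xgb.XGBClassifier(n_estimators=100)
clf.fit(X, y, eval_set=[(X_valid, y_valid)], early_stopping_rounds=5)
# With early stopping, prediction now defaults to the best iteration:
assert clf._get_iteration_range(None) == (0, clf.best_iteration + 1)
```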
def predict(
self,
X,
output_margin=False,
ntree_limit=None,
validate_features=True,
base_margin=None
base_margin=None,
iteration_range=None,
):
"""
Predict with `X`.
.. note:: This function is not thread safe.
For each booster object, predict can only be called from one thread.
If you want to run prediction using multiple thread, call ``xgb.copy()`` to make copies
of model object and then call ``predict()``.
.. code-block:: python
preds = bst.predict(dtest, ntree_limit=num_round)
.. note:: This function is only thread safe for `gbtree`
Parameters
----------
@@ -775,37 +795,40 @@ class XGBModel(XGBModelBase):
output_margin : bool
Whether to output the raw untransformed margin value.
ntree_limit : int
Limit number of trees in the prediction; defaults to best_ntree_limit if
defined (i.e. it has been trained with early stopping), otherwise 0 (use all
trees).
Deprecated, use `iteration_range` instead.
validate_features : bool
When this is True, validate that the Booster's and data's feature_names are identical.
Otherwise, it is assumed that the feature_names are the same.
When this is True, validate that the Booster's and data's feature_names are
identical. Otherwise, it is assumed that the feature_names are the same.
base_margin : array_like
Margin added to prediction.
iteration_range :
Specifies which layer of trees is used in prediction. For example, if a
random forest is trained with 100 rounds, specifying `iteration_range=(10,
20)` means only the forests built during rounds [10, 20) (half-open set) are
used in this prediction.
.. versionadded:: 1.4.0
Returns
-------
prediction : numpy array
"""
# pylint: disable=missing-docstring,invalid-name
test_dmatrix = DMatrix(X, base_margin=base_margin,
missing=self.missing, nthread=self.n_jobs)
# get ntree_limit to use - if none specified, default to
# best_ntree_limit if defined, otherwise 0.
if ntree_limit is None:
try:
ntree_limit = self.best_ntree_limit
except AttributeError:
ntree_limit = 0
iteration_range = _convert_ntree_limit(
self.get_booster(), ntree_limit, iteration_range
)
iteration_range = self._get_iteration_range(iteration_range)
test = DMatrix(
X, base_margin=base_margin, missing=self.missing, nthread=self.n_jobs
)
return self.get_booster().predict(
test_dmatrix,
data=test,
iteration_range=iteration_range,
output_margin=output_margin,
ntree_limit=ntree_limit,
validate_features=validate_features
validate_features=validate_features,
)
def apply(self, X, ntree_limit=0):
def apply(
self, X, ntree_limit: int = 0, iteration_range: Optional[Tuple[int, int]] = None
) -> np.ndarray:
"""Return the predicted leaf every tree for each sample.
Parameters
@@ -823,10 +846,16 @@ class XGBModel(XGBModelBase):
leaf x ends up in. Leaves are numbered within
``[0; 2**(self.max_depth+1))``, possibly with gaps in the numbering.
"""
iteration_range = _convert_ntree_limit(
self.get_booster(), ntree_limit, iteration_range
)
iteration_range = self._get_iteration_range(iteration_range)
test_dmatrix = DMatrix(X, missing=self.missing, nthread=self.n_jobs)
return self.get_booster().predict(test_dmatrix,
pred_leaf=True,
ntree_limit=ntree_limit)
return self.get_booster().predict(
test_dmatrix,
pred_leaf=True,
iteration_range=iteration_range
)
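A sketch of the breaking change from the commit message, with `clf` as in the hypothetical fit above: for a model trained with early stopping, `apply()` now returns leaves only for rounds [0, best_iteration + 1) instead of all trees (shapes illustrative, `num_parallel_tree == 1` assumed).

```python
leaves = clf.apply(X)  # shape (n_samples, best_iteration + 1)
leaves_all = clf.apply(X, iteration_range=(0, clf.n_estimators))  # all rounds
```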
def evals_result(self):
"""Return the evaluation results.
@@ -945,8 +974,7 @@ class XGBModel(XGBModelBase):
'Coefficients are not defined for Booster type {}'
.format(self.booster))
b = self.get_booster()
coef = np.array(json.loads(
b.get_dump(dump_format='json')[0])['weight'])
coef = np.array(json.loads(b.get_dump(dump_format='json')[0])['weight'])
# Logic for multiclass classification
n_classes = getattr(self, 'n_classes_', None)
if n_classes is not None:
@@ -1157,14 +1185,16 @@ class XGBClassifier(XGBModel, XGBClassifierBase):
output_margin=False,
ntree_limit=None,
validate_features=True,
base_margin=None
base_margin=None,
iteration_range: Optional[Tuple[int, int]] = None,
):
class_probs = super().predict(
X=X,
output_margin=output_margin,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin
base_margin=base_margin,
iteration_range=iteration_range,
)
if output_margin:
# If output_margin is active, simply return the scores
@@ -1180,29 +1210,34 @@ class XGBClassifier(XGBModel, XGBClassifierBase):
return self._le.inverse_transform(column_indexes)
return column_indexes
def predict_proba(self, X, ntree_limit=None, validate_features=False,
base_margin=None):
def predict_proba(
self,
X,
ntree_limit=None,
validate_features=False,
base_margin=None,
iteration_range: Optional[Tuple[int, int]] = None,
) -> np.ndarray:
""" Predict the probability of each `X` example being of a given class.
.. note:: This function is not thread safe
For each booster object, predict can only be called from one
thread. If you want to run prediction using multiple thread, call
``xgb.copy()`` to make copies of model object and then call predict
.. note:: This function is only thread safe for `gbtree`
Parameters
----------
X : array_like
Feature matrix.
ntree_limit : int
Limit number of trees in the prediction; defaults to best_ntree_limit if
defined (i.e. it has been trained with early stopping), otherwise 0 (use all
trees).
Deprecated, use `iteration_range` instead.
validate_features : bool
When this is True, validate that the Booster's and data's feature_names are
identical. Otherwise, it is assumed that the feature_names are the same.
base_margin : array_like
Margin added to prediction.
iteration_range :
Specifies which layer of trees is used in prediction. For example, if a
random forest is trained with 100 rounds, specifying `iteration_range=(10,
20)` means only the forests built during rounds [10, 20) (half-open set) are
used in this prediction.
Returns
-------
@@ -1215,7 +1250,8 @@ class XGBClassifier(XGBModel, XGBClassifierBase):
output_margin=False,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin
base_margin=base_margin,
iteration_range=iteration_range
)
return _cls_predict_proba(self.objective, class_probs, np.vstack)

python-package/xgboost/training.py

@@ -4,9 +4,8 @@
"""Training Library containing training routines."""
import warnings
import copy
import json
import numpy as np
from .core import Booster, XGBoostError
from .core import Booster, XGBoostError, _get_booster_layer_trees
from .compat import (SKLEARN_INSTALLED, XGBStratifiedKFold)
from . import callback
@@ -91,24 +90,7 @@ def _train_internal(params, dtrain,
# These should be moved into callback functions `after_training`, but until old
# callbacks are removed, the train function is the only place for setting the
# attributes.
config = json.loads(bst.save_config())
booster = config['learner']['gradient_booster']['name']
if booster == 'gblinear':
num_parallel_tree = 0
elif booster == 'dart':
num_parallel_tree = int(
config['learner']['gradient_booster']['gbtree']['gbtree_train_param'][
'num_parallel_tree'
]
)
elif booster == 'gbtree':
num_parallel_tree = int(
config['learner']['gradient_booster']['gbtree_train_param'][
'num_parallel_tree']
)
else:
raise ValueError(f'Unknown booster: {booster}')
num_parallel_tree, _ = _get_booster_layer_trees(bst)
if bst.attr('best_score') is not None:
bst.best_score = float(bst.attr('best_score'))
bst.best_iteration = int(bst.attr('best_iteration'))