Jiaming Yuan fcfeb4959c
Deprecate positional arguments. (#6365)
Deprecate positional arguments in following functions:

- `__init__` for all classes in sklearn module.
- `fit` method for all classes in sklearn module.
- dask interface.
- `set_info` for `DMatrix` class.

Refactor the evaluation matrices handling.
2020-11-13 11:10:30 +08:00

1274 lines
47 KiB
Python

# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=missing-class-docstring, invalid-name
# pylint: disable=too-many-lines
"""Dask extensions for distributed training. See
https://xgboost.readthedocs.io/en/latest/tutorials/dask.html for simple
tutorial. Also xgboost/demo/dask for some examples.
There are two sets of APIs in this module, one is the functional API including
``train`` and ``predict`` methods. Another is stateful Scikit-Learner wrapper
inherited from single-node Scikit-Learn interface.
The implementation is heavily influenced by dask_xgboost:
https://github.com/dask/dask-xgboost
"""
import platform
import logging
from collections import defaultdict
from collections.abc import Sequence
from threading import Thread
from typing import List
import numpy
from . import rabit
from .compat import LazyLoader
from .compat import sparse, scipy_sparse
from .compat import PANDAS_INSTALLED, DataFrame, Series, pandas_concat
from .compat import CUDF_concat
from .compat import lazy_isinstance
from .core import DMatrix, DeviceQuantileDMatrix, Booster, _expect, DataIter
from .core import _deprecate_positional_args
from .training import train as worker_train
from .tracker import RabitTracker
from .sklearn import XGBModel, XGBRegressorBase, XGBClassifierBase
from .sklearn import xgboost_model_doc
dd = LazyLoader('dd', globals(), 'dask.dataframe')
da = LazyLoader('da', globals(), 'dask.array')
dask = LazyLoader('dask', globals(), 'dask')
distributed = LazyLoader('distributed', globals(), 'dask.distributed')
# Current status is considered as initial support, many features are
# not properly supported yet.
#
# TODOs:
# - Callback.
# - Label encoding.
# - CV
# - Ranking
#
# Note for developers:
# As of writing asyncio is still a new feature of Python and in depth
# documentation is rare. Best examples of various asyncio tricks are in dask
# (luckily). Classes like Client, Worker are awaitable. Some general rules
# for the implementation here:
# - Synchronous world is different from asynchronous one, and they don't
# mix well.
# - Write everything with async, then use distributed Client sync function
# to do the switch.
LOGGER = logging.getLogger('[xgboost.dask]')
def _start_tracker(n_workers):
"""Start Rabit tracker """
env = {'DMLC_NUM_WORKER': n_workers}
import socket
host = socket.gethostbyname(socket.gethostname())
rabit_context = RabitTracker(hostIP=host, nslave=n_workers)
env.update(rabit_context.slave_envs())
rabit_context.start(n_workers)
thread = Thread(target=rabit_context.join)
thread.daemon = True
thread.start()
return env
def _assert_dask_support():
try:
import dask # pylint: disable=W0621,W0611
except ImportError as e:
raise ImportError(
'Dask needs to be installed in order to use this module') from e
if platform.system() == 'Windows':
msg = 'Windows is not officially supported for dask/xgboost,'
msg += ' contribution are welcomed.'
LOGGER.warning(msg)
class RabitContext:
'''A context controling rabit initialization and finalization.'''
def __init__(self, args):
self.args = args
worker = distributed.get_worker()
self.args.append(
('DMLC_TASK_ID=[xgboost.dask]:' + str(worker.address)).encode())
def __enter__(self):
rabit.init(self.args)
LOGGER.debug('-------------- rabit say hello ------------------')
def __exit__(self, *args):
rabit.finalize()
LOGGER.debug('--------------- rabit say bye ------------------')
def concat(value): # pylint: disable=too-many-return-statements
'''To be replaced with dask builtin.'''
if isinstance(value[0], numpy.ndarray):
return numpy.concatenate(value, axis=0)
if scipy_sparse and isinstance(value[0], scipy_sparse.spmatrix):
return scipy_sparse.vstack(value, format='csr')
if sparse and isinstance(value[0], sparse.SparseArray):
return sparse.concatenate(value, axis=0)
if PANDAS_INSTALLED and isinstance(value[0], (DataFrame, Series)):
return pandas_concat(value, axis=0)
if lazy_isinstance(value[0], 'cudf.core.dataframe', 'DataFrame') or \
lazy_isinstance(value[0], 'cudf.core.series', 'Series'):
return CUDF_concat(value, axis=0)
if lazy_isinstance(value[0], 'cupy.core.core', 'ndarray'):
import cupy # pylint: disable=import-error
# pylint: disable=c-extension-no-member,no-member
d = cupy.cuda.runtime.getDevice()
for v in value:
d_v = v.device.id
assert d_v == d, 'Concatenating arrays on different devices.'
return cupy.concatenate(value, axis=0)
return dd.multi.concat(list(value), axis=0)
def _xgb_get_client(client):
'''Simple wrapper around testing None.'''
if not isinstance(client, (type(distributed.get_client()), type(None))):
raise TypeError(
_expect([type(distributed.get_client()), type(None)], type(client)))
ret = distributed.get_client() if client is None else client
return ret
# From the implementation point of view, DaskDMatrix complicates a lots of
# things. A large portion of the code base is about syncing and extracting
# stuffs from DaskDMatrix. But having an independent data structure gives us a
# chance to perform some specialized optimizations, like building histogram
# index directly.
class DaskDMatrix:
# pylint: disable=missing-docstring, too-many-instance-attributes
'''DMatrix holding on references to Dask DataFrame or Dask Array. Constructing
a `DaskDMatrix` forces all lazy computation to be carried out. Wait for
the input data explicitly if you want to see actual computation of
constructing `DaskDMatrix`.
.. note::
DaskDMatrix does not repartition or move data between workers. It's
the caller's responsibility to balance the data.
.. versionadded:: 1.0.0
Parameters
----------
client: dask.distributed.Client
Specify the dask client used for training. Use default client
returned from dask if it's set to None.
data : dask.array.Array/dask.dataframe.DataFrame
data source of DMatrix.
label: dask.array.Array/dask.dataframe.DataFrame
label used for trainin.
missing : float, optional
Value in the input data (e.g. `numpy.ndarray`) which needs
to be present as a missing value. If None, defaults to np.nan.
weight : dask.array.Array/dask.dataframe.DataFrame
Weight for each instance.
base_margin : dask.array.Array/dask.dataframe.DataFrame
Global bias for each instance.
label_lower_bound : dask.array.Array/dask.dataframe.DataFrame
Upper bound for survival training.
label_upper_bound : dask.array.Array/dask.dataframe.DataFrame
Lower bound for survival training.
feature_names : list, optional
Set names for features.
feature_types : list, optional
Set types for features
'''
def __init__(self,
client,
data,
label=None,
missing=None,
weight=None,
base_margin=None,
label_lower_bound=None,
label_upper_bound=None,
feature_names=None,
feature_types=None):
_assert_dask_support()
client: distributed.Client = _xgb_get_client(client)
self.feature_names = feature_names
self.feature_types = feature_types
self.missing = missing
if len(data.shape) != 2:
raise ValueError(
'Expecting 2 dimensional input, got: {shape}'.format(
shape=data.shape))
if not isinstance(data, (dd.DataFrame, da.Array)):
raise TypeError(_expect((dd.DataFrame, da.Array), type(data)))
if not isinstance(label, (dd.DataFrame, da.Array, dd.Series,
type(None))):
raise TypeError(
_expect((dd.DataFrame, da.Array, dd.Series), type(label)))
self.worker_map = None
self.is_quantile = False
self._init = client.sync(self.map_local_data,
client, data, label=label, weights=weight,
base_margin=base_margin,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)
def __await__(self):
return self._init.__await__()
async def map_local_data(self, client, data, label=None, weights=None,
base_margin=None,
label_lower_bound=None, label_upper_bound=None):
'''Obtain references to local data.'''
def inconsistent(left, left_name, right, right_name):
msg = 'Partitions between {a_name} and {b_name} are not ' \
'consistent: {a_len} != {b_len}. ' \
'Please try to repartition/rechunk your data.'.format(
a_name=left_name, b_name=right_name, a_len=len(left),
b_len=len(right)
)
return msg
def check_columns(parts):
# x is required to be 2 dim in __init__
assert parts.ndim == 1 or parts.shape[1], 'Data should be' \
' partitioned by row. To avoid this specify the number' \
' of columns for your dask Array explicitly. e.g.' \
' chunks=(partition_size, X.shape[1])'
data = data.persist()
for meta in [label, weights, base_margin, label_lower_bound,
label_upper_bound]:
if meta is not None:
meta = meta.persist()
# Breaking data into partitions, a trick borrowed from dask_xgboost.
# `to_delayed` downgrades high-level objects into numpy or pandas
# equivalents.
X_parts = data.to_delayed()
if isinstance(X_parts, numpy.ndarray):
check_columns(X_parts)
X_parts = X_parts.flatten().tolist()
def flatten_meta(meta):
if meta is not None:
meta_parts = meta.to_delayed()
if isinstance(meta_parts, numpy.ndarray):
check_columns(meta_parts)
meta_parts = meta_parts.flatten().tolist()
return meta_parts
return None
y_parts = flatten_meta(label)
w_parts = flatten_meta(weights)
margin_parts = flatten_meta(base_margin)
ll_parts = flatten_meta(label_lower_bound)
lu_parts = flatten_meta(label_upper_bound)
parts = [X_parts]
meta_names = []
def append_meta(m_parts, name: str):
if m_parts is not None:
assert len(X_parts) == len(
m_parts), inconsistent(X_parts, 'X', m_parts, name)
parts.append(m_parts)
meta_names.append(name)
append_meta(y_parts, 'labels')
append_meta(w_parts, 'weights')
append_meta(margin_parts, 'base_margin')
append_meta(ll_parts, 'label_lower_bound')
append_meta(lu_parts, 'label_upper_bound')
# At this point, `parts` looks like:
# [(x0, x1, ..), (y0, y1, ..), ..] in delayed form
# delay the zipped result
parts = list(map(dask.delayed, zip(*parts)))
# At this point, the mental model should look like:
# [(x0, y0, ..), (x1, y1, ..), ..] in delayed form
parts = client.compute(parts)
await distributed.wait(parts) # async wait for parts to be computed
for part in parts:
assert part.status == 'finished'
# Preserving the partition order for prediction.
self.partition_order = {}
for i, part in enumerate(parts):
self.partition_order[part.key] = i
key_to_partition = {part.key: part for part in parts}
who_has = await client.scheduler.who_has(keys=[part.key for part in parts])
worker_map = defaultdict(list)
for key, workers in who_has.items():
worker_map[next(iter(workers))].append(key_to_partition[key])
self.worker_map = worker_map
self.meta_names = meta_names
return self
def create_fn_args(self, worker_addr: str):
'''Create a dictionary of objects that can be pickled for function
arguments.
'''
return {'feature_names': self.feature_names,
'feature_types': self.feature_types,
'meta_names': self.meta_names,
'missing': self.missing,
'parts': self.worker_map.get(worker_addr, None),
'is_quantile': self.is_quantile}
def _get_worker_parts_ordered(meta_names, list_of_keys, list_of_parts, partition_order):
# List of partitions like: [(x3, y3, w3, m3, ..), ..], order is not preserved.
assert isinstance(list_of_parts, list)
list_of_parts_value = list_of_parts
result = []
for i, _ in enumerate(list_of_parts):
data = list_of_parts_value[i][0]
labels = None
weights = None
base_margin = None
label_lower_bound = None
label_upper_bound = None
# Iterate through all possible meta info, brings small overhead as in xgboost
# there are constant number of meta info available.
for j, blob in enumerate(list_of_parts_value[i][1:]):
if meta_names[j] == 'labels':
labels = blob
elif meta_names[j] == 'weights':
weights = blob
elif meta_names[j] == 'base_margin':
base_margin = blob
elif meta_names[j] == 'label_lower_bound':
label_lower_bound = blob
elif meta_names[j] == 'label_upper_bound':
label_upper_bound = blob
else:
raise ValueError('Unknown metainfo:', meta_names[j])
if partition_order:
result.append((data, labels, weights, base_margin, label_lower_bound,
label_upper_bound, partition_order[list_of_keys[i]]))
else:
result.append((data, labels, weights, base_margin, label_lower_bound,
label_upper_bound))
return result
def _unzip(list_of_parts):
return list(zip(*list_of_parts))
def _get_worker_parts(list_of_parts: List[tuple], meta_names):
partitions = _get_worker_parts_ordered(meta_names, None, list_of_parts, None)
partitions = _unzip(partitions)
return partitions
class DaskPartitionIter(DataIter): # pylint: disable=R0902
'''A data iterator for `DaskDeviceQuantileDMatrix`.
'''
def __init__(self, data, label=None, weight=None, base_margin=None,
label_lower_bound=None, label_upper_bound=None,
feature_names=None, feature_types=None):
self._data = data
self._labels = label
self._weights = weight
self._base_margin = base_margin
self._label_lower_bound = label_lower_bound
self._label_upper_bound = label_upper_bound
self._feature_names = feature_names
self._feature_types = feature_types
assert isinstance(self._data, Sequence)
types = (Sequence, type(None))
assert isinstance(self._labels, types)
assert isinstance(self._weights, types)
assert isinstance(self._base_margin, types)
assert isinstance(self._label_lower_bound, types)
assert isinstance(self._label_upper_bound, types)
self._iter = 0 # set iterator to 0
super().__init__()
def data(self):
'''Utility function for obtaining current batch of data.'''
return self._data[self._iter]
def labels(self):
'''Utility function for obtaining current batch of label.'''
if self._labels is not None:
return self._labels[self._iter]
return None
def weights(self):
'''Utility function for obtaining current batch of label.'''
if self._weights is not None:
return self._weights[self._iter]
return None
def base_margins(self):
'''Utility function for obtaining current batch of base_margin.'''
if self._base_margin is not None:
return self._base_margin[self._iter]
return None
def label_lower_bounds(self):
'''Utility function for obtaining current batch of label_lower_bound.
'''
if self._label_lower_bound is not None:
return self._label_lower_bound[self._iter]
return None
def label_upper_bounds(self):
'''Utility function for obtaining current batch of label_upper_bound.
'''
if self._label_upper_bound is not None:
return self._label_upper_bound[self._iter]
return None
def reset(self):
'''Reset the iterator'''
self._iter = 0
def next(self, input_data):
'''Yield next batch of data'''
if self._iter == len(self._data):
# Return 0 when there's no more batch.
return 0
if self._feature_names:
feature_names = self._feature_names
else:
if hasattr(self.data(), 'columns'):
feature_names = self.data().columns.format()
else:
feature_names = None
input_data(data=self.data(), label=self.labels(),
weight=self.weights(), group=None,
label_lower_bound=self.label_lower_bounds(),
label_upper_bound=self.label_upper_bounds(),
feature_names=feature_names,
feature_types=self._feature_types)
self._iter += 1
return 1
class DaskDeviceQuantileDMatrix(DaskDMatrix):
'''Specialized data type for `gpu_hist` tree method. This class is
used to reduce the memory usage by eliminating data copies.
Internally the data is merged by weighted GK sketching. So the
number of partitions from dask may affect training accuracy as GK
generates error for each merge.
.. versionadded:: 1.2.0
Parameters
----------
max_bin: Number of bins for histogram construction.
'''
def __init__(self, client,
data,
label=None,
missing=None,
weight=None,
base_margin=None,
label_lower_bound=None,
label_upper_bound=None,
feature_names=None,
feature_types=None,
max_bin=256):
super().__init__(client=client, data=data, label=label,
missing=missing,
weight=weight, base_margin=base_margin,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound,
feature_names=feature_names,
feature_types=feature_types)
self.max_bin = max_bin
self.is_quantile = True
def create_fn_args(self, worker_addr: str):
args = super().create_fn_args(worker_addr)
args['max_bin'] = self.max_bin
return args
def _create_device_quantile_dmatrix(feature_names, feature_types,
meta_names, missing, parts,
max_bin):
worker = distributed.get_worker()
if parts is None:
msg = 'worker {address} has an empty DMatrix. '.format(
address=worker.address)
LOGGER.warning(msg)
import cupy # pylint: disable=import-error
d = DeviceQuantileDMatrix(cupy.zeros((0, 0)),
feature_names=feature_names,
feature_types=feature_types,
max_bin=max_bin)
return d
(data, labels, weights, base_margin,
label_lower_bound, label_upper_bound) = _get_worker_parts(
parts, meta_names)
it = DaskPartitionIter(data=data, label=labels, weight=weights,
base_margin=base_margin,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)
dmatrix = DeviceQuantileDMatrix(it,
missing=missing,
feature_names=feature_names,
feature_types=feature_types,
nthread=worker.nthreads,
max_bin=max_bin)
return dmatrix
def _create_dmatrix(feature_names, feature_types, meta_names, missing, parts):
'''Get data that local to worker from DaskDMatrix.
Returns
-------
A DMatrix object.
'''
worker = distributed.get_worker()
list_of_parts = parts
if list_of_parts is None:
msg = 'worker {address} has an empty DMatrix. '.format(address=worker.address)
LOGGER.warning(msg)
d = DMatrix(numpy.empty((0, 0)),
feature_names=feature_names,
feature_types=feature_types)
return d
def concat_or_none(data):
if any([part is None for part in data]):
return None
return concat(data)
(data, labels, weights, base_margin,
label_lower_bound, label_upper_bound) = _get_worker_parts(list_of_parts, meta_names)
labels = concat_or_none(labels)
weights = concat_or_none(weights)
base_margin = concat_or_none(base_margin)
label_lower_bound = concat_or_none(label_lower_bound)
label_upper_bound = concat_or_none(label_upper_bound)
data = concat(data)
dmatrix = DMatrix(data,
labels,
missing=missing,
feature_names=feature_names,
feature_types=feature_types,
nthread=worker.nthreads)
dmatrix.set_info(base_margin=base_margin, weight=weights,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)
return dmatrix
def _dmatrix_from_list_of_parts(is_quantile, **kwargs):
if is_quantile:
return _create_device_quantile_dmatrix(**kwargs)
return _create_dmatrix(**kwargs)
async def _get_rabit_args(n_workers: int, client):
'''Get rabit context arguments from data distribution in DaskDMatrix.'''
env = await client.run_on_scheduler(_start_tracker, n_workers)
rabit_args = [('%s=%s' % item).encode() for item in env.items()]
return rabit_args
# train and predict methods are supposed to be "functional", which meets the
# dask paradigm. But as a side effect, the `evals_result` in single-node API
# is no longer supported since it mutates the input parameter, and it's not
# intuitive to sync the mutation result. Therefore, a dictionary containing
# evaluation history is instead returned.
def _get_workers_from_data(dtrain: DaskDMatrix, evals=()):
X_worker_map = set(dtrain.worker_map.keys())
if evals:
for e in evals:
assert len(e) == 2
assert isinstance(e[0], DaskDMatrix) and isinstance(e[1], str)
worker_map = set(e[0].worker_map.keys())
X_worker_map.union(worker_map)
return X_worker_map
async def _train_async(client,
params,
dtrain: DaskDMatrix,
*args,
evals=(),
early_stopping_rounds=None,
**kwargs):
if 'evals_result' in kwargs.keys():
raise ValueError(
'evals_result is not supported in dask interface.',
'The evaluation history is returned as result of training.')
workers = list(_get_workers_from_data(dtrain, evals))
_rabit_args = await _get_rabit_args(len(workers), client)
def dispatched_train(worker_addr, rabit_args, dtrain_ref, dtrain_idt, evals_ref):
'''Perform training on a single worker. A local function prevents pickling.
'''
LOGGER.info('Training on %s', str(worker_addr))
worker = distributed.get_worker()
with RabitContext(rabit_args):
local_dtrain = _dmatrix_from_list_of_parts(**dtrain_ref)
local_evals = []
if evals_ref:
for ref, name, idt in evals_ref:
if idt == dtrain_idt:
local_evals.append((local_dtrain, name))
continue
local_evals.append((_dmatrix_from_list_of_parts(**ref), name))
local_history = {}
local_param = params.copy() # just to be consistent
msg = 'Overriding `nthreads` defined in dask worker.'
override = ['nthread', 'n_jobs']
for p in override:
val = local_param.get(p, None)
if val is not None and val != worker.nthreads:
LOGGER.info(msg)
else:
local_param[p] = worker.nthreads
bst = worker_train(params=local_param,
dtrain=local_dtrain,
*args,
evals_result=local_history,
evals=local_evals,
early_stopping_rounds=early_stopping_rounds,
**kwargs)
ret = {'booster': bst, 'history': local_history}
if local_dtrain.num_row() == 0:
ret = None
return ret
# Note for function purity:
# XGBoost is deterministic in most of the cases, which means train function is
# supposed to be idempotent. One known exception is gblinear with shotgun updater.
# We haven't been able to do a full verification so here we keep pure to be False.
futures = []
for i, worker_addr in enumerate(workers):
if evals:
evals_per_worker = [(e.create_fn_args(worker_addr), name, id(e))
for e, name in evals]
else:
evals_per_worker = []
f = client.submit(dispatched_train,
worker_addr,
_rabit_args,
dtrain.create_fn_args(workers[i]),
id(dtrain),
evals_per_worker,
pure=False)
futures.append(f)
results = await client.gather(futures)
return list(filter(lambda ret: ret is not None, results))[0]
def train(client, params, dtrain, *args, evals=(), early_stopping_rounds=None,
**kwargs):
'''Train XGBoost model.
.. versionadded:: 1.0.0
Parameters
----------
client: dask.distributed.Client
Specify the dask client used for training. Use default client
returned from dask if it's set to None.
\\*\\*kwargs:
Other parameters are the same as `xgboost.train` except for
`evals_result`, which is returned as part of function return value
instead of argument.
Returns
-------
results: dict
A dictionary containing trained booster and evaluation history.
`history` field is the same as `eval_result` from `xgboost.train`.
.. code-block:: python
{'booster': xgboost.Booster,
'history': {'train': {'logloss': ['0.48253', '0.35953']},
'eval': {'logloss': ['0.480385', '0.357756']}}}
'''
_assert_dask_support()
client = _xgb_get_client(client)
return client.sync(
_train_async, client, params, dtrain=dtrain, *args, evals=evals,
early_stopping_rounds=early_stopping_rounds, **kwargs)
async def _direct_predict_impl(client, data, predict_fn):
if isinstance(data, da.Array):
predictions = await client.submit(
da.map_blocks,
predict_fn, data, False, drop_axis=1,
dtype=numpy.float32
).result()
return predictions
if isinstance(data, dd.DataFrame):
predictions = await client.submit(
dd.map_partitions,
predict_fn, data, True,
meta=dd.utils.make_meta({'prediction': 'f4'})
).result()
return predictions.iloc[:, 0]
raise TypeError('data of type: ' + str(type(data)) +
' is not supported by direct prediction')
# pylint: disable=too-many-statements
async def _predict_async(client, model, data, missing=numpy.nan, **kwargs):
if isinstance(model, Booster):
booster = model
elif isinstance(model, dict):
booster = model['booster']
else:
raise TypeError(_expect([Booster, dict], type(model)))
if not isinstance(data, (DaskDMatrix, da.Array, dd.DataFrame)):
raise TypeError(_expect([DaskDMatrix, da.Array, dd.DataFrame],
type(data)))
def mapped_predict(partition, is_df):
worker = distributed.get_worker()
booster.set_param({'nthread': worker.nthreads})
m = DMatrix(partition, missing=missing, nthread=worker.nthreads)
predt = booster.predict(m, validate_features=False, **kwargs)
if is_df:
if lazy_isinstance(partition, 'cudf', 'core.dataframe.DataFrame'):
import cudf # pylint: disable=import-error
predt = cudf.DataFrame(predt, columns=['prediction'])
else:
predt = DataFrame(predt, columns=['prediction'])
return predt
# Predict on dask collection directly.
if isinstance(data, (da.Array, dd.DataFrame)):
return await _direct_predict_impl(client, data, mapped_predict)
# Prediction on dask DMatrix.
worker_map = data.worker_map
partition_order = data.partition_order
feature_names = data.feature_names
feature_types = data.feature_types
missing = data.missing
meta_names = data.meta_names
def dispatched_predict(worker_id, list_of_keys, list_of_parts):
'''Perform prediction on each worker.'''
LOGGER.info('Predicting on %d', worker_id)
c = distributed.get_client()
list_of_keys = c.compute(list_of_keys).result()
worker = distributed.get_worker()
list_of_parts = _get_worker_parts_ordered(
meta_names, list_of_keys, list_of_parts, partition_order)
predictions = []
booster.set_param({'nthread': worker.nthreads})
for parts in list_of_parts:
(data, _, _, base_margin, _, _, order) = parts
local_part = DMatrix(
data,
base_margin=base_margin,
feature_names=feature_names,
feature_types=feature_types,
missing=missing,
nthread=worker.nthreads
)
predt = booster.predict(
data=local_part,
validate_features=local_part.num_row() != 0,
**kwargs)
columns = 1 if len(predt.shape) == 1 else predt.shape[1]
ret = ((dask.delayed(predt), columns), order)
predictions.append(ret)
return predictions
def dispatched_get_shape(worker_id, list_of_keys, list_of_parts):
'''Get shape of data in each worker.'''
LOGGER.info('Get shape on %d', worker_id)
c = distributed.get_client()
list_of_keys = c.compute(list_of_keys).result()
list_of_parts = _get_worker_parts_ordered(
meta_names,
list_of_keys,
list_of_parts,
partition_order,
)
shapes = []
for parts in list_of_parts:
(data, _, _, _, _, _, order) = parts
shapes.append((data.shape, order))
return shapes
async def map_function(func):
'''Run function for each part of the data.'''
futures = []
workers_address = list(worker_map.keys())
for wid, worker_addr in enumerate(workers_address):
worker_addr = workers_address[wid]
list_of_parts = worker_map[worker_addr]
list_of_keys = [part.key for part in list_of_parts]
f = await client.submit(func, worker_id=wid,
list_of_keys=dask.delayed(list_of_keys),
list_of_parts=list_of_parts,
pure=False, workers=[worker_addr])
futures.append(f)
# Get delayed objects
results = await client.gather(futures)
# flatten into 1 dim list
results = [t for list_per_worker in results for t in list_per_worker]
# sort by order, l[0] is the delayed object, l[1] is its order
results = sorted(results, key=lambda l: l[1])
results = [predt for predt, order in results] # remove order
return results
results = await map_function(dispatched_predict)
shapes = await map_function(dispatched_get_shape)
# Constructing a dask array from list of numpy arrays
# See https://docs.dask.org/en/latest/array-creation.html
arrays = []
for i, shape in enumerate(shapes):
arrays.append(da.from_delayed(
results[i][0], shape=(shape[0],)
if results[i][1] == 1 else (shape[0], results[i][1]),
dtype=numpy.float32))
predictions = await da.concatenate(arrays, axis=0)
return predictions
def predict(client, model, data, missing=numpy.nan, **kwargs):
'''Run prediction with a trained booster.
.. note::
Only default prediction mode is supported right now.
.. versionadded:: 1.0.0
Parameters
----------
client: dask.distributed.Client
Specify the dask client used for training. Use default client
returned from dask if it's set to None.
model: A Booster or a dictionary returned by `xgboost.dask.train`.
The trained model.
data: DaskDMatrix/dask.dataframe.DataFrame/dask.array.Array
Input data used for prediction. When input is a dataframe object,
prediction output is a series.
missing: float
Used when input data is not DaskDMatrix. Specify the value
considered as missing.
Returns
-------
prediction: dask.array.Array/dask.dataframe.Series
'''
_assert_dask_support()
client = _xgb_get_client(client)
return client.sync(_predict_async, client, model, data,
missing=missing, **kwargs)
async def _inplace_predict_async(client, model, data,
iteration_range=(0, 0),
predict_type='value',
missing=numpy.nan):
client = _xgb_get_client(client)
if isinstance(model, Booster):
booster = model
elif isinstance(model, dict):
booster = model['booster']
else:
raise TypeError(_expect([Booster, dict], type(model)))
if not isinstance(data, (da.Array, dd.DataFrame)):
raise TypeError(_expect([da.Array, dd.DataFrame], type(data)))
def mapped_predict(data, is_df):
worker = distributed.get_worker()
booster.set_param({'nthread': worker.nthreads})
prediction = booster.inplace_predict(
data,
iteration_range=iteration_range,
predict_type=predict_type,
missing=missing)
if is_df:
if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
import cudf # pylint: disable=import-error
prediction = cudf.DataFrame({'prediction': prediction},
dtype=numpy.float32)
else:
# If it's from pandas, the partition is a numpy array
prediction = DataFrame(prediction, columns=['prediction'],
dtype=numpy.float32)
return prediction
return await _direct_predict_impl(client, data, mapped_predict)
def inplace_predict(client, model, data,
iteration_range=(0, 0),
predict_type='value',
missing=numpy.nan):
'''Inplace prediction.
.. versionadded:: 1.1.0
Parameters
----------
client: dask.distributed.Client
Specify the dask client used for training. Use default client
returned from dask if it's set to None.
model: Booster/dict
The trained model.
iteration_range: tuple
Specify the range of trees used for prediction.
predict_type: str
* 'value': Normal prediction result.
* 'margin': Output the raw untransformed margin value.
missing: float
Value in the input data which needs to be present as a missing
value. If None, defaults to np.nan.
Returns
-------
prediction: dask.array.Array
'''
_assert_dask_support()
client = _xgb_get_client(client)
return client.sync(_inplace_predict_async, client, model=model, data=data,
iteration_range=iteration_range,
predict_type=predict_type,
missing=missing)
async def _evaluation_matrices(client, validation_set, sample_weight, missing):
'''
Parameters
----------
validation_set: list of tuples
Each tuple contains a validation dataset including input X and label y.
E.g.:
.. code-block:: python
[(X_0, y_0), (X_1, y_1), ... ]
sample_weights: list of arrays
The weight vector for validation data.
Returns
-------
evals: list of validation DMatrix
'''
evals = []
if validation_set is not None:
assert isinstance(validation_set, list)
for i, e in enumerate(validation_set):
w = (sample_weight[i] if sample_weight is not None else None)
dmat = await DaskDMatrix(client=client, data=e[0], label=e[1],
weight=w, missing=missing)
evals.append((dmat, 'validation_{}'.format(i)))
else:
evals = None
return evals
class DaskScikitLearnBase(XGBModel):
'''Base class for implementing scikit-learn interface with Dask'''
_client = None
# pylint: disable=arguments-differ
@_deprecate_positional_args
def fit(self, X, y, *,
sample_weight=None,
base_margin=None,
eval_set=None,
sample_weight_eval_set=None,
early_stopping_rounds=None,
verbose=True):
'''Fit the regressor.
Parameters
----------
X : array_like
Feature matrix
y : array_like
Labels
sample_weight : array_like
instance weights
eval_set : list, optional
A list of (X, y) tuple pairs to use as validation sets, for which
metrics will be computed.
Validation metrics will help us track the performance of the model.
sample_weight_eval_set : list, optional
A list of the form [L_1, L_2, ..., L_n], where each L_i is a list
of group weights on the i-th validation set.
early_stopping_rounds : int
Activates early stopping.
verbose : bool
If `verbose` and an evaluation set is used, writes the evaluation
metric measured on the validation set to stderr.'''
raise NotImplementedError
def predict(self, data): # pylint: disable=arguments-differ
'''Predict with `data`.
Parameters
----------
data: data that can be used to construct a DaskDMatrix
Returns
-------
prediction : dask.array.Array'''
raise NotImplementedError
def __await__(self):
# Generate a coroutine wrapper to make this class awaitable.
async def _():
return self
return self.client.sync(_).__await__()
@property
def client(self):
'''The dask client used in this model.'''
client = _xgb_get_client(self._client)
return client
@client.setter
def client(self, clt):
self._client = clt
@xgboost_model_doc("""Implementation of the Scikit-Learn API for XGBoost.""",
['estimators', 'model'])
class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
# pylint: disable=missing-class-docstring
async def _fit_async(self, X, y, sample_weight, base_margin, eval_set,
sample_weight_eval_set, early_stopping_rounds,
verbose):
dtrain = await DaskDMatrix(client=self.client,
data=X,
label=y,
weight=sample_weight,
base_margin=base_margin,
missing=self.missing)
params = self.get_xgb_params()
evals = await _evaluation_matrices(self.client, eval_set,
sample_weight_eval_set,
self.missing)
results = await train(client=self.client,
params=params,
dtrain=dtrain,
num_boost_round=self.get_num_boosting_rounds(),
evals=evals,
verbose_eval=verbose,
early_stopping_rounds=early_stopping_rounds)
self._Booster = results['booster']
# pylint: disable=attribute-defined-outside-init
self.evals_result_ = results['history']
return self
# pylint: disable=missing-docstring
@_deprecate_positional_args
def fit(self,
X,
y,
*,
sample_weight=None,
base_margin=None,
eval_set=None,
sample_weight_eval_set=None,
early_stopping_rounds=None,
verbose=True):
_assert_dask_support()
return self.client.sync(self._fit_async,
X=X,
y=y,
sample_weight=sample_weight,
base_margin=base_margin,
eval_set=eval_set,
sample_weight_eval_set=sample_weight_eval_set,
early_stopping_rounds=early_stopping_rounds,
verbose=verbose)
async def _predict_async(
self, data, output_margin=False, base_margin=None):
test_dmatrix = await DaskDMatrix(
client=self.client, data=data, base_margin=base_margin,
missing=self.missing
)
pred_probs = await predict(client=self.client,
model=self.get_booster(), data=test_dmatrix,
output_margin=output_margin)
return pred_probs
# pylint: disable=arguments-differ
def predict(self, data, output_margin=False, base_margin=None):
_assert_dask_support()
return self.client.sync(self._predict_async, data,
output_margin=output_margin,
base_margin=base_margin)
@xgboost_model_doc(
'Implementation of the scikit-learn API for XGBoost classification.',
['estimators', 'model'])
class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
# pylint: disable=missing-class-docstring
async def _fit_async(self, X, y, sample_weight, base_margin, eval_set,
sample_weight_eval_set, early_stopping_rounds,
verbose):
dtrain = await DaskDMatrix(client=self.client,
data=X,
label=y,
weight=sample_weight,
base_margin=base_margin,
missing=self.missing)
params = self.get_xgb_params()
# pylint: disable=attribute-defined-outside-init
if isinstance(y, (da.Array)):
self.classes_ = await self.client.compute(da.unique(y))
else:
self.classes_ = await self.client.compute(y.drop_duplicates())
self.n_classes_ = len(self.classes_)
if self.n_classes_ > 2:
params["objective"] = "multi:softprob"
params['num_class'] = self.n_classes_
else:
params["objective"] = "binary:logistic"
evals = await _evaluation_matrices(self.client, eval_set,
sample_weight_eval_set,
self.missing)
results = await train(client=self.client,
params=params,
dtrain=dtrain,
num_boost_round=self.get_num_boosting_rounds(),
evals=evals,
early_stopping_rounds=early_stopping_rounds,
verbose_eval=verbose)
self._Booster = results['booster']
# pylint: disable=attribute-defined-outside-init
self.evals_result_ = results['history']
return self
@_deprecate_positional_args
def fit(self,
X,
y,
*,
sample_weight=None,
base_margin=None,
eval_set=None,
sample_weight_eval_set=None,
early_stopping_rounds=None,
verbose=True):
_assert_dask_support()
return self.client.sync(self._fit_async,
X=X,
y=y,
sample_weight=sample_weight,
base_margin=base_margin,
eval_set=eval_set,
sample_weight_eval_set=sample_weight_eval_set,
early_stopping_rounds=early_stopping_rounds,
verbose=verbose)
async def _predict_proba_async(self, data, output_margin=False,
base_margin=None):
test_dmatrix = await DaskDMatrix(
client=self.client, data=data, base_margin=base_margin,
missing=self.missing
)
pred_probs = await predict(client=self.client,
model=self.get_booster(),
data=test_dmatrix,
output_margin=output_margin)
return pred_probs
# pylint: disable=arguments-differ,missing-docstring
def predict_proba(self, data, output_margin=False, base_margin=None):
_assert_dask_support()
return self.client.sync(
self._predict_proba_async,
data,
output_margin=output_margin,
base_margin=base_margin
)
async def _predict_async(self, data, output_margin=False, base_margin=None):
test_dmatrix = await DaskDMatrix(
client=self.client, data=data, base_margin=base_margin,
missing=self.missing
)
pred_probs = await predict(client=self.client,
model=self.get_booster(),
data=test_dmatrix,
output_margin=output_margin)
if self.n_classes_ == 2:
preds = (pred_probs > 0.5).astype(int)
else:
preds = da.argmax(pred_probs, axis=1)
return preds
# pylint: disable=arguments-differ
def predict(self, data, output_margin=False, base_margin=None):
_assert_dask_support()
return self.client.sync(
self._predict_async,
data,
output_margin=output_margin,
base_margin=base_margin
)