#############################
Distributed XGBoost with Dask
#############################

`Dask <https://dask.org>`_ is a parallel computing library built on Python. Dask allows
easy management of distributed workers and excels at handling large distributed data science
workflows. The implementation in XGBoost originates from `dask-xgboost
<https://github.com/dask/dask-xgboost>`_ with some extended functionalities and a
different interface. Right now it is still under construction and may change (with proper
warnings) in the future. The tutorial here focuses on basic usage of dask with CPU tree
algorithms. For an overview of GPU based training and internal workings, see `A New,
Official Dask API for XGBoost
<https://medium.com/rapids-ai/a-new-official-dask-api-for-xgboost-e8b10f3d1eb7>`_.

**Contents**

.. contents::
  :backlinks: none
  :local:

************
Requirements
************

Dask can be installed using either pip or conda (see the dask `installation
documentation <https://docs.dask.org/en/latest/install.html>`_ for more information). For
accelerating XGBoost with GPUs, `dask-cuda <https://github.com/rapidsai/dask-cuda>`_ is
recommended for creating GPU clusters.

********
Overview
********

A dask cluster consists of three different components: a centralized scheduler, one or
more workers, and one or more clients which act as the user-facing entry point for submitting
tasks to the cluster. When using XGBoost with dask, one needs to call the XGBoost dask interface
from the client side. Below is a small example which illustrates basic usage of running XGBoost
on a dask cluster:

.. code-block:: python

    import xgboost as xgb
    import dask.array as da
    import dask.distributed

    cluster = dask.distributed.LocalCluster(n_workers=4, threads_per_worker=1)
    client = dask.distributed.Client(cluster)

    # X and y must be Dask dataframes or arrays
    num_obs = 100000
    num_features = 20
    X = da.random.random(
        size=(num_obs, num_features),
        chunks=(1000, num_features)
    )
    y = da.random.random(
        size=(num_obs, 1),
        chunks=(1000, 1)
    )

    dtrain = xgb.dask.DaskDMatrix(client, X, y)

    output = xgb.dask.train(client,
                            {'verbosity': 2,
                             'tree_method': 'hist',
                             'objective': 'reg:squarederror'
                             },
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])

Here we first create a cluster in single-node mode with ``dask.distributed.LocalCluster``, then
connect a ``dask.distributed.Client`` to this cluster, setting up an environment for later computation.

We then create a ``DaskDMatrix`` object and pass it to ``train``, along with some other parameters,
much like XGBoost's normal, non-dask interface. Unlike that interface, ``data`` and ``label`` must
be either `Dask DataFrame <https://examples.dask.org/dataframe.html>`_ or
`Dask Array <https://examples.dask.org/array.html>`_ instances.

The primary difference with XGBoost's dask interface is that we pass our dask client as an
additional argument for carrying out the computation. Note that if ``client`` is set to
``None``, XGBoost will use the default client returned by dask.
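
For example, since the ``client`` created above is the current default client in this
session, the following minimal sketch relies on it instead of passing the client explicitly
(the parameters are only illustrative):

.. code-block:: python

    # ``dtrain`` is the DaskDMatrix created above; with the client argument set to
    # None, XGBoost falls back to the default client returned by dask.
    output = xgb.dask.train(None,
                            {'tree_method': 'hist',
                             'objective': 'reg:squarederror'},
                            dtrain,
                            num_boost_round=4)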

There are two sets of APIs implemented in XGBoost. The first set is the functional API,
illustrated in the above example. Given the data and a set of parameters, the ``train`` function
returns a model and the computation history as a Python dictionary:

.. code-block:: python

    {'booster': Booster,
     'history': dict}

For prediction, pass the ``output`` returned by ``train`` into ``xgb.dask.predict``:

.. code-block:: python

    prediction = xgb.dask.predict(client, output, dtrain)

Or equivalently, pass ``output['booster']``:

.. code-block:: python

    prediction = xgb.dask.predict(client, output['booster'], dtrain)

Here ``prediction`` is a dask ``Array`` object containing predictions from the model.

Alternatively, XGBoost also implements the Scikit-Learn interface with ``DaskXGBClassifier``
and ``DaskXGBRegressor``. See ``xgboost/demo/dask`` for more examples.
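
A rough sketch of the Scikit-Learn style workflow, reusing ``client``, ``X`` and ``y`` from
the example above (the hyperparameters here are arbitrary):

.. code-block:: python

    regressor = xgb.dask.DaskXGBRegressor(n_estimators=10, tree_method='hist')
    regressor.client = client          # explicitly attach the dask client
    regressor.fit(X, y, eval_set=[(X, y)])
    prediction = regressor.predict(X)  # returns a dask collection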

******************
Running prediction
******************

In the previous example we used a ``DaskDMatrix`` as input to the ``predict`` function. In
practice, it's also possible to call ``predict`` directly on dask collections like ``Array``
and ``DataFrame``, which might give better prediction performance. When a ``DataFrame`` is
used as prediction input, the result is a dask ``Series`` instead of an array. Also, the dask
interface supports inplace prediction, which can help reduce both memory usage and prediction
time.

.. code-block:: python

    # ``dtrain`` is the DaskDMatrix defined above and ``booster`` is the trained
    # model, i.e. ``output['booster']``.
    booster = output['booster']
    prediction = xgb.dask.predict(client, booster, dtrain)

or equivalently:

.. code-block:: python

    # where X is a dask DataFrame or dask Array.
    prediction = xgb.dask.predict(client, booster, X)

Also for inplace prediction:

.. code-block:: python

    booster.set_param({'predictor': 'gpu_predictor'})
    # where X is a dask DataFrame or dask Array.
    prediction = xgb.dask.inplace_predict(client, booster, X)

***************************
Working with other clusters
***************************

``LocalCluster`` is mostly used for testing. In real world applications other clusters
might be preferred, for example ``LocalCUDACluster`` for a single node with multiple GPUs,
or a cluster launched manually with command line utilities like ``dask-worker`` from
``distributed`` for environments that are not yet automated. Special clusters like
``KubeCluster`` from the ``dask-kubernetes`` package are also possible. The dask API in
xgboost is orthogonal to the cluster type and can be used with any of them. A typical
testing workflow with ``KubeCluster`` looks like this:

.. code-block:: python

    from dask_kubernetes import KubeCluster  # Need to install the ``dask-kubernetes`` package
    from dask.distributed import Client
    import xgboost as xgb
    import dask
    import dask.array as da

    dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
                     "kubernetes.scheduler-service-wait-timeout": 360,
                     "distributed.comm.timeouts.connect": 360})


    def main():
        '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
        m = 1000
        n = 10
        kWorkers = 2  # assuming you have 2 GPU nodes on that cluster.
        # You need to work out the worker-spec yourself. See the document in dask_kubernetes
        # for its usage. Here we just want to show that XGBoost works on various clusters.
        cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
        cluster.scale(kWorkers)  # scale to use all GPUs

        with Client(cluster) as client:
            X = da.random.random(size=(m, n), chunks=100)
            y = da.random.random(size=(m, ), chunks=100)

            regressor = xgb.dask.DaskXGBRegressor(n_estimators=10, missing=0.0)
            regressor.client = client
            regressor.set_params(tree_method='gpu_hist')
            regressor.fit(X, y, eval_set=[(X, y)])


    if __name__ == '__main__':
        # Launch the kube cluster on somewhere like GKE, then run this as the client process.
        # The main function will connect to that cluster and start training an xgboost model.
        main()

However, these clusters might have subtle differences like network configuration, or a
specific cluster implementation might contain bugs that we are not aware of. Please open an
issue if such a case is found and there's no documentation on how to resolve it in that
cluster implementation.

*******
Threads
*******

XGBoost has built-in support for parallel computation through threads, controlled by the
``nthread`` parameter (``n_jobs`` for the scikit-learn interface). If these parameters are
set, they will override the configuration in Dask. For example:

.. code-block:: python

    with dask.distributed.LocalCluster(n_workers=7, threads_per_worker=4) as cluster:

There are 4 threads allocated for each dask worker. Then by default XGBoost will use 4
threads in each process for both training and prediction. But if the ``nthread`` parameter is
set:

.. code-block:: python

    output = xgb.dask.train(client,
                            {'verbosity': 1,
                             'nthread': 8,
                             'tree_method': 'hist'},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])

XGBoost will use 8 threads in each training process.
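
The same applies to the Scikit-Learn interface; a minimal sketch (the other parameters are
arbitrary):

.. code-block:: python

    # ``n_jobs`` plays the role of ``nthread`` for the wrapper classes.
    regressor = xgb.dask.DaskXGBRegressor(n_jobs=8, tree_method='hist')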

********************
Working with asyncio
********************

.. versionadded:: 1.2.0

XGBoost's dask interface supports the new ``asyncio`` in Python and can be integrated into
asynchronous workflows. For using dask with asynchronous operations, please refer to
`this dask example <https://examples.dask.org/applications/async-await.html>`_ and the
documentation of `distributed <https://distributed.dask.org/en/latest/asynchronous.html>`_.
To use XGBoost's dask interface asynchronously, the ``client`` which is passed as an argument
for training and prediction must be operating in asynchronous mode by specifying
``asynchronous=True`` when the ``client`` is created (example below). All functions (including
``DaskDMatrix``) provided by the functional interface will then return coroutines which can
be awaited to retrieve their result.

Functional interface:

.. code-block:: python

    async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
        X, y = generate_array()
        m = await xgb.dask.DaskDMatrix(client, X, y)
        output = await xgb.dask.train(client, {}, dtrain=m)

        with_m = await xgb.dask.predict(client, output, m)
        with_X = await xgb.dask.predict(client, output, X)
        inplace = await xgb.dask.inplace_predict(client, output, X)

        # Use `client.compute` instead of the `compute` method from dask collection
        print(await client.compute(with_m))

For the Scikit-Learn interface, trivial methods like ``set_params`` and accessing class
attributes like ``evals_result_`` do not require ``await``. Other methods involving
actual computation will return a coroutine and hence require awaiting:

.. code-block:: python

    async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
        X, y = generate_array()
        regressor = await xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
        regressor.set_params(tree_method='hist')  # trivial method, synchronous operation
        regressor.client = client                 # accessing attribute, synchronous operation
        regressor = await regressor.fit(X, y, eval_set=[(X, y)])
        prediction = await regressor.predict(X)

        # Use `client.compute` instead of the `compute` method from dask collection
        print(await client.compute(prediction))

*****************************
Evaluation and Early Stopping
*****************************

.. versionadded:: 1.3.0

The Dask interface allows the use of validation sets that are stored in distributed
collections (Dask DataFrame or Dask Array). These can be used for evaluation and early
stopping.

To enable early stopping, pass one or more validation sets containing ``DaskDMatrix`` objects.

.. code-block:: python

    import dask.array as da
    import xgboost as xgb

    num_rows = 1_000_000
    num_features = 100
    num_partitions = 10
    rows_per_chunk = num_rows // num_partitions

    data = da.random.random(
        size=(num_rows, num_features),
        chunks=(rows_per_chunk, num_features)
    )

    labels = da.random.random(
        size=(num_rows, 1),
        chunks=(rows_per_chunk, 1)
    )

    X_eval = da.random.random(
        size=(num_rows, num_features),
        chunks=(rows_per_chunk, num_features)
    )

    y_eval = da.random.random(
        size=(num_rows, 1),
        chunks=(rows_per_chunk, 1)
    )

    dtrain = xgb.dask.DaskDMatrix(
        client=client,
        data=data,
        label=labels
    )

    dvalid = xgb.dask.DaskDMatrix(
        client=client,
        data=X_eval,
        label=y_eval
    )

    result = xgb.dask.train(
        client=client,
        params={
            "objective": "reg:squarederror",
        },
        dtrain=dtrain,
        num_boost_round=10,
        evals=[(dvalid, "valid1")],
        early_stopping_rounds=3
    )

When validation sets are provided to ``xgb.dask.train()`` in this way, the output returned by
``xgb.dask.train()`` contains a history of evaluation metrics for each validation set, across
all boosting rounds.

.. code-block:: python

    print(result["history"])
    # {'valid1': OrderedDict([('rmse', [0.28857, 0.28858, 0.288592, 0.288598])])}

If early stopping is enabled by also passing ``early_stopping_rounds``, you can check the best
iteration in the returned booster.

.. code-block:: python

    booster = result["booster"]
    print(booster.best_iteration)
    best_model = booster[: booster.best_iteration]

******************************************************************************************
Why is the initialization of ``DaskDMatrix`` so slow and why does it throw weird errors
******************************************************************************************

The dask API in XGBoost requires construction of ``DaskDMatrix``. With the Scikit-Learn
interface, ``DaskDMatrix`` is implicitly constructed for all input data during the ``fit`` or
``predict`` steps. You might have observed that ``DaskDMatrix`` construction can take large
amounts of time, and sometimes throws errors that don't seem to be relevant to
``DaskDMatrix``. Here is a brief explanation for why. By default most dask computations are
`lazily evaluated
<https://docs.dask.org/en/latest/user-interfaces.html#laziness-and-computing>`_, which
means that computation is not carried out until you explicitly ask for a result by, for
example, calling ``compute()``. See the previous link for details in dask, and `this wiki
<https://en.wikipedia.org/wiki/Lazy_evaluation>`_ for information on the general concept of
lazy evaluation. The ``DaskDMatrix`` constructor forces lazy computations to be evaluated,
which means it's where all of your earlier computation is actually carried out, including
operations like ``dd.read_csv()``. To isolate the computation in ``DaskDMatrix`` from other
lazy computations, one can explicitly wait for results of input data before constructing a
``DaskDMatrix``. Also dask's `diagnostics dashboard
<https://distributed.dask.org/en/latest/web.html>`_ can be used to monitor what operations
are currently being performed.
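
As a minimal sketch of this isolation, reusing the ``client`` from earlier and assuming the
training data comes from a hypothetical ``train.csv`` with a ``label`` column, one can
persist the collections and wait for them before constructing the ``DaskDMatrix``:

.. code-block:: python

    import dask.dataframe as dd
    import xgboost as xgb
    from dask.distributed import wait

    # ``dd.read_csv`` is lazy; nothing is read yet.
    df = dd.read_csv('train.csv')     # hypothetical input file
    y = df['label']                   # hypothetical label column
    X = df.drop(columns=['label'])

    # Start the computation on the cluster and block until the data is
    # materialized in worker memory.
    X, y = client.persist([X, y])
    wait([X, y])

    # Now the DaskDMatrix construction only measures its own work.
    dtrain = xgb.dask.DaskDMatrix(client, X, y)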

************
Memory Usage
************

Here are some practices for reducing memory usage with dask and xgboost.

- In a distributed workflow, data is best loaded by dask collections directly instead of
  being loaded by the client process. When loading with the client process is unavoidable,
  use ``client.scatter`` to distribute data from the client process to workers (a short
  sketch of the preferred pattern follows this list). See [2] for a nice summary.

- When using GPU input, like a dataframe loaded by ``dask_cudf``, you can try
  ``xgboost.dask.DaskDeviceQuantileDMatrix`` as a drop-in replacement for ``DaskDMatrix``
  to reduce overall memory usage. See ``demo/dask/gpu_training.py`` for an example.

- Use inplace prediction when possible.
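
A minimal sketch of the first point, using a hypothetical parquet dataset; the workers read
the files themselves, so the data never passes through the client process:

.. code-block:: python

    import dask.dataframe as dd

    # Preferred: the workers read the files directly.
    df = dd.read_parquet('path/to/training-data/*.parquet')  # hypothetical path

    # Avoid: loading everything into the client with pandas and then converting,
    # which first materializes the whole dataset in the client's memory.
    # import pandas as pd
    # df = dd.from_pandas(pd.read_parquet('path/to/local-copy.parquet'), npartitions=16)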

References:

#. https://github.com/dask/dask/issues/6833
#. https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array

***********
Limitations
***********

Basic functionality, including model training and generating classification and regression
predictions, has been implemented. However, there are still some other limitations we haven't
addressed yet:

- Label encoding for the ``DaskXGBClassifier`` classifier may not be supported, so users need
  to encode their training labels into discrete values first (a minimal sketch follows this
  list).
- Ranking is not yet supported.
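
A minimal sketch of encoding string labels before fitting, assuming ``y`` is a dask
``Series`` of class names (the class names and the mapping here are hypothetical):

.. code-block:: python

    import xgboost as xgb

    # Map string class names to integer codes on the dask Series itself.
    label_map = {'negative': 0, 'positive': 1}
    y_encoded = y.map(label_map, meta=('label', 'int64'))

    clf = xgb.dask.DaskXGBClassifier()
    clf.client = client
    clf.fit(X, y_encoded)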