[dask] Update document. [skip ci] (#6413)
parent c120822a24
commit 00218d065a

@@ -107,6 +107,95 @@ Here ``prediction`` is a dask ``Array`` object containing predictions from model

Alternatively, XGBoost also implements the Scikit-Learn interface with ``DaskXGBClassifier``
and ``DaskXGBRegressor``.  See ``xgboost/demo/dask`` for more examples.

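As a quick illustration, here is a minimal sketch of that Scikit-Learn interface,
assuming a ``client`` connected to a running cluster and dask collections ``X`` and
``y`` holding the training data as in the earlier example:

.. code-block:: python

    import xgboost as xgb

    # The estimator wraps the same distributed training logic as xgb.dask.train.
    clf = xgb.dask.DaskXGBClassifier(n_estimators=100)
    clf.client = client  # attach the dask client explicitly
    clf.fit(X, y)
    prediction = clf.predict(X)
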
******************
Running prediction
******************

In the previous example we used ``DaskDMatrix`` as input to the ``predict`` function.
In practice, it's also possible to call ``predict`` directly on dask collections like
``Array`` and ``DataFrame``, which might give better prediction performance.  When a
``DataFrame`` is used as prediction input, the result is a dask ``Series`` instead of an
array.  Also, the dask interface supports inplace prediction, which can help reduce both
memory usage and prediction time.

.. code-block:: python

    # dtrain is the DaskDMatrix defined above.
    prediction = xgb.dask.predict(client, booster, dtrain)

or equivalently:

.. code-block:: python

    # where X is a dask DataFrame or dask Array.
    prediction = xgb.dask.predict(client, booster, X)

Also for inplace prediction:

.. code-block:: python

    booster.set_param({'predictor': 'gpu_predictor'})
    # where X is a dask DataFrame or dask Array.
    prediction = xgb.dask.inplace_predict(client, booster, X)

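To see the ``Series`` result mentioned above, a small sketch, assuming ``X`` is the dask
``Array`` from the earlier examples:

.. code-block:: python

    import dask.dataframe as dd

    # Prediction on a dask DataFrame yields a dask Series rather than an Array.
    X_df = dd.from_dask_array(X)
    series_prediction = xgb.dask.predict(client, booster, X_df)
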
***************************
Working with other clusters
***************************

``LocalCluster`` is mostly used for testing.  In real-world applications some other
clusters might be preferred, for example ``LocalCUDACluster`` for a single-node
multi-GPU instance (a short sketch is given at the end of this section), or a cluster
launched manually with command line utilities like ``dask-worker`` from ``distributed``
for environments that are not yet automated.  Some special clusters like ``KubeCluster``
from the ``dask-kubernetes`` package are also possible.  The dask API in xgboost is
orthogonal to the cluster type and can be used with any of them.  A typical testing
workflow with ``KubeCluster`` looks like this:

.. code-block:: python

    from dask_kubernetes import KubeCluster  # Need to install the ``dask-kubernetes`` package
    from dask.distributed import Client
    import xgboost as xgb
    import dask
    import dask.array as da

    dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
                     "kubernetes.scheduler-service-wait-timeout": 360,
                     "distributed.comm.timeouts.connect": 360})


    def main():
        '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
        m = 1000
        n = 10
        kWorkers = 2  # assuming you have 2 GPU nodes on that cluster.
        # You need to work out the worker-spec yourself.  See the dask_kubernetes
        # documentation for its usage.  Here we just want to show that XGBoost works
        # on various clusters.
        cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
        cluster.scale(kWorkers)  # scale to use all GPUs

        with Client(cluster) as client:
            X = da.random.random(size=(m, n), chunks=100)
            y = da.random.random(size=(m, ), chunks=100)

            regressor = xgb.dask.DaskXGBRegressor(n_estimators=10, missing=0.0)
            regressor.client = client
            regressor.set_params(tree_method='gpu_hist')
            regressor.fit(X, y, eval_set=[(X, y)])


    if __name__ == '__main__':
        # Launch the kube cluster somewhere like GKE, then run this as the client process.
        # The main function will connect to that cluster and start training an xgboost model.
        main()

However, these clusters might have subtle differences like their network configuration,
or a specific cluster implementation might contain bugs that we are not aware of.
Please open an issue if such a case is found and there's no documentation on how to
resolve it in that cluster implementation.

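As promised above, here is a minimal ``LocalCUDACluster`` sketch, assuming the
``dask-cuda`` package is installed and at least one GPU is available locally:

.. code-block:: python

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client
    import xgboost as xgb
    import dask.array as da

    if __name__ == '__main__':
        # LocalCUDACluster starts one worker per local GPU.
        with LocalCUDACluster() as cluster:
            with Client(cluster) as client:
                X = da.random.random(size=(1000, 10), chunks=100)
                y = da.random.random(size=(1000,), chunks=100)
                dtrain = xgb.dask.DaskDMatrix(client, X, y)
                output = xgb.dask.train(client, {'tree_method': 'gpu_hist'},
                                        dtrain, num_boost_round=10)
                booster = output['booster']
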
*******
Threads
*******

@@ -184,10 +273,6 @@ actual computation will return a coroutine and hence require awaiting:

    # Use `client.compute` instead of the `compute` method from dask collection
    print(await client.compute(prediction))

Be careful that XGBoost uses all the workers supplied by the ``client`` object.  If you
are training on a GPU cluster and have 2 GPUs, the client object passed to XGBoost
should return 2 workers.

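A quick way to check how many workers the client exposes, using the standard
``distributed`` API (a sketch; XGBoost will use all of them):

.. code-block:: python

    n_workers = len(client.scheduler_info()['workers'])
    print(n_workers)  # should match the number of GPUs you intend to use
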
*****************************************************************************
Why is the initialization of ``DaskDMatrix`` so slow and throws weird errors
*****************************************************************************

@@ -208,6 +293,28 @@ computations, one can explicitly wait for results of input data before construct

Also dask's `diagnostics dashboard <https://distributed.dask.org/en/latest/web.html>`_ can be used to
monitor what operations are currently being performed.

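A sketch of explicitly waiting for the input data before constructing the
``DaskDMatrix``, using standard ``dask.distributed`` primitives, where ``X`` and ``y``
are assumed to be lazy dask collections:

.. code-block:: python

    from distributed import wait

    # Materialize the lazy collections on the workers and block until done,
    # so DaskDMatrix construction doesn't silently absorb that waiting time.
    X = X.persist()
    y = y.persist()
    wait([X, y])
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
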
************
Memory Usage
************

Here are some practices on reducing memory usage with dask and xgboost.

- In a distributed work flow, data is best loaded by dask collections directly instead
  of being loaded by the client process.  When loading with the client process is
  unavoidable, use ``client.scatter`` to distribute data from the client process to the
  workers.  See [2] for a nice summary.

- When using GPU input, like a dataframe loaded by ``dask_cudf``, you can try
  ``xgboost.dask.DaskDeviceQuantileDMatrix`` as a drop-in replacement for ``DaskDMatrix``
  to reduce overall memory usage (a short sketch follows this list).  See
  ``demo/dask/gpu_training.py`` for an example.

- Use inplace prediction when possible.

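A minimal sketch of the drop-in replacement mentioned above, assuming ``dask_cudf`` is
installed and a hypothetical file ``data.csv`` has a ``label`` column:

.. code-block:: python

    import dask_cudf
    import xgboost as xgb

    # df lives on the GPUs, partitioned across the workers.
    df = dask_cudf.read_csv('data.csv')
    X = df.drop(columns=['label'])
    y = df['label']

    # Same call signature as DaskDMatrix, but avoids an extra copy of the
    # GPU data by constructing quantile information directly from the input.
    dtrain = xgb.dask.DaskDeviceQuantileDMatrix(client, X, y)
    output = xgb.dask.train(client, {'tree_method': 'gpu_hist'}, dtrain)
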
References:

#. https://github.com/dask/dask/issues/6833
#. https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array

***********
Limitations
***********