diff --git a/doc/tutorials/dask.rst b/doc/tutorials/dask.rst
index 652a6d3a1..ef65b1065 100644
--- a/doc/tutorials/dask.rst
+++ b/doc/tutorials/dask.rst
@@ -107,6 +107,95 @@ Here ``prediction`` is a dask ``Array`` object containing predictions from model
 Alternatively, XGBoost also implements the Scikit-Learn interface with
 ``DaskXGBClassifier`` and ``DaskXGBRegressor``. See ``xgboost/demo/dask`` for more
 examples.
+
+******************
+Running prediction
+******************
+
+In the previous example we used a ``DaskDMatrix`` as input to the ``predict`` function.
+In practice, it's also possible to call ``predict`` directly on dask collections like
+``Array`` and ``DataFrame``, which may give better prediction performance. When a
+``DataFrame`` is used as prediction input, the result is a dask ``Series`` instead of an
+array. The dask interface also supports in-place prediction, which can help reduce both
+memory usage and prediction time.
+
+.. code-block:: python
+
+    # dtrain is the DaskDMatrix defined above.
+    prediction = xgb.dask.predict(client, booster, dtrain)
+
+or equivalently:
+
+.. code-block:: python
+
+    # where X is a dask DataFrame or dask Array.
+    prediction = xgb.dask.predict(client, booster, X)
+
+And for in-place prediction:
+
+.. code-block:: python
+
+    booster.set_param({'predictor': 'gpu_predictor'})
+    # where X is a dask DataFrame or dask Array.
+    prediction = xgb.dask.inplace_predict(client, booster, X)
+
+
+***************************
+Working with other clusters
+***************************
+
+``LocalCluster`` is mostly used for testing. In real-world applications other clusters
+might be preferred, for example ``LocalCUDACluster`` for a single-node multi-GPU
+instance, or a cluster launched manually with command line utilities like ``dask-worker``
+from ``distributed`` for environments that are not yet automated. Special clusters like
+``KubeCluster`` from the ``dask-kubernetes`` package are also possible. The dask API in
+xgboost is orthogonal to the cluster type and can be used with any of them. A typical
+testing workflow with ``KubeCluster`` looks like this:
+
+.. code-block:: python
+
+    from dask_kubernetes import KubeCluster  # Need to install the ``dask-kubernetes`` package
+    from dask.distributed import Client
+    import xgboost as xgb
+    import dask
+    import dask.array as da
+
+    dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
+                     "kubernetes.scheduler-service-wait-timeout": 360,
+                     "distributed.comm.timeouts.connect": 360})
+
+
+    def main():
+        '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
+        m = 1000
+        n = 10
+        kWorkers = 2  # assuming you have 2 GPU nodes on that cluster.
+        # You need to work out the worker-spec yourself. See the dask_kubernetes
+        # documentation for its usage. Here we just want to show that XGBoost works on
+        # various clusters.
+        cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
+        cluster.scale(kWorkers)  # scale to use all GPUs
+
+        with Client(cluster) as client:
+            X = da.random.random(size=(m, n), chunks=100)
+            y = da.random.random(size=(m, ), chunks=100)
+
+            regressor = xgb.dask.DaskXGBRegressor(n_estimators=10, missing=0.0)
+            regressor.client = client
+            regressor.set_params(tree_method='gpu_hist')
+            regressor.fit(X, y, eval_set=[(X, y)])
+
+
+    if __name__ == '__main__':
+        # Launch the kube cluster somewhere like GKE, then run this script as the client
+        # process. The main function will connect to that cluster and start training an
+        # xgboost model.
+        main()
+
+
+However, these clusters might have subtle differences, like network configuration, and a
+specific cluster implementation might contain bugs that we are not aware of. Please open
+an issue if such a case is found and there's no documentation on how to resolve it for
+that cluster implementation.
+
 *******
 Threads
 *******
@@ -184,10 +273,6 @@ actual computation will return a coroutine and hence require awaiting:
     # Use `client.compute` instead of the `compute` method from dask collection
     print(await client.compute(prediction))
 
-Be careful that XGBoost uses all the workers supplied by the ``client`` object. If you
-are training on GPU cluster and have 2 GPUs, the client object passed to XGBoost should
-return 2 workers.
-
 *****************************************************************************
 Why is the initialization of ``DaskDMatrix`` so slow and throws weird errors
 *****************************************************************************
@@ -208,6 +293,28 @@ computations, one can explicitly wait for results of input data before construct
 Also dask's `diagnostics dashboard `_ can be used to monitor what operations are
 currently being performed.
 
+************
+Memory Usage
+************
+
+Here are some practices for reducing memory usage with dask and xgboost.
+
+- In a distributed work flow, data is best loaded by dask collections directly instead of
+  being loaded by the client process. When loading with the client process is
+  unavoidable, use ``client.scatter`` to distribute data from the client process to
+  workers. See [2] for a nice summary.
+
+- When using GPU input, like a dataframe loaded by ``dask_cudf``, you can try
+  ``xgboost.dask.DaskDeviceQuantileDMatrix`` as a drop-in replacement for ``DaskDMatrix``
+  to reduce overall memory usage. See ``demo/dask/gpu_training.py`` for an example, and
+  the sketch after this list.
+
+- Use in-place prediction when possible.
+
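+For the second point, here is a minimal sketch, assuming a ``client`` connected to a GPU
+cluster and a ``dask_cudf`` DataFrame ``df`` with a ``label`` column (the variable and
+column names are illustrative only):
+
+.. code-block:: python
+
+    import xgboost as xgb
+
+    # df is a dask_cudf DataFrame already distributed on the workers; the "label"
+    # column name is just an example.
+    y = df["label"]
+    X = df[[c for c in df.columns if c != "label"]]
+
+    # Drop-in replacement for DaskDMatrix, aimed at reducing overall memory usage.
+    dtrain = xgb.dask.DaskDeviceQuantileDMatrix(client, X, y)
+    output = xgb.dask.train(client, {'tree_method': 'gpu_hist'}, dtrain,
+                            num_boost_round=4)
+
+The trained model is then available as ``output['booster']`` and can be used with the
+prediction functions shown above.
+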
+References:
+
+#. https://github.com/dask/dask/issues/6833
+#. https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array
+
 ***********
 Limitations
 ***********