From 871fabeee3cff53056d2f988a64152acc739c5fd Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Tue, 14 May 2024 04:21:02 +0800 Subject: [PATCH] [doc][dask] Update notes about k8s. (#10271) --- doc/tutorials/dask.rst | 72 ++++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/doc/tutorials/dask.rst b/doc/tutorials/dask.rst index 4b145f9a9..3544f88b5 100644 --- a/doc/tutorials/dask.rst +++ b/doc/tutorials/dask.rst @@ -237,41 +237,44 @@ For most of the use cases with GPUs, the `Dask-CUDA `_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud `_. See the `Dask documentation for a more comprehensive list `_. +Using Dask's ``LocalCluster`` is convenient for getting started quickly on a local machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA `_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud `_. See the `Dask documentation for a more comprehensive list `_. In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernetes `_: .. code-block:: python - from dask_kubernetes import KubeCluster # Need to install the ``dask-kubernetes`` package + from dask_kubernetes.operator import KubeCluster # Need to install the ``dask-kubernetes`` package + from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode + from dask.distributed import Client from xgboost import dask as dxgb - import dask import dask.array as da - dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer", - "kubernetes.scheduler-service-wait-timeout": 360, - "distributed.comm.timeouts.connect": 360}) - def main(): - '''Connect to a remote kube cluster with GPU nodes and run training on it.''' + '''Connect to a remote kube cluster with GPU nodes and run training on it.''' m = 1000 n = 10 kWorkers = 2 # assuming you have 2 GPU nodes on that cluster. # You need to work out the worker-spec yourself. See document in dask_kubernetes for # its usage. Here we just want to show that XGBoost works on various clusters. - cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote') - cluster.scale(kWorkers) # scale to use all GPUs - with Client(cluster) as client: - X = da.random.random(size=(m, n), chunks=100) - y = da.random.random(size=(m, ), chunks=100) + # See notes below for why we use pre-allocated cluster. + with KubeCluster( + name="xgboost-test", + image="my-image-name:latest", + n_workers=kWorkers, + create_mode=CreateMode.CONNECT_ONLY, + shutdown_on_close=False, + ) as cluster: + with Client(cluster) as client: + X = da.random.random(size=(m, n), chunks=100) + y = X.sum(axis=1) - regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0) - regressor.client = client - regressor.set_params(tree_method='hist', device="cuda") - regressor.fit(X, y, eval_set=[(X, y)]) + regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0) + regressor.client = client + regressor.set_params(tree_method='hist', device="cuda") + regressor.fit(X, y, eval_set=[(X, y)]) if __name__ == '__main__': @@ -279,11 +282,46 @@ In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernete # main function will connect to that cluster and start training xgboost model. main() + Different cluster classes might have subtle differences like network configuration, or specific cluster implementation might contains bugs that we are not aware of. Open an issue if such case is found and there's no documentation on how to resolve it in that cluster implementation. +An interesting aspect of the Kubernetes cluster is that the pods may become available +after the Dask workflow has begun, which can cause issues with distributed XGBoost since +XGBoost expects the nodes used by input data to remain unchanged during training. To use +Kubernetes clusters, it is necessary to wait for all the pods to be online before +submitting XGBoost tasks. One can either create a wait function in Python or simply +pre-allocate a cluster with k8s tools (like ``kubectl``) before running dask workflows. To +pre-allocate a cluster, we can first generate the cluster spec using dask kubernetes: + +.. code-block:: python + + import json + + from dask_kubernetes.operator import make_cluster_spec + + spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16) + with open("cluster-spec.json", "w") as fd: + json.dump(spec, fd, indent=2) + +.. code-block:: sh + + kubectl apply -f ./cluster-spec.json + + +Check whether the pods are available: + +.. code-block:: sh + + kubectl get pods + +Once all pods have been initialized, the Dask XGBoost workflow can be run, as in the +previous example. It is important to ensure that the cluster sets the parameter +``create_mode=CreateMode.CONNECT_ONLY`` and optionally ``shutdown_on_close=False`` if you +do not want to shut down the cluster after a single job. + ******* Threads *******