[doc][dask] Update notes about k8s. (#10271)

This commit is contained in:
Jiaming Yuan 2024-05-14 04:21:02 +08:00 committed by GitHub
parent 75fe2ff0c3
commit 871fabeee3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -237,41 +237,44 @@ For most of the use cases with GPUs, the `Dask-CUDA <https://docs.rapids.ai/api/
Working with other clusters
***************************
Using Dask's ``LocalCluster`` is convenient for getting started quickly on a single-machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.
Using Dask's ``LocalCluster`` is convenient for getting started quickly on a local machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.
In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernetes <https://docs.dask.org/en/stable/deploying-kubernetes.html>`_:
.. code-block:: python
from dask_kubernetes import KubeCluster # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator import KubeCluster # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode
from dask.distributed import Client
from xgboost import dask as dxgb
import dask
import dask.array as da
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
"kubernetes.scheduler-service-wait-timeout": 360,
"distributed.comm.timeouts.connect": 360})
def main():
'''Connect to a remote kube cluster with GPU nodes and run training on it.'''
'''Connect to a remote kube cluster with GPU nodes and run training on it.'''
m = 1000
n = 10
kWorkers = 2 # assuming you have 2 GPU nodes on that cluster.
# You need to work out the worker-spec yourself. See document in dask_kubernetes for
# its usage. Here we just want to show that XGBoost works on various clusters.
cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
cluster.scale(kWorkers) # scale to use all GPUs
with Client(cluster) as client:
X = da.random.random(size=(m, n), chunks=100)
y = da.random.random(size=(m, ), chunks=100)
# See notes below for why we use pre-allocated cluster.
with KubeCluster(
name="xgboost-test",
image="my-image-name:latest",
n_workers=kWorkers,
create_mode=CreateMode.CONNECT_ONLY,
shutdown_on_close=False,
) as cluster:
with Client(cluster) as client:
X = da.random.random(size=(m, n), chunks=100)
y = X.sum(axis=1)
regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
regressor.client = client
regressor.set_params(tree_method='hist', device="cuda")
regressor.fit(X, y, eval_set=[(X, y)])
regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
regressor.client = client
regressor.set_params(tree_method='hist', device="cuda")
regressor.fit(X, y, eval_set=[(X, y)])
if __name__ == '__main__':
@ -279,11 +282,46 @@ In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernete
# main function will connect to that cluster and start training xgboost model.
main()
Different cluster classes might have subtle differences like network configuration, or
specific cluster implementation might contains bugs that we are not aware of. Open an
issue if such case is found and there's no documentation on how to resolve it in that
cluster implementation.
An interesting aspect of the Kubernetes cluster is that the pods may become available
after the Dask workflow has begun, which can cause issues with distributed XGBoost since
XGBoost expects the nodes used by input data to remain unchanged during training. To use
Kubernetes clusters, it is necessary to wait for all the pods to be online before
submitting XGBoost tasks. One can either create a wait function in Python or simply
pre-allocate a cluster with k8s tools (like ``kubectl``) before running dask workflows. To
pre-allocate a cluster, we can first generate the cluster spec using dask kubernetes:
.. code-block:: python
import json
from dask_kubernetes.operator import make_cluster_spec
spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
json.dump(spec, fd, indent=2)
.. code-block:: sh
kubectl apply -f ./cluster-spec.json
Check whether the pods are available:
.. code-block:: sh
kubectl get pods
Once all pods have been initialized, the Dask XGBoost workflow can be run, as in the
previous example. It is important to ensure that the cluster sets the parameter
``create_mode=CreateMode.CONNECT_ONLY`` and optionally ``shutdown_on_close=False`` if you
do not want to shut down the cluster after a single job.
*******
Threads
*******