diff --git a/demo/dask/README.md b/demo/dask/README.md
new file mode 100644
index 000000000..b70248ca4
--- /dev/null
+++ b/demo/dask/README.md
@@ -0,0 +1,6 @@
+Dask
+====
+
+This directory contains some demonstrations for using `dask` with `XGBoost`.
+For an overview, see
+https://xgboost.readthedocs.io/en/latest/tutorials/dask.html .
\ No newline at end of file
diff --git a/demo/dask/cpu_training.py b/demo/dask/cpu_training.py
index a949a27bf..dac734371 100644
--- a/demo/dask/cpu_training.py
+++ b/demo/dask/cpu_training.py
@@ -6,16 +6,23 @@ from dask import array as da
 
 
 def main(client):
+    # generate some random data for demonstration
     n = 100
     m = 100000
     partition_size = 1000
     X = da.random.random((m, n), partition_size)
     y = da.random.random(m, partition_size)
 
+    # DaskDMatrix acts like a normal DMatrix, working as a proxy for the local
+    # DMatrix objects scattered across the workers.
    dtrain = DaskDMatrix(client, X, y)
 
+    # Use the train method from xgboost.dask instead of xgboost.  This
+    # distributed version of train returns a dictionary containing the
+    # resulting booster and the evaluation history obtained from
+    # evaluation metrics.
     output = xgb.dask.train(client,
-                            {'verbosity': 2,
+                            {'verbosity': 1,
                              'nthread': 1,
                              'tree_method': 'hist'},
                             dtrain,
@@ -23,13 +30,14 @@ def main(client):
     bst = output['booster']
     history = output['history']
 
+    # you can pass the output directly into `predict` too.
     prediction = xgb.dask.predict(client, bst, dtrain)
     print('Evaluation history:', history)
     return prediction
 
 
 if __name__ == '__main__':
-    # or use any other clusters
-    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
-    client = Client(cluster)
-    main(client)
+    # or use other clusters for scaling
+    with LocalCluster(n_workers=4, threads_per_worker=1) as cluster:
+        with Client(cluster) as client:
+            main(client)
diff --git a/demo/dask/gpu_training.py b/demo/dask/gpu_training.py
index 469c6a7ee..807613e61 100644
--- a/demo/dask/gpu_training.py
+++ b/demo/dask/gpu_training.py
@@ -29,13 +29,16 @@ def main(client):
     bst = output['booster']
     history = output['history']
 
+    # you can pass the output directly into `predict` too.
     prediction = xgb.dask.predict(client, bst, dtrain)
     print('Evaluation history:', history)
     return prediction
 
 
 if __name__ == '__main__':
-    # or use any other clusters
-    cluster = LocalCUDACluster(n_workers=4, threads_per_worker=1)
-    client = Client(cluster)
-    main(client)
+    # `LocalCUDACluster` is used for assigning GPUs to XGBoost processes.  Here
+    # `n_workers` represents the number of GPUs, since we use one GPU per
+    # worker process.
+    with LocalCUDACluster(n_workers=2, threads_per_worker=1) as cluster:
+        with Client(cluster) as client:
+            main(client)
diff --git a/demo/dask/sklearn_cpu_training.py b/demo/dask/sklearn_cpu_training.py
index 4a16f9b4d..0549aa3d4 100644
--- a/demo/dask/sklearn_cpu_training.py
+++ b/demo/dask/sklearn_cpu_training.py
@@ -6,18 +6,18 @@ from dask.distributed import LocalCluster
 from dask import array as da
 import xgboost
 
-if __name__ == '__main__':
-    cluster = LocalCluster(n_workers=2, silence_logs=False)  # or use any other clusters
-    client = Client(cluster)
 
+def main(client):
+    # generate some random data for demonstration
     n = 100
     m = 10000
     partition_size = 100
     X = da.random.random((m, n), partition_size)
     y = da.random.random(m, partition_size)
 
-    regressor = xgboost.dask.DaskXGBRegressor(verbosity=2, n_estimators=2)
+    regressor = xgboost.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
     regressor.set_params(tree_method='hist')
+    # assigning the client here is optional
     regressor.client = client
 
     regressor.fit(X, y, eval_set=[(X, y)])
@@ -27,4 +27,13 @@ if __name__ == '__main__':
     history = regressor.evals_result()
 
     print('Evaluation history:', history)
+    # the returned prediction is always a dask array.
     assert isinstance(prediction, da.Array)
+    return bst  # returning the trained model
+
+
+if __name__ == '__main__':
+    # or use other clusters for scaling
+    with LocalCluster(n_workers=4, threads_per_worker=1) as cluster:
+        with Client(cluster) as client:
+            main(client)
diff --git a/demo/dask/sklearn_gpu_training.py b/demo/dask/sklearn_gpu_training.py
index caa58cfe1..afba21504 100644
--- a/demo/dask/sklearn_gpu_training.py
+++ b/demo/dask/sklearn_gpu_training.py
@@ -8,18 +8,18 @@ from dask_cuda import LocalCUDACluster
 from dask import array as da
 import xgboost
 
-if __name__ == '__main__':
-    cluster = LocalCUDACluster()
-    client = Client(cluster)
 
+def main(client):
+    # generate some random data for demonstration
     n = 100
     m = 1000000
     partition_size = 10000
     X = da.random.random((m, n), partition_size)
     y = da.random.random(m, partition_size)
 
-    regressor = xgboost.dask.DaskXGBRegressor(verbosity=2)
+    regressor = xgboost.dask.DaskXGBRegressor(verbosity=1)
     regressor.set_params(tree_method='gpu_hist')
+    # assigning the client here is optional
     regressor.client = client
 
     regressor.fit(X, y, eval_set=[(X, y)])
@@ -29,3 +29,14 @@ if __name__ == '__main__':
     history = regressor.evals_result()
 
     print('Evaluation history:', history)
+    # the returned prediction is always a dask array.
+    assert isinstance(prediction, da.Array)
+    return bst  # returning the trained model
+
+
+if __name__ == '__main__':
+    # With dask-cuda, one can scale XGBoost up to arbitrary GPU clusters.
+    # The `LocalCUDACluster` used here is only for demonstration purposes.
+    with LocalCUDACluster() as cluster:
+        with Client(cluster) as client:
+            main(client)
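
The "or use other clusters for scaling" comments above mean that `LocalCluster` and `LocalCUDACluster` are only stand-ins: the demos only need a connected `dask.distributed.Client`, so they can be pointed at any existing Dask deployment. A minimal sketch of driving the `cpu_training.py` demo against an already-running scheduler; the scheduler address and the `from cpu_training import main` import are illustrative assumptions, not part of this patch:

    from dask.distributed import Client

    # hypothetical import of the demo's entry point defined in cpu_training.py above
    from cpu_training import main

    if __name__ == '__main__':
        # Connect to an already-running Dask scheduler instead of creating a
        # LocalCluster; the address below is a placeholder for a real deployment.
        with Client('tcp://192.168.1.10:8786') as client:
            main(client)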