Improve doc and demo for dask. (#4907)
* Add a readme with link to doc. * Add more comments in the demonstrations code. * Workaround https://github.com/dask/distributed/issues/3081 .
This commit is contained in:
parent
d30e63a0a5
commit
7e24a8d245
6
demo/dask/README.md
Normal file
6
demo/dask/README.md
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
Dask
|
||||||
|
====
|
||||||
|
|
||||||
|
This directory contains some demonstrations for using `dask` with `XGBoost`.
|
||||||
|
For an overview, see
|
||||||
|
https://xgboost.readthedocs.io/en/latest/tutorials/dask.html .
|
||||||
@ -6,16 +6,23 @@ from dask import array as da
|
|||||||
|
|
||||||
|
|
||||||
def main(client):
|
def main(client):
|
||||||
|
# generate some random data for demonstration
|
||||||
n = 100
|
n = 100
|
||||||
m = 100000
|
m = 100000
|
||||||
partition_size = 1000
|
partition_size = 1000
|
||||||
X = da.random.random((m, n), partition_size)
|
X = da.random.random((m, n), partition_size)
|
||||||
y = da.random.random(m, partition_size)
|
y = da.random.random(m, partition_size)
|
||||||
|
|
||||||
|
# DaskDMatrix acts like normal DMatrix, works as a proxy for local
|
||||||
|
# DMatrix scatter around workers.
|
||||||
dtrain = DaskDMatrix(client, X, y)
|
dtrain = DaskDMatrix(client, X, y)
|
||||||
|
|
||||||
|
# Use train method from xgboost.dask instead of xgboost. This
|
||||||
|
# distributed version of train returns a dictionary containing the
|
||||||
|
# resulting booster and evaluation history obtained from
|
||||||
|
# evaluation metrics.
|
||||||
output = xgb.dask.train(client,
|
output = xgb.dask.train(client,
|
||||||
{'verbosity': 2,
|
{'verbosity': 1,
|
||||||
'nthread': 1,
|
'nthread': 1,
|
||||||
'tree_method': 'hist'},
|
'tree_method': 'hist'},
|
||||||
dtrain,
|
dtrain,
|
||||||
@ -23,13 +30,14 @@ def main(client):
|
|||||||
bst = output['booster']
|
bst = output['booster']
|
||||||
history = output['history']
|
history = output['history']
|
||||||
|
|
||||||
|
# you can pass output directly into `predict` too.
|
||||||
prediction = xgb.dask.predict(client, bst, dtrain)
|
prediction = xgb.dask.predict(client, bst, dtrain)
|
||||||
print('Evaluation history:', history)
|
print('Evaluation history:', history)
|
||||||
return prediction
|
return prediction
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# or use any other clusters
|
# or use other clusters for scaling
|
||||||
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
|
with LocalCluster(n_workers=4, threads_per_worker=1) as cluster:
|
||||||
client = Client(cluster)
|
with Client(cluster) as client:
|
||||||
main(client)
|
main(client)
|
||||||
|
|||||||
@ -29,13 +29,16 @@ def main(client):
|
|||||||
bst = output['booster']
|
bst = output['booster']
|
||||||
history = output['history']
|
history = output['history']
|
||||||
|
|
||||||
|
# you can pass output directly into `predict` too.
|
||||||
prediction = xgb.dask.predict(client, bst, dtrain)
|
prediction = xgb.dask.predict(client, bst, dtrain)
|
||||||
print('Evaluation history:', history)
|
print('Evaluation history:', history)
|
||||||
return prediction
|
return prediction
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# or use any other clusters
|
# `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here
|
||||||
cluster = LocalCUDACluster(n_workers=4, threads_per_worker=1)
|
# `n_workers` represents the number of GPUs since we use one GPU per worker
|
||||||
client = Client(cluster)
|
# process.
|
||||||
main(client)
|
with LocalCUDACluster(n_workers=2, threads_per_worker=1) as cluster:
|
||||||
|
with Client(cluster) as client:
|
||||||
|
main(client)
|
||||||
|
|||||||
@ -6,18 +6,18 @@ from dask.distributed import LocalCluster
|
|||||||
from dask import array as da
|
from dask import array as da
|
||||||
import xgboost
|
import xgboost
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
cluster = LocalCluster(n_workers=2, silence_logs=False) # or use any other clusters
|
|
||||||
client = Client(cluster)
|
|
||||||
|
|
||||||
|
def main(client):
|
||||||
|
# generate some random data for demonstration
|
||||||
n = 100
|
n = 100
|
||||||
m = 10000
|
m = 10000
|
||||||
partition_size = 100
|
partition_size = 100
|
||||||
X = da.random.random((m, n), partition_size)
|
X = da.random.random((m, n), partition_size)
|
||||||
y = da.random.random(m, partition_size)
|
y = da.random.random(m, partition_size)
|
||||||
|
|
||||||
regressor = xgboost.dask.DaskXGBRegressor(verbosity=2, n_estimators=2)
|
regressor = xgboost.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
|
||||||
regressor.set_params(tree_method='hist')
|
regressor.set_params(tree_method='hist')
|
||||||
|
# assigning client here is optional
|
||||||
regressor.client = client
|
regressor.client = client
|
||||||
|
|
||||||
regressor.fit(X, y, eval_set=[(X, y)])
|
regressor.fit(X, y, eval_set=[(X, y)])
|
||||||
@ -27,4 +27,13 @@ if __name__ == '__main__':
|
|||||||
history = regressor.evals_result()
|
history = regressor.evals_result()
|
||||||
|
|
||||||
print('Evaluation history:', history)
|
print('Evaluation history:', history)
|
||||||
|
# returned prediction is always a dask array.
|
||||||
assert isinstance(prediction, da.Array)
|
assert isinstance(prediction, da.Array)
|
||||||
|
return bst # returning the trained model
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# or use other clusters for scaling
|
||||||
|
with LocalCluster(n_workers=4, threads_per_worker=1) as cluster:
|
||||||
|
with Client(cluster) as client:
|
||||||
|
main(client)
|
||||||
|
|||||||
@ -8,18 +8,18 @@ from dask_cuda import LocalCUDACluster
|
|||||||
from dask import array as da
|
from dask import array as da
|
||||||
import xgboost
|
import xgboost
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
cluster = LocalCUDACluster()
|
|
||||||
client = Client(cluster)
|
|
||||||
|
|
||||||
|
def main(client):
|
||||||
|
# generate some random data for demonstration
|
||||||
n = 100
|
n = 100
|
||||||
m = 1000000
|
m = 1000000
|
||||||
partition_size = 10000
|
partition_size = 10000
|
||||||
X = da.random.random((m, n), partition_size)
|
X = da.random.random((m, n), partition_size)
|
||||||
y = da.random.random(m, partition_size)
|
y = da.random.random(m, partition_size)
|
||||||
|
|
||||||
regressor = xgboost.dask.DaskXGBRegressor(verbosity=2)
|
regressor = xgboost.dask.DaskXGBRegressor(verbosity=1)
|
||||||
regressor.set_params(tree_method='gpu_hist')
|
regressor.set_params(tree_method='gpu_hist')
|
||||||
|
# assigning client here is optional
|
||||||
regressor.client = client
|
regressor.client = client
|
||||||
|
|
||||||
regressor.fit(X, y, eval_set=[(X, y)])
|
regressor.fit(X, y, eval_set=[(X, y)])
|
||||||
@ -29,3 +29,14 @@ if __name__ == '__main__':
|
|||||||
history = regressor.evals_result()
|
history = regressor.evals_result()
|
||||||
|
|
||||||
print('Evaluation history:', history)
|
print('Evaluation history:', history)
|
||||||
|
# returned prediction is always a dask array.
|
||||||
|
assert isinstance(prediction, da.Array)
|
||||||
|
return bst # returning the trained model
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# With dask cuda, one can scale up XGBoost to arbitrary GPU clusters.
|
||||||
|
# `LocalCUDACluster` used here is only for demonstration purpose.
|
||||||
|
with LocalCUDACluster() as cluster:
|
||||||
|
with Client(cluster) as client:
|
||||||
|
main(client)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user