Handle the new device parameter in dask and demos. (#9386)
* Handle the new `device` parameter in dask and demos. - Check no ordinal is specified in the dask interface. - Update demos. - Update dask doc. - Update the condition for QDM.
This commit is contained in:
@@ -18,43 +18,45 @@ def main(client):
|
||||
# The Veterans' Administration Lung Cancer Trial
|
||||
# The Statistical Analysis of Failure Time Data by Kalbfleisch J. and Prentice R (1980)
|
||||
CURRENT_DIR = os.path.dirname(__file__)
|
||||
df = dd.read_csv(os.path.join(CURRENT_DIR, os.pardir, 'data', 'veterans_lung_cancer.csv'))
|
||||
df = dd.read_csv(
|
||||
os.path.join(CURRENT_DIR, os.pardir, "data", "veterans_lung_cancer.csv")
|
||||
)
|
||||
|
||||
# DaskDMatrix acts like normal DMatrix, works as a proxy for local
|
||||
# DMatrix scatter around workers.
|
||||
# For AFT survival, you'd need to extract the lower and upper bounds for the label
|
||||
# and pass them as arguments to DaskDMatrix.
|
||||
y_lower_bound = df['Survival_label_lower_bound']
|
||||
y_upper_bound = df['Survival_label_upper_bound']
|
||||
X = df.drop(['Survival_label_lower_bound',
|
||||
'Survival_label_upper_bound'], axis=1)
|
||||
dtrain = DaskDMatrix(client, X, label_lower_bound=y_lower_bound,
|
||||
label_upper_bound=y_upper_bound)
|
||||
y_lower_bound = df["Survival_label_lower_bound"]
|
||||
y_upper_bound = df["Survival_label_upper_bound"]
|
||||
X = df.drop(["Survival_label_lower_bound", "Survival_label_upper_bound"], axis=1)
|
||||
dtrain = DaskDMatrix(
|
||||
client, X, label_lower_bound=y_lower_bound, label_upper_bound=y_upper_bound
|
||||
)
|
||||
|
||||
# Use train method from xgboost.dask instead of xgboost. This
|
||||
# distributed version of train returns a dictionary containing the
|
||||
# resulting booster and evaluation history obtained from
|
||||
# evaluation metrics.
|
||||
params = {'verbosity': 1,
|
||||
'objective': 'survival:aft',
|
||||
'eval_metric': 'aft-nloglik',
|
||||
'learning_rate': 0.05,
|
||||
'aft_loss_distribution_scale': 1.20,
|
||||
'aft_loss_distribution': 'normal',
|
||||
'max_depth': 6,
|
||||
'lambda': 0.01,
|
||||
'alpha': 0.02}
|
||||
output = xgb.dask.train(client,
|
||||
params,
|
||||
dtrain,
|
||||
num_boost_round=100,
|
||||
evals=[(dtrain, 'train')])
|
||||
bst = output['booster']
|
||||
history = output['history']
|
||||
params = {
|
||||
"verbosity": 1,
|
||||
"objective": "survival:aft",
|
||||
"eval_metric": "aft-nloglik",
|
||||
"learning_rate": 0.05,
|
||||
"aft_loss_distribution_scale": 1.20,
|
||||
"aft_loss_distribution": "normal",
|
||||
"max_depth": 6,
|
||||
"lambda": 0.01,
|
||||
"alpha": 0.02,
|
||||
}
|
||||
output = xgb.dask.train(
|
||||
client, params, dtrain, num_boost_round=100, evals=[(dtrain, "train")]
|
||||
)
|
||||
bst = output["booster"]
|
||||
history = output["history"]
|
||||
|
||||
# you can pass output directly into `predict` too.
|
||||
prediction = xgb.dask.predict(client, bst, dtrain)
|
||||
print('Evaluation history: ', history)
|
||||
print("Evaluation history: ", history)
|
||||
|
||||
# Uncomment the following line to save the model to the disk
|
||||
# bst.save_model('survival_model.json')
|
||||
@@ -62,7 +64,7 @@ def main(client):
|
||||
return prediction
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# or use other clusters for scaling
|
||||
with LocalCluster(n_workers=7, threads_per_worker=4) as cluster:
|
||||
with Client(cluster) as client:
|
||||
|
||||
@@ -15,7 +15,7 @@ def main(client):
|
||||
m = 100000
|
||||
n = 100
|
||||
X = da.random.random(size=(m, n), chunks=100)
|
||||
y = da.random.random(size=(m, ), chunks=100)
|
||||
y = da.random.random(size=(m,), chunks=100)
|
||||
|
||||
# DaskDMatrix acts like normal DMatrix, works as a proxy for local
|
||||
# DMatrix scatter around workers.
|
||||
@@ -25,21 +25,23 @@ def main(client):
|
||||
# distributed version of train returns a dictionary containing the
|
||||
# resulting booster and evaluation history obtained from
|
||||
# evaluation metrics.
|
||||
output = xgb.dask.train(client,
|
||||
{'verbosity': 1,
|
||||
'tree_method': 'hist'},
|
||||
dtrain,
|
||||
num_boost_round=4, evals=[(dtrain, 'train')])
|
||||
bst = output['booster']
|
||||
history = output['history']
|
||||
output = xgb.dask.train(
|
||||
client,
|
||||
{"verbosity": 1, "tree_method": "hist"},
|
||||
dtrain,
|
||||
num_boost_round=4,
|
||||
evals=[(dtrain, "train")],
|
||||
)
|
||||
bst = output["booster"]
|
||||
history = output["history"]
|
||||
|
||||
# you can pass output directly into `predict` too.
|
||||
prediction = xgb.dask.predict(client, bst, dtrain)
|
||||
print('Evaluation history:', history)
|
||||
print("Evaluation history:", history)
|
||||
return prediction
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# or use other clusters for scaling
|
||||
with LocalCluster(n_workers=7, threads_per_worker=4) as cluster:
|
||||
with Client(cluster) as client:
|
||||
|
||||
@@ -13,33 +13,38 @@ from xgboost import dask as dxgb
|
||||
from xgboost.dask import DaskDMatrix
|
||||
|
||||
|
||||
def using_dask_matrix(client: Client, X, y):
|
||||
# DaskDMatrix acts like normal DMatrix, works as a proxy for local
|
||||
# DMatrix scatter around workers.
|
||||
def using_dask_matrix(client: Client, X: da.Array, y: da.Array) -> da.Array:
|
||||
# DaskDMatrix acts like normal DMatrix, works as a proxy for local DMatrix scatter
|
||||
# around workers.
|
||||
dtrain = DaskDMatrix(client, X, y)
|
||||
|
||||
# Use train method from xgboost.dask instead of xgboost. This
|
||||
# distributed version of train returns a dictionary containing the
|
||||
# resulting booster and evaluation history obtained from
|
||||
# evaluation metrics.
|
||||
output = xgb.dask.train(client,
|
||||
{'verbosity': 2,
|
||||
# Golden line for GPU training
|
||||
'tree_method': 'gpu_hist'},
|
||||
dtrain,
|
||||
num_boost_round=4, evals=[(dtrain, 'train')])
|
||||
bst = output['booster']
|
||||
history = output['history']
|
||||
# Use train method from xgboost.dask instead of xgboost. This distributed version
|
||||
# of train returns a dictionary containing the resulting booster and evaluation
|
||||
# history obtained from evaluation metrics.
|
||||
output = xgb.dask.train(
|
||||
client,
|
||||
{
|
||||
"verbosity": 2,
|
||||
"tree_method": "hist",
|
||||
# Golden line for GPU training
|
||||
"device": "cuda",
|
||||
},
|
||||
dtrain,
|
||||
num_boost_round=4,
|
||||
evals=[(dtrain, "train")],
|
||||
)
|
||||
bst = output["booster"]
|
||||
history = output["history"]
|
||||
|
||||
# you can pass output directly into `predict` too.
|
||||
prediction = xgb.dask.predict(client, bst, dtrain)
|
||||
print('Evaluation history:', history)
|
||||
print("Evaluation history:", history)
|
||||
return prediction
|
||||
|
||||
|
||||
def using_quantile_device_dmatrix(client: Client, X, y):
|
||||
"""`DaskQuantileDMatrix` is a data type specialized for `gpu_hist` and `hist` tree
|
||||
methods for reducing memory usage.
|
||||
def using_quantile_device_dmatrix(client: Client, X: da.Array, y: da.Array) -> da.Array:
|
||||
"""`DaskQuantileDMatrix` is a data type specialized for `hist` tree methods for
|
||||
reducing memory usage.
|
||||
|
||||
.. versionadded:: 1.2.0
|
||||
|
||||
@@ -52,26 +57,28 @@ def using_quantile_device_dmatrix(client: Client, X, y):
|
||||
# the `ref` argument of `DaskQuantileDMatrix`.
|
||||
dtrain = dxgb.DaskQuantileDMatrix(client, X, y)
|
||||
output = xgb.dask.train(
|
||||
client, {"verbosity": 2, "tree_method": "gpu_hist"}, dtrain, num_boost_round=4
|
||||
client,
|
||||
{"verbosity": 2, "tree_method": "hist", "device": "cuda"},
|
||||
dtrain,
|
||||
num_boost_round=4,
|
||||
)
|
||||
|
||||
prediction = xgb.dask.predict(client, output, X)
|
||||
return prediction
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here
|
||||
# `n_workers` represents the number of GPUs since we use one GPU per worker
|
||||
# process.
|
||||
# `n_workers` represents the number of GPUs since we use one GPU per worker process.
|
||||
with LocalCUDACluster(n_workers=2, threads_per_worker=4) as cluster:
|
||||
with Client(cluster) as client:
|
||||
# generate some random data for demonstration
|
||||
m = 100000
|
||||
n = 100
|
||||
X = da.random.random(size=(m, n), chunks=10000)
|
||||
y = da.random.random(size=(m, ), chunks=10000)
|
||||
y = da.random.random(size=(m,), chunks=10000)
|
||||
|
||||
print('Using DaskQuantileDMatrix')
|
||||
print("Using DaskQuantileDMatrix")
|
||||
from_ddqdm = using_quantile_device_dmatrix(client, X, y)
|
||||
print('Using DMatrix')
|
||||
print("Using DMatrix")
|
||||
from_dmatrix = using_dask_matrix(client, X, y)
|
||||
|
||||
@@ -21,7 +21,8 @@ def main(client):
|
||||
y = da.random.random(m, partition_size)
|
||||
|
||||
regressor = xgboost.dask.DaskXGBRegressor(verbosity=1)
|
||||
regressor.set_params(tree_method='gpu_hist')
|
||||
# set the device to CUDA
|
||||
regressor.set_params(tree_method="hist", device="cuda")
|
||||
# assigning client here is optional
|
||||
regressor.client = client
|
||||
|
||||
@@ -31,13 +32,13 @@ def main(client):
|
||||
bst = regressor.get_booster()
|
||||
history = regressor.evals_result()
|
||||
|
||||
print('Evaluation history:', history)
|
||||
print("Evaluation history:", history)
|
||||
# returned prediction is always a dask array.
|
||||
assert isinstance(prediction, da.Array)
|
||||
return bst # returning the trained model
|
||||
return bst # returning the trained model
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# With dask cuda, one can scale up XGBoost to arbitrary GPU clusters.
|
||||
# `LocalCUDACluster` used here is only for demonstration purpose.
|
||||
with LocalCUDACluster() as cluster:
|
||||
|
||||
Reference in New Issue
Block a user