From 16eb41936d6edfa216f78029ed2980f391a97adc Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Sat, 15 Jul 2023 19:11:20 +0800 Subject: [PATCH] Handle the new `device` parameter in dask and demos. (#9386) * Handle the new `device` parameter in dask and demos. - Check no ordinal is specified in the dask interface. - Update demos. - Update dask doc. - Update the condition for QDM. --- demo/dask/cpu_survival.py | 52 +- demo/dask/cpu_training.py | 22 +- demo/dask/gpu_training.py | 59 +- demo/dask/sklearn_gpu_training.py | 9 +- demo/guide-python/callbacks.py | 3 +- demo/guide-python/cat_in_the_dat.py | 3 +- demo/guide-python/categorical.py | 4 +- demo/guide-python/external_memory.py | 5 +- demo/guide-python/learning_to_rank.py | 6 +- demo/guide-python/quantile_data_iterator.py | 35 +- demo/guide-python/update_process.py | 8 +- doc/gpu/index.rst | 31 +- doc/parameter.rst | 8 +- doc/python/python_intro.rst | 4 +- doc/tutorials/dask.rst | 11 +- include/xgboost/c_api.h | 2 +- python-package/xgboost/core.py | 2 +- python-package/xgboost/dask.py | 20 +- python-package/xgboost/sklearn.py | 6 +- python-package/xgboost/spark/core.py | 8 +- src/c_api/c_api.cc | 15 +- src/common/error_msg.cc | 13 +- src/common/error_msg.h | 3 + src/learner.cc | 15 +- tests/ci_build/lint_python.py | 3 + tests/python/test_quantile_dmatrix.py | 12 + tests/python/test_updaters.py | 22 +- tests/python/test_with_sklearn.py | 5 +- .../test_gpu_with_dask/test_gpu_with_dask.py | 68 +- .../test_with_dask/test_with_dask.py | 623 ++++++++++-------- .../test_with_spark/test_spark_local.py | 4 +- 31 files changed, 631 insertions(+), 450 deletions(-) diff --git a/demo/dask/cpu_survival.py b/demo/dask/cpu_survival.py index 83eddd361..7fe0570de 100644 --- a/demo/dask/cpu_survival.py +++ b/demo/dask/cpu_survival.py @@ -18,43 +18,45 @@ def main(client): # The Veterans' Administration Lung Cancer Trial # The Statistical Analysis of Failure Time Data by Kalbfleisch J. and Prentice R (1980) CURRENT_DIR = os.path.dirname(__file__) - df = dd.read_csv(os.path.join(CURRENT_DIR, os.pardir, 'data', 'veterans_lung_cancer.csv')) + df = dd.read_csv( + os.path.join(CURRENT_DIR, os.pardir, "data", "veterans_lung_cancer.csv") + ) # DaskDMatrix acts like normal DMatrix, works as a proxy for local # DMatrix scatter around workers. # For AFT survival, you'd need to extract the lower and upper bounds for the label # and pass them as arguments to DaskDMatrix. - y_lower_bound = df['Survival_label_lower_bound'] - y_upper_bound = df['Survival_label_upper_bound'] - X = df.drop(['Survival_label_lower_bound', - 'Survival_label_upper_bound'], axis=1) - dtrain = DaskDMatrix(client, X, label_lower_bound=y_lower_bound, - label_upper_bound=y_upper_bound) + y_lower_bound = df["Survival_label_lower_bound"] + y_upper_bound = df["Survival_label_upper_bound"] + X = df.drop(["Survival_label_lower_bound", "Survival_label_upper_bound"], axis=1) + dtrain = DaskDMatrix( + client, X, label_lower_bound=y_lower_bound, label_upper_bound=y_upper_bound + ) # Use train method from xgboost.dask instead of xgboost. This # distributed version of train returns a dictionary containing the # resulting booster and evaluation history obtained from # evaluation metrics. 
- params = {'verbosity': 1, - 'objective': 'survival:aft', - 'eval_metric': 'aft-nloglik', - 'learning_rate': 0.05, - 'aft_loss_distribution_scale': 1.20, - 'aft_loss_distribution': 'normal', - 'max_depth': 6, - 'lambda': 0.01, - 'alpha': 0.02} - output = xgb.dask.train(client, - params, - dtrain, - num_boost_round=100, - evals=[(dtrain, 'train')]) - bst = output['booster'] - history = output['history'] + params = { + "verbosity": 1, + "objective": "survival:aft", + "eval_metric": "aft-nloglik", + "learning_rate": 0.05, + "aft_loss_distribution_scale": 1.20, + "aft_loss_distribution": "normal", + "max_depth": 6, + "lambda": 0.01, + "alpha": 0.02, + } + output = xgb.dask.train( + client, params, dtrain, num_boost_round=100, evals=[(dtrain, "train")] + ) + bst = output["booster"] + history = output["history"] # you can pass output directly into `predict` too. prediction = xgb.dask.predict(client, bst, dtrain) - print('Evaluation history: ', history) + print("Evaluation history: ", history) # Uncomment the following line to save the model to the disk # bst.save_model('survival_model.json') @@ -62,7 +64,7 @@ def main(client): return prediction -if __name__ == '__main__': +if __name__ == "__main__": # or use other clusters for scaling with LocalCluster(n_workers=7, threads_per_worker=4) as cluster: with Client(cluster) as client: diff --git a/demo/dask/cpu_training.py b/demo/dask/cpu_training.py index a31e5d2a6..811af5cd3 100644 --- a/demo/dask/cpu_training.py +++ b/demo/dask/cpu_training.py @@ -15,7 +15,7 @@ def main(client): m = 100000 n = 100 X = da.random.random(size=(m, n), chunks=100) - y = da.random.random(size=(m, ), chunks=100) + y = da.random.random(size=(m,), chunks=100) # DaskDMatrix acts like normal DMatrix, works as a proxy for local # DMatrix scatter around workers. @@ -25,21 +25,23 @@ def main(client): # distributed version of train returns a dictionary containing the # resulting booster and evaluation history obtained from # evaluation metrics. - output = xgb.dask.train(client, - {'verbosity': 1, - 'tree_method': 'hist'}, - dtrain, - num_boost_round=4, evals=[(dtrain, 'train')]) - bst = output['booster'] - history = output['history'] + output = xgb.dask.train( + client, + {"verbosity": 1, "tree_method": "hist"}, + dtrain, + num_boost_round=4, + evals=[(dtrain, "train")], + ) + bst = output["booster"] + history = output["history"] # you can pass output directly into `predict` too. prediction = xgb.dask.predict(client, bst, dtrain) - print('Evaluation history:', history) + print("Evaluation history:", history) return prediction -if __name__ == '__main__': +if __name__ == "__main__": # or use other clusters for scaling with LocalCluster(n_workers=7, threads_per_worker=4) as cluster: with Client(cluster) as client: diff --git a/demo/dask/gpu_training.py b/demo/dask/gpu_training.py index 1a75f6c70..6eea00692 100644 --- a/demo/dask/gpu_training.py +++ b/demo/dask/gpu_training.py @@ -13,33 +13,38 @@ from xgboost import dask as dxgb from xgboost.dask import DaskDMatrix -def using_dask_matrix(client: Client, X, y): - # DaskDMatrix acts like normal DMatrix, works as a proxy for local - # DMatrix scatter around workers. +def using_dask_matrix(client: Client, X: da.Array, y: da.Array) -> da.Array: + # DaskDMatrix acts like normal DMatrix, works as a proxy for local DMatrix scatter + # around workers. dtrain = DaskDMatrix(client, X, y) - # Use train method from xgboost.dask instead of xgboost. 
This - # distributed version of train returns a dictionary containing the - # resulting booster and evaluation history obtained from - # evaluation metrics. - output = xgb.dask.train(client, - {'verbosity': 2, - # Golden line for GPU training - 'tree_method': 'gpu_hist'}, - dtrain, - num_boost_round=4, evals=[(dtrain, 'train')]) - bst = output['booster'] - history = output['history'] + # Use train method from xgboost.dask instead of xgboost. This distributed version + # of train returns a dictionary containing the resulting booster and evaluation + # history obtained from evaluation metrics. + output = xgb.dask.train( + client, + { + "verbosity": 2, + "tree_method": "hist", + # Golden line for GPU training + "device": "cuda", + }, + dtrain, + num_boost_round=4, + evals=[(dtrain, "train")], + ) + bst = output["booster"] + history = output["history"] # you can pass output directly into `predict` too. prediction = xgb.dask.predict(client, bst, dtrain) - print('Evaluation history:', history) + print("Evaluation history:", history) return prediction -def using_quantile_device_dmatrix(client: Client, X, y): - """`DaskQuantileDMatrix` is a data type specialized for `gpu_hist` and `hist` tree - methods for reducing memory usage. +def using_quantile_device_dmatrix(client: Client, X: da.Array, y: da.Array) -> da.Array: + """`DaskQuantileDMatrix` is a data type specialized for `hist` tree methods for + reducing memory usage. .. versionadded:: 1.2.0 @@ -52,26 +57,28 @@ def using_quantile_device_dmatrix(client: Client, X, y): # the `ref` argument of `DaskQuantileDMatrix`. dtrain = dxgb.DaskQuantileDMatrix(client, X, y) output = xgb.dask.train( - client, {"verbosity": 2, "tree_method": "gpu_hist"}, dtrain, num_boost_round=4 + client, + {"verbosity": 2, "tree_method": "hist", "device": "cuda"}, + dtrain, + num_boost_round=4, ) prediction = xgb.dask.predict(client, output, X) return prediction -if __name__ == '__main__': +if __name__ == "__main__": # `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here - # `n_workers` represents the number of GPUs since we use one GPU per worker - # process. + # `n_workers` represents the number of GPUs since we use one GPU per worker process. with LocalCUDACluster(n_workers=2, threads_per_worker=4) as cluster: with Client(cluster) as client: # generate some random data for demonstration m = 100000 n = 100 X = da.random.random(size=(m, n), chunks=10000) - y = da.random.random(size=(m, ), chunks=10000) + y = da.random.random(size=(m,), chunks=10000) - print('Using DaskQuantileDMatrix') + print("Using DaskQuantileDMatrix") from_ddqdm = using_quantile_device_dmatrix(client, X, y) - print('Using DMatrix') + print("Using DMatrix") from_dmatrix = using_dask_matrix(client, X, y) diff --git a/demo/dask/sklearn_gpu_training.py b/demo/dask/sklearn_gpu_training.py index 4c544e4e8..32a994464 100644 --- a/demo/dask/sklearn_gpu_training.py +++ b/demo/dask/sklearn_gpu_training.py @@ -21,7 +21,8 @@ def main(client): y = da.random.random(m, partition_size) regressor = xgboost.dask.DaskXGBRegressor(verbosity=1) - regressor.set_params(tree_method='gpu_hist') + # set the device to CUDA + regressor.set_params(tree_method="hist", device="cuda") # assigning client here is optional regressor.client = client @@ -31,13 +32,13 @@ def main(client): bst = regressor.get_booster() history = regressor.evals_result() - print('Evaluation history:', history) + print("Evaluation history:", history) # returned prediction is always a dask array. 
assert isinstance(prediction, da.Array) - return bst # returning the trained model + return bst # returning the trained model -if __name__ == '__main__': +if __name__ == "__main__": # With dask cuda, one can scale up XGBoost to arbitrary GPU clusters. # `LocalCUDACluster` used here is only for demonstration purpose. with LocalCUDACluster() as cluster: diff --git a/demo/guide-python/callbacks.py b/demo/guide-python/callbacks.py index 817a65939..42fe397db 100644 --- a/demo/guide-python/callbacks.py +++ b/demo/guide-python/callbacks.py @@ -71,7 +71,8 @@ def custom_callback(): { 'objective': 'binary:logistic', 'eval_metric': ['error', 'rmse'], - 'tree_method': 'gpu_hist' + 'tree_method': 'hist', + "device": "cuda", }, D_train, evals=[(D_train, 'Train'), (D_valid, 'Valid')], diff --git a/demo/guide-python/cat_in_the_dat.py b/demo/guide-python/cat_in_the_dat.py index fdac04d6b..f8f345bda 100644 --- a/demo/guide-python/cat_in_the_dat.py +++ b/demo/guide-python/cat_in_the_dat.py @@ -63,7 +63,8 @@ def load_cat_in_the_dat() -> tuple[pd.DataFrame, pd.Series]: params = { - "tree_method": "gpu_hist", + "tree_method": "hist", + "device": "cuda", "n_estimators": 32, "colsample_bylevel": 0.7, } diff --git a/demo/guide-python/categorical.py b/demo/guide-python/categorical.py index a7fc85c71..aa5fb005b 100644 --- a/demo/guide-python/categorical.py +++ b/demo/guide-python/categorical.py @@ -58,13 +58,13 @@ def main() -> None: # Specify `enable_categorical` to True, also we use onehot encoding based split # here for demonstration. For details see the document of `max_cat_to_onehot`. reg = xgb.XGBRegressor( - tree_method="gpu_hist", enable_categorical=True, max_cat_to_onehot=5 + tree_method="hist", enable_categorical=True, max_cat_to_onehot=5, device="cuda" ) reg.fit(X, y, eval_set=[(X, y)]) # Pass in already encoded data X_enc, y_enc = make_categorical(100, 10, 4, True) - reg_enc = xgb.XGBRegressor(tree_method="gpu_hist") + reg_enc = xgb.XGBRegressor(tree_method="hist", device="cuda") reg_enc.fit(X_enc, y_enc, eval_set=[(X_enc, y_enc)]) reg_results = np.array(reg.evals_result()["validation_0"]["rmse"]) diff --git a/demo/guide-python/external_memory.py b/demo/guide-python/external_memory.py index 11a05c61c..fdaa9dab9 100644 --- a/demo/guide-python/external_memory.py +++ b/demo/guide-python/external_memory.py @@ -82,8 +82,9 @@ def main(tmpdir: str) -> xgboost.Booster: missing = np.NaN Xy = xgboost.DMatrix(it, missing=missing, enable_categorical=False) - # Other tree methods including ``approx``, and ``gpu_hist`` are supported. GPU - # behaves differently than CPU tree methods. See tutorial in doc for details. + # ``approx`` is also supported, but less efficient due to sketching. GPU behaves + # differently than CPU tree methods as it uses a hybrid approach. See tutorial in + # doc for details. 
booster = xgboost.train( {"tree_method": "hist", "max_depth": 4}, Xy, diff --git a/demo/guide-python/learning_to_rank.py b/demo/guide-python/learning_to_rank.py index 37b7157f5..62df8253b 100644 --- a/demo/guide-python/learning_to_rank.py +++ b/demo/guide-python/learning_to_rank.py @@ -104,7 +104,8 @@ def ranking_demo(args: argparse.Namespace) -> None: qid_test = qid_test[sorted_idx] ranker = xgb.XGBRanker( - tree_method="gpu_hist", + tree_method="hist", + device="cuda", lambdarank_pair_method="topk", lambdarank_num_pair_per_sample=13, eval_metric=["ndcg@1", "ndcg@8"], @@ -161,7 +162,8 @@ def click_data_demo(args: argparse.Namespace) -> None: ranker = xgb.XGBRanker( n_estimators=512, - tree_method="gpu_hist", + tree_method="hist", + device="cuda", learning_rate=0.01, reg_lambda=1.5, subsample=0.8, diff --git a/demo/guide-python/quantile_data_iterator.py b/demo/guide-python/quantile_data_iterator.py index 29dd96b24..1241caef4 100644 --- a/demo/guide-python/quantile_data_iterator.py +++ b/demo/guide-python/quantile_data_iterator.py @@ -23,22 +23,23 @@ import numpy import xgboost COLS = 64 -ROWS_PER_BATCH = 1000 # data is splited by rows +ROWS_PER_BATCH = 1000 # data is splited by rows BATCHES = 32 class IterForDMatrixDemo(xgboost.core.DataIter): - '''A data iterator for XGBoost DMatrix. + """A data iterator for XGBoost DMatrix. `reset` and `next` are required for any data iterator, other functions here are utilites for demonstration's purpose. - ''' + """ + def __init__(self): - '''Generate some random data for demostration. + """Generate some random data for demostration. Actual data can be anything that is currently supported by XGBoost. - ''' + """ self.rows = ROWS_PER_BATCH self.cols = COLS rng = cupy.random.RandomState(1994) @@ -46,7 +47,7 @@ class IterForDMatrixDemo(xgboost.core.DataIter): self._labels = [rng.randn(self.rows)] * BATCHES self._weights = [rng.uniform(size=self.rows)] * BATCHES - self.it = 0 # set iterator to 0 + self.it = 0 # set iterator to 0 super().__init__() def as_array(self): @@ -59,27 +60,26 @@ class IterForDMatrixDemo(xgboost.core.DataIter): return cupy.concatenate(self._weights) def data(self): - '''Utility function for obtaining current batch of data.''' + """Utility function for obtaining current batch of data.""" return self._data[self.it] def labels(self): - '''Utility function for obtaining current batch of label.''' + """Utility function for obtaining current batch of label.""" return self._labels[self.it] def weights(self): return self._weights[self.it] def reset(self): - '''Reset the iterator''' + """Reset the iterator""" self.it = 0 def next(self, input_data): - '''Yield next batch of data.''' + """Yield next batch of data.""" if self.it == len(self._data): # Return 0 when there's no more batch. return 0 - input_data(data=self.data(), label=self.labels(), - weight=self.weights()) + input_data(data=self.data(), label=self.labels(), weight=self.weights()) self.it += 1 return 1 @@ -103,18 +103,19 @@ def main(): assert m_with_it.num_col() == m.num_col() assert m_with_it.num_row() == m.num_row() - # Tree meethod must be one of the `hist` or `gpu_hist`. We use `gpu_hist` for GPU - # input here. + # Tree meethod must be `hist`. 
reg_with_it = xgboost.train( - {"tree_method": "gpu_hist"}, m_with_it, num_boost_round=rounds + {"tree_method": "hist", "device": "cuda"}, m_with_it, num_boost_round=rounds ) predict_with_it = reg_with_it.predict(m_with_it) - reg = xgboost.train({"tree_method": "gpu_hist"}, m, num_boost_round=rounds) + reg = xgboost.train( + {"tree_method": "hist", "device": "cuda"}, m, num_boost_round=rounds + ) predict = reg.predict(m) numpy.testing.assert_allclose(predict_with_it, predict, rtol=1e6) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/demo/guide-python/update_process.py b/demo/guide-python/update_process.py index 77e0dc870..17bbbc39c 100644 --- a/demo/guide-python/update_process.py +++ b/demo/guide-python/update_process.py @@ -24,7 +24,7 @@ def main(): Xy = xgb.DMatrix(X_train, y_train) evals_result: xgb.callback.EvaluationMonitor.EvalsLog = {} booster = xgb.train( - {"tree_method": "gpu_hist", "max_depth": 6}, + {"tree_method": "hist", "max_depth": 6, "device": "cuda"}, Xy, num_boost_round=n_rounds, evals=[(Xy, "Train")], @@ -33,8 +33,8 @@ def main(): SHAP = booster.predict(Xy, pred_contribs=True) # Refresh the leaf value and tree statistic - X_refresh = X[X.shape[0] // 2:] - y_refresh = y[y.shape[0] // 2:] + X_refresh = X[X.shape[0] // 2 :] + y_refresh = y[y.shape[0] // 2 :] Xy_refresh = xgb.DMatrix(X_refresh, y_refresh) # The model will adapt to other half of the data by changing leaf value (no change in # split condition) with refresh_leaf set to True. @@ -87,7 +87,7 @@ def main(): np.testing.assert_allclose( np.array(prune_result["Original"]["rmse"]), np.array(prune_result["Train"]["rmse"]), - atol=1e-5 + atol=1e-5, ) diff --git a/doc/gpu/index.rst b/doc/gpu/index.rst index 3cee0cdf5..4489c1427 100644 --- a/doc/gpu/index.rst +++ b/doc/gpu/index.rst @@ -14,30 +14,24 @@ Most of the algorithms in XGBoost including training, prediction and evaluation Usage ===== -Specify the ``tree_method`` parameter as ``gpu_hist``. For details around the ``tree_method`` parameter, see :doc:`tree method `. - -Supported parameters --------------------- - -GPU accelerated prediction is enabled by default for the above mentioned ``tree_method`` parameters but can be switched to CPU prediction by setting ``predictor`` to ``cpu_predictor``. This could be useful if you want to conserve GPU memory. Likewise when using CPU algorithms, GPU accelerated prediction can be enabled by setting ``predictor`` to ``gpu_predictor``. - -The device ordinal (which GPU to use if you have many of them) can be selected using the -``device`` parameter, which defaults to 0 when "CUDA" is specified(the first device reported by CUDA -runtime). +To enable GPU acceleration, specify the ``device`` parameter as ``cuda``. In addition, the device ordinal (which GPU to use if you have multiple devices in the same node) can be specified using the ``cuda:`` syntax, where ```` is an integer that represents the device ordinal. XGBoost defaults to 0 (the first device reported by CUDA runtime). The GPU algorithms currently work with CLI, Python, R, and JVM packages. See :doc:`/install` for details. .. code-block:: python :caption: Python example - param["device"] = "cuda:0" - param['tree_method'] = 'gpu_hist' + params = dict() + params["device"] = "cuda:0" + params["tree_method"] = "hist" + Xy = xgboost.QuantileDMatrix(X, y) + xgboost.train(params, Xy) .. 
code-block:: python :caption: With Scikit-Learn interface - XGBRegressor(tree_method='gpu_hist', device="cuda") + XGBRegressor(tree_method="hist", device="cuda") GPU-Accelerated SHAP values @@ -46,12 +40,11 @@ XGBoost makes use of `GPUTreeShap `_ as .. code-block:: python - model.set_param({"device": "cuda:0", "tree_method": "gpu_hist"}) - shap_values = model.predict(dtrain, pred_contribs=True) + booster.set_param({"device": "cuda:0"}) + shap_values = booster.predict(dtrain, pred_contribs=True) shap_interaction_values = model.predict(dtrain, pred_interactions=True) -See examples `here -`__. +See examples `here `__. Multi-node Multi-GPU Training ============================= @@ -61,7 +54,7 @@ XGBoost supports fully distributed GPU training using `Dask ` Memory usage ============ -The following are some guidelines on the device memory usage of the `gpu_hist` tree method. +The following are some guidelines on the device memory usage of the ``hist`` tree method on GPU. Memory inside xgboost training is generally allocated for two reasons - storing the dataset and working memory. @@ -79,7 +72,7 @@ XGBoost models trained on GPUs can be used on CPU-only systems to generate predi Developer notes =============== -The application may be profiled with annotations by specifying USE_NTVX to cmake. Regions covered by the 'Monitor' class in CUDA code will automatically appear in the nsight profiler when `verbosity` is set to 3. +The application may be profiled with annotations by specifying ``USE_NTVX`` to cmake. Regions covered by the 'Monitor' class in CUDA code will automatically appear in the nsight profiler when `verbosity` is set to 3. ********** References diff --git a/doc/parameter.rst b/doc/parameter.rst index d628d161b..382cddd4f 100644 --- a/doc/parameter.rst +++ b/doc/parameter.rst @@ -55,10 +55,6 @@ General Parameters - Flag to disable default metric. Set to 1 or ``true`` to disable. -* ``num_feature`` [set automatically by XGBoost, no need to be set by user] - - - Feature dimension used in boosting, set to maximum dimension of the feature - * ``device`` [default= ``cpu``] .. versionadded:: 2.0.0 @@ -164,7 +160,7 @@ Parameters for Tree Booster - ``grow_colmaker``: non-distributed column-based construction of trees. - ``grow_histmaker``: distributed tree construction with row-based data splitting based on global proposal of histogram counting. - ``grow_quantile_histmaker``: Grow tree using quantized histogram. - - ``grow_gpu_hist``: Grow tree with GPU. Same as setting tree method to ``hist`` and use ``device=cuda``. + - ``grow_gpu_hist``: Grow tree with GPU. Same as setting ``tree_method`` to ``hist`` and use ``device=cuda``. - ``sync``: synchronizes trees in all distributed nodes. - ``refresh``: refreshes tree's statistics and/or leaf values based on the current data. Note that no random subsampling of data rows is performed. - ``prune``: prunes the splits where loss < min_split_loss (or gamma) and nodes that have depth greater than ``max_depth``. @@ -421,7 +417,7 @@ Specify the learning task and the corresponding learning objective. The objectiv .. math:: - AP@l = \frac{1}{min{(l, N)}}\sum^l_{k=1}P@k \cdot I_{(k)} + AP@l = \frac{1}{min{(l, N)}}\sum^l_{k=1}P@k \cdot I_{(k)} where :math:`I_{(k)}` is an indicator function that equals to :math:`1` when the document at :math:`k` is relevant and :math:`0` otherwise. The :math:`P@k` is the precision at :math:`k`, and :math:`N` is the total number of relevant documents. 
Lastly, the `mean average precision` is defined as the weighted average across all queries. diff --git a/doc/python/python_intro.rst b/doc/python/python_intro.rst index 505556383..bb74e7bc3 100644 --- a/doc/python/python_intro.rst +++ b/doc/python/python_intro.rst @@ -310,8 +310,8 @@ for more info. .. code-block:: python - # Use "gpu_hist" for training the model. - reg = xgb.XGBRegressor(tree_method="gpu_hist") + # Use "hist" for training the model. + reg = xgb.XGBRegressor(tree_method="hist", device="cuda") # Fit the model using predictor X and response y. reg.fit(X, y) # Save model into JSON format. diff --git a/doc/tutorials/dask.rst b/doc/tutorials/dask.rst index 7fde35b0e..131929b24 100644 --- a/doc/tutorials/dask.rst +++ b/doc/tutorials/dask.rst @@ -56,7 +56,6 @@ on a dask cluster: dtrain = xgb.dask.DaskDMatrix(client, X, y) # or # dtrain = xgb.dask.DaskQuantileDMatrix(client, X, y) - # `DaskQuantileDMatrix` is available for the `hist` and `gpu_hist` tree method. output = xgb.dask.train( client, @@ -149,7 +148,7 @@ Also for inplace prediction: .. code-block:: python # where X is a dask DataFrame or dask Array backed by cupy or cuDF. - booster.set_param({"device": "cuda:0"}) + booster.set_param({"device": "cuda"}) prediction = xgb.dask.inplace_predict(client, booster, X) When input is ``da.Array`` object, output is always ``da.Array``. However, if the input @@ -225,6 +224,12 @@ collection. main(client) +**************** +GPU acceleration +**************** + +For most of the use cases with GPUs, the `Dask-CUDA `__ project should be used to create the cluster, which automatically configures the correct device ordinal for worker processes. As a result, users should NOT specify the ordinal (good: ``device=cuda``, bad: ``device=cuda:1``). See :ref:`sphx_glr_python_dask-examples_gpu_training.py` and :ref:`sphx_glr_python_dask-examples_sklearn_gpu_training.py` for worked examples. + *************************** Working with other clusters *************************** @@ -262,7 +267,7 @@ In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernete regressor = xgb.dask.DaskXGBRegressor(n_estimators=10, missing=0.0) regressor.client = client - regressor.set_params(tree_method='gpu_hist') + regressor.set_params(tree_method='hist', device="cuda") regressor.fit(X, y, eval_set=[(X, y)]) diff --git a/include/xgboost/c_api.h b/include/xgboost/c_api.h index 2a7d51393..8844b853b 100644 --- a/include/xgboost/c_api.h +++ b/include/xgboost/c_api.h @@ -478,7 +478,7 @@ XGB_DLL int XGDMatrixCreateFromCallback(DataIterHandle iter, DMatrixHandle proxy * \param config JSON encoded parameters for DMatrix construction. Accepted fields are: * - missing: Which value to represent missing value * - nthread (optional): Number of threads used for initializing DMatrix. - * - max_bin (optional): Maximum number of bins for building histogram. + * - max_bin (optional): Maximum number of bins for building histogram. 
* \param out The created Device Quantile DMatrix * * \return 0 when success, -1 when failure happens diff --git a/python-package/xgboost/core.py b/python-package/xgboost/core.py index 0250dd293..86c49e0ff 100644 --- a/python-package/xgboost/core.py +++ b/python-package/xgboost/core.py @@ -1451,7 +1451,7 @@ class QuantileDMatrix(DMatrix): enable_categorical: bool = False, data_split_mode: DataSplitMode = DataSplitMode.ROW, ) -> None: - self.max_bin: int = max_bin if max_bin is not None else 256 + self.max_bin = max_bin self.missing = missing if missing is not None else np.nan self.nthread = nthread if nthread is not None else -1 self._silent = silent # unused, kept for compatibility diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 35c5c009f..32dd2a4a7 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -82,6 +82,7 @@ from .sklearn import ( XGBRanker, XGBRankerMixIn, XGBRegressorBase, + _can_use_qdm, _check_rf_callback, _cls_predict_proba, _objective_decorator, @@ -617,14 +618,7 @@ class DaskPartitionIter(DataIter): # pylint: disable=R0902 if self._iter == len(self._data): # Return 0 when there's no more batch. return 0 - feature_names: Optional[FeatureNames] = None - if self._feature_names: - feature_names = self._feature_names - else: - if hasattr(self.data(), "columns"): - feature_names = self.data().columns.format() - else: - feature_names = None + input_data( data=self.data(), label=self._get("_label"), @@ -634,7 +628,7 @@ class DaskPartitionIter(DataIter): # pylint: disable=R0902 base_margin=self._get("_base_margin"), label_lower_bound=self._get("_label_lower_bound"), label_upper_bound=self._get("_label_upper_bound"), - feature_names=feature_names, + feature_names=self._feature_names, feature_types=self._feature_types, feature_weights=self._feature_weights, ) @@ -935,6 +929,12 @@ async def _train_async( raise NotImplementedError( f"booster `{params['booster']}` is not yet supported for dask." ) + device = params.get("device", None) + if device and device.find(":") != -1: + raise ValueError( + "The dask interface for XGBoost doesn't support selecting specific device" + " ordinal. Use `device=cpu` or `device=cuda` instead." + ) def dispatched_train( parameters: Dict, @@ -1574,7 +1574,7 @@ async def _async_wrap_evaluation_matrices( """A switch function for async environment.""" def _dispatch(ref: Optional[DaskDMatrix], **kwargs: Any) -> DaskDMatrix: - if tree_method in ("hist", "gpu_hist"): + if _can_use_qdm(tree_method): return DaskQuantileDMatrix( client=client, ref=ref, max_bin=max_bin, **kwargs ) diff --git a/python-package/xgboost/sklearn.py b/python-package/xgboost/sklearn.py index e9f9e9f10..d69cb3a01 100644 --- a/python-package/xgboost/sklearn.py +++ b/python-package/xgboost/sklearn.py @@ -76,6 +76,10 @@ def _check_rf_callback( ) +def _can_use_qdm(tree_method: Optional[str]) -> bool: + return tree_method in ("hist", "gpu_hist", None, "auto") + + SklObjective = Optional[ Union[str, Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]] ] @@ -939,7 +943,7 @@ class XGBModel(XGBModelBase): def _create_dmatrix(self, ref: Optional[DMatrix], **kwargs: Any) -> DMatrix: # Use `QuantileDMatrix` to save memory. 
- if self.tree_method in ("hist", "gpu_hist"): + if _can_use_qdm(self.tree_method) and self.booster != "gblinear": try: return QuantileDMatrix( **kwargs, ref=ref, nthread=self.n_jobs, max_bin=self.max_bin diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index a170fbf9f..283999c6d 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -61,7 +61,7 @@ import xgboost from xgboost import XGBClassifier from xgboost.compat import is_cudf_available from xgboost.core import Booster -from xgboost.sklearn import DEFAULT_N_ESTIMATORS, XGBModel +from xgboost.sklearn import DEFAULT_N_ESTIMATORS, XGBModel, _can_use_qdm from xgboost.training import train as worker_train from .data import ( @@ -901,7 +901,7 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable): context = BarrierTaskContext.get() dev_ordinal = None - use_hist = booster_params.get("tree_method", None) in ("hist", "gpu_hist") + use_qdm = _can_use_qdm(booster_params.get("tree_method", None)) if use_gpu: dev_ordinal = ( @@ -912,9 +912,7 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable): # because without cuDF, DMatrix performs better than QDM. # Note: Checking `is_cudf_available` in spark worker side because # spark worker might has different python environment with driver side. - use_qdm = use_hist and is_cudf_available() - else: - use_qdm = use_hist + use_qdm = use_qdm and is_cudf_available() if use_qdm and (booster_params.get("max_bin", None) is not None): dmatrix_kwargs["max_bin"] = booster_params["max_bin"] diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc index 4e1f86ff2..0c98c0198 100644 --- a/src/c_api/c_api.cc +++ b/src/c_api/c_api.cc @@ -81,13 +81,6 @@ void XGBBuildInfoDevice(Json *p_info) { } // namespace xgboost #endif -namespace { -void DeprecatedFunc(StringView old, StringView since, StringView replacement) { - LOG(WARNING) << "`" << old << "` is deprecated since" << since << ", use `" << replacement - << "` instead."; -} -} // anonymous namespace - XGB_DLL int XGBuildInfo(char const **out) { API_BEGIN(); xgboost_CHECK_C_ARG_PTR(out); @@ -328,7 +321,7 @@ XGB_DLL int XGDeviceQuantileDMatrixCreateFromCallback(DataIterHandle iter, DMatr int nthread, int max_bin, DMatrixHandle *out) { API_BEGIN(); - DeprecatedFunc(__func__, "1.7.0", "XGQuantileDMatrixCreateFromCallback"); + LOG(WARNING) << error::DeprecatedFunc(__func__, "1.7.0", "XGQuantileDMatrixCreateFromCallback"); *out = new std::shared_ptr{ xgboost::DMatrix::Create(iter, proxy, nullptr, reset, next, missing, nthread, max_bin)}; API_END(); @@ -432,7 +425,7 @@ XGB_DLL int XGDMatrixCreateFromCSREx(const size_t *indptr, const unsigned *indic const bst_float *data, size_t nindptr, size_t nelem, size_t num_col, DMatrixHandle *out) { API_BEGIN(); - DeprecatedFunc(__func__, "2.0.0", "XGDMatrixCreateFromCSR"); + LOG(WARNING) << error::DeprecatedFunc(__func__, "2.0.0", "XGDMatrixCreateFromCSR"); data::CSRAdapter adapter(indptr, indices, data, nindptr - 1, nelem, num_col); *out = new std::shared_ptr(DMatrix::Create(&adapter, std::nan(""), 1)); API_END(); @@ -496,7 +489,7 @@ XGB_DLL int XGDMatrixCreateFromCSCEx(const size_t *col_ptr, const unsigned *indi const bst_float *data, size_t nindptr, size_t, size_t num_row, DMatrixHandle *out) { API_BEGIN(); - DeprecatedFunc(__func__, "2.0.0", "XGDMatrixCreateFromCSC"); + LOG(WARNING) << error::DeprecatedFunc(__func__, "2.0.0", "XGDMatrixCreateFromCSC"); data::CSCAdapter adapter(col_ptr, indices, data, nindptr 
- 1, num_row); xgboost_CHECK_C_ARG_PTR(out); *out = new std::shared_ptr(DMatrix::Create(&adapter, std::nan(""), 1)); @@ -1347,7 +1340,7 @@ XGB_DLL int XGBoosterGetModelRaw(BoosterHandle handle, xgboost::bst_ulong *out_l raw_str.resize(0); common::MemoryBufferStream fo(&raw_str); - DeprecatedFunc(__func__, "1.6.0", "XGBoosterSaveModelToBuffer"); + LOG(WARNING) << error::DeprecatedFunc(__func__, "1.6.0", "XGBoosterSaveModelToBuffer"); learner->Configure(); learner->SaveModel(&fo); diff --git a/src/common/error_msg.cc b/src/common/error_msg.cc index bb57014a6..593c7d6de 100644 --- a/src/common/error_msg.cc +++ b/src/common/error_msg.cc @@ -3,10 +3,18 @@ */ #include "error_msg.h" +#include // for stringstream + #include "../collective/communicator-inl.h" // for GetRank #include "xgboost/logging.h" namespace xgboost::error { +std::string DeprecatedFunc(StringView old, StringView since, StringView replacement) { + std::stringstream ss; + ss << "`" << old << "` is deprecated since" << since << ", use `" << replacement << "` instead."; + return ss.str(); +} + void WarnDeprecatedGPUHist() { auto msg = "The tree method `gpu_hist` is deprecated since 2.0.0. To use GPU training, set the `device` " @@ -34,8 +42,9 @@ void WarnDeprecatedGPUId() { if (logged) { return; } - LOG(WARNING) << "`gpu_id` is deprecated in favor of the new `device` parameter: " - << "device = cpu/cuda/cuda:0"; + auto msg = DeprecatedFunc("gpu_id", "2.0.0", "device"); + msg += " E.g. device=cpu/cuda/cuda:0"; + LOG(WARNING) << msg; logged = true; } diff --git a/src/common/error_msg.h b/src/common/error_msg.h index 07b5c3e53..8bdc85999 100644 --- a/src/common/error_msg.h +++ b/src/common/error_msg.h @@ -8,6 +8,7 @@ #include // for uint64_t #include // for numeric_limits +#include // for string #include "xgboost/base.h" // for bst_feature_t #include "xgboost/logging.h" @@ -86,5 +87,7 @@ void WarnManualUpdater(); void WarnDeprecatedGPUId(); void WarnEmptyDataset(); + +std::string DeprecatedFunc(StringView old, StringView since, StringView replacement); } // namespace xgboost::error #endif // XGBOOST_COMMON_ERROR_MSG_H_ diff --git a/src/learner.cc b/src/learner.cc index 03714a056..2f453ea30 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -690,19 +690,20 @@ class LearnerConfiguration : public Learner { stack.pop(); auto const &obj = get(j_obj); - for (auto const &kv : obj) { + for (auto const& kv : obj) { if (is_parameter(kv.first)) { auto parameter = get(kv.second); - std::transform(parameter.begin(), parameter.end(), std::back_inserter(keys), - [](std::pair const& kv) { - return kv.first; - }); + std::transform( + parameter.begin(), parameter.end(), std::back_inserter(keys), + [](std::pair const& kv) { return kv.first; }); } else if (IsA(kv.second)) { stack.push(kv.second); - } else if (kv.first == "metrics") { + } else if (IsA(kv.second)) { auto const& array = get(kv.second); for (auto const& v : array) { - stack.push(v); + if (IsA(v) || IsA(v)) { + stack.push(v); + } } } } diff --git a/tests/ci_build/lint_python.py b/tests/ci_build/lint_python.py index 08baa844b..ca5d56e4c 100644 --- a/tests/ci_build/lint_python.py +++ b/tests/ci_build/lint_python.py @@ -32,6 +32,7 @@ class LintersPaths: "tests/test_distributed/test_with_spark/", "tests/test_distributed/test_gpu_with_spark/", # demo + "demo/dask/", "demo/json-model/json_parser.py", "demo/guide-python/cat_in_the_dat.py", "demo/guide-python/categorical.py", @@ -42,6 +43,8 @@ class LintersPaths: "demo/guide-python/quantile_regression.py", 
"demo/guide-python/multioutput_regression.py", "demo/guide-python/learning_to_rank.py", + "demo/guide-python/quantile_data_iterator.py", + "demo/guide-python/update_process.py", "demo/aft_survival/aft_survival_viz_demo.py", # CI "tests/ci_build/lint_python.py", diff --git a/tests/python/test_quantile_dmatrix.py b/tests/python/test_quantile_dmatrix.py index c1ec23ea3..b7428dfac 100644 --- a/tests/python/test_quantile_dmatrix.py +++ b/tests/python/test_quantile_dmatrix.py @@ -322,3 +322,15 @@ class TestQuantileDMatrix: X: np.ndarray = np.array(orig, dtype=dtype) with pytest.raises(ValueError): xgb.QuantileDMatrix(X) + + def test_changed_max_bin(self) -> None: + n_samples = 128 + n_features = 16 + csr, y = make_sparse_regression(n_samples, n_features, 0.5, False) + Xy = xgb.QuantileDMatrix(csr, y, max_bin=9) + booster = xgb.train({"max_bin": 9}, Xy, num_boost_round=2) + + Xy = xgb.QuantileDMatrix(csr, y, max_bin=11) + + with pytest.raises(ValueError, match="consistent"): + xgb.train({}, Xy, num_boost_round=2, xgb_model=booster) diff --git a/tests/python/test_updaters.py b/tests/python/test_updaters.py index 2027942fe..029911bf0 100644 --- a/tests/python/test_updaters.py +++ b/tests/python/test_updaters.py @@ -27,7 +27,7 @@ def train_result(param, dmat, num_rounds): param, dmat, num_rounds, - [(dmat, "train")], + evals=[(dmat, "train")], verbose_eval=False, evals_result=result, ) @@ -169,13 +169,21 @@ class TestTreeMethod: hist_res = {} exact_res = {} - xgb.train(ag_param, ag_dtrain, 10, - [(ag_dtrain, 'train'), (ag_dtest, 'test')], - evals_result=hist_res) + xgb.train( + ag_param, + ag_dtrain, + 10, + evals=[(ag_dtrain, "train"), (ag_dtest, "test")], + evals_result=hist_res + ) ag_param["tree_method"] = "exact" - xgb.train(ag_param, ag_dtrain, 10, - [(ag_dtrain, 'train'), (ag_dtest, 'test')], - evals_result=exact_res) + xgb.train( + ag_param, + ag_dtrain, + 10, + evals=[(ag_dtrain, "train"), (ag_dtest, "test")], + evals_result=exact_res + ) assert hist_res['train']['auc'] == exact_res['train']['auc'] assert hist_res['test']['auc'] == exact_res['test']['auc'] diff --git a/tests/python/test_with_sklearn.py b/tests/python/test_with_sklearn.py index b961db2c4..9a58b7277 100644 --- a/tests/python/test_with_sklearn.py +++ b/tests/python/test_with_sklearn.py @@ -1349,10 +1349,11 @@ def test_multilabel_classification() -> None: np.testing.assert_allclose(clf.predict(X), predt) -def test_data_initialization(): +def test_data_initialization() -> None: from sklearn.datasets import load_digits + X, y = load_digits(return_X_y=True) - validate_data_initialization(xgb.DMatrix, xgb.XGBClassifier, X, y) + validate_data_initialization(xgb.QuantileDMatrix, xgb.XGBClassifier, X, y) @parametrize_with_checks([xgb.XGBRegressor()]) diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py index 047093700..9386486de 100644 --- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py +++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py @@ -1,10 +1,9 @@ """Copyright 2019-2022 XGBoost contributors""" import asyncio -import os -import subprocess +import json from collections import OrderedDict from inspect import signature -from typing import Any, Dict, Type, TypeVar, Union +from typing import Any, Dict, Type, TypeVar import numpy as np import pytest @@ -64,7 +63,7 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None: dtrain = DMatrixT(client, X, y) out = dxgb.train( client, - {"tree_method": 
"gpu_hist", "debug_synchronize": True}, + {"tree_method": "hist", "debug_synchronize": True, "device": "cuda"}, dtrain=dtrain, evals=[(dtrain, "X")], num_boost_round=4, @@ -116,12 +115,18 @@ def run_with_dask_array(DMatrixT: Type, client: Client) -> None: dtrain = DMatrixT(client, X, y) out = dxgb.train( client, - {"tree_method": "gpu_hist", "debug_synchronize": True}, + {"tree_method": "hist", "debug_synchronize": True, "device": "cuda"}, dtrain=dtrain, evals=[(dtrain, "X")], num_boost_round=2, ) from_dmatrix = dxgb.predict(client, out, dtrain).compute() + assert ( + json.loads(out["booster"].save_config())["learner"]["gradient_booster"][ + "updater" + ][0]["name"] + == "grow_gpu_hist" + ) inplace_predictions = dxgb.inplace_predict(client, out, X).compute() single_node = out["booster"].predict(xgb.DMatrix(X.compute())) np.testing.assert_allclose(single_node, from_dmatrix) @@ -149,7 +154,8 @@ def run_gpu_hist( DMatrixT: Type, client: Client, ) -> None: - params["tree_method"] = "gpu_hist" + params["tree_method"] = "hist" + params["device"] = "cuda" params = dataset.set_params(params) # It doesn't make sense to distribute a completely # empty dataset. @@ -196,11 +202,11 @@ def run_gpu_hist( def test_tree_stats() -> None: with LocalCUDACluster(n_workers=1) as cluster: with Client(cluster) as client: - local = run_tree_stats(client, "gpu_hist") + local = run_tree_stats(client, "hist", "cuda") with LocalCUDACluster(n_workers=2) as cluster: with Client(cluster) as client: - distributed = run_tree_stats(client, "gpu_hist") + distributed = run_tree_stats(client, "hist", "cuda") assert local == distributed @@ -214,12 +220,12 @@ class TestDistributedGPU: X_, y_ = load_breast_cancer(return_X_y=True) X = dd.from_array(X_, chunksize=100).map_partitions(cudf.from_pandas) y = dd.from_array(y_, chunksize=100).map_partitions(cudf.from_pandas) - run_boost_from_prediction(X, y, "gpu_hist", local_cuda_client) + run_boost_from_prediction(X, y, "hist", "cuda", local_cuda_client) X_, y_ = load_iris(return_X_y=True) X = dd.from_array(X_, chunksize=50).map_partitions(cudf.from_pandas) y = dd.from_array(y_, chunksize=50).map_partitions(cudf.from_pandas) - run_boost_from_prediction_multi_class(X, y, "gpu_hist", local_cuda_client) + run_boost_from_prediction_multi_class(X, y, "hist", "cuda", local_cuda_client) def test_init_estimation(self, local_cuda_client: Client) -> None: check_init_estimation("gpu_hist", local_cuda_client) @@ -282,7 +288,7 @@ class TestDistributedGPU: ) result = xgb.dask.train( client, - {"tree_method": "gpu_hist"}, + {"tree_method": "hist", "device": "cuda", "debug_synchronize": True}, Xy, num_boost_round=10, evals=[(Xy_valid, "Valid")], @@ -313,7 +319,8 @@ class TestDistributedGPU: { "objective": "binary:logistic", "eval_metric": "error", - "tree_method": "gpu_hist", + "tree_method": "hist", + "device": "cuda", }, m, evals=[(valid, "Valid")], @@ -328,7 +335,8 @@ class TestDistributedGPU: valid_y = y cls = dxgb.DaskXGBClassifier( objective="binary:logistic", - tree_method="gpu_hist", + tree_method="hist", + device="cuda", eval_metric="error", n_estimators=100, ) @@ -356,7 +364,11 @@ class TestDistributedGPU: run_dask_classifier(X, y, w, model, "gpu_hist", local_cuda_client, 10) def test_empty_dmatrix(self, local_cuda_client: Client) -> None: - parameters = {"tree_method": "gpu_hist", "debug_synchronize": True} + parameters = { + "tree_method": "hist", + "debug_synchronize": True, + "device": "cuda", + } run_empty_dmatrix_reg(local_cuda_client, parameters) 
run_empty_dmatrix_cls(local_cuda_client, parameters) @@ -374,7 +386,11 @@ class TestDistributedGPU: "y": [10, 20, 30, 40.0, 50] * mult, } ) - parameters = {"tree_method": "gpu_hist", "debug_synchronize": True} + parameters = { + "tree_method": "hist", + "debug_synchronize": True, + "device": "cuda", + } empty = df.iloc[:0] ddf = dask_cudf.concat( @@ -432,13 +448,25 @@ class TestDistributedGPU: def test_empty_dmatrix_auc(self, local_cuda_client: Client) -> None: n_workers = len(tm.get_client_workers(local_cuda_client)) - run_empty_dmatrix_auc(local_cuda_client, "gpu_hist", n_workers) + run_empty_dmatrix_auc(local_cuda_client, "cuda", n_workers) def test_auc(self, local_cuda_client: Client) -> None: - run_auc(local_cuda_client, "gpu_hist") + run_auc(local_cuda_client, "cuda") + + def test_invalid_ordinal(self, local_cuda_client: Client) -> None: + """One should not specify the device ordinal with dask.""" + with pytest.raises(ValueError, match="device=cuda"): + X, y, _ = generate_array() + m = dxgb.DaskDMatrix(local_cuda_client, X, y) + dxgb.train(local_cuda_client, {"device": "cuda:0"}, m) + + booster = dxgb.train(local_cuda_client, {"device": "cuda"}, m)["booster"] + assert ( + json.loads(booster.save_config())["learner"]["generic_param"]["device"] + == "cuda:0" + ) def test_data_initialization(self, local_cuda_client: Client) -> None: - X, y, _ = generate_array() fw = da.random.random((random_cols,)) fw = fw - fw.min() @@ -531,7 +559,9 @@ async def run_from_dask_array_asyncio(scheduler_address: str) -> dxgb.TrainRetur y = y.map_blocks(cp.array) m = await xgb.dask.DaskQuantileDMatrix(client, X, y) - output = await xgb.dask.train(client, {"tree_method": "gpu_hist"}, dtrain=m) + output = await xgb.dask.train( + client, {"tree_method": "hist", "device": "cuda"}, dtrain=m + ) with_m = await xgb.dask.predict(client, output, m) with_X = await xgb.dask.predict(client, output, X) diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py index cab4188a8..66c6058a5 100644 --- a/tests/test_distributed/test_with_dask/test_with_dask.py +++ b/tests/test_distributed/test_with_dask/test_with_dask.py @@ -45,7 +45,7 @@ from xgboost.testing.dask import check_init_estimation, check_uneven_nan dask.config.set({"distributed.scheduler.allowed-failures": False}) -if hasattr(HealthCheck, 'function_scoped_fixture'): +if hasattr(HealthCheck, "function_scoped_fixture"): suppress = [HealthCheck.function_scoped_fixture] else: suppress = hypothesis.utils.conventions.not_set # type:ignore @@ -131,7 +131,9 @@ def generate_array( return X, y, None -def deterministic_persist_per_worker(df: dd.DataFrame, client: "Client") -> dd.DataFrame: +def deterministic_persist_per_worker( + df: dd.DataFrame, client: "Client" +) -> dd.DataFrame: # Got this script from https://github.com/dmlc/xgboost/issues/7927 # Query workers n_workers = len(client.cluster.workers) @@ -196,6 +198,7 @@ def test_xgbclassifier_classes_type_and_value(to_frame: bool, client: "Client"): X, y = make_classification(n_samples=1000, n_features=4, random_state=123) if to_frame: import pandas as pd + feats = [f"var_{i}" for i in range(4)] df = pd.DataFrame(X, columns=feats) df["target"] = y @@ -219,7 +222,7 @@ def test_from_dask_dataframe() -> None: y = dd.from_dask_array(y) dtrain = DaskDMatrix(client, X, y) - booster = xgb.dask.train(client, {}, dtrain, num_boost_round=2)['booster'] + booster = xgb.dask.train(client, {}, dtrain, num_boost_round=2)["booster"] prediction = xgb.dask.predict(client, 
model=booster, data=dtrain) @@ -230,7 +233,8 @@ def test_from_dask_dataframe() -> None: with pytest.raises(TypeError): # evals_result is not supported in dask interface. xgb.dask.train( # type:ignore - client, {}, dtrain, num_boost_round=2, evals_result={}) + client, {}, dtrain, num_boost_round=2, evals_result={} + ) # force prediction to be computed from_dmatrix = prediction.compute() @@ -243,8 +247,9 @@ def test_from_dask_dataframe() -> None: series_predictions = xgb.dask.inplace_predict(client, booster, X) assert isinstance(series_predictions, dd.Series) - np.testing.assert_allclose(series_predictions.compute().values, - from_dmatrix) + np.testing.assert_allclose( + series_predictions.compute().values, from_dmatrix + ) # Make sure the output can be integrated back to original dataframe X["predict"] = prediction @@ -303,7 +308,8 @@ def test_dask_sparse(client: "Client") -> None: clf.fit(X, y, eval_set=[(X, y)]) sparse_results = clf.evals_result() np.testing.assert_allclose( - dense_results["validation_0"]["mlogloss"], sparse_results["validation_0"]["mlogloss"] + dense_results["validation_0"]["mlogloss"], + sparse_results["validation_0"]["mlogloss"], ) @@ -357,7 +363,7 @@ def run_categorical(client: "Client", tree_method: str, X, X_onehot, y) -> None: n_estimators=10, tree_method=tree_method, # force onehot - max_cat_to_onehot=9999 + max_cat_to_onehot=9999, ) reg.fit(X, y) @@ -435,10 +441,15 @@ def run_boost_from_prediction_multi_class( X: dd.DataFrame, y: dd.Series, tree_method: str, + device: str, client: "Client", ) -> None: model_0 = xgb.dask.DaskXGBClassifier( - learning_rate=0.3, n_estimators=4, tree_method=tree_method, max_bin=768 + learning_rate=0.3, + n_estimators=4, + tree_method=tree_method, + max_bin=768, + device=device, ) X, y, _ = deterministic_repartition(client, X, y, None) model_0.fit(X=X, y=y) @@ -448,7 +459,11 @@ def run_boost_from_prediction_multi_class( margin.columns = [f"m_{i}" for i in range(margin.shape[1])] model_1 = xgb.dask.DaskXGBClassifier( - learning_rate=0.3, n_estimators=4, tree_method=tree_method, max_bin=768 + learning_rate=0.3, + n_estimators=4, + tree_method=tree_method, + max_bin=768, + device=device, ) X, y, margin = deterministic_repartition(client, X, y, margin) model_1.fit(X=X, y=y, base_margin=margin) @@ -460,7 +475,11 @@ def run_boost_from_prediction_multi_class( ) model_2 = xgb.dask.DaskXGBClassifier( - learning_rate=0.3, n_estimators=8, tree_method=tree_method, max_bin=768 + learning_rate=0.3, + n_estimators=8, + tree_method=tree_method, + max_bin=768, + device=device, ) X, y, _ = deterministic_repartition(client, X, y, None) model_2.fit(X=X, y=y) @@ -483,19 +502,28 @@ def run_boost_from_prediction( X: dd.DataFrame, y: dd.Series, tree_method: str, + device: str, client: "Client", ) -> None: X, y = client.persist([X, y]) model_0 = xgb.dask.DaskXGBClassifier( - learning_rate=0.3, n_estimators=4, tree_method=tree_method, max_bin=512 + learning_rate=0.3, + n_estimators=4, + tree_method=tree_method, + max_bin=512, + device=device, ) X, y, _ = deterministic_repartition(client, X, y, None) model_0.fit(X=X, y=y) margin: dd.Series = model_0.predict(X, output_margin=True) model_1 = xgb.dask.DaskXGBClassifier( - learning_rate=0.3, n_estimators=4, tree_method=tree_method, max_bin=512 + learning_rate=0.3, + n_estimators=4, + tree_method=tree_method, + max_bin=512, + device=device, ) X, y, margin = deterministic_repartition(client, X, y, margin) model_1.fit(X=X, y=y, base_margin=margin) @@ -503,7 +531,11 @@ def run_boost_from_prediction( predictions_1: 
dd.Series = model_1.predict(X, base_margin=margin) model_2 = xgb.dask.DaskXGBClassifier( - learning_rate=0.3, n_estimators=8, tree_method=tree_method, max_bin=512 + learning_rate=0.3, + n_estimators=8, + tree_method=tree_method, + max_bin=512, + device=device, ) X, y, _ = deterministic_repartition(client, X, y, None) model_2.fit(X=X, y=y) @@ -539,17 +571,19 @@ def run_boost_from_prediction( @pytest.mark.parametrize("tree_method", ["hist", "approx"]) def test_boost_from_prediction(tree_method: str, client: "Client") -> None: from sklearn.datasets import load_breast_cancer, load_digits + X_, y_ = load_breast_cancer(return_X_y=True) X, y = dd.from_array(X_, chunksize=200), dd.from_array(y_, chunksize=200) - run_boost_from_prediction(X, y, tree_method, client) + run_boost_from_prediction(X, y, tree_method, "cpu", client) X_, y_ = load_digits(return_X_y=True) X, y = dd.from_array(X_, chunksize=100), dd.from_array(y_, chunksize=100) - run_boost_from_prediction_multi_class(X, y, tree_method, client) + run_boost_from_prediction_multi_class(X, y, tree_method, "cpu", client) def test_inplace_predict(client: "Client") -> None: from sklearn.datasets import load_diabetes + X_, y_ = load_diabetes(return_X_y=True) X, y = dd.from_array(X_, chunksize=32), dd.from_array(y_, chunksize=32) reg = xgb.dask.DaskXGBRegressor(n_estimators=4).fit(X, y) @@ -573,16 +607,14 @@ def test_dask_missing_value_reg(client: "Client") -> None: X = X.rechunk(20, 1) y = da.random.randint(0, 3, size=20) y.rechunk(20) - regressor = xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2, - missing=0.0) + regressor = xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2, missing=0.0) regressor.client = client - regressor.set_params(tree_method='hist') + regressor.set_params(tree_method="hist") regressor.fit(X, y, eval_set=[(X, y)]) dd_predt = regressor.predict(X).compute() np_X = X.compute() - np_predt = regressor.get_booster().predict( - xgb.DMatrix(np_X, missing=0.0)) + np_predt = regressor.get_booster().predict(xgb.DMatrix(np_X, missing=0.0)) np.testing.assert_allclose(np_predt, dd_predt) @@ -595,20 +627,19 @@ def test_dask_missing_value_cls(client: "Client") -> None: X = X.rechunk(20, None) y = da.random.randint(0, 3, size=kRows) y = y.rechunk(20, 1) - cls = xgb.dask.DaskXGBClassifier(verbosity=1, n_estimators=2, - tree_method='hist', - missing=0.0) + cls = xgb.dask.DaskXGBClassifier( + verbosity=1, n_estimators=2, tree_method="hist", missing=0.0 + ) cls.client = client cls.fit(X, y, eval_set=[(X, y)]) dd_pred_proba = cls.predict_proba(X).compute() np_X = X.compute() - np_pred_proba = cls.get_booster().predict( - xgb.DMatrix(np_X, missing=0.0)) + np_pred_proba = cls.get_booster().predict(xgb.DMatrix(np_X, missing=0.0)) np.testing.assert_allclose(np_pred_proba, dd_pred_proba) cls = xgb.dask.DaskXGBClassifier() - assert hasattr(cls, 'missing') + assert hasattr(cls, "missing") @pytest.mark.parametrize("model", ["boosting", "rf"]) @@ -622,7 +653,7 @@ def test_dask_regressor(model: str, client: "Client") -> None: assert regressor._estimator_type == "regressor" assert sklearn.base.is_regressor(regressor) - regressor.set_params(tree_method='hist') + regressor.set_params(tree_method="hist") regressor.client = client regressor.fit(X, y, sample_weight=w, eval_set=[(X, y)]) prediction = regressor.predict(X) @@ -635,7 +666,7 @@ def test_dask_regressor(model: str, client: "Client") -> None: assert isinstance(prediction, da.Array) assert isinstance(history, dict) - assert list(history['validation_0'].keys())[0] == 'rmse' + assert 
list(history["validation_0"].keys())[0] == "rmse" forest = int( json.loads(regressor.get_booster().save_config())["learner"][ "gradient_booster" @@ -643,10 +674,10 @@ def test_dask_regressor(model: str, client: "Client") -> None: ) if model == "boosting": - assert len(history['validation_0']['rmse']) == 2 + assert len(history["validation_0"]["rmse"]) == 2 assert forest == 1 else: - assert len(history['validation_0']['rmse']) == 1 + assert len(history["validation_0"]["rmse"]) == 1 assert forest == 2 @@ -753,30 +784,38 @@ def test_empty_dmatrix_training_continuation(client: "Client") -> None: kRows, kCols = 1, 97 X = dd.from_array(np.random.randn(kRows, kCols)) y = dd.from_array(np.random.rand(kRows)) - X.columns = ['X' + str(i) for i in range(0, kCols)] + X.columns = ["X" + str(i) for i in range(0, kCols)] dtrain = xgb.dask.DaskDMatrix(client, X, y) kRows += 1000 X = dd.from_array(np.random.randn(kRows, kCols), chunksize=10) - X.columns = ['X' + str(i) for i in range(0, kCols)] + X.columns = ["X" + str(i) for i in range(0, kCols)] y = dd.from_array(np.random.rand(kRows), chunksize=10) valid = xgb.dask.DaskDMatrix(client, X, y) - out = xgb.dask.train(client, {'tree_method': 'hist'}, - dtrain=dtrain, num_boost_round=2, - evals=[(valid, 'validation')]) + out = xgb.dask.train( + client, + {"tree_method": "hist"}, + dtrain=dtrain, + num_boost_round=2, + evals=[(valid, "validation")], + ) - out = xgb.dask.train(client, {'tree_method': 'hist'}, - dtrain=dtrain, xgb_model=out['booster'], - num_boost_round=2, - evals=[(valid, 'validation')]) + out = xgb.dask.train( + client, + {"tree_method": "hist"}, + dtrain=dtrain, + xgb_model=out["booster"], + num_boost_round=2, + evals=[(valid, "validation")], + ) assert xgb.dask.predict(client, out, dtrain).compute().shape[0] == 1 def run_empty_dmatrix_reg(client: "Client", parameters: dict) -> None: def _check_outputs(out: xgb.dask.TrainReturnT, predictions: np.ndarray) -> None: - assert isinstance(out['booster'], xgb.dask.Booster) - for _, v in out['history']['validation'].items(): + assert isinstance(out["booster"], xgb.dask.Booster) + for _, v in out["history"]["validation"].items(): assert len(v) == 2 assert isinstance(predictions, np.ndarray) assert predictions.shape[0] == 1 @@ -786,12 +825,14 @@ def run_empty_dmatrix_reg(client: "Client", parameters: dict) -> None: y = dd.from_array(np.random.rand(kRows)) dtrain = xgb.dask.DaskDMatrix(client, X, y) - out = xgb.dask.train(client, parameters, - dtrain=dtrain, - evals=[(dtrain, 'validation')], - num_boost_round=2) - predictions = xgb.dask.predict(client=client, model=out, - data=dtrain).compute() + out = xgb.dask.train( + client, + parameters, + dtrain=dtrain, + evals=[(dtrain, "validation")], + num_boost_round=2, + ) + predictions = xgb.dask.predict(client=client, model=out, data=dtrain).compute() _check_outputs(out, predictions) # valid has more rows than train @@ -799,12 +840,14 @@ def run_empty_dmatrix_reg(client: "Client", parameters: dict) -> None: X = dd.from_array(np.random.randn(kRows, kCols)) y = dd.from_array(np.random.rand(kRows)) valid = xgb.dask.DaskDMatrix(client, X, y) - out = xgb.dask.train(client, parameters, - dtrain=dtrain, - evals=[(valid, 'validation')], - num_boost_round=2) - predictions = xgb.dask.predict(client=client, model=out, - data=dtrain).compute() + out = xgb.dask.train( + client, + parameters, + dtrain=dtrain, + evals=[(valid, "validation")], + num_boost_round=2, + ) + predictions = xgb.dask.predict(client=client, model=out, data=dtrain).compute() _check_outputs(out, 
predictions) # train has more rows than evals @@ -814,12 +857,14 @@ def run_empty_dmatrix_reg(client: "Client", parameters: dict) -> None: y = dd.from_array(np.random.rand(kRows)) dtrain = xgb.dask.DaskDMatrix(client, X, y) - out = xgb.dask.train(client, parameters, - dtrain=dtrain, - evals=[(valid, 'validation')], - num_boost_round=2) - predictions = xgb.dask.predict(client=client, model=out, - data=valid).compute() + out = xgb.dask.train( + client, + parameters, + dtrain=dtrain, + evals=[(valid, "validation")], + num_boost_round=2, + ) + predictions = xgb.dask.predict(client=client, model=out, data=valid).compute() _check_outputs(out, predictions) @@ -827,8 +872,8 @@ def run_empty_dmatrix_cls(client: "Client", parameters: dict) -> None: n_classes = 4 def _check_outputs(out: xgb.dask.TrainReturnT, predictions: np.ndarray) -> None: - assert isinstance(out['booster'], xgb.dask.Booster) - assert len(out['history']['validation']['merror']) == 2 + assert isinstance(out["booster"], xgb.dask.Booster) + assert len(out["history"]["validation"]["merror"]) == 2 assert isinstance(predictions, np.ndarray) assert predictions.shape[1] == n_classes, predictions.shape @@ -836,16 +881,18 @@ def run_empty_dmatrix_cls(client: "Client", parameters: dict) -> None: X = dd.from_array(np.random.randn(kRows, kCols)) y = dd.from_array(np.random.randint(low=0, high=n_classes, size=kRows)) dtrain = xgb.dask.DaskDMatrix(client, X, y) - parameters['objective'] = 'multi:softprob' - parameters['eval_metric'] = 'merror' - parameters['num_class'] = n_classes + parameters["objective"] = "multi:softprob" + parameters["eval_metric"] = "merror" + parameters["num_class"] = n_classes - out = xgb.dask.train(client, parameters, - dtrain=dtrain, - evals=[(dtrain, 'validation')], - num_boost_round=2) - predictions = xgb.dask.predict(client=client, model=out, - data=dtrain) + out = xgb.dask.train( + client, + parameters, + dtrain=dtrain, + evals=[(dtrain, "validation")], + num_boost_round=2, + ) + predictions = xgb.dask.predict(client=client, model=out, data=dtrain) assert predictions.shape[1] == n_classes predictions = predictions.compute() _check_outputs(out, predictions) @@ -857,25 +904,26 @@ def run_empty_dmatrix_cls(client: "Client", parameters: dict) -> None: y = dd.from_array(np.random.randint(low=0, high=n_classes, size=kRows)) dtrain = xgb.dask.DaskDMatrix(client, X, y) - out = xgb.dask.train(client, parameters, - dtrain=dtrain, - evals=[(valid, 'validation')], - num_boost_round=2) - predictions = xgb.dask.predict(client=client, model=out, - data=valid).compute() + out = xgb.dask.train( + client, + parameters, + dtrain=dtrain, + evals=[(valid, "validation")], + num_boost_round=2, + ) + predictions = xgb.dask.predict(client=client, model=out, data=valid).compute() _check_outputs(out, predictions) -def run_empty_dmatrix_auc(client: "Client", tree_method: str, n_workers: int) -> None: +def run_empty_dmatrix_auc(client: "Client", device: str, n_workers: int) -> None: from sklearn import datasets + n_samples = 100 n_features = 7 rng = np.random.RandomState(1994) make_classification = partial( - datasets.make_classification, - n_features=n_features, - random_state=rng + datasets.make_classification, n_features=n_features, random_state=rng ) # binary @@ -888,7 +936,7 @@ def run_empty_dmatrix_auc(client: "Client", tree_method: str, n_workers: int) -> valid_X = dd.from_array(valid_X_, chunksize=n_samples) valid_y = dd.from_array(valid_y_, chunksize=n_samples) - cls = xgb.dask.DaskXGBClassifier(tree_method=tree_method, n_estimators=2) 
+ cls = xgb.dask.DaskXGBClassifier(device=device, n_estimators=2) cls.fit(X, y, eval_metric=["auc", "aucpr"], eval_set=[(valid_X, valid_y)]) # multiclass @@ -897,7 +945,7 @@ def run_empty_dmatrix_auc(client: "Client", tree_method: str, n_workers: int) -> n_classes=n_workers, n_informative=n_features, n_redundant=0, - n_repeated=0 + n_repeated=0, ) for i in range(y_.shape[0]): y_[i] = i % n_workers @@ -910,25 +958,26 @@ def run_empty_dmatrix_auc(client: "Client", tree_method: str, n_workers: int) -> n_classes=n_workers, n_informative=n_features, n_redundant=0, - n_repeated=0 + n_repeated=0, ) for i in range(valid_y_.shape[0]): valid_y_[i] = i % n_workers valid_X = dd.from_array(valid_X_, chunksize=n_samples) valid_y = dd.from_array(valid_y_, chunksize=n_samples) - cls = xgb.dask.DaskXGBClassifier(tree_method=tree_method, n_estimators=2) + cls = xgb.dask.DaskXGBClassifier(device=device, n_estimators=2) cls.fit(X, y, eval_metric=["auc", "aucpr"], eval_set=[(valid_X, valid_y)]) def test_empty_dmatrix_auc() -> None: with LocalCluster(n_workers=4, dashboard_address=":0") as cluster: with Client(cluster) as client: - run_empty_dmatrix_auc(client, "hist", 4) + run_empty_dmatrix_auc(client, "cpu", 4) -def run_auc(client: "Client", tree_method: str) -> None: +def run_auc(client: "Client", device: str) -> None: from sklearn import datasets + n_samples = 100 n_features = 97 rng = np.random.RandomState(1994) @@ -944,10 +993,10 @@ def run_auc(client: "Client", tree_method: str) -> None: valid_X = dd.from_array(valid_X_, chunksize=10) valid_y = dd.from_array(valid_y_, chunksize=10) - cls = xgb.XGBClassifier(tree_method=tree_method, n_estimators=2) + cls = xgb.XGBClassifier(device=device, n_estimators=2) cls.fit(X_, y_, eval_metric="auc", eval_set=[(valid_X_, valid_y_)]) - dcls = xgb.dask.DaskXGBClassifier(tree_method=tree_method, n_estimators=2) + dcls = xgb.dask.DaskXGBClassifier(device=device, n_estimators=2) dcls.fit(X, y, eval_metric="auc", eval_set=[(valid_X, valid_y)]) approx = dcls.evals_result()["validation_0"]["auc"] @@ -958,7 +1007,7 @@ def run_auc(client: "Client", tree_method: str) -> None: def test_auc(client: "Client") -> None: - run_auc(client, "hist") + run_auc(client, "cpu") # No test for Exact, as empty DMatrix handling are mostly for distributed @@ -967,10 +1016,10 @@ def test_auc(client: "Client") -> None: def test_empty_dmatrix(tree_method) -> None: with LocalCluster(n_workers=kWorkers, dashboard_address=":0") as cluster: with Client(cluster) as client: - parameters = {'tree_method': tree_method} + parameters = {"tree_method": tree_method} run_empty_dmatrix_reg(client, parameters) run_empty_dmatrix_cls(client, parameters) - parameters = {'tree_method': tree_method, "objective": "reg:absoluteerror"} + parameters = {"tree_method": tree_method, "objective": "reg:absoluteerror"} run_empty_dmatrix_reg(client, parameters) @@ -987,10 +1036,12 @@ async def run_from_dask_array_asyncio(scheduler_address: str) -> xgb.dask.TrainR assert isinstance(with_X, da.Array) assert isinstance(inplace, da.Array) - np.testing.assert_allclose(await client.compute(with_m), - await client.compute(with_X)) - np.testing.assert_allclose(await client.compute(with_m), - await client.compute(inplace)) + np.testing.assert_allclose( + await client.compute(with_m), await client.compute(with_X) + ) + np.testing.assert_allclose( + await client.compute(with_m), await client.compute(inplace) + ) return output @@ -998,7 +1049,7 @@ async def run_dask_regressor_asyncio(scheduler_address: str) -> None: async with 
Client(scheduler_address, asynchronous=True) as client: X, y, _ = generate_array() regressor = await xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2) - regressor.set_params(tree_method='hist') + regressor.set_params(tree_method="hist") regressor.client = client await regressor.fit(X, y, eval_set=[(X, y)]) prediction = await regressor.predict(X) @@ -1011,8 +1062,8 @@ async def run_dask_regressor_asyncio(scheduler_address: str) -> None: assert isinstance(prediction, da.Array) assert isinstance(history, dict) - assert list(history['validation_0'].keys())[0] == 'rmse' - assert len(history['validation_0']['rmse']) == 2 + assert list(history["validation_0"].keys())[0] == "rmse" + assert len(history["validation_0"]["rmse"]) == 2 awaited = await client.compute(prediction) assert awaited.shape[0] == kRows @@ -1023,7 +1074,8 @@ async def run_dask_classifier_asyncio(scheduler_address: str) -> None: X, y, _ = generate_array() y = (y * 10).astype(np.int32) classifier = await xgb.dask.DaskXGBClassifier( - verbosity=1, n_estimators=2, eval_metric='merror') + verbosity=1, n_estimators=2, eval_metric="merror" + ) classifier.client = client await classifier.fit(X, y, eval_set=[(X, y)]) prediction = await classifier.predict(X) @@ -1036,10 +1088,10 @@ async def run_dask_classifier_asyncio(scheduler_address: str) -> None: assert isinstance(prediction, da.Array) assert isinstance(history, dict) - assert list(history.keys())[0] == 'validation_0' - assert list(history['validation_0'].keys())[0] == 'merror' - assert len(list(history['validation_0'])) == 1 - assert len(history['validation_0']['merror']) == 2 + assert list(history.keys())[0] == "validation_0" + assert list(history["validation_0"].keys())[0] == "merror" + assert len(list(history["validation_0"])) == 1 + assert len(history["validation_0"]["merror"]) == 2 # Test .predict_proba() probas = await classifier.predict_proba(X) @@ -1065,8 +1117,8 @@ def test_with_asyncio() -> None: with Client(cluster) as client: address = client.scheduler.address output = asyncio.run(run_from_dask_array_asyncio(address)) - assert isinstance(output['booster'], xgb.Booster) - assert isinstance(output['history'], dict) + assert isinstance(output["booster"], xgb.Booster) + assert isinstance(output["history"], dict) asyncio.run(run_dask_regressor_asyncio(address)) asyncio.run(run_dask_classifier_asyncio(address)) @@ -1124,8 +1176,9 @@ def test_predict_with_meta(client: "Client") -> None: margin = da.random.random(kRows, partition_size) + 1e4 dtrain = DaskDMatrix(client, X, y, weight=w, base_margin=margin) - booster: xgb.Booster = xgb.dask.train( - client, {}, dtrain, num_boost_round=4)['booster'] + booster: xgb.Booster = xgb.dask.train(client, {}, dtrain, num_boost_round=4)[ + "booster" + ] prediction = xgb.dask.predict(client, model=booster, data=dtrain) assert prediction.ndim == 1 @@ -1141,41 +1194,41 @@ def test_predict_with_meta(client: "Client") -> None: def run_aft_survival(client: "Client", dmatrix_t: Type) -> None: - df = dd.read_csv( - os.path.join(tm.data_dir(__file__), "veterans_lung_cancer.csv") + df = dd.read_csv(os.path.join(tm.data_dir(__file__), "veterans_lung_cancer.csv")) + y_lower_bound = df["Survival_label_lower_bound"] + y_upper_bound = df["Survival_label_upper_bound"] + X = df.drop(["Survival_label_lower_bound", "Survival_label_upper_bound"], axis=1) + m = dmatrix_t( + client, X, label_lower_bound=y_lower_bound, label_upper_bound=y_upper_bound ) - y_lower_bound = df['Survival_label_lower_bound'] - y_upper_bound = df['Survival_label_upper_bound'] - X = 
df.drop(['Survival_label_lower_bound', - 'Survival_label_upper_bound'], axis=1) - m = dmatrix_t(client, X, label_lower_bound=y_lower_bound, - label_upper_bound=y_upper_bound) - base_params = {'verbosity': 0, - 'objective': 'survival:aft', - 'eval_metric': 'aft-nloglik', - 'learning_rate': 0.05, - 'aft_loss_distribution_scale': 1.20, - 'max_depth': 6, - 'lambda': 0.01, - 'alpha': 0.02} + base_params = { + "verbosity": 0, + "objective": "survival:aft", + "eval_metric": "aft-nloglik", + "learning_rate": 0.05, + "aft_loss_distribution_scale": 1.20, + "max_depth": 6, + "lambda": 0.01, + "alpha": 0.02, + } nloglik_rec = {} - dists = ['normal', 'logistic', 'extreme'] + dists = ["normal", "logistic", "extreme"] for dist in dists: params = base_params - params.update({'aft_loss_distribution': dist}) + params.update({"aft_loss_distribution": dist}) evals_result = {} - out = xgb.dask.train(client, params, m, num_boost_round=100, - evals=[(m, 'train')]) - evals_result = out['history'] - nloglik_rec[dist] = evals_result['train']['aft-nloglik'] + out = xgb.dask.train( + client, params, m, num_boost_round=100, evals=[(m, "train")] + ) + evals_result = out["history"] + nloglik_rec[dist] = evals_result["train"]["aft-nloglik"] # AFT metric (negative log likelihood) improve monotonically - assert all(p >= q for p, q in zip(nloglik_rec[dist], - nloglik_rec[dist][:1])) + assert all(p >= q for p, q in zip(nloglik_rec[dist], nloglik_rec[dist][:1])) # For this data, normal distribution works the best - assert nloglik_rec['normal'][-1] < 4.9 - assert nloglik_rec['logistic'][-1] > 4.9 - assert nloglik_rec['extreme'][-1] > 4.9 + assert nloglik_rec["normal"][-1] < 4.9 + assert nloglik_rec["logistic"][-1] > 4.9 + assert nloglik_rec["extreme"][-1] > 4.9 def test_dask_aft_survival() -> None: @@ -1244,7 +1297,7 @@ def test_dask_predict_leaf(booster: str, client: "Client") -> None: leaf = xgb.dask.predict( client, cls.get_booster(), - X.to_dask_array(), # we can't map_blocks on dataframe when output is 4-dim. + X.to_dask_array(), # we can't map_blocks on dataframe when output is 4-dim. 
pred_leaf=True, strict_shape=True, validate_features=False, @@ -1304,7 +1357,7 @@ class TestWithDask: path = os.path.join(tmpdir, f"{rank}.bin") Xy.save_binary(path) - def load_dmatrix(rabit_args: Dict[str, Union[int,str]], tmpdir: str) -> None: + def load_dmatrix(rabit_args: Dict[str, Union[int, str]], tmpdir: str) -> None: with xgb.dask.CommunicatorContext(**rabit_args): rank = xgb.collective.get_rank() path = os.path.join(tmpdir, f"{rank}.bin") @@ -1339,22 +1392,21 @@ class TestWithDask: futures.append(f) client.gather(futures) - @pytest.mark.parametrize('config_key,config_value', [('verbosity', 0), ('use_rmm', True)]) + @pytest.mark.parametrize( + "config_key,config_value", [("verbosity", 0), ("use_rmm", True)] + ) def test_global_config( - self, - client: "Client", - config_key: str, - config_value: Any + self, client: "Client", config_key: str, config_value: Any ) -> None: X, y, _ = generate_array() xgb.config.set_config(**{config_key: config_value}) dtrain = DaskDMatrix(client, X, y) - before_fname = './before_training-test_global_config' - after_fname = './after_training-test_global_config' + before_fname = "./before_training-test_global_config" + after_fname = "./after_training-test_global_config" class TestCallback(xgb.callback.TrainingCallback): def write_file(self, fname: str) -> None: - with open(fname, 'w') as fd: + with open(fname, "w") as fd: fd.write(str(xgb.config.get_config()[config_key])) def before_training(self, model: xgb.Booster) -> xgb.Booster: @@ -1367,33 +1419,34 @@ class TestWithDask: return model def before_iteration( - self, model: xgb.Booster, epoch: int, evals_log: Dict + self, model: xgb.Booster, epoch: int, evals_log: Dict ) -> bool: assert xgb.config.get_config()[config_key] == config_value return False def after_iteration( - self, model: xgb.Booster, epoch: int, evals_log: Dict + self, model: xgb.Booster, epoch: int, evals_log: Dict ) -> bool: self.write_file(after_fname) assert xgb.config.get_config()[config_key] == config_value return False - xgb.dask.train(client, {}, dtrain, num_boost_round=4, callbacks=[TestCallback()])[ - 'booster'] + xgb.dask.train( + client, {}, dtrain, num_boost_round=4, callbacks=[TestCallback()] + )["booster"] - with open(before_fname, 'r') as before, open(after_fname, 'r') as after: + with open(before_fname, "r") as before, open(after_fname, "r") as after: assert before.read() == str(config_value) assert after.read() == str(config_value) os.remove(before_fname) os.remove(after_fname) - with dask.config.set({'xgboost.foo': "bar"}): + with dask.config.set({"xgboost.foo": "bar"}): with pytest.raises(ValueError, match=r"Unknown configuration.*"): xgb.dask.train(client, {}, dtrain, num_boost_round=4) - with dask.config.set({'xgboost.scheduler_address': "127.0.0.1:foo"}): + with dask.config.set({"xgboost.scheduler_address": "127.0.0.1:foo"}): with pytest.raises(socket.gaierror, match=r".*not known.*"): xgb.dask.train(client, {}, dtrain, num_boost_round=1) @@ -1403,9 +1456,9 @@ class TestWithDask: params: Dict, num_rounds: int, dataset: tm.TestDataset, - tree_method: str + tree_method: str, ) -> None: - params['tree_method'] = tree_method + params["tree_method"] = tree_method params = dataset.set_params(params) # It doesn't make sense to distribute a completely # empty dataset. 
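For readers following the parameter changes in these test hunks: the patch moves device selection from `tree_method` to the new `device` parameter, and, per the PR summary, the dask interface now checks that no ordinal is given (use "cuda", not "cuda:0") since each worker selects its own GPU. The snippet below is an illustrative sketch of that intended usage, not code taken from this patch; it assumes an XGBoost build that ships the `device` parameter.

import dask.array as da
from distributed import Client, LocalCluster

from xgboost import dask as dxgb


def train_with_device(device: str) -> dxgb.TrainReturnT:
    # Any dask cluster works; a LocalCluster keeps the sketch self-contained.
    with LocalCluster(n_workers=2, dashboard_address=":0") as cluster:
        with Client(cluster) as client:
            X = da.random.random(size=(1000, 10), chunks=(100, 10))
            y = da.random.random(size=(1000,), chunks=100)
            dtrain = dxgb.DaskDMatrix(client, X, y)
            # `device` picks cpu/cuda; `tree_method` stays "hist" either way.
            return dxgb.train(
                client,
                {"tree_method": "hist", "device": device},
                dtrain,
                num_boost_round=4,
                evals=[(dtrain, "train")],
            )


if __name__ == "__main__":
    output = train_with_device("cpu")  # "cuda" (no ordinal) on a GPU cluster
    print(output["history"])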
@@ -1462,10 +1515,10 @@ class TestWithDask: deadline=None, max_examples=10, suppress_health_check=suppress, print_blob=True ) def test_hist( - self, params: Dict, dataset: tm.TestDataset, client: "Client" + self, params: Dict, dataset: tm.TestDataset, client: "Client" ) -> None: num_rounds = 10 - self.run_updater_test(client, params, num_rounds, dataset, 'hist') + self.run_updater_test(client, params, num_rounds, dataset, "hist") def test_quantile_dmatrix(self, client: Client) -> None: X, y = make_categorical(client, 10000, 30, 13) @@ -1478,7 +1531,7 @@ class TestWithDask: {"tree_method": "hist"}, Xy, num_boost_round=10, - evals=[(Xy, "Train"), (valid_Xy, "Valid")] + evals=[(Xy, "Train"), (valid_Xy, "Valid")], ) dmatrix_hist = output["history"] @@ -1492,7 +1545,7 @@ class TestWithDask: {"tree_method": "hist"}, Xy, num_boost_round=10, - evals=[(Xy, "Train"), (valid_Xy, "Valid")] + evals=[(Xy, "Train"), (valid_Xy, "Valid")], ) quantile_hist = output["history"] @@ -1532,7 +1585,7 @@ class TestWithDask: self, client: "Client", params: Dict, dataset: tm.TestDataset ) -> None: num_rounds = 10 - self.run_updater_test(client, params, num_rounds, dataset, 'approx') + self.run_updater_test(client, params, num_rounds, dataset, "approx") def test_adaptive(self) -> None: def get_score(config: Dict) -> float: @@ -1593,7 +1646,9 @@ class TestWithDask: dy = client.persist(dy, workers=workers[1]) valid = xgb.dask.DaskDMatrix(client, dX, dy) - merged = xgb.dask._get_workers_from_data(train, evals=[(valid, 'Valid')]) + merged = xgb.dask._get_workers_from_data( + train, evals=[(valid, "Valid")] + ) assert len(merged) == 2 @pytest.mark.skipif(**tm.no_dask()) @@ -1630,28 +1685,30 @@ class TestWithDask: @pytest.mark.skipif(**tm.no_sklearn()) def test_custom_objective(self, client: "Client") -> None: from sklearn.datasets import fetch_california_housing + X, y = fetch_california_housing(return_X_y=True) X, y = da.from_array(X), da.from_array(y) rounds = 20 with tempfile.TemporaryDirectory() as tmpdir: - path = os.path.join(tmpdir, 'log') + path = os.path.join(tmpdir, "log") def sqr( labels: np.ndarray, predts: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: - with open(path, 'a') as fd: - print('Running sqr', file=fd) + with open(path, "a") as fd: + print("Running sqr", file=fd) grad = predts - labels hess = np.ones(shape=labels.shape[0]) return grad, hess - reg = xgb.dask.DaskXGBRegressor(n_estimators=rounds, objective=sqr, - tree_method='hist') + reg = xgb.dask.DaskXGBRegressor( + n_estimators=rounds, objective=sqr, tree_method="hist" + ) reg.fit(X, y, eval_set=[(X, y)]) # Check the obj is ran for rounds. - with open(path, 'r') as fd: + with open(path, "r") as fd: out = fd.readlines() assert len(out) == rounds @@ -1670,10 +1727,10 @@ class TestWithDask: tm.non_increasing(results_native["validation_0"]["rmse"]) def test_no_duplicated_partition(self) -> None: - '''Assert each worker has the correct amount of data, and DMatrix initialization doesn't + """Assert each worker has the correct amount of data, and DMatrix initialization doesn't generate unnecessary copies of data. 
- ''' + """ with LocalCluster(n_workers=2, dashboard_address=":0") as cluster: with Client(cluster) as client: X, y, _ = generate_array() @@ -1698,9 +1755,12 @@ class TestWithDask: for i in range(len(workers)): futures.append( client.submit( - worker_fn, workers[i], - m._create_fn_args(workers[i]), pure=False, - workers=[workers[i]]) + worker_fn, + workers[i], + m._create_fn_args(workers[i]), + pure=False, + workers=[workers[i]], + ) ) client.gather(futures) @@ -1719,13 +1779,16 @@ class TestWithDask: def test_data_initialization(self, client: "Client") -> None: """assert that we don't create duplicated DMatrix""" from sklearn.datasets import load_digits + X, y = load_digits(return_X_y=True) X, y = dd.from_array(X, chunksize=32), dd.from_array(y, chunksize=32) validate_data_initialization( - xgb.dask.DaskDMatrix, xgb.dask.DaskXGBClassifier, X, y + xgb.dask.DaskQuantileDMatrix, xgb.dask.DaskXGBClassifier, X, y ) - def run_shap(self, X: Any, y: Any, params: Dict[str, Any], client: "Client") -> None: + def run_shap( + self, X: Any, y: Any, params: Dict[str, Any], client: "Client" + ) -> None: rows = X.shape[0] cols = X.shape[1] @@ -1739,12 +1802,14 @@ class TestWithDask: X, y = da.from_array(X, chunks=(32, -1)), da.from_array(y, chunks=32) Xy = xgb.dask.DaskDMatrix(client, X, y) - booster = xgb.dask.train(client, params, Xy, num_boost_round=10)['booster'] + booster = xgb.dask.train(client, params, Xy, num_boost_round=10)["booster"] test_Xy = xgb.dask.DaskDMatrix(client, X, y) shap = xgb.dask.predict(client, booster, test_Xy, pred_contribs=True).compute() - margin = xgb.dask.predict(client, booster, test_Xy, output_margin=True).compute() + margin = xgb.dask.predict( + client, booster, test_Xy, output_margin=True + ).compute() assert_shape(shap.shape) assert np.allclose(np.sum(shap, axis=len(shap.shape) - 1), margin, 1e-5, 1e-5) @@ -1774,7 +1839,9 @@ class TestWithDask: test_Xy = xgb.dask.DaskDMatrix(client, X, y) shap = xgb.dask.predict(client, booster, test_Xy, pred_contribs=True).compute() - margin = xgb.dask.predict(client, booster, test_Xy, output_margin=True).compute() + margin = xgb.dask.predict( + client, booster, test_Xy, output_margin=True + ).compute() assert np.allclose(np.sum(shap, axis=len(shap.shape) - 1), margin, 1e-5, 1e-5) shap = xgb.dask.predict(client, booster, X, pred_contribs=True).compute() @@ -1783,32 +1850,29 @@ class TestWithDask: def test_shap(self, client: "Client") -> None: from sklearn.datasets import load_diabetes, load_iris + X, y = load_diabetes(return_X_y=True) - params: Dict[str, Any] = {'objective': 'reg:squarederror'} + params: Dict[str, Any] = {"objective": "reg:squarederror"} self.run_shap(X, y, params, client) X, y = load_iris(return_X_y=True) - params = {'objective': 'multi:softmax', 'num_class': 3} + params = {"objective": "multi:softmax", "num_class": 3} self.run_shap(X, y, params, client) - params = {'objective': 'multi:softprob', 'num_class': 3} + params = {"objective": "multi:softprob", "num_class": 3} self.run_shap(X, y, params, client) self.run_shap_cls_sklearn(X, y, client) def run_shap_interactions( - self, - X: Any, - y: Any, - params: Dict[str, Any], - client: "Client" + self, X: Any, y: Any, params: Dict[str, Any], client: "Client" ) -> None: rows = X.shape[0] cols = X.shape[1] X, y = da.from_array(X, chunks=(32, -1)), da.from_array(y, chunks=32) Xy = xgb.dask.DaskDMatrix(client, X, y) - booster = xgb.dask.train(client, params, Xy, num_boost_round=10)['booster'] + booster = xgb.dask.train(client, params, Xy, num_boost_round=10)["booster"] 
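As a side note on what the SHAP tests here verify: `pred_contribs=True` returns per-feature contributions (plus a bias column) that sum to the raw margin, and `pred_interactions=True` returns a (rows, features + 1, features + 1) matrix whose entries sum to the same margin. Below is a minimal single-node sketch of that additivity check, assuming a trained single-output (e.g. regression) booster; it is not code from this patch.

import numpy as np

import xgboost as xgb


def check_shap_additivity(booster: xgb.Booster, Xy: xgb.DMatrix) -> None:
    margin = booster.predict(Xy, output_margin=True)
    # Per-feature contributions (plus a bias column) sum to the raw margin.
    contribs = booster.predict(Xy, pred_contribs=True)
    np.testing.assert_allclose(contribs.sum(axis=-1), margin, rtol=1e-5, atol=1e-5)
    # Pairwise interactions sum to the margin over the last two axes.
    interactions = booster.predict(Xy, pred_interactions=True)
    np.testing.assert_allclose(
        interactions.sum(axis=(-1, -2)), margin, rtol=1e-5, atol=1e-5
    )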
test_Xy = xgb.dask.DaskDMatrix(client, X, y) @@ -1821,20 +1885,27 @@ class TestWithDask: assert shap.shape[1] == cols + 1 assert shap.shape[2] == cols + 1 - margin = xgb.dask.predict(client, booster, test_Xy, output_margin=True).compute() - assert np.allclose(np.sum(shap, axis=(len(shap.shape) - 1, len(shap.shape) - 2)), - margin, - 1e-5, 1e-5) + margin = xgb.dask.predict( + client, booster, test_Xy, output_margin=True + ).compute() + assert np.allclose( + np.sum(shap, axis=(len(shap.shape) - 1, len(shap.shape) - 2)), + margin, + 1e-5, + 1e-5, + ) def test_shap_interactions(self, client: "Client") -> None: from sklearn.datasets import load_diabetes + X, y = load_diabetes(return_X_y=True) - params = {'objective': 'reg:squarederror'} + params = {"objective": "reg:squarederror"} self.run_shap_interactions(X, y, params, client) @pytest.mark.skipif(**tm.no_sklearn()) - def test_sklearn_io(self, client: 'Client') -> None: + def test_sklearn_io(self, client: "Client") -> None: from sklearn.datasets import load_digits + X_, y_ = load_digits(return_X_y=True) X, y = da.from_array(X_), da.from_array(y_) cls = xgb.dask.DaskXGBClassifier(n_estimators=10) @@ -1852,7 +1923,7 @@ class TestWithDask: predt_1 = cls.predict(X) np.testing.assert_allclose(predt_0.compute(), predt_1.compute()) - path = os.path.join(tmpdir, 'cls.json') + path = os.path.join(tmpdir, "cls.json") cls.save_model(path) cls = xgb.dask.DaskXGBClassifier() @@ -1910,7 +1981,7 @@ def test_parallel_submits(client: "Client") -> None: assert cls.get_booster().num_boosted_rounds() == i + 1 -def run_tree_stats(client: Client, tree_method: str) -> str: +def run_tree_stats(client: Client, tree_method: str, device: str) -> str: """Assert that different worker counts don't affect the sum statistics on the root node.""" def dask_train(X, y, num_obs, num_features): @@ -1924,6 +1995,7 @@ def run_tree_stats(client: Client, tree_method: str) -> str: { "verbosity": 0, "tree_method": tree_method, + "device": device, "objective": "reg:squarederror", "max_depth": 3, }, @@ -1957,10 +2029,10 @@ def test_tree_stats(tree_method: str) -> None: with LocalCluster(n_workers=1, dashboard_address=":0") as cluster: with Client(cluster) as client: - local = run_tree_stats(client, tree_method) + local = run_tree_stats(client, tree_method, "cpu") with LocalCluster(n_workers=2, dashboard_address=":0") as cluster: with Client(cluster) as client: - distributed = run_tree_stats(client, tree_method) + distributed = run_tree_stats(client, tree_method, "cpu") assert local == distributed @@ -1999,6 +2071,7 @@ def test_parallel_submit_multi_clients() -> None: t_futures = [] with ThreadPoolExecutor(max_workers=16) as e: for i in range(n_submits): + def _() -> xgb.dask.DaskXGBClassifier: return futures[i][0].compute(futures[i][1]).result() @@ -2025,48 +2098,7 @@ class TestDaskCallbacks: @pytest.mark.skipif(**tm.no_sklearn()) def test_early_stopping(self, client: "Client") -> None: from sklearn.datasets import load_breast_cancer - X, y = load_breast_cancer(return_X_y=True) - X, y = da.from_array(X), da.from_array(y) - m = xgb.dask.DaskDMatrix(client, X, y) - valid = xgb.dask.DaskDMatrix(client, X, y) - early_stopping_rounds = 5 - booster = xgb.dask.train(client, {'objective': 'binary:logistic', - 'eval_metric': 'error', - 'tree_method': 'hist'}, m, - evals=[(valid, 'Valid')], - num_boost_round=1000, - early_stopping_rounds=early_stopping_rounds)['booster'] - assert hasattr(booster, 'best_score') - dump = 
booster.get_dump(dump_format='json') - assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 - - valid_X, valid_y = load_breast_cancer(return_X_y=True) - valid_X, valid_y = da.from_array(valid_X), da.from_array(valid_y) - cls = xgb.dask.DaskXGBClassifier(objective='binary:logistic', tree_method='hist', - n_estimators=1000) - cls.client = client - cls.fit(X, y, early_stopping_rounds=early_stopping_rounds, - eval_set=[(valid_X, valid_y)]) - booster = cls.get_booster() - dump = booster.get_dump(dump_format='json') - assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 - - # Specify the metric - cls = xgb.dask.DaskXGBClassifier(objective='binary:logistic', tree_method='hist', - n_estimators=1000) - cls.client = client - cls.fit(X, y, early_stopping_rounds=early_stopping_rounds, - eval_set=[(valid_X, valid_y)], eval_metric='error') - assert tm.non_increasing(cls.evals_result()['validation_0']['error']) - booster = cls.get_booster() - dump = booster.get_dump(dump_format='json') - assert len(cls.evals_result()['validation_0']['error']) < 20 - assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 - - @pytest.mark.skipif(**tm.no_sklearn()) - def test_early_stopping_custom_eval(self, client: "Client") -> None: - from sklearn.datasets import load_breast_cancer X, y = load_breast_cancer(return_X_y=True) X, y = da.from_array(X), da.from_array(y) m = xgb.dask.DaskDMatrix(client, X, y) @@ -2074,49 +2106,122 @@ class TestDaskCallbacks: valid = xgb.dask.DaskDMatrix(client, X, y) early_stopping_rounds = 5 booster = xgb.dask.train( - client, {'objective': 'binary:logistic', - 'eval_metric': 'error', - 'tree_method': 'hist'}, m, - evals=[(m, 'Train'), (valid, 'Valid')], - feval=tm.eval_error_metric, + client, + { + "objective": "binary:logistic", + "eval_metric": "error", + "tree_method": "hist", + }, + m, + evals=[(valid, "Valid")], num_boost_round=1000, - early_stopping_rounds=early_stopping_rounds)['booster'] - assert hasattr(booster, 'best_score') - dump = booster.get_dump(dump_format='json') + early_stopping_rounds=early_stopping_rounds, + )["booster"] + assert hasattr(booster, "best_score") + dump = booster.get_dump(dump_format="json") assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 valid_X, valid_y = load_breast_cancer(return_X_y=True) valid_X, valid_y = da.from_array(valid_X), da.from_array(valid_y) cls = xgb.dask.DaskXGBClassifier( - objective='binary:logistic', - tree_method='hist', - n_estimators=1000, - eval_metric=tm.eval_error_metric_skl + objective="binary:logistic", tree_method="hist", n_estimators=1000 ) cls.client = client cls.fit( - X, y, early_stopping_rounds=early_stopping_rounds, eval_set=[(valid_X, valid_y)] + X, + y, + early_stopping_rounds=early_stopping_rounds, + eval_set=[(valid_X, valid_y)], ) booster = cls.get_booster() - dump = booster.get_dump(dump_format='json') + dump = booster.get_dump(dump_format="json") + assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 + + # Specify the metric + cls = xgb.dask.DaskXGBClassifier( + objective="binary:logistic", tree_method="hist", n_estimators=1000 + ) + cls.client = client + cls.fit( + X, + y, + early_stopping_rounds=early_stopping_rounds, + eval_set=[(valid_X, valid_y)], + eval_metric="error", + ) + assert tm.non_increasing(cls.evals_result()["validation_0"]["error"]) + booster = cls.get_booster() + dump = booster.get_dump(dump_format="json") + assert len(cls.evals_result()["validation_0"]["error"]) < 20 + assert len(dump) - 
booster.best_iteration == early_stopping_rounds + 1 + + @pytest.mark.skipif(**tm.no_sklearn()) + def test_early_stopping_custom_eval(self, client: "Client") -> None: + from sklearn.datasets import load_breast_cancer + + X, y = load_breast_cancer(return_X_y=True) + X, y = da.from_array(X), da.from_array(y) + m = xgb.dask.DaskDMatrix(client, X, y) + + valid = xgb.dask.DaskDMatrix(client, X, y) + early_stopping_rounds = 5 + booster = xgb.dask.train( + client, + { + "objective": "binary:logistic", + "eval_metric": "error", + "tree_method": "hist", + }, + m, + evals=[(m, "Train"), (valid, "Valid")], + feval=tm.eval_error_metric, + num_boost_round=1000, + early_stopping_rounds=early_stopping_rounds, + )["booster"] + assert hasattr(booster, "best_score") + dump = booster.get_dump(dump_format="json") + assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 + + valid_X, valid_y = load_breast_cancer(return_X_y=True) + valid_X, valid_y = da.from_array(valid_X), da.from_array(valid_y) + cls = xgb.dask.DaskXGBClassifier( + objective="binary:logistic", + tree_method="hist", + n_estimators=1000, + eval_metric=tm.eval_error_metric_skl, + ) + cls.client = client + cls.fit( + X, + y, + early_stopping_rounds=early_stopping_rounds, + eval_set=[(valid_X, valid_y)], + ) + booster = cls.get_booster() + dump = booster.get_dump(dump_format="json") assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 @pytest.mark.skipif(**tm.no_sklearn()) def test_callback(self, client: "Client") -> None: from sklearn.datasets import load_breast_cancer + X, y = load_breast_cancer(return_X_y=True) X, y = da.from_array(X), da.from_array(y) - cls = xgb.dask.DaskXGBClassifier(objective='binary:logistic', tree_method='hist', - n_estimators=10) + cls = xgb.dask.DaskXGBClassifier( + objective="binary:logistic", tree_method="hist", n_estimators=10 + ) cls.client = client with tempfile.TemporaryDirectory() as tmpdir: - cls.fit(X, y, callbacks=[xgb.callback.TrainingCheckPoint( - directory=Path(tmpdir), - iterations=1, - name='model' - )]) + cls.fit( + X, + y, + callbacks=[ + xgb.callback.TrainingCheckPoint( + directory=Path(tmpdir), iterations=1, name="model" + ) + ], + ) for i in range(1, 10): - assert os.path.exists( - os.path.join(tmpdir, 'model_' + str(i) + '.json')) + assert os.path.exists(os.path.join(tmpdir, "model_" + str(i) + ".json")) diff --git a/tests/test_distributed/test_with_spark/test_spark_local.py b/tests/test_distributed/test_with_spark/test_spark_local.py index dfdadb2ef..124f36d02 100644 --- a/tests/test_distributed/test_with_spark/test_spark_local.py +++ b/tests/test_distributed/test_with_spark/test_spark_local.py @@ -1120,7 +1120,9 @@ class XgboostLocalTest(SparkTestCase): reg1 = SparkXGBRegressor(**self.reg_params) model = reg1.fit(self.reg_df_train) init_booster = model.get_booster() - reg2 = SparkXGBRegressor(max_depth=2, n_estimators=2, xgb_model=init_booster) + reg2 = SparkXGBRegressor( + max_depth=2, n_estimators=2, xgb_model=init_booster, max_bin=21 + ) model21 = reg2.fit(self.reg_df_train) pred_res21 = model21.transform(self.reg_df_test).collect() reg2.save(path)
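As a usage note for the checkpoint callback exercised in `test_callback` above: `TrainingCheckPoint` writes one model file per `iterations` boosting rounds into `directory`, named `<name>_<iteration>.json` (the test looks for `model_1.json`, `model_2.json`, ...). Below is a minimal sketch with the dask estimator, assuming a local cluster and synthetic data; it is not part of this patch.

import os
import tempfile
from pathlib import Path

import dask.array as da
from distributed import Client, LocalCluster

from xgboost import dask as dxgb
from xgboost.callback import TrainingCheckPoint


if __name__ == "__main__":
    with LocalCluster(n_workers=2, dashboard_address=":0") as cluster:
        with Client(cluster) as client:
            X = da.random.random(size=(256, 8), chunks=(32, 8))
            y = da.random.randint(0, 2, size=256, chunks=32)
            cls = dxgb.DaskXGBClassifier(n_estimators=5, tree_method="hist")
            cls.client = client
            with tempfile.TemporaryDirectory() as tmpdir:
                cls.fit(
                    X,
                    y,
                    callbacks=[
                        TrainingCheckPoint(
                            directory=Path(tmpdir), iterations=1, name="model"
                        )
                    ],
                )
                # Expect files like model_1.json, one per boosting round.
                print(sorted(os.listdir(tmpdir)))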