From ed0216642f23047830ba32455126ba936b7fb80b Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Mon, 3 Feb 2020 12:39:20 +0800
Subject: [PATCH] Avoid dask test fixtures. (#5270)

* Fix Travis OSX timeout.

* Fix classifier.
---
 tests/python/test_with_dask.py | 169 ++++++++++++++++++---------------
 1 file changed, 92 insertions(+), 77 deletions(-)

diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py
index f7dc7583d..18884281d 100644
--- a/tests/python/test_with_dask.py
+++ b/tests/python/test_with_dask.py
@@ -10,18 +10,20 @@ if sys.platform.startswith("win"):
 pytestmark = pytest.mark.skipif(**tm.no_dask())
 
 try:
-    from distributed.utils_test import client, loop, cluster_fixture
+    from distributed import LocalCluster, Client
     import dask.dataframe as dd
     import dask.array as da
     from xgboost.dask import DaskDMatrix
 except ImportError:
-    client = None
-    loop = None
-    cluster_fixture = None
-    pass
+    LocalCluster = None
+    Client = None
+    dd = None
+    da = None
+    DaskDMatrix = None
 
 kRows = 1000
 kCols = 10
+kWorkers = 5
 
 
 def generate_array():
@@ -31,97 +33,106 @@ def generate_array():
     return X, y
 
 
-def test_from_dask_dataframe(client):
-    X, y = generate_array()
+def test_from_dask_dataframe():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            X, y = generate_array()
 
-    X = dd.from_dask_array(X)
-    y = dd.from_dask_array(y)
+            X = dd.from_dask_array(X)
+            y = dd.from_dask_array(y)
 
-    dtrain = DaskDMatrix(client, X, y)
-    booster = xgb.dask.train(
-        client, {}, dtrain, num_boost_round=2)['booster']
+            dtrain = DaskDMatrix(client, X, y)
+            booster = xgb.dask.train(
+                client, {}, dtrain, num_boost_round=2)['booster']
 
-    prediction = xgb.dask.predict(client, model=booster, data=dtrain)
+            prediction = xgb.dask.predict(client, model=booster, data=dtrain)
 
-    assert prediction.ndim == 1
-    assert isinstance(prediction, da.Array)
-    assert prediction.shape[0] == kRows
+            assert prediction.ndim == 1
+            assert isinstance(prediction, da.Array)
+            assert prediction.shape[0] == kRows
 
-    with pytest.raises(ValueError):
-        # evals_result is not supported in dask interface.
-        xgb.dask.train(
-            client, {}, dtrain, num_boost_round=2, evals_result={})
-
-    prediction = prediction.compute()  # force prediction to be computed
+            with pytest.raises(ValueError):
+                # evals_result is not supported in dask interface.
+                xgb.dask.train(
+                    client, {}, dtrain, num_boost_round=2, evals_result={})
+            # force prediction to be computed
+            prediction = prediction.compute()
 
 
-def test_from_dask_array(client):
-    X, y = generate_array()
-    dtrain = DaskDMatrix(client, X, y)
-    # results is {'booster': Booster, 'history': {...}}
-    result = xgb.dask.train(client, {}, dtrain)
+def test_from_dask_array():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            X, y = generate_array()
+            dtrain = DaskDMatrix(client, X, y)
+            # results is {'booster': Booster, 'history': {...}}
+            result = xgb.dask.train(client, {}, dtrain)
 
-    prediction = xgb.dask.predict(client, result, dtrain)
-    assert prediction.shape[0] == kRows
+            prediction = xgb.dask.predict(client, result, dtrain)
+            assert prediction.shape[0] == kRows
 
-    assert isinstance(prediction, da.Array)
-
-    prediction = prediction.compute()  # force prediction to be computed
+            assert isinstance(prediction, da.Array)
+            # force prediction to be computed
+            prediction = prediction.compute()
 
 
-def test_regressor(client):
-    X, y = generate_array()
-    regressor = xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
-    regressor.set_params(tree_method='hist')
-    regressor.client = client
-    regressor.fit(X, y, eval_set=[(X, y)])
-    prediction = regressor.predict(X)
+def test_dask_regressor():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            X, y = generate_array()
+            regressor = xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
+            regressor.set_params(tree_method='hist')
+            regressor.client = client
+            regressor.fit(X, y, eval_set=[(X, y)])
+            prediction = regressor.predict(X)
 
-    assert prediction.ndim == 1
-    assert prediction.shape[0] == kRows
+            assert prediction.ndim == 1
+            assert prediction.shape[0] == kRows
 
-    history = regressor.evals_result()
+            history = regressor.evals_result()
 
-    assert isinstance(prediction, da.Array)
-    assert isinstance(history, dict)
+            assert isinstance(prediction, da.Array)
+            assert isinstance(history, dict)
 
-    assert list(history['validation_0'].keys())[0] == 'rmse'
-    assert len(history['validation_0']['rmse']) == 2
+            assert list(history['validation_0'].keys())[0] == 'rmse'
+            assert len(history['validation_0']['rmse']) == 2
 
 
-def test_classifier(client):
-    X, y = generate_array()
-    y = (y * 10).astype(np.int32)
-    classifier = xgb.dask.DaskXGBClassifier(verbosity=1, n_estimators=2)
-    classifier.client = client
-    classifier.fit(X, y, eval_set=[(X, y)])
-    prediction = classifier.predict(X)
+def test_dask_classifier():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            X, y = generate_array()
+            y = (y * 10).astype(np.int32)
+            classifier = xgb.dask.DaskXGBClassifier(
+                verbosity=1, n_estimators=2)
+            classifier.client = client
+            classifier.fit(X, y, eval_set=[(X, y)])
+            prediction = classifier.predict(X)
 
-    assert prediction.ndim == 1
-    assert prediction.shape[0] == kRows
+            assert prediction.ndim == 1
+            assert prediction.shape[0] == kRows
 
-    history = classifier.evals_result()
+            history = classifier.evals_result()
 
-    assert isinstance(prediction, da.Array)
-    assert isinstance(history, dict)
+            assert isinstance(prediction, da.Array)
+            assert isinstance(history, dict)
 
-    assert list(history.keys())[0] == 'validation_0'
-    assert list(history['validation_0'].keys())[0] == 'merror'
-    assert len(list(history['validation_0'])) == 1
-    assert len(history['validation_0']['merror']) == 2
+            assert list(history.keys())[0] == 'validation_0'
+            assert list(history['validation_0'].keys())[0] == 'merror'
+            assert len(list(history['validation_0'])) == 1
+            assert len(history['validation_0']['merror']) == 2
 
-    assert classifier.n_classes_ == 10
+            assert classifier.n_classes_ == 10
 
-    # Test with dataframe.
-    X_d = dd.from_dask_array(X)
-    y_d = dd.from_dask_array(y)
-    classifier.fit(X_d, y_d)
+            # Test with dataframe.
+            X_d = dd.from_dask_array(X)
+            y_d = dd.from_dask_array(y)
+            classifier.fit(X_d, y_d)
 
-    assert classifier.n_classes_ == 10
-    prediction = classifier.predict(X_d)
+            assert classifier.n_classes_ == 10
+            prediction = classifier.predict(X_d)
 
-    assert prediction.ndim == 1
-    assert prediction.shape[0] == kRows
+            assert prediction.ndim == 1
+            assert prediction.shape[0] == kRows
 
 
 def run_empty_dmatrix(client, parameters):
@@ -164,11 +175,15 @@ def run_empty_dmatrix(client, parameters):
 
 # No test for Exact, as empty DMatrix handling are mostly for distributed
 # environment and Exact doesn't support it.
-def test_empty_dmatrix_hist(client):
-    parameters = {'tree_method': 'hist'}
-    run_empty_dmatrix(client, parameters)
+def test_empty_dmatrix_hist():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            parameters = {'tree_method': 'hist'}
+            run_empty_dmatrix(client, parameters)
 
 
-def test_empty_dmatrix_approx(client):
-    parameters = {'tree_method': 'approx'}
-    run_empty_dmatrix(client, parameters)
+def test_empty_dmatrix_approx():
+    with LocalCluster(n_workers=kWorkers) as cluster:
+        with Client(cluster) as client:
+            parameters = {'tree_method': 'approx'}
+            run_empty_dmatrix(client, parameters)