xgboost/tests/python/test_with_dask.py
Rory Mitchell 09b90d9329
Add native support for Dask (#4473)
* Add native support for Dask

* Add multi-GPU demo

* Add sklearn example
2019-05-27 13:29:28 +12:00

94 lines
2.7 KiB
Python

import testing as tm
import pytest
import xgboost as xgb
import numpy as np
import sys
if sys.platform.startswith("win"):
pytest.skip("Skipping dask tests on Windows", allow_module_level=True)
try:
from distributed.utils_test import client, loop, cluster_fixture
import dask.dataframe as dd
import dask.array as da
except ImportError:
pass
pytestmark = pytest.mark.skipif(**tm.no_dask())
def run_train():
# Contains one label equal to rank
dmat = xgb.DMatrix([[0]], label=[xgb.rabit.get_rank()])
bst = xgb.train({"eta": 1.0, "lambda": 0.0}, dmat, 1)
pred = bst.predict(dmat)
expected_result = np.average(range(xgb.rabit.get_world_size()))
assert all(p == expected_result for p in pred)
def test_train(client):
# Train two workers, the first has label 0, the second has label 1
# If they build the model together the output should be 0.5
xgb.dask.run(client, run_train)
# Run again to check we can have multiple sessions
xgb.dask.run(client, run_train)
def run_create_dmatrix(X, y, weights):
dmat = xgb.dask.create_worker_dmatrix(X, y, weight=weights)
# Expect this worker to get two partitions and concatenate them
assert dmat.num_row() == 50
def test_dask_dataframe(client):
n = 10
m = 100
partition_size = 25
X = dd.from_array(np.random.random((m, n)), partition_size)
y = dd.from_array(np.random.random(m), partition_size)
weights = dd.from_array(np.random.random(m), partition_size)
xgb.dask.run(client, run_create_dmatrix, X, y, weights)
def test_dask_array(client):
n = 10
m = 100
partition_size = 25
X = da.random.random((m, n), partition_size)
y = da.random.random(m, partition_size)
weights = da.random.random(m, partition_size)
xgb.dask.run(client, run_create_dmatrix, X, y, weights)
def run_get_local_data(X, y):
X_local = xgb.dask.get_local_data(X)
y_local = xgb.dask.get_local_data(y)
assert (X_local.shape == (50, 10))
assert (y_local.shape == (50,))
def test_get_local_data(client):
n = 10
m = 100
partition_size = 25
X = da.random.random((m, n), partition_size)
y = da.random.random(m, partition_size)
xgb.dask.run(client, run_get_local_data, X, y)
def run_sklearn():
# Contains one label equal to rank
X = [[0]]
y = [xgb.rabit.get_rank()]
model = xgb.XGBRegressor(learning_rate=1.0)
model.fit(X, y)
pred = model.predict(X)
expected_result = np.average(range(xgb.rabit.get_world_size()))
assert all(p == expected_result for p in pred)
return pred
def test_sklearn(client):
result = xgb.dask.run(client, run_sklearn)
print(result)