"""Tests for xgboost's dask integration; skipped on Windows."""
import sys

import numpy as np
import pytest

import testing as tm
import xgboost as xgb

# The distributed test fixtures are not supported on Windows here.
if sys.platform.startswith("win"):
    pytest.skip("Skipping dask tests on Windows", allow_module_level=True)

try:
    # `client`, `loop` and `cluster_fixture` are pytest fixtures consumed by
    # the tests below; a failed import is covered by the skipif mark.
    from distributed.utils_test import client, loop, cluster_fixture
    import dask.dataframe as dd
    import dask.array as da
except ImportError:
    pass

# Skip the whole module when dask is unavailable.
pytestmark = pytest.mark.skipif(**tm.no_dask())
def run_train():
    """Train a one-round model inside a rabit session and check predictions.

    Each worker holds a single row whose label equals its rank; after the
    distributed update, every worker's model should predict the average of
    all ranks.
    """
    # One data point whose label is this worker's rank.
    dmat = xgb.DMatrix(np.array([[0]]), label=[xgb.rabit.get_rank()])
    bst = xgb.train({"eta": 1.0, "lambda": 0.0}, dmat, 1)
    pred = bst.predict(dmat)
    expected_result = np.average(range(xgb.rabit.get_world_size()))
    # Compare with a tolerance: predictions are float32, so exact equality
    # against a Python float is fragile.
    assert np.allclose(pred, expected_result)
def test_train(client):
    """Train across two workers holding labels 0 and 1.

    If the workers build the model together the prediction should be 0.5.
    Running twice verifies that multiple rabit sessions can be created
    back to back.
    """
    for _ in range(2):
        xgb.dask.run(client, run_train)
def run_create_dmatrix(X, y, weights):
    """Build a worker-local DMatrix and verify its row count.

    Each worker is expected to receive two 25-row partitions, which
    ``create_worker_dmatrix`` concatenates into one 50-row matrix.
    """
    local = xgb.dask.create_worker_dmatrix(X, y, weight=weights)
    assert local.num_row() == 50
def test_dask_dataframe(client):
    """Create worker-local DMatrices from partitioned dask dataframes."""
    rows, cols, chunk = 100, 10, 25
    X = dd.from_array(np.random.random((rows, cols)), chunk)
    y = dd.from_array(np.random.random(rows), chunk)
    weights = dd.from_array(np.random.random(rows), chunk)
    xgb.dask.run(client, run_create_dmatrix, X, y, weights)
def test_dask_array(client):
    """Create worker-local DMatrices from partitioned dask arrays."""
    rows, cols, chunk = 100, 10, 25
    X = da.random.random((rows, cols), chunk)
    y = da.random.random(rows, chunk)
    weights = da.random.random(rows, chunk)
    xgb.dask.run(client, run_create_dmatrix, X, y, weights)
def run_get_local_data(X, y):
    """Verify each worker sees its local shard of the dask collections.

    With 100 rows split over two workers, each worker should hold 50 rows.
    """
    local_X = xgb.dask.get_local_data(X)
    local_y = xgb.dask.get_local_data(y)
    assert local_X.shape == (50, 10)
    assert local_y.shape == (50,)
def test_get_local_data(client):
    """Distribute dask arrays and check per-worker local shards."""
    rows, cols, chunk = 100, 10, 25
    X = da.random.random((rows, cols), chunk)
    y = da.random.random(rows, chunk)
    xgb.dask.run(client, run_get_local_data, X, y)
def run_sklearn():
    """Fit the sklearn wrapper inside a rabit session and return predictions.

    Each worker contributes one row labelled with its rank; the jointly
    trained model should predict the average of all ranks.
    """
    # One data point whose label is this worker's rank.
    X = np.array([[0]])
    y = [xgb.rabit.get_rank()]
    model = xgb.XGBRegressor(learning_rate=1.0)
    model.fit(X, y)
    pred = model.predict(X)
    expected_result = np.average(range(xgb.rabit.get_world_size()))
    # Compare with a tolerance: predictions are float32, so exact equality
    # against a Python float is fragile.
    assert np.allclose(pred, expected_result)
    return pred
def test_sklearn(client):
    """Run the sklearn-API training across the cluster and show the result."""
    result = xgb.dask.run(client, run_sklearn)
    print(result)