Dask device dmatrix (#5901)

* Fix softprob with empty dmatrix.
This commit is contained in:
Jiaming Yuan
2020-07-17 13:17:43 +08:00
committed by GitHub
parent e471056ec4
commit 7c2686146e
12 changed files with 392 additions and 149 deletions

View File

@@ -6,13 +6,15 @@ import unittest
import xgboost
import subprocess
from hypothesis import given, strategies, settings, note
from hypothesis._settings import duration
from test_gpu_updaters import parameter_strategy
if sys.platform.startswith("win"):
pytest.skip("Skipping dask tests on Windows", allow_module_level=True)
sys.path.append("tests/python")
from test_with_dask import run_empty_dmatrix # noqa
from test_with_dask import run_empty_dmatrix_reg # noqa
from test_with_dask import run_empty_dmatrix_cls # noqa
from test_with_dask import generate_array # noqa
import testing as tm # noqa
@@ -28,6 +30,126 @@ except ImportError:
pass
def run_with_dask_dataframe(DMatrixT, client):
import cupy as cp
cp.cuda.runtime.setDevice(0)
X, y = generate_array()
X = dd.from_dask_array(X)
y = dd.from_dask_array(y)
X = X.map_partitions(cudf.from_pandas)
y = y.map_partitions(cudf.from_pandas)
dtrain = DMatrixT(client, X, y)
out = dxgb.train(client, {'tree_method': 'gpu_hist',
'debug_synchronize': True},
dtrain=dtrain,
evals=[(dtrain, 'X')],
num_boost_round=4)
assert isinstance(out['booster'], dxgb.Booster)
assert len(out['history']['X']['rmse']) == 4
predictions = dxgb.predict(client, out, dtrain).compute()
assert isinstance(predictions, np.ndarray)
series_predictions = dxgb.inplace_predict(client, out, X)
assert isinstance(series_predictions, dd.Series)
series_predictions = series_predictions.compute()
single_node = out['booster'].predict(
xgboost.DMatrix(X.compute()))
cp.testing.assert_allclose(single_node, predictions)
np.testing.assert_allclose(single_node,
series_predictions.to_array())
predt = dxgb.predict(client, out, X)
assert isinstance(predt, dd.Series)
def is_df(part):
assert isinstance(part, cudf.DataFrame), part
return part
predt.map_partitions(
is_df,
meta=dd.utils.make_meta({'prediction': 'f4'}))
cp.testing.assert_allclose(
predt.values.compute(), single_node)
def run_with_dask_array(DMatrixT, client):
import cupy as cp
cp.cuda.runtime.setDevice(0)
X, y = generate_array()
X = X.map_blocks(cp.asarray)
y = y.map_blocks(cp.asarray)
dtrain = DMatrixT(client, X, y)
out = dxgb.train(client, {'tree_method': 'gpu_hist',
'debug_synchronize': True},
dtrain=dtrain,
evals=[(dtrain, 'X')],
num_boost_round=2)
from_dmatrix = dxgb.predict(client, out, dtrain).compute()
inplace_predictions = dxgb.inplace_predict(
client, out, X).compute()
single_node = out['booster'].predict(
xgboost.DMatrix(X.compute()))
np.testing.assert_allclose(single_node, from_dmatrix)
device = cp.cuda.runtime.getDevice()
assert device == inplace_predictions.device.id
single_node = cp.array(single_node)
assert device == single_node.device.id
cp.testing.assert_allclose(
single_node,
inplace_predictions)
def to_cp(x, DMatrixT):
import cupy
if isinstance(x, np.ndarray) and \
DMatrixT is dxgb.DaskDeviceQuantileDMatrix:
X = cupy.array(x)
else:
X = x
return X
def run_gpu_hist(params, num_rounds, dataset, DMatrixT, client):
params['tree_method'] = 'gpu_hist'
params = dataset.set_params(params)
# It doesn't make sense to distribute a completely
# empty dataset.
if dataset.X.shape[0] == 0:
return
chunk = 128
X = to_cp(dataset.X, DMatrixT)
X = da.from_array(X,
chunks=(chunk, dataset.X.shape[1]))
y = to_cp(dataset.y, DMatrixT)
y = da.from_array(y, chunks=(chunk, ))
if dataset.w is not None:
w = to_cp(dataset.w, DMatrixT)
w = da.from_array(w, chunks=(chunk, ))
else:
w = None
if DMatrixT is dxgb.DaskDeviceQuantileDMatrix:
m = DMatrixT(client, data=X, label=y, weight=w,
max_bin=params.get('max_bin', 256))
else:
m = DMatrixT(client, data=X, label=y, weight=w)
history = dxgb.train(client, params=params, dtrain=m,
num_boost_round=num_rounds,
evals=[(m, 'train')])['history']
note(history)
assert tm.non_increasing(history['train'][dataset.metric])
class TestDistributedGPU(unittest.TestCase):
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_cudf())
@@ -37,119 +159,28 @@ class TestDistributedGPU(unittest.TestCase):
def test_dask_dataframe(self):
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
import cupy as cp
cp.cuda.runtime.setDevice(0)
X, y = generate_array()
X = dd.from_dask_array(X)
y = dd.from_dask_array(y)
X = X.map_partitions(cudf.from_pandas)
y = y.map_partitions(cudf.from_pandas)
dtrain = dxgb.DaskDMatrix(client, X, y)
out = dxgb.train(client, {'tree_method': 'gpu_hist',
'debug_synchronize': True},
dtrain=dtrain,
evals=[(dtrain, 'X')],
num_boost_round=4)
assert isinstance(out['booster'], dxgb.Booster)
assert len(out['history']['X']['rmse']) == 4
predictions = dxgb.predict(client, out, dtrain).compute()
assert isinstance(predictions, np.ndarray)
series_predictions = dxgb.inplace_predict(client, out, X)
assert isinstance(series_predictions, dd.Series)
series_predictions = series_predictions.compute()
single_node = out['booster'].predict(
xgboost.DMatrix(X.compute()))
cp.testing.assert_allclose(single_node, predictions)
np.testing.assert_allclose(single_node,
series_predictions.to_array())
predt = dxgb.predict(client, out, X)
assert isinstance(predt, dd.Series)
def is_df(part):
assert isinstance(part, cudf.DataFrame), part
return part
predt.map_partitions(
is_df,
meta=dd.utils.make_meta({'prediction': 'f4'}))
cp.testing.assert_allclose(
predt.values.compute(), single_node)
run_with_dask_dataframe(dxgb.DaskDMatrix, client)
run_with_dask_dataframe(dxgb.DaskDeviceQuantileDMatrix, client)
@given(parameter_strategy, strategies.integers(1, 20),
tm.dataset_strategy)
@settings(deadline=None)
@settings(deadline=duration(seconds=120))
@pytest.mark.mgpu
def test_gpu_hist(self, params, num_rounds, dataset):
with LocalCUDACluster(n_workers=2) as cluster:
with Client(cluster) as client:
params['tree_method'] = 'gpu_hist'
params = dataset.set_params(params)
# multi class doesn't handle empty dataset well (empty
# means at least 1 worker has data).
if params['objective'] == "multi:softmax":
return
# It doesn't make sense to distribute a completely
# empty dataset.
if dataset.X.shape[0] == 0:
return
chunk = 128
X = da.from_array(dataset.X,
chunks=(chunk, dataset.X.shape[1]))
y = da.from_array(dataset.y, chunks=(chunk, ))
if dataset.w is not None:
w = da.from_array(dataset.w, chunks=(chunk, ))
else:
w = None
m = dxgb.DaskDMatrix(
client, data=X, label=y, weight=w)
history = dxgb.train(client, params=params, dtrain=m,
num_boost_round=num_rounds,
evals=[(m, 'train')])['history']
note(history)
assert tm.non_increasing(history['train'][dataset.metric])
run_gpu_hist(params, num_rounds, dataset, dxgb.DaskDMatrix,
client)
run_gpu_hist(params, num_rounds, dataset,
dxgb.DaskDeviceQuantileDMatrix, client)
@pytest.mark.skipif(**tm.no_cupy())
@pytest.mark.mgpu
def test_dask_array(self):
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
import cupy as cp
cp.cuda.runtime.setDevice(0)
X, y = generate_array()
X = X.map_blocks(cp.asarray)
y = y.map_blocks(cp.asarray)
dtrain = dxgb.DaskDMatrix(client, X, y)
out = dxgb.train(client, {'tree_method': 'gpu_hist',
'debug_synchronize': True},
dtrain=dtrain,
evals=[(dtrain, 'X')],
num_boost_round=2)
from_dmatrix = dxgb.predict(client, out, dtrain).compute()
inplace_predictions = dxgb.inplace_predict(
client, out, X).compute()
single_node = out['booster'].predict(
xgboost.DMatrix(X.compute()))
np.testing.assert_allclose(single_node, from_dmatrix)
device = cp.cuda.runtime.getDevice()
assert device == inplace_predictions.device.id
single_node = cp.array(single_node)
assert device == single_node.device.id
cp.testing.assert_allclose(
single_node,
inplace_predictions)
run_with_dask_array(dxgb.DaskDMatrix, client)
run_with_dask_array(dxgb.DaskDeviceQuantileDMatrix, client)
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@@ -159,7 +190,8 @@ class TestDistributedGPU(unittest.TestCase):
with Client(cluster) as client:
parameters = {'tree_method': 'gpu_hist',
'debug_synchronize': True}
run_empty_dmatrix(client, parameters)
run_empty_dmatrix_reg(client, parameters)
run_empty_dmatrix_cls(client, parameters)
def run_quantile(self, name):
if sys.platform.startswith("win"):

View File

@@ -128,8 +128,7 @@ def test_dask_missing_value_reg():
def test_dask_missing_value_cls():
# Multi-class doesn't handle empty DMatrix well. So we use lesser workers.
with LocalCluster(n_workers=2) as cluster:
with LocalCluster() as cluster:
with Client(cluster) as client:
X_0 = np.ones((kRows // 2, kCols))
X_1 = np.zeros((kRows // 2, kCols))
@@ -234,7 +233,7 @@ def test_sklearn_grid_search():
assert len(means) == len(set(means))
def run_empty_dmatrix(client, parameters):
def run_empty_dmatrix_reg(client, parameters):
def _check_outputs(out, predictions):
assert isinstance(out['booster'], xgb.dask.Booster)
@@ -271,6 +270,46 @@ def run_empty_dmatrix(client, parameters):
_check_outputs(out, predictions)
def run_empty_dmatrix_cls(client, parameters):
n_classes = 4
def _check_outputs(out, predictions):
assert isinstance(out['booster'], xgb.dask.Booster)
assert len(out['history']['validation']['merror']) == 2
assert isinstance(predictions, np.ndarray)
assert predictions.shape[1] == n_classes, predictions.shape
kRows, kCols = 1, 97
X = dd.from_array(np.random.randn(kRows, kCols))
y = dd.from_array(np.random.randint(low=0, high=n_classes, size=kRows))
dtrain = xgb.dask.DaskDMatrix(client, X, y)
parameters['objective'] = 'multi:softprob'
parameters['num_class'] = n_classes
out = xgb.dask.train(client, parameters,
dtrain=dtrain,
evals=[(dtrain, 'validation')],
num_boost_round=2)
predictions = xgb.dask.predict(client=client, model=out,
data=dtrain).compute()
_check_outputs(out, predictions)
# train has more rows than evals
valid = dtrain
kRows += 1
X = dd.from_array(np.random.randn(kRows, kCols))
y = dd.from_array(np.random.randint(low=0, high=n_classes, size=kRows))
dtrain = xgb.dask.DaskDMatrix(client, X, y)
out = xgb.dask.train(client, parameters,
dtrain=dtrain,
evals=[(valid, 'validation')],
num_boost_round=2)
predictions = xgb.dask.predict(client=client, model=out,
data=valid).compute()
_check_outputs(out, predictions)
# No test for Exact, as empty DMatrix handling are mostly for distributed
# environment and Exact doesn't support it.
@@ -278,11 +317,13 @@ def test_empty_dmatrix_hist():
with LocalCluster(n_workers=5) as cluster:
with Client(cluster) as client:
parameters = {'tree_method': 'hist'}
run_empty_dmatrix(client, parameters)
run_empty_dmatrix_reg(client, parameters)
run_empty_dmatrix_cls(client, parameters)
def test_empty_dmatrix_approx():
with LocalCluster(n_workers=5) as cluster:
with Client(cluster) as client:
parameters = {'tree_method': 'approx'}
run_empty_dmatrix(client, parameters)
run_empty_dmatrix_reg(client, parameters)
run_empty_dmatrix_cls(client, parameters)