Reduce time for some multi-gpu tests (#8288)

* Faster dask tests

* Reuse AllReducer objects in tests.

* Faster boost from prediction tests.

* Use rmm dask fixture.

* Speed up dask demo.

* mypy

* Format with black.

* mypy

* Clang-tidy

Co-authored-by: Hyunsu Philip Cho <chohyu01@cs.washington.edu>
This commit is contained in:
Rory Mitchell 2022-10-04 12:49:33 +02:00 committed by GitHub
parent ca0547bb65
commit d686bf52a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 337 additions and 336 deletions

View File

@ -4,13 +4,12 @@ Example of training with Dask on GPU
""" """
from dask_cuda import LocalCUDACluster from dask_cuda import LocalCUDACluster
import dask_cudf import dask_cudf
from dask.distributed import Client, wait from dask.distributed import Client
from dask import array as da from dask import array as da
from dask import dataframe as dd from dask import dataframe as dd
import xgboost as xgb import xgboost as xgb
from xgboost import dask as dxgb from xgboost import dask as dxgb
from xgboost.dask import DaskDMatrix from xgboost.dask import DaskDMatrix
import argparse
def using_dask_matrix(client: Client, X, y): def using_dask_matrix(client: Client, X, y):
@ -51,7 +50,7 @@ def using_quantile_device_dmatrix(client: Client, X, y):
# `DaskDeviceQuantileDMatrix` is used instead of `DaskDMatrix`, be careful # `DaskDeviceQuantileDMatrix` is used instead of `DaskDMatrix`, be careful
# that it can not be used for anything else other than training. # that it can not be used for anything else other than training.
dtrain = dxgb.DaskDeviceQuantileDMatrix(client, X, y) dtrain = dxgb.DaskQuantileDMatrix(client, X, y)
output = xgb.dask.train(client, output = xgb.dask.train(client,
{'verbosity': 2, {'verbosity': 2,
'tree_method': 'gpu_hist'}, 'tree_method': 'gpu_hist'},
@ -63,12 +62,6 @@ def using_quantile_device_dmatrix(client: Client, X, y):
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--ddqdm', choices=[0, 1], type=int, default=1,
help='''Whether should we use `DaskDeviceQuantileDMatrix`''')
args = parser.parse_args()
# `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here # `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here
# `n_workers` represents the number of GPUs since we use one GPU per worker # `n_workers` represents the number of GPUs since we use one GPU per worker
# process. # process.
@ -77,12 +70,10 @@ if __name__ == '__main__':
# generate some random data for demonstration # generate some random data for demonstration
m = 100000 m = 100000
n = 100 n = 100
X = da.random.random(size=(m, n), chunks=100) X = da.random.random(size=(m, n), chunks=10000)
y = da.random.random(size=(m, ), chunks=100) y = da.random.random(size=(m, ), chunks=10000)
if args.ddqdm == 1: print('Using DaskQuantileDMatrix')
print('Using DaskDeviceQuantileDMatrix')
from_ddqdm = using_quantile_device_dmatrix(client, X, y) from_ddqdm = using_quantile_device_dmatrix(client, X, y)
else:
print('Using DMatrix') print('Using DMatrix')
from_dmatrix = using_dask_matrix(client, X, y) from_dmatrix = using_dask_matrix(client, X, y)

View File

@ -508,7 +508,7 @@ void SketchContainer::AllReduce() {
timer_.Start(__func__); timer_.Start(__func__);
if (!reducer_) { if (!reducer_) {
reducer_ = std::make_unique<dh::AllReducer>(); reducer_ = std::make_shared<dh::AllReducer>();
reducer_->Init(device_); reducer_->Init(device_);
} }
// Reduce the overhead on syncing. // Reduce the overhead on syncing.
@ -518,6 +518,7 @@ void SketchContainer::AllReduce() {
std::min(global_sum_rows, static_cast<size_t>(num_bins_ * kFactor)); std::min(global_sum_rows, static_cast<size_t>(num_bins_ * kFactor));
this->Prune(intermediate_num_cuts); this->Prune(intermediate_num_cuts);
auto d_columns_ptr = this->columns_ptr_.ConstDeviceSpan(); auto d_columns_ptr = this->columns_ptr_.ConstDeviceSpan();
CHECK_EQ(d_columns_ptr.size(), num_columns_ + 1); CHECK_EQ(d_columns_ptr.size(), num_columns_ + 1);
size_t n = d_columns_ptr.size(); size_t n = d_columns_ptr.size();

View File

@ -37,7 +37,7 @@ class SketchContainer {
private: private:
Monitor timer_; Monitor timer_;
std::unique_ptr<dh::AllReducer> reducer_; std::shared_ptr<dh::AllReducer> reducer_;
HostDeviceVector<FeatureType> feature_types_; HostDeviceVector<FeatureType> feature_types_;
bst_row_t num_rows_; bst_row_t num_rows_;
bst_feature_t num_columns_; bst_feature_t num_columns_;
@ -93,13 +93,15 @@ class SketchContainer {
* \param num_columns Total number of columns in dataset. * \param num_columns Total number of columns in dataset.
* \param num_rows Total number of rows in known dataset (typically the rows in current worker). * \param num_rows Total number of rows in known dataset (typically the rows in current worker).
* \param device GPU ID. * \param device GPU ID.
* \param reducer Optional initialised reducer. Useful for speeding up testing.
*/ */
SketchContainer(HostDeviceVector<FeatureType> const &feature_types, SketchContainer(HostDeviceVector<FeatureType> const &feature_types,
int32_t max_bin, int32_t max_bin, bst_feature_t num_columns,
bst_feature_t num_columns, bst_row_t num_rows, bst_row_t num_rows, int32_t device,
int32_t device) std::shared_ptr<dh::AllReducer> reducer = nullptr)
: num_rows_{num_rows}, : num_rows_{num_rows},
num_columns_{num_columns}, num_bins_{max_bin}, device_{device} { num_columns_{num_columns}, num_bins_{max_bin}, device_{device},
reducer_(std::move(reducer)) {
CHECK_GE(device, 0); CHECK_GE(device, 0);
// Initialize Sketches for this dmatrix // Initialize Sketches for this dmatrix
this->columns_ptr_.SetDevice(device_); this->columns_ptr_.SetDevice(device_);

View File

@ -349,6 +349,9 @@ TEST(GPUQuantile, AllReduceBasic) {
return; return;
} }
auto reducer = std::make_shared<dh::AllReducer>();
reducer->Init(0);
constexpr size_t kRows = 1000, kCols = 100; constexpr size_t kRows = 1000, kCols = 100;
RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) { RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, MetaInfo const& info) {
// Set up single node version; // Set up single node version;
@ -378,12 +381,12 @@ TEST(GPUQuantile, AllReduceBasic) {
} }
sketch_on_single_node.Unique(); sketch_on_single_node.Unique();
TestQuantileElemRank(0, sketch_on_single_node.Data(), TestQuantileElemRank(0, sketch_on_single_node.Data(),
sketch_on_single_node.ColumnsPtr()); sketch_on_single_node.ColumnsPtr(), true);
// Set up distributed version. We rely on using rank as seed to generate // Set up distributed version. We rely on using rank as seed to generate
// the exact same copy of data. // the exact same copy of data.
auto rank = rabit::GetRank(); auto rank = rabit::GetRank();
SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0, reducer);
HostDeviceVector<float> storage; HostDeviceVector<float> storage;
std::string interface_str = RandomDataGenerator{kRows, kCols, 0} std::string interface_str = RandomDataGenerator{kRows, kCols, 0}
.Device(0) .Device(0)
@ -402,7 +405,7 @@ TEST(GPUQuantile, AllReduceBasic) {
sketch_on_single_node.Data().size()); sketch_on_single_node.Data().size());
TestQuantileElemRank(0, sketch_distributed.Data(), TestQuantileElemRank(0, sketch_distributed.Data(),
sketch_distributed.ColumnsPtr()); sketch_distributed.ColumnsPtr(), true);
std::vector<SketchEntry> single_node_data( std::vector<SketchEntry> single_node_data(
sketch_on_single_node.Data().size()); sketch_on_single_node.Data().size());
@ -432,13 +435,15 @@ TEST(GPUQuantile, SameOnAllWorkers) {
} else { } else {
return; return;
} }
auto reducer = std::make_shared<dh::AllReducer>();
reducer->Init(0);
constexpr size_t kRows = 1000, kCols = 100; constexpr size_t kRows = 1000, kCols = 100;
RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins, RunWithSeedsAndBins(kRows, [=](int32_t seed, size_t n_bins,
MetaInfo const &info) { MetaInfo const &info) {
auto rank = rabit::GetRank(); auto rank = rabit::GetRank();
HostDeviceVector<FeatureType> ft; HostDeviceVector<FeatureType> ft;
SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0); SketchContainer sketch_distributed(ft, n_bins, kCols, kRows, 0, reducer);
HostDeviceVector<float> storage; HostDeviceVector<float> storage;
std::string interface_str = RandomDataGenerator{kRows, kCols, 0} std::string interface_str = RandomDataGenerator{kRows, kCols, 0}
.Device(0) .Device(0)
@ -450,7 +455,7 @@ TEST(GPUQuantile, SameOnAllWorkers) {
&sketch_distributed); &sketch_distributed);
sketch_distributed.AllReduce(); sketch_distributed.AllReduce();
sketch_distributed.Unique(); sketch_distributed.Unique();
TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr()); TestQuantileElemRank(0, sketch_distributed.Data(), sketch_distributed.ColumnsPtr(), true);
// Test for all workers having the same sketch. // Test for all workers having the same sketch.
size_t n_data = sketch_distributed.Data().size(); size_t n_data = sketch_distributed.Data().size();
@ -467,12 +472,9 @@ TEST(GPUQuantile, SameOnAllWorkers) {
thrust::copy(thrust::device, local_data.data(), thrust::copy(thrust::device, local_data.data(),
local_data.data() + local_data.size(), local_data.data() + local_data.size(),
all_workers.begin() + local_data.size() * rank); all_workers.begin() + local_data.size() * rank);
dh::AllReducer reducer; reducer->AllReduceSum(all_workers.data().get(), all_workers.data().get(),
reducer.Init(0);
reducer.AllReduceSum(all_workers.data().get(), all_workers.data().get(),
all_workers.size()); all_workers.size());
reducer.Synchronize(); reducer->Synchronize();
auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float); auto base_line = dh::ToSpan(all_workers).subspan(0, size_as_float);
std::vector<float> h_base_line(base_line.size()); std::vector<float> h_base_line(base_line.size());

View File

@ -37,12 +37,12 @@ inline void InitRabitContext(std::string msg, int32_t n_workers) {
} }
template <typename Fn> void RunWithSeedsAndBins(size_t rows, Fn fn) { template <typename Fn> void RunWithSeedsAndBins(size_t rows, Fn fn) {
std::vector<int32_t> seeds(4); std::vector<int32_t> seeds(2);
SimpleLCG lcg; SimpleLCG lcg;
SimpleRealUniformDistribution<float> dist(3, 1000); SimpleRealUniformDistribution<float> dist(3, 1000);
std::generate(seeds.begin(), seeds.end(), [&](){ return dist(&lcg); }); std::generate(seeds.begin(), seeds.end(), [&](){ return dist(&lcg); });
std::vector<size_t> bins(8); std::vector<size_t> bins(2);
for (size_t i = 0; i < bins.size() - 1; ++i) { for (size_t i = 0; i < bins.size() - 1; ++i) {
bins[i] = i * 35 + 2; bins[i] = i * 35 + 2;
} }

View File

@ -22,8 +22,8 @@ def setup_rmm_pool(request, pytestconfig):
rmm.reinitialize(pool_allocator=True, initial_pool_size=1024*1024*1024, rmm.reinitialize(pool_allocator=True, initial_pool_size=1024*1024*1024,
devices=list(range(get_n_gpus()))) devices=list(range(get_n_gpus())))
@pytest.fixture(scope='function') @pytest.fixture(scope='class')
def local_cuda_cluster(request, pytestconfig): def local_cuda_client(request, pytestconfig):
kwargs = {} kwargs = {}
if hasattr(request, 'param'): if hasattr(request, 'param'):
kwargs.update(request.param) kwargs.update(request.param)
@ -31,13 +31,12 @@ def local_cuda_cluster(request, pytestconfig):
if not has_rmm(): if not has_rmm():
raise ImportError('The --use-rmm-pool option requires the RMM package') raise ImportError('The --use-rmm-pool option requires the RMM package')
import rmm import rmm
from dask_cuda.utils import get_n_gpus
kwargs['rmm_pool_size'] = '2GB' kwargs['rmm_pool_size'] = '2GB'
if tm.no_dask_cuda()['condition']: if tm.no_dask_cuda()['condition']:
raise ImportError('The local_cuda_cluster fixture requires dask_cuda package') raise ImportError('The local_cuda_cluster fixture requires dask_cuda package')
from dask_cuda import LocalCUDACluster from dask_cuda import LocalCUDACluster
with LocalCUDACluster(**kwargs) as cluster: from dask.distributed import Client
yield cluster yield Client(LocalCUDACluster(**kwargs))
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption('--use-rmm-pool', action='store_true', default=False, help='Use RMM pool') parser.addoption('--use-rmm-pool', action='store_true', default=False, help='Use RMM pool')

View File

@ -32,8 +32,5 @@ def test_categorical_demo():
@pytest.mark.mgpu @pytest.mark.mgpu
def test_dask_training(): def test_dask_training():
script = os.path.join(tm.PROJECT_ROOT, 'demo', 'dask', 'gpu_training.py') script = os.path.join(tm.PROJECT_ROOT, 'demo', 'dask', 'gpu_training.py')
cmd = ['python', script, '--ddqdm=1'] cmd = ['python', script]
subprocess.check_call(cmd)
cmd = ['python', script, '--ddqdm=0']
subprocess.check_call(cmd) subprocess.check_call(cmd)

View File

@ -45,7 +45,7 @@ try:
import xgboost as xgb import xgboost as xgb
from dask.distributed import Client from dask.distributed import Client
from dask import array as da from dask import array as da
from dask_cuda import LocalCUDACluster from dask_cuda import LocalCUDACluster, utils
import cudf import cudf
except ImportError: except ImportError:
pass pass
@ -53,6 +53,7 @@ except ImportError:
def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None: def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
import cupy as cp import cupy as cp
cp.cuda.runtime.setDevice(0) cp.cuda.runtime.setDevice(0)
X, y, _ = generate_array() X, y, _ = generate_array()
@ -63,14 +64,16 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
y = y.map_partitions(cudf.from_pandas) y = y.map_partitions(cudf.from_pandas)
dtrain = DMatrixT(client, X, y) dtrain = DMatrixT(client, X, y)
out = dxgb.train(client, {'tree_method': 'gpu_hist', out = dxgb.train(
'debug_synchronize': True}, client,
{"tree_method": "gpu_hist", "debug_synchronize": True},
dtrain=dtrain, dtrain=dtrain,
evals=[(dtrain, 'X')], evals=[(dtrain, "X")],
num_boost_round=4) num_boost_round=4,
)
assert isinstance(out['booster'], dxgb.Booster) assert isinstance(out["booster"], dxgb.Booster)
assert len(out['history']['X']['rmse']) == 4 assert len(out["history"]["X"]["rmse"]) == 4
predictions = dxgb.predict(client, out, dtrain) predictions = dxgb.predict(client, out, dtrain)
assert isinstance(predictions.compute(), np.ndarray) assert isinstance(predictions.compute(), np.ndarray)
@ -78,27 +81,23 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
series_predictions = dxgb.inplace_predict(client, out, X) series_predictions = dxgb.inplace_predict(client, out, X)
assert isinstance(series_predictions, dd.Series) assert isinstance(series_predictions, dd.Series)
single_node = out['booster'].predict(xgboost.DMatrix(X.compute())) single_node = out["booster"].predict(xgboost.DMatrix(X.compute()))
cp.testing.assert_allclose(single_node, predictions.compute()) cp.testing.assert_allclose(single_node, predictions.compute())
np.testing.assert_allclose(single_node, np.testing.assert_allclose(single_node, series_predictions.compute().to_numpy())
series_predictions.compute().to_numpy())
predt = dxgb.predict(client, out, X) predt = dxgb.predict(client, out, X)
assert isinstance(predt, dd.Series) assert isinstance(predt, dd.Series)
T = TypeVar('T') T = TypeVar("T")
def is_df(part: T) -> T: def is_df(part: T) -> T:
assert isinstance(part, cudf.DataFrame), part assert isinstance(part, cudf.DataFrame), part
return part return part
predt.map_partitions( predt.map_partitions(is_df, meta=dd.utils.make_meta({"prediction": "f4"}))
is_df,
meta=dd.utils.make_meta({'prediction': 'f4'}))
cp.testing.assert_allclose( cp.testing.assert_allclose(predt.values.compute(), single_node)
predt.values.compute(), single_node)
# Make sure the output can be integrated back to original dataframe # Make sure the output can be integrated back to original dataframe
X["predict"] = predictions X["predict"] = predictions
@ -110,49 +109,35 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
def run_with_dask_array(DMatrixT: Type, client: Client) -> None: def run_with_dask_array(DMatrixT: Type, client: Client) -> None:
import cupy as cp import cupy as cp
cp.cuda.runtime.setDevice(0) cp.cuda.runtime.setDevice(0)
X, y, _ = generate_array() X, y, _ = generate_array()
X = X.map_blocks(cp.asarray) X = X.map_blocks(cp.asarray)
y = y.map_blocks(cp.asarray) y = y.map_blocks(cp.asarray)
dtrain = DMatrixT(client, X, y) dtrain = DMatrixT(client, X, y)
out = dxgb.train(client, {'tree_method': 'gpu_hist', out = dxgb.train(
'debug_synchronize': True}, client,
{"tree_method": "gpu_hist", "debug_synchronize": True},
dtrain=dtrain, dtrain=dtrain,
evals=[(dtrain, 'X')], evals=[(dtrain, "X")],
num_boost_round=2) num_boost_round=2,
)
from_dmatrix = dxgb.predict(client, out, dtrain).compute() from_dmatrix = dxgb.predict(client, out, dtrain).compute()
inplace_predictions = dxgb.inplace_predict( inplace_predictions = dxgb.inplace_predict(client, out, X).compute()
client, out, X).compute() single_node = out["booster"].predict(xgboost.DMatrix(X.compute()))
single_node = out['booster'].predict(
xgboost.DMatrix(X.compute()))
np.testing.assert_allclose(single_node, from_dmatrix) np.testing.assert_allclose(single_node, from_dmatrix)
device = cp.cuda.runtime.getDevice() device = cp.cuda.runtime.getDevice()
assert device == inplace_predictions.device.id assert device == inplace_predictions.device.id
single_node = cp.array(single_node) single_node = cp.array(single_node)
assert device == single_node.device.id assert device == single_node.device.id
cp.testing.assert_allclose( cp.testing.assert_allclose(single_node, inplace_predictions)
single_node,
inplace_predictions)
@pytest.mark.skipif(**tm.no_dask_cudf())
def test_categorical(local_cuda_cluster: LocalCUDACluster) -> None:
with Client(local_cuda_cluster) as client:
import dask_cudf
X, y = make_categorical(client, 10000, 30, 13)
X = dask_cudf.from_dask_dataframe(X)
X_onehot, _ = make_categorical(client, 10000, 30, 13, True)
X_onehot = dask_cudf.from_dask_dataframe(X_onehot)
run_categorical(client, "gpu_hist", X, X_onehot, y)
def to_cp(x: Any, DMatrixT: Type) -> Any: def to_cp(x: Any, DMatrixT: Type) -> Any:
import cupy import cupy
if isinstance(x, np.ndarray) and \
DMatrixT is dxgb.DaskDeviceQuantileDMatrix: if isinstance(x, np.ndarray) and DMatrixT is dxgb.DaskDeviceQuantileDMatrix:
X = cupy.array(x) X = cupy.array(x)
else: else:
X = x X = x
@ -213,126 +198,155 @@ def run_gpu_hist(
assert tm.non_increasing(history) assert tm.non_increasing(history)
@pytest.mark.skipif(**tm.no_cudf()) def test_tree_stats() -> None:
def test_boost_from_prediction(local_cuda_cluster: LocalCUDACluster) -> None: with LocalCUDACluster(n_workers=1) as cluster:
import cudf with Client(cluster) as client:
from sklearn.datasets import load_breast_cancer, load_digits local = run_tree_stats(client, "gpu_hist")
with Client(local_cuda_cluster) as client:
X_, y_ = load_breast_cancer(return_X_y=True)
X = dd.from_array(X_, chunksize=100).map_partitions(cudf.from_pandas)
y = dd.from_array(y_, chunksize=100).map_partitions(cudf.from_pandas)
run_boost_from_prediction(X, y, "gpu_hist", client)
X_, y_ = load_digits(return_X_y=True) with LocalCUDACluster(n_workers=2) as cluster:
X = dd.from_array(X_, chunksize=100).map_partitions(cudf.from_pandas) with Client(cluster) as client:
y = dd.from_array(y_, chunksize=100).map_partitions(cudf.from_pandas) distributed = run_tree_stats(client, "gpu_hist")
run_boost_from_prediction_multi_class(X, y, "gpu_hist", client)
assert local == distributed
class TestDistributedGPU: class TestDistributedGPU:
@pytest.mark.skipif(**tm.no_cudf())
def test_boost_from_prediction(self, local_cuda_client: Client) -> None:
import cudf
from sklearn.datasets import load_breast_cancer, load_iris
X_, y_ = load_breast_cancer(return_X_y=True)
X = dd.from_array(X_, chunksize=100).map_partitions(cudf.from_pandas)
y = dd.from_array(y_, chunksize=100).map_partitions(cudf.from_pandas)
run_boost_from_prediction(X, y, "gpu_hist", local_cuda_client)
X_, y_ = load_iris(return_X_y=True)
X = dd.from_array(X_, chunksize=50).map_partitions(cudf.from_pandas)
y = dd.from_array(y_, chunksize=50).map_partitions(cudf.from_pandas)
run_boost_from_prediction_multi_class(X, y, "gpu_hist", local_cuda_client)
@pytest.mark.skipif(**tm.no_dask_cudf()) @pytest.mark.skipif(**tm.no_dask_cudf())
def test_dask_dataframe(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_dask_dataframe(self, local_cuda_client: Client) -> None:
with Client(local_cuda_cluster) as client: run_with_dask_dataframe(dxgb.DaskDMatrix, local_cuda_client)
run_with_dask_dataframe(dxgb.DaskDMatrix, client) run_with_dask_dataframe(dxgb.DaskDeviceQuantileDMatrix, local_cuda_client)
run_with_dask_dataframe(dxgb.DaskDeviceQuantileDMatrix, client)
@pytest.mark.skipif(**tm.no_dask_cudf())
def test_categorical(self, local_cuda_client: Client) -> None:
import dask_cudf
X, y = make_categorical(local_cuda_client, 10000, 30, 13)
X = dask_cudf.from_dask_dataframe(X)
X_onehot, _ = make_categorical(local_cuda_client, 10000, 30, 13, True)
X_onehot = dask_cudf.from_dask_dataframe(X_onehot)
run_categorical(local_cuda_client, "gpu_hist", X, X_onehot, y)
@given( @given(
params=parameter_strategy, params=parameter_strategy,
num_rounds=strategies.integers(1, 20), num_rounds=strategies.integers(1, 20),
dataset=tm.dataset_strategy, dataset=tm.dataset_strategy,
dmatrix_type=strategies.sampled_from(
[dxgb.DaskDMatrix, dxgb.DaskDeviceQuantileDMatrix]
),
)
@settings(
deadline=duration(seconds=120),
max_examples=20,
suppress_health_check=suppress,
print_blob=True,
) )
@settings(deadline=duration(seconds=120), suppress_health_check=suppress, print_blob=True)
@pytest.mark.skipif(**tm.no_cupy()) @pytest.mark.skipif(**tm.no_cupy())
@pytest.mark.parametrize(
"local_cuda_cluster", [{"n_workers": 2}], indirect=["local_cuda_cluster"]
)
def test_gpu_hist( def test_gpu_hist(
self, self,
params: Dict, params: Dict,
num_rounds: int, num_rounds: int,
dataset: tm.TestDataset, dataset: tm.TestDataset,
local_cuda_cluster: LocalCUDACluster, dmatrix_type: type,
local_cuda_client: Client,
) -> None: ) -> None:
with Client(local_cuda_cluster) as client: run_gpu_hist(params, num_rounds, dataset, dmatrix_type, local_cuda_client)
run_gpu_hist(params, num_rounds, dataset, dxgb.DaskDMatrix, client)
run_gpu_hist(
params, num_rounds, dataset, dxgb.DaskDeviceQuantileDMatrix, client
)
@pytest.mark.skipif(**tm.no_cupy()) @pytest.mark.skipif(**tm.no_cupy())
def test_dask_array(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_dask_array(self, local_cuda_client: Client) -> None:
with Client(local_cuda_cluster) as client: run_with_dask_array(dxgb.DaskDMatrix, local_cuda_client)
run_with_dask_array(dxgb.DaskDMatrix, client) run_with_dask_array(dxgb.DaskDeviceQuantileDMatrix, local_cuda_client)
run_with_dask_array(dxgb.DaskDeviceQuantileDMatrix, client)
@pytest.mark.skipif(**tm.no_cupy()) @pytest.mark.skipif(**tm.no_cupy())
def test_early_stopping(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_early_stopping(self, local_cuda_client: Client) -> None:
from sklearn.datasets import load_breast_cancer from sklearn.datasets import load_breast_cancer
with Client(local_cuda_cluster) as client:
X, y = load_breast_cancer(return_X_y=True) X, y = load_breast_cancer(return_X_y=True)
X, y = da.from_array(X), da.from_array(y) X, y = da.from_array(X), da.from_array(y)
m = dxgb.DaskDMatrix(client, X, y) m = dxgb.DaskDMatrix(local_cuda_client, X, y)
valid = dxgb.DaskDMatrix(client, X, y) valid = dxgb.DaskDMatrix(local_cuda_client, X, y)
early_stopping_rounds = 5 early_stopping_rounds = 5
booster = dxgb.train(client, {'objective': 'binary:logistic', booster = dxgb.train(
'eval_metric': 'error', local_cuda_client,
'tree_method': 'gpu_hist'}, m, {
evals=[(valid, 'Valid')], "objective": "binary:logistic",
"eval_metric": "error",
"tree_method": "gpu_hist",
},
m,
evals=[(valid, "Valid")],
num_boost_round=1000, num_boost_round=1000,
early_stopping_rounds=early_stopping_rounds)[ early_stopping_rounds=early_stopping_rounds,
'booster'] )["booster"]
assert hasattr(booster, 'best_score') assert hasattr(booster, "best_score")
dump = booster.get_dump(dump_format='json') dump = booster.get_dump(dump_format="json")
print(booster.best_iteration)
assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 assert len(dump) - booster.best_iteration == early_stopping_rounds + 1
valid_X = X valid_X = X
valid_y = y valid_y = y
cls = dxgb.DaskXGBClassifier(objective='binary:logistic', cls = dxgb.DaskXGBClassifier(
tree_method='gpu_hist', objective="binary:logistic",
eval_metric='error', tree_method="gpu_hist",
n_estimators=100) eval_metric="error",
cls.client = client n_estimators=100,
cls.fit(X, y, early_stopping_rounds=early_stopping_rounds, )
eval_set=[(valid_X, valid_y)]) cls.client = local_cuda_client
cls.fit(
X,
y,
early_stopping_rounds=early_stopping_rounds,
eval_set=[(valid_X, valid_y)],
)
booster = cls.get_booster() booster = cls.get_booster()
dump = booster.get_dump(dump_format='json') dump = booster.get_dump(dump_format="json")
assert len(dump) - booster.best_iteration == early_stopping_rounds + 1 assert len(dump) - booster.best_iteration == early_stopping_rounds + 1
@pytest.mark.skipif(**tm.no_cudf()) @pytest.mark.skipif(**tm.no_cudf())
@pytest.mark.parametrize("model", ["boosting"]) @pytest.mark.parametrize("model", ["boosting"])
def test_dask_classifier( def test_dask_classifier(self, model: str, local_cuda_client: Client) -> None:
self, model: str, local_cuda_cluster: LocalCUDACluster
) -> None:
import dask_cudf import dask_cudf
with Client(local_cuda_cluster) as client:
X_, y_, w_ = generate_array(with_weights=True) X_, y_, w_ = generate_array(with_weights=True)
y_ = (y_ * 10).astype(np.int32) y_ = (y_ * 10).astype(np.int32)
X = dask_cudf.from_dask_dataframe(dd.from_dask_array(X_)) X = dask_cudf.from_dask_dataframe(dd.from_dask_array(X_))
y = dask_cudf.from_dask_dataframe(dd.from_dask_array(y_)) y = dask_cudf.from_dask_dataframe(dd.from_dask_array(y_))
w = dask_cudf.from_dask_dataframe(dd.from_dask_array(w_)) w = dask_cudf.from_dask_dataframe(dd.from_dask_array(w_))
run_dask_classifier(X, y, w, model, "gpu_hist", client, 10) run_dask_classifier(X, y, w, model, "gpu_hist", local_cuda_client, 10)
def test_empty_dmatrix(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_empty_dmatrix(self, local_cuda_client: Client) -> None:
with Client(local_cuda_cluster) as client: parameters = {"tree_method": "gpu_hist", "debug_synchronize": True}
parameters = {'tree_method': 'gpu_hist', 'debug_synchronize': True} run_empty_dmatrix_reg(local_cuda_client, parameters)
run_empty_dmatrix_reg(client, parameters) run_empty_dmatrix_cls(local_cuda_client, parameters)
run_empty_dmatrix_cls(client, parameters)
@pytest.mark.skipif(**tm.no_dask_cudf()) @pytest.mark.skipif(**tm.no_dask_cudf())
def test_empty_partition(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_empty_partition(self, local_cuda_client: Client) -> None:
import dask_cudf import dask_cudf
import cudf import cudf
import cupy import cupy
with Client(local_cuda_cluster) as client:
mult = 100 mult = 100
df = cudf.DataFrame( df = cudf.DataFrame(
{ {
"a": [1, 2, 3, 4, 5.1] * mult, "a": [1, 2, 3, 4, 5.1] * mult,
"b": [10, 15, 29.3, 30, 31] * mult, "b": [10, 15, 29.3, 30, 31] * mult,
"y": [10, 20, 30, 40., 50] * mult, "y": [10, 20, 30, 40.0, 50] * mult,
} }
) )
parameters = {"tree_method": "gpu_hist", "debug_synchronize": True} parameters = {"tree_method": "gpu_hist", "debug_synchronize": True}
@ -345,11 +359,11 @@ class TestDistributedGPU:
) )
X = ddf[ddf.columns.difference(["y"])] X = ddf[ddf.columns.difference(["y"])]
y = ddf[["y"]] y = ddf[["y"]]
dtrain = dxgb.DaskDeviceQuantileDMatrix(client, X, y) dtrain = dxgb.DaskDeviceQuantileDMatrix(local_cuda_client, X, y)
bst_empty = xgb.dask.train( bst_empty = xgb.dask.train(
client, parameters, dtrain, evals=[(dtrain, "train")] local_cuda_client, parameters, dtrain, evals=[(dtrain, "train")]
) )
predt_empty = dxgb.predict(client, bst_empty, X).compute().values predt_empty = dxgb.predict(local_cuda_client, bst_empty, X).compute().values
ddf = dask_cudf.concat( ddf = dask_cudf.concat(
[dask_cudf.from_cudf(df, npartitions=3)] [dask_cudf.from_cudf(df, npartitions=3)]
@ -357,16 +371,18 @@ class TestDistributedGPU:
) )
X = ddf[ddf.columns.difference(["y"])] X = ddf[ddf.columns.difference(["y"])]
y = ddf[["y"]] y = ddf[["y"]]
dtrain = dxgb.DaskDeviceQuantileDMatrix(client, X, y) dtrain = dxgb.DaskDeviceQuantileDMatrix(local_cuda_client, X, y)
bst = xgb.dask.train(client, parameters, dtrain, evals=[(dtrain, "train")]) bst = xgb.dask.train(
local_cuda_client, parameters, dtrain, evals=[(dtrain, "train")]
)
predt = dxgb.predict(client, bst, X).compute().values predt = dxgb.predict(local_cuda_client, bst, X).compute().values
cupy.testing.assert_allclose(predt, predt_empty) cupy.testing.assert_allclose(predt, predt_empty)
predt = dxgb.predict(client, bst, dtrain).compute() predt = dxgb.predict(local_cuda_client, bst, dtrain).compute()
cupy.testing.assert_allclose(predt, predt_empty) cupy.testing.assert_allclose(predt, predt_empty)
predt = dxgb.inplace_predict(client, bst, X).compute().values predt = dxgb.inplace_predict(local_cuda_client, bst, X).compute().values
cupy.testing.assert_allclose(predt, predt_empty) cupy.testing.assert_allclose(predt, predt_empty)
df = df.to_pandas() df = df.to_pandas()
@ -381,30 +397,32 @@ class TestDistributedGPU:
predt_empty = cupy.asnumpy(predt_empty) predt_empty = cupy.asnumpy(predt_empty)
predt = dxgb.predict(client, bst_empty, X).compute().values predt = dxgb.predict(local_cuda_client, bst_empty, X).compute().values
np.testing.assert_allclose(predt, predt_empty) np.testing.assert_allclose(predt, predt_empty)
in_predt = dxgb.inplace_predict(client, bst_empty, X).compute().values in_predt = (
dxgb.inplace_predict(local_cuda_client, bst_empty, X).compute().values
)
np.testing.assert_allclose(predt, in_predt) np.testing.assert_allclose(predt, in_predt)
def test_empty_dmatrix_auc(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_empty_dmatrix_auc(self, local_cuda_client: Client) -> None:
with Client(local_cuda_cluster) as client: n_workers = len(_get_client_workers(local_cuda_client))
n_workers = len(_get_client_workers(client)) run_empty_dmatrix_auc(local_cuda_client, "gpu_hist", n_workers)
run_empty_dmatrix_auc(client, "gpu_hist", n_workers)
def test_auc(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_auc(self, local_cuda_client: Client) -> None:
with Client(local_cuda_cluster) as client: run_auc(local_cuda_client, "gpu_hist")
run_auc(client, "gpu_hist")
def test_data_initialization(self, local_cuda_client: Client) -> None:
def test_data_initialization(self, local_cuda_cluster: LocalCUDACluster) -> None:
with Client(local_cuda_cluster) as client:
X, y, _ = generate_array() X, y, _ = generate_array()
fw = da.random.random((random_cols,)) fw = da.random.random((random_cols,))
fw = fw - fw.min() fw = fw - fw.min()
m = dxgb.DaskDMatrix(client, X, y, feature_weights=fw) m = dxgb.DaskDMatrix(local_cuda_client, X, y, feature_weights=fw)
workers = _get_client_workers(client) workers = _get_client_workers(local_cuda_client)
rabit_args = client.sync(dxgb._get_rabit_args, len(workers), None, client) rabit_args = local_cuda_client.sync(
dxgb._get_rabit_args, len(workers), None, local_cuda_client
)
def worker_fn(worker_addr: str, data_ref: Dict) -> None: def worker_fn(worker_addr: str, data_ref: Dict) -> None:
with dxgb.RabitContext(rabit_args): with dxgb.RabitContext(rabit_args):
@ -415,15 +433,15 @@ class TestDistributedGPU:
futures = [] futures = []
for i in range(len(workers)): for i in range(len(workers)):
futures.append( futures.append(
client.submit( local_cuda_client.submit(
worker_fn, worker_fn,
workers[i], workers[i],
m._create_fn_args(workers[i]), m._create_fn_args(workers[i]),
pure=False, pure=False,
workers=[workers[i]] workers=[workers[i]],
) )
) )
client.gather(futures) local_cuda_client.gather(futures)
def test_interface_consistency(self) -> None: def test_interface_consistency(self) -> None:
sig = OrderedDict(signature(dxgb.DaskDMatrix).parameters) sig = OrderedDict(signature(dxgb.DaskDMatrix).parameters)
@ -470,81 +488,79 @@ class TestDistributedGPU:
for rn, drn in zip(ranker_names, dranker_names): for rn, drn in zip(ranker_names, dranker_names):
assert rn == drn assert rn == drn
def test_tree_stats(self) -> None: def run_quantile(self, name: str, local_cuda_client: Client) -> None:
with LocalCUDACluster(n_workers=1) as cluster:
with Client(cluster) as client:
local = run_tree_stats(client, "gpu_hist")
with LocalCUDACluster(n_workers=2) as cluster:
with Client(cluster) as client:
distributed = run_tree_stats(client, "gpu_hist")
assert local == distributed
def run_quantile(self, name: str, local_cuda_cluster: LocalCUDACluster) -> None:
if sys.platform.startswith("win"): if sys.platform.startswith("win"):
pytest.skip("Skipping dask tests on Windows") pytest.skip("Skipping dask tests on Windows")
exe = None exe = None
for possible_path in {'./testxgboost', './build/testxgboost', for possible_path in {
'../build/testxgboost', '../gpu-build/testxgboost'}: "./testxgboost",
"./build/testxgboost",
"../build/testxgboost",
"../gpu-build/testxgboost",
}:
if os.path.exists(possible_path): if os.path.exists(possible_path):
exe = possible_path exe = possible_path
assert exe, 'No testxgboost executable found.' assert exe, "No testxgboost executable found."
test = "--gtest_filter=GPUQuantile." + name test = "--gtest_filter=GPUQuantile." + name
def runit( def runit(
worker_addr: str, rabit_args: List[bytes] worker_addr: str, rabit_args: List[bytes]
) -> subprocess.CompletedProcess: ) -> subprocess.CompletedProcess:
port_env = '' port_env = ""
# setup environment for running the c++ part. # setup environment for running the c++ part.
for arg in rabit_args: for arg in rabit_args:
if arg.decode('utf-8').startswith('DMLC_TRACKER_PORT'): if arg.decode("utf-8").startswith("DMLC_TRACKER_PORT"):
port_env = arg.decode('utf-8') port_env = arg.decode("utf-8")
if arg.decode("utf-8").startswith("DMLC_TRACKER_URI"): if arg.decode("utf-8").startswith("DMLC_TRACKER_URI"):
uri_env = arg.decode("utf-8") uri_env = arg.decode("utf-8")
port = port_env.split('=') port = port_env.split("=")
env = os.environ.copy() env = os.environ.copy()
env[port[0]] = port[1] env[port[0]] = port[1]
uri = uri_env.split("=") uri = uri_env.split("=")
env[uri[0]] = uri[1] env[uri[0]] = uri[1]
return subprocess.run([str(exe), test], env=env, stdout=subprocess.PIPE) return subprocess.run([str(exe), test], env=env, stdout=subprocess.PIPE)
with Client(local_cuda_cluster) as client: workers = _get_client_workers(local_cuda_client)
workers = _get_client_workers(client) rabit_args = local_cuda_client.sync(
rabit_args = client.sync(dxgb._get_rabit_args, len(workers), None, client) dxgb._get_rabit_args, len(workers), None, local_cuda_client
futures = client.map(runit, )
workers, futures = local_cuda_client.map(
pure=False, runit, workers, pure=False, workers=workers, rabit_args=rabit_args
workers=workers, )
rabit_args=rabit_args) results = local_cuda_client.gather(futures)
results = client.gather(futures)
for ret in results: for ret in results:
msg = ret.stdout.decode('utf-8') msg = ret.stdout.decode("utf-8")
assert msg.find('1 test from GPUQuantile') != -1, msg assert msg.find("1 test from GPUQuantile") != -1, msg
assert ret.returncode == 0, msg assert ret.returncode == 0, msg
@pytest.mark.gtest @pytest.mark.gtest
def test_quantile_basic(self, local_cuda_cluster: LocalCUDACluster) -> None: def test_quantile_basic(self, local_cuda_client: Client) -> None:
self.run_quantile('AllReduceBasic', local_cuda_cluster) self.run_quantile("AllReduceBasic", local_cuda_client)
@pytest.mark.gtest @pytest.mark.gtest
def test_quantile_same_on_all_workers( def test_quantile_same_on_all_workers(self, local_cuda_client: Client) -> None:
self, local_cuda_cluster: LocalCUDACluster self.run_quantile("SameOnAllWorkers", local_cuda_client)
) -> None:
self.run_quantile('SameOnAllWorkers', local_cuda_cluster)
@pytest.mark.skipif(**tm.no_cupy())
def test_with_asyncio(local_cuda_client: Client) -> None:
address = local_cuda_client.scheduler.address
output = asyncio.run(run_from_dask_array_asyncio(address))
assert isinstance(output["booster"], xgboost.Booster)
assert isinstance(output["history"], dict)
async def run_from_dask_array_asyncio(scheduler_address: str) -> dxgb.TrainReturnT: async def run_from_dask_array_asyncio(scheduler_address: str) -> dxgb.TrainReturnT:
async with Client(scheduler_address, asynchronous=True) as client: async with Client(scheduler_address, asynchronous=True) as client:
import cupy as cp import cupy as cp
X, y, _ = generate_array() X, y, _ = generate_array()
X = X.map_blocks(cp.array) X = X.map_blocks(cp.array)
y = y.map_blocks(cp.array) y = y.map_blocks(cp.array)
m = await xgboost.dask.DaskDeviceQuantileDMatrix(client, X, y) m = await xgboost.dask.DaskDeviceQuantileDMatrix(client, X, y)
output = await xgboost.dask.train(client, {'tree_method': 'gpu_hist'}, output = await xgboost.dask.train(client, {"tree_method": "gpu_hist"}, dtrain=m)
dtrain=m)
with_m = await xgboost.dask.predict(client, output, m) with_m = await xgboost.dask.predict(client, output, m)
with_X = await xgboost.dask.predict(client, output, X) with_X = await xgboost.dask.predict(client, output, X)
@ -553,19 +569,12 @@ async def run_from_dask_array_asyncio(scheduler_address: str) -> dxgb.TrainRetur
assert isinstance(with_X, da.Array) assert isinstance(with_X, da.Array)
assert isinstance(inplace, da.Array) assert isinstance(inplace, da.Array)
cp.testing.assert_allclose(await client.compute(with_m), cp.testing.assert_allclose(
await client.compute(with_X)) await client.compute(with_m), await client.compute(with_X)
cp.testing.assert_allclose(await client.compute(with_m), )
await client.compute(inplace)) cp.testing.assert_allclose(
await client.compute(with_m), await client.compute(inplace)
)
client.shutdown() client.shutdown()
return output return output
@pytest.mark.skipif(**tm.no_cupy())
def test_with_asyncio(local_cuda_cluster: LocalCUDACluster) -> None:
with Client(local_cuda_cluster) as client:
address = client.scheduler.address
output = asyncio.run(run_from_dask_array_asyncio(address))
assert isinstance(output['booster'], xgboost.Booster)
assert isinstance(output['history'], dict)