diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc
index 73fa4d5e7..bd6932aa3 100644
--- a/tests/cpp/common/test_quantile.cc
+++ b/tests/cpp/common/test_quantile.cc
@@ -40,20 +40,10 @@ void PushPage(HostSketchContainer* container, SparsePage const& page, MetaInfo c
               Span hessian) {
   container->PushRowPage(page, info, hessian);
 }
-}  // anonymous namespace
 
 template
-void TestDistributedQuantile(size_t rows, size_t cols) {
-  std::string msg {"Skipping AllReduce test"};
-  int32_t constexpr kWorkers = 4;
-  InitCommunicatorContext(msg, kWorkers);
-  auto world = collective::GetWorldSize();
-  if (world != 1) {
-    ASSERT_EQ(world, kWorkers);
-  } else {
-    return;
-  }
-
+void DoTestDistributedQuantile(size_t rows, size_t cols) {
+  auto const world = collective::GetWorldSize();
   std::vector infos(2);
   auto& h_weights = infos.front().weights_.HostVector();
   h_weights.resize(rows);
@@ -152,47 +142,36 @@ void TestDistributedQuantile(size_t rows, size_t cols) {
   }
 }
 
+template
+void TestDistributedQuantile(size_t const rows, size_t const cols) {
+  auto constexpr kWorkers = 4;
+  RunWithInMemoryCommunicator(kWorkers, DoTestDistributedQuantile, rows, cols);
+}
+}  // anonymous namespace
+
 TEST(Quantile, DistributedBasic) {
-#if defined(__unix__)
   constexpr size_t kRows = 10, kCols = 10;
   TestDistributedQuantile(kRows, kCols);
-#endif
 }
 
 TEST(Quantile, Distributed) {
-#if defined(__unix__)
   constexpr size_t kRows = 4000, kCols = 200;
   TestDistributedQuantile(kRows, kCols);
-#endif
 }
 
 TEST(Quantile, SortedDistributedBasic) {
-#if defined(__unix__)
   constexpr size_t kRows = 10, kCols = 10;
   TestDistributedQuantile(kRows, kCols);
-#endif
 }
 
 TEST(Quantile, SortedDistributed) {
-#if defined(__unix__)
   constexpr size_t kRows = 4000, kCols = 200;
   TestDistributedQuantile(kRows, kCols);
-#endif
 }
 
-TEST(Quantile, SameOnAllWorkers) {
-#if defined(__unix__)
-  std::string msg{"Skipping Quantile AllreduceBasic test"};
-  int32_t constexpr kWorkers = 4;
-  InitCommunicatorContext(msg, kWorkers);
-  auto world = collective::GetWorldSize();
-  if (world != 1) {
-    CHECK_EQ(world, kWorkers);
-  } else {
-    LOG(WARNING) << msg;
-    return;
-  }
-
+namespace {
+void TestSameOnAllWorkers() {
+  auto const world = collective::GetWorldSize();
   constexpr size_t kRows = 1000, kCols = 100;
   RunWithSeedsAndBins(
       kRows, [=](int32_t seed, size_t n_bins, MetaInfo const&) {
@@ -256,8 +235,13 @@ TEST(Quantile, SameOnAllWorkers) {
       }
     }
   });
-  collective::Finalize();
-#endif  // defined(__unix__)
 }
+}  // anonymous namespace
+
+TEST(Quantile, SameOnAllWorkers) {
+  auto constexpr kWorkers = 4;
+  RunWithInMemoryCommunicator(kWorkers, TestSameOnAllWorkers);
+}
+
 }  // namespace common
 }  // namespace xgboost
diff --git a/tests/cpp/common/test_quantile.cu b/tests/cpp/common/test_quantile.cu
index 4d6fecd88..d3f7dbed0 100644
--- a/tests/cpp/common/test_quantile.cu
+++ b/tests/cpp/common/test_quantile.cu
@@ -338,12 +338,9 @@ TEST(GPUQuantile, MultiMerge) {
   });
 }
 
-TEST(GPUQuantile, AllReduceBasic) {
-  // This test is supposed to run by a python test that setups the environment.
-  std::string msg {"Skipping AllReduce test"};
-  auto n_gpus = AllVisibleGPUs();
-  InitCommunicatorContext(msg, n_gpus);
-  auto world = collective::GetWorldSize();
+namespace {
+void TestAllReduceBasic(int32_t n_gpus) {
+  auto const world = collective::GetWorldSize();
   if (world != 1) {
     ASSERT_EQ(world, n_gpus);
   } else {
@@ -420,13 +417,16 @@ TEST(GPUQuantile, AllReduceBasic) {
       ASSERT_NEAR(single_node_data[i].wmin, distributed_data[i].wmin, Eps);
     }
   });
-  collective::Finalize();
+}
+}  // anonymous namespace
+
+TEST(GPUQuantile, AllReduceBasic) {
+  auto const n_gpus = AllVisibleGPUs();
+  RunWithInMemoryCommunicator(n_gpus, TestAllReduceBasic, n_gpus);
 }
-TEST(GPUQuantile, SameOnAllWorkers) {
-  std::string msg {"Skipping SameOnAllWorkers test"};
-  auto n_gpus = AllVisibleGPUs();
-  InitCommunicatorContext(msg, n_gpus);
+namespace {
+void TestSameOnAllWorkers(int32_t n_gpus) {
   auto world = collective::GetWorldSize();
   if (world != 1) {
     ASSERT_EQ(world, n_gpus);
   } else {
@@ -490,6 +490,12 @@ TEST(GPUQuantile, SameOnAllWorkers) {
     }
   });
 }
+}  // anonymous namespace
+
+TEST(GPUQuantile, SameOnAllWorkers) {
+  auto const n_gpus = AllVisibleGPUs();
+  RunWithInMemoryCommunicator(n_gpus, TestSameOnAllWorkers, n_gpus);
+}
 
 TEST(GPUQuantile, Push) {
   size_t constexpr kRows = 100;
diff --git a/tests/cpp/common/test_quantile.h b/tests/cpp/common/test_quantile.h
index 0d2e52cae..957e5c987 100644
--- a/tests/cpp/common/test_quantile.h
+++ b/tests/cpp/common/test_quantile.h
@@ -10,31 +10,6 @@
 namespace xgboost {
 namespace common {
 
-inline void InitCommunicatorContext(std::string msg, int32_t n_workers) {
-  auto port = std::getenv("DMLC_TRACKER_PORT");
-  std::string port_str;
-  if (port) {
-    port_str = port;
-  } else {
-    LOG(WARNING) << msg << " as `DMLC_TRACKER_PORT` is not set up.";
-    return;
-  }
-  auto uri = std::getenv("DMLC_TRACKER_URI");
-  std::string uri_str;
-  if (uri) {
-    uri_str = uri;
-  } else {
-    LOG(WARNING) << msg << " as `DMLC_TRACKER_URI` is not set up.";
-    return;
-  }
-
-  Json config{JsonObject()};
-  config["DMLC_TRACKER_PORT"] = port_str;
-  config["DMLC_TRACKER_URI"] = uri_str;
-  config["DMLC_NUM_WORKER"] = n_workers;
-  collective::Init(config);
-}
-
 template void RunWithSeedsAndBins(size_t rows, Fn fn) {
   std::vector seeds(2);
   SimpleLCG lcg;
diff --git a/tests/cpp/helpers.h b/tests/cpp/helpers.h
index d3ccff284..80f504d8e 100644
--- a/tests/cpp/helpers.h
+++ b/tests/cpp/helpers.h
@@ -1,8 +1,7 @@
 /**
  * Copyright 2016-2023 by XGBoost contributors
  */
-#ifndef XGBOOST_TESTS_CPP_HELPERS_H_
-#define XGBOOST_TESTS_CPP_HELPERS_H_
+#pragma once
 
 #include
 #include
@@ -16,8 +15,10 @@
 #include
 #include
 #include
+#include
 
 #include
+#include "../../src/collective/communicator-inl.h"
 #include "../../src/common/common.h"
 #include "../../src/data/array_interface.h"
 #include "../../src/gbm/gbtree_model.h"
@@ -460,5 +461,25 @@ inline LearnerModelParam MakeMP(bst_feature_t n_features, float base_score, uint
   return mparam;
 }
 
+template
+void RunWithInMemoryCommunicator(int32_t world_size, Function&& function, Args&&... args) {
+  std::vector threads;
+  for (auto rank = 0; rank < world_size; rank++) {
+    threads.emplace_back([&, rank]() {
+      Json config{JsonObject()};
+      config["xgboost_communicator"] = String("in-memory");
+      config["in_memory_world_size"] = world_size;
+      config["in_memory_rank"] = rank;
+      xgboost::collective::Init(config);
+
+      std::forward(function)(std::forward(args)...);
+
+      xgboost::collective::Finalize();
+    });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
 }  // namespace xgboost
-#endif
diff --git a/tests/cpp/predictor/test_cpu_predictor.cc b/tests/cpp/predictor/test_cpu_predictor.cc
index f0c50fa94..af666432a 100644
--- a/tests/cpp/predictor/test_cpu_predictor.cc
+++ b/tests/cpp/predictor/test_cpu_predictor.cc
@@ -90,49 +90,42 @@ TEST(CpuPredictor, Basic) {
   }
 }
 
-TEST(CpuPredictor, ColumnSplit) {
+namespace {
+void TestColumnSplitPredictBatch() {
   size_t constexpr kRows = 5;
   size_t constexpr kCols = 5;
   auto dmat = RandomDataGenerator(kRows, kCols, 0).GenerateDMatrix();
+  auto const world_size = collective::GetWorldSize();
+  auto const rank = collective::GetRank();
+  auto const kSliceSize = (kCols + 1) / world_size;
 
-  std::vector threads;
-  std::int32_t constexpr kWorldSize = 2;
-  size_t constexpr kSliceSize = (kCols + 1) / kWorldSize;
-  for (auto rank = 0; rank < kWorldSize; rank++) {
-    threads.emplace_back([=, &dmat]() {
-      Json config{JsonObject()};
-      config["xgboost_communicator"] = String("in-memory");
-      config["in_memory_world_size"] = kWorldSize;
-      config["in_memory_rank"] = rank;
-      xgboost::collective::Init(config);
+  auto lparam = CreateEmptyGenericParam(GPUIDX);
+  std::unique_ptr cpu_predictor =
+      std::unique_ptr(Predictor::Create("cpu_predictor", &lparam));
 
-      auto lparam = CreateEmptyGenericParam(GPUIDX);
-      std::unique_ptr cpu_predictor =
-          std::unique_ptr(Predictor::Create("cpu_predictor", &lparam));
+  LearnerModelParam mparam{MakeMP(kCols, .0, 1)};
 
-      LearnerModelParam mparam{MakeMP(kCols, .0, 1)};
+  Context ctx;
+  ctx.UpdateAllowUnknown(Args{});
+  gbm::GBTreeModel model = CreateTestModel(&mparam, &ctx);
 
-      Context ctx;
-      ctx.UpdateAllowUnknown(Args{});
-      gbm::GBTreeModel model = CreateTestModel(&mparam, &ctx);
+  // Test predict batch
+  PredictionCacheEntry out_predictions;
+  cpu_predictor->InitOutPredictions(dmat->Info(), &out_predictions.predictions, model);
+  auto sliced = std::unique_ptr{dmat->SliceCol(rank * kSliceSize, kSliceSize)};
+  cpu_predictor->PredictBatch(sliced.get(), &out_predictions, model, 0);
 
-      // Test predict batch
-      PredictionCacheEntry out_predictions;
-      cpu_predictor->InitOutPredictions(dmat->Info(), &out_predictions.predictions, model);
-      auto sliced = std::unique_ptr{dmat->SliceCol(rank * kSliceSize, kSliceSize)};
-      cpu_predictor->PredictBatch(sliced.get(), &out_predictions, model, 0);
-
-      std::vector& out_predictions_h = out_predictions.predictions.HostVector();
-      for (size_t i = 0; i < out_predictions.predictions.Size(); i++) {
-        ASSERT_EQ(out_predictions_h[i], 1.5);
-      }
-      xgboost::collective::Finalize();
-    });
-  }
-  for (auto& thread : threads) {
-    thread.join();
+  std::vector& out_predictions_h = out_predictions.predictions.HostVector();
+  for (size_t i = 0; i < out_predictions.predictions.Size(); i++) {
+    ASSERT_EQ(out_predictions_h[i], 1.5);
   }
 }
+}  // anonymous namespace
+
+TEST(CpuPredictor, ColumnSplit) {
+  auto constexpr kWorldSize = 2;
+  RunWithInMemoryCommunicator(kWorldSize, TestColumnSplitPredictBatch);
+}
 
 TEST(CpuPredictor, IterationRange) {
   TestIterationRange("cpu_predictor");
diff --git a/tests/pytest.ini b/tests/pytest.ini
index 5a0d27a6c..fc0a40ff6 100644
--- a/tests/pytest.ini
+++ b/tests/pytest.ini
@@ -2,4 +2,3 @@
 markers =
   mgpu: Mark a test that requires multiple GPUs to run.
   ci: Mark a test that runs only on CI.
-  gtest: Mark a test that requires C++ Google Test executable.
diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
index ee00874ef..4e61d9023 100644
--- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
+++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
@@ -486,49 +486,6 @@ class TestDistributedGPU:
         for rn, drn in zip(ranker_names, dranker_names):
             assert rn == drn
 
-    def run_quantile(self, name: str, local_cuda_client: Client) -> None:
-        exe = None
-        for possible_path in {
-            "./testxgboost",
-            "./build/testxgboost",
-            "../build/testxgboost",
-            "../gpu-build/testxgboost",
-        }:
-            if os.path.exists(possible_path):
-                exe = possible_path
-        assert exe, "No testxgboost executable found."
-        test = "--gtest_filter=GPUQuantile." + name
-
-        def runit(
-            worker_addr: str, rabit_args: Dict[str, Union[int, str]]
-        ) -> subprocess.CompletedProcess:
-            # setup environment for running the c++ part.
-            env = os.environ.copy()
-            env['DMLC_TRACKER_PORT'] = str(rabit_args['DMLC_TRACKER_PORT'])
-            env["DMLC_TRACKER_URI"] = str(rabit_args["DMLC_TRACKER_URI"])
-            return subprocess.run([str(exe), test], env=env, stdout=subprocess.PIPE)
-
-        workers = tm.get_client_workers(local_cuda_client)
-        rabit_args = local_cuda_client.sync(
-            dxgb._get_rabit_args, len(workers), None, local_cuda_client
-        )
-        futures = local_cuda_client.map(
-            runit, workers, pure=False, workers=workers, rabit_args=rabit_args
-        )
-        results = local_cuda_client.gather(futures)
-        for ret in results:
-            msg = ret.stdout.decode("utf-8")
-            assert msg.find("1 test from GPUQuantile") != -1, msg
-            assert ret.returncode == 0, msg
-
-    @pytest.mark.gtest
-    def test_quantile_basic(self, local_cuda_client: Client) -> None:
-        self.run_quantile("AllReduceBasic", local_cuda_client)
-
-    @pytest.mark.gtest
-    def test_quantile_same_on_all_workers(self, local_cuda_client: Client) -> None:
-        self.run_quantile("SameOnAllWorkers", local_cuda_client)
-
 
 @pytest.mark.skipif(**tm.no_cupy())
 def test_with_asyncio(local_cuda_client: Client) -> None:
diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py
index 244c6f1e2..03f3a3e46 100644
--- a/tests/test_distributed/test_with_dask/test_with_dask.py
+++ b/tests/test_distributed/test_with_dask/test_with_dask.py
@@ -1490,62 +1490,6 @@ class TestWithDask:
         num_rounds = 10
         self.run_updater_test(client, params, num_rounds, dataset, 'approx')
 
-    def run_quantile(self, name: str) -> None:
-        exe: Optional[str] = None
-        for possible_path in {'./testxgboost', './build/testxgboost',
-                              '../build/cpubuild/testxgboost',
-                              '../cpu-build/testxgboost'}:
-            if os.path.exists(possible_path):
-                exe = possible_path
-        if exe is None:
-            return
-
-        test = "--gtest_filter=Quantile." + name
-
-        def runit(
-            worker_addr: str, rabit_args: Dict[str, Union[int, str]]
-        ) -> subprocess.CompletedProcess:
-            # setup environment for running the c++ part.
-            env = os.environ.copy()
-            env['DMLC_TRACKER_PORT'] = str(rabit_args['DMLC_TRACKER_PORT'])
-            env["DMLC_TRACKER_URI"] = str(rabit_args["DMLC_TRACKER_URI"])
-            return subprocess.run([str(exe), test], env=env, capture_output=True)
-
-        with LocalCluster(n_workers=4, dashboard_address=":0") as cluster:
-            with Client(cluster) as client:
-                workers = tm.get_client_workers(client)
-                rabit_args = client.sync(
-                    xgb.dask._get_rabit_args, len(workers), None, client
-                )
-                futures = client.map(runit,
-                                     workers,
-                                     pure=False,
-                                     workers=workers,
-                                     rabit_args=rabit_args)
-                results = client.gather(futures)
-
-                for ret in results:
-                    msg = ret.stdout.decode('utf-8')
-                    assert msg.find('1 test from Quantile') != -1, msg
-                    assert ret.returncode == 0, msg
-
-    @pytest.mark.skipif(**tm.no_dask())
-    @pytest.mark.gtest
-    def test_quantile_basic(self) -> None:
-        self.run_quantile('DistributedBasic')
-        self.run_quantile('SortedDistributedBasic')
-
-    @pytest.mark.skipif(**tm.no_dask())
-    @pytest.mark.gtest
-    def test_quantile(self) -> None:
-        self.run_quantile('Distributed')
-        self.run_quantile('SortedDistributed')
-
-    @pytest.mark.skipif(**tm.no_dask())
-    @pytest.mark.gtest
-    def test_quantile_same_on_all_workers(self) -> None:
-        self.run_quantile("SameOnAllWorkers")
-
     def test_adaptive(self) -> None:
         def get_score(config: Dict) -> float:
             return float(config["learner"]["learner_model_param"]["base_score"])
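
For reference, a minimal sketch of how a new distributed C++ test can be written against the RunWithInMemoryCommunicator helper introduced in tests/cpp/helpers.h. The test name, worker count, and assertion body below are illustrative only and are not part of this change; it assumes only the APIs already used in the diff (collective::GetWorldSize, collective::GetRank, and the helper itself).

namespace {
void VerifyRankAndWorldSize() {
  // Each worker thread spawned by RunWithInMemoryCommunicator has the in-memory
  // communicator initialized before this function runs and finalized afterwards,
  // so rank and world size can be queried directly.
  auto const world = collective::GetWorldSize();
  auto const rank = collective::GetRank();
  ASSERT_LT(rank, world);
}
}  // anonymous namespace

TEST(Collective, InMemoryRankAndWorldSize) {
  auto constexpr kWorkers = 3;
  RunWithInMemoryCommunicator(kWorkers, VerifyRankAndWorldSize);
}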