From ea0deeca685d5b4f7084693a172d22d9b448b9a6 Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Sat, 10 Jun 2023 02:31:34 +0800
Subject: [PATCH] Disable dense optimization in hist for distributed training.
 (#9272)

---
 python-package/xgboost/testing/dask.py       | 21 +++++++++++++++++++
 src/tree/updater_gpu_hist.cu                 | 12 +++++------
 src/tree/updater_quantile_hist.cc            |  2 +-
 .../test_gpu_with_dask/test_gpu_with_dask.py |  8 ++++++-
 .../test_with_dask/test_with_dask.py         | 11 ++++++++--
 5 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/python-package/xgboost/testing/dask.py b/python-package/xgboost/testing/dask.py
index 8b39ba122..a939170b4 100644
--- a/python-package/xgboost/testing/dask.py
+++ b/python-package/xgboost/testing/dask.py
@@ -1,6 +1,8 @@
 """Tests for dask shared by different test modules."""
 import numpy as np
+import pandas as pd
 from dask import array as da
+from dask import dataframe as dd
 from distributed import Client
 
 import xgboost as xgb
@@ -52,3 +54,22 @@ def check_init_estimation(tree_method: str, client: Client) -> None:
     """Test init estimation."""
     check_init_estimation_reg(tree_method, client)
     check_init_estimation_clf(tree_method, client)
+
+
+def check_uneven_nan(client: Client, tree_method: str, n_workers: int) -> None:
+    """Issue #9271, not every worker has missing value."""
+    assert n_workers >= 2
+
+    with client.as_current():
+        clf = xgb.dask.DaskXGBClassifier(tree_method=tree_method)
+        X = pd.DataFrame({"a": range(10000), "b": range(10000, 0, -1)})
+        y = pd.Series([*[0] * 5000, *[1] * 5000])
+
+        X["a"][:3000:1000] = np.NaN
+
+        client.wait_for_workers(n_workers=n_workers)
+
+        clf.fit(
+            dd.from_pandas(X, npartitions=n_workers),
+            dd.from_pandas(y, npartitions=n_workers),
+        )
diff --git a/src/tree/updater_gpu_hist.cu b/src/tree/updater_gpu_hist.cu
index c14e19db1..d1c1c8290 100644
--- a/src/tree/updater_gpu_hist.cu
+++ b/src/tree/updater_gpu_hist.cu
@@ -285,7 +285,7 @@ struct GPUHistMakerDevice {
         matrix.feature_segments,
         matrix.gidx_fvalue_map,
         matrix.min_fvalue,
-        matrix.is_dense
+        matrix.is_dense && !collective::IsDistributed()
     };
     auto split = this->evaluator_.EvaluateSingleSplit(inputs, shared_inputs);
     return split;
@@ -299,11 +299,11 @@
     std::vector<bst_node_t> nidx(2 * candidates.size());
     auto h_node_inputs = pinned2.GetSpan<EvaluateSplitInputs>(2 * candidates.size());
     auto matrix = page->GetDeviceAccessor(ctx_->gpu_id);
-    EvaluateSplitSharedInputs shared_inputs{
-        GPUTrainingParam{param}, *quantiser, feature_types, matrix.feature_segments,
-        matrix.gidx_fvalue_map, matrix.min_fvalue,
-        matrix.is_dense
-    };
+    EvaluateSplitSharedInputs shared_inputs{GPUTrainingParam{param}, *quantiser, feature_types,
+                                            matrix.feature_segments, matrix.gidx_fvalue_map,
+                                            matrix.min_fvalue,
+                                            // is_dense represents the local data
+                                            matrix.is_dense && !collective::IsDistributed()};
     dh::TemporaryArray<GPUExpandEntry> entries(2 * candidates.size());
     // Store the feature set ptrs so they dont go out of scope before the kernel is called
     std::vector<std::shared_ptr<HostDeviceVector<bst_feature_t>>> feature_sets;
diff --git a/src/tree/updater_quantile_hist.cc b/src/tree/updater_quantile_hist.cc
index ae1c4b468..22b715dc7 100644
--- a/src/tree/updater_quantile_hist.cc
+++ b/src/tree/updater_quantile_hist.cc
@@ -435,7 +435,7 @@ class HistBuilder {
 
     {
       GradientPairPrecise grad_stat;
-      if (p_fmat->IsDense()) {
+      if (p_fmat->IsDense() && !collective::IsDistributed()) {
         /**
          * Specialized code for dense data: For dense data (with no missing value), the sum
          * of gradient histogram is equal to snode[nid]
diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
index 503169d2c..047093700 100644
--- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
+++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py
@@ -44,7 +44,7 @@ try:
     from dask_cuda import LocalCUDACluster
 
     from xgboost import dask as dxgb
-    from xgboost.testing.dask import check_init_estimation
+    from xgboost.testing.dask import check_init_estimation, check_uneven_nan
 except ImportError:
     pass
 
@@ -224,6 +224,12 @@ class TestDistributedGPU:
     def test_init_estimation(self, local_cuda_client: Client) -> None:
         check_init_estimation("gpu_hist", local_cuda_client)
 
+    def test_uneven_nan(self) -> None:
+        n_workers = 2
+        with LocalCUDACluster(n_workers=n_workers) as cluster:
+            with Client(cluster) as client:
+                check_uneven_nan(client, "gpu_hist", n_workers)
+
     @pytest.mark.skipif(**tm.no_dask_cudf())
     def test_dask_dataframe(self, local_cuda_client: Client) -> None:
         run_with_dask_dataframe(dxgb.DaskDMatrix, local_cuda_client)
diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py
index c119d3940..d6075481f 100644
--- a/tests/test_distributed/test_with_dask/test_with_dask.py
+++ b/tests/test_distributed/test_with_dask/test_with_dask.py
@@ -4,7 +4,6 @@ import json
 import os
 import pickle
 import socket
-import subprocess
 import tempfile
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
@@ -41,7 +40,7 @@ from distributed import Client, LocalCluster
 from toolz import sliding_window  # dependency of dask
 
 from xgboost.dask import DaskDMatrix
-from xgboost.testing.dask import check_init_estimation
+from xgboost.testing.dask import check_init_estimation, check_uneven_nan
 
 dask.config.set({"distributed.scheduler.allowed-failures": False})
 
@@ -2014,6 +2013,14 @@ def test_init_estimation(client: Client) -> None:
     check_init_estimation("hist", client)
 
 
+@pytest.mark.parametrize("tree_method", ["hist", "approx"])
+def test_uneven_nan(tree_method) -> None:
+    n_workers = 2
+    with LocalCluster(n_workers=n_workers) as cluster:
+        with Client(cluster) as client:
+            check_uneven_nan(client, tree_method, n_workers)
+
+
 class TestDaskCallbacks:
     @pytest.mark.skipif(**tm.no_sklearn())
     def test_early_stopping(self, client: "Client") -> None:
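
Note on the scenario this patch guards against: as the comment in updater_quantile_hist.cc states, the dense-data shortcut takes the sum of a gradient histogram as the root gradient sum, which only holds when there are no missing values, and in distributed training `is_dense` describes each worker's local shard, so one worker may be dense while another is not. The script below is a minimal, self-contained sketch of that uneven-NaN setup, adapted from the `check_uneven_nan` helper added above; it assumes a local CPU cluster, and details such as the float column dtype, the `__main__` guard, and the printed round count are choices made for this sketch rather than part of the patch.

# Illustrative reproduction of the uneven-NaN setup from issue #9271; not part of
# the patch. Only the first partition receives missing values, so one dask worker
# holds a dense shard while the other does not.
import numpy as np
import pandas as pd
from dask import dataframe as dd
from distributed import Client, LocalCluster

import xgboost as xgb

if __name__ == "__main__":
    n_workers = 2
    with LocalCluster(n_workers=n_workers) as cluster, Client(cluster) as client:
        # Float column so injecting NaN does not force a dtype change.
        X = pd.DataFrame(
            {"a": np.arange(10_000, dtype=np.float64), "b": np.arange(10_000, 0, -1)}
        )
        y = pd.Series([*[0] * 5_000, *[1] * 5_000])
        # NaN in rows 0, 1000, and 2000 of column "a": with two partitions of 5000
        # rows each, only the first worker's shard contains missing values.
        X.iloc[:3000:1000, 0] = np.nan

        clf = xgb.dask.DaskXGBClassifier(tree_method="hist")
        clf.fit(
            dd.from_pandas(X, npartitions=n_workers),
            dd.from_pandas(y, npartitions=n_workers),
        )
        print("boosted rounds:", clf.get_booster().num_boosted_rounds())

With the `!collective::IsDistributed()` guard in place, distributed training always takes the generic, missing-value-aware path for the root sum, regardless of whether a worker's local shard happens to be dense.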