[pyspark] Cleanup data processing. (#8344)

* Enable additional combinations of ctor parameters.
* Unify procedures for QuantileDMatrix and DMatrix.
Author: Jiaming Yuan, 2022-10-18 14:56:23 +08:00 (committed by GitHub)
Parent: 521086d56b
Commit: 3901f5d9db
5 changed files with 68 additions and 55 deletions


@@ -83,10 +83,11 @@ generate result dataset with 3 new columns:
 XGBoost PySpark GPU support
 ***************************
-XGBoost PySpark supports GPU training and prediction. To enable GPU support, you first need
-to install the xgboost and cudf packages. Then you can set `use_gpu` parameter to `True`.
+XGBoost PySpark supports GPU training and prediction. To enable GPU support, first you
+need to install the XGBoost and the `cuDF <https://docs.rapids.ai/api/cudf/stable/>`_
+package. Then you can set `use_gpu` parameter to `True`.
 
-Below tutorial will show you how to train a model with XGBoost PySpark GPU on Spark
+Below tutorial demonstrates how to train a model with XGBoost PySpark GPU on Spark
 standalone cluster.
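For context, a minimal PySpark snippet matching the revised wording might look like the following. This is an editorial sketch, not part of the commit; the DataFrame and column names are assumptions.

    from xgboost.spark import SparkXGBClassifier

    classifier = SparkXGBClassifier(
        features_col="features",  # assumed: a vector/array column in train_df
        label_col="label",
        num_workers=2,            # one GPU is used per training task
        use_gpu=True,             # requires cuDF to be importable on the executors
    )
    model = classifier.fit(train_df)        # train_df is an assumed Spark DataFrame
    predictions = model.transform(test_df)  # test_df is likewise assumed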
@@ -138,7 +139,7 @@ in PySpark. Please refer to
   conda create -y -n xgboost-env -c conda-forge conda-pack python=3.9
   conda activate xgboost-env
   pip install xgboost
-  pip install cudf
+  conda install cudf -c rapids -c nvidia -c conda-forge
   conda pack -f -o xgboost-env.tar.gz
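Before submitting a job, it can be worth confirming that the packed environment actually exposes cuDF on the executors. This is an optional sanity check of my own, not something the tutorial prescribes; `spark` is an assumed, already-created SparkSession that uses the packed environment.

    # Run a single task that tries to import cuDF from the shipped environment.
    def _check_cudf(_):
        import cudf  # raises ImportError if the environment is missing cuDF
        return [cudf.__version__]

    print(spark.sparkContext.parallelize(range(1), 1).mapPartitions(_check_cudf).collect())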
@@ -220,3 +221,6 @@ Below is a simple example submit command for enabling GPU acceleration:
   --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
   --archives xgboost-env.tar.gz#environment \
   xgboost_app.py
+
+When rapids plugin is enabled, both of the JVM rapids plugin and the cuDF Python are
+required for the acceleration.
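The added note means both halves have to be present: the JVM-side RAPIDS Accelerator plugin and Python-side cuDF. As a rough illustration (the configuration keys come from the RAPIDS Accelerator, the values are illustrative, and none of this is taken from the commit), the JVM plugin is typically switched on through Spark conf such as:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # JVM side: the RAPIDS Accelerator plugin; its jar must be on the classpath.
        .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
        .config("spark.executor.resource.gpu.amount", "1")
        .config("spark.task.resource.gpu.amount", "1")
        # Python side: cuDF comes from the packed conda environment shown earlier.
        .config("spark.sql.execution.arrow.maxRecordsPerBatch", "1000000")
        .getOrCreate()
    )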


@@ -747,7 +747,7 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
             k: v for k, v in train_call_kwargs_params.items() if v is not None
         }
         dmatrix_kwargs = {k: v for k, v in dmatrix_kwargs.items() if v is not None}
-        use_qdm = booster_params.get("tree_method") in ("hist", "gpu_hist")
+        use_qdm = booster_params.get("tree_method", None) in ("hist", "gpu_hist")
 
         def _train_booster(pandas_df_iter):
             """Takes in an RDD partition and outputs a booster for that partition after


@@ -208,14 +208,27 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
     def append_m(part: pd.DataFrame, name: str, is_valid: bool) -> None:
         nonlocal n_features
-        if name in part.columns and part[name].shape[0] > 0:
-            array = part[name]
-            if name == alias.data:
+        if name == alias.data or name in part.columns:
+            if (
+                name == alias.data
+                and feature_cols is not None
+                and part[feature_cols].shape[0] > 0  # guard against empty partition
+            ):
+                array: Optional[np.ndarray] = part[feature_cols]
+            elif part[name].shape[0] > 0:
+                array = part[name]
                 array = stack_series(array)
+            else:
+                array = None
+
+            if name == alias.data and array is not None:
                 if n_features == 0:
                     n_features = array.shape[1]
                 assert n_features == array.shape[1]
 
+            if array is None:
+                return
+
             if is_valid:
                 valid_data[name].append(array)
             else:
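For readers unfamiliar with the helper used above: `stack_series` turns a pandas column whose cells are array-like into a 2-D matrix. A plain-NumPy sketch of that idea (not the library function itself):

    import numpy as np
    import pandas as pd

    part = pd.DataFrame({"values": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]})
    array = np.stack(part["values"].to_list())  # each cell becomes one row
    assert array.shape == (3, 2)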
@@ -238,26 +251,6 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
             else:
                 train_data[name].append(array)
 
-    def append_qdm(part: pd.DataFrame, name: str, is_valid: bool) -> None:
-        """Preprocessing for QuantileDMatrix."""
-        nonlocal n_features
-        if name == alias.data or name in part.columns:
-            if name == alias.data and feature_cols is not None:
-                array = part[feature_cols]
-            else:
-                array = part[name]
-                array = stack_series(array)
-
-            if name == alias.data:
-                if n_features == 0:
-                    n_features = array.shape[1]
-                assert n_features == array.shape[1]
-
-            if is_valid:
-                valid_data[name].append(array)
-            else:
-                train_data[name].append(array)
-
     def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix:
         if len(values) == 0:
             get_logger("XGBoostPySpark").warning(
@@ -305,13 +298,14 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
     meta, params = split_params()
 
-    if feature_cols is not None:  # rapidsai plugin
-        assert gpu_id is not None
-        assert use_qdm is True
-        cache_partitions(iterator, append_qdm)
+    if feature_cols is not None and use_qdm:
+        cache_partitions(iterator, append_fn)
         dtrain: DMatrix = make_qdm(train_data, gpu_id, meta, None, params)
-    elif use_qdm:
-        cache_partitions(iterator, append_qdm)
+    elif feature_cols is not None and not use_qdm:
+        cache_partitions(iterator, append_fn)
+        dtrain = make(train_data, kwargs)
+    elif feature_cols is None and use_qdm:
+        cache_partitions(iterator, append_fn)
         dtrain = make_qdm(train_data, gpu_id, meta, None, params)
     else:
         cache_partitions(iterator, append_fn)
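Outside of Spark, the two constructors this dispatch chooses between can be exercised directly. A minimal sketch with synthetic data (the names, sizes, and parameters here are my own, not taken from the commit):

    import numpy as np
    import xgboost as xgb

    rng = np.random.default_rng(0)
    X = rng.normal(size=(256, 16))
    y = rng.integers(0, 2, size=256)

    use_qdm = True  # analogous to tree_method being "hist" or "gpu_hist"
    if use_qdm:
        # Pre-binned representation used by the hist tree methods; lower peak memory.
        dtrain = xgb.QuantileDMatrix(X, label=y)
    else:
        dtrain = xgb.DMatrix(X, label=y)

    booster = xgb.train(
        {"tree_method": "hist", "objective": "binary:logistic"},
        dtrain,
        num_boost_round=5,
    )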


@@ -19,7 +19,9 @@ from test_spark.test_data import run_dmatrix_ctor
 
 @pytest.mark.skipif(**tm.no_cudf())
-def test_qdm_ctor() -> None:
-    run_dmatrix_ctor(is_dqm=True, on_gpu=True)
-    with pytest.raises(AssertionError):
-        run_dmatrix_ctor(is_dqm=False, on_gpu=True)
+@pytest.mark.parametrize(
+    "is_feature_cols,is_qdm",
+    [(True, True), (True, False), (False, True), (False, False)],
+)
+def test_dmatrix_ctor(is_feature_cols: bool, is_qdm: bool) -> None:
+    run_dmatrix_ctor(is_feature_cols, is_qdm, on_gpu=True)


@@ -18,6 +18,8 @@ from xgboost.spark.data import (
     stack_series,
 )
 
+from xgboost import DMatrix, QuantileDMatrix
+
 
 def test_stack() -> None:
     a = pd.DataFrame({"a": [[1, 2], [3, 4]]})
@@ -37,7 +39,7 @@ def test_stack() -> None:
     assert b.shape == (2, 1)
 
 
-def run_dmatrix_ctor(is_dqm: bool, on_gpu: bool) -> None:
+def run_dmatrix_ctor(is_feature_cols: bool, is_qdm: bool, on_gpu: bool) -> None:
     rng = np.random.default_rng(0)
     dfs: List[pd.DataFrame] = []
     n_features = 16
@@ -57,7 +59,7 @@ def run_dmatrix_ctor(is_dqm: bool, on_gpu: bool) -> None:
         df = pd.DataFrame(
             {alias.label: y, alias.margin: m, alias.weight: w, alias.valid: valid}
         )
-        if on_gpu:
+        if is_feature_cols:
             for j in range(X.shape[1]):
                 df[f"feat-{j}"] = pd.Series(X[:, j])
         else:
@@ -65,19 +67,27 @@ def run_dmatrix_ctor(is_dqm: bool, on_gpu: bool) -> None:
         dfs.append(df)
 
     kwargs = {"feature_types": feature_types}
-    if on_gpu:
-        cols = [f"feat-{i}" for i in range(n_features)]
-        train_Xy, valid_Xy = create_dmatrix_from_partitions(
-            iter(dfs), cols, 0, is_dqm, kwargs, False, True
-        )
-    elif is_dqm:
-        train_Xy, valid_Xy = create_dmatrix_from_partitions(
-            iter(dfs), None, None, True, kwargs, False, True
-        )
+    device_id = 0 if on_gpu else None
+    cols = [f"feat-{i}" for i in range(n_features)]
+    feature_cols = cols if is_feature_cols else None
+    train_Xy, valid_Xy = create_dmatrix_from_partitions(
+        iter(dfs),
+        feature_cols,
+        gpu_id=device_id,
+        use_qdm=is_qdm,
+        kwargs=kwargs,
+        enable_sparse_data_optim=False,
+        has_validation_col=True,
+    )
+    if is_qdm:
+        assert isinstance(train_Xy, QuantileDMatrix)
+        assert isinstance(valid_Xy, QuantileDMatrix)
     else:
-        train_Xy, valid_Xy = create_dmatrix_from_partitions(
-            iter(dfs), None, None, False, kwargs, False, True
-        )
+        assert not isinstance(train_Xy, QuantileDMatrix)
+        assert isinstance(train_Xy, DMatrix)
+        assert not isinstance(valid_Xy, QuantileDMatrix)
+        assert isinstance(valid_Xy, DMatrix)
 
     assert valid_Xy is not None
     assert valid_Xy.num_row() + train_Xy.num_row() == n_samples_per_batch * n_batches
@@ -109,9 +119,12 @@ def run_dmatrix_ctor(is_dqm: bool, on_gpu: bool) -> None:
     np.testing.assert_equal(valid_Xy.feature_types, feature_types)
 
 
-def test_dmatrix_ctor() -> None:
-    run_dmatrix_ctor(is_dqm=False, on_gpu=False)
-    run_dmatrix_ctor(is_dqm=True, on_gpu=False)
+@pytest.mark.parametrize(
+    "is_feature_cols,is_qdm",
+    [(True, True), (True, False), (False, True), (False, False)],
+)
+def test_dmatrix_ctor(is_feature_cols: bool, is_qdm: bool) -> None:
+    run_dmatrix_ctor(is_feature_cols, is_qdm, on_gpu=False)
 
 
 def test_read_csr_matrix_from_unwrapped_spark_vec() -> None: