diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index e5f8a68ab..b9be4b39b 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -45,6 +45,7 @@ from .data import (
     _read_csr_matrix_from_unwrapped_spark_vec,
     alias,
     create_dmatrix_from_partitions,
+    pred_contribs,
     stack_series,
 )
 from .model import (
@@ -56,6 +57,7 @@ from .model import (
 from .params import (
     HasArbitraryParamsDict,
     HasBaseMarginCol,
+    HasContribPredictionCol,
     HasEnableSparseDataOptim,
     HasFeaturesCols,
     HasQueryIdCol,
@@ -92,6 +94,7 @@ _pyspark_specific_params = [
    "enable_sparse_data_optim",
    "qid_col",
    "repartition_random_shuffle",
+    "pred_contrib_col",
 ]
 
 _non_booster_params = ["missing", "n_estimators", "feature_types", "feature_weights"]
@@ -140,6 +143,12 @@ _unsupported_predict_params = {
     "base_margin",  # Use pyspark base_margin_col param instead.
 }
 
+# Global prediction names
+Pred = namedtuple(
+    "Pred", ("prediction", "raw_prediction", "probability", "pred_contrib")
+)
+pred = Pred("prediction", "rawPrediction", "probability", "predContrib")
+
 
 class _SparkXGBParams(
     HasFeaturesCol,
@@ -152,6 +161,7 @@ class _SparkXGBParams(
     HasFeaturesCols,
     HasEnableSparseDataOptim,
     HasQueryIdCol,
+    HasContribPredictionCol,
 ):
     num_workers = Param(
         Params._dummy(),
@@ -993,6 +1003,7 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
         return features_col, feature_col_names
 
     def _transform(self, dataset):
+        # pylint: disable=too-many-statements, too-many-locals
         # Save xgb_sklearn_model and predict_params to be local variable
         # to avoid the `self` object to be pickled to remote.
         xgb_sklearn_model = self._xgb_sklearn_model
@@ -1010,7 +1021,19 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
         features_col, feature_col_names = self._get_feature_col(dataset)
         enable_sparse_data_optim = self.getOrDefault(self.enable_sparse_data_optim)
 
-        @pandas_udf("double")
+        pred_contrib_col_name = None
+        if self.isDefined(self.pred_contrib_col) and self.getOrDefault(
+            self.pred_contrib_col
+        ):
+            pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
+
+        single_pred = True
+        schema = "double"
+        if pred_contrib_col_name:
+            single_pred = False
+            schema = f"{pred.prediction} double, {pred.pred_contrib} array<double>"
+
+        @pandas_udf(schema)
         def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
             model = xgb_sklearn_model
             for data in iterator:
@@ -1027,22 +1050,48 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
                 else:
                     base_margin = None
 
+                data = {}
                 preds = model.predict(
                     X,
                     base_margin=base_margin,
                     validate_features=False,
                     **predict_params,
                 )
-                yield pd.Series(preds)
+                data[pred.prediction] = pd.Series(preds)
+
+                if pred_contrib_col_name:
+                    contribs = pred_contribs(model, X, base_margin)
+                    data[pred.pred_contrib] = pd.Series(list(contribs))
+                    yield pd.DataFrame(data=data)
+                else:
+                    yield data[pred.prediction]
 
         if has_base_margin:
             pred_col = predict_udf(struct(*features_col, base_margin_col))
         else:
             pred_col = predict_udf(struct(*features_col))
 
-        predictionColName = self.getOrDefault(self.predictionCol)
+        prediction_col_name = self.getOrDefault(self.predictionCol)
 
-        return dataset.withColumn(predictionColName, pred_col)
+        if single_pred:
+            dataset = dataset.withColumn(prediction_col_name, pred_col)
+        else:
+            pred_struct_col = "_prediction_struct"
+            dataset = dataset.withColumn(pred_struct_col, pred_col)
+
+            dataset = dataset.withColumn(
+                prediction_col_name, getattr(col(pred_struct_col), pred.prediction)
+            )
+
+            if pred_contrib_col_name:
+                dataset = dataset.withColumn(
+                    pred_contrib_col_name,
+                    array_to_vector(getattr(col(pred_struct_col), pred.pred_contrib)),
+                )
+
+            dataset = dataset.drop(pred_struct_col)
+
+        return dataset
 
 
 class SparkXGBRegressorModel(_SparkXGBModel):
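Editor's note: the `_transform` change above hinges on a single struct-returning pandas UDF that is later unpacked into separate output columns. A minimal standalone sketch of that pattern (all names here are illustrative, not part of the patch):

```python
from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, struct

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,)], ["x"])


@pandas_udf("value double, contrib array<double>")
def fake_predict(it: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # One batch in, one batch out: a scalar column plus a per-feature
    # array column, mirroring prediction + predContrib in the patch.
    for pdf in it:
        xs = pdf["x"].to_numpy()
        yield pd.DataFrame(
            {"value": xs * 2.0, "contrib": [[float(v), 1.0] for v in xs]}
        )


result = (
    df.withColumn("_s", fake_predict(struct("x")))
    .withColumn("value", col("_s").value)
    .withColumn("contrib", col("_s").contrib)
    .drop("_s")
)
result.show()
```

Returning one struct column from a single UDF call avoids invoking the model once per output column; the patch then splits the struct with `getattr(col(...), name)` and drops the intermediate column, exactly as sketched.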
@@ -1069,7 +1118,9 @@ class SparkXGBRankerModel(_SparkXGBModel):
         return XGBRanker
 
 
-class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictionCol):
+class SparkXGBClassifierModel(
+    _SparkXGBModel, HasProbabilityCol, HasRawPredictionCol, HasContribPredictionCol
+):
     """
     The model returned by :func:`xgboost.spark.SparkXGBClassifier.fit`
 
@@ -1081,7 +1132,7 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
         return XGBClassifier
 
     def _transform(self, dataset):
-        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-statements, too-many-locals
         # Save xgb_sklearn_model and predict_params to be local variable
         # to avoid the `self` object to be pickled to remote.
         xgb_sklearn_model = self._xgb_sklearn_model
@@ -1112,9 +1163,22 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
         features_col, feature_col_names = self._get_feature_col(dataset)
         enable_sparse_data_optim = self.getOrDefault(self.enable_sparse_data_optim)
 
-        @pandas_udf(
-            "rawPrediction array<double>, prediction double, probability array<double>"
+        pred_contrib_col_name = None
+        if self.isDefined(self.pred_contrib_col) and self.getOrDefault(
+            self.pred_contrib_col
+        ):
+            pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
+
+        schema = (
+            f"{pred.raw_prediction} array<double>, {pred.prediction} double,"
+            f" {pred.probability} array<double>"
         )
+        if pred_contrib_col_name:
+            # We will force strict_shape to True when predicting contribs,
+            # so it will also output a 3-D shaped result.
+            schema = f"{schema}, {pred.pred_contrib} array<array<double>>"
+
+        @pandas_udf(schema)
         def predict_udf(
             iterator: Iterator[Tuple[pd.Series, ...]]
         ) -> Iterator[pd.DataFrame]:
@@ -1145,13 +1209,17 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
                 # It seems that they use argmax of class probs,
                 # not of margin to get the prediction (Note: scala implementation)
                 preds = np.argmax(class_probs, axis=1)
-                yield pd.DataFrame(
-                    data={
-                        "rawPrediction": pd.Series(list(raw_preds)),
-                        "prediction": pd.Series(preds),
-                        "probability": pd.Series(list(class_probs)),
-                    }
-                )
+                data = {
+                    pred.raw_prediction: pd.Series(list(raw_preds)),
+                    pred.prediction: pd.Series(preds),
+                    pred.probability: pd.Series(list(class_probs)),
+                }
+
+                if pred_contrib_col_name:
+                    contribs = pred_contribs(model, X, base_margin, strict_shape=True)
+                    data[pred.pred_contrib] = pd.Series(contribs.tolist())
+
+                yield pd.DataFrame(data=data)
 
         if has_base_margin:
             pred_struct = predict_udf(struct(*features_col, base_margin_col))
@@ -1159,23 +1227,32 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
             pred_struct = predict_udf(struct(*features_col))
 
         pred_struct_col = "_prediction_struct"
-
-        rawPredictionColName = self.getOrDefault(self.rawPredictionCol)
-        predictionColName = self.getOrDefault(self.predictionCol)
-        probabilityColName = self.getOrDefault(self.probabilityCol)
         dataset = dataset.withColumn(pred_struct_col, pred_struct)
-        if rawPredictionColName:
+
+        raw_prediction_col_name = self.getOrDefault(self.rawPredictionCol)
+        if raw_prediction_col_name:
             dataset = dataset.withColumn(
-                rawPredictionColName,
-                array_to_vector(col(pred_struct_col).rawPrediction),
+                raw_prediction_col_name,
+                array_to_vector(getattr(col(pred_struct_col), pred.raw_prediction)),
             )
-        if predictionColName:
+
+        prediction_col_name = self.getOrDefault(self.predictionCol)
+        if prediction_col_name:
             dataset = dataset.withColumn(
-                predictionColName, col(pred_struct_col).prediction
+                prediction_col_name, getattr(col(pred_struct_col), pred.prediction)
             )
-        if probabilityColName:
+
+        probability_col_name = self.getOrDefault(self.probabilityCol)
+        if probability_col_name:
             dataset = dataset.withColumn(
-                probabilityColName, array_to_vector(col(pred_struct_col).probability)
+                probability_col_name,
+                array_to_vector(getattr(col(pred_struct_col), pred.probability)),
+            )
+
+        if pred_contrib_col_name:
+            dataset = dataset.withColumn(
+                pred_contrib_col_name,
+                getattr(col(pred_struct_col), pred.pred_contrib),
             )
 
         return dataset.drop(pred_struct_col)
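Editor's note: the `pred_contribs` helper added below wraps `Booster.predict(pred_contribs=True)`, whose per-row output is one contribution per feature plus a trailing bias term, and whose row sums reproduce the raw margin prediction. A quick sanity sketch of that contract in plain xgboost (no Spark; data is made up):

```python
import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X = rng.random((8, 3))
y = rng.random(8)
reg = xgb.XGBRegressor(n_estimators=4).fit(X, y)
booster = reg.get_booster()

# Contributions have shape (rows, n_features + 1); the last column is bias.
contribs = booster.predict(xgb.DMatrix(X), pred_contribs=True)
margin = booster.predict(xgb.DMatrix(X), output_margin=True)
np.testing.assert_allclose(contribs.sum(axis=1), margin, rtol=1e-5, atol=1e-6)
```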
diff --git a/python-package/xgboost/spark/data.py b/python-package/xgboost/spark/data.py
index b2cf3e654..e5a0eac94 100644
--- a/python-package/xgboost/spark/data.py
+++ b/python-package/xgboost/spark/data.py
@@ -1,3 +1,4 @@
+# pylint: disable=protected-access
 """Utilities for processing spark partitions."""
 from collections import defaultdict, namedtuple
 from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
@@ -7,8 +8,10 @@ import pandas as pd
 from scipy.sparse import csr_matrix
 from xgboost.compat import concat
 
-from xgboost import DataIter, DMatrix, QuantileDMatrix
+from xgboost import DataIter, DMatrix, QuantileDMatrix, XGBModel
 
+from .._typing import ArrayLike
+from ..core import _convert_ntree_limit
 from .utils import get_logger  # type: ignore
 
@@ -331,3 +334,29 @@ def create_dmatrix_from_partitions(  # pylint: disable=too-many-arguments
         assert dvalid.num_col() == dtrain.num_col()
 
     return dtrain, dvalid
+
+
+def pred_contribs(
+    model: XGBModel,
+    data: ArrayLike,
+    base_margin: Optional[ArrayLike] = None,
+    strict_shape: bool = False,
+) -> np.ndarray:
+    """Predict feature contributions for ``data`` with the full model."""
+    iteration_range = _convert_ntree_limit(model.get_booster(), None, None)
+    iteration_range = model._get_iteration_range(iteration_range)
+    data_dmatrix = DMatrix(
+        data,
+        base_margin=base_margin,
+        missing=model.missing,
+        nthread=model.n_jobs,
+        feature_types=model.feature_types,
+        enable_categorical=model.enable_categorical,
+    )
+    return model.get_booster().predict(
+        data_dmatrix,
+        pred_contribs=True,
+        validate_features=False,
+        iteration_range=iteration_range,
+        strict_shape=strict_shape,
+    )
diff --git a/python-package/xgboost/spark/params.py b/python-package/xgboost/spark/params.py
index 77cfcd137..78a35eee0 100644
--- a/python-package/xgboost/spark/params.py
+++ b/python-package/xgboost/spark/params.py
@@ -85,3 +85,19 @@ class HasQueryIdCol(Params):
         "query id column name",
         typeConverter=TypeConverters.toString,
     )
+
+
+class HasContribPredictionCol(Params):
+    """
+    Mixin for param pred_contrib_col: contribution prediction column name.
+
+    For the classification case, the output is a 3-dim array of shape
+    (rows, groups, columns + 1); for the regression case it is a 2-dim
+    array of shape (rows, columns + 1).
+    """
+
+    pred_contrib_col: "Param[str]" = Param(
+        Params._dummy(),
+        "pred_contrib_col",
+        "feature contributions to individual predictions.",
+        typeConverter=TypeConverters.toString,
+    )
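Editor's note: putting the pieces together, a minimal end-to-end usage sketch of the new `pred_contrib_col` parameter, mirroring the tests below (the DataFrame and column names are illustrative):

```python
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor

spark = SparkSession.builder.master("local[1]").getOrCreate()
train = spark.createDataFrame(
    [(Vectors.dense(1.0, 2.0, 3.0), 0.0), (Vectors.dense(0.0, 1.0, 5.5), 1.0)],
    ["features", "label"],
)

# Asking for contributions adds one extra output column alongside the
# usual "prediction" column.
regressor = SparkXGBRegressor(pred_contrib_col="pred_contribs")
model = regressor.fit(train)
model.transform(train).select("prediction", "pred_contribs").show(truncate=False)
```

For the regressor the contributions come back as a vector (one entry per feature plus the bias term, via `array_to_vector`); for the classifier they are a nested array with one inner array per class, since `strict_shape` is forced to True.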
diff --git a/tests/test_distributed/test_with_spark/test_spark_local.py b/tests/test_distributed/test_with_spark/test_spark_local.py
index fa7bdd94f..ca8c672d9 100644
--- a/tests/test_distributed/test_with_spark/test_spark_local.py
+++ b/tests/test_distributed/test_with_spark/test_spark_local.py
@@ -8,6 +8,7 @@ from typing import Generator, Sequence, Type
 
 import numpy as np
 import pytest
+from xgboost.spark.data import pred_contribs
 
 import xgboost as xgb
 from xgboost import testing as tm
@@ -158,6 +159,103 @@ def reg_with_weight(
     )
 
 
+RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test"))
+
+
+@pytest.fixture
+def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
+    X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
+    y = np.array([0, 1])
+    reg1 = xgb.XGBRegressor()
+    reg1.fit(X, y)
+    predt0 = reg1.predict(X)
+    pred_contrib0: np.ndarray = pred_contribs(reg1, X, None, False)
+
+    # convert np array to pyspark dataframe
+    reg_df_train_data = [
+        (Vectors.dense(X[0, :]), int(y[0])),
+        (Vectors.sparse(3, {1: float(X[1, 1]), 2: float(X[1, 2])}), int(y[1])),
+    ]
+    reg_df_train = spark.createDataFrame(reg_df_train_data, ["features", "label"])
+
+    reg_df_test = spark.createDataFrame(
+        [
+            (
+                Vectors.dense(X[0, :]),
+                float(predt0[0]),
+                pred_contrib0[0, :].tolist(),
+            ),
+            (
+                Vectors.sparse(3, {1: 1.0, 2: 5.5}),
+                float(predt0[1]),
+                pred_contrib0[1, :].tolist(),
+            ),
+        ],
+        [
+            "features",
+            "expected_prediction",
+            "expected_pred_contribs",
+        ],
+    )
+    yield RegData(reg_df_train, reg_df_test)
+
+
+MultiClfData = namedtuple("MultiClfData", ("multi_clf_df_train", "multi_clf_df_test"))
+
+
+@pytest.fixture
+def multi_clf_data(spark: SparkSession) -> Generator[MultiClfData, None, None]:
+    X = np.array([[1.0, 2.0, 3.0], [1.0, 2.0, 4.0], [0.0, 1.0, 5.5], [-1.0, -2.0, 1.0]])
+    y = np.array([0, 0, 1, 2])
+    cls1 = xgb.XGBClassifier()
+    cls1.fit(X, y)
+    predt0 = cls1.predict(X)
+    proba0: np.ndarray = cls1.predict_proba(X)
+    pred_contrib0: np.ndarray = pred_contribs(cls1, X, None, False)
+
+    # convert np array to pyspark dataframe
+    multi_cls_df_train_data = [
+        (Vectors.dense(X[0, :]), int(y[0])),
+        (Vectors.dense(X[1, :]), int(y[1])),
+        (Vectors.sparse(3, {1: float(X[2, 1]), 2: float(X[2, 2])}), int(y[2])),
+        (Vectors.dense(X[3, :]), int(y[3])),
+    ]
+    multi_clf_df_train = spark.createDataFrame(
+        multi_cls_df_train_data, ["features", "label"]
+    )
+
+    multi_clf_df_test = spark.createDataFrame(
+        [
+            (
+                Vectors.dense(X[0, :]),
+                float(predt0[0]),
+                proba0[0, :].tolist(),
+                pred_contrib0[0, :].tolist(),
+            ),
+            (
+                Vectors.dense(X[1, :]),
+                float(predt0[1]),
+                proba0[1, :].tolist(),
+                pred_contrib0[1, :].tolist(),
+            ),
+            (
+                Vectors.sparse(3, {1: 1.0, 2: 5.5}),
+                float(predt0[2]),
+                proba0[2, :].tolist(),
+                pred_contrib0[2, :].tolist(),
+            ),
+        ],
+        [
+            "features",
+            "expected_prediction",
+            "expected_probability",
+            "expected_pred_contribs",
+        ],
+    )
+    yield MultiClfData(multi_clf_df_train, multi_clf_df_test)
+
+
 ClfWithWeight = namedtuple(
     "ClfWithWeight",
     (
@@ -264,10 +362,12 @@ def clf_data(spark: SparkSession) -> Generator[ClfData, None, None]:
     cl1.fit(X, y)
     predt0 = cl1.predict(X)
     proba0: np.ndarray = cl1.predict_proba(X)
-    cl2 = xgb.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4)
+    pred_contrib0: np.ndarray = pred_contribs(cl1, X, None, True)
+    cl2 = xgb.XGBClassifier(**cls_params)
     cl2.fit(X, y)
     predt1 = cl2.predict(X)
     proba1: np.ndarray = cl2.predict_proba(X)
+    pred_contrib1: np.ndarray = pred_contribs(cl2, X, None, True)
 
     # convert np array to pyspark dataframe
     cls_df_train_data = [
@@ -286,23 +386,29 @@ def clf_data(spark: SparkSession) -> Generator[ClfData, None, None]:
                 Vectors.dense(X[0, :]),
                 int(predt0[0]),
                 proba0[0, :].tolist(),
+                pred_contrib0[0, :].tolist(),
                 int(predt1[0]),
                 proba1[0, :].tolist(),
+                pred_contrib1[0, :].tolist(),
             ),
             (
                 Vectors.sparse(3, {1: 1.0, 2: 5.5}),
                 int(predt0[1]),
                 proba0[1, :].tolist(),
+                pred_contrib0[1, :].tolist(),
                 int(predt1[1]),
                 proba1[1, :].tolist(),
+                pred_contrib1[1, :].tolist(),
             ),
         ],
         [
             "features",
             "expected_prediction",
             "expected_probability",
+            "expected_pred_contribs",
             "expected_prediction_with_params",
             "expected_probability_with_params",
+            "expected_pred_contribs_with_params",
         ],
     )
     yield ClfData(cls_params, cls_df_train, cls_df_train_large, cls_df_test)
@@ -331,6 +437,16 @@ def get_params_map(params_kv: dict, estimator: Type) -> dict:
 
 
 class TestPySparkLocal:
+    def test_regressor_basic(self, reg_data: RegData) -> None:
+        regressor = SparkXGBRegressor(pred_contrib_col="pred_contribs")
+        model = regressor.fit(reg_data.reg_df_train)
+        pred_result = model.transform(reg_data.reg_df_test).collect()
+        for row in pred_result:
+            np.testing.assert_equal(row.prediction, row.expected_prediction)
+            np.testing.assert_allclose(
+                row.pred_contribs, row.expected_pred_contribs, atol=1e-3
+            )
+
     def test_regressor_with_weight_eval(self, reg_with_weight: RegWithWeight) -> None:
         # with weight
         regressor_with_weight = SparkXGBRegressor(weight_col="weight")
@@ -385,6 +501,19 @@ class TestPySparkLocal:
                 atol=1e-3,
             )
 
+    def test_multi_classifier_basic(self, multi_clf_data: MultiClfData) -> None:
+        cls = SparkXGBClassifier(pred_contrib_col="pred_contribs")
+        model = cls.fit(multi_clf_data.multi_clf_df_train)
+        pred_result = model.transform(multi_clf_data.multi_clf_df_test).collect()
+        for row in pred_result:
+            np.testing.assert_equal(row.prediction, row.expected_prediction)
+            np.testing.assert_allclose(
+                row.probability, row.expected_probability, rtol=1e-3
+            )
+            np.testing.assert_allclose(
+                row.pred_contribs, row.expected_pred_contribs, atol=1e-3
+            )
+
     def test_classifier_with_weight_eval(self, clf_with_weight: ClfWithWeight) -> None:
         # with weight
         classifier_with_weight = SparkXGBClassifier(weight_col="weight")
@@ -459,13 +588,18 @@ class TestPySparkLocal:
         assert_model_compatible(model, tmpdir)
 
     def test_classifier_basic(self, clf_data: ClfData) -> None:
-        classifier = SparkXGBClassifier()
+        classifier = SparkXGBClassifier(
+            **clf_data.cls_params, pred_contrib_col="pred_contrib"
+        )
         model = classifier.fit(clf_data.cls_df_train)
         pred_result = model.transform(clf_data.cls_df_test).collect()
         for row in pred_result:
-            np.testing.assert_equal(row.prediction, row.expected_prediction)
+            np.testing.assert_equal(row.prediction, row.expected_prediction_with_params)
             np.testing.assert_allclose(
-                row.probability, row.expected_probability, rtol=1e-3
+                row.probability, row.expected_probability_with_params, rtol=1e-3
+            )
+            np.testing.assert_equal(
+                row.pred_contrib, row.expected_pred_contribs_with_params
             )
 
     def test_classifier_with_params(self, clf_data: ClfData) -> None:
@@ -648,24 +782,6 @@ class XgboostLocalTest(SparkTestCase):
         # >>> cl.fit(X, y)
         # >>> cl.predict_proba(np.array([[1.0, 2.0, 3.0]]))
         # array([[0.5374299 , 0.23128504, 0.23128504]], dtype=float32)
-        multi_cls_df_train_data = [
-            (Vectors.dense(1.0, 2.0, 3.0), 0),
-            (Vectors.dense(1.0, 2.0, 4.0), 0),
-            (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
-            (Vectors.dense(-1.0, -2.0, 1.0), 2),
-        ]
-        self.multi_cls_df_train = self.session.createDataFrame(
-            multi_cls_df_train_data, ["features", "label"]
-        )
-        self.multi_cls_df_train_large = self.session.createDataFrame(
-            multi_cls_df_train_data * 100, ["features", "label"]
-        )
-        self.multi_cls_df_test = self.session.createDataFrame(
-            [
-                (Vectors.dense(1.0, 2.0, 3.0), [0.5374, 0.2312, 0.2312]),
-            ],
-            ["features", "expected_probability"],
-        )
 
         # Test classifier with both base margin and without
         # >>> import numpy as np
@@ -872,24 +988,6 @@ class XgboostLocalTest(SparkTestCase):
             == "float64"
         )
 
-    def test_regressor_basic(self):
-        regressor = SparkXGBRegressor()
-        model = regressor.fit(self.reg_df_train)
-        pred_result = model.transform(self.reg_df_test).collect()
-        for row in pred_result:
-            self.assertTrue(
-                np.isclose(row.prediction, row.expected_prediction, atol=1e-3)
-            )
-
-    def test_multi_classifier(self):
-        classifier = SparkXGBClassifier()
-        model = classifier.fit(self.multi_cls_df_train)
-        pred_result = model.transform(self.multi_cls_df_test).collect()
-        for row in pred_result:
-            self.assertTrue(
-                np.allclose(row.probability, row.expected_probability, rtol=1e-3)
-            )
-
     def test_regressor_with_params(self):
         regressor = SparkXGBRegressor(**self.reg_params)
         all_params = dict(