[pyspark] support pred_contribs (#8633)

This commit is contained in:
Bobby Wang 2023-01-11 16:51:12 +08:00 committed by GitHub
parent cfa994d57f
commit 72ec0c5484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 287 additions and 67 deletions

View File

@ -45,6 +45,7 @@ from .data import (
_read_csr_matrix_from_unwrapped_spark_vec,
alias,
create_dmatrix_from_partitions,
pred_contribs,
stack_series,
)
from .model import (
@ -56,6 +57,7 @@ from .model import (
from .params import (
HasArbitraryParamsDict,
HasBaseMarginCol,
HasContribPredictionCol,
HasEnableSparseDataOptim,
HasFeaturesCols,
HasQueryIdCol,
@ -92,6 +94,7 @@ _pyspark_specific_params = [
"enable_sparse_data_optim",
"qid_col",
"repartition_random_shuffle",
"pred_contrib_col",
]
_non_booster_params = ["missing", "n_estimators", "feature_types", "feature_weights"]
@ -140,6 +143,12 @@ _unsupported_predict_params = {
"base_margin", # Use pyspark base_margin_col param instead.
}
# Global prediction names
Pred = namedtuple(
"Pred", ("prediction", "raw_prediction", "probability", "pred_contrib")
)
pred = Pred("prediction", "rawPrediction", "probability", "predContrib")
class _SparkXGBParams(
HasFeaturesCol,
@ -152,6 +161,7 @@ class _SparkXGBParams(
HasFeaturesCols,
HasEnableSparseDataOptim,
HasQueryIdCol,
HasContribPredictionCol,
):
num_workers = Param(
Params._dummy(),
@ -993,6 +1003,7 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
return features_col, feature_col_names
def _transform(self, dataset):
# pylint: disable=too-many-statements, too-many-locals
# Save xgb_sklearn_model and predict_params to be local variable
# to avoid the `self` object to be pickled to remote.
xgb_sklearn_model = self._xgb_sklearn_model
@ -1010,7 +1021,19 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
features_col, feature_col_names = self._get_feature_col(dataset)
enable_sparse_data_optim = self.getOrDefault(self.enable_sparse_data_optim)
@pandas_udf("double")
pred_contrib_col_name = None
if self.isDefined(self.pred_contrib_col) and self.getOrDefault(
self.pred_contrib_col
):
pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
single_pred = True
schema = "double"
if pred_contrib_col_name:
single_pred = False
schema = f"{pred.prediction} double, {pred.pred_contrib} array<double>"
@pandas_udf(schema)
def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
model = xgb_sklearn_model
for data in iterator:
@ -1027,22 +1050,48 @@ class _SparkXGBModel(Model, _SparkXGBParams, MLReadable, MLWritable):
else:
base_margin = None
data = {}
preds = model.predict(
X,
base_margin=base_margin,
validate_features=False,
**predict_params,
)
yield pd.Series(preds)
data[pred.prediction] = pd.Series(preds)
if pred_contrib_col_name:
contribs = pred_contribs(model, X, base_margin)
data[pred.pred_contrib] = pd.Series(list(contribs))
yield pd.DataFrame(data=data)
else:
yield data[pred.prediction]
if has_base_margin:
pred_col = predict_udf(struct(*features_col, base_margin_col))
else:
pred_col = predict_udf(struct(*features_col))
predictionColName = self.getOrDefault(self.predictionCol)
prediction_col_name = self.getOrDefault(self.predictionCol)
return dataset.withColumn(predictionColName, pred_col)
if single_pred:
dataset = dataset.withColumn(prediction_col_name, pred_col)
else:
pred_struct_col = "_prediction_struct"
dataset = dataset.withColumn(pred_struct_col, pred_col)
dataset = dataset.withColumn(
prediction_col_name, getattr(col(pred_struct_col), pred.prediction)
)
if pred_contrib_col_name:
dataset = dataset.withColumn(
pred_contrib_col_name,
array_to_vector(getattr(col(pred_struct_col), pred.pred_contrib)),
)
dataset = dataset.drop(pred_struct_col)
return dataset
class SparkXGBRegressorModel(_SparkXGBModel):
@ -1069,7 +1118,9 @@ class SparkXGBRankerModel(_SparkXGBModel):
return XGBRanker
class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictionCol):
class SparkXGBClassifierModel(
_SparkXGBModel, HasProbabilityCol, HasRawPredictionCol, HasContribPredictionCol
):
"""
The model returned by :func:`xgboost.spark.SparkXGBClassifier.fit`
@ -1081,7 +1132,7 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
return XGBClassifier
def _transform(self, dataset):
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements, too-many-locals
# Save xgb_sklearn_model and predict_params to be local variable
# to avoid the `self` object to be pickled to remote.
xgb_sklearn_model = self._xgb_sklearn_model
@ -1112,9 +1163,22 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
features_col, feature_col_names = self._get_feature_col(dataset)
enable_sparse_data_optim = self.getOrDefault(self.enable_sparse_data_optim)
@pandas_udf(
"rawPrediction array<double>, prediction double, probability array<double>"
pred_contrib_col_name = None
if self.isDefined(self.pred_contrib_col) and self.getOrDefault(
self.pred_contrib_col
):
pred_contrib_col_name = self.getOrDefault(self.pred_contrib_col)
schema = (
f"{pred.raw_prediction} array<double>, {pred.prediction} double,"
f" {pred.probability} array<double>"
)
if pred_contrib_col_name:
# We will force setting strict_shape to True when predicting contribs,
# So, it will also output 3-D shape result.
schema = f"{schema}, {pred.pred_contrib} array<array<double>>"
@pandas_udf(schema)
def predict_udf(
iterator: Iterator[Tuple[pd.Series, ...]]
) -> Iterator[pd.DataFrame]:
@ -1145,13 +1209,17 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
# It seems that they use argmax of class probs,
# not of margin to get the prediction (Note: scala implementation)
preds = np.argmax(class_probs, axis=1)
yield pd.DataFrame(
data={
"rawPrediction": pd.Series(list(raw_preds)),
"prediction": pd.Series(preds),
"probability": pd.Series(list(class_probs)),
}
)
data = {
pred.raw_prediction: pd.Series(list(raw_preds)),
pred.prediction: pd.Series(preds),
pred.probability: pd.Series(list(class_probs)),
}
if pred_contrib_col_name:
contribs = pred_contribs(model, X, base_margin, strict_shape=True)
data[pred.pred_contrib] = pd.Series(list(contribs.tolist()))
yield pd.DataFrame(data=data)
if has_base_margin:
pred_struct = predict_udf(struct(*features_col, base_margin_col))
@ -1159,23 +1227,32 @@ class SparkXGBClassifierModel(_SparkXGBModel, HasProbabilityCol, HasRawPredictio
pred_struct = predict_udf(struct(*features_col))
pred_struct_col = "_prediction_struct"
rawPredictionColName = self.getOrDefault(self.rawPredictionCol)
predictionColName = self.getOrDefault(self.predictionCol)
probabilityColName = self.getOrDefault(self.probabilityCol)
dataset = dataset.withColumn(pred_struct_col, pred_struct)
if rawPredictionColName:
raw_prediction_col_name = self.getOrDefault(self.rawPredictionCol)
if raw_prediction_col_name:
dataset = dataset.withColumn(
rawPredictionColName,
array_to_vector(col(pred_struct_col).rawPrediction),
raw_prediction_col_name,
array_to_vector(getattr(col(pred_struct_col), pred.raw_prediction)),
)
if predictionColName:
prediction_col_name = self.getOrDefault(self.predictionCol)
if prediction_col_name:
dataset = dataset.withColumn(
predictionColName, col(pred_struct_col).prediction
prediction_col_name, getattr(col(pred_struct_col), pred.prediction)
)
if probabilityColName:
probability_col_name = self.getOrDefault(self.probabilityCol)
if probability_col_name:
dataset = dataset.withColumn(
probabilityColName, array_to_vector(col(pred_struct_col).probability)
probability_col_name,
array_to_vector(getattr(col(pred_struct_col), pred.probability)),
)
if pred_contrib_col_name:
dataset = dataset.withColumn(
pred_contrib_col_name,
getattr(col(pred_struct_col), pred.pred_contrib),
)
return dataset.drop(pred_struct_col)

View File

@ -1,3 +1,4 @@
# pylint: disable=protected-access
"""Utilities for processing spark partitions."""
from collections import defaultdict, namedtuple
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
@ -7,8 +8,10 @@ import pandas as pd
from scipy.sparse import csr_matrix
from xgboost.compat import concat
from xgboost import DataIter, DMatrix, QuantileDMatrix
from xgboost import DataIter, DMatrix, QuantileDMatrix, XGBModel
from .._typing import ArrayLike
from ..core import _convert_ntree_limit
from .utils import get_logger # type: ignore
@ -331,3 +334,29 @@ def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
assert dvalid.num_col() == dtrain.num_col()
return dtrain, dvalid
def pred_contribs(
model: XGBModel,
data: ArrayLike,
base_margin: Optional[ArrayLike] = None,
strict_shape: bool = False,
) -> np.ndarray:
"""Predict contributions with data with the full model."""
iteration_range = _convert_ntree_limit(model.get_booster(), None, None)
iteration_range = model._get_iteration_range(iteration_range)
data_dmatrix = DMatrix(
data,
base_margin=base_margin,
missing=model.missing,
nthread=model.n_jobs,
feature_types=model.feature_types,
enable_categorical=model.enable_categorical,
)
return model.get_booster().predict(
data_dmatrix,
pred_contribs=True,
validate_features=False,
iteration_range=iteration_range,
strict_shape=strict_shape,
)

View File

@ -85,3 +85,19 @@ class HasQueryIdCol(Params):
"query id column name",
typeConverter=TypeConverters.toString,
)
class HasContribPredictionCol(Params):
"""
Mixin for param pred_contrib_col: contribution prediction column name.
Output is a 3-dim array, with (rows, groups, columns + 1) for classification case.
Else, it can be a 2 dimension for regression case.
"""
pred_contrib_col: "Param[str]" = Param(
Params._dummy(),
"pred_contrib_col",
"feature contributions to individual predictions.",
typeConverter=TypeConverters.toString,
)

View File

@ -8,6 +8,7 @@ from typing import Generator, Sequence, Type
import numpy as np
import pytest
from xgboost.spark.data import pred_contribs
import xgboost as xgb
from xgboost import testing as tm
@ -158,6 +159,103 @@ def reg_with_weight(
)
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test"))
@pytest.fixture
def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
y = np.array([0, 1])
reg1 = xgb.XGBRegressor()
reg1.fit(X, y)
predt0 = reg1.predict(X)
pred_contrib0: np.ndarray = pred_contribs(reg1, X, None, False)
# convert np array to pyspark dataframe
reg_df_train_data = [
(Vectors.dense(X[0, :]), int(y[0])),
(Vectors.sparse(3, {1: float(X[1, 1]), 2: float(X[1, 2])}), int(y[1])),
]
reg_df_train = spark.createDataFrame(reg_df_train_data, ["features", "label"])
reg_df_test = spark.createDataFrame(
[
(
Vectors.dense(X[0, :]),
float(predt0[0]),
pred_contrib0[0, :].tolist(),
),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
float(predt0[1]),
pred_contrib0[1, :].tolist(),
),
],
[
"features",
"expected_prediction",
"expected_pred_contribs",
],
)
yield RegData(reg_df_train, reg_df_test)
MultiClfData = namedtuple("MultiClfData", ("multi_clf_df_train", "multi_clf_df_test"))
@pytest.fixture
def multi_clf_data(spark: SparkSession) -> Generator[MultiClfData, None, None]:
X = np.array([[1.0, 2.0, 3.0], [1.0, 2.0, 4.0], [0.0, 1.0, 5.5], [-1.0, -2.0, 1.0]])
y = np.array([0, 0, 1, 2])
cls1 = xgb.XGBClassifier()
cls1.fit(X, y)
predt0 = cls1.predict(X)
proba0: np.ndarray = cls1.predict_proba(X)
pred_contrib0: np.ndarray = pred_contribs(cls1, X, None, False)
# convert np array to pyspark dataframe
multi_cls_df_train_data = [
(Vectors.dense(X[0, :]), int(y[0])),
(Vectors.dense(X[1, :]), int(y[1])),
(Vectors.sparse(3, {1: float(X[2, 1]), 2: float(X[2, 2])}), int(y[2])),
(Vectors.dense(X[3, :]), int(y[3])),
]
multi_clf_df_train = spark.createDataFrame(
multi_cls_df_train_data, ["features", "label"]
)
multi_clf_df_test = spark.createDataFrame(
[
(
Vectors.dense(X[0, :]),
float(predt0[0]),
proba0[0, :].tolist(),
pred_contrib0[0, :].tolist(),
),
(
Vectors.dense(X[1, :]),
float(predt0[1]),
proba0[1, :].tolist(),
pred_contrib0[1, :].tolist(),
),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
float(predt0[2]),
proba0[2, :].tolist(),
pred_contrib0[2, :].tolist(),
),
],
[
"features",
"expected_prediction",
"expected_probability",
"expected_pred_contribs",
],
)
yield MultiClfData(multi_clf_df_train, multi_clf_df_test)
ClfWithWeight = namedtuple(
"ClfWithWeight",
(
@ -264,10 +362,12 @@ def clf_data(spark: SparkSession) -> Generator[ClfData, None, None]:
cl1.fit(X, y)
predt0 = cl1.predict(X)
proba0: np.ndarray = cl1.predict_proba(X)
cl2 = xgb.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4)
pred_contrib0: np.ndarray = pred_contribs(cl1, X, None, True)
cl2 = xgb.XGBClassifier(**cls_params)
cl2.fit(X, y)
predt1 = cl2.predict(X)
proba1: np.ndarray = cl2.predict_proba(X)
pred_contrib1: np.ndarray = pred_contribs(cl2, X, None, True)
# convert np array to pyspark dataframe
cls_df_train_data = [
@ -286,23 +386,29 @@ def clf_data(spark: SparkSession) -> Generator[ClfData, None, None]:
Vectors.dense(X[0, :]),
int(predt0[0]),
proba0[0, :].tolist(),
pred_contrib0[0, :].tolist(),
int(predt1[0]),
proba1[0, :].tolist(),
pred_contrib1[0, :].tolist(),
),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
int(predt0[1]),
proba0[1, :].tolist(),
pred_contrib0[1, :].tolist(),
int(predt1[1]),
proba1[1, :].tolist(),
pred_contrib1[1, :].tolist(),
),
],
[
"features",
"expected_prediction",
"expected_probability",
"expected_pred_contribs",
"expected_prediction_with_params",
"expected_probability_with_params",
"expected_pred_contribs_with_params",
],
)
yield ClfData(cls_params, cls_df_train, cls_df_train_large, cls_df_test)
@ -331,6 +437,16 @@ def get_params_map(params_kv: dict, estimator: Type) -> dict:
class TestPySparkLocal:
def test_regressor_basic(self, reg_data: RegData) -> None:
regressor = SparkXGBRegressor(pred_contrib_col="pred_contribs")
model = regressor.fit(reg_data.reg_df_train)
pred_result = model.transform(reg_data.reg_df_test).collect()
for row in pred_result:
np.testing.assert_equal(row.prediction, row.expected_prediction)
np.testing.assert_allclose(
row.pred_contribs, row.expected_pred_contribs, atol=1e-3
)
def test_regressor_with_weight_eval(self, reg_with_weight: RegWithWeight) -> None:
# with weight
regressor_with_weight = SparkXGBRegressor(weight_col="weight")
@ -385,6 +501,19 @@ class TestPySparkLocal:
atol=1e-3,
)
def test_multi_classifier_basic(self, multi_clf_data: MultiClfData) -> None:
cls = SparkXGBClassifier(pred_contrib_col="pred_contribs")
model = cls.fit(multi_clf_data.multi_clf_df_train)
pred_result = model.transform(multi_clf_data.multi_clf_df_test).collect()
for row in pred_result:
np.testing.assert_equal(row.prediction, row.expected_prediction)
np.testing.assert_allclose(
row.probability, row.expected_probability, rtol=1e-3
)
np.testing.assert_allclose(
row.pred_contribs, row.expected_pred_contribs, atol=1e-3
)
def test_classifier_with_weight_eval(self, clf_with_weight: ClfWithWeight) -> None:
# with weight
classifier_with_weight = SparkXGBClassifier(weight_col="weight")
@ -459,13 +588,18 @@ class TestPySparkLocal:
assert_model_compatible(model, tmpdir)
def test_classifier_basic(self, clf_data: ClfData) -> None:
classifier = SparkXGBClassifier()
classifier = SparkXGBClassifier(
**clf_data.cls_params, pred_contrib_col="pred_contrib"
)
model = classifier.fit(clf_data.cls_df_train)
pred_result = model.transform(clf_data.cls_df_test).collect()
for row in pred_result:
np.testing.assert_equal(row.prediction, row.expected_prediction)
np.testing.assert_equal(row.prediction, row.expected_prediction_with_params)
np.testing.assert_allclose(
row.probability, row.expected_probability, rtol=1e-3
row.probability, row.expected_probability_with_params, rtol=1e-3
)
np.testing.assert_equal(
row.pred_contrib, row.expected_pred_contribs_with_params
)
def test_classifier_with_params(self, clf_data: ClfData) -> None:
@ -648,24 +782,6 @@ class XgboostLocalTest(SparkTestCase):
# >>> cl.fit(X, y)
# >>> cl.predict_proba(np.array([[1.0, 2.0, 3.0]]))
# array([[0.5374299 , 0.23128504, 0.23128504]], dtype=float32)
multi_cls_df_train_data = [
(Vectors.dense(1.0, 2.0, 3.0), 0),
(Vectors.dense(1.0, 2.0, 4.0), 0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
(Vectors.dense(-1.0, -2.0, 1.0), 2),
]
self.multi_cls_df_train = self.session.createDataFrame(
multi_cls_df_train_data, ["features", "label"]
)
self.multi_cls_df_train_large = self.session.createDataFrame(
multi_cls_df_train_data * 100, ["features", "label"]
)
self.multi_cls_df_test = self.session.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), [0.5374, 0.2312, 0.2312]),
],
["features", "expected_probability"],
)
# Test classifier with both base margin and without
# >>> import numpy as np
@ -872,24 +988,6 @@ class XgboostLocalTest(SparkTestCase):
== "float64"
)
def test_regressor_basic(self):
regressor = SparkXGBRegressor()
model = regressor.fit(self.reg_df_train)
pred_result = model.transform(self.reg_df_test).collect()
for row in pred_result:
self.assertTrue(
np.isclose(row.prediction, row.expected_prediction, atol=1e-3)
)
def test_multi_classifier(self):
classifier = SparkXGBClassifier()
model = classifier.fit(self.multi_cls_df_train)
pred_result = model.transform(self.multi_cls_df_test).collect()
for row in pred_result:
self.assertTrue(
np.allclose(row.probability, row.expected_probability, rtol=1e-3)
)
def test_regressor_with_params(self):
regressor = SparkXGBRegressor(**self.reg_params)
all_params = dict(