Refactor PySpark tests. (#8605)

- Convert classifier tests to pytest tests.
- Replace hardcoded expected values with results computed from single-node XGBoost at test time.
Jiaming Yuan 2023-01-04 17:05:16 +08:00 committed by GitHub
parent fa44a33ee6
commit d308124910

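The heart of the change is the new `clf_data` fixture below: expected predictions are no longer hardcoded (see the REPL transcript deleted from `setUp` further down) but are computed at test time from a single-node `xgb.XGBClassifier`. A minimal sketch of the pattern, with an illustrative fixture name (`clf_expectations`) that is not taken from the diff:

import numpy as np
import pytest
import xgboost as xgb

@pytest.fixture
def clf_expectations():
    # Train the reference model on the driver; no Spark needed for this part.
    X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
    y = np.array([0, 1])
    ref = xgb.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4)
    ref.fit(X, y)
    # The single-node predictions define the expected output of the
    # distributed estimator, so no values are pasted in by hand.
    yield X, y, ref.predict(X), ref.predict_proba(X)

The Spark tests then compare `SparkXGBClassifier` output against these columns with `np.testing.assert_allclose`, so the expectations track the library instead of a pasted-in transcript.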

@@ -1,9 +1,10 @@
import glob
import logging
import random
import tempfile
import uuid
from collections import namedtuple
from typing import Generator
from typing import Generator, Sequence, Type
import numpy as np
import pytest
@@ -248,6 +249,87 @@ def clf_with_weight(
)
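# Bundles the training params and DataFrames shared by the classifier tests.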
ClfData = namedtuple(
"ClfData", ("cls_params", "cls_df_train", "cls_df_train_large", "cls_df_test")
)
@pytest.fixture
def clf_data(spark: SparkSession) -> Generator[ClfData, None, None]:
cls_params = {"max_depth": 5, "n_estimators": 10, "scale_pos_weight": 4}
X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
y = np.array([0, 1])
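# Train two single-node reference models: cl1 with library defaults and cl2
# with cls_params. Their predictions and probabilities become the expected_*
# columns of cls_df_test below.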
cl1 = xgb.XGBClassifier()
cl1.fit(X, y)
predt0 = cl1.predict(X)
proba0: np.ndarray = cl1.predict_proba(X)
cl2 = xgb.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4)
cl2.fit(X, y)
predt1 = cl2.predict(X)
proba1: np.ndarray = cl2.predict_proba(X)
# Convert the numpy data to PySpark DataFrames
cls_df_train_data = [
(Vectors.dense(X[0, :]), int(y[0])),
(Vectors.sparse(3, {1: float(X[1, 1]), 2: float(X[1, 2])}), int(y[1])),
]
cls_df_train = spark.createDataFrame(cls_df_train_data, ["features", "label"])
cls_df_train_large = spark.createDataFrame(
cls_df_train_data * 100, ["features", "label"]
)
cls_df_test = spark.createDataFrame(
[
(
Vectors.dense(X[0, :]),
int(predt0[0]),
proba0[0, :].tolist(),
int(predt1[0]),
proba1[0, :].tolist(),
),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
int(predt0[1]),
proba0[1, :].tolist(),
int(predt1[1]),
proba1[1, :].tolist(),
),
],
[
"features",
"expected_prediction",
"expected_probability",
"expected_prediction_with_params",
"expected_probability_with_params",
],
)
yield ClfData(cls_params, cls_df_train, cls_df_train_large, cls_df_test)
def assert_model_compatible(model: XGBModel, model_path: str) -> None:
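"""Check that the model saved by Spark can be read back by the native
Booster and serializes to identical raw JSON."""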
bst = xgb.Booster()
path = glob.glob(f"{model_path}/**/model/part-00000", recursive=True)[0]
bst.load_model(path)
np.testing.assert_equal(
np.array(model.get_booster().save_raw("json")), np.array(bst.save_raw("json"))
)
def check_sub_dict_match(
sub_dict: dict, whole_dict: dict, excluding_keys: Sequence[str]
) -> None:
# Every key in sub_dict, except those in excluding_keys, must appear in
# whole_dict with the same value.
for k in sub_dict:
if k not in excluding_keys:
assert k in whole_dict, f"check on {k} failed"
assert sub_dict[k] == whole_dict[k], f"check on {k} failed"
def get_params_map(params_kv: dict, estimator: Type) -> dict:
return {getattr(estimator, k): v for k, v in params_kv.items()}
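# For example, get_params_map({"max_depth": 5}, clf) returns {clf.max_depth: 5},
# the ParamMap form that Pipeline.copy(extra=...) expects in the pipeline tests.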
class TestPySparkLocal:
def test_regressor_with_weight_eval(self, reg_with_weight: RegWithWeight) -> None:
# with weight
@@ -350,10 +432,161 @@ class TestPySparkLocal:
)
for row in pred_result_with_weight_eval:
np.testing.assert_allclose( # failed
np.testing.assert_allclose(
row.probability, row.expected_prob_with_weight_and_eval, atol=1e-3
)
def test_classifier_model_save_load(self, clf_data: ClfData) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = "file:" + tmpdir
clf = SparkXGBClassifier(**clf_data.cls_params)
model = clf.fit(clf_data.cls_df_train)
model.save(path)
loaded_model = SparkXGBClassifierModel.load(path)
assert model.uid == loaded_model.uid
for k, v in clf_data.cls_params.items():
assert loaded_model.getOrDefault(k) == v
pred_result = loaded_model.transform(clf_data.cls_df_test).collect()
for row in pred_result:
np.testing.assert_allclose(
row.probability, row.expected_probability_with_params, atol=1e-3
)
with pytest.raises(AssertionError, match="Expected class name"):
SparkXGBRegressorModel.load(path)
assert_model_compatible(model, tmpdir)
def test_classifier_basic(self, clf_data: ClfData) -> None:
classifier = SparkXGBClassifier()
model = classifier.fit(clf_data.cls_df_train)
pred_result = model.transform(clf_data.cls_df_test).collect()
for row in pred_result:
np.testing.assert_equal(row.prediction, row.expected_prediction)
np.testing.assert_allclose(
row.probability, row.expected_probability, rtol=1e-3
)
def test_classifier_with_params(self, clf_data: ClfData) -> None:
classifier = SparkXGBClassifier(**clf_data.cls_params)
all_params = dict(
**(classifier._gen_xgb_params_dict()),
**(classifier._gen_fit_params_dict()),
**(classifier._gen_predict_params_dict()),
)
check_sub_dict_match(
clf_data.cls_params, all_params, excluding_keys=_non_booster_params
)
model = classifier.fit(clf_data.cls_df_train)
all_params = dict(
**(model._gen_xgb_params_dict()),
**(model._gen_fit_params_dict()),
**(model._gen_predict_params_dict()),
)
check_sub_dict_match(
clf_data.cls_params, all_params, excluding_keys=_non_booster_params
)
pred_result = model.transform(clf_data.cls_df_test).collect()
for row in pred_result:
np.testing.assert_equal(row.prediction, row.expected_prediction_with_params)
np.testing.assert_allclose(
row.probability, row.expected_probability_with_params, rtol=1e-3
)
def test_classifier_model_pipeline_save_load(self, clf_data: ClfData) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = "file:" + tmpdir
classifier = SparkXGBClassifier()
pipeline = Pipeline(stages=[classifier])
pipeline = pipeline.copy(
extra=get_params_map(clf_data.cls_params, classifier)
)
model = pipeline.fit(clf_data.cls_df_train)
model.save(path)
loaded_model = PipelineModel.load(path)
for k, v in clf_data.cls_params.items():
assert loaded_model.stages[0].getOrDefault(k) == v
pred_result = loaded_model.transform(clf_data.cls_df_test).collect()
for row in pred_result:
np.testing.assert_allclose(
row.probability, row.expected_probability_with_params, atol=1e-3
)
assert_model_compatible(model.stages[0], tmpdir)
def test_classifier_with_cross_validator(self, clf_data: ClfData) -> None:
xgb_classifier = SparkXGBClassifier(n_estimators=1)
paramMaps = ParamGridBuilder().addGrid(xgb_classifier.max_depth, [1, 2]).build()
cvBin = CrossValidator(
estimator=xgb_classifier,
estimatorParamMaps=paramMaps,
evaluator=BinaryClassificationEvaluator(),
seed=1,
parallelism=4,
numFolds=2,
)
cvBinModel = cvBin.fit(clf_data.cls_df_train_large)
cvBinModel.transform(clf_data.cls_df_test)
def test_convert_to_sklearn_model_clf(self, clf_data: ClfData) -> None:
classifier = SparkXGBClassifier(
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5
)
clf_model = classifier.fit(clf_data.cls_df_train)
# Check that _convert_to_sklearn_model restores the correct class type
# regardless of the booster's contents.
sklearn_classifier = classifier._convert_to_sklearn_model(
clf_model.get_booster().save_raw("json"),
clf_model.get_booster().save_config(),
)
assert isinstance(sklearn_classifier, XGBClassifier)
assert sklearn_classifier.n_estimators == 200
assert sklearn_classifier.missing == 2.0
assert sklearn_classifier.max_depth == 3
assert sklearn_classifier.get_params()["sketch_eps"] == 0.5
def test_classifier_array_col_as_feature(self, clf_data: ClfData) -> None:
train_dataset = clf_data.cls_df_train.withColumn(
"features", vector_to_array(spark_sql_func.col("features"))
)
test_dataset = clf_data.cls_df_test.withColumn(
"features", vector_to_array(spark_sql_func.col("features"))
)
classifier = SparkXGBClassifier()
model = classifier.fit(train_dataset)
pred_result = model.transform(test_dataset).collect()
for row in pred_result:
np.testing.assert_equal(row.prediction, row.expected_prediction)
np.testing.assert_allclose(
row.probability, row.expected_probability, rtol=1e-3
)
def test_classifier_with_feature_names_types_weights(
self, clf_data: ClfData
) -> None:
classifier = SparkXGBClassifier(
feature_names=["a1", "a2", "a3"],
feature_types=["i", "int", "float"],
feature_weights=[2.0, 5.0, 3.0],
)
model = classifier.fit(clf_data.cls_df_train)
model.transform(clf_data.cls_df_test).collect()
def test_early_stop_param_validation(self, clf_data: ClfData) -> None:
classifier = SparkXGBClassifier(early_stopping_rounds=1)
with pytest.raises(ValueError, match="early_stopping_rounds"):
classifier.fit(clf_data.cls_df_train)
def test_gpu_param_setting(self, clf_data: ClfData) -> None:
py_cls = SparkXGBClassifier(use_gpu=True)
train_params = py_cls._get_distributed_train_params(clf_data.cls_df_train)
assert train_params["tree_method"] == "gpu_hist"
class XgboostLocalTest(SparkTestCase):
def setUp(self):
@@ -406,60 +639,6 @@ class XgboostLocalTest(SparkTestCase):
],
)
# >>> X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
# >>> y = np.array([0, 1])
# >>> cl1 = xgboost.XGBClassifier()
# >>> cl1.fit(X, y)
# >>> cl1.predict(X)
# array([0, 0])
# >>> cl1.predict_proba(X)
# array([[0.5, 0.5],
# [0.5, 0.5]], dtype=float32)
# >>> cl2 = xgboost.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4)
# >>> cl2.fit(X, y)
# >>> cl2.predict(X)
# array([1, 1])
# >>> cl2.predict_proba(X)
# array([[0.27574146, 0.72425854 ],
# [0.27574146, 0.72425854 ]], dtype=float32)
self.cls_params = {"max_depth": 5, "n_estimators": 10, "scale_pos_weight": 4}
cls_df_train_data = [
(Vectors.dense(1.0, 2.0, 3.0), 0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
]
self.cls_df_train = self.session.createDataFrame(
cls_df_train_data, ["features", "label"]
)
self.cls_df_train_large = self.session.createDataFrame(
cls_df_train_data * 100, ["features", "label"]
)
self.cls_df_test = self.session.createDataFrame(
[
(
Vectors.dense(1.0, 2.0, 3.0),
0,
[0.5, 0.5],
1,
[0.27574146, 0.72425854],
),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
0,
[0.5, 0.5],
1,
[0.27574146, 0.72425854],
),
],
[
"features",
"expected_prediction",
"expected_probability",
"expected_prediction_with_params",
"expected_probability_with_params",
],
)
# kwargs test (training on the same data above should give the same results)
self.cls_params_kwargs = {"tree_method": "approx", "sketch_eps": 0.03}
@@ -610,6 +789,22 @@ class XgboostLocalTest(SparkTestCase):
bst.load_model(path)
self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json"))
def test_convert_to_sklearn_model_reg(self) -> None:
regressor = SparkXGBRegressor(
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5
)
reg_model = regressor.fit(self.reg_df_train)
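# As in the classifier variant above, the converted model should restore
# the class type and the booster parameters.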
sklearn_regressor = regressor._convert_to_sklearn_model(
reg_model.get_booster().save_raw("json"),
reg_model.get_booster().save_config(),
)
assert isinstance(sklearn_regressor, XGBRegressor)
assert sklearn_regressor.n_estimators == 200
assert sklearn_regressor.missing == 2.0
assert sklearn_regressor.max_depth == 3
assert sklearn_regressor.get_params()["sketch_eps"] == 0.5
def test_regressor_params_basic(self):
py_reg = SparkXGBRegressor()
self.assertTrue(hasattr(py_reg, "n_estimators"))
@@ -665,11 +860,6 @@ class XgboostLocalTest(SparkTestCase):
):
SparkXGBClassifier(featuresCol="f1")
def test_gpu_param_setting(self):
py_cls = SparkXGBClassifier(use_gpu=True)
train_params = py_cls._get_distributed_train_params(self.cls_df_train)
assert train_params["tree_method"] == "gpu_hist"
@staticmethod
def test_param_value_converter():
py_cls = SparkXGBClassifier(missing=np.float64(1.0), sketch_eps=np.float64(0.3))
@@ -691,16 +881,6 @@ class XgboostLocalTest(SparkTestCase):
np.isclose(row.prediction, row.expected_prediction, atol=1e-3)
)
def test_classifier_basic(self):
classifier = SparkXGBClassifier()
model = classifier.fit(self.cls_df_train)
pred_result = model.transform(self.cls_df_test).collect()
for row in pred_result:
self.assertEqual(row.prediction, row.expected_prediction)
self.assertTrue(
np.allclose(row.probability, row.expected_probability, rtol=1e-3)
)
def test_multi_classifier(self):
classifier = SparkXGBClassifier()
model = classifier.fit(self.multi_cls_df_train)
@@ -710,12 +890,6 @@ class XgboostLocalTest(SparkTestCase):
np.allclose(row.probability, row.expected_probability, rtol=1e-3)
)
def _check_sub_dict_match(self, sub_dist, whole_dict, excluding_keys):
for k in sub_dist:
if k not in excluding_keys:
self.assertTrue(k in whole_dict, f"check on {k} failed")
self.assertEqual(sub_dist[k], whole_dict[k], f"check on {k} failed")
def test_regressor_with_params(self):
regressor = SparkXGBRegressor(**self.reg_params)
all_params = dict(
@@ -723,7 +897,7 @@ class XgboostLocalTest(SparkTestCase):
**(regressor._gen_fit_params_dict()),
**(regressor._gen_predict_params_dict()),
)
self._check_sub_dict_match(
check_sub_dict_match(
self.reg_params, all_params, excluding_keys=_non_booster_params
)
@@ -733,7 +907,7 @@ class XgboostLocalTest(SparkTestCase):
**(model._gen_fit_params_dict()),
**(model._gen_predict_params_dict()),
)
self._check_sub_dict_match(
check_sub_dict_match(
self.reg_params, all_params, excluding_keys=_non_booster_params
)
pred_result = model.transform(self.reg_df_test).collect()
@@ -744,35 +918,6 @@ class XgboostLocalTest(SparkTestCase):
)
)
def test_classifier_with_params(self):
classifier = SparkXGBClassifier(**self.cls_params)
all_params = dict(
**(classifier._gen_xgb_params_dict()),
**(classifier._gen_fit_params_dict()),
**(classifier._gen_predict_params_dict()),
)
self._check_sub_dict_match(
self.cls_params, all_params, excluding_keys=_non_booster_params
)
model = classifier.fit(self.cls_df_train)
all_params = dict(
**(model._gen_xgb_params_dict()),
**(model._gen_fit_params_dict()),
**(model._gen_predict_params_dict()),
)
self._check_sub_dict_match(
self.cls_params, all_params, excluding_keys=_non_booster_params
)
pred_result = model.transform(self.cls_df_test).collect()
for row in pred_result:
self.assertEqual(row.prediction, row.expected_prediction_with_params)
self.assertTrue(
np.allclose(
row.probability, row.expected_probability_with_params, rtol=1e-3
)
)
def test_regressor_model_save_load(self):
tmp_dir = self.get_local_tmp_dir()
path = "file:" + tmp_dir
@@ -797,40 +942,12 @@ class XgboostLocalTest(SparkTestCase):
self.assert_model_compatible(model, tmp_dir)
def test_classifier_model_save_load(self):
tmp_dir = self.get_local_tmp_dir()
path = "file:" + tmp_dir
regressor = SparkXGBClassifier(**self.cls_params)
model = regressor.fit(self.cls_df_train)
model.save(path)
loaded_model = SparkXGBClassifierModel.load(path)
self.assertEqual(model.uid, loaded_model.uid)
for k, v in self.cls_params.items():
self.assertEqual(loaded_model.getOrDefault(k), v)
pred_result = loaded_model.transform(self.cls_df_test).collect()
for row in pred_result:
self.assertTrue(
np.allclose(
row.probability, row.expected_probability_with_params, atol=1e-3
)
)
with self.assertRaisesRegex(AssertionError, "Expected class name"):
SparkXGBRegressorModel.load(path)
self.assert_model_compatible(model, tmp_dir)
@staticmethod
def _get_params_map(params_kv, estimator):
return {getattr(estimator, k): v for k, v in params_kv.items()}
def test_regressor_model_pipeline_save_load(self):
tmp_dir = self.get_local_tmp_dir()
path = "file:" + tmp_dir
regressor = SparkXGBRegressor()
pipeline = Pipeline(stages=[regressor])
pipeline = pipeline.copy(extra=self._get_params_map(self.reg_params, regressor))
pipeline = pipeline.copy(extra=get_params_map(self.reg_params, regressor))
model = pipeline.fit(self.reg_df_train)
model.save(path)
@@ -847,44 +964,6 @@ class XgboostLocalTest(SparkTestCase):
)
self.assert_model_compatible(model.stages[0], tmp_dir)
def test_classifier_model_pipeline_save_load(self):
tmp_dir = self.get_local_tmp_dir()
path = "file:" + tmp_dir
classifier = SparkXGBClassifier()
pipeline = Pipeline(stages=[classifier])
pipeline = pipeline.copy(
extra=self._get_params_map(self.cls_params, classifier)
)
model = pipeline.fit(self.cls_df_train)
model.save(path)
loaded_model = PipelineModel.load(path)
for k, v in self.cls_params.items():
self.assertEqual(loaded_model.stages[0].getOrDefault(k), v)
pred_result = loaded_model.transform(self.cls_df_test).collect()
for row in pred_result:
self.assertTrue(
np.allclose(
row.probability, row.expected_probability_with_params, atol=1e-3
)
)
self.assert_model_compatible(model.stages[0], tmp_dir)
def test_classifier_with_cross_validator(self):
xgb_classifer = SparkXGBClassifier(n_estimators=1)
paramMaps = ParamGridBuilder().addGrid(xgb_classifer.max_depth, [1, 2]).build()
cvBin = CrossValidator(
estimator=xgb_classifer,
estimatorParamMaps=paramMaps,
evaluator=BinaryClassificationEvaluator(),
seed=1,
parallelism=4,
numFolds=2,
)
cvBinModel = cvBin.fit(self.cls_df_train_large)
cvBinModel.transform(self.cls_df_test)
def test_callbacks(self):
from xgboost.callback import LearningRateScheduler
@@ -1003,38 +1082,6 @@ class XgboostLocalTest(SparkTestCase):
classifier = SparkXGBClassifier(use_gpu=True, tree_method="gpu_hist")
classifier = SparkXGBClassifier(use_gpu=True)
def test_convert_to_sklearn_model(self):
classifier = SparkXGBClassifier(
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5
)
clf_model = classifier.fit(self.cls_df_train)
regressor = SparkXGBRegressor(
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5
)
reg_model = regressor.fit(self.reg_df_train)
# Check that regardless of what booster, _convert_to_model converts to the correct class type
sklearn_classifier = classifier._convert_to_sklearn_model(
clf_model.get_booster().save_raw("json"),
clf_model.get_booster().save_config(),
)
assert isinstance(sklearn_classifier, XGBClassifier)
assert sklearn_classifier.n_estimators == 200
assert sklearn_classifier.missing == 2.0
assert sklearn_classifier.max_depth == 3
assert sklearn_classifier.get_params()["sketch_eps"] == 0.5
sklearn_regressor = regressor._convert_to_sklearn_model(
reg_model.get_booster().save_raw("json"),
reg_model.get_booster().save_config(),
)
assert isinstance(sklearn_regressor, XGBRegressor)
assert sklearn_regressor.n_estimators == 200
assert sklearn_regressor.missing == 2.0
assert sklearn_regressor.max_depth == 3
assert sklearn_classifier.get_params()["sketch_eps"] == 0.5
def test_feature_importances(self):
reg1 = SparkXGBRegressor(**self.reg_params)
model = reg1.fit(self.reg_df_train)
@@ -1060,32 +1107,6 @@ class XgboostLocalTest(SparkTestCase):
np.isclose(row.prediction, row.expected_prediction, atol=1e-3)
)
def test_classifier_array_col_as_feature(self):
train_dataset = self.cls_df_train.withColumn(
"features", vector_to_array(spark_sql_func.col("features"))
)
test_dataset = self.cls_df_test.withColumn(
"features", vector_to_array(spark_sql_func.col("features"))
)
classifier = SparkXGBClassifier()
model = classifier.fit(train_dataset)
pred_result = model.transform(test_dataset).collect()
for row in pred_result:
self.assertEqual(row.prediction, row.expected_prediction)
self.assertTrue(
np.allclose(row.probability, row.expected_probability, rtol=1e-3)
)
def test_classifier_with_feature_names_types_weights(self):
classifier = SparkXGBClassifier(
feature_names=["a1", "a2", "a3"],
feature_types=["i", "int", "float"],
feature_weights=[2.0, 5.0, 3.0],
)
model = classifier.fit(self.cls_df_train)
model.transform(self.cls_df_test).collect()
def test_regressor_with_sparse_optim(self):
regressor = SparkXGBRegressor(missing=0.0)
model = regressor.fit(self.reg_df_sparse_train)
@@ -1192,11 +1213,6 @@ class XgboostLocalTest(SparkTestCase):
classifier = SparkXGBClassifier(num_workers=4, tree_method=tree_method)
classifier.fit(data_trans)
def test_early_stop_param_validation(self):
classifier = SparkXGBClassifier(early_stopping_rounds=1)
with pytest.raises(ValueError, match="early_stopping_rounds"):
classifier.fit(self.cls_df_train)
def test_unsupported_params(self):
with pytest.raises(ValueError, match="evals_result"):
SparkXGBClassifier(evals_result={})