Remove ntree limit in python package. (#8345)
- Remove `ntree_limit`. The parameter has been deprecated since 1.4.0. - The SHAP package compatibility is broken.
This commit is contained in:
@@ -169,7 +169,7 @@ def reg_with_weight(
|
||||
)
|
||||
|
||||
|
||||
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test"))
|
||||
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test", "reg_params"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -181,6 +181,13 @@ def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
|
||||
predt0 = reg1.predict(X)
|
||||
pred_contrib0: np.ndarray = pred_contribs(reg1, X, None, False)
|
||||
|
||||
reg_params = {
|
||||
"max_depth": 5,
|
||||
"n_estimators": 10,
|
||||
"iteration_range": [0, 5],
|
||||
"max_bin": 9,
|
||||
}
|
||||
|
||||
# convert np array to pyspark dataframe
|
||||
reg_df_train_data = [
|
||||
(Vectors.dense(X[0, :]), int(y[0])),
|
||||
@@ -188,26 +195,34 @@ def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
|
||||
]
|
||||
reg_df_train = spark.createDataFrame(reg_df_train_data, ["features", "label"])
|
||||
|
||||
reg2 = xgb.XGBRegressor(max_depth=5, n_estimators=10)
|
||||
reg2.fit(X, y)
|
||||
predt2 = reg2.predict(X, iteration_range=[0, 5])
|
||||
# array([0.22185266, 0.77814734], dtype=float32)
|
||||
|
||||
reg_df_test = spark.createDataFrame(
|
||||
[
|
||||
(
|
||||
Vectors.dense(X[0, :]),
|
||||
float(predt0[0]),
|
||||
pred_contrib0[0, :].tolist(),
|
||||
float(predt2[0]),
|
||||
),
|
||||
(
|
||||
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
|
||||
float(predt0[1]),
|
||||
pred_contrib0[1, :].tolist(),
|
||||
float(predt2[1]),
|
||||
),
|
||||
],
|
||||
[
|
||||
"features",
|
||||
"expected_prediction",
|
||||
"expected_pred_contribs",
|
||||
"expected_prediction_with_params",
|
||||
],
|
||||
)
|
||||
yield RegData(reg_df_train, reg_df_test)
|
||||
yield RegData(reg_df_train, reg_df_test, reg_params)
|
||||
|
||||
|
||||
MultiClfData = namedtuple("MultiClfData", ("multi_clf_df_train", "multi_clf_df_test"))
|
||||
@@ -740,6 +755,76 @@ class TestPySparkLocal:
|
||||
model = classifier.fit(clf_data.cls_df_train)
|
||||
model.transform(clf_data.cls_df_test).collect()
|
||||
|
||||
def test_regressor_model_save_load(self, reg_data: RegData) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = "file:" + tmpdir
|
||||
regressor = SparkXGBRegressor(**reg_data.reg_params)
|
||||
model = regressor.fit(reg_data.reg_df_train)
|
||||
model.save(path)
|
||||
loaded_model = SparkXGBRegressorModel.load(path)
|
||||
assert model.uid == loaded_model.uid
|
||||
for k, v in reg_data.reg_params.items():
|
||||
assert loaded_model.getOrDefault(k) == v
|
||||
|
||||
pred_result = loaded_model.transform(reg_data.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
assert np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
|
||||
with pytest.raises(AssertionError, match="Expected class name"):
|
||||
SparkXGBClassifierModel.load(path)
|
||||
|
||||
assert_model_compatible(model, tmpdir)
|
||||
|
||||
def test_regressor_with_params(self, reg_data: RegData) -> None:
|
||||
regressor = SparkXGBRegressor(**reg_data.reg_params)
|
||||
all_params = dict(
|
||||
**(regressor._gen_xgb_params_dict()),
|
||||
**(regressor._gen_fit_params_dict()),
|
||||
**(regressor._gen_predict_params_dict()),
|
||||
)
|
||||
check_sub_dict_match(
|
||||
reg_data.reg_params, all_params, excluding_keys=_non_booster_params
|
||||
)
|
||||
|
||||
model = regressor.fit(reg_data.reg_df_train)
|
||||
all_params = dict(
|
||||
**(model._gen_xgb_params_dict()),
|
||||
**(model._gen_fit_params_dict()),
|
||||
**(model._gen_predict_params_dict()),
|
||||
)
|
||||
check_sub_dict_match(
|
||||
reg_data.reg_params, all_params, excluding_keys=_non_booster_params
|
||||
)
|
||||
pred_result = model.transform(reg_data.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
assert np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
|
||||
def test_regressor_model_pipeline_save_load(self, reg_data: RegData) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = "file:" + tmpdir
|
||||
regressor = SparkXGBRegressor()
|
||||
pipeline = Pipeline(stages=[regressor])
|
||||
pipeline = pipeline.copy(
|
||||
extra=get_params_map(reg_data.reg_params, regressor)
|
||||
)
|
||||
model = pipeline.fit(reg_data.reg_df_train)
|
||||
model.save(path)
|
||||
|
||||
loaded_model = PipelineModel.load(path)
|
||||
for k, v in reg_data.reg_params.items():
|
||||
assert loaded_model.stages[0].getOrDefault(k) == v
|
||||
|
||||
pred_result = loaded_model.transform(reg_data.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
assert np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
assert_model_compatible(model.stages[0], tmpdir)
|
||||
|
||||
|
||||
class XgboostLocalTest(SparkTestCase):
|
||||
def setUp(self):
|
||||
@@ -918,12 +1003,6 @@ class XgboostLocalTest(SparkTestCase):
|
||||
def get_local_tmp_dir(self):
|
||||
return self.tempdir + str(uuid.uuid4())
|
||||
|
||||
def assert_model_compatible(self, model: XGBModel, model_path: str):
|
||||
bst = xgb.Booster()
|
||||
path = glob.glob(f"{model_path}/**/model/part-00000", recursive=True)[0]
|
||||
bst.load_model(path)
|
||||
self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json"))
|
||||
|
||||
def test_convert_to_sklearn_model_reg(self) -> None:
|
||||
regressor = SparkXGBRegressor(
|
||||
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5
|
||||
@@ -1007,80 +1086,6 @@ class XgboostLocalTest(SparkTestCase):
|
||||
== "float64"
|
||||
)
|
||||
|
||||
def test_regressor_with_params(self):
|
||||
regressor = SparkXGBRegressor(**self.reg_params)
|
||||
all_params = dict(
|
||||
**(regressor._gen_xgb_params_dict()),
|
||||
**(regressor._gen_fit_params_dict()),
|
||||
**(regressor._gen_predict_params_dict()),
|
||||
)
|
||||
check_sub_dict_match(
|
||||
self.reg_params, all_params, excluding_keys=_non_booster_params
|
||||
)
|
||||
|
||||
model = regressor.fit(self.reg_df_train)
|
||||
all_params = dict(
|
||||
**(model._gen_xgb_params_dict()),
|
||||
**(model._gen_fit_params_dict()),
|
||||
**(model._gen_predict_params_dict()),
|
||||
)
|
||||
check_sub_dict_match(
|
||||
self.reg_params, all_params, excluding_keys=_non_booster_params
|
||||
)
|
||||
pred_result = model.transform(self.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
self.assertTrue(
|
||||
np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
)
|
||||
|
||||
def test_regressor_model_save_load(self):
|
||||
tmp_dir = self.get_local_tmp_dir()
|
||||
path = "file:" + tmp_dir
|
||||
regressor = SparkXGBRegressor(**self.reg_params)
|
||||
model = regressor.fit(self.reg_df_train)
|
||||
model.save(path)
|
||||
loaded_model = SparkXGBRegressorModel.load(path)
|
||||
self.assertEqual(model.uid, loaded_model.uid)
|
||||
for k, v in self.reg_params.items():
|
||||
self.assertEqual(loaded_model.getOrDefault(k), v)
|
||||
|
||||
pred_result = loaded_model.transform(self.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
self.assertTrue(
|
||||
np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
)
|
||||
|
||||
with self.assertRaisesRegex(AssertionError, "Expected class name"):
|
||||
SparkXGBClassifierModel.load(path)
|
||||
|
||||
self.assert_model_compatible(model, tmp_dir)
|
||||
|
||||
def test_regressor_model_pipeline_save_load(self):
|
||||
tmp_dir = self.get_local_tmp_dir()
|
||||
path = "file:" + tmp_dir
|
||||
regressor = SparkXGBRegressor()
|
||||
pipeline = Pipeline(stages=[regressor])
|
||||
pipeline = pipeline.copy(extra=get_params_map(self.reg_params, regressor))
|
||||
model = pipeline.fit(self.reg_df_train)
|
||||
model.save(path)
|
||||
|
||||
loaded_model = PipelineModel.load(path)
|
||||
for k, v in self.reg_params.items():
|
||||
self.assertEqual(loaded_model.stages[0].getOrDefault(k), v)
|
||||
|
||||
pred_result = loaded_model.transform(self.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
self.assertTrue(
|
||||
np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
)
|
||||
self.assert_model_compatible(model.stages[0], tmp_dir)
|
||||
|
||||
def test_callbacks(self):
|
||||
from xgboost.callback import LearningRateScheduler
|
||||
|
||||
|
||||
@@ -1,16 +1,24 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import tempfile
|
||||
import uuid
|
||||
from collections import namedtuple
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
import xgboost as xgb
|
||||
from xgboost import testing as tm
|
||||
from xgboost.callback import LearningRateScheduler
|
||||
|
||||
pytestmark = pytest.mark.skipif(**tm.no_spark())
|
||||
|
||||
from typing import Generator
|
||||
|
||||
from pyspark.ml.linalg import Vectors
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor
|
||||
from xgboost.spark.utils import _get_max_num_concurrent_tasks
|
||||
@@ -18,51 +26,119 @@ from xgboost.spark.utils import _get_max_num_concurrent_tasks
|
||||
from .utils import SparkLocalClusterTestCase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def spark() -> Generator[SparkSession, None, None]:
|
||||
config = {
|
||||
"spark.master": "local-cluster[2, 2, 1024]",
|
||||
"spark.python.worker.reuse": "false",
|
||||
"spark.driver.host": "127.0.0.1",
|
||||
"spark.task.maxFailures": "1",
|
||||
"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false",
|
||||
"spark.sql.pyspark.jvmStacktrace.enabled": "true",
|
||||
"spark.cores.max": "4",
|
||||
"spark.task.cpus": "1",
|
||||
"spark.executor.cores": "2",
|
||||
}
|
||||
|
||||
builder = SparkSession.builder.appName("XGBoost PySpark Python API Tests")
|
||||
for k, v in config.items():
|
||||
builder.config(k, v)
|
||||
logging.getLogger("pyspark").setLevel(logging.INFO)
|
||||
sess = builder.getOrCreate()
|
||||
yield sess
|
||||
|
||||
sess.stop()
|
||||
sess.sparkContext.stop()
|
||||
|
||||
|
||||
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test", "reg_params"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
|
||||
reg_params = {"max_depth": 5, "n_estimators": 10, "iteration_range": (0, 5)}
|
||||
|
||||
X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
|
||||
y = np.array([0, 1])
|
||||
|
||||
def custom_lr(boosting_round):
|
||||
return 1.0 / (boosting_round + 1)
|
||||
|
||||
reg1 = xgb.XGBRegressor(callbacks=[LearningRateScheduler(custom_lr)])
|
||||
reg1.fit(X, y)
|
||||
predt1 = reg1.predict(X)
|
||||
# array([0.02406833, 0.97593164], dtype=float32)
|
||||
|
||||
reg2 = xgb.XGBRegressor(max_depth=5, n_estimators=10)
|
||||
reg2.fit(X, y)
|
||||
predt2 = reg2.predict(X, iteration_range=(0, 5))
|
||||
# array([0.22185263, 0.77814734], dtype=float32)
|
||||
|
||||
reg_df_train = spark.createDataFrame(
|
||||
[
|
||||
(Vectors.dense(1.0, 2.0, 3.0), 0),
|
||||
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
|
||||
],
|
||||
["features", "label"],
|
||||
)
|
||||
reg_df_test = spark.createDataFrame(
|
||||
[
|
||||
(Vectors.dense(1.0, 2.0, 3.0), 0.0, float(predt2[0]), float(predt1[0])),
|
||||
(
|
||||
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
|
||||
1.0,
|
||||
float(predt2[1]),
|
||||
float(predt1[1]),
|
||||
),
|
||||
],
|
||||
[
|
||||
"features",
|
||||
"expected_prediction",
|
||||
"expected_prediction_with_params",
|
||||
"expected_prediction_with_callbacks",
|
||||
],
|
||||
)
|
||||
yield RegData(reg_df_train, reg_df_test, reg_params)
|
||||
|
||||
|
||||
class TestPySparkLocalCluster:
|
||||
def test_regressor_basic_with_params(self, reg_data: RegData) -> None:
|
||||
regressor = SparkXGBRegressor(**reg_data.reg_params)
|
||||
model = regressor.fit(reg_data.reg_df_train)
|
||||
pred_result = model.transform(reg_data.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
assert np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
|
||||
def test_callbacks(self, reg_data: RegData) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = os.path.join(tmpdir, str(uuid.uuid4()))
|
||||
|
||||
def custom_lr(boosting_round):
|
||||
return 1.0 / (boosting_round + 1)
|
||||
|
||||
cb = [LearningRateScheduler(custom_lr)]
|
||||
regressor = SparkXGBRegressor(callbacks=cb)
|
||||
|
||||
# Test the save/load of the estimator instead of the model, since
|
||||
# the callbacks param only exists in the estimator but not in the model
|
||||
regressor.save(path)
|
||||
regressor = SparkXGBRegressor.load(path)
|
||||
|
||||
model = regressor.fit(reg_data.reg_df_train)
|
||||
pred_result = model.transform(reg_data.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
assert np.isclose(
|
||||
row.prediction, row.expected_prediction_with_callbacks, atol=1e-3
|
||||
)
|
||||
|
||||
|
||||
class XgboostLocalClusterTestCase(SparkLocalClusterTestCase):
|
||||
def setUp(self):
|
||||
random.seed(2020)
|
||||
|
||||
self.n_workers = _get_max_num_concurrent_tasks(self.session.sparkContext)
|
||||
# The following code use xgboost python library to train xgb model and predict.
|
||||
#
|
||||
# >>> import numpy as np
|
||||
# >>> import xgboost
|
||||
# >>> X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
|
||||
# >>> y = np.array([0, 1])
|
||||
# >>> reg1 = xgboost.XGBRegressor()
|
||||
# >>> reg1.fit(X, y)
|
||||
# >>> reg1.predict(X)
|
||||
# array([8.8363886e-04, 9.9911636e-01], dtype=float32)
|
||||
# >>> def custom_lr(boosting_round, num_boost_round):
|
||||
# ... return 1.0 / (boosting_round + 1)
|
||||
# ...
|
||||
# >>> reg1.fit(X, y, callbacks=[xgboost.callback.reset_learning_rate(custom_lr)])
|
||||
# >>> reg1.predict(X)
|
||||
# array([0.02406833, 0.97593164], dtype=float32)
|
||||
# >>> reg2 = xgboost.XGBRegressor(max_depth=5, n_estimators=10)
|
||||
# >>> reg2.fit(X, y)
|
||||
# >>> reg2.predict(X, ntree_limit=5)
|
||||
# array([0.22185263, 0.77814734], dtype=float32)
|
||||
self.reg_params = {"max_depth": 5, "n_estimators": 10, "ntree_limit": 5}
|
||||
self.reg_df_train = self.session.createDataFrame(
|
||||
[
|
||||
(Vectors.dense(1.0, 2.0, 3.0), 0),
|
||||
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
|
||||
],
|
||||
["features", "label"],
|
||||
)
|
||||
self.reg_df_test = self.session.createDataFrame(
|
||||
[
|
||||
(Vectors.dense(1.0, 2.0, 3.0), 0.0, 0.2219, 0.02406),
|
||||
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1.0, 0.7781, 0.9759),
|
||||
],
|
||||
[
|
||||
"features",
|
||||
"expected_prediction",
|
||||
"expected_prediction_with_params",
|
||||
"expected_prediction_with_callbacks",
|
||||
],
|
||||
)
|
||||
|
||||
# Distributed section
|
||||
# Binary classification
|
||||
@@ -218,42 +294,6 @@ class XgboostLocalClusterTestCase(SparkLocalClusterTestCase):
|
||||
self.reg_best_score_eval = 5.239e-05
|
||||
self.reg_best_score_weight_and_eval = 4.850e-05
|
||||
|
||||
def test_regressor_basic_with_params(self):
|
||||
regressor = SparkXGBRegressor(**self.reg_params)
|
||||
model = regressor.fit(self.reg_df_train)
|
||||
pred_result = model.transform(self.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
self.assertTrue(
|
||||
np.isclose(
|
||||
row.prediction, row.expected_prediction_with_params, atol=1e-3
|
||||
)
|
||||
)
|
||||
|
||||
def test_callbacks(self):
|
||||
from xgboost.callback import LearningRateScheduler
|
||||
|
||||
path = os.path.join(self.tempdir, str(uuid.uuid4()))
|
||||
|
||||
def custom_learning_rate(boosting_round):
|
||||
return 1.0 / (boosting_round + 1)
|
||||
|
||||
cb = [LearningRateScheduler(custom_learning_rate)]
|
||||
regressor = SparkXGBRegressor(callbacks=cb)
|
||||
|
||||
# Test the save/load of the estimator instead of the model, since
|
||||
# the callbacks param only exists in the estimator but not in the model
|
||||
regressor.save(path)
|
||||
regressor = SparkXGBRegressor.load(path)
|
||||
|
||||
model = regressor.fit(self.reg_df_train)
|
||||
pred_result = model.transform(self.reg_df_test).collect()
|
||||
for row in pred_result:
|
||||
self.assertTrue(
|
||||
np.isclose(
|
||||
row.prediction, row.expected_prediction_with_callbacks, atol=1e-3
|
||||
)
|
||||
)
|
||||
|
||||
def test_classifier_distributed_basic(self):
|
||||
classifier = SparkXGBClassifier(num_workers=self.n_workers, n_estimators=100)
|
||||
model = classifier.fit(self.cls_df_train_distributed)
|
||||
@@ -409,7 +449,6 @@ class XgboostLocalClusterTestCase(SparkLocalClusterTestCase):
|
||||
pred_result = model.transform(
|
||||
self.cls_df_test_distributed_lower_estimators
|
||||
).collect()
|
||||
print(pred_result)
|
||||
for row in pred_result:
|
||||
self.assertTrue(np.isclose(row.expected_label, row.prediction, atol=1e-3))
|
||||
self.assertTrue(
|
||||
|
||||
Reference in New Issue
Block a user