[PySpark] change the returning model type to string from binary (#8085)

* [PySpark] change the returning model type to string from binary

XGBoost pyspark can be can be accelerated by RAPIDS Accelerator seamlessly by
changing the returning model type from binary to string.
This commit is contained in:
Bobby Wang 2022-07-19 18:39:20 +08:00 committed by GitHub
parent 2365f82750
commit f801d3cf15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 13 deletions

View File

@ -2,7 +2,6 @@
"""Xgboost pyspark integration submodule for core code."""
# pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name
# pylint: disable=too-few-public-methods
import cloudpickle
import numpy as np
import pandas as pd
from scipy.special import expit, softmax # pylint: disable=no-name-in-module
@ -410,12 +409,13 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
def _create_pyspark_model(self, xgb_model):
return self._pyspark_model_cls()(xgb_model)
def _convert_to_sklearn_model(self, booster):
def _convert_to_sklearn_model(self, booster: bytearray, config: str):
xgb_sklearn_params = self._gen_xgb_params_dict(
gen_xgb_sklearn_estimator_param=True
)
sklearn_model = self._xgb_cls()(**xgb_sklearn_params)
sklearn_model._Booster = booster
sklearn_model.load_model(booster)
sklearn_model._Booster.load_config(config)
return sklearn_model
def _query_plan_contains_valid_repartition(self, dataset):
@ -629,16 +629,27 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
context.barrier()
if context.partitionId() == 0:
yield pd.DataFrame(data={"booster_bytes": [cloudpickle.dumps(booster)]})
yield pd.DataFrame(
data={
"config": [booster.save_config()],
"booster": [booster.save_raw("json").decode("utf-8")]
}
)
def _run_job():
ret = (
dataset.mapInPandas(_train_booster, schema="config string, booster string")
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0]
)
return ret[0], ret[1]
(config, booster) = _run_job()
result_ser_booster = (
dataset.mapInPandas(_train_booster, schema="booster_bytes binary")
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0][0]
)
result_xgb_model = self._convert_to_sklearn_model(
cloudpickle.loads(result_ser_booster)
bytearray(booster, "utf-8"),
config
)
return self._copyValues(self._create_pyspark_model(result_xgb_model))

View File

@ -904,7 +904,8 @@ class XgboostLocalTest(SparkTestCase):
# Check that regardless of what booster, _convert_to_model converts to the correct class type
sklearn_classifier = classifier._convert_to_sklearn_model(
clf_model.get_booster()
clf_model.get_booster().save_raw("json"),
clf_model.get_booster().save_config()
)
assert isinstance(sklearn_classifier, XGBClassifier)
assert sklearn_classifier.n_estimators == 200
@ -912,7 +913,10 @@ class XgboostLocalTest(SparkTestCase):
assert sklearn_classifier.max_depth == 3
assert sklearn_classifier.get_params()["sketch_eps"] == 0.5
sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster())
sklearn_regressor = regressor._convert_to_sklearn_model(
reg_model.get_booster().save_raw("json"),
reg_model.get_booster().save_config()
)
assert isinstance(sklearn_regressor, XGBRegressor)
assert sklearn_regressor.n_estimators == 200
assert sklearn_regressor.missing == 2.0