"""
Collection of examples for using xgboost.spark estimator interface
==================================================================

@author: Weichen Xu
"""
import sklearn.datasets
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
from sklearn.model_selection import train_test_split

from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor

# Seed shared by the train/test splits and the random validation-indicator
# column, so repeated runs of this script print the same metrics.
SEED = 1

# Fraction of each dataset held out as the test set.
TEST_SIZE = 0.3

# Boolean column passed to the estimators via ``validation_indicator_col``.
VALIDATION_INDICATOR_COL = "validationIndicatorCol"

spark = SparkSession.builder.master("local[*]").getOrCreate()


def create_spark_df(X, y):
    """Convert feature rows and labels into a Spark DataFrame.

    Each output row is ``(Vectors.dense(features), float(label))`` under the
    column names ``features`` and ``label``. Uses the module-level ``spark``
    session.

    :param X: iterable of feature rows (e.g. a 2-D NumPy array).
    :param y: iterable of labels, paired with ``X`` element-wise via ``zip``
        (any extra trailing elements of the longer input are ignored).
    :return: a ``pyspark.sql.DataFrame`` with ``features`` and ``label`` columns.
    """
    return spark.createDataFrame(
        spark.sparkContext.parallelize(
            [(Vectors.dense(features), float(label)) for features, label in zip(X, y)]
        ),
        ["features", "label"],
    )


def _split_to_spark_dfs(X, y):
    """Shuffle-split ``(X, y)`` reproducibly and return ``(train_df, test_df)``."""
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, shuffle=True, random_state=SEED
    )
    return create_spark_df(X_train, y_train), create_spark_df(X_test, y_test)


def _with_validation_indicator(df):
    """Return ``df`` plus a boolean column flagging about 30% of rows for validation."""
    # rand() samples uniformly from [0, 1), so ``> 0.7`` marks roughly 30% of rows.
    return df.withColumn(VALIDATION_INDICATOR_COL, rand(SEED) > 0.7)


def _run_regression_examples():
    """Fit SparkXGBRegressor on the diabetes dataset and print test-set RMSE.

    Trains once on the full training split and once with a random subset of
    the training rows marked as a validation set.
    """
    train_df, test_df = _split_to_spark_dfs(
        *sklearn.datasets.load_diabetes(return_X_y=True)
    )
    evaluator = RegressionEvaluator(metricName="rmse")

    model = SparkXGBRegressor(max_depth=5).fit(train_df)
    print(f"regressor rmse={evaluator.evaluate(model.transform(test_df))}")

    # Rows whose indicator column is True are handed to XGBoost as validation data.
    model2 = SparkXGBRegressor(
        max_depth=5, validation_indicator_col=VALIDATION_INDICATOR_COL
    ).fit(_with_validation_indicator(train_df))
    print(f"regressor2 rmse={evaluator.evaluate(model2.transform(test_df))}")


def _run_classification_examples():
    """Fit SparkXGBClassifier on the iris dataset and print test-set F1.

    Trains once on the full training split and once with a random subset of
    the training rows marked as a validation set.
    """
    train_df, test_df = _split_to_spark_dfs(
        *sklearn.datasets.load_iris(return_X_y=True)
    )
    evaluator = MulticlassClassificationEvaluator(metricName="f1")

    model = SparkXGBClassifier(max_depth=5).fit(train_df)
    print(f"classifier f1={evaluator.evaluate(model.transform(test_df))}")

    # Rows whose indicator column is True are handed to XGBoost as validation data.
    model2 = SparkXGBClassifier(
        max_depth=5, validation_indicator_col=VALIDATION_INDICATOR_COL
    ).fit(_with_validation_indicator(train_df))
    print(f"classifier2 f1={evaluator.evaluate(model2.transform(test_df))}")


try:
    _run_regression_examples()
    _run_classification_examples()
finally:
    # Shut down the local Spark session even if one of the examples raises.
    spark.stop()