[jvm-packages] Tutorial of XGBoost4J-Spark (#3534)

* add back train method but mark as deprecated

* add back train method but mark as deprecated

* fix scalastyle error

* fix scalastyle error

* add new

* update doc

* finish Gang Scheduling

* more

* intro

* Add sections: Prediction, Model persistence and ML pipeline.

* Add XGBoost4j-Spark MLlib pipeline example

* partial finished version

* finish the doc

* adjust code

* fix the doc

* use rst

* Convert XGBoost4J-Spark tutorial to reST

* Bring XGBoost4J up to date

* add note about using hdfs

* remove duplicate file

* fix descriptions

* update doc

* Wrap HDFS/S3 export support as a note

* update

* wrap indexing_mode example in code block
This commit is contained in:
Nan Zhu
2018-08-03 21:17:50 -07:00
committed by Philip Hyunsu Cho
parent 34dc9155ab
commit 31d1baba3d
8 changed files with 761 additions and 323 deletions

View File

@@ -0,0 +1,131 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.example.spark
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}
// this example works with Iris dataset (https://archive.ics.uci.edu/ml/datasets/iris)
// Full Spark ML pipeline example on the Iris dataset
// (https://archive.ics.uci.edu/ml/datasets/iris):
// assemble features, index the string label, train an XGBoost classifier,
// map predictions back to string labels, evaluate, tune with cross
// validation, and persist both the native booster and the whole pipeline.
object SparkMLlibPipeline {

  def main(args: Array[String]): Unit = {
    // Three positional arguments are read below (args(0)..args(2));
    // the original guard tested `args.length != 1`, which let a one- or
    // two-argument invocation crash with ArrayIndexOutOfBoundsException.
    if (args.length != 3) {
      println("Usage: SparkMLlibPipeline input_path native_model_path pipeline_model_path")
      sys.exit(1)
    }

    val inputPath = args(0)
    val nativeModelPath = args(1)
    val pipelineModelPath = args(2)

    val spark = SparkSession
      .builder()
      .appName("XGBoost4J-Spark Pipeline Example")
      .getOrCreate()

    // Load dataset with an explicit schema (the Iris CSV has no header row).
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    val rawInput = spark.read.schema(schema).csv(inputPath)

    // Split training and test dataset (fixed seed for reproducibility).
    val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)

    // Build ML pipeline, it includes 4 stages:
    // 1. Assemble all features into a single vector column.
    // 2. From string label to indexed double label.
    // 3. Use XGBoostClassifier to train classification model.
    // 4. Convert indexed double label back to original string label.
    val assembler = new VectorAssembler()
      .setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
      .setOutputCol("features")
    val labelIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(training)
    // Point the classifier at the assembled features and the indexed label;
    // without setLabelCol the estimator would look for the default "label"
    // column, which does not exist in this pipeline (the sibling
    // SparkTraining example sets both columns the same way).
    val booster = new XGBoostClassifier(
      Map("eta" -> 0.1f,
        "max_depth" -> 2,
        "objective" -> "multi:softprob",
        "num_class" -> 3,
        "num_round" -> 100,
        "num_workers" -> 2
      )
    ).setFeaturesCol("features")
      .setLabelCol("classIndex")
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("realLabel")
      .setLabels(labelIndexer.labels)

    val pipeline = new Pipeline()
      .setStages(Array(assembler, labelIndexer, booster, labelConverter))
    val model = pipeline.fit(training)

    // Batch prediction on the held-out test split.
    val prediction = model.transform(test)
    prediction.show(false)

    // Model evaluation.
    // NOTE(review): MulticlassClassificationEvaluator defaults to the "f1"
    // metric, not accuracy, although the message below says "accuracy" —
    // confirm intent or call setMetricName("accuracy").
    val evaluator = new MulticlassClassificationEvaluator()
    val accuracy = evaluator.evaluate(prediction)
    println("The model accuracy is : " + accuracy)

    // Tune model using cross validation over max_depth and eta.
    val paramGrid = new ParamGridBuilder()
      .addGrid(booster.maxDepth, Array(3, 8))
      .addGrid(booster.eta, Array(0.2, 0.6))
      .build()
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)
    val cvModel = cv.fit(training)

    // Stage 2 of the winning pipeline is the trained XGBoost model.
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
      .asInstanceOf[XGBoostClassificationModel]
    println("The params of best XGBoostClassification model : " +
      bestModel.extractParamMap())
    println("The training summary of best XGBoostClassificationModel : " +
      bestModel.summary)

    // Export the XGBoostClassificationModel as local XGBoost model,
    // then you can load it back in local Python environment.
    bestModel.nativeBooster.saveModel(nativeModelPath)

    // ML pipeline persistence.
    model.write.overwrite().save(pipelineModelPath)

    // Load the saved pipeline back and serve predictions with it.
    val model2 = PipelineModel.load(pipelineModelPath)
    model2.transform(test).show(false)
  }
}

View File

@@ -1,206 +0,0 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.example.spark
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
// One parsed row of the sales/training CSV (see parseTrainingFile below).
case class SalesRecord(storeId: Int, daysOfWeek: Int, date: String, sales: Int, customers: Int,
open: Int, promo: Int, stateHoliday: String, schoolHoliday: String)
// One parsed row of the store metadata CSV (see parseStoreFile below).
// Missing numeric fields are represented by the sentinel value -1.
case class Store(storeId: Int, storeType: String, assortment: String, competitionDistance: Int,
competitionOpenSinceMonth: Int, competitionOpenSinceYear: Int, promo2: Int,
promo2SinceWeek: Int, promo2SinceYear: Int, promoInterval: String)
// Rossmann-style sales forecasting example: parse the sales and store CSVs,
// join them, engineer features with a Spark ML pipeline, and tune an
// XGBoost regressor on a train/validation split.
object SparkModelTuningTool {
// Parses the store metadata CSV into Store instances.
// Missing numeric fields become -1. A quoted promoInterval such as
// "Jan,Apr,Jul,Oct" is split on its internal commas, yielding 13 columns
// instead of 10 — the second branch re-joins those four month fields.
// Any parse failure prints the stack trace and terminates the JVM.
private def parseStoreFile(storeFilePath: String): List[Store] = {
var isHeader = true
val storeInstances = new ListBuffer[Store]
for (line <- Source.fromFile(storeFilePath).getLines()) {
if (isHeader) {
// skip the header row
isHeader = false
} else {
try {
val strArray = line.split(",")
if (strArray.length == 10) {
// unquoted promoInterval: exactly one field per column
val Array(storeIdStr, storeTypeStr, assortmentStr, competitionDistanceStr,
competitionOpenSinceMonthStr, competitionOpenSinceYearStr, promo2Str,
promo2SinceWeekStr, promo2SinceYearStr, promoIntervalStr) = line.split(",")
storeInstances += Store(storeIdStr.toInt, storeTypeStr, assortmentStr,
if (competitionDistanceStr == "") -1 else competitionDistanceStr.toInt,
if (competitionOpenSinceMonthStr == "" ) -1 else competitionOpenSinceMonthStr.toInt,
if (competitionOpenSinceYearStr == "" ) -1 else competitionOpenSinceYearStr.toInt,
promo2Str.toInt,
if (promo2Str == "0") -1 else promo2SinceWeekStr.toInt,
if (promo2Str == "0") -1 else promo2SinceYearStr.toInt,
promoIntervalStr.replace("\"", ""))
} else {
// quoted promoInterval was split into four month fields; re-join them
// (and strip the surrounding quotes from the first and last pieces)
val Array(storeIdStr, storeTypeStr, assortmentStr, competitionDistanceStr,
competitionOpenSinceMonthStr, competitionOpenSinceYearStr, promo2Str,
promo2SinceWeekStr, promo2SinceYearStr, firstMonth, secondMonth, thirdMonth,
forthMonth) = line.split(",")
storeInstances += Store(storeIdStr.toInt, storeTypeStr, assortmentStr,
if (competitionDistanceStr == "") -1 else competitionDistanceStr.toInt,
if (competitionOpenSinceMonthStr == "" ) -1 else competitionOpenSinceMonthStr.toInt,
if (competitionOpenSinceYearStr == "" ) -1 else competitionOpenSinceYearStr.toInt,
promo2Str.toInt,
if (promo2Str == "0") -1 else promo2SinceWeekStr.toInt,
if (promo2Str == "0") -1 else promo2SinceYearStr.toInt,
firstMonth.replace("\"", "") + "," + secondMonth + "," + thirdMonth + "," +
forthMonth.replace("\"", ""))
}
} catch {
case e: Exception =>
// NOTE(review): a single malformed line aborts the whole job
e.printStackTrace()
sys.exit(1)
}
}
}
storeInstances.toList
}
// Parses the sales/training CSV into SalesRecord instances.
// Assumes every non-header line has exactly 9 comma-separated fields;
// a line with a different field count throws a MatchError.
private def parseTrainingFile(trainingPath: String): List[SalesRecord] = {
var isHeader = true
val records = new ListBuffer[SalesRecord]
for (line <- Source.fromFile(trainingPath).getLines()) {
if (isHeader) {
// skip the header row
isHeader = false
} else {
val Array(storeIdStr, daysOfWeekStr, dateStr, salesStr, customerStr, openStr, promoStr,
stateHolidayStr, schoolHolidayStr) = line.split(",")
val salesRecord = SalesRecord(storeIdStr.toInt, daysOfWeekStr.toInt, dateStr,
salesStr.toInt, customerStr.toInt, openStr.toInt, promoStr.toInt, stateHolidayStr,
schoolHolidayStr)
records += salesRecord
}
}
records.toList
}
// Feature engineering:
//  - index the string categorical columns,
//  - keep only open stores with positive sales,
//  - derive day/month/year from the "yyyy-MM-dd"-style date column,
//  - add the regression target log(sales) as "logSales",
//  - impute non-positive competitionDistance with the column mean,
//  - assemble everything into a "features" vector and drop raw columns.
private def featureEngineering(ds: DataFrame): DataFrame = {
import org.apache.spark.sql.functions._
import ds.sparkSession.implicits._
val stateHolidayIndexer = new StringIndexer()
.setInputCol("stateHoliday")
.setOutputCol("stateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("schoolHoliday")
.setOutputCol("schoolHolidayIndex")
val storeTypeIndexer = new StringIndexer()
.setInputCol("storeType")
.setOutputCol("storeTypeIndex")
val assortmentIndexer = new StringIndexer()
.setInputCol("assortment")
.setOutputCol("assortmentIndex")
val promoInterval = new StringIndexer()
.setInputCol("promoInterval")
.setOutputCol("promoIntervalIndex")
// drop closed-store and zero-sales rows (log(0) would be undefined below)
val filteredDS = ds.filter($"sales" > 0).filter($"open" > 0)
// parse the date column into numeric day/month/year columns
val dsWithDayCol =
filteredDS.withColumn("day", udf((dateStr: String) =>
dateStr.split("-")(2).toInt).apply(col("date")))
val dsWithMonthCol =
dsWithDayCol.withColumn("month", udf((dateStr: String) =>
dateStr.split("-")(1).toInt).apply(col("date")))
val dsWithYearCol =
dsWithMonthCol.withColumn("year", udf((dateStr: String) =>
dateStr.split("-")(0).toInt).apply(col("date")))
// regression target: natural log of the sales count
val dsWithLogSales = dsWithYearCol.withColumn("logSales",
udf((sales: Int) => math.log(sales)).apply(col("sales")))
// impute missing (-1 sentinel) competitionDistance with the column mean
val meanCompetitionDistance = dsWithLogSales.select(avg("competitionDistance")).first()(0).
asInstanceOf[Double]
println("====" + meanCompetitionDistance)
val finalDS = dsWithLogSales.withColumn("transformedCompetitionDistance",
udf((distance: Int) => if (distance > 0) distance.toDouble else meanCompetitionDistance).
apply(col("competitionDistance")))
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("storeId", "daysOfWeek", "promo", "competitionDistance", "promo2", "day",
"month", "year", "transformedCompetitionDistance", "stateHolidayIndex",
"schoolHolidayIndex", "storeTypeIndex", "assortmentIndex", "promoIntervalIndex"))
.setOutputCol("features")
val pipeline = new Pipeline().setStages(
Array(stateHolidayIndexer, schoolHolidayIndexer, storeTypeIndexer, assortmentIndexer,
promoInterval, vectorAssembler))
// run the pipeline, then drop the raw columns that were folded into "features"
// NOTE(review): "promoInterval" appears twice in the drop list (harmless)
pipeline.fit(finalDS).transform(finalDS).
drop("stateHoliday", "schoolHoliday", "storeType", "assortment", "promoInterval", "sales",
"promo2SinceWeek", "customers", "promoInterval", "competitionOpenSinceYear",
"competitionOpenSinceMonth", "promo2SinceYear", "competitionDistance", "date")
}
// Tunes num_round and eta on an 80/20 train/validation split.
// NOTE(review): despite its name this uses TrainValidationSplit,
// not k-fold cross validation.
private def crossValidation(
xgboostParam: Map[String, Any],
trainingData: Dataset[_]): TrainValidationSplitModel = {
val xgbEstimator = new XGBoostRegressor(xgboostParam).setFeaturesCol("features").
setLabelCol("logSales")
val paramGrid = new ParamGridBuilder()
.addGrid(xgbEstimator.numRound, Array(20, 50))
.addGrid(xgbEstimator.eta, Array(0.1, 0.4))
.build()
val tv = new TrainValidationSplit()
.setEstimator(xgbEstimator)
.setEvaluator(new RegressionEvaluator().setLabelCol("logSales"))
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8) // 80% of the rows train each candidate, 20% validate
tv.fit(trainingData)
}
// Entry point. args(0): sales/training CSV path; args(1): store CSV path.
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().appName("rosseman").getOrCreate()
import sparkSession.implicits._
// parse training file to data frame
val trainingPath = args(0)
val allSalesRecords = parseTrainingFile(trainingPath)
// create dataset
val salesRecordsDF = allSalesRecords.toDF
// parse store file to data frame
val storeFilePath = args(1)
val allStores = parseStoreFile(storeFilePath)
val storesDS = allStores.toDF()
// join the two sources on storeId
val fullDataset = salesRecordsDF.join(storesDS, "storeId")
val featureEngineeredDF = featureEngineering(fullDataset)
// XGBoost parameters for the tuned regressor.
// NOTE(review): "ntreelimit" looks like the prediction-time parameter
// ("ntree_limit") rather than a training parameter — confirm intent.
val params = new mutable.HashMap[String, Any]()
params += "eta" -> 0.1
params += "max_depth" -> 6
params += "silent" -> 1
params += "ntreelimit" -> 1000
params += "objective" -> "reg:linear"
params += "subsample" -> 0.8
params += "num_round" -> 100
// NOTE(review): the tuned model is computed but never used or persisted
val bestModel = crossValidation(params.toMap, featureEngineeredDF)
}
}

View File

@@ -0,0 +1,78 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.example.spark
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
// this example works with Iris dataset (https://archive.ics.uci.edu/ml/datasets/iris)
// Minimal XGBoost4J-Spark training example on the Iris dataset
// (https://archive.ics.uci.edu/ml/datasets/iris).
object SparkTraining {

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      // scalastyle:off
      println("Usage: program input_path")
      // scalastyle:on  (was left open in the original)
      sys.exit(1)
    }

    val spark = SparkSession.builder().getOrCreate()
    val inputPath = args(0)

    // The Iris CSV has no header row, so declare the schema explicitly.
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    // Use the named `inputPath` val (the original re-read args(0) here).
    val rawInput = spark.read.schema(schema).csv(inputPath)

    // Transform the string class label into a numeric index,
    // as required by XGBoost.
    val stringIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(rawInput)
    val labelTransformed = stringIndexer.transform(rawInput).drop("class")

    // Compose all feature columns into a single vector column.
    val vectorAssembler = new VectorAssembler().
      setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
      setOutputCol("features")
    val xgbInput = vectorAssembler.transform(labelTransformed).select("features",
      "classIndex")

    /**
     * Optional settings (not used in this example):
     *  - "timeout_request_workers" -> 60000L makes this application fail if it
     *    cannot get enough resources for 2 workers within 60000 ms;
     *  - "checkpoint_path" -> "/checkpoints" together with
     *    "checkpoint_interval" -> 2 saves a checkpoint every two iterations.
     */
    val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)
    val xgbClassifier = new XGBoostClassifier(xgbParam).
      setFeaturesCol("features").
      setLabelCol("classIndex")

    // Train on the full dataset, then print predictions for the same data.
    val xgbClassificationModel = xgbClassifier.fit(xgbInput)
    val results = xgbClassificationModel.transform(xgbInput)
    results.show()
  }
}

View File

@@ -1,55 +0,0 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.example.spark
import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
// Binary-classification example: train an XGBoost classifier on a
// LibSVM-formatted training set and print predictions for a test set.
object SparkWithDataFrame {

  def main(args: Array[String]): Unit = {
    // Require all four positional arguments up front.
    if (args.length != 4) {
      println(
        "usage: program num_of_rounds num_workers training_path test_path")
      sys.exit(1)
    }

    // Spark session with Kryo serialization; the Booster class is
    // registered so that trained models serialize efficiently.
    val conf = new SparkConf().setAppName("XGBoost-spark-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Booster]))
    val session = SparkSession.builder().config(conf).getOrCreate()

    // Command-line arguments: boosting rounds, worker count, data paths.
    val rounds = args(0).toInt
    val workers = args(1).toInt
    val trainPath = args(2)
    val testPath = args(3)

    // Load the LibSVM-formatted training and test sets as DataFrames.
    def loadLibSvm(path: String) =
      session.sqlContext.read.format("libsvm").load(path)
    val trainingSet = loadLibSvm(trainPath)
    val testSet = loadLibSvm(testPath)

    // Train a binary logistic model with a fixed parameter set.
    val params = Map(
      "eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "binary:logistic",
      "num_round" -> rounds,
      "num_workers" -> workers)
    val model = new XGBoostClassifier(params).fit(trainingSet)

    // xgboost-spark appends the column containing prediction results.
    model.transform(testSet).show()
  }
}