[jvm-packages] Integration with Spark Dataframe/Dataset (#1559)

* bump up to scala 2.11

* framework of data frame integration

* test consistency between RDD and DataFrame

* order preservation

* test order preservation

* example code and fix makefile

* improve type checking

* improve APIs

* user docs

* work around travis CI's limitation on log length

* adjust test structure

* integrate with Spark 1.x

* spark 2.x integration

* remove spark 1.x implementation but provide instructions on how to downgrade
This commit is contained in:
Nan Zhu
2016-09-11 15:02:58 -04:00
committed by GitHub
parent 7ff742ebf7
commit fb02797e2a
15 changed files with 625 additions and 139 deletions

View File

@@ -18,10 +18,9 @@ package ml.dmlc.xgboost4j.scala.spark
import scala.collection.JavaConverters._
import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector}
import org.apache.spark.mllib.regression.{LabeledPoint => SparkLabeledPoint}
import ml.dmlc.xgboost4j.LabeledPoint
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.{LabeledPoint => SparkLabeledPoint}
object DataUtils extends Serializable {
implicit def fromSparkPointsToXGBoostPointsJava(sps: Iterator[SparkLabeledPoint])

View File

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.{SparkContext, TaskContext}
object XGBoost extends Serializable {
@@ -111,6 +112,33 @@ object XGBoost extends Serializable {
}.cache()
}
/**
 * Trains an XGBoost model against a Spark DataFrame/Dataset.
 *
 * @param trainingData the training set represented as a DataFrame
 * @param params Map containing the parameters to configure XGBoost
 * @param round the number of iterations
 * @param nWorkers the number of xgboost workers, 0 by default which means that the number of
 *                 workers equals to the partition number of the trainingData RDD
 * @param obj the user-defined objective function, null by default
 * @param eval the user-defined evaluation function, null by default
 * @param useExternalMemory indicate whether to use the external memory cache; setting this flag
 *                          to true may save RAM when running XGBoost within Spark
 * @param missing the value representing a missing value in the dataset
 * @param inputCol the name of the input column, "features" by default
 * @param labelCol the name of the label column, "label" by default
 * @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
 * @return XGBoostModel when successful training
 */
@throws(classOf[XGBoostError])
def trainWithDataFrame(trainingData: Dataset[_],
    params: Map[String, Any], round: Int,
    nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
    useExternalMemory: Boolean = false, missing: Float = Float.NaN,
    inputCol: String = "features", labelCol: String = "label"): XGBoostModel = {
  // Delegate to the ML-pipeline estimator, which converts the Dataset into an
  // RDD[LabeledPoint] and runs the distributed training.
  val estimator = new XGBoostEstimator(
    inputCol, labelCol, params, round, nWorkers, obj, eval, useExternalMemory, missing)
  estimator.fit(trainingData)
}
/**
*
* @param trainingData the trainingset represented as RDD
@@ -127,9 +155,9 @@ object XGBoost extends Serializable {
* @return XGBoostModel when successful training
*/
@throws(classOf[XGBoostError])
def train(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
def trainWithRDD(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
require(nWorkers > 0, "you must specify more than 0 workers")
val tracker = new RabitTracker(nWorkers)
implicit val sc = trainingData.sparkContext

View File

@@ -0,0 +1,81 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait}
import org.apache.spark.ml.{Predictor, Estimator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{NumericType, DoubleType, StructType}
import org.apache.spark.sql.{DataFrame, TypedColumn, Dataset, Row}
/**
 * the estimator wrapping XGBoost to produce a training model
 *
 * @param inputCol the name of the input (feature) column
 * @param labelCol the name of the label column
 * @param xgboostParams the parameters configuring XGBoost
 * @param round the number of iterations to train
 * @param nWorkers the total number of workers of xgboost
 * @param obj the customized objective function, default to be null and using the default in model
 * @param eval the customized eval function, default to be null and using the default in model
 * @param useExternalMemory whether to use external memory when training
 * @param missing the value taken as missing
 */
class XGBoostEstimator(
    inputCol: String, labelCol: String,
    xgboostParams: Map[String, Any], round: Int, nWorkers: Int,
    obj: ObjectiveTrait = null,
    eval: EvalTrait = null, useExternalMemory: Boolean = false, missing: Float = Float.NaN)
  extends Estimator[XGBoostModel] {

  override val uid: String = Identifiable.randomUID("XGBoostEstimator")

  /**
   * produce a XGBoostModel by fitting the given dataset
   */
  def fit(trainingSet: Dataset[_]): XGBoostModel = {
    // Validate the schema BEFORE building the RDD so that a wrongly-typed input or
    // label column fails fast here, not later inside the distributed job.
    transformSchema(trainingSet.schema, logging = true)
    // The label column may be any NumericType; normalize it to Double for LabeledPoint.
    val instances = trainingSet.select(
      col(inputCol), col(labelCol).cast(DoubleType)).rdd.map {
      case Row(feature: Vector, label: Double) =>
        LabeledPoint(label, feature)
    }
    val trainedModel = XGBoost.trainWithRDD(instances, xgboostParams, round, nWorkers, obj,
      eval, useExternalMemory, missing).setParent(this)
    copyValues(trainedModel)
  }

  override def copy(extra: ParamMap): Estimator[XGBoostModel] = {
    defaultCopy(extra)
  }

  override def transformSchema(schema: StructType): StructType = {
    // check input type, for now we only support VectorUDT as the input feature type
    val inputType = schema(inputCol).dataType
    // message fixed: previously read "has to VectorUDT" (missing "be"), inconsistent
    // with the equivalent check in XGBoostModel.transformSchema
    require(inputType.equals(new VectorUDT),
      s"the type of input column $inputCol has to be VectorUDT")
    // check label type: any NumericType is accepted and cast to Double in fit()
    val labelType = schema(labelCol).dataType
    require(labelType.isInstanceOf[NumericType], s"the type of label column $labelCol has to" +
      s" be NumericType")
    schema
  }
}

View File

@@ -16,16 +16,28 @@
package ml.dmlc.xgboost4j.scala.spark
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{TaskContext, SparkContext}
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.rdd.RDD
import ml.dmlc.xgboost4j.java.{Rabit, DMatrix => JDMatrix}
import ml.dmlc.xgboost4j.scala.{EvalTrait, Booster, DMatrix}
import scala.collection.JavaConverters._
class XGBoostModel(_booster: Booster) extends Serializable {
import ml.dmlc.xgboost4j.java.{DMatrix => JDMatrix, Rabit}
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, EvalTrait}
import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ml.{Model, PredictionModel}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.linalg.{VectorUDT, DenseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.{SparkContext, TaskContext}
class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializable {
var inputCol = "features"
var outputCol = "prediction"
var outputType: DataType = ArrayType(elementType = FloatType, containsNull = false)
/**
* evaluate XGBoostModel with a RDD-wrapped dataset
@@ -40,6 +52,7 @@ class XGBoostModel(_booster: Booster) extends Serializable {
eval: EvalTrait,
evalName: String,
useExternalCache: Boolean = false): String = {
val broadcastBooster = evalDataset.sparkContext.broadcast(_booster)
val appName = evalDataset.context.appName
val allEvalMetrics = evalDataset.mapPartitions {
labeledPointsPartition =>
@@ -55,7 +68,7 @@ class XGBoostModel(_booster: Booster) extends Serializable {
}
}
val dMatrix = new DMatrix(labeledPointsPartition, cacheFileName)
val predictions = _booster.predict(dMatrix)
val predictions = broadcastBooster.value.predict(dMatrix)
Rabit.shutdown()
Iterator(Some(eval.eval(predictions, dMatrix)))
} else {
@@ -152,8 +165,71 @@ class XGBoostModel(_booster: Booster) extends Serializable {
outputStream.close()
}
/**
 * Get the native Booster instance wrapped by this model.
 */
def booster: Booster = _booster
// Unique identifier required by Spark ML's Identifiable contract.
override val uid: String = Identifiable.randomUID("XGBoostModel")
// Copy this model (and any extra params) via the default ML param-copy machinery.
override def copy(extra: ParamMap): XGBoostModel = {
defaultCopy(extra)
}
/**
 * Produces the prediction results and appends them as an additional column of the
 * original dataset.
 * NOTE: the prediction results are kept in the raw format produced by xgboost
 * @return the original dataframe with an additional column containing prediction results
 */
override def transform(testSet: Dataset[_]): DataFrame = transform(testSet, None)
/**
 * produces the prediction results and append as an additional column in the original dataset
 * NOTE: the prediction results is transformed by applying the transformation function
 * predictResultTrans to the original xgboost output
 * @param predictResultTrans the function to transform xgboost output to the expected format;
 *                           NOTE(review): its declared result type is DataType (a schema type),
 *                           which looks unintended for a value transformation — verify
 * @return the original dataframe with an additional column containing prediction results
 */
def transform(testSet: Dataset[_], predictResultTrans: Option[Array[Float] => DataType]):
DataFrame = {
transformSchema(testSet.schema, logging = true)
// ship the booster to executors once via broadcast instead of serializing it per task
val broadcastBooster = testSet.sqlContext.sparkContext.broadcast(_booster)
val instances = testSet.rdd.mapPartitions {
rowIterator =>
if (rowIterator.hasNext) {
// duplicate the iterator: one copy feeds the DMatrix, the other keeps the original
// rows so they can be zipped with the predictions in the same order
val (rowItr1, rowItr2) = rowIterator.duplicate
// materialize the feature vectors into a List first — presumably so that building
// the DMatrix does not advance the shared underlying iterator of rowItr1 (duplicated
// iterators share their source); TODO confirm
val vectorIterator = rowItr2.map(row => row.asInstanceOf[Row].getAs[Vector](inputCol)).
toList.iterator
import DataUtils._
val testDataset = new DMatrix(vectorIterator, null)
val rowPredictResults = broadcastBooster.value.predict(testDataset)
// wrap each per-row prediction (an Array[Float]) into a single-column Row,
// optionally transformed by the caller-supplied function first
val predictResults = {
if (predictResultTrans.isDefined) {
rowPredictResults.map(prediction => Row(predictResultTrans.get(prediction))).iterator
} else {
rowPredictResults.map(prediction => Row(prediction)).iterator
}
}
// zip relies on xgboost returning exactly one prediction per input row, in order
rowItr1.zip(predictResults).map {
case (originalColumns: Row, predictColumn: Row) =>
Row.fromSeq(originalColumns.toSeq ++ predictColumn.toSeq)
}
} else {
// empty partition: nothing to predict
Iterator[Row]()
}
}
// append the "prediction" column to the schema; cached — presumably to avoid
// recomputing the (expensive) prediction pass on downstream actions
testSet.sqlContext.createDataFrame(instances, testSet.schema.add("prediction", outputType)).
cache()
}
@DeveloperApi
override def transformSchema(schema: StructType): StructType = {
  // refuse to overwrite a pre-existing column of the same name
  if (schema.fieldNames.contains(outputCol)) {
    throw new IllegalArgumentException(s"Output column $outputCol already exists.")
  }
  // only VectorUDT features are supported for prediction
  val featureType = schema(inputCol).dataType
  require(featureType.equals(new VectorUDT),
    s"the type of input column $inputCol has to be VectorUDT")
  // append the prediction column to the incoming schema
  schema.add(StructField(outputCol, outputType, nullable = false))
}
}