[jvm-packages] predictLeaf with Dataframe (#1576)

* add back train method but mark as deprecated

* predictLeaf with Dataset

* fix

* fix
Nan Zhu, 2016-09-15 06:15:47 -04:00 (committed by GitHub)
parent bb388cbb31
commit 4ad648e856
3 changed files with 68 additions and 12 deletions

File: XGBoost.scala

@@ -135,10 +135,35 @@ object XGBoost extends Serializable {
nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
useExternalMemory: Boolean = false, missing: Float = Float.NaN,
inputCol: String = "features", labelCol: String = "label"): XGBoostModel = {
require(nWorkers > 0, "you must specify more than 0 workers")
new XGBoostEstimator(inputCol, labelCol, params, round, nWorkers, obj, eval,
useExternalMemory, missing).fit(trainingData)
}
+ /**
+ *
+ * @param trainingData the training set represented as an RDD
+ * @param configMap Map containing the configuration entries
+ * @param round the number of iterations
+ * @param nWorkers the number of xgboost workers; 0 by default, which means that the number of
+ * workers equals the number of partitions of the trainingData RDD
+ * @param obj the user-defined objective function, null by default
+ * @param eval the user-defined evaluation function, null by default
+ * @param useExternalMemory indicates whether to use the external memory cache; setting this flag
+ * to true may reduce the RAM cost of running XGBoost within Spark
+ * @param missing the value representing a missing value in the dataset
+ * @throws ml.dmlc.xgboost4j.java.XGBoostError when model training fails
+ * @return the trained XGBoostModel
+ */
+ @deprecated(since = "0.7", message = "this method is deprecated since 0.7; users are encouraged" +
+ " to switch to trainWithRDD")
+ def train(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
+ nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
+ useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
+ require(nWorkers > 0, "you must specify more than 0 workers")
+ trainWithRDD(trainingData, configMap, round, nWorkers, obj, eval, useExternalMemory, missing)
+ }
/**
*
* @param trainingData the training set represented as an RDD
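
For orientation, a minimal usage sketch of the deprecation above; trainingRDD (an RDD[LabeledPoint]) and paramMap are assumed placeholders, not part of this commit:

import ml.dmlc.xgboost4j.scala.spark.XGBoost

val paramMap = Map("eta" -> "1", "max_depth" -> "2", "objective" -> "binary:logistic")
// deprecated since 0.7, kept only so existing callers keep compiling:
val oldStyle = XGBoost.train(trainingRDD, paramMap, round = 5, nWorkers = 2)
// the preferred replacement; train now simply forwards here:
val newStyle = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, nWorkers = 2)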

File: XGBoostModel.scala

@@ -42,8 +42,8 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
/**
* evaluate XGBoostModel with an RDD-wrapped dataset
*
- * NOTE: you have to specify value of either eval or iter; when you specify both, this adopts
- * the default eval metric of model
+ * NOTE: you have to specify value of either eval or iter; when you specify both, this method
+ * adopts the default eval metric of model
*
* @param evalDataset the dataset used for evaluation
* @param evalName the name of evaluation
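
In practice the NOTE above means a caller picks exactly one of the two, as the updated test at the bottom of this commit also exercises; xgBoostModel and evalRDD are assumed placeholders:

// one call per metric definition: either an iteration-based default metric
xgBoostModel.eval(evalRDD, "eval1", iter = 5, useExternalCache = false)
// or a user-defined EvalTrait implementation such as EvalError
xgBoostModel.eval(evalRDD, "eval2", evalFunc = new EvalError, useExternalCache = false)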
@@ -159,7 +159,7 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
testSet.mapPartitions { testSamples =>
if (testSamples.hasNext) {
val dMatrix = new DMatrix(new JDMatrix(testSamples, null))
- Iterator(broadcastBooster.value.predictLeaf(dMatrix, 0))
+ Iterator(broadcastBooster.value.predictLeaf(dMatrix))
} else {
Iterator()
}
@@ -186,6 +186,37 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
defaultCopy(extra)
}
+ /**
+ * appends the leaf index of each row as an additional column to the original dataset
+ *
+ * @param testSet the dataset to run leaf prediction on
+ * @return the original dataframe with an additional column containing the predicted leaf indices
+ */
+ def transformLeaf(testSet: Dataset[_]): DataFrame = {
+ outputCol = "predLeaf"
+ transformSchema(testSet.schema, logging = true)
+ val broadcastBooster = testSet.sparkSession.sparkContext.broadcast(_booster)
+ val instances = testSet.rdd.mapPartitions {
+ rowIterator =>
+ if (rowIterator.hasNext) {
+ val (rowItr1, rowItr2) = rowIterator.duplicate
+ val vectorIterator = rowItr2.map(row => row.asInstanceOf[Row].getAs[Vector](inputCol)).
+ toList.iterator
+ import DataUtils._
+ val testDataset = new DMatrix(vectorIterator, null)
+ val rowPredictResults = broadcastBooster.value.predictLeaf(testDataset)
+ val predictResults = rowPredictResults.map(prediction => Row(prediction)).iterator
+ rowItr1.zip(predictResults).map {
+ case (originalColumns: Row, predictColumn: Row) =>
+ Row.fromSeq(originalColumns.toSeq ++ predictColumn.toSeq)
+ }
+ } else {
+ Iterator[Row]()
+ }
+ }
+ testSet.sparkSession.createDataFrame(instances, testSet.schema.add(outputCol, outputType)).
+ cache()
+ }
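
A hedged sketch of the new leaf-prediction entry point; xgBoostModel and testDF (a dataframe whose inputCol holds feature vectors) are assumed placeholders:

// appends a "predLeaf" column holding Booster.predictLeaf output per row
val withLeaves = xgBoostModel.transformLeaf(testDF)
withLeaves.select("predLeaf").show(5)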
/**
* produces the prediction results and appends them as an additional column to the original dataset
* NOTE: the prediction results are kept in the original format of xgboost
@@ -201,13 +232,13 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
* NOTE: the prediction results are transformed by applying the transformation function
* rawPredictTransformer to the original xgboost output
*
- * @param predictResultTrans the function to transform xgboost output to the expected format
+ * @param rawPredictTransformer the function to transform xgboost output to the expected format
* @return the original dataframe with an additional column containing prediction results
*/
- def transform(testSet: Dataset[_], predictResultTrans: Option[Array[Float] => DataType]):
+ def transform(testSet: Dataset[_], rawPredictTransformer: Option[Array[Float] => DataType]):
DataFrame = {
transformSchema(testSet.schema, logging = true)
- val broadcastBooster = testSet.sqlContext.sparkContext.broadcast(_booster)
+ val broadcastBooster = testSet.sparkSession.sparkContext.broadcast(_booster)
val instances = testSet.rdd.mapPartitions {
rowIterator =>
if (rowIterator.hasNext) {
@@ -218,8 +249,9 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
val testDataset = new DMatrix(vectorIterator, null)
val rowPredictResults = broadcastBooster.value.predict(testDataset)
val predictResults = {
- if (predictResultTrans.isDefined) {
- rowPredictResults.map(prediction => Row(predictResultTrans.get(prediction))).iterator
+ if (rawPredictTransformer.isDefined) {
+ rowPredictResults.map(prediction =>
+ Row(rawPredictTransformer.get(prediction))).iterator
} else {
rowPredictResults.map(prediction => Row(prediction)).iterator
}
@@ -232,7 +264,7 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
Iterator[Row]()
}
}
- testSet.sqlContext.createDataFrame(instances, testSet.schema.add("prediction", outputType)).
+ testSet.sparkSession.createDataFrame(instances, testSet.schema.add(outputCol, outputType)).
cache()
}
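
And the corresponding sketch for the prediction path above, with the same placeholders; None keeps xgboost's raw output, while Some(f) would apply f to each raw prediction array before the column is appended:

// raw per-row predictions appended under the configured output column
val predictedDF = xgBoostModel.transform(testDF, None)
predictedDF.show(5)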

File: XGBoostGeneralSuite.scala

@@ -133,13 +133,12 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
}
test("test eval functions with RDD") {
- val trainingRDD = buildTrainingRDD(sc)
+ val trainingRDD = buildTrainingRDD(sc).cache()
val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "0",
"objective" -> "binary:logistic").toMap
val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers)
- val evalFunc = new EvalError
xgBoostModel.eval(trainingRDD, "eval1", iter = 5, useExternalCache = false)
xgBoostModel.eval(trainingRDD, "eval2", evalFunc = evalFunc, useExternalCache = false)
xgBoostModel.eval(trainingRDD, "eval2", evalFunc = new EvalError, useExternalCache = false)
}
test("test prediction functionality with empty partition") {