diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index 3491a63cc..bd49e108c 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -135,10 +135,35 @@ object XGBoost extends Serializable {
       nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
       useExternalMemory: Boolean = false, missing: Float = Float.NaN,
       inputCol: String = "features", labelCol: String = "label"): XGBoostModel = {
+    require(nWorkers > 0, "you must specify more than 0 workers")
     new XGBoostEstimator(inputCol, labelCol, params, round, nWorkers, obj, eval,
       useExternalMemory, missing).fit(trainingData)
   }
 
+  /**
+   *
+   * @param trainingData the training set represented as an RDD
+   * @param configMap Map containing the configuration entries
+   * @param round the number of iterations
+   * @param nWorkers the number of xgboost workers, 0 by default which means that the number of
+   *                 workers equals the partition number of the trainingData RDD
+   * @param obj the user-defined objective function, null by default
+   * @param eval the user-defined evaluation function, null by default
+   * @param useExternalMemory indicate whether to use external memory cache; by setting this flag
+   *                          to true, the user may save RAM cost when running XGBoost within Spark
+   * @param missing the value representing the missing value in the dataset
+   * @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training fails
+   * @return the trained XGBoostModel
+   */
+  @deprecated(since = "0.7", message = "this method is deprecated since 0.7, users are encouraged" +
+    " to switch to trainWithRDD")
+  def train(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
+      nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
+      useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
+    require(nWorkers > 0, "you must specify more than 0 workers")
+    trainWithRDD(trainingData, configMap, round, nWorkers, obj, eval, useExternalMemory, missing)
+  }
+
   /**
    *
    * @param trainingData the trainingset represented as RDD
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala
index 7070109b8..eb81e0c22 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala
@@ -42,8 +42,8 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
   /**
    * evaluate XGBoostModel with a RDD-wrapped dataset
    *
-   * NOTE: you have to specify value of either eval or iter; when you specify both, this adopts
-   * the default eval metric of model
+   * NOTE: you have to specify value of either eval or iter; when you specify both, this method
+   * adopts the default eval metric of the model
    *
    * @param evalDataset the dataset used for evaluation
    * @param evalName the name of evaluation
@@ -159,7 +159,7 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
     testSet.mapPartitions { testSamples =>
       if (testSamples.hasNext) {
         val dMatrix = new DMatrix(new JDMatrix(testSamples, null))
-        Iterator(broadcastBooster.value.predictLeaf(dMatrix, 0))
+        Iterator(broadcastBooster.value.predictLeaf(dMatrix))
       } else {
         Iterator()
       }
@@ -186,6 +186,37 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
     defaultCopy(extra)
   }
 
+  /**
+   * appends the leaf index of each row as an additional column in the original dataset
+   *
+   * @return the original dataframe with an additional column containing the predicted leaf indices
+   */
+  def transformLeaf(testSet: Dataset[_]): DataFrame = {
+    outputCol = "predLeaf"
+    transformSchema(testSet.schema, logging = true)
+    val broadcastBooster = testSet.sparkSession.sparkContext.broadcast(_booster)
+    val instances = testSet.rdd.mapPartitions {
+      rowIterator =>
+        if (rowIterator.hasNext) {
+          val (rowItr1, rowItr2) = rowIterator.duplicate
+          val vectorIterator = rowItr2.map(row => row.asInstanceOf[Row].getAs[Vector](inputCol)).
+            toList.iterator
+          import DataUtils._
+          val testDataset = new DMatrix(vectorIterator, null)
+          val rowPredictResults = broadcastBooster.value.predictLeaf(testDataset)
+          val predictResults = rowPredictResults.map(prediction => Row(prediction)).iterator
+          rowItr1.zip(predictResults).map {
+            case (originalColumns: Row, predictColumn: Row) =>
+              Row.fromSeq(originalColumns.toSeq ++ predictColumn.toSeq)
+          }
+        } else {
+          Iterator[Row]()
+        }
+    }
+    testSet.sparkSession.createDataFrame(instances, testSet.schema.add(outputCol, outputType)).
+      cache()
+  }
+
   /**
    * produces the prediction results and append as an additional column in the original dataset
    * NOTE: the prediction results is kept as the original format of xgboost
@@ -201,13 +232,13 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
    * NOTE: the prediction results is transformed by applying the transformation function
    * predictResultTrans to the original xgboost output
    *
-   * @param predictResultTrans the function to transform xgboost output to the expected format
+   * @param rawPredictTransformer the function to transform xgboost output to the expected format
    * @return the original dataframe with an additional column containing prediction results
    */
-  def transform(testSet: Dataset[_], predictResultTrans: Option[Array[Float] => DataType]):
+  def transform(testSet: Dataset[_], rawPredictTransformer: Option[Array[Float] => DataType]):
     DataFrame = {
     transformSchema(testSet.schema, logging = true)
-    val broadcastBooster = testSet.sqlContext.sparkContext.broadcast(_booster)
+    val broadcastBooster = testSet.sparkSession.sparkContext.broadcast(_booster)
     val instances = testSet.rdd.mapPartitions {
       rowIterator =>
         if (rowIterator.hasNext) {
@@ -218,8 +249,9 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
           val testDataset = new DMatrix(vectorIterator, null)
           val rowPredictResults = broadcastBooster.value.predict(testDataset)
           val predictResults = {
-            if (predictResultTrans.isDefined) {
-              rowPredictResults.map(prediction => Row(predictResultTrans.get(prediction))).iterator
+            if (rawPredictTransformer.isDefined) {
+              rowPredictResults.map(prediction =>
+                Row(rawPredictTransformer.get(prediction))).iterator
             } else {
               rowPredictResults.map(prediction => Row(prediction)).iterator
             }
@@ -232,7 +264,7 @@ class XGBoostModel(_booster: Booster) extends Model[XGBoostModel] with Serializa
           Iterator[Row]()
         }
     }
-    testSet.sqlContext.createDataFrame(instances, testSet.schema.add("prediction", outputType)).
+    testSet.sparkSession.createDataFrame(instances, testSet.schema.add(outputCol, outputType)).
       cache()
   }
 
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index f02496096..956bf859c 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -133,13 +133,12 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("test eval functions with RDD") {
-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = buildTrainingRDD(sc).cache()
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "0",
       "objective" -> "binary:logistic").toMap
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers)
-    val evalFunc = new EvalError
     xgBoostModel.eval(trainingRDD, "eval1", iter = 5, useExternalCache = false)
-    xgBoostModel.eval(trainingRDD, "eval2", evalFunc = evalFunc, useExternalCache = false)
+    xgBoostModel.eval(trainingRDD, "eval2", evalFunc = new EvalError, useExternalCache = false)
  }
 
   test("test prediction functionality with empty partition") {
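
Reviewer note (not part of the patch): the deprecated train overload above simply forwards to trainWithRDD, so migration is mechanical. A minimal sketch follows, assuming xgboost4j-spark 0.7 on the classpath, that the caller prepares the LabeledPoint RDD, and that LabeledPoint is the MLlib class the RDD-based API consumes; parameter values mirror the test suite in this diff.

// Migration sketch (hypothetical, not part of this patch): trains with the
// non-deprecated entry point. `trainingRDD` is assumed to be prepared by the
// caller; the parameter map mirrors XGBoostGeneralSuite above.
import ml.dmlc.xgboost4j.scala.spark.{XGBoost, XGBoostModel}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

object TrainMigrationExample {
  def train(trainingRDD: RDD[LabeledPoint]): XGBoostModel = {
    val paramMap = List(
      "eta" -> "1",
      "max_depth" -> "2",
      "objective" -> "binary:logistic").toMap
    // Before (now deprecated, warns and forwards to trainWithRDD):
    //   XGBoost.train(trainingRDD, paramMap, 5, 2)
    // After (same arguments, same behaviour):
    XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, nWorkers = 2)
  }
}

Since both entry points now enforce require(nWorkers > 0), callers relying on the old "0 means one worker per partition" default must pass an explicit worker count.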
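
A usage sketch for the new transformLeaf, under two stated assumptions: that its return type is the augmented DataFrame (as its Scaladoc describes; the fix above adjusts the signature accordingly) and that the model's features column is already configured.

// Usage sketch (hypothetical, not part of this patch) for transformLeaf.
// The "predLeaf" column name comes from the method body in the diff above.
import ml.dmlc.xgboost4j.scala.spark.XGBoostModel
import org.apache.spark.sql.DataFrame

object LeafIndexExample {
  def withLeafIndices(model: XGBoostModel, testDF: DataFrame): DataFrame = {
    // Appends a "predLeaf" column: for each row, the index of the leaf the
    // row falls into in every tree of the booster.
    val augmented = model.transformLeaf(testDF)
    augmented.select("predLeaf").show(5) // inspect a few leaf assignments
    augmented
  }
}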