diff --git a/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkWithRDD.scala b/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkWithRDD.scala
index 851cffea9..ed3d54fa3 100644
--- a/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkWithRDD.scala
+++ b/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkWithRDD.scala
@@ -16,12 +16,13 @@ package ml.dmlc.xgboost4j.scala.example.spark
 
-import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
-import ml.dmlc.xgboost4j.scala.spark.{DataUtils, XGBoost}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.ml.linalg.{DenseVector => MLDenseVector}
+import ml.dmlc.xgboost4j.scala.Booster
+import ml.dmlc.xgboost4j.scala.spark.XGBoost
+
 import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
+import org.apache.spark.ml.linalg.{DenseVector => MLDenseVector}
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.{SparkConf, SparkContext}
 
 object SparkWithRDD {
   def main(args: Array[String]): Unit = {
@@ -39,11 +40,10 @@ object SparkWithRDD {
     val outputModelPath = args(4)
     // number of iterations
     val numRound = args(0).toInt
-    import DataUtils._
     val trainRDD = MLUtils.loadLibSVMFile(sc, inputTrainPath).map(lp =>
       MLLabeledPoint(lp.label, new MLDenseVector(lp.features.toArray)))
-    val testSet = MLUtils.loadLibSVMFile(sc, inputTestPath).collect().map(
-      lp => new MLDenseVector(lp.features.toArray)).iterator
+    val testSet = MLUtils.loadLibSVMFile(sc, inputTestPath)
+      .map(lp => new MLDenseVector(lp.features.toArray))
     // training parameters
     val paramMap = List(
       "eta" -> 0.1f,
@@ -51,7 +51,7 @@ object SparkWithRDD {
       "objective" -> "binary:logistic").toMap
     val xgboostModel = XGBoost.trainWithRDD(trainRDD, paramMap, numRound,
       nWorkers = args(1).toInt, useExternalMemory = true)
-    xgboostModel.booster.predict(new DMatrix(testSet))
+    xgboostModel.predict(testSet, missingValue = Float.NaN)
     // save model to HDFS path
    xgboostModel.saveModelAsHadoopFile(outputModelPath)
   }
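Note: the example above now keeps the test set distributed instead of collecting it to the driver; XGBoostModel.predict takes an RDD of vectors and builds a DMatrix per partition on the executors. A minimal sketch of the new flow, assuming `model` is the trained XGBoostModel from the example (variable names are illustrative):

    // Distributed prediction; missingValue = Float.NaN treats only NaN cells as missing.
    val testSet = MLUtils.loadLibSVMFile(sc, inputTestPath)
      .map(lp => new MLDenseVector(lp.features.toArray))
    val predictions = model.predict(testSet, missingValue = Float.NaN) // RDD[Array[Float]]
    predictions.take(3).foreach(scores => println(scores.mkString(", ")))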
diff --git a/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala b/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala
index 9ac8c2668..fe34783cd 100644
--- a/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala
+++ b/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala
@@ -16,19 +16,19 @@ package ml.dmlc.xgboost4j.scala.flink
 
-import scala.collection.JavaConverters.asScalaIteratorConverter;
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
 import ml.dmlc.xgboost4j.LabeledPoint
-import ml.dmlc.xgboost4j.java.{RabitTracker, Rabit}
+import ml.dmlc.xgboost4j.java.{Rabit, RabitTracker}
 import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => XGBoostScala}
+
 import org.apache.commons.logging.LogFactory
 import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{DataSet, _}
 import org.apache.flink.ml.common.LabeledVector
 import org.apache.flink.util.Collector
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 object XGBoost {
   /**
@@ -49,8 +49,7 @@ object XGBoost {
       Rabit.init(workerEnvs)
       val mapper = (x: LabeledVector) => {
         val (index, value) = x.vector.toSeq.unzip
-        LabeledPoint.fromSparseVector(x.label.toFloat,
-          index.toArray, value.map(z => z.toFloat).toArray)
+        LabeledPoint(x.label.toFloat, index.toArray, value.map(_.toFloat).toArray)
       }
       val dataIter = for (x <- it.iterator().asScala) yield mapper(x)
       val trainMat = new DMatrix(dataIter, null)
diff --git a/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoostModel.scala b/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoostModel.scala
index b66439c38..46b5689a5 100644
--- a/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoostModel.scala
+++ b/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoostModel.scala
@@ -17,13 +17,12 @@ package ml.dmlc.xgboost4j.scala.flink
 
 import ml.dmlc.xgboost4j.LabeledPoint
-import ml.dmlc.xgboost4j.scala.{DMatrix, Booster}
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala._
+import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
+
+import org.apache.flink.api.scala.{DataSet, _}
 import org.apache.flink.ml.math.Vector
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 class XGBoostModel (booster: Booster) extends Serializable {
   /**
@@ -57,8 +56,7 @@ class XGBoostModel (booster: Booster) extends Serializable {
       (it: Iterator[Vector]) => {
         val mapper = (x: Vector) => {
           val (index, value) = x.toSeq.unzip
-          LabeledPoint.fromSparseVector(0.0f,
-            index.toArray, value.map(z => z.toFloat).toArray)
+          LabeledPoint(0.0f, index.toArray, value.map(_.toFloat).toArray)
         }
         val dataIter = for (x <- it) yield mapper(x)
         val dmat = new DMatrix(dataIter, null)
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala
index 064ab0ea7..dbec929ae 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala
@@ -16,47 +16,55 @@ package ml.dmlc.xgboost4j.scala.spark
 
-import scala.collection.JavaConverters._
+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 
-import ml.dmlc.xgboost4j.LabeledPoint
 import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
-import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 
 object DataUtils extends Serializable {
+  private[spark] implicit class XGBLabeledPointFeatures(
+      val labeledPoint: XGBLabeledPoint
+  ) extends AnyVal {
+    /** Converts the point to [[MLLabeledPoint]]. */
+    private[spark] def asML: MLLabeledPoint = {
+      MLLabeledPoint(labeledPoint.label, labeledPoint.features)
+    }
 
-  implicit def fromSparkPointsToXGBoostPointsJava(sps: Iterator[MLLabeledPoint])
-    : java.util.Iterator[LabeledPoint] = {
-    fromSparkPointsToXGBoostPoints(sps).asJava
-  }
-
-  implicit def fromSparkPointsToXGBoostPoints(sps: Iterator[MLLabeledPoint]):
-      Iterator[LabeledPoint] = {
-    for (p <- sps) yield {
-      p.features match {
-        case denseFeature: DenseVector =>
-          LabeledPoint.fromDenseVector(p.label.toFloat, denseFeature.values.map(_.toFloat))
-        case sparseFeature: SparseVector =>
-          LabeledPoint.fromSparseVector(p.label.toFloat, sparseFeature.indices,
-            sparseFeature.values.map(_.toFloat))
-      }
+    /**
+     * Returns the features of the point as [[org.apache.spark.ml.linalg.Vector]].
+     *
+     * If the point is sparse, the dimensionality of the resulting sparse
+     * vector is [[Int.MaxValue]]. This is the only safe value, since
+     * XGBoost does not store the dimensionality explicitly.
+     */
+    def features: Vector = if (labeledPoint.indices == null) {
+      Vectors.dense(labeledPoint.values.map(_.toDouble))
+    } else {
+      Vectors.sparse(Int.MaxValue, labeledPoint.indices, labeledPoint.values.map(_.toDouble))
     }
   }
 
-  implicit def fromSparkVectorToXGBoostPointsJava(sps: Iterator[Vector])
-    : java.util.Iterator[LabeledPoint] = {
-    fromSparkVectorToXGBoostPoints(sps).asJava
+  private[spark] implicit class MLLabeledPointToXGBLabeledPoint(
+      val labeledPoint: MLLabeledPoint
+  ) extends AnyVal {
+    /** Converts an [[MLLabeledPoint]] to an [[XGBLabeledPoint]]. */
+    def asXGB: XGBLabeledPoint = {
+      labeledPoint.features.asXGB.copy(label = labeledPoint.label.toFloat)
+    }
   }
 
-  implicit def fromSparkVectorToXGBoostPoints(sps: Iterator[Vector])
-    : Iterator[LabeledPoint] = {
-    for (p <- sps) yield {
-      p match {
-        case denseFeature: DenseVector =>
-          LabeledPoint.fromDenseVector(0.0f, denseFeature.values.map(_.toFloat))
-        case sparseFeature: SparseVector =>
-          LabeledPoint.fromSparseVector(0.0f, sparseFeature.indices,
-            sparseFeature.values.map(_.toFloat))
-      }
+  private[spark] implicit class MLVectorToXGBLabeledPoint(val v: Vector) extends AnyVal {
+    /**
+     * Converts a [[Vector]] to a data point with a dummy label.
+     *
+     * This is needed for constructing a [[ml.dmlc.xgboost4j.scala.DMatrix]]
+     * for prediction.
+     */
+    def asXGB: XGBLabeledPoint = v match {
+      case v: DenseVector =>
+        XGBLabeledPoint(0.0f, null, v.values.map(_.toFloat))
+      case v: SparseVector =>
+        XGBLabeledPoint(0.0f, v.indices, v.values.map(_.toFloat))
     }
   }
 }
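Note: DataUtils now exposes value-class implicits instead of the old implicit Iterator conversions. A minimal sketch of how they compose; since the implicits are private[spark], this assumes code living in the ml.dmlc.xgboost4j.scala.spark package:

    import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
    import org.apache.spark.ml.linalg.Vectors
    import DataUtils._

    // Vector -> XGBLabeledPoint with a dummy 0.0f label, e.g. for prediction inputs.
    val dense: XGBLabeledPoint = Vectors.dense(1.0, 0.0, 2.0).asXGB // indices == null
    val sparse: XGBLabeledPoint = Vectors.sparse(3, Array(0, 2), Array(1.0, 2.0)).asXGB

    // Back to Spark ML: a sparse point is rebuilt with size Int.MaxValue, because
    // an XGBLabeledPoint does not record the original dimensionality.
    val ml = sparse.copy(label = 1.0f).asML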
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index 8eb9c2400..ad4dc10e5 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -21,13 +21,13 @@ import scala.collection.mutable
 import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
 import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
 import ml.dmlc.xgboost4j.scala.{XGBoost => SXGBoost, _}
+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
+
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.fs.{FSDataInputStream, Path}
-
-import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
-import org.apache.spark.ml.linalg.SparseVector
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
 import org.apache.spark.{SparkContext, TaskContext}
 
 object TrackerConf {
@@ -52,30 +52,49 @@ object XGBoost extends Serializable {
   private val logger = LogFactory.getLog("XGBoostSpark")
 
   private def fromDenseToSparseLabeledPoints(
-      denseLabeledPoints: Iterator[MLLabeledPoint],
-      missing: Float): Iterator[MLLabeledPoint] = {
+      denseLabeledPoints: Iterator[XGBLabeledPoint],
+      missing: Float): Iterator[XGBLabeledPoint] = {
     if (!missing.isNaN) {
-      denseLabeledPoints.map { case MLLabeledPoint(label, features) =>
-        val dFeatures = features.toDense
-        val indices = new mutable.ArrayBuilder.ofInt()
-        val values = new mutable.ArrayBuilder.ofDouble()
-        for (i <- dFeatures.values.indices) {
-          if (dFeatures.values(i) != missing) {
-            indices += i
-            values += dFeatures.values(i)
-          }
+      denseLabeledPoints.map { labeledPoint =>
+        val indicesBuilder = new mutable.ArrayBuilder.ofInt()
+        val valuesBuilder = new mutable.ArrayBuilder.ofFloat()
+        for ((value, i) <- labeledPoint.values.zipWithIndex if value != missing) {
+          indicesBuilder += (if (labeledPoint.indices == null) i else labeledPoint.indices(i))
+          valuesBuilder += value
         }
-        val sFeatures = new SparseVector(dFeatures.values.length, indices.result(),
-          values.result())
-        MLLabeledPoint(label, sFeatures)
+        labeledPoint.copy(indices = indicesBuilder.result(), values = valuesBuilder.result())
       }
     } else {
       denseLabeledPoints
     }
   }
 
+  private def fromBaseMarginsToArray(baseMargins: Iterator[Float]): Option[Array[Float]] = {
+    val builder = new mutable.ArrayBuilder.ofFloat()
+    var nTotal = 0
+    var nUndefined = 0
+    while (baseMargins.hasNext) {
+      nTotal += 1
+      val baseMargin = baseMargins.next()
+      if (baseMargin.isNaN) {
+        nUndefined += 1  // don't waste space for all-NaNs.
+      } else {
+        builder += baseMargin
+      }
+    }
+    if (nUndefined == nTotal) {
+      None
+    } else if (nUndefined == 0) {
+      Some(builder.result())
+    } else {
+      throw new IllegalArgumentException(
+        s"Encountered a partition with $nUndefined NaN base margin values. " +
+          "If you want to specify base margin, ensure all values are non-NaN.")
+    }
+  }
+
   private[spark] def buildDistributedBoosters(
-      trainingSet: RDD[MLLabeledPoint],
+      trainingSet: RDD[XGBLabeledPoint],
       params: Map[String, Any],
       rabitEnv: java.util.Map[String, String],
       numWorkers: Int,
@@ -83,25 +102,20 @@ object XGBoost extends Serializable {
       obj: ObjectiveTrait,
       eval: EvalTrait,
       useExternalMemory: Boolean,
-      missing: Float,
-      baseMargin: RDD[Float]): RDD[Booster] = {
-    import DataUtils._
-
+      missing: Float): RDD[Booster] = {
     val partitionedTrainingSet = if (trainingSet.getNumPartitions != numWorkers) {
       logger.info(s"repartitioning training set to $numWorkers partitions")
       trainingSet.repartition(numWorkers)
     } else {
       trainingSet
     }
-    val partitionedBaseMargin = Option(baseMargin)
-      .getOrElse(trainingSet.sparkContext.emptyRDD)
-      .repartition(partitionedTrainingSet.getNumPartitions)
+    val partitionedBaseMargin = partitionedTrainingSet.map(_.baseMargin)
     val appName = partitionedTrainingSet.context.appName
     // to workaround the empty partitions in training dataset,
     // this might not be the best efficient implementation, see
     // (https://github.com/dmlc/xgboost/issues/1277)
-    partitionedTrainingSet.zipPartitions(partitionedBaseMargin) { (trainingSamples, baseMargin) =>
-      if (trainingSamples.isEmpty) {
+    partitionedTrainingSet.zipPartitions(partitionedBaseMargin) { (trainingPoints, baseMargins) =>
+      if (trainingPoints.isEmpty) {
        throw new XGBoostError(
          s"detected an empty partition in the training data, partition ID:" +
            s" ${TaskContext.getPartitionId()}")
@@ -114,16 +128,15 @@ object XGBoost extends Serializable {
       }
       rabitEnv.put("DMLC_TASK_ID", TaskContext.getPartitionId().toString)
       Rabit.init(rabitEnv)
-      val partitionItr = fromDenseToSparseLabeledPoints(trainingSamples, missing)
-      val trainingMatrix = new DMatrix(partitionItr, cacheFileName)
+      val trainingMatrix = new DMatrix(
+        fromDenseToSparseLabeledPoints(trainingPoints, missing), cacheFileName)
       try {
+        // TODO: use group attribute from the points.
         if (params.contains("groupData") && params("groupData") != null) {
          trainingMatrix.setGroup(params("groupData").asInstanceOf[Seq[Seq[Int]]](
            TaskContext.getPartitionId()).toArray)
         }
-        if (baseMargin.nonEmpty) {
-          trainingMatrix.setBaseMargin(baseMargin.toArray)
-        }
+        fromBaseMarginsToArray(baseMargins).foreach(trainingMatrix.setBaseMargin)
         val booster = SXGBoost.train(trainingMatrix, params, round,
           watches = Map("train" -> trainingMatrix), obj, eval)
         Iterator(booster)
@@ -199,7 +212,6 @@ object XGBoost extends Serializable {
    * @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
    *                          true, the user may save the RAM cost for running XGBoost within Spark
    * @param missing the value represented the missing value in the dataset
-   * @param baseMargin initial prediction for boosting.
    * @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
    * @return XGBoostModel when successful training
    */
@@ -212,10 +224,9 @@ object XGBoost extends Serializable {
       obj: ObjectiveTrait = null,
       eval: EvalTrait = null,
       useExternalMemory: Boolean = false,
-      missing: Float = Float.NaN,
-      baseMargin: RDD[Float] = null): XGBoostModel = {
+      missing: Float = Float.NaN): XGBoostModel = {
     trainWithRDD(trainingData, params, round, nWorkers, obj, eval, useExternalMemory,
-      missing, baseMargin)
+      missing)
   }
 
   private def overrideParamsAccordingToTaskCPUs(
@@ -257,7 +268,6 @@ object XGBoost extends Serializable {
    * @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
    *                          true, the user may save the RAM cost for running XGBoost within Spark
    * @param missing the value represented the missing value in the dataset
-   * @param baseMargin initial prediction for boosting.
    * @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
    * @return XGBoostModel when successful training
    */
@@ -270,30 +280,46 @@ object XGBoost extends Serializable {
       obj: ObjectiveTrait = null,
       eval: EvalTrait = null,
       useExternalMemory: Boolean = false,
-      missing: Float = Float.NaN,
-      baseMargin: RDD[Float] = null): XGBoostModel = {
+      missing: Float = Float.NaN): XGBoostModel = {
+    import DataUtils._
+    val xgbTrainingData = trainingData.map { case MLLabeledPoint(label, features) =>
+      features.asXGB.copy(label = label.toFloat)
+    }
+    trainDistributed(xgbTrainingData, params, round, nWorkers, obj, eval,
+      useExternalMemory, missing)
+  }
+
+  @throws(classOf[XGBoostError])
+  private[spark] def trainDistributed(
+      trainingData: RDD[XGBLabeledPoint],
+      params: Map[String, Any],
+      round: Int,
+      nWorkers: Int,
+      obj: ObjectiveTrait = null,
+      eval: EvalTrait = null,
+      useExternalMemory: Boolean = false,
+      missing: Float = Float.NaN): XGBoostModel = {
     if (params.contains("tree_method")) {
       require(params("tree_method") != "hist", "xgboost4j-spark does not support fast histogram" +
-        " for now")
+          " for now")
     }
     require(nWorkers > 0, "you must specify more than 0 workers")
     if (obj != null) {
       require(params.get("obj_type").isDefined, "parameter \"obj_type\" is not defined," +
-        " you have to specify the objective type as classification or regression with a" +
-        " customized objective function")
+          " you have to specify the objective type as classification or regression with a" +
+          " customized objective function")
     }
     val trackerConf = params.get("tracker_conf") match {
       case None => TrackerConf()
       case Some(conf: TrackerConf) => conf
       case _ => throw new IllegalArgumentException("parameter \"tracker_conf\" must be an " +
-        "instance of TrackerConf.")
+          "instance of TrackerConf.")
     }
     val tracker = startTracker(nWorkers, trackerConf)
     try {
       val overriddenParams = overrideParamsAccordingToTaskCPUs(params, trainingData.sparkContext)
       val boosters = buildDistributedBoosters(trainingData, overriddenParams,
-        tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing,
-        baseMargin)
+        tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing)
       val sparkJobThread = new Thread() {
         override def run() {
           // force the job
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
index ba7fe1098..94c99d41b 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
@@ -19,23 +19,23 @@ package ml.dmlc.xgboost4j.scala.spark
 import scala.collection.mutable
 
 import ml.dmlc.xgboost4j.scala.spark.params._
-import org.json4s.DefaultFormats
+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 
 import org.apache.spark.ml.Predictor
-import org.apache.spark.ml.feature.LabeledPoint
-import org.apache.spark.ml.linalg.{Vector => MLVector}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.types.FloatType
 import org.apache.spark.sql.{Dataset, Row}
+import org.json4s.DefaultFormats
 
 /**
  * XGBoost Estimator to produce a XGBoost model
  */
 class XGBoostEstimator private[spark](
     override val uid: String, xgboostParams: Map[String, Any])
-  extends Predictor[MLVector, XGBoostEstimator, XGBoostModel]
+  extends Predictor[Vector, XGBoostEstimator, XGBoostModel]
   with LearningTaskParams with GeneralParams with BoosterParams with MLWritable {
 
   def this(xgboostParams: Map[String, Any]) =
@@ -107,18 +107,32 @@ class XGBoostEstimator private[spark](
     }
   }
 
+  private def ensureColumns(trainingSet: Dataset[_]): Dataset[_] = {
+    if (trainingSet.columns.contains($(baseMarginCol))) {
+      trainingSet
+    } else {
+      trainingSet.withColumn($(baseMarginCol), lit(Float.NaN))
+    }
+  }
+
   /**
    * produce a XGBoostModel by fitting the given dataset
    */
   override def train(trainingSet: Dataset[_]): XGBoostModel = {
-    val instances = trainingSet.select(
-      col($(featuresCol)), col($(labelCol)).cast(DoubleType)).rdd.map {
-      case Row(feature: MLVector, label: Double) =>
-        LabeledPoint(label, feature)
+    val instances = ensureColumns(trainingSet).select(
+      col($(featuresCol)),
+      col($(labelCol)).cast(FloatType),
+      col($(baseMarginCol)).cast(FloatType)
+    ).rdd.map { case Row(features: Vector, label: Float, baseMargin: Float) =>
+      val (indices, values) = features match {
+        case v: SparseVector => (v.indices, v.values.map(_.toFloat))
+        case v: DenseVector => (null, v.values.map(_.toFloat))
+      }
+      XGBLabeledPoint(label.toFloat, indices, values, baseMargin = baseMargin)
     }
     transformSchema(trainingSet.schema, logging = true)
     val derivedXGBoosterParamMap = fromParamsToXGBParamMap
-    val trainedModel = XGBoost.trainWithRDD(instances, derivedXGBoosterParamMap,
+    val trainedModel = XGBoost.trainDistributed(instances, derivedXGBoosterParamMap,
       $(round), $(nWorkers), $(customObj), $(customEval), $(useExternalMemory),
       $(missing)).setParent(this)
     val returnedModel = copyValues(trainedModel, extractParamMap())
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala
index c0be87de1..8f209eebf 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostModel.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import ml.dmlc.xgboost4j.java.Rabit
 import ml.dmlc.xgboost4j.scala.spark.params.{BoosterParams, DefaultXGBoostParamsWriter}
 import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, EvalTrait}
+
 import org.apache.hadoop.fs.{FSDataOutputStream, Path}
 
 import org.apache.spark.ml.PredictionModel
@@ -66,7 +67,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
       val rabitEnv = Map("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString)
       Rabit.init(rabitEnv.asJava)
       if (testSamples.nonEmpty) {
-        val dMatrix = new DMatrix(testSamples)
+        val dMatrix = new DMatrix(testSamples.map(_.asXGB))
         try {
           broadcastBooster.value.predictLeaf(dMatrix).iterator
         } finally {
@@ -103,6 +104,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
     val appName = evalDataset.context.appName
     val allEvalMetrics = evalDataset.mapPartitions {
       labeledPointsPartition =>
+        import DataUtils._
         if (labeledPointsPartition.hasNext) {
           val rabitEnv = Map("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString)
           Rabit.init(rabitEnv.asJava)
@@ -114,8 +116,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
              null
            }
          }
-          import DataUtils._
-          val dMatrix = new DMatrix(labeledPointsPartition, cacheFileName)
+          val dMatrix = new DMatrix(labeledPointsPartition.map(_.features.asXGB), cacheFileName)
          try {
            if (groupData != null) {
              dMatrix.setGroup(groupData(TaskContext.getPartitionId()).toArray)
@@ -202,7 +203,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
            null
          }
        }
-        val dMatrix = new DMatrix(testSamples, cacheFileName)
+        val dMatrix = new DMatrix(testSamples.map(_.asXGB), cacheFileName)
        try {
          broadcastBooster.value.predict(dMatrix).iterator
        } finally {
@@ -250,7 +251,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
            null
          }
        }
-        val testDataset = new DMatrix(vectorIterator, cachePrefix)
+        val testDataset = new DMatrix(vectorIterator.map(_.asXGB), cachePrefix)
        try {
          val rawPredictResults = {
            if (!predLeaf) {
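Note: the estimator now derives the base margin from a DataFrame column rather than from a trainWithRDD parameter; ensureColumns fills a missing column with Float.NaN, which downstream code reads as "no base margin". A minimal usage sketch against the new parameter (the DataFrame and its column names are hypothetical; baseMarginCol defaults to "baseMargin", see LearningTaskParams below):

    // Any numeric column can carry the per-row initial prediction.
    val trainDF = spark.read.parquet("/path/to/train") // columns: features, label, margin
    val model = XGBoost.trainWithDataFrame(
      trainDF,
      Map("eta" -> "0.1", "objective" -> "binary:logistic", "baseMarginCol" -> "margin"),
      round = 10, nWorkers = 4)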
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/LearningTaskParams.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/LearningTaskParams.scala
index 722f7d079..5be9173ab 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/LearningTaskParams.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/LearningTaskParams.scala
@@ -60,7 +60,13 @@ trait LearningTaskParams extends Params {
   val groupData = new GroupDataParam(this, "groupData", "group data specify each group size" +
     " for ranking task. To correspond to partition of training data, it is nested.")
 
-  setDefault(objective -> "reg:linear", baseScore -> 0.5, numClasses -> 2, groupData -> null)
+  /**
+   * Initial prediction (aka base margin) column name.
+   */
+  val baseMarginCol = new Param[String](this, "baseMarginCol", "base margin column name")
+
+  setDefault(objective -> "reg:linear", baseScore -> 0.5, numClasses -> 2, groupData -> null,
+    baseMarginCol -> "baseMargin")
 }
 
 private[spark] object LearningTaskParams {
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala
index 426de99fb..a33443614 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala
@@ -18,8 +18,7 @@ package ml.dmlc.xgboost4j.scala.spark
 
 import scala.io.Source
 
-import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
-import org.apache.spark.ml.linalg.{Vectors => MLVectors}
+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 
 trait TrainTestData {
   protected def getResourceLines(resource: String): Iterator[String] = {
@@ -32,60 +31,60 @@ trait TrainTestData {
     Source.fromInputStream(is).getLines()
   }
 
-  protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[MLLabeledPoint] = {
+  protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[XGBLabeledPoint] = {
     getResourceLines(resource).map { line =>
       val labelAndFeatures = line.split(" ")
-      val label = labelAndFeatures.head.toDouble
-      val values = new Array[Double](126)
+      val label = labelAndFeatures.head.toFloat
+      val values = new Array[Float](126)
       for (feature <- labelAndFeatures.tail) {
         val idAndValue = feature.split(":")
         if (!zeroBased) {
-          values(idAndValue(0).toInt - 1) = idAndValue(1).toDouble
+          values(idAndValue(0).toInt - 1) = idAndValue(1).toFloat
        } else {
-          values(idAndValue(0).toInt) = idAndValue(1).toDouble
+          values(idAndValue(0).toInt) = idAndValue(1).toFloat
        }
      }
 
-      MLLabeledPoint(label, MLVectors.dense(values))
+      XGBLabeledPoint(label, null, values)
     }.toList
   }
 }
 
 object Classification extends TrainTestData {
-  val train: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
-  val test: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
+  val train: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
+  val test: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
 }
 
 object MultiClassification extends TrainTestData {
-  val train: Seq[MLLabeledPoint] = getLabeledPoints("/dermatology.data")
+  val train: Seq[XGBLabeledPoint] = getLabeledPoints("/dermatology.data")
 
-  private def getLabeledPoints(resource: String): Seq[MLLabeledPoint] = {
+  private def getLabeledPoints(resource: String): Seq[XGBLabeledPoint] = {
     getResourceLines(resource).map { line =>
       val featuresAndLabel = line.split(",")
-      val label = featuresAndLabel.last.toDouble - 1
-      val values = new Array[Double](featuresAndLabel.length - 1)
+      val label = featuresAndLabel.last.toFloat - 1
+      val values = new Array[Float](featuresAndLabel.length - 1)
       values(values.length - 1) =
         if (featuresAndLabel(featuresAndLabel.length - 2) == "?") 1 else 0
       for (i <- 0 until values.length - 2) {
-        values(i) = featuresAndLabel(i).toDouble
+        values(i) = featuresAndLabel(i).toFloat
       }
 
-      MLLabeledPoint(label, MLVectors.dense(values.take(values.length - 1)))
+      XGBLabeledPoint(label, null, values.take(values.length - 1))
     }.toList
   }
 }
 
 object Regression extends TrainTestData {
-  val train: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
-  val test: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
+  val train: Seq[XGBLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
+  val test: Seq[XGBLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
 }
 
 object Ranking extends TrainTestData {
-  val train0: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-0.txt.train", zeroBased = false)
-  val train1: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-1.txt.train", zeroBased = false)
+  val train0: Seq[XGBLabeledPoint] = getLabeledPoints("/rank-demo-0.txt.train", zeroBased = false)
+  val train1: Seq[XGBLabeledPoint] = getLabeledPoints("/rank-demo-1.txt.train", zeroBased = false)
   val trainGroup0: Seq[Int] = getGroups("/rank-demo-0.txt.train.group")
   val trainGroup1: Seq[Int] = getGroups("/rank-demo-1.txt.train.group")
-  val test: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo.txt.test", zeroBased = false)
+  val test: Seq[XGBLabeledPoint] = getLabeledPoints("/rank-demo.txt.test", zeroBased = false)
 
   private def getGroups(resource: String): Seq[Int] = {
     getResourceLines(resource).map(_.toInt).toList
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala
index 6f1d92645..c216ab257 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala
@@ -18,6 +18,8 @@ package ml.dmlc.xgboost4j.scala.spark
 
 import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
 import org.scalatest.FunSuite
 
@@ -27,19 +29,18 @@ class XGBoostConfigureSuite extends FunSuite with PerTest {
     .config("spark.kryo.classesToRegister", classOf[Booster].getName)
 
   test("nthread configuration must be no larger than spark.task.cpus") {
-    val trainingRDD = sc.parallelize(Classification.train)
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic",
       "nthread" -> (sc.getConf.getInt("spark.task.cpus", 1) + 1))
     intercept[IllegalArgumentException] {
-      XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers)
+      XGBoost.trainWithRDD(sc.parallelize(List()), paramMap, 5, numWorkers)
     }
   }
 
   test("kryoSerializer test") {
     import DataUtils._
     // TODO write an isolated test for Booster.
-    val trainingRDD = sc.parallelize(Classification.train)
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator, null)
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic")
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala
index 8fb3dbbde..8a5813d3c 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala
@@ -17,20 +17,22 @@ package ml.dmlc.xgboost4j.scala.spark
 
 import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 
-import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
 import org.apache.spark.ml.linalg.DenseVector
 import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.scalatest.FunSuite
 
 class XGBoostDFSuite extends FunSuite with PerTest {
   private def buildDataFrame(
-      instances: Seq[MLLabeledPoint],
+      labeledPoints: Seq[XGBLabeledPoint],
       numPartitions: Int = numWorkers): DataFrame = {
-    val it = instances.iterator.zipWithIndex
-      .map { case (instance: MLLabeledPoint, id: Int) =>
-        (id, instance.label, instance.features)
+    import DataUtils._
+    val it = labeledPoints.iterator.zipWithIndex
+      .map { case (labeledPoint: XGBLabeledPoint, id: Int) =>
+        (id, labeledPoint.label, labeledPoint.features)
       }
 
     ss.createDataFrame(sc.parallelize(it.toList, numPartitions))
 
@@ -42,7 +44,6 @@ class XGBoostDFSuite extends FunSuite with PerTest {
       "objective" -> "binary:logistic")
     val trainingItr = Classification.train.iterator
     val testItr = Classification.test.iterator
-    import DataUtils._
     val round = 5
     val trainDMatrix = new DMatrix(trainingItr)
     val testDMatrix = new DMatrix(testItr)
@@ -157,7 +158,6 @@ class XGBoostDFSuite extends FunSuite with PerTest {
     val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingDF, paramMap,
       round = 10, nWorkers = math.min(2, numWorkers))
     val error = new EvalError
-    import DataUtils._
     val testSetDMatrix = new DMatrix(testItr)
     assert(error.eval(xgBoostModelWithDF.booster.predict(testSetDMatrix, outPutMargin = true),
       testSetDMatrix) < 0.1)
@@ -193,4 +193,24 @@ class XGBoostDFSuite extends FunSuite with PerTest {
     assert(model.get[Double](model.eta).get == 0.1)
     assert(model.get[Int](model.maxDepth).get == 6)
   }
+
+  test("test use base margin") {
+    import DataUtils._
+    val trainingDf = buildDataFrame(Classification.train)
+    val trainingDfWithMargin = trainingDf.withColumn("margin", functions.rand())
+    val testRDD = sc.parallelize(Classification.test.map(_.features))
+    val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
+      "objective" -> "binary:logistic", "baseMarginCol" -> "margin")
+
+    def trainPredict(df: Dataset[_]): Array[Float] = {
+      XGBoost.trainWithDataFrame(df, paramMap, round = 1, numWorkers)
+        .predict(testRDD)
+        .map { case Array(p) => p }
+        .collect()
+    }
+
+    val pred = trainPredict(trainingDf)
+    val predWithMargin = trainPredict(trainingDfWithMargin)
+    assert((pred, predWithMargin).zipped.exists { case (p, pwm) => p !== pwm })
+  }
 }
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index fbe8a9c85..de4ab91ce 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -19,7 +19,6 @@ package ml.dmlc.xgboost4j.scala.spark
 import java.nio.file.Files
 import java.util.concurrent.LinkedBlockingDeque
 
-import scala.collection.mutable.ListBuffer
 import scala.util.Random
 
 import ml.dmlc.xgboost4j.java.Rabit
@@ -27,8 +26,8 @@ import ml.dmlc.xgboost4j.scala.DMatrix
 import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
 
 import org.apache.spark.SparkContext
-import org.apache.spark.ml.feature.LabeledPoint
-import org.apache.spark.ml.linalg.{Vectors, Vector => SparkVector}
+import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
+import org.apache.spark.ml.linalg.{DenseVector, Vectors, Vector => SparkVector}
 import org.apache.spark.rdd.RDD
 import org.scalatest.FunSuite
@@ -82,15 +81,15 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
       "objective" -> "binary:logistic").toMap,
       new java.util.HashMap[String, String](),
       numWorkers = 2, round = 5, eval = null, obj = null, useExternalMemory = true,
-      missing = Float.NaN, baseMargin = null)
+      missing = Float.NaN)
     val boosterCount = boosterRDD.count()
     assert(boosterCount === 2)
   }
 
   test("training with external memory cache") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
@@ -101,9 +100,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("training with Scala-implemented Rabit tracker") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic",
@@ -115,9 +114,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   ignore("test with fast histo depthwise") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -130,9 +129,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   ignore("test with fast histo lossguide") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "1",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -145,9 +144,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   ignore("test with fast histo lossguide with max bin") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -161,9 +160,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   ignore("test with fast histo depthwidth with max depth") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -177,9 +176,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   ignore("test with fast histo depthwidth with max depth and max bin") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -193,7 +192,7 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("test with dense vectors containing missing value") {
-    def buildDenseRDD(): RDD[LabeledPoint] = {
+    def buildDenseRDD(): RDD[MLLabeledPoint] = {
       val numRows = 100
       val numCols = 5
 
@@ -203,23 +202,24 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
          if (c == numCols - 1) -0.1 else Random.nextDouble()
        }
-        LabeledPoint(label, Vectors.dense(values))
+        MLLabeledPoint(label, Vectors.dense(values))
      }
      sc.parallelize(labeledPoints)
    }
 
     val trainingRDD = buildDenseRDD().repartition(4)
-    val testRDD = buildDenseRDD().repartition(4)
+    val testRDD = buildDenseRDD().repartition(4).map(_.features.asInstanceOf[DenseVector])
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers,
       useExternalMemory = true)
-    xgBoostModel.predict(testRDD.map(_.features.toDense), missingValue = -0.1f).collect()
+    xgBoostModel.predict(testRDD, missingValue = -0.1f).collect()
   }
 
   test("test consistency of prediction functions with RDD") {
-    val trainingRDD = sc.parallelize(Classification.train)
+    import DataUtils._
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSet = Classification.test
     val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
     val testCollection = testRDD.collect()
@@ -232,7 +232,6 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     val predRDD = xgBoostModel.predict(testRDD)
     val predResult1 = predRDD.collect()
     assert(testRDD.count() === predResult1.length)
-    import DataUtils._
     val predResult2 = xgBoostModel.booster.predict(new DMatrix(testSet.iterator))
     for (i <- predResult1.indices; j <- predResult1(i).indices) {
       assert(predResult1(i)(j) === predResult2(i)(j))
@@ -240,21 +239,22 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("test eval functions with RDD") {
-    val trainingRDD = sc.parallelize(Classification.train).cache()
+    import DataUtils._
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML).cache()
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, nWorkers = numWorkers)
+    val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, numWorkers)
     // Nan Zhu: deprecate it for now
     // xgBoostModel.eval(trainingRDD, "eval1", iter = 5, useExternalCache = false)
     xgBoostModel.eval(trainingRDD, "eval2", evalFunc = new EvalError, useExternalCache = false)
   }
 
   test("test prediction functionality with empty partition") {
+    import DataUtils._
     def buildEmptyRDD(sparkContext: Option[SparkContext] = None): RDD[SparkVector] = {
       sparkContext.getOrElse(sc).parallelize(List[SparkVector](), numWorkers)
     }
-
-    val trainingRDD = sc.parallelize(Classification.train)
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testRDD = buildEmptyRDD()
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
@@ -263,9 +263,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("test model consistency after save and load") {
-    val eval = new EvalError()
-    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
+    val eval = new EvalError()
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     val testSetDMatrix = new DMatrix(Classification.test.iterator)
     val tempDir = Files.createTempDirectory("xgboosttest-")
     val tempFile = Files.createTempFile(tempDir, "", "")
@@ -283,9 +283,10 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("test save and load of different types of models") {
+    import DataUtils._
     val tempDir = Files.createTempDirectory("xgboosttest-")
     val tempFile = Files.createTempFile(tempDir, "", "")
-    val trainingRDD = sc.parallelize(Classification.train)
+    val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
     var paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "reg:linear")
     // validate regression model
@@ -320,7 +321,8 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("test use groupData") {
-    val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1)
+    import DataUtils._
+    val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1).map(_.asML)
     val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0)
     val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
 
@@ -337,9 +339,10 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
   }
 
   test("test use nested groupData") {
+    import DataUtils._
     val trainingRDD0 = sc.parallelize(Ranking.train0, numSlices = 1)
     val trainingRDD1 = sc.parallelize(Ranking.train1, numSlices = 1)
-    val trainingRDD = trainingRDD0.union(trainingRDD1)
+    val trainingRDD = trainingRDD0.union(trainingRDD1).map(_.asML)
 
     val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
 
@@ -353,27 +356,4 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     val predResult1: Array[Array[Float]] = predRDD.collect()
     assert(testRDD.count() === predResult1.length)
   }
-
-  test("test use base margin") {
-    val trainRDD = sc.parallelize(Ranking.train0, numSlices = 1)
-    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
-
-    val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
-      "objective" -> "rank:pairwise")
-
-    val trainMargin = {
-      XGBoost.trainWithRDD(trainRDD, paramMap, round = 1, nWorkers = 2)
-        .predict(trainRDD.map(_.features), outputMargin = true)
-        .map { case Array(m) => m }
-    }
-
-    val xgBoostModel = XGBoost.trainWithRDD(
-      trainRDD,
-      paramMap,
-      round = 1,
-      nWorkers = 2,
-      baseMargin = trainMargin)
-
-    assert(testRDD.count() === xgBoostModel.predict(testRDD).count())
-  }
 }
diff --git a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/LabeledPoint.java b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/LabeledPoint.java
deleted file mode 100644
index 5f4351eb1..000000000
--- a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/LabeledPoint.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package ml.dmlc.xgboost4j;
-
-import java.io.Serializable;
-
-/**
- * Labeled data point for training examples.
- * Represent a sparse training instance.
- */
-public class LabeledPoint implements Serializable {
-  /** Label of the point */
-  public float label;
-  /** Weight of this data point */
-  public float weight = 1.0f;
-  /** Feature indices, used for sparse input */
-  public int[] indices = null;
-  /** Feature values */
-  public float[] values;
-
-  private LabeledPoint() {}
-
-  /**
-   * Create Labeled data point from sparse vector.
-   * @param label The label of the data point.
-   * @param indices The indices
-   * @param values The values.
-   */
-  public static LabeledPoint fromSparseVector(float label, int[] indices, float[] values) {
-    LabeledPoint ret = new LabeledPoint();
-    ret.label = label;
-    ret.indices = indices;
-    ret.values = values;
-    assert indices.length == values.length;
-    return ret;
-  }
-
-  /**
-   * Create Labeled data point from dense vector.
-   * @param label The label of the data point.
-   * @param values The values.
-   */
-  public static LabeledPoint fromDenseVector(float label, float[] values) {
-    LabeledPoint ret = new LabeledPoint();
-    ret.label = label;
-    ret.indices = null;
-    ret.values = values;
-    return ret;
-  }
-}
diff --git a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java
index 09c74b2d3..4a0ff2380 100644
--- a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java
+++ b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java
@@ -55,7 +55,7 @@ class DataBatch {
       while (base.hasNext() && batch.size() < batchSize) {
         LabeledPoint labeledPoint = base.next();
         batch.add(labeledPoint);
-        numElem += labeledPoint.values.length;
+        numElem += labeledPoint.values().length;
         numRows++;
       }
 
@@ -68,18 +68,19 @@ class DataBatch {
       for (int i = 0; i < batch.size(); i++) {
         LabeledPoint labeledPoint = batch.get(i);
         rowOffset[i] = offset;
-        label[i] = labeledPoint.label;
-        if (labeledPoint.indices != null) {
-          System.arraycopy(labeledPoint.indices, 0, featureIndex, offset,
-              labeledPoint.indices.length);
+        label[i] = labeledPoint.label();
+        if (labeledPoint.indices() != null) {
+          System.arraycopy(labeledPoint.indices(), 0, featureIndex, offset,
+              labeledPoint.indices().length);
         } else {
-          for (int j = 0; j < labeledPoint.values.length; j++) {
+          for (int j = 0; j < labeledPoint.values().length; j++) {
             featureIndex[offset + j] = j;
           }
         }
-        System.arraycopy(labeledPoint.values, 0, featureValue, offset, labeledPoint.values.length);
-        offset += labeledPoint.values.length;
+        System.arraycopy(labeledPoint.values(), 0, featureValue, offset,
+            labeledPoint.values().length);
+        offset += labeledPoint.values().length;
       }
 
       rowOffset[batch.size()] = offset;
diff --git a/jvm-packages/xgboost4j/src/main/scala/ml/dmlc/xgboost4j/LabeledPoint.scala b/jvm-packages/xgboost4j/src/main/scala/ml/dmlc/xgboost4j/LabeledPoint.scala
new file mode 100644
index 000000000..73bc91cf3
--- /dev/null
+++ b/jvm-packages/xgboost4j/src/main/scala/ml/dmlc/xgboost4j/LabeledPoint.scala
@@ -0,0 +1,41 @@
+/*
+ Copyright (c) 2014 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j
+
+/** Labeled training data point. */
+private[xgboost4j] case class LabeledPoint(
+    /** Label of this point. */
+    label: Float,
+    /** Feature indices of this point or `null` if the data is dense. */
+    indices: Array[Int],
+    /** Feature values of this point. */
+    values: Array[Float],
+    /** Weight of this point. */
+    weight: Float = 1.0f,
+    /** Group of this point (used for ranking) or -1. */
+    group: Int = -1,
+    /** Initial prediction on this point or `Float.NaN`. */
+    baseMargin: Float = Float.NaN
+) extends Serializable {
+  require(indices == null || indices.length == values.length,
+    "indices and values must have the same number of elements")
+
+  def this(label: Float, indices: Array[Int], values: Array[Float]) = {
+    // [[weight]] default duplicated to disambiguate the constructor call.
+    this(label, indices, values, 1.0f)
+  }
+}
diff --git a/jvm-packages/xgboost4j/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java b/jvm-packages/xgboost4j/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
index e2bf5c7ab..24c783987 100644
--- a/jvm-packages/xgboost4j/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
+++ b/jvm-packages/xgboost4j/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
@@ -15,15 +15,11 @@
  */
 package ml.dmlc.xgboost4j.java;
 
-import java.awt.*;
 import java.util.Arrays;
 import java.util.Random;
 
 import junit.framework.TestCase;
 import ml.dmlc.xgboost4j.LabeledPoint;
-import ml.dmlc.xgboost4j.java.DMatrix;
-import ml.dmlc.xgboost4j.java.DataBatch;
-import ml.dmlc.xgboost4j.java.XGBoostError;
 import org.junit.Test;
 
 /**
@@ -41,10 +37,10 @@ public class DMatrixTest {
     int nrep = 3000;
     java.util.List<LabeledPoint> blist = new java.util.LinkedList<LabeledPoint>();
     for (int i = 0; i < nrep; ++i) {
-      LabeledPoint p = LabeledPoint.fromSparseVector(
+      LabeledPoint p = new LabeledPoint(
          0.1f + i, new int[]{0, 2, 3}, new float[]{3, 4, 5});
       blist.add(p);
-      labelall.add(p.label);
+      labelall.add(p.label());
     }
     DMatrix dmat = new DMatrix(blist.iterator(), null);
     // get label
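Note: with LabeledPoint reworked as a Scala case class shared by both APIs, Scala call sites construct points directly instead of via fromSparseVector/fromDenseVector, and Java call sites (as in DataBatch and DMatrixTest above) switch from field access to the generated accessors label(), indices() and values(). A minimal construction sketch; the class is private[xgboost4j], so this assumes code under the ml.dmlc.xgboost4j package:

    import ml.dmlc.xgboost4j.LabeledPoint

    // Dense point: indices == null, one value per feature.
    val dense = LabeledPoint(1.0f, null, Array(0.5f, 0.0f, 1.5f))

    // Sparse point: parallel indices/values arrays (unequal lengths fail the require).
    val sparse = LabeledPoint(0.0f, Array(0, 2), Array(0.5f, 1.5f))

    // Optional fields default to weight = 1.0f, group = -1, baseMargin = Float.NaN.
    val reweighted = sparse.copy(weight = 2.0f, baseMargin = 0.5f)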