[jvm-packages] Added baseMargin to ml.dmlc.xgboost4j.LabeledPoint (#2532)

* Converted ml.dmlc.xgboost4j.LabeledPoint to Scala

This allows to easily integrate LabeledPoint with Spark DataFrame APIs,
which support encoding/decoding case classes out of the box. Alternative
solution would be to keep LabeledPoint in Java and make it a Bean by
generating boilerplate getters/setters. I have decided against that, even
thought the conversion in this PR implies a public API change.

I also had to remove the factory methods fromSparseVector and
fromDenseVector because a) they would need to be duplicated to support
overloaded calls with extra data (e.g. weight); and b) Scala would expose
them via mangled $.MODULE$ which looks ugly in Java.

Additionally, this commit makes it possible to switch to LabeledPoint in
all public APIs and effectively to pass initial margin/group as part of
the point. This seems to be the only reliable way of implementing distributed
learning with these data. Note that group size format used by single-node
XGBoost is not compatible with that scenario, since the partition split
could divide a group into two chunks.

* Switched to ml.dmlc.xgboost4j.LabeledPoint in RDD-based public APIs

Note that DataFrame-based and Flink APIs are not affected by this change.

* Removed baseMargin argument in favour of the LabeledPoint field

* Do a single pass over the partition in buildDistributedBoosters

Note that there is no formal guarantee that

    val repartitioned = rdd.repartition(42)
    repartitioned.zipPartitions(repartitioned.map(_ + 1)) { it1, it2, => ... }

would do a single shuffle, but in practice it seems to be always the case.

* Exposed baseMargin in DataFrame-based API

* Addressed review comments

* Pass baseMargin to XGBoost.trainWithDataFrame via params

* Reverted MLLabeledPoint in Spark APIs

As discussed, baseMargin would only be supported for DataFrame-based APIs.

* Cleaned up baseMargin tests

- Removed RDD-based test, since the option is no longer exposed via
  public APIs
- Changed DataFrame-based one to check that adding a margin actually
  affects the prediction

* Pleased Scalastyle

* Addressed more review comments

* Pleased scalastyle again

* Fixed XGBoost.fromBaseMarginsToArray

which always returned an array of NaNs even if base margin was not
specified. Surprisingly this only failed a few tests.
This commit is contained in:
Sergei Lebedev
2017-08-10 23:29:26 +02:00
committed by Nan Zhu
parent c1104f7d0a
commit 771a95aec6
16 changed files with 307 additions and 265 deletions

View File

@@ -16,47 +16,55 @@
package ml.dmlc.xgboost4j.scala.spark
import scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import ml.dmlc.xgboost4j.LabeledPoint
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
object DataUtils extends Serializable {
private[spark] implicit class XGBLabeledPointFeatures(
val labeledPoint: XGBLabeledPoint
) extends AnyVal {
/** Converts the point to [[MLLabeledPoint]]. */
private[spark] def asML: MLLabeledPoint = {
MLLabeledPoint(labeledPoint.label, labeledPoint.features)
}
implicit def fromSparkPointsToXGBoostPointsJava(sps: Iterator[MLLabeledPoint])
: java.util.Iterator[LabeledPoint] = {
fromSparkPointsToXGBoostPoints(sps).asJava
}
implicit def fromSparkPointsToXGBoostPoints(sps: Iterator[MLLabeledPoint]):
Iterator[LabeledPoint] = {
for (p <- sps) yield {
p.features match {
case denseFeature: DenseVector =>
LabeledPoint.fromDenseVector(p.label.toFloat, denseFeature.values.map(_.toFloat))
case sparseFeature: SparseVector =>
LabeledPoint.fromSparseVector(p.label.toFloat, sparseFeature.indices,
sparseFeature.values.map(_.toFloat))
}
/**
* Returns feature of the point as [[org.apache.spark.ml.linalg.Vector]].
*
* If the point is sparse, the dimensionality of the resulting sparse
* vector would be [[Int.MaxValue]]. This is the only safe value, since
* XGBoost does not store the dimensionality explicitly.
*/
def features: Vector = if (labeledPoint.indices == null) {
Vectors.dense(labeledPoint.values.map(_.toDouble))
} else {
Vectors.sparse(Int.MaxValue, labeledPoint.indices, labeledPoint.values.map(_.toDouble))
}
}
implicit def fromSparkVectorToXGBoostPointsJava(sps: Iterator[Vector])
: java.util.Iterator[LabeledPoint] = {
fromSparkVectorToXGBoostPoints(sps).asJava
private[spark] implicit class MLLabeledPointToXGBLabeledPoint(
val labeledPoint: MLLabeledPoint
) extends AnyVal {
/** Converts an [[MLLabeledPoint]] to an [[XGBLabeledPoint]]. */
def asXGB: XGBLabeledPoint = {
labeledPoint.features.asXGB.copy(label = labeledPoint.label.toFloat)
}
}
implicit def fromSparkVectorToXGBoostPoints(sps: Iterator[Vector])
: Iterator[LabeledPoint] = {
for (p <- sps) yield {
p match {
case denseFeature: DenseVector =>
LabeledPoint.fromDenseVector(0.0f, denseFeature.values.map(_.toFloat))
case sparseFeature: SparseVector =>
LabeledPoint.fromSparseVector(0.0f, sparseFeature.indices,
sparseFeature.values.map(_.toFloat))
}
private[spark] implicit class MLVectorToXGBLabeledPoint(val v: Vector) extends AnyVal {
/**
* Converts a [[Vector]] to a data point with a dummy label.
*
* This is needed for constructing a [[ml.dmlc.xgboost4j.scala.DMatrix]]
* for prediction.
*/
def asXGB: XGBLabeledPoint = v match {
case v: DenseVector =>
XGBLabeledPoint(0.0f, null, v.values.map(_.toFloat))
case v: SparseVector =>
XGBLabeledPoint(0.0f, v.indices, v.values.map(_.toFloat))
}
}
}

View File

@@ -21,13 +21,13 @@ import scala.collection.mutable
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
import ml.dmlc.xgboost4j.scala.{XGBoost => SXGBoost, _}
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.{SparkContext, TaskContext}
object TrackerConf {
@@ -52,30 +52,49 @@ object XGBoost extends Serializable {
private val logger = LogFactory.getLog("XGBoostSpark")
private def fromDenseToSparseLabeledPoints(
denseLabeledPoints: Iterator[MLLabeledPoint],
missing: Float): Iterator[MLLabeledPoint] = {
denseLabeledPoints: Iterator[XGBLabeledPoint],
missing: Float): Iterator[XGBLabeledPoint] = {
if (!missing.isNaN) {
denseLabeledPoints.map { case MLLabeledPoint(label, features) =>
val dFeatures = features.toDense
val indices = new mutable.ArrayBuilder.ofInt()
val values = new mutable.ArrayBuilder.ofDouble()
for (i <- dFeatures.values.indices) {
if (dFeatures.values(i) != missing) {
indices += i
values += dFeatures.values(i)
}
denseLabeledPoints.map { labeledPoint =>
val indicesBuilder = new mutable.ArrayBuilder.ofInt()
val valuesBuilder = new mutable.ArrayBuilder.ofFloat()
for ((value, i) <- labeledPoint.values.zipWithIndex if value != missing) {
indicesBuilder += (if (labeledPoint.indices == null) i else labeledPoint.indices(i))
valuesBuilder += value
}
val sFeatures = new SparseVector(dFeatures.values.length, indices.result(),
values.result())
MLLabeledPoint(label, sFeatures)
labeledPoint.copy(indices = indicesBuilder.result(), values = valuesBuilder.result())
}
} else {
denseLabeledPoints
}
}
private def fromBaseMarginsToArray(baseMargins: Iterator[Float]): Option[Array[Float]] = {
val builder = new mutable.ArrayBuilder.ofFloat()
var nTotal = 0
var nUndefined = 0
while (baseMargins.hasNext) {
nTotal += 1
val baseMargin = baseMargins.next()
if (baseMargin.isNaN) {
nUndefined += 1 // don't waste space for all-NaNs.
} else {
builder += baseMargin
}
}
if (nUndefined == nTotal) {
None
} else if (nUndefined == 0) {
Some(builder.result())
} else {
throw new IllegalArgumentException(
s"Encountered a partition with $nUndefined NaN base margin values. " +
"If you want to specify base margin, ensure all values are non-NaN.")
}
}
private[spark] def buildDistributedBoosters(
trainingSet: RDD[MLLabeledPoint],
trainingSet: RDD[XGBLabeledPoint],
params: Map[String, Any],
rabitEnv: java.util.Map[String, String],
numWorkers: Int,
@@ -83,25 +102,20 @@ object XGBoost extends Serializable {
obj: ObjectiveTrait,
eval: EvalTrait,
useExternalMemory: Boolean,
missing: Float,
baseMargin: RDD[Float]): RDD[Booster] = {
import DataUtils._
missing: Float): RDD[Booster] = {
val partitionedTrainingSet = if (trainingSet.getNumPartitions != numWorkers) {
logger.info(s"repartitioning training set to $numWorkers partitions")
trainingSet.repartition(numWorkers)
} else {
trainingSet
}
val partitionedBaseMargin = Option(baseMargin)
.getOrElse(trainingSet.sparkContext.emptyRDD)
.repartition(partitionedTrainingSet.getNumPartitions)
val partitionedBaseMargin = partitionedTrainingSet.map(_.baseMargin)
val appName = partitionedTrainingSet.context.appName
// to workaround the empty partitions in training dataset,
// this might not be the best efficient implementation, see
// (https://github.com/dmlc/xgboost/issues/1277)
partitionedTrainingSet.zipPartitions(partitionedBaseMargin) { (trainingSamples, baseMargin) =>
if (trainingSamples.isEmpty) {
partitionedTrainingSet.zipPartitions(partitionedBaseMargin) { (trainingPoints, baseMargins) =>
if (trainingPoints.isEmpty) {
throw new XGBoostError(
s"detected an empty partition in the training data, partition ID:" +
s" ${TaskContext.getPartitionId()}")
@@ -114,16 +128,15 @@ object XGBoost extends Serializable {
}
rabitEnv.put("DMLC_TASK_ID", TaskContext.getPartitionId().toString)
Rabit.init(rabitEnv)
val partitionItr = fromDenseToSparseLabeledPoints(trainingSamples, missing)
val trainingMatrix = new DMatrix(partitionItr, cacheFileName)
val trainingMatrix = new DMatrix(
fromDenseToSparseLabeledPoints(trainingPoints, missing), cacheFileName)
try {
// TODO: use group attribute from the points.
if (params.contains("groupData") && params("groupData") != null) {
trainingMatrix.setGroup(params("groupData").asInstanceOf[Seq[Seq[Int]]](
TaskContext.getPartitionId()).toArray)
}
if (baseMargin.nonEmpty) {
trainingMatrix.setBaseMargin(baseMargin.toArray)
}
fromBaseMarginsToArray(baseMargins).foreach(trainingMatrix.setBaseMargin)
val booster = SXGBoost.train(trainingMatrix, params, round,
watches = Map("train" -> trainingMatrix), obj, eval)
Iterator(booster)
@@ -199,7 +212,6 @@ object XGBoost extends Serializable {
* @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
* true, the user may save the RAM cost for running XGBoost within Spark
* @param missing the value represented the missing value in the dataset
* @param baseMargin initial prediction for boosting.
* @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
* @return XGBoostModel when successful training
*/
@@ -212,10 +224,9 @@ object XGBoost extends Serializable {
obj: ObjectiveTrait = null,
eval: EvalTrait = null,
useExternalMemory: Boolean = false,
missing: Float = Float.NaN,
baseMargin: RDD[Float] = null): XGBoostModel = {
missing: Float = Float.NaN): XGBoostModel = {
trainWithRDD(trainingData, params, round, nWorkers, obj, eval, useExternalMemory,
missing, baseMargin)
missing)
}
private def overrideParamsAccordingToTaskCPUs(
@@ -257,7 +268,6 @@ object XGBoost extends Serializable {
* @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
* true, the user may save the RAM cost for running XGBoost within Spark
* @param missing the value represented the missing value in the dataset
* @param baseMargin initial prediction for boosting.
* @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
* @return XGBoostModel when successful training
*/
@@ -270,30 +280,46 @@ object XGBoost extends Serializable {
obj: ObjectiveTrait = null,
eval: EvalTrait = null,
useExternalMemory: Boolean = false,
missing: Float = Float.NaN,
baseMargin: RDD[Float] = null): XGBoostModel = {
missing: Float = Float.NaN): XGBoostModel = {
import DataUtils._
val xgbTrainingData = trainingData.map { case MLLabeledPoint(label, features) =>
features.asXGB.copy(label = label.toFloat)
}
trainDistributed(xgbTrainingData, params, round, nWorkers, obj, eval,
useExternalMemory, missing)
}
@throws(classOf[XGBoostError])
private[spark] def trainDistributed(
trainingData: RDD[XGBLabeledPoint],
params: Map[String, Any],
round: Int,
nWorkers: Int,
obj: ObjectiveTrait = null,
eval: EvalTrait = null,
useExternalMemory: Boolean = false,
missing: Float = Float.NaN): XGBoostModel = {
if (params.contains("tree_method")) {
require(params("tree_method") != "hist", "xgboost4j-spark does not support fast histogram" +
" for now")
" for now")
}
require(nWorkers > 0, "you must specify more than 0 workers")
if (obj != null) {
require(params.get("obj_type").isDefined, "parameter \"obj_type\" is not defined," +
" you have to specify the objective type as classification or regression with a" +
" customized objective function")
" you have to specify the objective type as classification or regression with a" +
" customized objective function")
}
val trackerConf = params.get("tracker_conf") match {
case None => TrackerConf()
case Some(conf: TrackerConf) => conf
case _ => throw new IllegalArgumentException("parameter \"tracker_conf\" must be an " +
"instance of TrackerConf.")
"instance of TrackerConf.")
}
val tracker = startTracker(nWorkers, trackerConf)
try {
val overriddenParams = overrideParamsAccordingToTaskCPUs(params, trainingData.sparkContext)
val boosters = buildDistributedBoosters(trainingData, overriddenParams,
tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing,
baseMargin)
tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing)
val sparkJobThread = new Thread() {
override def run() {
// force the job

View File

@@ -19,23 +19,23 @@ package ml.dmlc.xgboost4j.scala.spark
import scala.collection.mutable
import ml.dmlc.xgboost4j.scala.spark.params._
import org.json4s.DefaultFormats
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.spark.ml.Predictor
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.FloatType
import org.apache.spark.sql.{Dataset, Row}
import org.json4s.DefaultFormats
/**
* XGBoost Estimator to produce a XGBoost model
*/
class XGBoostEstimator private[spark](
override val uid: String, xgboostParams: Map[String, Any])
extends Predictor[MLVector, XGBoostEstimator, XGBoostModel]
extends Predictor[Vector, XGBoostEstimator, XGBoostModel]
with LearningTaskParams with GeneralParams with BoosterParams with MLWritable {
def this(xgboostParams: Map[String, Any]) =
@@ -107,18 +107,32 @@ class XGBoostEstimator private[spark](
}
}
private def ensureColumns(trainingSet: Dataset[_]): Dataset[_] = {
if (trainingSet.columns.contains($(baseMarginCol))) {
trainingSet
} else {
trainingSet.withColumn($(baseMarginCol), lit(Float.NaN))
}
}
/**
* produce a XGBoostModel by fitting the given dataset
*/
override def train(trainingSet: Dataset[_]): XGBoostModel = {
val instances = trainingSet.select(
col($(featuresCol)), col($(labelCol)).cast(DoubleType)).rdd.map {
case Row(feature: MLVector, label: Double) =>
LabeledPoint(label, feature)
val instances = ensureColumns(trainingSet).select(
col($(featuresCol)),
col($(labelCol)).cast(FloatType),
col($(baseMarginCol)).cast(FloatType)
).rdd.map { case Row(features: Vector, label: Float, baseMargin: Float) =>
val (indices, values) = features match {
case v: SparseVector => (v.indices, v.values.map(_.toFloat))
case v: DenseVector => (null, v.values.map(_.toFloat))
}
XGBLabeledPoint(label.toFloat, indices, values, baseMargin = baseMargin)
}
transformSchema(trainingSet.schema, logging = true)
val derivedXGBoosterParamMap = fromParamsToXGBParamMap
val trainedModel = XGBoost.trainWithRDD(instances, derivedXGBoosterParamMap,
val trainedModel = XGBoost.trainDistributed(instances, derivedXGBoosterParamMap,
$(round), $(nWorkers), $(customObj), $(customEval), $(useExternalMemory),
$(missing)).setParent(this)
val returnedModel = copyValues(trainedModel, extractParamMap())

View File

@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.java.Rabit
import ml.dmlc.xgboost4j.scala.spark.params.{BoosterParams, DefaultXGBoostParamsWriter}
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, EvalTrait}
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.spark.ml.PredictionModel
@@ -66,7 +67,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
val rabitEnv = Map("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString)
Rabit.init(rabitEnv.asJava)
if (testSamples.nonEmpty) {
val dMatrix = new DMatrix(testSamples)
val dMatrix = new DMatrix(testSamples.map(_.asXGB))
try {
broadcastBooster.value.predictLeaf(dMatrix).iterator
} finally {
@@ -103,6 +104,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
val appName = evalDataset.context.appName
val allEvalMetrics = evalDataset.mapPartitions {
labeledPointsPartition =>
import DataUtils._
if (labeledPointsPartition.hasNext) {
val rabitEnv = Map("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString)
Rabit.init(rabitEnv.asJava)
@@ -114,8 +116,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
null
}
}
import DataUtils._
val dMatrix = new DMatrix(labeledPointsPartition, cacheFileName)
val dMatrix = new DMatrix(labeledPointsPartition.map(_.features.asXGB), cacheFileName)
try {
if (groupData != null) {
dMatrix.setGroup(groupData(TaskContext.getPartitionId()).toArray)
@@ -202,7 +203,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
null
}
}
val dMatrix = new DMatrix(testSamples, cacheFileName)
val dMatrix = new DMatrix(testSamples.map(_.asXGB), cacheFileName)
try {
broadcastBooster.value.predict(dMatrix).iterator
} finally {
@@ -250,7 +251,7 @@ abstract class XGBoostModel(protected var _booster: Booster)
null
}
}
val testDataset = new DMatrix(vectorIterator, cachePrefix)
val testDataset = new DMatrix(vectorIterator.map(_.asXGB), cachePrefix)
try {
val rawPredictResults = {
if (!predLeaf) {

View File

@@ -60,7 +60,13 @@ trait LearningTaskParams extends Params {
val groupData = new GroupDataParam(this, "groupData", "group data specify each group size" +
" for ranking task. To correspond to partition of training data, it is nested.")
setDefault(objective -> "reg:linear", baseScore -> 0.5, numClasses -> 2, groupData -> null)
/**
* Initial prediction (aka base margin) column name.
*/
val baseMarginCol = new Param[String](this, "baseMarginCol", "base margin column name")
setDefault(objective -> "reg:linear", baseScore -> 0.5, numClasses -> 2, groupData -> null,
baseMarginCol -> "baseMargin")
}
private[spark] object LearningTaskParams {

View File

@@ -18,8 +18,7 @@ package ml.dmlc.xgboost4j.scala.spark
import scala.io.Source
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.{Vectors => MLVectors}
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
trait TrainTestData {
protected def getResourceLines(resource: String): Iterator[String] = {
@@ -32,60 +31,60 @@ trait TrainTestData {
Source.fromInputStream(is).getLines()
}
protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[MLLabeledPoint] = {
protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[XGBLabeledPoint] = {
getResourceLines(resource).map { line =>
val labelAndFeatures = line.split(" ")
val label = labelAndFeatures.head.toDouble
val values = new Array[Double](126)
val label = labelAndFeatures.head.toFloat
val values = new Array[Float](126)
for (feature <- labelAndFeatures.tail) {
val idAndValue = feature.split(":")
if (!zeroBased) {
values(idAndValue(0).toInt - 1) = idAndValue(1).toDouble
values(idAndValue(0).toInt - 1) = idAndValue(1).toFloat
} else {
values(idAndValue(0).toInt) = idAndValue(1).toDouble
values(idAndValue(0).toInt) = idAndValue(1).toFloat
}
}
MLLabeledPoint(label, MLVectors.dense(values))
XGBLabeledPoint(label, null, values)
}.toList
}
}
object Classification extends TrainTestData {
val train: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
val test: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
val train: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
}
object MultiClassification extends TrainTestData {
val train: Seq[MLLabeledPoint] = getLabeledPoints("/dermatology.data")
val train: Seq[XGBLabeledPoint] = getLabeledPoints("/dermatology.data")
private def getLabeledPoints(resource: String): Seq[MLLabeledPoint] = {
private def getLabeledPoints(resource: String): Seq[XGBLabeledPoint] = {
getResourceLines(resource).map { line =>
val featuresAndLabel = line.split(",")
val label = featuresAndLabel.last.toDouble - 1
val values = new Array[Double](featuresAndLabel.length - 1)
val label = featuresAndLabel.last.toFloat - 1
val values = new Array[Float](featuresAndLabel.length - 1)
values(values.length - 1) =
if (featuresAndLabel(featuresAndLabel.length - 2) == "?") 1 else 0
for (i <- 0 until values.length - 2) {
values(i) = featuresAndLabel(i).toDouble
values(i) = featuresAndLabel(i).toFloat
}
MLLabeledPoint(label, MLVectors.dense(values.take(values.length - 1)))
XGBLabeledPoint(label, null, values.take(values.length - 1))
}.toList
}
}
object Regression extends TrainTestData {
val train: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
val test: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
val train: Seq[XGBLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
}
object Ranking extends TrainTestData {
val train0: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-0.txt.train", zeroBased = false)
val train1: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-1.txt.train", zeroBased = false)
val train0: Seq[XGBLabeledPoint] = getLabeledPoints("/rank-demo-0.txt.train", zeroBased = false)
val train1: Seq[XGBLabeledPoint] = getLabeledPoints("/rank-demo-1.txt.train", zeroBased = false)
val trainGroup0: Seq[Int] = getGroups("/rank-demo-0.txt.train.group")
val trainGroup1: Seq[Int] = getGroups("/rank-demo-1.txt.train.group")
val test: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo.txt.test", zeroBased = false)
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/rank-demo.txt.test", zeroBased = false)
private def getGroups(resource: String): Seq[Int] = {
getResourceLines(resource).map(_.toInt).toList

View File

@@ -18,6 +18,8 @@ package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite
@@ -27,19 +29,18 @@ class XGBoostConfigureSuite extends FunSuite with PerTest {
.config("spark.kryo.classesToRegister", classOf[Booster].getName)
test("nthread configuration must be no larger than spark.task.cpus") {
val trainingRDD = sc.parallelize(Classification.train)
val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
"objective" -> "binary:logistic",
"nthread" -> (sc.getConf.getInt("spark.task.cpus", 1) + 1))
intercept[IllegalArgumentException] {
XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers)
XGBoost.trainWithRDD(sc.parallelize(List()), paramMap, 5, numWorkers)
}
}
test("kryoSerializer test") {
import DataUtils._
// TODO write an isolated test for Booster.
val trainingRDD = sc.parallelize(Classification.train)
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator, null)
val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
"objective" -> "binary:logistic")

View File

@@ -17,20 +17,22 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.scalatest.FunSuite
class XGBoostDFSuite extends FunSuite with PerTest {
private def buildDataFrame(
instances: Seq[MLLabeledPoint],
labeledPoints: Seq[XGBLabeledPoint],
numPartitions: Int = numWorkers): DataFrame = {
val it = instances.iterator.zipWithIndex
.map { case (instance: MLLabeledPoint, id: Int) =>
(id, instance.label, instance.features)
import DataUtils._
val it = labeledPoints.iterator.zipWithIndex
.map { case (labeledPoint: XGBLabeledPoint, id: Int) =>
(id, labeledPoint.label, labeledPoint.features)
}
ss.createDataFrame(sc.parallelize(it.toList, numPartitions))
@@ -42,7 +44,6 @@ class XGBoostDFSuite extends FunSuite with PerTest {
"objective" -> "binary:logistic")
val trainingItr = Classification.train.iterator
val testItr = Classification.test.iterator
import DataUtils._
val round = 5
val trainDMatrix = new DMatrix(trainingItr)
val testDMatrix = new DMatrix(testItr)
@@ -157,7 +158,6 @@ class XGBoostDFSuite extends FunSuite with PerTest {
val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingDF, paramMap,
round = 10, nWorkers = math.min(2, numWorkers))
val error = new EvalError
import DataUtils._
val testSetDMatrix = new DMatrix(testItr)
assert(error.eval(xgBoostModelWithDF.booster.predict(testSetDMatrix, outPutMargin = true),
testSetDMatrix) < 0.1)
@@ -193,4 +193,24 @@ class XGBoostDFSuite extends FunSuite with PerTest {
assert(model.get[Double](model.eta).get == 0.1)
assert(model.get[Int](model.maxDepth).get == 6)
}
test("test use base margin") {
import DataUtils._
val trainingDf = buildDataFrame(Classification.train)
val trainingDfWithMargin = trainingDf.withColumn("margin", functions.rand())
val testRDD = sc.parallelize(Classification.test.map(_.features))
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic", "baseMarginCol" -> "margin")
def trainPredict(df: Dataset[_]): Array[Float] = {
XGBoost.trainWithDataFrame(df, paramMap, round = 1, numWorkers)
.predict(testRDD)
.map { case Array(p) => p }
.collect()
}
val pred = trainPredict(trainingDf)
val predWithMargin = trainPredict(trainingDfWithMargin)
assert((pred, predWithMargin).zipped.exists { case (p, pwm) => p !== pwm })
}
}

View File

@@ -19,7 +19,6 @@ package ml.dmlc.xgboost4j.scala.spark
import java.nio.file.Files
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable.ListBuffer
import scala.util.Random
import ml.dmlc.xgboost4j.java.Rabit
@@ -27,8 +26,8 @@ import ml.dmlc.xgboost4j.scala.DMatrix
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{Vectors, Vector => SparkVector}
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.{DenseVector, Vectors, Vector => SparkVector}
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
@@ -82,15 +81,15 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
"objective" -> "binary:logistic").toMap,
new java.util.HashMap[String, String](),
numWorkers = 2, round = 5, eval = null, obj = null, useExternalMemory = true,
missing = Float.NaN, baseMargin = null)
missing = Float.NaN)
val boosterCount = boosterRDD.count()
assert(boosterCount === 2)
}
test("training with external memory cache") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic").toMap
@@ -101,9 +100,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("training with Scala-implemented Rabit tracker") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic",
@@ -115,9 +114,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
ignore("test with fast histo depthwise") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -130,9 +129,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
ignore("test with fast histo lossguide") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "1",
"objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -145,9 +144,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
ignore("test with fast histo lossguide with max bin") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
"objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -161,9 +160,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
ignore("test with fast histo depthwidth with max depth") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
"objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -177,9 +176,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
ignore("test with fast histo depthwidth with max depth and max bin") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
"objective" -> "binary:logistic", "tree_method" -> "hist",
@@ -193,7 +192,7 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("test with dense vectors containing missing value") {
def buildDenseRDD(): RDD[LabeledPoint] = {
def buildDenseRDD(): RDD[MLLabeledPoint] = {
val numRows = 100
val numCols = 5
@@ -203,23 +202,24 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
if (c == numCols - 1) -0.1 else Random.nextDouble()
}
LabeledPoint(label, Vectors.dense(values))
MLLabeledPoint(label, Vectors.dense(values))
}
sc.parallelize(labeledPoints)
}
val trainingRDD = buildDenseRDD().repartition(4)
val testRDD = buildDenseRDD().repartition(4)
val testRDD = buildDenseRDD().repartition(4).map(_.features.asInstanceOf[DenseVector])
val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
"objective" -> "binary:logistic").toMap
val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers,
useExternalMemory = true)
xgBoostModel.predict(testRDD.map(_.features.toDense), missingValue = -0.1f).collect()
xgBoostModel.predict(testRDD, missingValue = -0.1f).collect()
}
test("test consistency of prediction functions with RDD") {
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSet = Classification.test
val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
val testCollection = testRDD.collect()
@@ -232,7 +232,6 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
val predRDD = xgBoostModel.predict(testRDD)
val predResult1 = predRDD.collect()
assert(testRDD.count() === predResult1.length)
import DataUtils._
val predResult2 = xgBoostModel.booster.predict(new DMatrix(testSet.iterator))
for (i <- predResult1.indices; j <- predResult1(i).indices) {
assert(predResult1(i)(j) === predResult2(i)(j))
@@ -240,21 +239,22 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("test eval functions with RDD") {
val trainingRDD = sc.parallelize(Classification.train).cache()
import DataUtils._
val trainingRDD = sc.parallelize(Classification.train).map(_.asML).cache()
val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
"objective" -> "binary:logistic")
val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, nWorkers = numWorkers)
val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, numWorkers)
// Nan Zhu: deprecate it for now
// xgBoostModel.eval(trainingRDD, "eval1", iter = 5, useExternalCache = false)
xgBoostModel.eval(trainingRDD, "eval2", evalFunc = new EvalError, useExternalCache = false)
}
test("test prediction functionality with empty partition") {
import DataUtils._
def buildEmptyRDD(sparkContext: Option[SparkContext] = None): RDD[SparkVector] = {
sparkContext.getOrElse(sc).parallelize(List[SparkVector](), numWorkers)
}
val trainingRDD = sc.parallelize(Classification.train)
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testRDD = buildEmptyRDD()
val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
"objective" -> "binary:logistic").toMap
@@ -263,9 +263,9 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("test model consistency after save and load") {
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train)
import DataUtils._
val eval = new EvalError()
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
val testSetDMatrix = new DMatrix(Classification.test.iterator)
val tempDir = Files.createTempDirectory("xgboosttest-")
val tempFile = Files.createTempFile(tempDir, "", "")
@@ -283,9 +283,10 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("test save and load of different types of models") {
import DataUtils._
val tempDir = Files.createTempDirectory("xgboosttest-")
val tempFile = Files.createTempFile(tempDir, "", "")
val trainingRDD = sc.parallelize(Classification.train)
val trainingRDD = sc.parallelize(Classification.train).map(_.asML)
var paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "reg:linear")
// validate regression model
@@ -320,7 +321,8 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("test use groupData") {
val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1)
import DataUtils._
val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1).map(_.asML)
val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0)
val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
@@ -337,9 +339,10 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
}
test("test use nested groupData") {
import DataUtils._
val trainingRDD0 = sc.parallelize(Ranking.train0, numSlices = 1)
val trainingRDD1 = sc.parallelize(Ranking.train1, numSlices = 1)
val trainingRDD = trainingRDD0.union(trainingRDD1)
val trainingRDD = trainingRDD0.union(trainingRDD1).map(_.asML)
val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
@@ -353,27 +356,4 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
val predResult1: Array[Array[Float]] = predRDD.collect()
assert(testRDD.count() === predResult1.length)
}
test("test use base margin") {
val trainRDD = sc.parallelize(Ranking.train0, numSlices = 1)
val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "rank:pairwise")
val trainMargin = {
XGBoost.trainWithRDD(trainRDD, paramMap, round = 1, nWorkers = 2)
.predict(trainRDD.map(_.features), outputMargin = true)
.map { case Array(m) => m }
}
val xgBoostModel = XGBoost.trainWithRDD(
trainRDD,
paramMap,
round = 1,
nWorkers = 2,
baseMargin = trainMargin)
assert(testRDD.count() === xgBoostModel.predict(testRDD).count())
}
}