[jvm-packages] Exposed baseMargin (#2450)

* Disabled excessive Spark logging in tests

* Fixed the signature of XGBoostModel.predict

Prior to this commit XGBoostModel.predict produced an RDD with
an array of predictions for each partition, effectively changing
the shape relative to the input RDD. A more natural contract for a
prediction API is that, given an RDD, it returns a new RDD with the
same number of elements. This allows users to easily match inputs
with predictions.

This commit removes one layer of nesting in XGBoostModel.predict output.
Even though the change is clearly not backward compatible, I still
think it is well justified.
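
As a rough sketch of the two shapes (plain Scala collections standing in
for RDDs; the names and values are illustrative, not taken from the patch):

    object PredictShapeSketch extends App {
      // Two "partitions" holding three input rows in total.
      val partitions: Seq[Seq[String]] = Seq(Seq("row0", "row1"), Seq("row2"))

      // Old contract: one element per partition, each holding all of that
      // partition's prediction arrays.
      val perPartition: Seq[Array[Array[Float]]] =
        partitions.map(rows => rows.map(_ => Array(0.5f)).toArray)
      assert(perPartition.size == partitions.size)        // shape follows the partitioning

      // Proposed contract: one prediction array per input row.
      val perRow: Seq[Array[Float]] = perPartition.flatten
      assert(perRow.size == partitions.map(_.size).sum)   // shape matches the input
    }

(The predict signature change itself is reverted further down in this
commit; the sketch only illustrates the contract described above.)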

* Removed boxing in XGBoost.fromDenseToSparseLabeledPoints

* Inlined XGBoost.repartitionData

An if is more explicit than an opaque method name.

* Moved XGBoost.convertBoosterToXGBoostModel to XGBoostModel

* Check the input dimension in DMatrix.setBaseMargin

Prior to this commit, providing an array of incorrect dimensions
would have resulted in memory corruption. Maybe backport this check
to C++?
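
A minimal sketch of the new behaviour from the JVM side (toy values;
assumes the xgboost4j native library is available; only the length
check is the point):

    import ml.dmlc.xgboost4j.java.{DMatrix => JDMatrix}

    object BaseMarginCheckSketch extends App {
      // A dense 2x2 matrix, i.e. rowNum() == 2.
      val mat = new JDMatrix(Array(1.0f, 2.0f, 3.0f, 4.0f), 2, 2)

      mat.setBaseMargin(Array(0.1f, 0.2f))  // OK: one margin value per row
      // mat.setBaseMargin(Array(0.1f))     // now throws IllegalArgumentException
                                            // instead of silently corrupting memory
    }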

* Reduced nesting in XGBoost.buildDistributedBoosters

* Ensured consistent naming of the params map

* Cleaned up DataBatch to make it easier to comprehend

* Made scalastyle happy

* Added baseMargin to XGBoost.train and trainWithRDD
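
For example, the raw margins of one model can seed the next training run
(a sketch modelled on the new test added below; the data, parameters and
helper name are illustrative):

    import ml.dmlc.xgboost4j.scala.spark.{XGBoost, XGBoostModel}
    import org.apache.spark.ml.feature.LabeledPoint
    import org.apache.spark.rdd.RDD

    // Hypothetical helper: boost further, starting from an existing model's margins.
    def continueBoosting(trainRDD: RDD[LabeledPoint]): XGBoostModel = {
      val params = Map("eta" -> "0.1", "max_depth" -> "6", "objective" -> "binary:logistic")

      // First training run.
      val firstModel = XGBoost.trainWithRDD(trainRDD, params, round = 10, nWorkers = 2)

      // Raw (untransformed) margins of the first model, one value per training row.
      val margins: RDD[Float] = firstModel
        .predict(trainRDD.map(_.features), outputMargin = true)
        .flatMap(_.flatten.iterator)

      // The second run starts boosting from those margins via the new baseMargin parameter.
      XGBoost.trainWithRDD(trainRDD, params, round = 10, nWorkers = 2, baseMargin = margins)
    }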

* Deprecated XGBoost.train

It is ambiguous and works only for RDDs.

* Addressed review comments

* Revert "Fixed a singature of XGBoostModel.predict"

This reverts commit 06bd5dcae7780265dd57e93ed7d4135f4e78f9b4.

* Addressed more review comments

* Fixed NullPointerException in buildDistributedBoosters
Sergei Lebedev authored 2017-06-30 17:27:24 +02:00, committed by Nan Zhu
parent 6b287177c8
commit d535340459
8 changed files with 206 additions and 190 deletions

View File

@@ -49,7 +49,7 @@ object SparkWithRDD {
       "eta" -> 0.1f,
       "max_depth" -> 2,
       "objective" -> "binary:logistic").toMap
-    val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = args(1).toInt,
+    val xgboostModel = XGBoost.trainWithRDD(trainRDD, paramMap, numRound, nWorkers = args(1).toInt,
       useExternalMemory = true)
     xgboostModel.booster.predict(new DMatrix(testSet))
     // save model to HDFS path

View File

@@ -17,7 +17,6 @@
 package ml.dmlc.xgboost4j.scala.spark

 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer

 import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, DMatrix => JDMatrix, RabitTracker => PyRabitTracker}
 import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
@@ -30,7 +29,6 @@ import org.apache.spark.ml.linalg.SparseVector
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
 import org.apache.spark.{SparkContext, TaskContext}
-import scala.concurrent.duration.{Duration, FiniteDuration, MILLISECONDS}

 object TrackerConf {
   def apply(): TrackerConf = TrackerConf(0L, "python")
@@ -53,97 +51,86 @@ case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String)
 object XGBoost extends Serializable {
   private val logger = LogFactory.getLog("XGBoostSpark")

-  private def convertBoosterToXGBoostModel(booster: Booster, isClassification: Boolean):
-      XGBoostModel = {
-    if (!isClassification) {
-      new XGBoostRegressionModel(booster)
-    } else {
-      new XGBoostClassificationModel(booster)
-    }
-  }
-
   private def fromDenseToSparseLabeledPoints(
       denseLabeledPoints: Iterator[MLLabeledPoint],
       missing: Float): Iterator[MLLabeledPoint] = {
     if (!missing.isNaN) {
-      val sparseLabeledPoints = new ListBuffer[MLLabeledPoint]
-      for (labelPoint <- denseLabeledPoints) {
-        val dVector = labelPoint.features.toDense
-        val indices = new ListBuffer[Int]
-        val values = new ListBuffer[Double]
-        for (i <- dVector.values.indices) {
-          if (dVector.values(i) != missing) {
-            indices += i
-            values += dVector.values(i)
-          }
-        }
-        val sparseVector = new SparseVector(dVector.values.length, indices.toArray,
-          values.toArray)
-        sparseLabeledPoints += MLLabeledPoint(labelPoint.label, sparseVector)
-      }
-      sparseLabeledPoints.iterator
+      denseLabeledPoints.map { case MLLabeledPoint(label, features) =>
+        val dFeatures = features.toDense
+        val indices = new mutable.ArrayBuilder.ofInt()
+        val values = new mutable.ArrayBuilder.ofDouble()
+        for (i <- dFeatures.values.indices) {
+          if (dFeatures.values(i) != missing) {
+            indices += i
+            values += dFeatures.values(i)
+          }
+        }
+        val sFeatures = new SparseVector(dFeatures.values.length, indices.result(),
+          values.result())
+        MLLabeledPoint(label, sFeatures)
+      }
     } else {
       denseLabeledPoints
     }
   }

-  private def repartitionData(trainingData: RDD[MLLabeledPoint], numWorkers: Int):
-      RDD[MLLabeledPoint] = {
-    if (numWorkers != trainingData.partitions.length) {
-      logger.info(s"repartitioning training set to $numWorkers partitions")
-      trainingData.repartition(numWorkers)
-    } else {
-      trainingData
-    }
-  }
-
   private[spark] def buildDistributedBoosters(
       trainingSet: RDD[MLLabeledPoint],
-      xgBoostConfMap: Map[String, Any],
+      params: Map[String, Any],
       rabitEnv: java.util.Map[String, String],
-      numWorkers: Int, round: Int, obj: ObjectiveTrait, eval: EvalTrait,
-      useExternalMemory: Boolean, missing: Float = Float.NaN): RDD[Booster] = {
+      numWorkers: Int,
+      round: Int,
+      obj: ObjectiveTrait,
+      eval: EvalTrait,
+      useExternalMemory: Boolean,
+      missing: Float,
+      baseMargin: RDD[Float]): RDD[Booster] = {
     import DataUtils._
-    val partitionedTrainingSet = repartitionData(trainingSet, numWorkers)
+    val partitionedTrainingSet = if (trainingSet.getNumPartitions != numWorkers) {
+      logger.info(s"repartitioning training set to $numWorkers partitions")
+      trainingSet.repartition(numWorkers)
+    } else {
+      trainingSet
+    }
+    val partitionedBaseMargin = Option(baseMargin)
+      .getOrElse(trainingSet.sparkContext.emptyRDD)
+      .repartition(partitionedTrainingSet.getNumPartitions)
     val appName = partitionedTrainingSet.context.appName
     // to workaround the empty partitions in training dataset,
     // this might not be the best efficient implementation, see
     // (https://github.com/dmlc/xgboost/issues/1277)
-    partitionedTrainingSet.mapPartitions {
-      trainingSamples =>
-        rabitEnv.put("DMLC_TASK_ID", TaskContext.getPartitionId().toString)
-        Rabit.init(rabitEnv)
-        var booster: Booster = null
-        if (trainingSamples.hasNext) {
-          val cacheFileName: String = {
-            if (useExternalMemory) {
-              s"$appName-${TaskContext.get().stageId()}-" +
-                s"dtrain_cache-${TaskContext.getPartitionId()}"
-            } else {
-              null
-            }
-          }
-          val partitionItr = fromDenseToSparseLabeledPoints(trainingSamples, missing)
-          val trainingSet = new DMatrix(new JDMatrix(partitionItr, cacheFileName))
-          try {
-            if (xgBoostConfMap.contains("groupData") && xgBoostConfMap("groupData") != null) {
-              trainingSet.setGroup(xgBoostConfMap("groupData").asInstanceOf[Seq[Seq[Int]]](
-                TaskContext.getPartitionId()).toArray)
-            }
-            booster = SXGBoost.train(trainingSet, xgBoostConfMap, round,
-              watches = new mutable.HashMap[String, DMatrix] {
-                put("train", trainingSet)
-              }.toMap, obj, eval)
-            Rabit.shutdown()
-          } finally {
-            trainingSet.delete()
-          }
-        } else {
-          Rabit.shutdown()
-          throw new XGBoostError(s"detect the empty partition in training dataset, partition ID:" +
-            s" ${TaskContext.getPartitionId().toString}")
-        }
-        Iterator(booster)
+    partitionedTrainingSet.zipPartitions(partitionedBaseMargin) { (trainingSamples, baseMargin) =>
+      if (trainingSamples.isEmpty) {
+        throw new XGBoostError(
+          s"detected an empty partition in the training data, partition ID:" +
+            s" ${TaskContext.getPartitionId()}")
+      }
+      val cacheFileName = if (useExternalMemory) {
+        s"$appName-${TaskContext.get().stageId()}-" +
+          s"dtrain_cache-${TaskContext.getPartitionId()}"
+      } else {
+        null
+      }
+      rabitEnv.put("DMLC_TASK_ID", TaskContext.getPartitionId().toString)
+      Rabit.init(rabitEnv)
+      val partitionItr = fromDenseToSparseLabeledPoints(trainingSamples, missing)
+      val trainingMatrix = new DMatrix(new JDMatrix(partitionItr, cacheFileName))
+      try {
+        if (params.contains("groupData") && params("groupData") != null) {
+          trainingMatrix.setGroup(params("groupData").asInstanceOf[Seq[Seq[Int]]](
+            TaskContext.getPartitionId()).toArray)
+        }
+        if (baseMargin.nonEmpty) {
+          trainingMatrix.setBaseMargin(baseMargin.toArray)
+        }
+        val booster = SXGBoost.train(trainingMatrix, params, round,
+          watches = Map("train" -> trainingMatrix), obj, eval)
+        Iterator(booster)
+      } finally {
+        Rabit.shutdown()
+        trainingMatrix.delete()
+      }
     }.cache()
   }
@@ -191,8 +178,8 @@ object XGBoost extends Serializable {
     fit(trainingData)
   }

-  private[spark] def isClassificationTask(paramsMap: Map[String, Any]): Boolean = {
-    val objective = paramsMap.getOrElse("objective", paramsMap.getOrElse("obj_type", null))
+  private[spark] def isClassificationTask(params: Map[String, Any]): Boolean = {
+    val objective = params.getOrElse("objective", params.getOrElse("obj_type", null))
     objective != null && {
       val objStr = objective.toString
       objStr == "classification" || (!objStr.startsWith("reg:") && objStr != "count:poisson" &&
@@ -212,18 +199,26 @@ object XGBoost extends Serializable {
    * @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
    *        true, the user may save the RAM cost for running XGBoost within Spark
    * @param missing the value represented the missing value in the dataset
+   * @param baseMargin initial prediction for boosting.
    * @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
    * @return XGBoostModel when successful training
    */
+  @deprecated("Use XGBoost.trainWithRDD instead.")
   def train(
-      trainingData: RDD[MLLabeledPoint], params: Map[String, Any], round: Int,
-      nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
-      useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
-    require(nWorkers > 0, "you must specify more than 0 workers")
-    trainWithRDD(trainingData, params, round, nWorkers, obj, eval, useExternalMemory, missing)
+      trainingData: RDD[MLLabeledPoint],
+      params: Map[String, Any],
+      round: Int,
+      nWorkers: Int,
+      obj: ObjectiveTrait = null,
+      eval: EvalTrait = null,
+      useExternalMemory: Boolean = false,
+      missing: Float = Float.NaN,
+      baseMargin: RDD[Float] = null): XGBoostModel = {
+    trainWithRDD(trainingData, params, round, nWorkers, obj, eval, useExternalMemory,
+      missing, baseMargin)
   }

-  private def overrideParamMapAccordingtoTaskCPUs(
+  private def overrideParamsAccordingToTaskCPUs(
       params: Map[String, Any],
       sc: SparkContext): Map[String, Any] = {
     val coresPerTask = sc.getConf.get("spark.task.cpus", "1").toInt
@@ -262,14 +257,21 @@ object XGBoost extends Serializable {
    * @param useExternalMemory indicate whether to use external memory cache, by setting this flag as
    *        true, the user may save the RAM cost for running XGBoost within Spark
    * @param missing the value represented the missing value in the dataset
+   * @param baseMargin initial prediction for boosting.
    * @throws ml.dmlc.xgboost4j.java.XGBoostError when the model training is failed
    * @return XGBoostModel when successful training
    */
   @throws(classOf[XGBoostError])
   def trainWithRDD(
-      trainingData: RDD[MLLabeledPoint], params: Map[String, Any], round: Int,
-      nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null,
-      useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = {
+      trainingData: RDD[MLLabeledPoint],
+      params: Map[String, Any],
+      round: Int,
+      nWorkers: Int,
+      obj: ObjectiveTrait = null,
+      eval: EvalTrait = null,
+      useExternalMemory: Boolean = false,
+      missing: Float = Float.NaN,
+      baseMargin: RDD[Float] = null): XGBoostModel = {
     if (params.contains("tree_method")) {
       require(params("tree_method") != "hist", "xgboost4j-spark does not support fast histogram" +
         " for now")
@@ -288,9 +290,10 @@ object XGBoost extends Serializable {
     }
     val tracker = startTracker(nWorkers, trackerConf)
     try {
-      val overridedConfMap = overrideParamMapAccordingtoTaskCPUs(params, trainingData.sparkContext)
-      val boosters = buildDistributedBoosters(trainingData, overridedConfMap,
-        tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing)
+      val overriddenParams = overrideParamsAccordingToTaskCPUs(params, trainingData.sparkContext)
+      val boosters = buildDistributedBoosters(trainingData, overriddenParams,
+        tracker.getWorkerEnvs, nWorkers, round, obj, eval, useExternalMemory, missing,
+        baseMargin)
       val sparkJobThread = new Thread() {
         override def run() {
           // force the job
@@ -302,7 +305,7 @@ object XGBoost extends Serializable {
       val isClsTask = isClassificationTask(params)
       val trackerReturnVal = tracker.waitFor(0L)
       logger.info(s"Rabit returns with exit code $trackerReturnVal")
-      postTrackerReturnProcessing(trackerReturnVal, boosters, overridedConfMap, sparkJobThread,
+      postTrackerReturnProcessing(trackerReturnVal, boosters, overriddenParams, sparkJobThread,
         isClsTask)
     } finally {
       tracker.stop()
@@ -311,11 +314,10 @@ object XGBoost extends Serializable {

   private def postTrackerReturnProcessing(
       trackerReturnVal: Int, distributedBoosters: RDD[Booster],
-      configMap: Map[String, Any], sparkJobThread: Thread, isClassificationTask: Boolean):
+      params: Map[String, Any], sparkJobThread: Thread, isClassificationTask: Boolean):
       XGBoostModel = {
     if (trackerReturnVal == 0) {
-      val xgboostModel = convertBoosterToXGBoostModel(distributedBoosters.first(),
-        isClassificationTask)
+      val xgboostModel = XGBoostModel(distributedBoosters.first(), isClassificationTask)
       distributedBoosters.unpersist(false)
       xgboostModel
     } else {

View File

@@ -125,16 +125,15 @@ abstract class XGBoostModel(protected var _booster: Booster)
           case (null, _) => {
             val predStr = broadcastBooster.value.evalSet(Array(dMatrix), Array(evalName), iter)
             val Array(evName, predNumeric) = predStr.split(":")
-            Rabit.shutdown()
             Iterator(Some(evName, predNumeric.toFloat))
           }
           case _ => {
             val predictions = broadcastBooster.value.predict(dMatrix)
-            Rabit.shutdown()
             Iterator(Some((evalName, evalFunc.eval(predictions, dMatrix))))
           }
         }
       } finally {
+        Rabit.shutdown()
         dMatrix.delete()
       }
     } else {
@@ -170,10 +169,9 @@ abstract class XGBoostModel(protected var _booster: Booster)
         }
         val dMatrix = new DMatrix(flatSampleArray, numRows, numColumns, missingValue)
         try {
-          val res = broadcastBooster.value.predict(dMatrix)
-          Rabit.shutdown()
-          Iterator(res)
+          Iterator(broadcastBooster.value.predict(dMatrix))
         } finally {
+          Rabit.shutdown()
           dMatrix.delete()
         }
       }
@@ -185,13 +183,16 @@ abstract class XGBoostModel(protected var _booster: Booster)
    *
    * @param testSet test set represented as RDD
    * @param useExternalCache whether to use external cache for the test set
+   * @param outputMargin whether to output raw untransformed margin value
    */
-  def predict(testSet: RDD[MLVector], useExternalCache: Boolean = false):
-      RDD[Array[Array[Float]]] = {
+  def predict(
+      testSet: RDD[MLVector],
+      useExternalCache: Boolean = false,
+      outputMargin: Boolean = false): RDD[Array[Array[Float]]] = {
     val broadcastBooster = testSet.sparkContext.broadcast(_booster)
     val appName = testSet.context.appName
     testSet.mapPartitions { testSamples =>
-      if (testSamples.hasNext) {
+      if (testSamples.nonEmpty) {
         import DataUtils._
         val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
         Rabit.init(rabitEnv.asJava)
@@ -204,10 +205,9 @@ abstract class XGBoostModel(protected var _booster: Booster)
         }
         val dMatrix = new DMatrix(new JDMatrix(testSamples, cacheFileName))
         try {
-          val res = broadcastBooster.value.predict(dMatrix)
-          Rabit.shutdown()
-          Iterator(res)
+          Iterator(broadcastBooster.value.predict(dMatrix))
         } finally {
+          Rabit.shutdown()
           dMatrix.delete()
         }
       } else {
@@ -334,6 +334,13 @@ abstract class XGBoostModel(protected var _booster: Booster)
 }

 object XGBoostModel extends MLReadable[XGBoostModel] {
+  private[spark] def apply(booster: Booster, isClassification: Boolean): XGBoostModel = {
+    if (!isClassification) {
+      new XGBoostRegressionModel(booster)
+    } else {
+      new XGBoostClassificationModel(booster)
+    }
+  }
+
   override def read: MLReader[XGBoostModel] = new XGBoostModelModelReader

View File

@@ -0,0 +1 @@
+log4j.logger.org.apache.spark=ERROR

View File

@@ -22,19 +22,22 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 trait SharedSparkContext extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
   with Serializable {

-  @transient protected implicit var sc: SparkContext = null
+  @transient protected implicit var sc: SparkContext = _

   override def beforeAll() {
-    // build SparkContext
-    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("XGBoostSuite").
-      set("spark.driver.memory", "512m")
+    val sparkConf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("XGBoostSuite")
+      .set("spark.driver.memory", "512m")
+      .set("spark.ui.enabled", "false")
+
     sc = new SparkContext(sparkConf)
+    sc.setLogLevel("ERROR")
   }

   override def afterAll() {
     if (sc != null) {
       sc.stop()
+      sc = null
     }
   }
 }

View File

@@ -17,17 +17,15 @@
 package ml.dmlc.xgboost4j.scala.spark

 import java.nio.file.Files
-import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
+import java.util.concurrent.LinkedBlockingDeque

 import scala.collection.mutable.ListBuffer
 import scala.io.Source
 import scala.util.Random
-import scala.concurrent.duration._

-import ml.dmlc.xgboost4j.java.{Rabit, DMatrix => JDMatrix, RabitTracker => PyRabitTracker}
+import ml.dmlc.xgboost4j.java.{Rabit, DMatrix => JDMatrix}
 import ml.dmlc.xgboost4j.scala.DMatrix
 import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
-import org.scalatest.Ignore

 import org.apache.spark.SparkContext
 import org.apache.spark.ml.feature.LabeledPoint
@@ -83,7 +81,8 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
       List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
         "objective" -> "binary:logistic").toMap,
       new java.util.HashMap[String, String](),
-      numWorkers = 2, round = 5, eval = null, obj = null, useExternalMemory = true)
+      numWorkers = 2, round = 5, eval = null, obj = null, useExternalMemory = true,
+      missing = Float.NaN, baseMargin = null)
     val boosterCount = boosterRDD.count()
     assert(boosterCount === 2)
     cleanExternalCache("XGBoostSuite")
@@ -390,4 +389,30 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
     val predResult1: Array[Array[Float]] = predRDD.collect()(0)
     assert(testRDD.count() === predResult1.length)
   }
+
+  test("test use base margin") {
+    val trainSet = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
+    val trainRDD = sc.parallelize(trainSet, numSlices = 1)
+    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
+    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
+      "objective" -> "rank:pairwise")
+
+    val trainMargin = {
+      XGBoost.trainWithRDD(trainRDD, paramMap, round = 1, nWorkers = 2)
+        .predict(trainRDD.map(_.features), outputMargin = true)
+        .flatMap { _.flatten.iterator }
+    }
+
+    val xgBoostModel = XGBoost.trainWithRDD(
+      trainRDD,
+      paramMap,
+      round = 1,
+      nWorkers = 2,
+      baseMargin = trainMargin)
+
+    assert(testRDD.count() === xgBoostModel.predict(testRDD).first().length)
+  }
 }

View File

@@ -171,26 +171,26 @@ public class DMatrix {
   }

   /**
-   * if specified, xgboost will start from this init margin
-   * can be used to specify initial prediction to boost from
+   * Set base margin (initial prediction).
    *
-   * @param baseMargin base margin
-   * @throws XGBoostError native error
+   * The margin must have the same number of elements as the number of
+   * rows in this matrix.
    */
   public void setBaseMargin(float[] baseMargin) throws XGBoostError {
+    if (baseMargin.length != rowNum()) {
+      throw new IllegalArgumentException(String.format(
+              "base margin must have exactly %s elements, got %s",
+              rowNum(), baseMargin.length));
+    }
     XGBoostJNI.checkCall(XGBoostJNI.XGDMatrixSetFloatInfo(handle, "base_margin", baseMargin));
   }

   /**
-   * if specified, xgboost will start from this init margin
-   * can be used to specify initial prediction to boost from
-   *
-   * @param baseMargin base margin
-   * @throws XGBoostError native error
+   * Set base margin (initial prediction).
    */
   public void setBaseMargin(float[][] baseMargin) throws XGBoostError {
-    float[] flattenMargin = flatten(baseMargin);
-    setBaseMargin(flattenMargin);
+    setBaseMargin(flatten(baseMargin));
   }

   /**
@@ -236,10 +236,7 @@ public class DMatrix {
   }

   /**
-   * get base margin of the DMatrix
-   *
-   * @return base margin
-   * @throws XGBoostError native error
+   * Get base margin of the DMatrix.
    */
   public float[] getBaseMargin() throws XGBoostError {
     return getFloatInfo("base_margin");

View File

@@ -1,7 +1,8 @@
 package ml.dmlc.xgboost4j.java;

-import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;

 import ml.dmlc.xgboost4j.LabeledPoint;
@@ -13,20 +14,18 @@ import ml.dmlc.xgboost4j.LabeledPoint;
  */
 class DataBatch {
   /** The offset of each rows in the sparse matrix */
-  long[] rowOffset = null;
+  final long[] rowOffset;
   /** weight of each data point, can be null */
-  float[] weight = null;
+  final float[] weight;
   /** label of each data point, can be null */
-  float[] label = null;
+  final float[] label;
   /** index of each feature(column) in the sparse matrix */
-  int[] featureIndex = null;
+  final int[] featureIndex;
   /** value of each non-missing entry in the sparse matrix */
-  float[] featureValue = null;
+  final float[] featureValue ;

-  public DataBatch() {}
-
-  public DataBatch(long[] rowOffset, float[] weight, float[] label, int[] featureIndex,
-                   float[] featureValue) {
+  DataBatch(long[] rowOffset, float[] weight, float[] label, int[] featureIndex,
+            float[] featureValue) {
     this.rowOffset = rowOffset;
     this.weight = weight;
     this.label = label;
@@ -34,80 +33,62 @@ class DataBatch {
     this.featureValue = featureValue;
   }

-  /**
-   * Get number of rows in the data batch.
-   * @return Number of rows in the data batch.
-   */
-  public int numRows() {
-    return rowOffset.length - 1;
-  }
-
-  /**
-   * Shallow copy a DataBatch
-   * @return a copy of the batch
-   */
-  public DataBatch shallowCopy() {
-    DataBatch b = new DataBatch();
-    b.rowOffset = this.rowOffset;
-    b.weight = this.weight;
-    b.label = this.label;
-    b.featureIndex = this.featureIndex;
-    b.featureValue = this.featureValue;
-    return b;
-  }
-
   static class BatchIterator implements Iterator<DataBatch> {
-    private Iterator<LabeledPoint> base;
-    private int batchSize;
+    private final Iterator<LabeledPoint> base;
+    private final int batchSize;

-    BatchIterator(java.util.Iterator<LabeledPoint> base, int batchSize) {
+    BatchIterator(Iterator<LabeledPoint> base, int batchSize) {
       this.base = base;
       this.batchSize = batchSize;
     }

     @Override
     public boolean hasNext() {
       return base.hasNext();
     }

     @Override
     public DataBatch next() {
-      int num_rows = 0, num_elem = 0;
-      java.util.List<LabeledPoint> batch = new java.util.ArrayList<LabeledPoint>();
-      for (int i = 0; i < this.batchSize; ++i) {
-        if (!base.hasNext()) break;
-        LabeledPoint inst = base.next();
-        batch.add(inst);
-        num_elem += inst.values.length;
-        ++num_rows;
+      int numRows = 0;
+      int numElem = 0;
+      List<LabeledPoint> batch = new ArrayList<>(batchSize);
+      while (base.hasNext() && batch.size() < batchSize) {
+        LabeledPoint labeledPoint = base.next();
+        batch.add(labeledPoint);
+        numElem += labeledPoint.values.length;
+        numRows++;
       }
-      DataBatch ret = new DataBatch();
-      // label
-      ret.rowOffset = new long[num_rows + 1];
-      ret.label = new float[num_rows];
-      ret.featureIndex = new int[num_elem];
-      ret.featureValue = new float[num_elem];
-      // current offset
+
+      long[] rowOffset = new long[numRows + 1];
+      float[] label = new float[numRows];
+      int[] featureIndex = new int[numElem];
+      float[] featureValue = new float[numElem];
+
       int offset = 0;
-      for (int i = 0; i < batch.size(); ++i) {
-        LabeledPoint inst = batch.get(i);
-        ret.rowOffset[i] = offset;
-        ret.label[i] = inst.label;
-        if (inst.indices != null) {
-          System.arraycopy(inst.indices, 0, ret.featureIndex, offset, inst.indices.length);
-        } else{
-          for (int j = 0; j < inst.values.length; ++j) {
-            ret.featureIndex[offset + j] = j;
+      for (int i = 0; i < batch.size(); i++) {
+        LabeledPoint labeledPoint = batch.get(i);
+        rowOffset[i] = offset;
+        label[i] = labeledPoint.label;
+        if (labeledPoint.indices != null) {
+          System.arraycopy(labeledPoint.indices, 0, featureIndex, offset,
+              labeledPoint.indices.length);
+        } else {
+          for (int j = 0; j < labeledPoint.values.length; j++) {
+            featureIndex[offset + j] = j;
           }
         }
-        System.arraycopy(inst.values, 0, ret.featureValue, offset, inst.values.length);
-        offset += inst.values.length;
+
+        System.arraycopy(labeledPoint.values, 0, featureValue, offset, labeledPoint.values.length);
+        offset += labeledPoint.values.length;
       }
-      ret.rowOffset[batch.size()] = offset;
-      return ret;
+
+      rowOffset[batch.size()] = offset;
+      return new DataBatch(rowOffset, null, label, featureIndex, featureValue);
     }

     @Override
     public void remove() {
-      throw new Error("not implemented");
+      throw new UnsupportedOperationException("DataBatch.BatchIterator.remove");
     }
   }
 }