From cb685607b23fc06d829cbe7003a319be02707889 Mon Sep 17 00:00:00 2001
From: Bobby Wang
+/**
+ * Within each RDD partition, group the XGBLabeledPoint by group id.
+ * And the first and the last groups may not have all of their items due to the data partition.
+ * LabeledPointGroupIterator organizes data in a tuple format:
+ * (isFirstGroup || isLastGroup, Array[XGBLabeledPoint]).
+ * The edge groups across partitions can be stitched together later.
+ * @param base collection of XGBLabeledPoint
+ */
+private[spark] class LabeledPointGroupIterator(base: Iterator[XGBLabeledPoint])
+ extends AbstractIterator[XGBLabeledPointGroup] {
+
+ private var firstPointOfNextGroup: XGBLabeledPoint = null
+ private var isNewGroup = false
+
+ override def hasNext: Boolean = {
+ base.hasNext || isNewGroup
+ }
+
+ override def next(): XGBLabeledPointGroup = {
+ val builder = mutable.ArrayBuilder.make[XGBLabeledPoint]
+ var isFirstGroup = true
+ if (firstPointOfNextGroup != null) {
+ builder += firstPointOfNextGroup
+ isFirstGroup = false
+ }
+
+ isNewGroup = false
+ while (!isNewGroup && base.hasNext) {
+ val point = base.next()
+ val groupId = if (firstPointOfNextGroup != null) firstPointOfNextGroup.group else point.group
+ firstPointOfNextGroup = point
+ if (point.group == groupId) {
+ // add to current group
+ builder += point
+ } else {
+ // start a new group
+ isNewGroup = true
+ }
+ }
+
+ val isLastGroup = !isNewGroup
+ val result = builder.result()
+ val group = XGBLabeledPointGroup(result(0).group, result, isFirstGroup || isLastGroup)
+
+ group
+ }
+}
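A minimal sketch of how the iterator chunks a partition holding groups 0,0,1,1,1,2, assuming the xgboost4j LabeledPoint signature (label, size, indices, values, weight, group, baseMargin), which varies by version:

    val points: Iterator[XGBLabeledPoint] =
      Seq(0, 0, 1, 1, 1, 2).iterator.map { g =>
        // label 1.0f, a single feature at index 0, ranking group id g
        XGBLabeledPoint(1.0f, 1, Array(0), Array(1.0f), group = g)
      }
    new LabeledPointGroupIterator(points).foreach { grp =>
      // groups 0 and 2 are edge groups (first/last of the partition);
      // group 1 is complete and is emitted with isEdgeGroup = false
      println(s"group=${grp.groupId} size=${grp.points.length} edge=${grp.isEdgeGroup}")
    }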
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index dbc4a0aa5..441aeb382 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -17,9 +17,8 @@
package ml.dmlc.xgboost4j.scala.spark
import java.io.File
-import java.nio.file.Files
-import scala.collection.{AbstractIterator, mutable}
+import scala.collection.mutable
import scala.util.Random
import scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
@@ -34,8 +33,6 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.storage.StorageLevel
-
/**
* Rabit tracker configurations.
@@ -50,7 +47,7 @@ import org.apache.spark.storage.StorageLevel
* in Scala without Python components, and with full support of timeouts.
* The Scala implementation is currently experimental, use at your own risk.
*/
-case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String)
+case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String)
object TrackerConf {
def apply(): TrackerConf = TrackerConf(0L, "python")
@@ -61,7 +58,7 @@ private[this] case class XGBoostExecutionEarlyStoppingParams(numEarlyStoppingRou
private[this] case class XGBoostExecutionInputParams(trainTestRatio: Double, seed: Long)
-private[this] case class XGBoostExecutionParams(
+private[spark] case class XGBoostExecutionParams(
numWorkers: Int,
numRounds: Int,
useExternalMemory: Boolean,
@@ -257,96 +254,9 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
)
}
-/**
- * Traing data group in a RDD partition.
- * @param groupId The group id
- * @param points Array of XGBLabeledPoint within the same group.
- * @param isEdgeGroup whether it is a frist or last group in a RDD partition.
- */
-private[spark] case class XGBLabeledPointGroup(
- groupId: Int,
- points: Array[XGBLabeledPoint],
- isEdgeGroup: Boolean)
-
object XGBoost extends Serializable {
private val logger = LogFactory.getLog("XGBoostSpark")
- private def verifyMissingSetting(
- xgbLabelPoints: Iterator[XGBLabeledPoint],
- missing: Float,
- allowNonZeroMissing: Boolean): Iterator[XGBLabeledPoint] = {
- if (missing != 0.0f && !allowNonZeroMissing) {
- xgbLabelPoints.map(labeledPoint => {
- if (labeledPoint.indices != null) {
- throw new RuntimeException(s"you can only specify missing value as 0.0 (the currently" +
- s" set value $missing) when you have SparseVector or Empty vector as your feature" +
- s" format. If you didn't use Spark's VectorAssembler class to build your feature " +
- s"vector but instead did so in a way that preserves zeros in your feature vector " +
- s"you can avoid this check by using the 'allow_non_zero_for_missing parameter'" +
- s" (only use if you know what you are doing)")
- }
- labeledPoint
- })
- } else {
- xgbLabelPoints
- }
- }
-
- private def removeMissingValues(
- xgbLabelPoints: Iterator[XGBLabeledPoint],
- missing: Float,
- keepCondition: Float => Boolean): Iterator[XGBLabeledPoint] = {
- xgbLabelPoints.map { labeledPoint =>
- val indicesBuilder = new mutable.ArrayBuilder.ofInt()
- val valuesBuilder = new mutable.ArrayBuilder.ofFloat()
- for ((value, i) <- labeledPoint.values.zipWithIndex if keepCondition(value)) {
- indicesBuilder += (if (labeledPoint.indices == null) i else labeledPoint.indices(i))
- valuesBuilder += value
- }
- labeledPoint.copy(indices = indicesBuilder.result(), values = valuesBuilder.result())
- }
- }
-
- private[spark] def processMissingValues(
- xgbLabelPoints: Iterator[XGBLabeledPoint],
- missing: Float,
- allowNonZeroMissing: Boolean): Iterator[XGBLabeledPoint] = {
- if (!missing.isNaN) {
- removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing, allowNonZeroMissing),
- missing, (v: Float) => v != missing)
- } else {
- removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing, allowNonZeroMissing),
- missing, (v: Float) => !v.isNaN)
- }
- }
-
- private def processMissingValuesWithGroup(
- xgbLabelPointGroups: Iterator[Array[XGBLabeledPoint]],
- missing: Float,
- allowNonZeroMissing: Boolean): Iterator[Array[XGBLabeledPoint]] = {
- if (!missing.isNaN) {
- xgbLabelPointGroups.map {
- labeledPoints => XGBoost.processMissingValues(
- labeledPoints.iterator,
- missing,
- allowNonZeroMissing
- ).toArray
- }
- } else {
- xgbLabelPointGroups
- }
- }
-
- private def getCacheDirName(useExternalMemory: Boolean): Option[String] = {
- val taskId = TaskContext.getPartitionId().toString
- if (useExternalMemory) {
- val dir = Files.createTempDirectory(s"${TaskContext.get().stageId()}-cache-$taskId")
- Some(dir.toAbsolutePath.toString)
- } else {
- None
- }
- }
-
private def getGPUAddrFromResources: Int = {
val tc = TaskContext.get()
if (tc == null) {
@@ -437,150 +347,22 @@ object XGBoost extends Serializable {
tracker
}
- class IteratorWrapper[T](arrayOfXGBLabeledPoints: Array[(String, Iterator[T])])
- extends Iterator[(String, Iterator[T])] {
-
- private var currentIndex = 0
-
- override def hasNext: Boolean = currentIndex <= arrayOfXGBLabeledPoints.length - 1
-
- override def next(): (String, Iterator[T]) = {
- currentIndex += 1
- arrayOfXGBLabeledPoints(currentIndex - 1)
- }
- }
-
- private def coPartitionNoGroupSets(
- trainingData: RDD[XGBLabeledPoint],
- evalSets: Map[String, RDD[XGBLabeledPoint]],
- nWorkers: Int) = {
- // eval_sets is supposed to be set by the caller of [[trainDistributed]]
- val allDatasets = Map("train" -> trainingData) ++ evalSets
- val repartitionedDatasets = allDatasets.map{case (name, rdd) =>
- if (rdd.getNumPartitions != nWorkers) {
- (name, rdd.repartition(nWorkers))
- } else {
- (name, rdd)
- }
- }
- repartitionedDatasets.foldLeft(trainingData.sparkContext.parallelize(
- Array.fill[(String, Iterator[XGBLabeledPoint])](nWorkers)(null), nWorkers)){
- case (rddOfIterWrapper, (name, rddOfIter)) =>
- rddOfIterWrapper.zipPartitions(rddOfIter){
- (itrWrapper, itr) =>
- if (!itr.hasNext) {
- logger.error("when specifying eval sets as dataframes, you have to ensure that " +
- "the number of elements in each dataframe is larger than the number of workers")
- throw new Exception("too few elements in evaluation sets")
- }
- val itrArray = itrWrapper.toArray
- if (itrArray.head != null) {
- new IteratorWrapper(itrArray :+ (name -> itr))
- } else {
- new IteratorWrapper(Array(name -> itr))
- }
- }
- }
- }
-
- private def trainForNonRanking(
- trainingData: RDD[XGBLabeledPoint],
- xgbExecutionParams: XGBoostExecutionParams,
- rabitEnv: java.util.Map[String, String],
- prevBooster: Booster,
- evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[(Booster, Map[String, Array[Float]])] = {
- if (evalSetsMap.isEmpty) {
- trainingData.mapPartitions(labeledPoints => {
- val watches = Watches.buildWatches(xgbExecutionParams,
- processMissingValues(labeledPoints, xgbExecutionParams.missing,
- xgbExecutionParams.allowNonZeroForMissing),
- getCacheDirName(xgbExecutionParams.useExternalMemory))
- buildDistributedBooster(watches, xgbExecutionParams, rabitEnv, xgbExecutionParams.obj,
- xgbExecutionParams.eval, prevBooster)
- }).cache()
- } else {
- coPartitionNoGroupSets(trainingData, evalSetsMap, xgbExecutionParams.numWorkers).
- mapPartitions {
- nameAndLabeledPointSets =>
- val watches = Watches.buildWatches(
- nameAndLabeledPointSets.map {
- case (name, iter) => (name, processMissingValues(iter,
- xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
- },
- getCacheDirName(xgbExecutionParams.useExternalMemory))
- buildDistributedBooster(watches, xgbExecutionParams, rabitEnv, xgbExecutionParams.obj,
- xgbExecutionParams.eval, prevBooster)
- }.cache()
- }
- }
-
- private def trainForRanking(
- trainingData: RDD[Array[XGBLabeledPoint]],
- xgbExecutionParam: XGBoostExecutionParams,
- rabitEnv: java.util.Map[String, String],
- prevBooster: Booster,
- evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[(Booster, Map[String, Array[Float]])] = {
- if (evalSetsMap.isEmpty) {
- trainingData.mapPartitions(labeledPointGroups => {
- val watches = Watches.buildWatchesWithGroup(xgbExecutionParam,
- processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing,
- xgbExecutionParam.allowNonZeroForMissing),
- getCacheDirName(xgbExecutionParam.useExternalMemory))
- buildDistributedBooster(watches, xgbExecutionParam, rabitEnv,
- xgbExecutionParam.obj, xgbExecutionParam.eval, prevBooster)
- }).cache()
- } else {
- coPartitionGroupSets(trainingData, evalSetsMap, xgbExecutionParam.numWorkers).mapPartitions(
- labeledPointGroupSets => {
- val watches = Watches.buildWatchesWithGroup(
- labeledPointGroupSets.map {
- case (name, iter) => (name, processMissingValuesWithGroup(iter,
- xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
- },
- getCacheDirName(xgbExecutionParam.useExternalMemory))
- buildDistributedBooster(watches, xgbExecutionParam, rabitEnv,
- xgbExecutionParam.obj,
- xgbExecutionParam.eval,
- prevBooster)
- }).cache()
- }
- }
-
- private def cacheData(ifCacheDataBoolean: Boolean, input: RDD[_]): RDD[_] = {
- if (ifCacheDataBoolean) input.persist(StorageLevel.MEMORY_AND_DISK) else input
- }
-
- private def composeInputData(
- trainingData: RDD[XGBLabeledPoint],
- ifCacheDataBoolean: Boolean,
- hasGroup: Boolean,
- nWorkers: Int): Either[RDD[Array[XGBLabeledPoint]], RDD[XGBLabeledPoint]] = {
- if (hasGroup) {
- val repartitionedData = repartitionForTrainingGroup(trainingData, nWorkers)
- Left(cacheData(ifCacheDataBoolean, repartitionedData).
- asInstanceOf[RDD[Array[XGBLabeledPoint]]])
- } else {
- Right(cacheData(ifCacheDataBoolean, trainingData).asInstanceOf[RDD[XGBLabeledPoint]])
- }
- }
-
/**
* @return A tuple of the booster and the metrics used to build training summary
*/
@throws(classOf[XGBoostError])
private[spark] def trainDistributed(
- trainingData: RDD[XGBLabeledPoint],
- params: Map[String, Any],
- hasGroup: Boolean = false,
- evalSetsMap: Map[String, RDD[XGBLabeledPoint]] = Map()):
+ sc: SparkContext,
+ buildTrainingData: XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]),
+ params: Map[String, Any]):
(Booster, Map[String, Array[Float]]) = {
+
logger.info(s"Running XGBoost ${spark.VERSION} with parameters:\n${params.mkString("\n")}")
- val xgbParamsFactory = new XGBoostExecutionParamsFactory(params, trainingData.sparkContext)
+
+ val xgbParamsFactory = new XGBoostExecutionParamsFactory(params, sc)
val xgbExecParams = xgbParamsFactory.buildXGBRuntimeParams
val xgbRabitParams = xgbParamsFactory.buildRabitParams.asJava
- val sc = trainingData.sparkContext
- val transformedTrainingData = composeInputData(trainingData, xgbExecParams.cacheTrainingSet,
- hasGroup, xgbExecParams.numWorkers)
+
val prevBooster = xgbExecParams.checkpointParam.map { checkpointParam =>
val checkpointManager = new ExternalCheckpointManager(
checkpointParam.checkpointPath,
@@ -588,6 +370,10 @@ object XGBoost extends Serializable {
checkpointManager.cleanUpHigherVersions(xgbExecParams.numRounds)
checkpointManager.loadCheckpointAsScalaBooster()
}.orNull
+
+  // Build the training data RDD and the optional cached RDD to unpersist later
+ val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
+
try {
// Train for every ${savingRound} rounds and save the partially completed booster
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
@@ -599,13 +385,21 @@ object XGBoost extends Serializable {
tracker.getWorkerEnvs().putAll(xgbRabitParams)
val rabitEnv = tracker.getWorkerEnvs
- val boostersAndMetrics = if (hasGroup) {
- trainForRanking(transformedTrainingData.left.get, xgbExecParams, rabitEnv, prevBooster,
- evalSetsMap)
- } else {
- trainForNonRanking(transformedTrainingData.right.get, xgbExecParams, rabitEnv,
- prevBooster, evalSetsMap)
- }
+
+      val boostersAndMetrics = trainingRDD.mapPartitions { iter =>
+        // each partition is expected to hold a single Watches instance;
+        // take the first one to train
+        val optionWatches: Option[Watches] =
+          if (iter.hasNext) Some(iter.next()) else None
+
+        optionWatches.map { watches =>
+          buildDistributedBooster(watches, xgbExecParams, rabitEnv,
+            xgbExecParams.obj, xgbExecParams.eval, prevBooster)
+        }.getOrElse(throw new RuntimeException("No Watches to train"))
+      }.cache()
+
val sparkJobThread = new Thread() {
override def run() {
// force the job
@@ -642,85 +436,11 @@ object XGBoost extends Serializable {
// if the job was aborted due to an exception
logger.error("the job was aborted due to ", t)
if (xgbExecParams.killSparkContextOnWorkerFailure) {
- trainingData.sparkContext.stop()
+ sc.stop()
}
throw t
} finally {
- uncacheTrainingData(xgbExecParams.cacheTrainingSet, transformedTrainingData)
- }
- }
-
- private def uncacheTrainingData(
- cacheTrainingSet: Boolean,
- transformedTrainingData: Either[RDD[Array[XGBLabeledPoint]], RDD[XGBLabeledPoint]]): Unit = {
- if (cacheTrainingSet) {
- if (transformedTrainingData.isLeft) {
- transformedTrainingData.left.get.unpersist()
- } else {
- transformedTrainingData.right.get.unpersist()
- }
- }
- }
-
- private def aggByGroupInfo(trainingData: RDD[XGBLabeledPoint]) = {
- val normalGroups: RDD[Array[XGBLabeledPoint]] = trainingData.mapPartitions(
- // LabeledPointGroupIterator returns (Boolean, Array[XGBLabeledPoint])
- new LabeledPointGroupIterator(_)).filter(!_.isEdgeGroup).map(_.points)
-
- // edge groups with partition id.
- val edgeGroups: RDD[(Int, XGBLabeledPointGroup)] = trainingData.mapPartitions(
- new LabeledPointGroupIterator(_)).filter(_.isEdgeGroup).map(
- group => (TaskContext.getPartitionId(), group))
-
- // group chunks from different partitions together by group id in XGBLabeledPoint.
- // use groupBy instead of aggregateBy since all groups within a partition have unique group ids.
- val stitchedGroups: RDD[Array[XGBLabeledPoint]] = edgeGroups.groupBy(_._2.groupId).map(
- groups => {
- val it: Iterable[(Int, XGBLabeledPointGroup)] = groups._2
- // sorted by partition id and merge list of Array[XGBLabeledPoint] into one array
- it.toArray.sortBy(_._1).flatMap(_._2.points)
- })
- normalGroups.union(stitchedGroups)
- }
-
- private[spark] def repartitionForTrainingGroup(
- trainingData: RDD[XGBLabeledPoint], nWorkers: Int): RDD[Array[XGBLabeledPoint]] = {
- val allGroups = aggByGroupInfo(trainingData)
- logger.info(s"repartitioning training group set to $nWorkers partitions")
- allGroups.repartition(nWorkers)
- }
-
- private def coPartitionGroupSets(
- aggedTrainingSet: RDD[Array[XGBLabeledPoint]],
- evalSets: Map[String, RDD[XGBLabeledPoint]],
- nWorkers: Int): RDD[(String, Iterator[Array[XGBLabeledPoint]])] = {
- val repartitionedDatasets = Map("train" -> aggedTrainingSet) ++ evalSets.map {
- case (name, rdd) => {
- val aggedRdd = aggByGroupInfo(rdd)
- if (aggedRdd.getNumPartitions != nWorkers) {
- name -> aggedRdd.repartition(nWorkers)
- } else {
- name -> aggedRdd
- }
- }
- }
- repartitionedDatasets.foldLeft(aggedTrainingSet.sparkContext.parallelize(
- Array.fill[(String, Iterator[Array[XGBLabeledPoint]])](nWorkers)(null), nWorkers)){
- case (rddOfIterWrapper, (name, rddOfIter)) =>
- rddOfIterWrapper.zipPartitions(rddOfIter){
- (itrWrapper, itr) =>
- if (!itr.hasNext) {
- logger.error("when specifying eval sets as dataframes, you have to ensure that " +
- "the number of elements in each dataframe is larger than the number of workers")
- throw new Exception("too few elements in evaluation sets")
- }
- val itrArray = itrWrapper.toArray
- if (itrArray.head != null) {
- new IteratorWrapper(itrArray :+ (name -> itr))
- } else {
- new IteratorWrapper(Array(name -> itr))
- }
- }
+ optionalCachedRDD.foreach(_.unpersist())
}
}
@@ -753,7 +473,7 @@ object XGBoost extends Serializable {
}
-private class Watches private(
+class Watches private(
val datasets: Array[DMatrix],
val names: Array[String],
val cacheDirName: Option[String]) {
@@ -964,50 +684,4 @@ private object Watches {
}
}
-/**
- * Within each RDD partition, group the XGBLabeledPoint by group id.
- * And the first and the last groups may not have all the items due to the data partition.
- * LabeledPointGroupIterator orginaizes data in a tuple format:
- * (isFistGroup || isLastGroup, Array[XGBLabeledPoint]).
- * The edge groups across partitions can be stitched together later.
- * @param base collection of XGBLabeledPoint
- */
-private[spark] class LabeledPointGroupIterator(base: Iterator[XGBLabeledPoint])
- extends AbstractIterator[XGBLabeledPointGroup] {
- private var firstPointOfNextGroup: XGBLabeledPoint = null
- private var isNewGroup = false
-
- override def hasNext: Boolean = {
- base.hasNext || isNewGroup
- }
-
- override def next(): XGBLabeledPointGroup = {
- val builder = mutable.ArrayBuilder.make[XGBLabeledPoint]
- var isFirstGroup = true
- if (firstPointOfNextGroup != null) {
- builder += firstPointOfNextGroup
- isFirstGroup = false
- }
-
- isNewGroup = false
- while (!isNewGroup && base.hasNext) {
- val point = base.next()
- val groupId = if (firstPointOfNextGroup != null) firstPointOfNextGroup.group else point.group
- firstPointOfNextGroup = point
- if (point.group == groupId) {
- // add to current group
- builder += point
- } else {
- // start a new group
- isNewGroup = true
- }
- }
-
- val isLastGroup = !isNewGroup
- val result = builder.result()
- val group = XGBLabeledPointGroup(result(0).group, result, isFirstGroup || isLastGroup)
-
- group
- }
-}
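With the data-preparation code gone from this file, trainDistributed now receives a builder function instead of a raw RDD. A minimal caller sketch, assuming the PreXGBoost.buildRDDLabeledPointToRDDWatches helper that the test changes below exercise:

    val trainingRDD: RDD[XGBLabeledPoint] = sc.parallelize(Classification.train)
    val buildTrainingData: XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) =
      PreXGBoost.buildRDDLabeledPointToRDDWatches(trainingRDD)
    val (booster, metrics) = XGBoost.trainDistributed(sc, buildTrainingData,
      Map("eta" -> "1", "max_depth" -> "6", "objective" -> "binary:logistic",
        "num_round" -> 5, "num_workers" -> 2, "missing" -> Float.NaN))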
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
index 3e4412c34..30c701a7b 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
@@ -38,7 +38,7 @@ import scala.collection.{AbstractIterator, Iterator, mutable}
class XGBoostClassifier (
override val uid: String,
- private val xgboostParams: Map[String, Any])
+ private[spark] val xgboostParams: Map[String, Any])
extends ProbabilisticClassifier[Vector, XGBoostClassifier, XGBoostClassificationModel]
with XGBoostClassifierParams with DefaultParamsWritable {
@@ -176,26 +176,15 @@ class XGBoostClassifier (
"\'num_class\' in xgboost params.")
}
- val weight = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
- val baseMargin = if (!isDefined(baseMarginCol) || $(baseMarginCol).isEmpty) {
- lit(Float.NaN)
- } else {
- col($(baseMarginCol))
- }
-
- val trainingSet: RDD[XGBLabeledPoint] = DataUtils.convertDataFrameToXGBLabeledPointRDDs(
- col($(labelCol)), col($(featuresCol)), weight, baseMargin,
- None, $(numWorkers), needDeterministicRepartitioning, dataset.asInstanceOf[DataFrame]).head
- val evalRDDMap = getEvalSets(xgboostParams).map {
- case (name, dataFrame) => (name,
- DataUtils.convertDataFrameToXGBLabeledPointRDDs(col($(labelCol)), col($(featuresCol)),
- weight, baseMargin, None, $(numWorkers), needDeterministicRepartitioning, dataFrame).head)
- }
+    // Combine the user-supplied xgboostParams with the params set through MLlib setters
+ val derivedXGBParamMap = xgboostParams ++ MLlib2XGBoostParams
+ val buildTrainingData = PreXGBoost.buildDatasetToRDD(this, dataset, derivedXGBParamMap)
transformSchema(dataset.schema, logging = true)
- val derivedXGBParamMap = MLlib2XGBoostParams
+
// All non-null param maps in XGBoostClassifier are in derivedXGBParamMap.
- val (_booster, _metrics) = XGBoost.trainDistributed(trainingSet, derivedXGBParamMap,
- hasGroup = false, evalRDDMap)
+ val (_booster, _metrics) = XGBoost.trainDistributed(dataset.sparkSession.sparkContext,
+ buildTrainingData, derivedXGBParamMap)
+
val model = new XGBoostClassificationModel(uid, _numClasses, _booster)
val summary = XGBoostTrainingSummary(_metrics)
model.setSummary(summary)
@@ -265,7 +254,7 @@ class XGBoostClassificationModel private[ml](
*/
override def predict(features: Vector): Double = {
import DataUtils._
- val dm = new DMatrix(XGBoost.processMissingValues(
+ val dm = new DMatrix(processMissingValues(
Iterator(features.asXGB),
$(missing),
$(allowNonZeroForMissing)
@@ -324,7 +313,7 @@ class XGBoostClassificationModel private[ml](
}
val dm = new DMatrix(
- XGBoost.processMissingValues(
+ processMissingValues(
features.map(_.asXGB),
$(missing),
$(allowNonZeroForMissing)
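The bare processMissingValues called here is the routine removed from XGBoost.scala above, relocated by this refactor. Its effect on a dense vector, sketched with the same assumed XGBLabeledPoint signature as earlier:

    // a dense point (indices == null); with missing = 0.0f the explicit zeros
    // are filtered out, yielding a sparse point
    val dense = XGBLabeledPoint(1.0f, 4, null, Array(0.0f, 2.0f, 0.0f, 3.0f))
    val sparse = processMissingValues(Iterator(dense), 0.0f,
      allowNonZeroMissing = false).next()
    // sparse.indices == Array(1, 3); sparse.values == Array(2.0f, 3.0f)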
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
index 0d640bfeb..6810a1bb7 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
@@ -171,27 +171,16 @@ class XGBoostRegressor (
set(objectiveType, "regression")
}
- val weight = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
- val baseMargin = if (!isDefined(baseMarginCol) || $(baseMarginCol).isEmpty) {
- lit(Float.NaN)
- } else {
- col($(baseMarginCol))
- }
- val group = if (!isDefined(groupCol) || $(groupCol).isEmpty) lit(-1) else col($(groupCol))
- val trainingSet: RDD[XGBLabeledPoint] = DataUtils.convertDataFrameToXGBLabeledPointRDDs(
- col($(labelCol)), col($(featuresCol)), weight, baseMargin, Some(group),
- $(numWorkers), needDeterministicRepartitioning, dataset.asInstanceOf[DataFrame]).head
- val evalRDDMap = getEvalSets(xgboostParams).map {
- case (name, dataFrame) => (name,
- DataUtils.convertDataFrameToXGBLabeledPointRDDs(col($(labelCol)), col($(featuresCol)),
- weight, baseMargin, Some(group), $(numWorkers), needDeterministicRepartitioning,
- dataFrame).head)
- }
transformSchema(dataset.schema, logging = true)
- val derivedXGBParamMap = MLlib2XGBoostParams
+
+    // Combine the user-supplied xgboostParams with the params set through MLlib setters
+ val derivedXGBParamMap = xgboostParams ++ MLlib2XGBoostParams
+ val buildTrainingData = PreXGBoost.buildDatasetToRDD(this, dataset, derivedXGBParamMap)
+
// All non-null param maps in XGBoostRegressor are in derivedXGBParamMap.
- val (_booster, _metrics) = XGBoost.trainDistributed(trainingSet, derivedXGBParamMap,
- hasGroup = group != lit(-1), evalRDDMap)
+ val (_booster, _metrics) = XGBoost.trainDistributed(dataset.sparkSession.sparkContext,
+ buildTrainingData, derivedXGBParamMap)
+
val model = new XGBoostRegressionModel(uid, _booster)
val summary = XGBoostTrainingSummary(_metrics)
model.setSummary(summary)
@@ -260,7 +249,7 @@ class XGBoostRegressionModel private[ml] (
*/
override def predict(features: Vector): Double = {
import DataUtils._
- val dm = new DMatrix(XGBoost.processMissingValues(
+ val dm = new DMatrix(processMissingValues(
Iterator(features.asXGB),
$(missing),
$(allowNonZeroForMissing)
@@ -301,7 +290,7 @@ class XGBoostRegressionModel private[ml] (
}
val dm = new DMatrix(
- XGBoost.processMissingValues(
+ processMissingValues(
features.map(_.asXGB),
$(missing),
$(allowNonZeroForMissing)
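Group handling moves out of fit here as well, but ranking still flows through the same estimator API. A usage sketch, assuming a trainingDF with label, features, and group columns:

    val ranker = new XGBoostRegressor(Map(
      "objective" -> "rank:pairwise", "num_round" -> 5, "num_workers" -> 2))
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setGroupCol("group")
    val model = ranker.fit(trainingDF)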
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimatorCommon.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostEstimatorCommon.scala
similarity index 59%
rename from jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimatorCommon.scala
rename to jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostEstimatorCommon.scala
index 1c8ddfb08..ddd3c486f 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimatorCommon.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostEstimatorCommon.scala
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2014 by Contributors
+ Copyright (c) 2014,2021 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -14,24 +14,20 @@
limitations under the License.
*/
-package ml.dmlc.xgboost4j.scala.spark
+package ml.dmlc.xgboost4j.scala.spark.params
-import ml.dmlc.xgboost4j.scala.spark.params._
-
-import org.apache.spark.ml.param.shared.HasWeightCol
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasWeightCol}
private[spark] sealed trait XGBoostEstimatorCommon extends GeneralParams with LearningTaskParams
- with BoosterParams with RabitParams with ParamMapFuncs with NonParamVariables {
+ with BoosterParams with RabitParams with ParamMapFuncs with NonParamVariables with HasWeightCol
+ with HasBaseMarginCol with HasLeafPredictionCol with HasContribPredictionCol with HasFeaturesCol
+ with HasLabelCol {
def needDeterministicRepartitioning: Boolean = {
getCheckpointPath != null && getCheckpointPath.nonEmpty && getCheckpointInterval > 0
}
}
-private[spark] trait XGBoostClassifierParams extends HasWeightCol with HasBaseMarginCol
- with HasNumClass with HasLeafPredictionCol with HasContribPredictionCol
- with XGBoostEstimatorCommon
+private[spark] trait XGBoostClassifierParams extends XGBoostEstimatorCommon with HasNumClass
-private[spark] trait XGBoostRegressorParams extends HasBaseMarginCol with HasWeightCol
- with HasGroupCol with HasLeafPredictionCol with HasContribPredictionCol
- with XGBoostEstimatorCommon
+private[spark] trait XGBoostRegressorParams extends XGBoostEstimatorCommon with HasGroupCol
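needDeterministicRepartitioning, now shared by both traits, only fires when checkpointing is configured. A sketch, assuming the usual checkpoint setters on the estimator:

    val clf = new XGBoostClassifier(Map("num_round" -> 10, "num_workers" -> 2))
      .setCheckpointPath("/tmp/xgb-checkpoints")
      .setCheckpointInterval(2)
    // getCheckpointPath is non-empty and the interval is positive, so the
    // input data will be repartitioned deterministically during fit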
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala
index ff0492f41..67b2ff0c8 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala
@@ -18,6 +18,7 @@ package ml.dmlc.xgboost4j.scala.spark
import org.apache.spark.ml.linalg.Vectors
import org.scalatest.FunSuite
+import ml.dmlc.xgboost4j.scala.spark.DataUtils.PackedParams
import org.apache.spark.sql.functions._
@@ -55,13 +56,13 @@ class DeterministicPartitioningSuite extends FunSuite with TmpFolderPerSuite wit
resultDF
})
val transformedRDDs = transformedDFs.map(df => DataUtils.convertDataFrameToXGBLabeledPointRDDs(
- col("label"),
- col("features"),
- lit(1.0),
- lit(Float.NaN),
- None,
- numWorkers,
- deterministicPartition = true,
+ PackedParams(col("label"),
+ col("features"),
+ lit(1.0),
+ lit(Float.NaN),
+ None,
+ numWorkers,
+ deterministicPartition = true),
df
).head)
val resultsMaps = transformedRDDs.map(rdd => rdd.mapPartitionsWithIndex {
@@ -90,14 +91,13 @@ class DeterministicPartitioningSuite extends FunSuite with TmpFolderPerSuite wit
val df = ss.createDataFrame(sc.parallelize(dataset)).toDF("id", "label", "features")
val dfRepartitioned = DataUtils.convertDataFrameToXGBLabeledPointRDDs(
- col("label"),
- col("features"),
- lit(1.0),
- lit(Float.NaN),
- None,
- 10,
- deterministicPartition = true,
- df
+ PackedParams(col("label"),
+ col("features"),
+ lit(1.0),
+ lit(Float.NaN),
+ None,
+ 10,
+ deterministicPartition = true), df
).head
val partitionsSizes = dfRepartitioned
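PackedParams simply bundles the seven former positional arguments. A sketch, assuming it is a case class in DataUtils with the field order shown in this test:

    val packed = PackedParams(col("label"), col("features"), lit(1.0),
      lit(Float.NaN), None, numWorkers, deterministicPartition = true)
    val rdds = DataUtils.convertDataFrameToXGBLabeledPointRDDs(packed, df)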
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index 76040ac63..875960ed6 100755
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -17,12 +17,14 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.XGBoostError
-
import scala.util.Random
+
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import ml.dmlc.xgboost4j.scala.DMatrix
+
import org.apache.spark.TaskContext
import org.scalatest.FunSuite
+
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.lit
@@ -30,13 +32,14 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
test("distributed training with the specified worker number") {
val trainingRDD = sc.parallelize(Classification.train)
+ val buildTrainingRDD = PreXGBoost.buildRDDLabeledPointToRDDWatches(trainingRDD)
val (booster, metrics) = XGBoost.trainDistributed(
- trainingRDD,
+ sc,
+ buildTrainingRDD,
List("eta" -> "1", "max_depth" -> "6",
"objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> numWorkers,
"custom_eval" -> null, "custom_obj" -> null, "use_external_memory" -> false,
- "missing" -> Float.NaN).toMap,
- hasGroup = false)
+ "missing" -> Float.NaN).toMap)
assert(booster != null)
}
@@ -179,7 +182,7 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
// test different splits to cover the corner cases.
for (split <- 1 to 20) {
val trainingRDD = sc.parallelize(Ranking.train, split)
- val traingGroupsRDD = XGBoost.repartitionForTrainingGroup(trainingRDD, 4)
+ val traingGroupsRDD = PreXGBoost.repartitionForTrainingGroup(trainingRDD, 4)
val trainingGroups: Array[Array[XGBLabeledPoint]] = traingGroupsRDD.collect()
// check the order of the groups by group id.
// Ranking.train has 20 groups
@@ -201,18 +204,19 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
// make one partition empty for testing
it.filter(_ => TaskContext.getPartitionId() != 3)
})
- XGBoost.repartitionForTrainingGroup(trainingRDD, 4)
+ PreXGBoost.repartitionForTrainingGroup(trainingRDD, 4)
}
test("distributed training with group data") {
val trainingRDD = sc.parallelize(Ranking.train, 5)
+ val buildTrainingRDD = PreXGBoost.buildRDDLabeledPointToRDDWatches(trainingRDD, hasGroup = true)
val (booster, _) = XGBoost.trainDistributed(
- trainingRDD,
+ sc,
+ buildTrainingRDD,
List("eta" -> "1", "max_depth" -> "6",
"objective" -> "rank:pairwise", "num_round" -> 5, "num_workers" -> numWorkers,
"custom_eval" -> null, "custom_obj" -> null, "use_external_memory" -> false,
- "missing" -> Float.NaN).toMap,
- hasGroup = true)
+ "missing" -> Float.NaN).toMap)
assert(booster != null)
}