[jvm-packages] move dmatrix building into rabit context for cpu pipeline (#7908)

This commit is contained in:
Bobby Wang 2022-05-17 14:52:25 +08:00 committed by GitHub
parent 77d4a53c32
commit b41cf92dc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 36 deletions

View File

@ -61,15 +61,14 @@ class GpuPreXGBoost extends PreXGBoostProvider {
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]] * @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
* @param dataset the training data * @param dataset the training data
* @param params all user defined and defaulted params * @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input * RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD * Option[ RDD[_] ] is the optional cached RDD
*/ */
override def buildDatasetToRDD(estimator: Estimator[_], override def buildDatasetToRDD(estimator: Estimator[_],
dataset: Dataset[_], dataset: Dataset[_],
params: Map[String, Any]): params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = { XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params) GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
} }
@ -123,8 +122,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor * @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data * @param dataset the training data
* @param params all user defined and defaulted params * @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input to build DMatrix * RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD * Option[ RDD[_] ] is the optional cached RDD
*/ */
@ -132,7 +130,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
estimator: Estimator[_], estimator: Estimator[_],
dataset: Dataset[_], dataset: Dataset[_],
params: Map[String, Any]): params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = { XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) = val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
estimator match { estimator match {
@ -170,7 +168,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
xgbExecParams: XGBoostExecutionParams => xgbExecParams: XGBoostExecutionParams =>
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers, val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
xgbExecParams.cacheTrainingSet) xgbExecParams.cacheTrainingSet)
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None) (buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
} }
/** /**

View File

@ -101,8 +101,7 @@ object PreXGBoost extends PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor * @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data * @param dataset the training data
* @param params all user defined and defaulted params * @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input * RDD[() => Watches] will be used as the training input
* Option[RDD[_]\] is the optional cached RDD * Option[RDD[_]\] is the optional cached RDD
*/ */
@ -110,7 +109,7 @@ object PreXGBoost extends PreXGBoostProvider {
estimator: Estimator[_], estimator: Estimator[_],
dataset: Dataset[_], dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => params: Map[String, Any]): XGBoostExecutionParams =>
(Boolean, RDD[() => Watches], Option[RDD[_]]) = { (RDD[() => Watches], Option[RDD[_]]) = {
if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) { if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params) return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
@ -172,12 +171,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) { val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None } else None
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) (trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) => case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) { val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None } else None
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) (trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
} }
} }
@ -324,7 +323,7 @@ object PreXGBoost extends PreXGBoostProvider {
trainingSet: RDD[XGBLabeledPoint], trainingSet: RDD[XGBLabeledPoint],
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(), evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
hasGroup: Boolean = false): hasGroup: Boolean = false):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = { XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
xgbExecParams: XGBoostExecutionParams => xgbExecParams: XGBoostExecutionParams =>
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match { composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
@ -332,12 +331,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) { val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None } else None
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) (trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) => case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) { val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None } else None
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) (trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
} }
} }

View File

@ -50,8 +50,7 @@ private[scala] trait PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor * @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data * @param dataset the training data
* @param params all user defined and defaulted params * @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input to build DMatrix * RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD * Option[ RDD[_] ] is the optional cached RDD
*/ */
@ -59,7 +58,7 @@ private[scala] trait PreXGBoostProvider {
estimator: Estimator[_], estimator: Estimator[_],
dataset: Dataset[_], dataset: Dataset[_],
params: Map[String, Any]): params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]])
/** /**
* Transform Dataset * Transform Dataset

View File

@ -286,7 +286,6 @@ object XGBoost extends Serializable {
} }
private def buildDistributedBooster( private def buildDistributedBooster(
buildDMatrixInRabit: Boolean,
buildWatches: () => Watches, buildWatches: () => Watches,
xgbExecutionParam: XGBoostExecutionParams, xgbExecutionParam: XGBoostExecutionParams,
rabitEnv: java.util.Map[String, String], rabitEnv: java.util.Map[String, String],
@ -295,11 +294,6 @@ object XGBoost extends Serializable {
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = { prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
var watches: Watches = null var watches: Watches = null
if (!buildDMatrixInRabit) {
// for CPU pipeline, we need to build DMatrix out of rabit context
watches = buildWatchesAndCheck(buildWatches)
}
val taskId = TaskContext.getPartitionId().toString val taskId = TaskContext.getPartitionId().toString
val attempt = TaskContext.get().attemptNumber.toString val attempt = TaskContext.get().attemptNumber.toString
rabitEnv.put("DMLC_TASK_ID", taskId) rabitEnv.put("DMLC_TASK_ID", taskId)
@ -310,10 +304,7 @@ object XGBoost extends Serializable {
try { try {
Rabit.init(rabitEnv) Rabit.init(rabitEnv)
if (buildDMatrixInRabit) { watches = buildWatchesAndCheck(buildWatches)
// for GPU pipeline, we need to move dmatrix building into rabit context
watches = buildWatchesAndCheck(buildWatches)
}
val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds)) val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
@ -377,7 +368,7 @@ object XGBoost extends Serializable {
@throws(classOf[XGBoostError]) @throws(classOf[XGBoostError])
private[spark] def trainDistributed( private[spark] def trainDistributed(
sc: SparkContext, sc: SparkContext,
buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]), buildTrainingData: XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]),
params: Map[String, Any]): params: Map[String, Any]):
(Booster, Map[String, Array[Float]]) = { (Booster, Map[String, Array[Float]]) = {
@ -396,7 +387,7 @@ object XGBoost extends Serializable {
}.orNull }.orNull
// Get the training data RDD and the cachedRDD // Get the training data RDD and the cachedRDD
val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams) val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
try { try {
// Train for every ${savingRound} rounds and save the partially completed booster // Train for every ${savingRound} rounds and save the partially completed booster
@ -413,9 +404,8 @@ object XGBoost extends Serializable {
optionWatches = Some(iter.next()) optionWatches = Some(iter.next())
} }
optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit, optionWatches.map { buildWatches => buildDistributedBooster(buildWatches,
buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj, xgbExecParams, rabitEnv, xgbExecParams.obj, xgbExecParams.eval, prevBooster)}
xgbExecParams.eval, prevBooster)}
.getOrElse(throw new RuntimeException("No Watches to train")) .getOrElse(throw new RuntimeException("No Watches to train"))
}} }}

View File

@ -65,8 +65,6 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
(id, lp.label, lp.features) (id, lp.label, lp.features)
}.toDF("id", "label", "features") }.toDF("id", "label", "features")
val xgb = new XGBoostClassifier(paramMap) val xgb = new XGBoostClassifier(paramMap)
intercept[Exception] { xgb.fit(repartitioned)
xgb.fit(repartitioned)
}
} }
} }