[Breaking][jvm-packages] Use barrier execution mode (#7836)

With the introduction of the barrier execution mode. we don't need to kill SparkContext when some xgboost tasks failed. Instead, Spark will handle the errors for us. So in this PR, `killSparkContextOnWorkerFailure` parameter is deleted.
This commit is contained in:
Bobby Wang
2022-04-25 17:09:52 +08:00
committed by GitHub
parent 6ece549a90
commit dc2e699656
15 changed files with 60 additions and 516 deletions

View File

@@ -21,6 +21,7 @@ import java.io.File
import scala.collection.mutable
import scala.util.Random
import scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams
@@ -30,8 +31,9 @@ import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.commons.io.FileUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.sql.SparkSession
/**
@@ -79,8 +81,7 @@ private[scala] case class XGBoostExecutionParams(
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
cacheTrainingSet: Boolean,
treeMethod: Option[String],
isLocal: Boolean,
killSparkContextOnWorkerFailure: Boolean) {
isLocal: Boolean) {
private var rawParamMap: Map[String, Any] = _
@@ -224,9 +225,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
.asInstanceOf[Boolean]
val killSparkContext = overridedParams.getOrElse("kill_spark_context_on_worker_failure", true)
.asInstanceOf[Boolean]
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
missing, allowNonZeroForMissing, trackerConf,
timeoutRequestWorkers,
@@ -235,8 +233,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
xgbExecEarlyStoppingParams,
cacheTrainingSet,
treeMethod,
isLocal,
killSparkContext)
isLocal)
xgbExecParam.setRawParamMap(overridedParams)
xgbExecParam
}
@@ -351,7 +348,11 @@ object XGBoost extends Serializable {
watches.toMap, metrics, obj, eval,
earlyStoppingRound = numEarlyStoppingRounds, prevBooster)
}
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
if (TaskContext.get().partitionId() == 0) {
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
} else {
Iterator.empty
}
} catch {
case xgbException: XGBoostError =>
logger.error(s"XGBooster worker $taskId has failed $attempt times due to ", xgbException)
@@ -409,15 +410,10 @@ object XGBoost extends Serializable {
// Train for every ${savingRound} rounds and save the partially completed booster
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
val (booster, metrics) = try {
val parallelismTracker = new SparkParallelismTracker(sc,
xgbExecParams.timeoutRequestWorkers,
xgbExecParams.numWorkers,
xgbExecParams.killSparkContextOnWorkerFailure)
tracker.getWorkerEnvs().putAll(xgbRabitParams)
val rabitEnv = tracker.getWorkerEnvs
val boostersAndMetrics = trainingRDD.mapPartitions { iter => {
val boostersAndMetrics = trainingRDD.barrier().mapPartitions { iter => {
var optionWatches: Option[() => Watches] = None
// take the first Watches to train
@@ -430,24 +426,14 @@ object XGBoost extends Serializable {
xgbExecParams.eval, prevBooster)}
.getOrElse(throw new RuntimeException("No Watches to train"))
}}.cache()
val sparkJobThread = new Thread() {
override def run() {
// force the job
boostersAndMetrics.foreachPartition(() => _)
}
}
sparkJobThread.setUncaughtExceptionHandler(tracker)
val trackerReturnVal = parallelismTracker.execute {
sparkJobThread.start()
tracker.waitFor(0L)
}
}}
val (booster, metrics) = boostersAndMetrics.collect()(0)
val trackerReturnVal = tracker.waitFor(0L)
logger.info(s"Rabit returns with exit code $trackerReturnVal")
val (booster, metrics) = postTrackerReturnProcessing(trackerReturnVal,
boostersAndMetrics, sparkJobThread)
if (trackerReturnVal != 0) {
throw new XGBoostError("XGBoostModel training failed.")
}
(booster, metrics)
} finally {
tracker.stop()
@@ -467,42 +453,12 @@ object XGBoost extends Serializable {
case t: Throwable =>
// if the job was aborted due to an exception
logger.error("the job was aborted due to ", t)
if (xgbExecParams.killSparkContextOnWorkerFailure) {
sc.stop()
}
throw t
} finally {
optionalCachedRDD.foreach(_.unpersist())
}
}
private def postTrackerReturnProcessing(
trackerReturnVal: Int,
distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],
sparkJobThread: Thread): (Booster, Map[String, Array[Float]]) = {
if (trackerReturnVal == 0) {
// Copies of the final booster and the corresponding metrics
// reside in each partition of the `distributedBoostersAndMetrics`.
// Any of them can be used to create the model.
// it's safe to block here forever, as the tracker has returned successfully, and the Spark
// job should have finished, there is no reason for the thread cannot return
sparkJobThread.join()
val (booster, metrics) = distributedBoostersAndMetrics.first()
distributedBoostersAndMetrics.unpersist(false)
(booster, metrics)
} else {
try {
if (sparkJobThread.isAlive) {
sparkJobThread.interrupt()
}
} catch {
case _: InterruptedException =>
logger.info("spark job thread is interrupted")
}
throw new XGBoostError("XGBoostModel training failed")
}
}
}
class Watches private[scala] (

View File

@@ -105,14 +105,8 @@ private[spark] trait LearningTaskParams extends Params {
final def getMaximizeEvaluationMetrics: Boolean = $(maximizeEvaluationMetrics)
/**
* whether killing SparkContext when training task fails
*/
final val killSparkContextOnWorkerFailure = new BooleanParam(this,
"killSparkContextOnWorkerFailure", "whether killing SparkContext when training task fails")
setDefault(objective -> "reg:squarederror", baseScore -> 0.5, trainTestRatio -> 1.0,
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false, killSparkContextOnWorkerFailure -> true)
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false)
}
private[spark] object LearningTaskParams {

View File

@@ -1,175 +0,0 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.spark
import org.apache.commons.logging.LogFactory
import org.apache.spark.scheduler._
import scala.collection.mutable.{HashMap, HashSet}
/**
* A tracker that ensures enough number of executor cores are alive.
* Throws an exception when the number of alive cores is less than nWorkers.
*
* @param sc The SparkContext object
* @param timeout The maximum time to wait for enough number of workers.
* @param numWorkers nWorkers used in an XGBoost Job
* @param killSparkContextOnWorkerFailure kill SparkContext or not when task fails
*/
class SparkParallelismTracker(
val sc: SparkContext,
timeout: Long,
numWorkers: Int,
killSparkContextOnWorkerFailure: Boolean = true) {
private[this] val requestedCores = numWorkers * sc.conf.getInt("spark.task.cpus", 1)
private[this] val logger = LogFactory.getLog("XGBoostSpark")
private[this] def numAliveCores: Int = {
sc.statusStore.executorList(true).map(_.totalCores).sum
}
private[this] def waitForCondition(
condition: => Boolean,
timeout: Long,
checkInterval: Long = 100L) = {
val waitImpl = new ((Long, Boolean) => Boolean) {
override def apply(waitedTime: Long, status: Boolean): Boolean = status match {
case s if s => true
case _ => waitedTime match {
case t if t < timeout =>
Thread.sleep(checkInterval)
apply(t + checkInterval, status = condition)
case _ => false
}
}
}
waitImpl(0L, condition)
}
private[this] def safeExecute[T](body: => T): T = {
val listener = new TaskFailedListener(killSparkContextOnWorkerFailure)
sc.addSparkListener(listener)
try {
body
} finally {
sc.removeSparkListener(listener)
}
}
/**
* Execute a blocking function call with two checks on enough nWorkers:
* - Before the function starts, wait until there are enough executor cores.
* - During the execution, throws an exception if there is any executor lost.
*
* @param body A blocking function call
* @tparam T Return type
* @return The return of body
*/
def execute[T](body: => T): T = {
if (timeout <= 0) {
logger.info("starting training without setting timeout for waiting for resources")
safeExecute(body)
} else {
logger.info(s"starting training with timeout set as $timeout ms for waiting for resources")
if (!waitForCondition(numAliveCores >= requestedCores, timeout)) {
throw new IllegalStateException(s"Unable to get $requestedCores cores for XGBoost training")
}
safeExecute(body)
}
}
}
class TaskFailedListener(killSparkContext: Boolean = true) extends SparkListener {
private[this] val logger = LogFactory.getLog("XGBoostTaskFailedListener")
// {jobId, [stageId0, stageId1, ...] }
// keep track of the mapping of job id and stage ids
// when a task fails, find the job id and stage id the task belongs to, finally
// cancel the jobs
private val jobIdToStageIds: HashMap[Int, HashSet[Int]] = HashMap.empty
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (!killSparkContext) {
jobStart.stageIds.foreach(stageId => {
jobIdToStageIds.getOrElseUpdate(jobStart.jobId, new HashSet[Int]()) += stageId
})
}
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
if (!killSparkContext) {
jobIdToStageIds.remove(jobEnd.jobId)
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEnd.reason match {
case taskEndReason: TaskFailedReason =>
logger.error(s"Training Task Failed during XGBoost Training: " +
s"$taskEndReason")
if (killSparkContext) {
logger.error("killing SparkContext")
TaskFailedListener.startedSparkContextKiller()
} else {
val stageId = taskEnd.stageId
// find job ids according to stage id and then cancel the job
jobIdToStageIds.foreach {
case (jobId, stageIds) =>
if (stageIds.contains(stageId)) {
logger.error("Cancelling jobId:" + jobId)
jobIdToStageIds.remove(jobId)
SparkContext.getOrCreate().cancelJob(jobId)
}
}
}
case _ =>
}
}
}
object TaskFailedListener {
var killerStarted: Boolean = false
var sparkContextKiller: Thread = _
val sparkContextShutdownLock = new AnyRef
private def startedSparkContextKiller(): Unit = this.synchronized {
if (!killerStarted) {
killerStarted = true
// Spark does not allow ListenerThread to shutdown SparkContext so that we have to do it
// in a separate thread
sparkContextKiller = new Thread() {
override def run(): Unit = {
LiveListenerBus.withinListenerThread.withValue(false) {
sparkContextShutdownLock.synchronized {
SparkContext.getActive.foreach(_.stop())
killerStarted = false
sparkContextShutdownLock.notify()
}
}
}
}
sparkContextKiller.setDaemon(true)
sparkContextKiller.start()
}
}
}