use another thread to control spark job
This commit is contained in:
parent
6499422e90
commit
718a9d8c96
@ -62,14 +62,28 @@ object XGBoost extends Serializable {
|
|||||||
require(tracker.start(), "FAULT: Failed to start tracker")
|
require(tracker.start(), "FAULT: Failed to start tracker")
|
||||||
boosters = buildDistributedBoosters(trainingData, configMap,
|
boosters = buildDistributedBoosters(trainingData, configMap,
|
||||||
tracker.getWorkerEnvs.asScala, numWorkers, round, obj, eval)
|
tracker.getWorkerEnvs.asScala, numWorkers, round, obj, eval)
|
||||||
// force the job
|
@volatile var booster: Booster = null
|
||||||
boosters.foreachPartition(_ => ())
|
val sparkJobThread = new Thread() {
|
||||||
val booster = boosters.first()
|
override def run() {
|
||||||
|
// force the job
|
||||||
|
boosters.foreachPartition(_ => ())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sparkJobThread.start()
|
||||||
val returnVal = tracker.waitFor()
|
val returnVal = tracker.waitFor()
|
||||||
logger.info(s"Rabit returns with exit code $returnVal")
|
logger.info(s"Rabit returns with exit code $returnVal")
|
||||||
if (returnVal == 0) {
|
if (returnVal == 0) {
|
||||||
|
booster = boosters.first()
|
||||||
Some(booster)
|
Some(booster)
|
||||||
} else {
|
} else {
|
||||||
|
try {
|
||||||
|
if (sparkJobThread.isAlive) {
|
||||||
|
sparkJobThread.interrupt()
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
case ie: InterruptedException =>
|
||||||
|
logger.info("spark job thread is interrupted")
|
||||||
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user