[jvm-packages] fix potential unit test suites aborted issue (#6373)
* fix race condition
* code cleaning: rm pom.xml-e
* clean again
* fix compilation issue
* recover
* avoid using getOrCreate
* interrupt zombie threads
* safeguard
* fix deadlock
* Update SparkParallelismTracker.scala
This commit is contained in:
parent e426b6e040
commit 4d1d5d4010
@@ -146,22 +146,30 @@ class TaskFailedListener(killSparkContext: Boolean = true) extends SparkListener

 object TaskFailedListener {

-  var killerStarted = false
+  var killerStarted: Boolean = false
+
+  var sparkContextKiller: Thread = _
+
+  val sparkContextShutdownLock = new AnyRef

   private def startedSparkContextKiller(): Unit = this.synchronized {
     if (!killerStarted) {
+      killerStarted = true
       // Spark does not allow ListenerThread to shutdown SparkContext so that we have to do it
       // in a separate thread
-      val sparkContextKiller = new Thread() {
+      sparkContextKiller = new Thread() {
         override def run(): Unit = {
           LiveListenerBus.withinListenerThread.withValue(false) {
-            SparkContext.getOrCreate().stop()
+            sparkContextShutdownLock.synchronized {
+              SparkContext.getActive.foreach(_.stop())
+              killerStarted = false
+              sparkContextShutdownLock.notify()
+            }
           }
         }
       }
       sparkContextKiller.setDaemon(true)
       sparkContextKiller.start()
-      killerStarted = true
     }
   }
 }
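Why `SparkContext.getActive.foreach(_.stop())` instead of `SparkContext.getOrCreate().stop()`: `getOrCreate()` constructs a brand-new SparkContext when none is alive, so a killer thread racing with test teardown could create a context just to stop it. `getActive` returns an `Option[SparkContext]` and therefore stops a context only if one actually exists. A minimal standalone sketch of the contrast (illustration only, not part of the patch; note that `getActive`, like `LiveListenerBus`, is `private[spark]`, which is why the patched file, and this sketch, must sit in the `org.apache.spark` package):

```scala
// Sketch only: getActive is private[spark], so this file must declare
// package org.apache.spark, just like the patched SparkParallelismTracker.scala.
package org.apache.spark

object ShutdownStyles {
  // May CREATE a fresh context just to stop it if the tests already
  // tore theirs down -- the race this commit removes.
  def stopViaGetOrCreate(): Unit =
    SparkContext.getOrCreate().stop()

  // Stops a context only when one is actually alive; a no-op otherwise.
  def stopViaGetActive(): Unit =
    SparkContext.getActive.foreach(_.stop())
}
```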
@@ -45,12 +45,26 @@ trait PerTest extends BeforeAndAfterEach { self: FunSuite =>
   override def beforeEach(): Unit = getOrCreateSession

   override def afterEach() {
-    synchronized {
+    TaskFailedListener.sparkContextShutdownLock.synchronized {
       if (currentSession != null) {
+        // this synchronization is mostly for the tests involving SparkContext shutdown.
+        // For unit tests that shut down the SparkContext there are two possible event sequences:
+        // 1. the SparkContext killer runs before afterEach; in that case afterEach() blocks
+        //    in the loop below until the SparkContext is fully stopped
+        // 2. the SparkContext killer runs after afterEach; in that case currentSession.stop()
+        //    blocks until all messages in the ListenerBus are processed, and because
+        //    currentSession.stop() has already been called, the SparkContext killer has no effect
+        while (TaskFailedListener.killerStarted) {
+          TaskFailedListener.sparkContextShutdownLock.wait()
+        }
         currentSession.stop()
         cleanExternalCache(currentSession.sparkContext.appName)
         currentSession = null
       }
+      if (TaskFailedListener.sparkContextKiller != null) {
+        TaskFailedListener.sparkContextKiller.interrupt()
+        TaskFailedListener.sparkContextKiller = null
+      }
       TaskFailedListener.killerStarted = false
     }
   }
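This hunk is the deadlock fix: `afterEach()` waits on `sparkContextShutdownLock` while `killerStarted` is true, and the killer thread stops the context, clears the flag, and calls `notify()` under the same monitor. Interrupting and nulling any leftover `sparkContextKiller` afterwards is what the commit message means by "interrupt zombie threads". A self-contained sketch of the same wait/notify handshake (illustrative names, not the actual XGBoost classes):

```scala
// Minimal sketch of the handshake between the killer thread and afterEach().
object HandshakeSketch {
  private val lock = new AnyRef   // plays the role of sparkContextShutdownLock
  private var busy = true         // plays the role of killerStarted

  def main(args: Array[String]): Unit = {
    val killer = new Thread() {
      override def run(): Unit = lock.synchronized {
        // ... stop the shared resource here (the SparkContext in the patch) ...
        busy = false
        lock.notify()             // wake the waiter under the same monitor
      }
    }
    killer.setDaemon(true)
    killer.start()

    lock.synchronized {
      while (busy) {              // loop, not `if`: wait() can wake spuriously
        lock.wait()
      }
    }
    println("killer finished; teardown can proceed safely")
  }
}
```

Both the flag writes and reads happen inside `lock.synchronized`, so the flag needs no extra volatile marking, and the `while` loop guards against spurious wakeups from `wait()`.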
@@ -114,6 +114,7 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {

       // assume all tasks throw exceptions at almost the same time
       // 100ms should be enough to exhaust all retries
       assert(waitAndCheckSparkShutdown(100) == true)
+      TaskFailedListener.killerStarted = false
     }
   }
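The added reset of `TaskFailedListener.killerStarted` keeps later suites from blocking on a stale flag in the `afterEach()` wait loop above. `waitAndCheckSparkShutdown` is a helper defined elsewhere in the XGBoost test utilities and is not shown in this diff; a hypothetical polling version, purely to make the assertion readable (the real helper may differ):

```scala
// Hypothetical sketch of a polling helper in the spirit of
// waitAndCheckSparkShutdown; the real implementation may differ. Declared in
// org.apache.spark only because getActive is private[spark].
package org.apache.spark

object ShutdownPoll {
  def waitAndCheckSparkShutdown(waitMillis: Int): Boolean = {
    var waited = 0
    while (waited < waitMillis) {
      Thread.sleep(10)                       // poll every 10 ms
      if (SparkContext.getActive.isEmpty) return true
      waited += 10
    }
    SparkContext.getActive.isEmpty           // final check at the deadline
  }
}
```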