Fixed Exception handling for fragmented Rabit 'print' tracker command. Fixed unit test. (#2081)

This commit is contained in:
Xin Yin 2017-03-05 15:40:59 -06:00 committed by Tianqi Chen
parent ab13fd72bd
commit 5b54b9437c
2 changed files with 10 additions and 5 deletions

View File

@ -129,8 +129,8 @@ private[scala] class RabitWorkerHandler(host: String, worldSize: Int, tracker: A
Try(decodeCommand(readBuffer)) match {
case scala.util.Success(decodedCommand) =>
tracker ! decodedCommand
case scala.util.Failure(th: java.nio.BufferOverflowException) =>
// BufferOverflowException would occur if the message to print has not arrived yet.
case scala.util.Failure(th: java.nio.BufferUnderflowException) =>
// BufferUnderflowException would occur if the message to print has not arrived yet.
// Do nothing, wait for next Tcp.Received event
case scala.util.Failure(th: Throwable) => throw th
}

View File

@ -188,9 +188,14 @@ class RabitTrackerConnectionHandlerTest
// ResumeReading should be seen once state transitions
connProbe.expectMsg(Tcp.ResumeReading)
val printCmd = WorkerTrackerPrint(0, 4, "print", "hello world!")
// 4 + 4 + 4 + 5 = 17
val (partialMessage, remainder) = printCmd.encode.splitAt(17)
val printCmd = WorkerTrackerPrint(0, 4, "0", "fragmented!")
// 4 (rank: Int) + 4 (worldSize: Int) + (4+1) (jobId: String) + (4+5) (command: String) = 22
val (partialMessage, remainder) = printCmd.encode.splitAt(22)
// make sure that the partialMessage in itself is a valid command
val partialMsgBuf = ByteBuffer.allocate(22).order(ByteOrder.nativeOrder())
partialMsgBuf.put(partialMessage.asByteBuffer)
RabitWorkerHandler.StructTrackerCommand.verify(partialMsgBuf) shouldBe true
fsm ! Tcp.Received(partialMessage)
fsm ! Tcp.Received(remainder)