From 37dc82c3ff32de530fc888b8c2c041a3d8fba6ac Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Fri, 26 Apr 2019 21:02:40 -0700
Subject: [PATCH] [jvm-packages] allow partial evaluation of dataframe before prediction (#4407)

* allow partial evaluation of dataframe before prediction

* resume spark test

* comments

* Run unit tests after building JVM packages
---
 CMakeLists.txt                                |  2 +-
 dmlc-core                                     |  2 +-
 ...Suite.scala => RabitRobustnessSuite.scala} | 80 ++++++++++++++++++-
 .../scala/spark/XGBoostGeneralSuite.scala     | 62 --------------
 .../java/ml/dmlc/xgboost4j/java/Rabit.java    |  1 +
 rabit                                         |  2 +-
 tests/ci_build/build_jvm_packages.sh          |  2 +-
 tests/ci_build/test_jvm_cross.sh              |  3 +-
 8 files changed, 85 insertions(+), 69 deletions(-)
 rename jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/{RabitTrackerRobustnessSuite.scala => RabitRobustnessSuite.scala} (67%)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 65614d0c5..f8aa87911 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -78,7 +78,7 @@ else ()
 endif (MINGW OR R_LIB)
 add_library(rabit STATIC ${RABIT_SOURCES})
 target_include_directories(rabit PRIVATE
-  $
+  $
   $)
 set_target_properties(rabit PROPERTIES
diff --git a/dmlc-core b/dmlc-core
index ac983092e..13d5acb8b 160000
--- a/dmlc-core
+++ b/dmlc-core
@@ -1 +1 @@
-Subproject commit ac983092ee3b339f76a2d7e7c3b846570218200d
+Subproject commit 13d5acb8ba7e79550bbf2f730f1a3944ff0fa68b
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/RabitTrackerRobustnessSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/RabitRobustnessSuite.scala
similarity index 67%
rename from jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/RabitTrackerRobustnessSuite.scala
rename to jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/RabitRobustnessSuite.scala
index 276eb1ba6..322131aff 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/RabitTrackerRobustnessSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/RabitRobustnessSuite.scala
@@ -16,14 +16,73 @@
 
 package ml.dmlc.xgboost4j.scala.spark
 
+import java.util.concurrent.LinkedBlockingDeque
+
+import scala.util.Random
+
 import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, RabitTracker => PyRabitTracker}
 import ml.dmlc.xgboost4j.scala.rabit.{RabitTracker => ScalaRabitTracker}
 import ml.dmlc.xgboost4j.java.IRabitTracker.TrackerStatus
+import ml.dmlc.xgboost4j.scala.DMatrix
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.FunSuite
 
-class RabitTrackerRobustnessSuite extends FunSuite with PerTest {
+class RabitRobustnessSuite extends FunSuite with PerTest {
+
+  test("training with Scala-implemented Rabit tracker") {
+    val eval = new EvalError()
+    val training = buildDataFrame(Classification.train)
+    val testDM = new DMatrix(Classification.test.iterator)
+    val paramMap = Map("eta" -> "1", "max_depth" -> "6",
+      "objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> numWorkers,
+      "tracker_conf" -> TrackerConf(60 * 60 * 1000, "scala"))
+    val model = new XGBoostClassifier(paramMap).fit(training)
+    assert(eval.eval(model._booster.predict(testDM, outPutMargin = true), testDM) < 0.1)
+  }
+
+  test("test Rabit allreduce to validate Scala-implemented Rabit tracker") {
+    val vectorLength = 100
+    val rdd = sc.parallelize(
+      (1 to numWorkers * vectorLength).toArray.map { _ => Random.nextFloat() }, numWorkers).cache()
+
+    val tracker = new ScalaRabitTracker(numWorkers)
+    tracker.start(0)
+    val trackerEnvs = tracker.getWorkerEnvs
+    val collectedAllReduceResults = new LinkedBlockingDeque[Array[Float]]()
+
+    val rawData = rdd.mapPartitions { iter =>
+      Iterator(iter.toArray)
+    }.collect()
+
+    val maxVec = (0 until vectorLength).toArray.map { j =>
+      (0 until numWorkers).toArray.map { i => rawData(i)(j) }.max
+    }
+
+    val allReduceResults = rdd.mapPartitions { iter =>
+      Rabit.init(trackerEnvs)
+      val arr = iter.toArray
+      val results = Rabit.allReduce(arr, Rabit.OpType.MAX)
+      Rabit.shutdown()
+      Iterator(results)
+    }.cache()
+
+    val sparkThread = new Thread() {
+      override def run(): Unit = {
+        allReduceResults.foreachPartition(() => _)
+        val byPartitionResults = allReduceResults.collect()
+        assert(byPartitionResults(0).length == vectorLength)
+        collectedAllReduceResults.put(byPartitionResults(0))
+      }
+    }
+    sparkThread.start()
+    assert(tracker.waitFor(0L) == 0)
+    sparkThread.join()
+
+    assert(collectedAllReduceResults.poll().sameElements(maxVec))
+  }
+
   test("test Java RabitTracker wrapper's exception handling: it should not hang forever.") {
     /*
       Deliberately create new instances of SparkContext in each unit test to avoid reusing the
@@ -148,4 +207,23 @@ class RabitTrackerRobustnessSuite extends FunSuite with PerTest {
     // should fail due to connection timeout
     assert(tracker.waitFor(0L) == TrackerStatus.FAILURE.getStatusCode)
   }
+
+  test("should allow the dataframe containing rabit calls to be partially evaluated" +
+    " multiple times (ISSUE-4406)") {
+    val paramMap = Map(
+      "eta" -> "1",
+      "max_depth" -> "6",
+      "silent" -> "1",
+      "objective" -> "binary:logistic")
+    val trainingDF = buildDataFrame(Classification.train)
+    val model = new XGBoostClassifier(paramMap ++ Array("num_round" -> 10,
+      "num_workers" -> numWorkers)).fit(trainingDF)
+    val prediction = model.transform(trainingDF)
+    // a partial evaluation of the dataframe causes Rabit to be initialized but not shut down
+    // in some threads
+    prediction.show()
+    // a full evaluation here re-runs init and shutdown of all Rabit proxies,
+    // expecting no error
+    prediction.collect()
+  }
 }
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index 64eca4b67..45ff6e060 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -17,67 +17,17 @@
 package ml.dmlc.xgboost4j.scala.spark
 
 import java.nio.file.Files
-import java.util.concurrent.LinkedBlockingDeque
 
 import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 import ml.dmlc.xgboost4j.scala.DMatrix
-import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
 import ml.dmlc.xgboost4j.scala.{XGBoost => SXGBoost, _}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.TaskContext
-import org.apache.spark.ml.linalg.Vectors
-import org.apache.spark.sql._
 import org.scalatest.FunSuite
 
-import scala.util.Random
-
-import ml.dmlc.xgboost4j.java.Rabit
-
-import org.apache.spark.ml.feature.VectorAssembler
 
 class XGBoostGeneralSuite extends FunSuite with PerTest {
 
-  test("test Rabit allreduce to validate Scala-implemented Rabit tracker") {
-    val vectorLength = 100
-    val rdd = sc.parallelize(
-      (1 to numWorkers * vectorLength).toArray.map { _ => Random.nextFloat() }, numWorkers).cache()
-
-    val tracker = new RabitTracker(numWorkers)
-    tracker.start(0)
-    val trackerEnvs = tracker.getWorkerEnvs
-    val collectedAllReduceResults = new LinkedBlockingDeque[Array[Float]]()
-
-    val rawData = rdd.mapPartitions { iter =>
-      Iterator(iter.toArray)
-    }.collect()
-
-    val maxVec = (0 until vectorLength).toArray.map { j =>
-      (0 until numWorkers).toArray.map { i => rawData(i)(j) }.max
-    }
-
-    val allReduceResults = rdd.mapPartitions { iter =>
-      Rabit.init(trackerEnvs)
-      val arr = iter.toArray
-      val results = Rabit.allReduce(arr, Rabit.OpType.MAX)
-      Rabit.shutdown()
-      Iterator(results)
-    }.cache()
-
-    val sparkThread = new Thread() {
-      override def run(): Unit = {
-        allReduceResults.foreachPartition(() => _)
-        val byPartitionResults = allReduceResults.collect()
-        assert(byPartitionResults(0).length == vectorLength)
-        collectedAllReduceResults.put(byPartitionResults(0))
-      }
-    }
-    sparkThread.start()
-    assert(tracker.waitFor(0L) == 0)
-    sparkThread.join()
-
-    assert(collectedAllReduceResults.poll().sameElements(maxVec))
-  }
-
   test("distributed training with the specified worker number") {
     val trainingRDD = sc.parallelize(Classification.train)
     val (booster, metrics) = XGBoost.trainDistributed(
@@ -101,18 +51,6 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     assert(eval.eval(model._booster.predict(testDM, outPutMargin = true), testDM) < 0.1)
   }
 
-
-  test("training with Scala-implemented Rabit tracker") {
-    val eval = new EvalError()
-    val training = buildDataFrame(Classification.train)
-    val testDM = new DMatrix(Classification.test.iterator)
-    val paramMap = Map("eta" -> "1", "max_depth" -> "6",
-      "objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> numWorkers,
-      "tracker_conf" -> TrackerConf(60 * 60 * 1000, "scala"))
-    val model = new XGBoostClassifier(paramMap).fit(training)
-    assert(eval.eval(model._booster.predict(testDM, outPutMargin = true), testDM) < 0.1)
-  }
-
   test("test with quantile hist with monotone_constraints (lossguide)") {
     val eval = new EvalError()
     val training = buildDataFrame(Classification.train)
diff --git a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/Rabit.java b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/Rabit.java
index 710165d4c..35b500757 100644
--- a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/Rabit.java
+++ b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/Rabit.java
@@ -9,6 +9,7 @@ import java.util.Map;
  * Rabit global class for synchronization.
  */
 public class Rabit {
+
   public enum OpType implements Serializable {
     MAX(0), MIN(1), SUM(2), BITWISE_OR(3);
 
diff --git a/rabit b/rabit
index 1cc34f01d..a429748e2 160000
--- a/rabit
+++ b/rabit
@@ -1 +1 @@
-Subproject commit 1cc34f01db56d28e8e80847cf0fc5e3ecf8bb67b
+Subproject commit a429748e244f67f6f144a697f3aa1b1978705b11
diff --git a/tests/ci_build/build_jvm_packages.sh b/tests/ci_build/build_jvm_packages.sh
index 417d71b66..5dcc95a0f 100755
--- a/tests/ci_build/build_jvm_packages.sh
+++ b/tests/ci_build/build_jvm_packages.sh
@@ -9,7 +9,7 @@ set -x
 rm -rf build/
 cd jvm-packages
 
-mvn --no-transfer-progress package -DskipTests
+mvn --no-transfer-progress package
 
 set +x
 set +e
diff --git a/tests/ci_build/test_jvm_cross.sh b/tests/ci_build/test_jvm_cross.sh
index e65d1dbd9..9a9f35011 100755
--- a/tests/ci_build/test_jvm_cross.sh
+++ b/tests/ci_build/test_jvm_cross.sh
@@ -35,8 +35,7 @@
 if [ ! -z "$RUN_INTEGRATION_TEST" ]
-z "$RUN_INTEGRATION_TEST" ] then python3 get_iris.py spark-submit --class ml.dmlc.xgboost4j.scala.example.spark.SparkTraining --master 'local[8]' ./target/xgboost4j-tester-1.0-SNAPSHOT-jar-with-dependencies.jar ${PWD}/iris.csv - # Disabled due to https://github.com/dmlc/xgboost/issues/4406 - #spark-submit --class ml.dmlc.xgboost4j.scala.example.spark.SparkMLlibPipeline --master 'local[8]' ./target/xgboost4j-tester-1.0-SNAPSHOT-jar-with-dependencies.jar ${PWD}/iris.csv ${PWD}/native_model ${PWD}/pipeline_model + spark-submit --class ml.dmlc.xgboost4j.scala.example.spark.SparkMLlibPipeline --master 'local[8]' ./target/xgboost4j-tester-1.0-SNAPSHOT-jar-with-dependencies.jar ${PWD}/iris.csv ${PWD}/native_model ${PWD}/pipeline_model fi set +x