diff --git a/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkTraining.scala b/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkTraining.scala
index a16d53c97..17a32bc09 100644
--- a/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkTraining.scala
+++ b/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkTraining.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2014 by Contributors
+ Copyright (c) 2014-2022 by Contributors
 
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
@@ -62,11 +62,12 @@ object SparkTraining {
     val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1))
 
     /**
-     * setup "timeout_request_workers" -> 60000L to make this application if it cannot get enough resources
-     * to get 2 workers within 60000 ms
+     * setup spark.scheduler.barrier.maxConcurrentTasksCheck.interval and
+     * spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures to make this application fail
+     * if it cannot get enough resources to get 2 workers within interval * maxFailures seconds
      *
-     * setup "checkpoint_path" -> "/checkpoints" and "checkpoint_interval" -> 2 to save checkpoint for every
-     * two iterations
+     * setup "checkpoint_path" -> "/checkpoints" and "checkpoint_interval" -> 2 to save
+     * checkpoint for every two iterations
      */
     val xgbParam = Map("eta" -> 0.1f,
       "max_depth" -> 2,
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index e6ccb6349..6cfabcfac 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -75,7 +75,6 @@ private[scala] case class XGBoostExecutionParams(
     missing: Float,
     allowNonZeroForMissing: Boolean,
     trackerConf: TrackerConf,
-    timeoutRequestWorkers: Long,
     checkpointParam: Option[ExternalCheckpointParams],
     xgbInputParams: XGBoostExecutionInputParams,
     earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
@@ -201,12 +200,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
       case _ => throw new IllegalArgumentException("parameter \"tracker_conf\" must be an " +
         "instance of TrackerConf.")
     }
-    val timeoutRequestWorkers: Long = overridedParams.get("timeout_request_workers") match {
-      case None => 0L
-      case Some(interval: Long) => interval
-      case _ => throw new IllegalArgumentException("parameter \"timeout_request_workers\" must be" +
-        " an instance of Long.")
-    }
 
     val checkpointParam = ExternalCheckpointParams.extractParams(overridedParams)
 
@@ -227,7 +220,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
 
     val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
       missing, allowNonZeroForMissing, trackerConf,
-      timeoutRequestWorkers,
       checkpointParam,
       inputParams,
       xgbExecEarlyStoppingParams,
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
index 3e62e9946..a37a3901f 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
@@ -66,8 +66,6 @@ class XGBoostClassifier (
 
   def setMissing(value: Float): this.type = set(missing, value)
 
-  def setTimeoutRequestWorkers(value: Long): this.type = set(timeoutRequestWorkers, value)
-
   def setCheckpointPath(value: String): this.type = set(checkpointPath, value)
 
   def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
index 9af52d165..b52ba2a2e 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
@@ -68,8 +68,6 @@ class XGBoostRegressor (
 
   def setMissing(value: Float): this.type = set(missing, value)
 
-  def setTimeoutRequestWorkers(value: Long): this.type = set(timeoutRequestWorkers, value)
-
   def setCheckpointPath(value: String): this.type = set(checkpointPath, value)
 
   def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/GeneralParams.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/GeneralParams.scala
index 2416df0b3..ca0438ca4 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/GeneralParams.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/GeneralParams.scala
@@ -112,15 +112,6 @@ private[spark] trait GeneralParams extends Params {
 
   final def getAllowNonZeroForMissingValue: Boolean = $(allowNonZeroForMissing)
 
-  /**
-   * the maximum time to wait for the job requesting new workers. default: 30 minutes
-   */
-  final val timeoutRequestWorkers = new LongParam(this, "timeoutRequestWorkers", "the maximum " +
-    "time to request new Workers if numCores are insufficient. The timeout will be disabled " +
-    "if this value is set smaller than or equal to 0.")
-
-  final def getTimeoutRequestWorkers: Long = $(timeoutRequestWorkers)
-
   /**
    * The hdfs folder to load and save checkpoint boosters. default: `empty_string`
    */
@@ -181,7 +172,7 @@ private[spark] trait GeneralParams extends Params {
   setDefault(numRound -> 1, numWorkers -> 1, nthread -> 1,
     useExternalMemory -> false, silent -> 0, verbosity -> 1, customObj -> null,
     customEval -> null, missing -> Float.NaN,
-    trackerConf -> TrackerConf(), seed -> 0, timeoutRequestWorkers -> 30 * 60 * 1000L,
+    trackerConf -> TrackerConf(), seed -> 0,
     checkpointPath -> "", checkpointInterval -> -1,
     allowNonZeroForMissing -> false)
 }
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index cd13e4b6c..0bf8c2fbb 100755
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -165,18 +165,6 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
     assert(x < 0.1)
   }
 
-  test("training with spark parallelism checks disabled") {
-    val eval = new EvalError()
-    val training = buildDataFrame(Classification.train)
-    val testDM = new DMatrix(Classification.test.iterator)
-    val paramMap = Map("eta" -> "1", "max_depth" -> "6",
-      "objective" -> "binary:logistic", "timeout_request_workers" -> 0L,
-      "num_round" -> 5, "num_workers" -> numWorkers)
-    val model = new XGBoostClassifier(paramMap).fit(training)
-    val x = eval.eval(model._booster.predict(testDM, outPutMargin = true), testDM)
-    assert(x < 0.1)
-  }
-
   test("repartitionForTrainingGroup with group data") {
     // test different splits to cover the corner cases.
     for (split <- 1 to 20) {
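Note (not part of the patch): with `timeout_request_workers` / `setTimeoutRequestWorkers` removed, waiting for worker resources is bounded by Spark's barrier-scheduling checks instead. The sketch below is a minimal, hypothetical example of how a caller might set those Spark configs together with the checkpoint parameters that this patch leaves unchanged; the object name `BarrierConfigSketch`, the input path, and the column names are placeholders, not anything defined by this change.

```scala
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.sql.SparkSession

object BarrierConfigSketch {
  def main(args: Array[String]): Unit = {
    // The wait for worker resources is now governed by Spark's barrier-mode checks:
    // the job is aborted after roughly interval * maxFailures if fewer than
    // "num_workers" barrier task slots can be obtained.
    val spark = SparkSession.builder()
      .appName("xgboost4j-spark resource/checkpoint sketch")
      .config("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "30s")
      .config("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "2")
      .getOrCreate()

    // Placeholder input: assumed to already contain "features" and "classIndex"
    // columns, e.g. produced by the VectorAssembler/StringIndexer steps shown in
    // SparkTraining.scala.
    val trainDF = spark.read.parquet("/path/to/train")

    val xgbParam = Map(
      "eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2,
      // Checkpointing is unchanged by this patch: save a checkpoint every two rounds.
      "checkpoint_path" -> "/checkpoints",
      "checkpoint_interval" -> 2)

    val model = new XGBoostClassifier(xgbParam)
      .setFeaturesCol("features")
      .setLabelCol("classIndex")
      .fit(trainDF)

    spark.stop()
  }
}
```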