[Breaking][jvm-packages] remove timeoutRequestWorkers parameter (#7839)
This commit is contained in:
parent
11d65fcb21
commit
9fa7ed1743
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -62,11 +62,12 @@ object SparkTraining {
|
|||||||
val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1))
|
val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* setup "timeout_request_workers" -> 60000L to make this application if it cannot get enough resources
|
* setup spark.scheduler.barrier.maxConcurrentTasksCheck.interval and
|
||||||
* to get 2 workers within 60000 ms
|
* spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures to make this application
|
||||||
|
* if it cannot get enough resources to get 2 workers within interval * maxFailures s
|
||||||
*
|
*
|
||||||
* setup "checkpoint_path" -> "/checkpoints" and "checkpoint_interval" -> 2 to save checkpoint for every
|
* setup "checkpoint_path" -> "/checkpoints" and "checkpoint_interval" -> 2 to save
|
||||||
* two iterations
|
* checkpoint for every two iterations
|
||||||
*/
|
*/
|
||||||
val xgbParam = Map("eta" -> 0.1f,
|
val xgbParam = Map("eta" -> 0.1f,
|
||||||
"max_depth" -> 2,
|
"max_depth" -> 2,
|
||||||
|
|||||||
@ -75,7 +75,6 @@ private[scala] case class XGBoostExecutionParams(
|
|||||||
missing: Float,
|
missing: Float,
|
||||||
allowNonZeroForMissing: Boolean,
|
allowNonZeroForMissing: Boolean,
|
||||||
trackerConf: TrackerConf,
|
trackerConf: TrackerConf,
|
||||||
timeoutRequestWorkers: Long,
|
|
||||||
checkpointParam: Option[ExternalCheckpointParams],
|
checkpointParam: Option[ExternalCheckpointParams],
|
||||||
xgbInputParams: XGBoostExecutionInputParams,
|
xgbInputParams: XGBoostExecutionInputParams,
|
||||||
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
|
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
|
||||||
@ -201,12 +200,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
|
|||||||
case _ => throw new IllegalArgumentException("parameter \"tracker_conf\" must be an " +
|
case _ => throw new IllegalArgumentException("parameter \"tracker_conf\" must be an " +
|
||||||
"instance of TrackerConf.")
|
"instance of TrackerConf.")
|
||||||
}
|
}
|
||||||
val timeoutRequestWorkers: Long = overridedParams.get("timeout_request_workers") match {
|
|
||||||
case None => 0L
|
|
||||||
case Some(interval: Long) => interval
|
|
||||||
case _ => throw new IllegalArgumentException("parameter \"timeout_request_workers\" must be" +
|
|
||||||
" an instance of Long.")
|
|
||||||
}
|
|
||||||
val checkpointParam =
|
val checkpointParam =
|
||||||
ExternalCheckpointParams.extractParams(overridedParams)
|
ExternalCheckpointParams.extractParams(overridedParams)
|
||||||
|
|
||||||
@ -227,7 +220,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
|
|||||||
|
|
||||||
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
|
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
|
||||||
missing, allowNonZeroForMissing, trackerConf,
|
missing, allowNonZeroForMissing, trackerConf,
|
||||||
timeoutRequestWorkers,
|
|
||||||
checkpointParam,
|
checkpointParam,
|
||||||
inputParams,
|
inputParams,
|
||||||
xgbExecEarlyStoppingParams,
|
xgbExecEarlyStoppingParams,
|
||||||
|
|||||||
@ -66,8 +66,6 @@ class XGBoostClassifier (
|
|||||||
|
|
||||||
def setMissing(value: Float): this.type = set(missing, value)
|
def setMissing(value: Float): this.type = set(missing, value)
|
||||||
|
|
||||||
def setTimeoutRequestWorkers(value: Long): this.type = set(timeoutRequestWorkers, value)
|
|
||||||
|
|
||||||
def setCheckpointPath(value: String): this.type = set(checkpointPath, value)
|
def setCheckpointPath(value: String): this.type = set(checkpointPath, value)
|
||||||
|
|
||||||
def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
|
def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
|
||||||
|
|||||||
@ -68,8 +68,6 @@ class XGBoostRegressor (
|
|||||||
|
|
||||||
def setMissing(value: Float): this.type = set(missing, value)
|
def setMissing(value: Float): this.type = set(missing, value)
|
||||||
|
|
||||||
def setTimeoutRequestWorkers(value: Long): this.type = set(timeoutRequestWorkers, value)
|
|
||||||
|
|
||||||
def setCheckpointPath(value: String): this.type = set(checkpointPath, value)
|
def setCheckpointPath(value: String): this.type = set(checkpointPath, value)
|
||||||
|
|
||||||
def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
|
def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
|
||||||
|
|||||||
@ -112,15 +112,6 @@ private[spark] trait GeneralParams extends Params {
|
|||||||
|
|
||||||
final def getAllowNonZeroForMissingValue: Boolean = $(allowNonZeroForMissing)
|
final def getAllowNonZeroForMissingValue: Boolean = $(allowNonZeroForMissing)
|
||||||
|
|
||||||
/**
|
|
||||||
* the maximum time to wait for the job requesting new workers. default: 30 minutes
|
|
||||||
*/
|
|
||||||
final val timeoutRequestWorkers = new LongParam(this, "timeoutRequestWorkers", "the maximum " +
|
|
||||||
"time to request new Workers if numCores are insufficient. The timeout will be disabled " +
|
|
||||||
"if this value is set smaller than or equal to 0.")
|
|
||||||
|
|
||||||
final def getTimeoutRequestWorkers: Long = $(timeoutRequestWorkers)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The hdfs folder to load and save checkpoint boosters. default: `empty_string`
|
* The hdfs folder to load and save checkpoint boosters. default: `empty_string`
|
||||||
*/
|
*/
|
||||||
@ -181,7 +172,7 @@ private[spark] trait GeneralParams extends Params {
|
|||||||
setDefault(numRound -> 1, numWorkers -> 1, nthread -> 1,
|
setDefault(numRound -> 1, numWorkers -> 1, nthread -> 1,
|
||||||
useExternalMemory -> false, silent -> 0, verbosity -> 1,
|
useExternalMemory -> false, silent -> 0, verbosity -> 1,
|
||||||
customObj -> null, customEval -> null, missing -> Float.NaN,
|
customObj -> null, customEval -> null, missing -> Float.NaN,
|
||||||
trackerConf -> TrackerConf(), seed -> 0, timeoutRequestWorkers -> 30 * 60 * 1000L,
|
trackerConf -> TrackerConf(), seed -> 0,
|
||||||
checkpointPath -> "", checkpointInterval -> -1,
|
checkpointPath -> "", checkpointInterval -> -1,
|
||||||
allowNonZeroForMissing -> false)
|
allowNonZeroForMissing -> false)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -165,18 +165,6 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
|||||||
assert(x < 0.1)
|
assert(x < 0.1)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("training with spark parallelism checks disabled") {
|
|
||||||
val eval = new EvalError()
|
|
||||||
val training = buildDataFrame(Classification.train)
|
|
||||||
val testDM = new DMatrix(Classification.test.iterator)
|
|
||||||
val paramMap = Map("eta" -> "1", "max_depth" -> "6",
|
|
||||||
"objective" -> "binary:logistic", "timeout_request_workers" -> 0L,
|
|
||||||
"num_round" -> 5, "num_workers" -> numWorkers)
|
|
||||||
val model = new XGBoostClassifier(paramMap).fit(training)
|
|
||||||
val x = eval.eval(model._booster.predict(testDM, outPutMargin = true), testDM)
|
|
||||||
assert(x < 0.1)
|
|
||||||
}
|
|
||||||
|
|
||||||
test("repartitionForTrainingGroup with group data") {
|
test("repartitionForTrainingGroup with group data") {
|
||||||
// test different splits to cover the corner cases.
|
// test different splits to cover the corner cases.
|
||||||
for (split <- 1 to 20) {
|
for (split <- 1 to 20) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user