[backport] jvm-packages 1.6.1 (#7849)
* [jvm-packages] move the dmatrix building into rabit context (#7823) This fixes the QuantileDeviceDMatrix in distributed environment. * [doc] update the jvm tutorial to 1.6.1 [skip ci] (#7834) * [Breaking][jvm-packages] Use barrier execution mode (#7836) With the introduction of the barrier execution mode. we don't need to kill SparkContext when some xgboost tasks failed. Instead, Spark will handle the errors for us. So in this PR, `killSparkContextOnWorkerFailure` parameter is deleted. * [doc] remove the doc about killing SparkContext [skip ci] (#7840) * [jvm-package] remove the coalesce in barrier mode (#7846) * [jvm-packages] Fix model compatibility (#7845) * Ignore all Java exceptions when looking for Linux musl support (#7844) Co-authored-by: Bobby Wang <wbo4958@gmail.com> Co-authored-by: Michael Allman <msa@allman.ms>
This commit is contained in:
parent
f75c007f27
commit
f4eb6b984e
@ -1,5 +1,5 @@
|
|||||||
#############################################
|
#############################################
|
||||||
XGBoost4J-Spark-GPU Tutorial (version 1.6.0+)
|
XGBoost4J-Spark-GPU Tutorial (version 1.6.1+)
|
||||||
#############################################
|
#############################################
|
||||||
|
|
||||||
**XGBoost4J-Spark-GPU** is an open source library aiming to accelerate distributed XGBoost training on Apache Spark cluster from
|
**XGBoost4J-Spark-GPU** is an open source library aiming to accelerate distributed XGBoost training on Apache Spark cluster from
|
||||||
@ -220,7 +220,7 @@ application jar is iris-1.0.0.jar
|
|||||||
|
|
||||||
cudf_version=22.02.0
|
cudf_version=22.02.0
|
||||||
rapids_version=22.02.0
|
rapids_version=22.02.0
|
||||||
xgboost_version=1.6.0
|
xgboost_version=1.6.1
|
||||||
main_class=Iris
|
main_class=Iris
|
||||||
app_jar=iris-1.0.0.jar
|
app_jar=iris-1.0.0.jar
|
||||||
|
|
||||||
|
|||||||
@ -16,12 +16,6 @@ This tutorial is to cover the end-to-end process to build a machine learning pip
|
|||||||
* Building a Machine Learning Pipeline with XGBoost4J-Spark
|
* Building a Machine Learning Pipeline with XGBoost4J-Spark
|
||||||
* Running XGBoost4J-Spark in Production
|
* Running XGBoost4J-Spark in Production
|
||||||
|
|
||||||
.. note::
|
|
||||||
|
|
||||||
**SparkContext will be stopped by default when XGBoost training task fails**.
|
|
||||||
|
|
||||||
XGBoost4J-Spark 1.2.0+ exposes a parameter **kill_spark_context_on_worker_failure**. Set **kill_spark_context_on_worker_failure** to **false** so that the SparkContext will not be stopping on training failure. Instead of stopping the SparkContext, XGBoost4J-Spark will throw an exception instead. Users who want to re-use the SparkContext should wrap the training code in a try-catch block.
|
|
||||||
|
|
||||||
.. contents::
|
.. contents::
|
||||||
:backlinks: none
|
:backlinks: none
|
||||||
:local:
|
:local:
|
||||||
@ -129,7 +123,7 @@ labels. A DataFrame like this (containing vector-represented features and numeri
|
|||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
||||||
There is no need to assemble feature columns from version 1.6.0+. Instead, users can specify an array of
|
There is no need to assemble feature columns from version 1.6.1+. Instead, users can specify an array of
|
||||||
feture column names by ``setFeaturesCol(value: Array[String])`` and XGBoost4j-Spark will do it.
|
feture column names by ``setFeaturesCol(value: Array[String])`` and XGBoost4j-Spark will do it.
|
||||||
|
|
||||||
Dealing with missing values
|
Dealing with missing values
|
||||||
|
|||||||
@ -69,7 +69,7 @@ public class BoosterTest {
|
|||||||
.hasHeader().build();
|
.hasHeader().build();
|
||||||
|
|
||||||
int maxBin = 16;
|
int maxBin = 16;
|
||||||
int round = 100;
|
int round = 10;
|
||||||
//set params
|
//set params
|
||||||
Map<String, Object> paramMap = new HashMap<String, Object>() {
|
Map<String, Object> paramMap = new HashMap<String, Object>() {
|
||||||
{
|
{
|
||||||
|
|||||||
@ -56,18 +56,20 @@ class GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||||
*
|
*
|
||||||
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
|
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
|
||||||
* @param dataset the training data
|
* @param dataset the training data
|
||||||
* @param params all user defined and defaulted params
|
* @param params all user defined and defaulted params
|
||||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||||
* RDD[Watches] will be used as the training input
|
* Boolean if building DMatrix in rabit context
|
||||||
|
* RDD[() => Watches] will be used as the training input
|
||||||
* Option[ RDD[_] ] is the optional cached RDD
|
* Option[ RDD[_] ] is the optional cached RDD
|
||||||
*/
|
*/
|
||||||
override def buildDatasetToRDD(estimator: Estimator[_],
|
override def buildDatasetToRDD(estimator: Estimator[_],
|
||||||
dataset: Dataset[_],
|
dataset: Dataset[_],
|
||||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
params: Map[String, Any]):
|
||||||
|
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||||
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
|
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,19 +118,21 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||||
*
|
*
|
||||||
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
||||||
* @param dataset the training data
|
* @param dataset the training data
|
||||||
* @param params all user defined and defaulted params
|
* @param params all user defined and defaulted params
|
||||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||||
* RDD[Watches] will be used as the training input
|
* Boolean if building DMatrix in rabit context
|
||||||
|
* RDD[() => Watches] will be used as the training input to build DMatrix
|
||||||
* Option[ RDD[_] ] is the optional cached RDD
|
* Option[ RDD[_] ] is the optional cached RDD
|
||||||
*/
|
*/
|
||||||
override def buildDatasetToRDD(
|
override def buildDatasetToRDD(
|
||||||
estimator: Estimator[_],
|
estimator: Estimator[_],
|
||||||
dataset: Dataset[_],
|
dataset: Dataset[_],
|
||||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
params: Map[String, Any]):
|
||||||
|
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||||
|
|
||||||
val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
|
val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
|
||||||
estimator match {
|
estimator match {
|
||||||
@ -166,7 +170,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
xgbExecParams: XGBoostExecutionParams =>
|
xgbExecParams: XGBoostExecutionParams =>
|
||||||
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
|
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
|
||||||
xgbExecParams.cacheTrainingSet)
|
xgbExecParams.cacheTrainingSet)
|
||||||
(buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
|
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -403,14 +407,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private def repartitionInputData(dataFrame: DataFrame, nWorkers: Int): DataFrame = {
|
private def repartitionInputData(dataFrame: DataFrame, nWorkers: Int): DataFrame = {
|
||||||
// We can't check dataFrame.rdd.getNumPartitions == nWorkers here, since dataFrame.rdd is
|
// we can't involve any coalesce operation here, since Barrier mode will check
|
||||||
// a lazy variable. If we call it here, we will not directly extract RDD[Table] again,
|
// the RDD patterns which does not allow coalesce.
|
||||||
// instead, we will involve Columnar -> Row -> Columnar and decrease the performance
|
dataFrame.repartition(nWorkers)
|
||||||
if (nWorkers == 1) {
|
|
||||||
dataFrame.coalesce(1)
|
|
||||||
} else {
|
|
||||||
dataFrame.repartition(nWorkers)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def repartitionForGroup(
|
private def repartitionForGroup(
|
||||||
@ -448,7 +447,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
private def buildRDDWatches(
|
private def buildRDDWatches(
|
||||||
dataMap: Map[String, ColumnDataBatch],
|
dataMap: Map[String, ColumnDataBatch],
|
||||||
xgbExeParams: XGBoostExecutionParams,
|
xgbExeParams: XGBoostExecutionParams,
|
||||||
noEvalSet: Boolean): RDD[Watches] = {
|
noEvalSet: Boolean): RDD[() => Watches] = {
|
||||||
|
|
||||||
val sc = dataMap(TRAIN_NAME).rawDF.sparkSession.sparkContext
|
val sc = dataMap(TRAIN_NAME).rawDF.sparkSession.sparkContext
|
||||||
val maxBin = xgbExeParams.toMap.getOrElse("max_bin", 256).asInstanceOf[Int]
|
val maxBin = xgbExeParams.toMap.getOrElse("max_bin", 256).asInstanceOf[Int]
|
||||||
@ -459,7 +458,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
GpuUtils.toColumnarRdd(dataMap(TRAIN_NAME).rawDF).mapPartitions({
|
GpuUtils.toColumnarRdd(dataMap(TRAIN_NAME).rawDF).mapPartitions({
|
||||||
iter =>
|
iter =>
|
||||||
val iterColBatch = iter.map(table => new GpuColumnBatch(table, null))
|
val iterColBatch = iter.map(table => new GpuColumnBatch(table, null))
|
||||||
Iterator(buildWatches(
|
Iterator(() => buildWatches(
|
||||||
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
|
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
|
||||||
colIndicesForTrain, iterColBatch, maxBin))
|
colIndicesForTrain, iterColBatch, maxBin))
|
||||||
})
|
})
|
||||||
@ -469,7 +468,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
|||||||
val nameAndColIndices = dataMap.map(nc => (nc._1, nc._2.colIndices))
|
val nameAndColIndices = dataMap.map(nc => (nc._1, nc._2.colIndices))
|
||||||
coPartitionForGpu(dataMap, sc, xgbExeParams.numWorkers).mapPartitions {
|
coPartitionForGpu(dataMap, sc, xgbExeParams.numWorkers).mapPartitions {
|
||||||
nameAndColumnBatchIter =>
|
nameAndColumnBatchIter =>
|
||||||
Iterator(buildWatchesWithEval(
|
Iterator(() => buildWatchesWithEval(
|
||||||
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
|
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
|
||||||
nameAndColIndices, nameAndColumnBatchIter, maxBin))
|
nameAndColIndices, nameAndColumnBatchIter, maxBin))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -39,13 +39,8 @@ trait GpuTestSuite extends FunSuite with TmpFolderSuite {
|
|||||||
|
|
||||||
def enableCsvConf(): SparkConf = {
|
def enableCsvConf(): SparkConf = {
|
||||||
new SparkConf()
|
new SparkConf()
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true")
|
.set("spark.rapids.sql.csv.read.float.enabled", "true")
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true")
|
.set("spark.rapids.sql.csv.read.double.enabled", "true")
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true")
|
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")
|
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true")
|
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
|
|
||||||
.set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
|
def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
|
||||||
@ -246,12 +241,13 @@ object SparkSessionHolder extends Logging {
|
|||||||
Locale.setDefault(Locale.US)
|
Locale.setDefault(Locale.US)
|
||||||
|
|
||||||
val builder = SparkSession.builder()
|
val builder = SparkSession.builder()
|
||||||
.master("local[1]")
|
.master("local[2]")
|
||||||
.config("spark.sql.adaptive.enabled", "false")
|
.config("spark.sql.adaptive.enabled", "false")
|
||||||
.config("spark.rapids.sql.enabled", "false")
|
.config("spark.rapids.sql.enabled", "false")
|
||||||
.config("spark.rapids.sql.test.enabled", "false")
|
.config("spark.rapids.sql.test.enabled", "false")
|
||||||
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
|
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
|
||||||
.config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
|
.config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
|
||||||
|
.config("spark.sql.files.maxPartitionBytes", "1000")
|
||||||
.appName("XGBoost4j-Spark-Gpu unit test")
|
.appName("XGBoost4j-Spark-Gpu unit test")
|
||||||
|
|
||||||
builder.getOrCreate()
|
builder.getOrCreate()
|
||||||
|
|||||||
@ -96,19 +96,21 @@ object PreXGBoost extends PreXGBoostProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||||
*
|
*
|
||||||
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
||||||
* @param dataset the training data
|
* @param dataset the training data
|
||||||
* @param params all user defined and defaulted params
|
* @param params all user defined and defaulted params
|
||||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||||
* RDD[Watches] will be used as the training input
|
* Boolean if building DMatrix in rabit context
|
||||||
|
* RDD[() => Watches] will be used as the training input
|
||||||
* Option[RDD[_]\] is the optional cached RDD
|
* Option[RDD[_]\] is the optional cached RDD
|
||||||
*/
|
*/
|
||||||
override def buildDatasetToRDD(
|
override def buildDatasetToRDD(
|
||||||
estimator: Estimator[_],
|
estimator: Estimator[_],
|
||||||
dataset: Dataset[_],
|
dataset: Dataset[_],
|
||||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
params: Map[String, Any]): XGBoostExecutionParams =>
|
||||||
|
(Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||||
|
|
||||||
if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
|
if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
|
||||||
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
|
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
|
||||||
@ -170,12 +172,12 @@ object PreXGBoost extends PreXGBoostProvider {
|
|||||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||||
} else None
|
} else None
|
||||||
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||||
case Right(trainingData) =>
|
case Right(trainingData) =>
|
||||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||||
} else None
|
} else None
|
||||||
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -311,17 +313,18 @@ object PreXGBoost extends PreXGBoostProvider {
|
|||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[Watches]
|
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[() => Watches]
|
||||||
*
|
*
|
||||||
* @param trainingSet the input training RDD[XGBLabeledPoint]
|
* @param trainingSet the input training RDD[XGBLabeledPoint]
|
||||||
* @param evalRDDMap the eval set
|
* @param evalRDDMap the eval set
|
||||||
* @param hasGroup if has group
|
* @param hasGroup if has group
|
||||||
* @return function to build (RDD[Watches], the cached RDD)
|
* @return function to build (RDD[() => Watches], the cached RDD)
|
||||||
*/
|
*/
|
||||||
private[spark] def buildRDDLabeledPointToRDDWatches(
|
private[spark] def buildRDDLabeledPointToRDDWatches(
|
||||||
trainingSet: RDD[XGBLabeledPoint],
|
trainingSet: RDD[XGBLabeledPoint],
|
||||||
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
|
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
|
||||||
hasGroup: Boolean = false): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
hasGroup: Boolean = false):
|
||||||
|
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||||
|
|
||||||
xgbExecParams: XGBoostExecutionParams =>
|
xgbExecParams: XGBoostExecutionParams =>
|
||||||
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
|
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
|
||||||
@ -329,12 +332,12 @@ object PreXGBoost extends PreXGBoostProvider {
|
|||||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||||
} else None
|
} else None
|
||||||
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||||
case Right(trainingData) =>
|
case Right(trainingData) =>
|
||||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||||
} else None
|
} else None
|
||||||
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -374,34 +377,34 @@ object PreXGBoost extends PreXGBoostProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build RDD[Watches] for Ranking
|
* Build RDD[() => Watches] for Ranking
|
||||||
* @param trainingData the training data RDD
|
* @param trainingData the training data RDD
|
||||||
* @param xgbExecutionParams xgboost execution params
|
* @param xgbExecutionParams xgboost execution params
|
||||||
* @param evalSetsMap the eval RDD
|
* @param evalSetsMap the eval RDD
|
||||||
* @return RDD[Watches]
|
* @return RDD[() => Watches]
|
||||||
*/
|
*/
|
||||||
private def trainForRanking(
|
private def trainForRanking(
|
||||||
trainingData: RDD[Array[XGBLabeledPoint]],
|
trainingData: RDD[Array[XGBLabeledPoint]],
|
||||||
xgbExecutionParam: XGBoostExecutionParams,
|
xgbExecutionParam: XGBoostExecutionParams,
|
||||||
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
|
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
|
||||||
if (evalSetsMap.isEmpty) {
|
if (evalSetsMap.isEmpty) {
|
||||||
trainingData.mapPartitions(labeledPointGroups => {
|
trainingData.mapPartitions(labeledPointGroups => {
|
||||||
val watches = Watches.buildWatchesWithGroup(xgbExecutionParam,
|
val buildWatches = () => Watches.buildWatchesWithGroup(xgbExecutionParam,
|
||||||
DataUtils.processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing,
|
DataUtils.processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing,
|
||||||
xgbExecutionParam.allowNonZeroForMissing),
|
xgbExecutionParam.allowNonZeroForMissing),
|
||||||
getCacheDirName(xgbExecutionParam.useExternalMemory))
|
getCacheDirName(xgbExecutionParam.useExternalMemory))
|
||||||
Iterator.single(watches)
|
Iterator.single(buildWatches)
|
||||||
}).cache()
|
}).cache()
|
||||||
} else {
|
} else {
|
||||||
coPartitionGroupSets(trainingData, evalSetsMap, xgbExecutionParam.numWorkers).mapPartitions(
|
coPartitionGroupSets(trainingData, evalSetsMap, xgbExecutionParam.numWorkers).mapPartitions(
|
||||||
labeledPointGroupSets => {
|
labeledPointGroupSets => {
|
||||||
val watches = Watches.buildWatchesWithGroup(
|
val buildWatches = () => Watches.buildWatchesWithGroup(
|
||||||
labeledPointGroupSets.map {
|
labeledPointGroupSets.map {
|
||||||
case (name, iter) => (name, DataUtils.processMissingValuesWithGroup(iter,
|
case (name, iter) => (name, DataUtils.processMissingValuesWithGroup(iter,
|
||||||
xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
|
xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
|
||||||
},
|
},
|
||||||
getCacheDirName(xgbExecutionParam.useExternalMemory))
|
getCacheDirName(xgbExecutionParam.useExternalMemory))
|
||||||
Iterator.single(watches)
|
Iterator.single(buildWatches)
|
||||||
}).cache()
|
}).cache()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -462,35 +465,35 @@ object PreXGBoost extends PreXGBoostProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build RDD[Watches] for Non-Ranking
|
* Build RDD[() => Watches] for Non-Ranking
|
||||||
* @param trainingData the training data RDD
|
* @param trainingData the training data RDD
|
||||||
* @param xgbExecutionParams xgboost execution params
|
* @param xgbExecutionParams xgboost execution params
|
||||||
* @param evalSetsMap the eval RDD
|
* @param evalSetsMap the eval RDD
|
||||||
* @return RDD[Watches]
|
* @return RDD[() => Watches]
|
||||||
*/
|
*/
|
||||||
private def trainForNonRanking(
|
private def trainForNonRanking(
|
||||||
trainingData: RDD[XGBLabeledPoint],
|
trainingData: RDD[XGBLabeledPoint],
|
||||||
xgbExecutionParams: XGBoostExecutionParams,
|
xgbExecutionParams: XGBoostExecutionParams,
|
||||||
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
|
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
|
||||||
if (evalSetsMap.isEmpty) {
|
if (evalSetsMap.isEmpty) {
|
||||||
trainingData.mapPartitions { labeledPoints => {
|
trainingData.mapPartitions { labeledPoints => {
|
||||||
val watches = Watches.buildWatches(xgbExecutionParams,
|
val buildWatches = () => Watches.buildWatches(xgbExecutionParams,
|
||||||
DataUtils.processMissingValues(labeledPoints, xgbExecutionParams.missing,
|
DataUtils.processMissingValues(labeledPoints, xgbExecutionParams.missing,
|
||||||
xgbExecutionParams.allowNonZeroForMissing),
|
xgbExecutionParams.allowNonZeroForMissing),
|
||||||
getCacheDirName(xgbExecutionParams.useExternalMemory))
|
getCacheDirName(xgbExecutionParams.useExternalMemory))
|
||||||
Iterator.single(watches)
|
Iterator.single(buildWatches)
|
||||||
}}.cache()
|
}}.cache()
|
||||||
} else {
|
} else {
|
||||||
coPartitionNoGroupSets(trainingData, evalSetsMap, xgbExecutionParams.numWorkers).
|
coPartitionNoGroupSets(trainingData, evalSetsMap, xgbExecutionParams.numWorkers).
|
||||||
mapPartitions {
|
mapPartitions {
|
||||||
nameAndLabeledPointSets =>
|
nameAndLabeledPointSets =>
|
||||||
val watches = Watches.buildWatches(
|
val buildWatches = () => Watches.buildWatches(
|
||||||
nameAndLabeledPointSets.map {
|
nameAndLabeledPointSets.map {
|
||||||
case (name, iter) => (name, DataUtils.processMissingValues(iter,
|
case (name, iter) => (name, DataUtils.processMissingValues(iter,
|
||||||
xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
|
xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
|
||||||
},
|
},
|
||||||
getCacheDirName(xgbExecutionParams.useExternalMemory))
|
getCacheDirName(xgbExecutionParams.useExternalMemory))
|
||||||
Iterator.single(watches)
|
Iterator.single(buildWatches)
|
||||||
}.cache()
|
}.cache()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2021 by Contributors
|
Copyright (c) 2021-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -45,19 +45,21 @@ private[scala] trait PreXGBoostProvider {
|
|||||||
def transformSchema(xgboostEstimator: XGBoostEstimatorCommon, schema: StructType): StructType
|
def transformSchema(xgboostEstimator: XGBoostEstimatorCommon, schema: StructType): StructType
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||||
*
|
*
|
||||||
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
||||||
* @param dataset the training data
|
* @param dataset the training data
|
||||||
* @param params all user defined and defaulted params
|
* @param params all user defined and defaulted params
|
||||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||||
* RDD[Watches] will be used as the training input
|
* Boolean if building DMatrix in rabit context
|
||||||
|
* RDD[() => Watches] will be used as the training input to build DMatrix
|
||||||
* Option[ RDD[_] ] is the optional cached RDD
|
* Option[ RDD[_] ] is the optional cached RDD
|
||||||
*/
|
*/
|
||||||
def buildDatasetToRDD(
|
def buildDatasetToRDD(
|
||||||
estimator: Estimator[_],
|
estimator: Estimator[_],
|
||||||
dataset: Dataset[_],
|
dataset: Dataset[_],
|
||||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]])
|
params: Map[String, Any]):
|
||||||
|
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform Dataset
|
* Transform Dataset
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import java.io.File
|
|||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
|
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
|
||||||
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
|
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
|
||||||
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams
|
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams
|
||||||
@ -30,8 +31,9 @@ import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
|||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
import org.apache.commons.logging.LogFactory
|
import org.apache.commons.logging.LogFactory
|
||||||
import org.apache.hadoop.fs.FileSystem
|
import org.apache.hadoop.fs.FileSystem
|
||||||
|
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
|
import org.apache.spark.{SparkContext, TaskContext}
|
||||||
import org.apache.spark.sql.SparkSession
|
import org.apache.spark.sql.SparkSession
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -79,8 +81,7 @@ private[scala] case class XGBoostExecutionParams(
|
|||||||
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
|
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
|
||||||
cacheTrainingSet: Boolean,
|
cacheTrainingSet: Boolean,
|
||||||
treeMethod: Option[String],
|
treeMethod: Option[String],
|
||||||
isLocal: Boolean,
|
isLocal: Boolean) {
|
||||||
killSparkContextOnWorkerFailure: Boolean) {
|
|
||||||
|
|
||||||
private var rawParamMap: Map[String, Any] = _
|
private var rawParamMap: Map[String, Any] = _
|
||||||
|
|
||||||
@ -224,9 +225,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
|
|||||||
val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
|
val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
|
||||||
.asInstanceOf[Boolean]
|
.asInstanceOf[Boolean]
|
||||||
|
|
||||||
val killSparkContext = overridedParams.getOrElse("kill_spark_context_on_worker_failure", true)
|
|
||||||
.asInstanceOf[Boolean]
|
|
||||||
|
|
||||||
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
|
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
|
||||||
missing, allowNonZeroForMissing, trackerConf,
|
missing, allowNonZeroForMissing, trackerConf,
|
||||||
timeoutRequestWorkers,
|
timeoutRequestWorkers,
|
||||||
@ -235,8 +233,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
|
|||||||
xgbExecEarlyStoppingParams,
|
xgbExecEarlyStoppingParams,
|
||||||
cacheTrainingSet,
|
cacheTrainingSet,
|
||||||
treeMethod,
|
treeMethod,
|
||||||
isLocal,
|
isLocal)
|
||||||
killSparkContext)
|
|
||||||
xgbExecParam.setRawParamMap(overridedParams)
|
xgbExecParam.setRawParamMap(overridedParams)
|
||||||
xgbExecParam
|
xgbExecParam
|
||||||
}
|
}
|
||||||
@ -283,13 +280,8 @@ object XGBoost extends Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def buildDistributedBooster(
|
private def buildWatchesAndCheck(buildWatchesFun: () => Watches): Watches = {
|
||||||
watches: Watches,
|
val watches = buildWatchesFun()
|
||||||
xgbExecutionParam: XGBoostExecutionParams,
|
|
||||||
rabitEnv: java.util.Map[String, String],
|
|
||||||
obj: ObjectiveTrait,
|
|
||||||
eval: EvalTrait,
|
|
||||||
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
|
|
||||||
// to workaround the empty partitions in training dataset,
|
// to workaround the empty partitions in training dataset,
|
||||||
// this might not be the best efficient implementation, see
|
// this might not be the best efficient implementation, see
|
||||||
// (https://github.com/dmlc/xgboost/issues/1277)
|
// (https://github.com/dmlc/xgboost/issues/1277)
|
||||||
@ -298,14 +290,39 @@ object XGBoost extends Serializable {
|
|||||||
s"detected an empty partition in the training data, partition ID:" +
|
s"detected an empty partition in the training data, partition ID:" +
|
||||||
s" ${TaskContext.getPartitionId()}")
|
s" ${TaskContext.getPartitionId()}")
|
||||||
}
|
}
|
||||||
|
watches
|
||||||
|
}
|
||||||
|
|
||||||
|
private def buildDistributedBooster(
|
||||||
|
buildDMatrixInRabit: Boolean,
|
||||||
|
buildWatches: () => Watches,
|
||||||
|
xgbExecutionParam: XGBoostExecutionParams,
|
||||||
|
rabitEnv: java.util.Map[String, String],
|
||||||
|
obj: ObjectiveTrait,
|
||||||
|
eval: EvalTrait,
|
||||||
|
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
|
||||||
|
|
||||||
|
var watches: Watches = null
|
||||||
|
if (!buildDMatrixInRabit) {
|
||||||
|
// for CPU pipeline, we need to build DMatrix out of rabit context
|
||||||
|
watches = buildWatchesAndCheck(buildWatches)
|
||||||
|
}
|
||||||
|
|
||||||
val taskId = TaskContext.getPartitionId().toString
|
val taskId = TaskContext.getPartitionId().toString
|
||||||
val attempt = TaskContext.get().attemptNumber.toString
|
val attempt = TaskContext.get().attemptNumber.toString
|
||||||
rabitEnv.put("DMLC_TASK_ID", taskId)
|
rabitEnv.put("DMLC_TASK_ID", taskId)
|
||||||
rabitEnv.put("DMLC_NUM_ATTEMPT", attempt)
|
rabitEnv.put("DMLC_NUM_ATTEMPT", attempt)
|
||||||
val numRounds = xgbExecutionParam.numRounds
|
val numRounds = xgbExecutionParam.numRounds
|
||||||
val makeCheckpoint = xgbExecutionParam.checkpointParam.isDefined && taskId.toInt == 0
|
val makeCheckpoint = xgbExecutionParam.checkpointParam.isDefined && taskId.toInt == 0
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Rabit.init(rabitEnv)
|
Rabit.init(rabitEnv)
|
||||||
|
|
||||||
|
if (buildDMatrixInRabit) {
|
||||||
|
// for GPU pipeline, we need to move dmatrix building into rabit context
|
||||||
|
watches = buildWatchesAndCheck(buildWatches)
|
||||||
|
}
|
||||||
|
|
||||||
val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
|
val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
|
||||||
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
|
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
|
||||||
val externalCheckpointParams = xgbExecutionParam.checkpointParam
|
val externalCheckpointParams = xgbExecutionParam.checkpointParam
|
||||||
@ -331,14 +348,18 @@ object XGBoost extends Serializable {
|
|||||||
watches.toMap, metrics, obj, eval,
|
watches.toMap, metrics, obj, eval,
|
||||||
earlyStoppingRound = numEarlyStoppingRounds, prevBooster)
|
earlyStoppingRound = numEarlyStoppingRounds, prevBooster)
|
||||||
}
|
}
|
||||||
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
|
if (TaskContext.get().partitionId() == 0) {
|
||||||
|
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
|
||||||
|
} else {
|
||||||
|
Iterator.empty
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case xgbException: XGBoostError =>
|
case xgbException: XGBoostError =>
|
||||||
logger.error(s"XGBooster worker $taskId has failed $attempt times due to ", xgbException)
|
logger.error(s"XGBooster worker $taskId has failed $attempt times due to ", xgbException)
|
||||||
throw xgbException
|
throw xgbException
|
||||||
} finally {
|
} finally {
|
||||||
Rabit.shutdown()
|
Rabit.shutdown()
|
||||||
watches.delete()
|
if (watches != null) watches.delete()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -364,7 +385,7 @@ object XGBoost extends Serializable {
|
|||||||
@throws(classOf[XGBoostError])
|
@throws(classOf[XGBoostError])
|
||||||
private[spark] def trainDistributed(
|
private[spark] def trainDistributed(
|
||||||
sc: SparkContext,
|
sc: SparkContext,
|
||||||
buildTrainingData: XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]),
|
buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]),
|
||||||
params: Map[String, Any]):
|
params: Map[String, Any]):
|
||||||
(Booster, Map[String, Array[Float]]) = {
|
(Booster, Map[String, Array[Float]]) = {
|
||||||
|
|
||||||
@ -383,50 +404,36 @@ object XGBoost extends Serializable {
|
|||||||
}.orNull
|
}.orNull
|
||||||
|
|
||||||
// Get the training data RDD and the cachedRDD
|
// Get the training data RDD and the cachedRDD
|
||||||
val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
|
val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Train for every ${savingRound} rounds and save the partially completed booster
|
// Train for every ${savingRound} rounds and save the partially completed booster
|
||||||
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
|
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
|
||||||
val (booster, metrics) = try {
|
val (booster, metrics) = try {
|
||||||
val parallelismTracker = new SparkParallelismTracker(sc,
|
|
||||||
xgbExecParams.timeoutRequestWorkers,
|
|
||||||
xgbExecParams.numWorkers,
|
|
||||||
xgbExecParams.killSparkContextOnWorkerFailure)
|
|
||||||
|
|
||||||
tracker.getWorkerEnvs().putAll(xgbRabitParams)
|
tracker.getWorkerEnvs().putAll(xgbRabitParams)
|
||||||
val rabitEnv = tracker.getWorkerEnvs
|
val rabitEnv = tracker.getWorkerEnvs
|
||||||
|
|
||||||
val boostersAndMetrics = trainingRDD.mapPartitions { iter => {
|
val boostersAndMetrics = trainingRDD.barrier().mapPartitions { iter => {
|
||||||
var optionWatches: Option[Watches] = None
|
var optionWatches: Option[() => Watches] = None
|
||||||
|
|
||||||
// take the first Watches to train
|
// take the first Watches to train
|
||||||
if (iter.hasNext) {
|
if (iter.hasNext) {
|
||||||
optionWatches = Some(iter.next())
|
optionWatches = Some(iter.next())
|
||||||
}
|
}
|
||||||
|
|
||||||
optionWatches.map { watches => buildDistributedBooster(watches, xgbExecParams, rabitEnv,
|
optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit,
|
||||||
xgbExecParams.obj, xgbExecParams.eval, prevBooster)}
|
buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj,
|
||||||
|
xgbExecParams.eval, prevBooster)}
|
||||||
.getOrElse(throw new RuntimeException("No Watches to train"))
|
.getOrElse(throw new RuntimeException("No Watches to train"))
|
||||||
|
|
||||||
}}.cache()
|
}}
|
||||||
|
|
||||||
val sparkJobThread = new Thread() {
|
|
||||||
override def run() {
|
|
||||||
// force the job
|
|
||||||
boostersAndMetrics.foreachPartition(() => _)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sparkJobThread.setUncaughtExceptionHandler(tracker)
|
|
||||||
|
|
||||||
val trackerReturnVal = parallelismTracker.execute {
|
|
||||||
sparkJobThread.start()
|
|
||||||
tracker.waitFor(0L)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
val (booster, metrics) = boostersAndMetrics.collect()(0)
|
||||||
|
val trackerReturnVal = tracker.waitFor(0L)
|
||||||
logger.info(s"Rabit returns with exit code $trackerReturnVal")
|
logger.info(s"Rabit returns with exit code $trackerReturnVal")
|
||||||
val (booster, metrics) = postTrackerReturnProcessing(trackerReturnVal,
|
if (trackerReturnVal != 0) {
|
||||||
boostersAndMetrics, sparkJobThread)
|
throw new XGBoostError("XGBoostModel training failed.")
|
||||||
|
}
|
||||||
(booster, metrics)
|
(booster, metrics)
|
||||||
} finally {
|
} finally {
|
||||||
tracker.stop()
|
tracker.stop()
|
||||||
@ -446,42 +453,12 @@ object XGBoost extends Serializable {
|
|||||||
case t: Throwable =>
|
case t: Throwable =>
|
||||||
// if the job was aborted due to an exception
|
// if the job was aborted due to an exception
|
||||||
logger.error("the job was aborted due to ", t)
|
logger.error("the job was aborted due to ", t)
|
||||||
if (xgbExecParams.killSparkContextOnWorkerFailure) {
|
|
||||||
sc.stop()
|
|
||||||
}
|
|
||||||
throw t
|
throw t
|
||||||
} finally {
|
} finally {
|
||||||
optionalCachedRDD.foreach(_.unpersist())
|
optionalCachedRDD.foreach(_.unpersist())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def postTrackerReturnProcessing(
|
|
||||||
trackerReturnVal: Int,
|
|
||||||
distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],
|
|
||||||
sparkJobThread: Thread): (Booster, Map[String, Array[Float]]) = {
|
|
||||||
if (trackerReturnVal == 0) {
|
|
||||||
// Copies of the final booster and the corresponding metrics
|
|
||||||
// reside in each partition of the `distributedBoostersAndMetrics`.
|
|
||||||
// Any of them can be used to create the model.
|
|
||||||
// it's safe to block here forever, as the tracker has returned successfully, and the Spark
|
|
||||||
// job should have finished, there is no reason for the thread cannot return
|
|
||||||
sparkJobThread.join()
|
|
||||||
val (booster, metrics) = distributedBoostersAndMetrics.first()
|
|
||||||
distributedBoostersAndMetrics.unpersist(false)
|
|
||||||
(booster, metrics)
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
if (sparkJobThread.isAlive) {
|
|
||||||
sparkJobThread.interrupt()
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case _: InterruptedException =>
|
|
||||||
logger.info("spark job thread is interrupted")
|
|
||||||
}
|
|
||||||
throw new XGBoostError("XGBoostModel training failed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class Watches private[scala] (
|
class Watches private[scala] (
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,18 +16,22 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark.params
|
package ml.dmlc.xgboost4j.scala.spark.params
|
||||||
|
|
||||||
|
import ml.dmlc.xgboost4j.scala.spark
|
||||||
|
import org.apache.commons.logging.LogFactory
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.json4s.{DefaultFormats, JValue}
|
import org.json4s.{DefaultFormats, JValue}
|
||||||
import org.json4s.JsonAST.JObject
|
import org.json4s.JsonAST.JObject
|
||||||
import org.json4s.jackson.JsonMethods.{compact, parse, render}
|
import org.json4s.jackson.JsonMethods.{compact, parse, render}
|
||||||
|
|
||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.ml.param.{Param, Params}
|
import org.apache.spark.ml.param.Params
|
||||||
import org.apache.spark.ml.util.MLReader
|
import org.apache.spark.ml.util.MLReader
|
||||||
|
|
||||||
// This originates from apache-spark DefaultPramsReader copy paste
|
// This originates from apache-spark DefaultPramsReader copy paste
|
||||||
private[spark] object DefaultXGBoostParamsReader {
|
private[spark] object DefaultXGBoostParamsReader {
|
||||||
|
|
||||||
|
private val logger = LogFactory.getLog("XGBoostSpark")
|
||||||
|
|
||||||
private val paramNameCompatibilityMap: Map[String, String] = Map("silent" -> "verbosity")
|
private val paramNameCompatibilityMap: Map[String, String] = Map("silent" -> "verbosity")
|
||||||
|
|
||||||
private val paramValueCompatibilityMap: Map[String, Map[Any, Any]] =
|
private val paramValueCompatibilityMap: Map[String, Map[Any, Any]] =
|
||||||
@ -126,9 +130,16 @@ private[spark] object DefaultXGBoostParamsReader {
|
|||||||
metadata.params match {
|
metadata.params match {
|
||||||
case JObject(pairs) =>
|
case JObject(pairs) =>
|
||||||
pairs.foreach { case (paramName, jsonValue) =>
|
pairs.foreach { case (paramName, jsonValue) =>
|
||||||
val param = instance.getParam(handleBrokenlyChangedName(paramName))
|
val finalName = handleBrokenlyChangedName(paramName)
|
||||||
val value = param.jsonDecode(compact(render(jsonValue)))
|
// For the deleted parameters, we'd better to remove it instead of throwing an exception.
|
||||||
instance.set(param, handleBrokenlyChangedValue(paramName, value))
|
// So we need to check if the parameter exists instead of blindly setting it.
|
||||||
|
if (instance.hasParam(finalName)) {
|
||||||
|
val param = instance.getParam(finalName)
|
||||||
|
val value = param.jsonDecode(compact(render(jsonValue)))
|
||||||
|
instance.set(param, handleBrokenlyChangedValue(paramName, value))
|
||||||
|
} else {
|
||||||
|
logger.warn(s"$finalName is no longer used in ${spark.VERSION}")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case _ =>
|
case _ =>
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
|
|||||||
@ -105,14 +105,8 @@ private[spark] trait LearningTaskParams extends Params {
|
|||||||
|
|
||||||
final def getMaximizeEvaluationMetrics: Boolean = $(maximizeEvaluationMetrics)
|
final def getMaximizeEvaluationMetrics: Boolean = $(maximizeEvaluationMetrics)
|
||||||
|
|
||||||
/**
|
|
||||||
* whether killing SparkContext when training task fails
|
|
||||||
*/
|
|
||||||
final val killSparkContextOnWorkerFailure = new BooleanParam(this,
|
|
||||||
"killSparkContextOnWorkerFailure", "whether killing SparkContext when training task fails")
|
|
||||||
|
|
||||||
setDefault(objective -> "reg:squarederror", baseScore -> 0.5, trainTestRatio -> 1.0,
|
setDefault(objective -> "reg:squarederror", baseScore -> 0.5, trainTestRatio -> 1.0,
|
||||||
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false, killSparkContextOnWorkerFailure -> true)
|
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[spark] object LearningTaskParams {
|
private[spark] object LearningTaskParams {
|
||||||
|
|||||||
@ -1,175 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright (c) 2014 by Contributors
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.spark
|
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory
|
|
||||||
import org.apache.spark.scheduler._
|
|
||||||
|
|
||||||
import scala.collection.mutable.{HashMap, HashSet}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A tracker that ensures enough number of executor cores are alive.
|
|
||||||
* Throws an exception when the number of alive cores is less than nWorkers.
|
|
||||||
*
|
|
||||||
* @param sc The SparkContext object
|
|
||||||
* @param timeout The maximum time to wait for enough number of workers.
|
|
||||||
* @param numWorkers nWorkers used in an XGBoost Job
|
|
||||||
* @param killSparkContextOnWorkerFailure kill SparkContext or not when task fails
|
|
||||||
*/
|
|
||||||
class SparkParallelismTracker(
|
|
||||||
val sc: SparkContext,
|
|
||||||
timeout: Long,
|
|
||||||
numWorkers: Int,
|
|
||||||
killSparkContextOnWorkerFailure: Boolean = true) {
|
|
||||||
|
|
||||||
private[this] val requestedCores = numWorkers * sc.conf.getInt("spark.task.cpus", 1)
|
|
||||||
private[this] val logger = LogFactory.getLog("XGBoostSpark")
|
|
||||||
|
|
||||||
private[this] def numAliveCores: Int = {
|
|
||||||
sc.statusStore.executorList(true).map(_.totalCores).sum
|
|
||||||
}
|
|
||||||
|
|
||||||
private[this] def waitForCondition(
|
|
||||||
condition: => Boolean,
|
|
||||||
timeout: Long,
|
|
||||||
checkInterval: Long = 100L) = {
|
|
||||||
val waitImpl = new ((Long, Boolean) => Boolean) {
|
|
||||||
override def apply(waitedTime: Long, status: Boolean): Boolean = status match {
|
|
||||||
case s if s => true
|
|
||||||
case _ => waitedTime match {
|
|
||||||
case t if t < timeout =>
|
|
||||||
Thread.sleep(checkInterval)
|
|
||||||
apply(t + checkInterval, status = condition)
|
|
||||||
case _ => false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
waitImpl(0L, condition)
|
|
||||||
}
|
|
||||||
|
|
||||||
private[this] def safeExecute[T](body: => T): T = {
|
|
||||||
val listener = new TaskFailedListener(killSparkContextOnWorkerFailure)
|
|
||||||
sc.addSparkListener(listener)
|
|
||||||
try {
|
|
||||||
body
|
|
||||||
} finally {
|
|
||||||
sc.removeSparkListener(listener)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a blocking function call with two checks on enough nWorkers:
|
|
||||||
* - Before the function starts, wait until there are enough executor cores.
|
|
||||||
* - During the execution, throws an exception if there is any executor lost.
|
|
||||||
*
|
|
||||||
* @param body A blocking function call
|
|
||||||
* @tparam T Return type
|
|
||||||
* @return The return of body
|
|
||||||
*/
|
|
||||||
def execute[T](body: => T): T = {
|
|
||||||
if (timeout <= 0) {
|
|
||||||
logger.info("starting training without setting timeout for waiting for resources")
|
|
||||||
safeExecute(body)
|
|
||||||
} else {
|
|
||||||
logger.info(s"starting training with timeout set as $timeout ms for waiting for resources")
|
|
||||||
if (!waitForCondition(numAliveCores >= requestedCores, timeout)) {
|
|
||||||
throw new IllegalStateException(s"Unable to get $requestedCores cores for XGBoost training")
|
|
||||||
}
|
|
||||||
safeExecute(body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class TaskFailedListener(killSparkContext: Boolean = true) extends SparkListener {
|
|
||||||
|
|
||||||
private[this] val logger = LogFactory.getLog("XGBoostTaskFailedListener")
|
|
||||||
|
|
||||||
// {jobId, [stageId0, stageId1, ...] }
|
|
||||||
// keep track of the mapping of job id and stage ids
|
|
||||||
// when a task fails, find the job id and stage id the task belongs to, finally
|
|
||||||
// cancel the jobs
|
|
||||||
private val jobIdToStageIds: HashMap[Int, HashSet[Int]] = HashMap.empty
|
|
||||||
|
|
||||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
|
|
||||||
if (!killSparkContext) {
|
|
||||||
jobStart.stageIds.foreach(stageId => {
|
|
||||||
jobIdToStageIds.getOrElseUpdate(jobStart.jobId, new HashSet[Int]()) += stageId
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
|
|
||||||
if (!killSparkContext) {
|
|
||||||
jobIdToStageIds.remove(jobEnd.jobId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
|
|
||||||
taskEnd.reason match {
|
|
||||||
case taskEndReason: TaskFailedReason =>
|
|
||||||
logger.error(s"Training Task Failed during XGBoost Training: " +
|
|
||||||
s"$taskEndReason")
|
|
||||||
if (killSparkContext) {
|
|
||||||
logger.error("killing SparkContext")
|
|
||||||
TaskFailedListener.startedSparkContextKiller()
|
|
||||||
} else {
|
|
||||||
val stageId = taskEnd.stageId
|
|
||||||
// find job ids according to stage id and then cancel the job
|
|
||||||
|
|
||||||
jobIdToStageIds.foreach {
|
|
||||||
case (jobId, stageIds) =>
|
|
||||||
if (stageIds.contains(stageId)) {
|
|
||||||
logger.error("Cancelling jobId:" + jobId)
|
|
||||||
jobIdToStageIds.remove(jobId)
|
|
||||||
SparkContext.getOrCreate().cancelJob(jobId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case _ =>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
object TaskFailedListener {
|
|
||||||
|
|
||||||
var killerStarted: Boolean = false
|
|
||||||
|
|
||||||
var sparkContextKiller: Thread = _
|
|
||||||
|
|
||||||
val sparkContextShutdownLock = new AnyRef
|
|
||||||
|
|
||||||
private def startedSparkContextKiller(): Unit = this.synchronized {
|
|
||||||
if (!killerStarted) {
|
|
||||||
killerStarted = true
|
|
||||||
// Spark does not allow ListenerThread to shutdown SparkContext so that we have to do it
|
|
||||||
// in a separate thread
|
|
||||||
sparkContextKiller = new Thread() {
|
|
||||||
override def run(): Unit = {
|
|
||||||
LiveListenerBus.withinListenerThread.withValue(false) {
|
|
||||||
sparkContextShutdownLock.synchronized {
|
|
||||||
SparkContext.getActive.foreach(_.stop())
|
|
||||||
killerStarted = false
|
|
||||||
sparkContextShutdownLock.notify()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sparkContextKiller.setDaemon(true)
|
|
||||||
sparkContextKiller.start()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1 +1 @@
|
|||||||
log4j.logger.org.apache.spark=ERROR
|
log4j.logger.org.apache.spark=ERROR
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
|
|||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, ExternalCheckpointManager, XGBoost => SXGBoost}
|
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, ExternalCheckpointManager, XGBoost => SXGBoost}
|
||||||
import org.scalatest.{FunSuite, Ignore}
|
import org.scalatest.FunSuite
|
||||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||||
|
|
||||||
class ExternalCheckpointManagerSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
class ExternalCheckpointManagerSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,10 +16,8 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark
|
package ml.dmlc.xgboost4j.scala.spark
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
|
||||||
import org.apache.spark.Partitioner
|
import org.apache.spark.Partitioner
|
||||||
import org.apache.spark.ml.feature.VectorAssembler
|
import org.apache.spark.ml.feature.VectorAssembler
|
||||||
import org.apache.spark.sql.SparkSession
|
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
import org.apache.spark.sql.functions._
|
import org.apache.spark.sql.functions._
|
||||||
|
|
||||||
@ -53,7 +51,7 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
|
|||||||
"objective" -> "binary:logistic",
|
"objective" -> "binary:logistic",
|
||||||
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
|
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
|
||||||
import DataUtils._
|
import DataUtils._
|
||||||
val sparkSession = SparkSession.builder().getOrCreate()
|
val sparkSession = ss
|
||||||
import sparkSession.implicits._
|
import sparkSession.implicits._
|
||||||
val repartitioned = sc.parallelize(Synthetic.trainWithDiffFeatureSize, 2)
|
val repartitioned = sc.parallelize(Synthetic.trainWithDiffFeatureSize, 2)
|
||||||
.map(lp => (lp.label, lp)).partitionBy(
|
.map(lp => (lp.label, lp)).partitionBy(
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,14 +16,14 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark
|
package ml.dmlc.xgboost4j.scala.spark
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
|
||||||
import org.apache.spark.ml.feature.VectorAssembler
|
import org.apache.spark.ml.feature.VectorAssembler
|
||||||
import org.apache.spark.ml.linalg.Vectors
|
import org.apache.spark.ml.linalg.Vectors
|
||||||
import org.apache.spark.sql.DataFrame
|
import org.apache.spark.sql.DataFrame
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
|
import org.apache.spark.SparkException
|
||||||
|
|
||||||
class MissingValueHandlingSuite extends FunSuite with PerTest {
|
class MissingValueHandlingSuite extends FunSuite with PerTest {
|
||||||
test("dense vectors containing missing value") {
|
test("dense vectors containing missing value") {
|
||||||
def buildDenseDataFrame(): DataFrame = {
|
def buildDenseDataFrame(): DataFrame = {
|
||||||
@ -113,7 +113,7 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
|
|||||||
val inputDF = vectorAssembler.transform(testDF).select("features", "label")
|
val inputDF = vectorAssembler.transform(testDF).select("features", "label")
|
||||||
val paramMap = List("eta" -> "1", "max_depth" -> "2",
|
val paramMap = List("eta" -> "1", "max_depth" -> "2",
|
||||||
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
|
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
|
||||||
intercept[XGBoostError] {
|
intercept[SparkException] {
|
||||||
new XGBoostClassifier(paramMap).fit(inputDF)
|
new XGBoostClassifier(paramMap).fit(inputDF)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -140,7 +140,7 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
|
|||||||
inputDF.show()
|
inputDF.show()
|
||||||
val paramMap = List("eta" -> "1", "max_depth" -> "2",
|
val paramMap = List("eta" -> "1", "max_depth" -> "2",
|
||||||
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
|
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
|
||||||
intercept[XGBoostError] {
|
intercept[SparkException] {
|
||||||
new XGBoostClassifier(paramMap).fit(inputDF)
|
new XGBoostClassifier(paramMap).fit(inputDF)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,9 +16,9 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark
|
package ml.dmlc.xgboost4j.scala.spark
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
import org.scalatest.{BeforeAndAfterAll, FunSuite}
|
||||||
import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore}
|
|
||||||
|
|
||||||
|
import org.apache.spark.SparkException
|
||||||
import org.apache.spark.ml.param.ParamMap
|
import org.apache.spark.ml.param.ParamMap
|
||||||
|
|
||||||
class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
||||||
@ -40,28 +40,16 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
|||||||
assert(xgbCopy2.MLlib2XGBoostParams("eval_metric").toString === "logloss")
|
assert(xgbCopy2.MLlib2XGBoostParams("eval_metric").toString === "logloss")
|
||||||
}
|
}
|
||||||
|
|
||||||
private def waitForSparkContextShutdown(): Unit = {
|
|
||||||
var totalWaitedTime = 0L
|
|
||||||
while (!ss.sparkContext.isStopped && totalWaitedTime <= 120000) {
|
|
||||||
Thread.sleep(10000)
|
|
||||||
totalWaitedTime += 10000
|
|
||||||
}
|
|
||||||
assert(ss.sparkContext.isStopped === true)
|
|
||||||
}
|
|
||||||
|
|
||||||
test("fail training elegantly with unsupported objective function") {
|
test("fail training elegantly with unsupported objective function") {
|
||||||
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
|
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
|
||||||
"objective" -> "wrong_objective_function", "num_class" -> "6", "num_round" -> 5,
|
"objective" -> "wrong_objective_function", "num_class" -> "6", "num_round" -> 5,
|
||||||
"num_workers" -> numWorkers)
|
"num_workers" -> numWorkers)
|
||||||
val trainingDF = buildDataFrame(MultiClassification.train)
|
val trainingDF = buildDataFrame(MultiClassification.train)
|
||||||
val xgb = new XGBoostClassifier(paramMap)
|
val xgb = new XGBoostClassifier(paramMap)
|
||||||
try {
|
intercept[SparkException] {
|
||||||
val model = xgb.fit(trainingDF)
|
xgb.fit(trainingDF)
|
||||||
} catch {
|
|
||||||
case e: Throwable => // swallow anything
|
|
||||||
} finally {
|
|
||||||
waitForSparkContextShutdown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test("fail training elegantly with unsupported eval metrics") {
|
test("fail training elegantly with unsupported eval metrics") {
|
||||||
@ -70,12 +58,8 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
|||||||
"num_workers" -> numWorkers, "eval_metric" -> "wrong_eval_metrics")
|
"num_workers" -> numWorkers, "eval_metric" -> "wrong_eval_metrics")
|
||||||
val trainingDF = buildDataFrame(MultiClassification.train)
|
val trainingDF = buildDataFrame(MultiClassification.train)
|
||||||
val xgb = new XGBoostClassifier(paramMap)
|
val xgb = new XGBoostClassifier(paramMap)
|
||||||
try {
|
intercept[SparkException] {
|
||||||
val model = xgb.fit(trainingDF)
|
xgb.fit(trainingDF)
|
||||||
} catch {
|
|
||||||
case e: Throwable => // swallow anything
|
|
||||||
} finally {
|
|
||||||
waitForSparkContextShutdown()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
|
|||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
||||||
import org.apache.spark.{SparkConf, SparkContext, TaskFailedListener}
|
import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.scalatest.{BeforeAndAfterEach, FunSuite}
|
import org.scalatest.{BeforeAndAfterEach, FunSuite}
|
||||||
|
|
||||||
@ -40,32 +40,16 @@ trait PerTest extends BeforeAndAfterEach { self: FunSuite =>
|
|||||||
.appName("XGBoostSuite")
|
.appName("XGBoostSuite")
|
||||||
.config("spark.ui.enabled", false)
|
.config("spark.ui.enabled", false)
|
||||||
.config("spark.driver.memory", "512m")
|
.config("spark.driver.memory", "512m")
|
||||||
|
.config("spark.barrier.sync.timeout", 10)
|
||||||
.config("spark.task.cpus", 1)
|
.config("spark.task.cpus", 1)
|
||||||
|
|
||||||
override def beforeEach(): Unit = getOrCreateSession
|
override def beforeEach(): Unit = getOrCreateSession
|
||||||
|
|
||||||
override def afterEach() {
|
override def afterEach() {
|
||||||
TaskFailedListener.sparkContextShutdownLock.synchronized {
|
if (currentSession != null) {
|
||||||
if (currentSession != null) {
|
currentSession.stop()
|
||||||
// this synchronization is mostly for the tests involving SparkContext shutdown
|
cleanExternalCache(currentSession.sparkContext.appName)
|
||||||
// for unit test involving the sparkContext shutdown there are two different events sequence
|
currentSession = null
|
||||||
// 1. SparkContext killer is executed before afterEach, in this case, before SparkContext
|
|
||||||
// is fully stopped, afterEach() will block at the following code block
|
|
||||||
// 2. SparkContext killer is executed afterEach, in this case, currentSession.stop() in will
|
|
||||||
// block to wait for all msgs in ListenerBus get processed. Because currentSession.stop()
|
|
||||||
// has been called, SparkContext killer will not take effect
|
|
||||||
while (TaskFailedListener.killerStarted) {
|
|
||||||
TaskFailedListener.sparkContextShutdownLock.wait()
|
|
||||||
}
|
|
||||||
currentSession.stop()
|
|
||||||
cleanExternalCache(currentSession.sparkContext.appName)
|
|
||||||
currentSession = null
|
|
||||||
}
|
|
||||||
if (TaskFailedListener.sparkContextKiller != null) {
|
|
||||||
TaskFailedListener.sparkContextKiller.interrupt()
|
|
||||||
TaskFailedListener.sparkContextKiller = null
|
|
||||||
}
|
|
||||||
TaskFailedListener.killerStarted = false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014,2021 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,10 +16,8 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark
|
package ml.dmlc.xgboost4j.scala.spark
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.Rabit
|
|
||||||
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
|
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,13 +16,12 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark
|
package ml.dmlc.xgboost4j.scala.spark
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
||||||
import ml.dmlc.xgboost4j.scala.DMatrix
|
import ml.dmlc.xgboost4j.scala.DMatrix
|
||||||
|
|
||||||
import org.apache.spark.TaskContext
|
import org.apache.spark.{SparkException, TaskContext}
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
import org.apache.spark.ml.feature.VectorAssembler
|
import org.apache.spark.ml.feature.VectorAssembler
|
||||||
@ -375,13 +374,14 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
|||||||
|
|
||||||
test("throw exception for empty partition in trainingset") {
|
test("throw exception for empty partition in trainingset") {
|
||||||
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
|
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
|
||||||
"objective" -> "multi:softmax", "num_class" -> "2", "num_round" -> 5,
|
"objective" -> "binary:logistic", "num_class" -> "2", "num_round" -> 5,
|
||||||
"num_workers" -> numWorkers, "tree_method" -> "auto")
|
"num_workers" -> numWorkers, "tree_method" -> "auto", "allow_non_zero_for_missing" -> true)
|
||||||
// The Dmatrix will be empty
|
// The Dmatrix will be empty
|
||||||
val trainingDF = buildDataFrame(Seq(XGBLabeledPoint(1.0f, 1, Array(), Array())))
|
val trainingDF = buildDataFrame(Seq(XGBLabeledPoint(1.0f, 4,
|
||||||
|
Array(0, 1, 2, 3), Array(0, 1, 2, 3))))
|
||||||
val xgb = new XGBoostClassifier(paramMap)
|
val xgb = new XGBoostClassifier(paramMap)
|
||||||
intercept[XGBoostError] {
|
intercept[SparkException] {
|
||||||
val model = xgb.fit(trainingDF)
|
xgb.fit(trainingDF)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 by Contributors
|
Copyright (c) 2014-2022 by Contributors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -16,14 +16,15 @@
|
|||||||
|
|
||||||
package ml.dmlc.xgboost4j.scala.spark
|
package ml.dmlc.xgboost4j.scala.spark
|
||||||
|
|
||||||
import ml.dmlc.xgboost4j.java.{Rabit, XGBoostError}
|
import ml.dmlc.xgboost4j.java.Rabit
|
||||||
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
|
import ml.dmlc.xgboost4j.scala.Booster
|
||||||
import org.apache.spark.TaskFailedListener
|
|
||||||
import org.apache.spark.SparkException
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
|
import org.apache.spark.SparkException
|
||||||
|
|
||||||
class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
||||||
val predictionErrorMin = 0.00001f
|
val predictionErrorMin = 0.00001f
|
||||||
val maxFailure = 2;
|
val maxFailure = 2;
|
||||||
@ -33,15 +34,6 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
|||||||
.config("spark.kryo.classesToRegister", classOf[Booster].getName)
|
.config("spark.kryo.classesToRegister", classOf[Booster].getName)
|
||||||
.master(s"local[${numWorkers},${maxFailure}]")
|
.master(s"local[${numWorkers},${maxFailure}]")
|
||||||
|
|
||||||
private def waitAndCheckSparkShutdown(waitMiliSec: Int): Boolean = {
|
|
||||||
var totalWaitedTime = 0L
|
|
||||||
while (!ss.sparkContext.isStopped && totalWaitedTime <= waitMiliSec) {
|
|
||||||
Thread.sleep(10)
|
|
||||||
totalWaitedTime += 10
|
|
||||||
}
|
|
||||||
return ss.sparkContext.isStopped
|
|
||||||
}
|
|
||||||
|
|
||||||
test("test classification prediction parity w/o ring reduce") {
|
test("test classification prediction parity w/o ring reduce") {
|
||||||
val training = buildDataFrame(Classification.train)
|
val training = buildDataFrame(Classification.train)
|
||||||
val testDF = buildDataFrame(Classification.test)
|
val testDF = buildDataFrame(Classification.test)
|
||||||
@ -91,14 +83,11 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
test("test rabit timeout fail handle") {
|
test("test rabit timeout fail handle") {
|
||||||
// disable spark kill listener to verify if rabit_timeout take effect and kill tasks
|
|
||||||
TaskFailedListener.killerStarted = true
|
|
||||||
|
|
||||||
val training = buildDataFrame(Classification.train)
|
val training = buildDataFrame(Classification.train)
|
||||||
// mock rank 0 failure during 8th allreduce synchronization
|
// mock rank 0 failure during 8th allreduce synchronization
|
||||||
Rabit.mockList = Array("0,8,0,0").toList.asJava
|
Rabit.mockList = Array("0,8,0,0").toList.asJava
|
||||||
|
|
||||||
try {
|
intercept[SparkException] {
|
||||||
new XGBoostClassifier(Map(
|
new XGBoostClassifier(Map(
|
||||||
"eta" -> "0.1",
|
"eta" -> "0.1",
|
||||||
"max_depth" -> "10",
|
"max_depth" -> "10",
|
||||||
@ -108,37 +97,7 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
|||||||
"num_workers" -> numWorkers,
|
"num_workers" -> numWorkers,
|
||||||
"rabit_timeout" -> 0))
|
"rabit_timeout" -> 0))
|
||||||
.fit(training)
|
.fit(training)
|
||||||
} catch {
|
|
||||||
case e: Throwable => // swallow anything
|
|
||||||
} finally {
|
|
||||||
// assume all tasks throw exception almost same time
|
|
||||||
// 100ms should be enough to exhaust all retries
|
|
||||||
assert(waitAndCheckSparkShutdown(100) == true)
|
|
||||||
TaskFailedListener.killerStarted = false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("test SparkContext should not be killed ") {
|
|
||||||
val training = buildDataFrame(Classification.train)
|
|
||||||
// mock rank 0 failure during 8th allreduce synchronization
|
|
||||||
Rabit.mockList = Array("0,8,0,0").toList.asJava
|
|
||||||
|
|
||||||
try {
|
|
||||||
new XGBoostClassifier(Map(
|
|
||||||
"eta" -> "0.1",
|
|
||||||
"max_depth" -> "10",
|
|
||||||
"verbosity" -> "1",
|
|
||||||
"objective" -> "binary:logistic",
|
|
||||||
"num_round" -> 5,
|
|
||||||
"num_workers" -> numWorkers,
|
|
||||||
"kill_spark_context_on_worker_failure" -> false,
|
|
||||||
"rabit_timeout" -> 0))
|
|
||||||
.fit(training)
|
|
||||||
} catch {
|
|
||||||
case e: Throwable => // swallow anything
|
|
||||||
} finally {
|
|
||||||
// wait 3s to check if SparkContext is killed
|
|
||||||
assert(waitAndCheckSparkShutdown(3000) == false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,7 +21,6 @@ import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
|
|||||||
import org.apache.spark.ml.linalg.{Vector, Vectors}
|
import org.apache.spark.ml.linalg.{Vector, Vectors}
|
||||||
import org.apache.spark.sql.functions._
|
import org.apache.spark.sql.functions._
|
||||||
import org.apache.spark.sql.{DataFrame, Row}
|
import org.apache.spark.sql.{DataFrame, Row}
|
||||||
import org.apache.spark.sql.types._
|
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
import org.apache.spark.ml.feature.VectorAssembler
|
import org.apache.spark.ml.feature.VectorAssembler
|
||||||
|
|||||||
@ -1,151 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright (c) 2014 by Contributors
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.spark
|
|
||||||
|
|
||||||
import org.scalatest.FunSuite
|
|
||||||
import _root_.ml.dmlc.xgboost4j.scala.spark.PerTest
|
|
||||||
import org.apache.spark.rdd.RDD
|
|
||||||
import org.apache.spark.sql.SparkSession
|
|
||||||
|
|
||||||
import scala.math.min
|
|
||||||
|
|
||||||
class SparkParallelismTrackerSuite extends FunSuite with PerTest {
|
|
||||||
|
|
||||||
val numParallelism: Int = min(Runtime.getRuntime.availableProcessors(), 4)
|
|
||||||
|
|
||||||
override protected def sparkSessionBuilder: SparkSession.Builder = SparkSession.builder()
|
|
||||||
.master(s"local[${numParallelism}]")
|
|
||||||
.appName("XGBoostSuite")
|
|
||||||
.config("spark.ui.enabled", true)
|
|
||||||
.config("spark.driver.memory", "512m")
|
|
||||||
.config("spark.task.cpus", 1)
|
|
||||||
|
|
||||||
private def waitAndCheckSparkShutdown(waitMiliSec: Int): Boolean = {
|
|
||||||
var totalWaitedTime = 0L
|
|
||||||
while (!ss.sparkContext.isStopped && totalWaitedTime <= waitMiliSec) {
|
|
||||||
Thread.sleep(100)
|
|
||||||
totalWaitedTime += 100
|
|
||||||
}
|
|
||||||
ss.sparkContext.isStopped
|
|
||||||
}
|
|
||||||
|
|
||||||
test("tracker should not affect execution result when timeout is not larger than 0") {
|
|
||||||
val nWorkers = numParallelism
|
|
||||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
|
|
||||||
val tracker = new SparkParallelismTracker(sc, 10000, nWorkers)
|
|
||||||
val disabledTracker = new SparkParallelismTracker(sc, 0, nWorkers)
|
|
||||||
assert(tracker.execute(rdd.sum()) == rdd.sum())
|
|
||||||
assert(disabledTracker.execute(rdd.sum()) == rdd.sum())
|
|
||||||
}
|
|
||||||
|
|
||||||
test("tracker should throw exception if parallelism is not sufficient") {
|
|
||||||
val nWorkers = numParallelism * 3
|
|
||||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
|
|
||||||
val tracker = new SparkParallelismTracker(sc, 1000, nWorkers)
|
|
||||||
intercept[IllegalStateException] {
|
|
||||||
tracker.execute {
|
|
||||||
rdd.map { i =>
|
|
||||||
// Test interruption
|
|
||||||
Thread.sleep(Long.MaxValue)
|
|
||||||
i
|
|
||||||
}.sum()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
test("tracker should throw exception if parallelism is not sufficient with" +
|
|
||||||
" spark.task.cpus larger than 1") {
|
|
||||||
sc.conf.set("spark.task.cpus", "2")
|
|
||||||
val nWorkers = numParallelism
|
|
||||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
|
|
||||||
val tracker = new SparkParallelismTracker(sc, 1000, nWorkers)
|
|
||||||
intercept[IllegalStateException] {
|
|
||||||
tracker.execute {
|
|
||||||
rdd.map { i =>
|
|
||||||
// Test interruption
|
|
||||||
Thread.sleep(Long.MaxValue)
|
|
||||||
i
|
|
||||||
}.sum()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
test("tracker should not kill SparkContext when killSparkContextOnWorkerFailure=false") {
|
|
||||||
val nWorkers = numParallelism
|
|
||||||
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
|
|
||||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers, nWorkers)
|
|
||||||
try {
|
|
||||||
tracker.execute {
|
|
||||||
rdd.map { i =>
|
|
||||||
val partitionId = TaskContext.get().partitionId()
|
|
||||||
if (partitionId == 0) {
|
|
||||||
throw new RuntimeException("mocking task failing")
|
|
||||||
}
|
|
||||||
i
|
|
||||||
}.sum()
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case e: Exception => // catch the exception
|
|
||||||
} finally {
|
|
||||||
// wait 3s to check if SparkContext is killed
|
|
||||||
assert(waitAndCheckSparkShutdown(3000) == false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
test("tracker should cancel the correct job when killSparkContextOnWorkerFailure=false") {
|
|
||||||
val nWorkers = 2
|
|
||||||
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
|
|
||||||
val rdd: RDD[Int] = sc.parallelize(1 to 10, nWorkers)
|
|
||||||
val thread = new TestThread(sc)
|
|
||||||
thread.start()
|
|
||||||
try {
|
|
||||||
tracker.execute {
|
|
||||||
rdd.map { i =>
|
|
||||||
Thread.sleep(100)
|
|
||||||
val partitionId = TaskContext.get().partitionId()
|
|
||||||
if (partitionId == 0) {
|
|
||||||
throw new RuntimeException("mocking task failing")
|
|
||||||
}
|
|
||||||
i
|
|
||||||
}.sum()
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case e: Exception => // catch the exception
|
|
||||||
} finally {
|
|
||||||
thread.join(8000)
|
|
||||||
// wait 3s to check if SparkContext is killed
|
|
||||||
assert(waitAndCheckSparkShutdown(3000) == false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private[this] class TestThread(sc: SparkContext) extends Thread {
|
|
||||||
override def run(): Unit = {
|
|
||||||
var sum: Double = 0.0f
|
|
||||||
try {
|
|
||||||
val rdd = sc.parallelize(1 to 4, 2)
|
|
||||||
sum = rdd.mapPartitions(iter => {
|
|
||||||
// sleep 2s to ensure task is alive when cancelling other jobs
|
|
||||||
Thread.sleep(2000)
|
|
||||||
iter
|
|
||||||
}).sum()
|
|
||||||
} finally {
|
|
||||||
// get the correct result
|
|
||||||
assert(sum.toInt == 10)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -100,7 +100,7 @@ class NativeLibLoader {
|
|||||||
});
|
});
|
||||||
|
|
||||||
return muslRelatedMemoryMappedFilename.isPresent();
|
return muslRelatedMemoryMappedFilename.isPresent();
|
||||||
} catch (IOException ignored) {
|
} catch (Exception ignored) {
|
||||||
// ignored
|
// ignored
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user