Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d92a7d936 | ||
|
|
c2508814ff | ||
|
|
b1b6246e35 | ||
|
|
f4eb6b984e |
@@ -1,5 +1,5 @@
|
||||
cmake_minimum_required(VERSION 3.14 FATAL_ERROR)
|
||||
project(xgboost LANGUAGES CXX C VERSION 1.6.0)
|
||||
project(xgboost LANGUAGES CXX C VERSION 1.6.1)
|
||||
include(cmake/Utils.cmake)
|
||||
list(APPEND CMAKE_MODULE_PATH "${xgboost_SOURCE_DIR}/cmake/modules")
|
||||
cmake_policy(SET CMP0022 NEW)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#############################################
|
||||
XGBoost4J-Spark-GPU Tutorial (version 1.6.0+)
|
||||
XGBoost4J-Spark-GPU Tutorial (version 1.6.1+)
|
||||
#############################################
|
||||
|
||||
**XGBoost4J-Spark-GPU** is an open source library aiming to accelerate distributed XGBoost training on Apache Spark cluster from
|
||||
@@ -220,7 +220,7 @@ application jar is iris-1.0.0.jar
|
||||
|
||||
cudf_version=22.02.0
|
||||
rapids_version=22.02.0
|
||||
xgboost_version=1.6.0
|
||||
xgboost_version=1.6.1
|
||||
main_class=Iris
|
||||
app_jar=iris-1.0.0.jar
|
||||
|
||||
|
||||
@@ -16,12 +16,6 @@ This tutorial is to cover the end-to-end process to build a machine learning pip
|
||||
* Building a Machine Learning Pipeline with XGBoost4J-Spark
|
||||
* Running XGBoost4J-Spark in Production
|
||||
|
||||
.. note::
|
||||
|
||||
**SparkContext will be stopped by default when XGBoost training task fails**.
|
||||
|
||||
XGBoost4J-Spark 1.2.0+ exposes a parameter **kill_spark_context_on_worker_failure**. Set **kill_spark_context_on_worker_failure** to **false** so that the SparkContext will not be stopped on training failure. Instead of stopping the SparkContext, XGBoost4J-Spark will throw an exception. Users who want to re-use the SparkContext should wrap the training code in a try-catch block.
|
||||
|
||||
.. contents::
|
||||
:backlinks: none
|
||||
:local:
|
||||
@@ -129,7 +123,7 @@ labels. A DataFrame like this (containing vector-represented features and numeri
|
||||
|
||||
.. note::
|
||||
|
||||
There is no need to assemble feature columns from version 1.6.0+. Instead, users can specify an array of
|
||||
There is no need to assemble feature columns from version 1.6.1+. Instead, users can specify an array of
|
||||
feature column names by ``setFeaturesCol(value: Array[String])`` and XGBoost4j-Spark will do it.
|
||||
|
||||
Dealing with missing values
|
||||
|
||||
@@ -74,23 +74,20 @@ Optimal Partitioning
|
||||
.. versionadded:: 1.6
|
||||
|
||||
Optimal partitioning is a technique for partitioning the categorical predictors for each
|
||||
node split, the proof of optimality for numerical objectives like ``RMSE`` was first
|
||||
introduced by `[1] <#references>`__. The algorithm is used in decision trees for handling
|
||||
regression and binary classification tasks `[2] <#references>`__, later LightGBM `[3]
|
||||
<#references>`__ brought it to the context of gradient boosting trees and now is also
|
||||
adopted in XGBoost as an optional feature for handling categorical splits. More
|
||||
specifically, the proof by Fisher `[1] <#references>`__ states that, when trying to
|
||||
partition a set of discrete values into groups based on the distances between a measure of
|
||||
these values, one only needs to look at sorted partitions instead of enumerating all
|
||||
possible permutations. In the context of decision trees, the discrete values are
|
||||
categories, and the measure is the output leaf value. Intuitively, we want to group the
|
||||
categories that output similar leaf values. During split finding, we first sort the
|
||||
gradient histogram to prepare the contiguous partitions then enumerate the splits
|
||||
node split, the proof of optimality for numerical output was first introduced by `[1]
|
||||
<#references>`__. The algorithm is used in decision trees `[2] <#references>`__, later
|
||||
LightGBM `[3] <#references>`__ brought it to the context of gradient boosting trees and
|
||||
now is also adopted in XGBoost as an optional feature for handling categorical
|
||||
splits. More specifically, the proof by Fisher `[1] <#references>`__ states that, when
|
||||
trying to partition a set of discrete values into groups based on the distances between a
|
||||
measure of these values, one only needs to look at sorted partitions instead of
|
||||
enumerating all possible permutations. In the context of decision trees, the discrete
|
||||
values are categories, and the measure is the output leaf value. Intuitively, we want to
|
||||
group the categories that output similar leaf values. During split finding, we first sort
|
||||
the gradient histogram to prepare the contiguous partitions then enumerate the splits
|
||||
according to these sorted values. One of the related parameters for XGBoost is
|
||||
``max_cat_to_one_hot``, which controls whether one-hot encoding or partitioning should be
|
||||
used for each feature, see :doc:`/parameter` for details. When objective is not
|
||||
regression or binary classification, XGBoost will fallback to using onehot encoding
|
||||
instead.
|
||||
used for each feature, see :doc:`/parameter` for details.
|
||||
|
||||
|
||||
**********************
|
||||
|
||||
@@ -36,10 +36,6 @@ struct ObjInfo {
|
||||
|
||||
explicit ObjInfo(Task t) : task{t} {}
|
||||
ObjInfo(Task t, bool khess) : task{t}, const_hess{khess} {}
|
||||
|
||||
XGBOOST_DEVICE bool UseOneHot() const {
|
||||
return (task != ObjInfo::kRegression && task != ObjInfo::kBinary);
|
||||
}
|
||||
};
|
||||
} // namespace xgboost
|
||||
#endif // XGBOOST_TASK_H_
|
||||
|
||||
@@ -6,6 +6,6 @@
|
||||
|
||||
#define XGBOOST_VER_MAJOR 1
|
||||
#define XGBOOST_VER_MINOR 6
|
||||
#define XGBOOST_VER_PATCH 0
|
||||
#define XGBOOST_VER_PATCH 1
|
||||
|
||||
#endif // XGBOOST_VERSION_CONFIG_H_
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>XGBoost JVM Package</name>
|
||||
<description>JVM Package for XGBoost</description>
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-example_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
<packaging>jar</packaging>
|
||||
<build>
|
||||
<plugins>
|
||||
@@ -26,7 +26,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-spark_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
@@ -37,7 +37,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-flink_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-flink_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
@@ -26,7 +26,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-gpu_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -69,7 +69,7 @@ public class BoosterTest {
|
||||
.hasHeader().build();
|
||||
|
||||
int maxBin = 16;
|
||||
int round = 100;
|
||||
int round = 10;
|
||||
//set params
|
||||
Map<String, Object> paramMap = new HashMap<String, Object>() {
|
||||
{
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-spark-gpu_2.12</artifactId>
|
||||
<build>
|
||||
@@ -24,7 +24,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
|
||||
@@ -56,18 +56,20 @@ class GpuPreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
||||
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||
*
|
||||
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
|
||||
* @param dataset the training data
|
||||
* @param params all user defined and defaulted params
|
||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
||||
* RDD[Watches] will be used as the training input
|
||||
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||
* Boolean if building DMatrix in rabit context
|
||||
* RDD[() => Watches] will be used as the training input
|
||||
* Option[ RDD[_] ] is the optional cached RDD
|
||||
*/
|
||||
override def buildDatasetToRDD(estimator: Estimator[_],
|
||||
dataset: Dataset[_],
|
||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
||||
params: Map[String, Any]):
|
||||
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
|
||||
}
|
||||
|
||||
@@ -116,19 +118,21 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
||||
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||
*
|
||||
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
||||
* @param dataset the training data
|
||||
* @param params all user defined and defaulted params
|
||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
||||
* RDD[Watches] will be used as the training input
|
||||
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||
* Boolean if building DMatrix in rabit context
|
||||
* RDD[() => Watches] will be used as the training input to build DMatrix
|
||||
* Option[ RDD[_] ] is the optional cached RDD
|
||||
*/
|
||||
override def buildDatasetToRDD(
|
||||
estimator: Estimator[_],
|
||||
dataset: Dataset[_],
|
||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
||||
params: Map[String, Any]):
|
||||
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||
|
||||
val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
|
||||
estimator match {
|
||||
@@ -166,7 +170,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
||||
xgbExecParams: XGBoostExecutionParams =>
|
||||
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
|
||||
xgbExecParams.cacheTrainingSet)
|
||||
(buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
|
||||
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -403,14 +407,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
private def repartitionInputData(dataFrame: DataFrame, nWorkers: Int): DataFrame = {
|
||||
// We can't check dataFrame.rdd.getNumPartitions == nWorkers here, since dataFrame.rdd is
|
||||
// a lazy variable. If we call it here, we will not directly extract RDD[Table] again,
|
||||
// instead, we will involve Columnar -> Row -> Columnar and decrease the performance
|
||||
if (nWorkers == 1) {
|
||||
dataFrame.coalesce(1)
|
||||
} else {
|
||||
dataFrame.repartition(nWorkers)
|
||||
}
|
||||
// we can't involve any coalesce operation here, since Barrier mode will check
|
||||
// the RDD patterns which does not allow coalesce.
|
||||
dataFrame.repartition(nWorkers)
|
||||
}
|
||||
|
||||
private def repartitionForGroup(
|
||||
@@ -448,7 +447,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
||||
private def buildRDDWatches(
|
||||
dataMap: Map[String, ColumnDataBatch],
|
||||
xgbExeParams: XGBoostExecutionParams,
|
||||
noEvalSet: Boolean): RDD[Watches] = {
|
||||
noEvalSet: Boolean): RDD[() => Watches] = {
|
||||
|
||||
val sc = dataMap(TRAIN_NAME).rawDF.sparkSession.sparkContext
|
||||
val maxBin = xgbExeParams.toMap.getOrElse("max_bin", 256).asInstanceOf[Int]
|
||||
@@ -459,7 +458,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
||||
GpuUtils.toColumnarRdd(dataMap(TRAIN_NAME).rawDF).mapPartitions({
|
||||
iter =>
|
||||
val iterColBatch = iter.map(table => new GpuColumnBatch(table, null))
|
||||
Iterator(buildWatches(
|
||||
Iterator(() => buildWatches(
|
||||
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
|
||||
colIndicesForTrain, iterColBatch, maxBin))
|
||||
})
|
||||
@@ -469,7 +468,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
|
||||
val nameAndColIndices = dataMap.map(nc => (nc._1, nc._2.colIndices))
|
||||
coPartitionForGpu(dataMap, sc, xgbExeParams.numWorkers).mapPartitions {
|
||||
nameAndColumnBatchIter =>
|
||||
Iterator(buildWatchesWithEval(
|
||||
Iterator(() => buildWatchesWithEval(
|
||||
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
|
||||
nameAndColIndices, nameAndColumnBatchIter, maxBin))
|
||||
}
|
||||
|
||||
@@ -39,13 +39,8 @@ trait GpuTestSuite extends FunSuite with TmpFolderSuite {
|
||||
|
||||
def enableCsvConf(): SparkConf = {
|
||||
new SparkConf()
|
||||
.set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true")
|
||||
.set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true")
|
||||
.set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true")
|
||||
.set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")
|
||||
.set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true")
|
||||
.set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
|
||||
.set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
|
||||
.set("spark.rapids.sql.csv.read.float.enabled", "true")
|
||||
.set("spark.rapids.sql.csv.read.double.enabled", "true")
|
||||
}
|
||||
|
||||
def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
|
||||
@@ -246,12 +241,13 @@ object SparkSessionHolder extends Logging {
|
||||
Locale.setDefault(Locale.US)
|
||||
|
||||
val builder = SparkSession.builder()
|
||||
.master("local[1]")
|
||||
.master("local[2]")
|
||||
.config("spark.sql.adaptive.enabled", "false")
|
||||
.config("spark.rapids.sql.enabled", "false")
|
||||
.config("spark.rapids.sql.test.enabled", "false")
|
||||
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
|
||||
.config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
|
||||
.config("spark.sql.files.maxPartitionBytes", "1000")
|
||||
.appName("XGBoost4j-Spark-Gpu unit test")
|
||||
|
||||
builder.getOrCreate()
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-spark_2.12</artifactId>
|
||||
<build>
|
||||
@@ -24,7 +24,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
|
||||
@@ -96,19 +96,21 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
||||
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||
*
|
||||
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
||||
* @param dataset the training data
|
||||
* @param params all user defined and defaulted params
|
||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
||||
* RDD[Watches] will be used as the training input
|
||||
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||
* Boolean if building DMatrix in rabit context
|
||||
* RDD[() => Watches] will be used as the training input
|
||||
* Option[RDD[_]\] is the optional cached RDD
|
||||
*/
|
||||
override def buildDatasetToRDD(
|
||||
estimator: Estimator[_],
|
||||
dataset: Dataset[_],
|
||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
||||
params: Map[String, Any]): XGBoostExecutionParams =>
|
||||
(Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||
|
||||
if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
|
||||
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
|
||||
@@ -170,12 +172,12 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||
} else None
|
||||
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
case Right(trainingData) =>
|
||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||
} else None
|
||||
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -311,17 +313,18 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
|
||||
|
||||
/**
|
||||
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[Watches]
|
||||
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[() => Watches]
|
||||
*
|
||||
* @param trainingSet the input training RDD[XGBLabeledPoint]
|
||||
* @param evalRDDMap the eval set
|
||||
* @param hasGroup if has group
|
||||
* @return function to build (RDD[Watches], the cached RDD)
|
||||
* @return function to build (RDD[() => Watches], the cached RDD)
|
||||
*/
|
||||
private[spark] def buildRDDLabeledPointToRDDWatches(
|
||||
trainingSet: RDD[XGBLabeledPoint],
|
||||
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
|
||||
hasGroup: Boolean = false): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
|
||||
hasGroup: Boolean = false):
|
||||
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
|
||||
|
||||
xgbExecParams: XGBoostExecutionParams =>
|
||||
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
|
||||
@@ -329,12 +332,12 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||
} else None
|
||||
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
case Right(trainingData) =>
|
||||
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
|
||||
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
|
||||
} else None
|
||||
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,34 +377,34 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build RDD[Watches] for Ranking
|
||||
* Build RDD[() => Watches] for Ranking
|
||||
* @param trainingData the training data RDD
|
||||
* @param xgbExecutionParams xgboost execution params
|
||||
* @param evalSetsMap the eval RDD
|
||||
* @return RDD[Watches]
|
||||
* @return RDD[() => Watches]
|
||||
*/
|
||||
private def trainForRanking(
|
||||
trainingData: RDD[Array[XGBLabeledPoint]],
|
||||
xgbExecutionParam: XGBoostExecutionParams,
|
||||
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
|
||||
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
|
||||
if (evalSetsMap.isEmpty) {
|
||||
trainingData.mapPartitions(labeledPointGroups => {
|
||||
val watches = Watches.buildWatchesWithGroup(xgbExecutionParam,
|
||||
val buildWatches = () => Watches.buildWatchesWithGroup(xgbExecutionParam,
|
||||
DataUtils.processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing,
|
||||
xgbExecutionParam.allowNonZeroForMissing),
|
||||
getCacheDirName(xgbExecutionParam.useExternalMemory))
|
||||
Iterator.single(watches)
|
||||
Iterator.single(buildWatches)
|
||||
}).cache()
|
||||
} else {
|
||||
coPartitionGroupSets(trainingData, evalSetsMap, xgbExecutionParam.numWorkers).mapPartitions(
|
||||
labeledPointGroupSets => {
|
||||
val watches = Watches.buildWatchesWithGroup(
|
||||
val buildWatches = () => Watches.buildWatchesWithGroup(
|
||||
labeledPointGroupSets.map {
|
||||
case (name, iter) => (name, DataUtils.processMissingValuesWithGroup(iter,
|
||||
xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
|
||||
},
|
||||
getCacheDirName(xgbExecutionParam.useExternalMemory))
|
||||
Iterator.single(watches)
|
||||
Iterator.single(buildWatches)
|
||||
}).cache()
|
||||
}
|
||||
}
|
||||
@@ -462,35 +465,35 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build RDD[Watches] for Non-Ranking
|
||||
* Build RDD[() => Watches] for Non-Ranking
|
||||
* @param trainingData the training data RDD
|
||||
* @param xgbExecutionParams xgboost execution params
|
||||
* @param evalSetsMap the eval RDD
|
||||
* @return RDD[Watches]
|
||||
* @return RDD[() => Watches]
|
||||
*/
|
||||
private def trainForNonRanking(
|
||||
trainingData: RDD[XGBLabeledPoint],
|
||||
xgbExecutionParams: XGBoostExecutionParams,
|
||||
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
|
||||
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
|
||||
if (evalSetsMap.isEmpty) {
|
||||
trainingData.mapPartitions { labeledPoints => {
|
||||
val watches = Watches.buildWatches(xgbExecutionParams,
|
||||
val buildWatches = () => Watches.buildWatches(xgbExecutionParams,
|
||||
DataUtils.processMissingValues(labeledPoints, xgbExecutionParams.missing,
|
||||
xgbExecutionParams.allowNonZeroForMissing),
|
||||
getCacheDirName(xgbExecutionParams.useExternalMemory))
|
||||
Iterator.single(watches)
|
||||
Iterator.single(buildWatches)
|
||||
}}.cache()
|
||||
} else {
|
||||
coPartitionNoGroupSets(trainingData, evalSetsMap, xgbExecutionParams.numWorkers).
|
||||
mapPartitions {
|
||||
nameAndLabeledPointSets =>
|
||||
val watches = Watches.buildWatches(
|
||||
val buildWatches = () => Watches.buildWatches(
|
||||
nameAndLabeledPointSets.map {
|
||||
case (name, iter) => (name, DataUtils.processMissingValues(iter,
|
||||
xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
|
||||
},
|
||||
getCacheDirName(xgbExecutionParams.useExternalMemory))
|
||||
Iterator.single(watches)
|
||||
Iterator.single(buildWatches)
|
||||
}.cache()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2021 by Contributors
|
||||
Copyright (c) 2021-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -45,19 +45,21 @@ private[scala] trait PreXGBoostProvider {
|
||||
def transformSchema(xgboostEstimator: XGBoostEstimatorCommon, schema: StructType): StructType
|
||||
|
||||
/**
|
||||
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
|
||||
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
|
||||
*
|
||||
* @param estimator supports XGBoostClassifier and XGBoostRegressor
|
||||
* @param dataset the training data
|
||||
* @param params all user defined and defaulted params
|
||||
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
|
||||
* RDD[Watches] will be used as the training input
|
||||
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
|
||||
* Boolean if building DMatrix in rabit context
|
||||
* RDD[() => Watches] will be used as the training input to build DMatrix
|
||||
* Option[ RDD[_] ] is the optional cached RDD
|
||||
*/
|
||||
def buildDatasetToRDD(
|
||||
estimator: Estimator[_],
|
||||
dataset: Dataset[_],
|
||||
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]])
|
||||
params: Map[String, Any]):
|
||||
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])
|
||||
|
||||
/**
|
||||
* Transform Dataset
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.io.File
|
||||
import scala.collection.mutable
|
||||
import scala.util.Random
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
|
||||
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
|
||||
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams
|
||||
@@ -30,8 +31,9 @@ import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.apache.commons.logging.LogFactory
|
||||
import org.apache.hadoop.fs.FileSystem
|
||||
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
|
||||
import org.apache.spark.{SparkContext, TaskContext}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
/**
|
||||
@@ -79,8 +81,7 @@ private[scala] case class XGBoostExecutionParams(
|
||||
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
|
||||
cacheTrainingSet: Boolean,
|
||||
treeMethod: Option[String],
|
||||
isLocal: Boolean,
|
||||
killSparkContextOnWorkerFailure: Boolean) {
|
||||
isLocal: Boolean) {
|
||||
|
||||
private var rawParamMap: Map[String, Any] = _
|
||||
|
||||
@@ -224,9 +225,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
|
||||
val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
|
||||
.asInstanceOf[Boolean]
|
||||
|
||||
val killSparkContext = overridedParams.getOrElse("kill_spark_context_on_worker_failure", true)
|
||||
.asInstanceOf[Boolean]
|
||||
|
||||
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
|
||||
missing, allowNonZeroForMissing, trackerConf,
|
||||
timeoutRequestWorkers,
|
||||
@@ -235,8 +233,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
|
||||
xgbExecEarlyStoppingParams,
|
||||
cacheTrainingSet,
|
||||
treeMethod,
|
||||
isLocal,
|
||||
killSparkContext)
|
||||
isLocal)
|
||||
xgbExecParam.setRawParamMap(overridedParams)
|
||||
xgbExecParam
|
||||
}
|
||||
@@ -283,13 +280,8 @@ object XGBoost extends Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
private def buildDistributedBooster(
|
||||
watches: Watches,
|
||||
xgbExecutionParam: XGBoostExecutionParams,
|
||||
rabitEnv: java.util.Map[String, String],
|
||||
obj: ObjectiveTrait,
|
||||
eval: EvalTrait,
|
||||
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
|
||||
private def buildWatchesAndCheck(buildWatchesFun: () => Watches): Watches = {
|
||||
val watches = buildWatchesFun()
|
||||
// to workaround the empty partitions in training dataset,
|
||||
// this might not be the best efficient implementation, see
|
||||
// (https://github.com/dmlc/xgboost/issues/1277)
|
||||
@@ -298,14 +290,39 @@ object XGBoost extends Serializable {
|
||||
s"detected an empty partition in the training data, partition ID:" +
|
||||
s" ${TaskContext.getPartitionId()}")
|
||||
}
|
||||
watches
|
||||
}
|
||||
|
||||
private def buildDistributedBooster(
|
||||
buildDMatrixInRabit: Boolean,
|
||||
buildWatches: () => Watches,
|
||||
xgbExecutionParam: XGBoostExecutionParams,
|
||||
rabitEnv: java.util.Map[String, String],
|
||||
obj: ObjectiveTrait,
|
||||
eval: EvalTrait,
|
||||
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
|
||||
|
||||
var watches: Watches = null
|
||||
if (!buildDMatrixInRabit) {
|
||||
// for CPU pipeline, we need to build DMatrix out of rabit context
|
||||
watches = buildWatchesAndCheck(buildWatches)
|
||||
}
|
||||
|
||||
val taskId = TaskContext.getPartitionId().toString
|
||||
val attempt = TaskContext.get().attemptNumber.toString
|
||||
rabitEnv.put("DMLC_TASK_ID", taskId)
|
||||
rabitEnv.put("DMLC_NUM_ATTEMPT", attempt)
|
||||
val numRounds = xgbExecutionParam.numRounds
|
||||
val makeCheckpoint = xgbExecutionParam.checkpointParam.isDefined && taskId.toInt == 0
|
||||
|
||||
try {
|
||||
Rabit.init(rabitEnv)
|
||||
|
||||
if (buildDMatrixInRabit) {
|
||||
// for GPU pipeline, we need to move dmatrix building into rabit context
|
||||
watches = buildWatchesAndCheck(buildWatches)
|
||||
}
|
||||
|
||||
val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
|
||||
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
|
||||
val externalCheckpointParams = xgbExecutionParam.checkpointParam
|
||||
@@ -331,14 +348,18 @@ object XGBoost extends Serializable {
|
||||
watches.toMap, metrics, obj, eval,
|
||||
earlyStoppingRound = numEarlyStoppingRounds, prevBooster)
|
||||
}
|
||||
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
|
||||
if (TaskContext.get().partitionId() == 0) {
|
||||
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
|
||||
} else {
|
||||
Iterator.empty
|
||||
}
|
||||
} catch {
|
||||
case xgbException: XGBoostError =>
|
||||
logger.error(s"XGBooster worker $taskId has failed $attempt times due to ", xgbException)
|
||||
throw xgbException
|
||||
} finally {
|
||||
Rabit.shutdown()
|
||||
watches.delete()
|
||||
if (watches != null) watches.delete()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -364,7 +385,7 @@ object XGBoost extends Serializable {
|
||||
@throws(classOf[XGBoostError])
|
||||
private[spark] def trainDistributed(
|
||||
sc: SparkContext,
|
||||
buildTrainingData: XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]),
|
||||
buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]),
|
||||
params: Map[String, Any]):
|
||||
(Booster, Map[String, Array[Float]]) = {
|
||||
|
||||
@@ -383,50 +404,36 @@ object XGBoost extends Serializable {
|
||||
}.orNull
|
||||
|
||||
// Get the training data RDD and the cachedRDD
|
||||
val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
|
||||
val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
|
||||
|
||||
try {
|
||||
// Train for every ${savingRound} rounds and save the partially completed booster
|
||||
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
|
||||
val (booster, metrics) = try {
|
||||
val parallelismTracker = new SparkParallelismTracker(sc,
|
||||
xgbExecParams.timeoutRequestWorkers,
|
||||
xgbExecParams.numWorkers,
|
||||
xgbExecParams.killSparkContextOnWorkerFailure)
|
||||
|
||||
tracker.getWorkerEnvs().putAll(xgbRabitParams)
|
||||
val rabitEnv = tracker.getWorkerEnvs
|
||||
|
||||
val boostersAndMetrics = trainingRDD.mapPartitions { iter => {
|
||||
var optionWatches: Option[Watches] = None
|
||||
val boostersAndMetrics = trainingRDD.barrier().mapPartitions { iter => {
|
||||
var optionWatches: Option[() => Watches] = None
|
||||
|
||||
// take the first Watches to train
|
||||
if (iter.hasNext) {
|
||||
optionWatches = Some(iter.next())
|
||||
}
|
||||
|
||||
optionWatches.map { watches => buildDistributedBooster(watches, xgbExecParams, rabitEnv,
|
||||
xgbExecParams.obj, xgbExecParams.eval, prevBooster)}
|
||||
optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit,
|
||||
buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj,
|
||||
xgbExecParams.eval, prevBooster)}
|
||||
.getOrElse(throw new RuntimeException("No Watches to train"))
|
||||
|
||||
}}.cache()
|
||||
|
||||
val sparkJobThread = new Thread() {
|
||||
override def run() {
|
||||
// force the job
|
||||
boostersAndMetrics.foreachPartition(() => _)
|
||||
}
|
||||
}
|
||||
sparkJobThread.setUncaughtExceptionHandler(tracker)
|
||||
|
||||
val trackerReturnVal = parallelismTracker.execute {
|
||||
sparkJobThread.start()
|
||||
tracker.waitFor(0L)
|
||||
}
|
||||
}}
|
||||
|
||||
val (booster, metrics) = boostersAndMetrics.collect()(0)
|
||||
val trackerReturnVal = tracker.waitFor(0L)
|
||||
logger.info(s"Rabit returns with exit code $trackerReturnVal")
|
||||
val (booster, metrics) = postTrackerReturnProcessing(trackerReturnVal,
|
||||
boostersAndMetrics, sparkJobThread)
|
||||
if (trackerReturnVal != 0) {
|
||||
throw new XGBoostError("XGBoostModel training failed.")
|
||||
}
|
||||
(booster, metrics)
|
||||
} finally {
|
||||
tracker.stop()
|
||||
@@ -446,42 +453,12 @@ object XGBoost extends Serializable {
|
||||
case t: Throwable =>
|
||||
// if the job was aborted due to an exception
|
||||
logger.error("the job was aborted due to ", t)
|
||||
if (xgbExecParams.killSparkContextOnWorkerFailure) {
|
||||
sc.stop()
|
||||
}
|
||||
throw t
|
||||
} finally {
|
||||
optionalCachedRDD.foreach(_.unpersist())
|
||||
}
|
||||
}
|
||||
|
||||
private def postTrackerReturnProcessing(
|
||||
trackerReturnVal: Int,
|
||||
distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],
|
||||
sparkJobThread: Thread): (Booster, Map[String, Array[Float]]) = {
|
||||
if (trackerReturnVal == 0) {
|
||||
// Copies of the final booster and the corresponding metrics
|
||||
// reside in each partition of the `distributedBoostersAndMetrics`.
|
||||
// Any of them can be used to create the model.
|
||||
// it's safe to block here forever, as the tracker has returned successfully, and the Spark
|
||||
// job should have finished, there is no reason for the thread cannot return
|
||||
sparkJobThread.join()
|
||||
val (booster, metrics) = distributedBoostersAndMetrics.first()
|
||||
distributedBoostersAndMetrics.unpersist(false)
|
||||
(booster, metrics)
|
||||
} else {
|
||||
try {
|
||||
if (sparkJobThread.isAlive) {
|
||||
sparkJobThread.interrupt()
|
||||
}
|
||||
} catch {
|
||||
case _: InterruptedException =>
|
||||
logger.info("spark job thread is interrupted")
|
||||
}
|
||||
throw new XGBoostError("XGBoostModel training failed")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class Watches private[scala] (
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,18 +16,22 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark.params
|
||||
|
||||
import ml.dmlc.xgboost4j.scala.spark
|
||||
import org.apache.commons.logging.LogFactory
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.json4s.{DefaultFormats, JValue}
|
||||
import org.json4s.JsonAST.JObject
|
||||
import org.json4s.jackson.JsonMethods.{compact, parse, render}
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.ml.param.{Param, Params}
|
||||
import org.apache.spark.ml.param.Params
|
||||
import org.apache.spark.ml.util.MLReader
|
||||
|
||||
// This originates from apache-spark DefaultPramsReader copy paste
|
||||
private[spark] object DefaultXGBoostParamsReader {
|
||||
|
||||
private val logger = LogFactory.getLog("XGBoostSpark")
|
||||
|
||||
private val paramNameCompatibilityMap: Map[String, String] = Map("silent" -> "verbosity")
|
||||
|
||||
private val paramValueCompatibilityMap: Map[String, Map[Any, Any]] =
|
||||
@@ -126,9 +130,16 @@ private[spark] object DefaultXGBoostParamsReader {
|
||||
metadata.params match {
|
||||
case JObject(pairs) =>
|
||||
pairs.foreach { case (paramName, jsonValue) =>
|
||||
val param = instance.getParam(handleBrokenlyChangedName(paramName))
|
||||
val value = param.jsonDecode(compact(render(jsonValue)))
|
||||
instance.set(param, handleBrokenlyChangedValue(paramName, value))
|
||||
val finalName = handleBrokenlyChangedName(paramName)
|
||||
// For the deleted parameters, we'd better to remove it instead of throwing an exception.
|
||||
// So we need to check if the parameter exists instead of blindly setting it.
|
||||
if (instance.hasParam(finalName)) {
|
||||
val param = instance.getParam(finalName)
|
||||
val value = param.jsonDecode(compact(render(jsonValue)))
|
||||
instance.set(param, handleBrokenlyChangedValue(paramName, value))
|
||||
} else {
|
||||
logger.warn(s"$finalName is no longer used in ${spark.VERSION}")
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
|
||||
@@ -105,14 +105,8 @@ private[spark] trait LearningTaskParams extends Params {
|
||||
|
||||
final def getMaximizeEvaluationMetrics: Boolean = $(maximizeEvaluationMetrics)
|
||||
|
||||
/**
|
||||
* whether killing SparkContext when training task fails
|
||||
*/
|
||||
final val killSparkContextOnWorkerFailure = new BooleanParam(this,
|
||||
"killSparkContextOnWorkerFailure", "whether killing SparkContext when training task fails")
|
||||
|
||||
setDefault(objective -> "reg:squarederror", baseScore -> 0.5, trainTestRatio -> 1.0,
|
||||
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false, killSparkContextOnWorkerFailure -> true)
|
||||
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false)
|
||||
}
|
||||
|
||||
private[spark] object LearningTaskParams {
|
||||
|
||||
@@ -1,175 +0,0 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark
|
||||
|
||||
import org.apache.commons.logging.LogFactory
|
||||
import org.apache.spark.scheduler._
|
||||
|
||||
import scala.collection.mutable.{HashMap, HashSet}
|
||||
|
||||
/**
|
||||
* A tracker that ensures enough number of executor cores are alive.
|
||||
* Throws an exception when the number of alive cores is less than nWorkers.
|
||||
*
|
||||
* @param sc The SparkContext object
|
||||
* @param timeout The maximum time to wait for enough number of workers.
|
||||
* @param numWorkers nWorkers used in an XGBoost Job
|
||||
* @param killSparkContextOnWorkerFailure kill SparkContext or not when task fails
|
||||
*/
|
||||
class SparkParallelismTracker(
|
||||
val sc: SparkContext,
|
||||
timeout: Long,
|
||||
numWorkers: Int,
|
||||
killSparkContextOnWorkerFailure: Boolean = true) {
|
||||
|
||||
private[this] val requestedCores = numWorkers * sc.conf.getInt("spark.task.cpus", 1)
|
||||
private[this] val logger = LogFactory.getLog("XGBoostSpark")
|
||||
|
||||
private[this] def numAliveCores: Int = {
|
||||
sc.statusStore.executorList(true).map(_.totalCores).sum
|
||||
}
|
||||
|
||||
private[this] def waitForCondition(
|
||||
condition: => Boolean,
|
||||
timeout: Long,
|
||||
checkInterval: Long = 100L) = {
|
||||
val waitImpl = new ((Long, Boolean) => Boolean) {
|
||||
override def apply(waitedTime: Long, status: Boolean): Boolean = status match {
|
||||
case s if s => true
|
||||
case _ => waitedTime match {
|
||||
case t if t < timeout =>
|
||||
Thread.sleep(checkInterval)
|
||||
apply(t + checkInterval, status = condition)
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
}
|
||||
waitImpl(0L, condition)
|
||||
}
|
||||
|
||||
private[this] def safeExecute[T](body: => T): T = {
|
||||
val listener = new TaskFailedListener(killSparkContextOnWorkerFailure)
|
||||
sc.addSparkListener(listener)
|
||||
try {
|
||||
body
|
||||
} finally {
|
||||
sc.removeSparkListener(listener)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a blocking function call with two checks on enough nWorkers:
|
||||
* - Before the function starts, wait until there are enough executor cores.
|
||||
* - During the execution, throws an exception if there is any executor lost.
|
||||
*
|
||||
* @param body A blocking function call
|
||||
* @tparam T Return type
|
||||
* @return The return of body
|
||||
*/
|
||||
def execute[T](body: => T): T = {
|
||||
if (timeout <= 0) {
|
||||
logger.info("starting training without setting timeout for waiting for resources")
|
||||
safeExecute(body)
|
||||
} else {
|
||||
logger.info(s"starting training with timeout set as $timeout ms for waiting for resources")
|
||||
if (!waitForCondition(numAliveCores >= requestedCores, timeout)) {
|
||||
throw new IllegalStateException(s"Unable to get $requestedCores cores for XGBoost training")
|
||||
}
|
||||
safeExecute(body)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TaskFailedListener(killSparkContext: Boolean = true) extends SparkListener {
|
||||
|
||||
private[this] val logger = LogFactory.getLog("XGBoostTaskFailedListener")
|
||||
|
||||
// {jobId, [stageId0, stageId1, ...] }
|
||||
// keep track of the mapping of job id and stage ids
|
||||
// when a task fails, find the job id and stage id the task belongs to, finally
|
||||
// cancel the jobs
|
||||
private val jobIdToStageIds: HashMap[Int, HashSet[Int]] = HashMap.empty
|
||||
|
||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
|
||||
if (!killSparkContext) {
|
||||
jobStart.stageIds.foreach(stageId => {
|
||||
jobIdToStageIds.getOrElseUpdate(jobStart.jobId, new HashSet[Int]()) += stageId
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
|
||||
if (!killSparkContext) {
|
||||
jobIdToStageIds.remove(jobEnd.jobId)
|
||||
}
|
||||
}
|
||||
|
||||
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
|
||||
taskEnd.reason match {
|
||||
case taskEndReason: TaskFailedReason =>
|
||||
logger.error(s"Training Task Failed during XGBoost Training: " +
|
||||
s"$taskEndReason")
|
||||
if (killSparkContext) {
|
||||
logger.error("killing SparkContext")
|
||||
TaskFailedListener.startedSparkContextKiller()
|
||||
} else {
|
||||
val stageId = taskEnd.stageId
|
||||
// find job ids according to stage id and then cancel the job
|
||||
|
||||
jobIdToStageIds.foreach {
|
||||
case (jobId, stageIds) =>
|
||||
if (stageIds.contains(stageId)) {
|
||||
logger.error("Cancelling jobId:" + jobId)
|
||||
jobIdToStageIds.remove(jobId)
|
||||
SparkContext.getOrCreate().cancelJob(jobId)
|
||||
}
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object TaskFailedListener {
|
||||
|
||||
var killerStarted: Boolean = false
|
||||
|
||||
var sparkContextKiller: Thread = _
|
||||
|
||||
val sparkContextShutdownLock = new AnyRef
|
||||
|
||||
private def startedSparkContextKiller(): Unit = this.synchronized {
|
||||
if (!killerStarted) {
|
||||
killerStarted = true
|
||||
// Spark does not allow ListenerThread to shutdown SparkContext so that we have to do it
|
||||
// in a separate thread
|
||||
sparkContextKiller = new Thread() {
|
||||
override def run(): Unit = {
|
||||
LiveListenerBus.withinListenerThread.withValue(false) {
|
||||
sparkContextShutdownLock.synchronized {
|
||||
SparkContext.getActive.foreach(_.stop())
|
||||
killerStarted = false
|
||||
sparkContextShutdownLock.notify()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sparkContextKiller.setDaemon(true)
|
||||
sparkContextKiller.start()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1 +1 @@
|
||||
log4j.logger.org.apache.spark=ERROR
|
||||
log4j.logger.org.apache.spark=ERROR
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
|
||||
import java.io.File
|
||||
|
||||
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, ExternalCheckpointManager, XGBoost => SXGBoost}
|
||||
import org.scalatest.{FunSuite, Ignore}
|
||||
import org.scalatest.FunSuite
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
|
||||
class ExternalCheckpointManagerSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,10 +16,8 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
||||
import org.apache.spark.Partitioner
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.scalatest.FunSuite
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
@@ -53,7 +51,7 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
|
||||
"objective" -> "binary:logistic",
|
||||
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
|
||||
import DataUtils._
|
||||
val sparkSession = SparkSession.builder().getOrCreate()
|
||||
val sparkSession = ss
|
||||
import sparkSession.implicits._
|
||||
val repartitioned = sc.parallelize(Synthetic.trainWithDiffFeatureSize, 2)
|
||||
.map(lp => (lp.label, lp)).partitionBy(
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,14 +16,14 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.ml.linalg.Vectors
|
||||
import org.apache.spark.sql.DataFrame
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
import org.apache.spark.SparkException
|
||||
|
||||
class MissingValueHandlingSuite extends FunSuite with PerTest {
|
||||
test("dense vectors containing missing value") {
|
||||
def buildDenseDataFrame(): DataFrame = {
|
||||
@@ -113,7 +113,7 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
|
||||
val inputDF = vectorAssembler.transform(testDF).select("features", "label")
|
||||
val paramMap = List("eta" -> "1", "max_depth" -> "2",
|
||||
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
|
||||
intercept[XGBoostError] {
|
||||
intercept[SparkException] {
|
||||
new XGBoostClassifier(paramMap).fit(inputDF)
|
||||
}
|
||||
}
|
||||
@@ -140,7 +140,7 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
|
||||
inputDF.show()
|
||||
val paramMap = List("eta" -> "1", "max_depth" -> "2",
|
||||
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
|
||||
intercept[XGBoostError] {
|
||||
intercept[SparkException] {
|
||||
new XGBoostClassifier(paramMap).fit(inputDF)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,9 +16,9 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
||||
import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore}
|
||||
import org.scalatest.{BeforeAndAfterAll, FunSuite}
|
||||
|
||||
import org.apache.spark.SparkException
|
||||
import org.apache.spark.ml.param.ParamMap
|
||||
|
||||
class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
||||
@@ -40,28 +40,16 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
||||
assert(xgbCopy2.MLlib2XGBoostParams("eval_metric").toString === "logloss")
|
||||
}
|
||||
|
||||
private def waitForSparkContextShutdown(): Unit = {
|
||||
var totalWaitedTime = 0L
|
||||
while (!ss.sparkContext.isStopped && totalWaitedTime <= 120000) {
|
||||
Thread.sleep(10000)
|
||||
totalWaitedTime += 10000
|
||||
}
|
||||
assert(ss.sparkContext.isStopped === true)
|
||||
}
|
||||
|
||||
test("fail training elegantly with unsupported objective function") {
|
||||
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "wrong_objective_function", "num_class" -> "6", "num_round" -> 5,
|
||||
"num_workers" -> numWorkers)
|
||||
val trainingDF = buildDataFrame(MultiClassification.train)
|
||||
val xgb = new XGBoostClassifier(paramMap)
|
||||
try {
|
||||
val model = xgb.fit(trainingDF)
|
||||
} catch {
|
||||
case e: Throwable => // swallow anything
|
||||
} finally {
|
||||
waitForSparkContextShutdown()
|
||||
intercept[SparkException] {
|
||||
xgb.fit(trainingDF)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
test("fail training elegantly with unsupported eval metrics") {
|
||||
@@ -70,12 +58,8 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
|
||||
"num_workers" -> numWorkers, "eval_metric" -> "wrong_eval_metrics")
|
||||
val trainingDF = buildDataFrame(MultiClassification.train)
|
||||
val xgb = new XGBoostClassifier(paramMap)
|
||||
try {
|
||||
val model = xgb.fit(trainingDF)
|
||||
} catch {
|
||||
case e: Throwable => // swallow anything
|
||||
} finally {
|
||||
waitForSparkContextShutdown()
|
||||
intercept[SparkException] {
|
||||
xgb.fit(trainingDF)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
|
||||
import java.io.File
|
||||
|
||||
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
||||
import org.apache.spark.{SparkConf, SparkContext, TaskFailedListener}
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql._
|
||||
import org.scalatest.{BeforeAndAfterEach, FunSuite}
|
||||
|
||||
@@ -40,32 +40,16 @@ trait PerTest extends BeforeAndAfterEach { self: FunSuite =>
|
||||
.appName("XGBoostSuite")
|
||||
.config("spark.ui.enabled", false)
|
||||
.config("spark.driver.memory", "512m")
|
||||
.config("spark.barrier.sync.timeout", 10)
|
||||
.config("spark.task.cpus", 1)
|
||||
|
||||
override def beforeEach(): Unit = getOrCreateSession
|
||||
|
||||
override def afterEach() {
|
||||
TaskFailedListener.sparkContextShutdownLock.synchronized {
|
||||
if (currentSession != null) {
|
||||
// this synchronization is mostly for the tests involving SparkContext shutdown
|
||||
// for unit test involving the sparkContext shutdown there are two different events sequence
|
||||
// 1. SparkContext killer is executed before afterEach, in this case, before SparkContext
|
||||
// is fully stopped, afterEach() will block at the following code block
|
||||
// 2. SparkContext killer is executed afterEach, in this case, currentSession.stop() in will
|
||||
// block to wait for all msgs in ListenerBus get processed. Because currentSession.stop()
|
||||
// has been called, SparkContext killer will not take effect
|
||||
while (TaskFailedListener.killerStarted) {
|
||||
TaskFailedListener.sparkContextShutdownLock.wait()
|
||||
}
|
||||
currentSession.stop()
|
||||
cleanExternalCache(currentSession.sparkContext.appName)
|
||||
currentSession = null
|
||||
}
|
||||
if (TaskFailedListener.sparkContextKiller != null) {
|
||||
TaskFailedListener.sparkContextKiller.interrupt()
|
||||
TaskFailedListener.sparkContextKiller = null
|
||||
}
|
||||
TaskFailedListener.killerStarted = false
|
||||
if (currentSession != null) {
|
||||
currentSession.stop()
|
||||
cleanExternalCache(currentSession.sparkContext.appName)
|
||||
currentSession = null
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014,2021 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,10 +16,8 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.Rabit
|
||||
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import org.apache.spark.sql._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,13 +16,12 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
||||
import scala.util.Random
|
||||
|
||||
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
||||
import ml.dmlc.xgboost4j.scala.DMatrix
|
||||
|
||||
import org.apache.spark.TaskContext
|
||||
import org.apache.spark.{SparkException, TaskContext}
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
@@ -375,13 +374,14 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
|
||||
test("throw exception for empty partition in trainingset") {
|
||||
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "multi:softmax", "num_class" -> "2", "num_round" -> 5,
|
||||
"num_workers" -> numWorkers, "tree_method" -> "auto")
|
||||
"objective" -> "binary:logistic", "num_class" -> "2", "num_round" -> 5,
|
||||
"num_workers" -> numWorkers, "tree_method" -> "auto", "allow_non_zero_for_missing" -> true)
|
||||
// The Dmatrix will be empty
|
||||
val trainingDF = buildDataFrame(Seq(XGBLabeledPoint(1.0f, 1, Array(), Array())))
|
||||
val trainingDF = buildDataFrame(Seq(XGBLabeledPoint(1.0f, 4,
|
||||
Array(0, 1, 2, 3), Array(0, 1, 2, 3))))
|
||||
val xgb = new XGBoostClassifier(paramMap)
|
||||
intercept[XGBoostError] {
|
||||
val model = xgb.fit(trainingDF)
|
||||
intercept[SparkException] {
|
||||
xgb.fit(trainingDF)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,14 +16,15 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.{Rabit, XGBoostError}
|
||||
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
|
||||
import org.apache.spark.TaskFailedListener
|
||||
import org.apache.spark.SparkException
|
||||
import ml.dmlc.xgboost4j.java.Rabit
|
||||
import ml.dmlc.xgboost4j.scala.Booster
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.spark.sql._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.SparkException
|
||||
|
||||
class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
||||
val predictionErrorMin = 0.00001f
|
||||
val maxFailure = 2;
|
||||
@@ -33,15 +34,6 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
||||
.config("spark.kryo.classesToRegister", classOf[Booster].getName)
|
||||
.master(s"local[${numWorkers},${maxFailure}]")
|
||||
|
||||
private def waitAndCheckSparkShutdown(waitMiliSec: Int): Boolean = {
|
||||
var totalWaitedTime = 0L
|
||||
while (!ss.sparkContext.isStopped && totalWaitedTime <= waitMiliSec) {
|
||||
Thread.sleep(10)
|
||||
totalWaitedTime += 10
|
||||
}
|
||||
return ss.sparkContext.isStopped
|
||||
}
|
||||
|
||||
test("test classification prediction parity w/o ring reduce") {
|
||||
val training = buildDataFrame(Classification.train)
|
||||
val testDF = buildDataFrame(Classification.test)
|
||||
@@ -91,14 +83,11 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
||||
}
|
||||
|
||||
test("test rabit timeout fail handle") {
|
||||
// disable spark kill listener to verify if rabit_timeout take effect and kill tasks
|
||||
TaskFailedListener.killerStarted = true
|
||||
|
||||
val training = buildDataFrame(Classification.train)
|
||||
// mock rank 0 failure during 8th allreduce synchronization
|
||||
Rabit.mockList = Array("0,8,0,0").toList.asJava
|
||||
|
||||
try {
|
||||
intercept[SparkException] {
|
||||
new XGBoostClassifier(Map(
|
||||
"eta" -> "0.1",
|
||||
"max_depth" -> "10",
|
||||
@@ -108,37 +97,7 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
||||
"num_workers" -> numWorkers,
|
||||
"rabit_timeout" -> 0))
|
||||
.fit(training)
|
||||
} catch {
|
||||
case e: Throwable => // swallow anything
|
||||
} finally {
|
||||
// assume all tasks throw exception almost same time
|
||||
// 100ms should be enough to exhaust all retries
|
||||
assert(waitAndCheckSparkShutdown(100) == true)
|
||||
TaskFailedListener.killerStarted = false
|
||||
}
|
||||
}
|
||||
|
||||
test("test SparkContext should not be killed ") {
|
||||
val training = buildDataFrame(Classification.train)
|
||||
// mock rank 0 failure during 8th allreduce synchronization
|
||||
Rabit.mockList = Array("0,8,0,0").toList.asJava
|
||||
|
||||
try {
|
||||
new XGBoostClassifier(Map(
|
||||
"eta" -> "0.1",
|
||||
"max_depth" -> "10",
|
||||
"verbosity" -> "1",
|
||||
"objective" -> "binary:logistic",
|
||||
"num_round" -> 5,
|
||||
"num_workers" -> numWorkers,
|
||||
"kill_spark_context_on_worker_failure" -> false,
|
||||
"rabit_timeout" -> 0))
|
||||
.fit(training)
|
||||
} catch {
|
||||
case e: Throwable => // swallow anything
|
||||
} finally {
|
||||
// wait 3s to check if SparkContext is killed
|
||||
assert(waitAndCheckSparkShutdown(3000) == false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
|
||||
import org.apache.spark.ml.linalg.{Vector, Vectors}
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.{DataFrame, Row}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
|
||||
@@ -1,151 +0,0 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
import _root_.ml.dmlc.xgboost4j.scala.spark.PerTest
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import scala.math.min
|
||||
|
||||
class SparkParallelismTrackerSuite extends FunSuite with PerTest {
|
||||
|
||||
val numParallelism: Int = min(Runtime.getRuntime.availableProcessors(), 4)
|
||||
|
||||
override protected def sparkSessionBuilder: SparkSession.Builder = SparkSession.builder()
|
||||
.master(s"local[${numParallelism}]")
|
||||
.appName("XGBoostSuite")
|
||||
.config("spark.ui.enabled", true)
|
||||
.config("spark.driver.memory", "512m")
|
||||
.config("spark.task.cpus", 1)
|
||||
|
||||
private def waitAndCheckSparkShutdown(waitMiliSec: Int): Boolean = {
|
||||
var totalWaitedTime = 0L
|
||||
while (!ss.sparkContext.isStopped && totalWaitedTime <= waitMiliSec) {
|
||||
Thread.sleep(100)
|
||||
totalWaitedTime += 100
|
||||
}
|
||||
ss.sparkContext.isStopped
|
||||
}
|
||||
|
||||
test("tracker should not affect execution result when timeout is not larger than 0") {
|
||||
val nWorkers = numParallelism
|
||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
|
||||
val tracker = new SparkParallelismTracker(sc, 10000, nWorkers)
|
||||
val disabledTracker = new SparkParallelismTracker(sc, 0, nWorkers)
|
||||
assert(tracker.execute(rdd.sum()) == rdd.sum())
|
||||
assert(disabledTracker.execute(rdd.sum()) == rdd.sum())
|
||||
}
|
||||
|
||||
test("tracker should throw exception if parallelism is not sufficient") {
|
||||
val nWorkers = numParallelism * 3
|
||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
|
||||
val tracker = new SparkParallelismTracker(sc, 1000, nWorkers)
|
||||
intercept[IllegalStateException] {
|
||||
tracker.execute {
|
||||
rdd.map { i =>
|
||||
// Test interruption
|
||||
Thread.sleep(Long.MaxValue)
|
||||
i
|
||||
}.sum()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("tracker should throw exception if parallelism is not sufficient with" +
|
||||
" spark.task.cpus larger than 1") {
|
||||
sc.conf.set("spark.task.cpus", "2")
|
||||
val nWorkers = numParallelism
|
||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
|
||||
val tracker = new SparkParallelismTracker(sc, 1000, nWorkers)
|
||||
intercept[IllegalStateException] {
|
||||
tracker.execute {
|
||||
rdd.map { i =>
|
||||
// Test interruption
|
||||
Thread.sleep(Long.MaxValue)
|
||||
i
|
||||
}.sum()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("tracker should not kill SparkContext when killSparkContextOnWorkerFailure=false") {
|
||||
val nWorkers = numParallelism
|
||||
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
|
||||
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers, nWorkers)
|
||||
try {
|
||||
tracker.execute {
|
||||
rdd.map { i =>
|
||||
val partitionId = TaskContext.get().partitionId()
|
||||
if (partitionId == 0) {
|
||||
throw new RuntimeException("mocking task failing")
|
||||
}
|
||||
i
|
||||
}.sum()
|
||||
}
|
||||
} catch {
|
||||
case e: Exception => // catch the exception
|
||||
} finally {
|
||||
// wait 3s to check if SparkContext is killed
|
||||
assert(waitAndCheckSparkShutdown(3000) == false)
|
||||
}
|
||||
}
|
||||
|
||||
test("tracker should cancel the correct job when killSparkContextOnWorkerFailure=false") {
|
||||
val nWorkers = 2
|
||||
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
|
||||
val rdd: RDD[Int] = sc.parallelize(1 to 10, nWorkers)
|
||||
val thread = new TestThread(sc)
|
||||
thread.start()
|
||||
try {
|
||||
tracker.execute {
|
||||
rdd.map { i =>
|
||||
Thread.sleep(100)
|
||||
val partitionId = TaskContext.get().partitionId()
|
||||
if (partitionId == 0) {
|
||||
throw new RuntimeException("mocking task failing")
|
||||
}
|
||||
i
|
||||
}.sum()
|
||||
}
|
||||
} catch {
|
||||
case e: Exception => // catch the exception
|
||||
} finally {
|
||||
thread.join(8000)
|
||||
// wait 3s to check if SparkContext is killed
|
||||
assert(waitAndCheckSparkShutdown(3000) == false)
|
||||
}
|
||||
}
|
||||
|
||||
private[this] class TestThread(sc: SparkContext) extends Thread {
|
||||
override def run(): Unit = {
|
||||
var sum: Double = 0.0f
|
||||
try {
|
||||
val rdd = sc.parallelize(1 to 4, 2)
|
||||
sum = rdd.mapPartitions(iter => {
|
||||
// sleep 2s to ensure task is alive when cancelling other jobs
|
||||
Thread.sleep(2000)
|
||||
iter
|
||||
}).sum()
|
||||
} finally {
|
||||
// get the correct result
|
||||
assert(sum.toInt == 10)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j_2.12</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<version>1.6.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -100,7 +100,7 @@ class NativeLibLoader {
|
||||
});
|
||||
|
||||
return muslRelatedMemoryMappedFilename.isPresent();
|
||||
} catch (IOException ignored) {
|
||||
} catch (Exception ignored) {
|
||||
// ignored
|
||||
}
|
||||
return false;
|
||||
|
||||
@@ -1 +1 @@
|
||||
1.6.0
|
||||
1.6.1
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
#include "xgboost/data.h"
|
||||
#include "xgboost/parameter.h"
|
||||
#include "xgboost/span.h"
|
||||
#include "xgboost/task.h"
|
||||
|
||||
namespace xgboost {
|
||||
namespace common {
|
||||
@@ -75,15 +74,20 @@ inline void InvalidCategory() {
|
||||
// values to be less than this last representable value.
|
||||
auto str = std::to_string(OutOfRangeCat());
|
||||
LOG(FATAL) << "Invalid categorical value detected. Categorical value should be non-negative, "
|
||||
"less than total umber of categories in training data and less than " +
|
||||
"less than total number of categories in training data and less than " +
|
||||
str;
|
||||
}
|
||||
|
||||
inline void CheckMaxCat(float max_cat, size_t n_categories) {
|
||||
CHECK_GE(max_cat + 1, n_categories)
|
||||
<< "Maximum cateogry should not be lesser than the total number of categories.";
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Whether should we use onehot encoding for categorical data.
|
||||
*/
|
||||
XGBOOST_DEVICE inline bool UseOneHot(uint32_t n_cats, uint32_t max_cat_to_onehot, ObjInfo task) {
|
||||
bool use_one_hot = n_cats < max_cat_to_onehot || task.UseOneHot();
|
||||
XGBOOST_DEVICE inline bool UseOneHot(uint32_t n_cats, uint32_t max_cat_to_onehot) {
|
||||
bool use_one_hot = n_cats < max_cat_to_onehot;
|
||||
return use_one_hot;
|
||||
}
|
||||
|
||||
|
||||
@@ -164,6 +164,74 @@ class Range {
|
||||
Iterator end_;
|
||||
};
|
||||
|
||||
/**
|
||||
* \brief Transform iterator that takes an index and calls transform operator.
|
||||
*
|
||||
* This is CPU-only right now as taking host device function as operator complicates the
|
||||
* code. For device side one can use `thrust::transform_iterator` instead.
|
||||
*/
|
||||
template <typename Fn>
|
||||
class IndexTransformIter {
|
||||
size_t iter_{0};
|
||||
Fn fn_;
|
||||
|
||||
public:
|
||||
using iterator_category = std::random_access_iterator_tag; // NOLINT
|
||||
using value_type = std::result_of_t<Fn(size_t)>; // NOLINT
|
||||
using difference_type = detail::ptrdiff_t; // NOLINT
|
||||
using reference = std::add_lvalue_reference_t<value_type>; // NOLINT
|
||||
using pointer = std::add_pointer_t<value_type>; // NOLINT
|
||||
|
||||
public:
|
||||
/**
|
||||
* \param op Transform operator, takes a size_t index as input.
|
||||
*/
|
||||
explicit IndexTransformIter(Fn &&op) : fn_{op} {}
|
||||
IndexTransformIter(IndexTransformIter const &) = default;
|
||||
IndexTransformIter& operator=(IndexTransformIter&&) = default;
|
||||
IndexTransformIter& operator=(IndexTransformIter const& that) {
|
||||
iter_ = that.iter_;
|
||||
return *this;
|
||||
}
|
||||
|
||||
value_type operator*() const { return fn_(iter_); }
|
||||
|
||||
auto operator-(IndexTransformIter const &that) const { return iter_ - that.iter_; }
|
||||
bool operator==(IndexTransformIter const &that) const { return iter_ == that.iter_; }
|
||||
bool operator!=(IndexTransformIter const &that) const { return !(*this == that); }
|
||||
|
||||
IndexTransformIter &operator++() {
|
||||
iter_++;
|
||||
return *this;
|
||||
}
|
||||
IndexTransformIter operator++(int) {
|
||||
auto ret = *this;
|
||||
++(*this);
|
||||
return ret;
|
||||
}
|
||||
IndexTransformIter &operator+=(difference_type n) {
|
||||
iter_ += n;
|
||||
return *this;
|
||||
}
|
||||
IndexTransformIter &operator-=(difference_type n) {
|
||||
(*this) += -n;
|
||||
return *this;
|
||||
}
|
||||
IndexTransformIter operator+(difference_type n) const {
|
||||
auto ret = *this;
|
||||
return ret += n;
|
||||
}
|
||||
IndexTransformIter operator-(difference_type n) const {
|
||||
auto ret = *this;
|
||||
return ret -= n;
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Fn>
|
||||
auto MakeIndexTransformIter(Fn&& fn) {
|
||||
return IndexTransformIter<Fn>(std::forward<Fn>(fn));
|
||||
}
|
||||
|
||||
int AllVisibleGPUs();
|
||||
|
||||
inline void AssertGPUSupport() {
|
||||
|
||||
@@ -468,11 +468,17 @@ void AddCutPoint(typename SketchType::SummaryContainer const &summary, int max_b
|
||||
}
|
||||
}
|
||||
|
||||
void AddCategories(std::set<float> const &categories, HistogramCuts *cuts) {
|
||||
auto &cut_values = cuts->cut_values_.HostVector();
|
||||
for (auto const &v : categories) {
|
||||
cut_values.push_back(AsCat(v));
|
||||
auto AddCategories(std::set<float> const &categories, HistogramCuts *cuts) {
|
||||
if (std::any_of(categories.cbegin(), categories.cend(), InvalidCat)) {
|
||||
InvalidCategory();
|
||||
}
|
||||
auto &cut_values = cuts->cut_values_.HostVector();
|
||||
auto max_cat = *std::max_element(categories.cbegin(), categories.cend());
|
||||
CheckMaxCat(max_cat, categories.size());
|
||||
for (bst_cat_t i = 0; i <= AsCat(max_cat); ++i) {
|
||||
cut_values.push_back(i);
|
||||
}
|
||||
return max_cat;
|
||||
}
|
||||
|
||||
template <typename WQSketch>
|
||||
@@ -505,11 +511,12 @@ void SketchContainerImpl<WQSketch>::MakeCuts(HistogramCuts* cuts) {
|
||||
}
|
||||
});
|
||||
|
||||
float max_cat{-1.f};
|
||||
for (size_t fid = 0; fid < reduced.size(); ++fid) {
|
||||
size_t max_num_bins = std::min(num_cuts[fid], max_bins_);
|
||||
typename WQSketch::SummaryContainer const& a = final_summaries[fid];
|
||||
if (IsCat(feature_types_, fid)) {
|
||||
AddCategories(categories_.at(fid), cuts);
|
||||
max_cat = std::max(max_cat, AddCategories(categories_.at(fid), cuts));
|
||||
} else {
|
||||
AddCutPoint<WQSketch>(a, max_num_bins, cuts);
|
||||
// push a value that is greater than anything
|
||||
@@ -527,30 +534,7 @@ void SketchContainerImpl<WQSketch>::MakeCuts(HistogramCuts* cuts) {
|
||||
cuts->cut_ptrs_.HostVector().push_back(cut_size);
|
||||
}
|
||||
|
||||
if (has_categorical_) {
|
||||
for (auto const &feat : categories_) {
|
||||
if (std::any_of(feat.cbegin(), feat.cend(), InvalidCat)) {
|
||||
InvalidCategory();
|
||||
}
|
||||
}
|
||||
auto const &ptrs = cuts->Ptrs();
|
||||
auto const &vals = cuts->Values();
|
||||
|
||||
float max_cat{-std::numeric_limits<float>::infinity()};
|
||||
for (size_t i = 1; i < ptrs.size(); ++i) {
|
||||
if (IsCat(feature_types_, i - 1)) {
|
||||
auto beg = ptrs[i - 1];
|
||||
auto end = ptrs[i];
|
||||
auto feat = Span<float const>{vals}.subspan(beg, end - beg);
|
||||
auto max_elem = *std::max_element(feat.cbegin(), feat.cend());
|
||||
if (max_elem > max_cat) {
|
||||
max_cat = max_elem;
|
||||
}
|
||||
}
|
||||
}
|
||||
cuts->SetCategorical(true, max_cat);
|
||||
}
|
||||
|
||||
cuts->SetCategorical(this->has_categorical_, max_cat);
|
||||
monitor_.Stop(__func__);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*!
|
||||
* Copyright 2020 by XGBoost Contributors
|
||||
* Copyright 2020-2022 by XGBoost Contributors
|
||||
*/
|
||||
#include <thrust/binary_search.h>
|
||||
#include <thrust/execution_policy.h>
|
||||
@@ -583,13 +583,13 @@ void SketchContainer::AllReduce() {
|
||||
|
||||
namespace {
|
||||
struct InvalidCatOp {
|
||||
Span<float const> values;
|
||||
Span<uint32_t const> ptrs;
|
||||
Span<SketchEntry const> values;
|
||||
Span<size_t const> ptrs;
|
||||
Span<FeatureType const> ft;
|
||||
|
||||
XGBOOST_DEVICE bool operator()(size_t i) const {
|
||||
auto fidx = dh::SegmentId(ptrs, i);
|
||||
return IsCat(ft, fidx) && InvalidCat(values[i]);
|
||||
return IsCat(ft, fidx) && InvalidCat(values[i].value);
|
||||
}
|
||||
};
|
||||
} // anonymous namespace
|
||||
@@ -611,7 +611,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
|
||||
|
||||
p_cuts->min_vals_.SetDevice(device_);
|
||||
auto d_min_values = p_cuts->min_vals_.DeviceSpan();
|
||||
auto in_cut_values = dh::ToSpan(this->Current());
|
||||
auto const in_cut_values = dh::ToSpan(this->Current());
|
||||
|
||||
// Set up output ptr
|
||||
p_cuts->cut_ptrs_.SetDevice(device_);
|
||||
@@ -619,26 +619,70 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
|
||||
h_out_columns_ptr.clear();
|
||||
h_out_columns_ptr.push_back(0);
|
||||
auto const& h_feature_types = this->feature_types_.ConstHostSpan();
|
||||
for (bst_feature_t i = 0; i < num_columns_; ++i) {
|
||||
size_t column_size = std::max(static_cast<size_t>(1ul),
|
||||
this->Column(i).size());
|
||||
if (IsCat(h_feature_types, i)) {
|
||||
h_out_columns_ptr.push_back(static_cast<size_t>(column_size));
|
||||
} else {
|
||||
h_out_columns_ptr.push_back(std::min(static_cast<size_t>(column_size),
|
||||
static_cast<size_t>(num_bins_)));
|
||||
|
||||
auto d_ft = feature_types_.ConstDeviceSpan();
|
||||
|
||||
std::vector<SketchEntry> max_values;
|
||||
float max_cat{-1.f};
|
||||
if (has_categorical_) {
|
||||
dh::XGBCachingDeviceAllocator<char> alloc;
|
||||
auto key_it = dh::MakeTransformIterator<bst_feature_t>(
|
||||
thrust::make_counting_iterator(0ul), [=] XGBOOST_DEVICE(size_t i) -> bst_feature_t {
|
||||
return dh::SegmentId(d_in_columns_ptr, i);
|
||||
});
|
||||
auto invalid_op = InvalidCatOp{in_cut_values, d_in_columns_ptr, d_ft};
|
||||
auto val_it = dh::MakeTransformIterator<SketchEntry>(
|
||||
thrust::make_counting_iterator(0ul), [=] XGBOOST_DEVICE(size_t i) {
|
||||
auto fidx = dh::SegmentId(d_in_columns_ptr, i);
|
||||
auto v = in_cut_values[i];
|
||||
if (IsCat(d_ft, fidx)) {
|
||||
if (invalid_op(i)) {
|
||||
// use inf to indicate invalid value, this way we can keep it as in
|
||||
// indicator in the reduce operation as it's always the greatest value.
|
||||
v.value = std::numeric_limits<float>::infinity();
|
||||
}
|
||||
}
|
||||
return v;
|
||||
});
|
||||
CHECK_EQ(num_columns_, d_in_columns_ptr.size() - 1);
|
||||
max_values.resize(d_in_columns_ptr.size() - 1);
|
||||
dh::caching_device_vector<SketchEntry> d_max_values(d_in_columns_ptr.size() - 1);
|
||||
thrust::reduce_by_key(thrust::cuda::par(alloc), key_it, key_it + in_cut_values.size(), val_it,
|
||||
thrust::make_discard_iterator(), d_max_values.begin(),
|
||||
thrust::equal_to<bst_feature_t>{},
|
||||
[] __device__(auto l, auto r) { return l.value > r.value ? l : r; });
|
||||
dh::CopyDeviceSpanToVector(&max_values, dh::ToSpan(d_max_values));
|
||||
auto max_it = common::MakeIndexTransformIter([&](auto i) {
|
||||
if (IsCat(h_feature_types, i)) {
|
||||
return max_values[i].value;
|
||||
}
|
||||
return -1.f;
|
||||
});
|
||||
max_cat = *std::max_element(max_it, max_it + max_values.size());
|
||||
if (std::isinf(max_cat)) {
|
||||
InvalidCategory();
|
||||
}
|
||||
}
|
||||
std::partial_sum(h_out_columns_ptr.begin(), h_out_columns_ptr.end(),
|
||||
h_out_columns_ptr.begin());
|
||||
auto d_out_columns_ptr = p_cuts->cut_ptrs_.ConstDeviceSpan();
|
||||
|
||||
// Set up output cuts
|
||||
for (bst_feature_t i = 0; i < num_columns_; ++i) {
|
||||
size_t column_size = std::max(static_cast<size_t>(1ul), this->Column(i).size());
|
||||
if (IsCat(h_feature_types, i)) {
|
||||
// column_size is the number of unique values in that feature.
|
||||
CheckMaxCat(max_values[i].value, column_size);
|
||||
h_out_columns_ptr.push_back(max_values[i].value + 1); // includes both max_cat and 0.
|
||||
} else {
|
||||
h_out_columns_ptr.push_back(
|
||||
std::min(static_cast<size_t>(column_size), static_cast<size_t>(num_bins_)));
|
||||
}
|
||||
}
|
||||
std::partial_sum(h_out_columns_ptr.begin(), h_out_columns_ptr.end(), h_out_columns_ptr.begin());
|
||||
auto d_out_columns_ptr = p_cuts->cut_ptrs_.ConstDeviceSpan();
|
||||
|
||||
size_t total_bins = h_out_columns_ptr.back();
|
||||
p_cuts->cut_values_.SetDevice(device_);
|
||||
p_cuts->cut_values_.Resize(total_bins);
|
||||
auto out_cut_values = p_cuts->cut_values_.DeviceSpan();
|
||||
auto d_ft = feature_types_.ConstDeviceSpan();
|
||||
|
||||
dh::LaunchN(total_bins, [=] __device__(size_t idx) {
|
||||
auto column_id = dh::SegmentId(d_out_columns_ptr, idx);
|
||||
@@ -667,8 +711,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
|
||||
}
|
||||
|
||||
if (IsCat(d_ft, column_id)) {
|
||||
assert(out_column.size() == in_column.size());
|
||||
out_column[idx] = in_column[idx].value;
|
||||
out_column[idx] = idx;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -684,36 +727,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
|
||||
out_column[idx] = in_column[idx+1].value;
|
||||
});
|
||||
|
||||
float max_cat{-1.0f};
|
||||
if (has_categorical_) {
|
||||
auto invalid_op = InvalidCatOp{out_cut_values, d_out_columns_ptr, d_ft};
|
||||
auto it = dh::MakeTransformIterator<thrust::pair<bool, float>>(
|
||||
thrust::make_counting_iterator(0ul), [=] XGBOOST_DEVICE(size_t i) {
|
||||
auto fidx = dh::SegmentId(d_out_columns_ptr, i);
|
||||
if (IsCat(d_ft, fidx)) {
|
||||
auto invalid = invalid_op(i);
|
||||
auto v = out_cut_values[i];
|
||||
return thrust::make_pair(invalid, v);
|
||||
}
|
||||
return thrust::make_pair(false, std::numeric_limits<float>::min());
|
||||
});
|
||||
|
||||
bool invalid{false};
|
||||
dh::XGBCachingDeviceAllocator<char> alloc;
|
||||
thrust::tie(invalid, max_cat) =
|
||||
thrust::reduce(thrust::cuda::par(alloc), it, it + out_cut_values.size(),
|
||||
thrust::make_pair(false, std::numeric_limits<float>::min()),
|
||||
[=] XGBOOST_DEVICE(thrust::pair<bool, bst_cat_t> const &l,
|
||||
thrust::pair<bool, bst_cat_t> const &r) {
|
||||
return thrust::make_pair(l.first || r.first, std::max(l.second, r.second));
|
||||
});
|
||||
if (invalid) {
|
||||
InvalidCategory();
|
||||
}
|
||||
}
|
||||
|
||||
p_cuts->SetCategorical(this->has_categorical_, max_cat);
|
||||
|
||||
timer_.Stop(__func__);
|
||||
}
|
||||
} // namespace common
|
||||
|
||||
@@ -199,13 +199,11 @@ __device__ void EvaluateFeature(
|
||||
}
|
||||
|
||||
template <int BLOCK_THREADS, typename GradientSumT>
|
||||
__global__ void EvaluateSplitsKernel(
|
||||
EvaluateSplitInputs<GradientSumT> left,
|
||||
EvaluateSplitInputs<GradientSumT> right,
|
||||
ObjInfo task,
|
||||
common::Span<bst_feature_t> sorted_idx,
|
||||
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
|
||||
common::Span<DeviceSplitCandidate> out_candidates) {
|
||||
__global__ void EvaluateSplitsKernel(EvaluateSplitInputs<GradientSumT> left,
|
||||
EvaluateSplitInputs<GradientSumT> right,
|
||||
common::Span<bst_feature_t> sorted_idx,
|
||||
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
|
||||
common::Span<DeviceSplitCandidate> out_candidates) {
|
||||
// KeyValuePair here used as threadIdx.x -> gain_value
|
||||
using ArgMaxT = cub::KeyValuePair<int, float>;
|
||||
using BlockScanT =
|
||||
@@ -241,7 +239,7 @@ __global__ void EvaluateSplitsKernel(
|
||||
|
||||
if (common::IsCat(inputs.feature_types, fidx)) {
|
||||
auto n_bins_in_feat = inputs.feature_segments[fidx + 1] - inputs.feature_segments[fidx];
|
||||
if (common::UseOneHot(n_bins_in_feat, inputs.param.max_cat_to_onehot, task)) {
|
||||
if (common::UseOneHot(n_bins_in_feat, inputs.param.max_cat_to_onehot)) {
|
||||
EvaluateFeature<BLOCK_THREADS, SumReduceT, BlockScanT, MaxReduceT, TempStorage, GradientSumT,
|
||||
kOneHot>(fidx, inputs, evaluator, sorted_idx, 0, &best_split, &temp_storage);
|
||||
} else {
|
||||
@@ -310,7 +308,7 @@ __device__ void SortBasedSplit(EvaluateSplitInputs<GradientSumT> const &input,
|
||||
|
||||
template <typename GradientSumT>
|
||||
void GPUHistEvaluator<GradientSumT>::EvaluateSplits(
|
||||
EvaluateSplitInputs<GradientSumT> left, EvaluateSplitInputs<GradientSumT> right, ObjInfo task,
|
||||
EvaluateSplitInputs<GradientSumT> left, EvaluateSplitInputs<GradientSumT> right,
|
||||
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
|
||||
common::Span<DeviceSplitCandidate> out_splits) {
|
||||
if (!split_cats_.empty()) {
|
||||
@@ -323,7 +321,7 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(
|
||||
// One block for each feature
|
||||
uint32_t constexpr kBlockThreads = 256;
|
||||
dh::LaunchKernel {static_cast<uint32_t>(combined_num_features), kBlockThreads, 0}(
|
||||
EvaluateSplitsKernel<kBlockThreads, GradientSumT>, left, right, task, this->SortedIdx(left),
|
||||
EvaluateSplitsKernel<kBlockThreads, GradientSumT>, left, right, this->SortedIdx(left),
|
||||
evaluator, dh::ToSpan(feature_best_splits));
|
||||
|
||||
// Reduce to get best candidate for left and right child over all features
|
||||
@@ -365,7 +363,7 @@ void GPUHistEvaluator<GradientSumT>::CopyToHost(EvaluateSplitInputs<GradientSumT
|
||||
}
|
||||
|
||||
template <typename GradientSumT>
|
||||
void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, ObjInfo task,
|
||||
void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate,
|
||||
EvaluateSplitInputs<GradientSumT> left,
|
||||
EvaluateSplitInputs<GradientSumT> right,
|
||||
common::Span<GPUExpandEntry> out_entries) {
|
||||
@@ -373,7 +371,7 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, Ob
|
||||
|
||||
dh::TemporaryArray<DeviceSplitCandidate> splits_out_storage(2);
|
||||
auto out_splits = dh::ToSpan(splits_out_storage);
|
||||
this->EvaluateSplits(left, right, task, evaluator, out_splits);
|
||||
this->EvaluateSplits(left, right, evaluator, out_splits);
|
||||
|
||||
auto d_sorted_idx = this->SortedIdx(left);
|
||||
auto d_entries = out_entries;
|
||||
@@ -385,7 +383,7 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, Ob
|
||||
auto fidx = out_splits[i].findex;
|
||||
|
||||
if (split.is_cat &&
|
||||
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot, task)) {
|
||||
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot)) {
|
||||
bool is_left = i == 0;
|
||||
auto out = is_left ? cats_out.first(cats_out.size() / 2) : cats_out.last(cats_out.size() / 2);
|
||||
SortBasedSplit(input, d_sorted_idx, fidx, is_left, out, &out_splits[i]);
|
||||
@@ -405,11 +403,11 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, Ob
|
||||
|
||||
template <typename GradientSumT>
|
||||
GPUExpandEntry GPUHistEvaluator<GradientSumT>::EvaluateSingleSplit(
|
||||
EvaluateSplitInputs<GradientSumT> input, float weight, ObjInfo task) {
|
||||
EvaluateSplitInputs<GradientSumT> input, float weight) {
|
||||
dh::TemporaryArray<DeviceSplitCandidate> splits_out(1);
|
||||
auto out_split = dh::ToSpan(splits_out);
|
||||
auto evaluator = tree_evaluator_.GetEvaluator<GPUTrainingParam>();
|
||||
this->EvaluateSplits(input, {}, task, evaluator, out_split);
|
||||
this->EvaluateSplits(input, {}, evaluator, out_split);
|
||||
|
||||
auto cats_out = this->DeviceCatStorage(input.nidx);
|
||||
auto d_sorted_idx = this->SortedIdx(input);
|
||||
@@ -421,7 +419,7 @@ GPUExpandEntry GPUHistEvaluator<GradientSumT>::EvaluateSingleSplit(
|
||||
auto fidx = out_split[i].findex;
|
||||
|
||||
if (split.is_cat &&
|
||||
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot, task)) {
|
||||
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot)) {
|
||||
SortBasedSplit(input, d_sorted_idx, fidx, true, cats_out, &out_split[i]);
|
||||
}
|
||||
|
||||
|
||||
@@ -114,7 +114,7 @@ class GPUHistEvaluator {
|
||||
/**
|
||||
* \brief Reset the evaluator, should be called before any use.
|
||||
*/
|
||||
void Reset(common::HistogramCuts const &cuts, common::Span<FeatureType const> ft, ObjInfo task,
|
||||
void Reset(common::HistogramCuts const &cuts, common::Span<FeatureType const> ft,
|
||||
bst_feature_t n_features, TrainParam const ¶m, int32_t device);
|
||||
|
||||
/**
|
||||
@@ -150,21 +150,20 @@ class GPUHistEvaluator {
|
||||
|
||||
// impl of evaluate splits, contains CUDA kernels so it's public
|
||||
void EvaluateSplits(EvaluateSplitInputs<GradientSumT> left,
|
||||
EvaluateSplitInputs<GradientSumT> right, ObjInfo task,
|
||||
EvaluateSplitInputs<GradientSumT> right,
|
||||
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
|
||||
common::Span<DeviceSplitCandidate> out_splits);
|
||||
/**
|
||||
* \brief Evaluate splits for left and right nodes.
|
||||
*/
|
||||
void EvaluateSplits(GPUExpandEntry candidate, ObjInfo task,
|
||||
void EvaluateSplits(GPUExpandEntry candidate,
|
||||
EvaluateSplitInputs<GradientSumT> left,
|
||||
EvaluateSplitInputs<GradientSumT> right,
|
||||
common::Span<GPUExpandEntry> out_splits);
|
||||
/**
|
||||
* \brief Evaluate splits for root node.
|
||||
*/
|
||||
GPUExpandEntry EvaluateSingleSplit(EvaluateSplitInputs<GradientSumT> input, float weight,
|
||||
ObjInfo task);
|
||||
GPUExpandEntry EvaluateSingleSplit(EvaluateSplitInputs<GradientSumT> input, float weight);
|
||||
};
|
||||
} // namespace tree
|
||||
} // namespace xgboost
|
||||
|
||||
@@ -16,12 +16,12 @@ namespace xgboost {
|
||||
namespace tree {
|
||||
template <typename GradientSumT>
|
||||
void GPUHistEvaluator<GradientSumT>::Reset(common::HistogramCuts const &cuts,
|
||||
common::Span<FeatureType const> ft, ObjInfo task,
|
||||
common::Span<FeatureType const> ft,
|
||||
bst_feature_t n_features, TrainParam const ¶m,
|
||||
int32_t device) {
|
||||
param_ = param;
|
||||
tree_evaluator_ = TreeEvaluator{param, n_features, device};
|
||||
if (cuts.HasCategorical() && !task.UseOneHot()) {
|
||||
if (cuts.HasCategorical()) {
|
||||
dh::XGBCachingDeviceAllocator<char> alloc;
|
||||
auto ptrs = cuts.cut_ptrs_.ConstDeviceSpan();
|
||||
auto beg = thrust::make_counting_iterator<size_t>(1ul);
|
||||
@@ -34,7 +34,7 @@ void GPUHistEvaluator<GradientSumT>::Reset(common::HistogramCuts const &cuts,
|
||||
auto idx = i - 1;
|
||||
if (common::IsCat(ft, idx)) {
|
||||
auto n_bins = ptrs[i] - ptrs[idx];
|
||||
bool use_sort = !common::UseOneHot(n_bins, to_onehot, task);
|
||||
bool use_sort = !common::UseOneHot(n_bins, to_onehot);
|
||||
return use_sort;
|
||||
}
|
||||
return false;
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "xgboost/task.h"
|
||||
#include "../param.h"
|
||||
#include "../constraints.h"
|
||||
#include "../split_evaluator.h"
|
||||
@@ -39,7 +38,6 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
|
||||
int32_t n_threads_ {0};
|
||||
FeatureInteractionConstraintHost interaction_constraints_;
|
||||
std::vector<NodeEntry> snode_;
|
||||
ObjInfo task_;
|
||||
|
||||
// if sum of statistics for non-missing values in the node
|
||||
// is equal to sum of statistics for all values:
|
||||
@@ -244,7 +242,7 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
|
||||
}
|
||||
if (is_cat) {
|
||||
auto n_bins = cut_ptrs.at(fidx + 1) - cut_ptrs[fidx];
|
||||
if (common::UseOneHot(n_bins, param_.max_cat_to_onehot, task_)) {
|
||||
if (common::UseOneHot(n_bins, param_.max_cat_to_onehot)) {
|
||||
EnumerateSplit<+1, kOneHot>(cut, {}, histogram, fidx, nidx, evaluator, best);
|
||||
EnumerateSplit<-1, kOneHot>(cut, {}, histogram, fidx, nidx, evaluator, best);
|
||||
} else {
|
||||
@@ -345,7 +343,6 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
|
||||
|
||||
auto Evaluator() const { return tree_evaluator_.GetEvaluator(); }
|
||||
auto const& Stats() const { return snode_; }
|
||||
auto Task() const { return task_; }
|
||||
|
||||
float InitRoot(GradStats const& root_sum) {
|
||||
snode_.resize(1);
|
||||
@@ -363,12 +360,11 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
|
||||
// The column sampler must be constructed by caller since we need to preserve the rng
|
||||
// for the entire training session.
|
||||
explicit HistEvaluator(TrainParam const ¶m, MetaInfo const &info, int32_t n_threads,
|
||||
std::shared_ptr<common::ColumnSampler> sampler, ObjInfo task)
|
||||
std::shared_ptr<common::ColumnSampler> sampler)
|
||||
: param_{param},
|
||||
column_sampler_{std::move(sampler)},
|
||||
tree_evaluator_{param, static_cast<bst_feature_t>(info.num_col_), GenericParameter::kCpuId},
|
||||
n_threads_{n_threads},
|
||||
task_{task} {
|
||||
n_threads_{n_threads} {
|
||||
interaction_constraints_.Configure(param, info.num_col_);
|
||||
column_sampler_->Init(info.num_col_, info.feature_weights.HostVector(), param_.colsample_bynode,
|
||||
param_.colsample_bylevel, param_.colsample_bytree);
|
||||
|
||||
@@ -28,10 +28,8 @@ DMLC_REGISTRY_FILE_TAG(updater_approx);
|
||||
|
||||
namespace {
|
||||
// Return the BatchParam used by DMatrix.
|
||||
template <typename GradientSumT>
|
||||
auto BatchSpec(TrainParam const &p, common::Span<float> hess,
|
||||
HistEvaluator<GradientSumT, CPUExpandEntry> const &evaluator) {
|
||||
return BatchParam{p.max_bin, hess, !evaluator.Task().const_hess};
|
||||
auto BatchSpec(TrainParam const &p, common::Span<float> hess, ObjInfo const task) {
|
||||
return BatchParam{p.max_bin, hess, !task.const_hess};
|
||||
}
|
||||
|
||||
auto BatchSpec(TrainParam const &p, common::Span<float> hess) {
|
||||
@@ -46,7 +44,8 @@ class GloablApproxBuilder {
|
||||
std::shared_ptr<common::ColumnSampler> col_sampler_;
|
||||
HistEvaluator<GradientSumT, CPUExpandEntry> evaluator_;
|
||||
HistogramBuilder<GradientSumT, CPUExpandEntry> histogram_builder_;
|
||||
GenericParameter const *ctx_;
|
||||
Context const *ctx_;
|
||||
ObjInfo const task_;
|
||||
|
||||
std::vector<ApproxRowPartitioner> partitioner_;
|
||||
// Pointer to last updated tree, used for update prediction cache.
|
||||
@@ -64,8 +63,7 @@ class GloablApproxBuilder {
|
||||
int32_t n_total_bins = 0;
|
||||
partitioner_.clear();
|
||||
// Generating the GHistIndexMatrix is quite slow, is there a way to speed it up?
|
||||
for (auto const &page :
|
||||
p_fmat->GetBatches<GHistIndexMatrix>(BatchSpec(param_, hess, evaluator_))) {
|
||||
for (auto const &page : p_fmat->GetBatches<GHistIndexMatrix>(BatchSpec(param_, hess, task_))) {
|
||||
if (n_total_bins == 0) {
|
||||
n_total_bins = page.cut.TotalBins();
|
||||
feature_values_ = page.cut;
|
||||
@@ -160,8 +158,9 @@ class GloablApproxBuilder {
|
||||
common::Monitor *monitor)
|
||||
: param_{std::move(param)},
|
||||
col_sampler_{std::move(column_sampler)},
|
||||
evaluator_{param_, info, ctx->Threads(), col_sampler_, task},
|
||||
evaluator_{param_, info, ctx->Threads(), col_sampler_},
|
||||
ctx_{ctx},
|
||||
task_{task},
|
||||
monitor_{monitor} {}
|
||||
|
||||
void UpdateTree(RegTree *p_tree, std::vector<GradientPair> const &gpair, common::Span<float> hess,
|
||||
|
||||
@@ -229,16 +229,14 @@ struct GPUHistMakerDevice {
|
||||
// Reset values for each update iteration
|
||||
// Note that the column sampler must be passed by value because it is not
|
||||
// thread safe
|
||||
void Reset(HostDeviceVector<GradientPair>* dh_gpair, DMatrix* dmat, int64_t num_columns,
|
||||
ObjInfo task) {
|
||||
void Reset(HostDeviceVector<GradientPair>* dh_gpair, DMatrix* dmat, int64_t num_columns) {
|
||||
auto const& info = dmat->Info();
|
||||
this->column_sampler.Init(num_columns, info.feature_weights.HostVector(),
|
||||
param.colsample_bynode, param.colsample_bylevel,
|
||||
param.colsample_bytree);
|
||||
dh::safe_cuda(cudaSetDevice(device_id));
|
||||
|
||||
this->evaluator_.Reset(page->Cuts(), feature_types, task, dmat->Info().num_col_, param,
|
||||
device_id);
|
||||
this->evaluator_.Reset(page->Cuts(), feature_types, dmat->Info().num_col_, param, device_id);
|
||||
|
||||
this->interaction_constraints.Reset();
|
||||
std::fill(node_sum_gradients.begin(), node_sum_gradients.end(), GradientPairPrecise{});
|
||||
@@ -260,7 +258,7 @@ struct GPUHistMakerDevice {
|
||||
hist.Reset();
|
||||
}
|
||||
|
||||
GPUExpandEntry EvaluateRootSplit(GradientPairPrecise root_sum, float weight, ObjInfo task) {
|
||||
GPUExpandEntry EvaluateRootSplit(GradientPairPrecise root_sum, float weight) {
|
||||
int nidx = RegTree::kRoot;
|
||||
GPUTrainingParam gpu_param(param);
|
||||
auto sampled_features = column_sampler.GetFeatureSet(0);
|
||||
@@ -277,12 +275,12 @@ struct GPUHistMakerDevice {
|
||||
matrix.gidx_fvalue_map,
|
||||
matrix.min_fvalue,
|
||||
hist.GetNodeHistogram(nidx)};
|
||||
auto split = this->evaluator_.EvaluateSingleSplit(inputs, weight, task);
|
||||
auto split = this->evaluator_.EvaluateSingleSplit(inputs, weight);
|
||||
return split;
|
||||
}
|
||||
|
||||
void EvaluateLeftRightSplits(GPUExpandEntry candidate, ObjInfo task, int left_nidx,
|
||||
int right_nidx, const RegTree& tree,
|
||||
void EvaluateLeftRightSplits(GPUExpandEntry candidate, int left_nidx, int right_nidx,
|
||||
const RegTree& tree,
|
||||
common::Span<GPUExpandEntry> pinned_candidates_out) {
|
||||
dh::TemporaryArray<DeviceSplitCandidate> splits_out(2);
|
||||
GPUTrainingParam gpu_param(param);
|
||||
@@ -316,7 +314,7 @@ struct GPUHistMakerDevice {
|
||||
hist.GetNodeHistogram(right_nidx)};
|
||||
|
||||
dh::TemporaryArray<GPUExpandEntry> entries(2);
|
||||
this->evaluator_.EvaluateSplits(candidate, task, left, right, dh::ToSpan(entries));
|
||||
this->evaluator_.EvaluateSplits(candidate, left, right, dh::ToSpan(entries));
|
||||
dh::safe_cuda(cudaMemcpyAsync(pinned_candidates_out.data(), entries.data().get(),
|
||||
sizeof(GPUExpandEntry) * entries.size(), cudaMemcpyDeviceToHost));
|
||||
}
|
||||
@@ -584,7 +582,7 @@ struct GPUHistMakerDevice {
|
||||
tree[candidate.nid].RightChild());
|
||||
}
|
||||
|
||||
GPUExpandEntry InitRoot(RegTree* p_tree, ObjInfo task, dh::AllReducer* reducer) {
|
||||
GPUExpandEntry InitRoot(RegTree* p_tree, dh::AllReducer* reducer) {
|
||||
constexpr bst_node_t kRootNIdx = 0;
|
||||
dh::XGBCachingDeviceAllocator<char> alloc;
|
||||
auto gpair_it = dh::MakeTransformIterator<GradientPairPrecise>(
|
||||
@@ -605,7 +603,7 @@ struct GPUHistMakerDevice {
|
||||
(*p_tree)[kRootNIdx].SetLeaf(param.learning_rate * weight);
|
||||
|
||||
// Generate first split
|
||||
auto root_entry = this->EvaluateRootSplit(root_sum, weight, task);
|
||||
auto root_entry = this->EvaluateRootSplit(root_sum, weight);
|
||||
return root_entry;
|
||||
}
|
||||
|
||||
@@ -615,11 +613,11 @@ struct GPUHistMakerDevice {
|
||||
Driver<GPUExpandEntry> driver(static_cast<TrainParam::TreeGrowPolicy>(param.grow_policy));
|
||||
|
||||
monitor.Start("Reset");
|
||||
this->Reset(gpair_all, p_fmat, p_fmat->Info().num_col_, task);
|
||||
this->Reset(gpair_all, p_fmat, p_fmat->Info().num_col_);
|
||||
monitor.Stop("Reset");
|
||||
|
||||
monitor.Start("InitRoot");
|
||||
driver.Push({ this->InitRoot(p_tree, task, reducer) });
|
||||
driver.Push({ this->InitRoot(p_tree, reducer) });
|
||||
monitor.Stop("InitRoot");
|
||||
|
||||
auto num_leaves = 1;
|
||||
@@ -656,7 +654,7 @@ struct GPUHistMakerDevice {
|
||||
monitor.Stop("BuildHist");
|
||||
|
||||
monitor.Start("EvaluateSplits");
|
||||
this->EvaluateLeftRightSplits(candidate, task, left_child_nidx, right_child_nidx, *p_tree,
|
||||
this->EvaluateLeftRightSplits(candidate, left_child_nidx, right_child_nidx, *p_tree,
|
||||
new_candidates.subspan(i * 2, 2));
|
||||
monitor.Stop("EvaluateSplits");
|
||||
} else {
|
||||
|
||||
@@ -342,7 +342,7 @@ void QuantileHistMaker::Builder<GradientSumT>::InitData(DMatrix *fmat, const Reg
|
||||
// store a pointer to the tree
|
||||
p_last_tree_ = &tree;
|
||||
evaluator_.reset(new HistEvaluator<GradientSumT, CPUExpandEntry>{
|
||||
param_, info, this->ctx_->Threads(), column_sampler_, task_});
|
||||
param_, info, this->ctx_->Threads(), column_sampler_});
|
||||
|
||||
monitor_->Stop(__func__);
|
||||
}
|
||||
|
||||
@@ -57,8 +57,7 @@ void TestEvaluateSingleSplit(bool is_categorical) {
|
||||
GPUHistEvaluator<GradientPair> evaluator{
|
||||
tparam, static_cast<bst_feature_t>(feature_min_values.size()), 0};
|
||||
dh::device_vector<common::CatBitField::value_type> out_cats;
|
||||
DeviceSplitCandidate result =
|
||||
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
|
||||
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
|
||||
|
||||
EXPECT_EQ(result.findex, 1);
|
||||
EXPECT_EQ(result.fvalue, 11.0);
|
||||
@@ -101,8 +100,7 @@ TEST(GpuHist, EvaluateSingleSplitMissing) {
|
||||
dh::ToSpan(feature_histogram)};
|
||||
|
||||
GPUHistEvaluator<GradientPair> evaluator(tparam, feature_set.size(), 0);
|
||||
DeviceSplitCandidate result =
|
||||
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
|
||||
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
|
||||
|
||||
EXPECT_EQ(result.findex, 0);
|
||||
EXPECT_EQ(result.fvalue, 1.0);
|
||||
@@ -114,10 +112,8 @@ TEST(GpuHist, EvaluateSingleSplitMissing) {
|
||||
TEST(GpuHist, EvaluateSingleSplitEmpty) {
|
||||
TrainParam tparam = ZeroParam();
|
||||
GPUHistEvaluator<GradientPair> evaluator(tparam, 1, 0);
|
||||
DeviceSplitCandidate result = evaluator
|
||||
.EvaluateSingleSplit(EvaluateSplitInputs<GradientPair>{}, 0,
|
||||
ObjInfo{ObjInfo::kRegression})
|
||||
.split;
|
||||
DeviceSplitCandidate result =
|
||||
evaluator.EvaluateSingleSplit(EvaluateSplitInputs<GradientPair>{}, 0).split;
|
||||
EXPECT_EQ(result.findex, -1);
|
||||
EXPECT_LT(result.loss_chg, 0.0f);
|
||||
}
|
||||
@@ -152,8 +148,7 @@ TEST(GpuHist, EvaluateSingleSplitFeatureSampling) {
|
||||
dh::ToSpan(feature_histogram)};
|
||||
|
||||
GPUHistEvaluator<GradientPair> evaluator(tparam, feature_min_values.size(), 0);
|
||||
DeviceSplitCandidate result =
|
||||
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
|
||||
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
|
||||
|
||||
EXPECT_EQ(result.findex, 1);
|
||||
EXPECT_EQ(result.fvalue, 11.0);
|
||||
@@ -191,8 +186,7 @@ TEST(GpuHist, EvaluateSingleSplitBreakTies) {
|
||||
dh::ToSpan(feature_histogram)};
|
||||
|
||||
GPUHistEvaluator<GradientPair> evaluator(tparam, feature_min_values.size(), 0);
|
||||
DeviceSplitCandidate result =
|
||||
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
|
||||
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
|
||||
|
||||
EXPECT_EQ(result.findex, 0);
|
||||
EXPECT_EQ(result.fvalue, 1.0);
|
||||
@@ -243,8 +237,8 @@ TEST(GpuHist, EvaluateSplits) {
|
||||
|
||||
GPUHistEvaluator<GradientPair> evaluator{
|
||||
tparam, static_cast<bst_feature_t>(feature_min_values.size()), 0};
|
||||
evaluator.EvaluateSplits(input_left, input_right, ObjInfo{ObjInfo::kRegression},
|
||||
evaluator.GetEvaluator(), dh::ToSpan(out_splits));
|
||||
evaluator.EvaluateSplits(input_left, input_right, evaluator.GetEvaluator(),
|
||||
dh::ToSpan(out_splits));
|
||||
|
||||
DeviceSplitCandidate result_left = out_splits[0];
|
||||
EXPECT_EQ(result_left.findex, 1);
|
||||
@@ -264,8 +258,7 @@ TEST_F(TestPartitionBasedSplit, GpuHist) {
|
||||
cuts_.cut_values_.SetDevice(0);
|
||||
cuts_.min_vals_.SetDevice(0);
|
||||
|
||||
ObjInfo task{ObjInfo::kRegression};
|
||||
evaluator.Reset(cuts_, dh::ToSpan(ft), task, info_.num_col_, param_, 0);
|
||||
evaluator.Reset(cuts_, dh::ToSpan(ft), info_.num_col_, param_, 0);
|
||||
|
||||
dh::device_vector<GradientPairPrecise> d_hist(hist_[0].size());
|
||||
auto node_hist = hist_[0];
|
||||
@@ -282,7 +275,7 @@ TEST_F(TestPartitionBasedSplit, GpuHist) {
|
||||
cuts_.cut_values_.ConstDeviceSpan(),
|
||||
cuts_.min_vals_.ConstDeviceSpan(),
|
||||
dh::ToSpan(d_hist)};
|
||||
auto split = evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
|
||||
auto split = evaluator.EvaluateSingleSplit(input, 0).split;
|
||||
ASSERT_NEAR(split.loss_chg, best_score_, 1e-16);
|
||||
}
|
||||
} // namespace tree
|
||||
|
||||
@@ -24,8 +24,8 @@ template <typename GradientSumT> void TestEvaluateSplits() {
|
||||
|
||||
auto dmat = RandomDataGenerator(kRows, kCols, 0).Seed(3).GenerateDMatrix();
|
||||
|
||||
auto evaluator = HistEvaluator<GradientSumT, CPUExpandEntry>{
|
||||
param, dmat->Info(), n_threads, sampler, ObjInfo{ObjInfo::kRegression}};
|
||||
auto evaluator =
|
||||
HistEvaluator<GradientSumT, CPUExpandEntry>{param, dmat->Info(), n_threads, sampler};
|
||||
common::HistCollection<GradientSumT> hist;
|
||||
std::vector<GradientPair> row_gpairs = {
|
||||
{1.23f, 0.24f}, {0.24f, 0.25f}, {0.26f, 0.27f}, {2.27f, 0.28f},
|
||||
@@ -97,8 +97,7 @@ TEST(HistEvaluator, Apply) {
|
||||
param.UpdateAllowUnknown(Args{{"min_child_weight", "0"}, {"reg_lambda", "0.0"}});
|
||||
auto dmat = RandomDataGenerator(kNRows, kNCols, 0).Seed(3).GenerateDMatrix();
|
||||
auto sampler = std::make_shared<common::ColumnSampler>();
|
||||
auto evaluator_ = HistEvaluator<float, CPUExpandEntry>{param, dmat->Info(), 4, sampler,
|
||||
ObjInfo{ObjInfo::kRegression}};
|
||||
auto evaluator_ = HistEvaluator<float, CPUExpandEntry>{param, dmat->Info(), 4, sampler};
|
||||
|
||||
CPUExpandEntry entry{0, 0, 10.0f};
|
||||
entry.split.left_sum = GradStats{0.4, 0.6f};
|
||||
@@ -125,7 +124,7 @@ TEST_F(TestPartitionBasedSplit, CPUHist) {
|
||||
std::vector<FeatureType> ft{FeatureType::kCategorical};
|
||||
auto sampler = std::make_shared<common::ColumnSampler>();
|
||||
HistEvaluator<double, CPUExpandEntry> evaluator{param_, info_, common::OmpGetNumThreads(0),
|
||||
sampler, ObjInfo{ObjInfo::kRegression}};
|
||||
sampler};
|
||||
evaluator.InitRoot(GradStats{total_gpair_});
|
||||
RegTree tree;
|
||||
std::vector<CPUExpandEntry> entries(1);
|
||||
@@ -156,8 +155,8 @@ auto CompareOneHotAndPartition(bool onehot) {
|
||||
|
||||
int32_t n_threads = 16;
|
||||
auto sampler = std::make_shared<common::ColumnSampler>();
|
||||
auto evaluator = HistEvaluator<GradientSumT, CPUExpandEntry>{
|
||||
param, dmat->Info(), n_threads, sampler, ObjInfo{ObjInfo::kRegression}};
|
||||
auto evaluator =
|
||||
HistEvaluator<GradientSumT, CPUExpandEntry>{param, dmat->Info(), n_threads, sampler};
|
||||
std::vector<CPUExpandEntry> entries(1);
|
||||
|
||||
for (auto const &gmat : dmat->GetBatches<GHistIndexMatrix>({32, param.sparse_threshold})) {
|
||||
|
||||
@@ -262,7 +262,7 @@ TEST(GpuHist, EvaluateRootSplit) {
|
||||
info.num_col_ = kNCols;
|
||||
|
||||
DeviceSplitCandidate res =
|
||||
maker.EvaluateRootSplit({6.4f, 12.8f}, 0, ObjInfo{ObjInfo::kRegression}).split;
|
||||
maker.EvaluateRootSplit({6.4f, 12.8f}, 0).split;
|
||||
|
||||
ASSERT_EQ(res.findex, 7);
|
||||
ASSERT_NEAR(res.fvalue, 0.26, xgboost::kRtEps);
|
||||
@@ -300,11 +300,11 @@ void TestHistogramIndexImpl() {
|
||||
const auto &maker = hist_maker.maker;
|
||||
auto grad = GenerateRandomGradients(kNRows);
|
||||
grad.SetDevice(0);
|
||||
maker->Reset(&grad, hist_maker_dmat.get(), kNCols, ObjInfo{ObjInfo::kRegression});
|
||||
maker->Reset(&grad, hist_maker_dmat.get(), kNCols);
|
||||
std::vector<common::CompressedByteT> h_gidx_buffer(maker->page->gidx_buffer.HostVector());
|
||||
|
||||
const auto &maker_ext = hist_maker_ext.maker;
|
||||
maker_ext->Reset(&grad, hist_maker_ext_dmat.get(), kNCols, ObjInfo{ObjInfo::kRegression});
|
||||
maker_ext->Reset(&grad, hist_maker_ext_dmat.get(), kNCols);
|
||||
std::vector<common::CompressedByteT> h_gidx_buffer_ext(maker_ext->page->gidx_buffer.HostVector());
|
||||
|
||||
ASSERT_EQ(maker->page->Cuts().TotalBins(), maker_ext->page->Cuts().TotalBins());
|
||||
|
||||
@@ -61,6 +61,9 @@ class TestGPUUpdaters:
|
||||
def test_categorical(self, rows, cols, rounds, cats):
|
||||
self.cputest.run_categorical_basic(rows, cols, rounds, cats, "gpu_hist")
|
||||
|
||||
def test_max_cat(self) -> None:
|
||||
self.cputest.run_max_cat("gpu_hist")
|
||||
|
||||
def test_categorical_32_cat(self):
|
||||
'''32 hits the bound of integer bitset, so special test'''
|
||||
rows = 1000
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
import testing as tm
|
||||
import pytest
|
||||
import xgboost as xgb
|
||||
@@ -167,6 +169,30 @@ class TestTreeMethod:
|
||||
|
||||
def test_invalid_category(self) -> None:
|
||||
self.run_invalid_category("approx")
|
||||
self.run_invalid_category("hist")
|
||||
|
||||
def run_max_cat(self, tree_method: str) -> None:
|
||||
"""Test data with size smaller than number of categories."""
|
||||
import pandas as pd
|
||||
n_cat = 100
|
||||
n = 5
|
||||
X = pd.Series(
|
||||
["".join(choice(ascii_lowercase) for i in range(3)) for i in range(n_cat)],
|
||||
dtype="category",
|
||||
)[:n].to_frame()
|
||||
|
||||
reg = xgb.XGBRegressor(
|
||||
enable_categorical=True,
|
||||
tree_method=tree_method,
|
||||
n_estimators=10,
|
||||
)
|
||||
y = pd.Series(range(n))
|
||||
reg.fit(X=X, y=y, eval_set=[(X, y)])
|
||||
assert tm.non_increasing(reg.evals_result()["validation_0"]["rmse"])
|
||||
|
||||
@pytest.mark.parametrize("tree_method", ["hist", "approx"])
|
||||
def test_max_cat(self, tree_method) -> None:
|
||||
self.run_max_cat(tree_method)
|
||||
|
||||
def run_categorical_basic(self, rows, cols, rounds, cats, tree_method):
|
||||
onehot, label = tm.make_categorical(rows, cols, cats, True)
|
||||
|
||||
Reference in New Issue
Block a user