Compare commits

...

12 Commits

Author SHA1 Message Date
Jiaming Yuan
5d92a7d936 Bump release version to 1.6.1. (#7872) 2022-05-08 14:20:50 +08:00
Jiaming Yuan
c2508814ff [backport] Use maximum category in sketch. (#7853) (#7866) 2022-05-06 21:11:33 +08:00
Jiaming Yuan
b1b6246e35 [backport] Always use partition based categorical splits. (#7857) (#7865) 2022-05-06 19:14:19 +08:00
Jiaming Yuan
f4eb6b984e [backport] jvm-packages 1.6.1 (#7849)
* [jvm-packages] move the dmatrix building into rabit context (#7823)

This fixes the QuantileDeviceDMatrix in distributed environment.

* [doc] update the jvm tutorial to 1.6.1 [skip ci] (#7834)

* [Breaking][jvm-packages] Use barrier execution mode (#7836)

With the introduction of the barrier execution mode. we don't need to kill SparkContext when some xgboost tasks failed. Instead, Spark will handle the errors for us. So in this PR, `killSparkContextOnWorkerFailure` parameter is deleted.

* [doc] remove the doc about killing SparkContext [skip ci] (#7840)

* [jvm-package] remove the coalesce in barrier mode (#7846)

* [jvm-packages] Fix model compatibility (#7845)

* Ignore all Java exceptions when looking for Linux musl support (#7844)

Co-authored-by: Bobby Wang <wbo4958@gmail.com>
Co-authored-by: Michael Allman <msa@allman.ms>
2022-04-29 17:20:58 +08:00
Jiaming Yuan
f75c007f27 Make 1.6.0 release. (#7813) 2022-04-16 08:43:21 +08:00
Jiaming Yuan
816e788b29 [backport] #7808 #7810 (#7811)
* [jvm-packages] add hostIp and python exec for rabit tracker (#7808)

* Fix training continuation with categorical model. (#7810)

* Make sure the task is initialized before construction of tree updater.

This is a quick fix meant to be backported to 1.6, for a full fix we should pass the model
param into tree updater by reference instead.

Co-authored-by: Bobby Wang <wbo4958@gmail.com>
2022-04-15 19:56:42 +08:00
Jiaming Yuan
3ee3b18a22 [doc] fix a typo in jvm/index.rst (#7806) [skip ci] (#7807)
Co-authored-by: Bobby Wang <wbo4958@gmail.com>
2022-04-14 10:41:54 +08:00
Jiaming Yuan
ece4dc457b [backport] Backport jvm changes to 1.6. (#7803)
* [doc] improve xgboost4j-spark-gpu doc [skip ci] (#7793)


Co-authored-by: Sameer Raheja <sameerz@users.noreply.github.com>

* [jvm-packages] fix evaluation when featuresCols is used (#7798)

Co-authored-by: Bobby Wang <wbo4958@gmail.com>
Co-authored-by: Sameer Raheja <sameerz@users.noreply.github.com>
2022-04-13 17:35:29 +08:00
Jiaming Yuan
67298ccd03 [backport] Backport JVM fixes and document update to 1.6 (#7792)
* [jvm-packages] unify setFeaturesCol API for XGBoostRegressor (#7784)

* [jvm-packages] add doc for xgboost4j-spark-gpu (#7779)


Co-authored-by: Jiaming Yuan <jm.yuan@outlook.com>

* [jvm-packages] remove the dep of com.fasterxml.jackson (#7791)

* [jvm-packages] xgboost4j-spark should work when featuresCols is specified (#7789)

Co-authored-by: Bobby Wang <wbo4958@gmail.com>
2022-04-08 14:18:46 +08:00
Philip Hyunsu Cho
78d231264a [CI] Enable faulthandler to show details when 0xC0000005 error occurs (#7771) 2022-03-30 19:16:54 -07:00
Jiaming Yuan
4615fa51ef Drop support for deprecated CUDA architecture. (#7767)
* Drop support for deprecated CUDA architecture.

* Check file size at release branch.

* Use 200 MB limit

Co-authored-by: Philip Hyunsu Cho <chohyu01@cs.washington.edu>
2022-03-30 15:16:35 -07:00
Jiaming Yuan
4bd5a33b10 Make rc1 release. (#7764) 2022-03-30 21:32:40 +08:00
75 changed files with 1311 additions and 988 deletions

View File

@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.14 FATAL_ERROR)
project(xgboost LANGUAGES CXX C VERSION 1.6.0)
project(xgboost LANGUAGES CXX C VERSION 1.6.1)
include(cmake/Utils.cmake)
list(APPEND CMAKE_MODULE_PATH "${xgboost_SOURCE_DIR}/cmake/modules")
cmake_policy(SET CMP0022 NEW)

View File

@@ -153,9 +153,9 @@ def TestWin64() {
conda activate ${env_name} && for /R %%i in (python-package\\dist\\*.whl) DO python -m pip install "%%i"
"""
echo "Running Python tests..."
bat "conda activate ${env_name} && python -m pytest -v -s -rxXs --fulltrace tests\\python"
bat "conda activate ${env_name} && python -X faulthandler -m pytest -v -s -rxXs --fulltrace tests\\python"
bat """
conda activate ${env_name} && python -m pytest -v -s -rxXs --fulltrace -m "(not slow) and (not mgpu)" tests\\python-gpu
conda activate ${env_name} && python -X faulthandler -m pytest -v -s -rxXs --fulltrace -m "(not slow) and (not mgpu)" tests\\python-gpu
"""
bat "conda env remove --name ${env_name}"
deleteDir()

View File

@@ -1 +1 @@
@xgboost_VERSION_MAJOR@.@xgboost_VERSION_MINOR@.@xgboost_VERSION_PATCH@-dev
@xgboost_VERSION_MAJOR@.@xgboost_VERSION_MINOR@.@xgboost_VERSION_PATCH@

View File

@@ -91,9 +91,9 @@ function(format_gencode_flags flags out)
# Set up architecture flags
if(NOT flags)
if (CUDA_VERSION VERSION_GREATER_EQUAL "11.1")
set(flags "50;52;60;61;70;75;80;86")
set(flags "52;60;61;70;75;80;86")
elseif (CUDA_VERSION VERSION_GREATER_EQUAL "11.0")
set(flags "35;50;52;60;61;70;75;80")
set(flags "52;60;61;70;75;80")
elseif(CUDA_VERSION VERSION_GREATER_EQUAL "10.0")
set(flags "35;50;52;60;61;70;75")
elseif(CUDA_VERSION VERSION_GREATER_EQUAL "9.0")

View File

@@ -101,7 +101,7 @@ R
JVM
---
You can use XGBoost4J in your Java/Scala application by adding XGBoost4J as a dependency:
* XGBoost4j/XGBoost4j-Spark
.. code-block:: xml
:caption: Maven
@@ -134,6 +134,39 @@ You can use XGBoost4J in your Java/Scala application by adding XGBoost4J as a de
"ml.dmlc" %% "xgboost4j-spark" % "latest_version_num"
)
* XGBoost4j-GPU/XGBoost4j-Spark-GPU
.. code-block:: xml
:caption: Maven
<properties>
...
<!-- Specify Scala version in package name -->
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
...
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
<version>latest_version_num</version>
</dependency>
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-spark-gpu_${scala.binary.version}</artifactId>
<version>latest_version_num</version>
</dependency>
</dependencies>
.. code-block:: scala
:caption: sbt
libraryDependencies ++= Seq(
"ml.dmlc" %% "xgboost4j-gpu" % "latest_version_num",
"ml.dmlc" %% "xgboost4j-spark-gpu" % "latest_version_num"
)
This will check out the latest stable version from the Maven Central.
For the latest release version number, please check `release page <https://github.com/dmlc/xgboost/releases>`_.
@@ -185,7 +218,7 @@ and Windows.) Download it and run the following commands:
JVM
---
First add the following Maven repository hosted by the XGBoost project:
* XGBoost4j/XGBoost4j-Spark
.. code-block:: xml
:caption: Maven
@@ -234,6 +267,40 @@ Then add XGBoost4J as a dependency:
"ml.dmlc" %% "xgboost4j-spark" % "latest_version_num-SNAPSHOT"
)
* XGBoost4j-GPU/XGBoost4j-Spark-GPU
.. code-block:: xml
:caption: maven
<properties>
...
<!-- Specify Scala version in package name -->
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
...
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
<version>latest_version_num-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-spark-gpu_${scala.binary.version}</artifactId>
<version>latest_version_num-SNAPSHOT</version>
</dependency>
</dependencies>
.. code-block:: scala
:caption: sbt
libraryDependencies ++= Seq(
"ml.dmlc" %% "xgboost4j-gpu" % "latest_version_num-SNAPSHOT",
"ml.dmlc" %% "xgboost4j-spark-gpu" % "latest_version_num-SNAPSHOT"
)
Look up the ``version`` field in `pom.xml <https://github.com/dmlc/xgboost/blob/master/jvm-packages/pom.xml>`_ to get the correct version number.
The SNAPSHOT JARs are hosted by the XGBoost project. Every commit in the ``master`` branch will automatically trigger generation of a new SNAPSHOT JAR. You can control how often Maven should upgrade your SNAPSHOT installation by specifying ``updatePolicy``. See `here <http://maven.apache.org/pom.html#Repositories>`_ for details.

View File

@@ -35,6 +35,7 @@ Contents
java_intro
XGBoost4J-Spark Tutorial <xgboost4j_spark_tutorial>
XGBoost4J-Spark-GPU Tutorial <xgboost4j_spark_gpu_tutorial>
Code Examples <https://github.com/dmlc/xgboost/tree/master/jvm-packages/xgboost4j-example>
XGBoost4J Java API <javadocs/index>
XGBoost4J Scala API <scaladocs/xgboost4j/index>

View File

@@ -0,0 +1,246 @@
#############################################
XGBoost4J-Spark-GPU Tutorial (version 1.6.1+)
#############################################
**XGBoost4J-Spark-GPU** is an open source library aiming to accelerate distributed XGBoost training on Apache Spark cluster from
end to end with GPUs by leveraging the `RAPIDS Accelerator for Apache Spark <https://nvidia.github.io/spark-rapids/>`_ product.
This tutorial will show you how to use **XGBoost4J-Spark-GPU**.
.. contents::
:backlinks: none
:local:
************************************************
Build an ML Application with XGBoost4J-Spark-GPU
************************************************
Add XGBoost to Your Project
===========================
Before we go into the tour of how to use XGBoost4J-Spark-GPU, you should first consult
:ref:`Installation from Maven repository <install_jvm_packages>` in order to add XGBoost4J-Spark-GPU as
a dependency for your project. We provide both stable releases and snapshots.
Data Preparation
================
In this section, we use the `Iris <https://archive.ics.uci.edu/ml/datasets/iris>`_ dataset as an example to
showcase how we use Apache Spark to transform a raw dataset and make it fit the data interface of XGBoost.
The Iris dataset is shipped in CSV format. Each instance contains 4 features, "sepal length", "sepal width",
"petal length" and "petal width". In addition, it contains the "class" column, which is essentially the
label with three possible values: "Iris Setosa", "Iris Versicolour" and "Iris Virginica".
Read Dataset with Spark's Built-In Reader
-----------------------------------------
.. code-block:: scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val spark = SparkSession.builder().getOrCreate()
val labelName = "class"
val schema = new StructType(Array(
StructField("sepal length", DoubleType, true),
StructField("sepal width", DoubleType, true),
StructField("petal length", DoubleType, true),
StructField("petal width", DoubleType, true),
StructField(labelName, StringType, true)))
val xgbInput = spark.read.option("header", "false")
.schema(schema)
.csv(dataPath)
In the first line, we create an instance of a `SparkSession <https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession>`_
which is the entry point of any Spark application working with DataFrames. The ``schema`` variable
defines the schema of the DataFrame wrapping Iris data. With this explicitly set schema, we
can define the column names as well as their types; otherwise the column names would be
the default ones derived by Spark, such as ``_col0``, etc. Finally, we can use Spark's
built-in CSV reader to load the Iris CSV file as a DataFrame named ``xgbInput``.
Apache Spark also contains many built-in readers for other formats such as ORC, Parquet, Avro, JSON.
Transform Raw Iris Dataset
--------------------------
To make the Iris dataset recognizable to XGBoost, we need to encode the String-typed
label, i.e. "class", to the Double-typed label.
One way to convert the String-typed label to Double is to use Spark's built-in feature transformer
`StringIndexer <https://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer>`_.
But this feature is not accelerated in RAPIDS Accelerator, which means it will fall back
to CPU. Instead, we use an alternative way to achieve the same goal with the following code:
.. code-block:: scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val spec = Window.orderBy(labelName)
val Array(train, test) = xgbInput
.withColumn("tmpClassName", dense_rank().over(spec) - 1)
.drop(labelName)
.withColumnRenamed("tmpClassName", labelName)
.randomSplit(Array(0.7, 0.3), seed = 1)
train.show(5)
.. code-block:: none
+------------+-----------+------------+-----------+-----+
|sepal length|sepal width|petal length|petal width|class|
+------------+-----------+------------+-----------+-----+
| 4.3| 3.0| 1.1| 0.1| 0|
| 4.4| 2.9| 1.4| 0.2| 0|
| 4.4| 3.0| 1.3| 0.2| 0|
| 4.4| 3.2| 1.3| 0.2| 0|
| 4.6| 3.2| 1.4| 0.2| 0|
+------------+-----------+------------+-----------+-----+
With window operations, we have mapped the string column of labels to label indices.
Training
========
The GPU version of XGBoost-Spark supports both regression and classification
models. Although we use the Iris dataset in this tutorial to show how we use
``XGBoost/XGBoost4J-Spark-GPU`` to resolve a multi-classes classification problem, the
usage in Regression is very similar to classification.
To train a XGBoost model for classification, we need to claim a XGBoostClassifier first:
.. code-block:: scala
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map(
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"tree_method" -> "gpu_hist",
"num_workers" -> 1)
val featuresNames = schema.fieldNames.filter(name => name != labelName)
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol(featuresNames)
.setLabelCol(labelName)
The available parameters for training a XGBoost model can be found in :doc:`here </parameter>`.
Similar to the XGBoost4J-Spark package, in addition to the default set of parameters,
XGBoost4J-Spark-GPU also supports the camel-case variant of these parameters to be
consistent with Spark's MLlib naming convention.
Specifically, each parameter in :doc:`this page </parameter>` has its equivalent form in
XGBoost4J-Spark-GPU with camel case. For example, to set ``max_depth`` for each tree, you can pass
parameter just like what we did in the above code snippet (as ``max_depth`` wrapped in a Map), or
you can do it through setters in XGBoostClassifer:
.. code-block:: scala
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol(featuresNames)
.setLabelCol(labelName)
xgbClassifier.setMaxDepth(2)
.. note::
In contrast with XGBoost4j-Spark which accepts both a feature column with VectorUDT type and
an array of feature column names, XGBoost4j-Spark-GPU only accepts an array of feature
column names by ``setFeaturesCol(value: Array[String])``.
After setting XGBoostClassifier parameters and feature/label columns, we can build a
transformer, XGBoostClassificationModel by fitting XGBoostClassifier with the input
DataFrame. This ``fit`` operation is essentially the training process and the generated
model can then be used in other tasks like prediction.
.. code-block:: scala
val xgbClassificationModel = xgbClassifier.fit(train)
Prediction
==========
When we get a model, either a XGBoostClassificationModel or a XGBoostRegressionModel, it takes a DataFrame as an input,
reads the column containing feature vectors, predicts for each feature vector, and outputs a new DataFrame
with the following columns by default:
* XGBoostClassificationModel will output margins (``rawPredictionCol``), probabilities(``probabilityCol``) and the eventual prediction labels (``predictionCol``) for each possible label.
* XGBoostRegressionModel will output prediction a label(``predictionCol``).
.. code-block:: scala
val xgbClassificationModel = xgbClassifier.fit(train)
val results = xgbClassificationModel.transform(test)
results.show()
With the above code snippet, we get a DataFrame as result, which contains the margin, probability for each class,
and the prediction for each instance.
.. code-block:: none
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|sepal length|sepal width| petal length| petal width|class| rawPrediction| probability|prediction|
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
| 4.5| 2.3| 1.3|0.30000000000000004| 0|[3.16666603088378...|[0.98853939771652...| 0.0|
| 4.6| 3.1| 1.5| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.8| 3.1| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.8| 3.4| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.8| 3.4|1.9000000000000001| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.9| 2.4| 3.3| 1.0| 1|[-2.1498908996582...|[0.00596602633595...| 1.0|
| 4.9| 2.5| 4.5| 1.7| 2|[-2.1498908996582...|[0.00596602633595...| 1.0|
| 5.0| 3.5| 1.3|0.30000000000000004| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.1| 2.5| 3.0| 1.1| 1|[3.16666603088378...|[0.98853939771652...| 0.0|
| 5.1| 3.3| 1.7| 0.5| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.1| 3.5| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.1| 3.8| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.2| 3.4| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.2| 3.5| 1.5| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.2| 4.1| 1.5| 0.1| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.4| 3.9| 1.7| 0.4| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.5| 2.4| 3.8| 1.1| 1|[-2.1498908996582...|[0.00596602633595...| 1.0|
| 5.5| 4.2| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.7| 2.5| 5.0| 2.0| 2|[-2.1498908996582...|[0.00280966912396...| 2.0|
| 5.7| 3.0| 4.2| 1.2| 1|[-2.1498908996582...|[0.00643939292058...| 1.0|
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
**********************
Submit the application
**********************
Heres an example to submit an end-to-end XGBoost-4j-Spark-GPU Spark application to an
Apache Spark Standalone cluster, assuming the application main class is Iris and the
application jar is iris-1.0.0.jar
.. code-block:: bash
cudf_version=22.02.0
rapids_version=22.02.0
xgboost_version=1.6.1
main_class=Iris
app_jar=iris-1.0.0.jar
spark-submit \
--master $master \
--packages ai.rapids:cudf:${cudf_version},com.nvidia:rapids-4-spark_2.12:${rapids_version},ml.dmlc:xgboost4j-gpu_2.12:${xgboost_version},ml.dmlc:xgboost4j-spark-gpu_2.12:${xgboost_version} \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
--conf spark.rapids.sql.csv.read.double.enabled=true \
--conf spark.rapids.sql.hasNans=false \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--class ${main_class} \
${app_jar}
* First, we need to specify the ``RAPIDS Accelerator, cudf, xgboost4j-gpu, xgboost4j-spark-gpu`` packages by ``--packages``
* Second, ``RAPIDS Accelerator`` is a Spark plugin, so we need to configure it by specifying ``spark.plugins=com.nvidia.spark.SQLPlugin``
For details about other ``RAPIDS Accelerator`` other configurations, please refer to the `configuration <https://nvidia.github.io/spark-rapids/docs/configs.html>`_.
For ``RAPIDS Accelerator Frequently Asked Questions``, please refer to the
`frequently-asked-questions <https://nvidia.github.io/spark-rapids/docs/FAQ.html#frequently-asked-questions>`_.

View File

@@ -16,12 +16,6 @@ This tutorial is to cover the end-to-end process to build a machine learning pip
* Building a Machine Learning Pipeline with XGBoost4J-Spark
* Running XGBoost4J-Spark in Production
.. note::
**SparkContext will be stopped by default when XGBoost training task fails**.
XGBoost4J-Spark 1.2.0+ exposes a parameter **kill_spark_context_on_worker_failure**. Set **kill_spark_context_on_worker_failure** to **false** so that the SparkContext will not be stopping on training failure. Instead of stopping the SparkContext, XGBoost4J-Spark will throw an exception instead. Users who want to re-use the SparkContext should wrap the training code in a try-catch block.
.. contents::
:backlinks: none
:local:
@@ -127,6 +121,11 @@ Now, we have a DataFrame containing only two columns, "features" which contains
"sepal length", "sepal width", "petal length" and "petal width" and "classIndex" which has Double-typed
labels. A DataFrame like this (containing vector-represented features and numeric labels) can be fed to XGBoost4J-Spark's training engine directly.
.. note::
There is no need to assemble feature columns from version 1.6.1+. Instead, users can specify an array of
feture column names by ``setFeaturesCol(value: Array[String])`` and XGBoost4j-Spark will do it.
Dealing with missing values
~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -74,23 +74,20 @@ Optimal Partitioning
.. versionadded:: 1.6
Optimal partitioning is a technique for partitioning the categorical predictors for each
node split, the proof of optimality for numerical objectives like ``RMSE`` was first
introduced by `[1] <#references>`__. The algorithm is used in decision trees for handling
regression and binary classification tasks `[2] <#references>`__, later LightGBM `[3]
<#references>`__ brought it to the context of gradient boosting trees and now is also
adopted in XGBoost as an optional feature for handling categorical splits. More
specifically, the proof by Fisher `[1] <#references>`__ states that, when trying to
partition a set of discrete values into groups based on the distances between a measure of
these values, one only needs to look at sorted partitions instead of enumerating all
possible permutations. In the context of decision trees, the discrete values are
categories, and the measure is the output leaf value. Intuitively, we want to group the
categories that output similar leaf values. During split finding, we first sort the
gradient histogram to prepare the contiguous partitions then enumerate the splits
node split, the proof of optimality for numerical output was first introduced by `[1]
<#references>`__. The algorithm is used in decision trees `[2] <#references>`__, later
LightGBM `[3] <#references>`__ brought it to the context of gradient boosting trees and
now is also adopted in XGBoost as an optional feature for handling categorical
splits. More specifically, the proof by Fisher `[1] <#references>`__ states that, when
trying to partition a set of discrete values into groups based on the distances between a
measure of these values, one only needs to look at sorted partitions instead of
enumerating all possible permutations. In the context of decision trees, the discrete
values are categories, and the measure is the output leaf value. Intuitively, we want to
group the categories that output similar leaf values. During split finding, we first sort
the gradient histogram to prepare the contiguous partitions then enumerate the splits
according to these sorted values. One of the related parameters for XGBoost is
``max_cat_to_one_hot``, which controls whether one-hot encoding or partitioning should be
used for each feature, see :doc:`/parameter` for details. When objective is not
regression or binary classification, XGBoost will fallback to using onehot encoding
instead.
used for each feature, see :doc:`/parameter` for details.
**********************

View File

@@ -14,6 +14,7 @@ See `Awesome XGBoost <https://github.com/dmlc/xgboost/tree/master/demo>`_ for mo
Distributed XGBoost with AWS YARN <aws_yarn>
kubernetes
Distributed XGBoost with XGBoost4J-Spark <https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html>
Distributed XGBoost with XGBoost4J-Spark-GPU <https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_gpu_tutorial.html>
dask
ray
dart

View File

@@ -36,10 +36,6 @@ struct ObjInfo {
explicit ObjInfo(Task t) : task{t} {}
ObjInfo(Task t, bool khess) : task{t}, const_hess{khess} {}
XGBOOST_DEVICE bool UseOneHot() const {
return (task != ObjInfo::kRegression && task != ObjInfo::kBinary);
}
};
} // namespace xgboost
#endif // XGBOOST_TASK_H_

View File

@@ -6,6 +6,6 @@
#define XGBOOST_VER_MAJOR 1
#define XGBOOST_VER_MINOR 6
#define XGBOOST_VER_PATCH 0
#define XGBOOST_VER_PATCH 1
#endif // XGBOOST_VERSION_CONFIG_H_

View File

@@ -6,7 +6,7 @@
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
<packaging>pom</packaging>
<name>XGBoost JVM Package</name>
<description>JVM Package for XGBoost</description>

View File

@@ -6,10 +6,10 @@
<parent>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</parent>
<artifactId>xgboost4j-example_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
<packaging>jar</packaging>
<build>
<plugins>
@@ -26,7 +26,7 @@
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-spark_${scala.binary.version}</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -37,7 +37,7 @@
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-flink_${scala.binary.version}</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@@ -6,10 +6,10 @@
<parent>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</parent>
<artifactId>xgboost4j-flink_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
<build>
<plugins>
<plugin>
@@ -26,7 +26,7 @@
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j_${scala.binary.version}</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@@ -6,10 +6,10 @@
<parent>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</parent>
<artifactId>xgboost4j-gpu_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
<packaging>jar</packaging>
<dependencies>
@@ -20,11 +20,6 @@
<classifier>${cudf.classifier}</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 by Contributors
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,15 +16,7 @@
package ml.dmlc.xgboost4j.gpu.java;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
/**
* Cudf utilities to build cuda array interface against {@link CudfColumn}
@@ -42,58 +34,64 @@ class CudfUtils {
// Helper class to build array interface string
private static class Builder {
private JsonNodeFactory nodeFactory = new JsonNodeFactory(false);
private ArrayNode rootArrayNode = nodeFactory.arrayNode();
private ArrayList<String> colArrayInterfaces = new ArrayList<String>();
private Builder add(CudfColumn... columns) {
if (columns == null || columns.length <= 0) {
throw new IllegalArgumentException("At least one ColumnData is required.");
}
for (CudfColumn cd : columns) {
rootArrayNode.add(buildColumnObject(cd));
colArrayInterfaces.add(buildColumnObject(cd));
}
return this;
}
private String build() {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
JsonGenerator jsonGen = new JsonFactory().createGenerator(bos);
new ObjectMapper().writeTree(jsonGen, rootArrayNode);
return bos.toString();
} catch (IOException ie) {
ie.printStackTrace();
throw new RuntimeException("Failed to build array interface. Error: " + ie);
StringBuilder builder = new StringBuilder();
builder.append("[");
for (int i = 0; i < colArrayInterfaces.size(); i++) {
builder.append(colArrayInterfaces.get(i));
if (i != colArrayInterfaces.size() - 1) {
builder.append(",");
}
}
builder.append("]");
return builder.toString();
}
private ObjectNode buildColumnObject(CudfColumn column) {
/** build the whole column information including data and valid info */
private String buildColumnObject(CudfColumn column) {
if (column.getDataPtr() == 0) {
throw new IllegalArgumentException("Empty column data is NOT accepted!");
}
if (column.getTypeStr() == null || column.getTypeStr().isEmpty()) {
throw new IllegalArgumentException("Empty type string is NOT accepted!");
}
ObjectNode colDataObj = buildMetaObject(column.getDataPtr(), column.getShape(),
column.getTypeStr());
StringBuilder builder = new StringBuilder();
String colData = buildMetaObject(column.getDataPtr(), column.getShape(),
column.getTypeStr());
builder.append("{");
builder.append(colData);
if (column.getValidPtr() != 0 && column.getNullCount() != 0) {
ObjectNode validObj = buildMetaObject(column.getValidPtr(), column.getShape(), "<t1");
colDataObj.set("mask", validObj);
String validString = buildMetaObject(column.getValidPtr(), column.getShape(), "<t1");
builder.append(",\"mask\":");
builder.append("{");
builder.append(validString);
builder.append("}");
}
return colDataObj;
builder.append("}");
return builder.toString();
}
private ObjectNode buildMetaObject(long ptr, long shape, final String typeStr) {
ObjectNode objNode = nodeFactory.objectNode();
ArrayNode shapeNode = objNode.putArray("shape");
shapeNode.add(shape);
ArrayNode dataNode = objNode.putArray("data");
dataNode.add(ptr)
.add(false);
objNode.put("typestr", typeStr)
.put("version", 1);
return objNode;
/** build the base information of a column */
private String buildMetaObject(long ptr, long shape, final String typeStr) {
StringBuilder builder = new StringBuilder();
builder.append("\"shape\":[" + shape + "],");
builder.append("\"data\":[" + ptr + "," + "false" + "],");
builder.append("\"typestr\":\"" + typeStr + "\",");
builder.append("\"version\":" + 1);
return builder.toString();
}
}

View File

@@ -69,7 +69,7 @@ public class BoosterTest {
.hasHeader().build();
int maxBin = 16;
int round = 100;
int round = 10;
//set params
Map<String, Object> paramMap = new HashMap<String, Object>() {
{

View File

@@ -6,7 +6,7 @@
<parent>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</parent>
<artifactId>xgboost4j-spark-gpu_2.12</artifactId>
<build>
@@ -24,7 +24,7 @@
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>

View File

@@ -56,18 +56,20 @@ class GpuPreXGBoost extends PreXGBoostProvider {
}
/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
}
@@ -116,19 +118,21 @@ object GpuPreXGBoost extends PreXGBoostProvider {
}
/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
estimator match {
@@ -166,7 +170,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
xgbExecParams: XGBoostExecutionParams =>
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
xgbExecParams.cacheTrainingSet)
(buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
}
/**
@@ -403,14 +407,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {
}
private def repartitionInputData(dataFrame: DataFrame, nWorkers: Int): DataFrame = {
// We can't check dataFrame.rdd.getNumPartitions == nWorkers here, since dataFrame.rdd is
// a lazy variable. If we call it here, we will not directly extract RDD[Table] again,
// instead, we will involve Columnar -> Row -> Columnar and decrease the performance
if (nWorkers == 1) {
dataFrame.coalesce(1)
} else {
dataFrame.repartition(nWorkers)
}
// we can't involve any coalesce operation here, since Barrier mode will check
// the RDD patterns which does not allow coalesce.
dataFrame.repartition(nWorkers)
}
private def repartitionForGroup(
@@ -448,7 +447,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
private def buildRDDWatches(
dataMap: Map[String, ColumnDataBatch],
xgbExeParams: XGBoostExecutionParams,
noEvalSet: Boolean): RDD[Watches] = {
noEvalSet: Boolean): RDD[() => Watches] = {
val sc = dataMap(TRAIN_NAME).rawDF.sparkSession.sparkContext
val maxBin = xgbExeParams.toMap.getOrElse("max_bin", 256).asInstanceOf[Int]
@@ -459,7 +458,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
GpuUtils.toColumnarRdd(dataMap(TRAIN_NAME).rawDF).mapPartitions({
iter =>
val iterColBatch = iter.map(table => new GpuColumnBatch(table, null))
Iterator(buildWatches(
Iterator(() => buildWatches(
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
colIndicesForTrain, iterColBatch, maxBin))
})
@@ -469,7 +468,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
val nameAndColIndices = dataMap.map(nc => (nc._1, nc._2.colIndices))
coPartitionForGpu(dataMap, sc, xgbExeParams.numWorkers).mapPartitions {
nameAndColumnBatchIter =>
Iterator(buildWatchesWithEval(
Iterator(() => buildWatchesWithEval(
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
nameAndColIndices, nameAndColumnBatchIter, maxBin))
}

View File

@@ -112,7 +112,7 @@ private[spark] object GpuUtils {
val msg = if (fitting) "train" else "transform"
// feature columns
require(featureNames.nonEmpty, s"Gpu $msg requires features columns. " +
"please refer to setFeaturesCols!")
"please refer to `setFeaturesCol(value: Array[String])`!")
featureNames.foreach(fn => checkNumericType(schema, fn))
if (fitting) {
require(labelName.nonEmpty, "label column is not set.")

View File

@@ -39,13 +39,8 @@ trait GpuTestSuite extends FunSuite with TmpFolderSuite {
def enableCsvConf(): SparkConf = {
new SparkConf()
.set(RapidsConf.ENABLE_READ_CSV_DATES.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_BYTES.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_SHORTS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_INTEGERS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_LONGS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
.set("spark.rapids.sql.csv.read.float.enabled", "true")
.set("spark.rapids.sql.csv.read.double.enabled", "true")
}
def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
@@ -246,12 +241,13 @@ object SparkSessionHolder extends Logging {
Locale.setDefault(Locale.US)
val builder = SparkSession.builder()
.master("local[1]")
.master("local[2]")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.rapids.sql.enabled", "false")
.config("spark.rapids.sql.test.enabled", "false")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
.config("spark.sql.files.maxPartitionBytes", "1000")
.appName("XGBoost4j-Spark-Gpu unit test")
builder.getOrCreate()

View File

@@ -126,7 +126,7 @@ class GpuXGBoostClassifierSuite extends GpuTestSuite {
val vectorAssembler = new VectorAssembler()
.setHandleInvalid("keep")
.setInputCols(featureNames.toArray)
.setInputCols(featureNames)
.setOutputCol("features")
val trainingDf = vectorAssembler.transform(rawInput).select("features", labelName)
@@ -147,12 +147,12 @@ class GpuXGBoostClassifierSuite extends GpuTestSuite {
.csv(dataPath).randomSplit(Array(0.7, 0.3), seed = 1)
// Since CPU model does not know the information about the features cols that GPU transform
// pipeline requires. End user needs to setFeaturesCols in the model manually
val thrown = intercept[IllegalArgumentException](cpuModel
// pipeline requires. End user needs to setFeaturesCol(features: Array[String]) in the model
// manually
val thrown = intercept[NoSuchElementException](cpuModel
.transform(testDf)
.collect())
assert(thrown.getMessage.contains("Gpu transform requires features columns. " +
"please refer to setFeaturesCols"))
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
val left = cpuModel
.setFeaturesCol(featureNames)
@@ -195,17 +195,16 @@ class GpuXGBoostClassifierSuite extends GpuTestSuite {
val featureColName = "feature_col"
val vectorAssembler = new VectorAssembler()
.setHandleInvalid("keep")
.setInputCols(featureNames.toArray)
.setInputCols(featureNames)
.setOutputCol(featureColName)
val testDf = vectorAssembler.transform(rawInput).select(featureColName, labelName)
// Since GPU model does not know the information about the features col name that CPU
// transform pipeline requires. End user needs to setFeaturesCol in the model manually
val thrown = intercept[IllegalArgumentException](
intercept[IllegalArgumentException](
gpuModel
.transform(testDf)
.collect())
assert(thrown.getMessage.contains("features does not exist"))
val left = gpuModel
.setFeaturesCol(featureColName)

View File

@@ -108,12 +108,15 @@ class GpuXGBoostGeneralSuite extends GpuTestSuite {
val trainingDf = trainingData.toDF(allColumnNames: _*)
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
"num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "gpu_hist")
val thrown = intercept[IllegalArgumentException] {
// GPU train requires featuresCols. If not specified,
// then NoSuchElementException will be thrown
val thrown = intercept[NoSuchElementException] {
new XGBoostClassifier(xgbParam)
.setLabelCol(labelName)
.fit(trainingDf)
}
assert(thrown.getMessage.contains("Gpu train requires features columns."))
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
val thrown1 = intercept[IllegalArgumentException] {
new XGBoostClassifier(xgbParam)

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 by Contributors
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -86,7 +86,7 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
.csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
val classifier = new XGBoostRegressor(xgbParam)
.setFeaturesCols(featureNames)
.setFeaturesCol(featureNames)
.setLabelCol(labelName)
.setTreeMethod("gpu_hist")
(classifier.fit(rawInput), testDf)
@@ -122,7 +122,7 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
val vectorAssembler = new VectorAssembler()
.setHandleInvalid("keep")
.setInputCols(featureNames.toArray)
.setInputCols(featureNames)
.setOutputCol("features")
val trainingDf = vectorAssembler.transform(rawInput).select("features", labelName)
@@ -143,20 +143,20 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
.csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
// Since CPU model does not know the information about the features cols that GPU transform
// pipeline requires. End user needs to setFeaturesCols in the model manually
val thrown = intercept[IllegalArgumentException](cpuModel
// pipeline requires. End user needs to setFeaturesCol(features: Array[String]) in the model
// manually
val thrown = intercept[NoSuchElementException](cpuModel
.transform(testDf)
.collect())
assert(thrown.getMessage.contains("Gpu transform requires features columns. " +
"please refer to setFeaturesCols"))
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
val left = cpuModel
.setFeaturesCols(featureNames)
.setFeaturesCol(featureNames)
.transform(testDf)
.collect()
val right = cpuModelFromFile
.setFeaturesCols(featureNames)
.setFeaturesCol(featureNames)
.transform(testDf)
.collect()
@@ -173,7 +173,7 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
.csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
val classifier = new XGBoostRegressor(xgbParam)
.setFeaturesCols(featureNames)
.setFeaturesCol(featureNames)
.setLabelCol(labelName)
.setTreeMethod("gpu_hist")
classifier.fit(rawInput)
@@ -191,17 +191,16 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
val featureColName = "feature_col"
val vectorAssembler = new VectorAssembler()
.setHandleInvalid("keep")
.setInputCols(featureNames.toArray)
.setInputCols(featureNames)
.setOutputCol(featureColName)
val testDf = vectorAssembler.transform(rawInput).select(featureColName, labelName)
// Since GPU model does not know the information about the features col name that CPU
// transform pipeline requires. End user needs to setFeaturesCol in the model manually
val thrown = intercept[IllegalArgumentException](
intercept[IllegalArgumentException](
gpuModel
.transform(testDf)
.collect())
assert(thrown.getMessage.contains("features does not exist"))
val left = gpuModel
.setFeaturesCol(featureColName)

View File

@@ -6,7 +6,7 @@
<parent>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</parent>
<artifactId>xgboost4j-spark_2.12</artifactId>
<build>
@@ -24,7 +24,7 @@
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j_${scala.binary.version}</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 by Contributors
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -35,8 +35,10 @@ import org.apache.commons.logging.LogFactory
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Estimator, Model, PipelineStage}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.xgboost.XGBoostSchemaUtils
import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
@@ -94,25 +96,27 @@ object PreXGBoost extends PreXGBoostProvider {
}
/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input
* Option[RDD[_]\] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
params: Map[String, Any]): XGBoostExecutionParams =>
(Boolean, RDD[() => Watches], Option[RDD[_]]) = {
if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
}
val (packedParams, evalSet) = estimator match {
val (packedParams, evalSet, xgbInput) = estimator match {
case est: XGBoostEstimatorCommon =>
// get weight column, if weight is not defined, default to lit(1.0)
val weight = if (!est.isDefined(est.weightCol) || est.getWeightCol.isEmpty) {
@@ -136,20 +140,28 @@ object PreXGBoost extends PreXGBoostProvider {
}
(PackedParams(col(est.getLabelCol), col(est.getFeaturesCol), weight, baseMargin, group,
est.getNumWorkers, est.needDeterministicRepartitioning), est.getEvalSets(params))
val (xgbInput, featuresName) = est.vectorize(dataset)
val evalSets = est.getEvalSets(params).transform((_, df) => {
val (dfTransformed, _) = est.vectorize(df)
dfTransformed
})
(PackedParams(col(est.getLabelCol), col(featuresName), weight, baseMargin, group,
est.getNumWorkers, est.needDeterministicRepartitioning), evalSets, xgbInput)
case _ => throw new RuntimeException("Unsupporting " + estimator)
}
// transform the training Dataset[_] to RDD[XGBLabeledPoint]
val trainingSet: RDD[XGBLabeledPoint] = DataUtils.convertDataFrameToXGBLabeledPointRDDs(
packedParams, dataset.asInstanceOf[DataFrame]).head
packedParams, xgbInput.asInstanceOf[DataFrame]).head
// transform the eval Dataset[_] to RDD[XGBLabeledPoint]
val evalRDDMap = evalSet.map {
case (name, dataFrame) => (name,
DataUtils.convertDataFrameToXGBLabeledPointRDDs(packedParams, dataFrame).head)
DataUtils.convertDataFrameToXGBLabeledPointRDDs(packedParams,
dataFrame.asInstanceOf[DataFrame]).head)
}
val hasGroup = packedParams.group.map(_ != defaultGroupColumn).getOrElse(false)
@@ -160,12 +172,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}
}
@@ -184,11 +196,11 @@ object PreXGBoost extends PreXGBoostProvider {
}
/** get the necessary parameters */
val (booster, inferBatchSize, featuresCol, useExternalMemory, missing, allowNonZeroForMissing,
predictFunc, schema) =
val (booster, inferBatchSize, xgbInput, featuresCol, useExternalMemory, missing,
allowNonZeroForMissing, predictFunc, schema) =
model match {
case m: XGBoostClassificationModel =>
val (xgbInput, featuresName) = m.vectorize(dataset)
// predict and turn to Row
val predictFunc =
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
@@ -199,7 +211,7 @@ object PreXGBoost extends PreXGBoostProvider {
}
// prepare the final Schema
var schema = StructType(dataset.schema.fields ++
var schema = StructType(xgbInput.schema.fields ++
Seq(StructField(name = XGBoostClassificationModel._rawPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false)) ++
Seq(StructField(name = XGBoostClassificationModel._probabilityCol, dataType =
@@ -214,11 +226,12 @@ object PreXGBoost extends PreXGBoostProvider {
ArrayType(FloatType, containsNull = false), nullable = false))
}
(m._booster, m.getInferBatchSize, m.getFeaturesCol, m.getUseExternalMemory, m.getMissing,
m.getAllowNonZeroForMissingValue, predictFunc, schema)
(m._booster, m.getInferBatchSize, xgbInput, featuresName, m.getUseExternalMemory,
m.getMissing, m.getAllowNonZeroForMissingValue, predictFunc, schema)
case m: XGBoostRegressionModel =>
// predict and turn to Row
val (xgbInput, featuresName) = m.vectorize(dataset)
val predictFunc =
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, predLeafItr, predContribItr) =
@@ -227,7 +240,7 @@ object PreXGBoost extends PreXGBoostProvider {
}
// prepare the final Schema
var schema = StructType(dataset.schema.fields ++
var schema = StructType(xgbInput.schema.fields ++
Seq(StructField(name = XGBoostRegressionModel._originalPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false)))
@@ -240,14 +253,14 @@ object PreXGBoost extends PreXGBoostProvider {
ArrayType(FloatType, containsNull = false), nullable = false))
}
(m._booster, m.getInferBatchSize, m.getFeaturesCol, m.getUseExternalMemory, m.getMissing,
m.getAllowNonZeroForMissingValue, predictFunc, schema)
(m._booster, m.getInferBatchSize, xgbInput, featuresName, m.getUseExternalMemory,
m.getMissing, m.getAllowNonZeroForMissingValue, predictFunc, schema)
}
val bBooster = dataset.sparkSession.sparkContext.broadcast(booster)
val appName = dataset.sparkSession.sparkContext.appName
val bBooster = xgbInput.sparkSession.sparkContext.broadcast(booster)
val appName = xgbInput.sparkSession.sparkContext.appName
val resultRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
val resultRDD = xgbInput.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
new AbstractIterator[Row] {
private var batchCnt = 0
@@ -295,22 +308,23 @@ object PreXGBoost extends PreXGBoostProvider {
}
bBooster.unpersist(blocking = false)
dataset.sparkSession.createDataFrame(resultRDD, schema)
xgbInput.sparkSession.createDataFrame(resultRDD, schema)
}
/**
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[Watches]
* Converting the RDD[XGBLabeledPoint] to the function to build RDD[() => Watches]
*
* @param trainingSet the input training RDD[XGBLabeledPoint]
* @param evalRDDMap the eval set
* @param hasGroup if has group
* @return function to build (RDD[Watches], the cached RDD)
* @return function to build (RDD[() => Watches], the cached RDD)
*/
private[spark] def buildRDDLabeledPointToRDDWatches(
trainingSet: RDD[XGBLabeledPoint],
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
hasGroup: Boolean = false): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]) = {
hasGroup: Boolean = false):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
xgbExecParams: XGBoostExecutionParams =>
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
@@ -318,12 +332,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}
}
@@ -363,34 +377,34 @@ object PreXGBoost extends PreXGBoostProvider {
}
/**
* Build RDD[Watches] for Ranking
* Build RDD[() => Watches] for Ranking
* @param trainingData the training data RDD
* @param xgbExecutionParams xgboost execution params
* @param evalSetsMap the eval RDD
* @return RDD[Watches]
* @return RDD[() => Watches]
*/
private def trainForRanking(
trainingData: RDD[Array[XGBLabeledPoint]],
xgbExecutionParam: XGBoostExecutionParams,
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
if (evalSetsMap.isEmpty) {
trainingData.mapPartitions(labeledPointGroups => {
val watches = Watches.buildWatchesWithGroup(xgbExecutionParam,
val buildWatches = () => Watches.buildWatchesWithGroup(xgbExecutionParam,
DataUtils.processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing,
xgbExecutionParam.allowNonZeroForMissing),
getCacheDirName(xgbExecutionParam.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}).cache()
} else {
coPartitionGroupSets(trainingData, evalSetsMap, xgbExecutionParam.numWorkers).mapPartitions(
labeledPointGroupSets => {
val watches = Watches.buildWatchesWithGroup(
val buildWatches = () => Watches.buildWatchesWithGroup(
labeledPointGroupSets.map {
case (name, iter) => (name, DataUtils.processMissingValuesWithGroup(iter,
xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
},
getCacheDirName(xgbExecutionParam.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}).cache()
}
}
@@ -451,35 +465,35 @@ object PreXGBoost extends PreXGBoostProvider {
}
/**
* Build RDD[Watches] for Non-Ranking
* Build RDD[() => Watches] for Non-Ranking
* @param trainingData the training data RDD
* @param xgbExecutionParams xgboost execution params
* @param evalSetsMap the eval RDD
* @return RDD[Watches]
* @return RDD[() => Watches]
*/
private def trainForNonRanking(
trainingData: RDD[XGBLabeledPoint],
xgbExecutionParams: XGBoostExecutionParams,
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[Watches] = {
evalSetsMap: Map[String, RDD[XGBLabeledPoint]]): RDD[() => Watches] = {
if (evalSetsMap.isEmpty) {
trainingData.mapPartitions { labeledPoints => {
val watches = Watches.buildWatches(xgbExecutionParams,
val buildWatches = () => Watches.buildWatches(xgbExecutionParams,
DataUtils.processMissingValues(labeledPoints, xgbExecutionParams.missing,
xgbExecutionParams.allowNonZeroForMissing),
getCacheDirName(xgbExecutionParams.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}}.cache()
} else {
coPartitionNoGroupSets(trainingData, evalSetsMap, xgbExecutionParams.numWorkers).
mapPartitions {
nameAndLabeledPointSets =>
val watches = Watches.buildWatches(
val buildWatches = () => Watches.buildWatches(
nameAndLabeledPointSets.map {
case (name, iter) => (name, DataUtils.processMissingValues(iter,
xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
},
getCacheDirName(xgbExecutionParams.useExternalMemory))
Iterator.single(watches)
Iterator.single(buildWatches)
}.cache()
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 by Contributors
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -45,19 +45,21 @@ private[scala] trait PreXGBoostProvider {
def transformSchema(xgboostEstimator: XGBoostEstimatorCommon, schema: StructType): StructType
/**
* Convert the Dataset[_] to RDD[Watches] which will be fed to XGBoost
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[Watches]], Option[ RDD[_] ])
* RDD[Watches] will be used as the training input
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]])
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])
/**
* Transform Dataset

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014,2021 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.io.File
import scala.collection.mutable
import scala.util.Random
import scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.java.{IRabitTracker, Rabit, XGBoostError, RabitTracker => PyRabitTracker}
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
import ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams
@@ -30,8 +31,9 @@ import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.commons.io.FileUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.sql.SparkSession
/**
@@ -46,8 +48,14 @@ import org.apache.spark.sql.SparkSession
* the Python Rabit tracker (in dmlc_core), whereas the latter is implemented
* in Scala without Python components, and with full support of timeouts.
* The Scala implementation is currently experimental, use at your own risk.
*
* @param hostIp The Rabit Tracker host IP address which is only used for python implementation.
* This is only needed if the host IP cannot be automatically guessed.
* @param pythonExec The python executed path for Rabit Tracker,
* which is only used for python implementation.
*/
case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String )
case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String,
hostIp: String = "", pythonExec: String = "")
object TrackerConf {
def apply(): TrackerConf = TrackerConf(0L, "python")
@@ -73,8 +81,7 @@ private[scala] case class XGBoostExecutionParams(
earlyStoppingParams: XGBoostExecutionEarlyStoppingParams,
cacheTrainingSet: Boolean,
treeMethod: Option[String],
isLocal: Boolean,
killSparkContextOnWorkerFailure: Boolean) {
isLocal: Boolean) {
private var rawParamMap: Map[String, Any] = _
@@ -218,9 +225,6 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
val cacheTrainingSet = overridedParams.getOrElse("cache_training_set", false)
.asInstanceOf[Boolean]
val killSparkContext = overridedParams.getOrElse("kill_spark_context_on_worker_failure", true)
.asInstanceOf[Boolean]
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
missing, allowNonZeroForMissing, trackerConf,
timeoutRequestWorkers,
@@ -229,8 +233,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
xgbExecEarlyStoppingParams,
cacheTrainingSet,
treeMethod,
isLocal,
killSparkContext)
isLocal)
xgbExecParam.setRawParamMap(overridedParams)
xgbExecParam
}
@@ -277,13 +280,8 @@ object XGBoost extends Serializable {
}
}
private def buildDistributedBooster(
watches: Watches,
xgbExecutionParam: XGBoostExecutionParams,
rabitEnv: java.util.Map[String, String],
obj: ObjectiveTrait,
eval: EvalTrait,
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
private def buildWatchesAndCheck(buildWatchesFun: () => Watches): Watches = {
val watches = buildWatchesFun()
// to workaround the empty partitions in training dataset,
// this might not be the best efficient implementation, see
// (https://github.com/dmlc/xgboost/issues/1277)
@@ -292,14 +290,39 @@ object XGBoost extends Serializable {
s"detected an empty partition in the training data, partition ID:" +
s" ${TaskContext.getPartitionId()}")
}
watches
}
private def buildDistributedBooster(
buildDMatrixInRabit: Boolean,
buildWatches: () => Watches,
xgbExecutionParam: XGBoostExecutionParams,
rabitEnv: java.util.Map[String, String],
obj: ObjectiveTrait,
eval: EvalTrait,
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {
var watches: Watches = null
if (!buildDMatrixInRabit) {
// for CPU pipeline, we need to build DMatrix out of rabit context
watches = buildWatchesAndCheck(buildWatches)
}
val taskId = TaskContext.getPartitionId().toString
val attempt = TaskContext.get().attemptNumber.toString
rabitEnv.put("DMLC_TASK_ID", taskId)
rabitEnv.put("DMLC_NUM_ATTEMPT", attempt)
val numRounds = xgbExecutionParam.numRounds
val makeCheckpoint = xgbExecutionParam.checkpointParam.isDefined && taskId.toInt == 0
try {
Rabit.init(rabitEnv)
if (buildDMatrixInRabit) {
// for GPU pipeline, we need to move dmatrix building into rabit context
watches = buildWatchesAndCheck(buildWatches)
}
val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
val externalCheckpointParams = xgbExecutionParam.checkpointParam
@@ -325,24 +348,33 @@ object XGBoost extends Serializable {
watches.toMap, metrics, obj, eval,
earlyStoppingRound = numEarlyStoppingRounds, prevBooster)
}
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
if (TaskContext.get().partitionId() == 0) {
Iterator(booster -> watches.toMap.keys.zip(metrics).toMap)
} else {
Iterator.empty
}
} catch {
case xgbException: XGBoostError =>
logger.error(s"XGBooster worker $taskId has failed $attempt times due to ", xgbException)
throw xgbException
} finally {
Rabit.shutdown()
watches.delete()
if (watches != null) watches.delete()
}
}
private def startTracker(nWorkers: Int, trackerConf: TrackerConf): IRabitTracker = {
/** visiable for testing */
private[scala] def getTracker(nWorkers: Int, trackerConf: TrackerConf): IRabitTracker = {
val tracker: IRabitTracker = trackerConf.trackerImpl match {
case "scala" => new RabitTracker(nWorkers)
case "python" => new PyRabitTracker(nWorkers)
case "python" => new PyRabitTracker(nWorkers, trackerConf.hostIp, trackerConf.pythonExec)
case _ => new PyRabitTracker(nWorkers)
}
tracker
}
private def startTracker(nWorkers: Int, trackerConf: TrackerConf): IRabitTracker = {
val tracker = getTracker(nWorkers, trackerConf)
require(tracker.start(trackerConf.workerConnectionTimeout), "FAULT: Failed to start tracker")
tracker
}
@@ -353,7 +385,7 @@ object XGBoost extends Serializable {
@throws(classOf[XGBoostError])
private[spark] def trainDistributed(
sc: SparkContext,
buildTrainingData: XGBoostExecutionParams => (RDD[Watches], Option[RDD[_]]),
buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]),
params: Map[String, Any]):
(Booster, Map[String, Array[Float]]) = {
@@ -372,50 +404,36 @@ object XGBoost extends Serializable {
}.orNull
// Get the training data RDD and the cachedRDD
val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
try {
// Train for every ${savingRound} rounds and save the partially completed booster
val tracker = startTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
val (booster, metrics) = try {
val parallelismTracker = new SparkParallelismTracker(sc,
xgbExecParams.timeoutRequestWorkers,
xgbExecParams.numWorkers,
xgbExecParams.killSparkContextOnWorkerFailure)
tracker.getWorkerEnvs().putAll(xgbRabitParams)
val rabitEnv = tracker.getWorkerEnvs
val boostersAndMetrics = trainingRDD.mapPartitions { iter => {
var optionWatches: Option[Watches] = None
val boostersAndMetrics = trainingRDD.barrier().mapPartitions { iter => {
var optionWatches: Option[() => Watches] = None
// take the first Watches to train
if (iter.hasNext) {
optionWatches = Some(iter.next())
}
optionWatches.map { watches => buildDistributedBooster(watches, xgbExecParams, rabitEnv,
xgbExecParams.obj, xgbExecParams.eval, prevBooster)}
optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit,
buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj,
xgbExecParams.eval, prevBooster)}
.getOrElse(throw new RuntimeException("No Watches to train"))
}}.cache()
val sparkJobThread = new Thread() {
override def run() {
// force the job
boostersAndMetrics.foreachPartition(() => _)
}
}
sparkJobThread.setUncaughtExceptionHandler(tracker)
val trackerReturnVal = parallelismTracker.execute {
sparkJobThread.start()
tracker.waitFor(0L)
}
}}
val (booster, metrics) = boostersAndMetrics.collect()(0)
val trackerReturnVal = tracker.waitFor(0L)
logger.info(s"Rabit returns with exit code $trackerReturnVal")
val (booster, metrics) = postTrackerReturnProcessing(trackerReturnVal,
boostersAndMetrics, sparkJobThread)
if (trackerReturnVal != 0) {
throw new XGBoostError("XGBoostModel training failed.")
}
(booster, metrics)
} finally {
tracker.stop()
@@ -435,42 +453,12 @@ object XGBoost extends Serializable {
case t: Throwable =>
// if the job was aborted due to an exception
logger.error("the job was aborted due to ", t)
if (xgbExecParams.killSparkContextOnWorkerFailure) {
sc.stop()
}
throw t
} finally {
optionalCachedRDD.foreach(_.unpersist())
}
}
private def postTrackerReturnProcessing(
trackerReturnVal: Int,
distributedBoostersAndMetrics: RDD[(Booster, Map[String, Array[Float]])],
sparkJobThread: Thread): (Booster, Map[String, Array[Float]]) = {
if (trackerReturnVal == 0) {
// Copies of the final booster and the corresponding metrics
// reside in each partition of the `distributedBoostersAndMetrics`.
// Any of them can be used to create the model.
// it's safe to block here forever, as the tracker has returned successfully, and the Spark
// job should have finished, there is no reason for the thread cannot return
sparkJobThread.join()
val (booster, metrics) = distributedBoostersAndMetrics.first()
distributedBoostersAndMetrics.unpersist(false)
(booster, metrics)
} else {
try {
if (sparkJobThread.isAlive) {
sparkJobThread.interrupt()
}
} catch {
case _: InterruptedException =>
logger.info("spark job thread is interrupted")
}
throw new XGBoostError("XGBoostModel training failed")
}
}
}
class Watches private[scala] (

View File

@@ -144,13 +144,6 @@ class XGBoostClassifier (
def setSinglePrecisionHistogram(value: Boolean): this.type =
set(singlePrecisionHistogram, value)
/**
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
* all feature columns must be numeric types.
*/
def setFeaturesCol(value: Array[String]): this.type =
set(featuresCols, value)
// called at the start of fit/train when 'eval_metric' is not defined
private def setupDefaultEvalMetric(): String = {
require(isDefined(objective), "Users must set \'objective\' via xgboostParams.")
@@ -165,7 +158,12 @@ class XGBoostClassifier (
// Callback from PreXGBoost
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
super.transformSchema(schema)
if (isFeaturesColSet(schema)) {
// User has vectorized the features into VectorUDT.
super.transformSchema(schema)
} else {
transformSchemaWithFeaturesCols(true, schema)
}
}
override def transformSchema(schema: StructType): StructType = {
@@ -260,13 +258,6 @@ class XGBoostClassificationModel private[ml](
def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
/**
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
* all feature columns must be numeric types.
*/
def setFeaturesCol(value: Array[String]): this.type =
set(featuresCols, value)
/**
* Single instance prediction.
* Note: The performance is not ideal, use it carefully!
@@ -359,7 +350,12 @@ class XGBoostClassificationModel private[ml](
}
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
super.transformSchema(schema)
if (isFeaturesColSet(schema)) {
// User has vectorized the features into VectorUDT.
super.transformSchema(schema)
} else {
transformSchemaWithFeaturesCols(false, schema)
}
}
override def transformSchema(schema: StructType): StructType = {
@@ -385,8 +381,6 @@ class XGBoostClassificationModel private[ml](
Vectors.dense(rawPredictions)
}
if ($(rawPredictionCol).nonEmpty) {
outputData = outputData
.withColumn(getRawPredictionCol, rawPredictionUDF(col(_rawPredictionCol)))

View File

@@ -146,13 +146,6 @@ class XGBoostRegressor (
def setSinglePrecisionHistogram(value: Boolean): this.type =
set(singlePrecisionHistogram, value)
/**
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
* all feature columns must be numeric types.
*/
def setFeaturesCols(value: Array[String]): this.type =
set(featuresCols, value)
// called at the start of fit/train when 'eval_metric' is not defined
private def setupDefaultEvalMetric(): String = {
require(isDefined(objective), "Users must set \'objective\' via xgboostParams.")
@@ -164,7 +157,12 @@ class XGBoostRegressor (
}
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
super.transformSchema(schema)
if (isFeaturesColSet(schema)) {
// User has vectorized the features into VectorUDT.
super.transformSchema(schema)
} else {
transformSchemaWithFeaturesCols(false, schema)
}
}
override def transformSchema(schema: StructType): StructType = {
@@ -253,13 +251,6 @@ class XGBoostRegressionModel private[ml] (
def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
/**
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
* all feature columns must be numeric types.
*/
def setFeaturesCols(value: Array[String]): this.type =
set(featuresCols, value)
/**
* Single instance prediction.
* Note: The performance is not ideal, use it carefully!
@@ -331,7 +322,12 @@ class XGBoostRegressionModel private[ml] (
}
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
super.transformSchema(schema)
if (isFeaturesColSet(schema)) {
// User has vectorized the features into VectorUDT.
super.transformSchema(schema)
} else {
transformSchemaWithFeaturesCols(false, schema)
}
}
override def transformSchema(schema: StructType): StructType = {

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,18 +16,22 @@
package ml.dmlc.xgboost4j.scala.spark.params
import ml.dmlc.xgboost4j.scala.spark
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.Path
import org.json4s.{DefaultFormats, JValue}
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.apache.spark.SparkContext
import org.apache.spark.ml.param.{Param, Params}
import org.apache.spark.ml.param.Params
import org.apache.spark.ml.util.MLReader
// This originates from apache-spark DefaultPramsReader copy paste
private[spark] object DefaultXGBoostParamsReader {
private val logger = LogFactory.getLog("XGBoostSpark")
private val paramNameCompatibilityMap: Map[String, String] = Map("silent" -> "verbosity")
private val paramValueCompatibilityMap: Map[String, Map[Any, Any]] =
@@ -126,9 +130,16 @@ private[spark] object DefaultXGBoostParamsReader {
metadata.params match {
case JObject(pairs) =>
pairs.foreach { case (paramName, jsonValue) =>
val param = instance.getParam(handleBrokenlyChangedName(paramName))
val value = param.jsonDecode(compact(render(jsonValue)))
instance.set(param, handleBrokenlyChangedValue(paramName, value))
val finalName = handleBrokenlyChangedName(paramName)
// For the deleted parameters, we'd better to remove it instead of throwing an exception.
// So we need to check if the parameter exists instead of blindly setting it.
if (instance.hasParam(finalName)) {
val param = instance.getParam(finalName)
val value = param.jsonDecode(compact(render(jsonValue)))
instance.set(param, handleBrokenlyChangedValue(paramName, value))
} else {
logger.warn(s"$finalName is no longer used in ${spark.VERSION}")
}
}
case _ =>
throw new IllegalArgumentException(

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014,2021 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -247,6 +247,27 @@ trait HasNumClass extends Params {
final def getNumClass: Int = $(numClass)
}
/**
* Trait for shared param featuresCols.
*/
trait HasFeaturesCols extends Params {
/**
* Param for the names of feature columns.
* @group param
*/
final val featuresCols: StringArrayParam = new StringArrayParam(this, "featuresCols",
"an array of feature column names.")
/** @group getParam */
final def getFeaturesCols: Array[String] = $(featuresCols)
/** Check if featuresCols is valid */
def isFeaturesColsValid: Boolean = {
isDefined(featuresCols) && $(featuresCols) != Array.empty
}
}
private[spark] trait ParamMapFuncs extends Params {
def XGBoost2MLlibParams(xgboostParams: Map[String, Any]): Unit = {

View File

@@ -1,34 +0,0 @@
/*
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.spark.params
import org.apache.spark.ml.param.{Params, StringArrayParam}
trait GpuParams extends Params {
/**
* Param for the names of feature columns for GPU pipeline.
* @group param
*/
final val featuresCols: StringArrayParam = new StringArrayParam(this, "featuresCols",
"an array of feature column names for GPU pipeline.")
setDefault(featuresCols, Array.empty[String])
/** @group getParam */
final def getFeaturesCols: Array[String] = $(featuresCols)
}

View File

@@ -105,14 +105,8 @@ private[spark] trait LearningTaskParams extends Params {
final def getMaximizeEvaluationMetrics: Boolean = $(maximizeEvaluationMetrics)
/**
* whether killing SparkContext when training task fails
*/
final val killSparkContextOnWorkerFailure = new BooleanParam(this,
"killSparkContextOnWorkerFailure", "whether killing SparkContext when training task fails")
setDefault(objective -> "reg:squarederror", baseScore -> 0.5, trainTestRatio -> 1.0,
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false, killSparkContextOnWorkerFailure -> true)
numEarlyStoppingRounds -> 0, cacheTrainingSet -> false)
}
private[spark] object LearningTaskParams {

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014,2021 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,16 +16,101 @@
package ml.dmlc.xgboost4j.scala.spark.params
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasWeightCol}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.xgboost.XGBoostSchemaUtils
import org.apache.spark.ml.param.{Param, ParamValidators}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol, HasWeightCol}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.StructType
private[scala] sealed trait XGBoostEstimatorCommon extends GeneralParams with LearningTaskParams
with BoosterParams with RabitParams with ParamMapFuncs with NonParamVariables with HasWeightCol
with HasBaseMarginCol with HasLeafPredictionCol with HasContribPredictionCol with HasFeaturesCol
with HasLabelCol with GpuParams {
with HasLabelCol with HasFeaturesCols with HasHandleInvalid {
def needDeterministicRepartitioning: Boolean = {
getCheckpointPath != null && getCheckpointPath.nonEmpty && getCheckpointInterval > 0
}
/**
* Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
* invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
* output). Column lengths are taken from the size of ML Attribute Group, which can be set using
* `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
* from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
* Default: "error"
* @group param
*/
override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
"""Param for how to handle invalid data (NULL and NaN values). Options are 'skip' (filter out
|rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN
|in the output). Column lengths are taken from the size of ML Attribute Group, which can be
|set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also
|be inferred from first rows of the data since it is safe to do so but only in case of 'error'
|or 'skip'.""".stripMargin.replaceAll("\n", " "),
ParamValidators.inArray(Array("skip", "error", "keep")))
setDefault(handleInvalid, "error")
/**
* Specify an array of feature column names which must be numeric types.
*/
def setFeaturesCol(value: Array[String]): this.type = set(featuresCols, value)
/** Set the handleInvalid for VectorAssembler */
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
/**
* Check if schema has a field named with the value of "featuresCol" param and it's data type
* must be VectorUDT
*/
def isFeaturesColSet(schema: StructType): Boolean = {
schema.fieldNames.contains(getFeaturesCol) &&
XGBoostSchemaUtils.isVectorUDFType(schema(getFeaturesCol).dataType)
}
/** check the features columns type */
def transformSchemaWithFeaturesCols(fit: Boolean, schema: StructType): StructType = {
if (isFeaturesColsValid) {
if (fit) {
XGBoostSchemaUtils.checkNumericType(schema, $(labelCol))
}
$(featuresCols).foreach(feature =>
XGBoostSchemaUtils.checkFeatureColumnType(schema(feature).dataType))
schema
} else {
throw new IllegalArgumentException("featuresCol or featuresCols must be specified")
}
}
/**
* Vectorize the features columns if necessary.
*
* @param input the input dataset
* @return (output dataset and the feature column name)
*/
def vectorize(input: Dataset[_]): (Dataset[_], String) = {
val schema = input.schema
if (isFeaturesColSet(schema)) {
// Dataset already has vectorized.
(input, getFeaturesCol)
} else if (isFeaturesColsValid) {
val featuresName = if (!schema.fieldNames.contains(getFeaturesCol)) {
getFeaturesCol
} else {
"features_" + uid
}
val vectorAssembler = new VectorAssembler()
.setHandleInvalid($(handleInvalid))
.setInputCols(getFeaturesCols)
.setOutputCol(featuresName)
(vectorAssembler.transform(input).select(featuresName, getLabelCol), featuresName)
} else {
// never reach here, since transformSchema will take care of the case
// that featuresCols is invalid
(input, getFeaturesCol)
}
}
}
private[scala] trait XGBoostClassifierParams extends XGBoostEstimatorCommon with HasNumClass

View File

@@ -1,175 +0,0 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.spark
import org.apache.commons.logging.LogFactory
import org.apache.spark.scheduler._
import scala.collection.mutable.{HashMap, HashSet}
/**
* A tracker that ensures enough number of executor cores are alive.
* Throws an exception when the number of alive cores is less than nWorkers.
*
* @param sc The SparkContext object
* @param timeout The maximum time to wait for enough number of workers.
* @param numWorkers nWorkers used in an XGBoost Job
* @param killSparkContextOnWorkerFailure kill SparkContext or not when task fails
*/
class SparkParallelismTracker(
val sc: SparkContext,
timeout: Long,
numWorkers: Int,
killSparkContextOnWorkerFailure: Boolean = true) {
private[this] val requestedCores = numWorkers * sc.conf.getInt("spark.task.cpus", 1)
private[this] val logger = LogFactory.getLog("XGBoostSpark")
private[this] def numAliveCores: Int = {
sc.statusStore.executorList(true).map(_.totalCores).sum
}
private[this] def waitForCondition(
condition: => Boolean,
timeout: Long,
checkInterval: Long = 100L) = {
val waitImpl = new ((Long, Boolean) => Boolean) {
override def apply(waitedTime: Long, status: Boolean): Boolean = status match {
case s if s => true
case _ => waitedTime match {
case t if t < timeout =>
Thread.sleep(checkInterval)
apply(t + checkInterval, status = condition)
case _ => false
}
}
}
waitImpl(0L, condition)
}
private[this] def safeExecute[T](body: => T): T = {
val listener = new TaskFailedListener(killSparkContextOnWorkerFailure)
sc.addSparkListener(listener)
try {
body
} finally {
sc.removeSparkListener(listener)
}
}
/**
* Execute a blocking function call with two checks on enough nWorkers:
* - Before the function starts, wait until there are enough executor cores.
* - During the execution, throws an exception if there is any executor lost.
*
* @param body A blocking function call
* @tparam T Return type
* @return The return of body
*/
def execute[T](body: => T): T = {
if (timeout <= 0) {
logger.info("starting training without setting timeout for waiting for resources")
safeExecute(body)
} else {
logger.info(s"starting training with timeout set as $timeout ms for waiting for resources")
if (!waitForCondition(numAliveCores >= requestedCores, timeout)) {
throw new IllegalStateException(s"Unable to get $requestedCores cores for XGBoost training")
}
safeExecute(body)
}
}
}
class TaskFailedListener(killSparkContext: Boolean = true) extends SparkListener {
private[this] val logger = LogFactory.getLog("XGBoostTaskFailedListener")
// {jobId, [stageId0, stageId1, ...] }
// keep track of the mapping of job id and stage ids
// when a task fails, find the job id and stage id the task belongs to, finally
// cancel the jobs
private val jobIdToStageIds: HashMap[Int, HashSet[Int]] = HashMap.empty
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (!killSparkContext) {
jobStart.stageIds.foreach(stageId => {
jobIdToStageIds.getOrElseUpdate(jobStart.jobId, new HashSet[Int]()) += stageId
})
}
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
if (!killSparkContext) {
jobIdToStageIds.remove(jobEnd.jobId)
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEnd.reason match {
case taskEndReason: TaskFailedReason =>
logger.error(s"Training Task Failed during XGBoost Training: " +
s"$taskEndReason")
if (killSparkContext) {
logger.error("killing SparkContext")
TaskFailedListener.startedSparkContextKiller()
} else {
val stageId = taskEnd.stageId
// find job ids according to stage id and then cancel the job
jobIdToStageIds.foreach {
case (jobId, stageIds) =>
if (stageIds.contains(stageId)) {
logger.error("Cancelling jobId:" + jobId)
jobIdToStageIds.remove(jobId)
SparkContext.getOrCreate().cancelJob(jobId)
}
}
}
case _ =>
}
}
}
object TaskFailedListener {
var killerStarted: Boolean = false
var sparkContextKiller: Thread = _
val sparkContextShutdownLock = new AnyRef
private def startedSparkContextKiller(): Unit = this.synchronized {
if (!killerStarted) {
killerStarted = true
// Spark does not allow ListenerThread to shutdown SparkContext so that we have to do it
// in a separate thread
sparkContextKiller = new Thread() {
override def run(): Unit = {
LiveListenerBus.withinListenerThread.withValue(false) {
sparkContextShutdownLock.synchronized {
SparkContext.getActive.foreach(_.stop())
killerStarted = false
sparkContextShutdownLock.notify()
}
}
}
}
sparkContextKiller.setDaemon(true)
sparkContextKiller.start()
}
}
}

View File

@@ -0,0 +1,51 @@
/*
Copyright (c) 2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.spark.ml.linalg.xgboost
import org.apache.spark.sql.types.{BooleanType, DataType, NumericType, StructType}
import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.ml.util.SchemaUtils
object XGBoostSchemaUtils {
/** check if the dataType is VectorUDT */
def isVectorUDFType(dataType: DataType): Boolean = {
dataType match {
case _: VectorUDT => true
case _ => false
}
}
/** The feature columns will be vectorized by VectorAssembler first, which only
* supports Numeric, Boolean and VectorUDT types */
def checkFeatureColumnType(dataType: DataType): Unit = {
dataType match {
case _: NumericType | BooleanType =>
case _: VectorUDT =>
case d => throw new UnsupportedOperationException(s"featuresCols only supports Numeric, " +
s"boolean and VectorUDT types, found: ${d}")
}
}
def checkNumericType(
schema: StructType,
colName: String,
msg: String = ""): Unit = {
SchemaUtils.checkNumericType(schema, colName, msg)
}
}

View File

@@ -1 +1 @@
log4j.logger.org.apache.spark=ERROR
log4j.logger.org.apache.spark=ERROR

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
import java.io.File
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, ExternalCheckpointManager, XGBoost => SXGBoost}
import org.scalatest.{FunSuite, Ignore}
import org.scalatest.FunSuite
import org.apache.hadoop.fs.{FileSystem, Path}
class ExternalCheckpointManagerSuite extends FunSuite with TmpFolderPerSuite with PerTest {

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,10 +16,8 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.XGBoostError
import org.apache.spark.Partitioner
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite
import org.apache.spark.sql.functions._
@@ -53,7 +51,7 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
"objective" -> "binary:logistic",
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
import DataUtils._
val sparkSession = SparkSession.builder().getOrCreate()
val sparkSession = ss
import sparkSession.implicits._
val repartitioned = sc.parallelize(Synthetic.trainWithDiffFeatureSize, 2)
.map(lp => (lp.label, lp)).partitionBy(

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,14 +16,14 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.XGBoostError
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.DataFrame
import org.scalatest.FunSuite
import scala.util.Random
import org.apache.spark.SparkException
class MissingValueHandlingSuite extends FunSuite with PerTest {
test("dense vectors containing missing value") {
def buildDenseDataFrame(): DataFrame = {
@@ -113,7 +113,7 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
val inputDF = vectorAssembler.transform(testDF).select("features", "label")
val paramMap = List("eta" -> "1", "max_depth" -> "2",
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
intercept[XGBoostError] {
intercept[SparkException] {
new XGBoostClassifier(paramMap).fit(inputDF)
}
}
@@ -140,7 +140,7 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
inputDF.show()
val paramMap = List("eta" -> "1", "max_depth" -> "2",
"objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
intercept[XGBoostError] {
intercept[SparkException] {
new XGBoostClassifier(paramMap).fit(inputDF)
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,9 +16,9 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.XGBoostError
import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.SparkException
import org.apache.spark.ml.param.ParamMap
class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
@@ -40,28 +40,16 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
assert(xgbCopy2.MLlib2XGBoostParams("eval_metric").toString === "logloss")
}
private def waitForSparkContextShutdown(): Unit = {
var totalWaitedTime = 0L
while (!ss.sparkContext.isStopped && totalWaitedTime <= 120000) {
Thread.sleep(10000)
totalWaitedTime += 10000
}
assert(ss.sparkContext.isStopped === true)
}
test("fail training elegantly with unsupported objective function") {
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "wrong_objective_function", "num_class" -> "6", "num_round" -> 5,
"num_workers" -> numWorkers)
val trainingDF = buildDataFrame(MultiClassification.train)
val xgb = new XGBoostClassifier(paramMap)
try {
val model = xgb.fit(trainingDF)
} catch {
case e: Throwable => // swallow anything
} finally {
waitForSparkContextShutdown()
intercept[SparkException] {
xgb.fit(trainingDF)
}
}
test("fail training elegantly with unsupported eval metrics") {
@@ -70,12 +58,8 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
"num_workers" -> numWorkers, "eval_metric" -> "wrong_eval_metrics")
val trainingDF = buildDataFrame(MultiClassification.train)
val xgb = new XGBoostClassifier(paramMap)
try {
val model = xgb.fit(trainingDF)
} catch {
case e: Throwable => // swallow anything
} finally {
waitForSparkContextShutdown()
intercept[SparkException] {
xgb.fit(trainingDF)
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package ml.dmlc.xgboost4j.scala.spark
import java.io.File
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.spark.{SparkConf, SparkContext, TaskFailedListener}
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.scalatest.{BeforeAndAfterEach, FunSuite}
@@ -40,32 +40,16 @@ trait PerTest extends BeforeAndAfterEach { self: FunSuite =>
.appName("XGBoostSuite")
.config("spark.ui.enabled", false)
.config("spark.driver.memory", "512m")
.config("spark.barrier.sync.timeout", 10)
.config("spark.task.cpus", 1)
override def beforeEach(): Unit = getOrCreateSession
override def afterEach() {
TaskFailedListener.sparkContextShutdownLock.synchronized {
if (currentSession != null) {
// this synchronization is mostly for the tests involving SparkContext shutdown
// for unit test involving the sparkContext shutdown there are two different events sequence
// 1. SparkContext killer is executed before afterEach, in this case, before SparkContext
// is fully stopped, afterEach() will block at the following code block
// 2. SparkContext killer is executed afterEach, in this case, currentSession.stop() in will
// block to wait for all msgs in ListenerBus get processed. Because currentSession.stop()
// has been called, SparkContext killer will not take effect
while (TaskFailedListener.killerStarted) {
TaskFailedListener.sparkContextShutdownLock.wait()
}
currentSession.stop()
cleanExternalCache(currentSession.sparkContext.appName)
currentSession = null
}
if (TaskFailedListener.sparkContextKiller != null) {
TaskFailedListener.sparkContextKiller.interrupt()
TaskFailedListener.sparkContextKiller = null
}
TaskFailedListener.killerStarted = false
if (currentSession != null) {
currentSession.stop()
cleanExternalCache(currentSession.sparkContext.appName)
currentSession = null
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014,2021 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -24,11 +24,61 @@ import ml.dmlc.xgboost4j.java.{Rabit, RabitTracker => PyRabitTracker}
import ml.dmlc.xgboost4j.scala.rabit.{RabitTracker => ScalaRabitTracker}
import ml.dmlc.xgboost4j.java.IRabitTracker.TrackerStatus
import ml.dmlc.xgboost4j.scala.DMatrix
import org.scalatest.{FunSuite, Ignore}
import org.scalatest.{FunSuite}
class RabitRobustnessSuite extends FunSuite with PerTest {
private def getXGBoostExecutionParams(paramMap: Map[String, Any]): XGBoostExecutionParams = {
val classifier = new XGBoostClassifier(paramMap)
val xgbParamsFactory = new XGBoostExecutionParamsFactory(classifier.MLlib2XGBoostParams, sc)
xgbParamsFactory.buildXGBRuntimeParams
}
test("Customize host ip and python exec for Rabit tracker") {
val hostIp = "192.168.22.111"
val pythonExec = "/usr/bin/python3"
val paramMap = Map(
"num_workers" -> numWorkers,
"tracker_conf" -> TrackerConf(0L, "python", hostIp))
val xgbExecParams = getXGBoostExecutionParams(paramMap)
val tracker = XGBoost.getTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
tracker match {
case pyTracker: PyRabitTracker =>
val cmd = pyTracker.getRabitTrackerCommand
assert(cmd.contains(hostIp))
assert(cmd.startsWith("python"))
case _ => assert(false, "expected python tracker implementation")
}
val paramMap1 = Map(
"num_workers" -> numWorkers,
"tracker_conf" -> TrackerConf(0L, "python", "", pythonExec))
val xgbExecParams1 = getXGBoostExecutionParams(paramMap1)
val tracker1 = XGBoost.getTracker(xgbExecParams1.numWorkers, xgbExecParams1.trackerConf)
tracker1 match {
case pyTracker: PyRabitTracker =>
val cmd = pyTracker.getRabitTrackerCommand
assert(cmd.startsWith(pythonExec))
assert(!cmd.contains(hostIp))
case _ => assert(false, "expected python tracker implementation")
}
val paramMap2 = Map(
"num_workers" -> numWorkers,
"tracker_conf" -> TrackerConf(0L, "python", hostIp, pythonExec))
val xgbExecParams2 = getXGBoostExecutionParams(paramMap2)
val tracker2 = XGBoost.getTracker(xgbExecParams2.numWorkers, xgbExecParams2.trackerConf)
tracker2 match {
case pyTracker: PyRabitTracker =>
val cmd = pyTracker.getRabitTrackerCommand
assert(cmd.startsWith(pythonExec))
assert(cmd.contains(s" --host-ip=${hostIp}"))
case _ => assert(false, "expected python tracker implementation")
}
}
test("training with Scala-implemented Rabit tracker") {
val eval = new EvalError()
val training = buildDataFrame(Classification.train)

View File

@@ -23,6 +23,7 @@ import org.apache.spark.sql._
import org.scalatest.FunSuite
import org.apache.spark.Partitioner
import org.apache.spark.ml.feature.VectorAssembler
class XGBoostClassifierSuite extends FunSuite with PerTest {
@@ -316,4 +317,78 @@ class XGBoostClassifierSuite extends FunSuite with PerTest {
xgb.fit(repartitioned)
}
test("featuresCols with features column can work") {
val spark = ss
import spark.implicits._
val xgbInput = Seq(
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
.toDF("f1", "f2", "f3", "features", "label")
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> 1)
val featuresName = Array("f1", "f2", "f3", "features")
val xgbClassifier = new XGBoostClassifier(paramMap)
.setFeaturesCol(featuresName)
.setLabelCol("label")
val model = xgbClassifier.fit(xgbInput)
assert(model.getFeaturesCols.sameElements(featuresName))
val df = model.transform(xgbInput)
assert(df.schema.fieldNames.contains("features_" + model.uid))
df.show()
val newFeatureName = "features_new"
// transform also can work for vectorized dataset
val vectorizedInput = new VectorAssembler()
.setInputCols(featuresName)
.setOutputCol(newFeatureName)
.transform(xgbInput)
.select(newFeatureName, "label")
val df1 = model
.setFeaturesCol(newFeatureName)
.transform(vectorizedInput)
assert(df1.schema.fieldNames.contains(newFeatureName))
df1.show()
}
test("featuresCols without features column can work") {
val spark = ss
import spark.implicits._
val xgbInput = Seq(
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
.toDF("f1", "f2", "f3", "f4", "label")
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> 1)
val featuresName = Array("f1", "f2", "f3", "f4")
val xgbClassifier = new XGBoostClassifier(paramMap)
.setFeaturesCol(featuresName)
.setLabelCol("label")
.setEvalSets(Map("eval" -> xgbInput))
val model = xgbClassifier.fit(xgbInput)
assert(model.getFeaturesCols.sameElements(featuresName))
// transform should work for the dataset which includes the feature column names.
val df = model.transform(xgbInput)
assert(df.schema.fieldNames.contains("features"))
df.show()
// transform also can work for vectorized dataset
val vectorizedInput = new VectorAssembler()
.setInputCols(featuresName)
.setOutputCol("features")
.transform(xgbInput)
.select("features", "label")
val df1 = model.transform(vectorizedInput)
df1.show()
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,10 +16,8 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.Rabit
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.scalatest.FunSuite

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,13 +16,12 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.XGBoostError
import scala.util.Random
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import ml.dmlc.xgboost4j.scala.DMatrix
import org.apache.spark.TaskContext
import org.apache.spark.{SparkException, TaskContext}
import org.scalatest.FunSuite
import org.apache.spark.ml.feature.VectorAssembler
@@ -375,13 +374,14 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
test("throw exception for empty partition in trainingset") {
val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "multi:softmax", "num_class" -> "2", "num_round" -> 5,
"num_workers" -> numWorkers, "tree_method" -> "auto")
"objective" -> "binary:logistic", "num_class" -> "2", "num_round" -> 5,
"num_workers" -> numWorkers, "tree_method" -> "auto", "allow_non_zero_for_missing" -> true)
// The Dmatrix will be empty
val trainingDF = buildDataFrame(Seq(XGBLabeledPoint(1.0f, 1, Array(), Array())))
val trainingDF = buildDataFrame(Seq(XGBLabeledPoint(1.0f, 4,
Array(0, 1, 2, 3), Array(0, 1, 2, 3))))
val xgb = new XGBoostClassifier(paramMap)
intercept[XGBoostError] {
val model = xgb.fit(trainingDF)
intercept[SparkException] {
xgb.fit(trainingDF)
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -16,14 +16,15 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.java.{Rabit, XGBoostError}
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
import org.apache.spark.TaskFailedListener
import org.apache.spark.SparkException
import ml.dmlc.xgboost4j.java.Rabit
import ml.dmlc.xgboost4j.scala.Booster
import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.scalatest.FunSuite
import org.apache.spark.SparkException
class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
val predictionErrorMin = 0.00001f
val maxFailure = 2;
@@ -33,15 +34,6 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
.config("spark.kryo.classesToRegister", classOf[Booster].getName)
.master(s"local[${numWorkers},${maxFailure}]")
private def waitAndCheckSparkShutdown(waitMiliSec: Int): Boolean = {
var totalWaitedTime = 0L
while (!ss.sparkContext.isStopped && totalWaitedTime <= waitMiliSec) {
Thread.sleep(10)
totalWaitedTime += 10
}
return ss.sparkContext.isStopped
}
test("test classification prediction parity w/o ring reduce") {
val training = buildDataFrame(Classification.train)
val testDF = buildDataFrame(Classification.test)
@@ -91,14 +83,11 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
}
test("test rabit timeout fail handle") {
// disable spark kill listener to verify if rabit_timeout take effect and kill tasks
TaskFailedListener.killerStarted = true
val training = buildDataFrame(Classification.train)
// mock rank 0 failure during 8th allreduce synchronization
Rabit.mockList = Array("0,8,0,0").toList.asJava
try {
intercept[SparkException] {
new XGBoostClassifier(Map(
"eta" -> "0.1",
"max_depth" -> "10",
@@ -108,37 +97,7 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
"num_workers" -> numWorkers,
"rabit_timeout" -> 0))
.fit(training)
} catch {
case e: Throwable => // swallow anything
} finally {
// assume all tasks throw exception almost same time
// 100ms should be enough to exhaust all retries
assert(waitAndCheckSparkShutdown(100) == true)
TaskFailedListener.killerStarted = false
}
}
test("test SparkContext should not be killed ") {
val training = buildDataFrame(Classification.train)
// mock rank 0 failure during 8th allreduce synchronization
Rabit.mockList = Array("0,8,0,0").toList.asJava
try {
new XGBoostClassifier(Map(
"eta" -> "0.1",
"max_depth" -> "10",
"verbosity" -> "1",
"objective" -> "binary:logistic",
"num_round" -> 5,
"num_workers" -> numWorkers,
"kill_spark_context_on_worker_failure" -> false,
"rabit_timeout" -> 0))
.fit(training)
} catch {
case e: Throwable => // swallow anything
} finally {
// wait 3s to check if SparkContext is killed
assert(waitAndCheckSparkShutdown(3000) == false)
}
}
}

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2014 by Contributors
Copyright (c) 2014-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,12 +17,14 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.scalatest.FunSuite
import org.apache.spark.ml.feature.VectorAssembler
class XGBoostRegressorSuite extends FunSuite with PerTest {
protected val treeMethod: String = "auto"
@@ -216,4 +218,78 @@ class XGBoostRegressorSuite extends FunSuite with PerTest {
assert(resultDF.columns.contains("predictLeaf"))
assert(resultDF.columns.contains("predictContrib"))
}
test("featuresCols with features column can work") {
val spark = ss
import spark.implicits._
val xgbInput = Seq(
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
.toDF("f1", "f2", "f3", "features", "label")
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "reg:squarederror", "num_round" -> 5, "num_workers" -> 1)
val featuresName = Array("f1", "f2", "f3", "features")
val xgbClassifier = new XGBoostRegressor(paramMap)
.setFeaturesCol(featuresName)
.setLabelCol("label")
val model = xgbClassifier.fit(xgbInput)
assert(model.getFeaturesCols.sameElements(featuresName))
val df = model.transform(xgbInput)
assert(df.schema.fieldNames.contains("features_" + model.uid))
df.show()
val newFeatureName = "features_new"
// transform also can work for vectorized dataset
val vectorizedInput = new VectorAssembler()
.setInputCols(featuresName)
.setOutputCol(newFeatureName)
.transform(xgbInput)
.select(newFeatureName, "label")
val df1 = model
.setFeaturesCol(newFeatureName)
.transform(vectorizedInput)
assert(df1.schema.fieldNames.contains(newFeatureName))
df1.show()
}
test("featuresCols without features column can work") {
val spark = ss
import spark.implicits._
val xgbInput = Seq(
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
.toDF("f1", "f2", "f3", "f4", "label")
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "reg:squarederror", "num_round" -> 5, "num_workers" -> 1)
val featuresName = Array("f1", "f2", "f3", "f4")
val xgbClassifier = new XGBoostRegressor(paramMap)
.setFeaturesCol(featuresName)
.setLabelCol("label")
.setEvalSets(Map("eval" -> xgbInput))
val model = xgbClassifier.fit(xgbInput)
assert(model.getFeaturesCols.sameElements(featuresName))
// transform should work for the dataset which includes the feature column names.
val df = model.transform(xgbInput)
assert(df.schema.fieldNames.contains("features"))
df.show()
// transform also can work for vectorized dataset
val vectorizedInput = new VectorAssembler()
.setInputCols(featuresName)
.setOutputCol("features")
.transform(xgbInput)
.select("features", "label")
val df1 = model.transform(vectorizedInput)
df1.show()
}
}

View File

@@ -1,151 +0,0 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.spark
import org.scalatest.FunSuite
import _root_.ml.dmlc.xgboost4j.scala.spark.PerTest
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.math.min
class SparkParallelismTrackerSuite extends FunSuite with PerTest {
val numParallelism: Int = min(Runtime.getRuntime.availableProcessors(), 4)
override protected def sparkSessionBuilder: SparkSession.Builder = SparkSession.builder()
.master(s"local[${numParallelism}]")
.appName("XGBoostSuite")
.config("spark.ui.enabled", true)
.config("spark.driver.memory", "512m")
.config("spark.task.cpus", 1)
private def waitAndCheckSparkShutdown(waitMiliSec: Int): Boolean = {
var totalWaitedTime = 0L
while (!ss.sparkContext.isStopped && totalWaitedTime <= waitMiliSec) {
Thread.sleep(100)
totalWaitedTime += 100
}
ss.sparkContext.isStopped
}
test("tracker should not affect execution result when timeout is not larger than 0") {
val nWorkers = numParallelism
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
val tracker = new SparkParallelismTracker(sc, 10000, nWorkers)
val disabledTracker = new SparkParallelismTracker(sc, 0, nWorkers)
assert(tracker.execute(rdd.sum()) == rdd.sum())
assert(disabledTracker.execute(rdd.sum()) == rdd.sum())
}
test("tracker should throw exception if parallelism is not sufficient") {
val nWorkers = numParallelism * 3
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
val tracker = new SparkParallelismTracker(sc, 1000, nWorkers)
intercept[IllegalStateException] {
tracker.execute {
rdd.map { i =>
// Test interruption
Thread.sleep(Long.MaxValue)
i
}.sum()
}
}
}
test("tracker should throw exception if parallelism is not sufficient with" +
" spark.task.cpus larger than 1") {
sc.conf.set("spark.task.cpus", "2")
val nWorkers = numParallelism
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers)
val tracker = new SparkParallelismTracker(sc, 1000, nWorkers)
intercept[IllegalStateException] {
tracker.execute {
rdd.map { i =>
// Test interruption
Thread.sleep(Long.MaxValue)
i
}.sum()
}
}
}
test("tracker should not kill SparkContext when killSparkContextOnWorkerFailure=false") {
val nWorkers = numParallelism
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
val rdd: RDD[Int] = sc.parallelize(1 to nWorkers, nWorkers)
try {
tracker.execute {
rdd.map { i =>
val partitionId = TaskContext.get().partitionId()
if (partitionId == 0) {
throw new RuntimeException("mocking task failing")
}
i
}.sum()
}
} catch {
case e: Exception => // catch the exception
} finally {
// wait 3s to check if SparkContext is killed
assert(waitAndCheckSparkShutdown(3000) == false)
}
}
test("tracker should cancel the correct job when killSparkContextOnWorkerFailure=false") {
val nWorkers = 2
val tracker = new SparkParallelismTracker(sc, 0, nWorkers, false)
val rdd: RDD[Int] = sc.parallelize(1 to 10, nWorkers)
val thread = new TestThread(sc)
thread.start()
try {
tracker.execute {
rdd.map { i =>
Thread.sleep(100)
val partitionId = TaskContext.get().partitionId()
if (partitionId == 0) {
throw new RuntimeException("mocking task failing")
}
i
}.sum()
}
} catch {
case e: Exception => // catch the exception
} finally {
thread.join(8000)
// wait 3s to check if SparkContext is killed
assert(waitAndCheckSparkShutdown(3000) == false)
}
}
private[this] class TestThread(sc: SparkContext) extends Thread {
override def run(): Unit = {
var sum: Double = 0.0f
try {
val rdd = sc.parallelize(1 to 4, 2)
sum = rdd.mapPartitions(iter => {
// sleep 2s to ensure task is alive when cancelling other jobs
Thread.sleep(2000)
iter
}).sum()
} finally {
// get the correct result
assert(sum.toInt == 10)
}
}
}
}

View File

@@ -6,10 +6,10 @@
<parent>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost-jvm_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
</parent>
<artifactId>xgboost4j_2.12</artifactId>
<version>1.6.0-SNAPSHOT</version>
<version>1.6.1</version>
<packaging>jar</packaging>
<dependencies>

View File

@@ -100,7 +100,7 @@ class NativeLibLoader {
});
return muslRelatedMemoryMappedFilename.isPresent();
} catch (IOException ignored) {
} catch (Exception ignored) {
// ignored
}
return false;

View File

@@ -30,6 +30,8 @@ public class RabitTracker implements IRabitTracker {
private Map<String, String> envs = new HashMap<String, String>();
// number of workers to be submitted.
private int numWorkers;
private String hostIp = "";
private String pythonExec = "";
private AtomicReference<Process> trackerProcess = new AtomicReference<Process>();
static {
@@ -85,6 +87,13 @@ public class RabitTracker implements IRabitTracker {
this.numWorkers = numWorkers;
}
public RabitTracker(int numWorkers, String hostIp, String pythonExec)
throws XGBoostError {
this(numWorkers);
this.hostIp = hostIp;
this.pythonExec = pythonExec;
}
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception thrown by worker:", e);
try {
@@ -126,12 +135,34 @@ public class RabitTracker implements IRabitTracker {
}
}
/** visible for testing */
public String getRabitTrackerCommand() {
StringBuilder sb = new StringBuilder();
if (pythonExec == null || pythonExec.isEmpty()) {
sb.append("python ");
} else {
sb.append(pythonExec + " ");
}
sb.append(" " + tracker_py + " ");
sb.append(" --log-level=DEBUG" + " ");
sb.append(" --num-workers=" + numWorkers + " ");
// we first check the property then check the parameter
String hostIpFromProperties = trackerProperties.getHostIp();
if(hostIpFromProperties != null && !hostIpFromProperties.isEmpty()) {
logger.debug("Using provided host-ip: " + hostIpFromProperties + " from properties");
sb.append(" --host-ip=" + hostIpFromProperties + " ");
} else if (hostIp != null & !hostIp.isEmpty()) {
logger.debug("Using the parametr host-ip: " + hostIp);
sb.append(" --host-ip=" + hostIp + " ");
}
return sb.toString();
}
private boolean startTrackerProcess() {
try {
String trackerExecString = this.addTrackerProperties("python " + tracker_py +
" --log-level=DEBUG --num-workers=" + String.valueOf(numWorkers));
trackerProcess.set(Runtime.getRuntime().exec(trackerExecString));
String cmd = getRabitTrackerCommand();
trackerProcess.set(Runtime.getRuntime().exec(cmd));
loadEnvs(trackerProcess.get().getInputStream());
return true;
} catch (IOException ioe) {
@@ -140,18 +171,6 @@ public class RabitTracker implements IRabitTracker {
}
}
private String addTrackerProperties(String trackerExecString) {
StringBuilder sb = new StringBuilder(trackerExecString);
String hostIp = trackerProperties.getHostIp();
if(hostIp != null && !hostIp.isEmpty()){
logger.debug("Using provided host-ip: " + hostIp);
sb.append(" --host-ip=").append(hostIp);
}
return sb.toString();
}
public void stop() {
if (trackerProcess.get() != null) {
trackerProcess.get().destroy();

View File

@@ -1 +1 @@
1.6.0-dev
1.6.1

View File

@@ -12,7 +12,6 @@
#include "xgboost/data.h"
#include "xgboost/parameter.h"
#include "xgboost/span.h"
#include "xgboost/task.h"
namespace xgboost {
namespace common {
@@ -75,15 +74,20 @@ inline void InvalidCategory() {
// values to be less than this last representable value.
auto str = std::to_string(OutOfRangeCat());
LOG(FATAL) << "Invalid categorical value detected. Categorical value should be non-negative, "
"less than total umber of categories in training data and less than " +
"less than total number of categories in training data and less than " +
str;
}
inline void CheckMaxCat(float max_cat, size_t n_categories) {
CHECK_GE(max_cat + 1, n_categories)
<< "Maximum cateogry should not be lesser than the total number of categories.";
}
/*!
* \brief Whether should we use onehot encoding for categorical data.
*/
XGBOOST_DEVICE inline bool UseOneHot(uint32_t n_cats, uint32_t max_cat_to_onehot, ObjInfo task) {
bool use_one_hot = n_cats < max_cat_to_onehot || task.UseOneHot();
XGBOOST_DEVICE inline bool UseOneHot(uint32_t n_cats, uint32_t max_cat_to_onehot) {
bool use_one_hot = n_cats < max_cat_to_onehot;
return use_one_hot;
}

View File

@@ -164,6 +164,74 @@ class Range {
Iterator end_;
};
/**
* \brief Transform iterator that takes an index and calls transform operator.
*
* This is CPU-only right now as taking host device function as operator complicates the
* code. For device side one can use `thrust::transform_iterator` instead.
*/
template <typename Fn>
class IndexTransformIter {
size_t iter_{0};
Fn fn_;
public:
using iterator_category = std::random_access_iterator_tag; // NOLINT
using value_type = std::result_of_t<Fn(size_t)>; // NOLINT
using difference_type = detail::ptrdiff_t; // NOLINT
using reference = std::add_lvalue_reference_t<value_type>; // NOLINT
using pointer = std::add_pointer_t<value_type>; // NOLINT
public:
/**
* \param op Transform operator, takes a size_t index as input.
*/
explicit IndexTransformIter(Fn &&op) : fn_{op} {}
IndexTransformIter(IndexTransformIter const &) = default;
IndexTransformIter& operator=(IndexTransformIter&&) = default;
IndexTransformIter& operator=(IndexTransformIter const& that) {
iter_ = that.iter_;
return *this;
}
value_type operator*() const { return fn_(iter_); }
auto operator-(IndexTransformIter const &that) const { return iter_ - that.iter_; }
bool operator==(IndexTransformIter const &that) const { return iter_ == that.iter_; }
bool operator!=(IndexTransformIter const &that) const { return !(*this == that); }
IndexTransformIter &operator++() {
iter_++;
return *this;
}
IndexTransformIter operator++(int) {
auto ret = *this;
++(*this);
return ret;
}
IndexTransformIter &operator+=(difference_type n) {
iter_ += n;
return *this;
}
IndexTransformIter &operator-=(difference_type n) {
(*this) += -n;
return *this;
}
IndexTransformIter operator+(difference_type n) const {
auto ret = *this;
return ret += n;
}
IndexTransformIter operator-(difference_type n) const {
auto ret = *this;
return ret -= n;
}
};
template <typename Fn>
auto MakeIndexTransformIter(Fn&& fn) {
return IndexTransformIter<Fn>(std::forward<Fn>(fn));
}
int AllVisibleGPUs();
inline void AssertGPUSupport() {

View File

@@ -468,11 +468,17 @@ void AddCutPoint(typename SketchType::SummaryContainer const &summary, int max_b
}
}
void AddCategories(std::set<float> const &categories, HistogramCuts *cuts) {
auto &cut_values = cuts->cut_values_.HostVector();
for (auto const &v : categories) {
cut_values.push_back(AsCat(v));
auto AddCategories(std::set<float> const &categories, HistogramCuts *cuts) {
if (std::any_of(categories.cbegin(), categories.cend(), InvalidCat)) {
InvalidCategory();
}
auto &cut_values = cuts->cut_values_.HostVector();
auto max_cat = *std::max_element(categories.cbegin(), categories.cend());
CheckMaxCat(max_cat, categories.size());
for (bst_cat_t i = 0; i <= AsCat(max_cat); ++i) {
cut_values.push_back(i);
}
return max_cat;
}
template <typename WQSketch>
@@ -505,11 +511,12 @@ void SketchContainerImpl<WQSketch>::MakeCuts(HistogramCuts* cuts) {
}
});
float max_cat{-1.f};
for (size_t fid = 0; fid < reduced.size(); ++fid) {
size_t max_num_bins = std::min(num_cuts[fid], max_bins_);
typename WQSketch::SummaryContainer const& a = final_summaries[fid];
if (IsCat(feature_types_, fid)) {
AddCategories(categories_.at(fid), cuts);
max_cat = std::max(max_cat, AddCategories(categories_.at(fid), cuts));
} else {
AddCutPoint<WQSketch>(a, max_num_bins, cuts);
// push a value that is greater than anything
@@ -527,30 +534,7 @@ void SketchContainerImpl<WQSketch>::MakeCuts(HistogramCuts* cuts) {
cuts->cut_ptrs_.HostVector().push_back(cut_size);
}
if (has_categorical_) {
for (auto const &feat : categories_) {
if (std::any_of(feat.cbegin(), feat.cend(), InvalidCat)) {
InvalidCategory();
}
}
auto const &ptrs = cuts->Ptrs();
auto const &vals = cuts->Values();
float max_cat{-std::numeric_limits<float>::infinity()};
for (size_t i = 1; i < ptrs.size(); ++i) {
if (IsCat(feature_types_, i - 1)) {
auto beg = ptrs[i - 1];
auto end = ptrs[i];
auto feat = Span<float const>{vals}.subspan(beg, end - beg);
auto max_elem = *std::max_element(feat.cbegin(), feat.cend());
if (max_elem > max_cat) {
max_cat = max_elem;
}
}
}
cuts->SetCategorical(true, max_cat);
}
cuts->SetCategorical(this->has_categorical_, max_cat);
monitor_.Stop(__func__);
}

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2020 by XGBoost Contributors
* Copyright 2020-2022 by XGBoost Contributors
*/
#include <thrust/binary_search.h>
#include <thrust/execution_policy.h>
@@ -583,13 +583,13 @@ void SketchContainer::AllReduce() {
namespace {
struct InvalidCatOp {
Span<float const> values;
Span<uint32_t const> ptrs;
Span<SketchEntry const> values;
Span<size_t const> ptrs;
Span<FeatureType const> ft;
XGBOOST_DEVICE bool operator()(size_t i) const {
auto fidx = dh::SegmentId(ptrs, i);
return IsCat(ft, fidx) && InvalidCat(values[i]);
return IsCat(ft, fidx) && InvalidCat(values[i].value);
}
};
} // anonymous namespace
@@ -611,7 +611,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
p_cuts->min_vals_.SetDevice(device_);
auto d_min_values = p_cuts->min_vals_.DeviceSpan();
auto in_cut_values = dh::ToSpan(this->Current());
auto const in_cut_values = dh::ToSpan(this->Current());
// Set up output ptr
p_cuts->cut_ptrs_.SetDevice(device_);
@@ -619,26 +619,70 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
h_out_columns_ptr.clear();
h_out_columns_ptr.push_back(0);
auto const& h_feature_types = this->feature_types_.ConstHostSpan();
for (bst_feature_t i = 0; i < num_columns_; ++i) {
size_t column_size = std::max(static_cast<size_t>(1ul),
this->Column(i).size());
if (IsCat(h_feature_types, i)) {
h_out_columns_ptr.push_back(static_cast<size_t>(column_size));
} else {
h_out_columns_ptr.push_back(std::min(static_cast<size_t>(column_size),
static_cast<size_t>(num_bins_)));
auto d_ft = feature_types_.ConstDeviceSpan();
std::vector<SketchEntry> max_values;
float max_cat{-1.f};
if (has_categorical_) {
dh::XGBCachingDeviceAllocator<char> alloc;
auto key_it = dh::MakeTransformIterator<bst_feature_t>(
thrust::make_counting_iterator(0ul), [=] XGBOOST_DEVICE(size_t i) -> bst_feature_t {
return dh::SegmentId(d_in_columns_ptr, i);
});
auto invalid_op = InvalidCatOp{in_cut_values, d_in_columns_ptr, d_ft};
auto val_it = dh::MakeTransformIterator<SketchEntry>(
thrust::make_counting_iterator(0ul), [=] XGBOOST_DEVICE(size_t i) {
auto fidx = dh::SegmentId(d_in_columns_ptr, i);
auto v = in_cut_values[i];
if (IsCat(d_ft, fidx)) {
if (invalid_op(i)) {
// use inf to indicate invalid value, this way we can keep it as in
// indicator in the reduce operation as it's always the greatest value.
v.value = std::numeric_limits<float>::infinity();
}
}
return v;
});
CHECK_EQ(num_columns_, d_in_columns_ptr.size() - 1);
max_values.resize(d_in_columns_ptr.size() - 1);
dh::caching_device_vector<SketchEntry> d_max_values(d_in_columns_ptr.size() - 1);
thrust::reduce_by_key(thrust::cuda::par(alloc), key_it, key_it + in_cut_values.size(), val_it,
thrust::make_discard_iterator(), d_max_values.begin(),
thrust::equal_to<bst_feature_t>{},
[] __device__(auto l, auto r) { return l.value > r.value ? l : r; });
dh::CopyDeviceSpanToVector(&max_values, dh::ToSpan(d_max_values));
auto max_it = common::MakeIndexTransformIter([&](auto i) {
if (IsCat(h_feature_types, i)) {
return max_values[i].value;
}
return -1.f;
});
max_cat = *std::max_element(max_it, max_it + max_values.size());
if (std::isinf(max_cat)) {
InvalidCategory();
}
}
std::partial_sum(h_out_columns_ptr.begin(), h_out_columns_ptr.end(),
h_out_columns_ptr.begin());
auto d_out_columns_ptr = p_cuts->cut_ptrs_.ConstDeviceSpan();
// Set up output cuts
for (bst_feature_t i = 0; i < num_columns_; ++i) {
size_t column_size = std::max(static_cast<size_t>(1ul), this->Column(i).size());
if (IsCat(h_feature_types, i)) {
// column_size is the number of unique values in that feature.
CheckMaxCat(max_values[i].value, column_size);
h_out_columns_ptr.push_back(max_values[i].value + 1); // includes both max_cat and 0.
} else {
h_out_columns_ptr.push_back(
std::min(static_cast<size_t>(column_size), static_cast<size_t>(num_bins_)));
}
}
std::partial_sum(h_out_columns_ptr.begin(), h_out_columns_ptr.end(), h_out_columns_ptr.begin());
auto d_out_columns_ptr = p_cuts->cut_ptrs_.ConstDeviceSpan();
size_t total_bins = h_out_columns_ptr.back();
p_cuts->cut_values_.SetDevice(device_);
p_cuts->cut_values_.Resize(total_bins);
auto out_cut_values = p_cuts->cut_values_.DeviceSpan();
auto d_ft = feature_types_.ConstDeviceSpan();
dh::LaunchN(total_bins, [=] __device__(size_t idx) {
auto column_id = dh::SegmentId(d_out_columns_ptr, idx);
@@ -667,8 +711,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
}
if (IsCat(d_ft, column_id)) {
assert(out_column.size() == in_column.size());
out_column[idx] = in_column[idx].value;
out_column[idx] = idx;
return;
}
@@ -684,36 +727,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
out_column[idx] = in_column[idx+1].value;
});
float max_cat{-1.0f};
if (has_categorical_) {
auto invalid_op = InvalidCatOp{out_cut_values, d_out_columns_ptr, d_ft};
auto it = dh::MakeTransformIterator<thrust::pair<bool, float>>(
thrust::make_counting_iterator(0ul), [=] XGBOOST_DEVICE(size_t i) {
auto fidx = dh::SegmentId(d_out_columns_ptr, i);
if (IsCat(d_ft, fidx)) {
auto invalid = invalid_op(i);
auto v = out_cut_values[i];
return thrust::make_pair(invalid, v);
}
return thrust::make_pair(false, std::numeric_limits<float>::min());
});
bool invalid{false};
dh::XGBCachingDeviceAllocator<char> alloc;
thrust::tie(invalid, max_cat) =
thrust::reduce(thrust::cuda::par(alloc), it, it + out_cut_values.size(),
thrust::make_pair(false, std::numeric_limits<float>::min()),
[=] XGBOOST_DEVICE(thrust::pair<bool, bst_cat_t> const &l,
thrust::pair<bool, bst_cat_t> const &r) {
return thrust::make_pair(l.first || r.first, std::max(l.second, r.second));
});
if (invalid) {
InvalidCategory();
}
}
p_cuts->SetCategorical(this->has_categorical_, max_cat);
timer_.Stop(__func__);
}
} // namespace common

View File

@@ -419,6 +419,7 @@ class LearnerConfiguration : public Learner {
obj_.reset(ObjFunction::Create(tparam_.objective, &generic_parameters_));
}
obj_->LoadConfig(objective_fn);
learner_model_param_.task = obj_->Task();
tparam_.booster = get<String>(gradient_booster["name"]);
if (!gbm_) {

View File

@@ -199,13 +199,11 @@ __device__ void EvaluateFeature(
}
template <int BLOCK_THREADS, typename GradientSumT>
__global__ void EvaluateSplitsKernel(
EvaluateSplitInputs<GradientSumT> left,
EvaluateSplitInputs<GradientSumT> right,
ObjInfo task,
common::Span<bst_feature_t> sorted_idx,
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
common::Span<DeviceSplitCandidate> out_candidates) {
__global__ void EvaluateSplitsKernel(EvaluateSplitInputs<GradientSumT> left,
EvaluateSplitInputs<GradientSumT> right,
common::Span<bst_feature_t> sorted_idx,
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
common::Span<DeviceSplitCandidate> out_candidates) {
// KeyValuePair here used as threadIdx.x -> gain_value
using ArgMaxT = cub::KeyValuePair<int, float>;
using BlockScanT =
@@ -241,7 +239,7 @@ __global__ void EvaluateSplitsKernel(
if (common::IsCat(inputs.feature_types, fidx)) {
auto n_bins_in_feat = inputs.feature_segments[fidx + 1] - inputs.feature_segments[fidx];
if (common::UseOneHot(n_bins_in_feat, inputs.param.max_cat_to_onehot, task)) {
if (common::UseOneHot(n_bins_in_feat, inputs.param.max_cat_to_onehot)) {
EvaluateFeature<BLOCK_THREADS, SumReduceT, BlockScanT, MaxReduceT, TempStorage, GradientSumT,
kOneHot>(fidx, inputs, evaluator, sorted_idx, 0, &best_split, &temp_storage);
} else {
@@ -310,7 +308,7 @@ __device__ void SortBasedSplit(EvaluateSplitInputs<GradientSumT> const &input,
template <typename GradientSumT>
void GPUHistEvaluator<GradientSumT>::EvaluateSplits(
EvaluateSplitInputs<GradientSumT> left, EvaluateSplitInputs<GradientSumT> right, ObjInfo task,
EvaluateSplitInputs<GradientSumT> left, EvaluateSplitInputs<GradientSumT> right,
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
common::Span<DeviceSplitCandidate> out_splits) {
if (!split_cats_.empty()) {
@@ -323,7 +321,7 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(
// One block for each feature
uint32_t constexpr kBlockThreads = 256;
dh::LaunchKernel {static_cast<uint32_t>(combined_num_features), kBlockThreads, 0}(
EvaluateSplitsKernel<kBlockThreads, GradientSumT>, left, right, task, this->SortedIdx(left),
EvaluateSplitsKernel<kBlockThreads, GradientSumT>, left, right, this->SortedIdx(left),
evaluator, dh::ToSpan(feature_best_splits));
// Reduce to get best candidate for left and right child over all features
@@ -365,7 +363,7 @@ void GPUHistEvaluator<GradientSumT>::CopyToHost(EvaluateSplitInputs<GradientSumT
}
template <typename GradientSumT>
void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, ObjInfo task,
void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate,
EvaluateSplitInputs<GradientSumT> left,
EvaluateSplitInputs<GradientSumT> right,
common::Span<GPUExpandEntry> out_entries) {
@@ -373,7 +371,7 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, Ob
dh::TemporaryArray<DeviceSplitCandidate> splits_out_storage(2);
auto out_splits = dh::ToSpan(splits_out_storage);
this->EvaluateSplits(left, right, task, evaluator, out_splits);
this->EvaluateSplits(left, right, evaluator, out_splits);
auto d_sorted_idx = this->SortedIdx(left);
auto d_entries = out_entries;
@@ -385,7 +383,7 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, Ob
auto fidx = out_splits[i].findex;
if (split.is_cat &&
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot, task)) {
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot)) {
bool is_left = i == 0;
auto out = is_left ? cats_out.first(cats_out.size() / 2) : cats_out.last(cats_out.size() / 2);
SortBasedSplit(input, d_sorted_idx, fidx, is_left, out, &out_splits[i]);
@@ -405,11 +403,11 @@ void GPUHistEvaluator<GradientSumT>::EvaluateSplits(GPUExpandEntry candidate, Ob
template <typename GradientSumT>
GPUExpandEntry GPUHistEvaluator<GradientSumT>::EvaluateSingleSplit(
EvaluateSplitInputs<GradientSumT> input, float weight, ObjInfo task) {
EvaluateSplitInputs<GradientSumT> input, float weight) {
dh::TemporaryArray<DeviceSplitCandidate> splits_out(1);
auto out_split = dh::ToSpan(splits_out);
auto evaluator = tree_evaluator_.GetEvaluator<GPUTrainingParam>();
this->EvaluateSplits(input, {}, task, evaluator, out_split);
this->EvaluateSplits(input, {}, evaluator, out_split);
auto cats_out = this->DeviceCatStorage(input.nidx);
auto d_sorted_idx = this->SortedIdx(input);
@@ -421,7 +419,7 @@ GPUExpandEntry GPUHistEvaluator<GradientSumT>::EvaluateSingleSplit(
auto fidx = out_split[i].findex;
if (split.is_cat &&
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot, task)) {
!common::UseOneHot(input.FeatureBins(fidx), input.param.max_cat_to_onehot)) {
SortBasedSplit(input, d_sorted_idx, fidx, true, cats_out, &out_split[i]);
}

View File

@@ -114,7 +114,7 @@ class GPUHistEvaluator {
/**
* \brief Reset the evaluator, should be called before any use.
*/
void Reset(common::HistogramCuts const &cuts, common::Span<FeatureType const> ft, ObjInfo task,
void Reset(common::HistogramCuts const &cuts, common::Span<FeatureType const> ft,
bst_feature_t n_features, TrainParam const &param, int32_t device);
/**
@@ -150,21 +150,20 @@ class GPUHistEvaluator {
// impl of evaluate splits, contains CUDA kernels so it's public
void EvaluateSplits(EvaluateSplitInputs<GradientSumT> left,
EvaluateSplitInputs<GradientSumT> right, ObjInfo task,
EvaluateSplitInputs<GradientSumT> right,
TreeEvaluator::SplitEvaluator<GPUTrainingParam> evaluator,
common::Span<DeviceSplitCandidate> out_splits);
/**
* \brief Evaluate splits for left and right nodes.
*/
void EvaluateSplits(GPUExpandEntry candidate, ObjInfo task,
void EvaluateSplits(GPUExpandEntry candidate,
EvaluateSplitInputs<GradientSumT> left,
EvaluateSplitInputs<GradientSumT> right,
common::Span<GPUExpandEntry> out_splits);
/**
* \brief Evaluate splits for root node.
*/
GPUExpandEntry EvaluateSingleSplit(EvaluateSplitInputs<GradientSumT> input, float weight,
ObjInfo task);
GPUExpandEntry EvaluateSingleSplit(EvaluateSplitInputs<GradientSumT> input, float weight);
};
} // namespace tree
} // namespace xgboost

View File

@@ -16,12 +16,12 @@ namespace xgboost {
namespace tree {
template <typename GradientSumT>
void GPUHistEvaluator<GradientSumT>::Reset(common::HistogramCuts const &cuts,
common::Span<FeatureType const> ft, ObjInfo task,
common::Span<FeatureType const> ft,
bst_feature_t n_features, TrainParam const &param,
int32_t device) {
param_ = param;
tree_evaluator_ = TreeEvaluator{param, n_features, device};
if (cuts.HasCategorical() && !task.UseOneHot()) {
if (cuts.HasCategorical()) {
dh::XGBCachingDeviceAllocator<char> alloc;
auto ptrs = cuts.cut_ptrs_.ConstDeviceSpan();
auto beg = thrust::make_counting_iterator<size_t>(1ul);
@@ -34,7 +34,7 @@ void GPUHistEvaluator<GradientSumT>::Reset(common::HistogramCuts const &cuts,
auto idx = i - 1;
if (common::IsCat(ft, idx)) {
auto n_bins = ptrs[i] - ptrs[idx];
bool use_sort = !common::UseOneHot(n_bins, to_onehot, task);
bool use_sort = !common::UseOneHot(n_bins, to_onehot);
return use_sort;
}
return false;

View File

@@ -11,7 +11,6 @@
#include <utility>
#include <vector>
#include "xgboost/task.h"
#include "../param.h"
#include "../constraints.h"
#include "../split_evaluator.h"
@@ -39,7 +38,6 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
int32_t n_threads_ {0};
FeatureInteractionConstraintHost interaction_constraints_;
std::vector<NodeEntry> snode_;
ObjInfo task_;
// if sum of statistics for non-missing values in the node
// is equal to sum of statistics for all values:
@@ -244,7 +242,7 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
}
if (is_cat) {
auto n_bins = cut_ptrs.at(fidx + 1) - cut_ptrs[fidx];
if (common::UseOneHot(n_bins, param_.max_cat_to_onehot, task_)) {
if (common::UseOneHot(n_bins, param_.max_cat_to_onehot)) {
EnumerateSplit<+1, kOneHot>(cut, {}, histogram, fidx, nidx, evaluator, best);
EnumerateSplit<-1, kOneHot>(cut, {}, histogram, fidx, nidx, evaluator, best);
} else {
@@ -345,7 +343,6 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
auto Evaluator() const { return tree_evaluator_.GetEvaluator(); }
auto const& Stats() const { return snode_; }
auto Task() const { return task_; }
float InitRoot(GradStats const& root_sum) {
snode_.resize(1);
@@ -363,12 +360,11 @@ template <typename GradientSumT, typename ExpandEntry> class HistEvaluator {
// The column sampler must be constructed by caller since we need to preserve the rng
// for the entire training session.
explicit HistEvaluator(TrainParam const &param, MetaInfo const &info, int32_t n_threads,
std::shared_ptr<common::ColumnSampler> sampler, ObjInfo task)
std::shared_ptr<common::ColumnSampler> sampler)
: param_{param},
column_sampler_{std::move(sampler)},
tree_evaluator_{param, static_cast<bst_feature_t>(info.num_col_), GenericParameter::kCpuId},
n_threads_{n_threads},
task_{task} {
n_threads_{n_threads} {
interaction_constraints_.Configure(param, info.num_col_);
column_sampler_->Init(info.num_col_, info.feature_weights.HostVector(), param_.colsample_bynode,
param_.colsample_bylevel, param_.colsample_bytree);

View File

@@ -28,10 +28,8 @@ DMLC_REGISTRY_FILE_TAG(updater_approx);
namespace {
// Return the BatchParam used by DMatrix.
template <typename GradientSumT>
auto BatchSpec(TrainParam const &p, common::Span<float> hess,
HistEvaluator<GradientSumT, CPUExpandEntry> const &evaluator) {
return BatchParam{p.max_bin, hess, !evaluator.Task().const_hess};
auto BatchSpec(TrainParam const &p, common::Span<float> hess, ObjInfo const task) {
return BatchParam{p.max_bin, hess, !task.const_hess};
}
auto BatchSpec(TrainParam const &p, common::Span<float> hess) {
@@ -46,7 +44,8 @@ class GloablApproxBuilder {
std::shared_ptr<common::ColumnSampler> col_sampler_;
HistEvaluator<GradientSumT, CPUExpandEntry> evaluator_;
HistogramBuilder<GradientSumT, CPUExpandEntry> histogram_builder_;
GenericParameter const *ctx_;
Context const *ctx_;
ObjInfo const task_;
std::vector<ApproxRowPartitioner> partitioner_;
// Pointer to last updated tree, used for update prediction cache.
@@ -64,8 +63,7 @@ class GloablApproxBuilder {
int32_t n_total_bins = 0;
partitioner_.clear();
// Generating the GHistIndexMatrix is quite slow, is there a way to speed it up?
for (auto const &page :
p_fmat->GetBatches<GHistIndexMatrix>(BatchSpec(param_, hess, evaluator_))) {
for (auto const &page : p_fmat->GetBatches<GHistIndexMatrix>(BatchSpec(param_, hess, task_))) {
if (n_total_bins == 0) {
n_total_bins = page.cut.TotalBins();
feature_values_ = page.cut;
@@ -160,8 +158,9 @@ class GloablApproxBuilder {
common::Monitor *monitor)
: param_{std::move(param)},
col_sampler_{std::move(column_sampler)},
evaluator_{param_, info, ctx->Threads(), col_sampler_, task},
evaluator_{param_, info, ctx->Threads(), col_sampler_},
ctx_{ctx},
task_{task},
monitor_{monitor} {}
void UpdateTree(RegTree *p_tree, std::vector<GradientPair> const &gpair, common::Span<float> hess,

View File

@@ -229,16 +229,14 @@ struct GPUHistMakerDevice {
// Reset values for each update iteration
// Note that the column sampler must be passed by value because it is not
// thread safe
void Reset(HostDeviceVector<GradientPair>* dh_gpair, DMatrix* dmat, int64_t num_columns,
ObjInfo task) {
void Reset(HostDeviceVector<GradientPair>* dh_gpair, DMatrix* dmat, int64_t num_columns) {
auto const& info = dmat->Info();
this->column_sampler.Init(num_columns, info.feature_weights.HostVector(),
param.colsample_bynode, param.colsample_bylevel,
param.colsample_bytree);
dh::safe_cuda(cudaSetDevice(device_id));
this->evaluator_.Reset(page->Cuts(), feature_types, task, dmat->Info().num_col_, param,
device_id);
this->evaluator_.Reset(page->Cuts(), feature_types, dmat->Info().num_col_, param, device_id);
this->interaction_constraints.Reset();
std::fill(node_sum_gradients.begin(), node_sum_gradients.end(), GradientPairPrecise{});
@@ -260,7 +258,7 @@ struct GPUHistMakerDevice {
hist.Reset();
}
GPUExpandEntry EvaluateRootSplit(GradientPairPrecise root_sum, float weight, ObjInfo task) {
GPUExpandEntry EvaluateRootSplit(GradientPairPrecise root_sum, float weight) {
int nidx = RegTree::kRoot;
GPUTrainingParam gpu_param(param);
auto sampled_features = column_sampler.GetFeatureSet(0);
@@ -277,12 +275,12 @@ struct GPUHistMakerDevice {
matrix.gidx_fvalue_map,
matrix.min_fvalue,
hist.GetNodeHistogram(nidx)};
auto split = this->evaluator_.EvaluateSingleSplit(inputs, weight, task);
auto split = this->evaluator_.EvaluateSingleSplit(inputs, weight);
return split;
}
void EvaluateLeftRightSplits(GPUExpandEntry candidate, ObjInfo task, int left_nidx,
int right_nidx, const RegTree& tree,
void EvaluateLeftRightSplits(GPUExpandEntry candidate, int left_nidx, int right_nidx,
const RegTree& tree,
common::Span<GPUExpandEntry> pinned_candidates_out) {
dh::TemporaryArray<DeviceSplitCandidate> splits_out(2);
GPUTrainingParam gpu_param(param);
@@ -316,7 +314,7 @@ struct GPUHistMakerDevice {
hist.GetNodeHistogram(right_nidx)};
dh::TemporaryArray<GPUExpandEntry> entries(2);
this->evaluator_.EvaluateSplits(candidate, task, left, right, dh::ToSpan(entries));
this->evaluator_.EvaluateSplits(candidate, left, right, dh::ToSpan(entries));
dh::safe_cuda(cudaMemcpyAsync(pinned_candidates_out.data(), entries.data().get(),
sizeof(GPUExpandEntry) * entries.size(), cudaMemcpyDeviceToHost));
}
@@ -584,7 +582,7 @@ struct GPUHistMakerDevice {
tree[candidate.nid].RightChild());
}
GPUExpandEntry InitRoot(RegTree* p_tree, ObjInfo task, dh::AllReducer* reducer) {
GPUExpandEntry InitRoot(RegTree* p_tree, dh::AllReducer* reducer) {
constexpr bst_node_t kRootNIdx = 0;
dh::XGBCachingDeviceAllocator<char> alloc;
auto gpair_it = dh::MakeTransformIterator<GradientPairPrecise>(
@@ -605,7 +603,7 @@ struct GPUHistMakerDevice {
(*p_tree)[kRootNIdx].SetLeaf(param.learning_rate * weight);
// Generate first split
auto root_entry = this->EvaluateRootSplit(root_sum, weight, task);
auto root_entry = this->EvaluateRootSplit(root_sum, weight);
return root_entry;
}
@@ -615,11 +613,11 @@ struct GPUHistMakerDevice {
Driver<GPUExpandEntry> driver(static_cast<TrainParam::TreeGrowPolicy>(param.grow_policy));
monitor.Start("Reset");
this->Reset(gpair_all, p_fmat, p_fmat->Info().num_col_, task);
this->Reset(gpair_all, p_fmat, p_fmat->Info().num_col_);
monitor.Stop("Reset");
monitor.Start("InitRoot");
driver.Push({ this->InitRoot(p_tree, task, reducer) });
driver.Push({ this->InitRoot(p_tree, reducer) });
monitor.Stop("InitRoot");
auto num_leaves = 1;
@@ -656,7 +654,7 @@ struct GPUHistMakerDevice {
monitor.Stop("BuildHist");
monitor.Start("EvaluateSplits");
this->EvaluateLeftRightSplits(candidate, task, left_child_nidx, right_child_nidx, *p_tree,
this->EvaluateLeftRightSplits(candidate, left_child_nidx, right_child_nidx, *p_tree,
new_candidates.subspan(i * 2, 2));
monitor.Stop("EvaluateSplits");
} else {

View File

@@ -342,7 +342,7 @@ void QuantileHistMaker::Builder<GradientSumT>::InitData(DMatrix *fmat, const Reg
// store a pointer to the tree
p_last_tree_ = &tree;
evaluator_.reset(new HistEvaluator<GradientSumT, CPUExpandEntry>{
param_, info, this->ctx_->Threads(), column_sampler_, task_});
param_, info, this->ctx_->Threads(), column_sampler_});
monitor_->Stop(__func__);
}

View File

@@ -14,7 +14,6 @@ dependencies:
- jsonschema
- cupy
- python-graphviz
- modin-ray
- pip
- py-ubjson
- cffi

View File

@@ -2,6 +2,7 @@ import sys
import os
from contextlib import contextmanager
@contextmanager
def cd(path):
path = os.path.normpath(path)
@@ -13,10 +14,12 @@ def cd(path):
finally:
os.chdir(cwd)
if len(sys.argv) != 4:
print('Usage: {} [wheel to rename] [commit id] [platform tag]'.format(sys.argv[0]))
sys.exit(1)
whl_path = sys.argv[1]
commit_id = sys.argv[2]
platform_tag = sys.argv[3]
@@ -36,3 +39,7 @@ with cd(dirname):
if os.path.isfile(new_name):
os.remove(new_name)
os.rename(basename, new_name)
filesize = os.path.getsize(new_name) / 1024 / 1024 # MB
msg = f"Limit of wheel size set by PyPI is exceeded. {new_name}: {filesize}"
assert filesize <= 200, msg

View File

@@ -57,8 +57,7 @@ void TestEvaluateSingleSplit(bool is_categorical) {
GPUHistEvaluator<GradientPair> evaluator{
tparam, static_cast<bst_feature_t>(feature_min_values.size()), 0};
dh::device_vector<common::CatBitField::value_type> out_cats;
DeviceSplitCandidate result =
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
EXPECT_EQ(result.findex, 1);
EXPECT_EQ(result.fvalue, 11.0);
@@ -101,8 +100,7 @@ TEST(GpuHist, EvaluateSingleSplitMissing) {
dh::ToSpan(feature_histogram)};
GPUHistEvaluator<GradientPair> evaluator(tparam, feature_set.size(), 0);
DeviceSplitCandidate result =
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
EXPECT_EQ(result.findex, 0);
EXPECT_EQ(result.fvalue, 1.0);
@@ -114,10 +112,8 @@ TEST(GpuHist, EvaluateSingleSplitMissing) {
TEST(GpuHist, EvaluateSingleSplitEmpty) {
TrainParam tparam = ZeroParam();
GPUHistEvaluator<GradientPair> evaluator(tparam, 1, 0);
DeviceSplitCandidate result = evaluator
.EvaluateSingleSplit(EvaluateSplitInputs<GradientPair>{}, 0,
ObjInfo{ObjInfo::kRegression})
.split;
DeviceSplitCandidate result =
evaluator.EvaluateSingleSplit(EvaluateSplitInputs<GradientPair>{}, 0).split;
EXPECT_EQ(result.findex, -1);
EXPECT_LT(result.loss_chg, 0.0f);
}
@@ -152,8 +148,7 @@ TEST(GpuHist, EvaluateSingleSplitFeatureSampling) {
dh::ToSpan(feature_histogram)};
GPUHistEvaluator<GradientPair> evaluator(tparam, feature_min_values.size(), 0);
DeviceSplitCandidate result =
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
EXPECT_EQ(result.findex, 1);
EXPECT_EQ(result.fvalue, 11.0);
@@ -191,8 +186,7 @@ TEST(GpuHist, EvaluateSingleSplitBreakTies) {
dh::ToSpan(feature_histogram)};
GPUHistEvaluator<GradientPair> evaluator(tparam, feature_min_values.size(), 0);
DeviceSplitCandidate result =
evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
DeviceSplitCandidate result = evaluator.EvaluateSingleSplit(input, 0).split;
EXPECT_EQ(result.findex, 0);
EXPECT_EQ(result.fvalue, 1.0);
@@ -243,8 +237,8 @@ TEST(GpuHist, EvaluateSplits) {
GPUHistEvaluator<GradientPair> evaluator{
tparam, static_cast<bst_feature_t>(feature_min_values.size()), 0};
evaluator.EvaluateSplits(input_left, input_right, ObjInfo{ObjInfo::kRegression},
evaluator.GetEvaluator(), dh::ToSpan(out_splits));
evaluator.EvaluateSplits(input_left, input_right, evaluator.GetEvaluator(),
dh::ToSpan(out_splits));
DeviceSplitCandidate result_left = out_splits[0];
EXPECT_EQ(result_left.findex, 1);
@@ -264,8 +258,7 @@ TEST_F(TestPartitionBasedSplit, GpuHist) {
cuts_.cut_values_.SetDevice(0);
cuts_.min_vals_.SetDevice(0);
ObjInfo task{ObjInfo::kRegression};
evaluator.Reset(cuts_, dh::ToSpan(ft), task, info_.num_col_, param_, 0);
evaluator.Reset(cuts_, dh::ToSpan(ft), info_.num_col_, param_, 0);
dh::device_vector<GradientPairPrecise> d_hist(hist_[0].size());
auto node_hist = hist_[0];
@@ -282,7 +275,7 @@ TEST_F(TestPartitionBasedSplit, GpuHist) {
cuts_.cut_values_.ConstDeviceSpan(),
cuts_.min_vals_.ConstDeviceSpan(),
dh::ToSpan(d_hist)};
auto split = evaluator.EvaluateSingleSplit(input, 0, ObjInfo{ObjInfo::kRegression}).split;
auto split = evaluator.EvaluateSingleSplit(input, 0).split;
ASSERT_NEAR(split.loss_chg, best_score_, 1e-16);
}
} // namespace tree

View File

@@ -24,8 +24,8 @@ template <typename GradientSumT> void TestEvaluateSplits() {
auto dmat = RandomDataGenerator(kRows, kCols, 0).Seed(3).GenerateDMatrix();
auto evaluator = HistEvaluator<GradientSumT, CPUExpandEntry>{
param, dmat->Info(), n_threads, sampler, ObjInfo{ObjInfo::kRegression}};
auto evaluator =
HistEvaluator<GradientSumT, CPUExpandEntry>{param, dmat->Info(), n_threads, sampler};
common::HistCollection<GradientSumT> hist;
std::vector<GradientPair> row_gpairs = {
{1.23f, 0.24f}, {0.24f, 0.25f}, {0.26f, 0.27f}, {2.27f, 0.28f},
@@ -97,8 +97,7 @@ TEST(HistEvaluator, Apply) {
param.UpdateAllowUnknown(Args{{"min_child_weight", "0"}, {"reg_lambda", "0.0"}});
auto dmat = RandomDataGenerator(kNRows, kNCols, 0).Seed(3).GenerateDMatrix();
auto sampler = std::make_shared<common::ColumnSampler>();
auto evaluator_ = HistEvaluator<float, CPUExpandEntry>{param, dmat->Info(), 4, sampler,
ObjInfo{ObjInfo::kRegression}};
auto evaluator_ = HistEvaluator<float, CPUExpandEntry>{param, dmat->Info(), 4, sampler};
CPUExpandEntry entry{0, 0, 10.0f};
entry.split.left_sum = GradStats{0.4, 0.6f};
@@ -125,7 +124,7 @@ TEST_F(TestPartitionBasedSplit, CPUHist) {
std::vector<FeatureType> ft{FeatureType::kCategorical};
auto sampler = std::make_shared<common::ColumnSampler>();
HistEvaluator<double, CPUExpandEntry> evaluator{param_, info_, common::OmpGetNumThreads(0),
sampler, ObjInfo{ObjInfo::kRegression}};
sampler};
evaluator.InitRoot(GradStats{total_gpair_});
RegTree tree;
std::vector<CPUExpandEntry> entries(1);
@@ -156,8 +155,8 @@ auto CompareOneHotAndPartition(bool onehot) {
int32_t n_threads = 16;
auto sampler = std::make_shared<common::ColumnSampler>();
auto evaluator = HistEvaluator<GradientSumT, CPUExpandEntry>{
param, dmat->Info(), n_threads, sampler, ObjInfo{ObjInfo::kRegression}};
auto evaluator =
HistEvaluator<GradientSumT, CPUExpandEntry>{param, dmat->Info(), n_threads, sampler};
std::vector<CPUExpandEntry> entries(1);
for (auto const &gmat : dmat->GetBatches<GHistIndexMatrix>({32, param.sparse_threshold})) {

View File

@@ -262,7 +262,7 @@ TEST(GpuHist, EvaluateRootSplit) {
info.num_col_ = kNCols;
DeviceSplitCandidate res =
maker.EvaluateRootSplit({6.4f, 12.8f}, 0, ObjInfo{ObjInfo::kRegression}).split;
maker.EvaluateRootSplit({6.4f, 12.8f}, 0).split;
ASSERT_EQ(res.findex, 7);
ASSERT_NEAR(res.fvalue, 0.26, xgboost::kRtEps);
@@ -300,11 +300,11 @@ void TestHistogramIndexImpl() {
const auto &maker = hist_maker.maker;
auto grad = GenerateRandomGradients(kNRows);
grad.SetDevice(0);
maker->Reset(&grad, hist_maker_dmat.get(), kNCols, ObjInfo{ObjInfo::kRegression});
maker->Reset(&grad, hist_maker_dmat.get(), kNCols);
std::vector<common::CompressedByteT> h_gidx_buffer(maker->page->gidx_buffer.HostVector());
const auto &maker_ext = hist_maker_ext.maker;
maker_ext->Reset(&grad, hist_maker_ext_dmat.get(), kNCols, ObjInfo{ObjInfo::kRegression});
maker_ext->Reset(&grad, hist_maker_ext_dmat.get(), kNCols);
std::vector<common::CompressedByteT> h_gidx_buffer_ext(maker_ext->page->gidx_buffer.HostVector());
ASSERT_EQ(maker->page->Cuts().TotalBins(), maker_ext->page->Cuts().TotalBins());

View File

@@ -61,6 +61,9 @@ class TestGPUUpdaters:
def test_categorical(self, rows, cols, rounds, cats):
self.cputest.run_categorical_basic(rows, cols, rounds, cats, "gpu_hist")
def test_max_cat(self) -> None:
self.cputest.run_max_cat("gpu_hist")
def test_categorical_32_cat(self):
'''32 hits the bound of integer bitset, so special test'''
rows = 1000

View File

@@ -1,3 +1,5 @@
from random import choice
from string import ascii_lowercase
import testing as tm
import pytest
import xgboost as xgb
@@ -167,6 +169,30 @@ class TestTreeMethod:
def test_invalid_category(self) -> None:
self.run_invalid_category("approx")
self.run_invalid_category("hist")
def run_max_cat(self, tree_method: str) -> None:
"""Test data with size smaller than number of categories."""
import pandas as pd
n_cat = 100
n = 5
X = pd.Series(
["".join(choice(ascii_lowercase) for i in range(3)) for i in range(n_cat)],
dtype="category",
)[:n].to_frame()
reg = xgb.XGBRegressor(
enable_categorical=True,
tree_method=tree_method,
n_estimators=10,
)
y = pd.Series(range(n))
reg.fit(X=X, y=y, eval_set=[(X, y)])
assert tm.non_increasing(reg.evals_result()["validation_0"]["rmse"])
@pytest.mark.parametrize("tree_method", ["hist", "approx"])
def test_max_cat(self, tree_method) -> None:
self.run_max_cat(tree_method)
def run_categorical_basic(self, rows, cols, rounds, cats, tree_method):
onehot, label = tm.make_categorical(rows, cols, cats, True)