Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f75c007f27 | ||
|
|
816e788b29 | ||
|
|
3ee3b18a22 | ||
|
|
ece4dc457b | ||
|
|
67298ccd03 | ||
|
|
78d231264a | ||
|
|
4615fa51ef | ||
|
|
4bd5a33b10 |
@@ -153,9 +153,9 @@ def TestWin64() {
|
||||
conda activate ${env_name} && for /R %%i in (python-package\\dist\\*.whl) DO python -m pip install "%%i"
|
||||
"""
|
||||
echo "Running Python tests..."
|
||||
bat "conda activate ${env_name} && python -m pytest -v -s -rxXs --fulltrace tests\\python"
|
||||
bat "conda activate ${env_name} && python -X faulthandler -m pytest -v -s -rxXs --fulltrace tests\\python"
|
||||
bat """
|
||||
conda activate ${env_name} && python -m pytest -v -s -rxXs --fulltrace -m "(not slow) and (not mgpu)" tests\\python-gpu
|
||||
conda activate ${env_name} && python -X faulthandler -m pytest -v -s -rxXs --fulltrace -m "(not slow) and (not mgpu)" tests\\python-gpu
|
||||
"""
|
||||
bat "conda env remove --name ${env_name}"
|
||||
deleteDir()
|
||||
|
||||
@@ -1 +1 @@
|
||||
@xgboost_VERSION_MAJOR@.@xgboost_VERSION_MINOR@.@xgboost_VERSION_PATCH@-dev
|
||||
@xgboost_VERSION_MAJOR@.@xgboost_VERSION_MINOR@.@xgboost_VERSION_PATCH@
|
||||
@@ -91,9 +91,9 @@ function(format_gencode_flags flags out)
|
||||
# Set up architecture flags
|
||||
if(NOT flags)
|
||||
if (CUDA_VERSION VERSION_GREATER_EQUAL "11.1")
|
||||
set(flags "50;52;60;61;70;75;80;86")
|
||||
set(flags "52;60;61;70;75;80;86")
|
||||
elseif (CUDA_VERSION VERSION_GREATER_EQUAL "11.0")
|
||||
set(flags "35;50;52;60;61;70;75;80")
|
||||
set(flags "52;60;61;70;75;80")
|
||||
elseif(CUDA_VERSION VERSION_GREATER_EQUAL "10.0")
|
||||
set(flags "35;50;52;60;61;70;75")
|
||||
elseif(CUDA_VERSION VERSION_GREATER_EQUAL "9.0")
|
||||
|
||||
@@ -101,7 +101,7 @@ R
|
||||
JVM
|
||||
---
|
||||
|
||||
You can use XGBoost4J in your Java/Scala application by adding XGBoost4J as a dependency:
|
||||
* XGBoost4j/XGBoost4j-Spark
|
||||
|
||||
.. code-block:: xml
|
||||
:caption: Maven
|
||||
@@ -134,6 +134,39 @@ You can use XGBoost4J in your Java/Scala application by adding XGBoost4J as a de
|
||||
"ml.dmlc" %% "xgboost4j-spark" % "latest_version_num"
|
||||
)
|
||||
|
||||
* XGBoost4j-GPU/XGBoost4j-Spark-GPU
|
||||
|
||||
.. code-block:: xml
|
||||
:caption: Maven
|
||||
|
||||
<properties>
|
||||
...
|
||||
<!-- Specify Scala version in package name -->
|
||||
<scala.binary.version>2.12</scala.binary.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
...
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
|
||||
<version>latest_version_num</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-spark-gpu_${scala.binary.version}</artifactId>
|
||||
<version>latest_version_num</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
.. code-block:: scala
|
||||
:caption: sbt
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
"ml.dmlc" %% "xgboost4j-gpu" % "latest_version_num",
|
||||
"ml.dmlc" %% "xgboost4j-spark-gpu" % "latest_version_num"
|
||||
)
|
||||
|
||||
This will check out the latest stable version from the Maven Central.
|
||||
|
||||
For the latest release version number, please check `release page <https://github.com/dmlc/xgboost/releases>`_.
|
||||
@@ -185,7 +218,7 @@ and Windows.) Download it and run the following commands:
|
||||
JVM
|
||||
---
|
||||
|
||||
First add the following Maven repository hosted by the XGBoost project:
|
||||
* XGBoost4j/XGBoost4j-Spark
|
||||
|
||||
.. code-block:: xml
|
||||
:caption: Maven
|
||||
@@ -234,6 +267,40 @@ Then add XGBoost4J as a dependency:
|
||||
"ml.dmlc" %% "xgboost4j-spark" % "latest_version_num-SNAPSHOT"
|
||||
)
|
||||
|
||||
* XGBoost4j-GPU/XGBoost4j-Spark-GPU
|
||||
|
||||
.. code-block:: xml
|
||||
:caption: maven
|
||||
|
||||
<properties>
|
||||
...
|
||||
<!-- Specify Scala version in package name -->
|
||||
<scala.binary.version>2.12</scala.binary.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
...
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
|
||||
<version>latest_version_num-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-spark-gpu_${scala.binary.version}</artifactId>
|
||||
<version>latest_version_num-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
.. code-block:: scala
|
||||
:caption: sbt
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
"ml.dmlc" %% "xgboost4j-gpu" % "latest_version_num-SNAPSHOT",
|
||||
"ml.dmlc" %% "xgboost4j-spark-gpu" % "latest_version_num-SNAPSHOT"
|
||||
)
|
||||
|
||||
|
||||
Look up the ``version`` field in `pom.xml <https://github.com/dmlc/xgboost/blob/master/jvm-packages/pom.xml>`_ to get the correct version number.
|
||||
|
||||
The SNAPSHOT JARs are hosted by the XGBoost project. Every commit in the ``master`` branch will automatically trigger generation of a new SNAPSHOT JAR. You can control how often Maven should upgrade your SNAPSHOT installation by specifying ``updatePolicy``. See `here <http://maven.apache.org/pom.html#Repositories>`_ for details.
|
||||
|
||||
@@ -35,6 +35,7 @@ Contents
|
||||
|
||||
java_intro
|
||||
XGBoost4J-Spark Tutorial <xgboost4j_spark_tutorial>
|
||||
XGBoost4J-Spark-GPU Tutorial <xgboost4j_spark_gpu_tutorial>
|
||||
Code Examples <https://github.com/dmlc/xgboost/tree/master/jvm-packages/xgboost4j-example>
|
||||
XGBoost4J Java API <javadocs/index>
|
||||
XGBoost4J Scala API <scaladocs/xgboost4j/index>
|
||||
|
||||
246
doc/jvm/xgboost4j_spark_gpu_tutorial.rst
Normal file
246
doc/jvm/xgboost4j_spark_gpu_tutorial.rst
Normal file
@@ -0,0 +1,246 @@
|
||||
#############################################
|
||||
XGBoost4J-Spark-GPU Tutorial (version 1.6.0+)
|
||||
#############################################
|
||||
|
||||
**XGBoost4J-Spark-GPU** is an open source library aiming to accelerate distributed XGBoost training on Apache Spark cluster from
|
||||
end to end with GPUs by leveraging the `RAPIDS Accelerator for Apache Spark <https://nvidia.github.io/spark-rapids/>`_ product.
|
||||
|
||||
This tutorial will show you how to use **XGBoost4J-Spark-GPU**.
|
||||
|
||||
.. contents::
|
||||
:backlinks: none
|
||||
:local:
|
||||
|
||||
************************************************
|
||||
Build an ML Application with XGBoost4J-Spark-GPU
|
||||
************************************************
|
||||
|
||||
Add XGBoost to Your Project
|
||||
===========================
|
||||
|
||||
Before we go into the tour of how to use XGBoost4J-Spark-GPU, you should first consult
|
||||
:ref:`Installation from Maven repository <install_jvm_packages>` in order to add XGBoost4J-Spark-GPU as
|
||||
a dependency for your project. We provide both stable releases and snapshots.
|
||||
|
||||
Data Preparation
|
||||
================
|
||||
|
||||
In this section, we use the `Iris <https://archive.ics.uci.edu/ml/datasets/iris>`_ dataset as an example to
|
||||
showcase how we use Apache Spark to transform a raw dataset and make it fit the data interface of XGBoost.
|
||||
|
||||
The Iris dataset is shipped in CSV format. Each instance contains 4 features, "sepal length", "sepal width",
|
||||
"petal length" and "petal width". In addition, it contains the "class" column, which is essentially the
|
||||
label with three possible values: "Iris Setosa", "Iris Versicolour" and "Iris Virginica".
|
||||
|
||||
Read Dataset with Spark's Built-In Reader
|
||||
-----------------------------------------
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
|
||||
|
||||
val spark = SparkSession.builder().getOrCreate()
|
||||
|
||||
val labelName = "class"
|
||||
val schema = new StructType(Array(
|
||||
StructField("sepal length", DoubleType, true),
|
||||
StructField("sepal width", DoubleType, true),
|
||||
StructField("petal length", DoubleType, true),
|
||||
StructField("petal width", DoubleType, true),
|
||||
StructField(labelName, StringType, true)))
|
||||
|
||||
val xgbInput = spark.read.option("header", "false")
|
||||
.schema(schema)
|
||||
.csv(dataPath)
|
||||
|
||||
In the first line, we create an instance of a `SparkSession <https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession>`_
|
||||
which is the entry point of any Spark application working with DataFrames. The ``schema`` variable
|
||||
defines the schema of the DataFrame wrapping Iris data. With this explicitly set schema, we
|
||||
can define the column names as well as their types; otherwise the column names would be
|
||||
the default ones derived by Spark, such as ``_col0``, etc. Finally, we can use Spark's
|
||||
built-in CSV reader to load the Iris CSV file as a DataFrame named ``xgbInput``.
|
||||
|
||||
Apache Spark also contains many built-in readers for other formats such as ORC, Parquet, Avro, JSON.
|
||||
|
||||
|
||||
Transform Raw Iris Dataset
|
||||
--------------------------
|
||||
|
||||
To make the Iris dataset recognizable to XGBoost, we need to encode the String-typed
|
||||
label, i.e. "class", to the Double-typed label.
|
||||
|
||||
One way to convert the String-typed label to Double is to use Spark's built-in feature transformer
|
||||
`StringIndexer <https://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer>`_.
|
||||
But this feature is not accelerated in RAPIDS Accelerator, which means it will fall back
|
||||
to CPU. Instead, we use an alternative way to achieve the same goal with the following code:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import org.apache.spark.sql.expressions.Window
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
val spec = Window.orderBy(labelName)
|
||||
val Array(train, test) = xgbInput
|
||||
.withColumn("tmpClassName", dense_rank().over(spec) - 1)
|
||||
.drop(labelName)
|
||||
.withColumnRenamed("tmpClassName", labelName)
|
||||
.randomSplit(Array(0.7, 0.3), seed = 1)
|
||||
|
||||
train.show(5)
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
+------------+-----------+------------+-----------+-----+
|
||||
|sepal length|sepal width|petal length|petal width|class|
|
||||
+------------+-----------+------------+-----------+-----+
|
||||
| 4.3| 3.0| 1.1| 0.1| 0|
|
||||
| 4.4| 2.9| 1.4| 0.2| 0|
|
||||
| 4.4| 3.0| 1.3| 0.2| 0|
|
||||
| 4.4| 3.2| 1.3| 0.2| 0|
|
||||
| 4.6| 3.2| 1.4| 0.2| 0|
|
||||
+------------+-----------+------------+-----------+-----+
|
||||
|
||||
|
||||
With window operations, we have mapped the string column of labels to label indices.
|
||||
|
||||
Training
|
||||
========
|
||||
|
||||
The GPU version of XGBoost-Spark supports both regression and classification
|
||||
models. Although we use the Iris dataset in this tutorial to show how we use
|
||||
``XGBoost/XGBoost4J-Spark-GPU`` to resolve a multi-classes classification problem, the
|
||||
usage in Regression is very similar to classification.
|
||||
|
||||
To train a XGBoost model for classification, we need to claim a XGBoostClassifier first:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
|
||||
val xgbParam = Map(
|
||||
"objective" -> "multi:softprob",
|
||||
"num_class" -> 3,
|
||||
"num_round" -> 100,
|
||||
"tree_method" -> "gpu_hist",
|
||||
"num_workers" -> 1)
|
||||
|
||||
val featuresNames = schema.fieldNames.filter(name => name != labelName)
|
||||
|
||||
val xgbClassifier = new XGBoostClassifier(xgbParam)
|
||||
.setFeaturesCol(featuresNames)
|
||||
.setLabelCol(labelName)
|
||||
|
||||
The available parameters for training a XGBoost model can be found in :doc:`here </parameter>`.
|
||||
Similar to the XGBoost4J-Spark package, in addition to the default set of parameters,
|
||||
XGBoost4J-Spark-GPU also supports the camel-case variant of these parameters to be
|
||||
consistent with Spark's MLlib naming convention.
|
||||
|
||||
Specifically, each parameter in :doc:`this page </parameter>` has its equivalent form in
|
||||
XGBoost4J-Spark-GPU with camel case. For example, to set ``max_depth`` for each tree, you can pass
|
||||
parameter just like what we did in the above code snippet (as ``max_depth`` wrapped in a Map), or
|
||||
you can do it through setters in XGBoostClassifer:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val xgbClassifier = new XGBoostClassifier(xgbParam)
|
||||
.setFeaturesCol(featuresNames)
|
||||
.setLabelCol(labelName)
|
||||
xgbClassifier.setMaxDepth(2)
|
||||
|
||||
.. note::
|
||||
|
||||
In contrast with XGBoost4j-Spark which accepts both a feature column with VectorUDT type and
|
||||
an array of feature column names, XGBoost4j-Spark-GPU only accepts an array of feature
|
||||
column names by ``setFeaturesCol(value: Array[String])``.
|
||||
|
||||
After setting XGBoostClassifier parameters and feature/label columns, we can build a
|
||||
transformer, XGBoostClassificationModel by fitting XGBoostClassifier with the input
|
||||
DataFrame. This ``fit`` operation is essentially the training process and the generated
|
||||
model can then be used in other tasks like prediction.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val xgbClassificationModel = xgbClassifier.fit(train)
|
||||
|
||||
Prediction
|
||||
==========
|
||||
|
||||
When we get a model, either a XGBoostClassificationModel or a XGBoostRegressionModel, it takes a DataFrame as an input,
|
||||
reads the column containing feature vectors, predicts for each feature vector, and outputs a new DataFrame
|
||||
with the following columns by default:
|
||||
|
||||
* XGBoostClassificationModel will output margins (``rawPredictionCol``), probabilities(``probabilityCol``) and the eventual prediction labels (``predictionCol``) for each possible label.
|
||||
* XGBoostRegressionModel will output prediction a label(``predictionCol``).
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val xgbClassificationModel = xgbClassifier.fit(train)
|
||||
val results = xgbClassificationModel.transform(test)
|
||||
results.show()
|
||||
|
||||
With the above code snippet, we get a DataFrame as result, which contains the margin, probability for each class,
|
||||
and the prediction for each instance.
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|
||||
|sepal length|sepal width| petal length| petal width|class| rawPrediction| probability|prediction|
|
||||
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|
||||
| 4.5| 2.3| 1.3|0.30000000000000004| 0|[3.16666603088378...|[0.98853939771652...| 0.0|
|
||||
| 4.6| 3.1| 1.5| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 4.8| 3.1| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 4.8| 3.4| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 4.8| 3.4|1.9000000000000001| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 4.9| 2.4| 3.3| 1.0| 1|[-2.1498908996582...|[0.00596602633595...| 1.0|
|
||||
| 4.9| 2.5| 4.5| 1.7| 2|[-2.1498908996582...|[0.00596602633595...| 1.0|
|
||||
| 5.0| 3.5| 1.3|0.30000000000000004| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.1| 2.5| 3.0| 1.1| 1|[3.16666603088378...|[0.98853939771652...| 0.0|
|
||||
| 5.1| 3.3| 1.7| 0.5| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.1| 3.5| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.1| 3.8| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.2| 3.4| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.2| 3.5| 1.5| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.2| 4.1| 1.5| 0.1| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.4| 3.9| 1.7| 0.4| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.5| 2.4| 3.8| 1.1| 1|[-2.1498908996582...|[0.00596602633595...| 1.0|
|
||||
| 5.5| 4.2| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
|
||||
| 5.7| 2.5| 5.0| 2.0| 2|[-2.1498908996582...|[0.00280966912396...| 2.0|
|
||||
| 5.7| 3.0| 4.2| 1.2| 1|[-2.1498908996582...|[0.00643939292058...| 1.0|
|
||||
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|
||||
|
||||
**********************
|
||||
Submit the application
|
||||
**********************
|
||||
|
||||
Here’s an example to submit an end-to-end XGBoost-4j-Spark-GPU Spark application to an
|
||||
Apache Spark Standalone cluster, assuming the application main class is Iris and the
|
||||
application jar is iris-1.0.0.jar
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
cudf_version=22.02.0
|
||||
rapids_version=22.02.0
|
||||
xgboost_version=1.6.0
|
||||
main_class=Iris
|
||||
app_jar=iris-1.0.0.jar
|
||||
|
||||
spark-submit \
|
||||
--master $master \
|
||||
--packages ai.rapids:cudf:${cudf_version},com.nvidia:rapids-4-spark_2.12:${rapids_version},ml.dmlc:xgboost4j-gpu_2.12:${xgboost_version},ml.dmlc:xgboost4j-spark-gpu_2.12:${xgboost_version} \
|
||||
--conf spark.executor.cores=12 \
|
||||
--conf spark.task.cpus=1 \
|
||||
--conf spark.executor.resource.gpu.amount=1 \
|
||||
--conf spark.task.resource.gpu.amount=0.08 \
|
||||
--conf spark.rapids.sql.csv.read.double.enabled=true \
|
||||
--conf spark.rapids.sql.hasNans=false \
|
||||
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
|
||||
--class ${main_class} \
|
||||
${app_jar}
|
||||
|
||||
* First, we need to specify the ``RAPIDS Accelerator, cudf, xgboost4j-gpu, xgboost4j-spark-gpu`` packages by ``--packages``
|
||||
* Second, ``RAPIDS Accelerator`` is a Spark plugin, so we need to configure it by specifying ``spark.plugins=com.nvidia.spark.SQLPlugin``
|
||||
|
||||
For details about other ``RAPIDS Accelerator`` other configurations, please refer to the `configuration <https://nvidia.github.io/spark-rapids/docs/configs.html>`_.
|
||||
|
||||
For ``RAPIDS Accelerator Frequently Asked Questions``, please refer to the
|
||||
`frequently-asked-questions <https://nvidia.github.io/spark-rapids/docs/FAQ.html#frequently-asked-questions>`_.
|
||||
@@ -127,6 +127,11 @@ Now, we have a DataFrame containing only two columns, "features" which contains
|
||||
"sepal length", "sepal width", "petal length" and "petal width" and "classIndex" which has Double-typed
|
||||
labels. A DataFrame like this (containing vector-represented features and numeric labels) can be fed to XGBoost4J-Spark's training engine directly.
|
||||
|
||||
.. note::
|
||||
|
||||
There is no need to assemble feature columns from version 1.6.0+. Instead, users can specify an array of
|
||||
feture column names by ``setFeaturesCol(value: Array[String])`` and XGBoost4j-Spark will do it.
|
||||
|
||||
Dealing with missing values
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ See `Awesome XGBoost <https://github.com/dmlc/xgboost/tree/master/demo>`_ for mo
|
||||
Distributed XGBoost with AWS YARN <aws_yarn>
|
||||
kubernetes
|
||||
Distributed XGBoost with XGBoost4J-Spark <https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html>
|
||||
Distributed XGBoost with XGBoost4J-Spark-GPU <https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_gpu_tutorial.html>
|
||||
dask
|
||||
ray
|
||||
dart
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>XGBoost JVM Package</name>
|
||||
<description>JVM Package for XGBoost</description>
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-example_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
<packaging>jar</packaging>
|
||||
<build>
|
||||
<plugins>
|
||||
@@ -26,7 +26,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-spark_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
@@ -37,7 +37,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-flink_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-flink_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
@@ -26,7 +26,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-gpu_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
@@ -20,11 +20,6 @@
|
||||
<classifier>${cudf.classifier}</classifier>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>2.10.5.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2021 by Contributors
|
||||
Copyright (c) 2021-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,15 +16,7 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.gpu.java;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* Cudf utilities to build cuda array interface against {@link CudfColumn}
|
||||
@@ -42,58 +34,64 @@ class CudfUtils {
|
||||
|
||||
// Helper class to build array interface string
|
||||
private static class Builder {
|
||||
private JsonNodeFactory nodeFactory = new JsonNodeFactory(false);
|
||||
private ArrayNode rootArrayNode = nodeFactory.arrayNode();
|
||||
private ArrayList<String> colArrayInterfaces = new ArrayList<String>();
|
||||
|
||||
private Builder add(CudfColumn... columns) {
|
||||
if (columns == null || columns.length <= 0) {
|
||||
throw new IllegalArgumentException("At least one ColumnData is required.");
|
||||
}
|
||||
for (CudfColumn cd : columns) {
|
||||
rootArrayNode.add(buildColumnObject(cd));
|
||||
colArrayInterfaces.add(buildColumnObject(cd));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private String build() {
|
||||
try {
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
JsonGenerator jsonGen = new JsonFactory().createGenerator(bos);
|
||||
new ObjectMapper().writeTree(jsonGen, rootArrayNode);
|
||||
return bos.toString();
|
||||
} catch (IOException ie) {
|
||||
ie.printStackTrace();
|
||||
throw new RuntimeException("Failed to build array interface. Error: " + ie);
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("[");
|
||||
for (int i = 0; i < colArrayInterfaces.size(); i++) {
|
||||
builder.append(colArrayInterfaces.get(i));
|
||||
if (i != colArrayInterfaces.size() - 1) {
|
||||
builder.append(",");
|
||||
}
|
||||
}
|
||||
builder.append("]");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private ObjectNode buildColumnObject(CudfColumn column) {
|
||||
/** build the whole column information including data and valid info */
|
||||
private String buildColumnObject(CudfColumn column) {
|
||||
if (column.getDataPtr() == 0) {
|
||||
throw new IllegalArgumentException("Empty column data is NOT accepted!");
|
||||
}
|
||||
if (column.getTypeStr() == null || column.getTypeStr().isEmpty()) {
|
||||
throw new IllegalArgumentException("Empty type string is NOT accepted!");
|
||||
}
|
||||
ObjectNode colDataObj = buildMetaObject(column.getDataPtr(), column.getShape(),
|
||||
column.getTypeStr());
|
||||
|
||||
StringBuilder builder = new StringBuilder();
|
||||
String colData = buildMetaObject(column.getDataPtr(), column.getShape(),
|
||||
column.getTypeStr());
|
||||
builder.append("{");
|
||||
builder.append(colData);
|
||||
if (column.getValidPtr() != 0 && column.getNullCount() != 0) {
|
||||
ObjectNode validObj = buildMetaObject(column.getValidPtr(), column.getShape(), "<t1");
|
||||
colDataObj.set("mask", validObj);
|
||||
String validString = buildMetaObject(column.getValidPtr(), column.getShape(), "<t1");
|
||||
builder.append(",\"mask\":");
|
||||
builder.append("{");
|
||||
builder.append(validString);
|
||||
builder.append("}");
|
||||
}
|
||||
return colDataObj;
|
||||
builder.append("}");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private ObjectNode buildMetaObject(long ptr, long shape, final String typeStr) {
|
||||
ObjectNode objNode = nodeFactory.objectNode();
|
||||
ArrayNode shapeNode = objNode.putArray("shape");
|
||||
shapeNode.add(shape);
|
||||
ArrayNode dataNode = objNode.putArray("data");
|
||||
dataNode.add(ptr)
|
||||
.add(false);
|
||||
objNode.put("typestr", typeStr)
|
||||
.put("version", 1);
|
||||
return objNode;
|
||||
/** build the base information of a column */
|
||||
private String buildMetaObject(long ptr, long shape, final String typeStr) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("\"shape\":[" + shape + "],");
|
||||
builder.append("\"data\":[" + ptr + "," + "false" + "],");
|
||||
builder.append("\"typestr\":\"" + typeStr + "\",");
|
||||
builder.append("\"version\":" + 1);
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-spark-gpu_2.12</artifactId>
|
||||
<build>
|
||||
@@ -24,7 +24,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j-gpu_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
|
||||
@@ -112,7 +112,7 @@ private[spark] object GpuUtils {
|
||||
val msg = if (fitting) "train" else "transform"
|
||||
// feature columns
|
||||
require(featureNames.nonEmpty, s"Gpu $msg requires features columns. " +
|
||||
"please refer to setFeaturesCols!")
|
||||
"please refer to `setFeaturesCol(value: Array[String])`!")
|
||||
featureNames.foreach(fn => checkNumericType(schema, fn))
|
||||
if (fitting) {
|
||||
require(labelName.nonEmpty, "label column is not set.")
|
||||
|
||||
@@ -126,7 +126,7 @@ class GpuXGBoostClassifierSuite extends GpuTestSuite {
|
||||
|
||||
val vectorAssembler = new VectorAssembler()
|
||||
.setHandleInvalid("keep")
|
||||
.setInputCols(featureNames.toArray)
|
||||
.setInputCols(featureNames)
|
||||
.setOutputCol("features")
|
||||
val trainingDf = vectorAssembler.transform(rawInput).select("features", labelName)
|
||||
|
||||
@@ -147,12 +147,12 @@ class GpuXGBoostClassifierSuite extends GpuTestSuite {
|
||||
.csv(dataPath).randomSplit(Array(0.7, 0.3), seed = 1)
|
||||
|
||||
// Since CPU model does not know the information about the features cols that GPU transform
|
||||
// pipeline requires. End user needs to setFeaturesCols in the model manually
|
||||
val thrown = intercept[IllegalArgumentException](cpuModel
|
||||
// pipeline requires. End user needs to setFeaturesCol(features: Array[String]) in the model
|
||||
// manually
|
||||
val thrown = intercept[NoSuchElementException](cpuModel
|
||||
.transform(testDf)
|
||||
.collect())
|
||||
assert(thrown.getMessage.contains("Gpu transform requires features columns. " +
|
||||
"please refer to setFeaturesCols"))
|
||||
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
|
||||
|
||||
val left = cpuModel
|
||||
.setFeaturesCol(featureNames)
|
||||
@@ -195,17 +195,16 @@ class GpuXGBoostClassifierSuite extends GpuTestSuite {
|
||||
val featureColName = "feature_col"
|
||||
val vectorAssembler = new VectorAssembler()
|
||||
.setHandleInvalid("keep")
|
||||
.setInputCols(featureNames.toArray)
|
||||
.setInputCols(featureNames)
|
||||
.setOutputCol(featureColName)
|
||||
val testDf = vectorAssembler.transform(rawInput).select(featureColName, labelName)
|
||||
|
||||
// Since GPU model does not know the information about the features col name that CPU
|
||||
// transform pipeline requires. End user needs to setFeaturesCol in the model manually
|
||||
val thrown = intercept[IllegalArgumentException](
|
||||
intercept[IllegalArgumentException](
|
||||
gpuModel
|
||||
.transform(testDf)
|
||||
.collect())
|
||||
assert(thrown.getMessage.contains("features does not exist"))
|
||||
|
||||
val left = gpuModel
|
||||
.setFeaturesCol(featureColName)
|
||||
|
||||
@@ -108,12 +108,15 @@ class GpuXGBoostGeneralSuite extends GpuTestSuite {
|
||||
val trainingDf = trainingData.toDF(allColumnNames: _*)
|
||||
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
|
||||
"num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "gpu_hist")
|
||||
val thrown = intercept[IllegalArgumentException] {
|
||||
|
||||
// GPU train requires featuresCols. If not specified,
|
||||
// then NoSuchElementException will be thrown
|
||||
val thrown = intercept[NoSuchElementException] {
|
||||
new XGBoostClassifier(xgbParam)
|
||||
.setLabelCol(labelName)
|
||||
.fit(trainingDf)
|
||||
}
|
||||
assert(thrown.getMessage.contains("Gpu train requires features columns."))
|
||||
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
|
||||
|
||||
val thrown1 = intercept[IllegalArgumentException] {
|
||||
new XGBoostClassifier(xgbParam)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2021 by Contributors
|
||||
Copyright (c) 2021-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -86,7 +86,7 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
|
||||
.csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
|
||||
|
||||
val classifier = new XGBoostRegressor(xgbParam)
|
||||
.setFeaturesCols(featureNames)
|
||||
.setFeaturesCol(featureNames)
|
||||
.setLabelCol(labelName)
|
||||
.setTreeMethod("gpu_hist")
|
||||
(classifier.fit(rawInput), testDf)
|
||||
@@ -122,7 +122,7 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
|
||||
|
||||
val vectorAssembler = new VectorAssembler()
|
||||
.setHandleInvalid("keep")
|
||||
.setInputCols(featureNames.toArray)
|
||||
.setInputCols(featureNames)
|
||||
.setOutputCol("features")
|
||||
val trainingDf = vectorAssembler.transform(rawInput).select("features", labelName)
|
||||
|
||||
@@ -143,20 +143,20 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
|
||||
.csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
|
||||
|
||||
// Since CPU model does not know the information about the features cols that GPU transform
|
||||
// pipeline requires. End user needs to setFeaturesCols in the model manually
|
||||
val thrown = intercept[IllegalArgumentException](cpuModel
|
||||
// pipeline requires. End user needs to setFeaturesCol(features: Array[String]) in the model
|
||||
// manually
|
||||
val thrown = intercept[NoSuchElementException](cpuModel
|
||||
.transform(testDf)
|
||||
.collect())
|
||||
assert(thrown.getMessage.contains("Gpu transform requires features columns. " +
|
||||
"please refer to setFeaturesCols"))
|
||||
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
|
||||
|
||||
val left = cpuModel
|
||||
.setFeaturesCols(featureNames)
|
||||
.setFeaturesCol(featureNames)
|
||||
.transform(testDf)
|
||||
.collect()
|
||||
|
||||
val right = cpuModelFromFile
|
||||
.setFeaturesCols(featureNames)
|
||||
.setFeaturesCol(featureNames)
|
||||
.transform(testDf)
|
||||
.collect()
|
||||
|
||||
@@ -173,7 +173,7 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
|
||||
.csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
|
||||
|
||||
val classifier = new XGBoostRegressor(xgbParam)
|
||||
.setFeaturesCols(featureNames)
|
||||
.setFeaturesCol(featureNames)
|
||||
.setLabelCol(labelName)
|
||||
.setTreeMethod("gpu_hist")
|
||||
classifier.fit(rawInput)
|
||||
@@ -191,17 +191,16 @@ class GpuXGBoostRegressorSuite extends GpuTestSuite {
|
||||
val featureColName = "feature_col"
|
||||
val vectorAssembler = new VectorAssembler()
|
||||
.setHandleInvalid("keep")
|
||||
.setInputCols(featureNames.toArray)
|
||||
.setInputCols(featureNames)
|
||||
.setOutputCol(featureColName)
|
||||
val testDf = vectorAssembler.transform(rawInput).select(featureColName, labelName)
|
||||
|
||||
// Since GPU model does not know the information about the features col name that CPU
|
||||
// transform pipeline requires. End user needs to setFeaturesCol in the model manually
|
||||
val thrown = intercept[IllegalArgumentException](
|
||||
intercept[IllegalArgumentException](
|
||||
gpuModel
|
||||
.transform(testDf)
|
||||
.collect())
|
||||
assert(thrown.getMessage.contains("features does not exist"))
|
||||
|
||||
val left = gpuModel
|
||||
.setFeaturesCol(featureColName)
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j-spark_2.12</artifactId>
|
||||
<build>
|
||||
@@ -24,7 +24,7 @@
|
||||
<dependency>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost4j_${scala.binary.version}</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2021 by Contributors
|
||||
Copyright (c) 2021-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -35,8 +35,10 @@ import org.apache.commons.logging.LogFactory
|
||||
|
||||
import org.apache.spark.TaskContext
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.ml.{Estimator, Model, PipelineStage}
|
||||
import org.apache.spark.ml.linalg.Vector
|
||||
import org.apache.spark.ml.linalg.xgboost.XGBoostSchemaUtils
|
||||
import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, StructType}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
@@ -112,7 +114,7 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
|
||||
}
|
||||
|
||||
val (packedParams, evalSet) = estimator match {
|
||||
val (packedParams, evalSet, xgbInput) = estimator match {
|
||||
case est: XGBoostEstimatorCommon =>
|
||||
// get weight column, if weight is not defined, default to lit(1.0)
|
||||
val weight = if (!est.isDefined(est.weightCol) || est.getWeightCol.isEmpty) {
|
||||
@@ -136,20 +138,28 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
|
||||
}
|
||||
|
||||
(PackedParams(col(est.getLabelCol), col(est.getFeaturesCol), weight, baseMargin, group,
|
||||
est.getNumWorkers, est.needDeterministicRepartitioning), est.getEvalSets(params))
|
||||
val (xgbInput, featuresName) = est.vectorize(dataset)
|
||||
|
||||
val evalSets = est.getEvalSets(params).transform((_, df) => {
|
||||
val (dfTransformed, _) = est.vectorize(df)
|
||||
dfTransformed
|
||||
})
|
||||
|
||||
(PackedParams(col(est.getLabelCol), col(featuresName), weight, baseMargin, group,
|
||||
est.getNumWorkers, est.needDeterministicRepartitioning), evalSets, xgbInput)
|
||||
|
||||
case _ => throw new RuntimeException("Unsupporting " + estimator)
|
||||
}
|
||||
|
||||
// transform the training Dataset[_] to RDD[XGBLabeledPoint]
|
||||
val trainingSet: RDD[XGBLabeledPoint] = DataUtils.convertDataFrameToXGBLabeledPointRDDs(
|
||||
packedParams, dataset.asInstanceOf[DataFrame]).head
|
||||
packedParams, xgbInput.asInstanceOf[DataFrame]).head
|
||||
|
||||
// transform the eval Dataset[_] to RDD[XGBLabeledPoint]
|
||||
val evalRDDMap = evalSet.map {
|
||||
case (name, dataFrame) => (name,
|
||||
DataUtils.convertDataFrameToXGBLabeledPointRDDs(packedParams, dataFrame).head)
|
||||
DataUtils.convertDataFrameToXGBLabeledPointRDDs(packedParams,
|
||||
dataFrame.asInstanceOf[DataFrame]).head)
|
||||
}
|
||||
|
||||
val hasGroup = packedParams.group.map(_ != defaultGroupColumn).getOrElse(false)
|
||||
@@ -184,11 +194,11 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
/** get the necessary parameters */
|
||||
val (booster, inferBatchSize, featuresCol, useExternalMemory, missing, allowNonZeroForMissing,
|
||||
predictFunc, schema) =
|
||||
val (booster, inferBatchSize, xgbInput, featuresCol, useExternalMemory, missing,
|
||||
allowNonZeroForMissing, predictFunc, schema) =
|
||||
model match {
|
||||
case m: XGBoostClassificationModel =>
|
||||
|
||||
val (xgbInput, featuresName) = m.vectorize(dataset)
|
||||
// predict and turn to Row
|
||||
val predictFunc =
|
||||
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
|
||||
@@ -199,7 +209,7 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
// prepare the final Schema
|
||||
var schema = StructType(dataset.schema.fields ++
|
||||
var schema = StructType(xgbInput.schema.fields ++
|
||||
Seq(StructField(name = XGBoostClassificationModel._rawPredictionCol, dataType =
|
||||
ArrayType(FloatType, containsNull = false), nullable = false)) ++
|
||||
Seq(StructField(name = XGBoostClassificationModel._probabilityCol, dataType =
|
||||
@@ -214,11 +224,12 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
ArrayType(FloatType, containsNull = false), nullable = false))
|
||||
}
|
||||
|
||||
(m._booster, m.getInferBatchSize, m.getFeaturesCol, m.getUseExternalMemory, m.getMissing,
|
||||
m.getAllowNonZeroForMissingValue, predictFunc, schema)
|
||||
(m._booster, m.getInferBatchSize, xgbInput, featuresName, m.getUseExternalMemory,
|
||||
m.getMissing, m.getAllowNonZeroForMissingValue, predictFunc, schema)
|
||||
|
||||
case m: XGBoostRegressionModel =>
|
||||
// predict and turn to Row
|
||||
val (xgbInput, featuresName) = m.vectorize(dataset)
|
||||
val predictFunc =
|
||||
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
|
||||
val Array(rawPredictionItr, predLeafItr, predContribItr) =
|
||||
@@ -227,7 +238,7 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
// prepare the final Schema
|
||||
var schema = StructType(dataset.schema.fields ++
|
||||
var schema = StructType(xgbInput.schema.fields ++
|
||||
Seq(StructField(name = XGBoostRegressionModel._originalPredictionCol, dataType =
|
||||
ArrayType(FloatType, containsNull = false), nullable = false)))
|
||||
|
||||
@@ -240,14 +251,14 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
ArrayType(FloatType, containsNull = false), nullable = false))
|
||||
}
|
||||
|
||||
(m._booster, m.getInferBatchSize, m.getFeaturesCol, m.getUseExternalMemory, m.getMissing,
|
||||
m.getAllowNonZeroForMissingValue, predictFunc, schema)
|
||||
(m._booster, m.getInferBatchSize, xgbInput, featuresName, m.getUseExternalMemory,
|
||||
m.getMissing, m.getAllowNonZeroForMissingValue, predictFunc, schema)
|
||||
}
|
||||
|
||||
val bBooster = dataset.sparkSession.sparkContext.broadcast(booster)
|
||||
val appName = dataset.sparkSession.sparkContext.appName
|
||||
val bBooster = xgbInput.sparkSession.sparkContext.broadcast(booster)
|
||||
val appName = xgbInput.sparkSession.sparkContext.appName
|
||||
|
||||
val resultRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
|
||||
val resultRDD = xgbInput.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
|
||||
new AbstractIterator[Row] {
|
||||
private var batchCnt = 0
|
||||
|
||||
@@ -295,7 +306,7 @@ object PreXGBoost extends PreXGBoostProvider {
|
||||
}
|
||||
|
||||
bBooster.unpersist(blocking = false)
|
||||
dataset.sparkSession.createDataFrame(resultRDD, schema)
|
||||
xgbInput.sparkSession.createDataFrame(resultRDD, schema)
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014,2021 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -46,8 +46,14 @@ import org.apache.spark.sql.SparkSession
|
||||
* the Python Rabit tracker (in dmlc_core), whereas the latter is implemented
|
||||
* in Scala without Python components, and with full support of timeouts.
|
||||
* The Scala implementation is currently experimental, use at your own risk.
|
||||
*
|
||||
* @param hostIp The Rabit Tracker host IP address which is only used for python implementation.
|
||||
* This is only needed if the host IP cannot be automatically guessed.
|
||||
* @param pythonExec The python executed path for Rabit Tracker,
|
||||
* which is only used for python implementation.
|
||||
*/
|
||||
case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String )
|
||||
case class TrackerConf(workerConnectionTimeout: Long, trackerImpl: String,
|
||||
hostIp: String = "", pythonExec: String = "")
|
||||
|
||||
object TrackerConf {
|
||||
def apply(): TrackerConf = TrackerConf(0L, "python")
|
||||
@@ -336,13 +342,18 @@ object XGBoost extends Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
private def startTracker(nWorkers: Int, trackerConf: TrackerConf): IRabitTracker = {
|
||||
/** visiable for testing */
|
||||
private[scala] def getTracker(nWorkers: Int, trackerConf: TrackerConf): IRabitTracker = {
|
||||
val tracker: IRabitTracker = trackerConf.trackerImpl match {
|
||||
case "scala" => new RabitTracker(nWorkers)
|
||||
case "python" => new PyRabitTracker(nWorkers)
|
||||
case "python" => new PyRabitTracker(nWorkers, trackerConf.hostIp, trackerConf.pythonExec)
|
||||
case _ => new PyRabitTracker(nWorkers)
|
||||
}
|
||||
tracker
|
||||
}
|
||||
|
||||
private def startTracker(nWorkers: Int, trackerConf: TrackerConf): IRabitTracker = {
|
||||
val tracker = getTracker(nWorkers, trackerConf)
|
||||
require(tracker.start(trackerConf.workerConnectionTimeout), "FAULT: Failed to start tracker")
|
||||
tracker
|
||||
}
|
||||
|
||||
@@ -144,13 +144,6 @@ class XGBoostClassifier (
|
||||
def setSinglePrecisionHistogram(value: Boolean): this.type =
|
||||
set(singlePrecisionHistogram, value)
|
||||
|
||||
/**
|
||||
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
|
||||
* all feature columns must be numeric types.
|
||||
*/
|
||||
def setFeaturesCol(value: Array[String]): this.type =
|
||||
set(featuresCols, value)
|
||||
|
||||
// called at the start of fit/train when 'eval_metric' is not defined
|
||||
private def setupDefaultEvalMetric(): String = {
|
||||
require(isDefined(objective), "Users must set \'objective\' via xgboostParams.")
|
||||
@@ -165,7 +158,12 @@ class XGBoostClassifier (
|
||||
|
||||
// Callback from PreXGBoost
|
||||
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
|
||||
super.transformSchema(schema)
|
||||
if (isFeaturesColSet(schema)) {
|
||||
// User has vectorized the features into VectorUDT.
|
||||
super.transformSchema(schema)
|
||||
} else {
|
||||
transformSchemaWithFeaturesCols(true, schema)
|
||||
}
|
||||
}
|
||||
|
||||
override def transformSchema(schema: StructType): StructType = {
|
||||
@@ -260,13 +258,6 @@ class XGBoostClassificationModel private[ml](
|
||||
|
||||
def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
|
||||
|
||||
/**
|
||||
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
|
||||
* all feature columns must be numeric types.
|
||||
*/
|
||||
def setFeaturesCol(value: Array[String]): this.type =
|
||||
set(featuresCols, value)
|
||||
|
||||
/**
|
||||
* Single instance prediction.
|
||||
* Note: The performance is not ideal, use it carefully!
|
||||
@@ -359,7 +350,12 @@ class XGBoostClassificationModel private[ml](
|
||||
}
|
||||
|
||||
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
|
||||
super.transformSchema(schema)
|
||||
if (isFeaturesColSet(schema)) {
|
||||
// User has vectorized the features into VectorUDT.
|
||||
super.transformSchema(schema)
|
||||
} else {
|
||||
transformSchemaWithFeaturesCols(false, schema)
|
||||
}
|
||||
}
|
||||
|
||||
override def transformSchema(schema: StructType): StructType = {
|
||||
@@ -385,8 +381,6 @@ class XGBoostClassificationModel private[ml](
|
||||
Vectors.dense(rawPredictions)
|
||||
}
|
||||
|
||||
|
||||
|
||||
if ($(rawPredictionCol).nonEmpty) {
|
||||
outputData = outputData
|
||||
.withColumn(getRawPredictionCol, rawPredictionUDF(col(_rawPredictionCol)))
|
||||
|
||||
@@ -146,13 +146,6 @@ class XGBoostRegressor (
|
||||
def setSinglePrecisionHistogram(value: Boolean): this.type =
|
||||
set(singlePrecisionHistogram, value)
|
||||
|
||||
/**
|
||||
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
|
||||
* all feature columns must be numeric types.
|
||||
*/
|
||||
def setFeaturesCols(value: Array[String]): this.type =
|
||||
set(featuresCols, value)
|
||||
|
||||
// called at the start of fit/train when 'eval_metric' is not defined
|
||||
private def setupDefaultEvalMetric(): String = {
|
||||
require(isDefined(objective), "Users must set \'objective\' via xgboostParams.")
|
||||
@@ -164,7 +157,12 @@ class XGBoostRegressor (
|
||||
}
|
||||
|
||||
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
|
||||
super.transformSchema(schema)
|
||||
if (isFeaturesColSet(schema)) {
|
||||
// User has vectorized the features into VectorUDT.
|
||||
super.transformSchema(schema)
|
||||
} else {
|
||||
transformSchemaWithFeaturesCols(false, schema)
|
||||
}
|
||||
}
|
||||
|
||||
override def transformSchema(schema: StructType): StructType = {
|
||||
@@ -253,13 +251,6 @@ class XGBoostRegressionModel private[ml] (
|
||||
|
||||
def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
|
||||
|
||||
/**
|
||||
* This API is only used in GPU train pipeline of xgboost4j-spark-gpu, which requires
|
||||
* all feature columns must be numeric types.
|
||||
*/
|
||||
def setFeaturesCols(value: Array[String]): this.type =
|
||||
set(featuresCols, value)
|
||||
|
||||
/**
|
||||
* Single instance prediction.
|
||||
* Note: The performance is not ideal, use it carefully!
|
||||
@@ -331,7 +322,12 @@ class XGBoostRegressionModel private[ml] (
|
||||
}
|
||||
|
||||
private[spark] def transformSchemaInternal(schema: StructType): StructType = {
|
||||
super.transformSchema(schema)
|
||||
if (isFeaturesColSet(schema)) {
|
||||
// User has vectorized the features into VectorUDT.
|
||||
super.transformSchema(schema)
|
||||
} else {
|
||||
transformSchemaWithFeaturesCols(false, schema)
|
||||
}
|
||||
}
|
||||
|
||||
override def transformSchema(schema: StructType): StructType = {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014,2021 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -247,6 +247,27 @@ trait HasNumClass extends Params {
|
||||
final def getNumClass: Int = $(numClass)
|
||||
}
|
||||
|
||||
/**
|
||||
* Trait for shared param featuresCols.
|
||||
*/
|
||||
trait HasFeaturesCols extends Params {
|
||||
/**
|
||||
* Param for the names of feature columns.
|
||||
* @group param
|
||||
*/
|
||||
final val featuresCols: StringArrayParam = new StringArrayParam(this, "featuresCols",
|
||||
"an array of feature column names.")
|
||||
|
||||
/** @group getParam */
|
||||
final def getFeaturesCols: Array[String] = $(featuresCols)
|
||||
|
||||
/** Check if featuresCols is valid */
|
||||
def isFeaturesColsValid: Boolean = {
|
||||
isDefined(featuresCols) && $(featuresCols) != Array.empty
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private[spark] trait ParamMapFuncs extends Params {
|
||||
|
||||
def XGBoost2MLlibParams(xgboostParams: Map[String, Any]): Unit = {
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
/*
|
||||
Copyright (c) 2021-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark.params
|
||||
|
||||
import org.apache.spark.ml.param.{Params, StringArrayParam}
|
||||
|
||||
trait GpuParams extends Params {
|
||||
/**
|
||||
* Param for the names of feature columns for GPU pipeline.
|
||||
* @group param
|
||||
*/
|
||||
final val featuresCols: StringArrayParam = new StringArrayParam(this, "featuresCols",
|
||||
"an array of feature column names for GPU pipeline.")
|
||||
|
||||
setDefault(featuresCols, Array.empty[String])
|
||||
|
||||
/** @group getParam */
|
||||
final def getFeaturesCols: Array[String] = $(featuresCols)
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014,2021 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -16,16 +16,101 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark.params
|
||||
|
||||
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasWeightCol}
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.ml.linalg.xgboost.XGBoostSchemaUtils
|
||||
import org.apache.spark.ml.param.{Param, ParamValidators}
|
||||
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol, HasWeightCol}
|
||||
import org.apache.spark.sql.Dataset
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
private[scala] sealed trait XGBoostEstimatorCommon extends GeneralParams with LearningTaskParams
|
||||
with BoosterParams with RabitParams with ParamMapFuncs with NonParamVariables with HasWeightCol
|
||||
with HasBaseMarginCol with HasLeafPredictionCol with HasContribPredictionCol with HasFeaturesCol
|
||||
with HasLabelCol with GpuParams {
|
||||
with HasLabelCol with HasFeaturesCols with HasHandleInvalid {
|
||||
|
||||
def needDeterministicRepartitioning: Boolean = {
|
||||
getCheckpointPath != null && getCheckpointPath.nonEmpty && getCheckpointInterval > 0
|
||||
}
|
||||
|
||||
/**
|
||||
* Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
|
||||
* invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
|
||||
* output). Column lengths are taken from the size of ML Attribute Group, which can be set using
|
||||
* `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
|
||||
* from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
|
||||
* Default: "error"
|
||||
* @group param
|
||||
*/
|
||||
override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
|
||||
"""Param for how to handle invalid data (NULL and NaN values). Options are 'skip' (filter out
|
||||
|rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN
|
||||
|in the output). Column lengths are taken from the size of ML Attribute Group, which can be
|
||||
|set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also
|
||||
|be inferred from first rows of the data since it is safe to do so but only in case of 'error'
|
||||
|or 'skip'.""".stripMargin.replaceAll("\n", " "),
|
||||
ParamValidators.inArray(Array("skip", "error", "keep")))
|
||||
|
||||
setDefault(handleInvalid, "error")
|
||||
|
||||
/**
|
||||
* Specify an array of feature column names which must be numeric types.
|
||||
*/
|
||||
def setFeaturesCol(value: Array[String]): this.type = set(featuresCols, value)
|
||||
|
||||
/** Set the handleInvalid for VectorAssembler */
|
||||
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
|
||||
|
||||
/**
|
||||
* Check if schema has a field named with the value of "featuresCol" param and it's data type
|
||||
* must be VectorUDT
|
||||
*/
|
||||
def isFeaturesColSet(schema: StructType): Boolean = {
|
||||
schema.fieldNames.contains(getFeaturesCol) &&
|
||||
XGBoostSchemaUtils.isVectorUDFType(schema(getFeaturesCol).dataType)
|
||||
}
|
||||
|
||||
/** check the features columns type */
|
||||
def transformSchemaWithFeaturesCols(fit: Boolean, schema: StructType): StructType = {
|
||||
if (isFeaturesColsValid) {
|
||||
if (fit) {
|
||||
XGBoostSchemaUtils.checkNumericType(schema, $(labelCol))
|
||||
}
|
||||
$(featuresCols).foreach(feature =>
|
||||
XGBoostSchemaUtils.checkFeatureColumnType(schema(feature).dataType))
|
||||
schema
|
||||
} else {
|
||||
throw new IllegalArgumentException("featuresCol or featuresCols must be specified")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Vectorize the features columns if necessary.
|
||||
*
|
||||
* @param input the input dataset
|
||||
* @return (output dataset and the feature column name)
|
||||
*/
|
||||
def vectorize(input: Dataset[_]): (Dataset[_], String) = {
|
||||
val schema = input.schema
|
||||
if (isFeaturesColSet(schema)) {
|
||||
// Dataset already has vectorized.
|
||||
(input, getFeaturesCol)
|
||||
} else if (isFeaturesColsValid) {
|
||||
val featuresName = if (!schema.fieldNames.contains(getFeaturesCol)) {
|
||||
getFeaturesCol
|
||||
} else {
|
||||
"features_" + uid
|
||||
}
|
||||
val vectorAssembler = new VectorAssembler()
|
||||
.setHandleInvalid($(handleInvalid))
|
||||
.setInputCols(getFeaturesCols)
|
||||
.setOutputCol(featuresName)
|
||||
(vectorAssembler.transform(input).select(featuresName, getLabelCol), featuresName)
|
||||
} else {
|
||||
// never reach here, since transformSchema will take care of the case
|
||||
// that featuresCols is invalid
|
||||
(input, getFeaturesCol)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[scala] trait XGBoostClassifierParams extends XGBoostEstimatorCommon with HasNumClass
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
Copyright (c) 2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.ml.linalg.xgboost
|
||||
|
||||
import org.apache.spark.sql.types.{BooleanType, DataType, NumericType, StructType}
|
||||
import org.apache.spark.ml.linalg.VectorUDT
|
||||
import org.apache.spark.ml.util.SchemaUtils
|
||||
|
||||
object XGBoostSchemaUtils {
|
||||
|
||||
/** check if the dataType is VectorUDT */
|
||||
def isVectorUDFType(dataType: DataType): Boolean = {
|
||||
dataType match {
|
||||
case _: VectorUDT => true
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
|
||||
/** The feature columns will be vectorized by VectorAssembler first, which only
|
||||
* supports Numeric, Boolean and VectorUDT types */
|
||||
def checkFeatureColumnType(dataType: DataType): Unit = {
|
||||
dataType match {
|
||||
case _: NumericType | BooleanType =>
|
||||
case _: VectorUDT =>
|
||||
case d => throw new UnsupportedOperationException(s"featuresCols only supports Numeric, " +
|
||||
s"boolean and VectorUDT types, found: ${d}")
|
||||
}
|
||||
}
|
||||
|
||||
def checkNumericType(
|
||||
schema: StructType,
|
||||
colName: String,
|
||||
msg: String = ""): Unit = {
|
||||
SchemaUtils.checkNumericType(schema, colName, msg)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -24,11 +24,61 @@ import ml.dmlc.xgboost4j.java.{Rabit, RabitTracker => PyRabitTracker}
|
||||
import ml.dmlc.xgboost4j.scala.rabit.{RabitTracker => ScalaRabitTracker}
|
||||
import ml.dmlc.xgboost4j.java.IRabitTracker.TrackerStatus
|
||||
import ml.dmlc.xgboost4j.scala.DMatrix
|
||||
|
||||
import org.scalatest.{FunSuite, Ignore}
|
||||
import org.scalatest.{FunSuite}
|
||||
|
||||
class RabitRobustnessSuite extends FunSuite with PerTest {
|
||||
|
||||
private def getXGBoostExecutionParams(paramMap: Map[String, Any]): XGBoostExecutionParams = {
|
||||
val classifier = new XGBoostClassifier(paramMap)
|
||||
val xgbParamsFactory = new XGBoostExecutionParamsFactory(classifier.MLlib2XGBoostParams, sc)
|
||||
xgbParamsFactory.buildXGBRuntimeParams
|
||||
}
|
||||
|
||||
|
||||
test("Customize host ip and python exec for Rabit tracker") {
|
||||
val hostIp = "192.168.22.111"
|
||||
val pythonExec = "/usr/bin/python3"
|
||||
|
||||
val paramMap = Map(
|
||||
"num_workers" -> numWorkers,
|
||||
"tracker_conf" -> TrackerConf(0L, "python", hostIp))
|
||||
val xgbExecParams = getXGBoostExecutionParams(paramMap)
|
||||
val tracker = XGBoost.getTracker(xgbExecParams.numWorkers, xgbExecParams.trackerConf)
|
||||
tracker match {
|
||||
case pyTracker: PyRabitTracker =>
|
||||
val cmd = pyTracker.getRabitTrackerCommand
|
||||
assert(cmd.contains(hostIp))
|
||||
assert(cmd.startsWith("python"))
|
||||
case _ => assert(false, "expected python tracker implementation")
|
||||
}
|
||||
|
||||
val paramMap1 = Map(
|
||||
"num_workers" -> numWorkers,
|
||||
"tracker_conf" -> TrackerConf(0L, "python", "", pythonExec))
|
||||
val xgbExecParams1 = getXGBoostExecutionParams(paramMap1)
|
||||
val tracker1 = XGBoost.getTracker(xgbExecParams1.numWorkers, xgbExecParams1.trackerConf)
|
||||
tracker1 match {
|
||||
case pyTracker: PyRabitTracker =>
|
||||
val cmd = pyTracker.getRabitTrackerCommand
|
||||
assert(cmd.startsWith(pythonExec))
|
||||
assert(!cmd.contains(hostIp))
|
||||
case _ => assert(false, "expected python tracker implementation")
|
||||
}
|
||||
|
||||
val paramMap2 = Map(
|
||||
"num_workers" -> numWorkers,
|
||||
"tracker_conf" -> TrackerConf(0L, "python", hostIp, pythonExec))
|
||||
val xgbExecParams2 = getXGBoostExecutionParams(paramMap2)
|
||||
val tracker2 = XGBoost.getTracker(xgbExecParams2.numWorkers, xgbExecParams2.trackerConf)
|
||||
tracker2 match {
|
||||
case pyTracker: PyRabitTracker =>
|
||||
val cmd = pyTracker.getRabitTrackerCommand
|
||||
assert(cmd.startsWith(pythonExec))
|
||||
assert(cmd.contains(s" --host-ip=${hostIp}"))
|
||||
case _ => assert(false, "expected python tracker implementation")
|
||||
}
|
||||
}
|
||||
|
||||
test("training with Scala-implemented Rabit tracker") {
|
||||
val eval = new EvalError()
|
||||
val training = buildDataFrame(Classification.train)
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.apache.spark.sql._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.Partitioner
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
|
||||
class XGBoostClassifierSuite extends FunSuite with PerTest {
|
||||
|
||||
@@ -316,4 +317,78 @@ class XGBoostClassifierSuite extends FunSuite with PerTest {
|
||||
xgb.fit(repartitioned)
|
||||
}
|
||||
|
||||
test("featuresCols with features column can work") {
|
||||
val spark = ss
|
||||
import spark.implicits._
|
||||
val xgbInput = Seq(
|
||||
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
|
||||
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
|
||||
.toDF("f1", "f2", "f3", "features", "label")
|
||||
|
||||
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> 1)
|
||||
|
||||
val featuresName = Array("f1", "f2", "f3", "features")
|
||||
val xgbClassifier = new XGBoostClassifier(paramMap)
|
||||
.setFeaturesCol(featuresName)
|
||||
.setLabelCol("label")
|
||||
|
||||
val model = xgbClassifier.fit(xgbInput)
|
||||
assert(model.getFeaturesCols.sameElements(featuresName))
|
||||
|
||||
val df = model.transform(xgbInput)
|
||||
assert(df.schema.fieldNames.contains("features_" + model.uid))
|
||||
df.show()
|
||||
|
||||
val newFeatureName = "features_new"
|
||||
// transform also can work for vectorized dataset
|
||||
val vectorizedInput = new VectorAssembler()
|
||||
.setInputCols(featuresName)
|
||||
.setOutputCol(newFeatureName)
|
||||
.transform(xgbInput)
|
||||
.select(newFeatureName, "label")
|
||||
|
||||
val df1 = model
|
||||
.setFeaturesCol(newFeatureName)
|
||||
.transform(vectorizedInput)
|
||||
assert(df1.schema.fieldNames.contains(newFeatureName))
|
||||
df1.show()
|
||||
}
|
||||
|
||||
test("featuresCols without features column can work") {
|
||||
val spark = ss
|
||||
import spark.implicits._
|
||||
val xgbInput = Seq(
|
||||
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
|
||||
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
|
||||
.toDF("f1", "f2", "f3", "f4", "label")
|
||||
|
||||
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> 1)
|
||||
|
||||
val featuresName = Array("f1", "f2", "f3", "f4")
|
||||
val xgbClassifier = new XGBoostClassifier(paramMap)
|
||||
.setFeaturesCol(featuresName)
|
||||
.setLabelCol("label")
|
||||
.setEvalSets(Map("eval" -> xgbInput))
|
||||
|
||||
val model = xgbClassifier.fit(xgbInput)
|
||||
assert(model.getFeaturesCols.sameElements(featuresName))
|
||||
|
||||
// transform should work for the dataset which includes the feature column names.
|
||||
val df = model.transform(xgbInput)
|
||||
assert(df.schema.fieldNames.contains("features"))
|
||||
df.show()
|
||||
|
||||
// transform also can work for vectorized dataset
|
||||
val vectorizedInput = new VectorAssembler()
|
||||
.setInputCols(featuresName)
|
||||
.setOutputCol("features")
|
||||
.transform(xgbInput)
|
||||
.select("features", "label")
|
||||
|
||||
val df1 = model.transform(vectorizedInput)
|
||||
df1.show()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
Copyright (c) 2014-2022 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -17,12 +17,15 @@
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
|
||||
import org.apache.spark.ml.linalg.Vector
|
||||
|
||||
import org.apache.spark.ml.linalg.{Vector, Vectors}
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.{DataFrame, Row}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
|
||||
class XGBoostRegressorSuite extends FunSuite with PerTest {
|
||||
protected val treeMethod: String = "auto"
|
||||
|
||||
@@ -216,4 +219,78 @@ class XGBoostRegressorSuite extends FunSuite with PerTest {
|
||||
assert(resultDF.columns.contains("predictLeaf"))
|
||||
assert(resultDF.columns.contains("predictContrib"))
|
||||
}
|
||||
|
||||
test("featuresCols with features column can work") {
|
||||
val spark = ss
|
||||
import spark.implicits._
|
||||
val xgbInput = Seq(
|
||||
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
|
||||
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
|
||||
.toDF("f1", "f2", "f3", "features", "label")
|
||||
|
||||
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "reg:squarederror", "num_round" -> 5, "num_workers" -> 1)
|
||||
|
||||
val featuresName = Array("f1", "f2", "f3", "features")
|
||||
val xgbClassifier = new XGBoostRegressor(paramMap)
|
||||
.setFeaturesCol(featuresName)
|
||||
.setLabelCol("label")
|
||||
|
||||
val model = xgbClassifier.fit(xgbInput)
|
||||
assert(model.getFeaturesCols.sameElements(featuresName))
|
||||
|
||||
val df = model.transform(xgbInput)
|
||||
assert(df.schema.fieldNames.contains("features_" + model.uid))
|
||||
df.show()
|
||||
|
||||
val newFeatureName = "features_new"
|
||||
// transform also can work for vectorized dataset
|
||||
val vectorizedInput = new VectorAssembler()
|
||||
.setInputCols(featuresName)
|
||||
.setOutputCol(newFeatureName)
|
||||
.transform(xgbInput)
|
||||
.select(newFeatureName, "label")
|
||||
|
||||
val df1 = model
|
||||
.setFeaturesCol(newFeatureName)
|
||||
.transform(vectorizedInput)
|
||||
assert(df1.schema.fieldNames.contains(newFeatureName))
|
||||
df1.show()
|
||||
}
|
||||
|
||||
test("featuresCols without features column can work") {
|
||||
val spark = ss
|
||||
import spark.implicits._
|
||||
val xgbInput = Seq(
|
||||
(Vectors.dense(1.0, 7.0), true, 10.1, 100.2, 0),
|
||||
(Vectors.dense(2.0, 20.0), false, 2.1, 2.2, 1))
|
||||
.toDF("f1", "f2", "f3", "f4", "label")
|
||||
|
||||
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "reg:squarederror", "num_round" -> 5, "num_workers" -> 1)
|
||||
|
||||
val featuresName = Array("f1", "f2", "f3", "f4")
|
||||
val xgbClassifier = new XGBoostRegressor(paramMap)
|
||||
.setFeaturesCol(featuresName)
|
||||
.setLabelCol("label")
|
||||
.setEvalSets(Map("eval" -> xgbInput))
|
||||
|
||||
val model = xgbClassifier.fit(xgbInput)
|
||||
assert(model.getFeaturesCols.sameElements(featuresName))
|
||||
|
||||
// transform should work for the dataset which includes the feature column names.
|
||||
val df = model.transform(xgbInput)
|
||||
assert(df.schema.fieldNames.contains("features"))
|
||||
df.show()
|
||||
|
||||
// transform also can work for vectorized dataset
|
||||
val vectorizedInput = new VectorAssembler()
|
||||
.setInputCols(featuresName)
|
||||
.setOutputCol("features")
|
||||
.transform(xgbInput)
|
||||
.select("features", "label")
|
||||
|
||||
val df1 = model.transform(vectorizedInput)
|
||||
df1.show()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
<parent>
|
||||
<groupId>ml.dmlc</groupId>
|
||||
<artifactId>xgboost-jvm_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
</parent>
|
||||
<artifactId>xgboost4j_2.12</artifactId>
|
||||
<version>1.6.0-SNAPSHOT</version>
|
||||
<version>1.6.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -30,6 +30,8 @@ public class RabitTracker implements IRabitTracker {
|
||||
private Map<String, String> envs = new HashMap<String, String>();
|
||||
// number of workers to be submitted.
|
||||
private int numWorkers;
|
||||
private String hostIp = "";
|
||||
private String pythonExec = "";
|
||||
private AtomicReference<Process> trackerProcess = new AtomicReference<Process>();
|
||||
|
||||
static {
|
||||
@@ -85,6 +87,13 @@ public class RabitTracker implements IRabitTracker {
|
||||
this.numWorkers = numWorkers;
|
||||
}
|
||||
|
||||
public RabitTracker(int numWorkers, String hostIp, String pythonExec)
|
||||
throws XGBoostError {
|
||||
this(numWorkers);
|
||||
this.hostIp = hostIp;
|
||||
this.pythonExec = pythonExec;
|
||||
}
|
||||
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
logger.error("Uncaught exception thrown by worker:", e);
|
||||
try {
|
||||
@@ -126,12 +135,34 @@ public class RabitTracker implements IRabitTracker {
|
||||
}
|
||||
}
|
||||
|
||||
/** visible for testing */
|
||||
public String getRabitTrackerCommand() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (pythonExec == null || pythonExec.isEmpty()) {
|
||||
sb.append("python ");
|
||||
} else {
|
||||
sb.append(pythonExec + " ");
|
||||
}
|
||||
sb.append(" " + tracker_py + " ");
|
||||
sb.append(" --log-level=DEBUG" + " ");
|
||||
sb.append(" --num-workers=" + numWorkers + " ");
|
||||
|
||||
// we first check the property then check the parameter
|
||||
String hostIpFromProperties = trackerProperties.getHostIp();
|
||||
if(hostIpFromProperties != null && !hostIpFromProperties.isEmpty()) {
|
||||
logger.debug("Using provided host-ip: " + hostIpFromProperties + " from properties");
|
||||
sb.append(" --host-ip=" + hostIpFromProperties + " ");
|
||||
} else if (hostIp != null & !hostIp.isEmpty()) {
|
||||
logger.debug("Using the parametr host-ip: " + hostIp);
|
||||
sb.append(" --host-ip=" + hostIp + " ");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private boolean startTrackerProcess() {
|
||||
try {
|
||||
String trackerExecString = this.addTrackerProperties("python " + tracker_py +
|
||||
" --log-level=DEBUG --num-workers=" + String.valueOf(numWorkers));
|
||||
|
||||
trackerProcess.set(Runtime.getRuntime().exec(trackerExecString));
|
||||
String cmd = getRabitTrackerCommand();
|
||||
trackerProcess.set(Runtime.getRuntime().exec(cmd));
|
||||
loadEnvs(trackerProcess.get().getInputStream());
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
@@ -140,18 +171,6 @@ public class RabitTracker implements IRabitTracker {
|
||||
}
|
||||
}
|
||||
|
||||
private String addTrackerProperties(String trackerExecString) {
|
||||
StringBuilder sb = new StringBuilder(trackerExecString);
|
||||
String hostIp = trackerProperties.getHostIp();
|
||||
|
||||
if(hostIp != null && !hostIp.isEmpty()){
|
||||
logger.debug("Using provided host-ip: " + hostIp);
|
||||
sb.append(" --host-ip=").append(hostIp);
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (trackerProcess.get() != null) {
|
||||
trackerProcess.get().destroy();
|
||||
|
||||
@@ -1 +1 @@
|
||||
1.6.0-dev
|
||||
1.6.0
|
||||
|
||||
@@ -419,6 +419,7 @@ class LearnerConfiguration : public Learner {
|
||||
obj_.reset(ObjFunction::Create(tparam_.objective, &generic_parameters_));
|
||||
}
|
||||
obj_->LoadConfig(objective_fn);
|
||||
learner_model_param_.task = obj_->Task();
|
||||
|
||||
tparam_.booster = get<String>(gradient_booster["name"]);
|
||||
if (!gbm_) {
|
||||
|
||||
@@ -14,7 +14,6 @@ dependencies:
|
||||
- jsonschema
|
||||
- cupy
|
||||
- python-graphviz
|
||||
- modin-ray
|
||||
- pip
|
||||
- py-ubjson
|
||||
- cffi
|
||||
|
||||
@@ -2,6 +2,7 @@ import sys
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
@contextmanager
|
||||
def cd(path):
|
||||
path = os.path.normpath(path)
|
||||
@@ -13,10 +14,12 @@ def cd(path):
|
||||
finally:
|
||||
os.chdir(cwd)
|
||||
|
||||
|
||||
if len(sys.argv) != 4:
|
||||
print('Usage: {} [wheel to rename] [commit id] [platform tag]'.format(sys.argv[0]))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
whl_path = sys.argv[1]
|
||||
commit_id = sys.argv[2]
|
||||
platform_tag = sys.argv[3]
|
||||
@@ -36,3 +39,7 @@ with cd(dirname):
|
||||
if os.path.isfile(new_name):
|
||||
os.remove(new_name)
|
||||
os.rename(basename, new_name)
|
||||
|
||||
filesize = os.path.getsize(new_name) / 1024 / 1024 # MB
|
||||
msg = f"Limit of wheel size set by PyPI is exceeded. {new_name}: {filesize}"
|
||||
assert filesize <= 200, msg
|
||||
|
||||
Reference in New Issue
Block a user