[jvm-packages] [breaking] rework xgboost4j-spark and xgboost4j-spark-gpu (#10639)

- Introduce an abstract XGBoost Estimator
- Update to the latest XGBoost parameters
  - Add all XGBoost parameters supported in XGBoost4j-spark.
  - Add setter and getter for these parameters.
  - Remove the deprecated parameters
- Address the missing value handling
- Remove any ETL operations in XGBoost
- Rework the GPU plugin
- Expand sanity tests for CPU and GPU consistency
This commit is contained in:
Bobby Wang
2024-09-11 15:54:19 +08:00
committed by GitHub
parent d94f6679fc
commit 67c8c96784
75 changed files with 4537 additions and 7556 deletions

View File

@@ -54,6 +54,7 @@
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark_${scala.binary.version}</artifactId>
<version>${spark.rapids.version}</version>
<classifier>${spark.rapids.classifier}</classifier>
<scope>provided</scope>
</dependency>
<dependency>

View File

@@ -55,9 +55,9 @@ public class CudfColumn extends Column {
DType dType = cv.getType();
String typeStr = "";
if (dType == DType.FLOAT32 || dType == DType.FLOAT64 ||
dType == DType.TIMESTAMP_DAYS || dType == DType.TIMESTAMP_MICROSECONDS ||
dType == DType.TIMESTAMP_MILLISECONDS || dType == DType.TIMESTAMP_NANOSECONDS ||
dType == DType.TIMESTAMP_SECONDS) {
dType == DType.TIMESTAMP_DAYS || dType == DType.TIMESTAMP_MICROSECONDS ||
dType == DType.TIMESTAMP_MILLISECONDS || dType == DType.TIMESTAMP_NANOSECONDS ||
dType == DType.TIMESTAMP_SECONDS) {
typeStr = "<f" + dType.getSizeInBytes();
} else if (dType == DType.BOOL8 || dType == DType.INT8 || dType == DType.INT16 ||
dType == DType.INT32 || dType == DType.INT64) {

View File

@@ -35,11 +35,39 @@ public class QuantileDMatrix extends DMatrix {
float missing,
int maxBin,
int nthread) throws XGBoostError {
this(iter, null, missing, maxBin, nthread);
}
/**
 * Create QuantileDMatrix from iterator based on the cuda array interface
 *
 * @param iter       the XGBoost ColumnBatch batch to provide the corresponding cuda array
 *                   interface
 * @param refDMatrix The reference QuantileDMatrix that provides quantile information, needed
 *                   when creating validation/test dataset with QuantileDMatrix. Supplying the
 *                   training DMatrix as a reference means that the same quantisation
 *                   applied to the training data is applied to the validation/test data
 * @param missing    the missing value
 * @param maxBin     the max bin
 * @param nthread    the parallelism
 * @throws XGBoostError native booster error
 */
public QuantileDMatrix(
    Iterator<ColumnBatch> iter,
    QuantileDMatrix refDMatrix,
    float missing,
    int maxBin,
    int nthread) throws XGBoostError {
  super(0);
  long[] out = new long[1];
  String conf = getConfig(missing, maxBin, nthread);
  // When a reference matrix is given, its handle tells the native layer to
  // reuse the reference's quantile cuts; otherwise pass null to compute new cuts.
  long[] ref = null;
  if (refDMatrix != null) {
    ref = new long[1];
    ref[0] = refDMatrix.getHandle();
  }
  XGBoostJNI.checkCall(XGBoostJNI.XGQuantileDMatrixCreateFromCallback(
      iter, ref, conf, out));
  handle = out[0];
}
@@ -85,6 +113,7 @@ public class QuantileDMatrix extends DMatrix {
private String getConfig(float missing, int maxBin, int nthread) {
return String.format("{\"missing\":%f,\"max_bin\":%d,\"nthread\":%d}",
missing, maxBin, nthread);
missing, maxBin, nthread);
}
}

View File

@@ -1,68 +0,0 @@
/*
Copyright (c) 2021 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.java.nvidia.spark;
import java.util.List;
import ai.rapids.cudf.ColumnVector;
import ai.rapids.cudf.Table;
import org.apache.spark.sql.types.*;
/**
 * A schema-aware wrapper around a cudf {@code Table} so Scala code can carry
 * the Spark {@code StructType} alongside the GPU-resident data.
 */
public class GpuColumnBatch implements AutoCloseable {

  private final StructType schema;
  private Table table; // the original Table; null after close()

  public GpuColumnBatch(Table table, StructType schema) {
    this.table = table;
    this.schema = schema;
  }

  /** Releases the underlying table; safe to call more than once. */
  @Override
  public void close() {
    if (table == null) {
      return;
    }
    table.close();
    table = null;
  }

  /** Slice the columns indicated by indices into a Table*/
  public Table slice(List<Integer> indices) {
    if (indices == null || indices.isEmpty()) {
      return null;
    }
    final int numColumns = table.getNumberOfColumns();
    ColumnVector[] columns = new ColumnVector[indices.size()];
    for (int pos = 0; pos < columns.length; pos++) {
      int colIndex = indices.get(pos);
      if (colIndex >= numColumns) {
        throw new RuntimeException("Wrong index");
      }
      columns[pos] = table.getColumn(colIndex);
    }
    return new Table(columns);
  }

  /** @return the Spark schema associated with this batch */
  public StructType getSchema() {
    return schema;
  }
}

View File

@@ -1 +0,0 @@
ml.dmlc.xgboost4j.scala.rapids.spark.GpuPreXGBoost

View File

@@ -0,0 +1 @@
ml.dmlc.xgboost4j.scala.spark.GpuXGBoostPlugin

View File

@@ -0,0 +1,133 @@
/*
Copyright (c) 2021-2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala
import ml.dmlc.xgboost4j.java.{Column, ColumnBatch, XGBoostError, QuantileDMatrix => JQuantileDMatrix}
import scala.collection.JavaConverters._
class QuantileDMatrix private[scala](
    private[scala] override val jDMatrix: JQuantileDMatrix) extends DMatrix(jDMatrix) {

  /**
   * Create QuantileDMatrix from iterator based on the array interface
   *
   * @param iter    the XGBoost ColumnBatch batch to provide the corresponding array interface
   * @param missing the missing value
   * @param maxBin  the max bin
   * @param nthread the parallelism
   * @throws XGBoostError native booster error
   */
  def this(iter: Iterator[ColumnBatch], missing: Float, maxBin: Int, nthread: Int) {
    this(new JQuantileDMatrix(iter.asJava, missing, maxBin, nthread))
  }

  /**
   * Create QuantileDMatrix from iterator based on the array interface, reusing
   * the quantile information of a reference matrix.
   *
   * @param iter    the XGBoost ColumnBatch batch to provide the corresponding array interface
   * @param ref     The reference QuantileDMatrix that provides quantile information, needed
   *                when creating validation/test dataset with QuantileDMatrix. Supplying the
   *                training DMatrix as a reference means that the same quantisation applied
   *                to the training data is applied to the validation/test data
   * @param missing the missing value
   * @param maxBin  the max bin
   * @param nthread the parallelism
   * @throws XGBoostError native booster error
   */
  def this(iter: Iterator[ColumnBatch],
           ref: QuantileDMatrix,
           missing: Float,
           maxBin: Int,
           nthread: Int) {
    this(new JQuantileDMatrix(iter.asJava, ref.jDMatrix, missing, maxBin, nthread))
  }

  // A QuantileDMatrix is fully built at construction time: every mutator
  // inherited from DMatrix is rejected with the same XGBoostError message
  // the hand-written overrides raised.
  private def unsupported(op: String): Nothing =
    throw new XGBoostError(s"QuantileDMatrix does not support $op.")

  /** Labels are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setLabel(labels: Array[Float]): Unit = unsupported("setLabel")

  /** Instance weights are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setWeight(weights: Array[Float]): Unit = unsupported("setWeight")

  /** Base margins are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setBaseMargin(baseMargin: Array[Float]): Unit = unsupported("setBaseMargin")

  /** Base margins are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setBaseMargin(baseMargin: Array[Array[Float]]): Unit = unsupported("setBaseMargin")

  /** Ranking group sizes are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setGroup(group: Array[Int]): Unit = unsupported("setGroup")

  /** Labels are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setLabel(column: Column): Unit = unsupported("setLabel")

  /** Instance weights are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setWeight(column: Column): Unit = unsupported("setWeight")

  /** Base margins are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setBaseMargin(column: Column): Unit = unsupported("setBaseMargin")

  /** Query ids are fixed at construction; always fails. */
  @throws(classOf[XGBoostError])
  override def setQueryId(column: Column): Unit = unsupported("setQueryId")
}

View File

@@ -1,602 +0,0 @@
/*
Copyright (c) 2021-2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.java.nvidia.spark.GpuColumnBatch
import ml.dmlc.xgboost4j.java.CudfColumnBatch
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, QuantileDMatrix}
import ml.dmlc.xgboost4j.scala.spark.params.XGBoostEstimatorCommon
import ml.dmlc.xgboost4j.scala.spark.{PreXGBoost, PreXGBoostProvider, Watches, XGBoost, XGBoostClassificationModel, XGBoostClassifier, XGBoostExecutionParams, XGBoostRegressionModel, XGBoostRegressor}
import org.apache.commons.logging.LogFactory
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.functions.{col, collect_list, struct}
import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* GpuPreXGBoost brings Rapids-Plugin to XGBoost4j-Spark to accelerate XGBoost4j
* training and transform process
*/
// Thin instance facade: every call is delegated to the companion object, which
// holds the actual implementation and state (logger, constants).
class GpuPreXGBoost extends PreXGBoostProvider {
/**
* Whether the provider is enabled or not
*
* @param dataset the input dataset
* @return Boolean
*/
override def providerEnabled(dataset: Option[Dataset[_]]): Boolean = {
GpuPreXGBoost.providerEnabled(dataset)
}
/**
* Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
*
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
}
/**
* Transform Dataset
*
* @param model [[XGBoostClassificationModel]] or [[XGBoostRegressionModel]]
* @param dataset the input Dataset to transform
* @return the transformed DataFrame
*/
override def transformDataset(model: Model[_], dataset: Dataset[_]): DataFrame = {
GpuPreXGBoost.transformDataset(model, dataset)
}
/** Validate/derive the output schema for the given estimator; delegates to the companion. */
override def transformSchema(
xgboostEstimator: XGBoostEstimatorCommon,
schema: StructType): StructType = {
GpuPreXGBoost.transformSchema(xgboostEstimator, schema)
}
}
// Serializable mutable flag broadcast to executors; used to guard one-time GPU
// parameter setup on a shared Booster (see the synchronized block in
// GpuPreXGBoost.transformDataset).
class BoosterFlag extends Serializable {
// indicate if the GPU parameters are set.
var isGpuParamsSet = false
}
object GpuPreXGBoost extends PreXGBoostProvider {
private val logger = LogFactory.getLog("XGBoostSpark")
private val FEATURES_COLS = "features_cols"
private val TRAIN_NAME = "train"
/**
 * Decide whether the GPU pipeline should take over: requires the RAPIDS SQL
 * plugin to be registered in "spark.sql.extensions" and
 * "spark.rapids.sql.enabled" to not be false (missing key counts as enabled).
 */
override def providerEnabled(dataset: Option[Dataset[_]]): Boolean = {
// RuntimeConfig
val optionConf = dataset.map(ds => Some(ds.sparkSession.conf))
.getOrElse(SparkSession.getActiveSession.map(ss => ss.conf))
if (optionConf.isDefined) {
val conf = optionConf.get
val rapidsEnabled = try {
conf.get("spark.rapids.sql.enabled").toBoolean
} catch {
// Rapids plugin has default "spark.rapids.sql.enabled" to true
case _: NoSuchElementException => true
// NOTE(review): deliberately best-effort, but catching Throwable also
// swallows fatal errors; scala.util.control.NonFatal would be safer.
case _: Throwable => false // Any exception will return false
}
rapidsEnabled && conf.get("spark.sql.extensions", "")
.split(",")
.contains("com.nvidia.spark.rapids.SQLExecPlugin")
} else false
}
/**
 * Convert the Dataset[_] to RDD[() => Watches] which will be fed to XGBoost
 *
 * @param estimator supports XGBoostClassifier and XGBoostRegressor
 * @param dataset the training data
 * @param params all user defined and defaulted params
 * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
 *         RDD[() => Watches] will be used as the training input to build DMatrix
 *         Option[ RDD[_] ] is the optional cached RDD
 */
override def buildDatasetToRDD(
    estimator: Estimator[_],
    dataset: Dataset[_],
    params: Map[String, Any]):
    XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
  // fix: local was misspelled "feturesCols" — renamed for readability
  val (Seq(labelName, weightName, marginName), featuresCols, groupName, evalSets) =
    estimator match {
      case est: XGBoostEstimatorCommon =>
        // GPU training only makes sense when the user selected a CUDA device
        // (or the legacy gpu_hist tree method).
        require(
          est.isDefined(est.device) &&
            (est.getDevice.equals("cuda") || est.getDevice.equals("gpu")) ||
            est.isDefined(est.treeMethod) && est.getTreeMethod.equals("gpu_hist"),
          s"GPU train requires `device` set to `cuda` or `gpu`."
        )
        // Only regressors carry an optional ranking group column.
        val groupName = estimator match {
          case regressor: XGBoostRegressor =>
            if (regressor.isDefined(regressor.groupCol)) regressor.getGroupCol else ""
          case _: XGBoostClassifier => ""
          case _ => throw new RuntimeException("Unsupported estimator: " + estimator)
        }
        // Check schema and cast columns' type
        (GpuUtils.getColumnNames(est)(est.labelCol, est.weightCol, est.baseMarginCol),
          est.getFeaturesCols, groupName, est.getEvalSets(params))
      case _ => throw new RuntimeException("Unsupported estimator: " + estimator)
    }
  val castedDF = GpuUtils.prepareColumnType(dataset, featuresCols, labelName, weightName,
    marginName)
  // Check columns and build column data batch
  val trainingData = GpuUtils.buildColumnDataBatch(featuresCols,
    labelName, weightName, marginName, groupName, castedDF)
  // eval map
  val evalDataMap = evalSets.map {
    case (name, df) =>
      val castDF = GpuUtils.prepareColumnType(df, featuresCols, labelName,
        weightName, marginName)
      (name, GpuUtils.buildColumnDataBatch(featuresCols, labelName, weightName,
        marginName, groupName, castDF))
  }
  // Returned function defers the repartition/Watches construction until the
  // execution parameters are known.
  xgbExecParams: XGBoostExecutionParams =>
    val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
      xgbExecParams.cacheTrainingSet)
    (buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
}
/**
* Transform Dataset
*
* @param model supporting [[XGBoostClassificationModel]] and [[XGBoostRegressionModel]]
* @param dataset the input Dataset to transform
* @return the transformed DataFrame
*/
override def transformDataset(model: Model[_], dataset: Dataset[_]): DataFrame = {
// Per model type, derive: the booster, a closure producing prediction rows,
// the output schema, the feature column names, and the missing-value marker.
val (booster, predictFunc, schema, featureColNames, missing) = model match {
case m: XGBoostClassificationModel =>
// NOTE(review): this Seq is evaluated and discarded — looks like leftover
// dead code; verify it has no intended side effect.
Seq(XGBoostClassificationModel._rawPredictionCol,
XGBoostClassificationModel._probabilityCol, m.leafPredictionCol, m.contribPredictionCol)
// predict and turn to Row
val predictFunc =
(booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
m.producePredictionItrs(booster, dm)
m.produceResultIterator(originalRowItr, rawPredictionItr, probabilityItr,
predLeafItr, predContribItr)
}
// prepare the final Schema
var schema = StructType(dataset.schema.fields ++
Seq(StructField(name = XGBoostClassificationModel._rawPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false)) ++
Seq(StructField(name = XGBoostClassificationModel._probabilityCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false)))
if (m.isDefined(m.leafPredictionCol)) {
schema = schema.add(StructField(name = m.getLeafPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false))
}
if (m.isDefined(m.contribPredictionCol)) {
schema = schema.add(StructField(name = m.getContribPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false))
}
(m._booster, predictFunc, schema, m.getFeaturesCols, m.getMissing)
case m: XGBoostRegressionModel =>
// NOTE(review): same discarded Seq pattern as the classifier branch above.
Seq(XGBoostRegressionModel._originalPredictionCol, m.leafPredictionCol,
m.contribPredictionCol)
// predict and turn to Row
val predictFunc =
(booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, predLeafItr, predContribItr) =
m.producePredictionItrs(booster, dm)
m.produceResultIterator(originalRowItr, rawPredictionItr, predLeafItr,
predContribItr)
}
// prepare the final Schema
var schema = StructType(dataset.schema.fields ++
Seq(StructField(name = XGBoostRegressionModel._originalPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false)))
if (m.isDefined(m.leafPredictionCol)) {
schema = schema.add(StructField(name = m.getLeafPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false))
}
if (m.isDefined(m.contribPredictionCol)) {
schema = schema.add(StructField(name = m.getContribPredictionCol, dataType =
ArrayType(FloatType, containsNull = false), nullable = false))
}
(m._booster, predictFunc, schema, m.getFeaturesCols, m.getMissing)
}
val sc = dataset.sparkSession.sparkContext
// Prepare some vars will be passed to executors.
val bOrigSchema = sc.broadcast(dataset.schema)
val bRowSchema = sc.broadcast(schema)
val bBooster = sc.broadcast(booster)
val bBoosterFlag = sc.broadcast(new BoosterFlag)
// Small vars so don't need to broadcast them
val isLocal = sc.isLocal
val featureIds = featureColNames.distinct.map(dataset.schema.fieldIndex)
// start transform by df->rd->mapPartition
val rowRDD: RDD[Row] = GpuUtils.toColumnarRdd(dataset.asInstanceOf[DataFrame]).mapPartitions {
tableIters =>
// UnsafeProjection is not serializable so do it on the executor side
val toUnsafe = UnsafeProjection.create(bOrigSchema.value)
// booster is visible for all spark tasks in the same executor
val booster = bBooster.value
val boosterFlag = bBoosterFlag.value
synchronized {
// there are two kind of race conditions,
// 1. multi-taskes set parameters at a time
// 2. one task sets parameter and another task reads the parameter
// both of them can cause potential un-expected behavior, moreover,
// it may cause executor crash
// So add synchronized to allow only one task to set parameter if it is not set.
// and rely on BlockManager to ensure the same booster only be called once to
// set parameter.
if (!boosterFlag.isGpuParamsSet) {
// set some params of gpu related to booster
// - gpu id
// - predictor: Force to gpu predictor since native doesn't save predictor.
val gpuId = if (!isLocal) XGBoost.getGPUAddrFromResources else 0
booster.setParam("device", s"cuda:$gpuId")
logger.info("GPU transform on device: " + gpuId)
boosterFlag.isGpuParamsSet = true;
}
}
// Iterator on Row
new Iterator[Row] {
// Convert InternalRow to Row
private val converter: InternalRow => Row = CatalystTypeConverters
.createToScalaConverter(bOrigSchema.value)
.asInstanceOf[InternalRow => Row]
// GPU batches read in must be closed by the receiver (us)
@transient var currentBatch: ColumnarBatch = null
// Iterator on Row
var iter: Iterator[Row] = null
TaskContext.get().addTaskCompletionListener[Unit](_ => {
closeCurrentBatch() // close the last ColumnarBatch
})
private def closeCurrentBatch(): Unit = {
if (currentBatch != null) {
currentBatch.close()
currentBatch = null
}
}
// Pulls the next cudf Table, runs prediction on the GPU, and exposes the
// result as a Row iterator; sets `iter` to null when input is exhausted.
def loadNextBatch(): Unit = {
closeCurrentBatch()
if (tableIters.hasNext) {
val dataTypes = bOrigSchema.value.fields.map(x => x.dataType)
iter = withResource(tableIters.next()) { table =>
val gpuColumnBatch = new GpuColumnBatch(table, bOrigSchema.value)
// Create DMatrix
val feaTable = gpuColumnBatch.slice(GpuUtils.seqIntToSeqInteger(featureIds).asJava)
if (feaTable == null) {
throw new RuntimeException("Something wrong for feature indices")
}
try {
val cudfColumnBatch = new CudfColumnBatch(feaTable, null, null, null, null)
val dm = new DMatrix(cudfColumnBatch, missing, 1)
if (dm == null) {
Iterator.empty
} else {
try {
currentBatch = new ColumnarBatch(
GpuUtils.extractBatchToHost(table, dataTypes),
table.getRowCount().toInt)
val rowIterator = currentBatch.rowIterator().asScala
.map(toUnsafe)
.map(converter(_))
predictFunc(booster, dm, rowIterator)
} finally {
dm.delete()
}
}
} finally {
feaTable.close()
}
}
} else {
iter = null
}
}
override def hasNext: Boolean = {
val itHasNext = iter != null && iter.hasNext
if (!itHasNext) { // Don't have extra Row for current ColumnarBatch
loadNextBatch()
iter != null && iter.hasNext
} else {
itHasNext
}
}
override def next(): Row = {
if (iter == null || !iter.hasNext) {
loadNextBatch()
}
if (iter == null) {
throw new NoSuchElementException()
}
iter.next()
}
}
}
bOrigSchema.unpersist(blocking = false)
bRowSchema.unpersist(blocking = false)
bBooster.unpersist(blocking = false)
dataset.sparkSession.createDataFrame(rowRDD, schema)
}
/**
 * Transform schema
 *
 * @param est supporting XGBoostClassifier/XGBoostClassificationModel and
 *            XGBoostRegressor/XGBoostRegressionModel
 * @param schema the input schema
 * @return the transformed schema
 */
override def transformSchema(
    est: XGBoostEstimatorCommon,
    schema: StructType): StructType = {
  // Estimators validate for training; models only validate for transform.
  val fit = est match {
    case _: XGBoostClassifier | _: XGBoostRegressor => true
    case _ => false
  }
  val Seq(label, weight, margin) =
    GpuUtils.getColumnNames(est)(est.labelCol, est.weightCol, est.baseMarginCol)
  GpuUtils.validateSchema(schema, est.getFeaturesCols, label, weight, margin, fit)
}
/**
 * Repartition all the Columnar Dataset (training and evaluation) to nWorkers,
 * and assemble them into a map keyed by dataset name ("train" plus eval names).
 */
private def prepareInputData(
    trainingData: ColumnDataBatch,
    evalSetsMap: Map[String, ColumnDataBatch],
    nWorkers: Int,
    isCacheData: Boolean): Map[String, ColumnDataBatch] = {
  // Cache is not supported
  if (isCacheData) {
    logger.warn("the cache param will be ignored by GPU pipeline!")
  }
  val allBatches = Map(TRAIN_NAME -> trainingData) ++ evalSetsMap
  allBatches.map { case (name, colData) =>
    // No light cost way to get number of partitions from DataFrame, so always repartition
    val repartitioned = colData.groupColName match {
      case Some(gn) => repartitionForGroup(gn, colData.rawDF, nWorkers)
      case None => repartitionInputData(colData.rawDF, nWorkers)
    }
    name -> ColumnDataBatch(repartitioned, colData.colIndices, colData.groupColName)
  }
}
/**
 * Repartition to the requested worker count. Coalesce must be avoided here:
 * Barrier mode inspects the RDD lineage and rejects coalesced plans.
 */
private def repartitionInputData(dataFrame: DataFrame, nWorkers: Int): DataFrame =
  dataFrame.repartition(nWorkers)
// Repartition learning-to-rank data without splitting a query group across
// partitions: rows of the same group are first collapsed into one grouped row,
// the grouped rows are repartitioned, then each group is expanded back.
private def repartitionForGroup(
groupName: String,
dataFrame: DataFrame,
nWorkers: Int): DataFrame = {
// Group the data first
logger.info("Start groupBy for LTR")
val schema = dataFrame.schema
val groupedDF = dataFrame
.groupBy(groupName)
.agg(collect_list(struct(schema.fieldNames.map(col): _*)) as "list")
implicit val encoder = ExpressionEncoder(RowEncoder.encoderFor(schema, false))
// Expand the grouped rows after repartition
repartitionInputData(groupedDF, nWorkers).mapPartitions(iter => {
new Iterator[Row] {
// flattening iterator over the "list" column of the current grouped row
var iterInRow: Iterator[Any] = Iterator.empty
override def hasNext: Boolean = {
if (iter.hasNext && !iterInRow.hasNext) {
// the first is groupId, second is list
iterInRow = iter.next.getSeq(1).iterator
}
iterInRow.hasNext
}
override def next(): Row = {
iterInRow.next.asInstanceOf[Row]
}
}
})
}
// Turn the prepared column batches into an RDD of lazy Watches factories —
// one factory per partition — either train-only or train + evaluation sets.
private def buildRDDWatches(
dataMap: Map[String, ColumnDataBatch],
xgbExeParams: XGBoostExecutionParams,
noEvalSet: Boolean): RDD[() => Watches] = {
val sc = dataMap(TRAIN_NAME).rawDF.sparkSession.sparkContext
// default of 256 bins mirrors XGBoost's max_bin default
val maxBin = xgbExeParams.toMap.getOrElse("max_bin", 256).asInstanceOf[Int]
// Start training
if (noEvalSet) {
// Get the indices here at driver side to avoid passing the whole Map to executor(s)
val colIndicesForTrain = dataMap(TRAIN_NAME).colIndices
GpuUtils.toColumnarRdd(dataMap(TRAIN_NAME).rawDF).mapPartitions({
iter =>
val iterColBatch = iter.map(table => new GpuColumnBatch(table, null))
Iterator(() => buildWatches(
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
colIndicesForTrain, iterColBatch, maxBin))
})
} else {
// Train with evaluation sets
// Get the indices here at driver side to avoid passing the whole Map to executor(s)
val nameAndColIndices = dataMap.map(nc => (nc._1, nc._2.colIndices))
coPartitionForGpu(dataMap, sc, xgbExeParams.numWorkers).mapPartitions {
nameAndColumnBatchIter =>
Iterator(() => buildWatchesWithEval(
PreXGBoost.getCacheDirName(xgbExeParams.useExternalMemory), xgbExeParams.missing,
nameAndColIndices, nameAndColumnBatchIter, maxBin))
}
}
}
/**
 * Build the train-only Watches for one partition, timing the incremental
 * DMatrix construction for debug logging. An empty partition (null DMatrix)
 * yields empty Watches.
 */
private def buildWatches(
    cachedDirName: Option[String],
    missing: Float,
    indices: ColumnIndices,
    iter: Iterator[GpuColumnBatch],
    maxBin: Int): Watches = {
  val (dm, time) = GpuUtils.time {
    buildDMatrix(iter, indices, missing, maxBin)
  }
  logger.debug("Benchmark[Train: Build DMatrix incrementally] " + time)
  if (dm == null) {
    new Watches(Array.empty[DMatrix], Array.empty[String], cachedDirName)
  } else {
    new Watches(Array(dm), Array("train"), cachedDirName)
  }
}
// Build Watches holding the training DMatrix plus one DMatrix per evaluation
// set for this partition; empty (null) matrices are dropped.
private def buildWatchesWithEval(
cachedDirName: Option[String],
missing: Float,
indices: Map[String, ColumnIndices],
nameAndColumns: Iterator[(String, Iterator[GpuColumnBatch])],
maxBin: Int): Watches = {
val dms = nameAndColumns.map {
case (name, iter) => (name, {
val (dm, time) = GpuUtils.time {
buildDMatrix(iter, indices(name), missing, maxBin)
}
logger.debug(s"Benchmark[Train build $name DMatrix] " + time)
dm
})
}.filter(_._2 != null).toArray
new Watches(dms.map(_._2), dms.map(_._1), cachedDirName)
}
/**
 * Build QuantileDMatrix based on GpuColumnBatches
 *
 * @param iter a sequence of GpuColumnBatch
 * @param indices indicate the feature, label, weight, base margin column ids.
 * @param missing the missing value
 * @param maxBin the maxBin
 * @return DMatrix
 */
private def buildDMatrix(
    iter: Iterator[GpuColumnBatch],
    indices: ColumnIndices,
    missing: Float,
    maxBin: Int): DMatrix = {
  // Wrap the batches so slicing into feature/label/weight/margin tables
  // happens lazily while the native quantile sketch consumes them.
  new QuantileDMatrix(new RapidsIterator(iter, indices), missing, maxBin, 1)
}
// zip all the Columnar RDDs into one RDD containing named column data batch.
// Starts from an all-null RDD of nWorkers partitions and folds each dataset in
// via zipPartitions so partition i of every dataset lands in the same task.
private def coPartitionForGpu(
dataMap: Map[String, ColumnDataBatch],
sc: SparkContext,
nWorkers: Int): RDD[(String, Iterator[GpuColumnBatch])] = {
val emptyDataRdd = sc.parallelize(
Array.fill[(String, Iterator[GpuColumnBatch])](nWorkers)(null), nWorkers)
dataMap.foldLeft(emptyDataRdd) {
case (zippedRdd, (name, gdfColData)) =>
zippedRdd.zipPartitions(GpuUtils.toColumnarRdd(gdfColData.rawDF)) {
(itWrapper, iterCol) =>
val itCol = iterCol.map(table => new GpuColumnBatch(table, null))
// the null seed entries are filtered out once real data is appended
(itWrapper.toArray :+ (name -> itCol)).filter(x => x != null).toIterator
}
}
}
// Adapts an iterator of GpuColumnBatch into the CudfColumnBatch stream the
// QuantileDMatrix constructor consumes, slicing out feature/label/weight/
// margin columns per batch.
private[this] class RapidsIterator(
base: Iterator[GpuColumnBatch],
indices: ColumnIndices) extends Iterator[CudfColumnBatch] {
override def hasNext: Boolean = base.hasNext
override def next(): CudfColumnBatch = {
// Since we have sliced original Table into different tables. Needs to close the original one.
withResource(base.next()) { gpuColumnBatch =>
// weight/margin are optional; an empty Seq makes slice() return null
val weights = indices.weightId.map(Seq(_)).getOrElse(Seq.empty)
val margins = indices.marginId.map(Seq(_)).getOrElse(Seq.empty)
new CudfColumnBatch(
gpuColumnBatch.slice(GpuUtils.seqIntToSeqInteger(indices.featureIds).asJava),
gpuColumnBatch.slice(GpuUtils.seqIntToSeqInteger(Seq(indices.labelId)).asJava),
gpuColumnBatch.slice(GpuUtils.seqIntToSeqInteger(weights).asJava),
gpuColumnBatch.slice(GpuUtils.seqIntToSeqInteger(margins).asJava),
null);
}
}
}
/** Runs `block` against the resource `r` and always closes `r`, even when the
 *  block throws; the block's result (or its exception) is propagated. */
def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
  try block(r) finally r.close()
}

View File

@@ -1,178 +0,0 @@
/*
Copyright (c) 2021 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.{ColumnarRdd, GpuColumnVectorUtils}
import ml.dmlc.xgboost4j.scala.spark.util.Utils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.ml.param.{Param, Params}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, FloatType, NumericType, StructType}
import org.apache.spark.sql.vectorized.ColumnVector
private[spark] object GpuUtils {
// Copy a device-side cudf Table into host-side Spark ColumnVectors, one per
// column, using the spark-rapids helper.
def extractBatchToHost(table: Table, types: Array[DataType]): Array[ColumnVector] = {
// spark-rapids has shimmed the GpuColumnVector from 22.10
GpuColumnVectorUtils.extractHostColumns(table, types)
}
// Obtain the device-side columnar RDD (of cudf Tables) backing `df` via spark-rapids.
def toColumnarRdd(df: DataFrame): RDD[Table] = ColumnarRdd(df)
// Box Scala Ints to java.lang.Integer for Java interop. Integer.valueOf
// replaces the deprecated `new Integer(_)` constructor and can reuse the
// JVM's Integer cache for small values.
def seqIntToSeqInteger(x: Seq[Int]): Seq[Integer] = x.map(Integer.valueOf(_))
/** APIs for gpu column data related */
// Validates the schema and resolves every referenced column name to its field
// index, packaging DataFrame + indices into a ColumnDataBatch.
def buildColumnDataBatch(featureNames: Seq[String],
labelName: String,
weightName: String,
marginName: String,
groupName: String,
dataFrame: DataFrame): ColumnDataBatch = {
// Some check first
val schema = dataFrame.schema
val featureNameSet = featureNames.distinct
GpuUtils.validateSchema(schema, featureNameSet, labelName, weightName, marginName)
// group column
val (opGroup, groupId) = if (groupName.isEmpty) {
(None, None)
} else {
GpuUtils.checkNumericType(schema, groupName)
(Some(groupName), Some(schema.fieldIndex(groupName)))
}
// weight and base margin columns
val Seq(weightId, marginId) = Seq(weightName, marginName).map {
name =>
if (name.isEmpty) None else Some(schema.fieldIndex(name))
}
val colsIndices = ColumnIndices(featureNameSet.map(schema.fieldIndex),
schema.fieldIndex(labelName), weightId, marginId, groupId)
ColumnDataBatch(dataFrame, colsIndices, opGroup)
}
/**
 * Require that `colName` in `schema` has a NumericType, failing with an
 * IllegalArgumentException that optionally appends `msg`.
 */
def checkNumericType(schema: StructType, colName: String,
    msg: String = ""): Unit = {
  val actualDataType = schema(colName).dataType
  val suffix = Option(msg).filter(_.trim.nonEmpty).map(" " + _).getOrElse("")
  require(actualDataType.isInstanceOf[NumericType],
    s"Column $colName must be of NumericType but found: ${actualDataType.catalogString}.$suffix")
}
/** Check and Cast the columns to FloatType */
// Validates the schema, then casts feature columns (plus label/weight/margin
// when fitting) to FloatType, preserving each column's metadata.
def prepareColumnType(
dataset: Dataset[_],
featureNames: Seq[String],
labelName: String = "",
weightName: String = "",
marginName: String = "",
fitting: Boolean = true): DataFrame = {
// check first
val featureNameSet = featureNames.distinct
validateSchema(dataset.schema, featureNameSet, labelName, weightName, marginName, fitting)
// cast one column, skipping those already FloatType
val castToFloat = (df: DataFrame, colName: String) => {
if (df.schema(colName).dataType.isInstanceOf[FloatType]) {
df
} else {
val colMeta = df.schema(colName).metadata
df.withColumn(colName, col(colName).as(colName, colMeta).cast(FloatType))
}
}
val colNames = if (fitting) {
var names = featureNameSet :+ labelName
if (weightName.nonEmpty) {
names = names :+ weightName
}
if (marginName.nonEmpty) {
names = names :+ marginName
}
names
} else {
featureNameSet
}
colNames.foldLeft(dataset.asInstanceOf[DataFrame])(
(ds, colName) => castToFloat(ds, colName))
}
/** Validate input schema */
def validateSchema(schema: StructType,
featureNames: Seq[String],
labelName: String = "",
weightName: String = "",
marginName: String = "",
fitting: Boolean = true): StructType = {
val msg = if (fitting) "train" else "transform"
// feature columns
require(featureNames.nonEmpty, s"Gpu $msg requires features columns. " +
"please refer to `setFeaturesCol(value: Array[String])`!")
featureNames.foreach(fn => checkNumericType(schema, fn))
if (fitting) {
require(labelName.nonEmpty, "label column is not set.")
checkNumericType(schema, labelName)
if (weightName.nonEmpty) {
checkNumericType(schema, weightName)
}
if (marginName.nonEmpty) {
checkNumericType(schema, marginName)
}
}
schema
}
def time[R](block: => R): (R, Float) = {
val t0 = System.currentTimeMillis
val result = block // call-by-name
val t1 = System.currentTimeMillis
(result, (t1 - t0).toFloat / 1000)
}
/** Get column names from Parameter */
def getColumnNames(params: Params)(cols: Param[String]*): Seq[String] = {
// get column name, null | undefined will be casted to ""
def getColumnName(params: Params)(param: Param[String]): String = {
if (params.isDefined(param)) {
val colName = params.getOrDefault(param)
if (colName != null) colName else ""
} else ""
}
val getName = getColumnName(params)(_)
cols.map(getName)
}
}
/**
 * A container to contain the column ids
 *
 * Holds the positional indices (within the input DataFrame's schema) of the
 * columns XGBoost consumes. Optional ids are None when the corresponding
 * column (weight / base margin / group) was not configured.
 */
private[spark] case class ColumnIndices(
  featureIds: Seq[Int],    // indices of the feature columns
  labelId: Int,            // index of the label column
  weightId: Option[Int],   // index of the instance-weight column, if set
  marginId: Option[Int],   // index of the base-margin column, if set
  groupId: Option[Int])    // index of the group column (ranking), if set
// Bundles the raw DataFrame with the resolved column positions; groupColName is
// Some(name) only when a group column was configured (ranking use case).
private[spark] case class ColumnDataBatch(
  rawDF: DataFrame,
  colIndices: ColumnIndices,
  groupColName: Option[String])

View File

@@ -0,0 +1,315 @@
/*
Copyright (c) 2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.spark
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.{ColumnarRdd, GpuColumnVectorUtils}
import org.apache.commons.logging.LogFactory
import org.apache.spark.TaskContext
import org.apache.spark.ml.param.Param
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, FloatType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import ml.dmlc.xgboost4j.java.CudfColumnBatch
import ml.dmlc.xgboost4j.scala.{DMatrix, QuantileDMatrix}
import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
import ml.dmlc.xgboost4j.scala.spark.params.HasGroupCol
/**
 * GpuXGBoostPlugin is the XGBoost plugin which leverages spark-rapids
 * to accelerate the XGBoost from ETL to train.
 */
class GpuXGBoostPlugin extends XGBoostPlugin {

  private val logger = LogFactory.getLog("XGBoostSparkGpuPlugin")

  /**
   * Whether the plugin is enabled or not, if not enabled, fallback
   * to the regular CPU pipeline
   *
   * Enabled means the spark-rapids SQLPlugin is listed in "spark.plugins" AND
   * "spark.rapids.sql.enabled" is not explicitly set to false.
   *
   * @param dataset the input dataset
   * @return Boolean
   */
  override def isEnabled(dataset: Dataset[_]): Boolean = {
    val conf = dataset.sparkSession.conf
    val hasRapidsPlugin = conf.get("spark.plugins", "").split(",").contains(
      "com.nvidia.spark.SQLPlugin")
    val rapidsEnabled = try {
      conf.get("spark.rapids.sql.enabled").toBoolean
    } catch {
      // Rapids plugin has default "spark.rapids.sql.enabled" to true
      case _: NoSuchElementException => true
      case _: Throwable => false // Any exception will return false
    }
    hasRapidsPlugin && rapidsEnabled
  }

  // TODO, support numeric type
  /**
   * Project the input dataset down to only the columns XGBoost needs
   * (label / weight / base margin / group / features), casting them to the
   * required type, then repartition according to the estimator settings.
   */
  private[spark] def preprocess[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
      estimator: XGBoostEstimator[T, M], dataset: Dataset[_]): Dataset[_] = {

    // Columns to be selected for XGBoost training
    val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
    val schema = dataset.schema

    // Append the column (cast to targetType if needed) only when the param is
    // defined with a non-empty name.
    def selectCol(c: Param[String], targetType: DataType = FloatType) = {
      // TODO support numeric types
      if (estimator.isDefinedNonEmpty(c)) {
        selectedCols.append(estimator.castIfNeeded(schema, estimator.getOrDefault(c), targetType))
      }
    }

    Seq(estimator.labelCol, estimator.weightCol, estimator.baseMarginCol)
      .foreach(p => selectCol(p))
    // The group column (learning-to-rank) is cast to IntegerType, not FloatType.
    estimator match {
      case p: HasGroupCol => selectCol(p.groupCol, IntegerType)
      case _ =>
    }

    // TODO support array/vector feature
    estimator.getFeaturesCols.foreach { name =>
      val col = estimator.castIfNeeded(dataset.schema, name)
      selectedCols.append(col)
    }
    val input = dataset.select(selectedCols.toArray: _*)
    estimator.repartitionIfNeeded(input)
  }

  // visible for testing
  /** Reject configurations that would run the CPU algorithm under the GPU plugin. */
  private[spark] def validate[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
      estimator: XGBoostEstimator[T, M],
      dataset: Dataset[_]): Unit = {
    require(estimator.getTreeMethod == "gpu_hist" || estimator.getDevice != "cpu",
      "Using Spark-Rapids to accelerate XGBoost must set device=cuda")
  }

  /**
   * Convert Dataset to RDD[Watches] which will be fed into XGBoost
   *
   * @param estimator which estimator to be handled.
   * @param dataset to be converted.
   * @return RDD[Watches]
   */
  override def buildRddWatches[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
      estimator: XGBoostEstimator[T, M],
      dataset: Dataset[_]): RDD[Watches] = {

    validate(estimator, dataset)
    val train = preprocess(estimator, dataset)
    val schema = train.schema

    val indices = estimator.buildColumnIndices(schema)

    // Capture scalar config here so the closures below don't serialize the estimator.
    val maxBin = estimator.getMaxBins
    val nthread = estimator.getNthread
    val missing = estimator.getMissing

    /** build QuantileDMatrix on the executor side */
    def buildQuantileDMatrix(iter: Iterator[Table],
        ref: Option[QuantileDMatrix] = None): QuantileDMatrix = {
      // -1 feeds an out-of-range index into select, which yields null for the
      // corresponding optional column (weight / margin / group).
      val colBatchIter = iter.map { table =>
        withResource(new GpuColumnBatch(table)) { batch =>
          new CudfColumnBatch(
            batch.select(indices.featureIds.get),
            batch.select(indices.labelId),
            batch.select(indices.weightId.getOrElse(-1)),
            batch.select(indices.marginId.getOrElse(-1)),
            batch.select(indices.groupId.getOrElse(-1)));
        }
      }
      // When a reference QuantileDMatrix is supplied (the train matrix), the eval
      // matrix reuses its quantile cuts.
      ref.map(r => new QuantileDMatrix(colBatchIter, r, missing, maxBin, nthread)).getOrElse(
        new QuantileDMatrix(colBatchIter, missing, maxBin, nthread)
      )
    }

    estimator.getEvalDataset().map { evalDs =>
      // With an eval dataset: zip train and eval partitions so each task builds
      // both matrices, the eval one referencing the train one.
      val evalProcessed = preprocess(estimator, evalDs)
      ColumnarRdd(train.toDF()).zipPartitions(ColumnarRdd(evalProcessed.toDF())) {
        (trainIter, evalIter) =>
          new Iterator[Watches] {
            override def hasNext: Boolean = trainIter.hasNext
            override def next(): Watches = {
              val trainDM = buildQuantileDMatrix(trainIter)
              val evalDM = buildQuantileDMatrix(evalIter, Some(trainDM))
              new Watches(Array(trainDM, evalDM),
                Array(Utils.TRAIN_NAME, Utils.VALIDATION_NAME), None)
            }
          }
      }
    }.getOrElse(
      // Without an eval dataset: one Watches holding only the train matrix.
      ColumnarRdd(train.toDF()).mapPartitions { iter =>
        new Iterator[Watches] {
          override def hasNext: Boolean = iter.hasNext
          override def next(): Watches = {
            val dm = buildQuantileDMatrix(iter)
            new Watches(Array(dm), Array(Utils.TRAIN_NAME), None)
          }
        }
      }
    )
  }

  /**
   * Run prediction over a columnar (GPU) dataset, producing the transformed
   * DataFrame with the model's prediction columns appended.
   */
  override def transform[M <: XGBoostModel[M]](model: XGBoostModel[M],
      dataset: Dataset[_]): DataFrame = {
    val sc = dataset.sparkSession.sparkContext

    val (transformedSchema, pred) = model.preprocess(dataset)
    val bBooster = sc.broadcast(model.nativeBooster)
    val bOriginalSchema = sc.broadcast(dataset.schema)

    val featureIds = model.getFeaturesCols.distinct.map(dataset.schema.fieldIndex).toList
    val isLocal = sc.isLocal
    val missing = model.getMissing
    val nThread = model.getNthread

    val rdd = ColumnarRdd(dataset.asInstanceOf[DataFrame]).mapPartitions { tableIters =>
      // booster is visible for all spark tasks in the same executor
      val booster = bBooster.value
      val originalSchema = bOriginalSchema.value

      // UnsafeProjection is not serializable so do it on the executor side
      val toUnsafe = UnsafeProjection.create(originalSchema)

      // One-time per-executor device assignment, guarded by double-checked locking.
      // NOTE(review): this synchronizes on a (boxed) Boolean field; appears to rely
      // on all tasks in the executor sharing the broadcast booster — confirm.
      if (!booster.deviceIsSet) {
        booster.deviceIsSet.synchronized {
          if (!booster.deviceIsSet) {
            booster.deviceIsSet = true
            val gpuId = if (!isLocal) XGBoost.getGPUAddrFromResources else 0
            booster.setParam("device", s"cuda:$gpuId")
            logger.info("GPU transform on GPU device: cuda:" + gpuId)
          }
        }
      }

      // Iterator on Row
      new Iterator[Row] {
        // Convert InternalRow to Row
        private val converter: InternalRow => Row = CatalystTypeConverters
          .createToScalaConverter(originalSchema)
          .asInstanceOf[InternalRow => Row]

        // GPU batches read in must be closed by the receiver
        @transient var currentBatch: ColumnarBatch = null

        // Iterator on Row
        var iter: Iterator[Row] = null

        TaskContext.get().addTaskCompletionListener[Unit](_ => {
          closeCurrentBatch() // close the last ColumnarBatch
        })

        // Release the host-side ColumnarBatch currently being iterated, if any.
        private def closeCurrentBatch(): Unit = {
          if (currentBatch != null) {
            currentBatch.close()
            currentBatch = null
          }
        }

        // Pull the next cudf Table, run prediction on it, and set `iter` to the
        // resulting Rows; sets `iter` to null when the partition is exhausted.
        def loadNextBatch(): Unit = {
          closeCurrentBatch()
          if (tableIters.hasNext) {
            val dataTypes = originalSchema.fields.map(x => x.dataType)
            iter = withResource(tableIters.next()) { table =>
              // Create DMatrix
              val featureTable = new GpuColumnBatch(table).select(featureIds)
              if (featureTable == null) {
                val msg = featureIds.mkString(",")
                throw new RuntimeException(s"Couldn't create feature table for the " +
                  s"feature indices $msg")
              }
              try {
                val cudfColumnBatch = new CudfColumnBatch(featureTable, null, null, null, null)
                val dm = new DMatrix(cudfColumnBatch, missing, nThread)
                if (dm == null) {
                  Iterator.empty
                } else {
                  try {
                    // Copy the whole input table back to host so original columns
                    // can be emitted alongside the predictions.
                    currentBatch = new ColumnarBatch(
                      GpuColumnVectorUtils.extractHostColumns(table, dataTypes),
                      table.getRowCount().toInt)
                    val rowIterator = currentBatch.rowIterator().asScala.map(toUnsafe)
                      .map(converter(_))
                    model.predictInternal(booster, dm, pred, rowIterator).toIterator
                  } finally {
                    dm.delete()
                  }
                }
              } finally {
                featureTable.close()
              }
            }
          } else {
            iter = null
          }
        }

        override def hasNext: Boolean = {
          val itHasNext = iter != null && iter.hasNext
          if (!itHasNext) { // Don't have extra Row for current ColumnarBatch
            loadNextBatch()
            iter != null && iter.hasNext
          } else {
            itHasNext
          }
        }

        override def next(): Row = {
          if (iter == null || !iter.hasNext) {
            loadNextBatch()
          }
          if (iter == null) {
            throw new NoSuchElementException()
          }
          iter.next()
        }
      }
    }
    // NOTE(review): unpersist is issued before any action evaluates `rdd`; blocking
    // is false so re-broadcast on demand presumably covers later evaluation — confirm.
    bBooster.unpersist(false)
    bOriginalSchema.unpersist(false)

    val output = dataset.sparkSession.createDataFrame(rdd, transformedSchema)
    model.postTransform(output, pred).toDF()
  }
}
/**
 * Thin AutoCloseable wrapper around a cudf Table that supports selecting a
 * subset of columns into a new Table.
 */
private class GpuColumnBatch(table: Table) extends AutoCloseable {

  /** Select a single column; returns null when the index is out of range. */
  def select(index: Int): Table = select(Seq(index))

  /**
   * Build a new Table containing the requested columns, in order.
   * Returns null (callers check for it) when any index is out of range.
   */
  def select(indices: Seq[Int]): Table = {
    val numColumns = table.getNumberOfColumns
    val allInRange = indices.forall(i => i >= 0 && i < numColumns)
    if (allInRange) {
      new Table(indices.map(table.getColumn): _*)
    } else {
      null
    }
  }

  override def close(): Unit = {
    if (table != null) {
      table.close()
    }
  }
}

View File

@@ -16,9 +16,7 @@
package ml.dmlc.xgboost4j.java;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import ai.rapids.cudf.Table;
import junit.framework.TestCase;
@@ -122,8 +120,7 @@ public class DMatrixTest {
tables.add(new CudfColumnBatch(X_0, y_0, w_0, m_0, q_0));
tables.add(new CudfColumnBatch(X_1, y_1, w_1, m_1, q_1));
DMatrix dmat = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1);
QuantileDMatrix dmat = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1);
float[] anchorLabel = convertFloatTofloat(label1, label2);
float[] anchorWeight = convertFloatTofloat(weight1, weight2);
float[] anchorBaseMargin = convertFloatTofloat(baseMargin1, baseMargin2);
@@ -135,6 +132,57 @@ public class DMatrixTest {
}
}
private Float[] generateFloatArray(int size, long seed) {
Float[] array = new Float[size];
Random random = new Random(seed);
for (int i = 0; i < size; i++) {
array[i] = random.nextFloat();
}
return array;
}
@Test
public void testGetQuantileCut() throws XGBoostError {
int rows = 100;
try (
Table X_0 = new Table.TestBuilder()
.column(generateFloatArray(rows, 1l))
.column(generateFloatArray(rows, 2l))
.column(generateFloatArray(rows, 3l))
.column(generateFloatArray(rows, 4l))
.column(generateFloatArray(rows, 5l))
.build();
Table y_0 = new Table.TestBuilder().column(generateFloatArray(rows, 6l)).build();
Table X_1 = new Table.TestBuilder()
.column(generateFloatArray(rows, 11l))
.column(generateFloatArray(rows, 12l))
.column(generateFloatArray(rows, 13l))
.column(generateFloatArray(rows, 14l))
.column(generateFloatArray(rows, 15l))
.build();
Table y_1 = new Table.TestBuilder().column(generateFloatArray(rows, 16l)).build();
) {
List<ColumnBatch> tables = new LinkedList<>();
tables.add(new CudfColumnBatch(X_0, y_0, null, null, null));
QuantileDMatrix train = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1);
tables.clear();
tables.add(new CudfColumnBatch(X_1, y_1, null, null, null));
QuantileDMatrix eval = new QuantileDMatrix(tables.iterator(), train, 0.0f, 256, 1);
DMatrix.QuantileCut trainCut = train.getQuantileCut();
DMatrix.QuantileCut evalCut = eval.getQuantileCut();
TestCase.assertTrue(trainCut.getIndptr().length == evalCut.getIndptr().length);
TestCase.assertTrue(Arrays.equals(trainCut.getIndptr(), evalCut.getIndptr()));
TestCase.assertTrue(trainCut.getValues().length == evalCut.getValues().length);
TestCase.assertTrue(Arrays.equals(trainCut.getValues(), evalCut.getValues()));
}
}
private float[] convertFloatTofloat(Float[]... datas) {
int totalLength = 0;
for (Float[] data : datas) {

View File

@@ -16,11 +16,13 @@
package ml.dmlc.xgboost4j.scala
import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.Table
import ml.dmlc.xgboost4j.java.CudfColumnBatch
import org.scalatest.funsuite.AnyFunSuite
import scala.collection.mutable.ArrayBuffer
import ml.dmlc.xgboost4j.java.CudfColumnBatch
import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
class QuantileDMatrixSuite extends AnyFunSuite {
@@ -73,13 +75,4 @@ class QuantileDMatrixSuite extends AnyFunSuite {
}
}
}
/** Executes the provided code block and then closes the resource */
private def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V = {
try {
block(r)
} finally {
r.close()
}
}
}

View File

@@ -1,288 +0,0 @@
/*
Copyright (c) 2021-2023 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import java.nio.file.{Files, Path}
import java.sql.{Date, Timestamp}
import java.util.{Locale, TimeZone}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.apache.spark.{GpuTestUtils, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.{Row, SparkSession}
trait GpuTestSuite extends AnyFunSuite with TmpFolderSuite {
import SparkSessionHolder.withSparkSession
protected def getResourcePath(resource: String): String = {
require(resource.startsWith("/"), "resource must start with /")
getClass.getResource(resource).getPath
}
def enableCsvConf(): SparkConf = {
new SparkConf()
.set("spark.rapids.sql.csv.read.float.enabled", "true")
.set("spark.rapids.sql.csv.read.double.enabled", "true")
}
def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
// set "spark.rapids.sql.explain" to "ALL" to check if the operators
// can be replaced by GPU
val c = conf.clone()
.set("spark.rapids.sql.enabled", "true")
withSparkSession(c, f)
}
def withCpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
val c = conf.clone()
.set("spark.rapids.sql.enabled", "false") // Just to be sure
withSparkSession(c, f)
}
def compareResults(
sort: Boolean,
floatEpsilon: Double,
fromLeft: Array[Row],
fromRight: Array[Row]): Boolean = {
if (sort) {
val left = fromLeft.map(_.toSeq).sortWith(seqLt)
val right = fromRight.map(_.toSeq).sortWith(seqLt)
compare(left, right, floatEpsilon)
} else {
compare(fromLeft, fromRight, floatEpsilon)
}
}
// we guarantee that the types will be the same
private def seqLt(a: Seq[Any], b: Seq[Any]): Boolean = {
if (a.length < b.length) {
return true
}
// lengths are the same
for (i <- a.indices) {
val v1 = a(i)
val v2 = b(i)
if (v1 != v2) {
// null is always < anything but null
if (v1 == null) {
return true
}
if (v2 == null) {
return false
}
(v1, v2) match {
case (i1: Int, i2: Int) => if (i1 < i2) {
return true
} else if (i1 > i2) {
return false
}// else equal go on
case (i1: Long, i2: Long) => if (i1 < i2) {
return true
} else if (i1 > i2) {
return false
} // else equal go on
case (i1: Float, i2: Float) => if (i1.isNaN() && !i2.isNaN()) return false
else if (!i1.isNaN() && i2.isNaN()) return true
else if (i1 < i2) {
return true
} else if (i1 > i2) {
return false
} // else equal go on
case (i1: Date, i2: Date) => if (i1.before(i2)) {
return true
} else if (i1.after(i2)) {
return false
} // else equal go on
case (i1: Double, i2: Double) => if (i1.isNaN() && !i2.isNaN()) return false
else if (!i1.isNaN() && i2.isNaN()) return true
else if (i1 < i2) {
return true
} else if (i1 > i2) {
return false
} // else equal go on
case (i1: Short, i2: Short) => if (i1 < i2) {
return true
} else if (i1 > i2) {
return false
} // else equal go on
case (i1: Timestamp, i2: Timestamp) => if (i1.before(i2)) {
return true
} else if (i1.after(i2)) {
return false
} // else equal go on
case (s1: String, s2: String) =>
val cmp = s1.compareTo(s2)
if (cmp < 0) {
return true
} else if (cmp > 0) {
return false
} // else equal go on
case (o1, _) =>
throw new UnsupportedOperationException(o1.getClass + " is not supported yet")
}
}
}
// They are equal...
false
}
private def compare(expected: Any, actual: Any, epsilon: Double = 0.0): Boolean = {
def doublesAreEqualWithinPercentage(expected: Double, actual: Double): (String, Boolean) = {
if (!compare(expected, actual)) {
if (expected != 0) {
val v = Math.abs((expected - actual) / expected)
(s"\n\nABS($expected - $actual) / ABS($actual) == $v is not <= $epsilon ", v <= epsilon)
} else {
val v = Math.abs(expected - actual)
(s"\n\nABS($expected - $actual) == $v is not <= $epsilon ", v <= epsilon)
}
} else {
("SUCCESS", true)
}
}
(expected, actual) match {
case (a: Float, b: Float) if a.isNaN && b.isNaN => true
case (a: Double, b: Double) if a.isNaN && b.isNaN => true
case (null, null) => true
case (null, _) => false
case (_, null) => false
case (a: Array[_], b: Array[_]) =>
a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r, epsilon) }
case (a: Map[_, _], b: Map[_, _]) =>
a.size == b.size && a.keys.forall { aKey =>
b.keys.find(bKey => compare(aKey, bKey))
.exists(bKey => compare(a(aKey), b(bKey), epsilon))
}
case (a: Iterable[_], b: Iterable[_]) =>
a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r, epsilon) }
case (a: Product, b: Product) =>
compare(a.productIterator.toSeq, b.productIterator.toSeq, epsilon)
case (a: Row, b: Row) =>
compare(a.toSeq, b.toSeq, epsilon)
// 0.0 == -0.0, turn float/double to bits before comparison, to distinguish 0.0 and -0.0.
case (a: Double, b: Double) if epsilon <= 0 =>
java.lang.Double.doubleToRawLongBits(a) == java.lang.Double.doubleToRawLongBits(b)
case (a: Double, b: Double) if epsilon > 0 =>
val ret = doublesAreEqualWithinPercentage(a, b)
if (!ret._2) {
System.err.println(ret._1 + " (double)")
}
ret._2
case (a: Float, b: Float) if epsilon <= 0 =>
java.lang.Float.floatToRawIntBits(a) == java.lang.Float.floatToRawIntBits(b)
case (a: Float, b: Float) if epsilon > 0 =>
val ret = doublesAreEqualWithinPercentage(a, b)
if (!ret._2) {
System.err.println(ret._1 + " (float)")
}
ret._2
case (a, b) => a == b
}
}
}
trait TmpFolderSuite extends BeforeAndAfterAll { self: AnyFunSuite =>
protected var tempDir: Path = _
override def beforeAll(): Unit = {
super.beforeAll()
tempDir = Files.createTempDirectory(getClass.getName)
}
override def afterAll(): Unit = {
JavaUtils.deleteRecursively(tempDir.toFile)
super.afterAll()
}
protected def createTmpFolder(prefix: String): Path = {
Files.createTempDirectory(tempDir, prefix)
}
}
object SparkSessionHolder extends Logging {
private var spark = createSparkSession()
private var origConf = spark.conf.getAll
private var origConfKeys = origConf.keys.toSet
private def setAllConfs(confs: Array[(String, String)]): Unit = confs.foreach {
case (key, value) if spark.conf.get(key, null) != value =>
spark.conf.set(key, value)
case _ => // No need to modify it
}
private def createSparkSession(): SparkSession = {
GpuTestUtils.cleanupAnyExistingSession()
// Timezone is fixed to UTC to allow timestamps to work by default
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
// Add Locale setting
Locale.setDefault(Locale.US)
val builder = SparkSession.builder()
.master("local[2]")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.rapids.sql.enabled", "false")
.config("spark.rapids.sql.test.enabled", "false")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
.config("spark.sql.files.maxPartitionBytes", "1000")
.appName("XGBoost4j-Spark-Gpu unit test")
builder.getOrCreate()
}
private def reinitSession(): Unit = {
spark = createSparkSession()
origConf = spark.conf.getAll
origConfKeys = origConf.keys.toSet
}
def sparkSession: SparkSession = {
if (SparkSession.getActiveSession.isEmpty) {
reinitSession()
}
spark
}
def resetSparkSessionConf(): Unit = {
if (SparkSession.getActiveSession.isEmpty) {
reinitSession()
} else {
setAllConfs(origConf.toArray)
val currentKeys = spark.conf.getAll.keys.toSet
val toRemove = currentKeys -- origConfKeys
toRemove.foreach(spark.conf.unset)
}
logDebug(s"RESET CONF TO: ${spark.conf.getAll}")
}
def withSparkSession[U](conf: SparkConf, f: SparkSession => U): U = {
resetSparkSessionConf
logDebug(s"SETTING CONF: ${conf.getAll.toMap}")
setAllConfs(conf.getAll)
logDebug(s"RUN WITH CONF: ${spark.conf.getAll}\n")
spark.sparkContext.setLogLevel("WARN")
f(spark)
}
}

View File

@@ -1,232 +0,0 @@
/*
Copyright (c) 2021-2022 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import java.io.File
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassificationModel, XGBoostClassifier}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.{col, udf, when}
import org.apache.spark.sql.types.{FloatType, StructField, StructType}
class GpuXGBoostClassifierSuite extends GpuTestSuite {
private val dataPath = if (new java.io.File("../../demo/data/veterans_lung_cancer.csv").isFile) {
"../../demo/data/veterans_lung_cancer.csv"
} else {
"../demo/data/veterans_lung_cancer.csv"
}
val labelName = "label_col"
val schema = StructType(Seq(
StructField("f1", FloatType), StructField("f2", FloatType), StructField("f3", FloatType),
StructField("f4", FloatType), StructField("f5", FloatType), StructField("f6", FloatType),
StructField("f7", FloatType), StructField("f8", FloatType), StructField("f9", FloatType),
StructField("f10", FloatType), StructField("f11", FloatType), StructField("f12", FloatType),
StructField(labelName, FloatType)
))
val featureNames = schema.fieldNames.filter(s => !s.equals(labelName))
test("The transform result should be same for several runs on same model") {
withGpuSparkSession(enableCsvConf()) { spark =>
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic",
"num_round" -> 10, "num_workers" -> 1, "tree_method" -> "gpu_hist",
"features_cols" -> featureNames, "label_col" -> labelName)
val Array(originalDf, testDf) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
// Get a model
val model = new XGBoostClassifier(xgbParam)
.fit(originalDf)
val left = model.transform(testDf).collect()
val right = model.transform(testDf).collect()
// The left should be same with right
assert(compareResults(true, 0.000001, left, right))
}
}
test("use weight") {
withGpuSparkSession(enableCsvConf()) { spark =>
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic",
"num_round" -> 10, "num_workers" -> 1, "tree_method" -> "gpu_hist",
"features_cols" -> featureNames, "label_col" -> labelName)
val Array(originalDf, testDf) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
val getWeightFromF1 = udf({ f1: Float => if (f1.toInt % 2 == 0) 1.0f else 0.001f })
val dfWithWeight = originalDf.withColumn("weight", getWeightFromF1(col("f1")))
val model = new XGBoostClassifier(xgbParam)
.fit(originalDf)
val model2 = new XGBoostClassifier(xgbParam)
.setWeightCol("weight")
.fit(dfWithWeight)
val left = model.transform(testDf).collect()
val right = model2.transform(testDf).collect()
// left should be different with right
assert(!compareResults(true, 0.000001, left, right))
}
}
test("Save model and transform GPU dataset") {
// Train a model on GPU
val (gpuModel, testDf) = withGpuSparkSession(enableCsvConf()) { spark =>
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic",
"num_round" -> 10, "num_workers" -> 1)
val Array(rawInput, testDf) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
val classifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol(featureNames)
.setLabelCol(labelName)
.setTreeMethod("gpu_hist")
(classifier.fit(rawInput), testDf)
}
val xgbrModel = new File(tempDir.toFile, "xgbrModel").getPath
gpuModel.write.overwrite().save(xgbrModel)
val gpuModelFromFile = XGBoostClassificationModel.load(xgbrModel)
// transform on GPU
withGpuSparkSession() { spark =>
val left = gpuModel
.transform(testDf)
.select(labelName, "rawPrediction", "probability", "prediction")
.collect()
val right = gpuModelFromFile
.transform(testDf)
.select(labelName, "rawPrediction", "probability", "prediction")
.collect()
assert(compareResults(true, 0.000001, left, right))
}
}
test("Model trained on CPU can transform GPU dataset") {
// Train a model on CPU
val cpuModel = withCpuSparkSession() { spark =>
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic",
"num_round" -> 10, "num_workers" -> 1)
val Array(rawInput, _) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
val vectorAssembler = new VectorAssembler()
.setHandleInvalid("keep")
.setInputCols(featureNames)
.setOutputCol("features")
val trainingDf = vectorAssembler.transform(rawInput).select("features", labelName)
val classifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol("features")
.setLabelCol(labelName)
.setTreeMethod("auto")
classifier.fit(trainingDf)
}
val xgbrModel = new File(tempDir.toFile, "xgbrModel").getPath
cpuModel.write.overwrite().save(xgbrModel)
val cpuModelFromFile = XGBoostClassificationModel.load(xgbrModel)
// transform on GPU
withGpuSparkSession() { spark =>
val Array(_, testDf) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
// Since CPU model does not know the information about the features cols that GPU transform
// pipeline requires. End user needs to setFeaturesCol(features: Array[String]) in the model
// manually
val thrown = intercept[NoSuchElementException](cpuModel
.transform(testDf)
.collect())
assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))
val left = cpuModel
.setFeaturesCol(featureNames)
.transform(testDf)
.collect()
val right = cpuModelFromFile
.setFeaturesCol(featureNames)
.transform(testDf)
.collect()
assert(compareResults(true, 0.000001, left, right))
}
}
test("Model trained on GPU can transform CPU dataset") {
// Train a model on GPU
val gpuModel = withGpuSparkSession(enableCsvConf()) { spark =>
val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic",
"num_round" -> 10, "num_workers" -> 1)
val Array(rawInput, _) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
val classifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol(featureNames)
.setLabelCol(labelName)
.setTreeMethod("gpu_hist")
classifier.fit(rawInput)
}
val xgbrModel = new File(tempDir.toFile, "xgbrModel").getPath
gpuModel.write.overwrite().save(xgbrModel)
val gpuModelFromFile = XGBoostClassificationModel.load(xgbrModel)
// transform on CPU
withCpuSparkSession() { spark =>
val Array(_, rawInput) = spark.read.option("header", "true").schema(schema)
.csv(dataPath).withColumn("f2", when(col("f2").isin(Float.PositiveInfinity), 0))
.randomSplit(Array(0.7, 0.3), seed = 1)
val featureColName = "feature_col"
val vectorAssembler = new VectorAssembler()
.setHandleInvalid("keep")
.setInputCols(featureNames)
.setOutputCol(featureColName)
val testDf = vectorAssembler.transform(rawInput).select(featureColName, labelName)
// Since GPU model does not know the information about the features col name that CPU
// transform pipeline requires. End user needs to setFeaturesCol in the model manually
intercept[IllegalArgumentException](
gpuModel
.transform(testDf)
.collect())
val left = gpuModel
.setFeaturesCol(featureColName)
.transform(testDf)
.select(labelName, "rawPrediction", "probability", "prediction")
.collect()
val right = gpuModelFromFile
.setFeaturesCol(featureColName)
.transform(testDf)
.select(labelName, "rawPrediction", "probability", "prediction")
.collect()
assert(compareResults(true, 0.000001, left, right))
}
}
}

View File

@@ -1,212 +0,0 @@
/*
Copyright (c) 2021-2023 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import java.io.File
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType
/**
 * General GPU training sanity tests for XGBoost4j-Spark-GPU: parameter plumbing,
 * feature-column ordering, schema validation, eval sets and estimator persistence.
 */
class GpuXGBoostGeneralSuite extends GpuTestSuite {

  private val labelName = "label_col"
  private val weightName = "weight_col"
  private val baseMarginName = "margin_col"
  private val featureNames = Array("f1", "f2", "f3")
  private val allColumnNames = featureNames :+ weightName :+ baseMarginName :+ labelName

  // Tiny in-memory 3-class dataset shared by every test in this suite.
  private val trainingData = Seq(
    // f1, f2, f3, weight, margin, label
    (1.0f, 2.0f, 3.0f, 1.0f, 0.5f, 0),
    (2.0f, 3.0f, 4.0f, 2.0f, 0.6f, 0),
    (1.2f, 2.1f, 3.1f, 1.1f, 0.51f, 0),
    (2.3f, 3.1f, 4.1f, 2.1f, 0.61f, 0),
    (3.0f, 4.0f, 5.0f, 1.5f, 0.3f, 1),
    (4.0f, 5.0f, 6.0f, 2.5f, 0.4f, 1),
    (3.1f, 4.1f, 5.1f, 1.6f, 0.4f, 1),
    (4.1f, 5.1f, 6.1f, 2.6f, 0.5f, 1),
    (5.0f, 6.0f, 7.0f, 1.0f, 0.2f, 2),
    (6.0f, 7.0f, 8.0f, 1.3f, 0.6f, 2),
    (5.1f, 6.1f, 7.1f, 1.2f, 0.1f, 2),
    (6.1f, 7.1f, 8.1f, 1.4f, 0.7f, 2),
    (6.2f, 7.2f, 8.2f, 1.5f, 0.8f, 2))

  test("MLlib way setting features_cols should work") {
    // Feature/label columns supplied purely through the constructor param map.
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val trainingDf = trainingData.toDF(allColumnNames: _*)
      val xgbParam = Map(
        "eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1,
        "tree_method" -> "hist", "device" -> "cuda",
        "features_cols" -> featureNames, "label_col" -> labelName
      )
      new XGBoostClassifier(xgbParam)
        .fit(trainingDf)
    }
  }

  test("disorder feature columns should work") {
    // Training must not depend on the physical order of columns in the DataFrame.
    withGpuSparkSession() { spark =>
      import spark.implicits._
      var trainingDf = trainingData.toDF(allColumnNames: _*)
      trainingDf = trainingDf.select(labelName, "f2", weightName, "f3", baseMarginName, "f1")
      val xgbParam = Map(
        "eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1,
        "tree_method" -> "hist", "device" -> "cuda"
      )
      new XGBoostClassifier(xgbParam)
        .setFeaturesCol(featureNames)
        .setLabelCol(labelName)
        .fit(trainingDf)
    }
  }

  test("Throw exception when feature/label columns are not numeric type") {
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val originalDf = trainingData.toDF(allColumnNames: _*)
      // Cast one feature column to string: schema validation should reject it.
      var trainingDf = originalDf.withColumn("f2", col("f2").cast(StringType))
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "gpu_hist")
      val thrown1 = intercept[IllegalArgumentException] {
        new XGBoostClassifier(xgbParam)
          .setFeaturesCol(featureNames)
          .setLabelCol(labelName)
          .fit(trainingDf)
      }
      assert(thrown1.getMessage.contains("Column f2 must be of NumericType but found: string."))

      // Same check for a non-numeric label column.
      trainingDf = originalDf.withColumn(labelName, col(labelName).cast(StringType))
      val thrown2 = intercept[IllegalArgumentException] {
        new XGBoostClassifier(xgbParam)
          .setFeaturesCol(featureNames)
          .setLabelCol(labelName)
          .fit(trainingDf)
      }
      assert(thrown2.getMessage.contains(
        s"Column $labelName must be of NumericType but found: string."))
    }
  }

  test("Throw exception when features_cols or label_col is not set") {
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val trainingDf = trainingData.toDF(allColumnNames: _*)
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "gpu_hist")

      // GPU train requires featuresCols. If not specified,
      // then NoSuchElementException will be thrown
      val thrown = intercept[NoSuchElementException] {
        new XGBoostClassifier(xgbParam)
          .setLabelCol(labelName)
          .fit(trainingDf)
      }
      assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))

      // A missing label column surfaces as an IllegalArgumentException instead.
      val thrown1 = intercept[IllegalArgumentException] {
        new XGBoostClassifier(xgbParam)
          .setFeaturesCol(featureNames)
          .fit(trainingDf)
      }
      assert(thrown1.getMessage.contains("label does not exist."))
    }
  }

  test("Throw exception when device is not set to cuda") {
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val trainingDf = trainingData.toDF(allColumnNames: _*)
      // tree_method=hist without device=cuda must be rejected for GPU training.
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "hist")
      val thrown = intercept[IllegalArgumentException] {
        new XGBoostClassifier(xgbParam)
          .setFeaturesCol(featureNames)
          .setLabelCol(labelName)
          .fit(trainingDf)
      }
      assert(thrown.getMessage.contains("GPU train requires `device` set to `cuda`"))
    }
  }

  test("Train with eval") {
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val Array(trainingDf, eval1, eval2) = trainingData.toDF(allColumnNames: _*)
        .randomSplit(Array(0.6, 0.2, 0.2), seed = 1)
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "gpu_hist")
      val model1 = new XGBoostClassifier(xgbParam)
        .setFeaturesCol(featureNames)
        .setLabelCol(labelName)
        .setEvalSets(Map("eval1" -> eval1, "eval2" -> eval2))
        .fit(trainingDf)

      // One objective history per eval set, one entry per round (num_round = 5).
      assert(model1.summary.validationObjectiveHistory.length === 2)
      assert(model1.summary.validationObjectiveHistory.map(_._1).toSet === Set("eval1", "eval2"))
      assert(model1.summary.validationObjectiveHistory(0)._2.length === 5)
      assert(model1.summary.validationObjectiveHistory(1)._2.length === 5)
      // Eval histories must be computed on the eval sets, not copied from training.
      assert(model1.summary.trainObjectiveHistory !== model1.summary.validationObjectiveHistory(0))
      assert(model1.summary.trainObjectiveHistory !== model1.summary.validationObjectiveHistory(1))
    }
  }

  test("test persistence of XGBoostClassifier and XGBoostClassificationModel") {
    val xgbcPath = new File(tempDir.toFile, "xgbc").getPath
    withGpuSparkSession() { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "multi:softprob",
        "num_class" -> 3, "num_round" -> 5, "num_workers" -> 1, "tree_method" -> "gpu_hist",
        "features_cols" -> featureNames, "label_col" -> labelName)
      val xgbc = new XGBoostClassifier(xgbParam)
      xgbc.write.overwrite().save(xgbcPath)
      // Round-trip: every original param must survive save/load unchanged.
      val paramMap2 = XGBoostClassifier.load(xgbcPath).MLlib2XGBoostParams
      xgbParam.foreach {
        case (k, v: Array[String]) =>
          assert(v.sameElements(paramMap2(k).asInstanceOf[Array[String]]))
        case (k, v) =>
          assert(v.toString == paramMap2(k).toString)
      }
    }
  }

  test("device ordinal should not be specified") {
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val trainingDf = trainingData.toDF(allColumnNames: _*)
      val params = Map(
        "objective" -> "multi:softprob",
        "num_class" -> 3,
        "num_round" -> 5,
        "num_workers" -> 1
      )
      // Spark assigns GPUs itself; an explicit ordinal like "cuda:1" is invalid.
      val thrown = intercept[IllegalArgumentException] {
        new XGBoostClassifier(params)
          .setFeaturesCol(featureNames)
          .setLabelCol(labelName)
          .setDevice("cuda:1")
          .fit(trainingDf)
      }
      assert(thrown.getMessage.contains("device given invalid value cuda:1"))
    }
  }
}

View File

@@ -1,258 +0,0 @@
/*
Copyright (c) 2021-2023 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import java.io.File
import ml.dmlc.xgboost4j.scala.spark.{XGBoostRegressionModel, XGBoostRegressor}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{FloatType, IntegerType, StructField, StructType}
/**
 * GPU tests for XGBoostRegressor: transform determinism, sample weights,
 * ranking with query groups, and CPU/GPU model interchange via save/load.
 */
class GpuXGBoostRegressorSuite extends GpuTestSuite {

  val labelName = "label_col"
  val groupName = "group_col"

  // Schema of the rank.train.csv resource used throughout this suite.
  val schema = StructType(Seq(
    StructField(labelName, FloatType),
    StructField("f1", FloatType),
    StructField("f2", FloatType),
    StructField("f3", FloatType),
    StructField(groupName, IntegerType)))
  // Every column except the label and the ranking group is a feature.
  val featureNames = schema.fieldNames.filter(s =>
    !(s.equals(labelName) || s.equals(groupName)))

  test("The transform result should be same for several runs on same model") {
    withGpuSparkSession(enableCsvConf()) { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "reg:squarederror",
        "num_round" -> 10, "num_workers" -> 1, "tree_method" -> "hist", "device" -> "cuda",
        "features_cols" -> featureNames, "label_col" -> labelName)
      val Array(originalDf, testDf) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // Get a model
      val model = new XGBoostRegressor(xgbParam)
        .fit(originalDf)
      // Two transforms of the same data with the same model must agree exactly.
      val left = model.transform(testDf).collect()
      val right = model.transform(testDf).collect()
      // The left should be same with right
      assert(compareResults(true, 0.000001, left, right))
    }
  }

  test("Tree method gpu_hist still works") {
    // The legacy tree_method=gpu_hist alias must still train successfully.
    withGpuSparkSession(enableCsvConf()) { spark =>
      val params = Map(
        "tree_method" -> "gpu_hist",
        "features_cols" -> featureNames,
        "label_col" -> labelName,
        "num_round" -> 10,
        "num_workers" -> 1
      )
      val Array(originalDf, testDf) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // Get a model
      val model = new XGBoostRegressor(params).fit(originalDf)
      val left = model.transform(testDf).collect()
      val right = model.transform(testDf).collect()
      // The left should be same with right
      assert(compareResults(true, 0.000001, left, right))
    }
  }

  test("use weight") {
    withGpuSparkSession(enableCsvConf()) { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "reg:squarederror",
        "num_round" -> 10, "num_workers" -> 1, "tree_method" -> "hist", "device" -> "cuda",
        "features_cols" -> featureNames, "label_col" -> labelName)
      val Array(originalDf, testDf) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // Heavily down-weight rows with odd f1 so the weighted model diverges.
      val getWeightFromF1 = udf({ f1: Float => if (f1.toInt % 2 == 0) 1.0f else 0.001f })
      val dfWithWeight = originalDf.withColumn("weight", getWeightFromF1(col("f1")))

      val model = new XGBoostRegressor(xgbParam)
        .fit(originalDf)
      val model2 = new XGBoostRegressor(xgbParam)
        .setWeightCol("weight")
        .fit(dfWithWeight)

      val left = model.transform(testDf).collect()
      val right = model2.transform(testDf).collect()
      // left should be different with right
      assert(!compareResults(true, 0.000001, left, right))
    }
  }

  test("Save model and transform GPU dataset") {
    // Train a model on GPU
    val (gpuModel, testDf) = withGpuSparkSession(enableCsvConf()) { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic",
        "num_round" -> 10, "num_workers" -> 1)
      val Array(rawInput, testDf) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // NOTE(review): local is named "classifier" but holds an XGBoostRegressor.
      val classifier = new XGBoostRegressor(xgbParam)
        .setFeaturesCol(featureNames)
        .setLabelCol(labelName)
        .setTreeMethod("hist")
        .setDevice("cuda")
      (classifier.fit(rawInput), testDf)
    }

    val xgbrModel = new File(tempDir.toFile, "xgbrModel").getPath
    gpuModel.write.overwrite().save(xgbrModel)
    val gpuModelFromFile = XGBoostRegressionModel.load(xgbrModel)

    // transform on GPU
    withGpuSparkSession() { spark =>
      // In-memory model and reloaded model must produce identical predictions.
      val left = gpuModel
        .transform(testDf)
        .select(labelName, "prediction")
        .collect()
      val right = gpuModelFromFile
        .transform(testDf)
        .select(labelName, "prediction")
        .collect()
      assert(compareResults(true, 0.000001, left, right))
    }
  }

  test("Model trained on CPU can transform GPU dataset") {
    // Train a model on CPU
    val cpuModel = withCpuSparkSession() { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "reg:squarederror",
        "num_round" -> 10, "num_workers" -> 1)
      val Array(rawInput, _) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // The CPU pipeline trains on a single assembled vector column.
      val vectorAssembler = new VectorAssembler()
        .setHandleInvalid("keep")
        .setInputCols(featureNames)
        .setOutputCol("features")
      val trainingDf = vectorAssembler.transform(rawInput).select("features", labelName)
      val classifier = new XGBoostRegressor(xgbParam)
        .setFeaturesCol("features")
        .setLabelCol(labelName)
        .setTreeMethod("auto")
      classifier.fit(trainingDf)
    }

    val xgbrModel = new File(tempDir.toFile, "xgbrModel").getPath
    cpuModel.write.overwrite().save(xgbrModel)
    val cpuModelFromFile = XGBoostRegressionModel.load(xgbrModel)

    // transform on GPU
    withGpuSparkSession() { spark =>
      val Array(_, testDf) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // A CPU-trained model does not carry the feature column names that the GPU
      // transform pipeline requires, so the end user must call
      // setFeaturesCol(features: Array[String]) on the model manually.
      val thrown = intercept[NoSuchElementException](cpuModel
        .transform(testDf)
        .collect())
      assert(thrown.getMessage.contains("Failed to find a default value for featuresCols"))

      val left = cpuModel
        .setFeaturesCol(featureNames)
        .transform(testDf)
        .collect()
      val right = cpuModelFromFile
        .setFeaturesCol(featureNames)
        .transform(testDf)
        .collect()
      assert(compareResults(true, 0.000001, left, right))
    }
  }

  test("Model trained on GPU can transform CPU dataset") {
    // Train a model on GPU
    val gpuModel = withGpuSparkSession(enableCsvConf()) { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "reg:squarederror",
        "num_round" -> 10, "num_workers" -> 1)
      val Array(rawInput, _) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      val classifier = new XGBoostRegressor(xgbParam)
        .setFeaturesCol(featureNames)
        .setLabelCol(labelName)
        .setDevice("cuda")
      classifier.fit(rawInput)
    }

    val xgbrModel = new File(tempDir.toFile, "xgbrModel").getPath
    gpuModel.write.overwrite().save(xgbrModel)
    val gpuModelFromFile = XGBoostRegressionModel.load(xgbrModel)

    // transform on CPU
    withCpuSparkSession() { spark =>
      val Array(_, rawInput) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      val featureColName = "feature_col"
      val vectorAssembler = new VectorAssembler()
        .setHandleInvalid("keep")
        .setInputCols(featureNames)
        .setOutputCol(featureColName)
      val testDf = vectorAssembler.transform(rawInput).select(featureColName, labelName)
      // A GPU-trained model does not know the vector features column name that the
      // CPU transform pipeline requires; the end user must call setFeaturesCol
      // on the model manually.
      intercept[IllegalArgumentException](
        gpuModel
          .transform(testDf)
          .collect())

      val left = gpuModel
        .setFeaturesCol(featureColName)
        .transform(testDf)
        .select(labelName, "prediction")
        .collect()
      val right = gpuModelFromFile
        .setFeaturesCol(featureColName)
        .transform(testDf)
        .select(labelName, "prediction")
        .collect()
      assert(compareResults(true, 0.000001, left, right))
    }
  }

  test("Ranking: train with Group") {
    withGpuSparkSession(enableCsvConf()) { spark =>
      val xgbParam = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "rank:ndcg",
        "num_round" -> 10, "num_workers" -> 1, "tree_method" -> "gpu_hist",
        "features_cols" -> featureNames, "label_col" -> labelName)
      val Array(trainingDf, testDf) = spark.read.option("header", "true").schema(schema)
        .csv(getResourcePath("/rank.train.csv")).randomSplit(Array(0.7, 0.3), seed = 1)
      // rank:ndcg needs query-group boundaries, supplied through the group column.
      val model = new XGBoostRegressor(xgbParam)
        .setGroupCol(groupName)
        .fit(trainingDf)
      // Transform must yield exactly one prediction row per input row.
      val ret = model.transform(testDf).collect()
      assert(testDf.count() === ret.length)
    }
  }
}

View File

@@ -0,0 +1,145 @@
/*
Copyright (c) 2021-2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.rapids.spark
import java.nio.file.{Files, Path}
import java.sql.{Date, Timestamp}
import java.util.{Locale, TimeZone}
import org.apache.spark.{GpuTestUtils, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
/**
 * Base trait for GPU test suites: resource lookup plus helpers that run a test
 * body against the shared SparkSession with the RAPIDS plugin toggled on or off.
 */
trait GpuTestSuite extends AnyFunSuite with TmpFolderSuite {
  import SparkSessionHolder.withSparkSession

  /** Resolves an absolute classpath resource (must start with "/") to a filesystem path. */
  protected def getResourcePath(resource: String): String = {
    require(resource.startsWith("/"), "resource must start with /")
    getClass.getResource(resource).getPath
  }

  /** A SparkConf that lets the RAPIDS plugin read float/double columns from CSV. */
  def enableCsvConf(): SparkConf = new SparkConf()
    .set("spark.rapids.sql.csv.read.float.enabled", "true")
    .set("spark.rapids.sql.csv.read.double.enabled", "true")

  /** Runs `f` in a session with the RAPIDS SQL plugin enabled (GPU execution). */
  def withGpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
    // Tip: additionally set "spark.rapids.sql.explain" to "ALL" to check which
    // operators can be replaced by the GPU.
    val gpuConf = conf.clone().set("spark.rapids.sql.enabled", "true")
    withSparkSession(gpuConf, f)
  }

  /** Runs `f` in a session with the RAPIDS SQL plugin explicitly disabled (CPU execution). */
  def withCpuSparkSession[U](conf: SparkConf = new SparkConf())(f: SparkSession => U): U = {
    val cpuConf = conf.clone().set("spark.rapids.sql.enabled", "false") // Just to be sure
    withSparkSession(cpuConf, f)
  }
}
/**
 * Mixes a per-suite scratch directory into a test suite. The directory is
 * created before any test runs and removed recursively after the last one.
 */
trait TmpFolderSuite extends BeforeAndAfterAll {
  self: AnyFunSuite =>

  // Root scratch directory for this suite; assigned in beforeAll.
  protected var tempDir: Path = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    tempDir = Files.createTempDirectory(getClass.getName)
  }

  override def afterAll(): Unit = {
    // Remove the whole scratch tree before handing off to the parent hook.
    JavaUtils.deleteRecursively(tempDir.toFile)
    super.afterAll()
  }

  /** Creates a fresh sub-directory under [[tempDir]] with the given name prefix. */
  protected def createTmpFolder(prefix: String): Path =
    Files.createTempDirectory(tempDir, prefix)
}
/**
 * Owns the process-wide SparkSession shared by the GPU test suites.
 *
 * The session is created once and reused; [[resetSparkSessionConf]] rolls the
 * configuration back to the snapshot captured at creation time between tests,
 * and [[withSparkSession]] layers a per-test SparkConf on top before running
 * the test body.
 */
object SparkSessionHolder extends Logging {

  private var spark = createSparkSession()
  // Snapshot of the pristine conf (and its key set) taken right after creation;
  // used to roll back any per-test overrides.
  private var origConf = spark.conf.getAll
  private var origConfKeys = origConf.keys.toSet

  // Applies each entry, skipping keys whose live value already matches.
  private def setAllConfs(confs: Array[(String, String)]): Unit = confs.foreach {
    case (key, value) if spark.conf.get(key, null) != value =>
      spark.conf.set(key, value)
    case _ => // No need to modify it
  }

  private def createSparkSession(): SparkSession = {
    GpuTestUtils.cleanupAnyExistingSession()

    // Timezone is fixed to UTC to allow timestamps to work by default
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    // Add Locale setting
    Locale.setDefault(Locale.US)

    val builder = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.adaptive.enabled", "false")
      .config("spark.rapids.sql.test.enabled", "false")
      .config("spark.stage.maxConsecutiveAttempts", "1")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.memory.gpu.pooling.enabled", "false") // Disable RMM for unit tests.
      .config("spark.sql.files.maxPartitionBytes", "1000")
      .appName("XGBoost4j-Spark-Gpu unit test")
    builder.getOrCreate()
  }

  // Replaces the cached session and refreshes the pristine-conf snapshot.
  private def reinitSession(): Unit = {
    spark = createSparkSession()
    origConf = spark.conf.getAll
    origConfKeys = origConf.keys.toSet
  }

  /** The shared session, recreating it if the active one has been torn down. */
  def sparkSession: SparkSession = {
    if (SparkSession.getActiveSession.isEmpty) {
      reinitSession()
    }
    spark
  }

  /** Restores the session conf to the snapshot taken at creation time. */
  def resetSparkSessionConf(): Unit = {
    if (SparkSession.getActiveSession.isEmpty) {
      reinitSession()
    } else {
      setAllConfs(origConf.toArray)
      // Unset any keys a previous test introduced that were not present originally.
      val currentKeys = spark.conf.getAll.keys.toSet
      val toRemove = currentKeys -- origConfKeys
      toRemove.foreach(spark.conf.unset)
    }
    logDebug(s"RESET CONF TO: ${spark.conf.getAll}")
  }

  /** Resets the conf, applies `conf` on top, then runs `f` with the shared session. */
  def withSparkSession[U](conf: SparkConf, f: SparkSession => U): U = {
    // Fix: call the side-effecting 0-arity method with explicit parentheses.
    // Auto-application of `def ...(): Unit` is deprecated in Scala 2.13 and an
    // error in Scala 3.
    resetSparkSessionConf()
    logDebug(s"SETTING CONF: ${conf.getAll.toMap}")
    setAllConfs(conf.getAll)
    logDebug(s"RUN WITH CONF: ${spark.conf.getAll}\n")
    spark.sparkContext.setLogLevel("WARN")
    f(spark)
  }
}

View File

@@ -0,0 +1,523 @@
/*
Copyright (c) 2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.spark
import ai.rapids.cudf.Table
import ml.dmlc.xgboost4j.java.CudfColumnBatch
import ml.dmlc.xgboost4j.scala.{DMatrix, QuantileDMatrix, XGBoost => ScalaXGBoost}
import ml.dmlc.xgboost4j.scala.rapids.spark.GpuTestSuite
import ml.dmlc.xgboost4j.scala.rapids.spark.SparkSessionHolder.withSparkSession
import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.SparkConf
import java.io.File
import scala.collection.mutable.ArrayBuffer
class GpuXGBoostPluginSuite extends GpuTestSuite {
test("params") {
  // Verifies XGBoost params flow through constructor map and setters/getters,
  // and are carried over to the fitted model.
  withGpuSparkSession() { spark =>
    import spark.implicits._
    val df = Seq((1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
      (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f),
      (3.0f, 4.0f, 5.0f, 6.0f, 0.0f, 0.1f),
      (4.0f, 5.0f, 6.0f, 7.0f, 0.0f, 0.1f),
      (5.0f, 6.0f, 7.0f, 8.0f, 0.0f, 0.1f)
    ).toDF("c1", "c2", "weight", "margin", "label", "other")
    val xgbParams: Map[String, Any] = Map(
      "max_depth" -> 5,
      "eta" -> 0.2,
      "objective" -> "binary:logistic"
    )
    val features = Array("c1", "c2")
    val estimator = new XGBoostClassifier(xgbParams)
      .setFeaturesCol(features)
      .setMissing(0.2f)
      .setAlpha(0.97)
      .setLeafPredictionCol("leaf")
      .setContribPredictionCol("contrib")
      .setNumRound(3)
      .setDevice("cuda")

    // Constructor map and setters must both be visible through the getters.
    assert(estimator.getMaxDepth === 5)
    assert(estimator.getEta === 0.2)
    assert(estimator.getObjective === "binary:logistic")
    assert(estimator.getFeaturesCols === features)
    assert(estimator.getMissing === 0.2f)
    assert(estimator.getAlpha === 0.97)
    assert(estimator.getDevice === "cuda")
    assert(estimator.getNumRound === 3)

    // Setters called after construction override the constructor map.
    estimator.setEta(0.66).setMaxDepth(7)
    assert(estimator.getMaxDepth === 7)
    assert(estimator.getEta === 0.66)

    // The fitted model must carry the estimator's final param values.
    val model = estimator.fit(df)
    assert(model.getMaxDepth === 7)
    assert(model.getEta === 0.66)
    assert(model.getObjective === "binary:logistic")
    assert(model.getFeaturesCols === features)
    assert(model.getMissing === 0.2f)
    assert(model.getAlpha === 0.97)
    assert(model.getLeafPredictionCol === "leaf")
    assert(model.getContribPredictionCol === "contrib")
    assert(model.getDevice === "cuda")
    assert(model.getNumRound === 3)
  }
}
test("isEnabled") {
  // The GPU plugin must report enabled/disabled based on spark.rapids.sql.enabled.
  def checkIsEnabled(spark: SparkSession, expected: Boolean): Unit = {
    import spark.implicits._
    val df = Seq((1.0f, 2.0f, 0.0f),
      (2.0f, 3.0f, 1.0f)
    ).toDF("c1", "c2", "label")
    val classifier = new XGBoostClassifier()
    assert(classifier.getPlugin.isDefined)
    assert(classifier.getPlugin.get.isEnabled(df) === expected)
  }

  // spark.rapids.sql.enabled is not set explicitly, default to true
  withSparkSession(new SparkConf(), spark => {checkIsEnabled(spark, true)})

  // set spark.rapids.sql.enabled to false
  withCpuSparkSession() { spark =>
    checkIsEnabled(spark, false)
  }

  // set spark.rapids.sql.enabled to true
  withGpuSparkSession() { spark =>
    checkIsEnabled(spark, true)
  }
}
test("parameter validation") {
  // GpuXGBoostPlugin.validate must reject CPU-only configurations and accept
  // every GPU-enabling combination of `device` / `tree_method`.
  withGpuSparkSession() { spark =>
    import spark.implicits._
    val df = Seq((1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
      (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f),
      (3.0f, 4.0f, 5.0f, 6.0f, 0.0f, 0.1f),
      (4.0f, 5.0f, 6.0f, 7.0f, 0.0f, 0.1f),
      (5.0f, 6.0f, 7.0f, 8.0f, 0.0f, 0.1f)
    ).toDF("c1", "c2", "weight", "margin", "label", "other")
    val classifier = new XGBoostClassifier()
    val plugin = classifier.getPlugin.get.asInstanceOf[GpuXGBoostPlugin]

    // The default device is not a GPU device, so validation must fail.
    intercept[IllegalArgumentException] {
      plugin.validate(classifier, df)
    }
    // Both "cuda" and "gpu" are accepted device values.
    classifier.setDevice("cuda")
    plugin.validate(classifier, df)

    classifier.setDevice("gpu")
    plugin.validate(classifier, df)

    // Legacy tree_method=gpu_hist also passes validation, even with device=cpu.
    classifier.setDevice("cpu")
    classifier.setTreeMethod("gpu_hist")
    plugin.validate(classifier, df)
  }
}
test("preprocess") {
  // preprocess should keep only the columns XGBoost needs (features, label, and
  // any configured weight/margin) and repartition the input to numWorkers.
  withGpuSparkSession() { spark =>
    import spark.implicits._
    val df = Seq((1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
      (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f),
      (3.0f, 4.0f, 5.0f, 6.0f, 0.0f, 0.1f),
      (4.0f, 5.0f, 6.0f, 7.0f, 0.0f, 0.1f),
      (5.0f, 6.0f, 7.0f, 8.0f, 0.0f, 0.1f)
    ).toDF("c1", "c2", "weight", "margin", "label", "other")
      .repartition(5)
    assert(df.schema.names.contains("other"))
    assert(df.rdd.getNumPartitions === 5)

    val features = Array("c1", "c2")
    var classifier = new XGBoostClassifier()
      .setNumWorkers(3)
      .setFeaturesCol(features)
    assert(classifier.getPlugin.isDefined)
    assert(classifier.getPlugin.get.isInstanceOf[GpuXGBoostPlugin])

    // Without weight/margin configured, those columns must be dropped.
    var out = classifier.getPlugin.get.asInstanceOf[GpuXGBoostPlugin]
      .preprocess(classifier, df)

    assert(out.schema.names.contains("c1") && out.schema.names.contains("c2"))
    assert(out.schema.names.contains(classifier.getLabelCol))
    assert(!out.schema.names.contains("weight") && !out.schema.names.contains("margin"))
    assert(out.rdd.getNumPartitions === 3)

    // With weight/margin configured, those columns must be retained.
    classifier = new XGBoostClassifier()
      .setNumWorkers(4)
      .setFeaturesCol(features)
      .setWeightCol("weight")
      .setBaseMarginCol("margin")
      .setDevice("cuda")
    out = classifier.getPlugin.get.asInstanceOf[GpuXGBoostPlugin]
      .preprocess(classifier, df)

    assert(out.schema.names.contains("c1") && out.schema.names.contains("c2"))
    assert(out.schema.names.contains(classifier.getLabelCol))
    assert(out.schema.names.contains("weight") && out.schema.names.contains("margin"))
    assert(out.rdd.getNumPartitions === 4)
  }
}
// test distributed
test("build RDD Watches") {
  // Builds the per-partition Watches (training DMatrix) and checks labels,
  // weights, base margins, row counts and non-missing counts for both
  // missing-value encodings (0.0f and NaN).
  withGpuSparkSession() { spark =>
    import spark.implicits._

    // dataPoint -> (missing, rowNum, nonMissing)
    Map(0.0f -> (0.0f, 5, 9), Float.NaN -> (0.0f, 5, 9)).foreach {
      case (data, (missing, expectedRowNum, expectedNonMissing)) =>
        val df = Seq(
          (1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
          (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f),
          (3.0f, data, 5.0f, 6.0f, 0.0f, 0.1f),
          (4.0f, 5.0f, 6.0f, 7.0f, 0.0f, 0.1f),
          (5.0f, 6.0f, 7.0f, 8.0f, 1.0f, 0.1f)
        ).toDF("c1", "c2", "weight", "margin", "label", "other")

        val features = Array("c1", "c2")
        val classifier = new XGBoostClassifier()
          .setNumWorkers(2)
          .setWeightCol("weight")
          .setBaseMarginCol("margin")
          .setFeaturesCol(features)
          .setDevice("cuda")
          .setMissing(missing)

        val rdd = classifier.getPlugin.get.buildRddWatches(classifier, df)
        // Collect per-partition DMatrix statistics back to the driver.
        val result = rdd.mapPartitions { iter =>
          val watches = iter.next()
          val size = watches.size
          val labels = watches.datasets(0).getLabel
          val weight = watches.datasets(0).getWeight
          val margins = watches.datasets(0).getBaseMargin
          val rowNumber = watches.datasets(0).rowNum
          val nonMissing = watches.datasets(0).nonMissingNum
          Iterator.single(size, rowNumber, nonMissing, labels, weight, margins)
        }.collect()

        val labels: ArrayBuffer[Float] = ArrayBuffer.empty
        val weight: ArrayBuffer[Float] = ArrayBuffer.empty
        val margins: ArrayBuffer[Float] = ArrayBuffer.empty
        val rowNumber: ArrayBuffer[Long] = ArrayBuffer.empty
        val nonMissing: ArrayBuffer[Long] = ArrayBuffer.empty
        for (row <- result) {
          // Only the training dataset is present: no eval set configured.
          assert(row._1 === 1)
          rowNumber.append(row._2)
          nonMissing.append(row._3)
          labels.append(row._4: _*)
          weight.append(row._5: _*)
          margins.append(row._6: _*)
        }
        // Rows are split across 2 partitions, so compare order-insensitively.
        assert(labels.sorted === Array(0.0f, 1.0f, 0.0f, 0.0f, 1.0f).sorted)
        assert(weight.sorted === Array(1.0f, 2.0f, 5.0f, 6.0f, 7.0f).sorted)
        assert(margins.sorted === Array(2.0f, 3.0f, 6.0f, 7.0f, 8.0f).sorted)
        assert(rowNumber.sum === expectedRowNum)
        assert(nonMissing.sum === expectedNonMissing)
    }
  }
}
test("build RDD Watches with Eval") {
  // Same as "build RDD Watches", but with an eval dataset attached: the checks
  // run against the eval DMatrix (datasets(1)) inside each partition's Watches.
  withGpuSparkSession() { spark =>
    import spark.implicits._
    val train = Seq(
      (1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
      (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f)
    ).toDF("c1", "c2", "weight", "margin", "label", "other")

    // dataPoint -> (missing, rowNum, nonMissing)
    Map(0.0f -> (0.0f, 5, 9), Float.NaN -> (0.0f, 5, 9)).foreach {
      case (data, (missing, expectedRowNum, expectedNonMissing)) =>
        val eval = Seq(
          (1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
          (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f),
          (3.0f, data, 5.0f, 6.0f, 0.0f, 0.1f),
          (4.0f, 5.0f, 6.0f, 7.0f, 0.0f, 0.1f),
          (5.0f, 6.0f, 7.0f, 8.0f, 1.0f, 0.1f)
        ).toDF("c1", "c2", "weight", "margin", "label", "other")
        val features = Array("c1", "c2")
        val classifier = new XGBoostClassifier()
          .setNumWorkers(2)
          .setWeightCol("weight")
          .setBaseMarginCol("margin")
          .setFeaturesCol(features)
          .setDevice("cuda")
          .setMissing(missing)
          .setEvalDataset(eval)

        val rdd = classifier.getPlugin.get.buildRddWatches(classifier, train)
        // Inspect the eval DMatrix (index 1) built on each partition.
        val result = rdd.mapPartitions { iter =>
          val watches = iter.next()
          val size = watches.size
          val labels = watches.datasets(1).getLabel
          val weight = watches.datasets(1).getWeight
          val margins = watches.datasets(1).getBaseMargin
          val rowNumber = watches.datasets(1).rowNum
          val nonMissing = watches.datasets(1).nonMissingNum
          Iterator.single(size, rowNumber, nonMissing, labels, weight, margins)
        }.collect()

        val labels: ArrayBuffer[Float] = ArrayBuffer.empty
        val weight: ArrayBuffer[Float] = ArrayBuffer.empty
        val margins: ArrayBuffer[Float] = ArrayBuffer.empty
        val rowNumber: ArrayBuffer[Long] = ArrayBuffer.empty
        val nonMissing: ArrayBuffer[Long] = ArrayBuffer.empty
        for (row <- result) {
          // Training dataset plus one eval dataset.
          assert(row._1 === 2)
          rowNumber.append(row._2)
          nonMissing.append(row._3)
          labels.append(row._4: _*)
          weight.append(row._5: _*)
          margins.append(row._6: _*)
        }
        // Rows are split across 2 partitions, so compare order-insensitively.
        assert(labels.sorted === Array(0.0f, 1.0f, 0.0f, 0.0f, 1.0f).sorted)
        assert(weight.sorted === Array(1.0f, 2.0f, 5.0f, 6.0f, 7.0f).sorted)
        assert(margins.sorted === Array(2.0f, 3.0f, 6.0f, 7.0f, 8.0f).sorted)
        assert(rowNumber.sum === expectedRowNum)
        assert(nonMissing.sum === expectedNonMissing)
    }
  }
}
test("transformed schema") {
  // Transform must preserve all input columns and append the prediction columns
  // (plus leaf/contrib columns only when those are configured).
  withGpuSparkSession() { spark =>
    import spark.implicits._
    val df = Seq(
      (1.0f, 2.0f, 1.0f, 2.0f, 0.0f, 0.0f),
      (2.0f, 3.0f, 2.0f, 3.0f, 1.0f, 0.1f),
      (3.0f, 4.0f, 5.0f, 6.0f, 0.0f, 0.1f),
      (4.0f, 5.0f, 6.0f, 7.0f, 0.0f, 0.1f),
      (5.0f, 6.0f, 7.0f, 8.0f, 1.0f, 0.1f)
    ).toDF("c1", "c2", "weight", "margin", "label", "other")
    val estimator = new XGBoostClassifier()
      .setNumWorkers(1)
      .setNumRound(2)
      .setFeaturesCol(Array("c1", "c2"))
      .setLabelCol("label")
      .setDevice("cuda")
    assert(estimator.getPlugin.isDefined && estimator.getPlugin.get.isEnabled(df))

    val out = estimator.fit(df).transform(df)
    // Transform should not discard the other columns of the transforming dataframe
    Seq("c1", "c2", "weight", "margin", "label", "other").foreach { v =>
      assert(out.schema.names.contains(v))
    }
    // Transform for XGBoostClassifier needs to add extra columns
    Seq("rawPrediction", "probability", "prediction").foreach { v =>
      assert(out.schema.names.contains(v))
    }
    // 6 input columns + 3 classification output columns.
    assert(out.schema.names.length === 9)

    // leaf/contrib outputs appear once their output columns are configured.
    val out1 = estimator.setLeafPredictionCol("leaf").setContribPredictionCol("contrib")
      .fit(df)
      .transform(df)
    Seq("leaf", "contrib").foreach { v =>
      assert(out1.schema.names.contains(v))
    }
  }
}
/**
 * Asserts that two prediction matrices are equal element-wise within `epsilon`.
 *
 * @param left    expected rows (e.g. xgboost4j output)
 * @param right   actual rows (e.g. xgboost4j-spark output)
 * @param epsilon maximum allowed absolute difference per element
 */
private def checkEqual(left: Array[Array[Float]],
    right: Array[Array[Float]],
    epsilon: Float = 1e-4f): Unit = {
  assert(left.size === right.size)
  left.zip(right).foreach { case (leftValue, rightValue) =>
    // Fix: `zip` silently truncates to the shorter row, so a row-width mismatch
    // (e.g. differing number of leaves/classes) would previously go undetected.
    assert(leftValue.size === rightValue.size)
    leftValue.zip(rightValue).foreach { case (l, r) =>
      assert(math.abs(l - r) < epsilon)
    }
  }
}
Seq("binary:logistic", "multi:softprob").foreach { objective =>
  test(s"$objective: XGBoost-Spark should match xgboost4j") {
    withGpuSparkSession() { spark =>
      import spark.implicits._
      val numRound = 100
      // The binary task uses the base params as-is; multi-class additionally
      // needs num_class, and each task trains/tests on its own dataset.
      val baseParams: Map[String, Any] = Map(
        "objective" -> objective,
        "device" -> "cuda"
      )
      val (xgboostParams, trainPath, testPath) =
        if (objective == "binary:logistic") {
          (baseParams,
            writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")),
            writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3")))
        } else {
          (baseParams ++ Map("num_class" -> 6),
            writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")),
            writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3")))
        }
      val trainDf = spark.read.parquet(trainPath)
      val testDf = spark.read.parquet(testPath)
      val features = Array("c1", "c2", "c3")
      val featureIndices = features.map(trainDf.schema.fieldIndex)
      val label = "label"
      val classifier = new XGBoostClassifier(xgboostParams)
        .setFeaturesCol(features)
        .setLabelCol(label)
        .setNumRound(numRound)
        .setLeafPredictionCol("leaf")
        .setContribPredictionCol("contrib")
        .setDevice("cuda")
      // Reference model trained on the same parquet data directly via xgboost4j.
      val xgb4jModel = withResource(new GpuColumnBatch(
        Table.readParquet(new File(trainPath)))) { batch =>
        val cb = new CudfColumnBatch(batch.select(featureIndices),
          batch.select(trainDf.schema.fieldIndex(label)), null, null, null)
        val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing,
          classifier.getMaxBins, classifier.getNthread)
        ScalaXGBoost.train(qdm, xgboostParams, numRound)
      }
      // Reference predictions on the test set from the xgboost4j model.
      val (xgb4jLeaf, xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch(
        Table.readParquet(new File(testPath)))) { batch =>
        val cb = new CudfColumnBatch(batch.select(featureIndices), null, null, null, null)
        val dm = new DMatrix(cb, classifier.getMissing, classifier.getNthread)
        (xgb4jModel.predictLeaf(dm), xgb4jModel.predictContrib(dm),
          xgb4jModel.predict(dm), xgb4jModel.predict(dm, outPutMargin = true))
      }
      val rows = classifier.fit(trainDf).transform(testDf).collect()
      // Check Leaf
      checkEqual(xgb4jLeaf, rows.map(_.getAs[DenseVector]("leaf").toArray.map(_.toFloat)))
      // Check contrib
      checkEqual(xgb4jContrib,
        rows.map(_.getAs[DenseVector]("contrib").toArray.map(_.toFloat)))
      // For binary:logistic xgboost4j emits one positive-class value per row,
      // while Spark emits a two-element vector; keep only the positive class.
      val allProb = rows.map(_.getAs[DenseVector]("probability").toArray.map(_.toFloat))
      val xgbSparkProb =
        if (objective == "binary:logistic") allProb.map(v => Array(v(1))) else allProb
      checkEqual(xgb4jProb, xgbSparkProb)
      // Same single-value convention applies to the raw margins.
      val allRaw = rows.map(_.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat))
      val xgbSparkRaw =
        if (objective == "binary:logistic") allRaw.map(v => Array(v(1))) else allRaw
      checkEqual(xgb4jRaw, xgbSparkRaw)
    }
  }
}
test(s"Regression: XGBoost-Spark should match xgboost4j") {
  withGpuSparkSession() { spark =>
    import spark.implicits._
    val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3"))
    val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3"))
    val trainDf = spark.read.parquet(trainPath)
    val testDf = spark.read.parquet(testPath)
    val features = Array("c1", "c2", "c3")
    val featureIndices = features.map(trainDf.schema.fieldIndex)
    val label = "label"
    val numRound = 100
    val xgboostParams: Map[String, Any] = Map(
      "device" -> "cuda"
    )
    val regressor = new XGBoostRegressor(xgboostParams)
      .setFeaturesCol(features)
      .setLabelCol(label)
      .setNumRound(numRound)
      .setLeafPredictionCol("leaf")
      .setContribPredictionCol("contrib")
      .setDevice("cuda")
    // Reference model trained on the same parquet data directly via xgboost4j.
    val xgb4jModel = withResource(new GpuColumnBatch(
      Table.readParquet(new File(trainPath)))) { batch =>
      val cb = new CudfColumnBatch(batch.select(featureIndices),
        batch.select(trainDf.schema.fieldIndex(label)), null, null, null)
      val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing,
        regressor.getMaxBins, regressor.getNthread)
      ScalaXGBoost.train(qdm, xgboostParams, numRound)
    }
    // Reference predictions on the test set from the xgboost4j model.
    val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch(
      Table.readParquet(new File(testPath)))) { batch =>
      val cb = new CudfColumnBatch(batch.select(featureIndices), null, null, null, null)
      val dm = new DMatrix(cb, regressor.getMissing, regressor.getNthread)
      (xgb4jModel.predictLeaf(dm), xgb4jModel.predictContrib(dm), xgb4jModel.predict(dm))
    }
    val rows = regressor.fit(trainDf).transform(testDf).collect()
    // Leaf, contrib and the final prediction must all agree with xgboost4j.
    checkEqual(xgb4jLeaf, rows.map(_.getAs[DenseVector]("leaf").toArray.map(_.toFloat)))
    checkEqual(xgb4jContrib, rows.map(_.getAs[DenseVector]("contrib").toArray.map(_.toFloat)))
    checkEqual(xgb4jPred, rows.map(row => Array(row.getAs[Double]("prediction").toFloat)))
  }
}
/**
 * Writes the dataset as a single parquet part file into a fresh temporary
 * folder and returns the absolute path of that part file.
 *
 * Defects fixed: the original indexed the directory listing with `(0)`, which
 * throws an opaque IndexOutOfBoundsException when no part file exists, and it
 * called `File.listFiles` without a null guard (listFiles returns null on an
 * I/O error). Both now fail with a descriptive error or an empty result.
 *
 * @param df dataset to persist; coalesced to one partition so exactly one
 *           "part-*" file is produced
 * @return absolute path of the written parquet part file
 */
def writeFile(df: Dataset[_]): String = {
  // Names of the spark "part-*" data files directly under `directory`.
  def listFiles(directory: String): Array[String] = {
    val dir = new File(directory)
    if (dir.exists && dir.isDirectory) {
      // listFiles may return null on I/O error; treat that as "no files".
      Option(dir.listFiles).getOrElse(Array.empty[File])
        .filter(f => f.isFile && f.getName.startsWith("part-"))
        .map(_.getName)
    } else {
      Array.empty[String]
    }
  }
  val dir = createTmpFolder("gpu_").toAbsolutePath.toString
  // coalesce(1) guarantees a single part file in the output directory.
  df.coalesce(1).write.parquet(s"$dir/data")
  val file = listFiles(s"$dir/data").headOption
    .getOrElse(throw new IllegalStateException(s"no part file written under $dir/data"))
  s"$dir/data/$file"
}
}

View File

@@ -0,0 +1,86 @@
/*
Copyright (c) 2014-2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ml.dmlc.xgboost4j.scala.spark
import scala.util.Random
/** Seeded, reproducible generators for the small synthetic datasets used by the suites. */
trait TrainTestData {

  /**
   * Generates `numRows` rows of (label, weight, c1, c2, c3) for classification.
   * The label is uniform in [0, numClass); the weight is a non-negative float;
   * the three features are standard gaussians.
   */
  protected def generateClassificationDataset(
      numRows: Int,
      numClass: Int,
      seed: Int = 1): Seq[(Int, Float, Float, Float, Float)] = {
    val rng = new Random(seed)
    (1 to numRows).map { _ =>
      val label = rng.nextInt(numClass)
      // label, weight, c1, c2, c3
      (label, rng.nextFloat().abs, rng.nextGaussian().toFloat,
        rng.nextGaussian().toFloat, rng.nextGaussian().toFloat)
    }
  }

  /**
   * Generates `numRows` rows of (label, weight, c1, c2, c3) for regression.
   * The label is uniform in [0, 1); the weight is a non-negative float;
   * the three features are standard gaussians.
   */
  protected def generateRegressionDataset(
      numRows: Int,
      seed: Int = 11): Seq[(Float, Float, Float, Float, Float)] = {
    val rng = new Random(seed)
    (1 to numRows).map { _ =>
      // label, weight, c1, c2, c3
      (rng.nextFloat(), rng.nextFloat().abs, rng.nextGaussian().toFloat,
        rng.nextGaussian().toFloat, rng.nextGaussian().toFloat)
    }
  }

  /**
   * Generates `numRows` rows of (label, weight, group, c1, c2, c3) for ranking.
   * The group id is uniform in [0, maxGroup) and doubles as the row weight;
   * the label is uniform in [0, numClass); the features are standard gaussians.
   */
  protected def generateRankDataset(
      numRows: Int,
      numClass: Int,
      maxGroup: Int = 12,
      seed: Int = 99): Seq[(Int, Float, Int, Float, Float, Float)] = {
    val rng = new Random(seed)
    (1 to numRows).map { _ =>
      val group = rng.nextInt(maxGroup)
      // label, weight, group, c1, c2, c3
      (rng.nextInt(numClass), group.toFloat, group,
        rng.nextGaussian().toFloat,
        rng.nextGaussian().toFloat,
        rng.nextGaussian().toFloat)
    }
  }
}
/** Binary-classification fixtures: 300 train rows, 150 test rows, 2 classes. */
object Classification extends TrainTestData {
  val train = generateClassificationDataset(300, 2, seed = 3)
  val test = generateClassificationDataset(150, 2, seed = 5)
}
/** Multi-class classification fixtures: 300 train rows, 150 test rows, 4 classes. */
object MultiClassification extends TrainTestData {
  val train = generateClassificationDataset(300, 4, seed = 11)
  val test = generateClassificationDataset(150, 4, seed = 12)
}
/** Regression fixtures: 300 train rows, 150 test rows. */
object Regression extends TrainTestData {
  val train = generateRegressionDataset(300, seed = 222)
  val test = generateRegressionDataset(150, seed = 223)
}
/**
 * Learning-to-rank fixtures: 300 train rows, 150 test rows, 10 label levels.
 *
 * Defect fixed: the original passed 555/556 positionally as the third argument
 * of generateRankDataset, where they bound to `maxGroup` rather than `seed` —
 * so both datasets used the default seed 99 and got up to 555 mostly-singleton
 * groups. Every sibling fixture passes a seed as its final argument in
 * consecutive train/test pairs (3/5, 11/12, 222/223), so 555/556 are clearly
 * seeds; bind them by name and let maxGroup keep its default of 12.
 */
object Ranking extends TrainTestData {
  val train = generateRankDataset(300, 10, seed = 555)
  val test = generateRankDataset(150, 10, seed = 556)
}