[BLOCKING][jvm-packages] fix non-deterministic order within a partition (in the case of an upstream shuffle) on prediction (#4388)

* [jvm-packages][hot-fix] fix column mismatch caused by zip actions at XGBoostModel.transformInternal
* apply minibatch in prediction
* an iterator-compatible minibatch prediction
* regressor impl
* continued work on mini-batch prediction of xgboost4j-spark
* Update Booster.java
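Context for the diff below: the previous transformInternal materialized the feature column into one RDD, computed predictions in a second RDD, and stitched the two back together with zipPartitions. When the input came out of an upstream shuffle, the two traversals of the same partition were not guaranteed to enumerate rows in the same order, so predictions could be zipped onto the wrong rows. The fix makes a single pass: each partition's row iterator is consumed in mini-batches of inferBatchSize rows, and each batch is joined to its own predictions before the next batch is read. A minimal standalone sketch of that iterator pattern follows; FakeRow and predictBatch are illustrative stand-ins, not the xgboost4j-spark API, which groups Spark Rows and runs Booster.predict on a per-batch DMatrix:

object MiniBatchPredictSketch {
  final case class FakeRow(features: Double)

  // Stand-in for building a DMatrix from one batch and calling Booster.predict.
  def predictBatch(batch: Seq[FakeRow]): Seq[Float] =
    batch.map(r => (r.features * 2).toFloat)

  def main(args: Array[String]): Unit = {
    val rows = (1 to 10).map(i => FakeRow(i.toDouble)).iterator
    val batchSize = 4

    // One pass over the partition: each input batch is zipped with its own
    // predictions, so row/prediction alignment cannot drift even if an
    // upstream shuffle delivers the partition in a different order next time.
    val result: Iterator[(FakeRow, Float)] =
      rows.grouped(batchSize).flatMap { batch =>
        batch.zip(predictBatch(batch))
      }

    result.foreach { case (row, pred) => println(s"$row -> $pred") }
  }
}

In the actual change, Rabit.init additionally runs before the first batch and Rabit.shutdown after the last one, and each batch builds and deletes its own DMatrix.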
jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala

@@ -16,30 +16,26 @@

 package ml.dmlc.xgboost4j.scala.spark

-import scala.collection.Iterator
 import scala.collection.JavaConverters._
-import scala.collection.mutable

 import ml.dmlc.xgboost4j.java.Rabit
-import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, XGBoost => SXGBoost}
-import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait}
 import ml.dmlc.xgboost4j.scala.spark.params._
+import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, EvalTrait, ObjectiveTrait, XGBoost => SXGBoost}
 import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 import org.apache.hadoop.fs.Path

 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.ml.classification._
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.util._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql._
 import org.json4s.DefaultFormats

-import org.apache.spark.broadcast.Broadcast
-import scala.collection.JavaConverters._
+import scala.collection.{AbstractIterator, Iterator, mutable}

 private[spark] trait XGBoostClassifierParams extends GeneralParams with LearningTaskParams
   with BoosterParams with HasWeightCol with HasBaseMarginCol with HasNumClass with ParamMapFuncs
@@ -216,7 +212,8 @@ class XGBoostClassificationModel private[ml](
     override val numClasses: Int,
     private[spark] val _booster: Booster)
   extends ProbabilisticClassificationModel[Vector, XGBoostClassificationModel]
-  with XGBoostClassifierParams with MLWritable with Serializable {
+  with XGBoostClassifierParams with InferenceParams
+  with MLWritable with Serializable {

   import XGBoostClassificationModel._

@@ -250,6 +247,8 @@ class XGBoostClassificationModel private[ml](

   def setTreeLimit(value: Int): this.type = set(treeLimit, value)

+  def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
+
   /**
    * Single instance prediction.
    * Note: The performance is not ideal, use it carefully!
@@ -287,46 +286,53 @@ class XGBoostClassificationModel private[ml](
     val bBooster = dataset.sparkSession.sparkContext.broadcast(_booster)
     val appName = dataset.sparkSession.sparkContext.appName

-    val inputRDD = dataset.asInstanceOf[Dataset[Row]].rdd
-    val predictionRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
-      if (rowIterator.hasNext) {
-        val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
-        Rabit.init(rabitEnv.asJava)
-        val featuresIterator = rowIterator.map(row => row.getAs[Vector](
-          $(featuresCol))).toList.iterator
-        import DataUtils._
-        val cacheInfo = {
-          if ($(useExternalMemory)) {
-            s"$appName-${TaskContext.get().stageId()}-dtest_cache-${TaskContext.getPartitionId()}"
-          } else {
-            null
+    val resultRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
+      new AbstractIterator[Row] {
+        private var batchCnt = 0
+
+        private val batchIterImpl = rowIterator.grouped($(inferBatchSize)).flatMap { batchRow =>
+          if (batchCnt == 0) {
+            val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
+            Rabit.init(rabitEnv.asJava)
+          }
+
+          val features = batchRow.iterator.map(row => row.getAs[Vector]($(featuresCol)))
+
+          import DataUtils._
+          val cacheInfo = {
+            if ($(useExternalMemory)) {
+              s"$appName-${TaskContext.get().stageId()}-dtest_cache-" +
+                s"${TaskContext.getPartitionId()}-batch-$batchCnt"
+            } else {
+              null
+            }
+          }
+
+          val dm = new DMatrix(
+            XGBoost.processMissingValues(features.map(_.asXGB), $(missing)),
+            cacheInfo)
+          try {
+            val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
+              producePredictionItrs(bBooster, dm)
+            produceResultIterator(batchRow.iterator,
+              rawPredictionItr, probabilityItr, predLeafItr, predContribItr)
+          } finally {
+            batchCnt += 1
+            dm.delete()
           }
         }
-        val dm = new DMatrix(
-          XGBoost.processMissingValues(featuresIterator.map(_.asXGB), $(missing)),
-          cacheInfo)
-        try {
-          val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
-            producePredictionItrs(bBooster, dm)
-          Rabit.shutdown()
-          Iterator(rawPredictionItr, probabilityItr, predLeafItr,
-            predContribItr)
-        } finally {
-          dm.delete()
+
+        override def hasNext: Boolean = batchIterImpl.hasNext
+
+        override def next(): Row = {
+          val ret = batchIterImpl.next()
+          if (!batchIterImpl.hasNext) {
+            Rabit.shutdown()
+          }
+          ret
+        }
-      } else {
-        Iterator()
-      }
-    }
-    val resultRDD = inputRDD.zipPartitions(predictionRDD, preservesPartitioning = true) {
-      case (inputIterator, predictionItr) =>
-        if (inputIterator.hasNext) {
-          produceResultIterator(inputIterator, predictionItr.next(), predictionItr.next(),
-            predictionItr.next(), predictionItr.next())
-        } else {
-          Iterator()
-        }
-    }
+      }
+    }

     bBooster.unpersist(blocking = false)
     dataset.sparkSession.createDataFrame(resultRDD, generateResultSchema(schema))

@@ -527,4 +533,3 @@ object XGBoostClassificationModel extends MLReadable[XGBoostClassificationModel]
     }
-  }
 }

jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala

@@ -16,10 +16,10 @@

 package ml.dmlc.xgboost4j.scala.spark

-import scala.collection.Iterator
+import scala.collection.{AbstractIterator, Iterator, mutable}
 import scala.collection.JavaConverters._

-import ml.dmlc.xgboost4j.java.Rabit
+import ml.dmlc.xgboost4j.java.{Rabit, XGBoost => JXGBoost}
 import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 import ml.dmlc.xgboost4j.scala.spark.params.{DefaultXGBoostParamsReader, _}
 import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, XGBoost => SXGBoost}

@@ -37,7 +37,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.json4s.DefaultFormats
-import scala.collection.mutable
+import scala.collection.mutable.ListBuffer

 import org.apache.spark.broadcast.Broadcast
@@ -207,7 +207,8 @@ class XGBoostRegressionModel private[ml] (
     override val uid: String,
     private[spark] val _booster: Booster)
   extends PredictionModel[Vector, XGBoostRegressionModel]
-  with XGBoostRegressorParams with MLWritable with Serializable {
+  with XGBoostRegressorParams with InferenceParams
+  with MLWritable with Serializable {

   import XGBoostRegressionModel._

@@ -241,6 +242,8 @@ class XGBoostRegressionModel private[ml] (

   def setTreeLimit(value: Int): this.type = set(treeLimit, value)

+  def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
+
   /**
    * Single instance prediction.
    * Note: The performance is not ideal, use it carefully!
@@ -259,45 +262,53 @@ class XGBoostRegressionModel private[ml] (
     val bBooster = dataset.sparkSession.sparkContext.broadcast(_booster)
     val appName = dataset.sparkSession.sparkContext.appName

-    val inputRDD = dataset.asInstanceOf[Dataset[Row]].rdd
-    val predictionRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
-      if (rowIterator.hasNext) {
-        val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
-        Rabit.init(rabitEnv.asJava)
-        val featuresIterator = rowIterator.map(row => row.getAs[Vector](
-          $(featuresCol))).toList.iterator
-        import DataUtils._
-        val cacheInfo = {
-          if ($(useExternalMemory)) {
-            s"$appName-${TaskContext.get().stageId()}-dtest_cache-${TaskContext.getPartitionId()}"
-          } else {
-            null
+    val resultRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
+      new AbstractIterator[Row] {
+        private var batchCnt = 0
+
+        private val batchIterImpl = rowIterator.grouped($(inferBatchSize)).flatMap { batchRow =>
+          if (batchCnt == 0) {
+            val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
+            Rabit.init(rabitEnv.asJava)
+          }
+
+          val features = batchRow.iterator.map(row => row.getAs[Vector]($(featuresCol)))
+
+          import DataUtils._
+          val cacheInfo = {
+            if ($(useExternalMemory)) {
+              s"$appName-${TaskContext.get().stageId()}-dtest_cache-" +
+                s"${TaskContext.getPartitionId()}-batch-$batchCnt"
+            } else {
+              null
+            }
+          }
+
+          val dm = new DMatrix(
+            XGBoost.processMissingValues(features.map(_.asXGB), $(missing)),
+            cacheInfo)
+          try {
+            val Array(rawPredictionItr, predLeafItr, predContribItr) =
+              producePredictionItrs(bBooster, dm)
+            produceResultIterator(batchRow.iterator, rawPredictionItr, predLeafItr, predContribItr)
+          } finally {
+            batchCnt += 1
+            dm.delete()
           }
         }
-        val dm = new DMatrix(
-          XGBoost.processMissingValues(featuresIterator.map(_.asXGB), $(missing)),
-          cacheInfo)
-        try {
-          val Array(originalPredictionItr, predLeafItr, predContribItr) =
-            producePredictionItrs(bBooster, dm)
-          Rabit.shutdown()
-          Iterator(originalPredictionItr, predLeafItr, predContribItr)
-        } finally {
-          dm.delete()
+
+        override def hasNext: Boolean = batchIterImpl.hasNext
+
+        override def next(): Row = {
+          val ret = batchIterImpl.next()
+          if (!batchIterImpl.hasNext) {
+            Rabit.shutdown()
+          }
+          ret
+        }
-      } else {
-        Iterator()
-      }
-    }
-    val resultRDD = inputRDD.zipPartitions(predictionRDD, preservesPartitioning = true) {
-      case (inputIterator, predictionItr) =>
-        if (inputIterator.hasNext) {
-          produceResultIterator(inputIterator, predictionItr.next(), predictionItr.next(),
-            predictionItr.next())
-        } else {
-          Iterator()
-        }
-    }
+      }
+    }

     bBooster.unpersist(blocking = false)
     dataset.sparkSession.createDataFrame(resultRDD, generateResultSchema(schema))
   }
@@ -347,14 +358,14 @@ class XGBoostRegressionModel private[ml] (
     resultSchema
   }

-  private def producePredictionItrs(broadcastBooster: Broadcast[Booster], dm: DMatrix):
+  private def producePredictionItrs(booster: Broadcast[Booster], dm: DMatrix):
     Array[Iterator[Row]] = {
     val originalPredictionItr = {
-      broadcastBooster.value.predict(dm, outPutMargin = false, $(treeLimit)).map(Row(_)).iterator
+      booster.value.predict(dm, outPutMargin = false, $(treeLimit)).map(Row(_)).iterator
     }
     val predLeafItr = {
       if (isDefined(leafPredictionCol)) {
-        broadcastBooster.value.predictLeaf(dm, $(treeLimit)).
+        booster.value.predictLeaf(dm, $(treeLimit)).
           map(Row(_)).iterator
       } else {
         Iterator()

@@ -362,7 +373,7 @@ class XGBoostRegressionModel private[ml] (
     }
     val predContribItr = {
       if (isDefined(contribPredictionCol)) {
-        broadcastBooster.value.predictContrib(dm, $(treeLimit)).
+        booster.value.predictContrib(dm, $(treeLimit)).
           map(Row(_)).iterator
       } else {
         Iterator()

@@ -373,7 +384,6 @@ class XGBoostRegressionModel private[ml] (

   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-
     // Output selected columns only.
     // This is a bit complicated since it tries to avoid repeated computation.
     var outputData = transformInternal(dataset)
jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/InferenceParams.scala

@@ -0,0 +1,32 @@
+/*
+ Copyright (c) 2014 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark.params
+
+import org.apache.spark.ml.param.{IntParam, Params}
+
+private[spark] trait InferenceParams extends Params {
+
+  /**
+   * batch size of inference iteration
+   */
+  final val inferBatchSize = new IntParam(this, "batchSize", "batch size of inference iteration")
+
+  /** @group getParam */
+  final def getInferBatchSize: Int = $(inferBatchSize)
+
+  setDefault(inferBatchSize, 32 << 10)
+}
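For reference, a hypothetical usage sketch of the new parameter. It assumes a running SparkSession and DataFrames trainingDF/testDF with a "features" vector column and a "label" column; only setInferBatchSize comes from this commit, the rest is the standard xgboost4j-spark API:

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

// trainingDF and testDF are assumed to exist (illustrative, not part of the diff).
val model = new XGBoostClassifier(Map(
  "objective" -> "binary:logistic",
  "num_round" -> 10,
  "num_workers" -> 2
)).fit(trainingDF)

// Default is 32 << 10 = 32768 rows per batch. Smaller batches bound the memory
// held by each per-batch DMatrix; larger batches amortize DMatrix construction
// over more rows.
model.setInferBatchSize(8 << 10)
val predictions = model.transform(testDF)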