[jvm-packages] Allow for bypassing spark missing value check (#4805)

* Allow for bypassing spark missing value check

* Update documentation for dealing with missing values in spark xgboost
This commit is contained in:
cpfarrell
2019-12-18 10:48:20 -08:00
committed by Nan Zhu
parent 27b3646d29
commit bc9d88259f
6 changed files with 134 additions and 40 deletions

View File

@@ -69,6 +69,7 @@ private[this] case class XGBoostExecutionParams(
obj: ObjectiveTrait,
eval: EvalTrait,
missing: Float,
allowNonZeroForMissing: Boolean,
trackerConf: TrackerConf,
timeoutRequestWorkers: Long,
checkpointParam: CheckpointParam,
@@ -162,6 +163,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
val obj = overridedParams.getOrElse("custom_obj", null).asInstanceOf[ObjectiveTrait]
val eval = overridedParams.getOrElse("custom_eval", null).asInstanceOf[EvalTrait]
val missing = overridedParams.getOrElse("missing", Float.NaN).asInstanceOf[Float]
val allowNonZeroForMissing = overridedParams.getOrElse("allow_non_zero_for_missing", false).asInstanceOf[Boolean]
validateSparkSslConf
if (overridedParams.contains("tree_method")) {
@@ -212,7 +214,7 @@ private[this] class XGBoostExecutionParamsFactory(rawParams: Map[String, Any], s
.asInstanceOf[Boolean]
val xgbExecParam = XGBoostExecutionParams(nWorkers, round, useExternalMemory, obj, eval,
missing, trackerConf,
missing, allowNonZeroForMissing, trackerConf,
timeoutRequestWorkers,
checkpointParam,
inputParams,
@@ -255,14 +257,19 @@ private[spark] case class XGBLabeledPointGroup(
object XGBoost extends Serializable {
private val logger = LogFactory.getLog("XGBoostSpark")
private def verifyMissingSetting(xgbLabelPoints: Iterator[XGBLabeledPoint], missing: Float):
Iterator[XGBLabeledPoint] = {
if (missing != 0.0f) {
private def verifyMissingSetting(
xgbLabelPoints: Iterator[XGBLabeledPoint],
missing: Float,
allowNonZeroMissingValue: Boolean): Iterator[XGBLabeledPoint] = {
if (missing != 0.0f && !allowNonZeroMissingValue) {
xgbLabelPoints.map(labeledPoint => {
if (labeledPoint.indices != null) {
throw new RuntimeException(s"you can only specify missing value as 0.0 (the currently" +
s" set value $missing) when you have SparseVector or Empty vector as your feature" +
" format")
s" format. If you didn't use Spark's VectorAssembler class to build your feature " +
s"vector but instead did so in a way that preserves zeros in your feature vector " +
s"you can avoid this check by using the 'allow_non_zero_missing_value parameter'" +
s" (only use if you know what you are doing)")
}
labeledPoint
})
@@ -288,22 +295,28 @@ object XGBoost extends Serializable {
private[spark] def processMissingValues(
xgbLabelPoints: Iterator[XGBLabeledPoint],
missing: Float): Iterator[XGBLabeledPoint] = {
missing: Float,
allowNonZeroMissingValue: Boolean): Iterator[XGBLabeledPoint] = {
if (!missing.isNaN) {
removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing),
removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing, allowNonZeroMissingValue),
missing, (v: Float) => v != missing)
} else {
removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing),
removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing, allowNonZeroMissingValue),
missing, (v: Float) => !v.isNaN)
}
}
private def processMissingValuesWithGroup(
xgbLabelPointGroups: Iterator[Array[XGBLabeledPoint]],
missing: Float): Iterator[Array[XGBLabeledPoint]] = {
missing: Float,
allowNonZeroMissingValue: Boolean): Iterator[Array[XGBLabeledPoint]] = {
if (!missing.isNaN) {
xgbLabelPointGroups.map {
labeledPoints => XGBoost.processMissingValues(labeledPoints.iterator, missing).toArray
labeledPoints => XGBoost.processMissingValues(
labeledPoints.iterator,
missing,
allowNonZeroMissingValue
).toArray
}
} else {
xgbLabelPointGroups
@@ -428,7 +441,7 @@ object XGBoost extends Serializable {
if (evalSetsMap.isEmpty) {
trainingData.mapPartitions(labeledPoints => {
val watches = Watches.buildWatches(xgbExecutionParams,
processMissingValues(labeledPoints, xgbExecutionParams.missing),
processMissingValues(labeledPoints, xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing),
getCacheDirName(xgbExecutionParams.useExternalMemory))
buildDistributedBooster(watches, xgbExecutionParams, rabitEnv, checkpointRound,
xgbExecutionParams.obj, xgbExecutionParams.eval, prevBooster)
@@ -440,7 +453,7 @@ object XGBoost extends Serializable {
val watches = Watches.buildWatches(
nameAndLabeledPointSets.map {
case (name, iter) => (name, processMissingValues(iter,
xgbExecutionParams.missing))
xgbExecutionParams.missing, xgbExecutionParams.allowNonZeroForMissing))
},
getCacheDirName(xgbExecutionParams.useExternalMemory))
buildDistributedBooster(watches, xgbExecutionParams, rabitEnv, checkpointRound,
@@ -459,7 +472,7 @@ object XGBoost extends Serializable {
if (evalSetsMap.isEmpty) {
trainingData.mapPartitions(labeledPointGroups => {
val watches = Watches.buildWatchesWithGroup(xgbExecutionParam,
processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing),
processMissingValuesWithGroup(labeledPointGroups, xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing),
getCacheDirName(xgbExecutionParam.useExternalMemory))
buildDistributedBooster(watches, xgbExecutionParam, rabitEnv, checkpointRound,
xgbExecutionParam.obj, xgbExecutionParam.eval, prevBooster)
@@ -470,7 +483,7 @@ object XGBoost extends Serializable {
val watches = Watches.buildWatchesWithGroup(
labeledPointGroupSets.map {
case (name, iter) => (name, processMissingValuesWithGroup(iter,
xgbExecutionParam.missing))
xgbExecutionParam.missing, xgbExecutionParam.allowNonZeroForMissing))
},
getCacheDirName(xgbExecutionParam.useExternalMemory))
buildDistributedBooster(watches, xgbExecutionParam, rabitEnv, checkpointRound,

View File

@@ -245,6 +245,11 @@ class XGBoostClassificationModel private[ml](
def setMissing(value: Float): this.type = set(missing, value)
def setAllowZeroForMissingValue(value: Boolean): this.type = set(
allowNonZeroForMissingValue,
value
)
def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
/**
@@ -253,7 +258,11 @@ class XGBoostClassificationModel private[ml](
*/
override def predict(features: Vector): Double = {
import DataUtils._
val dm = new DMatrix(XGBoost.processMissingValues(Iterator(features.asXGB), $(missing)))
val dm = new DMatrix(XGBoost.processMissingValues(
Iterator(features.asXGB),
$(missing),
$(allowNonZeroForMissingValue)
))
val probability = _booster.predict(data = dm)(0).map(_.toDouble)
if (numClasses == 2) {
math.round(probability(0))
@@ -309,7 +318,11 @@ class XGBoostClassificationModel private[ml](
}
val dm = new DMatrix(
XGBoost.processMissingValues(features.map(_.asXGB), $(missing)),
XGBoost.processMissingValues(
features.map(_.asXGB),
$(missing),
$(allowNonZeroForMissingValue)
),
cacheInfo)
try {
val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =

View File

@@ -241,6 +241,11 @@ class XGBoostRegressionModel private[ml] (
def setMissing(value: Float): this.type = set(missing, value)
def setAllowZeroForMissingValue(value: Boolean): this.type = set(
allowNonZeroForMissingValue,
value
)
def setInferBatchSize(value: Int): this.type = set(inferBatchSize, value)
/**
@@ -249,7 +254,11 @@ class XGBoostRegressionModel private[ml] (
*/
override def predict(features: Vector): Double = {
import DataUtils._
val dm = new DMatrix(XGBoost.processMissingValues(Iterator(features.asXGB), $(missing)))
val dm = new DMatrix(XGBoost.processMissingValues(
Iterator(features.asXGB),
$(missing),
$(allowNonZeroForMissingValue)
))
_booster.predict(data = dm)(0)(0)
}
@@ -287,7 +296,11 @@ class XGBoostRegressionModel private[ml] (
}
val dm = new DMatrix(
XGBoost.processMissingValues(features.map(_.asXGB), $(missing)),
XGBoost.processMissingValues(
features.map(_.asXGB),
$(missing),
$(allowNonZeroForMissingValue)
),
cacheInfo)
try {
val Array(rawPredictionItr, predLeafItr, predContribItr) =

View File

@@ -105,6 +105,21 @@ private[spark] trait GeneralParams extends Params {
final def getMissing: Float = $(missing)
/**
* Allows for having a non-zero value for missing when training on prediction
* on a Sparse or Empty vector.
*/
final val allowNonZeroForMissingValue = new BooleanParam(
this,
"allowNonZeroForMissingValue",
"Allow to have a non-zero value for missing when training or " +
"predicting on a Sparse or Empty vector. Should only be used if did " +
"not use Spark's VectorAssembler class to construct the feature vector " +
"but instead used a method that preserves zeros in your vector."
)
final def getAllowNonZeroForMissingValue: Boolean = $(allowNonZeroForMissingValue)
/**
* the maximum time to wait for the job requesting new workers. default: 30 minutes
*/
@@ -175,7 +190,8 @@ private[spark] trait GeneralParams extends Params {
useExternalMemory -> false, silent -> 0, verbosity -> 1,
customObj -> null, customEval -> null, missing -> Float.NaN,
trackerConf -> TrackerConf(), seed -> 0, timeoutRequestWorkers -> 30 * 60 * 1000L,
checkpointPath -> "", checkpointInterval -> -1)
checkpointPath -> "", checkpointInterval -> -1,
allowNonZeroForMissingValue -> false)
}
trait HasLeafPredictionCol extends Params {

View File

@@ -150,4 +150,32 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
new XGBoostClassifier(paramMap).fit(inputDF)
}
}
test("specify a non-zero missing value but set allow_non_zero_missing_value " +
"does not stop application") {
val spark = ss
import spark.implicits._
ss.sparkContext.setLogLevel("INFO")
// spark uses 1.5 * (nnz + 1.0) < size as the condition to decide whether using sparse or dense
// vector,
val testDF = Seq(
(7.0f, 0.0f, -1.0f, 1.0f, 1.0),
(1.0f, 0.0f, 1.0f, 1.0f, 1.0),
(0.0f, 1.0f, 0.0f, 1.0f, 0.0),
(1.0f, 0.0f, 1.0f, 1.0f, 1.0),
(1.0f, -1.0f, 0.0f, 1.0f, 0.0),
(0.0f, 0.0f, 0.0f, 1.0f, 1.0),
(-1.0f, 0.0f, 0.0f, 1.0f, 1.0)
).toDF("col1", "col2", "col3", "col4", "label")
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("col1", "col2", "col3", "col4"))
.setOutputCol("features")
val inputDF = vectorAssembler.transform(testDF).select("features", "label")
inputDF.show()
val paramMap = List("eta" -> "1", "max_depth" -> "2",
"objective" -> "binary:logistic", "missing" -> -1.0f,
"num_workers" -> 1, "allow_non_zero_for_missing_value" -> "true").toMap
val model = new XGBoostClassifier(paramMap).fit(inputDF)
model.transform(inputDF).collect()
}
}