diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index 65f0ef30f..dd25e79d2 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -18,9 +18,7 @@ package ml.dmlc.xgboost4j.scala.spark

 import java.io.File
 import java.nio.file.Files
-import java.util.Properties

-import scala.collection.mutable.ListBuffer
 import scala.collection.{AbstractIterator, mutable}
 import scala.util.Random

@@ -32,8 +30,8 @@ import org.apache.commons.io.FileUtils
 import org.apache.commons.logging.LogFactory

 import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkContext, SparkException, SparkParallelismTracker, TaskContext}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.{SparkContext, SparkParallelismTracker, TaskContext}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel

@@ -75,8 +73,9 @@ object XGBoost extends Serializable {
     if (missing != 0.0f) {
       xgbLabelPoints.map(labeledPoint => {
         if (labeledPoint.indices != null) {
-          throw new RuntimeException("you can only specify missing value as 0.0 when you have" +
-            " SparseVector as your feature format")
+          throw new RuntimeException(s"you can only specify missing value as 0.0 (the current" +
+            s" setting is $missing) when you have a SparseVector or an empty vector as your" +
+            " feature format")
         }
         labeledPoint
       })
@@ -107,7 +106,8 @@ object XGBoost extends Serializable {
       removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing),
         missing, (v: Float) => v != missing)
     } else {
-      removeMissingValues(xgbLabelPoints, missing, (v: Float) => !v.isNaN)
+      removeMissingValues(verifyMissingSetting(xgbLabelPoints, missing),
+        missing, (v: Float) => !v.isNaN)
     }
   }
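Note on the hunks above: verifyMissingSetting is now applied in both branches, and since NaN != 0.0f evaluates to true, the sparse-vector guard also fires when missing is NaN. A minimal sketch of what the two steps amount to, with a hypothetical Point type standing in for the real XGBLabeledPoint (names and the message are illustrative only, not the patch's code):

    import scala.collection.mutable

    // indices == null marks a dense row; sparse rows carry explicit indices.
    final case class Point(indices: Array[Int], values: Array[Float])

    // A sparse row has already dropped its zeros, so a missing sentinel other
    // than 0.0 cannot be applied to it unambiguously: reject such rows.
    def verifyMissingSetting(pts: Iterator[Point], missing: Float): Iterator[Point] =
      if (missing != 0.0f) {
        pts.map { p =>
          if (p.indices != null) {
            throw new RuntimeException(s"missing=$missing requires dense feature vectors")
          }
          p
        }
      } else pts

    // Drop entries the keep predicate rejects and rebuild the indices.
    def removeMissingValues(pts: Iterator[Point], keep: Float => Boolean): Iterator[Point] =
      pts.map { p =>
        val idx = new mutable.ArrayBuilder.ofInt
        val vals = new mutable.ArrayBuilder.ofFloat
        for ((v, i) <- p.values.zipWithIndex if keep(v)) {
          idx += (if (p.indices == null) i else p.indices(i))
          vals += v
        }
        Point(idx.result(), vals.result())
      }

This also suggests why the NaN test added below has no all-zero row, unlike the version it replaces in XGBoostGeneralSuite: an all-zero row assembles to an empty SparseVector, which the guard now rejects.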
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/MissingValueHandlingSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/MissingValueHandlingSuite.scala
new file mode 100644
index 000000000..59b0fb1cd
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/MissingValueHandlingSuite.scala
@@ -0,0 +1,148 @@
+/*
+ Copyright (c) 2014 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import scala.util.Random
+
+import ml.dmlc.xgboost4j.java.XGBoostError
+import org.scalatest.FunSuite
+
+import org.apache.spark.ml.feature.VectorAssembler
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.sql.DataFrame
+
+class MissingValueHandlingSuite extends FunSuite with PerTest {
+  test("dense vectors containing missing value") {
+    def buildDenseDataFrame(): DataFrame = {
+      val numRows = 100
+      val numCols = 5
+      val data = (0 until numRows).map { x =>
+        val label = Random.nextInt(2)
+        val values = Array.tabulate[Double](numCols) { c =>
+          if (c == numCols - 1) 0 else Random.nextDouble
+        }
+        (label, Vectors.dense(values))
+      }
+      ss.createDataFrame(sc.parallelize(data.toList)).toDF("label", "features")
+    }
+    val denseDF = buildDenseDataFrame().repartition(4)
+    val paramMap = List("eta" -> "1", "max_depth" -> "2",
+      "objective" -> "binary:logistic", "missing" -> 0, "num_workers" -> numWorkers).toMap
+    val model = new XGBoostClassifier(paramMap).fit(denseDF)
+    model.transform(denseDF).collect()
+  }
+
+  test("handle Float.NaN as missing value correctly") {
+    val spark = ss
+    import spark.implicits._
+    val testDF = Seq(
+      (1.0f, 0.0f, Float.NaN, 1.0),
+      (1.0f, 0.0f, 1.0f, 1.0),
+      (0.0f, 1.0f, 0.0f, 0.0),
+      (1.0f, 0.0f, 1.0f, 1.0),
+      (1.0f, Float.NaN, 0.0f, 0.0),
+      (0.0f, 1.0f, 0.0f, 1.0),
+      (Float.NaN, 0.0f, 0.0f, 1.0)
+    ).toDF("col1", "col2", "col3", "label")
+    val vectorAssembler = new VectorAssembler()
+      .setInputCols(Array("col1", "col2", "col3"))
+      .setOutputCol("features")
+      .setHandleInvalid("keep")
+    val inputDF = vectorAssembler.transform(testDF).select("features", "label")
+    val paramMap = List("eta" -> "1", "max_depth" -> "2",
+      "objective" -> "binary:logistic", "missing" -> Float.NaN, "num_workers" -> 1).toMap
+    val model = new XGBoostClassifier(paramMap).fit(inputDF)
+    model.transform(inputDF).collect()
+  }
+
+  test("specifying a non-zero missing value with dense vectors does not stop" +
+    " the application") {
+    val spark = ss
+    import spark.implicits._
+    // Spark uses 1.5 * (nnz + 1.0) < size to choose between the sparse and dense
+    // encodings; every row below fails that test, so all rows stay dense.
+    val testDF = Seq(
+      (1.0f, 0.0f, -1.0f, 1.0),
+      (1.0f, 0.0f, 1.0f, 1.0),
+      (0.0f, 1.0f, 0.0f, 0.0),
+      (1.0f, 0.0f, 1.0f, 1.0),
+      (1.0f, -1.0f, 0.0f, 0.0),
+      (0.0f, 1.0f, 0.0f, 1.0),
+      (-1.0f, 0.0f, 0.0f, 1.0)
+    ).toDF("col1", "col2", "col3", "label")
+    val vectorAssembler = new VectorAssembler()
+      .setInputCols(Array("col1", "col2", "col3"))
+      .setOutputCol("features")
+    val inputDF = vectorAssembler.transform(testDF).select("features", "label")
+    val paramMap = List("eta" -> "1", "max_depth" -> "2",
+      "objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap
+    val model = new XGBoostClassifier(paramMap).fit(inputDF)
+    model.transform(inputDF).collect()
+  }
+
+  test("specifying a non-zero missing value and meeting an empty vector stops" +
+    " the application") {
+    val spark = ss
+    import spark.implicits._
+    val testDF = Seq(
+      (1.0f, 0.0f, -1.0f, 1.0),
+      (1.0f, 0.0f, 1.0f, 1.0),
+      (0.0f, 1.0f, 0.0f, 0.0),
+      (1.0f, 0.0f, 1.0f, 1.0),
+      (1.0f, -1.0f, 0.0f, 0.0),
+      (0.0f, 0.0f, 0.0f, 1.0), // empty vector
+      (-1.0f, 0.0f, 0.0f, 1.0)
+    ).toDF("col1", "col2", "col3", "label")
+    val vectorAssembler = new VectorAssembler()
+      .setInputCols(Array("col1", "col2", "col3"))
+      .setOutputCol("features")
+    val inputDF = vectorAssembler.transform(testDF).select("features", "label")
-> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap + intercept[XGBoostError] { + new XGBoostClassifier(paramMap).fit(inputDF) + } + } + + test("specify a non-zero missing value and meet a Sparse vector we should" + + " stop the application") { + val spark = ss + import spark.implicits._ + ss.sparkContext.setLogLevel("INFO") + // spark uses 1.5 * (nnz + 1.0) < size as the condition to decide whether using sparse or dense + // vector, + val testDF = Seq( + (1.0f, 0.0f, -1.0f, 1.0f, 1.0), + (1.0f, 0.0f, 1.0f, 1.0f, 1.0), + (0.0f, 1.0f, 0.0f, 1.0f, 0.0), + (1.0f, 0.0f, 1.0f, 1.0f, 1.0), + (1.0f, -1.0f, 0.0f, 1.0f, 0.0), + (0.0f, 0.0f, 0.0f, 1.0f, 1.0), + (-1.0f, 0.0f, 0.0f, 1.0f, 1.0) + ).toDF("col1", "col2", "col3", "col4", "label") + val vectorAssembler = new VectorAssembler() + .setInputCols(Array("col1", "col2", "col3", "col4")) + .setOutputCol("features") + val inputDF = vectorAssembler.transform(testDF).select("features", "label") + inputDF.show() + val paramMap = List("eta" -> "1", "max_depth" -> "2", + "objective" -> "binary:logistic", "missing" -> -1.0f, "num_workers" -> 1).toMap + intercept[XGBoostError] { + new XGBoostClassifier(paramMap).fit(inputDF) + } + } +} diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala index 2e94442c4..b21348daf 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala @@ -287,7 +287,7 @@ class XGBoostClassifierSuite extends FunSuite with PerTest { test("infrequent features") { val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1", "objective" -> "binary:logistic", - "num_round" -> 5, "num_workers" -> 2) + "num_round" -> 5, "num_workers" -> 2, "missing" -> 0) import DataUtils._ val sparkSession = SparkSession.builder().getOrCreate() import sparkSession.implicits._ @@ -308,7 +308,7 @@ class XGBoostClassifierSuite extends FunSuite with PerTest { test("infrequent features (use_external_memory)") { val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1", "objective" -> "binary:logistic", - "num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true) + "num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0) import DataUtils._ val sparkSession = SparkSession.builder().getOrCreate() import sparkSession.implicits._ diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala index 70d2634c4..64eca4b67 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala @@ -225,50 +225,6 @@ class XGBoostGeneralSuite extends FunSuite with PerTest { assert(x < 0.1) } - test("dense vectors containing missing value") { - def buildDenseDataFrame(): DataFrame = { - val numRows = 100 - val numCols = 5 - val data = (0 until numRows).map { x => - val label = Random.nextInt(2) - val values = Array.tabulate[Double](numCols) { c => - if (c == numCols - 1) 0 else Random.nextDouble - } - (label, Vectors.dense(values)) - } - 
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala
index 2e94442c4..b21348daf 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala
@@ -287,7 +287,7 @@ class XGBoostClassifierSuite extends FunSuite with PerTest {
   test("infrequent features") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic",
-      "num_round" -> 5, "num_workers" -> 2)
+      "num_round" -> 5, "num_workers" -> 2, "missing" -> 0)
     import DataUtils._
     val sparkSession = SparkSession.builder().getOrCreate()
     import sparkSession.implicits._
@@ -308,7 +308,7 @@
   test("infrequent features (use_external_memory)") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic",
-      "num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true)
+      "num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
     import DataUtils._
     val sparkSession = SparkSession.builder().getOrCreate()
     import sparkSession.implicits._
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index 70d2634c4..64eca4b67 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -225,50 +225,6 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     assert(x < 0.1)
   }

-  test("dense vectors containing missing value") {
-    def buildDenseDataFrame(): DataFrame = {
-      val numRows = 100
-      val numCols = 5
-      val data = (0 until numRows).map { x =>
-        val label = Random.nextInt(2)
-        val values = Array.tabulate[Double](numCols) { c =>
-          if (c == numCols - 1) 0 else Random.nextDouble
-        }
-        (label, Vectors.dense(values))
-      }
-      ss.createDataFrame(sc.parallelize(data.toList)).toDF("label", "features")
-    }
-    val denseDF = buildDenseDataFrame().repartition(4)
-    val paramMap = List("eta" -> "1", "max_depth" -> "2",
-      "objective" -> "binary:logistic", "missing" -> 0, "num_workers" -> numWorkers).toMap
-    val model = new XGBoostClassifier(paramMap).fit(denseDF)
-    model.transform(denseDF).collect()
-  }
-
-  test("handle Float.NaN as missing value correctly") {
-    val spark = ss
-    import spark.implicits._
-    val testDF = Seq(
-      (1.0f, 0.0f, Float.NaN, 1.0),
-      (1.0f, 0.0f, 1.0f, 1.0),
-      (0.0f, 1.0f, 0.0f, 0.0),
-      (1.0f, 0.0f, 1.0f, 1.0),
-      (1.0f, Float.NaN, 0.0f, 0.0),
-      (0.0f, 0.0f, 0.0f, 0.0),
-      (0.0f, 1.0f, 0.0f, 1.0),
-      (Float.NaN, 0.0f, 0.0f, 1.0)
-    ).toDF("col1", "col2", "col3", "label")
-    val vectorAssembler = new VectorAssembler()
-      .setInputCols(Array("col1", "col2", "col3"))
-      .setOutputCol("features")
-      .setHandleInvalid("keep")
-    val inputDF = vectorAssembler.transform(testDF).select("features", "label")
-    val paramMap = List("eta" -> "1", "max_depth" -> "2",
-      "objective" -> "binary:logistic", "num_workers" -> 1).toMap
-    val model = new XGBoostClassifier(paramMap).fit(inputDF)
-    model.transform(inputDF).collect()
-  }
-
   test("training with spark parallelism checks disabled") {
     val eval = new EvalError()
     val training = buildDataFrame(Classification.train)
diff --git a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java
index 61aeabd98..61db35764 100644
--- a/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java
+++ b/jvm-packages/xgboost4j/src/main/java/ml/dmlc/xgboost4j/java/DataBatch.java
@@ -4,6 +4,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import ml.dmlc.xgboost4j.LabeledPoint;

 /**
@@ -13,6 +16,7 @@ import ml.dmlc.xgboost4j.LabeledPoint;
  * This class is used to support advanced creation of DMatrix from Iterator of DataBatch,
  */
 class DataBatch {
+  private static final Log logger = LogFactory.getLog(DataBatch.class);
   /** The offset of each rows in the sparse matrix */
   final long[] rowOffset;
   /** weight of each data point, can be null */
@@ -49,44 +53,49 @@ class DataBatch {
     @Override
     public DataBatch next() {
-      int numRows = 0;
-      int numElem = 0;
-      List<LabeledPoint> batch = new ArrayList<>(batchSize);
-      while (base.hasNext() && batch.size() < batchSize) {
-        LabeledPoint labeledPoint = base.next();
-        batch.add(labeledPoint);
-        numElem += labeledPoint.values().length;
-        numRows++;
-      }
-
-      long[] rowOffset = new long[numRows + 1];
-      float[] label = new float[numRows];
-      int[] featureIndex = new int[numElem];
-      float[] featureValue = new float[numElem];
-      float[] weight = new float[numRows];
-
-      int offset = 0;
-      for (int i = 0; i < batch.size(); i++) {
-        LabeledPoint labeledPoint = batch.get(i);
-        rowOffset[i] = offset;
-        label[i] = labeledPoint.label();
-        weight[i] = labeledPoint.weight();
-        if (labeledPoint.indices() != null) {
-          System.arraycopy(labeledPoint.indices(), 0, featureIndex, offset,
-              labeledPoint.indices().length);
-        } else {
-          for (int j = 0; j < labeledPoint.values().length; j++) {
-            featureIndex[offset + j] = j;
-          }
+      try {
+        int numRows = 0;
+        int numElem = 0;
+        List<LabeledPoint> batch = new ArrayList<>(batchSize);
+        while (base.hasNext() && batch.size() < batchSize) {
+          LabeledPoint labeledPoint = base.next();
+          batch.add(labeledPoint);
+          numElem += labeledPoint.values().length;
+          numRows++;
         }

-        System.arraycopy(labeledPoint.values(), 0, featureValue, offset,
-            labeledPoint.values().length);
-        offset += labeledPoint.values().length;
-      }
+        long[] rowOffset = new long[numRows + 1];
+        float[] label = new float[numRows];
+        int[] featureIndex = new int[numElem];
+        float[] featureValue = new float[numElem];
+        float[] weight = new float[numRows];

-      rowOffset[batch.size()] = offset;
-      return new DataBatch(rowOffset, weight, label, featureIndex, featureValue);
+        int offset = 0;
+        for (int i = 0; i < batch.size(); i++) {
+          LabeledPoint labeledPoint = batch.get(i);
+          rowOffset[i] = offset;
+          label[i] = labeledPoint.label();
+          weight[i] = labeledPoint.weight();
+          if (labeledPoint.indices() != null) {
+            System.arraycopy(labeledPoint.indices(), 0, featureIndex, offset,
+                labeledPoint.indices().length);
+          } else {
+            for (int j = 0; j < labeledPoint.values().length; j++) {
+              featureIndex[offset + j] = j;
+            }
+          }
+
+          System.arraycopy(labeledPoint.values(), 0, featureValue, offset,
+              labeledPoint.values().length);
+          offset += labeledPoint.values().length;
+        }
+
+        rowOffset[batch.size()] = offset;
+        return new DataBatch(rowOffset, weight, label, featureIndex, featureValue);
+      } catch (RuntimeException runtimeError) {
+        logger.error(runtimeError);
+        return null;
+      }
     }

     @Override
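For context on the rewritten next(): it packs up to batchSize rows into one CSR block, where rowOffset[i] marks where row i begins in the flat featureIndex/featureValue arrays, and dense rows (null indices()) get implicit column ids 0..n-1. A compact sketch of that layout, using a hypothetical Point type rather than the real ml.dmlc.xgboost4j.LabeledPoint:

    final case class Point(indices: Array[Int], values: Array[Float])

    def toCsr(batch: Seq[Point]): (Array[Long], Array[Int], Array[Float]) = {
      val rowOffset = new Array[Long](batch.size + 1)
      val colIdx = Array.newBuilder[Int]
      val colVal = Array.newBuilder[Float]
      var offset = 0L
      for ((p, i) <- batch.zipWithIndex) {
        rowOffset(i) = offset // row i starts here in the flat arrays
        // dense rows (indices == null) enumerate columns 0..n-1 explicitly
        colIdx ++= Option(p.indices).getOrElse(Array.range(0, p.values.length))
        colVal ++= p.values
        offset += p.values.length
      }
      rowOffset(batch.size) = offset // total number of stored entries
      (rowOffset, colIdx.result(), colVal.result())
    }

One behavioral consequence of the new catch block: a RuntimeException thrown while assembling a batch is logged and next() returns null instead of propagating, so callers of this iterator must be prepared for a null DataBatch.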