From f58e41bad8905d8531038ce70f7c752c4b1c1557 Mon Sep 17 00:00:00 2001 From: Anthony D'Amato Date: Wed, 19 Aug 2020 03:55:37 +0200 Subject: [PATCH] Fix deterministic partitioning with dataset containing Double.NaN (#5996) The functions featureValueOfSparseVector or featureValueOfDenseVector could return a Float.NaN if the input vectore was containing any missing values. This would make fail the partition key computation and most of the vectors would end up in the same partition. We fix this by avoid returning a NaN and simply use the row HashCode in this case. We added a test to ensure that the repartition is indeed now uniform on input dataset containing values by checking that the partitions size variance is below a certain threshold. Signed-off-by: Anthony D'Amato --- .../xgboost4j/scala/spark/DataUtils.scala | 3 +- .../DeterministicPartitioningSuite.scala | 31 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala index df787d8eb..15ffe4c06 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/DataUtils.scala @@ -103,7 +103,8 @@ object DataUtils extends Serializable { case sparseVector: SparseVector => featureValueOfSparseVector(rowHashCode, sparseVector) } - math.abs((rowHashCode.toLong + featureValue).toString.hashCode % numPartitions) + val nonNaNFeatureValue = if (featureValue.isNaN) { 0.0f } else { featureValue } + math.abs((rowHashCode.toLong + nonNaNFeatureValue).toString.hashCode % numPartitions) } private def attachPartitionKey( diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala index 986b0843b..ff0492f41 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/DeterministicPartitioningSuite.scala @@ -16,6 +16,7 @@ package ml.dmlc.xgboost4j.scala.spark +import org.apache.spark.ml.linalg.Vectors import org.scalatest.FunSuite import org.apache.spark.sql.functions._ @@ -79,4 +80,34 @@ class DeterministicPartitioningSuite extends FunSuite with TmpFolderPerSuite wit map2 } } + + test("deterministic partitioning has a uniform repartition on dataset with missing values") { + val N = 10000 + val dataset = (0 until N).map{ n => + (n, n % 2, Vectors.sparse(3, Array(0, 1, 2), Array(Double.NaN, n, Double.NaN))) + } + + val df = ss.createDataFrame(sc.parallelize(dataset)).toDF("id", "label", "features") + + val dfRepartitioned = DataUtils.convertDataFrameToXGBLabeledPointRDDs( + col("label"), + col("features"), + lit(1.0), + lit(Float.NaN), + None, + 10, + deterministicPartition = true, + df + ).head + + val partitionsSizes = dfRepartitioned + .mapPartitions(iter => Array(iter.size.toDouble).iterator, true) + .collect() + val partitionMean = partitionsSizes.sum / partitionsSizes.length + val squaredDiffSum = partitionsSizes + .map(partitionSize => Math.pow(partitionSize - partitionMean, 2)) + val standardDeviation = math.sqrt(squaredDiffSum.sum / squaredDiffSum.length) + + assert(standardDeviation < math.sqrt(N.toDouble)) + } }