Fix deterministic partitioning with dataset containing Double.NaN (#5996)

The functions featureValueOfSparseVector or featureValueOfDenseVector could return a Float.NaN if the input vectore was containing any missing values. This would make fail the partition key computation and most of the vectors would end up in the same partition. We fix this by avoid returning a NaN and simply use the row HashCode in this case.
We added a test to ensure that the repartition is indeed now uniform on input dataset containing values by checking that the partitions size variance is below a certain threshold.

Signed-off-by: Anthony D'Amato <anthony.damato@hotmail.fr>
This commit is contained in:
Anthony D'Amato 2020-08-19 03:55:37 +02:00 committed by GitHub
parent e51cba6195
commit f58e41bad8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 1 deletions

View File

@ -103,7 +103,8 @@ object DataUtils extends Serializable {
case sparseVector: SparseVector => case sparseVector: SparseVector =>
featureValueOfSparseVector(rowHashCode, sparseVector) featureValueOfSparseVector(rowHashCode, sparseVector)
} }
math.abs((rowHashCode.toLong + featureValue).toString.hashCode % numPartitions) val nonNaNFeatureValue = if (featureValue.isNaN) { 0.0f } else { featureValue }
math.abs((rowHashCode.toLong + nonNaNFeatureValue).toString.hashCode % numPartitions)
} }
private def attachPartitionKey( private def attachPartitionKey(

View File

@ -16,6 +16,7 @@
package ml.dmlc.xgboost4j.scala.spark package ml.dmlc.xgboost4j.scala.spark
import org.apache.spark.ml.linalg.Vectors
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
@ -79,4 +80,34 @@ class DeterministicPartitioningSuite extends FunSuite with TmpFolderPerSuite wit
map2 map2
} }
} }
test("deterministic partitioning has a uniform repartition on dataset with missing values") {
val N = 10000
val dataset = (0 until N).map{ n =>
(n, n % 2, Vectors.sparse(3, Array(0, 1, 2), Array(Double.NaN, n, Double.NaN)))
}
val df = ss.createDataFrame(sc.parallelize(dataset)).toDF("id", "label", "features")
val dfRepartitioned = DataUtils.convertDataFrameToXGBLabeledPointRDDs(
col("label"),
col("features"),
lit(1.0),
lit(Float.NaN),
None,
10,
deterministicPartition = true,
df
).head
val partitionsSizes = dfRepartitioned
.mapPartitions(iter => Array(iter.size.toDouble).iterator, true)
.collect()
val partitionMean = partitionsSizes.sum / partitionsSizes.length
val squaredDiffSum = partitionsSizes
.map(partitionSize => Math.pow(partitionSize - partitionMean, 2))
val standardDeviation = math.sqrt(squaredDiffSum.sum / squaredDiffSum.length)
assert(standardDeviation < math.sqrt(N.toDouble))
}
} }