[jvm-packages]add feature size for LabelPoint and DataBatch (#5303)
* fix type error * Validate number of features. * resolve comments * add feature size for LabelPoint and DataBatch * pass the feature size to native * move feature size validating tests into a separate suite * resolve comments Co-authored-by: fis <jm.yuan@outlook.com>
This commit is contained in:
@@ -38,15 +38,11 @@ object DataUtils extends Serializable {
|
||||
|
||||
/**
|
||||
* Returns feature of the point as [[org.apache.spark.ml.linalg.Vector]].
|
||||
*
|
||||
* If the point is sparse, the dimensionality of the resulting sparse
|
||||
* vector would be [[Int.MaxValue]]. This is the only safe value, since
|
||||
* XGBoost does not store the dimensionality explicitly.
|
||||
*/
|
||||
def features: Vector = if (labeledPoint.indices == null) {
|
||||
Vectors.dense(labeledPoint.values.map(_.toDouble))
|
||||
} else {
|
||||
Vectors.sparse(Int.MaxValue, labeledPoint.indices, labeledPoint.values.map(_.toDouble))
|
||||
Vectors.sparse(labeledPoint.size, labeledPoint.indices, labeledPoint.values.map(_.toDouble))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +64,9 @@ object DataUtils extends Serializable {
|
||||
*/
|
||||
def asXGB: XGBLabeledPoint = v match {
|
||||
case v: DenseVector =>
|
||||
XGBLabeledPoint(0.0f, null, v.values.map(_.toFloat))
|
||||
XGBLabeledPoint(0.0f, v.size, null, v.values.map(_.toFloat))
|
||||
case v: SparseVector =>
|
||||
XGBLabeledPoint(0.0f, v.indices, v.values.map(_.toFloat))
|
||||
XGBLabeledPoint(0.0f, v.size, v.indices, v.values.map(_.toFloat))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,18 +158,18 @@ object DataUtils extends Serializable {
|
||||
df => df.select(selectedColumns: _*).rdd.map {
|
||||
case row @ Row(label: Float, features: Vector, weight: Float, group: Int,
|
||||
baseMargin: Float) =>
|
||||
val (indices, values) = features match {
|
||||
case v: SparseVector => (v.indices, v.values.map(_.toFloat))
|
||||
case v: DenseVector => (null, v.values.map(_.toFloat))
|
||||
val (size, indices, values) = features match {
|
||||
case v: SparseVector => (v.size, v.indices, v.values.map(_.toFloat))
|
||||
case v: DenseVector => (v.size, null, v.values.map(_.toFloat))
|
||||
}
|
||||
val xgbLp = XGBLabeledPoint(label, indices, values, weight, group, baseMargin)
|
||||
val xgbLp = XGBLabeledPoint(label, size, indices, values, weight, group, baseMargin)
|
||||
attachPartitionKey(row, deterministicPartition, numWorkers, xgbLp)
|
||||
case row @ Row(label: Float, features: Vector, weight: Float, baseMargin: Float) =>
|
||||
val (indices, values) = features match {
|
||||
case v: SparseVector => (v.indices, v.values.map(_.toFloat))
|
||||
case v: DenseVector => (null, v.values.map(_.toFloat))
|
||||
val (size, indices, values) = features match {
|
||||
case v: SparseVector => (v.size, v.indices, v.values.map(_.toFloat))
|
||||
case v: DenseVector => (v.size, null, v.values.map(_.toFloat))
|
||||
}
|
||||
val xgbLp = XGBLabeledPoint(label, indices, values, weight, baseMargin = baseMargin)
|
||||
val xgbLp = XGBLabeledPoint(label, size, indices, values, weight, baseMargin = baseMargin)
|
||||
attachPartitionKey(row, deterministicPartition, numWorkers, xgbLp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
/*
|
||||
Copyright (c) 2014 by Contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.java.XGBoostError
|
||||
import org.apache.spark.Partitioner
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
class FeatureSizeValidatingSuite extends FunSuite with PerTest {
|
||||
|
||||
test("transform throwing exception if feature size of dataset is different with model's") {
|
||||
val modelPath = getClass.getResource("/model/0.82/model").getPath
|
||||
val model = XGBoostClassificationModel.read.load(modelPath)
|
||||
val r = new Random(0)
|
||||
// 0.82/model was trained with 251 features. and transform will throw exception
|
||||
// if feature size of data is not equal to 251
|
||||
val df = ss.createDataFrame(Seq.fill(100)(r.nextInt(2)).map(i => (i, i))).
|
||||
toDF("feature", "label")
|
||||
val assembler = new VectorAssembler()
|
||||
.setInputCols(df.columns.filter(!_.contains("label")))
|
||||
.setOutputCol("features")
|
||||
val thrown = intercept[Exception] {
|
||||
model.transform(assembler.transform(df)).show()
|
||||
}
|
||||
assert(thrown.getMessage.contains(
|
||||
"Number of columns does not match number of features in booster"))
|
||||
}
|
||||
|
||||
test("train throwing exception if feature size of dataset is different on distributed train") {
|
||||
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
|
||||
"objective" -> "binary:logistic",
|
||||
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true, "missing" -> 0)
|
||||
import DataUtils._
|
||||
val sparkSession = SparkSession.builder().getOrCreate()
|
||||
import sparkSession.implicits._
|
||||
val repartitioned = sc.parallelize(Synthetic.trainWithDiffFeatureSize, 2)
|
||||
.map(lp => (lp.label, lp)).partitionBy(
|
||||
new Partitioner {
|
||||
override def numPartitions: Int = 2
|
||||
|
||||
override def getPartition(key: Any): Int = key.asInstanceOf[Float].toInt
|
||||
}
|
||||
).map(_._2).zipWithIndex().map {
|
||||
case (lp, id) =>
|
||||
(id, lp.label, lp.features)
|
||||
}.toDF("id", "label", "features")
|
||||
val xgb = new XGBoostClassifier(paramMap)
|
||||
intercept[XGBoostError] {
|
||||
xgb.fit(repartitioned)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -19,13 +19,12 @@ package ml.dmlc.xgboost4j.scala.spark
|
||||
import java.io.File
|
||||
import java.util.Arrays
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
import ml.dmlc.xgboost4j.scala.DMatrix
|
||||
import scala.util.Random
|
||||
|
||||
import scala.util.Random
|
||||
import org.apache.spark.ml.feature._
|
||||
import org.apache.spark.ml.{Pipeline, PipelineModel}
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
class PersistenceSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
@@ -138,12 +137,21 @@ class PersistenceSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
val modelPath = getClass.getResource("/model/0.82/model").getPath
|
||||
val model = XGBoostClassificationModel.read.load(modelPath)
|
||||
val r = new Random(0)
|
||||
val df = ss.createDataFrame(Seq.fill(100)(r.nextInt(2)).map(i => (i, i))).
|
||||
var df = ss.createDataFrame(Seq.fill(100)(r.nextInt(2)).map(i => (i, i))).
|
||||
toDF("feature", "label")
|
||||
// 0.82/model was trained with 251 features. and transform will throw exception
|
||||
// if feature size of data is not equal to 251
|
||||
for (x <- 1 to 250) {
|
||||
df = df.withColumn(s"feature_${x}", lit(1))
|
||||
}
|
||||
val assembler = new VectorAssembler()
|
||||
.setInputCols(df.columns.filter(!_.contains("label")))
|
||||
.setOutputCol("features")
|
||||
model.transform(assembler.transform(df)).show()
|
||||
df = assembler.transform(df)
|
||||
for (x <- 1 to 250) {
|
||||
df = df.drop(s"feature_${x}")
|
||||
}
|
||||
model.transform(df).show()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,11 +31,12 @@ trait TrainTestData {
|
||||
Source.fromInputStream(is).getLines()
|
||||
}
|
||||
|
||||
protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[XGBLabeledPoint] = {
|
||||
protected def getLabeledPoints(resource: String, featureSize: Int, zeroBased: Boolean):
|
||||
Seq[XGBLabeledPoint] = {
|
||||
getResourceLines(resource).map { line =>
|
||||
val labelAndFeatures = line.split(" ")
|
||||
val label = labelAndFeatures.head.toFloat
|
||||
val values = new Array[Float](126)
|
||||
val values = new Array[Float](featureSize)
|
||||
for (feature <- labelAndFeatures.tail) {
|
||||
val idAndValue = feature.split(":")
|
||||
if (!zeroBased) {
|
||||
@@ -45,7 +46,7 @@ trait TrainTestData {
|
||||
}
|
||||
}
|
||||
|
||||
XGBLabeledPoint(label, null, values)
|
||||
XGBLabeledPoint(label, featureSize, null, values)
|
||||
}.toList
|
||||
}
|
||||
|
||||
@@ -56,14 +57,14 @@ trait TrainTestData {
|
||||
val label = original.head.toFloat
|
||||
val group = original.last.toInt
|
||||
val values = original.slice(1, length - 1).map(_.toFloat)
|
||||
XGBLabeledPoint(label, null, values, 1f, group, Float.NaN)
|
||||
XGBLabeledPoint(label, values.size, null, values, 1f, group, Float.NaN)
|
||||
}.toList
|
||||
}
|
||||
}
|
||||
|
||||
object Classification extends TrainTestData {
|
||||
val train: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
|
||||
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
|
||||
val train: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.train", 126, zeroBased = false)
|
||||
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/agaricus.txt.test", 126, zeroBased = false)
|
||||
}
|
||||
|
||||
object MultiClassification extends TrainTestData {
|
||||
@@ -80,19 +81,24 @@ object MultiClassification extends TrainTestData {
|
||||
values(i) = featuresAndLabel(i).toFloat
|
||||
}
|
||||
|
||||
XGBLabeledPoint(label, null, values.take(values.length - 1))
|
||||
XGBLabeledPoint(label, values.length - 1, null, values.take(values.length - 1))
|
||||
}.toList
|
||||
}
|
||||
}
|
||||
|
||||
object Regression extends TrainTestData {
|
||||
val train: Seq[XGBLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
|
||||
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
|
||||
val MACHINE_COL_NUM = 36
|
||||
val train: Seq[XGBLabeledPoint] = getLabeledPoints(
|
||||
"/machine.txt.train", MACHINE_COL_NUM, zeroBased = true)
|
||||
val test: Seq[XGBLabeledPoint] = getLabeledPoints(
|
||||
"/machine.txt.test", MACHINE_COL_NUM, zeroBased = true)
|
||||
}
|
||||
|
||||
object Ranking extends TrainTestData {
|
||||
val RANK_COL_NUM = 3
|
||||
val train: Seq[XGBLabeledPoint] = getLabeledPointsWithGroup("/rank.train.csv")
|
||||
val test: Seq[XGBLabeledPoint] = getLabeledPoints("/rank.test.txt", zeroBased = false)
|
||||
val test: Seq[XGBLabeledPoint] = getLabeledPoints(
|
||||
"/rank.test.txt", RANK_COL_NUM, zeroBased = false)
|
||||
|
||||
private def getGroups(resource: String): Seq[Int] = {
|
||||
getResourceLines(resource).map(_.toInt).toList
|
||||
@@ -100,10 +106,17 @@ object Ranking extends TrainTestData {
|
||||
}
|
||||
|
||||
object Synthetic extends {
|
||||
val TRAIN_COL_NUM = 3
|
||||
val TRAIN_WRONG_COL_NUM = 2
|
||||
val train: Seq[XGBLabeledPoint] = Seq(
|
||||
XGBLabeledPoint(1.0f, Array(0, 1), Array(1.0f, 2.0f)),
|
||||
XGBLabeledPoint(0.0f, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)),
|
||||
XGBLabeledPoint(0.0f, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)),
|
||||
XGBLabeledPoint(1.0f, Array(0, 1), Array(1.0f, 2.0f))
|
||||
XGBLabeledPoint(1.0f, TRAIN_COL_NUM, Array(0, 1), Array(1.0f, 2.0f)),
|
||||
XGBLabeledPoint(0.0f, TRAIN_COL_NUM, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)),
|
||||
XGBLabeledPoint(0.0f, TRAIN_COL_NUM, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)),
|
||||
XGBLabeledPoint(1.0f, TRAIN_COL_NUM, Array(0, 1), Array(1.0f, 2.0f))
|
||||
)
|
||||
|
||||
val trainWithDiffFeatureSize: Seq[XGBLabeledPoint] = Seq(
|
||||
XGBLabeledPoint(1.0f, TRAIN_WRONG_COL_NUM, Array(0, 1), Array(1.0f, 2.0f)),
|
||||
XGBLabeledPoint(0.0f, TRAIN_COL_NUM, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f))
|
||||
)
|
||||
}
|
||||
|
||||
@@ -17,12 +17,9 @@
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
|
||||
|
||||
import org.apache.spark.ml.linalg._
|
||||
import org.apache.spark.ml.param.ParamMap
|
||||
import org.apache.spark.sql._
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.Partitioner
|
||||
|
||||
class XGBoostClassifierSuite extends FunSuite with PerTest {
|
||||
@@ -308,4 +305,5 @@ class XGBoostClassifierSuite extends FunSuite with PerTest {
|
||||
val xgb = new XGBoostClassifier(paramMap)
|
||||
xgb.fit(repartitioned)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,19 +16,13 @@
|
||||
|
||||
package ml.dmlc.xgboost4j.scala.spark
|
||||
|
||||
import java.nio.file.Files
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
|
||||
import ml.dmlc.xgboost4j.scala.DMatrix
|
||||
import ml.dmlc.xgboost4j.scala.{XGBoost => SXGBoost, _}
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
|
||||
import org.apache.spark.TaskContext
|
||||
import org.apache.spark.{TaskContext}
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.ml.feature.VectorAssembler
|
||||
import org.apache.spark.sql.functions.lit
|
||||
|
||||
class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
|
||||
@@ -350,12 +344,21 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
val modelPath = getClass.getResource("/model/0.82/model").getPath
|
||||
val model = XGBoostClassificationModel.read.load(modelPath)
|
||||
val r = new Random(0)
|
||||
val df = ss.createDataFrame(Seq.fill(100000)(1).map(i => (i, i))).
|
||||
var df = ss.createDataFrame(Seq.fill(100000)(1).map(i => (i, i))).
|
||||
toDF("feature", "label").repartition(5)
|
||||
// 0.82/model was trained with 251 features. and transform will throw exception
|
||||
// if feature size of data is not equal to 251
|
||||
for (x <- 1 to 250) {
|
||||
df = df.withColumn(s"feature_${x}", lit(1))
|
||||
}
|
||||
val assembler = new VectorAssembler()
|
||||
.setInputCols(df.columns.filter(!_.contains("label")))
|
||||
.setOutputCol("features")
|
||||
val df1 = model.transform(assembler.transform(df)).withColumnRenamed(
|
||||
df = assembler.transform(df)
|
||||
for (x <- 1 to 250) {
|
||||
df = df.drop(s"feature_${x}")
|
||||
}
|
||||
val df1 = model.transform(df).withColumnRenamed(
|
||||
"prediction", "prediction1").withColumnRenamed(
|
||||
"rawPrediction", "rawPrediction1").withColumnRenamed(
|
||||
"probability", "probability1")
|
||||
@@ -363,4 +366,5 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
|
||||
df1.collect()
|
||||
df2.collect()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -69,8 +69,7 @@ class XGBoostRabitRegressionSuite extends FunSuite with PerTest {
|
||||
|
||||
test("test regression prediction parity w/o ring reduce") {
|
||||
val training = buildDataFrame(Regression.train)
|
||||
val testDM = new DMatrix(Regression.test.iterator, null)
|
||||
val testDF = buildDataFrame(Classification.test)
|
||||
val testDF = buildDataFrame(Regression.test)
|
||||
val xgbSettings = Map("eta" -> "1", "max_depth" -> "2", "verbosity" -> "1",
|
||||
"objective" -> "reg:squarederror", "num_round" -> 5, "num_workers" -> numWorkers)
|
||||
val model1 = new XGBoostRegressor(xgbSettings).fit(training)
|
||||
|
||||
Reference in New Issue
Block a user