[jvm-packages] create dmatrix with specified missing value (#1272)

* create dmatrix with specified missing value

* update dmlc-core

* support for predict method in spark package

repartitioning

work around

* add more elements to work around training set empty partition issue
This commit is contained in:
Nan Zhu
2016-06-21 17:35:17 -04:00
committed by GitHub
parent c9a73fe2a9
commit bd5b07873e
6 changed files with 143 additions and 2 deletions

View File

@@ -18,7 +18,7 @@ package ml.dmlc.xgboost4j.scala.spark
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{TaskContext, SparkContext}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.rdd.RDD
import ml.dmlc.xgboost4j.java.{DMatrix => JDMatrix}
import ml.dmlc.xgboost4j.scala.{DMatrix, Booster}
@@ -27,6 +27,7 @@ class XGBoostModel(_booster: Booster)(implicit val sc: SparkContext) extends Ser
/**
* Predict result with the given testset (represented as RDD)
*
* @param testSet test set representd as RDD
* @param useExternalCache whether to use external cache for the test set
*/
@@ -51,6 +52,31 @@ class XGBoostModel(_booster: Booster)(implicit val sc: SparkContext) extends Ser
}
}
/**
* Predict result with the given testset (represented as RDD)
* @param testSet test set representd as RDD
* @param missingValue the specified value to represent the missing value
*/
def predict(testSet: RDD[DenseVector], missingValue: Float): RDD[Array[Array[Float]]] = {
val broadcastBooster = testSet.sparkContext.broadcast(_booster)
testSet.mapPartitions { testSamples =>
val sampleArray = testSamples.toList
val numRows = sampleArray.size
val numColumns = sampleArray.head.size
if (numRows == 0) {
Iterator()
} else {
// translate to required format
val flatSampleArray = new Array[Float](numRows * numColumns)
for (i <- flatSampleArray.indices) {
flatSampleArray(i) = sampleArray(i / numColumns).values(i % numColumns).toFloat
}
val dMatrix = new DMatrix(flatSampleArray, numRows, numColumns, missingValue)
Iterator(broadcastBooster.value.predict(dMatrix))
}
}
}
/**
* predict result given the test data (represented as DMatrix)
*/

View File

@@ -21,6 +21,7 @@ import java.nio.file.Files
import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.util.Random
import org.apache.commons.logging.LogFactory
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors, DenseVector}
@@ -208,7 +209,41 @@ class XGBoostSuite extends FunSuite with BeforeAndAfter {
"objective" -> "binary:logistic").toMap
val xgBoostModel = XGBoost.train(trainingRDD, paramMap, 5, numWorkers)
println(xgBoostModel.predict(testRDD))
println(xgBoostModel.predict(testRDD).collect())
}
test("test with dense vectors containing missing value") {
def buildDenseRDD(): RDD[LabeledPoint] = {
val nrow = 100
val ncol = 5
val data0 = Array.ofDim[Double](nrow, ncol)
// put random nums
for (r <- 0 until nrow; c <- 0 until ncol) {
data0(r)(c) = {
if (c == ncol - 1) {
-0.1
} else {
Random.nextDouble()
}
}
}
// create label
val label0 = new Array[Double](nrow)
for (i <- label0.indices) {
label0(i) = Random.nextDouble()
}
val points = new ListBuffer[LabeledPoint]
for (r <- 0 until nrow) {
points += LabeledPoint(label0(r), Vectors.dense(data0(r)))
}
sc.parallelize(points)
}
val trainingRDD = buildDenseRDD().repartition(4)
val testRDD = buildDenseRDD().repartition(4)
val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "0",
"objective" -> "binary:logistic").toMap
val xgBoostModel = XGBoost.train(trainingRDD, paramMap, 5, 4)
xgBoostModel.predict(testRDD.map(_.features.toDense), missingValue = -0.1f).collect()
}
test("training with external memory cache") {