[jvm-packages] Deduplicated train/test data access in tests (#2507)

* [jvm-packages] Deduplicated train/test data access in tests

All datasets are now available via a unified API, e.g. Agaricus.test
(later renamed to Classification.test, per the last bullet below). The only
exception is the dermatology data, which requires parsing a CSV file.

* Inlined Utils.buildTrainingRDD

The default number of partitions in local mode equals the number of
available CPUs, so call sites can simply use sc.parallelize(data) without
passing numWorkers explicitly.
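
For readers unfamiliar with Spark's partitioning defaults: sc.parallelize(seq)
without an explicit slice count falls back to sc.defaultParallelism, which for
a local[*] master equals the machine's core count (unless
spark.default.parallelism overrides it). A minimal sketch, not part of the
commit, of why dropping the explicit numWorkers argument is safe here:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: assumes a local[*] master (as in these suites) and no
    // spark.default.parallelism override.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("demo"))
    // For local[*], defaultParallelism equals the number of available CPUs,
    assert(sc.defaultParallelism == Runtime.getRuntime.availableProcessors())
    // so parallelize without numSlices matches the old parallelize(data, numWorkers).
    assert(sc.parallelize(1 to 100).getNumPartitions == sc.defaultParallelism)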

* Replaced dataset names with problem types
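
Concretely, the agaricus data backs Classification, the machine data backs
Regression, and the rank-demo files back Ranking. A hedged sketch of how the
updated suites consume the new objects (names taken from the diffs below;
assumes sc, numWorkers, DMatrix, JDMatrix and DataUtils are in scope, as they
are inside the suites):

    import DataUtils._  // implicit conversions used when building DMatrix inputs

    val trainingRDD = sc.parallelize(Classification.train)
    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
    val regressionRDD = sc.parallelize(Regression.train, numWorkers)
    val rankingRDD = sc.parallelize(Ranking.train0, numSlices = 1)
    val groupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0)
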
Authored by Sergei Lebedev on 2017-07-12 18:13:55 +02:00; committed by Nan Zhu.
parent 530f01e21c
commit 66874f5777
5 changed files with 124 additions and 149 deletions

@@ -0,0 +1,74 @@
/*
 Copyright (c) 2014 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */

package ml.dmlc.xgboost4j.scala.spark

import scala.io.Source

import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.{Vectors => MLVectors}

trait TrainTestData {
  protected def getResourceLines(resource: String): Iterator[String] = {
    require(resource.startsWith("/"), "resource must start with /")
    val is = getClass.getResourceAsStream(resource)
    if (is == null) {
      sys.error(s"failed to resolve resource $resource")
    }

    Source.fromInputStream(is).getLines()
  }

  protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[MLLabeledPoint] = {
    getResourceLines(resource).map { line =>
      val labelAndFeatures = line.split(" ")
      val label = labelAndFeatures.head.toDouble
      val values = new Array[Double](126)
      for (feature <- labelAndFeatures.tail) {
        val idAndValue = feature.split(":")
        if (!zeroBased) {
          values(idAndValue(0).toInt - 1) = idAndValue(1).toDouble
        } else {
          values(idAndValue(0).toInt) = idAndValue(1).toDouble
        }
      }

      MLLabeledPoint(label, MLVectors.dense(values))
    }.toList
  }
}

object Classification extends TrainTestData {
  val train: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
  val test: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
}

object Regression extends TrainTestData {
  val train: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
  val test: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
}

object Ranking extends TrainTestData {
  val train0: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-0.txt.train", zeroBased = false)
  val train1: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-1.txt.train", zeroBased = false)
  val trainGroup0: Seq[Int] = getGroups("/rank-demo-0.txt.train.group")
  val trainGroup1: Seq[Int] = getGroups("/rank-demo-1.txt.train.group")
  val test: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo.txt.test", zeroBased = false)

  private def getGroups(resource: String): Seq[Int] = {
    getResourceLines(resource).map(_.toInt).toList
  }
}
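
A note on the parser above: getLabeledPoints expects LibSVM-style rows and
densifies them into fixed 126-slot vectors, shifting one-based feature indices
down unless zeroBased is set. A worked example on a hypothetical input line
(not taken from the bundled datasets):

    // Hypothetical row: label 1, feature 3 -> 1.0, feature 10 -> 2.5 (one-based).
    val line = "1 3:1 10:2.5"
    // With zeroBased = false the parser yields:
    //   label       == 1.0
    //   features(2) == 1.0   (index 3 shifted down by one)
    //   features(9) == 2.5   (index 10 shifted down by one)
    //   features.size == 126, with every other slot left at 0.0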

@@ -18,18 +18,8 @@ package ml.dmlc.xgboost4j.scala.spark
 import java.io.File

-import scala.collection.mutable.ListBuffer
-import scala.io.Source
-
-import org.apache.spark.SparkContext
-import org.apache.spark.ml.feature.LabeledPoint
-import org.apache.spark.ml.linalg.{DenseVector, Vector => SparkVector}
-import org.apache.spark.rdd.RDD

 trait Utils extends Serializable {
-  protected val numWorkers = Runtime.getRuntime().availableProcessors()
-  protected var labeledPointsRDD: RDD[LabeledPoint] = null
+  protected val numWorkers: Int = Runtime.getRuntime.availableProcessors()

   protected def cleanExternalCache(prefix: String): Unit = {
     val dir = new File(".")
@@ -37,54 +27,4 @@ trait Utils extends Serializable {
       file.delete()
     }
   }
-
-  protected def loadLabelPoints(filePath: String, zeroBased: Boolean = false):
-      List[LabeledPoint] = {
-    val file = Source.fromFile(new File(filePath))
-    val sampleList = new ListBuffer[LabeledPoint]
-    for (sample <- file.getLines()) {
-      sampleList += fromColValueStringToLabeledPoint(sample, zeroBased)
-    }
-    sampleList.toList
-  }
-
-  protected def loadLabelAndVector(filePath: String, zeroBased: Boolean = false):
-      List[(Double, SparkVector)] = {
-    val file = Source.fromFile(new File(filePath))
-    val sampleList = new ListBuffer[(Double, SparkVector)]
-    for (sample <- file.getLines()) {
-      sampleList += fromColValueStringToLabelAndVector(sample, zeroBased)
-    }
-    sampleList.toList
-  }
-
-  protected def fromColValueStringToLabelAndVector(line: String, zeroBased: Boolean):
-      (Double, SparkVector) = {
-    val labelAndFeatures = line.split(" ")
-    val label = labelAndFeatures(0).toDouble
-    val features = labelAndFeatures.tail
-    val denseFeature = new Array[Double](126)
-    for (feature <- features) {
-      val idAndValue = feature.split(":")
-      if (!zeroBased) {
-        denseFeature(idAndValue(0).toInt - 1) = idAndValue(1).toDouble
-      } else {
-        denseFeature(idAndValue(0).toInt) = idAndValue(1).toDouble
-      }
-    }
-    (label, new DenseVector(denseFeature))
-  }
-
-  protected def fromColValueStringToLabeledPoint(line: String, zeroBased: Boolean): LabeledPoint = {
-    val (label, sv) = fromColValueStringToLabelAndVector(line, zeroBased)
-    LabeledPoint(label, sv)
-  }
-
-  protected def buildTrainingRDD(sparkContext: SparkContext): RDD[LabeledPoint] = {
-    if (labeledPointsRDD == null) {
-      val sampleList = loadLabelPoints(getClass.getResource("/agaricus.txt.train").getFile)
-      labeledPointsRDD = sparkContext.parallelize(sampleList, numWorkers)
-    }
-    labeledPointsRDD
-  }
 }

@@ -29,7 +29,7 @@ class XGBoostConfigureSuite extends FunSuite with Utils {
     val customSparkContext = new SparkContext(sparkConf)
     customSparkContext.setLogLevel("ERROR")
     // start another app
-    val trainingRDD = buildTrainingRDD(customSparkContext)
+    val trainingRDD = customSparkContext.parallelize(Classification.train)
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic", "nthread" -> 6)
     intercept[IllegalArgumentException] {
@@ -39,17 +39,15 @@ class XGBoostConfigureSuite extends FunSuite with Utils {
   }

   test("kryoSerializer test") {
-    labeledPointsRDD = null
     val eval = new EvalError()
     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("XGBoostSuite")
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     sparkConf.registerKryoClasses(Array(classOf[Booster]))
     val customSparkContext = new SparkContext(sparkConf)
     customSparkContext.setLogLevel("ERROR")
-    val trainingRDD = buildTrainingRDD(customSparkContext)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = customSparkContext.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic")
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers)

@@ -45,8 +45,8 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   private def buildTrainingDataframe(sparkContext: Option[SparkContext] = None): DataFrame = {
     if (trainingDF == null) {
-      val rowList = loadLabelPoints(getClass.getResource("/agaricus.txt.train").getFile)
-      val labeledPointsRDD = sparkContext.getOrElse(sc).parallelize(rowList, numWorkers)
+      val labeledPointsRDD = sparkContext.getOrElse(sc)
+        .parallelize(Classification.train, numWorkers)
       val sparkSession = SparkSession.builder().appName("XGBoostDFSuite").getOrCreate()
       import sparkSession.implicits._
       trainingDF = sparkSession.createDataset(labeledPointsRDD).toDF
@@ -57,10 +57,8 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test consistency and order preservation of dataframe-based model") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val trainingItr = loadLabelPoints(getClass.getResource("/agaricus.txt.train").getFile).
-      iterator
-    val (testItr, auxTestItr) =
-      loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator.duplicate
+    val trainingItr = Classification.train.iterator
+    val (testItr, auxTestItr) = Classification.test.iterator.duplicate
     import DataUtils._
     val round = 5
     val trainDMatrix = new DMatrix(new JDMatrix(trainingItr, null))
@@ -91,7 +89,7 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test transformLeaf") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val testItr = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val testItr = Classification.test.iterator
     val trainingDF = buildTrainingDataframe()
     val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingDF, paramMap,
       round = 5, nWorkers = numWorkers)
@@ -107,15 +105,10 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test schema of XGBoostRegressionModel") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "reg:linear")
-    val testItr = loadLabelPoints(getClass.getResource("/machine.txt.test").getFile,
-      zeroBased = true).iterator.
-      zipWithIndex.map { case (instance: LabeledPoint, id: Int) =>
-        (id, instance.features, instance.label)
-      }
+    val testItr = Regression.test.iterator.zipWithIndex
+      .map { case (instance: LabeledPoint, id: Int) => (id, instance.features, instance.label) }
     val trainingDF = {
-      val rowList = loadLabelPoints(getClass.getResource("/machine.txt.train").getFile,
-        zeroBased = true)
-      val labeledPointsRDD = sc.parallelize(rowList, numWorkers)
+      val labeledPointsRDD = sc.parallelize(Regression.train, numWorkers)
       val sparkSession = SparkSession.builder().appName("XGBoostDFSuite").getOrCreate()
       import sparkSession.implicits._
       sparkSession.createDataset(labeledPointsRDD).toDF
@@ -136,7 +129,7 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test schema of XGBoostClassificationModel") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val testItr = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator.
+    val testItr = Classification.test.iterator.
       zipWithIndex.map { case (instance: LabeledPoint, id: Int) =>
         (id, instance.features, instance.label)
       }
@@ -199,7 +192,7 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "max_depth" -> "2", "max_bin" -> "2",
       "eval_metric" -> "error")
-    val testItr = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val testItr = Classification.test.iterator
     val trainingDF = buildTrainingDataframe()
     val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingDF, paramMap,
       round = 10, nWorkers = math.min(2, numWorkers))
@@ -245,25 +238,17 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   }

   test("test DF use nested groupData") {
-    val testItr = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile).iterator.
-      zipWithIndex.map { case (instance: LabeledPoint, id: Int) =>
-        (id, instance.features, instance.label)
-      }
+    val testItr = Ranking.test.iterator.zipWithIndex
+      .map { case (instance: LabeledPoint, id: Int) => (id, instance.features, instance.label) }
     val trainingDF = {
-      val rowList0 = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-      val labeledPointsRDD0 = sc.parallelize(rowList0, numSlices = 1)
-      val rowList1 = loadLabelPoints(getClass.getResource("/rank-demo-1.txt.train").getFile)
-      val labeledPointsRDD1 = sc.parallelize(rowList1, numSlices = 1)
+      val labeledPointsRDD0 = sc.parallelize(Ranking.train0, numSlices = 1)
+      val labeledPointsRDD1 = sc.parallelize(Ranking.train1, numSlices = 1)
       val labeledPointsRDD = labeledPointsRDD0.union(labeledPointsRDD1)
       val sparkSession = SparkSession.builder().appName("XGBoostDFSuite").getOrCreate()
       import sparkSession.implicits._
       sparkSession.createDataset(labeledPointsRDD).toDF
     }
-    val trainGroupData0: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-0.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData1: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-1.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData: Seq[Seq[Int]] = Seq(trainGroupData0, trainGroupData1)
+    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "groupData" -> trainGroupData)

@@ -20,7 +20,6 @@ import java.nio.file.Files
 import java.util.concurrent.LinkedBlockingDeque

 import scala.collection.mutable.ListBuffer
-import scala.io.Source
 import scala.util.Random

 import ml.dmlc.xgboost4j.java.{Rabit, DMatrix => JDMatrix}
@@ -75,7 +74,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }

   test("build RDD containing boosters with the specified worker number") {
-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = sc.parallelize(Classification.train)
     val boosterRDD = XGBoost.buildDistributedBoosters(
       trainingRDD,
       List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
@@ -90,10 +89,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   test("training with external memory cache") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5,
@@ -106,10 +104,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   test("training with Scala-implemented Rabit tracker") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic",
       "tracker_conf" -> TrackerConf(60 * 60 * 1000, "scala")).toMap
@@ -121,10 +118,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   ignore("test with fast histo depthwise") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "eval_metric" -> "error")
@@ -137,10 +133,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   ignore("test with fast histo lossguide") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "1",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "lossguide", "max_leaves" -> "8", "eval_metric" -> "error")
@@ -153,10 +148,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   ignore("test with fast histo lossguide with max bin") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "lossguide", "max_leaves" -> "8", "max_bin" -> "16",
@@ -170,10 +164,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   ignore("test with fast histo depthwidth with max depth") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "max_leaves" -> "8", "max_depth" -> "2",
@@ -187,10 +180,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   ignore("test with fast histo depthwidth with max depth and max bin") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0", "silent" -> "0",
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "max_depth" -> "2", "max_bin" -> "2",
@@ -241,8 +233,8 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }

   test("test consistency of prediction functions with RDD") {
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile)
+    val trainingRDD = sc.parallelize(Classification.train)
+    val testSet = Classification.test
     val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
     val testCollection = testRDD.collect()
     for (i <- testSet.indices) {
@@ -262,7 +254,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }

   test("test eval functions with RDD") {
-    val trainingRDD = buildTrainingRDD(sc).cache()
+    val trainingRDD = sc.parallelize(Classification.train).cache()
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic")
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, nWorkers = numWorkers)
@@ -276,7 +268,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
       sparkContext.getOrElse(sc).parallelize(List[SparkVector](), numWorkers)
     }

-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = sc.parallelize(Classification.train)
     val testRDD = buildEmptyRDD()
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
@@ -286,10 +278,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   test("test model consistency after save and load") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val tempDir = Files.createTempDirectory("xgboosttest-")
     val tempFile = Files.createTempFile(tempDir, "", "")
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
@@ -308,7 +299,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   test("test save and load of different types of models") {
     val tempDir = Files.createTempDirectory("xgboosttest-")
     val tempFile = Files.createTempFile(tempDir, "", "")
-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = sc.parallelize(Classification.train)
     var paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "reg:linear")
     // validate regression model
@@ -343,12 +334,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }

   test("test use groupData") {
-    val trainSet = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-    val trainingRDD = sc.parallelize(trainSet, numSlices = 1)
-    val trainGroupData: Seq[Seq[Int]] = Seq(Source.fromFile(
-      getClass.getResource("/rank-demo-0.txt.train.group").getFile).getLines().map(_.toInt).toList)
-    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
-    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1)
+    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0)
+    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "eval_metric" -> "ndcg", "groupData" -> trainGroupData)
@@ -363,20 +351,13 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }

   test("test use nested groupData") {
-    val trainSet0 = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-    val trainingRDD0 = sc.parallelize(trainSet0, numSlices = 1)
-    val trainSet1 = loadLabelPoints(getClass.getResource("/rank-demo-1.txt.train").getFile)
-    val trainingRDD1 = sc.parallelize(trainSet1, numSlices = 1)
+    val trainingRDD0 = sc.parallelize(Ranking.train0, numSlices = 1)
+    val trainingRDD1 = sc.parallelize(Ranking.train1, numSlices = 1)
     val trainingRDD = trainingRDD0.union(trainingRDD1)
-    val trainGroupData0: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-0.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData1: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-1.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData: Seq[Seq[Int]] = Seq(trainGroupData0, trainGroupData1)
-    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
-    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
+    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "groupData" -> trainGroupData)
@@ -388,11 +369,8 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }

   test("test use base margin") {
-    val trainSet = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-    val trainRDD = sc.parallelize(trainSet, numSlices = 1)
-    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
-    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val trainRDD = sc.parallelize(Ranking.train0, numSlices = 1)
+    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise")