From 66874f57779a28da7fd7451906299702305bed7b Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Wed, 12 Jul 2017 18:13:55 +0200
Subject: [PATCH] [jvm-packages] Deduplicated train/test data access in tests (#2507)

* [jvm-packages] Deduplicated train/test data access in tests

All datasets are now available via a unified API, e.g. Agaricus.test
(renamed to Classification.test by the last commit below). The only
exception is the dermatology data, which requires parsing a CSV file.

* Inlined Utils.buildTrainingRDD

The default number of partitions for local mode is equal to the number of
available CPUs.

* Replaced dataset names with problem types
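For orientation, a condensed sketch of the new access pattern (illustrative
only, not part of the diff below; assumes a suite mixing in the Utils trait
with a live SparkContext sc):

    // Fixture data are now plain in-memory Seq[LabeledPoint] values:
    val trainingRDD = sc.parallelize(Classification.train)
    // Without an explicit numSlices, Spark falls back to sc.defaultParallelism,
    // which in local mode equals the number of available CPUs -- the same
    // count the removed Utils.buildTrainingRDD passed explicitly:
    val pinnedRDD = sc.parallelize(Classification.train, numWorkers)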
---
 .../xgboost4j/scala/spark/TrainTestData.scala | 74 ++++++++++++++++
 .../ml/dmlc/xgboost4j/scala/spark/Utils.scala | 62 +-------------
 .../scala/spark/XGBoostConfigureSuite.scala   |  8 +-
 .../scala/spark/XGBoostDFSuite.scala          | 45 ++++------
 .../scala/spark/XGBoostGeneralSuite.scala     | 84 +++++++------------
 5 files changed, 124 insertions(+), 149 deletions(-)
 create mode 100644 jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala

diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala
new file mode 100644
index 000000000..f9488605c
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala
@@ -0,0 +1,74 @@
+/*
+ Copyright (c) 2014 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import scala.io.Source
+
+import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
+import org.apache.spark.ml.linalg.{Vectors => MLVectors}
+
+trait TrainTestData {
+  protected def getResourceLines(resource: String): Iterator[String] = {
+    require(resource.startsWith("/"), "resource must start with /")
+    val is = getClass.getResourceAsStream(resource)
+    if (is == null) {
+      sys.error(s"failed to resolve resource $resource")
+    }
+
+    Source.fromInputStream(is).getLines()
+  }
+
+  protected def getLabeledPoints(resource: String, zeroBased: Boolean): Seq[MLLabeledPoint] = {
+    getResourceLines(resource).map { line =>
+      val labelAndFeatures = line.split(" ")
+      val label = labelAndFeatures.head.toDouble
+      val values = new Array[Double](126)
+      for (feature <- labelAndFeatures.tail) {
+        val idAndValue = feature.split(":")
+        if (!zeroBased) {
+          values(idAndValue(0).toInt - 1) = idAndValue(1).toDouble
+        } else {
+          values(idAndValue(0).toInt) = idAndValue(1).toDouble
+        }
+      }
+
+      MLLabeledPoint(label, MLVectors.dense(values))
+    }.toList
+  }
+}
+
+object Classification extends TrainTestData {
+  val train: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.train", zeroBased = false)
+  val test: Seq[MLLabeledPoint] = getLabeledPoints("/agaricus.txt.test", zeroBased = false)
+}
+
+object Regression extends TrainTestData {
+  val train: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.train", zeroBased = true)
+  val test: Seq[MLLabeledPoint] = getLabeledPoints("/machine.txt.test", zeroBased = true)
+}
+
+object Ranking extends TrainTestData {
+  val train0: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-0.txt.train", zeroBased = false)
+  val train1: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo-1.txt.train", zeroBased = false)
+  val trainGroup0: Seq[Int] = getGroups("/rank-demo-0.txt.train.group")
+  val trainGroup1: Seq[Int] = getGroups("/rank-demo-1.txt.train.group")
+  val test: Seq[MLLabeledPoint] = getLabeledPoints("/rank-demo.txt.test", zeroBased = false)
+
+  private def getGroups(resource: String): Seq[Int] = {
+    getResourceLines(resource).map(_.toInt).toList
+  }
+}
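A note on the new file: the objects hold strict vals, so each classpath
resource is parsed at most once per JVM, on first access, and
getLabeledPoints assumes a fixed dense width of 126 features, which all of
the bundled datasets fit. A small usage sketch (illustrative only):

    // First access triggers the one-time parse of the classpath resource:
    val numTrain = Classification.train.size
    // Ranking also exposes per-shard query-group sizes:
    val groups: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)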
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala
index f50c8011d..b9d5b0677 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala
@@ -18,18 +18,8 @@ package ml.dmlc.xgboost4j.scala.spark
 
 import java.io.File
 
-import scala.collection.mutable.ListBuffer
-import scala.io.Source
-
-import org.apache.spark.SparkContext
-import org.apache.spark.ml.feature.LabeledPoint
-import org.apache.spark.ml.linalg.{DenseVector, Vector => SparkVector}
-import org.apache.spark.rdd.RDD
-
 trait Utils extends Serializable {
-  protected val numWorkers = Runtime.getRuntime().availableProcessors()
-
-  protected var labeledPointsRDD: RDD[LabeledPoint] = null
+  protected val numWorkers: Int = Runtime.getRuntime.availableProcessors()
 
   protected def cleanExternalCache(prefix: String): Unit = {
     val dir = new File(".")
@@ -37,54 +27,4 @@ trait Utils extends Serializable {
       file.delete()
     }
   }
-
-  protected def loadLabelPoints(filePath: String, zeroBased: Boolean = false):
-    List[LabeledPoint] = {
-    val file = Source.fromFile(new File(filePath))
-    val sampleList = new ListBuffer[LabeledPoint]
-    for (sample <- file.getLines()) {
-      sampleList += fromColValueStringToLabeledPoint(sample, zeroBased)
-    }
-    sampleList.toList
-  }
-
-  protected def loadLabelAndVector(filePath: String, zeroBased: Boolean = false):
-    List[(Double, SparkVector)] = {
-    val file = Source.fromFile(new File(filePath))
-    val sampleList = new ListBuffer[(Double, SparkVector)]
-    for (sample <- file.getLines()) {
-      sampleList += fromColValueStringToLabelAndVector(sample, zeroBased)
-    }
-    sampleList.toList
-  }
-
-  protected def fromColValueStringToLabelAndVector(line: String, zeroBased: Boolean):
-    (Double, SparkVector) = {
-    val labelAndFeatures = line.split(" ")
-    val label = labelAndFeatures(0).toDouble
-    val features = labelAndFeatures.tail
-    val denseFeature = new Array[Double](126)
-    for (feature <- features) {
-      val idAndValue = feature.split(":")
-      if (!zeroBased) {
-        denseFeature(idAndValue(0).toInt - 1) = idAndValue(1).toDouble
-      } else {
-        denseFeature(idAndValue(0).toInt) = idAndValue(1).toDouble
-      }
-    }
-    (label, new DenseVector(denseFeature))
-  }
-
-  protected def fromColValueStringToLabeledPoint(line: String, zeroBased: Boolean): LabeledPoint = {
-    val (label, sv) = fromColValueStringToLabelAndVector(line, zeroBased)
-    LabeledPoint(label, sv)
-  }
-
-  protected def buildTrainingRDD(sparkContext: SparkContext): RDD[LabeledPoint] = {
-    if (labeledPointsRDD == null) {
-      val sampleList = loadLabelPoints(getClass.getResource("/agaricus.txt.train").getFile)
-      labeledPointsRDD = sparkContext.parallelize(sampleList, numWorkers)
-    }
-    labeledPointsRDD
-  }
 }
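After this change, Utils keeps only the worker count and the external-cache
cleanup helper; loading moves to TrainTestData above and the mutable
labeledPointsRDD cache is gone. A hypothetical suite skeleton showing how
the slimmed-down trait is consumed (SharedSparkContext is assumed to supply
sc and the ScalaTest test method, as in the suites below):

    class ExampleSuite extends SharedSparkContext with Utils {
      test("trains on the shared classification fixture") {
        // numWorkers comes from the Utils trait above:
        val trainingRDD = sc.parallelize(Classification.train, numWorkers)
      }
    }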
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala
index c45d1f1f1..f98e25b13 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostConfigureSuite.scala
@@ -29,7 +29,7 @@ class XGBoostConfigureSuite extends FunSuite with Utils {
     val customSparkContext = new SparkContext(sparkConf)
     customSparkContext.setLogLevel("ERROR")
     // start another app
-    val trainingRDD = buildTrainingRDD(customSparkContext)
+    val trainingRDD = customSparkContext.parallelize(Classification.train)
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic", "nthread" -> 6)
     intercept[IllegalArgumentException] {
@@ -39,17 +39,15 @@ class XGBoostConfigureSuite extends FunSuite with Utils {
     }
   }
 
   test("kryoSerializer test") {
-    labeledPointsRDD = null
     val eval = new EvalError()
     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("XGBoostSuite")
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     sparkConf.registerKryoClasses(Array(classOf[Booster]))
     val customSparkContext = new SparkContext(sparkConf)
     customSparkContext.setLogLevel("ERROR")
-    val trainingRDD = buildTrainingRDD(customSparkContext)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = customSparkContext.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic")
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 5, numWorkers)
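The kryoSerializer test shows the conversion pattern the remaining suites
repeat: import DataUtils._ brings in the implicit conversion that lets an
iterator of Spark ML LabeledPoints feed the Java JDMatrix constructor, so
the in-memory Seqs can back a DMatrix directly. Condensed (same imports as
the suite; the null argument is the external-memory cache path, unused here):

    import DataUtils._
    // Classification.test.iterator is converted implicitly to the
    // LabeledPoint iterator type that JDMatrix expects:
    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))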
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala
index a57cffc8f..2ea85c2e5 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostDFSuite.scala
@@ -45,8 +45,8 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
 
   private def buildTrainingDataframe(sparkContext: Option[SparkContext] = None): DataFrame = {
     if (trainingDF == null) {
-      val rowList = loadLabelPoints(getClass.getResource("/agaricus.txt.train").getFile)
-      val labeledPointsRDD = sparkContext.getOrElse(sc).parallelize(rowList, numWorkers)
+      val labeledPointsRDD = sparkContext.getOrElse(sc)
+        .parallelize(Classification.train, numWorkers)
       val sparkSession = SparkSession.builder().appName("XGBoostDFSuite").getOrCreate()
       import sparkSession.implicits._
       trainingDF = sparkSession.createDataset(labeledPointsRDD).toDF
@@ -57,10 +57,8 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test consistency and order preservation of dataframe-based model") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val trainingItr = loadLabelPoints(getClass.getResource("/agaricus.txt.train").getFile).
-      iterator
-    val (testItr, auxTestItr) =
-      loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator.duplicate
+    val trainingItr = Classification.train.iterator
+    val (testItr, auxTestItr) = Classification.test.iterator.duplicate
     import DataUtils._
     val round = 5
     val trainDMatrix = new DMatrix(new JDMatrix(trainingItr, null))
@@ -91,7 +89,7 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test transformLeaf") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val testItr = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val testItr = Classification.test.iterator
     val trainingDF = buildTrainingDataframe()
     val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingDF, paramMap,
       round = 5, nWorkers = numWorkers)
@@ -107,15 +105,10 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test schema of XGBoostRegressionModel") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "reg:linear")
-    val testItr = loadLabelPoints(getClass.getResource("/machine.txt.test").getFile,
-      zeroBased = true).iterator.
-      zipWithIndex.map { case (instance: LabeledPoint, id: Int) =>
-      (id, instance.features, instance.label)
-    }
+    val testItr = Regression.test.iterator.zipWithIndex
+      .map { case (instance: LabeledPoint, id: Int) => (id, instance.features, instance.label) }
     val trainingDF = {
-      val rowList = loadLabelPoints(getClass.getResource("/machine.txt.train").getFile,
-        zeroBased = true)
-      val labeledPointsRDD = sc.parallelize(rowList, numWorkers)
+      val labeledPointsRDD = sc.parallelize(Regression.train, numWorkers)
       val sparkSession = SparkSession.builder().appName("XGBoostDFSuite").getOrCreate()
       import sparkSession.implicits._
       sparkSession.createDataset(labeledPointsRDD).toDF
@@ -136,7 +129,7 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   test("test schema of XGBoostClassificationModel") {
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic")
-    val testItr = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator.
+    val testItr = Classification.test.iterator.
       zipWithIndex.map { case (instance: LabeledPoint, id: Int) =>
       (id, instance.features, instance.label)
     }
@@ -199,7 +192,7 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
       "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "max_depth" -> "2", "max_bin" -> "2",
       "eval_metric" -> "error")
-    val testItr = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val testItr = Classification.test.iterator
     val trainingDF = buildTrainingDataframe()
     val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingDF, paramMap,
       round = 10, nWorkers = math.min(2, numWorkers))
@@ -245,25 +238,17 @@ class XGBoostDFSuite extends SharedSparkContext with Utils {
   }
 
   test("test DF use nested groupData") {
-    val testItr = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile).iterator.
-      zipWithIndex.map { case (instance: LabeledPoint, id: Int) =>
-      (id, instance.features, instance.label)
-    }
+    val testItr = Ranking.test.iterator.zipWithIndex
+      .map { case (instance: LabeledPoint, id: Int) => (id, instance.features, instance.label) }
     val trainingDF = {
-      val rowList0 = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-      val labeledPointsRDD0 = sc.parallelize(rowList0, numSlices = 1)
-      val rowList1 = loadLabelPoints(getClass.getResource("/rank-demo-1.txt.train").getFile)
-      val labeledPointsRDD1 = sc.parallelize(rowList1, numSlices = 1)
+      val labeledPointsRDD0 = sc.parallelize(Ranking.train0, numSlices = 1)
+      val labeledPointsRDD1 = sc.parallelize(Ranking.train1, numSlices = 1)
       val labeledPointsRDD = labeledPointsRDD0.union(labeledPointsRDD1)
       val sparkSession = SparkSession.builder().appName("XGBoostDFSuite").getOrCreate()
       import sparkSession.implicits._
       sparkSession.createDataset(labeledPointsRDD).toDF
     }
-    val trainGroupData0: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-0.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData1: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-1.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData: Seq[Seq[Int]] = Seq(trainGroupData0, trainGroupData1)
+    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "groupData" -> trainGroupData)
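For the ranking tests, groupData carries one Seq[Int] of query-group sizes
per RDD partition, which is why each training shard is pinned to a single
slice before the union: partition order has to line up with group-data
order. The wiring, condensed from the test above (assumes a live
SparkContext sc):

    val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1)
      .union(sc.parallelize(Ranking.train1, numSlices = 1))
    // One group-size list per partition, in partition order:
    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
    val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
      "objective" -> "rank:pairwise", "groupData" -> trainGroupData)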
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index 83ee6da9a..b9d40b5e7 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -20,7 +20,6 @@ import java.nio.file.Files
 import java.util.concurrent.LinkedBlockingDeque
 
 import scala.collection.mutable.ListBuffer
-import scala.io.Source
 import scala.util.Random
 
 import ml.dmlc.xgboost4j.java.{Rabit, DMatrix => JDMatrix}
@@ -75,7 +74,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("build RDD containing boosters with the specified worker number") {
-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = sc.parallelize(Classification.train)
     val boosterRDD = XGBoost.buildDistributedBoosters(
       trainingRDD,
       List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
@@ -90,10 +89,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   test("training with external memory cache") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5,
@@ -106,10 +104,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   test("training with Scala-implemented Rabit tracker") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "binary:logistic",
       "tracker_conf" -> TrackerConf(60 * 60 * 1000, "scala")).toMap
@@ -121,10 +118,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   ignore("test with fast histo depthwise") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "6",
       "silent" -> "1", "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "eval_metric" -> "error")
@@ -137,10 +133,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   ignore("test with fast histo lossguide") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0",
       "silent" -> "1", "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "lossguide", "max_leaves" -> "8", "eval_metric" -> "error")
@@ -153,10 +148,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   ignore("test with fast histo lossguide with max bin") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0",
       "silent" -> "0", "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "lossguide", "max_leaves" -> "8", "max_bin" -> "16",
@@ -170,10 +164,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   ignore("test with fast histo depthwidth with max depth") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0",
       "silent" -> "0", "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "max_leaves" -> "8", "max_depth" -> "2",
@@ -187,10 +180,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   ignore("test with fast histo depthwidth with max depth and max bin") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val paramMap = Map("eta" -> "1", "gamma" -> "0.5", "max_depth" -> "0",
       "silent" -> "0", "objective" -> "binary:logistic", "tree_method" -> "hist",
       "grow_policy" -> "depthwise", "max_depth" -> "2", "max_bin" -> "2",
@@ -241,8 +233,8 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("test consistency of prediction functions with RDD") {
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile)
+    val trainingRDD = sc.parallelize(Classification.train)
+    val testSet = Classification.test
     val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
     val testCollection = testRDD.collect()
     for (i <- testSet.indices) {
@@ -262,7 +254,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("test eval functions with RDD") {
-    val trainingRDD = buildTrainingRDD(sc).cache()
+    val trainingRDD = sc.parallelize(Classification.train).cache()
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic")
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, round = 5, nWorkers = numWorkers)
@@ -276,7 +268,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
       sparkContext.getOrElse(sc).parallelize(List[SparkVector](), numWorkers)
     }
 
-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = sc.parallelize(Classification.train)
     val testRDD = buildEmptyRDD()
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
       "objective" -> "binary:logistic").toMap
@@ -286,10 +278,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
 
   test("test model consistency after save and load") {
     val eval = new EvalError()
-    val trainingRDD = buildTrainingRDD(sc)
-    val testSet = loadLabelPoints(getClass.getResource("/agaricus.txt.test").getFile).iterator
+    val trainingRDD = sc.parallelize(Classification.train)
     import DataUtils._
-    val testSetDMatrix = new DMatrix(new JDMatrix(testSet, null))
+    val testSetDMatrix = new DMatrix(new JDMatrix(Classification.test.iterator, null))
     val tempDir = Files.createTempDirectory("xgboosttest-")
     val tempFile = Files.createTempFile(tempDir, "", "")
     val paramMap = Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
@@ -308,7 +299,7 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   test("test save and load of different types of models") {
     val tempDir = Files.createTempDirectory("xgboosttest-")
     val tempFile = Files.createTempFile(tempDir, "", "")
-    val trainingRDD = buildTrainingRDD(sc)
+    val trainingRDD = sc.parallelize(Classification.train)
     var paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "reg:linear")
     // validate regression model
@@ -343,12 +334,9 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("test use groupData") {
-    val trainSet = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-    val trainingRDD = sc.parallelize(trainSet, numSlices = 1)
-    val trainGroupData: Seq[Seq[Int]] = Seq(Source.fromFile(
-      getClass.getResource("/rank-demo-0.txt.train.group").getFile).getLines().map(_.toInt).toList)
-    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
-    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val trainingRDD = sc.parallelize(Ranking.train0, numSlices = 1)
+    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0)
+    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "eval_metric" -> "ndcg",
       "groupData" -> trainGroupData)
@@ -363,20 +351,13 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("test use nested groupData") {
-    val trainSet0 = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-    val trainingRDD0 = sc.parallelize(trainSet0, numSlices = 1)
-    val trainSet1 = loadLabelPoints(getClass.getResource("/rank-demo-1.txt.train").getFile)
-    val trainingRDD1 = sc.parallelize(trainSet1, numSlices = 1)
+    val trainingRDD0 = sc.parallelize(Ranking.train0, numSlices = 1)
+    val trainingRDD1 = sc.parallelize(Ranking.train1, numSlices = 1)
     val trainingRDD = trainingRDD0.union(trainingRDD1)
 
-    val trainGroupData0: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-0.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData1: Seq[Int] = Source.fromFile(
-      getClass.getResource("/rank-demo-1.txt.train.group").getFile).getLines().map(_.toInt).toList
-    val trainGroupData: Seq[Seq[Int]] = Seq(trainGroupData0, trainGroupData1)
+    val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0, Ranking.trainGroup1)
 
-    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
-    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "groupData" -> trainGroupData)
@@ -388,11 +369,8 @@ class XGBoostGeneralSuite extends SharedSparkContext with Utils {
   }
 
   test("test use base margin") {
-    val trainSet = loadLabelPoints(getClass.getResource("/rank-demo-0.txt.train").getFile)
-    val trainRDD = sc.parallelize(trainSet, numSlices = 1)
-
-    val testSet = loadLabelPoints(getClass.getResource("/rank-demo.txt.test").getFile)
-    val testRDD = sc.parallelize(testSet, numSlices = 1).map(_.features)
+    val trainRDD = sc.parallelize(Ranking.train0, numSlices = 1)
+    val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
 
     val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise")