From 16b9e92328e238b80903ccee0f84a33b03db4630 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 12 Mar 2016 13:33:57 -0500
Subject: [PATCH] force the user to set number of workers

---
 doc/jvm/xgboost4j-intro.md                        |  4 ++--
 jvm-packages/README.md                            |  6 ++++--
 .../scala/example/flink/DistTrainWithFlink.scala  |  3 ++-
 .../ml/dmlc/xgboost4j/scala/flink/XGBoost.scala   | 13 ++++---------
 .../ml/dmlc/xgboost4j/scala/spark/XGBoost.scala   | 16 ++++++----------
 .../xgboost4j/scala/spark/XGBoostSuite.scala      |  4 ++--
 6 files changed, 20 insertions(+), 26 deletions(-)

diff --git a/doc/jvm/xgboost4j-intro.md b/doc/jvm/xgboost4j-intro.md
index 09d5b29c7..05aaf419c 100644
--- a/doc/jvm/xgboost4j-intro.md
+++ b/doc/jvm/xgboost4j-intro.md
@@ -114,7 +114,7 @@ val trainRDD = MLUtils.loadLibSVMFile(sc, inputTrainPath).repartition(args(1).to
 We move forward to train the models:
 
 ```scala
-val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound)
+val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound, numWorkers)
 ```
 
 
@@ -147,7 +147,7 @@ val trainData = MLUtils.readLibSVM(env, "/path/to/data/agaricus.txt.train")
 Model Training can be done as follows
 
 ```scala
-val xgboostModel = XGBoost.train(trainData, paramMap, round)
+val xgboostModel = XGBoost.train(trainData, paramMap, round, nWorkers)
 ```
 
 
diff --git a/jvm-packages/README.md b/jvm-packages/README.md
index 28dbba685..a390a7288 100644
--- a/jvm-packages/README.md
+++ b/jvm-packages/README.md
@@ -67,7 +67,8 @@ object DistTrainWithSpark {
       "eta" -> 0.1f,
       "max_depth" -> 2,
       "objective" -> "binary:logistic").toMap
-    val model = XGBoost.train(trainRDD, paramMap, numRound)
+    // use 5 distributed workers to train the model
+    val model = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = 5)
     // save model to HDFS path
     model.saveModelToHadoop(outputModelPath)
   }
@@ -94,8 +95,9 @@ object DistTrainWithFlink {
       "objective" -> "binary:logistic").toMap
     // number of iterations
     val round = 2
+    val nWorkers = 5
     // train the model
-    val model = XGBoost.train(trainData, paramMap, round)
+    val model = XGBoost.train(trainData, paramMap, round, nWorkers)
     val predTrain = model.predict(trainData.map{x => x.vector})
     model.saveModelToHadoop("file:///path/to/xgboost.model")
   }
diff --git a/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/flink/DistTrainWithFlink.scala b/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/flink/DistTrainWithFlink.scala
index 74b24ac35..cd2dacb5a 100644
--- a/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/flink/DistTrainWithFlink.scala
+++ b/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/flink/DistTrainWithFlink.scala
@@ -33,8 +33,9 @@ object DistTrainWithFlink {
       "objective" -> "binary:logistic").toMap
     // number of iterations
     val round = 2
+    val nWorkers = 5
     // train the model
-    val model = XGBoost.train(trainData, paramMap, round)
+    val model = XGBoost.train(trainData, paramMap, round, nWorkers)
     val predTest = model.predict(testData.map{x => x.vector})
     model.saveModelAsHadoopFile("file:///path/to/xgboost.model")
   }
diff --git a/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala b/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala
index 3577ebcc1..3056d28a1 100644
--- a/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala
+++ b/jvm-packages/xgboost4j-flink/src/main/scala/ml/dmlc/xgboost4j/scala/flink/XGBoost.scala
@@ -72,10 +72,7 @@ object XGBoost {
    */
   def loadModelFromHadoopFile(modelPath: String) : XGBoostModel = {
     new XGBoostModel(
-      XGBoostScala.loadModel(
-        FileSystem
-          .get(new Configuration)
-          .open(new Path(modelPath))))
+      XGBoostScala.loadModel(FileSystem.get(new Configuration).open(new Path(modelPath))))
   }
 
   /**
@@ -85,11 +82,9 @@ object XGBoost {
    * @param params The parameters to XGBoost.
    * @param round Number of rounds to train.
    */
-  def train(
-      dtrain: DataSet[LabeledVector],
-      params: Map[String, Any],
-      round: Int): XGBoostModel = {
-    val tracker = new RabitTracker(dtrain.getExecutionEnvironment.getParallelism)
+  def train(dtrain: DataSet[LabeledVector], params: Map[String, Any], round: Int, nWorkers: Int):
+  XGBoostModel = {
+    val tracker = new RabitTracker(nWorkers)
     if (tracker.start()) {
       dtrain
         .mapPartition(new MapFunction(params, round, tracker.getWorkerEnvs))
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index 5fcdf81e5..dc1b5382c 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -45,8 +45,10 @@ object XGBoost extends Serializable {
     import DataUtils._
     val partitionedData = {
       if (numWorkers > trainingData.partitions.length) {
+        logger.info(s"repartitioning training set to $numWorkers partitions")
         trainingData.repartition(numWorkers)
       } else if (numWorkers < trainingData.partitions.length) {
+        logger.info(s"coalescing training set to $numWorkers partitions")
         trainingData.coalesce(numWorkers)
       } else {
         trainingData
@@ -79,7 +81,9 @@
    */
   @throws(classOf[XGBoostError])
   def train(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
-      nWorkers: Int = 0, obj: ObjectiveTrait = null, eval: EvalTrait = null): XGBoostModel = {
+      nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null): XGBoostModel = {
+    require(nWorkers > 0, "you must specify more than 0 workers")
+    val tracker = new RabitTracker(nWorkers)
     implicit val sc = trainingData.sparkContext
     var overridedConfMap = configMap
     if (overridedConfMap.contains("nthread")) {
@@ -91,17 +95,9 @@
     } else {
       overridedConfMap = configMap + ("nthread" -> sc.getConf.get("spark.task.cpus", "1").toInt)
     }
-    val numWorkers = {
-      if (nWorkers > 0) {
-        nWorkers
-      } else {
-        trainingData.partitions.length
-      }
-    }
-    val tracker = new RabitTracker(numWorkers)
     require(tracker.start(), "FAULT: Failed to start tracker")
     val boosters = buildDistributedBoosters(trainingData, overridedConfMap,
-      tracker.getWorkerEnvs.asScala, numWorkers, round, obj, eval)
+      tracker.getWorkerEnvs.asScala, nWorkers, round, obj, eval)
     val sparkJobThread = new Thread() {
       override def run() {
         // force the job
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostSuite.scala
index 513d54cdf..26032694d 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostSuite.scala
@@ -148,7 +148,7 @@ class XGBoostSuite extends FunSuite with BeforeAndAfter {
     val tempFile = Files.createTempFile(tempDir, "", "")
     val paramMap = List("eta" -> "1", "max_depth" -> "2",
"silent" -> "0", "objective" -> "binary:logistic").toMap - val xgBoostModel = XGBoost.train(trainingRDD, paramMap, 5) + val xgBoostModel = XGBoost.train(trainingRDD, paramMap, 5, numWorkers) assert(eval.eval(xgBoostModel.predict(testSetDMatrix), testSetDMatrix) < 0.1) xgBoostModel.saveModelAsHadoopFile(tempFile.toFile.getAbsolutePath) val loadedXGBooostModel = XGBoost.loadModelFromHadoopFile(tempFile.toFile.getAbsolutePath) @@ -167,7 +167,7 @@ class XGBoostSuite extends FunSuite with BeforeAndAfter { val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "0", "objective" -> "binary:logistic", "nthread" -> 6).toMap intercept[IllegalArgumentException] { - XGBoost.train(trainingRDD, paramMap, 5) + XGBoost.train(trainingRDD, paramMap, 5, numWorkers) } customSparkContext.stop() }