force the user to set the number of workers

CodingCat 2016-03-12 13:33:57 -05:00
parent 980898f3fb
commit 16b9e92328
6 changed files with 20 additions and 26 deletions

View File

@@ -114,7 +114,7 @@ val trainRDD = MLUtils.loadLibSVMFile(sc, inputTrainPath).repartition(args(1).to
 We move forward to train the models:
 ```scala
-val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound)
+val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound, numWorkers)
 ```
@@ -147,7 +147,7 @@ val trainData = MLUtils.readLibSVM(env, "/path/to/data/agaricus.txt.train")
 Model Training can be done as follows
 ```scala
-val xgboostModel = XGBoost.train(trainData, paramMap, round)
+val xgboostModel = XGBoost.train(trainData, paramMap, round, nWorkers)
 ```
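Both documentation snippets now pass the worker count to `XGBoost.train` explicitly. A minimal sketch of the updated Spark call, assuming a live `SparkContext` named `sc` and the tutorial's `XGBoost` object on the classpath; the data path, parameter values, and worker count below are placeholders, not part of this commit:

```scala
import org.apache.spark.mllib.util.MLUtils

// placeholder path and worker count; in the tutorial both come from the job arguments
val numWorkers = 4
val trainRDD = MLUtils.loadLibSVMFile(sc, "/path/to/agaricus.txt.train").repartition(numWorkers)
val paramMap = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic")
val numRound = 2
// the worker count is now a required fourth argument
val xgboostModel = XGBoost.train(trainRDD, paramMap, numRound, numWorkers)
```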

View File

@@ -67,7 +67,8 @@ object DistTrainWithSpark {
       "eta" -> 0.1f,
       "max_depth" -> 2,
       "objective" -> "binary:logistic").toMap
-    val model = XGBoost.train(trainRDD, paramMap, numRound)
+    // use 5 distributed workers to train the model
+    val model = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = 5)
     // save model to HDFS path
     model.saveModelToHadoop(outputModelPath)
   }
@@ -94,8 +95,9 @@ object DistTrainWithFlink {
       "objective" -> "binary:logistic").toMap
     // number of iterations
     val round = 2
+    val nWorkers = 5
     // train the model
-    val model = XGBoost.train(trainData, paramMap, round)
+    val model = XGBoost.train(trainData, paramMap, round, nWorkers)
     val predTrain = model.predict(trainData.map{x => x.vector})
     model.saveModelToHadoop("file:///path/to/xgboost.model")
   }

View File

@@ -33,8 +33,9 @@ object DistTrainWithFlink {
       "objective" -> "binary:logistic").toMap
     // number of iterations
     val round = 2
+    val nWorkers = 5
     // train the model
-    val model = XGBoost.train(trainData, paramMap, round)
+    val model = XGBoost.train(trainData, paramMap, round, 5)
     val predTest = model.predict(testData.map{x => x.vector})
     model.saveModelAsHadoopFile("file:///path/to/xgboost.model")
   }

View File

@@ -72,10 +72,7 @@ object XGBoost {
     */
   def loadModelFromHadoopFile(modelPath: String) : XGBoostModel = {
     new XGBoostModel(
-      XGBoostScala.loadModel(
-        FileSystem
-          .get(new Configuration)
-          .open(new Path(modelPath))))
+      XGBoostScala.loadModel(FileSystem.get(new Configuration).open(new Path(modelPath))))
   }

   /**
@@ -85,11 +82,9 @@ object XGBoost {
    * @param params The parameters to XGBoost.
    * @param round Number of rounds to train.
    */
-  def train(
-      dtrain: DataSet[LabeledVector],
-      params: Map[String, Any],
-      round: Int): XGBoostModel = {
-    val tracker = new RabitTracker(dtrain.getExecutionEnvironment.getParallelism)
+  def train(dtrain: DataSet[LabeledVector], params: Map[String, Any], round: Int, nWorkers: Int):
+      XGBoostModel = {
+    val tracker = new RabitTracker(nWorkers)
     if (tracker.start()) {
       dtrain
         .mapPartition(new MapFunction(params, round, tracker.getWorkerEnvs))
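With this signature change the Flink wrapper no longer derives the worker count from the execution environment's parallelism; the caller passes it in and the RabitTracker is sized from that value. A minimal caller sketch under the new signature, assuming the xgboost4j-flink `XGBoost` object and the FlinkML `MLUtils.readLibSVM` helper used in the example docs (paths and values are placeholders):

```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.ml.MLUtils  // assumed: the readLibSVM helper shown in the examples

val env = ExecutionEnvironment.getExecutionEnvironment
// placeholder data path, taken from the example docs
val trainData = MLUtils.readLibSVM(env, "/path/to/data/agaricus.txt.train")
val paramMap = Map("eta" -> 0.1f, "max_depth" -> 2, "objective" -> "binary:logistic")
val round = 2
// the worker count is now an explicit argument instead of env.getParallelism
val nWorkers = 5
val model = XGBoost.train(trainData, paramMap, round, nWorkers)
```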

View File

@@ -45,8 +45,10 @@ object XGBoost extends Serializable {
     import DataUtils._
     val partitionedData = {
       if (numWorkers > trainingData.partitions.length) {
+        logger.info(s"repartitioning training set to $numWorkers partitions")
         trainingData.repartition(numWorkers)
       } else if (numWorkers < trainingData.partitions.length) {
+        logger.info(s"repartitioning training set to $numWorkers partitions")
         trainingData.coalesce(numWorkers)
       } else {
         trainingData
@@ -79,7 +81,9 @@ object XGBoost extends Serializable {
    */
   @throws(classOf[XGBoostError])
   def train(trainingData: RDD[LabeledPoint], configMap: Map[String, Any], round: Int,
-      nWorkers: Int = 0, obj: ObjectiveTrait = null, eval: EvalTrait = null): XGBoostModel = {
+      nWorkers: Int, obj: ObjectiveTrait = null, eval: EvalTrait = null): XGBoostModel = {
+    require(nWorkers > 0, "you must specify more than 0 workers")
+    val tracker = new RabitTracker(nWorkers)
     implicit val sc = trainingData.sparkContext
     var overridedConfMap = configMap
     if (overridedConfMap.contains("nthread")) {
@@ -91,17 +95,9 @@ object XGBoost extends Serializable {
     } else {
       overridedConfMap = configMap + ("nthread" -> sc.getConf.get("spark.task.cpus", "1").toInt)
     }
-    val numWorkers = {
-      if (nWorkers > 0) {
-        nWorkers
-      } else {
-        trainingData.partitions.length
-      }
-    }
-    val tracker = new RabitTracker(numWorkers)
     require(tracker.start(), "FAULT: Failed to start tracker")
     val boosters = buildDistributedBoosters(trainingData, overridedConfMap,
-      tracker.getWorkerEnvs.asScala, numWorkers, round, obj, eval)
+      tracker.getWorkerEnvs.asScala, nWorkers, round, obj, eval)
     val sparkJobThread = new Thread() {
       override def run() {
         // force the job
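On the Spark side, `nWorkers` no longer defaults to 0 and is no longer silently replaced by the number of input partitions; a non-positive value now fails fast via `require`, which throws an `IllegalArgumentException` before the tracker or any Spark job starts. A sketch of the resulting caller contract, assuming a `trainingRDD` and `paramMap` like those in the test suite:

```scala
// the caller must now pick a positive worker count explicitly
val model = XGBoost.train(trainingRDD, paramMap, round = 5, nWorkers = 4)

// a non-positive value is rejected up front by require(nWorkers > 0, ...)
// XGBoost.train(trainingRDD, paramMap, 5, nWorkers = 0)  // throws IllegalArgumentException
```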

View File

@@ -148,7 +148,7 @@ class XGBoostSuite extends FunSuite with BeforeAndAfter {
     val tempFile = Files.createTempFile(tempDir, "", "")
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "0",
       "objective" -> "binary:logistic").toMap
-    val xgBoostModel = XGBoost.train(trainingRDD, paramMap, 5)
+    val xgBoostModel = XGBoost.train(trainingRDD, paramMap, 5, numWorkers)
     assert(eval.eval(xgBoostModel.predict(testSetDMatrix), testSetDMatrix) < 0.1)
     xgBoostModel.saveModelAsHadoopFile(tempFile.toFile.getAbsolutePath)
     val loadedXGBooostModel = XGBoost.loadModelFromHadoopFile(tempFile.toFile.getAbsolutePath)
@@ -167,7 +167,7 @@ class XGBoostSuite extends FunSuite with BeforeAndAfter {
     val paramMap = List("eta" -> "1", "max_depth" -> "2", "silent" -> "0",
       "objective" -> "binary:logistic", "nthread" -> 6).toMap
     intercept[IllegalArgumentException] {
-      XGBoost.train(trainingRDD, paramMap, 5)
+      XGBoost.train(trainingRDD, paramMap, 5, numWorkers)
     }
     customSparkContext.stop()
   }