[FLINK] remove nWorker from API
This commit is contained in:
parent
084ed6224d
commit
90f7220736
@ -147,7 +147,7 @@ val trainData = MLUtils.readLibSVM(env, "/path/to/data/agaricus.txt.train")
|
|||||||
Model Training can be done as follows
|
Model Training can be done as follows
|
||||||
|
|
||||||
```scala
|
```scala
|
||||||
val xgboostModel = XGBoost.train(trainData, paramMap, round, nWorkers)
|
val xgboostModel = XGBoost.train(trainData, paramMap, round)
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@ -72,7 +72,7 @@ object DistTrainWithSpark {
|
|||||||
"eta" -> 0.1f,
|
"eta" -> 0.1f,
|
||||||
"max_depth" -> 2,
|
"max_depth" -> 2,
|
||||||
"objective" -> "binary:logistic").toMap
|
"objective" -> "binary:logistic").toMap
|
||||||
// use 5 distributed workers to train the model
|
// use 5 distributed workers to train the model
|
||||||
val model = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = 5)
|
val model = XGBoost.train(trainRDD, paramMap, numRound, nWorkers = 5)
|
||||||
// save model to HDFS path
|
// save model to HDFS path
|
||||||
model.saveModelToHadoop(outputModelPath)
|
model.saveModelToHadoop(outputModelPath)
|
||||||
@ -100,9 +100,8 @@ object DistTrainWithFlink {
|
|||||||
"objective" -> "binary:logistic").toMap
|
"objective" -> "binary:logistic").toMap
|
||||||
// number of iterations
|
// number of iterations
|
||||||
val round = 2
|
val round = 2
|
||||||
val nWorkers = 5
|
|
||||||
// train the model
|
// train the model
|
||||||
val model = XGBoost.train(trainData, paramMap, round, nWorkers)
|
val model = XGBoost.train(trainData, paramMap, round)
|
||||||
val predTrain = model.predict(trainData.map{x => x.vector})
|
val predTrain = model.predict(trainData.map{x => x.vector})
|
||||||
model.saveModelToHadoop("file:///path/to/xgboost.model")
|
model.saveModelToHadoop("file:///path/to/xgboost.model")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -33,9 +33,8 @@ object DistTrainWithFlink {
|
|||||||
"objective" -> "binary:logistic").toMap
|
"objective" -> "binary:logistic").toMap
|
||||||
// number of iterations
|
// number of iterations
|
||||||
val round = 2
|
val round = 2
|
||||||
val nWorkers = 5
|
|
||||||
// train the model
|
// train the model
|
||||||
val model = XGBoost.train(trainData, paramMap, round, 5)
|
val model = XGBoost.train(trainData, paramMap, round)
|
||||||
val predTest = model.predict(testData.map{x => x.vector})
|
val predTest = model.predict(testData.map{x => x.vector})
|
||||||
model.saveModelAsHadoopFile("file:///path/to/xgboost.model")
|
model.saveModelAsHadoopFile("file:///path/to/xgboost.model")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -82,9 +82,9 @@ object XGBoost {
|
|||||||
* @param params The parameters to XGBoost.
|
* @param params The parameters to XGBoost.
|
||||||
* @param round Number of rounds to train.
|
* @param round Number of rounds to train.
|
||||||
*/
|
*/
|
||||||
def train(dtrain: DataSet[LabeledVector], params: Map[String, Any], round: Int, nWorkers: Int):
|
def train(dtrain: DataSet[LabeledVector], params: Map[String, Any], round: Int):
|
||||||
XGBoostModel = {
|
XGBoostModel = {
|
||||||
val tracker = new RabitTracker(nWorkers)
|
val tracker = new RabitTracker(dtrain.getExecutionEnvironment.getParallelism)
|
||||||
if (tracker.start()) {
|
if (tracker.start()) {
|
||||||
dtrain
|
dtrain
|
||||||
.mapPartition(new MapFunction(params, round, tracker.getWorkerEnvs))
|
.mapPartition(new MapFunction(params, round, tracker.getWorkerEnvs))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user