cleanup multi-node

This commit is contained in:
tqchen 2015-01-15 21:55:56 -08:00
parent b762231b02
commit b1f89f29b8
13 changed files with 31 additions and 252 deletions

View File

@ -1,38 +1,36 @@
Distributed XGBoost
======
This folder contains information about the experimental version of distributed xgboost.
This folder contains information about Distributed XGBoost.
Build
=====
* In the root folder, run ```make```; this will give you xgboost, which uses rabit allreduce
- this version of xgboost should eventually be fault tolerant
* Alternatively, run ```make mpi```; this will give you xgboost-mpi
- You will need MPI installed to build xgboost-mpi
Design Choice
=====
* XGBoost relies on the [Rabit Library](https://github.com/tqchen/rabit)
* Rabit is a fault-tolerant and portable allreduce library that provides Allreduce and Broadcast
* Since rabit is compatible with MPI, xgboost can be compiled with an MPI backend
* How is the data distributed?
- There are two solvers in distributed xgboost
- The column-based solver splits the data by column; each node works on a subset of columns
and uses exactly the same algorithm as the single-node version.
- The row-based solver splits the data by row; each node works on a subset of rows
and uses an approximate histogram counting algorithm that only examines a subset of
the potential split points, as opposed to all split points.
- The Hadoop version can run on the existing Hadoop platform;
it uses Rabit to submit jobs as MapReduce tasks.
* The distributed version is built on Rabit: [Reliable Allreduce and Broadcast Library](https://github.com/tqchen/rabit)
- Rabit is a portable library that provides fault-tolerance for Allreduce calls for distributed machine learning
- This makes xgboost portable and fault-tolerant against node failures
* You can run Distributed XGBoost on common platforms that rabit ports to,
including Hadoop (see the [hadoop folder](hadoop)) and MPI
Usage
=====
* In the root folder, run ```./build.sh```; this will give you xgboost, which uses rabit allreduce
Notes
====
* You will need a networked filesystem, or copy the data to the local filesystem before running the code
* xgboost can be used together with the submission scripts provided in Rabit on various types of job schedulers
* ***Note*** The distributed version is still optimized for multi-threading.
You should run one process per node that uses most of the available CPU;
this will reduce the communication overhead and improve performance.
- One way to do that is to limit the MPI slots on each machine to 1, or to reserve nthread processors for each process.
* Examples:
- [Column-based version](col-split)
- [Row-based version](row-split)
* Rabit handles all the fault tolerance and communication efficiently; we only use platform-specific commands to start the programs
- The Hadoop version does not rely on MapReduce to do iterations
- You can expect xgboost not to suffer the drawbacks of iterative MapReduce programs
* The design choice was made because Allreduce is very natural and efficient for distributed tree building (see the sketch after this list)
- In the current version of xgboost, the distributed version only adds several lines of Allreduce synchronization code
* The multi-threading nature of xgboost is inherited in distributed mode
- This means xgboost efficiently uses all the threads on one machine, and communicates only between machines
- Remember to run one xgboost process per machine; this will give you the maximum speedup
* For more information about rabit and how it works, see the [tutorial](https://github.com/tqchen/rabit/tree/master/guide)
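To make the Allreduce step concrete, here is a minimal, self-contained Python sketch (not part of the xgboost codebase; all names are illustrative) of what a sum Allreduce achieves during tree building: every node contributes its local gradient statistics and every node ends up with the same global sums. The real implementation calls rabit's Allreduce from C++ inside the tree builder.

```python
# Conceptual sketch only: simulates what a sum-Allreduce achieves for tree building.
# Real distributed xgboost calls rabit's C++ Allreduce; names here are illustrative.

def allreduce_sum(per_node_stats):
    """Combine per-node statistics so every node ends up with the global sums."""
    n_bins = len(per_node_stats[0])
    global_stats = [0.0] * n_bins
    for stats in per_node_stats:              # in rabit this is a single collective call
        for i, value in enumerate(stats):
            global_stats[i] += value
    # every node receives an identical copy of the reduced result
    return [list(global_stats) for _ in per_node_stats]

# Each "node" holds gradient sums for 4 candidate split bins computed on its local rows.
node_stats = [
    [1.0, 0.5, 0.2, 0.1],   # node 0
    [0.8, 0.4, 0.3, 0.2],   # node 1
    [0.6, 0.7, 0.1, 0.4],   # node 2
]
synced = allreduce_sum(node_stats)
print(synced[0])            # all nodes now agree on the global split statistics
```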
Solvers
=====
There are two solvers in distributed xgboost. For a local demo of the two solvers, see [row-split](row-split) and [col-split](col-split)
* The column-based solver splits the data by column; each node works on a subset of columns
and uses exactly the same algorithm as the single-node version.
* The row-based solver splits the data by row; each node works on a subset of rows
and uses an approximate histogram counting algorithm that only examines a subset of
the potential split points, as opposed to all split points (a conceptual sketch of such a row split follows below).
- This is the mode used by the current Hadoop version, since data is usually stored by rows in many industry systems
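As a rough illustration of how a row split can be produced (the demos use a splitrows.py helper for this), the sketch below partitions a LibSVM file into k row shards. The round-robin assignment and the output naming are assumptions for illustration, not the exact demo code.

```python
# Illustrative sketch: give each of k worker processes a subset of the rows.
# The round-robin policy and the ".row%d" suffix are assumptions for illustration,
# mirroring the "train.row%d" pattern used in the demo configs.
import sys

def split_rows(libsvm_path, out_prefix, k):
    outs = [open('%s.row%d' % (out_prefix, i), 'w') for i in range(k)]
    with open(libsvm_path) as fin:
        for row_id, line in enumerate(fin):
            outs[row_id % k].write(line)   # assign rows round-robin to the k shards
    for f in outs:
        f.close()

if __name__ == '__main__':
    # example: python split_rows_sketch.py ../../demo/data/agaricus.txt.train train 4
    split_rows(sys.argv[1], sys.argv[2], int(sys.argv[3]))
```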

View File

@ -4,8 +4,6 @@ Distributed XGBoost: Column Split Version
- mushroom-col-rabit.sh starts xgboost job using rabit's allreduce
* run ```bash mushroom-col-rabit-mock.sh <n-process>```
- mushroom-col-rabit-mock.sh starts an xgboost job using rabit's allreduce, inserts suicide signals at certain points, and tests recovery
* run ```bash mushroom-col-mpi.sh <n-mpi-process>```
- mushroom-col-mpi.sh starts the xgboost-mpi job
How to Use
====

View File

@ -1,24 +0,0 @@
#!/bin/bash
if [[ $# -ne 1 ]]
then
echo "Usage: nprocess"
exit -1
fi
rm -rf train.col* *.model
k=$1
# split the lib svm file into k subfiles
python splitsvm.py ../../demo/data/agaricus.txt.train train $k
# run xgboost mpi
mpirun -n $k ../../xgboost-mpi mushroom-col.conf dsplit=col
# the model can be directly loaded by the single-machine xgboost solver, as usual
../../xgboost mushroom-col.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt
# run for one round, and continue training
mpirun -n $k ../../xgboost-mpi mushroom-col.conf dsplit=col num_round=1
mpirun -n $k ../../xgboost-mpi mushroom-col.conf dsplit=col model_in=0001.model
cat dump.nice.$k.txt

View File

@ -1,22 +0,0 @@
#!/bin/bash
if [[ $# -ne 1 ]]
then
echo "Usage: nprocess"
exit -1
fi
#
# This script is the same as mushroom-col, except that we will be using the xgboost python module
#
# xgboost uses the built-in TCP-based allreduce module, and can run in more environments, as long as we know how to start the job by modifying ../submit_job_tcp.py
#
rm -rf train.col* *.model
k=$1
# split the lib svm file into k subfiles
python splitsvm.py ../../demo/data/agaricus.txt.train train $k
# run the xgboost python script via the rabit tracker
../../rabit/tracker/rabit_mpi.py $k local python mushroom-col.py
cat dump.nice.$k.txt

View File

@ -1,33 +0,0 @@
import os
import sys
path = os.path.dirname(__file__)
if path == '':
    path = '.'
sys.path.append(path + '/../../wrapper')
import xgboost as xgb
# this is an example script of running distributed xgboost using python
# call this additional function to initialize the xgboost sync module
# in distributed mode
xgb.sync_init(sys.argv)
rank = xgb.sync_get_rank()
# read in dataset
dtrain = xgb.DMatrix('train.col%d' % rank)
param = {'max_depth': 3, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic'}
param['dsplit'] = 'col'
nround = 3
if rank == 0:
    dtest = xgb.DMatrix('../../demo/data/agaricus.txt.test')
    model = xgb.train(param, dtrain, nround, [(dtrain, 'train'), (dtest, 'test')])
else:
    # if it is a slave node, do not run evaluation
    model = xgb.train(param, dtrain, nround)
if rank == 0:
    model.save_model('%04d.model' % nround)
    # dump model with feature map
    model.dump_model('dump.nice.%d.txt' % xgb.sync_get_world_size(), '../../demo/data/featmap.txt')
# shutdown the synchronization module
xgb.sync_finalize()

View File

@ -1,10 +1,8 @@
Distributed XGBoost: Row Split Version
====
* You might be interested to checkout the [Hadoop example](../hadoop)
* Machine Rabit: run ```bash machine-row-rabit.sh <n-mpi-process>```
- machine-row-rabit.sh starts an xgboost job using rabit
* Mushroom: run ```bash mushroom-row-mpi.sh <n-mpi-process>```
* Machine: run ```bash machine-row-mpi.sh <n-mpi-process>```
- The machine case also includes an example of continuing training from an existing model
How to Use
====

View File

@ -1,20 +0,0 @@
#!/bin/bash
if [[ $# -ne 1 ]]
then
echo "Usage: nprocess"
exit -1
fi
rm -rf train-machine.row* *.model
k=$1
# make machine data
cd ../../demo/regression/
python mapfeat.py
python mknfold.py machine.txt 1
cd -
# split the lib svm file into k subfiles
python splitrows.py ../../demo/regression/machine.txt.train train-machine $k
# run distributed xgboost via the tcp submission script, taking data from stdin
../submit_job_tcp.py $k "bash map.sh train-machine.row ../../xgboost machine-row.conf dsplit=row num_round=3 data=stdin"

View File

@ -1,24 +0,0 @@
#!/bin/bash
if [[ $# -ne 1 ]]
then
echo "Usage: nprocess"
exit -1
fi
rm -rf train-machine.row* *.model
k=$1
# make machine data
cd ../../demo/regression/
python mapfeat.py
python mknfold.py machine.txt 1
cd -
# split the lib svm file into k subfiles
python splitrows.py ../../demo/regression/machine.txt.train train-machine $k
# run xgboost mpi
mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row num_round=3
# run xgboost-mpi to save model 0001, then continue training from the existing model
mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row num_round=1
mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row num_round=2 model_in=0001.model

View File

@ -1,3 +0,0 @@
# a simple script to simulate a MapReduce mapper
echo "cat $1$OMPI_COMM_WORLD_RANK | ${@:2}"
cat $1$OMPI_COMM_WORLD_RANK | ${@:2}

View File

@ -1,19 +0,0 @@
#!/bin/bash
if [[ $# -ne 1 ]]
then
echo "Usage: nprocess"
exit -1
fi
rm -rf train.row* *.model
k=$1
# split the lib svm file into k subfiles
python splitrows.py ../../demo/data/agaricus.txt.train train $k
# run xgboost mpi
mpirun -n $k ../../xgboost-mpi mushroom-row.conf dsplit=row nthread=1
# the model can be directly loaded by the single-machine xgboost solver, as usual
../../xgboost mushroom-row.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt
cat dump.nice.$k.txt

View File

@ -1,35 +0,0 @@
# General Parameters, see comment for each definition
# choose the booster, can be gbtree or gblinear
booster = gbtree
# choose logistic regression loss function for binary classification
objective = binary:logistic
# Tree Booster Parameters
# step size shrinkage
eta = 1.0
# minimum loss reduction required to make a further partition
gamma = 1.0
# minimum sum of instance weight(hessian) needed in a child
min_child_weight = 1
# maximum depth of a tree
max_depth = 3
# Task Parameters
# the number of rounds to do boosting
num_round = 2
# 0 means do not save any model except the final round model
save_period = 0
use_buffer = 0
# The path of training data; %d is the wildcard for the rank of the data
# The idea is that each process takes a feature matrix with a subset of rows
#
data = "train.row%d"
# The path of validation data, used to monitor the training process; here [test] sets the name of the validation set
eval[test] = "../../demo/data/agaricus.txt.test"
# evaluate on training data as well each round
eval_train = 1
# The path of test data; this needs the full test data, so try not to use it, or keep a subsampled version
test:data = "../../demo/data/agaricus.txt.test"

View File

@ -33,10 +33,6 @@ xglib.XGBoosterCreate.restype = ctypes.c_void_p
xglib.XGBoosterPredict.restype = ctypes.POINTER(ctypes.c_float)
xglib.XGBoosterEvalOneIter.restype = ctypes.c_char_p
xglib.XGBoosterDumpModel.restype = ctypes.POINTER(ctypes.c_char_p)
# sync function
xglib.XGSyncGetRank.restype = ctypes.c_int
xglib.XGSyncGetWorldSize.restype = ctypes.c_int
# initialize communication module
def ctypes2numpy(cptr, length, dtype):
"""convert a ctypes pointer array to numpy array """
@ -557,17 +553,3 @@ def cv(params, dtrain, num_boost_round = 10, nfold=3, metrics=[], \
        results.append(res)
    return results
# synchronization module
def sync_init(args = sys.argv):
    arr = (ctypes.c_char_p * len(args))()
    arr[:] = args
    xglib.XGSyncInit(len(args), arr)
def sync_finalize():
    xglib.XGSyncFinalize()
def sync_get_rank():
    return int(xglib.XGSyncGetRank())
def sync_get_world_size():
    return int(xglib.XGSyncGetWorldSize())

View File

@ -82,23 +82,6 @@ class Booster: public learner::BoostLearner {
using namespace xgboost::wrapper;
extern "C"{
void XGSyncInit(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  if (rabit::GetWorldSize() != 1) {
    std::string pname = rabit::GetProcessorName();
    utils::Printf("distributed job start %s:%d\n", pname.c_str(), rabit::GetRank());
  }
}
void XGSyncFinalize(void) {
  rabit::Finalize();
}
int XGSyncGetRank(void) {
  int rank = rabit::GetRank();
  return rank;
}
int XGSyncGetWorldSize(void) {
  return rabit::GetWorldSize();
}
void* XGDMatrixCreateFromFile(const char *fname, int silent) {
  return LoadDataMatrix(fname, silent != 0, false);
}