diff --git a/multi-node/README.md b/multi-node/README.md
index 02d6fc820..1c388d8bc 100644
--- a/multi-node/README.md
+++ b/multi-node/README.md
@@ -1,38 +1,36 @@
 Distributed XGBoost
 ======
-This folder contains information about experimental version of distributed xgboost.
+This folder contains information about Distributed XGBoost.
 
-Build
-=====
-* In the root folder, run ```make```, this will give you xgboost, which uses rabit allreduce
-  - this version of xgboost should be fault tolerant eventually
-* Alterniatively, run ```make mpi```, this will give you xgboost-mpi
-  - You will need to have MPI to build xgboost-mpi
-
-Design Choice
-=====
-* XGBoost replies on [Rabit Library](https://github.com/tqchen/rabit)
-* Rabit is an fault tolerant and portable allreduce library that provides Allreduce and Broadcast
-* Since rabit is compatible with MPI, xgboost can be compiled using MPI backend
-
-* How is the data distributed?
-  - There are two solvers in distributed xgboost
-    - Column-based solver split data by column, each node work on subset of columns,
-      it uses exactly the same algorithm as single node version.
-    - Row-based solver split data by row, each node work on subset of rows,
-      it uses an approximate histogram count algorithm, and will only examine subset of
-      potential split points as opposed to all split points.
-    - Hadoop version can run on the existing hadoop platform,
-      it use Rabit to submit jobs as map-reduce tasks.
+* The distributed version is built on Rabit: [Reliable Allreduce and Broadcast Library](https://github.com/tqchen/rabit)
+  - Rabit is a portable library that provides fault-tolerant Allreduce calls for distributed machine learning
+  - This makes xgboost portable and tolerant of node failures
+* You can run Distributed XGBoost on common platforms that rabit supports,
+  including Hadoop (see the [hadoop folder](hadoop)) and MPI
 
 Usage
+=====
+* In the root folder, run ```./build.sh```; this will give you xgboost, which uses rabit allreduce
+
+Notes
 ====
-* You will need a network filesystem, or copy data to local file system before running the code
-* xgboost can be used together with submission script provided in Rabit on different possible types of job scheduler
-* ***Note*** The distributed version is still multi-threading optimized.
-  You should run one process per node that takes most available CPU,
-  this will reduce the communication overhead and improve the performance.
-    - One way to do that is limit mpi slot in each machine to be 1, or reserve nthread processors for each process.
-* Examples:
-  - [Column-based version](col-split)
-  - [Row-based version](row-split)
+* Rabit handles fault tolerance and communication efficiently; we only use platform-specific commands to start programs
+  - The Hadoop version does not rely on MapReduce to do iterations
+  - You can expect xgboost not to suffer the drawbacks of iterative MapReduce programs
+* This design choice was made because Allreduce is a very natural and efficient fit for distributed tree building
+  - In the current version of xgboost, the distributed version only adds several lines of Allreduce synchronization code
+* The multi-threading nature of xgboost is inherited in distributed mode
+  - This means xgboost efficiently uses all the threads on one machine, and communicates only between machines
+  - Remember to run one xgboost process per machine; this will give you maximum speedup
+* For more information about rabit and how it works, see the [tutorial](https://github.com/tqchen/rabit/tree/master/guide)
+
+Solvers
+=====
+There are two solvers in distributed xgboost. For a local demo of each, see [row-split](row-split) and [col-split](col-split)
+  * The column-based solver splits data by column; each node works on a subset of columns,
+    using exactly the same algorithm as the single-node version.
+  * The row-based solver splits data by row; each node works on a subset of rows,
+    using an approximate histogram count algorithm that only examines a subset of
+    potential split points as opposed to all split points.
+    - This is the mode used by the current Hadoop version, since data is usually stored by rows in many industrial systems
+
diff --git a/multi-node/col-split/README.md b/multi-node/col-split/README.md
index 4f0d07b27..3ea0799fe 100644
--- a/multi-node/col-split/README.md
+++ b/multi-node/col-split/README.md
@@ -4,8 +4,6 @@ Distributed XGBoost: Column Split Version
   - mushroom-col-rabit.sh starts xgboost job using rabit's allreduce
 * run ```bash mushroom-col-rabit-mock.sh ```
   - mushroom-col-rabit-mock.sh starts xgboost job using rabit's allreduce, inserts suicide signal at certain point and test recovery
-* run ```bash mushroom-col-mpi.sh ```
-  - mushroom-col.sh starts xgboost-mpi job
 
 How to Use
 ====
diff --git a/multi-node/col-split/mushroom-col-mpi.sh b/multi-node/col-split/mushroom-col-mpi.sh
deleted file mode 100755
index 4d7de9892..000000000
--- a/multi-node/col-split/mushroom-col-mpi.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/bin/bash
-if [[ $# -ne 1 ]]
-then
-    echo "Usage: nprocess"
-    exit -1
-fi
-
-rm -rf train.col* *.model
-k=$1
-
-# split the lib svm file into k subfiles
-python splitsvm.py ../../demo/data/agaricus.txt.train train $k
-
-# run xgboost mpi
-mpirun -n $k ../../xgboost-mpi mushroom-col.conf dsplit=col
-
-# the model can be directly loaded by single machine xgboost solver, as usuall
-../../xgboost mushroom-col.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt
-
-# run for one round, and continue training
-mpirun -n $k ../../xgboost-mpi mushroom-col.conf dsplit=col num_round=1
-mpirun -n $k ../../xgboost-mpi mushroom-col.conf dsplit=col model_in=0001.model
-
-cat dump.nice.$k.txt
\ No newline at end of file
diff --git a/multi-node/col-split/mushroom-col-python.sh b/multi-node/col-split/mushroom-col-python.sh
deleted file mode 100755
index 8551ee465..000000000
--- a/multi-node/col-split/mushroom-col-python.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-if [[ $# -ne 1 ]]
-then
-    echo "Usage: nprocess"
-    exit -1
-fi
-
-#
-# This script is same as 
mushroom-col except that we will be using xgboost python module -# -# xgboost used built in tcp-based allreduce module, and can be run on more enviroment, so long as we know how to start job by modifying ../submit_job_tcp.py -# -rm -rf train.col* *.model -k=$1 - -# split the lib svm file into k subfiles -python splitsvm.py ../../demo/data/agaricus.txt.train train $k - -# run xgboost mpi -../../rabit/tracker/rabit_mpi.py $k local python mushroom-col.py - -cat dump.nice.$k.txt diff --git a/multi-node/col-split/mushroom-col.py b/multi-node/col-split/mushroom-col.py deleted file mode 100644 index a905aff5c..000000000 --- a/multi-node/col-split/mushroom-col.py +++ /dev/null @@ -1,33 +0,0 @@ -import os -import sys -path = os.path.dirname(__file__) -if path == '': - path = '.' -sys.path.append(path+'/../../wrapper') - -import xgboost as xgb -# this is example script of running distributed xgboost using python - -# call this additional function to intialize the xgboost sync module -# in distributed mode -xgb.sync_init(sys.argv) -rank = xgb.sync_get_rank() -# read in dataset -dtrain = xgb.DMatrix('train.col%d' % rank) -param = {'max_depth':3, 'eta':1, 'silent':1, 'objective':'binary:logistic' } -param['dsplit'] = 'col' -nround = 3 - -if rank == 0: - dtest = xgb.DMatrix('../../demo/data/agaricus.txt.test') - model = xgb.train(param, dtrain, nround, [(dtrain, 'train') , (dtest, 'test')]) -else: - # if it is a slave node, do not run evaluation - model = xgb.train(param, dtrain, nround) - -if rank == 0: - model.save_model('%04d.model' % nround) - # dump model with feature map - model.dump_model('dump.nice.%d.txt' % xgb.sync_get_world_size(),'../../demo/data/featmap.txt') -# shutdown the synchronization module -xgb.sync_finalize() diff --git a/multi-node/row-split/README.md b/multi-node/row-split/README.md index 46656644d..30e2528d3 100644 --- a/multi-node/row-split/README.md +++ b/multi-node/row-split/README.md @@ -1,10 +1,8 @@ Distributed XGBoost: Row Split Version ==== +* You might be interested to checkout the [Hadoop example](../hadoop) * Machine Rabit: run ```bash machine-row-rabit.sh ``` - machine-col-rabit.sh starts xgboost job using rabit -* Mushroom: run ```bash mushroom-row-mpi.sh ``` -* Machine: run ```bash machine-row-mpi.sh ``` - - Machine case also include example to continue training from existing model How to Use ==== diff --git a/multi-node/row-split/machine-row-map.sh b/multi-node/row-split/machine-row-map.sh deleted file mode 100755 index a1c5bfe0c..000000000 --- a/multi-node/row-split/machine-row-map.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -if [[ $# -ne 1 ]] -then - echo "Usage: nprocess" - exit -1 -fi - -rm -rf train-machine.row* *.model -k=$1 -# make machine data -cd ../../demo/regression/ -python mapfeat.py -python mknfold.py machine.txt 1 -cd - - -# split the lib svm file into k subfiles -python splitrows.py ../../demo/regression/machine.txt.train train-machine $k - -# run xgboost mpi, take data from stdin -../submit_job_tcp.py $k "bash map.sh train-machine.row ../../xgboost machine-row.conf dsplit=row num_round=3 data=stdin" diff --git a/multi-node/row-split/machine-row-mpi.sh b/multi-node/row-split/machine-row-mpi.sh deleted file mode 100755 index fdb1f1d6b..000000000 --- a/multi-node/row-split/machine-row-mpi.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash -if [[ $# -ne 1 ]] -then - echo "Usage: nprocess" - exit -1 -fi - -rm -rf train-machine.row* *.model -k=$1 -# make machine data -cd ../../demo/regression/ -python mapfeat.py -python mknfold.py machine.txt 1 -cd - - -# 
split the lib svm file into k subfiles -python splitrows.py ../../demo/regression/machine.txt.train train-machine $k - -# run xgboost mpi -mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row num_round=3 - -# run xgboost-mpi save model 0001, continue to run from existing model -mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row num_round=1 -mpirun -n $k ../../xgboost-mpi machine-row.conf dsplit=row num_round=2 model_in=0001.model diff --git a/multi-node/row-split/map.sh b/multi-node/row-split/map.sh deleted file mode 100644 index 624192121..000000000 --- a/multi-node/row-split/map.sh +++ /dev/null @@ -1,3 +0,0 @@ -# a simple script to simulate mapreduce mapper -echo "cat $1$OMPI_COMM_WORLD_RANK | ${@:2}" -cat $1$OMPI_COMM_WORLD_RANK | ${@:2} diff --git a/multi-node/row-split/mushroom-row-mpi.sh b/multi-node/row-split/mushroom-row-mpi.sh deleted file mode 100755 index eb65799b6..000000000 --- a/multi-node/row-split/mushroom-row-mpi.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -if [[ $# -ne 1 ]] -then - echo "Usage: nprocess" - exit -1 -fi - -rm -rf train.row* *.model -k=$1 - -# split the lib svm file into k subfiles -python splitrows.py ../../demo/data/agaricus.txt.train train $k - -# run xgboost mpi -mpirun -n $k ../../xgboost-mpi mushroom-row.conf dsplit=row nthread=1 - -# the model can be directly loaded by single machine xgboost solver, as usuall -../../xgboost mushroom-row.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt -cat dump.nice.$k.txt diff --git a/multi-node/row-split/mushroom-row.conf b/multi-node/row-split/mushroom-row.conf deleted file mode 100644 index 4cc2e8b11..000000000 --- a/multi-node/row-split/mushroom-row.conf +++ /dev/null @@ -1,35 +0,0 @@ -# General Parameters, see comment for each definition -# choose the booster, can be gbtree or gblinear -booster = gbtree -# choose logistic regression loss function for binary classification -objective = binary:logistic - -# Tree Booster Parameters -# step size shrinkage -eta = 1.0 -# minimum loss reduction required to make a further partition -gamma = 1.0 -# minimum sum of instance weight(hessian) needed in a child -min_child_weight = 1 -# maximum depth of a tree -max_depth = 3 - -# Task Parameters -# the number of round to do boosting -num_round = 2 -# 0 means do not save any model except the final round model -save_period = 0 -use_buffer = 0 - -# The path of training data %d is the wildcard for the rank of the data -# The idea is each process take a feature matrix with subset of columns -# -data = "train.row%d" - -# The path of validation data, used to monitor training process, here [test] sets name of the validation set -eval[test] = "../../demo/data/agaricus.txt.test" -# evaluate on training data as well each round -eval_train = 1 - -# The path of test data, need to use full data of test, try not use it, or keep an subsampled version -test:data = "../../demo/data/agaricus.txt.test" diff --git a/wrapper/xgboost.py b/wrapper/xgboost.py index d351928dc..cf442a61f 100644 --- a/wrapper/xgboost.py +++ b/wrapper/xgboost.py @@ -33,10 +33,6 @@ xglib.XGBoosterCreate.restype = ctypes.c_void_p xglib.XGBoosterPredict.restype = ctypes.POINTER(ctypes.c_float) xglib.XGBoosterEvalOneIter.restype = ctypes.c_char_p xglib.XGBoosterDumpModel.restype = ctypes.POINTER(ctypes.c_char_p) -# sync function -xglib.XGSyncGetRank.restype = ctypes.c_int -xglib.XGSyncGetWorldSize.restype = ctypes.c_int -# initialize communication module def ctypes2numpy(cptr, length, dtype): """convert a ctypes 
pointer array to numpy array """ @@ -557,17 +553,3 @@ def cv(params, dtrain, num_boost_round = 10, nfold=3, metrics=[], \ results.append(res) return results -# synchronization module -def sync_init(args = sys.argv): - arr = (ctypes.c_char_p * len(args))() - arr[:] = args - xglib.XGSyncInit(len(args), arr) - -def sync_finalize(): - xglib.XGSyncFinalize() - -def sync_get_rank(): - return int(xglib.XGSyncGetRank()) - -def sync_get_world_size(): - return int(xglib.XGSyncGetWorldSize()) diff --git a/wrapper/xgboost_wrapper.cpp b/wrapper/xgboost_wrapper.cpp index 700356ade..432ae0bf2 100644 --- a/wrapper/xgboost_wrapper.cpp +++ b/wrapper/xgboost_wrapper.cpp @@ -82,23 +82,6 @@ class Booster: public learner::BoostLearner { using namespace xgboost::wrapper; extern "C"{ - void XGSyncInit(int argc, char *argv[]) { - rabit::Init(argc, argv); - if (rabit::GetWorldSize() != 1) { - std::string pname = rabit::GetProcessorName(); - utils::Printf("distributed job start %s:%d\n", pname.c_str(), rabit::GetRank()); - } - } - void XGSyncFinalize(void) { - rabit::Finalize(); - } - int XGSyncGetRank(void) { - int rank = rabit::GetRank(); - return rank; - } - int XGSyncGetWorldSize(void) { - return rabit::GetWorldSize(); - } void* XGDMatrixCreateFromFile(const char *fname, int silent) { return LoadDataMatrix(fname, silent != 0, false); }
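
A quick usage sketch to go with the README changes above: the sequence below is a hypothetical walk-through of the retained rabit-based column-split demo. It assumes ```mushroom-col-rabit.sh``` and ```mushroom-col-rabit-mock.sh``` take the number of worker processes as their single argument, as the removed MPI scripts did; consult each demo's "How to Use" section for the exact invocation.

```bash
#!/bin/bash
# Hypothetical walk-through of the local column-split demo referenced in the
# updated READMEs. Assumes the rabit demo scripts accept the number of worker
# processes as their single argument, like the removed MPI scripts did.

# build xgboost with rabit allreduce support (run from the repository root)
./build.sh

# run the column-split demo with 3 rabit worker processes
cd multi-node/col-split
bash mushroom-col-rabit.sh 3

# same job, but with injected failures to exercise rabit's fault recovery
bash mushroom-col-rabit-mock.sh 3
```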