From 5f08313cb232a1a4c85b0d465cb28d1aceca473e Mon Sep 17 00:00:00 2001 From: tqchen Date: Sun, 23 Nov 2014 14:03:59 -0800 Subject: [PATCH] make wrapper ok --- Makefile | 4 +-- demo/binary_classification/runexp.sh | 8 +++--- demo/guide-python/runall.sh | 4 +-- multi-node/README.md | 23 +++++++++------- multi-node/col-split/README.md | 7 ++++- multi-node/col-split/mushroom-col-python.sh | 22 ++++++++++++++++ multi-node/col-split/mushroom-col.py | 29 +++++++++++++++++++++ multi-node/submit_job_tcp.py | 4 +++ src/io/io.cpp | 5 ++++ src/io/simple_dmatrix-inl.hpp | 11 ++++++-- src/sync/sync_tcp.cpp | 4 ++- test/Makefile | 2 +- wrapper/xgboost.py | 20 +++++++++++++- wrapper/xgboost_wrapper.cpp | 17 ++++++++++++ wrapper/xgboost_wrapper.h | 24 ++++++++++++++++- 15 files changed, 160 insertions(+), 24 deletions(-) create mode 100755 multi-node/col-split/mushroom-col-python.sh create mode 100644 multi-node/col-split/mushroom-col.py diff --git a/Makefile b/Makefile index 172a7607b..f11c20e21 100644 --- a/Makefile +++ b/Makefile @@ -32,8 +32,8 @@ sync_tcp.o: src/sync/sync_tcp.cpp sync_empty.o: src/sync/sync_empty.cpp main.o: src/xgboost_main.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h xgboost-mpi: updater.o gbm.o io.o main.o sync_mpi.o -xgboost: updater.o gbm.o io.o main.o sync_empty.o -wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h updater.o gbm.o io.o sync_empty.o +xgboost: updater.o gbm.o io.o main.o sync_tcp.o +wrapper/libxgboostwrapper.so: wrapper/xgboost_wrapper.cpp src/utils/*.h src/*.h src/learner/*.hpp src/learner/*.h updater.o gbm.o io.o sync_tcp.o $(BIN) : $(CXX) $(CFLAGS) $(LDFLAGS) -o $@ $(filter %.cpp %.o %.c, $^) diff --git a/demo/binary_classification/runexp.sh b/demo/binary_classification/runexp.sh index c1f191e61..68c3e6fb9 100755 --- a/demo/binary_classification/runexp.sh +++ b/demo/binary_classification/runexp.sh @@ -4,12 +4,12 @@ python mapfeat.py # split train and test python mknfold.py agaricus.txt 1 # training and output the models -mpirun ../../xgboost mushroom.conf +../../xgboost mushroom.conf # output prediction task=pred -mpirun ../../xgboost mushroom.conf task=pred model_in=0002.model +../../xgboost mushroom.conf task=pred model_in=0002.model # print the boosters of 00002.model in dump.raw.txt -mpirun ../../xgboost mushroom.conf task=dump model_in=0002.model name_dump=dump.raw.txt +../../xgboost mushroom.conf task=dump model_in=0002.model name_dump=dump.raw.txt # use the feature map in printing for better visualization -mpirun ../../xgboost mushroom.conf task=dump model_in=0002.model fmap=featmap.txt name_dump=dump.nice.txt +../../xgboost mushroom.conf task=dump model_in=0002.model fmap=featmap.txt name_dump=dump.nice.txt cat dump.nice.txt diff --git a/demo/guide-python/runall.sh b/demo/guide-python/runall.sh index 5317186d5..8f4f9832a 100755 --- a/demo/guide-python/runall.sh +++ b/demo/guide-python/runall.sh @@ -4,5 +4,5 @@ python custom_objective.py python boost_from_prediction.py python generalized_linear_model.py python cross_validation.py -python predict_leaf_index.py -rm -rf *~ *.model *.buffer \ No newline at end of file +python predict_leaf_indices.py +rm -rf *~ *.model *.buffer diff --git a/multi-node/README.md b/multi-node/README.md index 6f1008514..d1e641848 100644 --- a/multi-node/README.md +++ b/multi-node/README.md @@ -4,17 +4,21 @@ This folder contains information about experimental version of distributed xgboo Build ===== -* You will need to have MPI * In the root folder, run 
```make mpi```, this will give you xgboost-mpi
+  - You will need to have MPI to build xgboost-mpi
+* Alternatively, you can run ```make```, this will give you xgboost, which uses a built-in allreduce (beta)
+  - You do not need MPI to build this; you can modify [submit_job_tcp.py](submit_job_tcp.py) to submit the job with any job scheduler you like
 
 Design Choice
 =====
-* Does distributed xgboost reply on MPI?
-  - Yes, but the dependency is isolated in [sync module](../src/sync/sync.h)
+* Must distributed xgboost rely on an MPI library?
+  - No, xgboost only relies on an MPI-like protocol that provides Broadcast and AllReduce
+  - The dependency is isolated in the [sync module](../src/sync/sync.h)
   - All other parts of code uses interface defined in sync.h
-  - sync_mpi.cpp is a implementation of sync interface using standard MPI library
-  - Specificially, xgboost reply on MPI protocol that provide Broadcast and AllReduce,
-    if there are platform/framework that implements these protocol, xgboost should naturally extends to these platform
+  - [sync_mpi.cpp](../src/sync/sync_mpi.cpp) is an implementation of the sync interface using a standard MPI library; to use xgboost-mpi, you need an MPI library
+  - If a platform/framework implements this protocol, xgboost naturally extends to that platform
+  - As an example, [sync_tcp.cpp](../src/sync/sync_tcp.cpp) is an implementation of the interface using TCP, and is linked into xgboost by default
+
 * How is the data distributed?
   - There are two solvers in distributed xgboost
     - Column-based solver split data by column, each node work on subset of columns,
@@ -26,10 +30,11 @@ Design Choice
 
 Usage
 ====
-* The current code run in MPI enviroment, you will need to have a network filesystem,
-  or copy data to local file system before running the code
+* You will need a network filesystem, or copy the data to the local file system before running the code
+* xgboost-mpi runs in an MPI environment
+* xgboost can be used together with [submit_job_tcp.py](submit_job_tcp.py) on other types of job schedulers
 * ***Note*** The distributed version is still multi-threading optimized.
-  You should run one xgboost-mpi per node that takes most available CPU,
+  You should run one process per node that takes most of the available CPUs,
   this will reduce the communication overhead and improve the performance.
-  One way to do that is limit mpi slot in each machine to be 1, or reserve nthread processors for each process.
 * Examples:
diff --git a/multi-node/col-split/README.md b/multi-node/col-split/README.md
index bdafb2e32..9a9b8dd24 100644
--- a/multi-node/col-split/README.md
+++ b/multi-node/col-split/README.md
@@ -1,6 +1,11 @@
 Distributed XGBoost: Column Split Version
 ====
-* run ```bash mushroom-row.sh ```
+* run ```bash mushroom-col.sh ```
+* run ```bash mushroom-col-tcp.sh ```
+  - mushroom-col-tcp.sh starts the xgboost job using xgboost's built-in allreduce
+* run ```bash mushroom-col-python.sh ```
+  - mushroom-col-python.sh starts the xgboost python job using xgboost's built-in allreduce
+  - see mushroom-col.py
 
 How to Use
 ====
diff --git a/multi-node/col-split/mushroom-col-python.sh b/multi-node/col-split/mushroom-col-python.sh
new file mode 100755
index 000000000..45008a1b4
--- /dev/null
+++ b/multi-node/col-split/mushroom-col-python.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+if [[ $# -ne 1 ]]
+then
+    echo "Usage: nprocess"
+    exit -1
+fi
+
+#
+# This script is the same as mushroom-col.sh, except that it uses the xgboost python module
+#
+# xgboost uses the built-in tcp-based allreduce module, so it can run in more environments, as long as we know how to start the job by modifying ../submit_job_tcp.py
+#
+rm -rf train.col* *.model
+k=$1
+
+# split the lib svm file into k subfiles
+python splitsvm.py ../../demo/data/agaricus.txt.train train $k
+
+# run the distributed xgboost job
+../submit_job_tcp.py $k python mushroom-col.py
+
+cat dump.nice.$k.txt
diff --git a/multi-node/col-split/mushroom-col.py b/multi-node/col-split/mushroom-col.py
new file mode 100644
index 000000000..3e24f5f2c
--- /dev/null
+++ b/multi-node/col-split/mushroom-col.py
@@ -0,0 +1,29 @@
+import os
+import sys
+sys.path.append(os.path.dirname(__file__)+'/../../wrapper')
+import xgboost as xgb
+# example script of running distributed xgboost using the python module
+
+# call this additional function to initialize the xgboost sync module
+# in distributed mode
+xgb.sync_init(sys.argv)
+rank = xgb.sync_get_rank()
+# read in dataset
+dtrain = xgb.DMatrix('train.col%d' % rank)
+param = {'max_depth':3, 'eta':1, 'silent':1, 'objective':'binary:logistic' }
+param['dsplit'] = 'col'
+nround = 3
+
+if rank == 0:
+    dtest = xgb.DMatrix('../../demo/data/agaricus.txt.test')
+    model = xgb.train(param, dtrain, nround, [(dtrain, 'train'), (dtest, 'test')])
+else:
+    # if it is a slave node, do not run evaluation
+    model = xgb.train(param, dtrain, nround)
+
+if rank == 0:
+    model.save_model('%04d.model' % nround)
+    # dump model with feature map
+    model.dump_model('dump.nice.%d.txt' % xgb.sync_get_world_size(), '../../demo/data/featmap.txt')
+# shutdown the synchronization module
+xgb.sync_finalize()
diff --git a/multi-node/submit_job_tcp.py b/multi-node/submit_job_tcp.py
index 069f5d577..aa415d07a 100755
--- a/multi-node/submit_job_tcp.py
+++ b/multi-node/submit_job_tcp.py
@@ -11,6 +11,10 @@ import subprocess
 sys.path.append(os.path.dirname(__file__)+'/../src/sync/')
 import tcp_master as master
+#
+# Note: this submit script is only an example
+# It does not have to use mpirun; it can be any job submission mechanism that starts the job, e.g. qsub, hadoop streaming, etc.
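+# For example, a customized submit function (used in place of mpi_submit below) could simply
+# fork local worker processes -- a rough illustrative sketch only; 'local_submit' is a hypothetical name:
+#
+#   def local_submit(nslave, args):
+#       procs = [subprocess.Popen(args) for i in range(nslave)]
+#       for p in procs:
+#           p.wait()
+#
+# Usage example (see mushroom-col-python.sh): ../submit_job_tcp.py <nprocess> python mushroom-col.py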
+# def mpi_submit(nslave, args): """ customized submit script, that submit nslave jobs, each must contain args as parameter diff --git a/src/io/io.cpp b/src/io/io.cpp index 8a4579ab8..0f9611e67 100644 --- a/src/io/io.cpp +++ b/src/io/io.cpp @@ -13,6 +13,11 @@ namespace xgboost { namespace io { DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) { + if (!strcmp(fname, "stdin")) { + DMatrixSimple *dmat = new DMatrixSimple(); + dmat->LoadText(fname, silent); + return dmat; + } std::string tmp_fname; const char *fname_ext = NULL; if (strchr(fname, ';') != NULL) { diff --git a/src/io/simple_dmatrix-inl.hpp b/src/io/simple_dmatrix-inl.hpp index 9165a5832..f3cf6425e 100644 --- a/src/io/simple_dmatrix-inl.hpp +++ b/src/io/simple_dmatrix-inl.hpp @@ -84,7 +84,12 @@ class DMatrixSimple : public DataMatrix { inline void LoadText(const char* fname, bool silent = false) { using namespace std; this->Clear(); - FILE* file = utils::FopenCheck(fname, "r"); + FILE* file; + if (!strcmp(fname, "stdin")) { + file = stdin; + } else { + file = utils::FopenCheck(fname, "r"); + } float label; bool init = true; char tmp[1024]; std::vector feats; @@ -112,7 +117,9 @@ class DMatrixSimple : public DataMatrix { static_cast(info.num_col()), static_cast(row_data_.size()), fname); } - fclose(file); + if (file != stdin) { + fclose(file); + } // try to load in additional file std::string name = fname; std::string gname = name + ".group"; diff --git a/src/sync/sync_tcp.cpp b/src/sync/sync_tcp.cpp index b21451d1c..330b5318d 100644 --- a/src/sync/sync_tcp.cpp +++ b/src/sync/sync_tcp.cpp @@ -352,7 +352,7 @@ class SyncManager { buffer_.resize(std::min(reduce_buffer_size, n)); // make sure align to type_nbytes buffer_size = buffer_.size() * sizeof(uint64_t) / type_nbytes * type_nbytes; - utils::Assert(type_nbytes < buffer_size, "too large type_nbytes=%lu, buffer_size", type_nbytes, buffer_size); + utils::Assert(type_nbytes <= buffer_size, "too large type_nbytes=%lu, buffer_size=%lu", type_nbytes, buffer_size); // set buffer head buffer_head = reinterpret_cast(BeginPtr(buffer_)); } @@ -487,6 +487,8 @@ void AllReduce(uint32_t *sendrecvbuf, int count, ReduceOp op) { typedef uint32_t DType; switch(op) { case kBitwiseOR: manager.AllReduce(sendrecvbuf, sizeof(DType), count, ReduceBitOR); return; + case kSum: manager.AllReduce(sendrecvbuf, sizeof(DType), count, ReduceSum); return; + case kMax: manager.AllReduce(sendrecvbuf, sizeof(DType), count, ReduceMax); return; default: utils::Error("reduce op not supported"); } } diff --git a/test/Makefile b/test/Makefile index 571d1189f..a702d073f 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,5 +1,5 @@ export CC = gcc -export CXX = clang++ +export CXX = g++ export MPICXX = mpicxx export LDFLAGS= -pthread -lm export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../src diff --git a/wrapper/xgboost.py b/wrapper/xgboost.py index 08aacb90e..d351928dc 100644 --- a/wrapper/xgboost.py +++ b/wrapper/xgboost.py @@ -33,7 +33,10 @@ xglib.XGBoosterCreate.restype = ctypes.c_void_p xglib.XGBoosterPredict.restype = ctypes.POINTER(ctypes.c_float) xglib.XGBoosterEvalOneIter.restype = ctypes.c_char_p xglib.XGBoosterDumpModel.restype = ctypes.POINTER(ctypes.c_char_p) - +# sync function +xglib.XGSyncGetRank.restype = ctypes.c_int +xglib.XGSyncGetWorldSize.restype = ctypes.c_int +# initialize communication module def ctypes2numpy(cptr, length, dtype): """convert a ctypes pointer array to numpy array """ @@ -553,3 +556,18 @@ def cv(params, dtrain, num_boost_round = 10, nfold=3, 
metrics=[], \
         sys.stderr.write(res+'\n')
         results.append(res)
     return results
+
+# synchronization module
+def sync_init(args = sys.argv):
+    arr = (ctypes.c_char_p * len(args))()
+    arr[:] = args
+    xglib.XGSyncInit(len(args), arr)
+
+def sync_finalize():
+    xglib.XGSyncFinalize()
+
+def sync_get_rank():
+    return int(xglib.XGSyncGetRank())
+
+def sync_get_world_size():
+    return int(xglib.XGSyncGetWorldSize())
diff --git a/wrapper/xgboost_wrapper.cpp b/wrapper/xgboost_wrapper.cpp
index d0efc4bd0..63fb310c6 100644
--- a/wrapper/xgboost_wrapper.cpp
+++ b/wrapper/xgboost_wrapper.cpp
@@ -80,6 +80,23 @@ class Booster: public learner::BoostLearner {
 using namespace xgboost::wrapper;
 
 extern "C"{
+  void XGSyncInit(int argc, char *argv[]) {
+    sync::Init(argc, argv);
+    if (sync::IsDistributed()) {
+      std::string pname = xgboost::sync::GetProcessorName();
+      utils::Printf("distributed job start %s:%d\n", pname.c_str(), xgboost::sync::GetRank());
+    }
+  }
+  void XGSyncFinalize(void) {
+    sync::Finalize();
+  }
+  int XGSyncGetRank(void) {
+    int rank = xgboost::sync::GetRank();
+    return rank;
+  }
+  int XGSyncGetWorldSize(void) {
+    return sync::GetWorldSize();
+  }
   void* XGDMatrixCreateFromFile(const char *fname, int silent) {
     return LoadDataMatrix(fname, silent != 0, false);
   }
diff --git a/wrapper/xgboost_wrapper.h b/wrapper/xgboost_wrapper.h
index 16d54f62b..c0379a35f 100644
--- a/wrapper/xgboost_wrapper.h
+++ b/wrapper/xgboost_wrapper.h
@@ -17,6 +17,28 @@ typedef unsigned long bst_ulong;
 #ifdef __cplusplus
 extern "C" {
 #endif
+  /*!
+   * \brief initialize the sync module; this is needed if xgboost is used in distributed mode
+   *   normally, argv needs to contain master_uri and master_port
+   *   if the job is started by the submit_job_tcp script, simply passing the command line arguments through will do
+   * \param argc number of arguments
+   * \param argv the arguments to be passed to the sync module
+   */
+  XGB_DLL void XGSyncInit(int argc, char *argv[]);
+  /*!
+   * \brief finalize the sync module, call this when everything is done
+   */
+  XGB_DLL void XGSyncFinalize(void);
+  /*!
+   * \brief get the rank of the current process
+   * \return the rank of the current process in the sync group
+   */
+  XGB_DLL int XGSyncGetRank(void);
+  /*!
+   * \brief get the world size from the sync module
+   * \return the number of distributed jobs running in the group
+   */
+  XGB_DLL int XGSyncGetWorldSize(void);
   /*!
    * \brief load a data matrix
    * \return a loaded data matrix
@@ -41,7 +63,7 @@ extern "C" {
    * \param col_ptr pointer to col headers
    * \param indices findex
    * \param data fvalue
-   * \param nindptr number of rows in the matix + 1
+   * \param nindptr number of rows in the matrix + 1
    * \param nelem number of nonzero elements in the matrix
    * \return created dmatrix
    */
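Below is a minimal, illustrative sketch (not part of this patch) of how a C program linked against wrapper/libxgboostwrapper.so might call the new XGSync entry points declared above. The build/link step and the launcher are assumptions; in practice the process is expected to be started by something like multi-node/submit_job_tcp.py, which supplies the master_uri/master_port arguments that XGSyncInit parses.

```c
/* Hypothetical standalone example; it exercises only the sync API shown in xgboost_wrapper.h. */
#include <stdio.h>
#include "xgboost_wrapper.h"

int main(int argc, char *argv[]) {
  /* hand the launcher-provided arguments (master_uri, master_port, ...) to the sync module */
  XGSyncInit(argc, argv);
  int rank = XGSyncGetRank();        /* rank of this worker within the job group */
  int size = XGSyncGetWorldSize();   /* total number of workers in the group */
  printf("worker %d of %d started\n", rank, size);
  /* ... load the per-rank data slice and train, as mushroom-col.py does through the python wrapper ... */
  XGSyncFinalize();                  /* shut down the sync module before exiting */
  return 0;
}
```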