From 1b4921977f164209a1a0fea07668d365510c499e Mon Sep 17 00:00:00 2001 From: tqchen Date: Sat, 3 Jan 2015 05:20:18 -0800 Subject: [PATCH] update doc --- Makefile | 8 +- guide/README.md | 112 +++++++++++++++++--- include/rabit.h | 27 ++++- include/rabit_serializable.h | 2 +- lib/README | 1 - lib/README.md | 15 +++ test/config.h | 196 ----------------------------------- test/mock.h | 121 --------------------- tracker/rabit_demo.py | 4 +- tracker/rabit_hadoop.py | 4 +- tracker/rabit_mpi.py | 8 +- 11 files changed, 149 insertions(+), 349 deletions(-) delete mode 100644 lib/README create mode 100644 lib/README.md delete mode 100644 test/config.h delete mode 100644 test/mock.h diff --git a/Makefile b/Makefile index 64cba30fa..27c2b1915 100644 --- a/Makefile +++ b/Makefile @@ -4,15 +4,17 @@ export MPICXX = mpicxx export LDFLAGS= export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -Iinclude -BPATH=lib +# build path +BPATH=. # objectives that makes up rabit library MPIOBJ= $(BPATH)/engine_mpi.o OBJ= $(BPATH)/allreduce_base.o $(BPATH)/allreduce_robust.o $(BPATH)/engine.o $(BPATH)/engine_empty.o $(BPATH)/engine_mock.o ALIB= lib/librabit.a lib/librabit_mpi.a lib/librabit_empty.a lib/librabit_mock.a HEADERS=src/*.h include/*.h include/rabit/*.h -.PHONY: clean all +.PHONY: clean all install mpi -all: $(ALIB) +all: lib/librabit.a lib/librabit_mock.a +mpi: lib/librabit_mpi.a $(BPATH)/allreduce_base.o: src/allreduce_base.cc $(HEADERS) $(BPATH)/engine.o: src/engine.cc $(HEADERS) diff --git a/guide/README.md b/guide/README.md index c019440eb..2ea6dccc8 100644 --- a/guide/README.md +++ b/guide/README.md @@ -10,10 +10,12 @@ Please also refer to the [API Documentation](http://homes.cs.washington.edu/~tqc * [What is Allreduce](#what-is-allreduce) * [Common Use Case](#common-use-case) * [Structure of Rabit Program](#structure-of-rabit-program) -* [Fault Tolerance](#fault-tolerance) +* [Compile Programs with Rabit](#compile-programs-with-rabit) * [Running Rabit Jobs](#running-rabit-jobs) - [Running Rabit on Hadoop](#running-rabit-on-hadoop) - [Running Rabit using MPI](#running-rabit-using-mpi) + - [Customize Tracker Script](#customize-tracker-script) +* [Fault Tolerance](#fault-tolerance) What is Allreduce ===== @@ -137,7 +139,101 @@ to Allreduce or Broadcasts and update the model to a new one. The calling sequen * When a node goes down, the rest of the node will block in the call of Allreduce/Broadcast and helps the recovery of the failure nodes, util it catches up. -Please also see the next section for introduction of fault tolerance procedure in rabit. +Please also see the section of [fault tolerance procedure](#fault-tolerance) in rabit to understand the recovery procedure under going in rabit + +Compile Programs with Rabit +==== +Rabit is a portable library, to use it, you only need to include the rabit header file. +* You will need to add path to [../include](../include) to the header search path of compiler + - Solution 1: add ```-I/path/to/rabit/include``` to the compiler flag in gcc or clang + - Solution 2: add the path to enviroment variable CPLUS_INCLUDE_PATH +* You will need to add path to [../lib](../lib) to the library search path of compiler + - Solution 1: add ```-L/path/to/rabit/lib``` to the linker flag + - Solution 2: add the path to enviroment variable LIBRARY_PATH AND LD_LIBRARY_PATH +* Link against lib/rabit.a + - Add ```-lrabit``` to linker flag + +The procedure above allows you to compile a program with rabit. The following two sections are additional +advanced options you can take to link against different backend other than the normal one. + +#### Link against MPI Allreduce +You can link against ```rabit_mpi.a``` instead to use MPI Allreduce, however, the resulting program is backed by MPI and +is not fault tolerant anymore. +* Simply change linker flag from ```-lrabit``` to ```-lrabit_mpi``` +* The final linking needs to be done by mpi wrapper compiler ```mpicxx``` + +#### Link against Mock Test Rabit Library +If you want to mock test the program to see the behavior of the code when some nodes goes down. You can link against ```rabit_mock.a``` . +* Simply change linker flag from ```-lrabit``` to ```-lrabit_mock``` + +The resulting rabit program can take in additional arguments in format of +``` +mock=rank,version,seq,ndeath +``` + +The four integers specifies an event that will cause the program to suicide(exit with -2) +* rank specifies the rank of the node +* version specifies the current version(iteration) of the model +* seq specifies the sequence number of Allreduce/Broadcast call since last checkpoint +* ndeath specifies how many times this node died already + +For example, consider the following script in the test case +```bash +../tracker/rabit_demo.py -n 10 test_model_recover 10000\ + mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 +``` +* The first mock will cause node 0 to exit when calling second Allreduce/Broadcast (seq = 1) in iteration 0 +* The second mock will cause node 1 to exit when calling second Allreduce/Broadcast (seq = 1) in iteration 1 +* The second mock will cause node 0 to exit again when calling second Allreduce/Broadcast (seq = 1) in iteration 1 + - Note that ndeath = 1 means this will happen only if node 0 died once, which is our case + +Running Rabit Jobs +==== +Rabit is a portable library that can run on multiple platforms. + +#### Running Rabit Locally +* You can use [../tracker/rabit_demo.py](../tracker/rabit_demo.py) to start n process locally +* This script will restart the program when it exits with -2, so it can be used for [mock test](#link-against-mock-test-library) + +#### Running Rabit on Hadoop +* You can use [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) to run rabit program on hadoop +* This will start n rabit program as mapper of MapReduce +* Each program can read its part of data from stdin +* Yarn is highly recommended, since Yarn allows specifying ncpu and memory of each mapper + - This allows multi-threading programs in each node, which can be more efficient + - A good possible practice is OpenMP-rabit hybrid code + +#### Running Rabit on Yarn +* To Be modified from [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) + +#### Running Rabit using MPI +* You can submit rabit programs to MPI cluster using [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py). +* If you linked your code against librabit_mpi.a, then you can directly use mpirun to submit the job + +#### Customize Tracker Script +You can also modify the tracker script to allow rabit run on other platforms. To do so, refer to the existing +tracker script such as [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) and [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py) + +You will need to implement a platform dependent submission function with the following definition +```python +def fun_submit(nslave, slave_args): + """ + customized submit script, that submit nslave jobs, + each must contain args as parameter + note this can be a lambda closure + Parameters + nslave number of slave process to start up + worker_args tracker information which must be passed to the arguments + this usually includes the parameters of master_uri and port etc. + """ +``` +The submission function should start nslave process in the platform, and append slave_args to the end of other arguments. +Then we can simply call ```tracker.submit``` with fun_submit to submit jobs in the target platform + +Note that the current rabit tracker do not restart a worker when it dies, the job of fail-restart thus lies on the platform itself or we should write +fail-restart logic in the customization script. +* Fail-restart is usually provided by most platforms. +* For example, mapreduce will restart a mapper when it fails Fault Tolerance ===== @@ -166,15 +262,3 @@ touching the disk. This makes rabit program more reliable and efficient. This is an conceptual introduction to the fault tolerant model of rabit. The actual implementation is more sophiscated, and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase. -Running Rabit Jobs -==== -* To run demo locally, use [rabit_demo.py](../tracker/rabit_demo.py) -TODO - -Running Rabit on Hadoop -==== -TODO, use [rabit_hadoop.py](../tracker/rabit_hadoop.py) - -Running Rabit using MPI -==== -TODO, use [rabit_mpi.py](../tracker/rabit_mpi.py) or directly use mpirun if compiled with MPI backend. diff --git a/include/rabit.h b/include/rabit.h index 7940b8616..d3620d8d0 100644 --- a/include/rabit.h +++ b/include/rabit.h @@ -25,15 +25,29 @@ /*! \brief namespace of rabit */ namespace rabit { -/*! \brief namespace of operator */ +/*! + * \brief namespace of reduction operators + */ namespace op { -/*! \brief maximum value */ +/*! + * \class rabit::op::Max + * \brief maximum reduction operator + */ struct Max; -/*! \brief minimum value */ +/*! + * \class rabit::op::Min + * \brief minimum reduction operator + */ struct Min; -/*! \brief perform sum */ +/*! + * \class rabit::op::Sum + * \brief sum reduction operator + */ struct Sum; -/*! \brief perform bitwise OR */ +/*! + * \class rabit::op::BitOR + * \brief bitwise or reduction operator + */ struct BitOR; } // namespace op /*! @@ -75,6 +89,7 @@ inline void TrackerPrintf(const char *fmt, ...); #endif /*! * \brief broadcast an memory region to all others from root + * * Example: int a = 1; Broadcast(&a, sizeof(a), root); * \param sendrecv_data the pointer to send or recive buffer, * \param size the size of the data @@ -101,6 +116,7 @@ inline void Broadcast(std::string *sendrecv_data, int root); /*! * \brief perform in-place allreduce, on sendrecvbuf * this function is NOT thread-safe + * * Example Usage: the following code gives sum of the result * vector data(10); * ... @@ -125,6 +141,7 @@ inline void Allreduce(DType *sendrecvbuf, size_t count, /*! * \brief perform in-place allreduce, on sendrecvbuf * with a prepare function specified by lambda function + * * Example Usage: the following code gives sum of the result * vector data(10); * ... diff --git a/include/rabit_serializable.h b/include/rabit_serializable.h index bf90593c8..f6c1423c7 100644 --- a/include/rabit_serializable.h +++ b/include/rabit_serializable.h @@ -88,7 +88,7 @@ class IStream { } }; -/*! \brief interface of se*/ +/*! \brief interface of serializable objects */ class ISerializable { public: /*! \brief load the model from file */ diff --git a/lib/README b/lib/README deleted file mode 100644 index c734c76ab..000000000 --- a/lib/README +++ /dev/null @@ -1 +0,0 @@ -This folder holds the library file generated by the compiler \ No newline at end of file diff --git a/lib/README.md b/lib/README.md new file mode 100644 index 000000000..b6a5aa8b2 --- /dev/null +++ b/lib/README.md @@ -0,0 +1,15 @@ +Rabit Library +===== +This folder holds the library file generated by the compiler. To generate the library file, type ```make``` in the project root folder. If you want mpi compatible library, type ```make mpi``` + +***List of Files*** +* rabit.a The rabit package library + - Normally you need to link with this one +* rabit_mock.a The rabit package library with mock test + - This library allows additional mock-test +* rabit_mpi.a The MPI backed library + - Link against this library makes the program use MPI Allreduce + - This library is not fault-tolerant +* rabit_empty.a Dummy package implementation + - This is an empty library that does not provide anything + - Only introduced to minimize code dependency for projects that only need single machine code diff --git a/test/config.h b/test/config.h deleted file mode 100644 index 467e8f63e..000000000 --- a/test/config.h +++ /dev/null @@ -1,196 +0,0 @@ -#ifndef RABIT_UTILS_CONFIG_H_ -#define RABIT_UTILS_CONFIG_H_ -/*! - * \file config.h - * \brief helper class to load in configures from file - * \author Tianqi Chen - */ -#include -#include -#include -#include -#include -#include "./rabit/utils.h" - -namespace rabit { -namespace utils { -/*! - * \brief base implementation of config reader - */ -class ConfigReaderBase { - public: - /*! - * \brief get current name, called after Next returns true - * \return current parameter name - */ - inline const char *name(void) const { - return s_name; - } - /*! - * \brief get current value, called after Next returns true - * \return current parameter value - */ - inline const char *val(void) const { - return s_val; - } - /*! - * \brief move iterator to next position - * \return true if there is value in next position - */ - inline bool Next(void) { - while (!this->IsEnd()) { - GetNextToken(s_name); - if (s_name[0] == '=') return false; - if (GetNextToken( s_buf ) || s_buf[0] != '=') return false; - if (GetNextToken( s_val ) || s_val[0] == '=') return false; - return true; - } - return false; - } - // called before usage - inline void Init(void) { - ch_buf = this->GetChar(); - } - - protected: - /*! - * \brief to be implemented by subclass, - * get next token, return EOF if end of file - */ - virtual char GetChar(void) = 0; - /*! \brief to be implemented by child, check if end of stream */ - virtual bool IsEnd(void) = 0; - - private: - char ch_buf; - char s_name[100000], s_val[100000], s_buf[100000]; - - inline void SkipLine(void) { - do { - ch_buf = this->GetChar(); - } while (ch_buf != EOF && ch_buf != '\n' && ch_buf != '\r'); - } - - inline void ParseStr(char tok[]) { - int i = 0; - while ((ch_buf = this->GetChar()) != EOF) { - switch (ch_buf) { - case '\\': tok[i++] = this->GetChar(); break; - case '\"': tok[i++] = '\0'; return; - case '\r': - case '\n': Error("ConfigReader: unterminated string"); - default: tok[i++] = ch_buf; - } - } - Error("ConfigReader: unterminated string"); - } - inline void ParseStrML(char tok[]) { - int i = 0; - while ((ch_buf = this->GetChar()) != EOF) { - switch (ch_buf) { - case '\\': tok[i++] = this->GetChar(); break; - case '\'': tok[i++] = '\0'; return; - default: tok[i++] = ch_buf; - } - } - Error("unterminated string"); - } - // return newline - inline bool GetNextToken(char tok[]) { - int i = 0; - bool new_line = false; - while (ch_buf != EOF) { - switch (ch_buf) { - case '#' : SkipLine(); new_line = true; break; - case '\"': - if (i == 0) { - ParseStr(tok); ch_buf = this->GetChar(); return new_line; - } else { - Error("ConfigReader: token followed directly by string"); - } - case '\'': - if (i == 0) { - ParseStrML( tok ); ch_buf = this->GetChar(); return new_line; - } else { - Error("ConfigReader: token followed directly by string"); - } - case '=': - if (i == 0) { - ch_buf = this->GetChar(); - tok[0] = '='; - tok[1] = '\0'; - } else { - tok[i] = '\0'; - } - return new_line; - case '\r': - case '\n': - if (i == 0) new_line = true; - case '\t': - case ' ' : - ch_buf = this->GetChar(); - if (i > 0) { - tok[i] = '\0'; - return new_line; - } - break; - default: - tok[i++] = ch_buf; - ch_buf = this->GetChar(); - break; - } - } - return true; - } -}; -/*! - * \brief an iterator use stream base, allows use all types of istream - */ -class ConfigStreamReader: public ConfigReaderBase { - public: - /*! - * \brief constructor - * \param istream input stream - */ - explicit ConfigStreamReader(std::istream &fin) : fin(fin) {} - - protected: - virtual char GetChar(void) { - return fin.get(); - } - /*! \brief to be implemented by child, check if end of stream */ - virtual bool IsEnd(void) { - return fin.eof(); - } - - private: - std::istream &fin; -}; - -/*! - * \brief an iterator that iterates over a configure file and gets the configures - */ -class ConfigIterator: public ConfigStreamReader { - public: - /*! - * \brief constructor - * \param fname name of configure file - */ - explicit ConfigIterator(const char *fname) : ConfigStreamReader(fi) { - fi.open(fname); - if (fi.fail()) { - utils::Error("cannot open file %s", fname); - } - ConfigReaderBase::Init(); - } - /*! \brief destructor */ - ~ConfigIterator(void) { - fi.close(); - } - - private: - std::ifstream fi; -}; -} // namespace utils -} // namespace rabit -#endif // RABIT_UTILS_CONFIG_H_ diff --git a/test/mock.h b/test/mock.h deleted file mode 100644 index 17e5b75c9..000000000 --- a/test/mock.h +++ /dev/null @@ -1,121 +0,0 @@ -#ifndef RABIT_MOCK_H -#define RABIT_MOCK_H -/*! - * \file mock.h - * \brief This file defines a mock object to test the system - * \author Ignacio Cano - */ -#include "./rabit.h" -#include "./config.h" -#include -#include -#include - -struct MockException { -}; - -namespace rabit { -/*! \brief namespace of mock */ -namespace test { - -class Mock { - - -public: - - explicit Mock(const int& rank, char *config, char* round_dir) : rank(rank) { - Init(config, round_dir); - } - - template - inline void Allreduce(float *sendrecvbuf, size_t count) { - utils::Assert(verify(allReduce), "[%d] error when calling allReduce", rank); - rabit::Allreduce(sendrecvbuf, count); - } - -inline int LoadCheckPoint(ISerializable *global_model, - ISerializable *local_model) { - utils::Assert(verify(loadCheckpoint), "[%d] error when loading checkpoint", rank); - return rabit::LoadCheckPoint(global_model, local_model); - } - - inline void CheckPoint(const ISerializable *global_model, - const ISerializable *local_model) { - utils::Assert(verify(checkpoint), "[%d] error when checkpointing", rank); - rabit::CheckPoint(global_model, local_model); - } - - inline void Broadcast(std::string *sendrecv_data, int root) { - utils::Assert(verify(broadcast), "[%d] error when broadcasting", rank); - rabit::Broadcast(sendrecv_data, root); - - } - -private: - - inline void Init(char* config, char* round_dir) { - std::stringstream ss; - ss << round_dir << "node" << rank << ".round"; - const char* round_file = ss.str().c_str(); - std::ifstream ifs(round_file); - int current_round = 1; - if (!ifs.good()) { - // file does not exists, it's the first time, so save the current round to 1 - std::ofstream ofs(round_file); - ofs << current_round; - ofs.close(); - } else { - // file does exists, read the previous round, increment by one, and save it back - ifs >> current_round; - current_round++; - ifs.close(); - std::ofstream ofs(round_file); - ofs << current_round; - ofs.close(); - } - printf("[%d] in round %d\n", rank, current_round); - utils::ConfigIterator itr(config); - while (itr.Next()) { - char round[4], node_rank[4]; - sscanf(itr.name(), "%[^_]_%s", round, node_rank); - int i_node_rank = atoi(node_rank); - // if it's something for me - if (i_node_rank == rank) { - int i_round = atoi(round); - // in my current round - if (i_round == current_round) { - printf("[%d] round %d, value %s\n", rank, i_round, itr.val()); - if (strcmp("allreduce", itr.val())) record(allReduce); - else if (strcmp("broadcast", itr.val())) record(broadcast); - else if (strcmp("loadcheckpoint", itr.val())) record(loadCheckpoint); - else if (strcmp("checkpoint", itr.val())) record(checkpoint); - } - } - } - } - - inline void record(std::map& m) { - m[rank] = false; - } - - inline bool verify(std::map& m) { - bool result = true; - if (m.find(rank) != m.end()) { - result = m[rank]; - } - return result; - } - - int rank; - std::map allReduce; - std::map broadcast; - std::map loadCheckpoint; - std::map checkpoint; - - -}; - -} // namespace test -} // namespace rabit - -#endif // RABIT_MOCK_H diff --git a/tracker/rabit_demo.py b/tracker/rabit_demo.py index aeeb6e9a3..ba14554ab 100755 --- a/tracker/rabit_demo.py +++ b/tracker/rabit_demo.py @@ -37,7 +37,7 @@ def exec_cmd(cmd, taskid): # Note: this submit script is only used for demo purpose # submission script using pyhton multi-threading # -def mthread_submit(nslave, slave_args): +def mthread_submit(nslave, worker_args): """ customized submit script, that submit nslave jobs, each must contain args as parameter note this can be a lambda function containing additional parameters in input @@ -48,7 +48,7 @@ def mthread_submit(nslave, slave_args): """ procs = {} for i in range(nslave): - procs[i] = Thread(target = exec_cmd, args = (args.command + slave_args, i)) + procs[i] = Thread(target = exec_cmd, args = (args.command + worker_args, i)) procs[i].start() for i in range(nslave): procs[i].join() diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py index d25fae7c0..59866d55a 100755 --- a/tracker/rabit_hadoop.py +++ b/tracker/rabit_hadoop.py @@ -65,11 +65,11 @@ args = parser.parse_args() if args.jobname is None: args.jobname = ('Rabit(nworker=%d):' % args.nworker) + args.command[0].split('/')[-1]; -def hadoop_streaming(nworker, slave_args): +def hadoop_streaming(nworker, worker_args): cmd = '%s jar %s -D mapred.map.tasks=%d' % (args.hadoop_binary, args.hadoop_streaming_jar, nworker) cmd += ' -D mapred.job.name=%d' % (a) cmd += ' -input %s -output %s' % (args.input, args.output) - cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + slave_args)) + cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + worker_args)) fset = set() if args.auto_file_cache: for f in args.command: diff --git a/tracker/rabit_mpi.py b/tracker/rabit_mpi.py index 604ed3bf7..599a9a7c5 100755 --- a/tracker/rabit_mpi.py +++ b/tracker/rabit_mpi.py @@ -22,7 +22,7 @@ args = parser.parse_args() # # submission script using MPI # -def mpi_submit(nslave, slave_args): +def mpi_submit(nslave, worker_args): """ customized submit script, that submit nslave jobs, each must contain args as parameter note this can be a lambda function containing additional parameters in input @@ -31,11 +31,11 @@ def mpi_submit(nslave, slave_args): args arguments to launch each job this usually includes the parameters of master_uri and parameters passed into submit """ - sargs = ' '.join(args.command + slave_args) + sargs = ' '.join(args.command + worker_args) if args.hostfile is None: - cmd = ' '.join(['mpirun -n %d' % (nslave)] + args.command + slave_args) + cmd = ' '.join(['mpirun -n %d' % (nslave)] + args.command + worker_args) else: - ' '.join(['mpirun -n %d --hostfile %s' % (nslave, args.hostfile)] + args.command + slave_args) + ' '.join(['mpirun -n %d --hostfile %s' % (nslave, args.hostfile)] + args.command + worker_args) print cmd subprocess.check_call(cmd, shell = True)