diff --git a/.gitignore b/.gitignore index bf6d3e539..504802743 100644 --- a/.gitignore +++ b/.gitignore @@ -32,5 +32,5 @@ *.exe *.txt *tmp* -doc *.rabit +*.mock diff --git a/doc/.gitignore b/doc/.gitignore new file mode 100644 index 000000000..9036e38b3 --- /dev/null +++ b/doc/.gitignore @@ -0,0 +1,3 @@ +html +latex +*.sh diff --git a/doc/README.md b/doc/README.md new file mode 100644 index 000000000..f4891a456 --- /dev/null +++ b/doc/README.md @@ -0,0 +1,9 @@ +Rabit Documentation +==== +* [Tutorial](../guide) +* [API Documentation](http://homes.cs.washington.edu/~tqchen/rabit/doc) + - You can also run ```./mkdoc.sh``` to make the document locally +* [Parameters](#parameters) + +Parameters +==== diff --git a/guide/Makefile b/guide/Makefile index 1770a90dd..7213e1bf7 100644 --- a/guide/Makefile +++ b/guide/Makefile @@ -6,22 +6,21 @@ export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../include .PHONY: clean all lib libmpi BIN = basic.rabit broadcast.rabit -MOCKBIN= +MOCKBIN= lazy_allreduce.mock -all: $(BIN) $(MOCKBIN) +all: $(BIN) basic.rabit: basic.cc lib broadcast.rabit: broadcast.cc lib +lazy_allreduce.mock: lazy_allreduce.cc lib $(BIN) : $(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc, $^) $(LDFLAGS) -lrabit + $(MOCKBIN) : - $(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc, $^) $(LDFLAGS) -lrabit_mock + $(CXX) $(CFLAGS) -std=c++11 -o $@ $(filter %.cpp %.o %.c %.cc, $^) $(LDFLAGS) -lrabit_mock $(OBJ) : $(CXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c %.cc, $^) ) -$(MPIBIN) : - $(MPICXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc %.a, $^) $(LDFLAGS) -lrabit_mpi - clean: - $(RM) $(OBJ) $(BIN) $(MPIBIN) *~ ../src/*~ \ No newline at end of file + $(RM) $(OBJ) $(BIN) $(MOCKBIN) *~ ../src/*~ \ No newline at end of file diff --git a/guide/README.md b/guide/README.md index e19b04ce3..f900f5a23 100644 --- a/guide/README.md +++ b/guide/README.md @@ -5,11 +5,13 @@ To run the examples locally, you will need to build them with ```make```. 
Please also refer to the [API Documentation](http://homes.cs.washington.edu/~tqchen/rabit/doc) for further details. - **List of Topics** * [What is Allreduce](#what-is-allreduce) * [Common Use Case](#common-use-case) -* [Structure of a Rabit Program](#structure-of-rabit-program) +* [Use Rabit API](#use-rabit-api) + - [Structure of a Rabit Program](#structure-of-rabit-program) + - [Allreduce and Lazy Preparation](#allreduce-and-lazy-preparation) + - [Checkpoint and LazyCheckpoint](#checkpoint-and-lazycheckpoint) * [Compile Programs with Rabit](#compile-programs-with-rabit) * [Running Rabit Jobs](#running-rabit-jobs) - [Running Rabit on Hadoop](#running-rabit-on-hadoop) @@ -35,8 +37,12 @@ int main(int argc, char *argv[]) { rabit::GetRank(), a[0], a[1], a[2]); // allreduce take max of each elements in all processes Allreduce(&a[0], N); - printf("@node[%d] after-allreduce: a={%d, %d, %d}\n", + printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n", rabit::GetRank(), a[0], a[1], a[2]); + // second allreduce that sums everything up + Allreduce(&a[0], N); + printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); rabit::Finalize(); return 0; } @@ -62,6 +68,7 @@ If you are more familiar with python, you can also use rabit in python. The same ```python import numpy as np import rabit + rabit.init() n = 3 rank = rabit.get_rank() @@ -71,9 +78,15 @@ for i in xrange(n): print '@node[%d] before-allreduce: a=%s' % (rank, str(a)) a = rabit.allreduce(a, rabit.MAX) -print '@node[%d] after-allreduce: a=%s' % (rank, str(a)) +print '@node[%d] after-allreduce-max: a=%s' % (rank, str(a)) +a = rabit.allreduce(a, rabit.SUM) +print '@node[%d] after-allreduce-sum: a=%s' % (rank, str(a)) rabit.finalize() ``` +You can run the program using the following command +```bash +../tracker/rabit_demo.py -n 2 basic.py +``` Broadcast is another method provided by rabit besides Allreduce. 
This function allows one node to broadcast its local data to all other nodes. The following code in [broadcast.cc](broadcast.cc) broadcasts a string from @@ -130,10 +143,15 @@ common use cases include: Rabit is a reliable and portable library for distributed machine learning programs, that allows programs to run reliably on different platforms. -Structure of a Rabit Program -===== +Use Rabit API +==== +This section introduces topics about how to use rabit API. +You can always refer to [API Documentation](http://homes.cs.washington.edu/~tqchen/rabit/doc) for the definition of each function. +This section tries to give examples of different aspects of rabit API. + +#### Structure of a Rabit Program The following code illustrates the common structure of a rabit program. This is an abstract example, -you can also refer to [kmeans.cc](../toolkit/kmeans.cc) for an example implementation of kmeans algorithm. +you can also refer to [kmeans.cc](../rabit-learn/kmeans/kmeans.cc) for an example implementation of kmeans algorithm. ```c++ #include @@ -173,6 +191,97 @@ to Allreduce or Broadcast and update the model. The calling sequence in each ite Please see the [Fault Tolerance](#fault-tolerance) section to understand the recovery procedure executed by rabit. +#### Allreduce and Lazy Preparation +Allreduce is one of the most important functions provided by rabit. You can call allreduce by specifying the +reduction operator, pointer to the data and size of the buffer, as follows +```c++ +Allreduce(pointer_of_data, size_of_data); +``` +This is the basic use case of Allreduce function. It is common that user writes the code to prepare the data needed +into the data buffer, pass the data to Allreduce function, and get the reduced result. However, when a node restarts +from failure, we can directly recover the result from other nodes(see also [Fault Tolerance](#fault-tolerance)) and +the data preparation procedure is no longer necessary. 
Rabit Allreduce adds an optional parameter preparation function +to support such a scenario. User can pass in a function that corresponds to the data preparation procedure to Allreduce +calls, and the data preparation function will only be called when necessary. We use [lazy_allreduce.cc](lazy_allreduce.cc) +as an example to demonstrate this feature. It is modified from [basic.cc](basic.cc), and you can compare the two codes. +```c++ +#include +using namespace rabit; +const int N = 3; +int main(int argc, char *argv[]) { + int a[N] = {0}; + rabit::Init(argc, argv); + // lazy preparation function + auto prepare = [&]() { + printf("@node[%d] run prepare function\n", rabit::GetRank()); + for (int i = 0; i < N; ++i) { + a[i] = rabit::GetRank() + i; + } + }; + printf("@node[%d] before-allreduce: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + // allreduce sums up each element in all processes + Allreduce(&a[0], N, prepare); + printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + // run second allreduce + Allreduce(&a[0], N); + printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + rabit::Finalize(); + return 0; +} +``` +Here we use features of C++11 because the lambda function makes things much shorter. +There is also C++ compatible callback interface provided in the [API](http://homes.cs.washington.edu/~tqchen/rabit/doc). +You can compile the program by typing ```make lazy_allreduce.mock```. We link against the mock library so that we can see +the effect when a process goes down. You can run the program using the following command +```bash +../tracker/rabit_demo.py -n 2 lazy_allreduce.mock mock=0,0,1,0 +``` +The additional arguments ```mock=0,0,1,0``` will cause node 0 to kill itself before second call of Allreduce (see also [mock test](#link-against-mock-test-rabit-library)). 
+You will find that the prepare function's print is only executed once and node 0 will no longer execute the preparation function when it restarts from failure. + +You can also find python version of the example in [lazy_allreduce.py](lazy_allreduce.py), and run it using the following command +```bash +../tracker/rabit_demo.py -n 2 lazy_allreduce.py mock=0,0,1,0 + +``` + +Since the lazy preparation function may not be called during execution, the user should be careful when using this feature. For example, a possible mistake +could be putting some memory allocation code in the lazy preparation function, and the computing memory was not allocated when lazy preparation function is not called. +The example in [lazy_allreduce.cc](lazy_allreduce.cc) provides a simple way to migrate normal preparation code([basic.cc](basic.cc)) to lazy version: wrap the preparation +code with a lambda function, and pass it to allreduce. + +#### Checkpoint and LazyCheckpoint +Common machine learning algorithms usually involve iterative computation. As mentioned in the [Structure of Rabit Program](structure-of-a-rabit-program), +user can and should use Checkpoint to ```save``` the progress so far, so that when a node fails, the latest checkpointed model can be loaded. + +There are two model arguments you can pass to Checkpoint and LoadCheckpoint: ```global_model``` and ```local_model```: +* ```global_model``` refers to the model that is commonly shared across all the nodes + - For example, the centroids of clusters in kmeans is shared across all nodes +* ```local_model``` refers to the model that is specifically tied to the current node + - For example, in topic modeling, the topic assignments of subset of documents in current node is local model + +Because of the different nature of the two types of models, different strategy will be used for them. 
+```global_model``` is simply saved in local memory of each node, while ```local_model``` will be replicated to some other +nodes (selected using a ring replication strategy). The checkpoint is only saved in the memory without touching the disk which makes rabit programs more efficient. +User is encouraged to use ```global_model``` only when it is sufficient for better efficiency. + +There is a special Checkpoint function called [LazyCheckpoint](http://homes.cs.washington.edu/~tqchen/rabit/doc/namespacerabit.html#a99f74c357afa5fba2c80cc0363e4e459), +which can be used for ```global_model``` only cases under certain conditions. +When LazyCheckpoint is called, no action is taken and the rabit engine only remembers the pointer to the model. +The serialization will only happen when another node fails and the recovery starts. So user basically pays no extra cost calling LazyCheckpoint. +However, to use this function, the user MUST ensure the model remains unchanged until the last call of Allreduce/Broadcast in the current version finishes. +So that when recovery procedure happens in these function calls, the serialized model will be the same. + +For example, consider the following calling sequence +``` +LazyCheckPoint, code1, Allreduce, code2, Broadcast, code3, LazyCheckPoint +``` +The user must only change the model in code3. Such condition can usually be satisfied in many scenarios, and user can use LazyCheckpoint to further +improve the efficiency of the program. + Compile Programs with Rabit ==== Rabit is a portable library, to use it, you only need to include the rabit header file. @@ -299,4 +408,4 @@ recovered node fetches its latest checkpoint and the results of Allreduce/Broadcast calls after the checkpoint from some alive nodes. This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated, -and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase. 
\ No newline at end of file +and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase. diff --git a/guide/basic.cc b/guide/basic.cc index e7863b1fd..62c0fc165 100644 --- a/guide/basic.cc +++ b/guide/basic.cc @@ -18,7 +18,11 @@ int main(int argc, char *argv[]) { rabit::GetRank(), a[0], a[1], a[2]); // allreduce take max of each elements in all processes Allreduce(&a[0], N); - printf("@node[%d] after-allreduce: a={%d, %d, %d}\n", + printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + // second allreduce that sums everything up + Allreduce(&a[0], N); + printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n", rabit::GetRank(), a[0], a[1], a[2]); rabit::Finalize(); return 0; diff --git a/guide/basic.py b/guide/basic.py index 800d1cb80..becdae07d 100755 --- a/guide/basic.py +++ b/guide/basic.py @@ -19,5 +19,7 @@ for i in xrange(n): print '@node[%d] before-allreduce: a=%s' % (rank, str(a)) a = rabit.allreduce(a, rabit.MAX) -print '@node[%d] after-allreduce: a=%s' % (rank, str(a)) +print '@node[%d] after-allreduce-max: a=%s' % (rank, str(a)) +a = rabit.allreduce(a, rabit.SUM) +print '@node[%d] after-allreduce-sum: a=%s' % (rank, str(a)) rabit.finalize() diff --git a/guide/lazy_allreduce.cc b/guide/lazy_allreduce.cc new file mode 100644 index 000000000..b54776ecc --- /dev/null +++ b/guide/lazy_allreduce.cc @@ -0,0 +1,33 @@ +/*! 
+ * Copyright (c) 2014 by Contributors + * \file lazy_allreduce.cc + * \brief This is an example demonstrating Allreduce with lazy preparation + * + * \author Tianqi Chen + */ +#include +using namespace rabit; +const int N = 3; +int main(int argc, char *argv[]) { + int a[N] = {0}; + rabit::Init(argc, argv); + // lazy preparation function + auto prepare = [&]() { + printf("@node[%d] run prepare function\n", rabit::GetRank()); + for (int i = 0; i < N; ++i) { + a[i] = rabit::GetRank() + i; + } + }; + printf("@node[%d] before-allreduce: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + // allreduce sums up each element in all processes + Allreduce(&a[0], N, prepare); + printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + // run second allreduce + Allreduce(&a[0], N); + printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n", + rabit::GetRank(), a[0], a[1], a[2]); + rabit::Finalize(); + return 0; +} diff --git a/guide/lazy_allreduce.py b/guide/lazy_allreduce.py new file mode 100755 index 000000000..a195f58d2 --- /dev/null +++ b/guide/lazy_allreduce.py @@ -0,0 +1,31 @@ +#!/usr/bin/python +""" +demo python script of rabit: Lazy preparation function +""" +import os +import sys +import numpy as np +# import rabit, the tracker script will setup the lib path correctly +# for normal run without tracker script, add following line +# sys.path.append(os.path.dirname(__file__) + '/../wrapper') +import rabit + + +# use mock library so that we can run failure test +rabit.init(lib = 'mock') +n = 3 +rank = rabit.get_rank() +a = np.zeros(n) + +def prepare(a): + print '@node[%d] run prepare function' % rank + # must take in reference and modify the reference + for i in xrange(n): + a[i] = rank + i + +print '@node[%d] before-allreduce: a=%s' % (rank, str(a)) +a = rabit.allreduce(a, rabit.MAX, prepare_fun = prepare) +print '@node[%d] after-allreduce-max: a=%s' % (rank, str(a)) +a = rabit.allreduce(a, rabit.SUM) +print '@node[%d] after-allreduce-sum: 
a=%s' % (rank, str(a)) +rabit.finalize() diff --git a/include/rabit.h b/include/rabit.h index 28b72e454..1c5c70e5f 100644 --- a/include/rabit.h +++ b/include/rabit.h @@ -214,9 +214,9 @@ inline void CheckPoint(const ISerializable *global_model, * Allreduce/Broadcast and LazyCheckPoint, both in the same version * * For example, suppose the calling sequence is: - * LazyCheckPoint, code1, Allreduce, code2, Broadcast, code3, LazyCheckPoint + * LazyCheckPoint, code1, Allreduce, code2, Broadcast, code3, LazyCheckPoint (or CheckPoint) * - * If the user wants to use LazyCheckPoint, then she MUST only change the global_model in code3. + * Then the user MUST only change the global_model in code3. * * The use of LazyCheckPoint instead of CheckPoint will improve the efficiency of the program. * \param global_model pointer to the globally shared model/state