update guide
parent 06206e1d03
commit 90a8505208
@@ -5,11 +5,12 @@ export LDFLAGS= -pthread -lm -L../lib
|
||||
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../include
|
||||
|
||||
.PHONY: clean all lib libmpi
|
||||
BIN = basic.rabit
|
||||
BIN = basic.rabit broadcast.rabit
|
||||
MOCKBIN=
|
||||
|
||||
all: $(BIN) $(MOCKBIN)
|
||||
basic.rabit: basic.cc lib
|
||||
broadcast.rabit: broadcast.cc lib
|
||||
|
||||
$(BIN) :
|
||||
$(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc, $^) $(LDFLAGS) -lrabit
|
||||
|
||||
119
guide/README.md
@@ -1,11 +1,13 @@
|
||||
Tutorial of Rabit
|
||||
=====
|
||||
This is a tutorial of rabit, a Reliable Allreduce and Broadcast interface.
|
||||
This is a tutorial of rabit, a ***Reliable Allreduce and Broadcast interface***.
|
||||
To run the examples locally, you will need to type ```make``` to build all the examples.
|
||||
|
||||
**List of Topics**
|
||||
* [What is Allreduce](#what-is-allreduce)
|
||||
* [Common Usecase of Allreduce](#common-use-case)
|
||||
* [Common Use Case](#common-use-case)
|
||||
* [Structure of Rabit Program](#structure-of-rabit-program)
|
||||
* [Fault Tolerance](#fault-tolerance)
|
||||
|
||||
What is Allreduce
|
||||
=====
|
||||
@@ -42,13 +44,118 @@ The ```rabit::GetRank()``` function returns the rank of the current process.
|
||||
Before the call to Allreduce, process 0 contains the array ```a = {0, 1, 2}```, while process 1 has the array
```a = {1, 2, 3}```. After the call to Allreduce, the array contents in all processes are replaced by the
reduction result (in this case, the maximum value in each position across all the processes). So after the
|
||||
Allreduce call, the result will become ```a={1, 2, 3}```.
|
||||
|
||||
Allreduce call, the result will become ```a = {1, 2, 3}```.
|
||||
Rabit provides different reduction operators. For example, you can change ```op::Max``` to ```op::Sum```;
then the reduction operation becomes summation, and the result becomes ```a = {1, 3, 5}```.
You can also run the example with a different number of processes by setting -n to different values, to see how the result changes.
|
||||
Rabit provides different reduction operators. For example, you can change ```op::Max``` to ```op::Sum``` to change
the reduction method from maximum to summation.
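
To make the effect of the two operators concrete, here is a minimal sketch that imitates Allreduce inside a single process, without using rabit. The names `MaxOp`, `SumOp`, and `SimulatedAllreduce` are hypothetical stand-ins invented for this illustration; they are not part of rabit's API. Each inner vector plays the role of the array held by one process.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for rabit's op::Max and op::Sum (not rabit's own types).
struct MaxOp {
  static int Reduce(int a, int b) { return std::max(a, b); }
};
struct SumOp {
  static int Reduce(int a, int b) { return a + b; }
};

// Imitates Allreduce in one process: (*bufs)[p] is the array of process p.
// Every buffer is overwritten with the element-wise reduction of all buffers.
template <typename Op>
void SimulatedAllreduce(std::vector<std::vector<int> > *bufs) {
  assert(!bufs->empty());
  std::vector<int> result = (*bufs)[0];
  for (std::size_t p = 1; p < bufs->size(); ++p) {
    assert((*bufs)[p].size() == result.size());
    for (std::size_t i = 0; i < result.size(); ++i) {
      result[i] = Op::Reduce(result[i], (*bufs)[p][i]);
    }
  }
  // all "processes" end up with the same reduced array
  for (std::size_t p = 0; p < bufs->size(); ++p) (*bufs)[p] = result;
}
```

Calling `SimulatedAllreduce<MaxOp>` on the two arrays `{0, 1, 2}` and `{1, 2, 3}` leaves `{1, 2, 3}` in both, while `SimulatedAllreduce<SumOp>` leaves `{1, 3, 5}` in both.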
|
||||
|
||||
Broadcast is another method provided by rabit besides Allreduce. It allows one node to broadcast its
local data to all the other nodes. The following code in [broadcast.cc](broadcast.cc) broadcasts a string from
node 0 to all other nodes.
|
||||
```c++
#include <rabit.h>
#include <cstdio>
#include <string>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  std::string s;
  // only node 0 holds the message before the broadcast
  if (rabit::GetRank() == 0) s = "hello world";
  printf("@node[%d] before-broadcast: s=\"%s\"\n",
         rabit::GetRank(), s.c_str());
  // broadcast s from node 0 to all other nodes
  rabit::Broadcast(&s, 0);
  printf("@node[%d] after-broadcast: s=\"%s\"\n",
         rabit::GetRank(), s.c_str());
  rabit::Finalize();
  return 0;
}
```
|
||||
You can run the program with the following command, using three workers.
```bash
../tracker/rabit_demo.py -n 3 broadcast.rabit
```
Besides strings, rabit can also broadcast constant-size arrays and vectors.
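
The semantics of Broadcast can be sketched in a single process as follows. This is only an illustration under the assumption that each slot of a vector stands for the local variable on one node; `SimulatedBroadcast` is a hypothetical helper, not a rabit function, and it works for any copyable type such as `std::string` or `std::vector<float>`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Imitates Broadcast in one process: (*nodes)[i] is the local value on node i.
// After the call, every node holds the value that the root node had.
template <typename T>
void SimulatedBroadcast(std::vector<T> *nodes, std::size_t root) {
  assert(root < nodes->size());
  // copy the root value first, so overwriting the root slot is harmless
  const T value = (*nodes)[root];
  for (std::size_t i = 0; i < nodes->size(); ++i) {
    (*nodes)[i] = value;
  }
}
```

With three slots where only slot 0 holds `"hello world"`, calling `SimulatedBroadcast(&nodes, 0)` leaves `"hello world"` in all three, mirroring the before/after output of the program above.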
|
||||
|
||||
Common Use Case
|
||||
=====
|
||||
Many distributed machine learning algorithms divide the data among the nodes,
compute statistics locally, and aggregate them together. This process is usually repeated for
many iterations before the algorithm converges. Allreduce naturally meets the needs of such programs.
Common use cases include:

* Aggregation of gradient values, which can be used in optimization methods such as L-BFGS.
* Aggregation of other statistics, which can be used in KMeans and Gaussian Mixture Models.
* Finding the best split candidate and aggregating split statistics, used for tree-based models.

The main purpose of rabit is to provide a reliable and portable library for distributed machine learning programs,
so that the programs can run reliably on different types of platforms.
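
As an illustration of the "compute locally, then aggregate" pattern, the sketch below performs one KMeans update on one-dimensional points. It does not use rabit: `SumAcrossNodes` is a hypothetical stand-in for an Allreduce with a sum operator, and `LocalStats` and `UpdateCentroids` are names invented for this example. Each node only ever shares per-cluster sums and counts, never its raw data.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Local step on one node. Layout: stats[2*k] = sum of the points assigned to
// cluster k, stats[2*k+1] = number of points assigned to cluster k.
std::vector<double> LocalStats(const std::vector<double> &points,
                               const std::vector<double> &centroids) {
  assert(!centroids.empty());
  std::vector<double> stats(2 * centroids.size(), 0.0);
  for (std::size_t i = 0; i < points.size(); ++i) {
    std::size_t best = 0;
    for (std::size_t k = 1; k < centroids.size(); ++k) {
      if (std::fabs(points[i] - centroids[k]) <
          std::fabs(points[i] - centroids[best])) {
        best = k;
      }
    }
    stats[2 * best] += points[i];
    stats[2 * best + 1] += 1.0;
  }
  return stats;
}

// Stand-in for an Allreduce with summation: element-wise sum over all nodes.
std::vector<double> SumAcrossNodes(
    const std::vector<std::vector<double> > &per_node) {
  assert(!per_node.empty());
  std::vector<double> total(per_node[0].size(), 0.0);
  for (std::size_t p = 0; p < per_node.size(); ++p) {
    assert(per_node[p].size() == total.size());
    for (std::size_t i = 0; i < total.size(); ++i) total[i] += per_node[p][i];
  }
  return total;
}

// Global step: new centroid = aggregated sum / aggregated count.
// A cluster with no points keeps its old centroid.
std::vector<double> UpdateCentroids(const std::vector<double> &global_stats,
                                    const std::vector<double> &old_centroids) {
  assert(global_stats.size() == 2 * old_centroids.size());
  std::vector<double> updated(old_centroids);
  for (std::size_t k = 0; k < updated.size(); ++k) {
    const double count = global_stats[2 * k + 1];
    if (count > 0.0) updated[k] = global_stats[2 * k] / count;
  }
  return updated;
}
```

Because every node receives the same aggregated statistics, every node computes the same new centroids and can move on to the next iteration without further communication.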
|
||||
|
||||
Structure of Rabit Program
|
||||
=====
|
||||
The following code illustrates the common structure of a rabit program. This is an abstract example;
you can also refer to [kmeans.cc](../toolkit/kmeans.cc) for an example implementation of KMeans.
|
||||
|
||||
```c++
#include <rabit.h>
int main(int argc, char *argv[]) {
  ...
  rabit::Init(argc, argv);
  // load the latest checkpointed model
  int version = rabit::LoadCheckPoint(&model);
  // initialize the model if it is the first version
  if (version == 0) model.InitModel();
  // the version number marks the iteration to resume
  for (int iter = version; iter < max_iter; ++iter) {
    // at this point, the model must hold all the state needed to run this iteration
    ...
    // each iteration can contain multiple calls of allreduce/broadcast;
    // the reduction operator is a template argument (op::Sum is only an example)
    rabit::Allreduce<rabit::op::Sum>(&data[0], n);
    ...
    // checkpoint model after one iteration finishes
    rabit::CheckPoint(&model);
  }
  rabit::Finalize();
  return 0;
}
```
|
||||
|
||||
Besides the common Allreduce and Broadcast functions, there are two additional functions: ```LoadCheckPoint```
and ```CheckPoint```. These two functions are used for fault tolerance.
Common machine learning programs involve several iterations. In each iteration, we start from a model, make some calls
to Allreduce or Broadcast, and update the model to a new one. The calling sequence in each iteration does not need to be the same.

* When the nodes start from the beginning, ```LoadCheckPoint``` returns 0, and we can initialize the model.
* ```CheckPoint``` saves the model after each iteration.
  - Efficiency note: the model is only kept in local memory, and no save to disk is involved in ```CheckPoint```.
* When a node goes down and restarts, ```LoadCheckPoint``` will recover the latest saved model, and the node can resume from that iteration.
* When a node goes down, the rest of the nodes will block in the call of Allreduce/Broadcast and help
  the failed node recover until it catches up.

Please also see the next section for an introduction to the fault tolerance procedure in rabit.
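
The version-based resume logic described above can be tried out without rabit using the following sketch. `MemoryCheckpoint` is a hypothetical in-memory store that only imitates the return-value convention of ```LoadCheckPoint``` and ```CheckPoint``` (0 means nothing has been saved yet); in rabit the checkpoint survives a failure because other nodes hold it, whereas here it simply outlives one call to the hypothetical `Train` function. The `crash_at` parameter is likewise an assumption used to imitate a failure.

```cpp
#include <cassert>
#include <vector>

struct Model {
  std::vector<double> weights;
};

// Hypothetical in-memory checkpoint store; the version counts saved iterations.
class MemoryCheckpoint {
 public:
  MemoryCheckpoint() : version_(0) {}
  // Returns 0 and leaves *model untouched when nothing has been saved yet.
  int Load(Model *model) const {
    if (version_ != 0) *model = saved_;
    return version_;
  }
  void Save(const Model &model) {
    saved_ = model;
    ++version_;
  }

 private:
  Model saved_;
  int version_;
};

// Runs iterations [version, max_iter). If crash_at equals the current
// iteration, the function returns early to imitate a node failure.
// Returns the number of iterations completed during this run.
int Train(MemoryCheckpoint *ckpt, int max_iter, int crash_at) {
  assert(ckpt != 0);
  Model model;
  int version = ckpt->Load(&model);
  // initialize the model only when starting from scratch
  if (version == 0) model.weights.assign(1, 0.0);
  int completed = 0;
  for (int iter = version; iter < max_iter; ++iter) {
    if (iter == crash_at) return completed;
    model.weights[0] += 1.0;  // placeholder for Allreduce calls and the model update
    ckpt->Save(model);
    ++completed;
  }
  return completed;
}
```

Running `Train(&ckpt, 5, 3)` completes iterations 0 to 2 and then stops; a second call `Train(&ckpt, 5, -1)` resumes at iteration 3 and completes the remaining two, so no finished iteration is repeated.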
|
||||
|
||||
Fault Tolerance
|
||||
=====
|
||||
This section introduces how fault tolerance works in rabit.
The following figure shows how rabit deals with failures.
|
||||
|
||||

|
||||
|
||||
The scenario is as follows:
* Node 1 fails between the first and second calls of Allreduce after the latest checkpoint.
* The other nodes stay in the second Allreduce call to help node 1 recover.
* When node 1 restarts, it calls ```LoadCheckPoint``` and gets the latest checkpoint from one of the existing nodes.
* Then node 1 can start from the latest checkpoint and continue running.
* When node 1 calls the first Allreduce again, the other nodes already know the result of that Allreduce, so node 1 can get the result from one of them.
* When node 1 reaches the second Allreduce, the other nodes find out that node 1 has caught up, and they can continue the program normally.
|
||||
|
||||
This fault tolerance model is based on a key property of Allreduce and Broadcast:
all the nodes get the same result when calling Allreduce/Broadcast. Because of this property, we can have some nodes record the history of results,
and when a node recovers, the result can be forwarded to the recovering node.
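
The idea of recording and forwarding results can be sketched as a small data structure. This is a conceptual illustration only, not rabit's actual implementation: `ResultHistory` and its methods are hypothetical names, and the sketch assumes that results are identified by their call order since the latest checkpoint.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical record kept by a surviving node: the results of the
// Allreduce/Broadcast calls made since the latest checkpoint, in call order.
class ResultHistory {
 public:
  // Called after each completed collective call.
  void Record(const std::vector<int> &result) { results_.push_back(result); }
  // Called when a new checkpoint is taken.
  void Clear() { results_.clear(); }
  // If call number seqno has already completed, copies its result into *out
  // and returns true; otherwise the caller must take part in a live call.
  bool Replay(std::size_t seqno, std::vector<int> *out) const {
    assert(out != 0);
    if (seqno >= results_.size()) return false;
    *out = results_[seqno];
    return true;
  }

 private:
  std::vector<std::vector<int> > results_;
};
```

In the scenario above, the surviving nodes would have recorded the first Allreduce result, so the restarted node 1 gets it through `Replay(0, ...)`, while `Replay(1, ...)` returns false because the second Allreduce has not finished yet.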
|
||||
|
||||
The checkpoint is introduced so that we can discard the history recorded before the checkpoint, which makes iterative programs more
efficient. The strategy of rabit is different from the fail-restart strategy, where all the nodes restart from the checkpoint
when any of the nodes fails. In rabit, the surviving nodes only block in the Allreduce call to help the recovery, and the checkpoint is only saved locally without
touching the disk. This makes rabit programs more reliable and efficient.

This is a conceptual introduction to the fault tolerance model of rabit. The actual implementation is more sophisticated,
and can deal with more complicated cases such as multiple node failures and node failure during the recovery phase.
|
||||
|
||||
@@ -18,7 +18,7 @@ lib:
|
||||
libmpi:
|
||||
cd ..;make lib/librabit_mpi.a;cd -
|
||||
|
||||
kmeans.o: kmeans.cpp ../src/*.h
|
||||
kmeans.o: kmeans.cc ../src/*.h
|
||||
|
||||
# we can link against the MPI version to use MPI
|
||||
kmeans.rabit: kmeans.o lib
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
Toolkit
====
This folder contains example toolkits developed using rabit.

KMeans
====
* KMeans takes input data in LIBSVM format.
* You will need a dummy label field at the beginning of every line for KMeans to read the data.
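
To show what such an input line looks like, here is a minimal sketch that parses one LIBSVM-style line of the form `label index:value index:value ...`. It is not the loader from kmeans.cc: `ParseLibSVMLine` is a hypothetical helper, and only the `Entry` fields `findex` and `fvalue` and the `"%u:%f"` format are taken from the kmeans.cc hunk in this commit.

```cpp
#include <cassert>
#include <cstdio>
#include <sstream>
#include <string>
#include <vector>

struct Entry {
  unsigned findex;  // feature index
  float fvalue;     // feature value
};

// Parses "label index:value index:value ...". The first token is always read
// as the label, which is why a dummy label is needed even when it is unused.
// Returns false if the line is empty or a token is malformed.
bool ParseLibSVMLine(const std::string &line, float *label,
                     std::vector<Entry> *feats) {
  assert(label != 0 && feats != 0);
  std::istringstream is(line);
  std::string tok;
  if (!(is >> tok)) return false;
  if (std::sscanf(tok.c_str(), "%f", label) != 1) return false;
  feats->clear();
  while (is >> tok) {
    Entry e;
    if (std::sscanf(tok.c_str(), "%u:%f", &e.findex, &e.fvalue) != 2) {
      return false;
    }
    feats->push_back(e);
  }
  return true;
}
```

For example, the line `0 1:0.5 3:2` yields the dummy label 0 and two entries, feature 1 with value 0.5 and feature 3 with value 2.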
|
||||
|
||||
@@ -43,7 +43,7 @@ struct SparseMat {
|
||||
feat_dim = 0;
|
||||
float label; bool init = true;
|
||||
char tmp[1024];
|
||||
while (fscanf(file, "%s", tmp) == 1) {
|
||||
while (fscanf(fi, "%s", tmp) == 1) {
|
||||
Entry e;
|
||||
if (sscanf(tmp, "%u:%f", &e.findex, &e.fvalue) == 2) {
|
||||
data.push_back(e);
|
||||
|
||||