Tutorial
========
This is rabit's tutorial, a ***Reliable Allreduce and Broadcast Interface***.
All the example codes are in the [guide](https://github.com/dmlc/rabit/blob/master/guide/) folder of the project.
To run the examples locally, you will need to build them with ```make```.

**List of Topics**
* [What is Allreduce](#what-is-allreduce)
* [Common Use Case](#common-use-case)
* [Use Rabit API](#use-rabit-api)
  - [Structure of a Rabit Program](#structure-of-a-rabit-program)
  - [Allreduce and Lazy Preparation](#allreduce-and-lazy-preparation)
  - [Checkpoint and LazyCheckpoint](#checkpoint-and-lazycheckpoint)
* [Compile Programs with Rabit](#compile-programs-with-rabit)
* [Running Rabit Jobs](#running-rabit-jobs)
* [Fault Tolerance](#fault-tolerance)

What is Allreduce
-----------------
The main methods provided by rabit are Allreduce and Broadcast. Allreduce performs reduction across different computation nodes
and returns the result to every node. To understand the behavior of the function, consider the following example in [basic.cc](../guide/basic.cc) (there is a python example right after this if you are more familiar with python).
```c++
#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
  int a[N];
  rabit::Init(argc, argv);
  for (int i = 0; i < N; ++i) {
    a[i] = rabit::GetRank() + i;
  }
  printf("@node[%d] before-allreduce: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  // allreduce takes the max of each element across all processes
  Allreduce<op::Max>(&a[0], N);
  printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  // second allreduce that sums everything up
  Allreduce<op::Sum>(&a[0], N);
  printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  rabit::Finalize();
  return 0;
}
```
You can run the example using the rabit_demo.py script. The following command
starts the rabit program with two worker processes.
```bash
../tracker/rabit_demo.py -n 2 basic.rabit
```
This will start two processes, one with rank 0 and the other with rank 1, both running the same code.
The ```rabit::GetRank()``` function returns the rank of the current process.

Before the call to Allreduce, process 0 contains the array ```a = {0, 1, 2}```, while process 1 has the array
```a = {1, 2, 3}```. After the call to Allreduce, the array contents in all processes are replaced by the
reduction result (in this case, the maximum value at each position across all processes). So, after the
Allreduce call, the result will become ```a = {1, 2, 3}```.
Rabit provides different reduction operators; for example, if you change ```op::Max``` to ```op::Sum```,
the reduction operation will be a summation, and the result will become ```a = {1, 3, 5}```.
You can also run the example with more processes by setting -n to different values.

If you are more familiar with python, you can also use rabit in python. The same example as before can be found in [basic.py](../guide/basic.py):

```python
import numpy as np
import rabit

rabit.init()
n = 3
rank = rabit.get_rank()
a = np.zeros(n)
for i in range(n):
    a[i] = rank + i

print('@node[%d] before-allreduce: a=%s' % (rank, str(a)))
a = rabit.allreduce(a, rabit.MAX)
print('@node[%d] after-allreduce-max: a=%s' % (rank, str(a)))
a = rabit.allreduce(a, rabit.SUM)
print('@node[%d] after-allreduce-sum: a=%s' % (rank, str(a)))
rabit.finalize()
```
You can run the program using the following command
```bash
../tracker/rabit_demo.py -n 2 basic.py
```

Broadcast is another method provided by rabit besides Allreduce. This function allows one node to broadcast its
local data to all other nodes. The following code in [broadcast.cc](../guide/broadcast.cc) broadcasts a string from
node 0 to all other nodes.
```c++
#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  std::string s;
  if (rabit::GetRank() == 0) s = "hello world";
  printf("@node[%d] before-broadcast: s=\"%s\"\n",
         rabit::GetRank(), s.c_str());
  // broadcast s from node 0 to all other nodes
  rabit::Broadcast(&s, 0);
  printf("@node[%d] after-broadcast: s=\"%s\"\n",
         rabit::GetRank(), s.c_str());
  rabit::Finalize();
  return 0;
}
```
The following command starts the program with three worker processes.
```bash
../tracker/rabit_demo.py -n 3 broadcast.rabit
```
Besides strings, rabit also allows you to broadcast constant-size arrays and vectors, as sketched below.
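
For instance, here is a minimal sketch (not one of the examples in the guide folder) of broadcasting a fixed-size array with the raw-memory overload, which takes a pointer and a size in bytes, and a vector with the ```std::vector``` overload, which also transfers the length:
```c++
#include <rabit.h>
#include <vector>
int main(int argc, char *argv[]) {
  rabit::Init(argc, argv);
  // fixed-size array: raw-memory overload (pointer + size in bytes)
  int vals[4] = {0, 0, 0, 0};
  if (rabit::GetRank() == 0) {
    for (int i = 0; i < 4; ++i) vals[i] = i * i;
  }
  rabit::Broadcast(&vals[0], sizeof(vals), 0);
  // vector: the receiving nodes get both the contents and the length
  std::vector<float> weights;
  if (rabit::GetRank() == 0) weights = std::vector<float>(3, 1.0f);
  rabit::Broadcast(&weights, 0);
  rabit::Finalize();
  return 0;
}
```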

The counterpart in python can be found in [broadcast.py](../guide/broadcast.py). Here is a snippet so that you can get a better sense of how simple it is to use the python library:

```python
import rabit

rabit.init()
rank = rabit.get_rank()
s = None
if rank == 0:
    s = {'hello world': 100, 2: 3}
print('@node[%d] before-broadcast: s=\"%s\"' % (rank, str(s)))
s = rabit.broadcast(s, 0)
print('@node[%d] after-broadcast: s=\"%s\"' % (rank, str(s)))
rabit.finalize()
```

Common Use Case
---------------
Many distributed machine learning algorithms involve splitting the data across different nodes,
computing statistics locally, and finally aggregating them. Such a workflow is usually repeated through many iterations before the algorithm converges. Allreduce naturally fits the structure of such programs;
common use cases include:

* Aggregation of gradient values, which can be used in optimization methods such as L-BFGS.
* Aggregation of other statistics, which can be used in KMeans and Gaussian Mixture Models.
* Finding the best split candidate and aggregating split statistics, as used in tree-based models.
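
As an illustration of the first case, a minimal sketch of gradient aggregation might look as follows; ```ComputeLocalGradient```, ```local_data```, and ```num_params``` are hypothetical names standing in for application code:
```c++
#include <rabit.h>
#include <vector>
// hypothetical application routine that fills `grad` with this node's partial gradient
void ComputeLocalGradient(const std::vector<float> &local_data, std::vector<float> *grad);

void AggregateGradients(const std::vector<float> &local_data, size_t num_params) {
  std::vector<float> grad(num_params, 0.0f);
  ComputeLocalGradient(local_data, &grad);
  // sum the partial gradients; afterwards every node holds the global gradient
  rabit::Allreduce<rabit::op::Sum>(&grad[0], grad.size());
}
```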

Rabit is a reliable and portable library for distributed machine learning programs that allows them to run reliably on different platforms.

Use Rabit API
-------------
This section introduces topics on how to use the rabit API.
You can always refer to the [API Documentation](http://homes.cs.washington.edu/~tqchen/rabit/doc) for the definition of each function.
This section gives examples of different aspects of the rabit API.

#### Structure of a Rabit Program
The following code illustrates the common structure of a rabit program. This is an abstract example;
you can also refer to [wormhole](https://github.com/dmlc/wormhole/blob/master/learn/kmeans/kmeans.cc) for an example implementation of the kmeans algorithm.

```c++
#include <rabit.h>
int main(int argc, char *argv[]) {
  ...
  rabit::Init(argc, argv);
  // sync on the expected model size before loading the checkpoint,
  // if we pass rabit_bootstrap_cache=true
  rabit::Allreduce<rabit::op::Max>(&model_size, 1);
  // load the latest checkpointed model
  int version = rabit::LoadCheckPoint(&model);
  // initialize the model if it is the first version
  if (version == 0) model.InitModel();
  // the version number marks the iteration to resume
  for (int iter = version; iter < max_iter; ++iter) {
    // at this point, the model object should allow us to recover the program state
    ...
    // each iteration can contain multiple calls to allreduce/broadcast
    rabit::Allreduce<rabit::op::Max>(&data[0], n);
    ...
    // checkpoint the model after one iteration finishes
    rabit::CheckPoint(&model);
  }
  rabit::Finalize();
  return 0;
}
```

Besides the common Allreduce and Broadcast functions, there are two additional functions: ```LoadCheckPoint```
and ```CheckPoint```. These two functions are used for fault tolerance.
As mentioned before, traditional machine learning programs involve several iterations. In each iteration, we start with a model, make some calls
to Allreduce or Broadcast, and update the model. The calling sequence in each iteration does not need to be the same.

* When the nodes start from the beginning (i.e. iteration 0), ```LoadCheckPoint``` returns 0, so we can initialize the model.
* ```CheckPoint``` saves the model after each iteration.
  - Efficiency note: the model is only kept in local memory; no write to disk is performed when calling CheckPoint.
* When a node goes down and restarts, ```LoadCheckPoint``` will recover the latest saved model.
* When a node goes down, the remaining nodes will block in the call of Allreduce/Broadcast and wait for
  the recovery of the failed node until it catches up.

Please see the [Fault Tolerance](#fault-tolerance) section to understand the recovery procedure executed by rabit.

#### Allreduce and Lazy Preparation
Allreduce is one of the most important functions provided by rabit. You can call allreduce by specifying the
reduction operator, a pointer to the data, and the size of the buffer, as follows
```c++
Allreduce<operator>(pointer_of_data, size_of_data);
```
This is the basic use case of the Allreduce function. It is common that the user writes code to prepare the needed data
in the data buffer, passes the data to the Allreduce function, and gets the reduced result. However, when a node restarts
after failure, the result can be recovered directly from other nodes (see also [Fault Tolerance](#fault-tolerance)), so
the data preparation procedure is no longer necessary. Rabit's Allreduce accepts an optional preparation function
to support such a scenario. The user can pass to the Allreduce call a function that corresponds to the data preparation procedure,
and that function will only be called when necessary. We use [lazy_allreduce.cc](../guide/lazy_allreduce.cc)
as an example to demonstrate this feature. It is modified from [basic.cc](../guide/basic.cc), so you can compare the two.
```c++
#include <rabit.h>
using namespace rabit;
const int N = 3;
int main(int argc, char *argv[]) {
  int a[N] = {0};
  rabit::Init(argc, argv);
  // lazy preparation function
  auto prepare = [&]() {
    printf("@node[%d] run prepare function\n", rabit::GetRank());
    for (int i = 0; i < N; ++i) {
      a[i] = rabit::GetRank() + i;
    }
  };
  printf("@node[%d] before-allreduce: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  // allreduce takes the max of each element across all processes
  Allreduce<op::Max>(&a[0], N, prepare);
  printf("@node[%d] after-allreduce-max: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  // run a second allreduce that sums everything up
  Allreduce<op::Sum>(&a[0], N);
  printf("@node[%d] after-allreduce-sum: a={%d, %d, %d}\n",
         rabit::GetRank(), a[0], a[1], a[2]);
  rabit::Finalize();
  return 0;
}
```
Here we use C++11 features, since the lambda function makes the code much shorter.
There is also a callback interface that does not require C++11, provided in the [API](http://homes.cs.washington.edu/~tqchen/rabit/doc) and sketched below.
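
As a rough sketch, assuming the function-pointer overload ```Allreduce(sendrecvbuf, count, prepare_fun, prepare_arg)``` described in the API documentation, the lambda above could be rewritten as:
```c++
// plain-function preparation callback: `arg` points to the data buffer
void Prepare(void *arg) {
  int *a = static_cast<int *>(arg);
  printf("@node[%d] run prepare function\n", rabit::GetRank());
  for (int i = 0; i < N; ++i) {
    a[i] = rabit::GetRank() + i;
  }
}

// inside main(): pass the callback and its argument instead of a lambda
Allreduce<op::Max>(&a[0], N, Prepare, &a[0]);
```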
You can compile the program by typing ```make lazy_allreduce.mock```. We link against the mock library so that we can see
the effect when a process goes down. You can run the program using the following command
```bash
../tracker/rabit_demo.py -n 2 lazy_allreduce.mock mock=0,0,1,0
```
The additional argument ```mock=0,0,1,0``` will cause node 0 to kill itself before the second call of Allreduce (see also [mock test](#link-against-mock-test-rabit-library)).
You will find that the prepare function's output is printed only once, and that node 0 does not run the preparation function again when it restarts after the failure.

You can also find a python version of the example in [lazy_allreduce.py](../guide/lazy_allreduce.py), and run it using the following command
```bash
../tracker/rabit_demo.py -n 2 lazy_allreduce.py mock=0,0,1,0
```

Since the lazy preparation function may not be called during execution, the user should be careful when using this feature. For example, a possible mistake
is to put memory allocation code in the lazy preparation function: the memory will then not be allocated whenever the function is skipped.
The example in [lazy_allreduce.cc](../guide/lazy_allreduce.cc) shows a simple way to migrate normal preparation code ([basic.cc](../guide/basic.cc)) to the lazy version: wrap the preparation
code in a lambda function and pass it to allreduce.

#### Checkpoint and LazyCheckpoint
Common machine learning algorithms usually involve iterative computation. As mentioned in the section on the [Structure of a Rabit Program](#structure-of-a-rabit-program),
the user can and should use CheckPoint to save the progress so far, so that when a node fails, the latest checkpointed model can be loaded.

There are two model arguments you can pass to CheckPoint and LoadCheckPoint: ```global_model``` and ```local_model```:
* ```global_model``` refers to the model that is commonly shared across all the nodes
  - For example, the centroids of the clusters in kmeans are shared across all nodes
* ```local_model``` refers to the model that is specifically tied to the current node
  - For example, in topic modeling, the topic assignments of the subset of documents on the current node form the local model

Because of the different nature of the two types of models, different strategies are used for them.
The ```global_model``` is simply saved in the local memory of each node, while the ```local_model``` is replicated to some other
nodes (selected using a ring replication strategy). The checkpoint is only saved in memory, without touching the disk, which makes rabit programs more efficient.
Users are encouraged to use only a ```global_model```, when it is sufficient, for better efficiency.
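
For instance, a sketch of checkpointing both kinds of models might look as follows; ```GlobalModel``` and ```LocalModel``` are hypothetical user-defined classes implementing the serialization interface described below:
```c++
GlobalModel global_model;  // shared across all nodes, backed up in local memory
LocalModel local_model;    // node-specific, replicated to other nodes on a ring
// recover both models; version is 0 on a fresh start
int version = rabit::LoadCheckPoint(&global_model, &local_model);
// ... run one iteration of the algorithm ...
// save both models after the iteration finishes
rabit::CheckPoint(&global_model, &local_model);
```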

To enable checkpointing of a model class, the user can implement a [serialization interface](../include/rabit_serialization.h). The serialization interface already
provides serialization functions for STL vectors and strings. For the python API, the user can checkpoint any python object that can be pickled.
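
For example, a minimal sketch of a serializable model, assuming the ```Serializable``` and ```Stream``` definitions in [rabit_serialization.h](../include/rabit_serialization.h), might look like:
```c++
#include <rabit.h>
#include <rabit_serialization.h>
#include <vector>

// a minimal model holding one weight vector
class Model : public rabit::Serializable {
 public:
  std::vector<float> weights;
  // called by LoadCheckPoint to restore the model from a stream
  virtual void Load(rabit::Stream *fi) {
    fi->Read(&weights);
  }
  // called by CheckPoint to write the model to a stream
  virtual void Save(rabit::Stream *fo) const {
    fo->Write(weights);
  }
};
```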

There is a special CheckPoint function called [LazyCheckpoint](http://homes.cs.washington.edu/~tqchen/rabit/doc/namespacerabit.html#a99f74c357afa5fba2c80cc0363e4e459),
which can be used in ```global_model```-only cases, under a certain condition.
When LazyCheckpoint is called, no action is taken; the rabit engine only remembers the pointer to the model.
Serialization only happens when another node fails and recovery starts, so the user pays essentially no extra cost for calling LazyCheckpoint.
To use this function, the user needs to ensure that the model remains unchanged until the last call of Allreduce/Broadcast in the current version finishes,
so that when the recovery procedure happens during these function calls, the serialized model is always the same.

For example, consider the following calling sequence
```
LazyCheckPoint, code1, Allreduce, code2, Broadcast, code3, LazyCheckPoint
```
The user must only change the model in code3. Such a condition can usually be satisfied in many scenarios, and the user can use LazyCheckpoint to further
improve the efficiency of the program.
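
In code, a sketch of this pattern might look as follows; ```model```, ```UpdateModel```, ```data```, and ```n``` are placeholders for application state:
```c++
for (int iter = version; iter < max_iter; ++iter) {
  // register the model pointer; no serialization happens here
  rabit::LazyCheckPoint(&model);
  // the model must stay unchanged through all collective calls of this version
  rabit::Allreduce<rabit::op::Sum>(&data[0], n);
  rabit::Broadcast(&data, 0);
  // only after the last Allreduce/Broadcast is it safe to modify the model
  UpdateModel(&model, data);
}
```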

Compile Programs with Rabit
---------------------------
Rabit is a portable library; to use it, you only need to include the rabit header file.
* You will need to add the path to [../include](../include) to the header search path of the compiler
  - Solution 1: add ```-I/path/to/rabit/include``` to the compiler flags in gcc or clang
  - Solution 2: add the path to the environment variable CPLUS_INCLUDE_PATH
* You will need to add the path to [../lib](../lib) to the library search path of the compiler
  - Solution 1: add ```-L/path/to/rabit/lib``` to the linker flags
  - Solution 2: add the path to the environment variables LIBRARY_PATH and LD_LIBRARY_PATH
* Link against lib/rabit.a
  - Add ```-lrabit``` to the linker flags
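
Putting these together, a typical compile command might look like the following; the paths are placeholders for wherever rabit is checked out:
```bash
g++ -O3 -I/path/to/rabit/include -o myprog myprog.cc -L/path/to/rabit/lib -lrabit
```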

The procedure above allows you to compile a program with rabit. The following two sections contain additional
options you can use to link against different backends other than the normal one.

#### Link against MPI Allreduce
You can link against ```rabit_mpi.a``` instead to use MPI Allreduce; however, the resulting program is backed by MPI and
is no longer fault tolerant.
* Simply change the linker flag from ```-lrabit``` to ```-lrabit_mpi```
* The final linking needs to be done by the mpi wrapper compiler ```mpicxx```

#### Link against Mock Test Rabit Library
If you want to use a mock to test the program in order to see the behavior of the code when some nodes go down, you can link against ```rabit_mock.a```.
* Simply change the linker flag from ```-lrabit``` to ```-lrabit_mock```

The resulting rabit mock program can take additional arguments in the following format
```
mock=rank,version,seq,ndeath
```

The four integers specify an event that will cause the program to commit suicide (exit with -2)
* rank specifies the rank of the node to kill
* version specifies the version (iteration) of the model at which you want the process to die
* seq specifies the sequence number of the Allreduce/Broadcast call since the last checkpoint, at which the process will be killed
* ndeath specifies how many times this node has already died

For example, consider the following script in the test case
```bash
../tracker/rabit_demo.py -n 10 test_model_recover 10000\
    mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1
```
* The first mock will cause node 0 to exit when calling the second Allreduce/Broadcast (seq = 1) in iteration 0
* The second mock will cause node 1 to exit when calling the second Allreduce/Broadcast (seq = 1) in iteration 1
* The third mock will cause node 1 to exit again when calling the second Allreduce/Broadcast (seq = 1) in iteration 1
  - Note that ndeath = 1 means this will happen only if node 1 has already died once, which is the case here

Running Rabit Jobs
------------------
Rabit is a portable library that can run on multiple platforms.
All rabit jobs can be submitted using [dmlc-tracker](https://github.com/dmlc/dmlc-core/tree/master/tracker).

Fault Tolerance
---------------
This section introduces how fault tolerance works in rabit.
The following figure shows how rabit deals with failures.

![](http://homes.cs.washington.edu/~tqchen/rabit/fig/fault-tol.png)


The scenario is as follows:
* Node 1 fails between the first and second call of Allreduce after the second checkpoint
* The other nodes wait in the call of the second Allreduce in order to help node 1 recover.
* When node 1 restarts, it will call ```LoadCheckPoint```, and get the latest checkpoint from one of the existing nodes.
* Then node 1 can start from the latest checkpoint and continue running.
* When node 1 calls the first Allreduce again, as the other nodes already know the result, node 1 can get it from one of them.
* When node 1 reaches the second Allreduce, the other nodes find out that node 1 has caught up and they can continue the program normally.

This fault tolerance model is based on a key property of Allreduce and
Broadcast: all the nodes get the same result after calling Allreduce/Broadcast.
Because of this property, any node can record the results of past
Allreduce/Broadcast calls. When a node is recovered, it can fetch the lost
results from some alive node and rebuild its model.

The checkpoint is introduced so that we can discard the results of
Allreduce/Broadcast calls made before the latest checkpoint. This reduces the
memory consumed by backups. The checkpoint of each node is a model defined by
the user and can be split into 2 parts: a global model and a local model. The
global model is shared by all nodes and can be backed up by any node. The
local model of a node is replicated to some other nodes (selected using a ring
replication strategy). The checkpoint is only saved in memory, without
touching the disk, which makes rabit programs more efficient. The strategy of
rabit is different from the fail-restart strategy, where all the nodes restart
from the same checkpoint when any of them fail. In rabit, all the alive nodes
block in the Allreduce call and help the recovery. To catch up, the
recovered node fetches its latest checkpoint and the results of
Allreduce/Broadcast calls made after that checkpoint from some alive nodes.

This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated
and can deal with more complicated cases, such as multiple node failures and node failure during the recovery phase.

Rabit Timeout
---------------
In certain cases, a rabit cluster may lack the resources needed to retry failed workers.
Because the fault tolerance model assumes infinite retries, this can cause the entire cluster to hang indefinitely.
For such cases, rabit provides a sidecar thread that runs when the fault-tolerant runtime observes allreduce/broadcast errors.
By default, it waits for 30 minutes before all worker programs exit.
The user can opt into this feature and change the threshold by passing ```rabit_timeout=true``` and ```rabit_timeout_sec=x``` (in seconds).