more cosmetic stuff

nachocano 2015-01-11 01:31:10 -08:00
parent aea4c10847
commit eef79067a8


@ -87,8 +87,7 @@ Besides strings, rabit also allows to broadcast constant size array and vectors.
Common Use Case
=====
Many distributed machine learning algorithms involve splitting the data into different nodes,
computing statistics locally, and finally aggregating them. Such a workflow is usually repeated through
many iterations before the algorithm converges. Allreduce naturally fits the structure of such programs;
common use cases include:
* Aggregation of gradient values, which can be used in optimization methods such as L-BFGS.
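As an illustration of this aggregation pattern, here is a small local simulation in plain Python (it does not use the rabit API; the `allreduce_sum` helper is a stand-in for a real sum-Allreduce):

```python
# Local simulation of the Allreduce aggregation pattern (not the rabit API):
# each node computes statistics on its own data shard, then a sum-Allreduce
# combines the local results into one identical global result on every node.

def allreduce_sum(local_values):
    """Stand-in for a sum-Allreduce: every node receives the same total."""
    total = sum(local_values)
    return [total for _ in local_values]

# Data split across 3 simulated nodes
shards = [[1.0, 2.0], [3.0], [4.0, 5.0]]
local_grad_sums = [sum(shard) for shard in shards]    # computed locally
global_grad_sums = allreduce_sum(local_grad_sums)     # aggregated across nodes
assert global_grad_sums == [15.0, 15.0, 15.0]
```

After the call, every node holds the same aggregated value, which is exactly the property an optimizer such as L-BFGS needs to take an identical step on all nodes.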
@ -131,14 +130,14 @@ and ```CheckPoint```. These two functions are used for fault-tolerance purposes.
As mentioned before, traditional machine learning programs involve several iterations. In each iteration, we start with a model, make some calls
to Allreduce or Broadcast and update the model. The calling sequence in each iteration does not need to be the same.
* When the nodes start from the beginning (i.e. iteration 0), ```LoadCheckPoint``` returns 0, so we can initialize the model.
* ```CheckPoint``` saves the model after each iteration.
  - Efficiency Note: the model is only kept in local memory and no save to disk is performed when calling ```CheckPoint```.
* When a node goes down and restarts, ```LoadCheckPoint``` will recover the latest saved model.
* When a node goes down, the rest of the nodes will block in the call of Allreduce/Broadcast and wait for
the recovery of the failed node until it catches up.
Please see the [Fault Tolerance](#fault-tolerance) section to understand the recovery procedure executed by rabit.
Compile Programs with Rabit
====
@ -191,73 +190,72 @@ Running Rabit Jobs
Rabit is a portable library that can run on multiple platforms.
#### Running Rabit Locally
* You can use [../tracker/rabit_demo.py](../tracker/rabit_demo.py) to start n processes locally
* This script will restart the program when it exits with -2, so it can be used for [mock tests](#link-against-mock-test-library)
#### Running Rabit on Hadoop
* You can use [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) to run rabit programs on hadoop
* This will start n rabit programs as mappers of MapReduce
* Each program can read its portion of data from stdin
* Yarn is highly recommended, since Yarn allows specifying the number of cpus and the memory of each mapper:
  - This allows multi-threaded programs in each node, which can be more efficient
  - An easy multi-threading solution is to combine OpenMP with rabit code
#### Running Rabit on Yarn
* To be modified from [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py)
#### Running Rabit using MPI
* You can submit rabit programs to an MPI cluster using [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py).
* If you linked your code against librabit_mpi.a, then you can directly use mpirun to submit the job
#### Customize Tracker Script
You can also modify the tracker script to allow rabit to run on other platforms. To do so, refer to existing
tracker scripts, such as [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) and [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py), to get a sense of how it is done.
You will need to implement a platform-dependent submission function with the following definition:
```python
def fun_submit(nworkers, worker_args):
    """
    customized submit function that submits nworkers jobs,
    each of which must take worker_args as arguments;
    note this can be a lambda closure

    Parameters
      nworkers     number of worker processes to start
      worker_args  tracker information which must be passed to the arguments;
                   this usually includes parameters such as master_uri and port
    """
```
The submission function should start nworkers processes on the platform, and append worker_args to the end of the other arguments.
Then you can simply call ```tracker.submit``` with fun_submit to submit jobs to the target platform.
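For a concrete example, here is a sketch of a purely local fun_submit that follows the signature above: it starts nworkers processes and appends worker_args to each command line. The worker command is a placeholder, and the exact contract expected by ```tracker.submit``` may differ between rabit versions, so treat this as a template rather than a drop-in implementation.

```python
# Sketch of a local fun_submit (placeholder worker command; the exact
# tracker.submit contract may differ between rabit versions).
import subprocess
import sys

def fun_submit(nworkers, worker_args):
    """Submit nworkers local processes, appending worker_args to each command."""
    procs = []
    for _ in range(nworkers):
        # Placeholder worker: a real script would launch the rabit program here
        cmd = [sys.executable, "-c", "import sys; sys.exit(0)"]
        procs.append(subprocess.Popen(cmd + list(worker_args)))
    return [p.wait() for p in procs]  # exit codes; all 0 on success

exit_codes = fun_submit(2, ["master_uri=127.0.0.1", "port=9000"])
```

A platform-specific version would replace the Popen call with, say, a Hadoop Streaming or MPI submission, while keeping the same signature.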
Note that the current rabit tracker does not restart a worker when it dies; restarting failed workers is left to the platform itself, or you can write the fail-restart logic in the customized script.
* Fail-restart is usually provided by most platforms.
* For example, MapReduce will restart a mapper when it fails.
Fault Tolerance
=====
This section introduces how fault tolerance works in rabit.
The following figure shows how rabit deals with failures.
![](http://homes.cs.washington.edu/~tqchen/rabit/fig/fault-tol.png)
The scenario is as follows:
* Node 1 fails between the first and second call of Allreduce after the second checkpoint
* The other nodes wait in the call of the second Allreduce in order to help node 1 recover.
* When node 1 restarts, it will call ```LoadCheckPoint```, and get the latest checkpoint from one of the existing nodes.
* Then node 1 can start from the latest checkpoint and continue running.
* When node 1 calls the first Allreduce again, as the other nodes already know the result, node 1 can get it from one of them.
* When node 1 reaches the second Allreduce, the other nodes find out that node 1 has caught up and they can continue the program normally.
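The replay step in this scenario can be illustrated with a toy simulation in plain Python (this is not rabit's actual implementation): surviving nodes cache each Allreduce result, and a restarted node replaying its calls is simply served the cached results until it catches up.

```python
# Toy simulation of result replay during recovery (not rabit internals).
# Completed Allreduce results are cached; a recovering node replaying an
# earlier call index is served the cached result instead of recomputing.

class ReplayAllreduce:
    def __init__(self):
        self.history = []  # results of completed Allreduce calls, in order

    def allreduce(self, call_index, local_value, peer_values):
        if call_index < len(self.history):
            # Recovering node: the result is already known, just forward it.
            return self.history[call_index]
        result = local_value + sum(peer_values)  # fresh sum-Allreduce
        self.history.append(result)
        return result

net = ReplayAllreduce()
# Normal pass: calls 0 and 1 complete
assert net.allreduce(0, 1, [2, 3]) == 6
assert net.allreduce(1, 10, [20, 30]) == 60
# Node 1 restarts and replays call 0: served from the cache, local input ignored
assert net.allreduce(0, 999, [999]) == 6
```

This works only because every node is guaranteed to observe the identical result of each Allreduce/Broadcast call, which is the key property discussed below.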
This fault tolerance model is based on a key property of Allreduce and Broadcast:
all the nodes get the same result when calling Allreduce/Broadcast. Because of this property, any node can record the history,
and when a node recovers, the result can be forwarded to it.
Checkpoints are introduced so that the history before the latest checkpoint can be discarded, which makes the iterative program more
efficient. The strategy of rabit is different from the fail-restart strategy, where all the nodes restart from the same checkpoint
when any of them fails. In rabit, the processes only block in the Allreduce call to help the recovery, and the checkpoint is only saved locally without
touching the disk. This makes rabit programs more reliable and efficient.
This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated,
and can deal with more complicated cases such as multiple node failures and failures during the recovery phase.