diff --git a/guide/README.md b/guide/README.md
index 8764ae9a1..4db7ef4cf 100644
--- a/guide/README.md
+++ b/guide/README.md
@@ -87,8 +87,7 @@ Besides strings, rabit also allows to broadcast constant size array and vectors.
 Common Use Case
 =====
 Many distributed machine learning algorithms involve splitting the data into different nodes,
-computing statistics locally, and finally aggregating them. Such workflow is usually done repetitively through
-many iterations before the algorithm converges. Allreduce naturally meets the structure of such programs,
+computing statistics locally, and finally aggregating them. Such a workflow is usually repeated over many iterations before the algorithm converges. Allreduce naturally matches the structure of such programs;
 common use cases include:
 
 * Aggregation of gradient values, which can be used in optimization methods such as L-BFGS.
@@ -131,14 +130,14 @@ and ```CheckPoint```. These two functions are used for fault-tolerance purposes.
 As mentioned before, traditional machine learning programs involve several iterations.
 In each iteration, we start with a model, make some calls to Allreduce or Broadcast and update the model.
 The calling sequence in each iteration does not need to be the same.
-* When the nodes start from the beginning (i.e. iteration 0), LoadCheckPoint returns 0, so we can initialize the model.
+* When the nodes start from the beginning (i.e. iteration 0), ```LoadCheckPoint``` returns 0, so we can initialize the model.
 * ```CheckPoint``` saves the model after each iteration.
   - Efficiency Note: the model is only kept in local memory and no save to disk is performed when calling Checkpoint
 * When a node goes down and restarts, ```LoadCheckPoint``` will recover the latest saved model, and
 * When a node goes down, the rest of the nodes will block in the call of Allreduce/Broadcast and wait for the recovery of the failed node until it catches up.
-Please see the [fault tolerance procedure](#fault-tolerance) section to understand the recovery procedure executed by rabit.
+Please see the [Fault Tolerance](#fault-tolerance) section to understand the recovery procedure executed by rabit.
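+
+Putting these calls together, an iterative program typically has the following shape. The sketch below uses
+rabit's Python wrapper (assumed to be importable as ```rabit```); the exact return value of ```load_checkpoint```
+and the name of the reduction constant are assumptions to check against the wrapper, and the model and update
+step are only toy placeholders:
+
+```python
+import numpy as np
+import rabit  # assumption: the Python wrapper shipped with rabit
+
+rabit.init()
+# assumed to return (version, model); version == 0 means no checkpoint exists yet
+version, model = rabit.load_checkpoint()
+if version == 0:
+    model = np.zeros(10)  # toy model: just a weight vector
+for it in range(version, 100):
+    # statistics computed from the local data partition (toy values here)
+    grad = np.ones_like(model) * (rabit.get_rank() + 1)
+    # aggregate the statistics across all nodes (reduction constant name assumed)
+    grad = rabit.allreduce(grad, rabit.SUM)
+    model -= 0.01 * grad
+    # keep the new model in memory as the recovery point; nothing is written to disk
+    rabit.checkpoint(model)
+rabit.finalize()
+```
+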
 Compile Programs with Rabit
 ====
@@ -191,73 +190,72 @@ Running Rabit Jobs
 ====
 Rabit is a portable library that can run on multiple platforms.
 
 #### Running Rabit Locally
-* You can use [../tracker/rabit_demo.py](../tracker/rabit_demo.py) to start n process locally
+* You can use [../tracker/rabit_demo.py](../tracker/rabit_demo.py) to start n processes locally
 * This script will restart the program when it exits with -2, so it can be used for [mock test](#link-against-mock-test-library)
 
 #### Running Rabit on Hadoop
-* You can use [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) to run rabit program on hadoop
-* This will start n rabit program as mapper of MapReduce
-* Each program can read its part of data from stdin
-* Yarn is highly recommended, since Yarn allows specifying ncpu and memory of each mapper
+* You can use [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) to run rabit programs on hadoop
+* This will start n rabit programs as mappers of MapReduce
+* Each program can read its portion of data from stdin
+* Yarn is highly recommended, since Yarn allows specifying the number of CPUs and memory of each mapper:
   - This allows multi-threading programs in each node, which can be more efficient
-  - A good possible practice is OpenMP-rabit hybrid code
+  - An easy multi-threading solution could be to use OpenMP with rabit code
 
 #### Running Rabit on Yarn
 * To Be modified from [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py)
 
 #### Running Rabit using MPI
-* You can submit rabit programs to MPI cluster using [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py).
+* You can submit rabit programs to an MPI cluster using [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py).
 * If you linked your code against librabit_mpi.a, then you can directly use mpirun to submit the job
 
 #### Customize Tracker Script
-You can also modify the tracker script to allow rabit run on other platforms. To do so, refer to the existing
-tracker script such as [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) and [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py)
+You can also modify the tracker script to allow rabit to run on other platforms. To do so, refer to existing
+tracker scripts, such as [../tracker/rabit_hadoop.py](../tracker/rabit_hadoop.py) and [../tracker/rabit_mpi.py](../tracker/rabit_mpi.py)
 to get a sense of how it is done.
 You will need to implement a platform dependent submission function with the following definition
 ```python
-def fun_submit(nslave, slave_args):
+def fun_submit(nworkers, worker_args):
     """
-      customized submit script, that submit nslave jobs,
+      customized submit script that submits nworkers jobs,
       each must contain args as parameter
       note this can be a lambda closure
       Parameters
-         nslave number of slave process to start up
+         nworkers number of worker processes to start
          worker_args tracker information which must be passed to the arguments
-           this usually includes the parameters of master_uri and port etc.
+           this usually includes the parameters of master_uri and port, etc.
     """
 ```
-The submission function should start nslave process in the platform, and append slave_args to the end of other arguments.
+The submission function should start nworkers processes on the platform, and append worker_args to the end of the other arguments.
+Then you can simply call ```tracker.submit``` with fun_submit to submit jobs to the target platform.
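+
+For example, a toy submission function for a plain local run could look like the sketch below
+(```make_submit``` and ```worker_cmd``` are hypothetical names used only for this illustration):
+
+```python
+import subprocess
+
+def make_submit(worker_cmd):
+    """Build a fun_submit closure that runs all workers on the local machine.
+    worker_cmd is the worker command line, e.g. ['./my_program', 'train.txt']."""
+    def fun_submit(nworkers, worker_args):
+        # start nworkers processes, appending the tracker arguments to each command line
+        procs = [subprocess.Popen(worker_cmd + worker_args) for _ in range(nworkers)]
+        # no fail-restart here; a real platform script should restart workers that die
+        for p in procs:
+            p.wait()
+    return fun_submit
+```
+
+This is roughly what a local run needs; [../tracker/rabit_demo.py](../tracker/rabit_demo.py) additionally restarts a worker that exits with -2, as mentioned above.
+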
-Note that the current rabit tracker do not restart a worker when it dies, the job of fail-restart thus lies on the platform itself or we should write
-fail-restart logic in the customization script.
+Note that the current rabit tracker does not restart a worker when it dies; restarting failed workers is left to the platform, or we should write the fail-restart logic in the custom script.
 * Fail-restart is usually provided by most platforms.
 * For example, mapreduce will restart a mapper when it fails
 
 Fault Tolerance
 =====
-This section introduces the how fault tolerance works in rabit.
-We can use the following figure to show the how rabit deals with failures.
+This section introduces how fault tolerance works in rabit.
+The following figure shows how rabit deals with failures.
 
 ![](http://homes.cs.washington.edu/~tqchen/rabit/fig/fault-tol.png)
 
 The scenario is as follows:
-* Node 1 fails between the first and second call of Allreduce after the latest checkpoint
-* Other nodes stay in the call of second Allreduce to help node 1 to recover.
+* Node 1 fails between the first and second call of Allreduce after the second checkpoint.
+* The other nodes wait in the call of the second Allreduce to help node 1 recover.
 * When node 1 restarts, it will call ```LoadCheckPoint```, and get the latest checkpoint from one of the existing nodes.
 * Then node 1 can start from the latest checkpoint and continue running.
-* When node 1 call the first Allreduce again, because the other nodes already knows the result of allreduce, node 1 can get the result from one of the nodes.
-* When node 1 reaches the second Allreduce, other nodes find out that node 1 has catched up and they can continue the program normally.
+* When node 1 calls the first Allreduce again, as the other nodes already know the result, node 1 can get it from one of them.
+* When node 1 reaches the second Allreduce, the other nodes find out that node 1 has caught up and they can continue the program normally.
 
-We can find that this fault tolerance model is based on the a key property of Allreduce and Broadcast:
-All the nodes get the same result when calling Allreduce/Broadcast. Because of this property, we can have some node records the history,
-and when a node recovers, the result can be forwarded to the recovering node.
+This fault tolerance model is based on a key property of Allreduce and Broadcast:
+All the nodes get the same result when calling Allreduce/Broadcast. Because of this property, any node can record the history,
+and when a node recovers, the result can be forwarded to it.
 
-The checkpoint is introduced so that we do not have to discard the history before the checkpoint, so that the iterative program can be more
-efficient. The strategy of rabit is different from fail-restart strategy where all the nodes restarts from checkpoint
-when any of the node fails. All the program only block in the Allreduce call to help the recovery, and the checkpoint is only saved locally without
-touching the disk. This makes rabit program more reliable and efficient.
+The checkpoint is introduced so that we do not have to discard the history before checkpointing, which makes the iterative program more
+efficient. The strategy of rabit is different from the fail-restart strategy where all the nodes restart from the same checkpoint
+when any of them fails. All the processes block in the Allreduce call to help the recovery, and the checkpoint is only saved locally without
+touching the disk. This makes rabit programs more reliable and efficient.
 
-This is an conceptual introduction to the fault tolerant model of rabit. The actual implementation is more sophiscated,
+This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated,
 and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase.