Merge branch 'master' of ssh://github.com/tqchen/rabit

Conflicts:
	tracker/rabit_hadoop.py
This commit is contained in:
tqchen 2015-01-12 12:03:00 -08:00
commit 5a457d69fc
3 changed files with 40 additions and 15 deletions

View File

@ -112,7 +112,7 @@ int main(int argc, char *argv[]) {
if (version == 0) model.InitModel();
// the version number marks the iteration to resume
for (int iter = version; iter < max_iter; ++iter) {
// model should be sufficient variable at this point
// at this point, the model object should allow us to recover the program state
...
// each iteration can contain multiple calls of allreduce/broadcast
rabit::Allreduce(&data[0], n);

View File

@ -1,8 +1,33 @@
Toolkit
====
This folder contains example toolkit developed using rabit
This folder contains some example toolkits developed with rabit to help you get started.
KMeans
====
* Kmeans taks in LIBSVM format
* You will need a dummy label field at beginning of all the lines to get KMeans
#### How to run it
You will need to build the program with ```make```.
If you want to run it with Hadoop, you can execute the [./kmeans_hadoop.sh](./kmeans_hadoop.sh) script from your master node in cluster.
You will have to edit the file in order to specify the path to the Hadoop Streaming jar. Afterwards, you can execute it with the following arguments (in the exact same order):
* number of worker nodes in your Hadoop cluster (i.e. number of slaves)
* path to the input data (HDFS path where you put the data)
* number of clusters K
* number of iterations to perform
* output path (HDFS path where to store the output data, must be new)
If you take a look at [./kmeans_hadoop.sh](./kmeans_hadoop.sh), you can see that it runs the kmeans.rabit version. If you want to run the program backed by the mock, you will need to update it accordingly, i.e. use kmeans.mock instead.
The current implementation runs for the amount of iterations you specify in the command line argument. If you would like to add some convergence criteria (e.g. when no cluster assignment changes between iterations you stop or something like that) you will have to modify [./kmeans.cc](./kmeans.cc). We leave that as an exercise to the reader :)
#### Input File Format
KMeans uses LIBSVM format to parse the input. If you are not familiar with LIBSVM, <a href="http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/">here</a> you will find more details.
The format is the following:
&lt;label&gt; &lt;index1&gt;:&lt;value1&gt; &lt;index2&gt;:&lt;value2&gt; ...
where label is a dummy integer value in this case (you can add 1's to every example), index&lt;x&gt; is the index for feature x, and value&lt;x&gt; is the feature x value.
#### Output File Format
KMeans currently outputs the centroids as dense vectors. Each line in the output file corresponds to a centroid. The number of lines in the file must match the number of clusters K you specified in the command line.

View File

@ -1,7 +1,7 @@
#!/usr/bin/python
"""
This is a script to submit rabit job using hadoop streaming
submit the rabit process as mappers of MapReduce
This is a script to submit rabit job using hadoop streaming.
It will submit the rabit process as mappers of MapReduce.
"""
import argparse
import sys
@ -21,16 +21,16 @@ hadoop_home = os.getenv('HADOOP_HOME')
if hadoop_home != None:
if hadoop_binary == None:
hadoop_binary = hadoop_home + '/bin/hadoop'
assert os.path.exists(hadoop_binary), "HADDOP_HOME does not contain the hadoop binary"
assert os.path.exists(hadoop_binary), "HADOOP_HOME does not contain the hadoop binary"
if hadoop_streaming_jar == None:
hadoop_streaming_jar = hadoop_home + '/lib/hadoop-streaming.jar'
assert os.path.exists(hadoop_streaming_jar), "HADDOP_HOME does not contain the haddop streaming jar"
assert os.path.exists(hadoop_streaming_jar), "HADOOP_HOME does not contain the hadoop streaming jar"
if hadoop_binary == None or hadoop_streaming_jar == None:
warnings.warn('Warning: Cannot auto-detect path to hadoop and hadoop-streaming jar\n'\
warnings.warn('Warning: Cannot auto-detect path to hadoop or hadoop-streaming jar\n'\
'\tneed to set them via arguments -hs and -hb\n'\
'\tTo enable auto-detection, you can set enviroment variable HADOOP_HOME'\
', or modify rabit_hadoop.py line 14', stacklevel = 2)
', or modify rabit_hadoop.py line 16', stacklevel = 2)
parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming.'\
'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
@ -59,22 +59,22 @@ parser.add_argument('--timeout', default=600000000, type=int,
help = 'timeout (in million seconds) of each mapper job, automatically set to a very long time,'\
'normally you do not need to set this ')
parser.add_argument('-mem', '--memory_mb', default=-1, type=int,
help = 'maximum memory used by the process, Guide: set it large (near mapred.cluster.max.map.memory.mb)'\
help = 'maximum memory used by the process. Guide: set it large (near mapred.cluster.max.map.memory.mb)'\
'if you are running multi-threading rabit,'\
'so that each node can occupy all the mapper slots in a machine for maximum performance')
if hadoop_binary == None:
parser.add_argument('-hb', '--hadoop_binary', required = True,
help="path-to-hadoop binary folder")
help="path to hadoop binary file")
else:
parser.add_argument('-hb', '--hadoop_binary', default = hadoop_binary,
help="path-to-hadoop binary folder")
help="path to hadoop binary file")
if hadoop_streaming_jar == None:
parser.add_argument('-hs', '--hadoop_streaming_jar', required = True,
help='path-to hadoop streamimg jar file')
help='path to hadoop streamimg jar file')
else:
parser.add_argument('-hs', '--hadoop_streaming_jar', default = hadoop_streaming_jar,
help='path-to hadoop streamimg jar file')
help='path to hadoop streamimg jar file')
parser.add_argument('command', nargs='+',
help = 'command for rabit program')
args = parser.parse_args()