From 92c94176c184266bae4418ac946ff9f820ac9f55 Mon Sep 17 00:00:00 2001
From: nachocano
Date: Tue, 13 Jan 2015 00:13:05 -0800
Subject: [PATCH 1/6] adding some changes to kmeans

---
 toolkit/README.md | 123 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 108 insertions(+), 15 deletions(-)

diff --git a/toolkit/README.md b/toolkit/README.md
index 0af435e77..7bf610a0f 100644
--- a/toolkit/README.md
+++ b/toolkit/README.md
@@ -5,21 +5,6 @@ This folder contains some example toolkits developed with rabit to help you get
 KMeans
 ====
-#### How to run it
-You will need to build the program with ```make```.
-If you want to run it with Hadoop, you can execute the [./kmeans_hadoop.sh](./kmeans_hadoop.sh) script from your master node in cluster.
-You will have to edit the file in order to specify the path to the Hadoop Streaming jar. Afterwards, you can execute it with the following arguments (in the exact same order):
-
-* number of worker nodes in your Hadoop cluster (i.e. number of slaves)
-* path to the input data (HDFS path where you put the data)
-* number of clusters K
-* number of iterations to perform
-* output path (HDFS path where to store the output data, must be new)
-
-If you take a look at [./kmeans_hadoop.sh](./kmeans_hadoop.sh), you can see that it runs the kmeans.rabit version. If you want to run the program backed by the mock, you will need to update it accordingly, i.e. use kmeans.mock instead.
-
-The current implementation runs for the amount of iterations you specify in the command line argument. If you would like to add some convergence criteria (e.g. when no cluster assignment changes between iterations you stop or something like that) you will have to modify [./kmeans.cc](./kmeans.cc). We leave that as an exercise to the reader :)
-
 #### Input File Format
 KMeans uses LIBSVM format to parse the input. If you are not familiar with LIBSVM, here you will find more details.
@@ -31,3 +16,111 @@ where label is a dummy integer value in this case (you can add 1's to every exam

 #### Output File Format
 KMeans currently outputs the centroids as dense vectors. Each line in the output file corresponds to a centroid. The number of lines in the file must match the number of clusters K you specified in the command line.
+
+#### Example
+
+Let's go over a more detailed example...
+
+#### Preprocess
+
+Download the smallwiki dataset used in the Machine Learning for Big Data class at the University of Washington.
+
+wget http://courses.cs.washington.edu/courses/cse547/14wi/datasets/smallwiki.zip
+
+Unzip it; you should find three files:
+* tfidf.txt: each row is in the form of “docid||termid1:tfidf1,termid2:tfidf2,...”
+* dictionary.txt: map of term to termid
+* cluster0.txt: initial cluster centers. It won't be needed.
+
+The first thing to do is to convert the tfidf file format into the input format rabit supports, i.e. LIBSVM. For that, you can use a simple Python script; the following should suffice. You should redirect the output to a file, let's say tfidf.libsvm.
+
+```python
+# convert each "docid||termid:tfidf,termid:tfidf,..." row into a LIBSVM line
+for line in open("tfidf.txt").read().splitlines():
+    # take the part after the docid separator (works whether rows use '|' or '||')
+    example = line.split('|')[-1].split(',')
+    example = ' '.join(example)
+    print '%s %s' % (1, example)
+```
+#### Compile
+
+You will then need to build the KMeans program with ```make```, which will produce three binaries:
+
+* kmeans.mpi: runs on MPI.
+* kmeans.mock: uses a mock to simulate error conditions for testing purposes.
+* kmeans.rabit: uses our C++ implementation.
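+
+For reference, a quick build session might look like the sketch below (this assumes the Makefile in this toolkit folder builds all three targets on your machine; adjust the paths to your setup):
+
+```bash
+cd toolkit                                # assumption: the KMeans Makefile lives in this folder
+make
+ls kmeans.mpi kmeans.mock kmeans.rabit    # the three binaries described above
+```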
+
+#### Running with Hadoop
+
+If you want to run it with Hadoop, you can execute the [./kmeans_hadoop.sh](./kmeans_hadoop.sh) script from the master node of your cluster.
+You will have to edit the file in order to specify the path to the Hadoop Streaming jar. Afterwards, you can execute it with the following arguments (in exactly this order):
+
+* number of worker nodes in your Hadoop cluster (i.e. number of slave nodes)
+* path to the input data (HDFS path where you put the preprocessed file in LIBSVM format)
+* number of clusters K
+* number of iterations to perform
+* output path (HDFS path where the output data will be stored; must be a non-existent folder)
+
+The current implementation runs for the number of iterations you specify on the command line. If you would like to add a convergence criterion (e.g. stopping when no cluster assignment changes between iterations), you will have to modify [./kmeans.cc](./kmeans.cc). We leave that as an exercise to the reader :)
+
+You may have noticed that [./kmeans_hadoop.sh](./kmeans_hadoop.sh) uses the kmeans.rabit binary, but you can also use kmeans.mock in order to easily test your system's behavior in the presence of failures. More on that later.
+
+Don't forget to copy the preprocessed file into HDFS and create the output folder. For example, from the bin folder of your Hadoop installation, you can execute the following:
+
+```bash
+$ ./hadoop fs -mkdir kmeans
+$ ./hadoop fs -mkdir kmeans/in
+$ ./hadoop fs -put tfidf.libsvm kmeans/in
+$ ./hadoop fs -mkdir kmeans/out
+```
+
+#### Running with MPI
+
+You will need to have MPI installed, for example OpenMPI. In order to run the program, you can use mpirun to submit the job. This is a non-fault-tolerant version, as it is backed by MPI.
+
+
+#### Running with Mock
+
+As previously mentioned, you can execute the kmeans example, or any of your own, with the mock binary. This will allow you to test error conditions while you are developing your algorithms. As explained in the Tutorial, passing the script certain parameters (e.g. mock=0,0,1,0) will cause a certain node to exit after calling Allreduce/Broadcast in a certain iteration. For more details refer to the [guide](./guide).
+
+#### Processing Output
+
+Once the program finishes running, you can fetch the output from HDFS. For example, from the bin folder of your Hadoop installation, you can execute the following:
+
+```bash
+$ ./hadoop fs -get kmeans/out/part-00000 kmeans.out
+```
+
+Each line of the output file is a centroid in dense format. Since this dataset's vocabulary is listed in the dictionary.txt file, you can do some simple post-processing to recover the top 10 words of each centroid. Something like this should work:
+
+```python
+from collections import defaultdict
+import numpy as np
+
+# build a termid -> term map from dictionary.txt ("term termid" per line)
+words = {}
+for line in open("dictionary.txt").read().splitlines():
+    word, index = line.split(' ')
+    words[int(index)] = word
+
+# read the dense centroids, one per line of kmeans.out
+clusters = defaultdict(list)
+cluster_name = 0
+for line in open("kmeans.out").read().splitlines():
+    clusters[cluster_name].extend(line.split(' '))
+    cluster_name += 1
+
+# for each centroid, print the 10 terms with the largest weights
+for j, key in enumerate(clusters):
+    array = np.array(clusters[key]).astype(np.float32)
+    idx = np.argsort(array)[::-1][:10]
+    ws = [words[i] for i in idx]
+    print 'cluster %d = %s' % (j, ' '.join(ws))
+```
+
+
+
+
+
+

From 48c42bf189f61e22f2ae25495ca02240657cfeb2 Mon Sep 17 00:00:00 2001
From: nachocano
Date: Tue, 13 Jan 2015 00:18:46 -0800
Subject: [PATCH 2/6] fixing stuff

---
 toolkit/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/toolkit/README.md b/toolkit/README.md
index 7bf610a0f..502454982 100644
--- a/toolkit/README.md
+++ b/toolkit/README.md
@@ -25,7 +25,7 @@ Let's go over a more detailed example...

 #### Preprocess

 Download the smallwiki dataset used in the Machine Learning for Big Data class at the University of Washington.

-wget http://courses.cs.washington.edu/courses/cse547/14wi/datasets/smallwiki.zip
+http://courses.cs.washington.edu/courses/cse547/14wi/datasets/smallwiki.zip

 Unzip it; you should find three files:
 * tfidf.txt: each row is in the form of “docid||termid1:tfidf1,termid2:tfidf2,...”
 * dictionary.txt: map of term to termid
 * cluster0.txt: initial cluster centers. It won't be needed.
@@ -79,7 +79,7 @@ You will need to have MPI installed, for example OpenMPI. In order to


 #### Running with Mock

-As previously mentioned, you can execute the kmeans example, or any of your own, with the mock binary. This will allow you to test error conditions while you are developing your algorithms. As explained in the Tutorial, passing the script certain parameters (e.g. mock=0,0,1,0) will cause a certain node to exit after calling Allreduce/Broadcast in a certain iteration. For more details refer to the [guide](./guide).
+As previously mentioned, you can execute the kmeans example, or any of your own, with the mock binary. This will allow you to test error conditions while you are developing your algorithms. As explained in the [Tutorial](../guide), passing the script certain parameters (e.g. mock=0,0,1,0) will cause a certain node to exit after calling Allreduce/Broadcast in some iteration.

 #### Processing Output


From 54e2f7e90d7e03430dbce62ea999163c9cd887d6 Mon Sep 17 00:00:00 2001
From: nachocano
Date: Tue, 13 Jan 2015 00:48:37 -0800
Subject: [PATCH 3/6] adding wrapper section

---
 guide/README.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/guide/README.md b/guide/README.md
index 0f25eae23..e953f3433 100644
--- a/guide/README.md
+++ b/guide/README.md
@@ -16,6 +16,7 @@ Please also refer to the [API Documentation](http://homes.cs.washington.edu/~tqc
   - [Running Rabit using MPI](#running-rabit-using-mpi)
   - [Customize Tracker Script](#customize-tracker-script)
 * [Fault Tolerance](#fault-tolerance)
+* [Python Wrapper](#python-wrapper)

 What is Allreduce
 =====
@@ -255,3 +256,11 @@ touching the disk. This makes rabit programs more reliable and efficient.

 This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated,
 and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase.
+Python Wrapper
+=====
+In order to make the library available for a wider range of developers, we decided to provide a Python wrapper around our C++ code.
+
+Developers can now program rabit applications in Python! We provide a couple of examples:
+
+* [./basic.py](./basic.py) : [./basic.cc] counterpart, explained above.
+* [./broadcast.py](./broadcast.py) : [./broadcast.cc] counterpart, explained above.

From 5c7967e86309098393c9a064a92da228ab78ff5d Mon Sep 17 00:00:00 2001
From: nachocano
Date: Tue, 13 Jan 2015 00:49:57 -0800
Subject: [PATCH 4/6] adding link

---
 guide/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/guide/README.md b/guide/README.md
index e953f3433..205537065 100644
--- a/guide/README.md
+++ b/guide/README.md
@@ -262,5 +262,5 @@ In order to make the library available for a wider range of developers, we decid

 Developers can now program rabit applications in Python! We provide a couple of examples:

-* [./basic.py](./basic.py) : [./basic.cc] counterpart, explained above.
-* [./broadcast.py](./broadcast.py) : [./broadcast.cc] counterpart, explained above.
+* [./basic.py](./basic.py) : [./basic.cc](./basic.cc) counterpart, explained above.
+* [./broadcast.py](./broadcast.py) : [./broadcast.cc](./broadcast.cc) counterpart, explained above.

From 95c6d7398f3f20f094effe6593d2db55f852244f Mon Sep 17 00:00:00 2001
From: nachocano
Date: Tue, 13 Jan 2015 00:59:20 -0800
Subject: [PATCH 5/6] adding more stuff

---
 toolkit/README.md | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/toolkit/README.md b/toolkit/README.md
index 502454982..fd151f218 100644
--- a/toolkit/README.md
+++ b/toolkit/README.md
@@ -21,7 +21,7 @@ KMeans currently outputs the centroids as dense vectors. Each line in the output

 Let's go over a more detailed example...

-#### Preprocess
+# Preprocess

 Download the smallwiki dataset used in the Machine Learning for Big Data class at the University of Washington.

@@ -40,7 +40,7 @@ The first thing to do is to convert the tfidf file format into the input format
     example = ' '.join(example)
     print '%s %s' % (1, example)
 ```
-#### Compile
+# Compile

 You will then need to build the KMeans program with ```make```, which will produce three binaries:

@@ -48,15 +48,15 @@ You will then need to build the KMeans program with ```make```, which will produ
 * kmeans.mock: uses a mock to simulate error conditions for testing purposes.
 * kmeans.rabit: uses our C++ implementation.

-#### Running with Hadoop
+# Running with Hadoop

 If you want to run it with Hadoop, you can execute the [./kmeans_hadoop.sh](./kmeans_hadoop.sh) script from the master node of your cluster.
 You will have to edit the file in order to specify the path to the Hadoop Streaming jar. Afterwards, you can execute it with the following arguments (in exactly this order):

 * number of worker nodes in your Hadoop cluster (i.e. number of slave nodes)
 * path to the input data (HDFS path where you put the preprocessed file in LIBSVM format)
-* number of clusters K
-* number of iterations to perform
+* number of clusters K (let's use 20 for this example)
+* number of iterations to perform (let's use just 5 iterations)
 * output path (HDFS path where the output data will be stored; must be a non-existent folder)

 The current implementation runs for the number of iterations you specify on the command line. If you would like to add a convergence criterion (e.g. stopping when no cluster assignment changes between iterations), you will have to modify [./kmeans.cc](./kmeans.cc). We leave that as an exercise to the reader :)

@@ -72,16 +72,19 @@ $ ./hadoop fs -put tfidf.libsvm kmeans/in
 $ ./hadoop fs -mkdir kmeans/out
 ```

-#### Running with MPI
+# Running with MPI

 You will need to have MPI installed, for example OpenMPI. In order to run the program, you can use mpirun to submit the job. This is a non-fault-tolerant version, as it is backed by MPI.


-#### Running with Mock
+# Running with Mock

 As previously mentioned, you can execute the kmeans example, or any of your own, with the mock binary. This will allow you to test error conditions while you are developing your algorithms. As explained in the [Tutorial](../guide), passing the script certain parameters (e.g. mock=0,0,1,0) will cause a certain node to exit after calling Allreduce/Broadcast in some iteration.

+You can also run this locally; you will only need to split the input file into several smaller files, each of which will be used by a particular process in the shared-memory environment. You can use a Unix command line tool such as split.
+
+
-#### Processing Output
+# Processing Output

 Once the program finishes running, you can fetch the output from HDFS. For example, from the bin folder of your Hadoop installation, you can execute the following:

From f79e5fc041c88a042b9dc3557b9ddd5e23ee4962 Mon Sep 17 00:00:00 2001
From: nachocano
Date: Tue, 13 Jan 2015 01:00:58 -0800
Subject: [PATCH 6/6] adding more stuff

---
 toolkit/README.md | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/toolkit/README.md b/toolkit/README.md
index fd151f218..78a0a2e20 100644
--- a/toolkit/README.md
+++ b/toolkit/README.md
@@ -5,7 +5,7 @@ This folder contains some example toolkits developed with rabit to help you get
 KMeans
 ====
-#### Input File Format
+## Input File Format
 KMeans uses LIBSVM format to parse the input. If you are not familiar with LIBSVM, here you will find more details.

 The format is the following:
@@ -14,14 +14,14 @@ The format is the following:
 where label is a dummy integer value in this case (you can add 1's to every example), index<x> is the index for feature x, and value<x> is the feature x value.

-#### Output File Format
+## Output File Format
 KMeans currently outputs the centroids as dense vectors. Each line in the output file corresponds to a centroid. The number of lines in the file must match the number of clusters K you specified in the command line.

-#### Example
+## Example

 Let's go over a more detailed example...

-# Preprocess
+#### Preprocess

 Download the smallwiki dataset used in the Machine Learning for Big Data class at the University of Washington.

@@ -40,7 +40,7 @@ The first thing to do is to convert the tfidf file format into the input format
     example = ' '.join(example)
     print '%s %s' % (1, example)
 ```
-# Compile
+#### Compile

 You will then need to build the KMeans program with ```make```, which will produce three binaries:

@@ -48,7 +48,7 @@ You will then need to build the KMeans program with ```make```, which will produ
 * kmeans.mock: uses a mock to simulate error conditions for testing purposes.
 * kmeans.rabit: uses our C++ implementation.

-# Running with Hadoop
+#### Running with Hadoop

 If you want to run it with Hadoop, you can execute the [./kmeans_hadoop.sh](./kmeans_hadoop.sh) script from the master node of your cluster.
 You will have to edit the file in order to specify the path to the Hadoop Streaming jar. Afterwards, you can execute it with the following arguments (in exactly this order):

 * number of worker nodes in your Hadoop cluster (i.e. number of slave nodes)
 * path to the input data (HDFS path where you put the preprocessed file in LIBSVM format)
 * number of clusters K (let's use 20 for this example)
 * number of iterations to perform (let's use just 5 iterations)
 * output path (HDFS path where the output data will be stored; must be a non-existent folder)
@@ -72,19 +72,19 @@ $ ./hadoop fs -put tfidf.libsvm kmeans/in
 $ ./hadoop fs -mkdir kmeans/out
 ```

-# Running with MPI
+#### Running with MPI

 You will need to have MPI installed, for example OpenMPI. In order to run the program, you can use mpirun to submit the job. This is a non-fault-tolerant version, as it is backed by MPI.

-# Running with Mock
+#### Running with Mock

 As previously mentioned, you can execute the kmeans example, or any of your own, with the mock binary. This will allow you to test error conditions while you are developing your algorithms. As explained in the [Tutorial](../guide), passing the script certain parameters (e.g. mock=0,0,1,0) will cause a certain node to exit after calling Allreduce/Broadcast in some iteration.

 You can also run this locally; you will only need to split the input file into several smaller files, each of which will be used by a particular process in the shared-memory environment. You can use a Unix command line tool such as split.

-# Processing Output
+#### Processing Output

 Once the program finishes running, you can fetch the output from HDFS. For example, from the bin folder of your Hadoop installation, you can execute the following: