change toolkit to rabitlearn

This commit is contained in:
tqchen
2015-01-16 10:45:54 -08:00
parent f5245c615c
commit c798fc2a29
9 changed files with 50 additions and 39 deletions

17
rabit-learn/README.md Normal file
View File

@@ -0,0 +1,17 @@
Rabit-Learn
====
This folder contains implementation of distributed machine learning algorithm using rabit.
It also contain links to the Machine Learning packages that uses rabit.
* Contribution of toolkits, examples, benchmarks is more than welcomed!
Toolkits
====
* [KMeans Clustering](kmeans)
* [XGBoost: eXtreme Gradient Boosting](https://github.com/tqchen/xgboost/tree/unity/multi-node)
- xgboost is a very fast boosted tree(also known as GBDT) library, that can run more than
10 times faster than existing packages
- Rabit carries xgboost to distributed enviroment, inheritating all the benefits of xgboost
single node version, and scale it to even larger problems

30
rabit-learn/common.mk Normal file
View File

@@ -0,0 +1,30 @@
# this is the common build script for rabit programs
# you do not have to use it
export CC = gcc
export CXX = g++
export MPICXX = mpicxx
export LDFLAGS= -pthread -lm -L../../lib
export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../../include -I../common
.PHONY: clean all lib mpi
all: $(BIN) $(MOCKBIN)
mpi: $(MPIBIN)
lib:
cd ../..;make lib/librabit.a lib/librabit_mock.a; cd -
libmpi:
cd ../..;make lib/librabit_mpi.a;cd -
$(BIN) :
$(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc, $^) $(LDFLAGS) -lrabit
$(MOCKBIN) :
$(CXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc, $^) $(LDFLAGS) -lrabit_mock
$(OBJ) :
$(CXX) -c $(CFLAGS) -o $@ $(firstword $(filter %.cpp %.c %.cc, $^) )
$(MPIBIN) :
$(MPICXX) $(CFLAGS) -o $@ $(filter %.cpp %.o %.c %.cc %.a, $^) $(LDFLAGS) -lrabit_mpi
clean:
$(RM) $(OBJ) $(BIN) $(MPIBIN) $(MOCKBIN) *~ ../src/*~

View File

@@ -0,0 +1,117 @@
#include <rabit.h>
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <cmath>
namespace rabit {
/*! \brief sparse matrix, CSR format */
struct SparseMat {
// sparse matrix entry
struct Entry {
// feature index
unsigned findex;
// feature value
float fvalue;
};
// sparse vector
struct Vector {
const Entry *data;
unsigned length;
inline const Entry &operator[](size_t i) const {
return data[i];
}
};
inline Vector operator[](size_t i) const {
Vector v;
v.data = &data[0] + row_ptr[i];
v.length = static_cast<unsigned>(row_ptr[i + 1]-row_ptr[i]);
return v;
}
// load data from LibSVM format
inline void Load(const char *fname) {
FILE *fi;
if (!strcmp(fname, "stdin")) {
fi = stdin;
} else {
fi = utils::FopenCheck(fname, "r");
}
row_ptr.clear();
row_ptr.push_back(0);
data.clear();
feat_dim = 0;
float label; bool init = true;
char tmp[1024];
while (fscanf(fi, "%s", tmp) == 1) {
Entry e;
if (sscanf(tmp, "%u:%f", &e.findex, &e.fvalue) == 2) {
data.push_back(e);
feat_dim = std::max(e.findex, feat_dim);
} else {
if (!init) {
labels.push_back(label);
row_ptr.push_back(data.size());
}
utils::Check(sscanf(tmp, "%f", &label) == 1, "invalid LibSVM format");
init = false;
}
}
// last row
labels.push_back(label);
row_ptr.push_back(data.size());
feat_dim += 1;
// close the filed
if (fi != stdin) fclose(fi);
}
inline size_t NumRow(void) const {
return row_ptr.size() - 1;
}
// maximum feature dimension
unsigned feat_dim;
std::vector<size_t> row_ptr;
std::vector<Entry> data;
std::vector<float> labels;
};
// dense matrix
struct Matrix {
inline void Init(size_t nrow, size_t ncol, float v = 0.0f) {
this->nrow = nrow;
this->ncol = ncol;
data.resize(nrow * ncol);
std::fill(data.begin(), data.end(), v);
}
inline float *operator[](size_t i) {
return &data[0] + i * ncol;
}
inline const float *operator[](size_t i) const {
return &data[0] + i * ncol;
}
inline void Print(const char *fname) {
FILE *fo;
if (!strcmp(fname, "stdout")) {
fo = stdout;
} else {
fo = utils::FopenCheck(fname, "w");
}
for (size_t i = 0; i < data.size(); ++i) {
fprintf(fo, "%g", data[i]);
if ((i+1) % ncol == 0) {
fprintf(fo, "\n");
} else {
fprintf(fo, " ");
}
}
// close the filed
if (fo != stdout) fclose(fo);
}
// number of data
size_t nrow, ncol;
std::vector<float> data;
};
/*!\brief computes a random number modulo the value */
inline int Random(int value) {
return rand() % value;
}
} // namespace rabit

View File

@@ -0,0 +1,16 @@
# specify tensor path
BIN = kmeans.rabit
MOCKBIN= kmeans.mock
MPIBIN = kmeans.mpi
# objectives that makes up rabit library
OBJ = kmeans.o
# common build script for programs
include ../common.mk
# dependenies here
kmeans.rabit: kmeans.o lib
kmeans.mock: kmeans.o lib
kmeans.mpi: kmeans.o libmpi
kmeans.o: kmeans.cc ../../src/*.h

View File

@@ -0,0 +1,129 @@
Toolkit
====
This folder contains some example toolkits developed with rabit to help you get started.
KMeans
====
## Input File Format
KMeans uses LIBSVM format to parse the input. If you are not familiar with LIBSVM, <a href="http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/">here</a> you will find more details.
The format is the following:
&lt;label&gt; &lt;index1&gt;:&lt;value1&gt; &lt;index2&gt;:&lt;value2&gt; ...
where label is a dummy integer value in this case (you can add 1's to every example), index&lt;x&gt; is the index for feature x, and value&lt;x&gt; is the feature x value.
## Output File Format
KMeans currently outputs the centroids as dense vectors. Each line in the output file corresponds to a centroid. The number of lines in the file must match the number of clusters K you specified in the command line.
## Example
Let's go over a more detailed example...
#### Preprocess
Download the smallwiki dataset used in the Machine Learning for Big Data class at University of Washington.
http://courses.cs.washington.edu/courses/cse547/14wi/datasets/smallwiki.zip
Unzip it, you should find three files:
* tfidf.txt: each row is in the form of “docid||termid1:tfidf1,termid2:tfidf2,...
* dictionary.txt: map of term to termid
* cluster0.txt: initial cluster centers. Won't needed.
The first thing to do is to convert the tfidf file format into the input format rabit supports, i.e. LIBSVM. For that, you can use a simple python script. The following should suffice. You should redirect the output to a file, let's say tfidf.libsvm.
```python
for line in open("tfidf.txt").read().splitlines():
example = line.split('|')[1].split(',')
example = ' '.join(example)
print '%s %s' % (1, example)
```
#### Compile
You will then need to build the KMeans program with ```make```, which will produce three binaries:
* kmeans.mpi: runs on MPI.
* kmeans.mock: uses a mock to simulate error conditions for testing purposes.
* kmeans.rabit: uses our C++ implementation.
#### Running with Hadoop
If you want to run it with Hadoop, you can execute the [./kmeans_hadoop.sh](./kmeans_hadoop.sh) script from your master node in cluster.
You will have to edit the file in order to specify the path to the Hadoop Streaming jar. Afterwards, you can execute it with the following arguments (in the exact same order):
* number of worker nodes in your Hadoop cluster (i.e. number of slave nodes)
* path to the input data (HDFS path where you put the preprocessed file in libsvm format)
* number of clusters K (let's use 20 for this example)
* number of iterations to perform (let's use just 5 iterations)
* output path (HDFS path where to store the output data, must be a non-existent folder)
The current implementation runs for the amount of iterations you specify in the command line argument. If you would like to add some convergence criteria (e.g. when no cluster assignment changes between iterations you stop or something like that) you will have to modify [./kmeans.cc](./kmeans.cc). We leave that as an exercise to the reader :)
You may have noticed that [./kmeans_hadoop.sh](./kmeans_hadoop.sh) uses kmeans.rabit binary, but you can also use kmeans.mock in order to easily test your system behavior in presence of failures. More on that later.
Don't forget to copy the preprocessed file into HDFS and create the output folder. For example, inside the bin folder in Hadoop, you can execute the following:
```bash
$ ./hadoop fs -mkdir kmeans
$ ./hadoop fs -mkdir kmeans/in
$ ./hadoop fs -put tfidf.libsvm kmeans/in
$ ./hadoop fs -mkdir kmeans/out
```
#### Running with MPI
You will need to have a MPI cluster installed, for example OpenMPI. In order to run the program, you can use mpirun to submit the job. This is a non-fault tolerant version as it is backed by MPI.
#### Running with Mock
As previously mentioned, you can execute the kmeans example, an any of your own, with the mock binary. This will allow you to test error conditions while you are developing your algorithms. As explained in the [Tutorial](../guide), passing the script certain parameters (e.g. mock=0,0,1,0) will cause certain node to exit after calling Allreduce/Broadcast in some iteration.
You can also run this locally, you will only need to split the input file into several smaller files, each will be used by a particular process in the shared memory environment. You can use some Unix command line tool such as split.
#### Processing Output
Once the program finishes running, you can fetch the output from HDFS. For example, inside the bin folder in Hadoop, you can execute the following:
```bash
$ ./hadoop fs -get kmeans/out/part-00000 kmeans.out
```
Each line of the output file is a centroid in dense format. As this dataset contains the words in dictionary.txt file, you can do some simple post processing to recover the top 10 words of each centroid. Something like this should work:
```python
words = {}
for line in open("dictionary.txt").read().splitlines():
word, index = line.split(' ')
words[int(index)] = word
from collections import defaultdict
clusters = defaultdict(list)
cluster_name = 0
for line in open("kmeans.out").read().splitlines():
line = line.split(' ')
clusters[cluster_name].extend(line)
cluster_name+=1
import numpy as np
for j, key in enumerate(clusters):
elements = clusters[key]
array = np.array(elements).astype(np.float32)
idx = np.argsort(array)[::-1][:10]
ws = []
for i in idx:
ws.append(words[i])
print 'cluster %d = %s' % (j, ' '.join(ws))
```

View File

@@ -0,0 +1,162 @@
// this is a test case to test whether rabit can recover model when
// facing an exception
#include <rabit.h>
#include <rabit/utils.h>
#include "./toolkit_util.h"
#include <time.h>
using namespace rabit;
// kmeans model
class Model : public rabit::ISerializable {
public:
// matrix of centroids
Matrix centroids;
// load from stream
virtual void Load(rabit::IStream &fi) {
fi.Read(&centroids.nrow, sizeof(centroids.nrow));
fi.Read(&centroids.ncol, sizeof(centroids.ncol));
fi.Read(&centroids.data);
}
/*! \brief save the model to the stream */
virtual void Save(rabit::IStream &fo) const {
fo.Write(&centroids.nrow, sizeof(centroids.nrow));
fo.Write(&centroids.ncol, sizeof(centroids.ncol));
fo.Write(centroids.data);
}
virtual void InitModel(unsigned num_cluster, unsigned feat_dim) {
centroids.Init(num_cluster, feat_dim);
}
// normalize L2 norm
inline void Normalize(void) {
for (size_t i = 0; i < centroids.nrow; ++i) {
float *row = centroids[i];
double wsum = 0.0;
for (size_t j = 0; j < centroids.ncol; ++j) {
wsum += row[j] * row[j];
}
wsum = sqrt(wsum);
if (wsum < 1e-6) return;
float winv = 1.0 / wsum;
for (size_t j = 0; j < centroids.ncol; ++j) {
row[j] *= winv;
}
}
}
};
inline void InitCentroids(const SparseMat &data, Matrix *centroids) {
int num_cluster = centroids->nrow;
for (int i = 0; i < num_cluster; ++i) {
int index = Random(data.NumRow());
SparseMat::Vector v = data[index];
for (unsigned j = 0; j < v.length; ++j) {
(*centroids)[i][v[j].findex] = v[j].fvalue;
}
}
for (int i = 0; i < num_cluster; ++i) {
int proc = Random(rabit::GetWorldSize());
rabit::Broadcast((*centroids)[i], centroids->ncol * sizeof(float), proc);
}
}
inline double Cos(const float *row,
const SparseMat::Vector &v) {
double rdot = 0.0, rnorm = 0.0;
for (unsigned i = 0; i < v.length; ++i) {
rdot += row[v[i].findex] * v[i].fvalue;
rnorm += v[i].fvalue * v[i].fvalue;
}
return rdot / sqrt(rnorm);
}
inline size_t GetCluster(const Matrix &centroids,
const SparseMat::Vector &v) {
size_t imin = 0;
double dmin = Cos(centroids[0], v);
for (size_t k = 1; k < centroids.nrow; ++k) {
double dist = Cos(centroids[k], v);
if (dist > dmin) {
dmin = dist; imin = k;
}
}
return imin;
}
int main(int argc, char *argv[]) {
if (argc < 5) {
if (rabit::GetRank() == 0) {
rabit::TrackerPrintf("Usage: <data_dir> num_cluster max_iter <out_model>\n");
}
return 0;
}
clock_t tStart = clock();
srand(0);
// load the data
SparseMat data;
data.Load(argv[1]);
// set the parameters
int num_cluster = atoi(argv[2]);
int max_iter = atoi(argv[3]);
// intialize rabit engine
rabit::Init(argc, argv);
// load model
Model model;
int iter = rabit::LoadCheckPoint(&model);
if (iter == 0) {
rabit::Allreduce<op::Max>(&data.feat_dim, 1);
model.InitModel(num_cluster, data.feat_dim);
InitCentroids(data, &model.centroids);
model.Normalize();
rabit::TrackerPrintf("[%d] start at %s\n",
rabit::GetRank(), rabit::GetProcessorName().c_str());
} else {
rabit::TrackerPrintf("[%d] restart iter=%d\n", rabit::GetRank(), iter);
}
const unsigned num_feat = data.feat_dim;
// matrix to store the result
Matrix temp;
for (int r = iter; r < max_iter; ++r) {
temp.Init(num_cluster, num_feat + 1, 0.0f);
#if __cplusplus >= 201103L
auto lazy_get_centroid = [&]()
#endif
{
// lambda function used to calculate the data if necessary
// this function may not be called when the result can be directly recovered
const size_t ndata = data.NumRow();
for (size_t i = 0; i < ndata; ++i) {
SparseMat::Vector v = data[i];
size_t k = GetCluster(model.centroids, v);
// temp[k] += v
for (size_t j = 0; j < v.length; ++j) {
temp[k][v[j].findex] += v[j].fvalue;
}
// use last column to record counts
temp[k][num_feat] += 1.0f;
}
};
// call allreduce
#if __cplusplus >= 201103L
rabit::Allreduce<op::Sum>(&temp.data[0], temp.data.size(), lazy_get_centroid);
#else
rabit::Allreduce<op::Sum>(&temp.data[0], temp.data.size());
#endif
// set number
for (int k = 0; k < num_cluster; ++k) {
float cnt = temp[k][num_feat];
utils::Check(cnt != 0.0f, "get zero sized cluster");
for (unsigned i = 0; i < num_feat; ++i) {
model.centroids[k][i] = temp[k][i] / cnt;
}
}
model.Normalize();
rabit::CheckPoint(&model);
}
// output the model file to somewhere
if (rabit::GetRank() == 0) {
model.centroids.Print(argv[4]);
}
rabit::TrackerPrintf("[%d] Time taken: %f seconds\n", rabit::GetRank(), static_cast<float>(clock() - tStart) / CLOCKS_PER_SEC);
rabit::Finalize();
return 0;
}

View File

@@ -0,0 +1,9 @@
#!/bin/bash
if [ "$#" -lt 5 ];
then
echo "Usage: <nslaves> <input_data> <ncluster> <max_iteration> <output>"
exit -1
fi
#set path to hadoop streaming jar here
STREAMING_JAR=
python ../tracker/rabit_hadoop.py -hs $STREAMING_JAR -n $1 -i $2 -o $5 kmeans.rabit stdin $3 $4 stdout