sparse kmeans

This commit is contained in:
tqchen 2014-12-03 18:15:28 -08:00
parent e3a95b2d1a
commit 69af79d45d
2 changed files with 185 additions and 123 deletions

View File

@ -2,160 +2,133 @@
// facing an exception
#include <rabit.h>
#include <utils.h>
#include <cstdio>
#include <cstdlib>
#include <cmath>
#include <sstream>
#include <fstream>
#include <ctime>
#include <cfloat>
#include "./toolkit_util.h"
using namespace rabit;
// kmeans model
class Model : public rabit::utils::ISerializable {
public:
std::vector<float> centroids;
// matrix of centroids
Matrix centroids;
// load from stream
virtual void Load(rabit::utils::IStream &fi) {
fi.Read(&centroids);
fi.Read(&centroids.nrow, sizeof(centroids.nrow));
fi.Read(&centroids.ncol, sizeof(centroids.ncol));
fi.Read(&centroids.data);
}
/*! \brief save the model to the stream */
virtual void Save(rabit::utils::IStream &fo) const {
fo.Write(centroids);
fo.Write(&centroids.nrow, sizeof(centroids.nrow));
fo.Write(&centroids.ncol, sizeof(centroids.ncol));
fo.Write(centroids.data);
}
virtual void InitModel(unsigned num_cluster, unsigned feat_dim) {
centroids.Init(num_cluster, feat_dim);
}
// normalize L2 norm
inline void Normalize(void) {
for (size_t i = 0; i < centroids.nrow; ++i) {
float *row = centroids[i];
double wsum = 0.0;
for (size_t j = 0; j < centroids.ncol; ++j) {
wsum += row[j] * row[j];
}
wsum = sqrt(wsum);
if (wsum < 1e-6) return;
float winv = 1.0 / wsum;
for (size_t j = 0; j < centroids.ncol; ++j) {
row[j] *= winv;
}
}
virtual void InitModel(int k, int d) {
centroids.resize(k * d, 0.0f);
}
};
/*!\brief computes a random number modulo the value */
inline int Random(int value) {
return rand() % value;
}
inline void KMeans(int ntrial, int iter, int k, int d, std::vector<std::vector<float> >& data, Model *model) {
int rank = rabit::GetRank();
int nproc = rabit::GetWorldSize();
utils::LogPrintf("[%d] Running KMeans iter=%d\n", rank, iter);
// compute ndata based on assignments
std::vector<float> ndata(k * d + k, 0.0f);
for (int i = 0; i < data.size(); ++i) {
float max_sim = FLT_MIN;
int cindex = -1;
for (int j = 0; j < k; ++j) {
float sim = 0.0f;
int cstart = j * d;
for (int y = 0, z = cstart; y < d; ++y, ++z) {
sim += model->centroids[z] * data[i][y];
}
if (sim > max_sim) {
cindex = j;
max_sim = sim;
inline void InitCentroids(const SparseMat &data, Matrix *centroids) {
int num_cluster = centroids->nrow;
for (int i = 0; i < num_cluster; ++i) {
int index = Random(data.NumRow());
SparseMat::Vector v = data[index];
for (unsigned j = 0; j < v.length; ++j) {
(*centroids)[i][v[j].findex] = v[j].fvalue;
}
}
int start = cindex * d + cindex;
int j = start;
for (int l = 0; l < d; ++j, ++l) {
ndata[j] += data[i][l];
}
// update count
ndata[j] += 1;
}
// do Allreduce
rabit::Allreduce<op::Sum>(&ndata[0], ndata.size());
for (int i = 0; i < k; ++i) {
int nstart = i * d + i;
int cstart = i * d;
int cend= cstart + d;
int count = ndata[nstart + d];
for (int j = nstart, l = cstart; l < cend; ++j, ++l) {
model->centroids[l] = ndata[j] / count;
}
}
}
inline void ReadData(char* data_dir, int d, std::vector<std::vector<float> >* data) {
int rank = rabit::GetRank();
std::stringstream ss;
ss << data_dir << rank;
const char* file = ss.str().c_str();
std::ifstream ifs(file);
utils::Check(ifs.good(), "[%d] File %s does not exist\n", rank, file);
float v = 0.0f;
while(!ifs.eof()) {
int i=0;
std::vector<float> vec;
while (i < d) {
ifs >> v;
vec.push_back(v);
i++;
}
utils::Check(vec.size() % d == 0, "[%d] Invalid data size. %d instead of %d\n", rank, vec.size(), d);
data->push_back(vec);
for (int i = 0; i < num_cluster; ++i) {
int proc = Random(rabit::GetWorldSize());
rabit::Broadcast((*centroids)[i], centroids->ncol * sizeof(float), proc);
}
}
inline void InitCentroids(int k, int d, std::vector<std::vector<float> >& data, Model* model) {
int rank = rabit::GetRank();
int nproc = rabit::GetWorldSize();
std::vector<std::vector<float> > candidate_centroids;
candidate_centroids.resize(k, std::vector<float>(d));
int elements = data.size();
for (size_t i = 0; i < k; ++i) {
int index = Random(elements);
candidate_centroids[i] = data[index];
inline double Cos(const float *row,
const SparseMat::Vector &v) {
double rdot = 0.0, rnorm = 0.0;
for (unsigned i = 0; i < v.length; ++i) {
rdot += row[v[i].findex] * v[i].fvalue;
rnorm += v[i].fvalue * v[i].fvalue;
}
for (size_t i = 0; i < k; ++i) {
int proc = Random(nproc);
std::vector<float> tmp(d, 0.0f);
if (proc == rank) {
tmp = candidate_centroids[i];
rabit::Broadcast(&tmp, proc);
} else {
rabit::Broadcast(&tmp, proc);
return rdot / sqrt(rnorm);
}
int start = i * d;
int j = start;
for (int l = 0; l < d; ++j, ++l) {
model->centroids[j] = tmp[l];
inline size_t GetCluster(const Matrix &centroids,
const SparseMat::Vector &v) {
size_t imin = 0;
double dmin = Cos(centroids[0], v);
for (size_t k = 1; k < centroids.nrow; ++k) {
double dist = Cos(centroids[k], v);
if (dist < dmin) {
dmin = dist; imin = k;
}
}
return imin;
}
int main(int argc, char *argv[]) {
if (argc < 4) {
printf("Usage: <k> <d> <itr> <data_dir>\n");
printf("Usage: <data_dir> num_cluster max_iter\n");
return 0;
}
int k = atoi(argv[1]);
int d = atoi(argv[2]);
int max_itr = atoi(argv[3]);
rabit::Init(argc, argv);
int rank = rabit::GetRank();
int nproc = rabit::GetWorldSize();
std::string name = rabit::GetProcessorName();
srand(0);
int ntrial = 0;
// load the data
SparseMat data;
data.Load(argv[1]);
// set the parameters
int num_cluster = atoi(argv[2]);
int max_iter = atoi(argv[3]);
// intialize rabit engine
rabit::Init(argc, argv);
// load model
Model model;
std::vector<std::vector<float> > data;
int iter = rabit::LoadCheckPoint(&model);
if (iter == 0) {
ReadData(argv[4], d, &data);
model.InitModel(k, d);
InitCentroids(k, d, data, &model);
rabit::Allreduce<op::Max>(&data.feat_dim, sizeof(data.feat_dim));
model.InitModel(num_cluster, data.feat_dim);
InitCentroids(data, &model.centroids);
model.Normalize();
utils::LogPrintf("[%d] start at %s\n",
rabit::GetRank(), rabit::GetProcessorName().c_str());
} else {
utils::LogPrintf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter);
utils::LogPrintf("[%d] restart iter=%d\n", rabit::GetRank(), iter);
}
for (int r = iter; r < max_itr; ++r) {
KMeans(ntrial, r, k, d, data, &model);
const unsigned num_feat = data.feat_dim;
// matrix to store the result
Matrix temp;
for (int r = iter; r < max_iter; ++r) {
temp.Init(num_cluster, num_feat + 1, 0.0f);
const size_t ndata = data.NumRow();
for (size_t i = 0; i < ndata; ++i) {
SparseMat::Vector v = data[i];
size_t k = GetCluster(model.centroids, v);
for (size_t j = 0; j < v.length; ++j) {
temp[k][v[j].findex] += v[j].fvalue;
}
temp[k][num_feat] += 1.0f;
}
rabit::Allreduce<op::Sum>(&temp.data[0], temp.data.size());
for (int k = 0; k < num_cluster; ++k) {
float cnt = temp[k][num_feat];
for (unsigned i = 0; i < num_feat; ++i) {
model.centroids[k][i] = temp[k][i] / cnt;
}
}
model.Normalize();
rabit::CheckPoint(model);
}
rabit::Finalize();

89
toolkit/toolkit_util.h Normal file
View File

@ -0,0 +1,89 @@
#include <rabit.h>
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <cmath>
namespace rabit {
/*! \brief sparse matrix, CSR format */
struct SparseMat {
// sparse matrix entry
struct Entry {
// feature index
unsigned findex;
// feature value
float fvalue;
};
// sparse vector
struct Vector {
const Entry *data;
unsigned length;
inline const Entry &operator[](size_t i) const {
return data[i];
}
};
inline Vector operator[](size_t i) const {
Vector v;
v.data = &data[0] + row_ptr[i];
v.length = static_cast<unsigned>(row_ptr[i + 1]-row_ptr[i]);
return v;
}
// load data from file
inline void Load(const char *fname) {
FILE *fi;
if (!strcmp(fname, "stdin")) {
fi = stdin;
} else {
fi = utils::FopenCheck(fname, "r");
}
row_ptr.clear();
row_ptr.push_back(0);
data.clear();
feat_dim = 0;
unsigned num_feat;
while (fscanf(fi, "%u", &num_feat) == 1) {
Entry e;
for (unsigned i = 0; i < num_feat; ++i) {
utils::Check(fscanf(fi, "%u:%f", &e.findex, &e.fvalue) == 2,
"invalid format");
data.push_back(e);
feat_dim = std::max(e.findex, feat_dim);
}
row_ptr.push_back(data.size());
}
feat_dim += 1;
// close the filed
if (fi != stdin) fclose(fi);
}
inline size_t NumRow(void) const {
return row_ptr.size() - 1;
}
// maximum feature dimension
unsigned feat_dim;
std::vector<size_t> row_ptr;
std::vector<Entry> data;
};
// dense matrix
struct Matrix {
inline void Init(size_t nrow, size_t ncol, float v = 0.0f) {
this->nrow = nrow;
this->ncol = ncol;
data.resize(nrow * ncol);
std::fill(data.begin(), data.end(), v);
}
inline float *operator[](size_t i) {
return &data[0] + i * ncol;
}
inline const float *operator[](size_t i) const {
return &data[0] + i * ncol;
}
// number of data
size_t nrow, ncol;
std::vector<float> data;
};
/*!\brief computes a random number modulo the value */
inline int Random(int value) {
return rand() % value;
}
} // namespace rabit