From 69af79d45d7ad07fffb0eb9f698b5eeb195bb4a5 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 3 Dec 2014 18:15:28 -0800 Subject: [PATCH] sparse kmeans --- toolkit/kmeans.cpp | 219 ++++++++++++++++++----------------------- toolkit/toolkit_util.h | 89 +++++++++++++++++ 2 files changed, 185 insertions(+), 123 deletions(-) create mode 100644 toolkit/toolkit_util.h diff --git a/toolkit/kmeans.cpp b/toolkit/kmeans.cpp index dff12b77c..5811f7fc8 100644 --- a/toolkit/kmeans.cpp +++ b/toolkit/kmeans.cpp @@ -2,160 +2,133 @@ // facing an exception #include #include -#include -#include -#include -#include -#include -#include -#include +#include "./toolkit_util.h" using namespace rabit; +// kmeans model class Model : public rabit::utils::ISerializable { public: - std::vector centroids; + // matrix of centroids + Matrix centroids; // load from stream virtual void Load(rabit::utils::IStream &fi) { - fi.Read(&centroids); + fi.Read(&centroids.nrow, sizeof(centroids.nrow)); + fi.Read(&centroids.ncol, sizeof(centroids.ncol)); + fi.Read(&centroids.data); } /*! 
\brief save the model to the stream */ virtual void Save(rabit::utils::IStream &fo) const { - fo.Write(centroids); + fo.Write(&centroids.nrow, sizeof(centroids.nrow)); + fo.Write(&centroids.ncol, sizeof(centroids.ncol)); + fo.Write(centroids.data); } - virtual void InitModel(int k, int d) { - centroids.resize(k * d, 0.0f); + virtual void InitModel(unsigned num_cluster, unsigned feat_dim) { + centroids.Init(num_cluster, feat_dim); + } + // normalize L2 norm + inline void Normalize(void) { + for (size_t i = 0; i < centroids.nrow; ++i) { + float *row = centroids[i]; + double wsum = 0.0; + for (size_t j = 0; j < centroids.ncol; ++j) { + wsum += row[j] * row[j]; + } + wsum = sqrt(wsum); + if (wsum < 1e-6) return; + float winv = 1.0 / wsum; + for (size_t j = 0; j < centroids.ncol; ++j) { + row[j] *= winv; + } + } } - }; - -/*!\brief computes a random number modulo the value */ -inline int Random(int value) { - return rand() % value; -} - -inline void KMeans(int ntrial, int iter, int k, int d, std::vector >& data, Model *model) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - - utils::LogPrintf("[%d] Running KMeans iter=%d\n", rank, iter); - - // compute ndata based on assignments - std::vector ndata(k * d + k, 0.0f); - for (int i = 0; i < data.size(); ++i) { - float max_sim = FLT_MIN; - int cindex = -1; - for (int j = 0; j < k; ++j) { - float sim = 0.0f; - int cstart = j * d; - for (int y = 0, z = cstart; y < d; ++y, ++z) { - sim += model->centroids[z] * data[i][y]; - } - if (sim > max_sim) { - cindex = j; - max_sim = sim; - } +inline void InitCentroids(const SparseMat &data, Matrix *centroids) { + int num_cluster = centroids->nrow; + for (int i = 0; i < num_cluster; ++i) { + int index = Random(data.NumRow()); + SparseMat::Vector v = data[index]; + for (unsigned j = 0; j < v.length; ++j) { + (*centroids)[i][v[j].findex] = v[j].fvalue; } - int start = cindex * d + cindex; - int j = start; - for (int l = 0; l < d; ++j, ++l) { - ndata[j] += data[i][l]; - } - 
// update count - ndata[j] += 1; } - - // do Allreduce - rabit::Allreduce(&ndata[0], ndata.size()); - - for (int i = 0; i < k; ++i) { - int nstart = i * d + i; - int cstart = i * d; - int cend= cstart + d; - int count = ndata[nstart + d]; - for (int j = nstart, l = cstart; l < cend; ++j, ++l) { - model->centroids[l] = ndata[j] / count; - } + for (int i = 0; i < num_cluster; ++i) { + int proc = Random(rabit::GetWorldSize()); + rabit::Broadcast((*centroids)[i], centroids->ncol * sizeof(float), proc); } } -inline void ReadData(char* data_dir, int d, std::vector >* data) { - int rank = rabit::GetRank(); - std::stringstream ss; - ss << data_dir << rank; - const char* file = ss.str().c_str(); - std::ifstream ifs(file); - utils::Check(ifs.good(), "[%d] File %s does not exist\n", rank, file); - float v = 0.0f; - while(!ifs.eof()) { - int i=0; - std::vector vec; - while (i < d) { - ifs >> v; - vec.push_back(v); - i++; - } - utils::Check(vec.size() % d == 0, "[%d] Invalid data size. %d instead of %d\n", rank, vec.size(), d); - data->push_back(vec); +inline double Cos(const float *row, + const SparseMat::Vector &v) { + double rdot = 0.0, rnorm = 0.0; + for (unsigned i = 0; i < v.length; ++i) { + rdot += row[v[i].findex] * v[i].fvalue; + rnorm += v[i].fvalue * v[i].fvalue; } + return rdot / sqrt(rnorm); } - -inline void InitCentroids(int k, int d, std::vector >& data, Model* model) { - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - std::vector > candidate_centroids; - candidate_centroids.resize(k, std::vector(d)); - int elements = data.size(); - for (size_t i = 0; i < k; ++i) { - int index = Random(elements); - candidate_centroids[i] = data[index]; - } - for (size_t i = 0; i < k; ++i) { - int proc = Random(nproc); - std::vector tmp(d, 0.0f); - if (proc == rank) { - tmp = candidate_centroids[i]; - rabit::Broadcast(&tmp, proc); - } else { - rabit::Broadcast(&tmp, proc); - } - int start = i * d; - int j = start; - for (int l = 0; l < d; ++j, ++l) { - 
model->centroids[j] = tmp[l]; - } +inline size_t GetCluster(const Matrix &centroids, + const SparseMat::Vector &v) { + size_t imin = 0; + double dmin = Cos(centroids[0], v); + for (size_t k = 1; k < centroids.nrow; ++k) { + double dist = Cos(centroids[k], v); + if (dist < dmin) { + dmin = dist; imin = k; + } } + return imin; } - + int main(int argc, char *argv[]) { if (argc < 4) { - printf("Usage: \n"); + printf("Usage: num_cluster max_iter\n"); return 0; } - int k = atoi(argv[1]); - int d = atoi(argv[2]); - int max_itr = atoi(argv[3]); - - rabit::Init(argc, argv); - int rank = rabit::GetRank(); - int nproc = rabit::GetWorldSize(); - std::string name = rabit::GetProcessorName(); - srand(0); - int ntrial = 0; - Model model; - - std::vector > data; + // load the data + SparseMat data; + data.Load(argv[1]); + // set the parameters + int num_cluster = atoi(argv[2]); + int max_iter = atoi(argv[3]); + // intialize rabit engine + rabit::Init(argc, argv); + // load model + Model model; int iter = rabit::LoadCheckPoint(&model); if (iter == 0) { - ReadData(argv[4], d, &data); - model.InitModel(k, d); - InitCentroids(k, d, data, &model); + rabit::Allreduce(&data.feat_dim, sizeof(data.feat_dim)); + model.InitModel(num_cluster, data.feat_dim); + InitCentroids(data, &model.centroids); + model.Normalize(); + utils::LogPrintf("[%d] start at %s\n", + rabit::GetRank(), rabit::GetProcessorName().c_str()); } else { - utils::LogPrintf("[%d] reload-trail=%d, init iter=%d\n", rank, ntrial, iter); + utils::LogPrintf("[%d] restart iter=%d\n", rabit::GetRank(), iter); } - for (int r = iter; r < max_itr; ++r) { - KMeans(ntrial, r, k, d, data, &model); + const unsigned num_feat = data.feat_dim; + // matrix to store the result + Matrix temp; + for (int r = iter; r < max_iter; ++r) { + temp.Init(num_cluster, num_feat + 1, 0.0f); + const size_t ndata = data.NumRow(); + for (size_t i = 0; i < ndata; ++i) { + SparseMat::Vector v = data[i]; + size_t k = GetCluster(model.centroids, v); + for (size_t j = 
0; j < v.length; ++j) { + temp[k][v[j].findex] += v[j].fvalue; + } + temp[k][num_feat] += 1.0f; + } + rabit::Allreduce(&temp.data[0], temp.data.size()); + for (int k = 0; k < num_cluster; ++k) { + float cnt = temp[k][num_feat]; + for (unsigned i = 0; i < num_feat; ++i) { + model.centroids[k][i] = temp[k][i] / cnt; + } + } + model.Normalize(); rabit::CheckPoint(model); } rabit::Finalize(); diff --git a/toolkit/toolkit_util.h b/toolkit/toolkit_util.h new file mode 100644 index 000000000..71bf888d0 --- /dev/null +++ b/toolkit/toolkit_util.h @@ -0,0 +1,89 @@ +#include +#include +#include +#include +#include + +namespace rabit { +/*! \brief sparse matrix, CSR format */ +struct SparseMat { + // sparse matrix entry + struct Entry { + // feature index + unsigned findex; + // feature value + float fvalue; + }; + // sparse vector + struct Vector { + const Entry *data; + unsigned length; + inline const Entry &operator[](size_t i) const { + return data[i]; + } + }; + inline Vector operator[](size_t i) const { + Vector v; + v.data = &data[0] + row_ptr[i]; + v.length = static_cast(row_ptr[i + 1]-row_ptr[i]); + return v; + } + // load data from file + inline void Load(const char *fname) { + FILE *fi; + if (!strcmp(fname, "stdin")) { + fi = stdin; + } else { + fi = utils::FopenCheck(fname, "r"); + } + row_ptr.clear(); + row_ptr.push_back(0); + data.clear(); + feat_dim = 0; + unsigned num_feat; + while (fscanf(fi, "%u", &num_feat) == 1) { + Entry e; + for (unsigned i = 0; i < num_feat; ++i) { + utils::Check(fscanf(fi, "%u:%f", &e.findex, &e.fvalue) == 2, + "invalid format"); + data.push_back(e); + feat_dim = std::max(e.findex, feat_dim); + } + row_ptr.push_back(data.size()); + } + feat_dim += 1; + // close the filed + if (fi != stdin) fclose(fi); + } + inline size_t NumRow(void) const { + return row_ptr.size() - 1; + } + // maximum feature dimension + unsigned feat_dim; + std::vector row_ptr; + std::vector data; +}; +// dense matrix +struct Matrix { + inline void Init(size_t nrow, 
size_t ncol, float v = 0.0f) { + this->nrow = nrow; + this->ncol = ncol; + data.resize(nrow * ncol); + std::fill(data.begin(), data.end(), v); + } + inline float *operator[](size_t i) { + return &data[0] + i * ncol; + } + inline const float *operator[](size_t i) const { + return &data[0] + i * ncol; + } + // number of data + size_t nrow, ncol; + std::vector data; +}; + +/*!\brief computes a random number modulo the value */ +inline int Random(int value) { + return rand() % value; +} +} // namespace rabit