Merge branch 'unity'

Conflicts:
	R-package/src/Makevars
	R-package/src/Makevars.win
This commit is contained in:
tqchen
2014-08-30 15:01:36 -07:00
13 changed files with 1115 additions and 69 deletions

View File

@@ -5,6 +5,8 @@
#include "../utils/io.h"
#include "../utils/utils.h"
#include "simple_dmatrix-inl.hpp"
#include "page_dmatrix-inl.hpp"
// implements data loads using dmatrix simple for now
namespace xgboost {
@@ -20,7 +22,13 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) {
dmat->LoadBinary(fs, silent, fname);
fs.Close();
return dmat;
}
}
if (magic == DMatrixPage::kMagic) {
DMatrixPage *dmat = new DMatrixPage();
dmat->Load(fs, silent, fname);
// the file pointer is hold in page matrix
return dmat;
}
fs.Close();
DMatrixSimple *dmat = new DMatrixSimple();
@@ -29,11 +37,17 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) {
}
void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) {
if (!strcmp(fname + strlen(fname) - 5, ".page")) {
DMatrixPage::Save(fname, dmat, silent);
return;
}
if (dmat.magic == DMatrixSimple::kMagic) {
const DMatrixSimple *p_dmat = static_cast<const DMatrixSimple*>(&dmat);
p_dmat->SaveBinary(fname, silent);
} else {
utils::Error("not implemented");
DMatrixSimple smat;
smat.CopyFrom(dmat);
smat.SaveBinary(fname, silent);
}
}

269
src/io/page_dmatrix-inl.hpp Normal file
View File

@@ -0,0 +1,269 @@
#ifndef XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
#define XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
/*!
* \file page_row_iter-inl.hpp
* row iterator based on sparse page
* \author Tianqi Chen
*/
#include "../data.h"
#include "../utils/iterator.h"
#include "../utils/thread_buffer.h"
#include "./simple_fmatrix-inl.hpp"
namespace xgboost {
namespace io {
/*! \brief page structure that can be used to store a rowbatch */
struct RowBatchPage {
public:
RowBatchPage(void) {
data_ = new int[kPageSize];
utils::Assert(data_ != NULL, "fail to allocate row batch page");
this->Clear();
}
~RowBatchPage(void) {
if (data_ != NULL) delete [] data_;
}
/*!
* \brief Push one row into page
* \param row an instance row
* \return false or true to push into
*/
inline bool PushRow(const RowBatch::Inst &row) {
const size_t dsize = row.length * sizeof(RowBatch::Entry);
if (FreeBytes() < dsize+ sizeof(int)) return false;
row_ptr(Size() + 1) = row_ptr(Size()) + row.length;
memcpy(data_ptr(row_ptr(Size())) , row.data, dsize);
++ data_[0];
return true;
}
/*!
* \brief get a row batch representation from the page
* \param p_rptr a temporal space that can be used to provide
* ind_ptr storage for RowBatch
* \return a new RowBatch object
*/
inline RowBatch GetRowBatch(std::vector<size_t> *p_rptr, size_t base_rowid) {
RowBatch batch;
batch.base_rowid = base_rowid;
batch.data_ptr = this->data_ptr(0);
batch.size = static_cast<size_t>(this->Size());
std::vector<size_t> &rptr = *p_rptr;
rptr.resize(this->Size() + 1);
for (size_t i = 0; i < rptr.size(); ++i) {
rptr[i] = static_cast<size_t>(this->row_ptr(static_cast<int>(i)));
}
batch.ind_ptr = &rptr[0];
return batch;
}
/*! \brief get i-th row from the batch */
inline RowBatch::Inst operator[](int i) {
return RowBatch::Inst(data_ptr(0) + row_ptr(i),
static_cast<bst_uint>(row_ptr(i+1) - row_ptr(i)));
}
/*!
* \brief clear the page, cleanup the content
*/
inline void Clear(void) {
memset(&data_[0], 0, sizeof(int) * kPageSize);
}
/*!
* \brief load one page form instream
* \return true if loading is successful
*/
inline bool Load(utils::IStream &fi) {
return fi.Read(&data_[0], sizeof(int) * kPageSize) != 0;
}
/*! \brief save one page into outstream */
inline void Save(utils::IStream &fo) {
fo.Write(&data_[0], sizeof(int) * kPageSize);
}
/*! \return number of elements */
inline int Size(void) const {
return data_[0];
}
/*! \brief page size 64 MB */
static const size_t kPageSize = 64 << 18;
private:
/*! \return number of elements */
inline size_t FreeBytes(void) {
return (kPageSize - (Size() + 2)) * sizeof(int)
- row_ptr(Size()) * sizeof(RowBatch::Entry) ;
}
/*! \brief equivalent row pointer at i */
inline int& row_ptr(int i) {
return data_[kPageSize - i - 1];
}
inline RowBatch::Entry* data_ptr(int i) {
return (RowBatch::Entry*)(&data_[1]) + i;
}
// content of data
int *data_;
};
/*! \brief thread buffer iterator */
class ThreadRowPageIterator: public utils::IIterator<RowBatch> {
public:
ThreadRowPageIterator(void) {
itr.SetParam("buffer_size", "2");
page_ = NULL;
base_rowid_ = 0;
isend_ = false;
}
virtual ~ThreadRowPageIterator(void) {
}
virtual void Init(void) {
}
virtual void BeforeFirst(void) {
itr.BeforeFirst();
isend_ = false;
base_rowid_ = 0;
}
virtual bool Next(void) {
if(!this->LoadNextPage()) return false;
out_ = page_->GetRowBatch(&tmp_ptr_, base_rowid_);
base_rowid_ += out_.size;
return true;
}
virtual const RowBatch &Value(void) const{
return out_;
}
/*! \brief load and initialize the iterator with fi */
inline void Load(const utils::FileStream &fi) {
itr.get_factory().SetFile(fi);
itr.Init();
this->BeforeFirst();
}
/*!
* \brief save a row iterator to output stream, in row iterator format
*/
inline static void Save(utils::IIterator<RowBatch> *iter,
utils::IStream &fo) {
RowBatchPage page;
iter->BeforeFirst();
while (iter->Next()) {
const RowBatch &batch = iter->Value();
for (size_t i = 0; i < batch.size; ++i) {
if (!page.PushRow(batch[i])) {
page.Save(fo);
page.Clear();
utils::Check(page.PushRow(batch[i]), "row is too big");
}
}
}
if (page.Size() != 0) page.Save(fo);
}
private:
// load in next page
inline bool LoadNextPage(void) {
ptop_ = 0;
bool ret = itr.Next(page_);
isend_ = !ret;
return ret;
}
// base row id
size_t base_rowid_;
// temporal ptr
std::vector<size_t> tmp_ptr_;
// output data
RowBatch out_;
// whether we reach end of file
bool isend_;
// page pointer type
typedef RowBatchPage* PagePtr;
// loader factory for page
struct Factory {
public:
long file_begin_;
utils::FileStream fi;
Factory(void) {}
inline void SetFile(const utils::FileStream &fi) {
this->fi = fi;
file_begin_ = this->fi.Tell();
}
inline bool Init(void) {
return true;
}
inline void SetParam(const char *name, const char *val) {}
inline bool LoadNext(PagePtr &val) {
return val->Load(fi);
}
inline PagePtr Create(void) {
PagePtr a = new RowBatchPage();
return a;
}
inline void FreeSpace(PagePtr &a) {
delete a;
}
inline void Destroy(void) {
fi.Close();
}
inline void BeforeFirst(void) {
fi.Seek(file_begin_);
}
};
protected:
PagePtr page_;
int ptop_;
utils::ThreadBuffer<PagePtr,Factory> itr;
};
/*! \brief data matrix using page */
class DMatrixPage : public DataMatrix {
public:
DMatrixPage(void) : DataMatrix(kMagic) {
iter_ = new ThreadRowPageIterator();
fmat_ = new FMatrixS(iter_);
}
// virtual destructor
virtual ~DMatrixPage(void) {
delete fmat_;
}
virtual IFMatrix *fmat(void) const {
return fmat_;
}
/*! \brief load and initialize the iterator with fi */
inline void Load(utils::FileStream &fi,
bool silent = false,
const char *fname = NULL){
int magic;
utils::Check(fi.Read(&magic, sizeof(magic)) != 0, "invalid input file format");
utils::Check(magic == kMagic, "invalid format,magic number mismatch");
this->info.LoadBinary(fi);
iter_->Load(fi);
if (!silent) {
printf("DMatrixPage: %lux%lu matrix is loaded",
info.num_row(), info.num_col());
if (fname != NULL) {
printf(" from %s\n", fname);
} else {
printf("\n");
}
if (info.group_ptr.size() != 0) {
printf("data contains %u groups\n", (unsigned)info.group_ptr.size()-1);
}
}
}
/*! \brief save a DataMatrix as DMatrixPage*/
inline static void Save(const char* fname, const DataMatrix &mat, bool silent) {
utils::FileStream fs(utils::FopenCheck(fname, "wb"));
int magic = kMagic;
fs.Write(&magic, sizeof(magic));
mat.info.SaveBinary(fs);
ThreadRowPageIterator::Save(mat.fmat()->RowIterator(), fs);
fs.Close();
if (!silent) {
printf("DMatrixPage: %lux%lu is saved to %s\n",
mat.info.num_row(), mat.info.num_col(), fname);
}
}
/*! \brief the real fmatrix */
FMatrixS *fmat_;
/*! \brief row iterator */
ThreadRowPageIterator *iter_;
/*! \brief magic number used to identify DMatrix */
static const int kMagic = 0xffffab02;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_

View File

@@ -44,8 +44,8 @@ class DMatrixSimple : public DataMatrix {
}
/*! \brief copy content data from source matrix */
inline void CopyFrom(const DataMatrix &src) {
this->info = src.info;
this->Clear();
this->info = src.info;
// clone data content in thos matrix
utils::IIterator<RowBatch> *iter = src.fmat()->RowIterator();
iter->BeforeFirst();

View File

@@ -150,7 +150,7 @@ class FMatrixS : public IFMatrix{
iter_->BeforeFirst();
while (iter_->Next()) {
const RowBatch &batch = iter_->Value();
for (size_t i = 0; i < batch.size; ++i) {
for (size_t i = 0; i < batch.size; ++i) {
if (pkeep == 1.0f || random::SampleBinary(pkeep)) {
buffered_rowset_.push_back(static_cast<bst_uint>(batch.base_rowid+i));
RowBatch::Inst inst = batch[i];

View File

@@ -37,7 +37,9 @@ struct TrainParam{
// speed optimization for dense column
float opt_dense_col;
// leaf vector size
int size_leaf_vector;
int size_leaf_vector;
// option for parallelization
int parallel_option;
// number of threads to be used for tree construction,
// if OpenMP is enabled, if equals 0, use system default
int nthread;
@@ -55,6 +57,7 @@ struct TrainParam{
opt_dense_col = 1.0f;
nthread = 0;
size_leaf_vector = 0;
parallel_option = 0;
}
/*!
* \brief set parameters from outside
@@ -79,6 +82,7 @@ struct TrainParam{
if (!strcmp(name, "size_leaf_vector")) size_leaf_vector = atoi(val);
if (!strcmp(name, "max_depth")) max_depth = atoi(val);
if (!strcmp(name, "nthread")) nthread = atoi(val);
if (!strcmp(name, "parallel_option")) parallel_option = atoi(val);
if (!strcmp(name, "default_direction")) {
if (!strcmp(val, "learn")) default_direction = 0;
if (!strcmp(val, "left")) default_direction = 1;

View File

@@ -45,15 +45,19 @@ class ColMaker: public IUpdater {
// data structure
/*! \brief per thread x per node entry to store tmp data */
struct ThreadEntry {
/*! \brief statistics of data*/
/*! \brief statistics of data */
TStats stats;
/*! \brief extra statistics of data */
TStats stats_extra;
/*! \brief last feature value scanned */
float last_fvalue;
/*! \brief first feature value scanned */
float first_fvalue;
/*! \brief current best solution */
SplitEntry best;
// constructor
explicit ThreadEntry(const TrainParam &param)
: stats(param) {
: stats(param), stats_extra(param) {
}
};
struct NodeEntry {
@@ -219,7 +223,137 @@ class ColMaker: public IUpdater {
}
// use new nodes for qexpand
qexpand = newnodes;
}
}
// parallel find the best split of current fid
// this function does not support nested functions
inline void ParallelFindSplit(const ColBatch::Inst &col,
bst_uint fid,
const IFMatrix &fmat,
const std::vector<bst_gpair> &gpair,
const BoosterInfo &info) {
bool need_forward = param.need_forward_search(fmat.GetColDensity(fid));
bool need_backward = param.need_backward_search(fmat.GetColDensity(fid));
int nthread;
#pragma omp parallel
{
const int tid = omp_get_thread_num();
std::vector<ThreadEntry> &temp = stemp[tid];
// cleanup temp statistics
for (size_t j = 0; j < qexpand.size(); ++j) {
temp[qexpand[j]].stats.Clear();
}
nthread = omp_get_num_threads();
bst_uint step = (col.length + nthread - 1) / nthread;
bst_uint end = std::min(col.length, step * (tid + 1));
for (bst_uint i = tid * step; i < end; ++i) {
const bst_uint ridx = col[i].index;
const int nid = position[ridx];
if (nid < 0) continue;
const float fvalue = col[i].fvalue;
if (temp[nid].stats.Empty()) {
temp[nid].first_fvalue = fvalue;
}
temp[nid].stats.Add(gpair, info, ridx);
temp[nid].last_fvalue = fvalue;
}
}
// start collecting the partial sum statistics
bst_omp_uint nnode = static_cast<bst_omp_uint>(qexpand.size());
#pragma omp parallel for schedule(static)
for (bst_omp_uint j = 0; j < nnode; ++j) {
const int nid = qexpand[j];
TStats sum(param), tmp(param), c(param);
for (int tid = 0; tid < nthread; ++tid) {
tmp = stemp[tid][nid].stats;
stemp[tid][nid].stats = sum;
sum.Add(tmp);
if (tid != 0) {
std::swap(stemp[tid - 1][nid].last_fvalue, stemp[tid][nid].first_fvalue);
}
}
for (int tid = 0; tid < nthread; ++tid) {
stemp[tid][nid].stats_extra = sum;
ThreadEntry &e = stemp[tid][nid];
float fsplit;
if (tid != 0) {
if(fabsf(stemp[tid - 1][nid].last_fvalue - e.first_fvalue) > rt_2eps) {
fsplit = (stemp[tid - 1][nid].last_fvalue - e.first_fvalue) * 0.5f;
} else {
continue;
}
} else {
fsplit = e.first_fvalue - rt_eps;
}
if (need_forward && tid != 0) {
c.SetSubstract(snode[nid].stats, e.stats);
if (c.sum_hess >= param.min_child_weight && e.stats.sum_hess >= param.min_child_weight) {
bst_float loss_chg = static_cast<bst_float>(e.stats.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
e.best.Update(loss_chg, fid, fsplit, false);
}
}
if (need_backward) {
tmp.SetSubstract(sum, e.stats);
c.SetSubstract(snode[nid].stats, tmp);
if (c.sum_hess >= param.min_child_weight && tmp.sum_hess >= param.min_child_weight) {
bst_float loss_chg = static_cast<bst_float>(tmp.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
e.best.Update(loss_chg, fid, fsplit, true);
}
}
}
if (need_backward) {
tmp = sum;
ThreadEntry &e = stemp[nthread-1][nid];
c.SetSubstract(snode[nid].stats, tmp);
if (c.sum_hess >= param.min_child_weight && tmp.sum_hess >= param.min_child_weight) {
bst_float loss_chg = static_cast<bst_float>(tmp.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
e.best.Update(loss_chg, fid, e.last_fvalue + rt_eps, true);
}
}
}
// rescan, generate candidate split
#pragma omp parallel
{
TStats c(param), cright(param);
const int tid = omp_get_thread_num();
std::vector<ThreadEntry> &temp = stemp[tid];
nthread = static_cast<bst_uint>(omp_get_num_threads());
bst_uint step = (col.length + nthread - 1) / nthread;
bst_uint end = std::min(col.length, step * (tid + 1));
for (bst_uint i = tid * step; i < end; ++i) {
const bst_uint ridx = col[i].index;
const int nid = position[ridx];
if (nid < 0) continue;
const float fvalue = col[i].fvalue;
// get the statistics of nid
ThreadEntry &e = temp[nid];
if (e.stats.Empty()) {
e.stats.Add(gpair, info, ridx);
e.first_fvalue = fvalue;
} else {
// forward default right
if (fabsf(fvalue - e.first_fvalue) > rt_2eps){
if (need_forward) {
c.SetSubstract(snode[nid].stats, e.stats);
if (c.sum_hess >= param.min_child_weight && e.stats.sum_hess >= param.min_child_weight) {
bst_float loss_chg = static_cast<bst_float>(e.stats.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, false);
}
}
if (need_backward) {
cright.SetSubstract(e.stats_extra, e.stats);
c.SetSubstract(snode[nid].stats, cright);
if (c.sum_hess >= param.min_child_weight && cright.sum_hess >= param.min_child_weight) {
bst_float loss_chg = static_cast<bst_float>(cright.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, true);
}
}
}
e.stats.Add(gpair, info, ridx);
e.first_fvalue = fvalue;
}
}
}
}
// enumerate the split values of specific feature
inline void EnumerateSplit(const ColBatch::Entry *begin,
const ColBatch::Entry *end,
@@ -272,6 +406,38 @@ class ColMaker: public IUpdater {
}
}
}
// update the solution candidate
virtual void UpdateSolution(const ColBatch &batch,
const std::vector<bst_gpair> &gpair,
const IFMatrix &fmat,
const BoosterInfo &info) {
// start enumeration
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
#if defined(_OPENMP)
const int batch_size = std::max(static_cast<int>(nsize / this->nthread / 32), 1);
#endif
if (param.parallel_option == 0) {
#pragma omp parallel for schedule(dynamic, batch_size)
for (bst_omp_uint i = 0; i < nsize; ++i) {
const bst_uint fid = batch.col_index[i];
const int tid = omp_get_thread_num();
const ColBatch::Inst c = batch[i];
if (param.need_forward_search(fmat.GetColDensity(fid))) {
this->EnumerateSplit(c.data, c.data + c.length, +1,
fid, gpair, info, stemp[tid]);
}
if (param.need_backward_search(fmat.GetColDensity(fid))) {
this->EnumerateSplit(c.data + c.length - 1, c.data - 1, -1,
fid, gpair, info, stemp[tid]);
}
}
} else {
for (bst_omp_uint i = 0; i < nsize; ++i) {
this->ParallelFindSplit(batch[i], batch.col_index[i],
fmat, gpair, info);
}
}
}
// find splits at current level, do split per level
inline void FindSplit(int depth,
const std::vector<int> &qexpand,
@@ -288,26 +454,7 @@ class ColMaker: public IUpdater {
}
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(feat_set);
while (iter->Next()) {
const ColBatch &batch = iter->Value();
// start enumeration
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
#if defined(_OPENMP)
const int batch_size = std::max(static_cast<int>(nsize / this->nthread / 32), 1);
#endif
#pragma omp parallel for schedule(dynamic, batch_size)
for (bst_omp_uint i = 0; i < nsize; ++i) {
const bst_uint fid = batch.col_index[i];
const int tid = omp_get_thread_num();
const ColBatch::Inst c = batch[i];
if (param.need_forward_search(p_fmat->GetColDensity(fid))) {
this->EnumerateSplit(c.data, c.data + c.length, +1,
fid, gpair, info, stemp[tid]);
}
if (param.need_backward_search(p_fmat->GetColDensity(fid))) {
this->EnumerateSplit(c.data + c.length - 1, c.data - 1, -1,
fid, gpair, info, stemp[tid]);
}
}
this->UpdateSolution(iter->Value(), gpair, *p_fmat, info);
}
// after this each thread's stemp will get the best candidates, aggregate results
for (size_t i = 0; i < qexpand.size(); ++i) {
@@ -325,6 +472,7 @@ class ColMaker: public IUpdater {
}
}
}
// reset position of each data points after split is created in the tree
inline void ResetPosition(const std::vector<int> &qexpand, IFMatrix *p_fmat, const RegTree &tree) {
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();

View File

@@ -88,11 +88,21 @@ class IStream {
}
};
/*! \brief implementation of file i/o stream */
class FileStream : public IStream {
private:
FILE *fp;
/*! \brief interface of i/o stream that support seek */
class ISeekStream: public IStream {
public:
/*! \brief seek to certain position of the file */
virtual void Seek(long pos) = 0;
/*! \brief tell the position of the stream */
virtual long Tell(void) = 0;
};
/*! \brief implementation of file i/o stream */
class FileStream : public ISeekStream {
public:
explicit FileStream(void) {
this->fp = NULL;
}
explicit FileStream(FILE *fp) {
this->fp = fp;
}
@@ -102,12 +112,20 @@ class FileStream : public IStream {
virtual void Write(const void *ptr, size_t size) {
fwrite(ptr, size, 1, fp);
}
inline void Seek(size_t pos) {
fseek(fp, 0, SEEK_SET);
virtual void Seek(long pos) {
fseek(fp, pos, SEEK_SET);
}
virtual long Tell(void) {
return ftell(fp);
}
inline void Close(void) {
fclose(fp);
if (fp != NULL){
fclose(fp); fp = NULL;
}
}
private:
FILE *fp;
};
} // namespace utils

146
src/utils/thread.h Normal file
View File

@@ -0,0 +1,146 @@
#ifndef XGBOOST_UTILS_THREAD_H
#define XGBOOST_UTILS_THREAD_H
/*!
* \file thread.h
* \brief this header include the minimum necessary resource for multi-threading
* \author Tianqi Chen
* Acknowledgement: this file is adapted from SVDFeature project, by same author.
* The MAC support part of this code is provided by Artemy Kolchinsky
*/
#ifdef _MSC_VER
#include "utils.h"
#include <windows.h>
#include <process.h>
namespace xgboost {
namespace utils {
/*! \brief simple semaphore used for synchronization */
class Semaphore {
public :
inline void Init(int init_val) {
sem = CreateSemaphore(NULL, init_val, 10, NULL);
utils::Assert(sem != NULL, "create Semaphore error");
}
inline void Destroy(void) {
CloseHandle(sem);
}
inline void Wait(void) {
utils::Assert(WaitForSingleObject(sem, INFINITE) == WAIT_OBJECT_0, "WaitForSingleObject error");
}
inline void Post(void) {
utils::Assert(ReleaseSemaphore(sem, 1, NULL) != 0, "ReleaseSemaphore error");
}
private:
HANDLE sem;
};
/*! \brief simple thread that wraps windows thread */
class Thread {
private:
HANDLE thread_handle;
unsigned thread_id;
public:
inline void Start(unsigned int __stdcall entry(void*), void *param) {
thread_handle = (HANDLE)_beginthreadex(NULL, 0, entry, param, 0, &thread_id);
}
inline int Join(void) {
WaitForSingleObject(thread_handle, INFINITE);
return 0;
}
};
/*! \brief exit function called from thread */
inline void ThreadExit(void *status) {
_endthreadex(0);
}
#define XGBOOST_THREAD_PREFIX unsigned int __stdcall
} // namespace utils
} // namespace xgboost
#else
// thread interface using g++
#include <semaphore.h>
#include <pthread.h>
namespace xgboost {
namespace utils {
/*!\brief semaphore class */
class Semaphore {
#ifdef __APPLE__
private:
sem_t* semPtr;
char sema_name[20];
private:
inline void GenRandomString(char *s, const int len) {
static const char alphanum[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" ;
for (int i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
public:
inline void Init(int init_val) {
sema_name[0]='/';
sema_name[1]='s';
sema_name[2]='e';
sema_name[3]='/';
GenRandomString(&sema_name[4], 16);
if((semPtr = sem_open(sema_name, O_CREAT, 0644, init_val)) == SEM_FAILED) {
perror("sem_open");
exit(1);
}
utils::Assert(semPtr != NULL, "create Semaphore error");
}
inline void Destroy(void) {
if (sem_close(semPtr) == -1) {
perror("sem_close");
exit(EXIT_FAILURE);
}
if (sem_unlink(sema_name) == -1) {
perror("sem_unlink");
exit(EXIT_FAILURE);
}
}
inline void Wait(void) {
sem_wait(semPtr);
}
inline void Post(void) {
sem_post(semPtr);
}
#else
private:
sem_t sem;
public:
inline void Init(int init_val) {
sem_init(&sem, 0, init_val);
}
inline void Destroy(void) {
sem_destroy(&sem);
}
inline void Wait(void) {
sem_wait(&sem);
}
inline void Post(void) {
sem_post(&sem);
}
#endif
};
/*!\brief simple thread class */
class Thread {
private:
pthread_t thread;
public :
inline void Start(void * entry(void*), void *param) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&thread, &attr, entry, param);
}
inline int Join(void) {
void *status;
return pthread_join(thread, &status);
}
};
inline void ThreadExit(void *status) {
pthread_exit(status);
}
} // namespace utils
} // namespace xgboost
#define XGBOOST_THREAD_PREFIX void *
#endif
#endif

200
src/utils/thread_buffer.h Normal file
View File

@@ -0,0 +1,200 @@
#ifndef XGBOOST_UTILS_THREAD_BUFFER_H
#define XGBOOST_UTILS_THREAD_BUFFER_H
/*!
* \file thread_buffer.h
* \brief multi-thread buffer, iterator, can be used to create parallel pipeline
* \author Tianqi Chen
*/
#include <vector>
#include <cstring>
#include <cstdlib>
#include "./utils.h"
#include "./thread.h"
namespace xgboost {
namespace utils {
/*!
* \brief buffered loading iterator that uses multithread
* this template method will assume the following paramters
* \tparam Elem elememt type to be buffered
* \tparam ElemFactory factory type to implement in order to use thread buffer
*/
template<typename Elem, typename ElemFactory>
class ThreadBuffer {
public:
/*!\brief constructor */
ThreadBuffer(void) {
this->init_end = false;
this->buf_size = 30;
}
~ThreadBuffer(void) {
if(init_end) this->Destroy();
}
/*!\brief set parameter, will also pass the parameter to factory */
inline void SetParam(const char *name, const char *val) {
if (!strcmp( name, "buffer_size")) buf_size = atoi(val);
factory.SetParam(name, val);
}
/*!
* \brief initalize the buffered iterator
* \param param a initialize parameter that will pass to factory, ignore it if not necessary
* \return false if the initlization can't be done, e.g. buffer file hasn't been created
*/
inline bool Init(void) {
if (!factory.Init()) return false;
for (int i = 0; i < buf_size; ++i) {
bufA.push_back(factory.Create());
bufB.push_back(factory.Create());
}
this->init_end = true;
this->StartLoader();
return true;
}
/*!\brief place the iterator before first value */
inline void BeforeFirst(void) {
// wait till last loader end
loading_end.Wait();
// critcal zone
current_buf = 1;
factory.BeforeFirst();
// reset terminate limit
endA = endB = buf_size;
// wake up loader for first part
loading_need.Post();
// wait til first part is loaded
loading_end.Wait();
// set current buf to right value
current_buf = 0;
// wake loader for next part
data_loaded = false;
loading_need.Post();
// set buffer value
buf_index = 0;
}
/*! \brief destroy the buffer iterator, will deallocate the buffer */
inline void Destroy(void) {
// wait until the signal is consumed
this->destroy_signal = true;
loading_need.Post();
loader_thread.Join();
loading_need.Destroy();
loading_end.Destroy();
for (size_t i = 0; i < bufA.size(); ++i) {
factory.FreeSpace(bufA[i]);
}
for (size_t i = 0; i < bufB.size(); ++i) {
factory.FreeSpace(bufB[i]);
}
bufA.clear(); bufB.clear();
factory.Destroy();
this->init_end = false;
}
/*!
* \brief get the next element needed in buffer
* \param elem element to store into
* \return whether reaches end of data
*/
inline bool Next(Elem &elem) {
// end of buffer try to switch
if (buf_index == buf_size) {
this->SwitchBuffer();
buf_index = 0;
}
if (buf_index >= (current_buf ? endA : endB)) {
return false;
}
std::vector<Elem> &buf = current_buf ? bufA : bufB;
elem = buf[buf_index];
++buf_index;
return true;
}
/*!
* \brief get the factory object
*/
inline ElemFactory &get_factory(void) {
return factory;
}
// size of buffer
int buf_size;
private:
// factory object used to load configures
ElemFactory factory;
// index in current buffer
int buf_index;
// indicate which one is current buffer
int current_buf;
// max limit of visit, also marks termination
int endA, endB;
// double buffer, one is accessed by loader
// the other is accessed by consumer
// buffer of the data
std::vector<Elem> bufA, bufB;
// initialization end
bool init_end;
// singal whether the data is loaded
bool data_loaded;
// signal to kill the thread
bool destroy_signal;
// thread object
Thread loader_thread;
// signal of the buffer
Semaphore loading_end, loading_need;
/*!
* \brief slave thread
* this implementation is like producer-consumer style
*/
inline void RunLoader(void) {
while(!destroy_signal) {
// sleep until loading is needed
loading_need.Wait();
std::vector<Elem> &buf = current_buf ? bufB : bufA;
int i;
for (i = 0; i < buf_size ; ++i) {
if (!factory.LoadNext(buf[i])) {
int &end = current_buf ? endB : endA;
end = i; // marks the termination
break;
}
}
// signal that loading is done
data_loaded = true;
loading_end.Post();
}
}
/*!\brief entry point of loader thread */
inline static XGBOOST_THREAD_PREFIX LoaderEntry(void *pthread) {
static_cast< ThreadBuffer<Elem,ElemFactory>* >(pthread)->RunLoader();
ThreadExit(NULL);
return NULL;
}
/*!\brief start loader thread */
inline void StartLoader(void) {
destroy_signal = false;
// set param
current_buf = 1;
loading_need.Init(1);
loading_end .Init(0);
// reset terminate limit
endA = endB = buf_size;
loader_thread.Start(LoaderEntry, this);
// wait until first part of data is loaded
loading_end.Wait();
// set current buf to right value
current_buf = 0;
// wake loader for next part
data_loaded = false;
loading_need.Post();
buf_index = 0;
}
/*!\brief switch double buffer */
inline void SwitchBuffer(void) {
loading_end.Wait();
// loader shall be sleep now, critcal zone!
current_buf = !current_buf;
// wake up loader
data_loaded = false;
loading_need.Post();
}
};
} // namespace utils
} // namespace xgboost
#endif