Merge remote-tracking branch 'origin/unity'

Conflicts:
	R-package/src/Makevars
	R-package/src/Makevars.win
	src/utils/io.h
	wrapper/xgboost.py
This commit is contained in:
tqchen 2014-09-02 11:22:47 -07:00
commit eeb04a0603
14 changed files with 1535 additions and 68 deletions

2
.gitignore vendored
View File

@ -2,7 +2,7 @@
*.slo *.slo
*.lo *.lo
*.o *.o
*.page
# Compiled Dynamic libraries # Compiled Dynamic libraries
*.so *.so
*.dylib *.dylib

View File

@ -5,5 +5,3 @@ PKG_CPPFLAGS= -DXGBOOST_CUSTOMIZE_MSG_ -DXGBOOST_CUSTOMIZE_PRNG_ -DXGBOOST_STRIC
PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS) PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS)
PKG_LIBS = $(SHLIB_OPENMP_CFLAGS) PKG_LIBS = $(SHLIB_OPENMP_CFLAGS)
OBJECTS= xgboost_R.o xgboost_assert.o $(PKGROOT)/wrapper/xgboost_wrapper.o $(PKGROOT)/src/io/io.o $(PKGROOT)/src/gbm/gbm.o $(PKGROOT)/src/tree/updater.o OBJECTS= xgboost_R.o xgboost_assert.o $(PKGROOT)/wrapper/xgboost_wrapper.o $(PKGROOT)/src/io/io.o $(PKGROOT)/src/gbm/gbm.o $(PKGROOT)/src/tree/updater.o

View File

@ -6,6 +6,8 @@ using namespace std;
#include "../utils/io.h" #include "../utils/io.h"
#include "../utils/utils.h" #include "../utils/utils.h"
#include "simple_dmatrix-inl.hpp" #include "simple_dmatrix-inl.hpp"
#include "page_dmatrix-inl.hpp"
// implements data loads using dmatrix simple for now // implements data loads using dmatrix simple for now
namespace xgboost { namespace xgboost {
@ -21,7 +23,13 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) {
dmat->LoadBinary(fs, silent, fname); dmat->LoadBinary(fs, silent, fname);
fs.Close(); fs.Close();
return dmat; return dmat;
} }
if (magic == DMatrixPage::kMagic) {
DMatrixPage *dmat = new DMatrixPage();
dmat->Load(fs, silent, fname);
// the file pointer is held by the page matrix
return dmat;
}
fs.Close(); fs.Close();
DMatrixSimple *dmat = new DMatrixSimple(); DMatrixSimple *dmat = new DMatrixSimple();
@ -30,11 +38,17 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) {
} }
void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) { void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) {
if (!strcmp(fname + strlen(fname) - 5, ".page")) {
DMatrixPage::Save(fname, dmat, silent);
return;
}
if (dmat.magic == DMatrixSimple::kMagic) { if (dmat.magic == DMatrixSimple::kMagic) {
const DMatrixSimple *p_dmat = static_cast<const DMatrixSimple*>(&dmat); const DMatrixSimple *p_dmat = static_cast<const DMatrixSimple*>(&dmat);
p_dmat->SaveBinary(fname, silent); p_dmat->SaveBinary(fname, silent);
} else { } else {
utils::Error("not implemented"); DMatrixSimple smat;
smat.CopyFrom(dmat);
smat.SaveBinary(fname, silent);
} }
} }

262
src/io/page_dmatrix-inl.hpp Normal file
View File

@ -0,0 +1,262 @@
#ifndef XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
#define XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
/*!
 * \file page_dmatrix-inl.hpp
* row iterator based on sparse page
* \author Tianqi Chen
*/
#include "../data.h"
#include "../utils/iterator.h"
#include "../utils/thread_buffer.h"
#include "./simple_fmatrix-inl.hpp"
#include "./page_fmatrix-inl.hpp"
namespace xgboost {
namespace io {
/*! \brief page structure that can be used to store a rowbatch */
/*
 * Fixed-size page that stores one RowBatch worth of sparse rows.
 *
 * In-memory layout of data_ (an array of kPageSize ints):
 *   data_[0]              -- number of rows currently stored in the page
 *   data_[1] ...          -- RowBatch::Entry payload, growing forward
 *   data_[kPageSize-1-i]  -- row pointer i (CSR offsets), growing backward
 * The page is full when the forward-growing entry region would collide with
 * the backward-growing row-pointer region (see FreeBytes()).
 */
struct RowBatchPage {
 public:
  // Allocates a page of page_size ints and zero-initializes it.
  RowBatchPage(size_t page_size) : kPageSize(page_size) {
    data_ = new int[kPageSize];
    // NOTE(review): operator new throws on failure, so this assert can
    // never fire; kept as defensive parity with the original code.
    utils::Assert(data_ != NULL, "fail to allocate row batch page");
    this->Clear();
  }
  ~RowBatchPage(void) {
    if (data_ != NULL) delete [] data_;
  }
  /*!
   * \brief Push one row into page
   * \param row an instance row
   * \return false or true to push into
   */
  inline bool PushRow(const RowBatch::Inst &row) {
    // bytes needed by the row payload; the extra sizeof(int) in the check
    // accounts for the new row-pointer slot consumed at the tail
    const size_t dsize = row.length * sizeof(RowBatch::Entry);
    if (FreeBytes() < dsize+ sizeof(int)) return false;
    row_ptr(Size() + 1) = row_ptr(Size()) + row.length;
    memcpy(data_ptr(row_ptr(Size())) , row.data, dsize);
    ++ data_[0];
    return true;
  }
  /*!
   * \brief get a row batch representation from the page
   * \param p_rptr a temporal space that can be used to provide
   *  ind_ptr storage for RowBatch
   * \param base_rowid row id assigned to the first row of this page
   * \return a new RowBatch object (views memory owned by this page)
   */
  inline RowBatch GetRowBatch(std::vector<size_t> *p_rptr, size_t base_rowid) {
    RowBatch batch;
    batch.base_rowid = base_rowid;
    batch.data_ptr = this->data_ptr(0);
    batch.size = static_cast<size_t>(this->Size());
    // copy the backward-stored int offsets into a forward size_t array,
    // which is the layout RowBatch expects for ind_ptr
    std::vector<size_t> &rptr = *p_rptr;
    rptr.resize(this->Size() + 1);
    for (size_t i = 0; i < rptr.size(); ++i) {
      rptr[i] = static_cast<size_t>(this->row_ptr(static_cast<int>(i)));
    }
    batch.ind_ptr = &rptr[0];
    return batch;
  }
  /*! \brief get i-th row from the batch */
  inline RowBatch::Inst operator[](int i) {
    return RowBatch::Inst(data_ptr(0) + row_ptr(i),
                          static_cast<bst_uint>(row_ptr(i+1) - row_ptr(i)));
  }
  /*!
   * \brief clear the page, cleanup the content
   * (zeroing the whole buffer also resets Size() and row_ptr(0) to 0)
   */
  inline void Clear(void) {
    memset(&data_[0], 0, sizeof(int) * kPageSize);
  }
  /*!
   * \brief load one page form instream
   * \return true if loading is successful
   */
  inline bool Load(utils::IStream &fi) {
    return fi.Read(&data_[0], sizeof(int) * kPageSize) != 0;
  }
  /*! \brief save one page into outstream */
  inline void Save(utils::IStream &fo) {
    fo.Write(&data_[0], sizeof(int) * kPageSize);
  }
  /*! \brief number of rows stored in this page */
  inline int Size(void) const {
    return data_[0];
  }

 private:
  /*! \return number of free bytes left between the entry region and the
   *  row-pointer region (the +2 reserves the count slot and the next
   *  row-pointer slot) */
  inline size_t FreeBytes(void) {
    return (kPageSize - (Size() + 2)) * sizeof(int)
        - row_ptr(Size()) * sizeof(RowBatch::Entry) ;
  }
  /*! \brief equivalent row pointer at i, stored backward from the page end */
  inline int& row_ptr(int i) {
    return data_[kPageSize - i - 1];
  }
  // entry payload starts right after the leading row-count int
  inline RowBatch::Entry* data_ptr(int i) {
    return (RowBatch::Entry*)(&data_[1]) + i;
  }
  // page size, in number of ints
  const size_t kPageSize;
  // content of data
  int *data_;
};
/*! \brief thread buffer iterator */
/*!
 * \brief iterator over RowBatch pages, with pages prefetched by a
 *  background thread (utils::ThreadBuffer with the nested Factory below)
 */
class ThreadRowPageIterator: public utils::IIterator<RowBatch> {
 public:
  ThreadRowPageIterator(void) {
    // keep at most 2 pages in flight in the background loader
    itr.SetParam("buffer_size", "2");
    page_ = NULL;
    base_rowid_ = 0;
  }
  virtual ~ThreadRowPageIterator(void) {
  }
  // initialization happens in Load(); nothing to do here
  virtual void Init(void) {
  }
  virtual void BeforeFirst(void) {
    itr.BeforeFirst();
    base_rowid_ = 0;
  }
  // advance to the next page and expose it as a RowBatch;
  // base_rowid_ tracks the running row offset across pages
  virtual bool Next(void) {
    if(!itr.Next(page_)) return false;
    out_ = page_->GetRowBatch(&tmp_ptr_, base_rowid_);
    base_rowid_ += out_.size;
    return true;
  }
  virtual const RowBatch &Value(void) const{
    return out_;
  }
  /*! \brief load and initialize the iterator with fi
   *  (the stream is copied into the factory, which becomes responsible
   *  for seeking and closing it) */
  inline void Load(const utils::FileStream &fi) {
    itr.get_factory().SetFile(fi);
    itr.Init();
    this->BeforeFirst();
  }
  /*!
   * \brief save a row iterator to output stream, in row iterator format
   *  (a sequence of fixed-size RowBatchPage blocks)
   */
  inline static void Save(utils::IIterator<RowBatch> *iter,
                          utils::IStream &fo) {
    RowBatchPage page(kPageSize);
    iter->BeforeFirst();
    while (iter->Next()) {
      const RowBatch &batch = iter->Value();
      for (size_t i = 0; i < batch.size; ++i) {
        // when the page is full, flush it and retry on an empty page;
        // a row that cannot fit even in an empty page is a hard error
        if (!page.PushRow(batch[i])) {
          page.Save(fo);
          page.Clear();
          utils::Check(page.PushRow(batch[i]), "row is too big");
        }
      }
    }
    // flush the final, partially filled page
    if (page.Size() != 0) page.Save(fo);
  }
  /*! \brief page size 64 MB (64 << 18 ints of 4 bytes each) */
  static const size_t kPageSize = 64 << 18;

 private:
  // base row id of the current page
  size_t base_rowid_;
  // temporal ptr storage handed to RowBatchPage::GetRowBatch
  std::vector<size_t> tmp_ptr_;
  // output data
  RowBatch out_;
  // page pointer type
  typedef RowBatchPage* PagePtr;
  // loader factory for page, used by utils::ThreadBuffer on the
  // background thread; owns (a copy of) the input stream
  struct Factory {
   public:
    // stream offset of the first page, for rewinding in BeforeFirst()
    long file_begin_;
    utils::FileStream fi;
    Factory(void) {}
    inline void SetFile(const utils::FileStream &fi) {
      this->fi = fi;
      file_begin_ = this->fi.Tell();
    }
    inline bool Init(void) {
      return true;
    }
    inline void SetParam(const char *name, const char *val) {}
    // read one page; returns false at end of stream
    inline bool LoadNext(PagePtr &val) {
      return val->Load(fi);
    }
    inline PagePtr Create(void) {
      PagePtr a = new RowBatchPage(kPageSize);
      return a;
    }
    inline void FreeSpace(PagePtr &a) {
      delete a;
    }
    inline void Destroy(void) {
      fi.Close();
    }
    inline void BeforeFirst(void) {
      fi.Seek(file_begin_);
    }
  };

 protected:
  // page currently handed out by the thread buffer
  PagePtr page_;
  utils::ThreadBuffer<PagePtr,Factory> itr;
};
/*! \brief data matrix using page */
/*! \brief data matrix backed by an external-memory page file; rows are
 *  streamed through ThreadRowPageIterator instead of held in memory */
class DMatrixPage : public DataMatrix {
 public:
  DMatrixPage(void) : DataMatrix(kMagic) {
    iter_ = new ThreadRowPageIterator();
    fmat_ = new FMatrixS(iter_);
  }
  // virtual destructor
  virtual ~DMatrixPage(void) {
    // NOTE(review): only fmat_ is deleted here; this assumes FMatrixS takes
    // ownership of the iterator passed to its constructor -- confirm against
    // FMatrixS, otherwise iter_ leaks.
    delete fmat_;
  }
  virtual IFMatrix *fmat(void) const {
    return fmat_;
  }
  /*! \brief load and initialize the iterator with fi
   * \param fi file stream positioned at the start of a DMatrixPage file
   * \param silent whether to suppress the progress printout
   * \param fname file name, used only for the printout
   */
  inline void Load(utils::FileStream &fi,
                   bool silent = false,
                   const char *fname = NULL){
    int magic;
    utils::Check(fi.Read(&magic, sizeof(magic)) != 0, "invalid input file format");
    utils::Check(magic == kMagic, "invalid format,magic number mismatch");
    // file layout: magic, serialized meta info, then row pages
    this->info.LoadBinary(fi);
    iter_->Load(fi);
    if (!silent) {
      printf("DMatrixPage: %lux%lu matrix is loaded",
             static_cast<unsigned long>(info.num_row()),
             static_cast<unsigned long>(info.num_col()));
      if (fname != NULL) {
        printf(" from %s\n", fname);
      } else {
        printf("\n");
      }
      if (info.group_ptr.size() != 0) {
        printf("data contains %u groups\n", (unsigned)info.group_ptr.size()-1);
      }
    }
  }
  /*! \brief save a DataMatrix as DMatrixPage
   *  (writes magic, meta info, then the row pages of mat's row iterator) */
  inline static void Save(const char* fname, const DataMatrix &mat, bool silent) {
    utils::FileStream fs(utils::FopenCheck(fname, "wb"));
    int magic = kMagic;
    fs.Write(&magic, sizeof(magic));
    mat.info.SaveBinary(fs);
    ThreadRowPageIterator::Save(mat.fmat()->RowIterator(), fs);
    fs.Close();
    if (!silent) {
      printf("DMatrixPage: %lux%lu is saved to %s\n",
             static_cast<unsigned long>(mat.info.num_row()),
             static_cast<unsigned long>(mat.info.num_col()), fname);
    }
  }
  /*! \brief the real fmatrix */
  FMatrixS *fmat_;
  /*! \brief row iterator (see ownership note on the destructor) */
  ThreadRowPageIterator *iter_;
  /*! \brief magic number used to identify DMatrix */
  static const int kMagic = 0xffffab02;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_

316
src/io/page_fmatrix-inl.hpp Normal file
View File

@ -0,0 +1,316 @@
#ifndef XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
#define XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
/*!
* \file page_fmatrix-inl.hpp
* sparse page manager for fmatrix
* \author Tianqi Chen
*/
#include "../data.h"
#include "../utils/iterator.h"
#include "../utils/thread_buffer.h"
namespace xgboost {
namespace io {
/*!
 * \brief manager that pages a CSC-format column buffer file through a
 *  bounded in-memory page; acts as the factory type for utils::ThreadBuffer.
 *  File layout: leading size_t giving the offset of the meta section,
 *  then the entry data, then (at that offset) the serialized column pointer.
 */
class CSCMatrixManager {
 public:
  /*! \brief in memory page */
  struct Page {
   public:
    /*! \brief initialize the page with a fixed capacity of `size` entries */
    explicit Page(size_t size) {
      buffer.resize(size);
      col_index.reserve(10);
      col_data.reserve(10);
      // start from a well-defined empty state; the original left num_entry
      // uninitialized until the first Clear(), so NumFreeEntry() on a fresh
      // page read garbage
      num_entry = 0;
    }
    /*! \brief clear the page */
    inline void Clear(void) {
      num_entry = 0;
      col_index.clear();
      col_data.clear();
    }
    /*! \brief number of used entries */
    size_t num_entry;
    /*! \brief column index */
    std::vector<bst_uint> col_index;
    /*! \brief column data */
    std::vector<ColBatch::Inst> col_data;
    /*! \brief number of free entries */
    inline size_t NumFreeEntry(void) const {
      return buffer.size() - num_entry;
    }
    /*! \brief reserve len entries from the buffer, return their address */
    inline ColBatch::Entry* AllocEntry(size_t len) {
      ColBatch::Entry *p_data = &buffer[0] + num_entry;
      num_entry += len;
      return p_data;
    }
    /*! \brief get underlying batch (views memory owned by this page)
     *  NOTE(review): batch.size is not set here -- verify that ColBatch
     *  consumers obtain the column count elsewhere */
    inline ColBatch GetBatch(void) const {
      ColBatch batch;
      batch.col_index = &col_index[0];
      batch.col_data = &col_data[0];
      return batch;
    }
   private:
    /*! \brief buffer space, not to be changed since ready */
    std::vector<ColBatch::Entry> buffer;
  };
  /*! \brief define type of page pointer */
  typedef Page *PagePtr;
  /*! \brief get column pointer (CSC offsets) of the whole matrix */
  inline const std::vector<size_t> &col_ptr(void) const {
    return col_ptr_;
  }
  inline void SetParam(const char *name, const char *val) {
  }
  inline PagePtr Create(void) {
    return new Page(page_size_);
  }
  inline void FreeSpace(PagePtr &a) {
    delete a;
  }
  inline void Destroy(void) {
  }
  inline void BeforeFirst(void) {
    col_index_ = col_todo_;
    read_top_ = 0;
  }
  /*!
   * \brief fill the next page with whole columns
   * \return false only when all requested columns have been consumed
   */
  inline bool LoadNext(PagePtr &val) {
    val->Clear();
    if (read_top_ >= col_index_.size()) return false;
    // pack as many whole columns as fit; TryFill returning false
    // means the page is full and will be handed out as-is
    while (read_top_ < col_index_.size()) {
      if (!this->TryFill(col_index_[read_top_], val)) return true;
      ++read_top_;
    }
    return true;
  }
  inline bool Init(void) {
    this->BeforeFirst();
    return true;
  }
  /*!
   * \brief bind the manager to an opened column-buffer stream
   * \param fi seekable stream positioned at the start of the buffer
   * \param page_ratio page size relative to the largest single column;
   *        must be >= 1 so every column fits in one page
   */
  inline void Setup(utils::ISeekStream *fi, double page_ratio) {
    fi_ = fi;
    fi_->Read(&begin_meta_ , sizeof(size_t));
    fi_->Seek(begin_meta_);
    fi_->Read(&col_ptr_);
    // the page must be able to hold the widest column in one piece
    size_t psmax = 0;
    for (size_t i = 0; i < col_ptr_.size() - 1; ++i) {
      psmax = std::max(psmax, col_ptr_[i+1] - col_ptr_[i]);
    }
    utils::Check(page_ratio >= 1.0f, "col_page_ratio must be at least 1");
    page_size_ = std::max(static_cast<size_t>(psmax * page_ratio), psmax);
  }
  /*!
   * \brief set the subset of columns to iterate over
   * \param cset requested column indices (ignored when setall is true)
   * \param setall when true, iterate over every column
   */
  inline void SetColSet(const std::vector<bst_uint> &cset, bool setall) {
    if (!setall) {
      col_todo_.resize(cset.size());
      for (size_t i = 0; i < cset.size(); ++i) {
        col_todo_[i] = cset[i];
        utils::Assert(col_todo_[i] < static_cast<bst_uint>(col_ptr_.size() - 1),
                      "CSCMatrixManager: column index exceed bound");
      }
      // sorted order yields (mostly) sequential reads from the file
      std::sort(col_todo_.begin(), col_todo_.end());
    } else {
      col_todo_.resize(col_ptr_.size()-1);
      for (size_t i = 0; i < col_todo_.size(); ++i) {
        col_todo_[i] = static_cast<bst_uint>(i);
      }
    }
  }

 private:
  /*!
   * \brief try to append column cidx to p_page
   * \return true when the column was loaded, false when the page
   *         lacks room for the whole column
   */
  inline bool TryFill(size_t cidx, Page *p_page) {
    size_t len = col_ptr_[cidx+1] - col_ptr_[cidx];
    if (p_page->NumFreeEntry() < len) return false;
    ColBatch::Entry *p_data = p_page->AllocEntry(len);
    // entry data starts right after the leading size_t meta offset
    fi_->Seek(col_ptr_[cidx] * sizeof(ColBatch::Entry) + sizeof(size_t));
    utils::Check(fi_->Read(p_data, sizeof(ColBatch::Entry) * len) != 0,
                 "invalid column buffer format");
    p_page->col_data.push_back(ColBatch::Inst(p_data, len));
    p_page->col_index.push_back(static_cast<bst_uint>(cidx));
    // BUG FIX: the original fell off the end of this non-void function
    // (undefined behavior); the column was filled successfully
    return true;
  }
  // the following are in memory auxiliary data structure
  /*! \brief top of reader position */
  size_t read_top_;
  /*! \brief size of page, in entries */
  size_t page_size_;
  /*! \brief column index to be loaded */
  std::vector<bst_uint> col_index_;
  /*! \brief column index to be after calling before first */
  std::vector<bst_uint> col_todo_;
  // the following are input content
  /*! \brief offset of the meta section in the file */
  size_t begin_meta_;
  /*! \brief input stream */
  utils::ISeekStream *fi_;
  /*! \brief column pointer of CSC format */
  std::vector<size_t> col_ptr_;
};
/*! \brief threaded iterator over pages of a CSC column buffer file,
 *  prefetched in the background via utils::ThreadBuffer + CSCMatrixManager */
class ThreadColPageIterator : public utils::IIterator<ColBatch> {
 public:
  ThreadColPageIterator(void) {
    // keep at most 2 pages in flight in the background loader
    itr_.SetParam("buffer_size", "2");
    page_ = NULL;
    fi_ = NULL;
    silent = 0;
  }
  virtual ~ThreadColPageIterator(void) {
    if (fi_ != NULL) {
      fi_->Close(); delete fi_;
    }
  }
  /*! \brief open col_pagefile and hand the stream to the page manager;
   *  must be called after the relevant SetParam calls */
  virtual void Init(void) {
    fi_ = new utils::FileStream(utils::FopenCheck(col_pagefile_.c_str(), "rb"));
    itr_.get_factory().Setup(fi_, col_pageratio_);
    if (silent == 0) {
      printf("ThreadColPageIterator: finish initialzing from %s, %u columns\n",
             col_pagefile_.c_str(), static_cast<unsigned>(col_ptr().size() - 1));
    }
  }
  /*!
   * \brief set parameters: col_pageratio, col_pagefile, silent
   * \param name parameter name
   * \param val parameter value
   */
  virtual void SetParam(const char *name, const char *val) {
    // BUG FIX: the original compared the key against `val` instead of
    // `name`, so none of these parameters could ever take effect
    if (!strcmp("col_pageratio", name)) col_pageratio_ = atof(val);
    if (!strcmp("col_pagefile", name)) col_pagefile_ = val;
    if (!strcmp("silent", name)) silent = atoi(val);
  }
  virtual void BeforeFirst(void) {
    itr_.BeforeFirst();
  }
  // advance to the next page of columns
  virtual bool Next(void) {
    if(!itr_.Next(page_)) return false;
    out_ = page_->GetBatch();
    return true;
  }
  virtual const ColBatch &Value(void) const{
    return out_;
  }
  /*! \brief column pointer (CSC offsets) of the whole matrix */
  inline const std::vector<size_t> &col_ptr(void) const {
    return itr_.get_factory().col_ptr();
  }
  /*! \brief restrict iteration to a subset of columns (or all of them) */
  inline void SetColSet(const std::vector<bst_uint> &cset, bool setall = false) {
    itr_.get_factory().SetColSet(cset, setall);
  }

 private:
  // shutup
  int silent;
  // input file, owned; closed and freed in the destructor
  utils::FileStream *fi_;
  // size of page relative to the largest column
  float col_pageratio_;
  // name of file
  std::string col_pagefile_;
  // output data
  ColBatch out_;
  // page to be loaded
  CSCMatrixManager::PagePtr page_;
  // internal iterator
  utils::ThreadBuffer<CSCMatrixManager::PagePtr,CSCMatrixManager> itr_;
};
/*!
* \brief sparse matrix that support column access
*/
/*!
 * \brief sparse matrix that support column access
 *
 * NOTE(review): this class looks unfinished in this revision: col_iter_ is
 * only ever set to NULL, so HaveColAccess() is always false and
 * NumCol()/ColIterator() would dereference a null pointer if reached;
 * InitColData() never populates buffered_rowset_ (see TODO1 below).
 * Confirm before relying on column access through this class.
 */
class FMatrixPage : public IFMatrix {
 public:
  /*! \brief constructor; takes ownership of the row iterator */
  FMatrixPage(utils::IIterator<RowBatch> *iter) {
    this->row_iter_ = iter;
    this->col_iter_ = NULL;
  }
  // destructor
  virtual ~FMatrixPage(void) {
    if (row_iter_ != NULL) delete row_iter_;
    if (col_iter_ != NULL) delete col_iter_;
  }
  /*! \return whether column access is enabled */
  virtual bool HaveColAccess(void) const {
    return col_iter_ != NULL;
  }
  /*! \brief get number of colmuns */
  virtual size_t NumCol(void) const {
    utils::Check(this->HaveColAccess(), "NumCol:need column access");
    return col_iter_->col_ptr().size() - 1;
  }
  /*! \brief get number of buffered rows */
  virtual const std::vector<bst_uint> &buffered_rowset(void) const {
    return buffered_rowset_;
  }
  /*! \brief get column size */
  virtual size_t GetColSize(size_t cidx) const {
    const std::vector<size_t> &col_ptr = col_iter_->col_ptr();
    return col_ptr[cidx+1] - col_ptr[cidx];
  }
  /*! \brief get column density (fraction of buffered rows present in cidx) */
  virtual float GetColDensity(size_t cidx) const {
    const std::vector<size_t> &col_ptr = col_iter_->col_ptr();
    size_t nmiss = buffered_rowset_.size() - (col_ptr[cidx+1] - col_ptr[cidx]);
    return 1.0f - (static_cast<float>(nmiss)) / buffered_rowset_.size();
  }
  virtual void InitColAccess(float pkeep = 1.0f) {
    if (this->HaveColAccess()) return;
    this->InitColData(pkeep);
  }
  /*!
   * \brief get the row iterator associated with FMatrix
   */
  virtual utils::IIterator<RowBatch>* RowIterator(void) {
    row_iter_->BeforeFirst();
    return row_iter_;
  }
  /*!
   * \brief get the column based iterator
   */
  virtual utils::IIterator<ColBatch>* ColIterator(void) {
    // empty cset + setall=true selects every column
    std::vector<bst_uint> cset;
    col_iter_->SetColSet(cset, true);
    col_iter_->BeforeFirst();
    return col_iter_;
  }
  /*!
   * \brief colmun based iterator
   */
  virtual utils::IIterator<ColBatch> *ColIterator(const std::vector<bst_uint> &fset) {
    col_iter_->SetColSet(fset, false);
    col_iter_->BeforeFirst();
    return col_iter_;
  }

 protected:
  /*!
   * \brief intialize column data
   * \param pkeep probability to keep a row
   *
   * NOTE(review): appears incomplete -- the first pass discards every
   * batch, buffered_rowset_ is never filled, and pkeep is unused.
   */
  inline void InitColData(float pkeep) {
    buffered_rowset_.clear();
    // start working
    row_iter_->BeforeFirst();
    while (row_iter_->Next()) {
      // first pass over the data; the batch is currently unused
      const RowBatch &batch = row_iter_->Value();
    }
    row_iter_->BeforeFirst();
    size_t ktop = 0;
    while (row_iter_->Next()) {
      const RowBatch &batch = row_iter_->Value();
      for (size_t i = 0; i < batch.size; ++i) {
        // match rows in the batch against the buffered rowset
        if (ktop < buffered_rowset_.size() &&
            buffered_rowset_[ktop] == batch.base_rowid + i) {
          ++ktop;
          // TODO1
        }
      }
    }
    // sort columns
  }

 private:
  // row iterator, owned
  utils::IIterator<RowBatch> *row_iter_;
  // column iterator; never assigned in this revision (see class note)
  ThreadColPageIterator *col_iter_;
  /*! \brief list of row index that are buffered */
  std::vector<bst_uint> buffered_rowset_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_PAGE_FMATRIX_INL_HPP_

View File

@ -44,8 +44,8 @@ class DMatrixSimple : public DataMatrix {
} }
/*! \brief copy content data from source matrix */ /*! \brief copy content data from source matrix */
inline void CopyFrom(const DataMatrix &src) { inline void CopyFrom(const DataMatrix &src) {
this->info = src.info;
this->Clear(); this->Clear();
this->info = src.info;
// clone data content in this matrix // clone data content in this matrix
utils::IIterator<RowBatch> *iter = src.fmat()->RowIterator(); utils::IIterator<RowBatch> *iter = src.fmat()->RowIterator();
iter->BeforeFirst(); iter->BeforeFirst();

View File

@ -150,7 +150,7 @@ class FMatrixS : public IFMatrix{
iter_->BeforeFirst(); iter_->BeforeFirst();
while (iter_->Next()) { while (iter_->Next()) {
const RowBatch &batch = iter_->Value(); const RowBatch &batch = iter_->Value();
for (size_t i = 0; i < batch.size; ++i) { for (size_t i = 0; i < batch.size; ++i) {
if (pkeep == 1.0f || random::SampleBinary(pkeep)) { if (pkeep == 1.0f || random::SampleBinary(pkeep)) {
buffered_rowset_.push_back(static_cast<bst_uint>(batch.base_rowid+i)); buffered_rowset_.push_back(static_cast<bst_uint>(batch.base_rowid+i));
RowBatch::Inst inst = batch[i]; RowBatch::Inst inst = batch[i];

View File

@ -37,7 +37,9 @@ struct TrainParam{
// speed optimization for dense column // speed optimization for dense column
float opt_dense_col; float opt_dense_col;
// leaf vector size // leaf vector size
int size_leaf_vector; int size_leaf_vector;
// option for parallelization
int parallel_option;
// number of threads to be used for tree construction, // number of threads to be used for tree construction,
// if OpenMP is enabled, if equals 0, use system default // if OpenMP is enabled, if equals 0, use system default
int nthread; int nthread;
@ -55,6 +57,7 @@ struct TrainParam{
opt_dense_col = 1.0f; opt_dense_col = 1.0f;
nthread = 0; nthread = 0;
size_leaf_vector = 0; size_leaf_vector = 0;
parallel_option = 0;
} }
/*! /*!
* \brief set parameters from outside * \brief set parameters from outside
@ -79,6 +82,7 @@ struct TrainParam{
if (!strcmp(name, "size_leaf_vector")) size_leaf_vector = atoi(val); if (!strcmp(name, "size_leaf_vector")) size_leaf_vector = atoi(val);
if (!strcmp(name, "max_depth")) max_depth = atoi(val); if (!strcmp(name, "max_depth")) max_depth = atoi(val);
if (!strcmp(name, "nthread")) nthread = atoi(val); if (!strcmp(name, "nthread")) nthread = atoi(val);
if (!strcmp(name, "parallel_option")) parallel_option = atoi(val);
if (!strcmp(name, "default_direction")) { if (!strcmp(name, "default_direction")) {
if (!strcmp(val, "learn")) default_direction = 0; if (!strcmp(val, "learn")) default_direction = 0;
if (!strcmp(val, "left")) default_direction = 1; if (!strcmp(val, "left")) default_direction = 1;

View File

@ -45,15 +45,19 @@ class ColMaker: public IUpdater {
// data structure // data structure
/*! \brief per thread x per node entry to store tmp data */ /*! \brief per thread x per node entry to store tmp data */
struct ThreadEntry { struct ThreadEntry {
/*! \brief statistics of data*/ /*! \brief statistics of data */
TStats stats; TStats stats;
/*! \brief extra statistics of data */
TStats stats_extra;
/*! \brief last feature value scanned */ /*! \brief last feature value scanned */
float last_fvalue; float last_fvalue;
/*! \brief first feature value scanned */
float first_fvalue;
/*! \brief current best solution */ /*! \brief current best solution */
SplitEntry best; SplitEntry best;
// constructor // constructor
explicit ThreadEntry(const TrainParam &param) explicit ThreadEntry(const TrainParam &param)
: stats(param) { : stats(param), stats_extra(param) {
} }
}; };
struct NodeEntry { struct NodeEntry {
@ -219,7 +223,137 @@ class ColMaker: public IUpdater {
} }
// use new nodes for qexpand // use new nodes for qexpand
qexpand = newnodes; qexpand = newnodes;
} }
// parallel find the best split of current fid
// this function does not support nested functions
// Parallel split search over a *single* feature column: the column is cut
// into contiguous chunks, one per OpenMP thread; each thread accumulates
// per-node gradient statistics over its chunk, the partial sums are combined
// with a prefix-sum, splits at chunk boundaries are evaluated, and a final
// rescan generates the remaining candidate splits inside each chunk.
// this function does not support being called from a nested parallel region
inline void ParallelFindSplit(const ColBatch::Inst &col,
                              bst_uint fid,
                              const IFMatrix &fmat,
                              const std::vector<bst_gpair> &gpair,
                              const BoosterInfo &info) {
  bool need_forward = param.need_forward_search(fmat.GetColDensity(fid));
  bool need_backward = param.need_backward_search(fmat.GetColDensity(fid));
  int nthread;
  // phase 1: per-thread accumulation over this thread's chunk of the column
  #pragma omp parallel
  {
    const int tid = omp_get_thread_num();
    std::vector<ThreadEntry> &temp = stemp[tid];
    // cleanup temp statistics
    for (size_t j = 0; j < qexpand.size(); ++j) {
      temp[qexpand[j]].stats.Clear();
    }
    nthread = omp_get_num_threads();
    bst_uint step = (col.length + nthread - 1) / nthread;
    bst_uint end = std::min(col.length, step * (tid + 1));
    for (bst_uint i = tid * step; i < end; ++i) {
      const bst_uint ridx = col[i].index;
      const int nid = position[ridx];
      if (nid < 0) continue;  // row not assigned to an expanding node
      const float fvalue = col[i].fvalue;
      if (temp[nid].stats.Empty()) {
        temp[nid].first_fvalue = fvalue;
      }
      temp[nid].stats.Add(gpair, info, ridx);
      temp[nid].last_fvalue = fvalue;
    }
  }
  // phase 2: per node, prefix-sum the per-thread statistics and evaluate
  // candidate splits at the chunk boundaries
  bst_omp_uint nnode = static_cast<bst_omp_uint>(qexpand.size());
  #pragma omp parallel for schedule(static)
  for (bst_omp_uint j = 0; j < nnode; ++j) {
    const int nid = qexpand[j];
    TStats sum(param), tmp(param), c(param);
    // after this loop stemp[tid][nid].stats holds the sum over all chunks
    // before tid, and `sum` the total over the whole column; boundary
    // feature values are exchanged so each entry sees its neighbor's value
    for (int tid = 0; tid < nthread; ++tid) {
      tmp = stemp[tid][nid].stats;
      stemp[tid][nid].stats = sum;
      sum.Add(tmp);
      if (tid != 0) {
        std::swap(stemp[tid - 1][nid].last_fvalue, stemp[tid][nid].first_fvalue);
      }
    }
    for (int tid = 0; tid < nthread; ++tid) {
      stemp[tid][nid].stats_extra = sum;
      ThreadEntry &e = stemp[tid][nid];
      float fsplit;
      if (tid != 0) {
        if (fabsf(stemp[tid - 1][nid].last_fvalue - e.first_fvalue) > rt_2eps) {
          // BUG FIX: split threshold is the midpoint of the two adjacent
          // feature values; the original subtracted them, producing a
          // meaningless split point (cf. the (a + b) * 0.5f used in the
          // rescan phase below)
          fsplit = (stemp[tid - 1][nid].last_fvalue + e.first_fvalue) * 0.5f;
        } else {
          continue;
        }
      } else {
        fsplit = e.first_fvalue - rt_eps;
      }
      if (need_forward && tid != 0) {
        c.SetSubstract(snode[nid].stats, e.stats);
        if (c.sum_hess >= param.min_child_weight && e.stats.sum_hess >= param.min_child_weight) {
          bst_float loss_chg = static_cast<bst_float>(e.stats.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
          e.best.Update(loss_chg, fid, fsplit, false);
        }
      }
      if (need_backward) {
        tmp.SetSubstract(sum, e.stats);
        c.SetSubstract(snode[nid].stats, tmp);
        if (c.sum_hess >= param.min_child_weight && tmp.sum_hess >= param.min_child_weight) {
          bst_float loss_chg = static_cast<bst_float>(tmp.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
          e.best.Update(loss_chg, fid, fsplit, true);
        }
      }
    }
    // backward split after the very last feature value of the column
    if (need_backward) {
      tmp = sum;
      ThreadEntry &e = stemp[nthread-1][nid];
      c.SetSubstract(snode[nid].stats, tmp);
      if (c.sum_hess >= param.min_child_weight && tmp.sum_hess >= param.min_child_weight) {
        bst_float loss_chg = static_cast<bst_float>(tmp.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
        e.best.Update(loss_chg, fid, e.last_fvalue + rt_eps, true);
      }
    }
  }
  // phase 3: rescan each chunk, generating candidate splits between
  // consecutive distinct feature values (e.first_fvalue now tracks the
  // previously seen value; e.stats restarts from the prefix sum set above)
  #pragma omp parallel
  {
    TStats c(param), cright(param);
    const int tid = omp_get_thread_num();
    std::vector<ThreadEntry> &temp = stemp[tid];
    nthread = static_cast<bst_uint>(omp_get_num_threads());
    bst_uint step = (col.length + nthread - 1) / nthread;
    bst_uint end = std::min(col.length, step * (tid + 1));
    for (bst_uint i = tid * step; i < end; ++i) {
      const bst_uint ridx = col[i].index;
      const int nid = position[ridx];
      if (nid < 0) continue;
      const float fvalue = col[i].fvalue;
      // get the statistics of nid
      ThreadEntry &e = temp[nid];
      if (e.stats.Empty()) {
        e.stats.Add(gpair, info, ridx);
        e.first_fvalue = fvalue;
      } else {
        // forward default right
        if (fabsf(fvalue - e.first_fvalue) > rt_2eps){
          if (need_forward) {
            c.SetSubstract(snode[nid].stats, e.stats);
            if (c.sum_hess >= param.min_child_weight && e.stats.sum_hess >= param.min_child_weight) {
              bst_float loss_chg = static_cast<bst_float>(e.stats.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
              e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, false);
            }
          }
          if (need_backward) {
            // stats_extra holds the total column sum; cright is the
            // statistics of everything to the right of this position
            cright.SetSubstract(e.stats_extra, e.stats);
            c.SetSubstract(snode[nid].stats, cright);
            if (c.sum_hess >= param.min_child_weight && cright.sum_hess >= param.min_child_weight) {
              bst_float loss_chg = static_cast<bst_float>(cright.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
              e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, true);
            }
          }
        }
        e.stats.Add(gpair, info, ridx);
        e.first_fvalue = fvalue;
      }
    }
  }
}
// enumerate the split values of specific feature // enumerate the split values of specific feature
inline void EnumerateSplit(const ColBatch::Entry *begin, inline void EnumerateSplit(const ColBatch::Entry *begin,
const ColBatch::Entry *end, const ColBatch::Entry *end,
@ -272,6 +406,38 @@ class ColMaker: public IUpdater {
} }
} }
} }
// update the solution candidate
virtual void UpdateSolution(const ColBatch &batch,
const std::vector<bst_gpair> &gpair,
const IFMatrix &fmat,
const BoosterInfo &info) {
// start enumeration
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
#if defined(_OPENMP)
const int batch_size = std::max(static_cast<int>(nsize / this->nthread / 32), 1);
#endif
if (param.parallel_option == 0) {
#pragma omp parallel for schedule(dynamic, batch_size)
for (bst_omp_uint i = 0; i < nsize; ++i) {
const bst_uint fid = batch.col_index[i];
const int tid = omp_get_thread_num();
const ColBatch::Inst c = batch[i];
if (param.need_forward_search(fmat.GetColDensity(fid))) {
this->EnumerateSplit(c.data, c.data + c.length, +1,
fid, gpair, info, stemp[tid]);
}
if (param.need_backward_search(fmat.GetColDensity(fid))) {
this->EnumerateSplit(c.data + c.length - 1, c.data - 1, -1,
fid, gpair, info, stemp[tid]);
}
}
} else {
for (bst_omp_uint i = 0; i < nsize; ++i) {
this->ParallelFindSplit(batch[i], batch.col_index[i],
fmat, gpair, info);
}
}
}
// find splits at current level, do split per level // find splits at current level, do split per level
inline void FindSplit(int depth, inline void FindSplit(int depth,
const std::vector<int> &qexpand, const std::vector<int> &qexpand,
@ -288,26 +454,7 @@ class ColMaker: public IUpdater {
} }
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(feat_set); utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(feat_set);
while (iter->Next()) { while (iter->Next()) {
const ColBatch &batch = iter->Value(); this->UpdateSolution(iter->Value(), gpair, *p_fmat, info);
// start enumeration
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
#if defined(_OPENMP)
const int batch_size = std::max(static_cast<int>(nsize / this->nthread / 32), 1);
#endif
#pragma omp parallel for schedule(dynamic, batch_size)
for (bst_omp_uint i = 0; i < nsize; ++i) {
const bst_uint fid = batch.col_index[i];
const int tid = omp_get_thread_num();
const ColBatch::Inst c = batch[i];
if (param.need_forward_search(p_fmat->GetColDensity(fid))) {
this->EnumerateSplit(c.data, c.data + c.length, +1,
fid, gpair, info, stemp[tid]);
}
if (param.need_backward_search(p_fmat->GetColDensity(fid))) {
this->EnumerateSplit(c.data + c.length - 1, c.data - 1, -1,
fid, gpair, info, stemp[tid]);
}
}
} }
// after this each thread's stemp will get the best candidates, aggregate results // after this each thread's stemp will get the best candidates, aggregate results
for (size_t i = 0; i < qexpand.size(); ++i) { for (size_t i = 0; i < qexpand.size(); ++i) {
@ -325,6 +472,7 @@ class ColMaker: public IUpdater {
} }
} }
} }
// reset position of each data points after split is created in the tree // reset position of each data points after split is created in the tree
inline void ResetPosition(const std::vector<int> &qexpand, IFMatrix *p_fmat, const RegTree &tree) { inline void ResetPosition(const std::vector<int> &qexpand, IFMatrix *p_fmat, const RegTree &tree) {
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset(); const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();

View File

@ -88,12 +88,21 @@ class IStream {
} }
}; };
/*! \brief implementation of file i/o stream */ /*! \brief interface of i/o stream that support seek */
class FileStream : public IStream { class ISeekStream: public IStream {
private:
FILE *fp;
public: public:
explicit FileStream(FILE *fp) : fp(fp) { /*! \brief seek to certain position of the file */
virtual void Seek(long pos) = 0;
/*! \brief tell the position of the stream */
virtual long Tell(void) = 0;
};
/*! \brief implementation of file i/o stream */
class FileStream : public ISeekStream {
public:
explicit FileStream(FILE *fp) : fp(fp) {}
explicit FileStream(void) {
this->fp = NULL;
} }
virtual size_t Read(void *ptr, size_t size) { virtual size_t Read(void *ptr, size_t size) {
return fread(ptr, size, 1, fp); return fread(ptr, size, 1, fp);
@ -101,12 +110,20 @@ class FileStream : public IStream {
virtual void Write(const void *ptr, size_t size) { virtual void Write(const void *ptr, size_t size) {
fwrite(ptr, size, 1, fp); fwrite(ptr, size, 1, fp);
} }
inline void Seek(size_t pos) { virtual void Seek(long pos) {
fseek(fp, 0, SEEK_SET); fseek(fp, pos, SEEK_SET);
}
virtual long Tell(void) {
return ftell(fp);
} }
inline void Close(void) { inline void Close(void) {
fclose(fp); if (fp != NULL){
fclose(fp); fp = NULL;
}
} }
private:
FILE *fp;
}; };
} // namespace utils } // namespace utils

View File

@ -7,6 +7,7 @@
*/ */
#include <vector> #include <vector>
#include <algorithm> #include <algorithm>
#include "./io.h"
#include "./utils.h" #include "./utils.h"
namespace xgboost { namespace xgboost {
@ -118,6 +119,117 @@ struct SparseCSRMBuilder {
} }
}; };
/*!
 * \brief a class used to help construct CSR format matrix file,
 *  useful to construct CSR format matrix that can not fit into memory;
 *  the file layout is: [SizeType begin_meta][IndexType data ...][SizeType rptr ...]
 * \tparam IndexType type of index used to store the index position
 * \tparam SizeType type of size used in row pointer
 */
template<typename IndexType, typename SizeType = size_t>
struct SparseCSRFileBuilder {
 public:
  /*!
   * \brief constructor
   * \param fo output stream used to write the matrix, must support Seek
   * \param buffer_size maximum number of elements buffered in memory
   *        before they are flushed to disk
   */
  explicit SparseCSRFileBuilder(utils::ISeekStream *fo, size_t buffer_size)
      : fo(fo), buffer_size(buffer_size) {
  }
  /*!
   * \brief step 1: initialize the number of rows in the data, not necessary exact
   * \param nrows number of rows in the matrix, can be smaller than expected
   */
  inline void InitBudget(size_t nrows = 0) {
    rptr.clear();
    rptr.resize(nrows + 1, 0);
  }
  /*!
   * \brief step 2: add budget to each rows
   * \param row_id the id of the row
   * \param nelem number of element budget add to this row
   */
  inline void AddBudget(size_t row_id, SizeType nelem = 1) {
    if (rptr.size() < row_id + 2) {
      rptr.resize(row_id + 2, 0);
    }
    rptr[row_id + 1] += nelem;
  }
  /*! \brief step 3: initialize the necessary storage */
  inline void InitStorage(void) {
    // turn per-row counts into a cumulative row pointer
    SizeType nelem = 0;
    for (size_t i = 1; i < rptr.size(); i++) {
      nelem += rptr[i];
      rptr[i] = nelem;
    }
    // write the offset of the row pointer section, then the row pointer
    // itself at that offset; the element data goes in between
    SizeType begin_meta = sizeof(SizeType) + nelem * sizeof(IndexType);
    fo->Seek(0);
    fo->Write(&begin_meta, sizeof(begin_meta));
    fo->Seek(begin_meta);
    fo->Write(rptr);
    // setup buffer space
    buffer_rptr.resize(rptr.size());
    buffer_temp.reserve(buffer_size);
    buffer_data.resize(buffer_size);
    saved_offset.clear();
    // saved_offset[i] = number of elements of row i already written to disk
    saved_offset.resize(rptr.size() - 1, 0);
    this->ClearBuffer();
  }
  /*! \brief step 4: push element into buffer */
  inline void PushElem(SizeType row_id, IndexType col_id) {
    if (buffer_temp.size() == buffer_size) {
      this->WriteBuffer();
      this->ClearBuffer();
    }
    buffer_temp.push_back(std::make_pair(row_id, col_id));
  }
  /*! \brief finalize the construction, flush remaining buffered elements
   *  and check that every row received exactly its budgeted elements */
  inline void Finalize(void) {
    this->WriteBuffer();
    for (size_t i = 0; i < saved_offset.size(); ++i) {
      utils::Assert(saved_offset[i] == rptr[i+1], "some block not write out");
    }
  }

 protected:
  /*! \brief sort the buffered elements into per-row runs and append each
   *  run to its row's region in the file */
  inline void WriteBuffer(void) {
    // convert per-row counts in buffer_rptr into begin offsets
    SizeType start = 0;
    for (size_t i = 1; i < buffer_rptr.size(); ++i) {
      size_t rlen = buffer_rptr[i];
      buffer_rptr[i] = start;
      start += rlen;
    }
    // counting-sort the buffered (row, col) pairs into buffer_data
    for (size_t i = 0; i < buffer_temp.size(); ++i) {
      SizeType &rp = buffer_rptr[buffer_temp[i].first + 1];
      buffer_data[rp++] = buffer_temp[i].second;
    }
    // write out each row's run; buffer_rptr has nrows + 1 entries, so the
    // last valid row index is buffer_rptr.size() - 2 -- the loop bound must
    // be i + 1 < size() or buffer_rptr[i+1]/saved_offset[i] read out of bounds
    for (size_t i = 0; i + 1 < buffer_rptr.size(); ++i) {
      size_t nelem = buffer_rptr[i+1] - buffer_rptr[i];
      if (nelem != 0) {
        utils::Assert(saved_offset[i] < rptr[i+1], "data exceed bound");
        fo->Seek((rptr[i] + saved_offset[i]) * sizeof(IndexType) + sizeof(SizeType));
        fo->Write(&buffer_data[0] + buffer_rptr[i], nelem * sizeof(IndexType));
        saved_offset[i] += nelem;
      }
    }
  }
  /*! \brief reset the in-memory buffer to empty */
  inline void ClearBuffer(void) {
    buffer_temp.clear();
    std::fill(buffer_rptr.begin(), buffer_rptr.end(), 0);
  }

 private:
  /*! \brief output file pointer the data */
  utils::ISeekStream *fo;
  /*! \brief pointer to each of the row */
  std::vector<SizeType> rptr;
  /*! \brief number of elements of each row already flushed to disk */
  std::vector<SizeType> saved_offset;
  // ----- the following are buffer space
  /*! \brief maximum size of content buffer*/
  size_t buffer_size;
  /*! \brief store the data content */
  std::vector< std::pair<SizeType, IndexType> > buffer_temp;
  /*! \brief row pointer used to counting-sort the buffered elements */
  std::vector<SizeType> buffer_rptr;
  /*! \brief buffered column indices, grouped by row after WriteBuffer */
  std::vector<IndexType> buffer_data;
};
} // namespace utils } // namespace utils
} // namespace xgboost } // namespace xgboost
#endif #endif

146
src/utils/thread.h Normal file
View File

@ -0,0 +1,146 @@
#ifndef XGBOOST_UTILS_THREAD_H
#define XGBOOST_UTILS_THREAD_H
/*!
* \file thread.h
* \brief this header include the minimum necessary resource for multi-threading
* \author Tianqi Chen
* Acknowledgement: this file is adapted from SVDFeature project, by same author.
* The MAC support part of this code is provided by Artemy Kolchinsky
*/
#ifdef _MSC_VER
#include "utils.h"
#include <windows.h>
#include <process.h>
namespace xgboost {
namespace utils {
/*! \brief simple semaphore used for synchronization */
class Semaphore {
 public :
  /*!
   * \brief create the semaphore with an initial count
   * \param init_val initial count of the semaphore
   * NOTE(review): the maximum count is hard-coded to 10, so at most
   * 10 outstanding Posts are allowed -- confirm this bound is intended
   */
  inline void Init(int init_val) {
    sem = CreateSemaphore(NULL, init_val, 10, NULL);
    utils::Assert(sem != NULL, "create Semaphore error");
  }
  /*! \brief release the underlying Win32 handle */
  inline void Destroy(void) {
    CloseHandle(sem);
  }
  /*! \brief block until the count is positive, then decrement it */
  inline void Wait(void) {
    utils::Assert(WaitForSingleObject(sem, INFINITE) == WAIT_OBJECT_0, "WaitForSingleObject error");
  }
  /*! \brief increment the count, waking one waiter if any */
  inline void Post(void) {
    utils::Assert(ReleaseSemaphore(sem, 1, NULL) != 0, "ReleaseSemaphore error");
  }

 private:
  // Win32 semaphore handle
  HANDLE sem;
};
/*! \brief simple thread that wraps windows thread */
class Thread {
 private:
  // handle returned by _beginthreadex, used to join
  HANDLE thread_handle;
  // thread identifier filled in by _beginthreadex
  unsigned thread_id;
 public:
  /*!
   * \brief start the thread, running entry(param)
   * \param entry entry point function of the new thread
   * \param param argument passed to entry
   */
  inline void Start(unsigned int __stdcall entry(void*), void *param) {
    thread_handle = (HANDLE)_beginthreadex(NULL, 0, entry, param, 0, &thread_id);
  }
  /*!
   * \brief block until the thread finishes
   * \return always 0 on the Windows implementation
   */
  inline int Join(void) {
    WaitForSingleObject(thread_handle, INFINITE);
    return 0;
  }
};
/*! \brief exit function called from thread
 * \param status ignored on Windows; _endthreadex is always called with 0
 */
inline void ThreadExit(void *status) {
  _endthreadex(0);
}
#define XGBOOST_THREAD_PREFIX unsigned int __stdcall
} // namespace utils
} // namespace xgboost
#else
// thread interface using g++
#include <semaphore.h>
#include <pthread.h>
namespace xgboost {
namespace utils {
/*!\brief semaphore class, POSIX implementation;
 * macOS lacks unnamed POSIX semaphores (sem_init is unsupported),
 * so the __APPLE__ branch uses a randomly-named semaphore via sem_open
 */
class Semaphore {
#ifdef __APPLE__
 private:
  // named-semaphore handle returned by sem_open
  sem_t* semPtr;
  // "/se/" prefix + 16 random characters + NUL terminator
  char sema_name[20];
 private:
  // fill s with len random alphanumeric characters, NUL-terminated
  inline void GenRandomString(char *s, const int len) {
    static const char alphanum[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" ;
    for (int i = 0; i < len; ++i) {
      s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
    }
    s[len] = 0;
  }
 public:
  /*! \brief create a uniquely-named semaphore with the given initial count */
  inline void Init(int init_val) {
    sema_name[0]='/';
    sema_name[1]='s';
    sema_name[2]='e';
    sema_name[3]='/';
    GenRandomString(&sema_name[4], 16);
    // NOTE(review): O_CREAT is declared in <fcntl.h>, which this header does
    // not include directly -- presumably pulled in transitively; verify
    if((semPtr = sem_open(sema_name, O_CREAT, 0644, init_val)) == SEM_FAILED) {
      perror("sem_open");
      exit(1);
    }
    utils::Assert(semPtr != NULL, "create Semaphore error");
  }
  /*! \brief close and unlink the named semaphore */
  inline void Destroy(void) {
    if (sem_close(semPtr) == -1) {
      perror("sem_close");
      exit(EXIT_FAILURE);
    }
    if (sem_unlink(sema_name) == -1) {
      perror("sem_unlink");
      exit(EXIT_FAILURE);
    }
  }
  /*! \brief block until the count is positive, then decrement it */
  inline void Wait(void) {
    sem_wait(semPtr);
  }
  /*! \brief increment the count, waking one waiter if any */
  inline void Post(void) {
    sem_post(semPtr);
  }
#else
 private:
  // unnamed process-local semaphore
  sem_t sem;
 public:
  /*! \brief create the semaphore with the given initial count */
  inline void Init(int init_val) {
    sem_init(&sem, 0, init_val);
  }
  /*! \brief destroy the semaphore */
  inline void Destroy(void) {
    sem_destroy(&sem);
  }
  /*! \brief block until the count is positive, then decrement it */
  inline void Wait(void) {
    sem_wait(&sem);
  }
  /*! \brief increment the count, waking one waiter if any */
  inline void Post(void) {
    sem_post(&sem);
  }
#endif
};
/*!\brief simple thread class wrapping a joinable pthread */
class Thread {
 private:
  // pthread handle, valid after Start
  pthread_t thread;
 public :
  /*!
   * \brief start the thread, running entry(param)
   * \param entry entry point function of the new thread
   * \param param argument passed to entry
   */
  inline void Start(void * entry(void*), void *param) {
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_create(&thread, &attr, entry, param);
    // POSIX requires destroying the attribute object after use to
    // release any resources the implementation allocated for it
    pthread_attr_destroy(&attr);
  }
  /*!
   * \brief block until the thread finishes
   * \return 0 on success, otherwise the error code from pthread_join
   */
  inline int Join(void) {
    void *status;
    return pthread_join(thread, &status);
  }
};
/*! \brief exit function called from thread
 * \param status exit status made available to pthread_join
 */
inline void ThreadExit(void *status) {
  pthread_exit(status);
}
} // namespace utils
} // namespace xgboost
#define XGBOOST_THREAD_PREFIX void *
#endif
#endif

203
src/utils/thread_buffer.h Normal file
View File

@ -0,0 +1,203 @@
#ifndef XGBOOST_UTILS_THREAD_BUFFER_H
#define XGBOOST_UTILS_THREAD_BUFFER_H
/*!
* \file thread_buffer.h
* \brief multi-thread buffer, iterator, can be used to create parallel pipeline
* \author Tianqi Chen
*/
#include <vector>
#include <cstring>
#include <cstdlib>
#include "./utils.h"
#include "./thread.h"
namespace xgboost {
namespace utils {
/*!
 * \brief buffered loading iterator that uses multithread
 * this template method will assume the following paramters
 * \tparam Elem elememt type to be buffered
 * \tparam ElemFactory factory type to implement in order to use thread buffer
 *
 * Design: double buffering with a single background loader thread.
 * The loader fills one buffer (bufA or bufB) while the consumer reads the
 * other; two semaphores (loading_need, loading_end) hand ownership back
 * and forth.  current_buf selects which buffer the CONSUMER reads
 * (1 -> bufA, 0 -> bufB); the loader always fills the opposite one.
 */
template<typename Elem, typename ElemFactory>
class ThreadBuffer {
 public:
  /*!\brief constructor; buffer size defaults to 30 elements per buffer */
  ThreadBuffer(void) {
    this->init_end = false;
    this->buf_size = 30;
  }
  /*!\brief destructor, stops the loader thread and frees buffers if needed */
  ~ThreadBuffer(void) {
    if(init_end) this->Destroy();
  }
  /*!\brief set parameter, will also pass the parameter to factory
   * \param name parameter name; "buffer_size" sets the per-buffer capacity
   * \param val parameter value as string
   */
  inline void SetParam(const char *name, const char *val) {
    if (!strcmp( name, "buffer_size")) buf_size = atoi(val);
    factory.SetParam(name, val);
  }
  /*!
   * \brief initalize the buffered iterator
   * \param param a initialize parameter that will pass to factory, ignore it if not necessary
   * \return false if the initlization can't be done, e.g. buffer file hasn't been created
   */
  inline bool Init(void) {
    if (!factory.Init()) return false;
    // pre-allocate both buffers via the factory
    for (int i = 0; i < buf_size; ++i) {
      bufA.push_back(factory.Create());
      bufB.push_back(factory.Create());
    }
    this->init_end = true;
    this->StartLoader();
    return true;
  }
  /*!\brief place the iterator before first value */
  inline void BeforeFirst(void) {
    // wait till last loader end
    loading_end.Wait();
    // critcal zone: loader is asleep here, safe to mutate shared state
    current_buf = 1;
    factory.BeforeFirst();
    // reset terminate limit
    endA = endB = buf_size;
    // wake up loader for first part
    loading_need.Post();
    // wait til first part is loaded
    loading_end.Wait();
    // set current buf to right value
    current_buf = 0;
    // wake loader for next part
    data_loaded = false;
    loading_need.Post();
    // set buffer value
    buf_index = 0;
  }
  /*! \brief destroy the buffer iterator, will deallocate the buffer */
  inline void Destroy(void) {
    // wait until the signal is consumed
    // (loader checks destroy_signal at the top of its loop)
    this->destroy_signal = true;
    loading_need.Post();
    loader_thread.Join();
    loading_need.Destroy();
    loading_end.Destroy();
    for (size_t i = 0; i < bufA.size(); ++i) {
      factory.FreeSpace(bufA[i]);
    }
    for (size_t i = 0; i < bufB.size(); ++i) {
      factory.FreeSpace(bufB[i]);
    }
    bufA.clear(); bufB.clear();
    factory.Destroy();
    this->init_end = false;
  }
  /*!
   * \brief get the next element needed in buffer
   * \param elem element to store into
   * \return whether reaches end of data
   */
  inline bool Next(Elem &elem) {
    // end of buffer try to switch
    if (buf_index == buf_size) {
      this->SwitchBuffer();
      buf_index = 0;
    }
    // endA/endB mark where the factory ran out of data in that buffer
    if (buf_index >= (current_buf ? endA : endB)) {
      return false;
    }
    std::vector<Elem> &buf = current_buf ? bufA : bufB;
    elem = buf[buf_index];
    ++buf_index;
    return true;
  }
  /*!
   * \brief get the factory object
   */
  inline ElemFactory &get_factory(void) {
    return factory;
  }
  inline const ElemFactory &get_factory(void) const{
    return factory;
  }
  // size of buffer
  int buf_size;
 private:
  // factory object used to load configures
  ElemFactory factory;
  // index in current buffer
  int buf_index;
  // indicate which one is current buffer
  int current_buf;
  // max limit of visit, also marks termination
  int endA, endB;
  // double buffer, one is accessed by loader
  // the other is accessed by consumer
  // buffer of the data
  std::vector<Elem> bufA, bufB;
  // initialization end
  bool init_end;
  // singal whether the data is loaded
  // NOTE(review): written by both threads without explicit synchronization;
  // ordering relies on the semaphore handoff -- confirm this is sufficient
  bool data_loaded;
  // signal to kill the thread
  bool destroy_signal;
  // thread object
  Thread loader_thread;
  // signal of the buffer
  Semaphore loading_end, loading_need;
  /*!
   * \brief slave thread
   * this implementation is like producer-consumer style
   */
  inline void RunLoader(void) {
    while(!destroy_signal) {
      // sleep until loading is needed
      loading_need.Wait();
      // fill the buffer the consumer is NOT reading
      std::vector<Elem> &buf = current_buf ? bufB : bufA;
      int i;
      for (i = 0; i < buf_size ; ++i) {
        if (!factory.LoadNext(buf[i])) {
          int &end = current_buf ? endB : endA;
          end = i; // marks the termination
          break;
        }
      }
      // signal that loading is done
      data_loaded = true;
      loading_end.Post();
    }
  }
  /*!\brief entry point of loader thread */
  inline static XGBOOST_THREAD_PREFIX LoaderEntry(void *pthread) {
    static_cast< ThreadBuffer<Elem,ElemFactory>* >(pthread)->RunLoader();
    ThreadExit(NULL);
    return NULL;
  }
  /*!\brief start loader thread and pre-load the first buffer */
  inline void StartLoader(void) {
    destroy_signal = false;
    // set param
    current_buf = 1;
    // loading_need starts at 1 so the loader fills bufA immediately
    loading_need.Init(1);
    loading_end .Init(0);
    // reset terminate limit
    endA = endB = buf_size;
    loader_thread.Start(LoaderEntry, this);
    // wait until first part of data is loaded
    loading_end.Wait();
    // set current buf to right value
    current_buf = 0;
    // wake loader for next part
    data_loaded = false;
    loading_need.Post();
    buf_index = 0;
  }
  /*!\brief switch double buffer */
  inline void SwitchBuffer(void) {
    loading_end.Wait();
    // loader shall be sleep now, critcal zone!
    current_buf = !current_buf;
    // wake up loader
    data_loaded = false;
    loading_need.Post();
  }
};
} // namespace utils
} // namespace xgboost
#endif

View File

@ -3,10 +3,11 @@
import ctypes import ctypes
import os import os
# optinally have scipy sparse, though not necessary # optinally have scipy sparse, though not necessary
import numpy import numpy as np
import sys import sys
import numpy.ctypeslib import numpy.ctypeslib
import scipy.sparse as scp import scipy.sparse as scp
import random
# set this line correctly # set this line correctly
if os.name == 'nt': if os.name == 'nt':
@ -32,18 +33,30 @@ xglib.XGBoosterDumpModel.restype = ctypes.POINTER(ctypes.c_char_p)
def ctypes2numpy(cptr, length, dtype): def ctypes2numpy(cptr, length, dtype):
# convert a ctypes pointer array to numpy """convert a ctypes pointer array to numpy array """
assert isinstance(cptr, ctypes.POINTER(ctypes.c_float)) assert isinstance(cptr, ctypes.POINTER(ctypes.c_float))
res = numpy.zeros(length, dtype=dtype) res = numpy.zeros(length, dtype=dtype)
assert ctypes.memmove(res.ctypes.data, cptr, length * res.strides[0]) assert ctypes.memmove(res.ctypes.data, cptr, length * res.strides[0])
return res return res
# data matrix used in xgboost
class DMatrix: class DMatrix:
"""data matrix used in xgboost"""
# constructor # constructor
def __init__(self, data, label=None, missing=0.0, weight = None): def __init__(self, data, label=None, missing=0.0, weight = None):
""" constructor of DMatrix
Args:
data: string/numpy array/scipy.sparse
data source, string type is the path of svmlight format txt file or xgb buffer
label: list or numpy 1d array, optional
label of training data
missing: float
value in data which need to be present as missing value
weight: list or numpy 1d array, optional
weight for each instances
"""
# force into void_p, mac need to pass things in as void_p # force into void_p, mac need to pass things in as void_p
if data == None: if data is None:
self.handle = None self.handle = None
return return
if isinstance(data, str): if isinstance(data, str):
@ -63,22 +76,25 @@ class DMatrix:
self.set_label(label) self.set_label(label)
if weight !=None: if weight !=None:
self.set_weight(weight) self.set_weight(weight)
# convert data from csr matrix
def __init_from_csr(self, csr): def __init_from_csr(self, csr):
"""convert data from csr matrix"""
assert len(csr.indices) == len(csr.data) assert len(csr.indices) == len(csr.data)
self.handle = ctypes.c_void_p(xglib.XGDMatrixCreateFromCSR( self.handle = ctypes.c_void_p(xglib.XGDMatrixCreateFromCSR(
(ctypes.c_ulong * len(csr.indptr))(*csr.indptr), (ctypes.c_ulong * len(csr.indptr))(*csr.indptr),
(ctypes.c_uint * len(csr.indices))(*csr.indices), (ctypes.c_uint * len(csr.indices))(*csr.indices),
(ctypes.c_float * len(csr.data))(*csr.data), (ctypes.c_float * len(csr.data))(*csr.data),
len(csr.indptr), len(csr.data))) len(csr.indptr), len(csr.data)))
# convert data from numpy matrix
def __init_from_npy2d(self,mat,missing): def __init_from_npy2d(self,mat,missing):
"""convert data from numpy matrix"""
data = numpy.array(mat.reshape(mat.size), dtype='float32') data = numpy.array(mat.reshape(mat.size), dtype='float32')
self.handle = ctypes.c_void_p(xglib.XGDMatrixCreateFromMat( self.handle = ctypes.c_void_p(xglib.XGDMatrixCreateFromMat(
data.ctypes.data_as(ctypes.POINTER(ctypes.c_float)), data.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
mat.shape[0], mat.shape[1], ctypes.c_float(missing))) mat.shape[0], mat.shape[1], ctypes.c_float(missing)))
# destructor
def __del__(self): def __del__(self):
"""destructor"""
xglib.XGDMatrixFree(self.handle) xglib.XGDMatrixFree(self.handle)
def get_float_info(self, field): def get_float_info(self, field):
length = ctypes.c_ulong() length = ctypes.c_ulong()
@ -96,16 +112,39 @@ class DMatrix:
def set_uint_info(self, field, data): def set_uint_info(self, field, data):
xglib.XGDMatrixSetUIntInfo(self.handle, ctypes.c_char_p(field.encode('utf-8')), xglib.XGDMatrixSetUIntInfo(self.handle, ctypes.c_char_p(field.encode('utf-8')),
(ctypes.c_uint*len(data))(*data), len(data)) (ctypes.c_uint*len(data))(*data), len(data))
# load data from file
def save_binary(self, fname, silent=True): def save_binary(self, fname, silent=True):
"""save DMatrix to XGBoost buffer
Args:
fname: string
name of buffer file
slient: bool, option
whether print info
Returns:
None
"""
xglib.XGDMatrixSaveBinary(self.handle, ctypes.c_char_p(fname.encode('utf-8')), int(silent)) xglib.XGDMatrixSaveBinary(self.handle, ctypes.c_char_p(fname.encode('utf-8')), int(silent))
# set label of dmatrix
def set_label(self, label): def set_label(self, label):
"""set label of dmatrix
Args:
label: list
label for DMatrix
Returns:
None
"""
self.set_float_info('label', label) self.set_float_info('label', label)
# set weight of each instances
def set_weight(self, weight): def set_weight(self, weight):
"""set weight of each instances
Args:
weight: float
weight for positive instance
Returns:
None
"""
self.set_float_info('weight', weight) self.set_float_info('weight', weight)
# set initialized margin prediction
def set_base_margin(self, margin): def set_base_margin(self, margin):
""" """
set base margin of booster to start from set base margin of booster to start from
@ -116,31 +155,143 @@ class DMatrix:
see also example/demo.py see also example/demo.py
""" """
self.set_float_info('base_margin', margin) self.set_float_info('base_margin', margin)
# set group size of dmatrix, used for rank
def set_group(self, group): def set_group(self, group):
"""set group size of dmatrix, used for rank
Args:
group:
Returns:
None
"""
xglib.XGDMatrixSetGroup(self.handle, (ctypes.c_uint*len(group))(*group), len(group)) xglib.XGDMatrixSetGroup(self.handle, (ctypes.c_uint*len(group))(*group), len(group))
# get label from dmatrix
def get_label(self): def get_label(self):
"""get label from dmatrix
Args:
None
Returns:
list, label of data
"""
return self.get_float_info('label') return self.get_float_info('label')
# get weight from dmatrix
def get_weight(self): def get_weight(self):
"""get weight from dmatrix
Args:
None
Returns:
float, weight
"""
return self.get_float_info('weight') return self.get_float_info('weight')
# get base_margin from dmatrix
def get_base_margin(self): def get_base_margin(self):
"""get base_margin from dmatrix
Args:
None
Returns:
float, base margin
"""
return self.get_float_info('base_margin') return self.get_float_info('base_margin')
def num_row(self): def num_row(self):
"""get number of rows
Args:
None
Returns:
int, num rows
"""
return xglib.XGDMatrixNumRow(self.handle) return xglib.XGDMatrixNumRow(self.handle)
# slice the DMatrix to return a new DMatrix that only contains rindex
def slice(self, rindex): def slice(self, rindex):
"""slice the DMatrix to return a new DMatrix that only contains rindex
Args:
rindex: list
list of index to be chosen
Returns:
res: DMatrix
new DMatrix with chosen index
"""
res = DMatrix(None) res = DMatrix(None)
res.handle = ctypes.c_void_p(xglib.XGDMatrixSliceDMatrix( res.handle = ctypes.c_void_p(xglib.XGDMatrixSliceDMatrix(
self.handle, (ctypes.c_int*len(rindex))(*rindex), len(rindex))) self.handle, (ctypes.c_int*len(rindex))(*rindex), len(rindex)))
return res return res
class CVPack:
    """Auxiliary pack holding one cross-validation fold: the train/test
    DMatrix pair and a Booster trained on the fold."""
    def __init__(self, dtrain, dtest, param):
        """Construct a fold.

        Args:
            dtrain: DMatrix, training portion of the fold
            dtest: DMatrix, held-out portion of the fold
            param: booster parameters passed to Booster
        """
        self.dtrain = dtrain
        self.dtest = dtest
        # single assignment; the original chained `watchlist = ...` local
        # was redundant
        self.watchlist = [(dtrain, 'train'), (dtest, 'test')]
        self.bst = Booster(param, [dtrain, dtest])
    def update(self, r):
        """Run one boosting iteration (round r) on this fold's training data."""
        self.bst.update(self.dtrain, r)
    def eval(self, r):
        """Return the evaluation string for round r over the train/test watchlist."""
        return self.bst.eval_set(self.watchlist, r)
def mknfold(dall, nfold, param, seed, weightscale=None):
    """Make an nfold list of CVPack from shuffled row indices.

    Args:
        dall: DMatrix, full dataset to split
        nfold: int, number of folds
        param: booster parameters forwarded to each CVPack
        seed: seed for the random shuffle, for reproducibility
        weightscale: float, optional; if given, instance weights of each
            fold are rescaled proportionally to the fold size
    Returns:
        list of CVPack, one per fold
    """
    # materialize the range so it can be shuffled (range objects are
    # immutable on Python 3) and use floor division so slice indices
    # stay integral
    randidx = list(range(dall.num_row()))
    random.seed(seed)
    random.shuffle(randidx)
    kstep = len(randidx) // nfold
    idxset = []
    for i in range(nfold):
        idxset.append(randidx[(i * kstep): min(len(randidx), (i + 1) * kstep)])
    ret = []
    for k in range(nfold):
        trainlst = []
        for j in range(nfold):
            if j == k:
                testlst = idxset[j]
            else:
                trainlst += idxset[j]
        dtrain = dall.slice(trainlst)
        dtest = dall.slice(testlst)
        # rescale weight of dtrain and dtest
        if weightscale is not None:
            dtrain.set_weight(dtrain.get_weight() * weightscale * dall.num_row() / dtrain.num_row())
            dtest.set_weight(dtest.get_weight() * weightscale * dall.num_row() / dtest.num_row())
        ret.append(CVPack(dtrain, dtest, param))
    return ret
def aggcv(rlist):
    """Aggregate per-fold evaluation strings into one summary line.

    Each entry of rlist looks like '[it] name:value name:value ...'.
    The result keeps the shared '[it]' prefix and appends, for every
    metric name in sorted order, 'name:mean+std' over the folds.
    """
    header = rlist[0].split()[0]
    scores = {}
    for line in rlist:
        fields = line.split()
        # every fold must report the same iteration tag
        assert header == fields[0]
        for item in fields[1:]:
            name, value = item.split(':')
            scores.setdefault(name, []).append(float(value))
    ret = header
    for name in sorted(scores):
        vals = np.array(scores[name])
        ret += '\t%s:%f+%f' % (name, np.mean(vals), np.std(vals))
    return ret
class Booster: class Booster:
"""learner class """ """learner class """
def __init__(self, params={}, cache=[], model_file = None): def __init__(self, params={}, cache=[], model_file = None):
""" constructor, param: """ """ constructor
Args:
params: dict
params for boosters
cache: list
list of cache item
model_file: string
path of model file
Returns:
None
"""
for d in cache: for d in cache:
assert isinstance(d, DMatrix) assert isinstance(d, DMatrix)
dmats = (ctypes.c_void_p * len(cache))(*[ d.handle for d in cache]) dmats = (ctypes.c_void_p * len(cache))(*[ d.handle for d in cache])
@ -166,16 +317,30 @@ class Booster:
xglib.XGBoosterSetParam( xglib.XGBoosterSetParam(
self.handle, ctypes.c_char_p(k.encode('utf-8')), self.handle, ctypes.c_char_p(k.encode('utf-8')),
ctypes.c_char_p(str(v).encode('utf-8'))) ctypes.c_char_p(str(v).encode('utf-8')))
def update(self, dtrain, it): def update(self, dtrain, it):
""" """
update update
dtrain: the training DMatrix Args:
it: current iteration number dtrain: DMatrix
the training DMatrix
it: int
current iteration number
Returns:
None
""" """
assert isinstance(dtrain, DMatrix) assert isinstance(dtrain, DMatrix)
xglib.XGBoosterUpdateOneIter(self.handle, it, dtrain.handle) xglib.XGBoosterUpdateOneIter(self.handle, it, dtrain.handle)
def boost(self, dtrain, grad, hess): def boost(self, dtrain, grad, hess):
""" update """ """ update
Args:
dtrain: DMatrix
the training DMatrix
grad: list
the first order of gradient
hess: list
the second order of gradient
"""
assert len(grad) == len(hess) assert len(grad) == len(hess)
assert isinstance(dtrain, DMatrix) assert isinstance(dtrain, DMatrix)
xglib.XGBoosterBoostOneIter(self.handle, dtrain.handle, xglib.XGBoosterBoostOneIter(self.handle, dtrain.handle,
@ -183,6 +348,14 @@ class Booster:
(ctypes.c_float*len(hess))(*hess), (ctypes.c_float*len(hess))(*hess),
len(grad)) len(grad))
def eval_set(self, evals, it = 0): def eval_set(self, evals, it = 0):
"""evaluates by metric
Args:
evals: list of tuple (DMatrix, string)
lists of items to be evaluated
it: int
Returns:
evals result
"""
for d in evals: for d in evals:
assert isinstance(d[0], DMatrix) assert isinstance(d[0], DMatrix)
assert isinstance(d[1], str) assert isinstance(d[1], str)
@ -195,22 +368,48 @@ class Booster:
def predict(self, data, output_margin=False, ntree_limit=0): def predict(self, data, output_margin=False, ntree_limit=0):
""" """
predict with data predict with data
data: the dmatrix storing the input Args:
output_margin: whether output raw margin value that is untransformed data: DMatrix
ntree_limit: limit number of trees in prediction, default to 0, 0 means using all the trees the dmatrix storing the input
output_margin: bool
whether output raw margin value that is untransformed
ntree_limit: limit number of trees in prediction, default to 0, 0 means using all the trees
Returns:
numpy array of prediction
""" """
length = ctypes.c_ulong() length = ctypes.c_ulong()
preds = xglib.XGBoosterPredict(self.handle, data.handle, preds = xglib.XGBoosterPredict(self.handle, data.handle,
int(output_margin), ntree_limit, ctypes.byref(length)) int(output_margin), ntree_limit, ctypes.byref(length))
return ctypes2numpy(preds, length.value, 'float32') return ctypes2numpy(preds, length.value, 'float32')
def save_model(self, fname): def save_model(self, fname):
""" save model to file """ """ save model to file
Args:
fname: string
file name of saving model
Returns:
None
"""
xglib.XGBoosterSaveModel(self.handle, ctypes.c_char_p(fname.encode('utf-8'))) xglib.XGBoosterSaveModel(self.handle, ctypes.c_char_p(fname.encode('utf-8')))
def load_model(self, fname): def load_model(self, fname):
"""load model from file""" """load model from file
Args:
fname: string
file name of saving model
Returns:
None
"""
xglib.XGBoosterLoadModel( self.handle, ctypes.c_char_p(fname.encode('utf-8')) ) xglib.XGBoosterLoadModel( self.handle, ctypes.c_char_p(fname.encode('utf-8')) )
def dump_model(self, fo, fmap=''): def dump_model(self, fo, fmap=''):
"""dump model into text file""" """dump model into text file
Args:
fo: string
file name to be dumped
fmap: string, optional
file name of feature map names
Returns:
None
"""
if isinstance(fo,str): if isinstance(fo,str):
fo = open(fo,'w') fo = open(fo,'w')
need_close = True need_close = True
@ -249,7 +448,17 @@ class Booster:
return fmap return fmap
def evaluate(bst, evals, it, feval = None): def evaluate(bst, evals, it, feval = None):
"""evaluation on eval set""" """evaluation on eval set
Args:
bst: XGBoost object
object of XGBoost model
evals: list of tuple (DMatrix, string)
obj need to be evaluated
it: int
feval: optional
Returns:
eval result
"""
if feval != None: if feval != None:
res = '[%d]' % it res = '[%d]' % it
for dm, evname in evals: for dm, evname in evals:
@ -260,10 +469,24 @@ def evaluate(bst, evals, it, feval = None):
return res return res
def train(params, dtrain, num_boost_round = 10, evals = [], obj=None, feval=None): def train(params, dtrain, num_boost_round = 10, evals = [], obj=None, feval=None):
""" train a booster with given paramaters """ """ train a booster with given paramaters
Args:
params: dict
params of booster
dtrain: DMatrix
data to be trained
num_boost_round: int
num of round to be boosted
evals: list
list of items to be evaluated
obj:
feval:
"""
bst = Booster(params, [dtrain]+[ d[0] for d in evals ] ) bst = Booster(params, [dtrain]+[ d[0] for d in evals ] )
if obj == None: if obj is None:
for i in range(num_boost_round): for i in range(num_boost_round):
bst.update( dtrain, i ) bst.update( dtrain, i )
if len(evals) != 0: if len(evals) != 0:
@ -277,3 +500,27 @@ def train(params, dtrain, num_boost_round = 10, evals = [], obj=None, feval=None
if len(evals) != 0: if len(evals) != 0:
sys.stderr.write(evaluate(bst, evals, i, feval)+'\n') sys.stderr.write(evaluate(bst, evals, i, feval)+'\n')
return bst return bst
def cv(params, dtrain, num_boost_round = 10, nfold=3, evals = [], obj=None, feval=None):
""" cross validation with given paramaters
Args:
params: dict
params of booster
dtrain: DMatrix
data to be trained
num_boost_round: int
num of round to be boosted
nfold: int
folds to do cv
evals: list
list of items to be evaluated
obj:
feval:
"""
plst = list(params.items())+[('eval_metric', itm) for itm in evals]
cvfolds = mknfold(dtrain, nfold, plst, 0)
for i in range(num_boost_round):
for f in cvfolds:
f.update(i)
res = aggcv([f.eval(i) for f in cvfolds])
sys.stderr.write(res+'\n')