Merge branch 'unity'
Conflicts: src/utils/io.h wrapper/xgboost.py
This commit is contained in:
commit
5f6e849b21
4
.gitignore
vendored
4
.gitignore
vendored
@ -2,7 +2,7 @@
|
|||||||
*.slo
|
*.slo
|
||||||
*.lo
|
*.lo
|
||||||
*.o
|
*.o
|
||||||
|
*.page
|
||||||
# Compiled Dynamic libraries
|
# Compiled Dynamic libraries
|
||||||
*.so
|
*.so
|
||||||
*.dylib
|
*.dylib
|
||||||
@ -44,3 +44,5 @@ Debug
|
|||||||
*dump
|
*dump
|
||||||
*save
|
*save
|
||||||
*csv
|
*csv
|
||||||
|
*.cpage.col
|
||||||
|
*.cpage
|
||||||
|
|||||||
@ -5,5 +5,3 @@ PKG_CPPFLAGS= -DXGBOOST_CUSTOMIZE_MSG_ -DXGBOOST_CUSTOMIZE_PRNG_ -DXGBOOST_STRIC
|
|||||||
PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS)
|
PKG_CXXFLAGS= $(SHLIB_OPENMP_CFLAGS)
|
||||||
PKG_LIBS = $(SHLIB_OPENMP_CFLAGS)
|
PKG_LIBS = $(SHLIB_OPENMP_CFLAGS)
|
||||||
OBJECTS= xgboost_R.o xgboost_assert.o $(PKGROOT)/wrapper/xgboost_wrapper.o $(PKGROOT)/src/io/io.o $(PKGROOT)/src/gbm/gbm.o $(PKGROOT)/src/tree/updater.o
|
OBJECTS= xgboost_R.o xgboost_assert.o $(PKGROOT)/wrapper/xgboost_wrapper.o $(PKGROOT)/src/io/io.o $(PKGROOT)/src/gbm/gbm.o $(PKGROOT)/src/tree/updater.o
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -3,12 +3,13 @@
|
|||||||
#include <utility>
|
#include <utility>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
#include "xgboost_R.h"
|
|
||||||
#include "wrapper/xgboost_wrapper.h"
|
#include "wrapper/xgboost_wrapper.h"
|
||||||
#include "src/utils/utils.h"
|
#include "src/utils/utils.h"
|
||||||
#include "src/utils/omp.h"
|
#include "src/utils/omp.h"
|
||||||
#include "src/utils/matrix_csr.h"
|
#include "src/utils/matrix_csr.h"
|
||||||
using namespace std;
|
|
||||||
|
#include "xgboost_R.h"
|
||||||
|
|
||||||
using namespace xgboost;
|
using namespace xgboost;
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|||||||
@ -5,6 +5,9 @@
|
|||||||
#include "../utils/io.h"
|
#include "../utils/io.h"
|
||||||
#include "../utils/utils.h"
|
#include "../utils/utils.h"
|
||||||
#include "simple_dmatrix-inl.hpp"
|
#include "simple_dmatrix-inl.hpp"
|
||||||
|
#include "page_dmatrix-inl.hpp"
|
||||||
|
#include "page_fmatrix-inl.hpp"
|
||||||
|
|
||||||
// implements data loads using dmatrix simple for now
|
// implements data loads using dmatrix simple for now
|
||||||
|
|
||||||
namespace xgboost {
|
namespace xgboost {
|
||||||
@ -21,6 +24,18 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) {
|
|||||||
fs.Close();
|
fs.Close();
|
||||||
return dmat;
|
return dmat;
|
||||||
}
|
}
|
||||||
|
if (magic == DMatrixPage::kMagic) {
|
||||||
|
DMatrixPage *dmat = new DMatrixPage();
|
||||||
|
dmat->Load(fs, silent, fname);
|
||||||
|
// the file pointer is hold in page matrix
|
||||||
|
return dmat;
|
||||||
|
}
|
||||||
|
if (magic == DMatrixColPage::kMagic) {
|
||||||
|
DMatrixColPage *dmat = new DMatrixColPage(fname);
|
||||||
|
dmat->Load(fs, silent, fname);
|
||||||
|
// the file pointer is hold in page matrix
|
||||||
|
return dmat;
|
||||||
|
}
|
||||||
fs.Close();
|
fs.Close();
|
||||||
|
|
||||||
DMatrixSimple *dmat = new DMatrixSimple();
|
DMatrixSimple *dmat = new DMatrixSimple();
|
||||||
@ -29,11 +44,21 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, bool savebuffer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) {
|
void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) {
|
||||||
|
if (!strcmp(fname + strlen(fname) - 5, ".page")) {
|
||||||
|
DMatrixPage::Save(fname, dmat, silent);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!strcmp(fname + strlen(fname) - 6, ".cpage")) {
|
||||||
|
DMatrixColPage::Save(fname, dmat, silent);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (dmat.magic == DMatrixSimple::kMagic) {
|
if (dmat.magic == DMatrixSimple::kMagic) {
|
||||||
const DMatrixSimple *p_dmat = static_cast<const DMatrixSimple*>(&dmat);
|
const DMatrixSimple *p_dmat = static_cast<const DMatrixSimple*>(&dmat);
|
||||||
p_dmat->SaveBinary(fname, silent);
|
p_dmat->SaveBinary(fname, silent);
|
||||||
} else {
|
} else {
|
||||||
utils::Error("not implemented");
|
DMatrixSimple smat;
|
||||||
|
smat.CopyFrom(dmat);
|
||||||
|
smat.SaveBinary(fname, silent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
275
src/io/page_dmatrix-inl.hpp
Normal file
275
src/io/page_dmatrix-inl.hpp
Normal file
@ -0,0 +1,275 @@
|
|||||||
|
#ifndef XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
|
||||||
|
#define XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
|
||||||
|
/*!
|
||||||
|
* \file page_row_iter-inl.hpp
|
||||||
|
* row iterator based on sparse page
|
||||||
|
* \author Tianqi Chen
|
||||||
|
*/
|
||||||
|
#include <vector>
|
||||||
|
#include "../data.h"
|
||||||
|
#include "../utils/iterator.h"
|
||||||
|
#include "../utils/thread_buffer.h"
|
||||||
|
#include "./simple_fmatrix-inl.hpp"
|
||||||
|
|
||||||
|
namespace xgboost {
|
||||||
|
namespace io {
|
||||||
|
/*! \brief page structure that can be used to store a rowbatch */
|
||||||
|
struct RowBatchPage {
|
||||||
|
public:
|
||||||
|
explicit RowBatchPage(size_t page_size) : kPageSize(page_size) {
|
||||||
|
data_ = new int[kPageSize];
|
||||||
|
utils::Assert(data_ != NULL, "fail to allocate row batch page");
|
||||||
|
this->Clear();
|
||||||
|
}
|
||||||
|
~RowBatchPage(void) {
|
||||||
|
if (data_ != NULL) delete [] data_;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief Push one row into page
|
||||||
|
* \param row an instance row
|
||||||
|
* \return false or true to push into
|
||||||
|
*/
|
||||||
|
inline bool PushRow(const RowBatch::Inst &row) {
|
||||||
|
const size_t dsize = row.length * sizeof(RowBatch::Entry);
|
||||||
|
if (FreeBytes() < dsize+ sizeof(int)) return false;
|
||||||
|
row_ptr(Size() + 1) = row_ptr(Size()) + row.length;
|
||||||
|
memcpy(data_ptr(row_ptr(Size())) , row.data, dsize);
|
||||||
|
++data_[0];
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief get a row batch representation from the page
|
||||||
|
* \param p_rptr a temporal space that can be used to provide
|
||||||
|
* ind_ptr storage for RowBatch
|
||||||
|
* \return a new RowBatch object
|
||||||
|
*/
|
||||||
|
inline RowBatch GetRowBatch(std::vector<size_t> *p_rptr, size_t base_rowid) {
|
||||||
|
RowBatch batch;
|
||||||
|
batch.base_rowid = base_rowid;
|
||||||
|
batch.data_ptr = this->data_ptr(0);
|
||||||
|
batch.size = static_cast<size_t>(this->Size());
|
||||||
|
std::vector<size_t> &rptr = *p_rptr;
|
||||||
|
rptr.resize(this->Size() + 1);
|
||||||
|
for (size_t i = 0; i < rptr.size(); ++i) {
|
||||||
|
rptr[i] = static_cast<size_t>(this->row_ptr(static_cast<int>(i)));
|
||||||
|
}
|
||||||
|
batch.ind_ptr = &rptr[0];
|
||||||
|
return batch;
|
||||||
|
}
|
||||||
|
/*! \brief get i-th row from the batch */
|
||||||
|
inline RowBatch::Inst operator[](int i) {
|
||||||
|
return RowBatch::Inst(data_ptr(0) + row_ptr(i),
|
||||||
|
static_cast<bst_uint>(row_ptr(i+1) - row_ptr(i)));
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief clear the page, cleanup the content
|
||||||
|
*/
|
||||||
|
inline void Clear(void) {
|
||||||
|
memset(&data_[0], 0, sizeof(int) * kPageSize);
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief load one page form instream
|
||||||
|
* \return true if loading is successful
|
||||||
|
*/
|
||||||
|
inline bool Load(utils::IStream &fi) {
|
||||||
|
return fi.Read(&data_[0], sizeof(int) * kPageSize) != 0;
|
||||||
|
}
|
||||||
|
/*! \brief save one page into outstream */
|
||||||
|
inline void Save(utils::IStream &fo) {
|
||||||
|
fo.Write(&data_[0], sizeof(int) * kPageSize);
|
||||||
|
}
|
||||||
|
/*! \return number of elements */
|
||||||
|
inline int Size(void) const {
|
||||||
|
return data_[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
/*! \return number of elements */
|
||||||
|
inline size_t FreeBytes(void) {
|
||||||
|
return (kPageSize - (Size() + 2)) * sizeof(int) -
|
||||||
|
row_ptr(Size()) * sizeof(RowBatch::Entry);
|
||||||
|
}
|
||||||
|
/*! \brief equivalent row pointer at i */
|
||||||
|
inline int& row_ptr(int i) {
|
||||||
|
return data_[kPageSize - i - 1];
|
||||||
|
}
|
||||||
|
inline RowBatch::Entry* data_ptr(int i) {
|
||||||
|
return (RowBatch::Entry*)(&data_[1]) + i;
|
||||||
|
}
|
||||||
|
// page size
|
||||||
|
const size_t kPageSize;
|
||||||
|
// content of data
|
||||||
|
int *data_;
|
||||||
|
};
|
||||||
|
/*! \brief thread buffer iterator */
|
||||||
|
class ThreadRowPageIterator: public utils::IIterator<RowBatch> {
|
||||||
|
public:
|
||||||
|
ThreadRowPageIterator(void) {
|
||||||
|
itr.SetParam("buffer_size", "2");
|
||||||
|
page_ = NULL;
|
||||||
|
base_rowid_ = 0;
|
||||||
|
}
|
||||||
|
virtual ~ThreadRowPageIterator(void) {}
|
||||||
|
virtual void Init(void) {
|
||||||
|
}
|
||||||
|
virtual void BeforeFirst(void) {
|
||||||
|
itr.BeforeFirst();
|
||||||
|
base_rowid_ = 0;
|
||||||
|
}
|
||||||
|
virtual bool Next(void) {
|
||||||
|
if (!itr.Next(page_)) return false;
|
||||||
|
out_ = page_->GetRowBatch(&tmp_ptr_, base_rowid_);
|
||||||
|
base_rowid_ += out_.size;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
virtual const RowBatch &Value(void) const {
|
||||||
|
return out_;
|
||||||
|
}
|
||||||
|
/*! \brief load and initialize the iterator with fi */
|
||||||
|
inline void Load(const utils::FileStream &fi) {
|
||||||
|
itr.get_factory().SetFile(fi);
|
||||||
|
itr.Init();
|
||||||
|
this->BeforeFirst();
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief save a row iterator to output stream, in row iterator format
|
||||||
|
*/
|
||||||
|
inline static void Save(utils::IIterator<RowBatch> *iter,
|
||||||
|
utils::IStream &fo) {
|
||||||
|
RowBatchPage page(kPageSize);
|
||||||
|
iter->BeforeFirst();
|
||||||
|
while (iter->Next()) {
|
||||||
|
const RowBatch &batch = iter->Value();
|
||||||
|
for (size_t i = 0; i < batch.size; ++i) {
|
||||||
|
if (!page.PushRow(batch[i])) {
|
||||||
|
page.Save(fo);
|
||||||
|
page.Clear();
|
||||||
|
utils::Check(page.PushRow(batch[i]), "row is too big");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (page.Size() != 0) page.Save(fo);
|
||||||
|
}
|
||||||
|
/*! \brief page size 64 MB */
|
||||||
|
static const size_t kPageSize = 64 << 18;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// base row id
|
||||||
|
size_t base_rowid_;
|
||||||
|
// temporal ptr
|
||||||
|
std::vector<size_t> tmp_ptr_;
|
||||||
|
// output data
|
||||||
|
RowBatch out_;
|
||||||
|
// page pointer type
|
||||||
|
typedef RowBatchPage* PagePtr;
|
||||||
|
// loader factory for page
|
||||||
|
struct Factory {
|
||||||
|
public:
|
||||||
|
long file_begin_;
|
||||||
|
utils::FileStream fi;
|
||||||
|
Factory(void) {}
|
||||||
|
inline void SetFile(const utils::FileStream &fi) {
|
||||||
|
this->fi = fi;
|
||||||
|
file_begin_ = this->fi.Tell();
|
||||||
|
}
|
||||||
|
inline bool Init(void) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
inline void SetParam(const char *name, const char *val) {}
|
||||||
|
inline bool LoadNext(PagePtr &val) {
|
||||||
|
return val->Load(fi);
|
||||||
|
}
|
||||||
|
inline PagePtr Create(void) {
|
||||||
|
PagePtr a = new RowBatchPage(kPageSize);
|
||||||
|
return a;
|
||||||
|
}
|
||||||
|
inline void FreeSpace(PagePtr &a) {
|
||||||
|
delete a;
|
||||||
|
}
|
||||||
|
inline void Destroy(void) {
|
||||||
|
fi.Close();
|
||||||
|
}
|
||||||
|
inline void BeforeFirst(void) {
|
||||||
|
fi.Seek(file_begin_);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
protected:
|
||||||
|
PagePtr page_;
|
||||||
|
utils::ThreadBuffer<PagePtr, Factory> itr;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*! \brief data matrix using page */
|
||||||
|
template<int TKMagic>
|
||||||
|
class DMatrixPageBase : public DataMatrix {
|
||||||
|
public:
|
||||||
|
DMatrixPageBase(void) : DataMatrix(kMagic) {
|
||||||
|
iter_ = new ThreadRowPageIterator();
|
||||||
|
}
|
||||||
|
// virtual destructor
|
||||||
|
virtual ~DMatrixPageBase(void) {
|
||||||
|
// do not delete row iterator, since it is owned by fmat
|
||||||
|
// to be cleaned up in a more clear way
|
||||||
|
}
|
||||||
|
/*! \brief load and initialize the iterator with fi */
|
||||||
|
inline void Load(utils::FileStream &fi,
|
||||||
|
bool silent = false,
|
||||||
|
const char *fname = NULL) {
|
||||||
|
int tmagic;
|
||||||
|
utils::Check(fi.Read(&tmagic, sizeof(tmagic)) != 0, "invalid input file format");
|
||||||
|
utils::Check(tmagic == magic, "invalid format,magic number mismatch");
|
||||||
|
this->info.LoadBinary(fi);
|
||||||
|
iter_->Load(fi);
|
||||||
|
if (!silent) {
|
||||||
|
utils::Printf("DMatrixPage: %lux%lu matrix is loaded",
|
||||||
|
static_cast<unsigned long>(info.num_row()),
|
||||||
|
static_cast<unsigned long>(info.num_col()));
|
||||||
|
if (fname != NULL) {
|
||||||
|
utils::Printf(" from %s\n", fname);
|
||||||
|
} else {
|
||||||
|
utils::Printf("\n");
|
||||||
|
}
|
||||||
|
if (info.group_ptr.size() != 0) {
|
||||||
|
utils::Printf("data contains %u groups\n", (unsigned)info.group_ptr.size() - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*! \brief save a DataMatrix as DMatrixPage*/
|
||||||
|
inline static void Save(const char* fname, const DataMatrix &mat, bool silent) {
|
||||||
|
utils::FileStream fs(utils::FopenCheck(fname, "wb"));
|
||||||
|
int magic = kMagic;
|
||||||
|
fs.Write(&magic, sizeof(magic));
|
||||||
|
mat.info.SaveBinary(fs);
|
||||||
|
ThreadRowPageIterator::Save(mat.fmat()->RowIterator(), fs);
|
||||||
|
fs.Close();
|
||||||
|
if (!silent) {
|
||||||
|
utils::Printf("DMatrixPage: %lux%lu is saved to %s\n",
|
||||||
|
static_cast<unsigned long>(mat.info.num_row()),
|
||||||
|
static_cast<unsigned long>(mat.info.num_col()), fname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*! \brief magic number used to identify DMatrix */
|
||||||
|
static const int kMagic = TKMagic;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/*! \brief row iterator */
|
||||||
|
ThreadRowPageIterator *iter_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DMatrixPage : public DMatrixPageBase<0xffffab02> {
|
||||||
|
public:
|
||||||
|
DMatrixPage(void) {
|
||||||
|
fmat_ = new FMatrixS(iter_);
|
||||||
|
}
|
||||||
|
virtual ~DMatrixPage(void) {
|
||||||
|
delete fmat_;
|
||||||
|
}
|
||||||
|
virtual IFMatrix *fmat(void) const {
|
||||||
|
return fmat_;
|
||||||
|
}
|
||||||
|
/*! \brief the real fmatrix */
|
||||||
|
IFMatrix *fmat_;
|
||||||
|
};
|
||||||
|
} // namespace io
|
||||||
|
} // namespace xgboost
|
||||||
|
#endif // XGBOOST_IO_PAGE_ROW_ITER_INL_HPP_
|
||||||
374
src/io/page_fmatrix-inl.hpp
Normal file
374
src/io/page_fmatrix-inl.hpp
Normal file
@ -0,0 +1,374 @@
|
|||||||
|
#ifndef XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
|
||||||
|
#define XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
|
||||||
|
/*!
|
||||||
|
* \file page_fmatrix-inl.hpp
|
||||||
|
* sparse page manager for fmatrix
|
||||||
|
* \author Tianqi Chen
|
||||||
|
*/
|
||||||
|
#include <vector>
|
||||||
|
#include <string>
|
||||||
|
#include <algorithm>
|
||||||
|
#include "../data.h"
|
||||||
|
#include "../utils/iterator.h"
|
||||||
|
#include "../utils/io.h"
|
||||||
|
#include "../utils/matrix_csr.h"
|
||||||
|
#include "../utils/thread_buffer.h"
|
||||||
|
namespace xgboost {
|
||||||
|
namespace io {
|
||||||
|
class CSCMatrixManager {
|
||||||
|
public:
|
||||||
|
/*! \brief in memory page */
|
||||||
|
struct Page {
|
||||||
|
public:
|
||||||
|
/*! \brief initialize the page */
|
||||||
|
explicit Page(size_t size) {
|
||||||
|
buffer.resize(size);
|
||||||
|
col_index.reserve(10);
|
||||||
|
col_data.reserve(10);
|
||||||
|
}
|
||||||
|
/*! \brief clear the page */
|
||||||
|
inline void Clear(void) {
|
||||||
|
num_entry = 0;
|
||||||
|
col_index.clear();
|
||||||
|
col_data.clear();
|
||||||
|
}
|
||||||
|
/*! \brief number of used entries */
|
||||||
|
size_t num_entry;
|
||||||
|
/*! \brief column index */
|
||||||
|
std::vector<bst_uint> col_index;
|
||||||
|
/*! \brief column data */
|
||||||
|
std::vector<ColBatch::Inst> col_data;
|
||||||
|
/*! \brief number of free entries */
|
||||||
|
inline size_t NumFreeEntry(void) const {
|
||||||
|
return buffer.size() - num_entry;
|
||||||
|
}
|
||||||
|
inline ColBatch::Entry* AllocEntry(size_t len) {
|
||||||
|
ColBatch::Entry *p_data = &buffer[0] + num_entry;
|
||||||
|
num_entry += len;
|
||||||
|
return p_data;
|
||||||
|
}
|
||||||
|
/*! \brief get underlying batch */
|
||||||
|
inline ColBatch GetBatch(void) const {
|
||||||
|
ColBatch batch;
|
||||||
|
batch.size = col_index.size();
|
||||||
|
batch.col_index = BeginPtr(col_index);
|
||||||
|
batch.col_data = BeginPtr(col_data);
|
||||||
|
return batch;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
/*! \brief buffer space, not to be changed since ready */
|
||||||
|
std::vector<ColBatch::Entry> buffer;
|
||||||
|
};
|
||||||
|
/*! \brief define type of page pointer */
|
||||||
|
typedef Page *PagePtr;
|
||||||
|
// constructor
|
||||||
|
CSCMatrixManager(void) {
|
||||||
|
fi_ = NULL;
|
||||||
|
}
|
||||||
|
/*! \brief get column pointer */
|
||||||
|
inline const std::vector<size_t> &col_ptr(void) const {
|
||||||
|
return col_ptr_;
|
||||||
|
}
|
||||||
|
inline void SetParam(const char *name, const char *val) {
|
||||||
|
}
|
||||||
|
inline PagePtr Create(void) {
|
||||||
|
return new Page(page_size_);
|
||||||
|
}
|
||||||
|
inline void FreeSpace(PagePtr &a) {
|
||||||
|
delete a;
|
||||||
|
}
|
||||||
|
inline void Destroy(void) {
|
||||||
|
}
|
||||||
|
inline void BeforeFirst(void) {
|
||||||
|
col_index_ = col_todo_;
|
||||||
|
read_top_ = 0;
|
||||||
|
}
|
||||||
|
inline bool LoadNext(PagePtr &val) {
|
||||||
|
val->Clear();
|
||||||
|
if (read_top_ >= col_index_.size()) return false;
|
||||||
|
while (read_top_ < col_index_.size()) {
|
||||||
|
if (!this->TryFill(col_index_[read_top_], val)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
++read_top_;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
inline bool Init(void) {
|
||||||
|
this->BeforeFirst();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
inline void Setup(utils::ISeekStream *fi, double page_ratio) {
|
||||||
|
fi_ = fi;
|
||||||
|
fi_->Read(&begin_meta_ , sizeof(begin_meta_));
|
||||||
|
begin_data_ = static_cast<size_t>(fi->Tell());
|
||||||
|
fi_->Seek(begin_meta_);
|
||||||
|
fi_->Read(&col_ptr_);
|
||||||
|
size_t psmax = 0;
|
||||||
|
for (size_t i = 0; i < col_ptr_.size() - 1; ++i) {
|
||||||
|
psmax = std::max(psmax, col_ptr_[i+1] - col_ptr_[i]);
|
||||||
|
}
|
||||||
|
utils::Check(page_ratio >= 1.0f, "col_page_ratio must be at least 1");
|
||||||
|
page_size_ = std::max(static_cast<size_t>(psmax * page_ratio), psmax);
|
||||||
|
}
|
||||||
|
inline void SetColSet(const std::vector<bst_uint> &cset, bool setall) {
|
||||||
|
if (!setall) {
|
||||||
|
col_todo_.resize(cset.size());
|
||||||
|
for (size_t i = 0; i < cset.size(); ++i) {
|
||||||
|
col_todo_[i] = cset[i];
|
||||||
|
utils::Assert(col_todo_[i] < static_cast<bst_uint>(col_ptr_.size() - 1),
|
||||||
|
"CSCMatrixManager: column index exceed bound");
|
||||||
|
}
|
||||||
|
std::sort(col_todo_.begin(), col_todo_.end());
|
||||||
|
} else {
|
||||||
|
col_todo_.resize(col_ptr_.size()-1);
|
||||||
|
for (size_t i = 0; i < col_todo_.size(); ++i) {
|
||||||
|
col_todo_[i] = static_cast<bst_uint>(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
/*! \brief fill a page with */
|
||||||
|
inline bool TryFill(size_t cidx, Page *p_page) {
|
||||||
|
size_t len = col_ptr_[cidx+1] - col_ptr_[cidx];
|
||||||
|
if (p_page->NumFreeEntry() < len) return false;
|
||||||
|
ColBatch::Entry *p_data = p_page->AllocEntry(len);
|
||||||
|
fi_->Seek(col_ptr_[cidx] * sizeof(ColBatch::Entry) + begin_data_);
|
||||||
|
utils::Check(fi_->Read(p_data, sizeof(ColBatch::Entry) * len) != 0,
|
||||||
|
"invalid column buffer format");
|
||||||
|
p_page->col_data.push_back(ColBatch::Inst(p_data, len));
|
||||||
|
p_page->col_index.push_back(cidx);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// the following are in memory auxiliary data structure
|
||||||
|
/*! \brief top of reader position */
|
||||||
|
size_t read_top_;
|
||||||
|
/*! \brief size of page */
|
||||||
|
size_t page_size_;
|
||||||
|
/*! \brief column index to be loaded */
|
||||||
|
std::vector<bst_uint> col_index_;
|
||||||
|
/*! \brief column index to be after calling before first */
|
||||||
|
std::vector<bst_uint> col_todo_;
|
||||||
|
// the following are input content
|
||||||
|
/*! \brief beginning position of data content */
|
||||||
|
size_t begin_data_;
|
||||||
|
/*! \brief size of data content */
|
||||||
|
size_t begin_meta_;
|
||||||
|
/*! \brief input stream */
|
||||||
|
utils::ISeekStream *fi_;
|
||||||
|
/*! \brief column pointer of CSC format */
|
||||||
|
std::vector<size_t> col_ptr_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class ThreadColPageIterator : public utils::IIterator<ColBatch> {
|
||||||
|
public:
|
||||||
|
explicit ThreadColPageIterator(utils::ISeekStream *fi,
|
||||||
|
float page_ratio, bool silent) {
|
||||||
|
itr_.SetParam("buffer_size", "2");
|
||||||
|
itr_.get_factory().Setup(fi, page_ratio);
|
||||||
|
itr_.Init();
|
||||||
|
if (!silent) {
|
||||||
|
utils::Printf("ThreadColPageIterator: finish initialzing, %u columns\n",
|
||||||
|
static_cast<unsigned>(col_ptr().size() - 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
virtual ~ThreadColPageIterator(void) {
|
||||||
|
}
|
||||||
|
virtual void BeforeFirst(void) {
|
||||||
|
itr_.BeforeFirst();
|
||||||
|
}
|
||||||
|
virtual bool Next(void) {
|
||||||
|
// page to be loaded
|
||||||
|
CSCMatrixManager::PagePtr page;
|
||||||
|
if (!itr_.Next(page)) return false;
|
||||||
|
out_ = page->GetBatch();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
virtual const ColBatch &Value(void) const {
|
||||||
|
return out_;
|
||||||
|
}
|
||||||
|
inline const std::vector<size_t> &col_ptr(void) const {
|
||||||
|
return itr_.get_factory().col_ptr();
|
||||||
|
}
|
||||||
|
inline void SetColSet(const std::vector<bst_uint> &cset,
|
||||||
|
bool setall = false) {
|
||||||
|
itr_.get_factory().SetColSet(cset, setall);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// output data
|
||||||
|
ColBatch out_;
|
||||||
|
// internal iterator
|
||||||
|
utils::ThreadBuffer<CSCMatrixManager::PagePtr, CSCMatrixManager> itr_;
|
||||||
|
};
|
||||||
|
/*!
|
||||||
|
* \brief sparse matrix that support column access
|
||||||
|
*/
|
||||||
|
class FMatrixPage : public IFMatrix {
|
||||||
|
public:
|
||||||
|
/*! \brief constructor */
|
||||||
|
FMatrixPage(utils::IIterator<RowBatch> *iter, std::string fname_buffer)
|
||||||
|
: fname_cbuffer_(fname_buffer) {
|
||||||
|
this->row_iter_ = iter;
|
||||||
|
this->col_iter_ = NULL;
|
||||||
|
this->fi_ = NULL;
|
||||||
|
}
|
||||||
|
// destructor
|
||||||
|
virtual ~FMatrixPage(void) {
|
||||||
|
if (row_iter_ != NULL) delete row_iter_;
|
||||||
|
if (col_iter_ != NULL) delete col_iter_;
|
||||||
|
if (fi_ != NULL) {
|
||||||
|
fi_->Close(); delete fi_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*! \return whether column access is enabled */
|
||||||
|
virtual bool HaveColAccess(void) const {
|
||||||
|
return col_iter_ != NULL;
|
||||||
|
}
|
||||||
|
/*! \brief get number of colmuns */
|
||||||
|
virtual size_t NumCol(void) const {
|
||||||
|
utils::Check(this->HaveColAccess(), "NumCol:need column access");
|
||||||
|
return col_iter_->col_ptr().size() - 1;
|
||||||
|
}
|
||||||
|
/*! \brief get number of buffered rows */
|
||||||
|
virtual const std::vector<bst_uint> &buffered_rowset(void) const {
|
||||||
|
return buffered_rowset_;
|
||||||
|
}
|
||||||
|
/*! \brief get column size */
|
||||||
|
virtual size_t GetColSize(size_t cidx) const {
|
||||||
|
const std::vector<size_t> &col_ptr = col_iter_->col_ptr();
|
||||||
|
return col_ptr[cidx+1] - col_ptr[cidx];
|
||||||
|
}
|
||||||
|
/*! \brief get column density */
|
||||||
|
virtual float GetColDensity(size_t cidx) const {
|
||||||
|
const std::vector<size_t> &col_ptr = col_iter_->col_ptr();
|
||||||
|
size_t nmiss = buffered_rowset_.size() - (col_ptr[cidx+1] - col_ptr[cidx]);
|
||||||
|
return 1.0f - (static_cast<float>(nmiss)) / buffered_rowset_.size();
|
||||||
|
}
|
||||||
|
virtual void InitColAccess(float pkeep = 1.0f) {
|
||||||
|
if (this->HaveColAccess()) return;
|
||||||
|
this->InitColData(pkeep, fname_cbuffer_.c_str(),
|
||||||
|
64 << 20, 5);
|
||||||
|
utils::Check(this->LoadColData(), "fail to read in column data");
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief get the row iterator associated with FMatrix
|
||||||
|
*/
|
||||||
|
virtual utils::IIterator<RowBatch>* RowIterator(void) {
|
||||||
|
row_iter_->BeforeFirst();
|
||||||
|
return row_iter_;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief get the column based iterator
|
||||||
|
*/
|
||||||
|
virtual utils::IIterator<ColBatch>* ColIterator(void) {
|
||||||
|
std::vector<bst_uint> cset;
|
||||||
|
col_iter_->SetColSet(cset, true);
|
||||||
|
col_iter_->BeforeFirst();
|
||||||
|
return col_iter_;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief colmun based iterator
|
||||||
|
*/
|
||||||
|
virtual utils::IIterator<ColBatch> *ColIterator(const std::vector<bst_uint> &fset) {
|
||||||
|
col_iter_->SetColSet(fset, false);
|
||||||
|
col_iter_->BeforeFirst();
|
||||||
|
return col_iter_;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/*!
|
||||||
|
* \brief try load column data from file
|
||||||
|
*/
|
||||||
|
inline bool LoadColData(void) {
|
||||||
|
FILE *fp = fopen64(fname_cbuffer_.c_str(), "rb");
|
||||||
|
if (fp == NULL) return false;
|
||||||
|
fi_ = new utils::FileStream(fp);
|
||||||
|
static_cast<utils::IStream*>(fi_)->Read(&buffered_rowset_);
|
||||||
|
col_iter_ = new ThreadColPageIterator(fi_, 2.0f, false);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief intialize column data
|
||||||
|
* \param pkeep probability to keep a row
|
||||||
|
*/
|
||||||
|
inline void InitColData(float pkeep, const char *fname,
|
||||||
|
size_t buffer_size, size_t col_step) {
|
||||||
|
buffered_rowset_.clear();
|
||||||
|
utils::FileStream fo(utils::FopenCheck(fname, "wb+"));
|
||||||
|
// use 64M buffer
|
||||||
|
utils::SparseCSRFileBuilder<ColBatch::Entry> builder(&fo, buffer_size);
|
||||||
|
// start working
|
||||||
|
row_iter_->BeforeFirst();
|
||||||
|
while (row_iter_->Next()) {
|
||||||
|
const RowBatch &batch = row_iter_->Value();
|
||||||
|
for (size_t i = 0; i < batch.size; ++i) {
|
||||||
|
if (pkeep == 1.0f || random::SampleBinary(pkeep)) {
|
||||||
|
buffered_rowset_.push_back(static_cast<bst_uint>(batch.base_rowid+i));
|
||||||
|
RowBatch::Inst inst = batch[i];
|
||||||
|
for (bst_uint j = 0; j < inst.length; ++j) {
|
||||||
|
builder.AddBudget(inst[j].index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// write buffered rowset
|
||||||
|
static_cast<utils::IStream*>(&fo)->Write(buffered_rowset_);
|
||||||
|
builder.InitStorage();
|
||||||
|
row_iter_->BeforeFirst();
|
||||||
|
size_t ktop = 0;
|
||||||
|
while (row_iter_->Next()) {
|
||||||
|
const RowBatch &batch = row_iter_->Value();
|
||||||
|
for (size_t i = 0; i < batch.size; ++i) {
|
||||||
|
if (ktop < buffered_rowset_.size() &&
|
||||||
|
buffered_rowset_[ktop] == batch.base_rowid + i) {
|
||||||
|
++ktop;
|
||||||
|
RowBatch::Inst inst = batch[i];
|
||||||
|
for (bst_uint j = 0; j < inst.length; ++j) {
|
||||||
|
builder.PushElem(inst[j].index,
|
||||||
|
ColBatch::Entry((bst_uint)(batch.base_rowid+i),
|
||||||
|
inst[j].fvalue));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder.Finalize();
|
||||||
|
builder.SortRows(ColBatch::Entry::CmpValue, col_step);
|
||||||
|
fo.Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// row iterator
|
||||||
|
utils::IIterator<RowBatch> *row_iter_;
|
||||||
|
// column iterator
|
||||||
|
ThreadColPageIterator *col_iter_;
|
||||||
|
// file pointer to data
|
||||||
|
utils::FileStream *fi_;
|
||||||
|
// file name of column buffer
|
||||||
|
std::string fname_cbuffer_;
|
||||||
|
/*! \brief list of row index that are buffered */
|
||||||
|
std::vector<bst_uint> buffered_rowset_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DMatrixColPage : public DMatrixPageBase<0xffffab03> {
|
||||||
|
public:
|
||||||
|
explicit DMatrixColPage(const char *fname) {
|
||||||
|
std::string fext = fname;
|
||||||
|
fext += ".col";
|
||||||
|
fmat_ = new FMatrixPage(iter_, fext.c_str());
|
||||||
|
}
|
||||||
|
virtual ~DMatrixColPage(void) {
|
||||||
|
delete fmat_;
|
||||||
|
}
|
||||||
|
virtual IFMatrix *fmat(void) const {
|
||||||
|
return fmat_;
|
||||||
|
}
|
||||||
|
/*! \brief the real fmatrix */
|
||||||
|
IFMatrix *fmat_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace io
|
||||||
|
} // namespace xgboost
|
||||||
|
#endif // XGBOOST_IO_PAGE_FMATRIX_INL_HPP_
|
||||||
@ -44,8 +44,8 @@ class DMatrixSimple : public DataMatrix {
|
|||||||
}
|
}
|
||||||
/*! \brief copy content data from source matrix */
|
/*! \brief copy content data from source matrix */
|
||||||
inline void CopyFrom(const DataMatrix &src) {
|
inline void CopyFrom(const DataMatrix &src) {
|
||||||
this->info = src.info;
|
|
||||||
this->Clear();
|
this->Clear();
|
||||||
|
this->info = src.info;
|
||||||
// clone data content in thos matrix
|
// clone data content in thos matrix
|
||||||
utils::IIterator<RowBatch> *iter = src.fmat()->RowIterator();
|
utils::IIterator<RowBatch> *iter = src.fmat()->RowIterator();
|
||||||
iter->BeforeFirst();
|
iter->BeforeFirst();
|
||||||
|
|||||||
@ -38,6 +38,8 @@ struct TrainParam{
|
|||||||
float opt_dense_col;
|
float opt_dense_col;
|
||||||
// leaf vector size
|
// leaf vector size
|
||||||
int size_leaf_vector;
|
int size_leaf_vector;
|
||||||
|
// option for parallelization
|
||||||
|
int parallel_option;
|
||||||
// number of threads to be used for tree construction,
|
// number of threads to be used for tree construction,
|
||||||
// if OpenMP is enabled, if equals 0, use system default
|
// if OpenMP is enabled, if equals 0, use system default
|
||||||
int nthread;
|
int nthread;
|
||||||
@ -55,6 +57,7 @@ struct TrainParam{
|
|||||||
opt_dense_col = 1.0f;
|
opt_dense_col = 1.0f;
|
||||||
nthread = 0;
|
nthread = 0;
|
||||||
size_leaf_vector = 0;
|
size_leaf_vector = 0;
|
||||||
|
parallel_option = 0;
|
||||||
}
|
}
|
||||||
/*!
|
/*!
|
||||||
* \brief set parameters from outside
|
* \brief set parameters from outside
|
||||||
@ -80,6 +83,7 @@ struct TrainParam{
|
|||||||
if (!strcmp(name, "size_leaf_vector")) size_leaf_vector = atoi(val);
|
if (!strcmp(name, "size_leaf_vector")) size_leaf_vector = atoi(val);
|
||||||
if (!strcmp(name, "max_depth")) max_depth = atoi(val);
|
if (!strcmp(name, "max_depth")) max_depth = atoi(val);
|
||||||
if (!strcmp(name, "nthread")) nthread = atoi(val);
|
if (!strcmp(name, "nthread")) nthread = atoi(val);
|
||||||
|
if (!strcmp(name, "parallel_option")) parallel_option = atoi(val);
|
||||||
if (!strcmp(name, "default_direction")) {
|
if (!strcmp(name, "default_direction")) {
|
||||||
if (!strcmp(val, "learn")) default_direction = 0;
|
if (!strcmp(val, "learn")) default_direction = 0;
|
||||||
if (!strcmp(val, "left")) default_direction = 1;
|
if (!strcmp(val, "left")) default_direction = 1;
|
||||||
|
|||||||
@ -45,15 +45,19 @@ class ColMaker: public IUpdater {
|
|||||||
// data structure
|
// data structure
|
||||||
/*! \brief per thread x per node entry to store tmp data */
|
/*! \brief per thread x per node entry to store tmp data */
|
||||||
struct ThreadEntry {
|
struct ThreadEntry {
|
||||||
/*! \brief statistics of data*/
|
/*! \brief statistics of data */
|
||||||
TStats stats;
|
TStats stats;
|
||||||
|
/*! \brief extra statistics of data */
|
||||||
|
TStats stats_extra;
|
||||||
/*! \brief last feature value scanned */
|
/*! \brief last feature value scanned */
|
||||||
float last_fvalue;
|
float last_fvalue;
|
||||||
|
/*! \brief first feature value scanned */
|
||||||
|
float first_fvalue;
|
||||||
/*! \brief current best solution */
|
/*! \brief current best solution */
|
||||||
SplitEntry best;
|
SplitEntry best;
|
||||||
// constructor
|
// constructor
|
||||||
explicit ThreadEntry(const TrainParam ¶m)
|
explicit ThreadEntry(const TrainParam ¶m)
|
||||||
: stats(param) {
|
: stats(param), stats_extra(param) {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
struct NodeEntry {
|
struct NodeEntry {
|
||||||
@ -220,6 +224,137 @@ class ColMaker: public IUpdater {
|
|||||||
// use new nodes for qexpand
|
// use new nodes for qexpand
|
||||||
qexpand = newnodes;
|
qexpand = newnodes;
|
||||||
}
|
}
|
||||||
|
// parallel find the best split of current fid
|
||||||
|
// this function does not support nested functions
|
||||||
|
inline void ParallelFindSplit(const ColBatch::Inst &col,
|
||||||
|
bst_uint fid,
|
||||||
|
const IFMatrix &fmat,
|
||||||
|
const std::vector<bst_gpair> &gpair,
|
||||||
|
const BoosterInfo &info) {
|
||||||
|
bool need_forward = param.need_forward_search(fmat.GetColDensity(fid));
|
||||||
|
bool need_backward = param.need_backward_search(fmat.GetColDensity(fid));
|
||||||
|
const std::vector<int> &qexpand = qexpand_;
|
||||||
|
int nthread;
|
||||||
|
#pragma omp parallel
|
||||||
|
{
|
||||||
|
const int tid = omp_get_thread_num();
|
||||||
|
std::vector<ThreadEntry> &temp = stemp[tid];
|
||||||
|
// cleanup temp statistics
|
||||||
|
for (size_t j = 0; j < qexpand.size(); ++j) {
|
||||||
|
temp[qexpand[j]].stats.Clear();
|
||||||
|
}
|
||||||
|
nthread = omp_get_num_threads();
|
||||||
|
bst_uint step = (col.length + nthread - 1) / nthread;
|
||||||
|
bst_uint end = std::min(col.length, step * (tid + 1));
|
||||||
|
for (bst_uint i = tid * step; i < end; ++i) {
|
||||||
|
const bst_uint ridx = col[i].index;
|
||||||
|
const int nid = position[ridx];
|
||||||
|
if (nid < 0) continue;
|
||||||
|
const float fvalue = col[i].fvalue;
|
||||||
|
if (temp[nid].stats.Empty()) {
|
||||||
|
temp[nid].first_fvalue = fvalue;
|
||||||
|
}
|
||||||
|
temp[nid].stats.Add(gpair, info, ridx);
|
||||||
|
temp[nid].last_fvalue = fvalue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// start collecting the partial sum statistics
|
||||||
|
bst_omp_uint nnode = static_cast<bst_omp_uint>(qexpand.size());
|
||||||
|
#pragma omp parallel for schedule(static)
|
||||||
|
for (bst_omp_uint j = 0; j < nnode; ++j) {
|
||||||
|
const int nid = qexpand[j];
|
||||||
|
TStats sum(param), tmp(param), c(param);
|
||||||
|
for (int tid = 0; tid < nthread; ++tid) {
|
||||||
|
tmp = stemp[tid][nid].stats;
|
||||||
|
stemp[tid][nid].stats = sum;
|
||||||
|
sum.Add(tmp);
|
||||||
|
if (tid != 0) {
|
||||||
|
std::swap(stemp[tid - 1][nid].last_fvalue, stemp[tid][nid].first_fvalue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int tid = 0; tid < nthread; ++tid) {
|
||||||
|
stemp[tid][nid].stats_extra = sum;
|
||||||
|
ThreadEntry &e = stemp[tid][nid];
|
||||||
|
float fsplit;
|
||||||
|
if (tid != 0) {
|
||||||
|
if(fabsf(stemp[tid - 1][nid].last_fvalue - e.first_fvalue) > rt_2eps) {
|
||||||
|
fsplit = (stemp[tid - 1][nid].last_fvalue - e.first_fvalue) * 0.5f;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fsplit = e.first_fvalue - rt_eps;
|
||||||
|
}
|
||||||
|
if (need_forward && tid != 0) {
|
||||||
|
c.SetSubstract(snode[nid].stats, e.stats);
|
||||||
|
if (c.sum_hess >= param.min_child_weight && e.stats.sum_hess >= param.min_child_weight) {
|
||||||
|
bst_float loss_chg = static_cast<bst_float>(e.stats.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
|
||||||
|
e.best.Update(loss_chg, fid, fsplit, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (need_backward) {
|
||||||
|
tmp.SetSubstract(sum, e.stats);
|
||||||
|
c.SetSubstract(snode[nid].stats, tmp);
|
||||||
|
if (c.sum_hess >= param.min_child_weight && tmp.sum_hess >= param.min_child_weight) {
|
||||||
|
bst_float loss_chg = static_cast<bst_float>(tmp.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
|
||||||
|
e.best.Update(loss_chg, fid, fsplit, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (need_backward) {
|
||||||
|
tmp = sum;
|
||||||
|
ThreadEntry &e = stemp[nthread-1][nid];
|
||||||
|
c.SetSubstract(snode[nid].stats, tmp);
|
||||||
|
if (c.sum_hess >= param.min_child_weight && tmp.sum_hess >= param.min_child_weight) {
|
||||||
|
bst_float loss_chg = static_cast<bst_float>(tmp.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
|
||||||
|
e.best.Update(loss_chg, fid, e.last_fvalue + rt_eps, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// rescan, generate candidate split
|
||||||
|
#pragma omp parallel
|
||||||
|
{
|
||||||
|
TStats c(param), cright(param);
|
||||||
|
const int tid = omp_get_thread_num();
|
||||||
|
std::vector<ThreadEntry> &temp = stemp[tid];
|
||||||
|
nthread = static_cast<bst_uint>(omp_get_num_threads());
|
||||||
|
bst_uint step = (col.length + nthread - 1) / nthread;
|
||||||
|
bst_uint end = std::min(col.length, step * (tid + 1));
|
||||||
|
for (bst_uint i = tid * step; i < end; ++i) {
|
||||||
|
const bst_uint ridx = col[i].index;
|
||||||
|
const int nid = position[ridx];
|
||||||
|
if (nid < 0) continue;
|
||||||
|
const float fvalue = col[i].fvalue;
|
||||||
|
// get the statistics of nid
|
||||||
|
ThreadEntry &e = temp[nid];
|
||||||
|
if (e.stats.Empty()) {
|
||||||
|
e.stats.Add(gpair, info, ridx);
|
||||||
|
e.first_fvalue = fvalue;
|
||||||
|
} else {
|
||||||
|
// forward default right
|
||||||
|
if (fabsf(fvalue - e.first_fvalue) > rt_2eps){
|
||||||
|
if (need_forward) {
|
||||||
|
c.SetSubstract(snode[nid].stats, e.stats);
|
||||||
|
if (c.sum_hess >= param.min_child_weight && e.stats.sum_hess >= param.min_child_weight) {
|
||||||
|
bst_float loss_chg = static_cast<bst_float>(e.stats.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
|
||||||
|
e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (need_backward) {
|
||||||
|
cright.SetSubstract(e.stats_extra, e.stats);
|
||||||
|
c.SetSubstract(snode[nid].stats, cright);
|
||||||
|
if (c.sum_hess >= param.min_child_weight && cright.sum_hess >= param.min_child_weight) {
|
||||||
|
bst_float loss_chg = static_cast<bst_float>(cright.CalcGain(param) + c.CalcGain(param) - snode[nid].root_gain);
|
||||||
|
e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.stats.Add(gpair, info, ridx);
|
||||||
|
e.first_fvalue = fvalue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// enumerate the split values of specific feature
|
// enumerate the split values of specific feature
|
||||||
inline void EnumerateSplit(const ColBatch::Entry *begin,
|
inline void EnumerateSplit(const ColBatch::Entry *begin,
|
||||||
const ColBatch::Entry *end,
|
const ColBatch::Entry *end,
|
||||||
@ -273,6 +408,38 @@ class ColMaker: public IUpdater {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// update the solution candidate
|
||||||
|
virtual void UpdateSolution(const ColBatch &batch,
|
||||||
|
const std::vector<bst_gpair> &gpair,
|
||||||
|
const IFMatrix &fmat,
|
||||||
|
const BoosterInfo &info) {
|
||||||
|
// start enumeration
|
||||||
|
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
|
||||||
|
#if defined(_OPENMP)
|
||||||
|
const int batch_size = std::max(static_cast<int>(nsize / this->nthread / 32), 1);
|
||||||
|
#endif
|
||||||
|
if (param.parallel_option == 0) {
|
||||||
|
#pragma omp parallel for schedule(dynamic, batch_size)
|
||||||
|
for (bst_omp_uint i = 0; i < nsize; ++i) {
|
||||||
|
const bst_uint fid = batch.col_index[i];
|
||||||
|
const int tid = omp_get_thread_num();
|
||||||
|
const ColBatch::Inst c = batch[i];
|
||||||
|
if (param.need_forward_search(fmat.GetColDensity(fid))) {
|
||||||
|
this->EnumerateSplit(c.data, c.data + c.length, +1,
|
||||||
|
fid, gpair, info, stemp[tid]);
|
||||||
|
}
|
||||||
|
if (param.need_backward_search(fmat.GetColDensity(fid))) {
|
||||||
|
this->EnumerateSplit(c.data + c.length - 1, c.data - 1, -1,
|
||||||
|
fid, gpair, info, stemp[tid]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (bst_omp_uint i = 0; i < nsize; ++i) {
|
||||||
|
this->ParallelFindSplit(batch[i], batch.col_index[i],
|
||||||
|
fmat, gpair, info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// find splits at current level, do split per level
|
// find splits at current level, do split per level
|
||||||
inline void FindSplit(int depth,
|
inline void FindSplit(int depth,
|
||||||
const std::vector<int> &qexpand,
|
const std::vector<int> &qexpand,
|
||||||
@ -289,26 +456,7 @@ class ColMaker: public IUpdater {
|
|||||||
}
|
}
|
||||||
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(feat_set);
|
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(feat_set);
|
||||||
while (iter->Next()) {
|
while (iter->Next()) {
|
||||||
const ColBatch &batch = iter->Value();
|
this->UpdateSolution(iter->Value(), gpair, *p_fmat, info);
|
||||||
// start enumeration
|
|
||||||
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
|
|
||||||
#if defined(_OPENMP)
|
|
||||||
const int batch_size = std::max(static_cast<int>(nsize / this->nthread / 32), 1);
|
|
||||||
#endif
|
|
||||||
#pragma omp parallel for schedule(dynamic, batch_size)
|
|
||||||
for (bst_omp_uint i = 0; i < nsize; ++i) {
|
|
||||||
const bst_uint fid = batch.col_index[i];
|
|
||||||
const int tid = omp_get_thread_num();
|
|
||||||
const ColBatch::Inst c = batch[i];
|
|
||||||
if (param.need_forward_search(p_fmat->GetColDensity(fid))) {
|
|
||||||
this->EnumerateSplit(c.data, c.data + c.length, +1,
|
|
||||||
fid, gpair, info, stemp[tid]);
|
|
||||||
}
|
|
||||||
if (param.need_backward_search(p_fmat->GetColDensity(fid))) {
|
|
||||||
this->EnumerateSplit(c.data + c.length - 1, c.data - 1, -1,
|
|
||||||
fid, gpair, info, stemp[tid]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// after this each thread's stemp will get the best candidates, aggregate results
|
// after this each thread's stemp will get the best candidates, aggregate results
|
||||||
for (size_t i = 0; i < qexpand.size(); ++i) {
|
for (size_t i = 0; i < qexpand.size(); ++i) {
|
||||||
@ -326,6 +474,7 @@ class ColMaker: public IUpdater {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset position of each data points after split is created in the tree
|
// reset position of each data points after split is created in the tree
|
||||||
inline void ResetPosition(const std::vector<int> &qexpand, IFMatrix *p_fmat, const RegTree &tree) {
|
inline void ResetPosition(const std::vector<int> &qexpand, IFMatrix *p_fmat, const RegTree &tree) {
|
||||||
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
|
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
|
||||||
|
|||||||
@ -88,12 +88,21 @@ class IStream {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/*! \brief implementation of file i/o stream */
|
/*! \brief interface of i/o stream that support seek */
|
||||||
class FileStream : public IStream {
|
class ISeekStream: public IStream {
|
||||||
private:
|
|
||||||
std::FILE *fp;
|
|
||||||
public:
|
public:
|
||||||
explicit FileStream(std::FILE *fp) : fp(fp) {
|
/*! \brief seek to certain position of the file */
|
||||||
|
virtual void Seek(long pos) = 0;
|
||||||
|
/*! \brief tell the position of the stream */
|
||||||
|
virtual long Tell(void) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*! \brief implementation of file i/o stream */
|
||||||
|
class FileStream : public ISeekStream {
|
||||||
|
public:
|
||||||
|
explicit FileStream(FILE *fp) : fp(fp) {}
|
||||||
|
explicit FileStream(void) {
|
||||||
|
this->fp = NULL;
|
||||||
}
|
}
|
||||||
virtual size_t Read(void *ptr, size_t size) {
|
virtual size_t Read(void *ptr, size_t size) {
|
||||||
return std::fread(ptr, size, 1, fp);
|
return std::fread(ptr, size, 1, fp);
|
||||||
@ -101,14 +110,21 @@ class FileStream : public IStream {
|
|||||||
virtual void Write(const void *ptr, size_t size) {
|
virtual void Write(const void *ptr, size_t size) {
|
||||||
std::fwrite(ptr, size, 1, fp);
|
std::fwrite(ptr, size, 1, fp);
|
||||||
}
|
}
|
||||||
inline void Seek(size_t pos) {
|
virtual void Seek(long pos) {
|
||||||
std::fseek(fp, 0, SEEK_SET);
|
std::fseek(fp, pos, SEEK_SET);
|
||||||
|
}
|
||||||
|
virtual long Tell(void) {
|
||||||
|
return std::ftell(fp);
|
||||||
}
|
}
|
||||||
inline void Close(void) {
|
inline void Close(void) {
|
||||||
std::fclose(fp);
|
if (fp != NULL){
|
||||||
|
std::fclose(fp); fp = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
|
private:
|
||||||
|
FILE *fp;
|
||||||
|
};
|
||||||
} // namespace utils
|
} // namespace utils
|
||||||
} // namespace xgboost
|
} // namespace xgboost
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -6,8 +6,11 @@
|
|||||||
* \author Tianqi Chen
|
* \author Tianqi Chen
|
||||||
*/
|
*/
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <utility>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include "./io.h"
|
||||||
#include "./utils.h"
|
#include "./utils.h"
|
||||||
|
#include "./omp.h"
|
||||||
|
|
||||||
namespace xgboost {
|
namespace xgboost {
|
||||||
namespace utils {
|
namespace utils {
|
||||||
@ -118,6 +121,142 @@ struct SparseCSRMBuilder {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief a class used to help construct CSR format matrix file
|
||||||
|
* \tparam IndexType type of index used to store the index position
|
||||||
|
* \tparam SizeType type of size used in row pointer
|
||||||
|
*/
|
||||||
|
template<typename IndexType, typename SizeType = size_t>
|
||||||
|
struct SparseCSRFileBuilder {
|
||||||
|
public:
|
||||||
|
explicit SparseCSRFileBuilder(utils::ISeekStream *fo, size_t buffer_size)
|
||||||
|
: fo(fo), buffer_size(buffer_size) {
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief step 1: initialize the number of rows in the data, not necessary exact
|
||||||
|
* \nrows number of rows in the matrix, can be smaller than expected
|
||||||
|
*/
|
||||||
|
inline void InitBudget(size_t nrows = 0) {
|
||||||
|
rptr.clear();
|
||||||
|
rptr.resize(nrows + 1, 0);
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief step 2: add budget to each rows
|
||||||
|
* \param row_id the id of the row
|
||||||
|
* \param nelem number of element budget add to this row
|
||||||
|
*/
|
||||||
|
inline void AddBudget(size_t row_id, SizeType nelem = 1) {
|
||||||
|
if (rptr.size() < row_id + 2) {
|
||||||
|
rptr.resize(row_id + 2, 0);
|
||||||
|
}
|
||||||
|
rptr[row_id + 1] += nelem;
|
||||||
|
}
|
||||||
|
/*! \brief step 3: initialize the necessary storage */
|
||||||
|
inline void InitStorage(void) {
|
||||||
|
SizeType nelem = 0;
|
||||||
|
for (size_t i = 1; i < rptr.size(); i++) {
|
||||||
|
nelem += rptr[i];
|
||||||
|
rptr[i] = nelem;
|
||||||
|
}
|
||||||
|
begin_data = static_cast<SizeType>(fo->Tell()) + sizeof(SizeType);
|
||||||
|
SizeType begin_meta = begin_data + nelem * sizeof(IndexType);
|
||||||
|
fo->Write(&begin_meta, sizeof(begin_meta));
|
||||||
|
fo->Seek(begin_meta);
|
||||||
|
fo->Write(rptr);
|
||||||
|
// setup buffer space
|
||||||
|
buffer_rptr.resize(rptr.size());
|
||||||
|
buffer_temp.reserve(buffer_size);
|
||||||
|
buffer_data.resize(buffer_size);
|
||||||
|
saved_offset = rptr;
|
||||||
|
saved_offset.resize(rptr.size() - 1);
|
||||||
|
this->ClearBuffer();
|
||||||
|
}
|
||||||
|
/*! \brief step 4: push element into buffer */
|
||||||
|
inline void PushElem(SizeType row_id, IndexType col_id) {
|
||||||
|
if (buffer_temp.size() == buffer_size) {
|
||||||
|
this->WriteBuffer();
|
||||||
|
this->ClearBuffer();
|
||||||
|
}
|
||||||
|
buffer_rptr[row_id + 1] += 1;
|
||||||
|
buffer_temp.push_back(std::make_pair(row_id, col_id));
|
||||||
|
}
|
||||||
|
/*! \brief finalize the construction */
|
||||||
|
inline void Finalize(void) {
|
||||||
|
this->WriteBuffer();
|
||||||
|
for (size_t i = 0; i < saved_offset.size(); ++i) {
|
||||||
|
utils::Assert(saved_offset[i] == rptr[i+1], "some block not write out");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*! \brief content must be in wb+ */
|
||||||
|
template<typename Comparator>
|
||||||
|
inline void SortRows(Comparator comp, size_t step) {
|
||||||
|
for (size_t i = 0; i < rptr.size() - 1; i += step) {
|
||||||
|
bst_omp_uint begin = static_cast<bst_omp_uint>(i);
|
||||||
|
bst_omp_uint end = static_cast<bst_omp_uint>(std::min(rptr.size() - 1, i + step));
|
||||||
|
if (rptr[end] != rptr[begin]) {
|
||||||
|
fo->Seek(begin_data + rptr[begin] * sizeof(IndexType));
|
||||||
|
buffer_data.resize(rptr[end] - rptr[begin]);
|
||||||
|
fo->Read(BeginPtr(buffer_data), (rptr[end] - rptr[begin]) * sizeof(IndexType));
|
||||||
|
// do parallel sorting
|
||||||
|
#pragma omp parallel for schedule(static)
|
||||||
|
for (bst_omp_uint j = begin; j < end; ++j) {
|
||||||
|
std::sort(&buffer_data[0] + rptr[j] - rptr[begin],
|
||||||
|
&buffer_data[0] + rptr[j+1] - rptr[begin],
|
||||||
|
comp);
|
||||||
|
}
|
||||||
|
fo->Seek(begin_data + rptr[begin] * sizeof(IndexType));
|
||||||
|
fo->Write(BeginPtr(buffer_data), (rptr[end] - rptr[begin]) * sizeof(IndexType));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("CSV::begin_dat=%lu\n", begin_data);
|
||||||
|
}
|
||||||
|
protected:
|
||||||
|
inline void WriteBuffer(void) {
|
||||||
|
SizeType start = 0;
|
||||||
|
for (size_t i = 1; i < buffer_rptr.size(); ++i) {
|
||||||
|
size_t rlen = buffer_rptr[i];
|
||||||
|
buffer_rptr[i] = start;
|
||||||
|
start += rlen;
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < buffer_temp.size(); ++i) {
|
||||||
|
SizeType &rp = buffer_rptr[buffer_temp[i].first + 1];
|
||||||
|
buffer_data[rp++] = buffer_temp[i].second;
|
||||||
|
}
|
||||||
|
// write out
|
||||||
|
for (size_t i = 0; i < buffer_rptr.size() - 1; ++i) {
|
||||||
|
size_t nelem = buffer_rptr[i+1] - buffer_rptr[i];
|
||||||
|
if (nelem != 0) {
|
||||||
|
utils::Assert(saved_offset[i] + nelem <= rptr[i+1], "data exceed bound");
|
||||||
|
fo->Seek(saved_offset[i] * sizeof(IndexType) + begin_data);
|
||||||
|
fo->Write(&buffer_data[0] + buffer_rptr[i], nelem * sizeof(IndexType));
|
||||||
|
saved_offset[i] += nelem;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
inline void ClearBuffer(void) {
|
||||||
|
buffer_temp.clear();
|
||||||
|
std::fill(buffer_rptr.begin(), buffer_rptr.end(), 0);
|
||||||
|
}
|
||||||
|
private:
|
||||||
|
/*! \brief output file pointer the data */
|
||||||
|
utils::ISeekStream *fo;
|
||||||
|
/*! \brief pointer to each of the row */
|
||||||
|
std::vector<SizeType> rptr;
|
||||||
|
/*! \brief saved top space of each item */
|
||||||
|
std::vector<SizeType> saved_offset;
|
||||||
|
/*! \brief beginning position of data */
|
||||||
|
size_t begin_data;
|
||||||
|
// ----- the following are buffer space
|
||||||
|
/*! \brief maximum size of content buffer*/
|
||||||
|
size_t buffer_size;
|
||||||
|
/*! \brief store the data content */
|
||||||
|
std::vector< std::pair<SizeType, IndexType> > buffer_temp;
|
||||||
|
/*! \brief saved top space of each item */
|
||||||
|
std::vector<SizeType> buffer_rptr;
|
||||||
|
/*! \brief saved top space of each item */
|
||||||
|
std::vector<IndexType> buffer_data;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace utils
|
} // namespace utils
|
||||||
} // namespace xgboost
|
} // namespace xgboost
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
146
src/utils/thread.h
Normal file
146
src/utils/thread.h
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
#ifndef XGBOOST_UTILS_THREAD_H
|
||||||
|
#define XGBOOST_UTILS_THREAD_H
|
||||||
|
/*!
|
||||||
|
* \file thread.h
|
||||||
|
* \brief this header include the minimum necessary resource for multi-threading
|
||||||
|
* \author Tianqi Chen
|
||||||
|
* Acknowledgement: this file is adapted from SVDFeature project, by same author.
|
||||||
|
* The MAC support part of this code is provided by Artemy Kolchinsky
|
||||||
|
*/
|
||||||
|
#ifdef _MSC_VER
|
||||||
|
#include "utils.h"
|
||||||
|
#include <windows.h>
|
||||||
|
#include <process.h>
|
||||||
|
namespace xgboost {
|
||||||
|
namespace utils {
|
||||||
|
/*! \brief simple semaphore used for synchronization */
|
||||||
|
class Semaphore {
|
||||||
|
public :
|
||||||
|
inline void Init(int init_val) {
|
||||||
|
sem = CreateSemaphore(NULL, init_val, 10, NULL);
|
||||||
|
utils::Assert(sem != NULL, "create Semaphore error");
|
||||||
|
}
|
||||||
|
inline void Destroy(void) {
|
||||||
|
CloseHandle(sem);
|
||||||
|
}
|
||||||
|
inline void Wait(void) {
|
||||||
|
utils::Assert(WaitForSingleObject(sem, INFINITE) == WAIT_OBJECT_0, "WaitForSingleObject error");
|
||||||
|
}
|
||||||
|
inline void Post(void) {
|
||||||
|
utils::Assert(ReleaseSemaphore(sem, 1, NULL) != 0, "ReleaseSemaphore error");
|
||||||
|
}
|
||||||
|
private:
|
||||||
|
HANDLE sem;
|
||||||
|
};
|
||||||
|
/*! \brief simple thread that wraps windows thread */
|
||||||
|
class Thread {
|
||||||
|
private:
|
||||||
|
HANDLE thread_handle;
|
||||||
|
unsigned thread_id;
|
||||||
|
public:
|
||||||
|
inline void Start(unsigned int __stdcall entry(void*), void *param) {
|
||||||
|
thread_handle = (HANDLE)_beginthreadex(NULL, 0, entry, param, 0, &thread_id);
|
||||||
|
}
|
||||||
|
inline int Join(void) {
|
||||||
|
WaitForSingleObject(thread_handle, INFINITE);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
/*! \brief exit function called from thread */
|
||||||
|
inline void ThreadExit(void *status) {
|
||||||
|
_endthreadex(0);
|
||||||
|
}
|
||||||
|
#define XGBOOST_THREAD_PREFIX unsigned int __stdcall
|
||||||
|
} // namespace utils
|
||||||
|
} // namespace xgboost
|
||||||
|
#else
|
||||||
|
// thread interface using g++
|
||||||
|
#include <semaphore.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
namespace xgboost {
|
||||||
|
namespace utils {
|
||||||
|
/*!\brief semaphore class */
|
||||||
|
class Semaphore {
|
||||||
|
#ifdef __APPLE__
|
||||||
|
private:
|
||||||
|
sem_t* semPtr;
|
||||||
|
char sema_name[20];
|
||||||
|
private:
|
||||||
|
inline void GenRandomString(char *s, const int len) {
|
||||||
|
static const char alphanum[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" ;
|
||||||
|
for (int i = 0; i < len; ++i) {
|
||||||
|
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
|
||||||
|
}
|
||||||
|
s[len] = 0;
|
||||||
|
}
|
||||||
|
public:
|
||||||
|
inline void Init(int init_val) {
|
||||||
|
sema_name[0]='/';
|
||||||
|
sema_name[1]='s';
|
||||||
|
sema_name[2]='e';
|
||||||
|
sema_name[3]='/';
|
||||||
|
GenRandomString(&sema_name[4], 16);
|
||||||
|
if((semPtr = sem_open(sema_name, O_CREAT, 0644, init_val)) == SEM_FAILED) {
|
||||||
|
perror("sem_open");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
utils::Assert(semPtr != NULL, "create Semaphore error");
|
||||||
|
}
|
||||||
|
inline void Destroy(void) {
|
||||||
|
if (sem_close(semPtr) == -1) {
|
||||||
|
perror("sem_close");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
if (sem_unlink(sema_name) == -1) {
|
||||||
|
perror("sem_unlink");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
inline void Wait(void) {
|
||||||
|
sem_wait(semPtr);
|
||||||
|
}
|
||||||
|
inline void Post(void) {
|
||||||
|
sem_post(semPtr);
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
private:
|
||||||
|
sem_t sem;
|
||||||
|
public:
|
||||||
|
inline void Init(int init_val) {
|
||||||
|
sem_init(&sem, 0, init_val);
|
||||||
|
}
|
||||||
|
inline void Destroy(void) {
|
||||||
|
sem_destroy(&sem);
|
||||||
|
}
|
||||||
|
inline void Wait(void) {
|
||||||
|
sem_wait(&sem);
|
||||||
|
}
|
||||||
|
inline void Post(void) {
|
||||||
|
sem_post(&sem);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
/*!\brief simple thread class */
|
||||||
|
class Thread {
|
||||||
|
private:
|
||||||
|
pthread_t thread;
|
||||||
|
public :
|
||||||
|
inline void Start(void * entry(void*), void *param) {
|
||||||
|
pthread_attr_t attr;
|
||||||
|
pthread_attr_init(&attr);
|
||||||
|
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
pthread_create(&thread, &attr, entry, param);
|
||||||
|
}
|
||||||
|
inline int Join(void) {
|
||||||
|
void *status;
|
||||||
|
return pthread_join(thread, &status);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
inline void ThreadExit(void *status) {
|
||||||
|
pthread_exit(status);
|
||||||
|
}
|
||||||
|
} // namespace utils
|
||||||
|
} // namespace xgboost
|
||||||
|
#define XGBOOST_THREAD_PREFIX void *
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
203
src/utils/thread_buffer.h
Normal file
203
src/utils/thread_buffer.h
Normal file
@ -0,0 +1,203 @@
|
|||||||
|
#ifndef XGBOOST_UTILS_THREAD_BUFFER_H
|
||||||
|
#define XGBOOST_UTILS_THREAD_BUFFER_H
|
||||||
|
/*!
|
||||||
|
* \file thread_buffer.h
|
||||||
|
* \brief multi-thread buffer, iterator, can be used to create parallel pipeline
|
||||||
|
* \author Tianqi Chen
|
||||||
|
*/
|
||||||
|
#include <vector>
|
||||||
|
#include <cstring>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include "./utils.h"
|
||||||
|
#include "./thread.h"
|
||||||
|
namespace xgboost {
|
||||||
|
namespace utils {
|
||||||
|
/*!
|
||||||
|
* \brief buffered loading iterator that uses multithread
|
||||||
|
* this template method will assume the following paramters
|
||||||
|
* \tparam Elem elememt type to be buffered
|
||||||
|
* \tparam ElemFactory factory type to implement in order to use thread buffer
|
||||||
|
*/
|
||||||
|
template<typename Elem, typename ElemFactory>
|
||||||
|
class ThreadBuffer {
|
||||||
|
public:
|
||||||
|
/*!\brief constructor */
|
||||||
|
ThreadBuffer(void) {
|
||||||
|
this->init_end = false;
|
||||||
|
this->buf_size = 30;
|
||||||
|
}
|
||||||
|
~ThreadBuffer(void) {
|
||||||
|
if(init_end) this->Destroy();
|
||||||
|
}
|
||||||
|
/*!\brief set parameter, will also pass the parameter to factory */
|
||||||
|
inline void SetParam(const char *name, const char *val) {
|
||||||
|
if (!strcmp( name, "buffer_size")) buf_size = atoi(val);
|
||||||
|
factory.SetParam(name, val);
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief initalize the buffered iterator
|
||||||
|
* \param param a initialize parameter that will pass to factory, ignore it if not necessary
|
||||||
|
* \return false if the initlization can't be done, e.g. buffer file hasn't been created
|
||||||
|
*/
|
||||||
|
inline bool Init(void) {
|
||||||
|
if (!factory.Init()) return false;
|
||||||
|
for (int i = 0; i < buf_size; ++i) {
|
||||||
|
bufA.push_back(factory.Create());
|
||||||
|
bufB.push_back(factory.Create());
|
||||||
|
}
|
||||||
|
this->init_end = true;
|
||||||
|
this->StartLoader();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
/*!\brief place the iterator before first value */
|
||||||
|
inline void BeforeFirst(void) {
|
||||||
|
// wait till last loader end
|
||||||
|
loading_end.Wait();
|
||||||
|
// critcal zone
|
||||||
|
current_buf = 1;
|
||||||
|
factory.BeforeFirst();
|
||||||
|
// reset terminate limit
|
||||||
|
endA = endB = buf_size;
|
||||||
|
// wake up loader for first part
|
||||||
|
loading_need.Post();
|
||||||
|
// wait til first part is loaded
|
||||||
|
loading_end.Wait();
|
||||||
|
// set current buf to right value
|
||||||
|
current_buf = 0;
|
||||||
|
// wake loader for next part
|
||||||
|
data_loaded = false;
|
||||||
|
loading_need.Post();
|
||||||
|
// set buffer value
|
||||||
|
buf_index = 0;
|
||||||
|
}
|
||||||
|
/*! \brief destroy the buffer iterator, will deallocate the buffer */
|
||||||
|
inline void Destroy(void) {
|
||||||
|
// wait until the signal is consumed
|
||||||
|
this->destroy_signal = true;
|
||||||
|
loading_need.Post();
|
||||||
|
loader_thread.Join();
|
||||||
|
loading_need.Destroy();
|
||||||
|
loading_end.Destroy();
|
||||||
|
for (size_t i = 0; i < bufA.size(); ++i) {
|
||||||
|
factory.FreeSpace(bufA[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < bufB.size(); ++i) {
|
||||||
|
factory.FreeSpace(bufB[i]);
|
||||||
|
}
|
||||||
|
bufA.clear(); bufB.clear();
|
||||||
|
factory.Destroy();
|
||||||
|
this->init_end = false;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief get the next element needed in buffer
|
||||||
|
* \param elem element to store into
|
||||||
|
* \return whether reaches end of data
|
||||||
|
*/
|
||||||
|
inline bool Next(Elem &elem) {
|
||||||
|
// end of buffer try to switch
|
||||||
|
if (buf_index == buf_size) {
|
||||||
|
this->SwitchBuffer();
|
||||||
|
buf_index = 0;
|
||||||
|
}
|
||||||
|
if (buf_index >= (current_buf ? endA : endB)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
std::vector<Elem> &buf = current_buf ? bufA : bufB;
|
||||||
|
elem = buf[buf_index];
|
||||||
|
++buf_index;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
/*!
|
||||||
|
* \brief get the factory object
|
||||||
|
*/
|
||||||
|
inline ElemFactory &get_factory(void) {
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
inline const ElemFactory &get_factory(void) const{
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
// size of buffer
|
||||||
|
int buf_size;
|
||||||
|
private:
|
||||||
|
// factory object used to load configures
|
||||||
|
ElemFactory factory;
|
||||||
|
// index in current buffer
|
||||||
|
int buf_index;
|
||||||
|
// indicate which one is current buffer
|
||||||
|
int current_buf;
|
||||||
|
// max limit of visit, also marks termination
|
||||||
|
int endA, endB;
|
||||||
|
// double buffer, one is accessed by loader
|
||||||
|
// the other is accessed by consumer
|
||||||
|
// buffer of the data
|
||||||
|
std::vector<Elem> bufA, bufB;
|
||||||
|
// initialization end
|
||||||
|
bool init_end;
|
||||||
|
// singal whether the data is loaded
|
||||||
|
bool data_loaded;
|
||||||
|
// signal to kill the thread
|
||||||
|
bool destroy_signal;
|
||||||
|
// thread object
|
||||||
|
Thread loader_thread;
|
||||||
|
// signal of the buffer
|
||||||
|
Semaphore loading_end, loading_need;
|
||||||
|
/*!
|
||||||
|
* \brief slave thread
|
||||||
|
* this implementation is like producer-consumer style
|
||||||
|
*/
|
||||||
|
inline void RunLoader(void) {
|
||||||
|
while(!destroy_signal) {
|
||||||
|
// sleep until loading is needed
|
||||||
|
loading_need.Wait();
|
||||||
|
std::vector<Elem> &buf = current_buf ? bufB : bufA;
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < buf_size ; ++i) {
|
||||||
|
if (!factory.LoadNext(buf[i])) {
|
||||||
|
int &end = current_buf ? endB : endA;
|
||||||
|
end = i; // marks the termination
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// signal that loading is done
|
||||||
|
data_loaded = true;
|
||||||
|
loading_end.Post();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*!\brief entry point of loader thread */
|
||||||
|
inline static XGBOOST_THREAD_PREFIX LoaderEntry(void *pthread) {
|
||||||
|
static_cast< ThreadBuffer<Elem,ElemFactory>* >(pthread)->RunLoader();
|
||||||
|
ThreadExit(NULL);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/*!\brief start loader thread */
|
||||||
|
inline void StartLoader(void) {
|
||||||
|
destroy_signal = false;
|
||||||
|
// set param
|
||||||
|
current_buf = 1;
|
||||||
|
loading_need.Init(1);
|
||||||
|
loading_end .Init(0);
|
||||||
|
// reset terminate limit
|
||||||
|
endA = endB = buf_size;
|
||||||
|
loader_thread.Start(LoaderEntry, this);
|
||||||
|
// wait until first part of data is loaded
|
||||||
|
loading_end.Wait();
|
||||||
|
// set current buf to right value
|
||||||
|
current_buf = 0;
|
||||||
|
// wake loader for next part
|
||||||
|
data_loaded = false;
|
||||||
|
loading_need.Post();
|
||||||
|
buf_index = 0;
|
||||||
|
}
|
||||||
|
/*!\brief switch double buffer */
|
||||||
|
inline void SwitchBuffer(void) {
|
||||||
|
loading_end.Wait();
|
||||||
|
// loader shall be sleep now, critcal zone!
|
||||||
|
current_buf = !current_buf;
|
||||||
|
// wake up loader
|
||||||
|
data_loaded = false;
|
||||||
|
loading_need.Post();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace utils
|
||||||
|
} // namespace xgboost
|
||||||
|
#endif
|
||||||
@ -213,6 +213,71 @@ class DMatrix:
|
|||||||
self.handle, (ctypes.c_int*len(rindex))(*rindex), len(rindex)))
|
self.handle, (ctypes.c_int*len(rindex))(*rindex), len(rindex)))
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
class CVPack:
|
||||||
|
def __init__(self, dtrain, dtest, param):
|
||||||
|
self.dtrain = dtrain
|
||||||
|
self.dtest = dtest
|
||||||
|
self.watchlist = watchlist = [ (dtrain,'train'), (dtest, 'test') ]
|
||||||
|
self.bst = Booster(param, [dtrain,dtest])
|
||||||
|
def update(self,r):
|
||||||
|
self.bst.update(self.dtrain, r)
|
||||||
|
def eval(self,r):
|
||||||
|
return self.bst.eval_set(self.watchlist, r)
|
||||||
|
|
||||||
|
def mknfold(dall, nfold, param, seed, weightscale=None):
|
||||||
|
"""
|
||||||
|
mk nfold list of cvpack from randidx
|
||||||
|
"""
|
||||||
|
randidx = range(dall.num_row())
|
||||||
|
random.seed(seed)
|
||||||
|
random.shuffle(randidx)
|
||||||
|
|
||||||
|
idxset = []
|
||||||
|
kstep = len(randidx) / nfold
|
||||||
|
for i in range(nfold):
|
||||||
|
idxset.append(randidx[ (i*kstep) : min(len(randidx),(i+1)*kstep) ])
|
||||||
|
|
||||||
|
ret = []
|
||||||
|
for k in range(nfold):
|
||||||
|
trainlst = []
|
||||||
|
for j in range(nfold):
|
||||||
|
if j == k:
|
||||||
|
testlst = idxset[j]
|
||||||
|
else:
|
||||||
|
trainlst += idxset[j]
|
||||||
|
dtrain = dall.slice(trainlst)
|
||||||
|
dtest = dall.slice(testlst)
|
||||||
|
# rescale weight of dtrain and dtest
|
||||||
|
if weightscale != None:
|
||||||
|
dtrain.set_weight( dtrain.get_weight() * weightscale * dall.num_row() / dtrain.num_row() )
|
||||||
|
dtest.set_weight( dtest.get_weight() * weightscale * dall.num_row() / dtest.num_row() )
|
||||||
|
|
||||||
|
ret.append(CVPack(dtrain, dtest, param))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def aggcv(rlist):
|
||||||
|
"""
|
||||||
|
aggregate cross validation results
|
||||||
|
"""
|
||||||
|
cvmap = {}
|
||||||
|
arr = rlist[0].split()
|
||||||
|
ret = arr[0]
|
||||||
|
for it in arr[1:]:
|
||||||
|
k, v = it.split(':')
|
||||||
|
cvmap[k] = [float(v)]
|
||||||
|
for line in rlist[1:]:
|
||||||
|
arr = line.split()
|
||||||
|
assert ret == arr[0]
|
||||||
|
for it in arr[1:]:
|
||||||
|
k, v = it.split(':')
|
||||||
|
cvmap[k].append(float(v))
|
||||||
|
|
||||||
|
for k, v in sorted(cvmap.items(), key = lambda x:x[0]):
|
||||||
|
v = np.array(v)
|
||||||
|
ret += '\t%s:%f+%f' % (k, np.mean(v), np.std(v))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
class Booster:
|
class Booster:
|
||||||
"""learner class """
|
"""learner class """
|
||||||
def __init__(self, params={}, cache=[], model_file = None):
|
def __init__(self, params={}, cache=[], model_file = None):
|
||||||
@ -290,6 +355,7 @@ class Booster:
|
|||||||
(ctypes.c_float*len(grad))(*grad),
|
(ctypes.c_float*len(grad))(*grad),
|
||||||
(ctypes.c_float*len(hess))(*hess),
|
(ctypes.c_float*len(hess))(*hess),
|
||||||
len(grad))
|
len(grad))
|
||||||
|
|
||||||
def eval_set(self, evals, it = 0, feval = None):
|
def eval_set(self, evals, it = 0, feval = None):
|
||||||
"""evaluates by metric
|
"""evaluates by metric
|
||||||
Args:
|
Args:
|
||||||
@ -325,7 +391,6 @@ class Booster:
|
|||||||
the dmatrix storing the input
|
the dmatrix storing the input
|
||||||
output_margin: bool
|
output_margin: bool
|
||||||
whether output raw margin value that is untransformed
|
whether output raw margin value that is untransformed
|
||||||
|
|
||||||
ntree_limit: limit number of trees in prediction, default to 0, 0 means using all the trees
|
ntree_limit: limit number of trees in prediction, default to 0, 0 means using all the trees
|
||||||
Returns:
|
Returns:
|
||||||
numpy array of prediction
|
numpy array of prediction
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user