[LIBXGBOOST] demo now passes and runs.

This commit is contained in:
tqchen
2016-01-05 21:49:48 -08:00
parent cee148ed64
commit d75e3ed05d
59 changed files with 1611 additions and 1845 deletions

View File

@@ -1,229 +0,0 @@
// Copyright by Contributors
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE
#define NOMINMAX
#include <string>
#include "../utils/io.h"
// implements a single no split version of DMLC
// in case we want to avoid dependency on dmlc-core
namespace xgboost {
namespace utils {
/*!
* \brief line split implementation from single FILE
* simply returns lines of files, used for stdin
*/
class SingleFileSplit : public dmlc::InputSplit {
 public:
  /*!
   * \brief construct a split over a single file
   * \param fname file name to open; the special name "stdin" reads from
   *        standard input (only when not compiled in strict C++98 mode)
   */
  explicit SingleFileSplit(const char *fname)
      : use_stdin_(false),
        chunk_begin_(NULL), chunk_end_(NULL) {
    if (!std::strcmp(fname, "stdin")) {
#ifndef XGBOOST_STRICT_CXX98_
      use_stdin_ = true; fp_ = stdin;
#endif
    }
    if (!use_stdin_) {
      fp_ = utils::FopenCheck(fname, "rb");
    }
    buffer_.resize(kBufferSize);
  }
  virtual ~SingleFileSplit(void) {
    // stdin is not owned by this object; never close it
    if (!use_stdin_) std::fclose(fp_);
  }
  /*! \brief read up to size bytes into ptr, return number of bytes read */
  virtual size_t Read(void *ptr, size_t size) {
    return std::fread(ptr, 1, size, fp_);
  }
  /*! \brief writing is not supported on an input split */
  virtual void Write(const void *ptr, size_t size) {
    utils::Error("cannot do write in inputsplit");
  }
  /*! \brief reset the split so iteration restarts from the beginning */
  virtual void BeforeFirst(void) {
    std::fseek(fp_, 0, SEEK_SET);
    // BUGFIX: also drop all buffered state; previously only the file was
    // rewound, so an unconsumed chunk or pending overflow from the prior
    // pass would be served again before the re-read data.
    chunk_begin_ = chunk_end_ = NULL;
    overflow_.clear();
  }
  /*!
   * \brief fetch the next line-delimited record
   * \param out_rec filled with a pointer/size into the internal chunk buffer;
   *        only valid until the next call that refills the chunk
   * \return false when no more data is available
   */
  virtual bool NextRecord(Blob *out_rec) {
    if (chunk_begin_ == chunk_end_) {
      if (!LoadChunk()) return false;
    }
    char *next = FindNextRecord(chunk_begin_,
                                chunk_end_);
    out_rec->dptr = chunk_begin_;
    out_rec->size = next - chunk_begin_;
    chunk_begin_ = next;
    return true;
  }
  /*!
   * \brief fetch the remainder of the current chunk (loading one if empty)
   * \param out_chunk filled with the chunk data; valid until the next refill
   * \return false when no more data is available
   */
  virtual bool NextChunk(Blob *out_chunk) {
    if (chunk_begin_ == chunk_end_) {
      if (!LoadChunk()) return false;
    }
    out_chunk->dptr = chunk_begin_;
    out_chunk->size = chunk_end_ - chunk_begin_;
    chunk_begin_ = chunk_end_;
    return true;
  }
  /*!
   * \brief read a chunk that ends on a record boundary
   * \param buf destination buffer
   * \param size in: capacity of buf; out: bytes actually placed in buf
   *        (the tail beyond the last complete record is kept in overflow_
   *        and prepended to the next read)
   * \return false at end of input
   */
  inline bool ReadChunk(void *buf, size_t *size) {
    size_t max_size = *size;
    // buffer too small to even hold the carried-over overflow
    if (max_size <= overflow_.length()) {
      *size = 0; return true;
    }
    if (overflow_.length() != 0) {
      std::memcpy(buf, BeginPtr(overflow_), overflow_.length());
    }
    size_t olen = overflow_.length();
    overflow_.resize(0);
    size_t nread = this->Read(reinterpret_cast<char*>(buf) + olen,
                              max_size - olen);
    nread += olen;
    if (nread == 0) return false;
    if (nread != max_size) {
      // short read: we hit EOF, everything read is a complete chunk
      *size = nread;
      return true;
    } else {
      const char *bptr = reinterpret_cast<const char*>(buf);
      // return the last position where a record starts
      const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size);
      *size = bend - bptr;
      // stash the trailing partial record for the next call
      overflow_.resize(max_size - *size);
      if (overflow_.length() != 0) {
        std::memcpy(BeginPtr(overflow_), bend, overflow_.length());
      }
      return true;
    }
  }

 protected:
  /*! \brief scan backwards from end for the start of the last record */
  inline const char* FindLastRecordBegin(const char *begin,
                                         const char *end) {
    if (begin == end) return begin;
    for (const char *p = end - 1; p != begin; --p) {
      if (*p == '\n' || *p == '\r') return p + 1;
    }
    return begin;
  }
  /*! \brief advance past the current record and any trailing newlines */
  inline char* FindNextRecord(char *begin, char *end) {
    char *p;
    for (p = begin; p != end; ++p) {
      if (*p == '\n' || *p == '\r') break;
    }
    for (; p != end; ++p) {
      if (*p != '\n' && *p != '\r') return p;
    }
    return end;
  }
  /*!
   * \brief refill the internal chunk buffer, doubling it while it is too
   *        small to hold even one complete record
   * \return false at end of input
   */
  inline bool LoadChunk(void) {
    while (true) {
      size_t size = buffer_.length();
      if (!ReadChunk(BeginPtr(buffer_), &size)) return false;
      if (size == 0) {
        // buffer cannot fit a whole record yet: grow and retry
        buffer_.resize(buffer_.length() * 2);
      } else {
        chunk_begin_ = reinterpret_cast<char *>(BeginPtr(buffer_));
        chunk_end_ = chunk_begin_ + size;
        break;
      }
    }
    return true;
  }

 private:
  // buffer size
  static const size_t kBufferSize = 1 << 18UL;
  // file
  std::FILE *fp_;
  bool use_stdin_;
  // internal overflow: partial record carried between reads
  std::string overflow_;
  // internal buffer
  std::string buffer_;
  // beginning of chunk
  char *chunk_begin_;
  // end of chunk
  char *chunk_end_;
};
class StdFile : public dmlc::Stream {
public:
explicit StdFile(std::FILE *fp, bool use_stdio)
: fp(fp), use_stdio(use_stdio) {
}
virtual ~StdFile(void) {
this->Close();
}
virtual size_t Read(void *ptr, size_t size) {
return std::fread(ptr, 1, size, fp);
}
virtual void Write(const void *ptr, size_t size) {
Check(std::fwrite(ptr, size, 1, fp) == 1, "StdFile::Write: fwrite error!");
}
virtual void Seek(size_t pos) {
std::fseek(fp, static_cast<long>(pos), SEEK_SET); // NOLINT(*)
}
virtual size_t Tell(void) {
return std::ftell(fp);
}
virtual bool AtEnd(void) const {
return std::feof(fp) != 0;
}
inline void Close(void) {
if (fp != NULL && !use_stdio) {
std::fclose(fp); fp = NULL;
}
}
private:
std::FILE *fp;
bool use_stdio;
};
} // namespace utils
} // namespace xgboost
namespace dmlc {
/*!
 * \brief local-mode factory: only a single, local, unsplit file is supported.
 * Remote URIs and multi-part splits require the dmlc-core build.
 */
InputSplit* InputSplit::Create(const char *uri,
                               unsigned part,
                               unsigned nsplit,
                               const char *type) {
  using namespace std;
  using namespace xgboost;
  const char *msg = "xgboost is compiled in local mode\n"\
      "to use hdfs, s3 or distributed version, compile with make dmlc=1";
  const bool is_s3 = strncmp(uri, "s3://", 5) == 0;
  const bool is_hdfs = strncmp(uri, "hdfs://", 7) == 0;
  utils::Check(!is_s3, msg);
  utils::Check(!is_hdfs, msg);
  utils::Check(nsplit == 1, msg);
  return new utils::SingleFileSplit(uri);
}
/*!
 * \brief local-mode stream factory
 * \param fname file name; "stdin"/"stdout" map to the standard streams
 *        (unless built in strict C++98 mode), a "file://" prefix is stripped,
 *        s3:// and hdfs:// are rejected in this build
 * \param mode fopen-style mode; bare "w"/"r" are promoted to binary "wb"/"rb"
 * \param allow_null if true, return NULL instead of aborting when the file
 *        cannot be opened
 * \return a new StdFile stream, or NULL (only when allow_null)
 */
Stream *Stream::Create(const char *fname, const char * const mode, bool allow_null) {
  // note: the original had a second, redundant `using namespace std;`
  using namespace std;
  using namespace xgboost;
  const char *msg = "xgboost is compiled in local mode\n"\
      "to use hdfs, s3 or distributed version, compile with make dmlc=1";
  utils::Check(strncmp(fname, "s3://", 5) != 0, msg);
  utils::Check(strncmp(fname, "hdfs://", 7) != 0, msg);
  std::FILE *fp = NULL;
  bool use_stdio = false;
#ifndef XGBOOST_STRICT_CXX98_
  if (!strcmp(fname, "stdin")) {
    use_stdio = true; fp = stdin;
  }
  if (!strcmp(fname, "stdout")) {
    use_stdio = true; fp = stdout;
  }
#endif
  if (!strncmp(fname, "file://", 7)) fname += 7;
  if (!use_stdio) {
    std::string flag = mode;
    if (flag == "w") flag = "wb";
    if (flag == "r") flag = "rb";
    // fopen64 for large-file support on 32-bit platforms
    fp = fopen64(fname, flag.c_str());
  }
  if (fp != NULL) {
    return new utils::StdFile(fp, use_stdio);
  } else {
    utils::Check(allow_null, "fail to open file %s", fname);
    return NULL;
  }
}
} // namespace dmlc

View File

@@ -1,212 +0,0 @@
/*!
* Copyright (c) 2015 by Contributors
* \file libsvm_parser.h
* \brief iterator parser to parse libsvm format
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_LIBSVM_PARSER_H_
#define XGBOOST_IO_LIBSVM_PARSER_H_
#define NOMINMAX
#include <vector>
#include <cstring>
#include <cctype>
#include <algorithm>
#include "../utils/omp.h"
#include "../utils/utils.h"
#include "../sync/sync.h"
#include "../utils/thread_buffer.h"
#include "./sparse_batch_page.h"
namespace xgboost {
namespace io {
/*! \brief page returned by libsvm parser */
struct LibSVMPage : public SparsePage {
  // per-row labels, parallel to the sparse rows
  std::vector<float> label;
  /*! \brief clear labels together with the inherited sparse content */
  inline void Clear() {
    label.clear();
    SparsePage::Clear();
  }
};
/*!
* \brief libsvm parser that parses the input lines
* and returns rows in input data
* factory that was used by threadbuffer template
*/
class LibSVMPageFactory {
 public:
  LibSVMPageFactory()
      : bytes_read_(0), at_head_(true) {
  }
  inline bool Init(void) {
    return true;
  }
  /*!
   * \brief attach the input source and decide the parser thread count
   * \param source input split to parse (ownership taken; freed in Destroy)
   * \param nthread requested number of threads, capped at half the procs
   */
  inline void Setup(dmlc::InputSplit *source,
                    int nthread) {
    source_ = source;
    int maxthread;
    #pragma omp parallel
    {
      maxthread = omp_get_num_procs();
    }
    // leave half the processors free for other work
    maxthread = std::max(maxthread / 2, 1);
    nthread_ = std::min(maxthread, nthread);
  }
  inline void SetParam(const char *name, const char *val) {}
  inline bool LoadNext(std::vector<LibSVMPage> *data) {
    return FillData(data);
  }
  inline void FreeSpace(std::vector<LibSVMPage> *a) {
    delete a;
  }
  inline std::vector<LibSVMPage> *Create(void) {
    return new std::vector<LibSVMPage>();
  }
  inline void BeforeFirst(void) {
    // rewind is not supported once parsing has started
    utils::Assert(at_head_, "cannot call beforefirst");
  }
  inline void Destroy(void) {
    delete source_;
  }
  /*! \return total bytes consumed from the source so far */
  inline size_t bytes_read(void) const {
    return bytes_read_;
  }

 protected:
  /*!
   * \brief read one chunk and parse it in parallel, one page per thread
   * \param data output vector resized to the number of worker threads
   * \return false at end of input
   */
  inline bool FillData(std::vector<LibSVMPage> *data) {
    dmlc::InputSplit::Blob chunk;
    if (!source_->NextChunk(&chunk)) return false;
    int nthread;
    #pragma omp parallel num_threads(nthread_)
    {
      nthread = omp_get_num_threads();
    }
    // reserve space for data
    data->resize(nthread);
    bytes_read_ += chunk.size;
    utils::Assert(chunk.size != 0, "LibSVMParser.FileData");
    char *head = reinterpret_cast<char*>(chunk.dptr);
    #pragma omp parallel num_threads(nthread_)
    {
      // threadid
      int tid = omp_get_thread_num();
      // split the chunk into roughly equal slices, then snap each
      // boundary back to the previous end-of-line so no record is cut
      size_t nstep = (chunk.size + nthread - 1) / nthread;
      size_t sbegin = std::min(tid * nstep, chunk.size);
      size_t send = std::min((tid + 1) * nstep, chunk.size);
      char *pbegin = BackFindEndLine(head + sbegin, head);
      char *pend;
      if (tid + 1 == nthread) {
        pend = head + send;
      } else {
        pend = BackFindEndLine(head + send, head);
      }
      ParseBlock(pbegin, pend, &(*data)[tid]);
    }
    return true;
  }
  /*!
   * \brief parse data into out
   * \param begin beginning of buffer
   * \param end end of buffer
   */
  inline void ParseBlock(char *begin,
                         char *end,
                         LibSVMPage *out) {
    using namespace std;
    out->Clear();
    char *p = begin;
    while (p != end) {
      // BUGFIX: check p != end BEFORE dereferencing; the old order
      // (isspace(*p) && p != end) read one byte past the buffer.
      while (p != end && isspace(*p)) ++p;
      if (p == end) break;
      char *head = p;
      while (p != end && isdigit(*p)) ++p;
      if (p != end && *p == ':') {
        // "index:value" feature pair
        out->data.push_back(SparseBatch::Entry(atol(head),
                                               static_cast<bst_float>(atof(p + 1))));
      } else {
        // a non-feature token is a label: it terminates the previous row
        if (out->label.size() != 0) {
          out->offset.push_back(out->data.size());
        }
        out->label.push_back(static_cast<float>(atof(head)));
      }
      // skip the remainder of the current token
      while (p != end && !isspace(*p)) ++p;
    }
    // close the final row
    if (out->label.size() != 0) {
      out->offset.push_back(out->data.size());
    }
    utils::Check(out->label.size() + 1 == out->offset.size(),
                 "LibSVMParser inconsistent");
  }
  /*!
   * \brief start from bptr, go backward and find first endof line
   * \param bptr end position to go backward
   * \param begin the beginning position of buffer
   * \return position of first endof line going backward
   */
  inline char* BackFindEndLine(char *bptr,
                               char *begin) {
    for (; bptr != begin; --bptr) {
      if (*bptr == '\n' || *bptr == '\r') return bptr;
    }
    return begin;
  }

 private:
  // nthread
  int nthread_;
  // number of bytes readed
  size_t bytes_read_;
  // at beginning, at end of stream
  bool at_head_;
  // source split that provides the data
  dmlc::InputSplit *source_;
};
/*! \brief iterator over LibSVM pages backed by a threaded buffer */
class LibSVMParser : public utils::IIterator<LibSVMPage> {
 public:
  explicit LibSVMParser(dmlc::InputSplit *source,
                        int nthread)
      : end_reached_(false), cursor_(0), pages_(NULL) {
    buf_.SetParam("buffer_size", "2");
    buf_.get_factory().Setup(source, nthread);
    buf_.Init();
  }
  virtual void BeforeFirst(void) {
    buf_.BeforeFirst();
  }
  /*!
   * \brief advance to the next non-empty page
   * \return false once the underlying buffer is exhausted
   */
  virtual bool Next(void) {
    if (end_reached_) return false;
    for (;;) {
      // refill when no batch is loaded or the current batch is spent
      if (pages_ == NULL || cursor_ >= pages_->size()) {
        if (!buf_.Next(pages_)) {
          end_reached_ = true;
          return false;
        }
        cursor_ = 0;
      }
      // scan forward within the batch, skipping empty pages
      while (cursor_ < pages_->size()) {
        cursor_ += 1;
        if ((*pages_)[cursor_ - 1].Size() != 0) {
          return true;
        }
      }
    }
  }
  /*! \brief the page selected by the last successful Next() */
  virtual const LibSVMPage &Value(void) const {
    return (*pages_)[cursor_ - 1];
  }
  /*! \brief bytes consumed from the source so far */
  inline size_t bytes_read(void) const {
    return buf_.get_factory().bytes_read();
  }

 private:
  // whether the end of input has been reached
  bool end_reached_;
  // one-past index of the current page within *pages_
  size_t cursor_;
  // current batch of pages, owned by the thread buffer
  std::vector<LibSVMPage> *pages_;
  // background producer of page batches
  utils::ThreadBuffer<std::vector<LibSVMPage>*, LibSVMPageFactory> buf_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_LIBSVM_PARSER_H_

View File

@@ -1,374 +0,0 @@
/*!
* Copyright 2014 by Contributors
* \file simple_fmatrix-inl.hpp
* \brief the input data structure for gradient boosting
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_SIMPLE_FMATRIX_INL_HPP_
#define XGBOOST_IO_SIMPLE_FMATRIX_INL_HPP_
#include <limits>
#include <algorithm>
#include <vector>
#include "../data.h"
#include "../utils/utils.h"
#include "../utils/random.h"
#include "../utils/omp.h"
#include "../learner/dmatrix.h"
#include "../utils/group_data.h"
#include "./sparse_batch_page.h"
namespace xgboost {
namespace io {
/*!
* \brief sparse matrix that support column access, CSC
*/
class FMatrixS : public IFMatrix {
 public:
  typedef SparseBatch::Entry Entry;
  /*! \brief constructor, takes ownership of iter */
  FMatrixS(utils::IIterator<RowBatch> *iter,
           const learner::MetaInfo &info)
      : info_(info) {
    this->iter_ = iter;
  }
  // destructor
  virtual ~FMatrixS(void) {
    if (iter_ != NULL) delete iter_;
  }
  /*! \return whether column access is enabled */
  virtual bool HaveColAccess(void) const {
    return col_size_.size() != 0;
  }
  /*! \brief get number of columns */
  virtual size_t NumCol(void) const {
    utils::Check(this->HaveColAccess(), "NumCol:need column access");
    return col_size_.size();
  }
  /*! \brief get number of buffered rows */
  virtual const std::vector<bst_uint> &buffered_rowset(void) const {
    return buffered_rowset_;
  }
  /*! \brief get column size */
  virtual size_t GetColSize(size_t cidx) const {
    return col_size_[cidx];
  }
  /*! \brief get column density: fraction of buffered rows present in column cidx */
  virtual float GetColDensity(size_t cidx) const {
    size_t nmiss = buffered_rowset_.size() - col_size_[cidx];
    return 1.0f - (static_cast<float>(nmiss)) / buffered_rowset_.size();
  }
  virtual void InitColAccess(const std::vector<bool> &enabled,
                             float pkeep, size_t max_row_perbatch) {
    if (this->HaveColAccess()) return;
    this->InitColData(enabled, pkeep, max_row_perbatch);
  }
  /*!
   * \brief get the row iterator associated with FMatrix
   */
  virtual utils::IIterator<RowBatch>* RowIterator(void) {
    iter_->BeforeFirst();
    return iter_;
  }
  /*!
   * \brief get the column based iterator over all columns
   */
  virtual utils::IIterator<ColBatch>* ColIterator(void) {
    size_t ncol = this->NumCol();
    col_iter_.col_index_.resize(ncol);
    for (size_t i = 0; i < ncol; ++i) {
      col_iter_.col_index_[i] = static_cast<bst_uint>(i);
    }
    col_iter_.BeforeFirst();
    return &col_iter_;
  }
  /*!
   * \brief column based iterator restricted to the given feature set
   */
  virtual utils::IIterator<ColBatch> *ColIterator(const std::vector<bst_uint> &fset) {
    size_t ncol = this->NumCol();
    col_iter_.col_index_.resize(0);
    for (size_t i = 0; i < fset.size(); ++i) {
      // silently drop out-of-range feature indices
      if (fset[i] < ncol) col_iter_.col_index_.push_back(fset[i]);
    }
    col_iter_.BeforeFirst();
    return &col_iter_;
  }
  /*!
   * \brief save column access data into stream
   * \param fo output stream to save to
   */
  inline void SaveColAccess(utils::IStream &fo) const {  // NOLINT(*)
    // column access is rebuilt on load, only a placeholder count is stored
    size_t n = 0;
    fo.Write(&n, sizeof(n));
  }
  /*!
   * \brief load column access data from stream
   * \param fi input stream to load from
   */
  inline void LoadColAccess(utils::IStream &fi) {  // NOLINT(*)
    // do nothing in load col access
  }

 protected:
  /*!
   * \brief initialize column data
   * \param enabled the list of enabled columns
   * \param pkeep probability to keep a row
   * \param max_row_perbatch maximum row per batch
   */
  inline void InitColData(const std::vector<bool> &enabled,
                          float pkeep, size_t max_row_perbatch) {
    col_iter_.Clear();
    if (info_.num_row() < max_row_perbatch) {
      SparsePage *page = new SparsePage();
      this->MakeOneBatch(enabled, pkeep, page);
      col_iter_.cpages_.push_back(page);
    } else {
      this->MakeManyBatch(enabled, pkeep, max_row_perbatch);
    }
    // setup col-size: accumulate per-column entry counts over all pages
    col_size_.resize(info_.num_col());
    std::fill(col_size_.begin(), col_size_.end(), 0);
    for (size_t i = 0; i < col_iter_.cpages_.size(); ++i) {
      SparsePage *pcol = col_iter_.cpages_[i];
      for (size_t j = 0; j < pcol->Size(); ++j) {
        col_size_[j] += pcol->offset[j + 1] - pcol->offset[j];
      }
    }
  }
  /*!
   * \brief make a single column page from the whole row iterator
   * \param enabled the list of enabled columns
   * \param pkeep probability to keep a row
   * \param pcol the target column page
   */
  inline void MakeOneBatch(const std::vector<bool> &enabled,
                           float pkeep,
                           SparsePage *pcol) {
    // clear rowset
    buffered_rowset_.clear();
    // bit map marking which rows were kept by sampling, indexed by row id
    int nthread;
    std::vector<bool> bmap;
    #pragma omp parallel
    {
      nthread = omp_get_num_threads();
    }
    pcol->Clear();
    utils::ParallelGroupBuilder<SparseBatch::Entry>
        builder(&pcol->offset, &pcol->data);
    builder.InitBudget(info_.num_col(), nthread);
    // first pass: sample rows and count per-column budgets
    iter_->BeforeFirst();
    while (iter_->Next()) {
      const RowBatch &batch = iter_->Value();
      bmap.resize(bmap.size() + batch.size, true);
      long batch_size = static_cast<long>(batch.size);  // NOLINT(*)
      for (long i = 0; i < batch_size; ++i) {  // NOLINT(*)
        bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
        if (pkeep == 1.0f || random::SampleBinary(pkeep)) {
          buffered_rowset_.push_back(ridx);
        } else {
          // BUGFIX: bmap is read via the global row id (ridx) below and in
          // the second pass; the old code wrote bmap[i] with the batch-local
          // index, corrupting the map for every batch after the first.
          bmap[ridx] = false;
        }
      }
      #pragma omp parallel for schedule(static)
      for (long i = 0; i < batch_size; ++i) {  // NOLINT(*)
        int tid = omp_get_thread_num();
        bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
        if (bmap[ridx]) {
          RowBatch::Inst inst = batch[i];
          for (bst_uint j = 0; j < inst.length; ++j) {
            if (enabled[inst[j].index]) {
              builder.AddBudget(inst[j].index, tid);
            }
          }
        }
      }
    }
    builder.InitStorage();
    // second pass: push the entries into their columns
    iter_->BeforeFirst();
    while (iter_->Next()) {
      const RowBatch &batch = iter_->Value();
      #pragma omp parallel for schedule(static)
      for (long i = 0; i < static_cast<long>(batch.size); ++i) {  // NOLINT(*)
        int tid = omp_get_thread_num();
        bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
        if (bmap[ridx]) {
          RowBatch::Inst inst = batch[i];
          for (bst_uint j = 0; j < inst.length; ++j) {
            if (enabled[inst[j].index]) {
              builder.Push(inst[j].index,
                           Entry((bst_uint)(batch.base_rowid+i),
                                 inst[j].fvalue), tid);
            }
          }
        }
      }
    }
    utils::Assert(pcol->Size() == info_.num_col(),
                  "inconsistent col data");
    // sort each column by feature value
    bst_omp_uint ncol = static_cast<bst_omp_uint>(pcol->Size());
    #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread)
    for (bst_omp_uint i = 0; i < ncol; ++i) {
      if (pcol->offset[i] < pcol->offset[i + 1]) {
        std::sort(BeginPtr(pcol->data) + pcol->offset[i],
                  BeginPtr(pcol->data) + pcol->offset[i + 1],
                  SparseBatch::Entry::CmpValue);
      }
    }
  }
  /*!
   * \brief build column pages in batches of at most max_row_perbatch rows
   */
  inline void MakeManyBatch(const std::vector<bool> &enabled,
                            float pkeep, size_t max_row_perbatch) {
    size_t btop = 0;
    buffered_rowset_.clear();
    // internal temp cache accumulating sampled rows for the current page
    SparsePage tmp; tmp.Clear();
    iter_->BeforeFirst();
    while (iter_->Next()) {
      const RowBatch &batch = iter_->Value();
      for (size_t i = 0; i < batch.size; ++i) {
        bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
        if (pkeep == 1.0f || random::SampleBinary(pkeep)) {
          buffered_rowset_.push_back(ridx);
          tmp.Push(batch[i]);
        }
        if (tmp.Size() >= max_row_perbatch) {
          SparsePage *page = new SparsePage();
          this->MakeColPage(tmp.GetRowBatch(0),
                            BeginPtr(buffered_rowset_) + btop,
                            enabled, page);
          col_iter_.cpages_.push_back(page);
          btop = buffered_rowset_.size();
          tmp.Clear();
        }
      }
    }
    // flush the final partial page
    if (tmp.Size() != 0) {
      SparsePage *page = new SparsePage();
      this->MakeColPage(tmp.GetRowBatch(0),
                        BeginPtr(buffered_rowset_) + btop,
                        enabled, page);
      col_iter_.cpages_.push_back(page);
    }
  }
  /*!
   * \brief make column page from a subset of row batches
   * \param batch rows to transpose
   * \param ridx global row ids for the rows in batch
   * \param enabled the list of enabled columns
   * \param pcol the target column page
   */
  inline void MakeColPage(const RowBatch &batch,
                          const bst_uint *ridx,
                          const std::vector<bool> &enabled,
                          SparsePage *pcol) {
    int nthread;
    #pragma omp parallel
    {
      nthread = omp_get_num_threads();
      // cap thread count to leave processors free for other work
      int max_nthread = std::max(omp_get_num_procs() / 2 - 2, 1);
      if (nthread > max_nthread) {
        nthread = max_nthread;
      }
    }
    pcol->Clear();
    utils::ParallelGroupBuilder<SparseBatch::Entry>
        builder(&pcol->offset, &pcol->data);
    builder.InitBudget(info_.num_col(), nthread);
    bst_omp_uint ndata = static_cast<bst_uint>(batch.size);
    // first pass: count per-column budgets
    #pragma omp parallel for schedule(static) num_threads(nthread)
    for (bst_omp_uint i = 0; i < ndata; ++i) {
      int tid = omp_get_thread_num();
      RowBatch::Inst inst = batch[i];
      for (bst_uint j = 0; j < inst.length; ++j) {
        const SparseBatch::Entry &e = inst[j];
        if (enabled[e.index]) {
          builder.AddBudget(e.index, tid);
        }
      }
    }
    builder.InitStorage();
    // second pass: fill the columns
    #pragma omp parallel for schedule(static) num_threads(nthread)
    for (bst_omp_uint i = 0; i < ndata; ++i) {
      int tid = omp_get_thread_num();
      RowBatch::Inst inst = batch[i];
      for (bst_uint j = 0; j < inst.length; ++j) {
        const SparseBatch::Entry &e = inst[j];
        builder.Push(e.index,
                     SparseBatch::Entry(ridx[i], e.fvalue),
                     tid);
      }
    }
    utils::Assert(pcol->Size() == info_.num_col(), "inconsistent col data");
    // sort each column by feature value
    bst_omp_uint ncol = static_cast<bst_omp_uint>(pcol->Size());
    #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread)
    for (bst_omp_uint i = 0; i < ncol; ++i) {
      if (pcol->offset[i] < pcol->offset[i + 1]) {
        std::sort(BeginPtr(pcol->data) + pcol->offset[i],
                  BeginPtr(pcol->data) + pcol->offset[i + 1],
                  SparseBatch::Entry::CmpValue);
      }
    }
  }

 private:
  // one batch iterator that return content in the matrix
  struct ColBatchIter: utils::IIterator<ColBatch> {
    ColBatchIter(void) : data_ptr_(0) {}
    virtual ~ColBatchIter(void) {
      this->Clear();
    }
    virtual void BeforeFirst(void) {
      data_ptr_ = 0;
    }
    virtual bool Next(void) {
      if (data_ptr_ >= cpages_.size()) return false;
      data_ptr_ += 1;
      SparsePage *pcol = cpages_[data_ptr_ - 1];
      batch_.size = col_index_.size();
      col_data_.resize(col_index_.size(), SparseBatch::Inst(NULL, 0));
      // materialize views into the page for the selected columns
      for (size_t i = 0; i < col_data_.size(); ++i) {
        const bst_uint ridx = col_index_[i];
        col_data_[i] = SparseBatch::Inst
            (BeginPtr(pcol->data) + pcol->offset[ridx],
             static_cast<bst_uint>(pcol->offset[ridx + 1] - pcol->offset[ridx]));
      }
      batch_.col_index = BeginPtr(col_index_);
      batch_.col_data = BeginPtr(col_data_);
      return true;
    }
    virtual const ColBatch &Value(void) const {
      return batch_;
    }
    inline void Clear(void) {
      for (size_t i = 0; i < cpages_.size(); ++i) {
        delete cpages_[i];
      }
      cpages_.clear();
    }
    // data content
    std::vector<bst_uint> col_index_;
    // column content
    std::vector<ColBatch::Inst> col_data_;
    // column sparse pages
    std::vector<SparsePage*> cpages_;
    // data pointer
    size_t data_ptr_;
    // temporal space for batch
    ColBatch batch_;
  };
  // --- data structure used to support InitColAccess --
  // column iterator
  ColBatchIter col_iter_;
  // shared meta info with DMatrix
  const learner::MetaInfo &info_;
  // row iterator
  utils::IIterator<RowBatch> *iter_;
  /*! \brief list of row index that are buffered */
  std::vector<bst_uint> buffered_rowset_;
  // count for column data
  std::vector<size_t> col_size_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_SLICE_FMATRIX_INL_HPP_

View File

@@ -1,272 +0,0 @@
/*!
* Copyright (c) 2014 by Contributors
* \file sparse_batch_page.h
* content holder of sparse batch that can be saved to disk
* the representation can be effectively
* use in external memory computation
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_SPARSE_BATCH_PAGE_H_
#define XGBOOST_IO_SPARSE_BATCH_PAGE_H_
#include <vector>
#include <algorithm>
#include "../data.h"
namespace xgboost {
namespace io {
/*!
* \brief storage unit of sparse batch
*/
class SparsePage {
 public:
  /*! \brief offset of the segments */
  std::vector<size_t> offset;
  /*! \brief the data of the segments */
  std::vector<SparseBatch::Entry> data;
  /*! \brief constructor */
  SparsePage() {
    this->Clear();
  }
  /*! \return number of instance in the page */
  inline size_t Size() const {
    // offset always carries a leading 0, so size is length - 1
    return offset.size() - 1;
  }
  /*!
   * \brief load only the segments we are interested in
   * \param fi the input stream of the file
   * \param sorted_index_set sorted index of segments we are interested in
   * \return true of the loading as successful, false if end of file was reached
   */
  inline bool Load(utils::ISeekStream *fi,
                   const std::vector<bst_uint> &sorted_index_set) {
    if (!fi->Read(&disk_offset_)) return false;
    // setup the offset: build a compact offset array covering only the
    // requested segments
    offset.clear(); offset.push_back(0);
    for (size_t i = 0; i < sorted_index_set.size(); ++i) {
      bst_uint fid = sorted_index_set[i];
      utils::Check(fid + 1 < disk_offset_.size(), "bad col.blob format");
      size_t size = disk_offset_[fid + 1] - disk_offset_[fid];
      offset.push_back(offset.back() + size);
    }
    data.resize(offset.back());
    // read in the data
    size_t begin = fi->Tell();
    size_t curr_offset = 0;
    for (size_t i = 0; i < sorted_index_set.size();) {
      bst_uint fid = sorted_index_set[i];
      // seek forward only when the next wanted segment is not contiguous
      if (disk_offset_[fid] != curr_offset) {
        utils::Assert(disk_offset_[fid] > curr_offset, "fset index was not sorted");
        fi->Seek(begin + disk_offset_[fid] * sizeof(SparseBatch::Entry));
        curr_offset = disk_offset_[fid];
      }
      // coalesce runs of adjacent-on-disk segments into one read
      size_t j, size_to_read = 0;
      for (j = i; j < sorted_index_set.size(); ++j) {
        if (disk_offset_[sorted_index_set[j]] == disk_offset_[fid] + size_to_read) {
          size_to_read += offset[j + 1] - offset[j];
        } else {
          break;
        }
      }
      if (size_to_read != 0) {
        utils::Check(fi->Read(BeginPtr(data) + offset[i],
                              size_to_read * sizeof(SparseBatch::Entry)) != 0,
                     "Invalid SparsePage file");
        curr_offset += size_to_read;
      }
      i = j;
    }
    // seek to end of record so the stream is positioned at the next page
    if (curr_offset != disk_offset_.back()) {
      fi->Seek(begin + disk_offset_.back() * sizeof(SparseBatch::Entry));
    }
    return true;
  }
  /*!
   * \brief load all the segments
   * \param fi the input stream of the file
   * \return true of the loading as successful, false if end of file was reached
   */
  inline bool Load(utils::IStream *fi) {
    if (!fi->Read(&offset)) return false;
    utils::Check(offset.size() != 0, "Invalid SparsePage file");
    data.resize(offset.back());
    if (data.size() != 0) {
      utils::Check(fi->Read(BeginPtr(data), data.size() * sizeof(SparseBatch::Entry)) != 0,
                   "Invalid SparsePage file");
    }
    return true;
  }
  /*!
   * \brief save the data to fo, when a page was written
   * to disk it must contain all the elements in the
   * \param fo output stream
   */
  inline void Save(utils::IStream *fo) const {
    utils::Assert(offset.size() != 0 && offset[0] == 0, "bad offset");
    utils::Assert(offset.back() == data.size(), "in consistent SparsePage");
    fo->Write(offset);
    if (data.size() != 0) {
      fo->Write(BeginPtr(data), data.size() * sizeof(SparseBatch::Entry));
    }
  }
  /*! \return estimation of memory cost of this page */
  inline size_t MemCostBytes(void) const {
    return offset.size() * sizeof(size_t) + data.size() * sizeof(SparseBatch::Entry);
  }
  /*! \brief clear the page */
  inline void Clear(void) {
    offset.clear();
    // keep the invariant offset[0] == 0 so Size() and Push work on an
    // empty page
    offset.push_back(0);
    data.clear();
  }
  /*!
   * \brief load all the segments and add it to existing batch
   * \param fi the input stream of the file
   * \return true of the loading as successful, false if end of file was reached
   */
  inline bool PushLoad(utils::IStream *fi) {
    if (!fi->Read(&disk_offset_)) return false;
    data.resize(offset.back() + disk_offset_.back());
    if (disk_offset_.back() != 0) {
      utils::Check(fi->Read(BeginPtr(data) + offset.back(),
                            disk_offset_.back() * sizeof(SparseBatch::Entry)) != 0,
                   "Invalid SparsePage file");
    }
    size_t top = offset.back();
    size_t begin = offset.size();
    // NOTE(review): disk_offset_[0] is 0, so offset[begin] duplicates the
    // previous offset.back(), which appears to append one extra empty row
    // at the splice point (compare Push(const RowBatch&), which skips the
    // leading element) — confirm this matches the on-disk format's
    // expectations before relying on row counts after PushLoad.
    offset.resize(offset.size() + disk_offset_.size());
    for (size_t i = 0; i < disk_offset_.size(); ++i) {
      offset[i + begin] = top + disk_offset_[i];
    }
    return true;
  }
  /*!
   * \brief Push row batch into the page
   * \param batch the row batch
   */
  inline void Push(const RowBatch &batch) {
    data.resize(offset.back() + batch.ind_ptr[batch.size]);
    std::memcpy(BeginPtr(data) + offset.back(),
                batch.data_ptr + batch.ind_ptr[0],
                sizeof(SparseBatch::Entry) * batch.ind_ptr[batch.size]);
    size_t top = offset.back();
    size_t begin = offset.size();
    offset.resize(offset.size() + batch.size);
    for (size_t i = 0; i < batch.size; ++i) {
      // rebase each row end relative to this page's current top
      offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0];
    }
  }
  /*!
   * \brief Push a sparse page
   * \param batch the row page
   */
  inline void Push(const SparsePage &batch) {
    size_t top = offset.back();
    data.resize(top + batch.data.size());
    std::memcpy(BeginPtr(data) + top,
                BeginPtr(batch.data),
                sizeof(SparseBatch::Entry) * batch.data.size());
    size_t begin = offset.size();
    offset.resize(begin + batch.Size());
    for (size_t i = 0; i < batch.Size(); ++i) {
      // batch.offset[0] == 0, so skipping it splices without a gap
      offset[i + begin] = top + batch.offset[i + 1];
    }
  }
  /*!
   * \brief Push one instance into page
   * \param row an instance row
   */
  inline void Push(const SparseBatch::Inst &inst) {
    offset.push_back(offset.back() + inst.length);
    size_t begin = data.size();
    data.resize(begin + inst.length);
    if (inst.length != 0) {
      std::memcpy(BeginPtr(data) + begin, inst.data,
                  sizeof(SparseBatch::Entry) * inst.length);
    }
  }
  /*!
   * \param base_rowid base_rowid of the data
   * \return row batch representation of the page
   *         (a non-owning view: pointers remain valid only while this
   *         page is alive and unmodified)
   */
  inline RowBatch GetRowBatch(size_t base_rowid) const {
    RowBatch out;
    out.base_rowid = base_rowid;
    out.ind_ptr = BeginPtr(offset);
    out.data_ptr = BeginPtr(data);
    out.size = offset.size() - 1;
    return out;
  }

 private:
  /*! \brief external memory column offset */
  std::vector<size_t> disk_offset_;
};
/*!
* \brief factory class for SparsePage,
* used in threadbuffer template
*/
class SparsePageFactory {
public:
SparsePageFactory(void)
: action_load_all_(true), set_load_all_(true) {}
inline void SetFile(const utils::FileStream &fi,
size_t file_begin = 0) {
fi_ = fi;
file_begin_ = file_begin;
}
inline const std::vector<bst_uint> &index_set(void) const {
return action_index_set_;
}
// set index set, will be used after next before first
inline void SetIndexSet(const std::vector<bst_uint> &index_set,
bool load_all) {
set_load_all_ = load_all;
if (!set_load_all_) {
set_index_set_ = index_set;
std::sort(set_index_set_.begin(), set_index_set_.end());
}
}
inline bool Init(void) {
return true;
}
inline void SetParam(const char *name, const char *val) {}
inline bool LoadNext(SparsePage *val) {
if (!action_load_all_) {
if (action_index_set_.size() == 0) {
return false;
} else {
return val->Load(&fi_, action_index_set_);
}
} else {
return val->Load(&fi_);
}
}
inline SparsePage *Create(void) {
return new SparsePage();
}
inline void FreeSpace(SparsePage *a) {
delete a;
}
inline void Destroy(void) {
fi_.Close();
}
inline void BeforeFirst(void) {
fi_.Seek(file_begin_);
action_load_all_ = set_load_all_;
if (!set_load_all_) {
action_index_set_ = set_index_set_;
}
}
private:
bool action_load_all_, set_load_all_;
size_t file_begin_;
utils::FileStream fi_;
std::vector<bst_uint> action_index_set_;
std::vector<bst_uint> set_index_set_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_SPARSE_BATCH_PAGE_H_