Merge branch 'lite' of ssh://github.com/tqchen/xgboost into lite

Conflicts:
	src/io/page_dmatrix-inl.hpp
This commit is contained in:
tqchen 2015-04-17 22:10:56 -07:00
commit 6d9cb3a2fa
7 changed files with 414 additions and 81 deletions

View File

@ -8,45 +8,132 @@
namespace xgboost {
namespace utils {
/*!
* \brief line split implementation from single FILE
* simply returns lines of files, used for stdin
*/
class SingleFileSplit : public dmlc::InputSplit {
public:
explicit SingleFileSplit(const char *fname)
: use_stdin_(false) {
explicit SingleFileSplit(const char *fname)
: use_stdin_(false),
chunk_begin_(NULL), chunk_end_(NULL) {
if (!std::strcmp(fname, "stdin")) {
#ifndef XGBOOST_STRICT_CXX98_
use_stdin_ = true; fp_ = stdin;
#endif
}
if (!use_stdin_) {
fp_ = utils::FopenCheck(fname, "r");
fp_ = utils::FopenCheck(fname, "rb");
}
end_of_file_ = false;
buffer_.resize(kBufferSize);
}
virtual ~SingleFileSplit(void) {
if (!use_stdin_) std::fclose(fp_);
}
virtual bool ReadRecord(std::string *out_data) {
if (end_of_file_) return false;
out_data->clear();
while (true) {
char c = std::fgetc(fp_);
if (c == EOF) {
end_of_file_ = true;
/*!
 * \brief read raw bytes from the underlying FILE
 * \param ptr destination buffer
 * \param size maximum number of bytes to read
 * \return number of bytes actually read, as reported by std::fread
 */
virtual size_t Read(void *ptr, size_t size) {
  const size_t nread = std::fread(ptr, 1, size, fp_);
  return nread;
}
/*!
 * \brief writing is not supported by an input split;
 *  calling this always reports an error through utils::Error
 * \param ptr unused
 * \param size unused
 */
virtual void Write(const void *ptr, size_t size) {
  (void)ptr;
  (void)size;
  utils::Error("cannot do write in inputsplit");
}
/*!
 * \brief hand out the next record of the current chunk,
 *  loading a new chunk first when the current one is used up
 * \param out_rec receives the start of the record and its length;
 *  the length runs up to the start of the following record as
 *  located by FindNextRecord
 * \return false when no further chunk can be loaded, true otherwise
 */
virtual bool NextRecord(Blob *out_rec) {
  const bool chunk_exhausted = (chunk_begin_ == chunk_end_);
  if (chunk_exhausted && !LoadChunk()) return false;
  // read the cursor only after a possible reload
  char *record_head = chunk_begin_;
  char *record_tail = FindNextRecord(record_head, chunk_end_);
  out_rec->dptr = record_head;
  out_rec->size = record_tail - record_head;
  chunk_begin_ = record_tail;
  return true;
}
/*!
 * \brief hand out everything that is left of the current chunk,
 *  loading a new chunk first when the current one is used up
 * \param out_chunk receives the start and length of the remaining data
 * \return false when no further chunk can be loaded, true otherwise
 */
virtual bool NextChunk(Blob *out_chunk) {
  const bool chunk_exhausted = (chunk_begin_ == chunk_end_);
  if (chunk_exhausted && !LoadChunk()) return false;
  char *remaining_head = chunk_begin_;
  out_chunk->dptr = remaining_head;
  out_chunk->size = chunk_end_ - remaining_head;
  // mark the whole chunk as consumed
  chunk_begin_ = chunk_end_;
  return true;
}
inline bool ReadChunk(void *buf, size_t *size) {
size_t max_size = *size;
if (max_size <= overflow_.length()) {
*size = 0; return true;
}
if (overflow_.length() != 0) {
std::memcpy(buf, BeginPtr(overflow_), overflow_.length());
}
size_t olen = overflow_.length();
overflow_.resize(0);
size_t nread = this->Read(reinterpret_cast<char*>(buf) + olen,
max_size - olen);
nread += olen;
if (nread == 0) return false;
if (nread != max_size) {
*size = nread;
return true;
} else {
const char *bptr = reinterpret_cast<const char*>(buf);
// return the last position where a record starts
const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size);
*size = bend - bptr;
overflow_.resize(max_size - *size);
if (overflow_.length() != 0) {
std::memcpy(BeginPtr(overflow_), bend, overflow_.length());
}
if (c != '\r' && c != '\n' && c != EOF) {
*out_data += c;
return true;
}
}
protected:
/*!
 * \brief scan [begin, end) backwards for the last line break
 * \param begin start of the buffer
 * \param end one past the last byte of the buffer
 * \return the position right after the last '\n' or '\r' found at an
 *  index greater than zero, or begin when there is none
 *  (the byte at begin itself is not inspected)
 */
inline const char* FindLastRecordBegin(const char *begin,
                                       const char *end) {
  if (begin == end) return begin;
  const char *cur = end;
  while (--cur != begin) {
    if (*cur == '\n' || *cur == '\r') return cur + 1;
  }
  return begin;
}
/*!
 * \brief locate the start of the record following the one at begin
 * \param begin start of the current record
 * \param end one past the last byte of the buffer
 * \return first byte after the run of '\n' / '\r' characters that
 *  terminates the current record, or end when no such byte exists
 */
inline char* FindNextRecord(char *begin, char *end) {
  char *cur = begin;
  // skip the content of the current record
  while (cur != end && *cur != '\n' && *cur != '\r') ++cur;
  // skip the line break characters that follow it
  while (cur != end && (*cur == '\n' || *cur == '\r')) ++cur;
  return cur;
}
inline bool LoadChunk(void) {
while (true) {
size_t size = buffer_.length();
if (!ReadChunk(BeginPtr(buffer_), &size)) return false;
if (size == 0) {
buffer_.resize(buffer_.length() * 2);
} else {
if (out_data->length() != 0) return true;
if (end_of_file_) return false;
chunk_begin_ = reinterpret_cast<char *>(BeginPtr(buffer_));
chunk_end_ = chunk_begin_ + size;
break;
}
}
return false;
}
return true;
}
private:
// buffer size
static const size_t kBufferSize = 1 << 18UL;
// file
std::FILE *fp_;
bool use_stdin_;
bool end_of_file_;
// internal overflow
std::string overflow_;
// internal buffer
std::string buffer_;
// beginning of chunk
char *chunk_begin_;
// end of chunk
char *chunk_end_;
};
class StdFile : public dmlc::Stream {
@ -105,7 +192,8 @@ class StdFile : public dmlc::Stream {
namespace dmlc {
InputSplit* InputSplit::Create(const char *uri,
unsigned part,
unsigned nsplit) {
unsigned nsplit,
const char *type) {
using namespace xgboost;
const char *msg = "xgboost is compiled in local mode\n"\
"to use hdfs, s3 or distributed version, compile with make dmlc=1";

168
src/io/libsvm_parser.h Normal file
View File

@ -0,0 +1,168 @@
/*!
* Copyright (c) 2015 by Contributors
* \file libsvm_parser.h
* \brief iterator parser to parse libsvm format
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_LIBSVM_PARSER_H_
#define XGBOOST_IO_LIBSVM_PARSER_H_
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include "../utils/omp.h"
#include "../utils/utils.h"
#include "../sync/sync.h"
#include "./sparse_batch_page.h"
namespace xgboost {
namespace io {
/*!
 * \brief page returned by libsvm parser:
 *  a SparsePage with labels stored alongside it
 */
struct LibSVMPage : public SparsePage {
  /*! \brief labels parsed together with the content of this page */
  std::vector<float> label;
  /*! \brief reset the page: drops the labels and clears the SparsePage part */
  inline void Clear() {
    label.clear();
    SparsePage::Clear();
  }
};
/*!
 * \brief libsvm parser that parses the input lines
 *  and returns rows in input data
 *
 *  Every call to FillData pulls one chunk from the input split, cuts it
 *  at line breaks into one slice per thread and parses the slices in
 *  parallel; Next()/Value() then walk over the non-empty pages.
 */
class LibSVMParser : public utils::IIterator<LibSVMPage> {
 public:
  /*!
   * \brief constructor
   * \param source input split that provides the text chunks;
   *   it is deleted in the destructor of the parser
   * \param nthread upper bound on the number of parsing threads,
   *   values below one are treated as one
   */
  explicit LibSVMParser(dmlc::InputSplit *source,
                        int nthread)
      : bytes_read_(0), at_head_(true),
        data_ptr_(0), data_end_(0), source_(source) {
    int maxthread = 1;
    #pragma omp parallel
    {
      // a single writer avoids concurrent stores to the shared variable
      #pragma omp master
      {
        maxthread = omp_get_num_threads();
      }
    }
    // use at most half of the default team size
    maxthread = std::max(maxthread / 2, 1);
    // num_threads() needs a positive value, so clamp the request
    nthread_ = std::max(std::min(maxthread, nthread), 1);
  }
  virtual ~LibSVMParser() {
    delete source_;
  }
  /*!
   * \brief rewinding is only accepted while at_head_ is set
   *  NOTE(review): at_head_ is initialised to true and never changed in
   *  this class, so the assertion currently always passes and the call
   *  does not rewind anything - confirm this is intended
   */
  virtual void BeforeFirst(void) {
    utils::Assert(at_head_, "cannot call BeforeFirst");
  }
  /*! \return the page selected by the last successful call to Next */
  virtual const LibSVMPage &Value(void) const {
    return data_[data_ptr_ - 1];
  }
  /*!
   * \brief advance to the next page with Size() != 0,
   *  parsing further chunks when the current pages are used up
   * \return false once FillData cannot provide more data
   */
  virtual bool Next(void) {
    while (true) {
      while (data_ptr_ < data_end_) {
        data_ptr_ += 1;
        if (data_[data_ptr_ - 1].Size() != 0) {
          return true;
        }
      }
      if (!FillData()) break;
      data_ptr_ = 0; data_end_ = data_.size();
    }
    return false;
  }
  /*! \return total size of the chunks fetched from the source so far */
  inline size_t bytes_read(void) const {
    return bytes_read_;
  }

 protected:
  /*!
   * \brief fetch the next chunk and parse it into data_, one page per slice
   * \return false when the source has no more chunks
   */
  inline bool FillData() {
    dmlc::InputSplit::Blob chunk;
    if (!source_->NextChunk(&chunk)) return false;
    int nthread = 1;
    #pragma omp parallel num_threads(nthread_)
    {
      // a single writer avoids concurrent stores to the shared variable
      #pragma omp master
      {
        nthread = omp_get_num_threads();
      }
    }
    // reserve space for data
    data_.resize(nthread);
    bytes_read_ += chunk.size;
    utils::Assert(chunk.size != 0, "LibSVMParser.FileData");
    char *head = reinterpret_cast<char*>(chunk.dptr);
    const size_t nstep = (chunk.size + nthread - 1) / nthread;
    // iterate over slices instead of relying on the thread id, so every
    // slice is parsed exactly once whatever team size the runtime grants
    #pragma omp parallel for schedule(static) num_threads(nthread_)
    for (int tid = 0; tid < nthread; ++tid) {
      const size_t sbegin =
          std::min(static_cast<size_t>(tid) * nstep, chunk.size);
      const size_t send =
          std::min((static_cast<size_t>(tid) + 1) * nstep, chunk.size);
      // adjacent slices compute the same cut, so they neither overlap
      // nor leave a gap
      char *pbegin = SplitPoint(head, sbegin, chunk.size);
      char *pend = SplitPoint(head, send, chunk.size);
      ParseBlock(pbegin, pend, &data_[tid]);
    }
    data_ptr_ = 0;
    return true;
  }
  /*!
   * \brief parse data into out
   *  a token of the form digits ':' value is stored as a feature entry,
   *  any other token starts a new row and is stored as its label
   * \param begin beginning of buffer
   * \param end end of buffer
   * \param out page that receives the parsed rows, cleared first
   */
  inline void ParseBlock(char *begin,
                         char *end,
                         LibSVMPage *out) {
    out->Clear();
    char *p = begin;
    while (p != end) {
      // the bound is always tested before the byte is read; the cast keeps
      // bytes >= 0x80 inside the domain of the <cctype> functions
      while (p != end && isspace(static_cast<unsigned char>(*p))) ++p;
      if (p == end) break;
      char *head = p;
      while (p != end && isdigit(static_cast<unsigned char>(*p))) ++p;
      const bool is_feature = (p != end && *p == ':');
      char *sep = p;
      // move p to the end of the token
      while (p != end && !isspace(static_cast<unsigned char>(*p))) ++p;
      if (is_feature) {
        out->data.push_back(SparseBatch::Entry(atol(head),
                                               ParseFloat(sep + 1, p, end)));
      } else {
        if (out->label.size() != 0) {
          out->offset.push_back(out->data.size());
        }
        out->label.push_back(ParseFloat(head, p, end));
      }
    }
    if (out->label.size() != 0) {
      out->offset.push_back(out->data.size());
    }
    utils::Check(out->label.size() + 1 == out->offset.size(),
                 "LibSVMParser inconsistent");
  }
  /*!
   * \brief convert the number starting at tok_begin
   *  A token that stops before block_end is followed by a whitespace
   *  byte, which terminates atof. A token that touches block_end is
   *  copied first so that atof never reads beyond the block.
   * \param tok_begin first byte to convert
   * \param tok_end one past the last byte of the token
   * \param block_end end of the block the token belongs to
   * \return the converted value
   */
  static inline double ParseFloat(const char *tok_begin,
                                  const char *tok_end,
                                  const char *block_end) {
    if (tok_end != block_end) return atof(tok_begin);
    return atof(std::string(tok_begin, tok_end).c_str());
  }
  /*!
   * \brief position at which the chunk is cut for byte offset pos
   * \param head beginning of the chunk
   * \param pos requested byte offset
   * \param size size of the chunk in bytes
   * \return head + size when pos reaches the end of the chunk (nothing is
   *  read there), otherwise the line break found by BackFindEndLine
   */
  static inline char* SplitPoint(char *head, size_t pos, size_t size) {
    if (pos >= size) return head + size;
    return BackFindEndLine(head + pos, head);
  }
  /*!
   * \brief start from bptr, go backward and find first end of line
   * \param bptr position to start from, must be readable
   * \param begin the beginning position of buffer
   * \return position of the first '\n' or '\r' found going backward,
   *  or begin if there is none
   */
  static inline char* BackFindEndLine(char *bptr,
                                      char *begin) {
    for (; bptr != begin; --bptr) {
      if (*bptr == '\n' || *bptr == '\r') return bptr;
    }
    return begin;
  }

 private:
  // number of threads requested for the parallel regions
  int nthread_;
  // number of bytes read
  size_t bytes_read_;
  // whether BeforeFirst is still allowed
  bool at_head_;
  // index of the next page to visit and number of pages available
  size_t data_ptr_, data_end_;
  // source split that provides the data, deleted in the destructor
  dmlc::InputSplit *source_;
  // internal data, one page per slice of the current chunk
  std::vector<LibSVMPage> data_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_LIBSVM_PARSER_H_

View File

@ -12,6 +12,7 @@
#include "./simple_fmatrix-inl.hpp"
#include "./sparse_batch_page.h"
#include "./page_fmatrix-inl.hpp"
#include "./libsvm_parser.h"
namespace xgboost {
namespace io {
@ -145,26 +146,25 @@ class DMatrixPageBase : public DataMatrix {
std::string fname_row = std::string(cache_file) + ".row.blob";
utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb"));
SparsePage page;
dmlc::InputSplit *in =
dmlc::InputSplit::Create(uri, rank, npart);
std::string line;
size_t bytes_write = 0;
double tstart = rabit::utils::GetTime();
LibSVMParser parser(
dmlc::InputSplit::Create(uri, rank, npart, "text"), 4);
info.Clear();
while (in->ReadRecord(&line)) {
float label;
std::istringstream ss(line);
std::vector<RowBatch::Entry> feats;
ss >> label;
while (!ss.eof()) {
RowBatch::Entry e;
if (!(ss >> e.index)) break;
ss.ignore(32, ':');
if (!(ss >> e.fvalue)) break;
feats.push_back(e);
while (parser.Next()) {
const LibSVMPage &batch = parser.Value();
size_t nlabel = info.labels.size();
info.labels.resize(nlabel + batch.label.size());
if (batch.label.size() != 0) {
std::memcpy(BeginPtr(info.labels) + nlabel,
BeginPtr(batch.label),
batch.label.size() * sizeof(float));
}
page.Push(batch);
for (size_t i = 0; i < batch.data.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(batch.data[i].index+1));
}
RowBatch::Inst row(BeginPtr(feats), feats.size());
page.Push(row);
if (page.MemCostBytes() >= kPageSize) {
bytes_write += page.MemCostBytes();
page.Save(&fo);
@ -176,18 +176,11 @@ class DMatrixPageBase : public DataMatrix {
(bytes_write >> 20UL));
}
}
for (size_t i = 0; i < feats.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(feats[i].index+1));
}
this->info.labels.push_back(label);
info.info.num_row += 1;
info.info.num_row += batch.label.size();
}
if (page.data.size() != 0) {
page.Save(&fo);
}
delete in;
fo.Close();
iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb")));
// save data matrix

View File

@ -19,6 +19,7 @@
#include "./io.h"
#include "./simple_fmatrix-inl.hpp"
#include "../sync/sync.h"
#include "./libsvm_parser.h"
namespace xgboost {
namespace io {
@ -72,7 +73,8 @@ class DMatrixSimple : public DataMatrix {
inline size_t AddRow(const std::vector<RowBatch::Entry> &feats) {
for (size_t i = 0; i < feats.size(); ++i) {
row_data_.push_back(feats[i]);
info.info.num_col = std::max(info.info.num_col, static_cast<size_t>(feats[i].index+1));
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(feats[i].index+1));
}
row_ptr_.push_back(row_ptr_.back() + feats.size());
info.info.num_row += 1;
@ -90,26 +92,35 @@ class DMatrixSimple : public DataMatrix {
rank = rabit::GetRank();
npart = rabit::GetWorldSize();
}
dmlc::InputSplit *in =
dmlc::InputSplit::Create(uri, rank, npart);
LibSVMParser parser(
dmlc::InputSplit::Create(uri, rank, npart, "text"), 4);
this->Clear();
std::string line;
while (in->ReadRecord(&line)) {
float label;
std::istringstream ss(line);
std::vector<RowBatch::Entry> feats;
ss >> label;
while (!ss.eof()) {
RowBatch::Entry e;
if (!(ss >> e.index)) break;
ss.ignore(32, ':');
if (!(ss >> e.fvalue)) break;
feats.push_back(e);
while (parser.Next()) {
const LibSVMPage &batch = parser.Value();
size_t nlabel = info.labels.size();
info.labels.resize(nlabel + batch.label.size());
if (batch.label.size() != 0) {
std::memcpy(BeginPtr(info.labels) + nlabel,
BeginPtr(batch.label),
batch.label.size() * sizeof(float));
}
info.labels.push_back(label);
this->AddRow(feats);
size_t ndata = row_data_.size();
row_data_.resize(ndata + batch.data.size());
if (batch.data.size() != 0) {
std::memcpy(BeginPtr(row_data_) + ndata,
BeginPtr(batch.data),
batch.data.size() * sizeof(RowBatch::Entry));
}
row_ptr_.resize(row_ptr_.size() + batch.label.size());
for (size_t i = 0; i < batch.label.size(); ++i) {
row_ptr_[nlabel + i + 1] = row_ptr_[nlabel] + batch.offset[i + 1];
}
info.info.num_row += batch.Size();
for (size_t i = 0; i < batch.data.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(batch.data[i].index+1));
}
}
delete in;
if (!silent) {
utils::Printf("%lux%lu matrix with %lu entries is loaded from %s\n",
static_cast<unsigned long>(info.num_row()),

View File

@ -150,7 +150,23 @@ class SparsePage {
size_t begin = offset.size();
offset.resize(offset.size() + batch.size);
for (size_t i = 0; i < batch.size; ++i) {
offset[i + begin] = top + batch.ind_ptr[i] - batch.ind_ptr[0];
offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0];
}
}
/*!
 * \brief Push a sparse page: append all rows of batch to this page
 *  Reads offset.back(), so offset must not be empty
 *  (presumably Clear() leaves a single leading entry - confirm).
 * \param batch the row page
 */
inline void Push(const SparsePage &batch) {
  const size_t top = offset.back();
  // grow by the number of entries, not by the number of rows:
  // the copy below transfers batch.data.size() entries
  const size_t nentry = batch.data.size();
  data.resize(top + nentry);
  // BeginPtr returns NULL for an empty vector, so skip the copy then
  if (nentry != 0) {
    std::memcpy(BeginPtr(data) + top,
                BeginPtr(batch.data),
                sizeof(SparseBatch::Entry) * nentry);
  }
  const size_t begin = offset.size();
  const size_t nrow = batch.Size();
  offset.resize(begin + nrow);
  for (size_t i = 0; i < nrow; ++i) {
    // shift the row ends of batch by the entries already stored here
    offset[i + begin] = top + batch.offset[i + 1];
  }
}
/*!

View File

@ -174,5 +174,13 @@ inline const T *BeginPtr(const std::vector<T> &vec) {
return &vec[0];
}
}
/*!
 * \brief get the beginning address of a string
 * \param str input string
 * \return pointer to the first character, NULL if the string is empty
 */
inline char* BeginPtr(std::string &str) {
  return str.empty() ? static_cast<char*>(NULL) : &str[0];
}
/*!
 * \brief get the beginning address of a constant string
 * \param str input string
 * \return pointer to the first character, NULL if the string is empty
 */
inline const char* BeginPtr(const std::string &str) {
  return str.empty() ? static_cast<const char*>(NULL) : str.data();
}
} // namespace xgboost
#endif // XGBOOST_UTILS_UTILS_H_

View File

@ -11,7 +11,6 @@
#include <istream>
#include <ostream>
#include <streambuf>
#include <cassert>
/*! \brief namespace for dmlc */
namespace dmlc {
@ -100,32 +99,71 @@ class Serializable {
};
/*!
* \brief input split header, used to create input split on input dataset
* this class can be used to obtain filesystem invariant splits from input files
* \brief input split that allows reading records
* from one part of the data; the parts are independent
* of each other and together cover the whole dataset
*
* see InputSplit::Create for definition of record
*/
class InputSplit {
public:
/*! \brief a blob of memory region */
struct Blob {
/*! \brief points to start of the memory region */
void *dptr;
/*! \brief size of the memory region */
size_t size;
};
/*!
* \brief read next record, store into out_data
* the data in outcomming record depends on the input data format
* if input is text data, each line is returned as a record (\n not included)
* if input is recordio, each record is returned
* \param out_data the string that stores the line data, \n is not included
* \return true of next line was found, false if we read all the lines
* \brief get the next record, the returning value
* is valid until next call to NextRecord or NextChunk
* caller can modify the memory content of out_rec
* \param out_rec used to store the result
* \return true if we can successfully get next record
* false if we reached end of split
* \sa InputSplit::Create for definition of record
*/
virtual bool ReadRecord(std::string *out_data) = 0;
virtual bool NextRecord(Blob *out_rec) = 0;
/*!
* \brief get a chunk of memory that can contain multiple records,
* the caller needs to parse the content of the resulting chunk,
* for text file, out_chunk can contain data of multiple lines
* for recordio, out_chunk can contain data of multiple records
*
* This function ensures there won't be partial record in the chunk
* caller can modify the memory content of out_chunk,
* the memory is valid until next call to NextRecord or NextChunk
*
* Usually NextRecord is sufficient, NextChunk can be used by some
* multi-threaded parsers to parse the input content
*
* \param out_chunk used to store the result
* \return true if we can successfully get next chunk
* false if we reached end of split
* \sa InputSplit::Create for definition of record
*/
virtual bool NextChunk(Blob *out_chunk) = 0;
/*! \brief destructor*/
virtual ~InputSplit(void) {}
virtual ~InputSplit(void) {}
/*!
* \brief factory function:
* create input split given a uri
* \param uri the uri of the input, can contain hdfs prefix
* \param part_index the part id of current input
* \param num_parts total number of splits
* \param type type of record
* List of possible types: "text", "recordio"
* - "text":
* text file, each line is treated as a record
* input split will split on \n or \r
* - "recordio":
* binary recordio file, see recordio.h
* \sa InputSplit::Type
*/
static InputSplit* Create(const char *uri,
unsigned part_index,
unsigned num_parts);
unsigned num_parts,
const char *type);
};
/*!
@ -172,7 +210,7 @@ class ostream : public std::basic_ostream<char> {
public:
explicit OutBuf(size_t buffer_size)
: stream_(NULL), buffer_(buffer_size) {
assert(buffer_.size() > 0);
if (buffer_size == 0) buffer_.resize(2);
}
// set stream to the buffer
inline void set_stream(Stream *stream);
@ -225,22 +263,32 @@ class istream : public std::basic_istream<char> {
buf_.set_stream(stream);
this->rdbuf(&buf_);
}
/*! \return how many bytes we read so far */
inline size_t bytes_read(void) const {
return buf_.bytes_read();
}
private:
// internal streambuf
class InBuf : public std::streambuf {
public:
explicit InBuf(size_t buffer_size)
: stream_(NULL), buffer_(buffer_size) {
assert(buffer_.size() > 0);
: stream_(NULL), bytes_read_(0),
buffer_(buffer_size) {
if (buffer_size == 0) buffer_.resize(2);
}
// set stream to the buffer
inline void set_stream(Stream *stream);
// return how many bytes read so far
inline size_t bytes_read(void) const {
return bytes_read_;
}
private:
/*! \brief internal stream by StreamBuf */
Stream *stream_;
/*! \brief internal buffer */
/*! \brief how many bytes we read so far */
size_t bytes_read_;
/*! \brief internal buffer */
std::vector<char> buffer_;
// override underflow
inline int_type underflow();
@ -322,6 +370,7 @@ inline int istream::InBuf::underflow() {
if (this->gptr() == this->egptr()) {
size_t sz = stream_->Read(bhead, buffer_.size());
this->setg(bhead, bhead, bhead + sz);
bytes_read_ += sz;
}
if (this->gptr() == this->egptr()) {
return traits_type::eof();