faster libsvm parser

This commit is contained in:
tqchen 2015-04-17 22:07:59 -07:00
parent c528c1e8e6
commit 788785f164
6 changed files with 348 additions and 62 deletions

View File

@ -8,45 +8,132 @@
namespace xgboost {
namespace utils {
/*!
* \brief line split implementation from single FILE
* simply returns lines of files, used for stdin
*/
class SingleFileSplit : public dmlc::InputSplit {
public:
explicit SingleFileSplit(const char *fname)
: use_stdin_(false) {
explicit SingleFileSplit(const char *fname)
: use_stdin_(false),
chunk_begin_(NULL), chunk_end_(NULL) {
if (!std::strcmp(fname, "stdin")) {
#ifndef XGBOOST_STRICT_CXX98_
use_stdin_ = true; fp_ = stdin;
#endif
}
if (!use_stdin_) {
fp_ = utils::FopenCheck(fname, "r");
fp_ = utils::FopenCheck(fname, "rb");
}
end_of_file_ = false;
buffer_.resize(kBufferSize);
}
virtual ~SingleFileSplit(void) {
if (!use_stdin_) std::fclose(fp_);
}
virtual bool ReadRecord(std::string *out_data) {
if (end_of_file_) return false;
out_data->clear();
while (true) {
char c = std::fgetc(fp_);
if (c == EOF) {
end_of_file_ = true;
virtual size_t Read(void *ptr, size_t size) {
return std::fread(ptr, 1, size, fp_);
}
virtual void Write(const void *ptr, size_t size) {
utils::Error("cannot do write in inputsplit");
}
virtual bool NextRecord(Blob *out_rec) {
if (chunk_begin_ == chunk_end_) {
if (!LoadChunk()) return false;
}
char *next = FindNextRecord(chunk_begin_,
chunk_end_);
out_rec->dptr = chunk_begin_;
out_rec->size = next - chunk_begin_;
chunk_begin_ = next;
return true;
}
virtual bool NextChunk(Blob *out_chunk) {
if (chunk_begin_ == chunk_end_) {
if (!LoadChunk()) return false;
}
out_chunk->dptr = chunk_begin_;
out_chunk->size = chunk_end_ - chunk_begin_;
chunk_begin_ = chunk_end_;
return true;
}
inline bool ReadChunk(void *buf, size_t *size) {
size_t max_size = *size;
if (max_size <= overflow_.length()) {
*size = 0; return true;
}
if (overflow_.length() != 0) {
std::memcpy(buf, BeginPtr(overflow_), overflow_.length());
}
size_t olen = overflow_.length();
overflow_.resize(0);
size_t nread = this->Read(reinterpret_cast<char*>(buf) + olen,
max_size - olen);
nread += olen;
if (nread == 0) return false;
if (nread != max_size) {
*size = nread;
return true;
} else {
const char *bptr = reinterpret_cast<const char*>(buf);
// return the last position where a record starts
const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size);
*size = bend - bptr;
overflow_.resize(max_size - *size);
if (overflow_.length() != 0) {
std::memcpy(BeginPtr(overflow_), bend, overflow_.length());
}
if (c != '\r' && c != '\n' && c != EOF) {
*out_data += c;
return true;
}
}
protected:
inline const char* FindLastRecordBegin(const char *begin,
const char *end) {
if (begin == end) return begin;
for (const char *p = end - 1; p != begin; --p) {
if (*p == '\n' || *p == '\r') return p + 1;
}
return begin;
}
inline char* FindNextRecord(char *begin, char *end) {
char *p;
for (p = begin; p != end; ++p) {
if (*p == '\n' || *p == '\r') break;
}
for (; p != end; ++p) {
if (*p != '\n' && *p != '\r') return p;
}
return end;
}
inline bool LoadChunk(void) {
while (true) {
size_t size = buffer_.length();
if (!ReadChunk(BeginPtr(buffer_), &size)) return false;
if (size == 0) {
buffer_.resize(buffer_.length() * 2);
} else {
if (out_data->length() != 0) return true;
if (end_of_file_) return false;
chunk_begin_ = reinterpret_cast<char *>(BeginPtr(buffer_));
chunk_end_ = chunk_begin_ + size;
break;
}
}
return false;
}
return true;
}
private:
// buffer size
static const size_t kBufferSize = 1 << 18UL;
// file
std::FILE *fp_;
bool use_stdin_;
bool end_of_file_;
// internal overflow
std::string overflow_;
// internal buffer
std::string buffer_;
// beginning of chunk
char *chunk_begin_;
// end of chunk
char *chunk_end_;
};
class StdFile : public dmlc::Stream {
@ -105,7 +192,8 @@ class StdFile : public dmlc::Stream {
namespace dmlc {
InputSplit* InputSplit::Create(const char *uri,
unsigned part,
unsigned nsplit) {
unsigned nsplit,
const char *type) {
using namespace xgboost;
const char *msg = "xgboost is compiled in local mode\n"\
"to use hdfs, s3 or distributed version, compile with make dmlc=1";

168
src/io/libsvm_parser.h Normal file
View File

@ -0,0 +1,168 @@
/*!
* Copyright (c) 2015 by Contributors
* \file libsvm_parser.h
* \brief iterator parser to parse libsvm format
* \author Tianqi Chen
*/
#ifndef XGBOOST_IO_LIBSVM_PARSER_H_
#define XGBOOST_IO_LIBSVM_PARSER_H_
#include <vector>
#include <cstring>
#include <cctype>
#include "../utils/omp.h"
#include "../utils/utils.h"
#include "../sync/sync.h"
#include "./sparse_batch_page.h"
namespace xgboost {
namespace io {
/*! \brief page returned by libsvm parser */
struct LibSVMPage : public SparsePage {
std::vector<float> label;
// overload clear
inline void Clear() {
SparsePage::Clear();
label.clear();
}
};
/*!
* \brief libsvm parser that parses the input lines
* and returns rows in input data
*/
class LibSVMParser : public utils::IIterator<LibSVMPage> {
public:
explicit LibSVMParser(dmlc::InputSplit *source,
int nthread)
: bytes_read_(0), at_head_(true),
data_ptr_(0), data_end_(0), source_(source) {
int maxthread;
#pragma omp parallel
{
maxthread = omp_get_num_threads();
}
maxthread = std::max(maxthread / 2, 1);
nthread_ = std::min(maxthread, nthread);
}
virtual ~LibSVMParser() {
delete source_;
}
virtual void BeforeFirst(void) {
utils::Assert(at_head_, "cannot call BeforeFirst");
}
virtual const LibSVMPage &Value(void) const {
return data_[data_ptr_ - 1];
}
virtual bool Next(void) {
while (true) {
while (data_ptr_ < data_end_) {
data_ptr_ += 1;
if (data_[data_ptr_ - 1].Size() != 0) {
return true;
}
}
if (!FillData()) break;
data_ptr_ = 0; data_end_ = data_.size();
}
return false;
}
inline size_t bytes_read(void) const {
return bytes_read_;
}
protected:
inline bool FillData() {
dmlc::InputSplit::Blob chunk;
if (!source_->NextChunk(&chunk)) return false;
int nthread;
#pragma omp parallel num_threads(nthread_)
{
nthread = omp_get_num_threads();
}
// reserve space for data
data_.resize(nthread);
bytes_read_ += chunk.size;
utils::Assert(chunk.size != 0, "LibSVMParser.FileData");
char *head = reinterpret_cast<char*>(chunk.dptr);
#pragma omp parallel num_threads(nthread_)
{
// threadid
int tid = omp_get_thread_num();
size_t nstep = (chunk.size + nthread - 1) / nthread;
size_t sbegin = std::min(tid * nstep, chunk.size);
size_t send = std::min((tid + 1) * nstep, chunk.size);
char *pbegin = BackFindEndLine(head + sbegin, head);
char *pend;
if (tid + 1 == nthread) {
pend = head + send;
} else {
pend = BackFindEndLine(head + send, head);
}
ParseBlock(pbegin, pend, &data_[tid]);
}
data_ptr_ = 0;
return true;
}
/*!
* \brief parse data into out
* \param begin beginning of buffer
* \param end end of buffer
*/
inline void ParseBlock(char *begin,
char *end,
LibSVMPage *out) {
out->Clear();
char *p = begin;
while (p != end) {
while (isspace(*p) && p != end) ++p;
if (p == end) break;
char *head = p;
while (isdigit(*p) && p != end) ++p;
if (*p == ':') {
out->data.push_back(SparseBatch::Entry(atol(head),
atof(p + 1)));
} else {
if (out->label.size() != 0) {
out->offset.push_back(out->data.size());
}
out->label.push_back(atof(head));
}
while (!isspace(*p) && p != end) ++p;
}
if (out->label.size() != 0) {
out->offset.push_back(out->data.size());
}
utils::Check(out->label.size() + 1 == out->offset.size(),
"LibSVMParser inconsistent");
}
/*!
* \brief start from bptr, go backward and find first endof line
* \param bptr end position to go backward
* \param begin the beginning position of buffer
* \return position of first endof line going backward
*/
inline char* BackFindEndLine(char *bptr,
char *begin) {
for (; bptr != begin; --bptr) {
if (*bptr == '\n' || *bptr == '\r') return bptr;
}
return begin;
}
private:
// nthread
int nthread_;
// number of bytes readed
size_t bytes_read_;
// at beginning, at end of stream
bool at_head_;
// pointer to begin and end of data
size_t data_ptr_, data_end_;
// source split that provides the data
dmlc::InputSplit *source_;
// internal data
std::vector<LibSVMPage> data_;
};
} // namespace io
} // namespace xgboost
#endif // XGBOOST_IO_LIBSVM_PARSER_H_

View File

@ -12,6 +12,7 @@
#include "./simple_fmatrix-inl.hpp"
#include "./sparse_batch_page.h"
#include "./page_fmatrix-inl.hpp"
#include "./libsvm_parser.h"
namespace xgboost {
namespace io {
@ -143,38 +144,32 @@ class DMatrixPageBase : public DataMatrix {
std::string fname_row = std::string(cache_file) + ".row.blob";
utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb"));
SparsePage page;
dmlc::InputSplit *in =
dmlc::InputSplit::Create(uri, rank, npart);
std::string line;
LibSVMParser parser(
dmlc::InputSplit::Create(uri, rank, npart, "text"), 4);
info.Clear();
while (in->ReadRecord(&line)) {
float label;
std::istringstream ss(line);
std::vector<RowBatch::Entry> feats;
ss >> label;
while (!ss.eof()) {
RowBatch::Entry e;
if (!(ss >> e.index)) break;
ss.ignore(32, ':');
if (!(ss >> e.fvalue)) break;
feats.push_back(e);
while (parser.Next()) {
const LibSVMPage &batch = parser.Value();
size_t nlabel = info.labels.size();
info.labels.resize(nlabel + batch.label.size());
if (batch.label.size() != 0) {
std::memcpy(BeginPtr(info.labels) + nlabel,
BeginPtr(batch.label),
batch.label.size() * sizeof(float));
}
page.Push(batch);
for (size_t i = 0; i < batch.data.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(batch.data[i].index+1));
}
RowBatch::Inst row(BeginPtr(feats), feats.size());
page.Push(row);
if (page.MemCostBytes() >= kPageSize) {
page.Save(&fo); page.Clear();
}
for (size_t i = 0; i < feats.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(feats[i].index+1));
}
this->info.labels.push_back(label);
info.info.num_row += 1;
info.info.num_row += batch.label.size();
}
if (page.data.size() != 0) {
page.Save(&fo);
}
delete in;
fo.Close();
iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb")));
// save data matrix

View File

@ -19,6 +19,7 @@
#include "./io.h"
#include "./simple_fmatrix-inl.hpp"
#include "../sync/sync.h"
#include "./libsvm_parser.h"
namespace xgboost {
namespace io {
@ -72,7 +73,8 @@ class DMatrixSimple : public DataMatrix {
inline size_t AddRow(const std::vector<RowBatch::Entry> &feats) {
for (size_t i = 0; i < feats.size(); ++i) {
row_data_.push_back(feats[i]);
info.info.num_col = std::max(info.info.num_col, static_cast<size_t>(feats[i].index+1));
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(feats[i].index+1));
}
row_ptr_.push_back(row_ptr_.back() + feats.size());
info.info.num_row += 1;
@ -90,26 +92,35 @@ class DMatrixSimple : public DataMatrix {
rank = rabit::GetRank();
npart = rabit::GetWorldSize();
}
dmlc::InputSplit *in =
dmlc::InputSplit::Create(uri, rank, npart);
LibSVMParser parser(
dmlc::InputSplit::Create(uri, rank, npart, "text"), 4);
this->Clear();
std::string line;
while (in->ReadRecord(&line)) {
float label;
std::istringstream ss(line);
std::vector<RowBatch::Entry> feats;
ss >> label;
while (!ss.eof()) {
RowBatch::Entry e;
if (!(ss >> e.index)) break;
ss.ignore(32, ':');
if (!(ss >> e.fvalue)) break;
feats.push_back(e);
while (parser.Next()) {
const LibSVMPage &batch = parser.Value();
size_t nlabel = info.labels.size();
info.labels.resize(nlabel + batch.label.size());
if (batch.label.size() != 0) {
std::memcpy(BeginPtr(info.labels) + nlabel,
BeginPtr(batch.label),
batch.label.size() * sizeof(float));
}
info.labels.push_back(label);
this->AddRow(feats);
size_t ndata = row_data_.size();
row_data_.resize(ndata + batch.data.size());
if (batch.data.size() != 0) {
std::memcpy(BeginPtr(row_data_) + ndata,
BeginPtr(batch.data),
batch.data.size() * sizeof(RowBatch::Entry));
}
row_ptr_.resize(row_ptr_.size() + batch.label.size());
for (size_t i = 0; i < batch.label.size(); ++i) {
row_ptr_[nlabel + i + 1] = row_ptr_[nlabel] + batch.offset[i + 1];
}
info.info.num_row += batch.Size();
for (size_t i = 0; i < batch.data.size(); ++i) {
info.info.num_col = std::max(info.info.num_col,
static_cast<size_t>(batch.data[i].index+1));
}
}
delete in;
if (!silent) {
utils::Printf("%lux%lu matrix with %lu entries is loaded from %s\n",
static_cast<unsigned long>(info.num_row()),

View File

@ -150,7 +150,23 @@ class SparsePage {
size_t begin = offset.size();
offset.resize(offset.size() + batch.size);
for (size_t i = 0; i < batch.size; ++i) {
offset[i + begin] = top + batch.ind_ptr[i] - batch.ind_ptr[0];
offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0];
}
}
/*!
* \brief Push a sparse page
* \param batch the row page
*/
inline void Push(const SparsePage &batch) {
data.resize(offset.back() + batch.Size());
std::memcpy(BeginPtr(data) + offset.back(),
BeginPtr(batch.data),
sizeof(SparseBatch::Entry) * batch.data.size());
size_t top = offset.back();
size_t begin = offset.size();
offset.resize(offset.size() + batch.Size());
for (size_t i = 0; i < batch.Size(); ++i) {
offset[i + begin] = top + batch.offset[i + 1];
}
}
/*!

View File

@ -174,5 +174,13 @@ inline const T *BeginPtr(const std::vector<T> &vec) {
return &vec[0];
}
}
inline char* BeginPtr(std::string &str) {
if (str.length() == 0) return NULL;
return &str[0];
}
inline const char* BeginPtr(const std::string &str) {
if (str.length() == 0) return NULL;
return &str[0];
}
} // namespace xgboost
#endif // XGBOOST_UTILS_UTILS_H_