From 6bc5d6f0b44b957cc9f0d0b1fe5d420b0b59b8e2 Mon Sep 17 00:00:00 2001 From: tqchen Date: Fri, 17 Apr 2015 21:07:33 -0700 Subject: [PATCH 1/2] Squashed 'subtree/rabit/' changes from 3bf8661..7568f75 7568f75 new io interface git-subtree-dir: subtree/rabit git-subtree-split: 7568f75f450232b81536914cb4f083c92b5752ec --- include/dmlc/io.h | 85 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 18 deletions(-) diff --git a/include/dmlc/io.h b/include/dmlc/io.h index bd9e127a0..4feeb30d2 100644 --- a/include/dmlc/io.h +++ b/include/dmlc/io.h @@ -11,7 +11,6 @@ #include #include #include -#include /*! \brief namespace for dmlc */ namespace dmlc { @@ -100,32 +99,71 @@ class Serializable { }; /*! - * \brief input split header, used to create input split on input dataset - * this class can be used to obtain filesystem invariant splits from input files + * \brief input split creates that allows reading + * of records from split of data, + * independent part that covers all the dataset + * + * see InputSplit::Create for definition of record */ class InputSplit { public: + /*! \brief a blob of memory region */ + struct Blob { + /*! \brief points to start of the memory region */ + void *dptr; + /*! \brief size of the memory region */ + size_t size; + }; /*! - * \brief read next record, store into out_data - * the data in outcomming record depends on the input data format - * if input is text data, each line is returned as a record (\n not included) - * if input is recordio, each record is returned - * \param out_data the string that stores the line data, \n is not included - * \return true of next line was found, false if we read all the lines + * \brief get the next record, the returning value + * is valid until next call to NextRecord or NextChunk + * caller can modify the memory content of out_rec + * \param out_rec used to store the result + * \return true if we can successfully get next record + * false if we reached end of split + * \sa InputSplit::Create for definition of record */ - virtual bool ReadRecord(std::string *out_data) = 0; + virtual bool NextRecord(Blob *out_rec) = 0; + /*! + * \brief get a chunk of memory that can contain multiple records, + * the caller needs to parse the content of the resulting chunk, + * for text file, out_chunk can contain data of multiple lines + * for recordio, out_chunk can contain data of multiple records + * + * This function ensures there won't be partial record in the chunk + * caller can modify the memory content of out_chunk, + * the memory is valid until next call to NextRecord or NextChunk + * + * Usually NextRecord is sufficient, NextChunk can be used by some + * multi-threaded parsers to parse the input content + * + * \param out_chunk used to store the result + * \return true if we can successfully get next record + * false if we reached end of split + * \sa InputSplit::Create for definition of record + */ + virtual bool NextChunk(Blob *out_chunk) = 0; /*! \brief destructor*/ - virtual ~InputSplit(void) {} + virtual ~InputSplit(void) {} /*! * \brief factory function: * create input split given a uri * \param uri the uri of the input, can contain hdfs prefix * \param part_index the part id of current input * \param num_parts total number of splits + * \param type type of record + * List of possible types: "text", "recordio" + * - "text": + * text file, each line is treated as a record + * input split will split on \n or \r + * - "recordio": + * binary recordio file, see recordio.h + * \sa InputSplit::Type */ static InputSplit* Create(const char *uri, unsigned part_index, - unsigned num_parts); + unsigned num_parts, + const char *type); }; /*! @@ -172,7 +210,7 @@ class ostream : public std::basic_ostream { public: explicit OutBuf(size_t buffer_size) : stream_(NULL), buffer_(buffer_size) { - assert(buffer_.size() > 0); + if (buffer_size == 0) buffer_.resize(2); } // set stream to the buffer inline void set_stream(Stream *stream); @@ -225,22 +263,32 @@ class istream : public std::basic_istream { buf_.set_stream(stream); this->rdbuf(&buf_); } - + /*! \return how many bytes we read so far */ + inline size_t bytes_read(void) const { + return buf_.bytes_read(); + } + private: // internal streambuf class InBuf : public std::streambuf { public: explicit InBuf(size_t buffer_size) - : stream_(NULL), buffer_(buffer_size) { - assert(buffer_.size() > 0); + : stream_(NULL), bytes_read_(0), + buffer_(buffer_size) { + if (buffer_size == 0) buffer_.resize(2); } // set stream to the buffer inline void set_stream(Stream *stream); - + // return how many bytes read so far + inline size_t bytes_read(void) const { + return bytes_read_; + } private: /*! \brief internal stream by StreamBuf */ Stream *stream_; - /*! \brief internal buffer */ + /*! \brief how many bytes we read so far */ + size_t bytes_read_; + /*! \brief internal buffer */ std::vector buffer_; // override underflow inline int_type underflow(); @@ -322,6 +370,7 @@ inline int istream::InBuf::underflow() { if (this->gptr() == this->egptr()) { size_t sz = stream_->Read(bhead, buffer_.size()); this->setg(bhead, bhead, bhead + sz); + bytes_read_ += sz; } if (this->gptr() == this->egptr()) { return traits_type::eof(); From 788785f164a3e465a44e5d33021031d2773cec7c Mon Sep 17 00:00:00 2001 From: tqchen Date: Fri, 17 Apr 2015 22:07:59 -0700 Subject: [PATCH 2/2] faster libsvm parser --- src/io/dmlc_simple.cpp | 128 ++++++++++++++++++++++---- src/io/libsvm_parser.h | 168 ++++++++++++++++++++++++++++++++++ src/io/page_dmatrix-inl.hpp | 41 ++++----- src/io/simple_dmatrix-inl.hpp | 47 ++++++---- src/io/sparse_batch_page.h | 18 +++- src/utils/utils.h | 8 ++ 6 files changed, 348 insertions(+), 62 deletions(-) create mode 100644 src/io/libsvm_parser.h diff --git a/src/io/dmlc_simple.cpp b/src/io/dmlc_simple.cpp index 4296d5caa..3138dae94 100644 --- a/src/io/dmlc_simple.cpp +++ b/src/io/dmlc_simple.cpp @@ -8,45 +8,132 @@ namespace xgboost { namespace utils { +/*! + * \brief line split implementation from single FILE + * simply returns lines of files, used for stdin + */ class SingleFileSplit : public dmlc::InputSplit { public: - explicit SingleFileSplit(const char *fname) - : use_stdin_(false) { + explicit SingleFileSplit(const char *fname) + : use_stdin_(false), + chunk_begin_(NULL), chunk_end_(NULL) { if (!std::strcmp(fname, "stdin")) { #ifndef XGBOOST_STRICT_CXX98_ use_stdin_ = true; fp_ = stdin; #endif } if (!use_stdin_) { - fp_ = utils::FopenCheck(fname, "r"); + fp_ = utils::FopenCheck(fname, "rb"); } - end_of_file_ = false; + buffer_.resize(kBufferSize); } virtual ~SingleFileSplit(void) { if (!use_stdin_) std::fclose(fp_); } - virtual bool ReadRecord(std::string *out_data) { - if (end_of_file_) return false; - out_data->clear(); - while (true) { - char c = std::fgetc(fp_); - if (c == EOF) { - end_of_file_ = true; + virtual size_t Read(void *ptr, size_t size) { + return std::fread(ptr, 1, size, fp_); + } + virtual void Write(const void *ptr, size_t size) { + utils::Error("cannot do write in inputsplit"); + } + virtual bool NextRecord(Blob *out_rec) { + if (chunk_begin_ == chunk_end_) { + if (!LoadChunk()) return false; + } + char *next = FindNextRecord(chunk_begin_, + chunk_end_); + out_rec->dptr = chunk_begin_; + out_rec->size = next - chunk_begin_; + chunk_begin_ = next; + return true; + } + virtual bool NextChunk(Blob *out_chunk) { + if (chunk_begin_ == chunk_end_) { + if (!LoadChunk()) return false; + } + out_chunk->dptr = chunk_begin_; + out_chunk->size = chunk_end_ - chunk_begin_; + chunk_begin_ = chunk_end_; + return true; + } + inline bool ReadChunk(void *buf, size_t *size) { + size_t max_size = *size; + if (max_size <= overflow_.length()) { + *size = 0; return true; + } + if (overflow_.length() != 0) { + std::memcpy(buf, BeginPtr(overflow_), overflow_.length()); + } + size_t olen = overflow_.length(); + overflow_.resize(0); + size_t nread = this->Read(reinterpret_cast(buf) + olen, + max_size - olen); + nread += olen; + if (nread == 0) return false; + if (nread != max_size) { + *size = nread; + return true; + } else { + const char *bptr = reinterpret_cast(buf); + // return the last position where a record starts + const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size); + *size = bend - bptr; + overflow_.resize(max_size - *size); + if (overflow_.length() != 0) { + std::memcpy(BeginPtr(overflow_), bend, overflow_.length()); } - if (c != '\r' && c != '\n' && c != EOF) { - *out_data += c; + return true; + } + } + + protected: + inline const char* FindLastRecordBegin(const char *begin, + const char *end) { + if (begin == end) return begin; + for (const char *p = end - 1; p != begin; --p) { + if (*p == '\n' || *p == '\r') return p + 1; + } + return begin; + } + inline char* FindNextRecord(char *begin, char *end) { + char *p; + for (p = begin; p != end; ++p) { + if (*p == '\n' || *p == '\r') break; + } + for (; p != end; ++p) { + if (*p != '\n' && *p != '\r') return p; + } + return end; + } + inline bool LoadChunk(void) { + while (true) { + size_t size = buffer_.length(); + if (!ReadChunk(BeginPtr(buffer_), &size)) return false; + if (size == 0) { + buffer_.resize(buffer_.length() * 2); } else { - if (out_data->length() != 0) return true; - if (end_of_file_) return false; + chunk_begin_ = reinterpret_cast(BeginPtr(buffer_)); + chunk_end_ = chunk_begin_ + size; + break; } } - return false; - } - + return true; + } + private: + // buffer size + static const size_t kBufferSize = 1 << 18UL; + // file std::FILE *fp_; bool use_stdin_; - bool end_of_file_; + // internal overflow + std::string overflow_; + // internal buffer + std::string buffer_; + // beginning of chunk + char *chunk_begin_; + // end of chunk + char *chunk_end_; }; class StdFile : public dmlc::Stream { @@ -105,7 +192,8 @@ class StdFile : public dmlc::Stream { namespace dmlc { InputSplit* InputSplit::Create(const char *uri, unsigned part, - unsigned nsplit) { + unsigned nsplit, + const char *type) { using namespace xgboost; const char *msg = "xgboost is compiled in local mode\n"\ "to use hdfs, s3 or distributed version, compile with make dmlc=1"; diff --git a/src/io/libsvm_parser.h b/src/io/libsvm_parser.h new file mode 100644 index 000000000..6767d61be --- /dev/null +++ b/src/io/libsvm_parser.h @@ -0,0 +1,168 @@ +/*! + * Copyright (c) 2015 by Contributors + * \file libsvm_parser.h + * \brief iterator parser to parse libsvm format + * \author Tianqi Chen + */ +#ifndef XGBOOST_IO_LIBSVM_PARSER_H_ +#define XGBOOST_IO_LIBSVM_PARSER_H_ + +#include +#include +#include +#include "../utils/omp.h" +#include "../utils/utils.h" +#include "../sync/sync.h" +#include "./sparse_batch_page.h" + +namespace xgboost { +namespace io { +/*! \brief page returned by libsvm parser */ +struct LibSVMPage : public SparsePage { + std::vector label; + // overload clear + inline void Clear() { + SparsePage::Clear(); + label.clear(); + } +}; +/*! + * \brief libsvm parser that parses the input lines + * and returns rows in input data + */ +class LibSVMParser : public utils::IIterator { + public: + explicit LibSVMParser(dmlc::InputSplit *source, + int nthread) + : bytes_read_(0), at_head_(true), + data_ptr_(0), data_end_(0), source_(source) { + int maxthread; + #pragma omp parallel + { + maxthread = omp_get_num_threads(); + } + maxthread = std::max(maxthread / 2, 1); + nthread_ = std::min(maxthread, nthread); + } + virtual ~LibSVMParser() { + delete source_; + } + virtual void BeforeFirst(void) { + utils::Assert(at_head_, "cannot call BeforeFirst"); + } + virtual const LibSVMPage &Value(void) const { + return data_[data_ptr_ - 1]; + } + virtual bool Next(void) { + while (true) { + while (data_ptr_ < data_end_) { + data_ptr_ += 1; + if (data_[data_ptr_ - 1].Size() != 0) { + return true; + } + } + if (!FillData()) break; + data_ptr_ = 0; data_end_ = data_.size(); + } + return false; + } + inline size_t bytes_read(void) const { + return bytes_read_; + } + + protected: + inline bool FillData() { + dmlc::InputSplit::Blob chunk; + if (!source_->NextChunk(&chunk)) return false; + int nthread; + #pragma omp parallel num_threads(nthread_) + { + nthread = omp_get_num_threads(); + } + // reserve space for data + data_.resize(nthread); + bytes_read_ += chunk.size; + utils::Assert(chunk.size != 0, "LibSVMParser.FileData"); + char *head = reinterpret_cast(chunk.dptr); + #pragma omp parallel num_threads(nthread_) + { + // threadid + int tid = omp_get_thread_num(); + size_t nstep = (chunk.size + nthread - 1) / nthread; + size_t sbegin = std::min(tid * nstep, chunk.size); + size_t send = std::min((tid + 1) * nstep, chunk.size); + char *pbegin = BackFindEndLine(head + sbegin, head); + char *pend; + if (tid + 1 == nthread) { + pend = head + send; + } else { + pend = BackFindEndLine(head + send, head); + } + ParseBlock(pbegin, pend, &data_[tid]); + } + data_ptr_ = 0; + return true; + } + /*! + * \brief parse data into out + * \param begin beginning of buffer + * \param end end of buffer + */ + inline void ParseBlock(char *begin, + char *end, + LibSVMPage *out) { + out->Clear(); + char *p = begin; + while (p != end) { + while (isspace(*p) && p != end) ++p; + if (p == end) break; + char *head = p; + while (isdigit(*p) && p != end) ++p; + if (*p == ':') { + out->data.push_back(SparseBatch::Entry(atol(head), + atof(p + 1))); + } else { + if (out->label.size() != 0) { + out->offset.push_back(out->data.size()); + } + out->label.push_back(atof(head)); + } + while (!isspace(*p) && p != end) ++p; + } + if (out->label.size() != 0) { + out->offset.push_back(out->data.size()); + } + utils::Check(out->label.size() + 1 == out->offset.size(), + "LibSVMParser inconsistent"); + } + /*! + * \brief start from bptr, go backward and find first endof line + * \param bptr end position to go backward + * \param begin the beginning position of buffer + * \return position of first endof line going backward + */ + inline char* BackFindEndLine(char *bptr, + char *begin) { + for (; bptr != begin; --bptr) { + if (*bptr == '\n' || *bptr == '\r') return bptr; + } + return begin; + } + + private: + // nthread + int nthread_; + // number of bytes readed + size_t bytes_read_; + // at beginning, at end of stream + bool at_head_; + // pointer to begin and end of data + size_t data_ptr_, data_end_; + // source split that provides the data + dmlc::InputSplit *source_; + // internal data + std::vector data_; +}; +} // namespace io +} // namespace xgboost +#endif // XGBOOST_IO_LIBSVM_PARSER_H_ diff --git a/src/io/page_dmatrix-inl.hpp b/src/io/page_dmatrix-inl.hpp index 03f0d5ca8..047d589a1 100644 --- a/src/io/page_dmatrix-inl.hpp +++ b/src/io/page_dmatrix-inl.hpp @@ -12,6 +12,7 @@ #include "./simple_fmatrix-inl.hpp" #include "./sparse_batch_page.h" #include "./page_fmatrix-inl.hpp" +#include "./libsvm_parser.h" namespace xgboost { namespace io { @@ -143,38 +144,32 @@ class DMatrixPageBase : public DataMatrix { std::string fname_row = std::string(cache_file) + ".row.blob"; utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb")); SparsePage page; - dmlc::InputSplit *in = - dmlc::InputSplit::Create(uri, rank, npart); - std::string line; + + LibSVMParser parser( + dmlc::InputSplit::Create(uri, rank, npart, "text"), 4); info.Clear(); - while (in->ReadRecord(&line)) { - float label; - std::istringstream ss(line); - std::vector feats; - ss >> label; - while (!ss.eof()) { - RowBatch::Entry e; - if (!(ss >> e.index)) break; - ss.ignore(32, ':'); - if (!(ss >> e.fvalue)) break; - feats.push_back(e); + while (parser.Next()) { + const LibSVMPage &batch = parser.Value(); + size_t nlabel = info.labels.size(); + info.labels.resize(nlabel + batch.label.size()); + if (batch.label.size() != 0) { + std::memcpy(BeginPtr(info.labels) + nlabel, + BeginPtr(batch.label), + batch.label.size() * sizeof(float)); + } + page.Push(batch); + for (size_t i = 0; i < batch.data.size(); ++i) { + info.info.num_col = std::max(info.info.num_col, + static_cast(batch.data[i].index+1)); } - RowBatch::Inst row(BeginPtr(feats), feats.size()); - page.Push(row); if (page.MemCostBytes() >= kPageSize) { page.Save(&fo); page.Clear(); } - for (size_t i = 0; i < feats.size(); ++i) { - info.info.num_col = std::max(info.info.num_col, - static_cast(feats[i].index+1)); - } - this->info.labels.push_back(label); - info.info.num_row += 1; + info.info.num_row += batch.label.size(); } if (page.data.size() != 0) { page.Save(&fo); } - delete in; fo.Close(); iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb"))); // save data matrix diff --git a/src/io/simple_dmatrix-inl.hpp b/src/io/simple_dmatrix-inl.hpp index e0ea7899e..010bf9893 100644 --- a/src/io/simple_dmatrix-inl.hpp +++ b/src/io/simple_dmatrix-inl.hpp @@ -19,6 +19,7 @@ #include "./io.h" #include "./simple_fmatrix-inl.hpp" #include "../sync/sync.h" +#include "./libsvm_parser.h" namespace xgboost { namespace io { @@ -72,7 +73,8 @@ class DMatrixSimple : public DataMatrix { inline size_t AddRow(const std::vector &feats) { for (size_t i = 0; i < feats.size(); ++i) { row_data_.push_back(feats[i]); - info.info.num_col = std::max(info.info.num_col, static_cast(feats[i].index+1)); + info.info.num_col = std::max(info.info.num_col, + static_cast(feats[i].index+1)); } row_ptr_.push_back(row_ptr_.back() + feats.size()); info.info.num_row += 1; @@ -90,26 +92,35 @@ class DMatrixSimple : public DataMatrix { rank = rabit::GetRank(); npart = rabit::GetWorldSize(); } - dmlc::InputSplit *in = - dmlc::InputSplit::Create(uri, rank, npart); + LibSVMParser parser( + dmlc::InputSplit::Create(uri, rank, npart, "text"), 4); this->Clear(); - std::string line; - while (in->ReadRecord(&line)) { - float label; - std::istringstream ss(line); - std::vector feats; - ss >> label; - while (!ss.eof()) { - RowBatch::Entry e; - if (!(ss >> e.index)) break; - ss.ignore(32, ':'); - if (!(ss >> e.fvalue)) break; - feats.push_back(e); + while (parser.Next()) { + const LibSVMPage &batch = parser.Value(); + size_t nlabel = info.labels.size(); + info.labels.resize(nlabel + batch.label.size()); + if (batch.label.size() != 0) { + std::memcpy(BeginPtr(info.labels) + nlabel, + BeginPtr(batch.label), + batch.label.size() * sizeof(float)); } - info.labels.push_back(label); - this->AddRow(feats); + size_t ndata = row_data_.size(); + row_data_.resize(ndata + batch.data.size()); + if (batch.data.size() != 0) { + std::memcpy(BeginPtr(row_data_) + ndata, + BeginPtr(batch.data), + batch.data.size() * sizeof(RowBatch::Entry)); + } + row_ptr_.resize(row_ptr_.size() + batch.label.size()); + for (size_t i = 0; i < batch.label.size(); ++i) { + row_ptr_[nlabel + i + 1] = row_ptr_[nlabel] + batch.offset[i + 1]; + } + info.info.num_row += batch.Size(); + for (size_t i = 0; i < batch.data.size(); ++i) { + info.info.num_col = std::max(info.info.num_col, + static_cast(batch.data[i].index+1)); + } } - delete in; if (!silent) { utils::Printf("%lux%lu matrix with %lu entries is loaded from %s\n", static_cast(info.num_row()), diff --git a/src/io/sparse_batch_page.h b/src/io/sparse_batch_page.h index 756bd1034..61155f380 100644 --- a/src/io/sparse_batch_page.h +++ b/src/io/sparse_batch_page.h @@ -150,7 +150,23 @@ class SparsePage { size_t begin = offset.size(); offset.resize(offset.size() + batch.size); for (size_t i = 0; i < batch.size; ++i) { - offset[i + begin] = top + batch.ind_ptr[i] - batch.ind_ptr[0]; + offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0]; + } + } + /*! + * \brief Push a sparse page + * \param batch the row page + */ + inline void Push(const SparsePage &batch) { + data.resize(offset.back() + batch.Size()); + std::memcpy(BeginPtr(data) + offset.back(), + BeginPtr(batch.data), + sizeof(SparseBatch::Entry) * batch.data.size()); + size_t top = offset.back(); + size_t begin = offset.size(); + offset.resize(offset.size() + batch.Size()); + for (size_t i = 0; i < batch.Size(); ++i) { + offset[i + begin] = top + batch.offset[i + 1]; } } /*! diff --git a/src/utils/utils.h b/src/utils/utils.h index afe17f64c..7aee5a29a 100644 --- a/src/utils/utils.h +++ b/src/utils/utils.h @@ -174,5 +174,13 @@ inline const T *BeginPtr(const std::vector &vec) { return &vec[0]; } } +inline char* BeginPtr(std::string &str) { + if (str.length() == 0) return NULL; + return &str[0]; +} +inline const char* BeginPtr(const std::string &str) { + if (str.length() == 0) return NULL; + return &str[0]; +} } // namespace xgboost #endif // XGBOOST_UTILS_UTILS_H_