diff --git a/src/io/dmlc_simple.cpp b/src/io/dmlc_simple.cpp index 4296d5caa..3138dae94 100644 --- a/src/io/dmlc_simple.cpp +++ b/src/io/dmlc_simple.cpp @@ -8,45 +8,132 @@ namespace xgboost { namespace utils { +/*! + * \brief line split implementation from single FILE + * simply returns lines of files, used for stdin + */ class SingleFileSplit : public dmlc::InputSplit { public: - explicit SingleFileSplit(const char *fname) - : use_stdin_(false) { + explicit SingleFileSplit(const char *fname) + : use_stdin_(false), + chunk_begin_(NULL), chunk_end_(NULL) { if (!std::strcmp(fname, "stdin")) { #ifndef XGBOOST_STRICT_CXX98_ use_stdin_ = true; fp_ = stdin; #endif } if (!use_stdin_) { - fp_ = utils::FopenCheck(fname, "r"); + fp_ = utils::FopenCheck(fname, "rb"); } - end_of_file_ = false; + buffer_.resize(kBufferSize); } virtual ~SingleFileSplit(void) { if (!use_stdin_) std::fclose(fp_); } - virtual bool ReadRecord(std::string *out_data) { - if (end_of_file_) return false; - out_data->clear(); - while (true) { - char c = std::fgetc(fp_); - if (c == EOF) { - end_of_file_ = true; + virtual size_t Read(void *ptr, size_t size) { + return std::fread(ptr, 1, size, fp_); + } + virtual void Write(const void *ptr, size_t size) { + utils::Error("cannot do write in inputsplit"); + } + virtual bool NextRecord(Blob *out_rec) { + if (chunk_begin_ == chunk_end_) { + if (!LoadChunk()) return false; + } + char *next = FindNextRecord(chunk_begin_, + chunk_end_); + out_rec->dptr = chunk_begin_; + out_rec->size = next - chunk_begin_; + chunk_begin_ = next; + return true; + } + virtual bool NextChunk(Blob *out_chunk) { + if (chunk_begin_ == chunk_end_) { + if (!LoadChunk()) return false; + } + out_chunk->dptr = chunk_begin_; + out_chunk->size = chunk_end_ - chunk_begin_; + chunk_begin_ = chunk_end_; + return true; + } + inline bool ReadChunk(void *buf, size_t *size) { + size_t max_size = *size; + if (max_size <= overflow_.length()) { + *size = 0; return true; + } + if (overflow_.length() != 0) { + std::memcpy(buf, BeginPtr(overflow_), overflow_.length()); + } + size_t olen = overflow_.length(); + overflow_.resize(0); + size_t nread = this->Read(reinterpret_cast(buf) + olen, + max_size - olen); + nread += olen; + if (nread == 0) return false; + if (nread != max_size) { + *size = nread; + return true; + } else { + const char *bptr = reinterpret_cast(buf); + // return the last position where a record starts + const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size); + *size = bend - bptr; + overflow_.resize(max_size - *size); + if (overflow_.length() != 0) { + std::memcpy(BeginPtr(overflow_), bend, overflow_.length()); } - if (c != '\r' && c != '\n' && c != EOF) { - *out_data += c; + return true; + } + } + + protected: + inline const char* FindLastRecordBegin(const char *begin, + const char *end) { + if (begin == end) return begin; + for (const char *p = end - 1; p != begin; --p) { + if (*p == '\n' || *p == '\r') return p + 1; + } + return begin; + } + inline char* FindNextRecord(char *begin, char *end) { + char *p; + for (p = begin; p != end; ++p) { + if (*p == '\n' || *p == '\r') break; + } + for (; p != end; ++p) { + if (*p != '\n' && *p != '\r') return p; + } + return end; + } + inline bool LoadChunk(void) { + while (true) { + size_t size = buffer_.length(); + if (!ReadChunk(BeginPtr(buffer_), &size)) return false; + if (size == 0) { + buffer_.resize(buffer_.length() * 2); } else { - if (out_data->length() != 0) return true; - if (end_of_file_) return false; + chunk_begin_ = reinterpret_cast(BeginPtr(buffer_)); + chunk_end_ = chunk_begin_ + size; + break; } } - return false; - } - + return true; + } + private: + // buffer size + static const size_t kBufferSize = 1 << 18UL; + // file std::FILE *fp_; bool use_stdin_; - bool end_of_file_; + // internal overflow + std::string overflow_; + // internal buffer + std::string buffer_; + // beginning of chunk + char *chunk_begin_; + // end of chunk + char *chunk_end_; }; class StdFile : public dmlc::Stream { @@ -105,7 +192,8 @@ class StdFile : public dmlc::Stream { namespace dmlc { InputSplit* InputSplit::Create(const char *uri, unsigned part, - unsigned nsplit) { + unsigned nsplit, + const char *type) { using namespace xgboost; const char *msg = "xgboost is compiled in local mode\n"\ "to use hdfs, s3 or distributed version, compile with make dmlc=1"; diff --git a/src/io/libsvm_parser.h b/src/io/libsvm_parser.h new file mode 100644 index 000000000..6767d61be --- /dev/null +++ b/src/io/libsvm_parser.h @@ -0,0 +1,168 @@ +/*! + * Copyright (c) 2015 by Contributors + * \file libsvm_parser.h + * \brief iterator parser to parse libsvm format + * \author Tianqi Chen + */ +#ifndef XGBOOST_IO_LIBSVM_PARSER_H_ +#define XGBOOST_IO_LIBSVM_PARSER_H_ + +#include +#include +#include +#include "../utils/omp.h" +#include "../utils/utils.h" +#include "../sync/sync.h" +#include "./sparse_batch_page.h" + +namespace xgboost { +namespace io { +/*! \brief page returned by libsvm parser */ +struct LibSVMPage : public SparsePage { + std::vector label; + // overload clear + inline void Clear() { + SparsePage::Clear(); + label.clear(); + } +}; +/*! + * \brief libsvm parser that parses the input lines + * and returns rows in input data + */ +class LibSVMParser : public utils::IIterator { + public: + explicit LibSVMParser(dmlc::InputSplit *source, + int nthread) + : bytes_read_(0), at_head_(true), + data_ptr_(0), data_end_(0), source_(source) { + int maxthread; + #pragma omp parallel + { + maxthread = omp_get_num_threads(); + } + maxthread = std::max(maxthread / 2, 1); + nthread_ = std::min(maxthread, nthread); + } + virtual ~LibSVMParser() { + delete source_; + } + virtual void BeforeFirst(void) { + utils::Assert(at_head_, "cannot call BeforeFirst"); + } + virtual const LibSVMPage &Value(void) const { + return data_[data_ptr_ - 1]; + } + virtual bool Next(void) { + while (true) { + while (data_ptr_ < data_end_) { + data_ptr_ += 1; + if (data_[data_ptr_ - 1].Size() != 0) { + return true; + } + } + if (!FillData()) break; + data_ptr_ = 0; data_end_ = data_.size(); + } + return false; + } + inline size_t bytes_read(void) const { + return bytes_read_; + } + + protected: + inline bool FillData() { + dmlc::InputSplit::Blob chunk; + if (!source_->NextChunk(&chunk)) return false; + int nthread; + #pragma omp parallel num_threads(nthread_) + { + nthread = omp_get_num_threads(); + } + // reserve space for data + data_.resize(nthread); + bytes_read_ += chunk.size; + utils::Assert(chunk.size != 0, "LibSVMParser.FileData"); + char *head = reinterpret_cast(chunk.dptr); + #pragma omp parallel num_threads(nthread_) + { + // threadid + int tid = omp_get_thread_num(); + size_t nstep = (chunk.size + nthread - 1) / nthread; + size_t sbegin = std::min(tid * nstep, chunk.size); + size_t send = std::min((tid + 1) * nstep, chunk.size); + char *pbegin = BackFindEndLine(head + sbegin, head); + char *pend; + if (tid + 1 == nthread) { + pend = head + send; + } else { + pend = BackFindEndLine(head + send, head); + } + ParseBlock(pbegin, pend, &data_[tid]); + } + data_ptr_ = 0; + return true; + } + /*! + * \brief parse data into out + * \param begin beginning of buffer + * \param end end of buffer + */ + inline void ParseBlock(char *begin, + char *end, + LibSVMPage *out) { + out->Clear(); + char *p = begin; + while (p != end) { + while (isspace(*p) && p != end) ++p; + if (p == end) break; + char *head = p; + while (isdigit(*p) && p != end) ++p; + if (*p == ':') { + out->data.push_back(SparseBatch::Entry(atol(head), + atof(p + 1))); + } else { + if (out->label.size() != 0) { + out->offset.push_back(out->data.size()); + } + out->label.push_back(atof(head)); + } + while (!isspace(*p) && p != end) ++p; + } + if (out->label.size() != 0) { + out->offset.push_back(out->data.size()); + } + utils::Check(out->label.size() + 1 == out->offset.size(), + "LibSVMParser inconsistent"); + } + /*! + * \brief start from bptr, go backward and find first endof line + * \param bptr end position to go backward + * \param begin the beginning position of buffer + * \return position of first endof line going backward + */ + inline char* BackFindEndLine(char *bptr, + char *begin) { + for (; bptr != begin; --bptr) { + if (*bptr == '\n' || *bptr == '\r') return bptr; + } + return begin; + } + + private: + // nthread + int nthread_; + // number of bytes readed + size_t bytes_read_; + // at beginning, at end of stream + bool at_head_; + // pointer to begin and end of data + size_t data_ptr_, data_end_; + // source split that provides the data + dmlc::InputSplit *source_; + // internal data + std::vector data_; +}; +} // namespace io +} // namespace xgboost +#endif // XGBOOST_IO_LIBSVM_PARSER_H_ diff --git a/src/io/page_dmatrix-inl.hpp b/src/io/page_dmatrix-inl.hpp index 03f0d5ca8..047d589a1 100644 --- a/src/io/page_dmatrix-inl.hpp +++ b/src/io/page_dmatrix-inl.hpp @@ -12,6 +12,7 @@ #include "./simple_fmatrix-inl.hpp" #include "./sparse_batch_page.h" #include "./page_fmatrix-inl.hpp" +#include "./libsvm_parser.h" namespace xgboost { namespace io { @@ -143,38 +144,32 @@ class DMatrixPageBase : public DataMatrix { std::string fname_row = std::string(cache_file) + ".row.blob"; utils::FileStream fo(utils::FopenCheck(fname_row.c_str(), "wb")); SparsePage page; - dmlc::InputSplit *in = - dmlc::InputSplit::Create(uri, rank, npart); - std::string line; + + LibSVMParser parser( + dmlc::InputSplit::Create(uri, rank, npart, "text"), 4); info.Clear(); - while (in->ReadRecord(&line)) { - float label; - std::istringstream ss(line); - std::vector feats; - ss >> label; - while (!ss.eof()) { - RowBatch::Entry e; - if (!(ss >> e.index)) break; - ss.ignore(32, ':'); - if (!(ss >> e.fvalue)) break; - feats.push_back(e); + while (parser.Next()) { + const LibSVMPage &batch = parser.Value(); + size_t nlabel = info.labels.size(); + info.labels.resize(nlabel + batch.label.size()); + if (batch.label.size() != 0) { + std::memcpy(BeginPtr(info.labels) + nlabel, + BeginPtr(batch.label), + batch.label.size() * sizeof(float)); + } + page.Push(batch); + for (size_t i = 0; i < batch.data.size(); ++i) { + info.info.num_col = std::max(info.info.num_col, + static_cast(batch.data[i].index+1)); } - RowBatch::Inst row(BeginPtr(feats), feats.size()); - page.Push(row); if (page.MemCostBytes() >= kPageSize) { page.Save(&fo); page.Clear(); } - for (size_t i = 0; i < feats.size(); ++i) { - info.info.num_col = std::max(info.info.num_col, - static_cast(feats[i].index+1)); - } - this->info.labels.push_back(label); - info.info.num_row += 1; + info.info.num_row += batch.label.size(); } if (page.data.size() != 0) { page.Save(&fo); } - delete in; fo.Close(); iter_->Load(utils::FileStream(utils::FopenCheck(fname_row.c_str(), "rb"))); // save data matrix diff --git a/src/io/simple_dmatrix-inl.hpp b/src/io/simple_dmatrix-inl.hpp index e0ea7899e..010bf9893 100644 --- a/src/io/simple_dmatrix-inl.hpp +++ b/src/io/simple_dmatrix-inl.hpp @@ -19,6 +19,7 @@ #include "./io.h" #include "./simple_fmatrix-inl.hpp" #include "../sync/sync.h" +#include "./libsvm_parser.h" namespace xgboost { namespace io { @@ -72,7 +73,8 @@ class DMatrixSimple : public DataMatrix { inline size_t AddRow(const std::vector &feats) { for (size_t i = 0; i < feats.size(); ++i) { row_data_.push_back(feats[i]); - info.info.num_col = std::max(info.info.num_col, static_cast(feats[i].index+1)); + info.info.num_col = std::max(info.info.num_col, + static_cast(feats[i].index+1)); } row_ptr_.push_back(row_ptr_.back() + feats.size()); info.info.num_row += 1; @@ -90,26 +92,35 @@ class DMatrixSimple : public DataMatrix { rank = rabit::GetRank(); npart = rabit::GetWorldSize(); } - dmlc::InputSplit *in = - dmlc::InputSplit::Create(uri, rank, npart); + LibSVMParser parser( + dmlc::InputSplit::Create(uri, rank, npart, "text"), 4); this->Clear(); - std::string line; - while (in->ReadRecord(&line)) { - float label; - std::istringstream ss(line); - std::vector feats; - ss >> label; - while (!ss.eof()) { - RowBatch::Entry e; - if (!(ss >> e.index)) break; - ss.ignore(32, ':'); - if (!(ss >> e.fvalue)) break; - feats.push_back(e); + while (parser.Next()) { + const LibSVMPage &batch = parser.Value(); + size_t nlabel = info.labels.size(); + info.labels.resize(nlabel + batch.label.size()); + if (batch.label.size() != 0) { + std::memcpy(BeginPtr(info.labels) + nlabel, + BeginPtr(batch.label), + batch.label.size() * sizeof(float)); } - info.labels.push_back(label); - this->AddRow(feats); + size_t ndata = row_data_.size(); + row_data_.resize(ndata + batch.data.size()); + if (batch.data.size() != 0) { + std::memcpy(BeginPtr(row_data_) + ndata, + BeginPtr(batch.data), + batch.data.size() * sizeof(RowBatch::Entry)); + } + row_ptr_.resize(row_ptr_.size() + batch.label.size()); + for (size_t i = 0; i < batch.label.size(); ++i) { + row_ptr_[nlabel + i + 1] = row_ptr_[nlabel] + batch.offset[i + 1]; + } + info.info.num_row += batch.Size(); + for (size_t i = 0; i < batch.data.size(); ++i) { + info.info.num_col = std::max(info.info.num_col, + static_cast(batch.data[i].index+1)); + } } - delete in; if (!silent) { utils::Printf("%lux%lu matrix with %lu entries is loaded from %s\n", static_cast(info.num_row()), diff --git a/src/io/sparse_batch_page.h b/src/io/sparse_batch_page.h index 756bd1034..61155f380 100644 --- a/src/io/sparse_batch_page.h +++ b/src/io/sparse_batch_page.h @@ -150,7 +150,23 @@ class SparsePage { size_t begin = offset.size(); offset.resize(offset.size() + batch.size); for (size_t i = 0; i < batch.size; ++i) { - offset[i + begin] = top + batch.ind_ptr[i] - batch.ind_ptr[0]; + offset[i + begin] = top + batch.ind_ptr[i + 1] - batch.ind_ptr[0]; + } + } + /*! + * \brief Push a sparse page + * \param batch the row page + */ + inline void Push(const SparsePage &batch) { + data.resize(offset.back() + batch.Size()); + std::memcpy(BeginPtr(data) + offset.back(), + BeginPtr(batch.data), + sizeof(SparseBatch::Entry) * batch.data.size()); + size_t top = offset.back(); + size_t begin = offset.size(); + offset.resize(offset.size() + batch.Size()); + for (size_t i = 0; i < batch.Size(); ++i) { + offset[i + begin] = top + batch.offset[i + 1]; } } /*! diff --git a/src/utils/utils.h b/src/utils/utils.h index afe17f64c..7aee5a29a 100644 --- a/src/utils/utils.h +++ b/src/utils/utils.h @@ -174,5 +174,13 @@ inline const T *BeginPtr(const std::vector &vec) { return &vec[0]; } } +inline char* BeginPtr(std::string &str) { + if (str.length() == 0) return NULL; + return &str[0]; +} +inline const char* BeginPtr(const std::string &str) { + if (str.length() == 0) return NULL; + return &str[0]; +} } // namespace xgboost #endif // XGBOOST_UTILS_UTILS_H_