diff --git a/include/dmlc/io.h b/include/dmlc/io.h index e6a6ee566..ad60c3b26 100644 --- a/include/dmlc/io.h +++ b/include/dmlc/io.h @@ -8,6 +8,10 @@ #include #include #include +#include +#include +#include +#include /*! \brief namespace for dmlc */ namespace dmlc { @@ -38,7 +42,6 @@ class Stream { * \param uri the uri of the input currently we support * hdfs://, s3://, and file:// by default file:// will be used * \param flag can be "w", "r", "a" - * \return a created stream */ static Stream *Create(const char *uri, const char* const flag); // helper functions to write/read different data structures @@ -88,12 +91,12 @@ class Serializable { * \brief load the model from a stream * \param fi stream where to load the model from */ - virtual void Load(Stream *fi) = 0; + virtual void Load(Stream &fi) = 0; /*! * \brief saves the model to a stream * \param fo stream where to save the model to */ - virtual void Save(Stream *fo) const = 0; + virtual void Save(Stream &fo) const = 0; }; /*! @@ -116,13 +119,128 @@ class InputSplit { * \param uri the uri of the input, can contain hdfs prefix * \param part_index the part id of current input * \param num_parts total number of splits - * \return a created input split */ static InputSplit* Create(const char *uri, unsigned part_index, unsigned num_parts); }; +/*! + * \brief a std::ostream class that can can wrap Stream objects, + * can use ostream with that output to underlying Stream + * + * Usage example: + * \code + * + * Stream *fs = Stream::Create("hdfs:///test.txt", "w"); + * dmlc::ostream os(fs); + * os << "hello world" << std::endl; + * delete fs; + * \endcode + */ +class ostream : public std::basic_ostream { + public: + /*! + * \brief construct std::ostream type + * \param stream the Stream output to be used + * \param buffer_size internal streambuf size + */ + explicit ostream(Stream *stream, + size_t buffer_size = 1 << 10) + : basic_ostream(NULL), buf_(buffer_size) { + this->set_stream(stream); + } + /*! + * \brief set internal stream to be stream, reset states + * \param stream new stream as output + */ + inline void set_stream(Stream *stream) { + buf_.set_stream(stream); + this->rdbuf(&buf_); + } + + private: + // internal streambuf + class OutBuf : public std::streambuf { + public: + explicit OutBuf(size_t buffer_size) + : stream_(NULL), buffer_(buffer_size) { + assert(buffer_.size() > 0); + } + // set stream to the buffer + inline void set_stream(Stream *stream); + + private: + /*! \brief internal stream by StreamBuf */ + Stream *stream_; + /*! \brief internal buffer */ + std::vector buffer_; + // override sync + inline int_type sync(void); + // override overflow + inline int_type overflow(int c); + }; + /*! \brief buffer of the stream */ + OutBuf buf_; +}; + +/*! + * \brief a std::istream class that can can wrap Stream objects, + * can use istream with that output to underlying Stream + * + * Usage example: + * \code + * + * Stream *fs = Stream::Create("hdfs:///test.txt", "r"); + * dmlc::istream is(fs); + * is >> mydata; + * delete fs; + * \endcode + */ +class istream : public std::basic_istream { + public: + /*! + * \brief construct std::ostream type + * \param stream the Stream output to be used + * \param buffer_size internal buffer size + */ + explicit istream(Stream *stream, + size_t buffer_size = 1 << 10) + : basic_istream(NULL), buf_(buffer_size) { + this->set_stream(stream); + } + /*! + * \brief set internal stream to be stream, reset states + * \param stream new stream as output + */ + inline void set_stream(Stream *stream) { + buf_.set_stream(stream); + this->rdbuf(&buf_); + } + + private: + // internal streambuf + class InBuf : public std::streambuf { + public: + explicit InBuf(size_t buffer_size) + : stream_(NULL), buffer_(buffer_size) { + assert(buffer_.size() > 0); + } + // set stream to the buffer + inline void set_stream(Stream *stream); + + private: + /*! \brief internal stream by StreamBuf */ + Stream *stream_; + /*! \brief internal buffer */ + std::vector buffer_; + // override underflow + inline int_type underflow(); + }; + /*! \brief input buffer */ + InBuf buf_; +}; + // implementations of inline functions template inline void Stream::Write(const std::vector &vec) { @@ -160,5 +278,47 @@ inline bool Stream::Read(std::string *out_str) { } return true; } + +// implementations for ostream +inline void ostream::OutBuf::set_stream(Stream *stream) { + this->stream_ = stream; + this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1); +} +inline int ostream::OutBuf::sync(void) { + if (stream_ == NULL) return -1; + std::ptrdiff_t n = pptr() - pbase(); + stream_->Write(pbase(), n); + this->pbump(-n); + return 0; +} +inline int ostream::OutBuf::overflow(int c) { + *(this->pptr()) = c; + std::ptrdiff_t n = pptr() - pbase(); + this->pbump(-n); + if (c == EOF) { + stream_->Write(pbase(), n); + } else { + stream_->Write(pbase(), n + 1); + } + return c; +} + +// implementations for istream +inline void istream::InBuf::set_stream(Stream *stream) { + stream_ = stream; + this->setg(&buffer_[0], &buffer_[0], &buffer_[0]); +} +inline int istream::InBuf::underflow() { + char *bhead = &buffer_[0]; + if (this->gptr() == this->egptr()) { + size_t sz = stream_->Read(bhead, buffer_.size()); + this->setg(bhead, bhead, bhead + sz); + } + if (this->gptr() == this->egptr()) { + return traits_type::eof(); + } else { + return traits_type::to_int_type(*gptr()); + } +} } // namespace dmlc #endif // DMLC_IO_H_