diff --git a/rabit-learn/io/buffer_reader-inl.h b/rabit-learn/io/buffer_reader-inl.h index 11b5fb88b..c887c5013 100644 --- a/rabit-learn/io/buffer_reader-inl.h +++ b/rabit-learn/io/buffer_reader-inl.h @@ -38,6 +38,7 @@ class StreamBufferReader { } } } + /*! \brief whether we are reaching the end of file */ inline bool AtEnd(void) const { return read_len_ == 0; } diff --git a/rabit-learn/io/file-inl.h b/rabit-learn/io/file-inl.h index 8e9d9c593..6eaa62b33 100644 --- a/rabit-learn/io/file-inl.h +++ b/rabit-learn/io/file-inl.h @@ -66,27 +66,36 @@ class FileStream : public utils::ISeekStream { }; /*! \brief line split from normal file system */ -class FileSplit : public LineSplitBase { +class FileProvider : public LineSplitter::IFileProvider { public: - explicit FileSplit(const char *uri, unsigned rank, unsigned nsplit) { - LineSplitBase::SplitNames(&fnames_, uri, "#"); + explicit FileProvider(const char *uri) { + LineSplitter::SplitNames(&fnames_, uri, "#"); std::vector fsize; for (size_t i = 0; i < fnames_.size(); ++i) { if (!std::strncmp(fnames_[i].c_str(), "file://", 7)) { std::string tmp = fnames_[i].c_str() + 7; fnames_[i] = tmp; } - fsize.push_back(GetFileSize(fnames_[i].c_str())); + size_t fz = GetFileSize(fnames_[i].c_str()); + if (fz != 0) { + fsize_.push_back(fz); + } } - LineSplitBase::Init(fsize, rank, nsplit); } - virtual ~FileSplit(void) {} - - protected: - virtual utils::ISeekStream *GetFile(size_t file_index) { + // destrucor + virtual ~FileProvider(void) {} + virtual utils::ISeekStream *Open(size_t file_index) { utils::Assert(file_index < fnames_.size(), "file index exceed bound"); return new FileStream(fnames_[file_index].c_str(), "rb"); } + virtual const std::vector &FileSize(void) const { + return fsize_; + } + private: + // file sizes + std::vector fsize_; + // file names + std::vector fnames_; // get file size inline static size_t GetFileSize(const char *fname) { std::FILE *fp = utils::FopenCheck(fname, "rb"); @@ -96,10 +105,6 @@ class FileSplit : public LineSplitBase { std::fclose(fp); return fsize; } - - private: - // file names - std::vector fnames_; }; } // namespace io } // namespace rabit diff --git a/rabit-learn/io/hdfs-inl.h b/rabit-learn/io/hdfs-inl.h index 7fcbec8d5..967f7b5c0 100644 --- a/rabit-learn/io/hdfs-inl.h +++ b/rabit-learn/io/hdfs-inl.h @@ -15,7 +15,7 @@ /*! \brief io interface */ namespace rabit { namespace io { -class HDFSStream : public utils::ISeekStream { +class HDFSStream : public ISeekStream { public: HDFSStream(hdfsFS fs, const char *fname, @@ -93,7 +93,7 @@ class HDFSStream : public utils::ISeekStream { } } - private: + private: hdfsFS fs_; hdfsFile fp_; bool at_end_; @@ -101,15 +101,14 @@ class HDFSStream : public utils::ISeekStream { }; /*! \brief line split from normal file system */ -class HDFSSplit : public LineSplitBase { +class HDFSProvider : public LineSplitter::IFileProvider { public: - explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) { + explicit HDFSProvider(const char *uri) { fs_ = hdfsConnect("default", 0); utils::Check(fs_ != NULL, "error when connecting to default HDFS"); std::vector paths; - LineSplitBase::SplitNames(&paths, uri, "#"); + LineSplitter::SplitNames(&paths, uri, "#"); // get the files - std::vector fsize; for (size_t i = 0; i < paths.size(); ++i) { hdfsFileInfo *info = hdfsGetPathInfo(fs_, paths[i].c_str()); utils::Check(info != NULL, "path %s do not exist", paths[i].c_str()); @@ -118,34 +117,37 @@ class HDFSSplit : public LineSplitBase { hdfsFileInfo *files = hdfsListDirectory(fs_, info->mName, &nentry); utils::Check(files != NULL, "error when ListDirectory %s", info->mName); for (int i = 0; i < nentry; ++i) { - if (files[i].mKind == 'F') { - fsize.push_back(files[i].mSize); + if (files[i].mKind == 'F' && files[i].mSize != 0) { + fsize_.push_back(files[i].mSize); fnames_.push_back(std::string(files[i].mName)); } } hdfsFreeFileInfo(files, nentry); } else { - fsize.push_back(info->mSize); - fnames_.push_back(std::string(info->mName)); + if (info->mSize != 0) { + fsize_.push_back(info->mSize); + fnames_.push_back(std::string(info->mName)); + } } hdfsFreeFileInfo(info, 1); } - LineSplitBase::Init(fsize, rank, nsplit); } - virtual ~HDFSSplit(void) { - LineSplitBase::Destroy(); + virtual ~HDFSProvider(void) { utils::Check(hdfsDisconnect(fs_) == 0, "hdfsDisconnect error"); + } + virtual const std::vector &FileSize(void) const { + return fsize_; } - - protected: - virtual utils::ISeekStream *GetFile(size_t file_index) { + virtual ISeekStream *Open(size_t file_index) { utils::Assert(file_index < fnames_.size(), "file index exceed bound"); return new HDFSStream(fs_, fnames_[file_index].c_str(), "r", false); } - + private: // hdfs handle hdfsFS fs_; + // file sizes + std::vector fsize_; // file names std::vector fnames_; }; diff --git a/rabit-learn/io/io-inl.h b/rabit-learn/io/io-inl.h index 6db2f5059..53b24ae1d 100644 --- a/rabit-learn/io/io-inl.h +++ b/rabit-learn/io/io-inl.h @@ -30,16 +30,16 @@ inline InputSplit *CreateInputSplit(const char *uri, return new SingleFileSplit(uri); } if (!strncmp(uri, "file://", 7)) { - return new FileSplit(uri, part, nsplit); + return new LineSplitter(new FileProvider(uri), part, nsplit); } if (!strncmp(uri, "hdfs://", 7)) { #if RABIT_USE_HDFS - return new HDFSSplit(uri, part, nsplit); + return new LineSplitter(new HDFSProvider(uri), part, nsplit); #else utils::Error("Please compile with RABIT_USE_HDFS=1"); #endif } - return new FileSplit(uri, part, nsplit); + return new LineSplitter(new FileProvider(uri), part, nsplit); } /*! * \brief create an stream, the stream must be able to close diff --git a/rabit-learn/io/io.h b/rabit-learn/io/io.h index 9ca20033c..79d2df12e 100644 --- a/rabit-learn/io/io.h +++ b/rabit-learn/io/io.h @@ -19,6 +19,7 @@ namespace rabit { * \brief namespace to handle input split and filesystem interfacing */ namespace io { +/*! \brief reused ISeekStream's definition */ typedef utils::ISeekStream ISeekStream; /*! * \brief user facing input split helper, diff --git a/rabit-learn/io/line_split-inl.h b/rabit-learn/io/line_split-inl.h index f4abb6438..1f8ae4fdc 100644 --- a/rabit-learn/io/line_split-inl.h +++ b/rabit-learn/io/line_split-inl.h @@ -15,11 +15,42 @@ namespace rabit { namespace io { -class LineSplitBase : public InputSplit { + +/*! \brief class that split the files by line */ +class LineSplitter : public InputSplit { public: - virtual ~LineSplitBase() { - this->Destroy(); + class IFileProvider { + public: + /*! + * \brief get the seek stream of given file_index + * \return the corresponding seek stream at head of the stream + * the seek stream's resource can be freed by calling delete + */ + virtual ISeekStream *Open(size_t file_index) = 0; + /*! + * \return const reference to size of each files + */ + virtual const std::vector &FileSize(void) const = 0; + // virtual destructor + virtual ~IFileProvider() {} + }; + // constructor + explicit LineSplitter(IFileProvider *provider, + unsigned rank, + unsigned nsplit) + : provider_(provider), fs_(NULL), + reader_(kBufferSize) { + this->Init(provider_->FileSize(), rank, nsplit); } + // destructor + virtual ~LineSplitter() { + if (fs_ != NULL) { + delete fs_; fs_ = NULL; + } + // delete provider after destructing the streams + delete provider_; + } + // get next line virtual bool NextLine(std::string *out_data) { if (file_ptr_ >= file_ptr_end_ && offset_curr_ >= offset_end_) return false; @@ -29,15 +60,15 @@ class LineSplitBase : public InputSplit { if (reader_.AtEnd()) { if (out_data->length() != 0) return true; file_ptr_ += 1; + if (offset_curr_ >= offset_end_) return false; if (offset_curr_ != file_offset_[file_ptr_]) { - utils::Error("warning:std::FILE size not calculated correctly\n"); + utils::Error("warning: FILE size not calculated correctly\n"); offset_curr_ = file_offset_[file_ptr_]; } - if (offset_curr_ >= offset_end_) return false; utils::Assert(file_ptr_ + 1 < file_offset_.size(), "boundary check"); delete fs_; - fs_ = this->GetFile(file_ptr_); + fs_ = provider_->Open(file_ptr_); reader_.set_stream(fs_); } else { ++offset_curr_; @@ -51,24 +82,27 @@ class LineSplitBase : public InputSplit { } } } - - protected: - // constructor - LineSplitBase(void) - : fs_(NULL), reader_(kBufferSize) { - } /*! - * \brief destroy all the filesystem resources owned - * can be called by child destructor + * \brief split names given + * \param out_fname output std::FILE names + * \param uri_ the iput uri std::FILE + * \param dlm deliminetr */ - inline void Destroy(void) { - if (fs_ != NULL) { - delete fs_; fs_ = NULL; + inline static void SplitNames(std::vector *out_fname, + const char *uri_, + const char *dlm) { + std::string uri = uri_; + char *p = std::strtok(BeginPtr(uri), dlm); + while (p != NULL) { + out_fname->push_back(std::string(p)); + p = std::strtok(NULL, dlm); } } + + private: /*! * \brief initialize the line spliter, - * \param file_size, size of each std::FILEs + * \param file_size, size of each files * \param rank the current rank of the data * \param nsplit number of split we will divide the data into */ @@ -91,7 +125,7 @@ class LineSplitBase : public InputSplit { file_ptr_end_ = std::upper_bound(file_offset_.begin(), file_offset_.end(), offset_end_) - file_offset_.begin() - 1; - fs_ = GetFile(file_ptr_); + fs_ = provider_->Open(file_ptr_); reader_.set_stream(fs_); // try to set the starting position correctly if (file_offset_[file_ptr_] != offset_begin_) { @@ -103,33 +137,15 @@ class LineSplitBase : public InputSplit { } } } - /*! - * \brief get the seek stream of given file_index - * \return the corresponding seek stream at head of std::FILE - */ - virtual utils::ISeekStream *GetFile(size_t file_index) = 0; - /*! - * \brief split names given - * \param out_fname output std::FILE names - * \param uri_ the iput uri std::FILE - * \param dlm deliminetr - */ - inline static void SplitNames(std::vector *out_fname, - const char *uri_, - const char *dlm) { - std::string uri = uri_; - char *p = std::strtok(BeginPtr(uri), dlm); - while (p != NULL) { - out_fname->push_back(std::string(p)); - p = std::strtok(NULL, dlm); - } - } + private: + /*! \brief FileProvider */ + IFileProvider *provider_; /*! \brief current input stream */ utils::ISeekStream *fs_; - /*! \brief std::FILE pointer of which std::FILE to read on */ + /*! \brief file pointer of which file to read on */ size_t file_ptr_; - /*! \brief std::FILE pointer where the end of std::FILE lies */ + /*! \brief file pointer where the end of file lies */ size_t file_ptr_end_; /*! \brief get the current offset */ size_t offset_curr_; @@ -137,7 +153,7 @@ class LineSplitBase : public InputSplit { size_t offset_begin_; /*! \brief end of the offset */ size_t offset_end_; - /*! \brief byte-offset of each std::FILE */ + /*! \brief byte-offset of each file */ std::vector file_offset_; /*! \brief buffer reader */ StreamBufferReader reader_; diff --git a/rabit-learn/linear/linear.cc b/rabit-learn/linear/linear.cc index 53399875c..a29c20ca7 100644 --- a/rabit-learn/linear/linear.cc +++ b/rabit-learn/linear/linear.cc @@ -206,21 +206,22 @@ int main(int argc, char *argv[]) { rabit::Finalize(); return 0; } - rabit::linear::LinearObjFunction linear; + rabit::linear::LinearObjFunction *linear = new rabit::linear::LinearObjFunction(); if (!strcmp(argv[1], "stdin")) { - linear.LoadData(argv[1]); + linear->LoadData(argv[1]); rabit::Init(argc, argv); } else { rabit::Init(argc, argv); - linear.LoadData(argv[1]); + linear->LoadData(argv[1]); } for (int i = 2; i < argc; ++i) { char name[256], val[256]; if (sscanf(argv[i], "%[^=]=%s", name, val) == 2) { - linear.SetParam(name, val); + linear->SetParam(name, val); } } - linear.Run(); + linear->Run(); + delete linear; rabit::Finalize(); return 0; }