From f165ffbc9510af6dd803e2c9d50273ce100e52bd Mon Sep 17 00:00:00 2001 From: tqchen Date: Fri, 13 Mar 2015 22:59:04 -0700 Subject: [PATCH] fix hdfs --- rabit-learn/io/hdfs-inl.h | 19 +++++++++++++++---- rabit-learn/io/io-inl.h | 2 +- rabit-learn/io/line_split-inl.h | 11 ++++++++++- yarn/run_hdfs_prog.py | 3 ++- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/rabit-learn/io/hdfs-inl.h b/rabit-learn/io/hdfs-inl.h index a4ec4b253..526580020 100644 --- a/rabit-learn/io/hdfs-inl.h +++ b/rabit-learn/io/hdfs-inl.h @@ -17,8 +17,12 @@ namespace rabit { namespace io { class HDFSStream : public utils::ISeekStream { public: - HDFSStream(hdfsFS fs, const char *fname, const char *mode) - : fs_(fs), at_end_(false) { + HDFSStream(hdfsFS fs, + const char *fname, + const char *mode, + bool disconnect_when_done) + : fs_(fs), at_end_(false), + disconnect_when_done_(disconnect_when_done) { int flag; if (!strcmp(mode, "r")) { flag = O_RDONLY; @@ -35,6 +39,9 @@ class HDFSStream : public utils::ISeekStream { } virtual ~HDFSStream(void) { this->Close(); + if (disconnect_when_done_) { + utils::Check(hdfsDisconnect(fs_) == 0, "hdfsDisconnect error"); + } } virtual size_t Read(void *ptr, size_t size) { tSize nread = hdfsRead(fs_, fp_, ptr, size); @@ -90,6 +97,7 @@ class HDFSStream : public utils::ISeekStream { hdfsFS fs_; hdfsFile fp_; bool at_end_; + bool disconnect_when_done_; }; /*! 
\brief line split from normal file system */ @@ -124,12 +132,15 @@ class HDFSSplit : public LineSplitBase { } LineSplitBase::Init(fsize, rank, nsplit); } - virtual ~HDFSSplit(void) {} + virtual ~HDFSSplit(void) { + LineSplitBase::Destroy(); + utils::Check(hdfsDisconnect(fs_) == 0, "hdfsDisconnect error"); + } protected: virtual utils::ISeekStream *GetFile(size_t file_index) { utils::Assert(file_index < fnames_.size(), "file index exceed bound"); - return new HDFSStream(fs_, fnames_[file_index].c_str(), "r"); + return new HDFSStream(fs_, fnames_[file_index].c_str(), "r", false); } private: diff --git a/rabit-learn/io/io-inl.h b/rabit-learn/io/io-inl.h index db599cedc..6db2f5059 100644 --- a/rabit-learn/io/io-inl.h +++ b/rabit-learn/io/io-inl.h @@ -55,7 +55,7 @@ inline IStream *CreateStream(const char *uri, const char *mode) { } if (!strncmp(uri, "hdfs://", 7)) { #if RABIT_USE_HDFS - return new HDFSStream(hdfsConnect("default", 0), uri, mode); + return new HDFSStream(hdfsConnect("default", 0), uri, mode, true); #else utils::Error("Please compile with RABIT_USE_HDFS=1"); #endif diff --git a/rabit-learn/io/line_split-inl.h b/rabit-learn/io/line_split-inl.h index ba21d39e4..f4abb6438 100644 --- a/rabit-learn/io/line_split-inl.h +++ b/rabit-learn/io/line_split-inl.h @@ -18,7 +18,7 @@ namespace io { class LineSplitBase : public InputSplit { public: virtual ~LineSplitBase() { - if (fs_ != NULL) delete fs_; + this->Destroy(); } virtual bool NextLine(std::string *out_data) { if (file_ptr_ >= file_ptr_end_ && @@ -57,6 +57,15 @@ class LineSplitBase : public InputSplit { LineSplitBase(void) : fs_(NULL), reader_(kBufferSize) { } + /*! + * \brief destroy all the filesystem resources owned + * can be called by child destructor + */ + inline void Destroy(void) { + if (fs_ != NULL) { + delete fs_; fs_ = NULL; + } + } /*! 
 * \brief initialize the line spliter, * \param file_size, size of each std::FILEs diff --git a/yarn/run_hdfs_prog.py b/yarn/run_hdfs_prog.py index 515e0cbbe..65fc6104f 100755 --- a/yarn/run_hdfs_prog.py +++ b/yarn/run_hdfs_prog.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """ -this script helps setup classpath env for HDFS, before running the program +this script helps set up the classpath env for HDFS, before running a program +that links with libhdfs """ import glob import sys