From 9b6bf57e7998f312d06daffa650ffc2254924aa3 Mon Sep 17 00:00:00 2001
From: tqchen
Date: Sat, 7 Mar 2015 09:08:21 -0800
Subject: [PATCH] fix hdfs

---
 rabit-learn/io/file-inl.h       |  2 ++
 rabit-learn/io/hdfs-inl.h       | 33 +++++++++++++++++++++------------
 rabit-learn/io/line_split-inl.h |  3 +--
 rabit-learn/kmeans/Makefile     |  1 -
 rabit-learn/linear/Makefile     |  1 +
 rabit-learn/make/common.mk      |  3 +--
 6 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/rabit-learn/io/file-inl.h b/rabit-learn/io/file-inl.h
index 9e608ccf2..d77a943de 100644
--- a/rabit-learn/io/file-inl.h
+++ b/rabit-learn/io/file-inl.h
@@ -5,6 +5,8 @@
  * \brief normal filesystem I/O
  * \author Tianqi Chen
  */
+#include
+#include
 #include
 #include "./io.h"
 #include "./line_split-inl.h"
diff --git a/rabit-learn/io/hdfs-inl.h b/rabit-learn/io/hdfs-inl.h
index eeabbb116..d50d0578a 100644
--- a/rabit-learn/io/hdfs-inl.h
+++ b/rabit-learn/io/hdfs-inl.h
@@ -5,6 +5,8 @@
  * \brief HDFS I/O
  * \author Tianqi Chen
  */
+#include
+#include
 #include
 #include
 #include "./io.h"
@@ -16,14 +18,14 @@ namespace io {
 class HDFSStream : public utils::ISeekStream {
  public:
   HDFSStream(hdfsFS fs, const char *fname, const char *mode)
-      : fs_(fs) {
+      : fs_(fs), at_end_(false) {
     int flag;
     if (!strcmp(mode, "r")) {
       flag = O_RDONLY;
     } else if (!strcmp(mode, "w")) {
-      flag = O_WDONLY;
+      flag = O_WRONLY;
     } else if (!strcmp(mode, "a")) {
-      flag = O_WDONLY | O_APPEND;
+      flag = O_WRONLY | O_APPEND;
     } else {
       utils::Error("HDFSStream: unknown flag %s", mode);
     }
@@ -31,7 +33,7 @@ class HDFSStream : public utils::ISeekStream {
     utils::Check(fp_ != NULL,
                  "HDFSStream: fail to open %s", fname);
   }
-  virtual ~FileStream(void) {
+  virtual ~HDFSStream(void) {
     this->Close();
   }
   virtual size_t Read(void *ptr, size_t size) {
@@ -40,6 +42,9 @@ class HDFSStream : public utils::ISeekStream {
       int errsv = errno;
       utils::Error("HDFSStream.Read Error:%s", strerror(errsv));
     }
+    if (nread == 0) {
+      at_end_ = true;
+    }
     return static_cast<size_t>(nread);
   }
   virtual void Write(const void *ptr, size_t size) {
@@ -56,8 +61,8 @@ class HDFSStream : public utils::ISeekStream {
   }
   virtual void Seek(size_t pos) {
     if (hdfsSeek(fs_, fp_, pos) != 0) {
-        int errsv = errno;
-        utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
+      int errsv = errno;
+      utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
     }
   }
   virtual size_t Tell(void) {
@@ -68,8 +73,11 @@ class HDFSStream : public utils::ISeekStream {
     }
     return static_cast<size_t>(offset);
   }
+  virtual bool AtEnd(void) const {
+    return at_end_;
+  }
   inline void Close(void) {
-    if (fp != NULL) {
+    if (fp_ != NULL) {
       if (hdfsCloseFile(fs_, fp_) == 0) {
         int errsv = errno;
         utils::Error("HDFSStream.Close Error:%s", strerror(errsv));
@@ -81,14 +89,15 @@ class HDFSStream : public utils::ISeekStream {
  private:
   hdfsFS fs_;
   hdfsFile fp_;
+  bool at_end_;
 };
 
 /*! \brief line split from normal file system */
 class HDFSSplit : public LineSplitBase {
  public:
-  explicit FileSplit(const char *uri, unsigned rank, unsigned nsplit) {
+  explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) {
     fs_ = hdfsConnect("default", 0);
-    std::string paths;
+    std::vector<std::string> paths;
     LineSplitBase::SplitNames(&paths, uri, "#");
     // get the files
     std::vector<size_t> fsize;
@@ -103,16 +112,16 @@ class HDFSSplit : public LineSplitBase {
             fnames_.push_back(std::string(files[i].mName));
           }
         }
-        hdfsFileInfo(files, nentry);
+        hdfsFreeFileInfo(files, nentry);
       } else {
         fsize.push_back(info->mSize);
         fnames_.push_back(std::string(info->mName));
       }
-      hdfsFileInfo(info, 1);
+      hdfsFreeFileInfo(info, 1);
     }
     LineSplitBase::Init(fsize, rank, nsplit);
   }
-  virtual ~FileSplit(void) {}
+  virtual ~HDFSSplit(void) {}
 
  protected:
   virtual utils::ISeekStream *GetFile(size_t file_index) {
diff --git a/rabit-learn/io/line_split-inl.h b/rabit-learn/io/line_split-inl.h
index f8202e44c..4d66b344c 100644
--- a/rabit-learn/io/line_split-inl.h
+++ b/rabit-learn/io/line_split-inl.h
@@ -8,8 +8,7 @@
 #include
 #include
 #include
-#include
-#include
+#include
 #include "../../include/rabit.h"
 #include "./io.h"
 #include "./buffer_reader-inl.h"
diff --git a/rabit-learn/kmeans/Makefile b/rabit-learn/kmeans/Makefile
index bf8aaa84e..197cf1154 100644
--- a/rabit-learn/kmeans/Makefile
+++ b/rabit-learn/kmeans/Makefile
@@ -13,4 +13,3 @@ kmeans.rabit: kmeans.o lib
 kmeans.mock: kmeans.o lib
 kmeans.mpi: kmeans.o libmpi
 kmeans.o: kmeans.cc ../../src/*.h
-
diff --git a/rabit-learn/linear/Makefile b/rabit-learn/linear/Makefile
index a7427916b..abcf20acf 100644
--- a/rabit-learn/linear/Makefile
+++ b/rabit-learn/linear/Makefile
@@ -6,6 +6,7 @@
 MPIBIN =
 OBJ = linear.o
 # common build script for programs
+include ../make/config.mk
 include ../make/common.mk
 CFLAGS+=-fopenmp
 linear.o: linear.cc ../../src/*.h linear.h ../solver/*.h
diff --git a/rabit-learn/make/common.mk b/rabit-learn/make/common.mk
index cc54dc7c2..7ab25bf34 100644
--- a/rabit-learn/make/common.mk
+++ b/rabit-learn/make/common.mk
@@ -5,13 +5,12 @@ export CFLAGS = -Wall -msse2 -Wno-unknown-pragmas -fPIC -I../../include
 
 # setup opencv
 ifeq ($(USE_HDFS),1)
-	CFLAGS+= -DRABIT_USE_HDFS=1 -I$(HDFS_HOME)/include
+	CFLAGS+= -DRABIT_USE_HDFS=1 -I$(LIBHDFS_INCLUDE) -I$(JAVA_HOME)/include
 	LDFLAGS+= -L$(HDFS_HOME)/lib/native -lhdfs
 else
 	CFLAGS+= -DRABIT_USE_HDFS=0
 endif
 
-
 .PHONY: clean all lib mpi
 
 all: $(BIN) $(MOCKBIN)
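
Not part of the patch: a minimal usage sketch of the patched HDFSStream, illustrating the end-of-stream behaviour the change introduces (a Read() that returns 0 bytes sets at_end_, so AtEnd() reports true). The HDFSStream API, the "default" connection string, and the USE_HDFS build flags are taken from the diff above; the include path, the rabit::io namespace qualification, and the surrounding main() are assumptions for illustration only.

```cpp
// Sketch only (not part of the patch): drain an HDFS file through the
// patched HDFSStream.  Assumes the rabit-learn headers and libhdfs are on
// the include/link path (see the USE_HDFS block in make/common.mk above).
#include <cstdio>
#include "rabit-learn/io/hdfs-inl.h"   // path assumed relative to the repo root

int main(int argc, char *argv[]) {
  if (argc < 2) {
    fprintf(stderr, "usage: %s <hdfs-path>\n", argv[0]);
    return 1;
  }
  hdfsFS fs = hdfsConnect("default", 0);       // same default HDFSSplit uses
  rabit::io::HDFSStream in(fs, argv[1], "r");  // namespace qualification assumed
  char buf[1 << 16];
  size_t total = 0;
  // AtEnd() only flips after a Read() that returns 0 bytes, which is
  // exactly what the new at_end_ flag records.
  while (!in.AtEnd()) {
    total += in.Read(buf, sizeof(buf));
  }
  printf("read %lu bytes\n", static_cast<unsigned long>(total));
  return 0;
}
```

Note the one extra Read() at end of file is inherent to this design: the flag is only set once hdfsRead reports zero bytes.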