This commit is contained in:
tqchen 2015-03-07 09:08:21 -08:00
parent 395d5c29d5
commit 9b6bf57e79
6 changed files with 26 additions and 17 deletions

View File

@ -5,6 +5,8 @@
* \brief normal filesystem I/O * \brief normal filesystem I/O
* \author Tianqi Chen * \author Tianqi Chen
*/ */
#include <string>
#include <vector>
#include <cstdio> #include <cstdio>
#include "./io.h" #include "./io.h"
#include "./line_split-inl.h" #include "./line_split-inl.h"

View File

@ -5,6 +5,8 @@
* \brief HDFS I/O * \brief HDFS I/O
* \author Tianqi Chen * \author Tianqi Chen
*/ */
#include <string>
#include <vector>
#include <hdfs.h> #include <hdfs.h>
#include <errno.h> #include <errno.h>
#include "./io.h" #include "./io.h"
@ -16,14 +18,14 @@ namespace io {
class HDFSStream : public utils::ISeekStream { class HDFSStream : public utils::ISeekStream {
public: public:
HDFSStream(hdfsFS fs, const char *fname, const char *mode) HDFSStream(hdfsFS fs, const char *fname, const char *mode)
: fs_(fs) { : fs_(fs), at_end_(false) {
int flag; int flag;
if (!strcmp(mode, "r")) { if (!strcmp(mode, "r")) {
flag = O_RDONLY; flag = O_RDONLY;
} else if (!strcmp(mode, "w")) { } else if (!strcmp(mode, "w")) {
flag = O_WDONLY; flag = O_WRONLY;
} else if (!strcmp(mode, "a")) { } else if (!strcmp(mode, "a")) {
flag = O_WDONLY | O_APPEND; flag = O_WRONLY | O_APPEND;
} else { } else {
utils::Error("HDFSStream: unknown flag %s", mode); utils::Error("HDFSStream: unknown flag %s", mode);
} }
@ -31,7 +33,7 @@ class HDFSStream : public utils::ISeekStream {
utils::Check(fp_ != NULL, utils::Check(fp_ != NULL,
"HDFSStream: fail to open %s", fname); "HDFSStream: fail to open %s", fname);
} }
virtual ~FileStream(void) { virtual ~HDFSStream(void) {
this->Close(); this->Close();
} }
virtual size_t Read(void *ptr, size_t size) { virtual size_t Read(void *ptr, size_t size) {
@ -40,6 +42,9 @@ class HDFSStream : public utils::ISeekStream {
int errsv = errno; int errsv = errno;
utils::Error("HDFSStream.Read Error:%s", strerror(errsv)); utils::Error("HDFSStream.Read Error:%s", strerror(errsv));
} }
if (nread == 0) {
at_end_ = true;
}
return static_cast<size_t>(nread); return static_cast<size_t>(nread);
} }
virtual void Write(const void *ptr, size_t size) { virtual void Write(const void *ptr, size_t size) {
@ -56,8 +61,8 @@ class HDFSStream : public utils::ISeekStream {
} }
virtual void Seek(size_t pos) { virtual void Seek(size_t pos) {
if (hdfsSeek(fs_, fp_, pos) != 0) { if (hdfsSeek(fs_, fp_, pos) != 0) {
int errsv = errno; int errsv = errno;
utils::Error("HDFSStream.Seek Error:%s", strerror(errsv)); utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
} }
} }
virtual size_t Tell(void) { virtual size_t Tell(void) {
@ -68,8 +73,11 @@ class HDFSStream : public utils::ISeekStream {
} }
return static_cast<size_t>(offset); return static_cast<size_t>(offset);
} }
virtual bool AtEnd(void) const {
return at_end_;
}
inline void Close(void) { inline void Close(void) {
if (fp != NULL) { if (fp_ != NULL) {
if (hdfsCloseFile(fs_, fp_) == 0) { if (hdfsCloseFile(fs_, fp_) == 0) {
int errsv = errno; int errsv = errno;
utils::Error("HDFSStream.Close Error:%s", strerror(errsv)); utils::Error("HDFSStream.Close Error:%s", strerror(errsv));
@ -81,14 +89,15 @@ class HDFSStream : public utils::ISeekStream {
private: private:
hdfsFS fs_; hdfsFS fs_;
hdfsFile fp_; hdfsFile fp_;
bool at_end_;
}; };
/*! \brief line split from normal file system */ /*! \brief line split from normal file system */
class HDFSSplit : public LineSplitBase { class HDFSSplit : public LineSplitBase {
public: public:
explicit FileSplit(const char *uri, unsigned rank, unsigned nsplit) { explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) {
fs_ = hdfsConnect("default", 0); fs_ = hdfsConnect("default", 0);
std::string paths; std::vector<std::string> paths;
LineSplitBase::SplitNames(&paths, uri, "#"); LineSplitBase::SplitNames(&paths, uri, "#");
// get the files // get the files
std::vector<size_t> fsize; std::vector<size_t> fsize;
@ -103,16 +112,16 @@ class HDFSSplit : public LineSplitBase {
fnames_.push_back(std::string(files[i].mName)); fnames_.push_back(std::string(files[i].mName));
} }
} }
hdfsFileInfo(files, nentry); hdfsFreeFileInfo(files, nentry);
} else { } else {
fsize.push_back(info->mSize); fsize.push_back(info->mSize);
fnames_.push_back(std::string(info->mName)); fnames_.push_back(std::string(info->mName));
} }
hdfsFileInfo(info, 1); hdfsFreeFileInfo(info, 1);
} }
LineSplitBase::Init(fsize, rank, nsplit); LineSplitBase::Init(fsize, rank, nsplit);
} }
virtual ~FileSplit(void) {} virtual ~HDFSSplit(void) {}
protected: protected:
virtual utils::ISeekStream *GetFile(size_t file_index) { virtual utils::ISeekStream *GetFile(size_t file_index) {

View File

@ -8,8 +8,7 @@
#include <vector> #include <vector>
#include <utility> #include <utility>
#include <cstring> #include <cstring>
#include <iostream> #include <string>
#include <fstream>
#include "../../include/rabit.h" #include "../../include/rabit.h"
#include "./io.h" #include "./io.h"
#include "./buffer_reader-inl.h" #include "./buffer_reader-inl.h"

View File

@ -13,4 +13,3 @@ kmeans.rabit: kmeans.o lib
kmeans.mock: kmeans.o lib kmeans.mock: kmeans.o lib
kmeans.mpi: kmeans.o libmpi kmeans.mpi: kmeans.o libmpi
kmeans.o: kmeans.cc ../../src/*.h kmeans.o: kmeans.cc ../../src/*.h

View File

@ -6,6 +6,7 @@ MPIBIN =
OBJ = linear.o OBJ = linear.o
# common build script for programs # common build script for programs
include ../make/config.mk
include ../make/common.mk include ../make/common.mk
CFLAGS+=-fopenmp CFLAGS+=-fopenmp
linear.o: linear.cc ../../src/*.h linear.h ../solver/*.h linear.o: linear.cc ../../src/*.h linear.h ../solver/*.h

View File

@ -5,13 +5,12 @@ export CFLAGS = -Wall -msse2 -Wno-unknown-pragmas -fPIC -I../../include
# setup opencv # setup opencv
ifeq ($(USE_HDFS),1) ifeq ($(USE_HDFS),1)
CFLAGS+= -DRABIT_USE_HDFS=1 -I$(HDFS_HOME)/include CFLAGS+= -DRABIT_USE_HDFS=1 -I$(LIBHDFS_INCLUDE) -I$(JAVA_HOME)/include
LDFLAGS+= -L$(HDFS_HOME)/lib/native -lhdfs LDFLAGS+= -L$(HDFS_HOME)/lib/native -lhdfs
else else
CFLAGS+= -DRABIT_USE_HDFS=0 CFLAGS+= -DRABIT_USE_HDFS=0
endif endif
.PHONY: clean all lib mpi .PHONY: clean all lib mpi
all: $(BIN) $(MOCKBIN) all: $(BIN) $(MOCKBIN)