xgboost/rabit-learn/io/hdfs-inl.h
tqchen 57b5d7873f Squashed 'subtree/rabit/' changes from d4ec037..28ca7be
28ca7be add linear readme
ca4b20f add linear readme
1133628 add linear readme
6a11676 update docs
a607047 Update build.sh
2c1cfd8 complete yarn
4f28e32 change formater
2fbda81 fix stdin input
3258bcf checkin yarn master
67ebf81 allow setup from env variables
9b6bf57 fix hdfs
395d5c2 add make system
88ce767 refactor io, initial hdfs file access need test
19be870 chgs
a1bd3c6 Merge branch 'master' of ssh://github.com/tqchen/rabit
1a573f9 introduce input split
29476f1 fix timer issue

git-subtree-dir: subtree/rabit
git-subtree-split: 28ca7becbdf6503e6b1398588a969efb164c9701
2015-03-09 13:28:38 -07:00

141 lines
3.8 KiB
C++

#ifndef RABIT_LEARN_IO_HDFS_INL_H_
#define RABIT_LEARN_IO_HDFS_INL_H_
/*!
* \file hdfs-inl.h
* \brief HDFS I/O
* \author Tianqi Chen
*/
#include <errno.h>
#include <algorithm>
#include <cstring>
#include <limits>
#include <string>
#include <vector>
#include <hdfs.h>
#include "./io.h"
#include "./line_split-inl.h"
/*! \brief io interface */
namespace rabit {
namespace io {
class HDFSStream : public utils::ISeekStream {
public:
HDFSStream(hdfsFS fs, const char *fname, const char *mode)
: fs_(fs), at_end_(false) {
int flag;
if (!strcmp(mode, "r")) {
flag = O_RDONLY;
} else if (!strcmp(mode, "w")) {
flag = O_WRONLY;
} else if (!strcmp(mode, "a")) {
flag = O_WRONLY | O_APPEND;
} else {
utils::Error("HDFSStream: unknown flag %s", mode);
}
fp_ = hdfsOpenFile(fs_, fname, flag, 0, 0, 0);
utils::Check(fp_ != NULL,
"HDFSStream: fail to open %s", fname);
}
virtual ~HDFSStream(void) {
this->Close();
}
virtual size_t Read(void *ptr, size_t size) {
tSize nread = hdfsRead(fs_, fp_, ptr, size);
if (nread == -1) {
int errsv = errno;
utils::Error("HDFSStream.Read Error:%s", strerror(errsv));
}
if (nread == 0) {
at_end_ = true;
}
return static_cast<size_t>(nread);
}
virtual void Write(const void *ptr, size_t size) {
const char *buf = reinterpret_cast<const char*>(ptr);
while (size != 0) {
tSize nwrite = hdfsWrite(fs_, fp_, buf, size);
if (nwrite == -1) {
int errsv = errno;
utils::Error("HDFSStream.Write Error:%s", strerror(errsv));
}
size_t sz = static_cast<size_t>(nwrite);
buf += sz; size -= sz;
}
}
virtual void Seek(size_t pos) {
if (hdfsSeek(fs_, fp_, pos) != 0) {
int errsv = errno;
utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
}
}
virtual size_t Tell(void) {
tOffset offset = hdfsTell(fs_, fp_);
if (offset == -1) {
int errsv = errno;
utils::Error("HDFSStream.Tell Error:%s", strerror(errsv));
}
return static_cast<size_t>(offset);
}
virtual bool AtEnd(void) const {
return at_end_;
}
inline void Close(void) {
if (fp_ != NULL) {
if (hdfsCloseFile(fs_, fp_) == -1) {
int errsv = errno;
utils::Error("HDFSStream.Close Error:%s", strerror(errsv));
}
fp_ = NULL;
}
}
private:
hdfsFS fs_;
hdfsFile fp_;
bool at_end_;
};
/*! \brief line split from normal file system */
class HDFSSplit : public LineSplitBase {
public:
explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) {
fs_ = hdfsConnect("default", 0);
std::vector<std::string> paths;
LineSplitBase::SplitNames(&paths, uri, "#");
// get the files
std::vector<size_t> fsize;
for (size_t i = 0; i < paths.size(); ++i) {
hdfsFileInfo *info = hdfsGetPathInfo(fs_, paths[i].c_str());
if (info->mKind == 'D') {
int nentry;
hdfsFileInfo *files = hdfsListDirectory(fs_, info->mName, &nentry);
for (int i = 0; i < nentry; ++i) {
if (files[i].mKind == 'F') {
fsize.push_back(files[i].mSize);
fnames_.push_back(std::string(files[i].mName));
}
}
hdfsFreeFileInfo(files, nentry);
} else {
fsize.push_back(info->mSize);
fnames_.push_back(std::string(info->mName));
}
hdfsFreeFileInfo(info, 1);
}
LineSplitBase::Init(fsize, rank, nsplit);
}
virtual ~HDFSSplit(void) {}
protected:
virtual utils::ISeekStream *GetFile(size_t file_index) {
utils::Assert(file_index < fnames_.size(), "file index exceed bound");
return new HDFSStream(fs_, fnames_[file_index].c_str(), "r");
}
private:
// hdfs handle
hdfsFS fs_;
// file names
std::vector<std::string> fnames_;
};
} // namespace io
} // namespace rabit
#endif // RABIT_LEARN_IO_HDFS_INL_H_