add namenode

This commit is contained in:
tqchen 2015-03-21 00:35:30 -07:00
parent 75a6d349c6
commit 14477f9f5a
4 changed files with 16 additions and 14 deletions

View File

@@ -6,6 +6,7 @@
* \author Tianqi Chen * \author Tianqi Chen
*/ */
#include <string> #include <string>
#include <cstdlib>
#include <vector> #include <vector>
#include <hdfs.h> #include <hdfs.h>
#include <errno.h> #include <errno.h>
@@ -23,7 +24,6 @@ class HDFSStream : public ISeekStream {
bool disconnect_when_done) bool disconnect_when_done)
: fs_(fs), at_end_(false), : fs_(fs), at_end_(false),
disconnect_when_done_(disconnect_when_done) { disconnect_when_done_(disconnect_when_done) {
fsbk_ = fs_;
int flag = 0; int flag = 0;
if (!strcmp(mode, "r")) { if (!strcmp(mode, "r")) {
flag = O_RDONLY; flag = O_RDONLY;
@@ -45,7 +45,6 @@ class HDFSStream : public ISeekStream {
} }
} }
virtual size_t Read(void *ptr, size_t size) { virtual size_t Read(void *ptr, size_t size) {
CheckFS();
tSize nread = hdfsRead(fs_, fp_, ptr, size); tSize nread = hdfsRead(fs_, fp_, ptr, size);
if (nread == -1) { if (nread == -1) {
int errsv = errno; int errsv = errno;
@@ -57,7 +56,6 @@ class HDFSStream : public ISeekStream {
return static_cast<size_t>(nread); return static_cast<size_t>(nread);
} }
virtual void Write(const void *ptr, size_t size) { virtual void Write(const void *ptr, size_t size) {
CheckFS();
const char *buf = reinterpret_cast<const char*>(ptr); const char *buf = reinterpret_cast<const char*>(ptr);
while (size != 0) { while (size != 0) {
tSize nwrite = hdfsWrite(fs_, fp_, buf, size); tSize nwrite = hdfsWrite(fs_, fp_, buf, size);
@@ -70,14 +68,12 @@ class HDFSStream : public ISeekStream {
} }
} }
virtual void Seek(size_t pos) { virtual void Seek(size_t pos) {
CheckFS();
if (hdfsSeek(fs_, fp_, pos) != 0) { if (hdfsSeek(fs_, fp_, pos) != 0) {
int errsv = errno; int errsv = errno;
utils::Error("HDFSStream.Seek Error:%s", strerror(errsv)); utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
} }
} }
virtual size_t Tell(void) { virtual size_t Tell(void) {
CheckFS();
tOffset offset = hdfsTell(fs_, fp_); tOffset offset = hdfsTell(fs_, fp_);
if (offset == -1) { if (offset == -1) {
int errsv = errno; int errsv = errno;
@@ -89,7 +85,6 @@ class HDFSStream : public ISeekStream {
return at_end_; return at_end_;
} }
inline void Close(void) { inline void Close(void) {
CheckFS();
if (fp_ != NULL) { if (fp_ != NULL) {
if (hdfsCloseFile(fs_, fp_) == -1) { if (hdfsCloseFile(fs_, fp_) == -1) {
int errsv = errno; int errsv = errno;
@@ -99,24 +94,26 @@ class HDFSStream : public ISeekStream {
} }
} }
private: inline static std::string GetNameNode(void) {
inline void CheckFS(void) const { const char *nn = getenv("rabit_hdfs_namenode");
if (fs_ != fsbk_) { if (nn == NULL) {
rabit::TrackerPrintf("[%d] fs flag inconstent\n", rabit::GetRank()); return std::string("default");
} else {
return std::string(nn);
} }
} }
private:
hdfsFS fs_; hdfsFS fs_;
hdfsFile fp_; hdfsFile fp_;
bool at_end_; bool at_end_;
bool disconnect_when_done_; bool disconnect_when_done_;
hdfsFS fsbk_;
}; };
/*! \brief line split from normal file system */ /*! \brief line split from normal file system */
class HDFSProvider : public LineSplitter::IFileProvider { class HDFSProvider : public LineSplitter::IFileProvider {
public: public:
explicit HDFSProvider(const char *uri) { explicit HDFSProvider(const char *uri) {
fs_ = hdfsConnect("default", 0); fs_ = hdfsConnect(HDFSStream::GetNameNode().c_str(), 0);
utils::Check(fs_ != NULL, "error when connecting to default HDFS"); utils::Check(fs_ != NULL, "error when connecting to default HDFS");
std::vector<std::string> paths; std::vector<std::string> paths;
LineSplitter::SplitNames(&paths, uri, "#"); LineSplitter::SplitNames(&paths, uri, "#");

View File

@@ -55,7 +55,8 @@ inline IStream *CreateStream(const char *uri, const char *mode) {
} }
if (!strncmp(uri, "hdfs://", 7)) { if (!strncmp(uri, "hdfs://", 7)) {
#if RABIT_USE_HDFS #if RABIT_USE_HDFS
return new HDFSStream(hdfsConnect("default", 0), uri, mode, true); return new HDFSStream(hdfsConnect(HDFSStream::GetNameNode().c_str(), 0),
uri, mode, true);
#else #else
utils::Error("Please compile with RABIT_USE_HDFS=1"); utils::Error("Please compile with RABIT_USE_HDFS=1");
#endif #endif

View File

@@ -5,7 +5,7 @@ else
endif endif
include $(config) include $(config)
BIN = linear.rabit test.rabit BIN = linear.rabit
MOCKBIN= linear.mock MOCKBIN= linear.mock
MPIBIN = MPIBIN =
# objectives that makes up rabit library # objectives that makes up rabit library

View File

@@ -61,6 +61,9 @@ parser.add_argument('-mem', '--memory_mb', default=1024, type=int,
'so that each node can occupy all the mapper slots in a machine for maximum performance') 'so that each node can occupy all the mapper slots in a machine for maximum performance')
parser.add_argument('--libhdfs-opts', default='-Xmx128m', type=str, parser.add_argument('--libhdfs-opts', default='-Xmx128m', type=str,
help = 'setting to be passed to libhdfs') help = 'setting to be passed to libhdfs')
parser.add_argument('--name-node', default='default', type=str,
help = 'the namenode address of hdfs, libhdfs should connect to, normally leave it as default')
parser.add_argument('command', nargs='+', parser.add_argument('command', nargs='+',
help = 'command for rabit program') help = 'command for rabit program')
args = parser.parse_args() args = parser.parse_args()
@@ -118,6 +121,7 @@ def submit_yarn(nworker, worker_args, worker_env):
env['rabit_memory_mb'] = str(args.memory_mb) env['rabit_memory_mb'] = str(args.memory_mb)
env['rabit_world_size'] = str(args.nworker) env['rabit_world_size'] = str(args.nworker)
env['rabit_hdfs_opts'] = str(args.libhdfs_opts) env['rabit_hdfs_opts'] = str(args.libhdfs_opts)
env['rabit_hdfs_namenode'] = str(args.name_node)
if args.files != None: if args.files != None:
for flst in args.files: for flst in args.files: