test code

This commit is contained in:
tqchen 2015-03-20 13:02:46 -07:00
parent 7751b2b320
commit 2035799817
2 changed files with 17 additions and 5 deletions

View File

@ -23,6 +23,7 @@ class HDFSStream : public ISeekStream {
bool disconnect_when_done) bool disconnect_when_done)
: fs_(fs), at_end_(false), : fs_(fs), at_end_(false),
disconnect_when_done_(disconnect_when_done) { disconnect_when_done_(disconnect_when_done) {
fsbk_ = fs_;
int flag = 0; int flag = 0;
if (!strcmp(mode, "r")) { if (!strcmp(mode, "r")) {
flag = O_RDONLY; flag = O_RDONLY;
@ -44,6 +45,7 @@ class HDFSStream : public ISeekStream {
} }
} }
virtual size_t Read(void *ptr, size_t size) { virtual size_t Read(void *ptr, size_t size) {
CheckFS();
tSize nread = hdfsRead(fs_, fp_, ptr, size); tSize nread = hdfsRead(fs_, fp_, ptr, size);
if (nread == -1) { if (nread == -1) {
int errsv = errno; int errsv = errno;
@ -55,6 +57,7 @@ class HDFSStream : public ISeekStream {
return static_cast<size_t>(nread); return static_cast<size_t>(nread);
} }
virtual void Write(const void *ptr, size_t size) { virtual void Write(const void *ptr, size_t size) {
CheckFS();
const char *buf = reinterpret_cast<const char*>(ptr); const char *buf = reinterpret_cast<const char*>(ptr);
while (size != 0) { while (size != 0) {
tSize nwrite = hdfsWrite(fs_, fp_, buf, size); tSize nwrite = hdfsWrite(fs_, fp_, buf, size);
@ -67,12 +70,14 @@ class HDFSStream : public ISeekStream {
} }
} }
virtual void Seek(size_t pos) { virtual void Seek(size_t pos) {
CheckFS();
if (hdfsSeek(fs_, fp_, pos) != 0) { if (hdfsSeek(fs_, fp_, pos) != 0) {
int errsv = errno; int errsv = errno;
utils::Error("HDFSStream.Seek Error:%s", strerror(errsv)); utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
} }
} }
virtual size_t Tell(void) { virtual size_t Tell(void) {
CheckFS();
tOffset offset = hdfsTell(fs_, fp_); tOffset offset = hdfsTell(fs_, fp_);
if (offset == -1) { if (offset == -1) {
int errsv = errno; int errsv = errno;
@ -84,6 +89,7 @@ class HDFSStream : public ISeekStream {
return at_end_; return at_end_;
} }
inline void Close(void) { inline void Close(void) {
CheckFS();
if (fp_ != NULL) { if (fp_ != NULL) {
if (hdfsCloseFile(fs_, fp_) == -1) { if (hdfsCloseFile(fs_, fp_) == -1) {
int errsv = errno; int errsv = errno;
@ -94,10 +100,16 @@ class HDFSStream : public ISeekStream {
} }
private: private:
inline void CheckFS(void) const {
if (fs_ != fsbk_) {
rabit::TrackerPrintf("[%d] fs flag inconstent\n", rabit::GetRank());
}
}
hdfsFS fs_; hdfsFS fs_;
hdfsFile fp_; hdfsFile fp_;
bool at_end_; bool at_end_;
bool disconnect_when_done_; bool disconnect_when_done_;
hdfsFS fsbk_;
}; };
/*! \brief line split from normal file system */ /*! \brief line split from normal file system */

View File

@ -3,11 +3,11 @@
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
using namespace rabit::io; using namespace rabit::io;
if (argc < 2) { if (argc < 4) {
// intialize rabit engine // intialize rabit engine
rabit::Init(argc, argv); rabit::Init(argc, argv);
if (rabit::GetRank() == 0) { if (rabit::GetRank() == 0) {
rabit::TrackerPrintf("Usage: <data_in> param=val\n"); rabit::TrackerPrintf("Usage: <data_in> npart rank\n");
} }
rabit::Finalize(); rabit::Finalize();
return 0; return 0;
@ -15,8 +15,8 @@ int main(int argc, char *argv[]) {
rabit::Init(argc, argv); rabit::Init(argc, argv);
int n = 0; int n = 0;
InputSplit *in = CreateInputSplit(argv[1], InputSplit *in = CreateInputSplit(argv[1],
rabit::GetRank(), atoi(argv[2]),
rabit::GetWorldSize()); atoi(argv[3]));
std::string line; std::string line;
while (in->NextLine(&line)) { while (in->NextLine(&line)) {
if (n % 100 == 0) { if (n % 100 == 0) {