Squashed 'subtree/rabit/' changes from d4ec037..28ca7be

28ca7be add linear readme
ca4b20f add linear readme
1133628 add linear readme
6a11676 update docs
a607047 Update build.sh
2c1cfd8 complete yarn
4f28e32 change formater
2fbda81 fix stdin input
3258bcf checkin yarn master
67ebf81 allow setup from env variables
9b6bf57 fix hdfs
395d5c2 add make system
88ce767 refactor io, initial hdfs file access need test
19be870 chgs
a1bd3c6 Merge branch 'master' of ssh://github.com/tqchen/rabit
1a573f9 introduce input split
29476f1 fix timer issue

git-subtree-dir: subtree/rabit
git-subtree-split: 28ca7becbd
This commit is contained in:
tqchen
2015-03-09 13:28:38 -07:00
parent ef2de29f06
commit 57b5d7873f
43 changed files with 1797 additions and 235 deletions

218
rabit-learn/io/base64-inl.h Normal file
View File

@@ -0,0 +1,218 @@
#ifndef RABIT_LEARN_IO_BASE64_INL_H_
#define RABIT_LEARN_IO_BASE64_INL_H_
/*!
* \file base64.h
* \brief data stream support to input and output from/to base64 stream
* base64 is easier to store and pass as text format in mapreduce
* \author Tianqi Chen
*/
#include <cctype>
#include <cstdio>
#include "./io.h"
#include "./buffer_reader-inl.h"
namespace rabit {
namespace io {
/*! \brief namespace of base64 decoding and encoding table */
namespace base64 {
const char DecodeTable[] = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
62, // '+'
0, 0, 0,
63, // '/'
52, 53, 54, 55, 56, 57, 58, 59, 60, 61, // '0'-'9'
0, 0, 0, 0, 0, 0, 0,
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, // 'A'-'Z'
0, 0, 0, 0, 0, 0,
26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, // 'a'-'z'
};
static const char EncodeTable[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
} // namespace base64
/*! \brief the stream that reads from base64, note we take from file pointers */
class Base64InStream: public IStream {
public:
explicit Base64InStream(IStream *fs) : reader_(256) {
reader_.set_stream(fs);
num_prev = 0; tmp_ch = 0;
}
/*!
* \brief initialize the stream position to beginning of next base64 stream
* call this function before actually start read
*/
inline void InitPosition(void) {
// get a charater
do {
tmp_ch = reader_.GetChar();
} while (isspace(tmp_ch));
}
/*! \brief whether current position is end of a base64 stream */
inline bool IsEOF(void) const {
return num_prev == 0 && (tmp_ch == EOF || isspace(tmp_ch));
}
virtual size_t Read(void *ptr, size_t size) {
using base64::DecodeTable;
if (size == 0) return 0;
// use tlen to record left size
size_t tlen = size;
unsigned char *cptr = static_cast<unsigned char*>(ptr);
// if anything left, load from previous buffered result
if (num_prev != 0) {
if (num_prev == 2) {
if (tlen >= 2) {
*cptr++ = buf_prev[0];
*cptr++ = buf_prev[1];
tlen -= 2;
num_prev = 0;
} else {
// assert tlen == 1
*cptr++ = buf_prev[0]; --tlen;
buf_prev[0] = buf_prev[1];
num_prev = 1;
}
} else {
// assert num_prev == 1
*cptr++ = buf_prev[0]; --tlen; num_prev = 0;
}
}
if (tlen == 0) return size;
int nvalue;
// note: everything goes with 4 bytes in Base64
// so we process 4 bytes a unit
while (tlen && tmp_ch != EOF && !isspace(tmp_ch)) {
// first byte
nvalue = DecodeTable[tmp_ch] << 18;
{
// second byte
utils::Check((tmp_ch = reader_.GetChar(), tmp_ch != EOF && !isspace(tmp_ch)),
"invalid base64 format");
nvalue |= DecodeTable[tmp_ch] << 12;
*cptr++ = (nvalue >> 16) & 0xFF; --tlen;
}
{
// third byte
utils::Check((tmp_ch = reader_.GetChar(), tmp_ch != EOF && !isspace(tmp_ch)),
"invalid base64 format");
// handle termination
if (tmp_ch == '=') {
utils::Check((tmp_ch = reader_.GetChar(), tmp_ch == '='), "invalid base64 format");
utils::Check((tmp_ch = reader_.GetChar(), tmp_ch == EOF || isspace(tmp_ch)),
"invalid base64 format");
break;
}
nvalue |= DecodeTable[tmp_ch] << 6;
if (tlen) {
*cptr++ = (nvalue >> 8) & 0xFF; --tlen;
} else {
buf_prev[num_prev++] = (nvalue >> 8) & 0xFF;
}
}
{
// fourth byte
utils::Check((tmp_ch = reader_.GetChar(), tmp_ch != EOF && !isspace(tmp_ch)),
"invalid base64 format");
if (tmp_ch == '=') {
utils::Check((tmp_ch = reader_.GetChar(), tmp_ch == EOF || isspace(tmp_ch)),
"invalid base64 format");
break;
}
nvalue |= DecodeTable[tmp_ch];
if (tlen) {
*cptr++ = nvalue & 0xFF; --tlen;
} else {
buf_prev[num_prev ++] = nvalue & 0xFF;
}
}
// get next char
tmp_ch = reader_.GetChar();
}
if (kStrictCheck) {
utils::Check(tlen == 0, "Base64InStream: read incomplete");
}
return size - tlen;
}
virtual void Write(const void *ptr, size_t size) {
utils::Error("Base64InStream do not support write");
}
private:
StreamBufferReader reader_;
int tmp_ch;
int num_prev;
unsigned char buf_prev[2];
// whether we need to do strict check
static const bool kStrictCheck = false;
};
/*! \brief the stream that write to base64, note we take from file pointers */
class Base64OutStream: public IStream {
public:
explicit Base64OutStream(IStream *fp) : fp(fp) {
buf_top = 0;
}
virtual void Write(const void *ptr, size_t size) {
using base64::EncodeTable;
size_t tlen = size;
const unsigned char *cptr = static_cast<const unsigned char*>(ptr);
while (tlen) {
while (buf_top < 3 && tlen != 0) {
buf[++buf_top] = *cptr++; --tlen;
}
if (buf_top == 3) {
// flush 4 bytes out
PutChar(EncodeTable[buf[1] >> 2]);
PutChar(EncodeTable[((buf[1] << 4) | (buf[2] >> 4)) & 0x3F]);
PutChar(EncodeTable[((buf[2] << 2) | (buf[3] >> 6)) & 0x3F]);
PutChar(EncodeTable[buf[3] & 0x3F]);
buf_top = 0;
}
}
}
virtual size_t Read(void *ptr, size_t size) {
utils::Error("Base64OutStream do not support read");
return 0;
}
/*!
* \brief finish writing of all current base64 stream, do some post processing
* \param endch charater to put to end of stream, if it is EOF, then nothing will be done
*/
inline void Finish(char endch = EOF) {
using base64::EncodeTable;
if (buf_top == 1) {
PutChar(EncodeTable[buf[1] >> 2]);
PutChar(EncodeTable[(buf[1] << 4) & 0x3F]);
PutChar('=');
PutChar('=');
}
if (buf_top == 2) {
PutChar(EncodeTable[buf[1] >> 2]);
PutChar(EncodeTable[((buf[1] << 4) | (buf[2] >> 4)) & 0x3F]);
PutChar(EncodeTable[(buf[2] << 2) & 0x3F]);
PutChar('=');
}
buf_top = 0;
if (endch != EOF) PutChar(endch);
this->Flush();
}
private:
IStream *fp;
int buf_top;
unsigned char buf[4];
std::string out_buf;
const static size_t kBufferSize = 256;
inline void PutChar(char ch) {
out_buf += ch;
if (out_buf.length() >= kBufferSize) Flush();
}
inline void Flush(void) {
fp->Write(BeginPtr(out_buf), out_buf.length());
out_buf.clear();
}
};
} // namespace utils
} // namespace rabit
#endif // RABIT_LEARN_UTILS_BASE64_INL_H_

View File

@@ -0,0 +1,57 @@
#ifndef RABIT_LEARN_IO_BUFFER_READER_INL_H_
#define RABIT_LEARN_IO_BUFFER_READER_INL_H_
/*!
* \file buffer_reader-inl.h
* \brief implementation of stream buffer reader
* \author Tianqi Chen
*/
#include "./io.h"
namespace rabit {
namespace io {
/*! \brief buffer reader of the stream that allows you to get */
class StreamBufferReader {
public:
StreamBufferReader(size_t buffer_size)
:stream_(NULL),
read_len_(1), read_ptr_(1) {
buffer_.resize(buffer_size);
}
/*!
* \brief set input stream
*/
inline void set_stream(IStream *stream) {
stream_ = stream;
read_len_ = read_ptr_ = 1;
}
/*!
* \brief allows quick read using get char
*/
inline char GetChar(void) {
while (true) {
if (read_ptr_ < read_len_) {
return buffer_[read_ptr_++];
} else {
read_len_ = stream_->Read(&buffer_[0], buffer_.length());
if (read_len_ == 0) return EOF;
read_ptr_ = 0;
}
}
}
inline bool AtEnd(void) const {
return read_len_ == 0;
}
private:
/*! \brief the underlying stream */
IStream *stream_;
/*! \brief buffer to hold data */
std::string buffer_;
/*! \brief length of valid data in buffer */
size_t read_len_;
/*! \brief pointer in the buffer */
size_t read_ptr_;
};
} // namespace io
} // namespace rabit
#endif // RABIT_LEARN_IO_BUFFER_READER_INL_H_

106
rabit-learn/io/file-inl.h Normal file
View File

@@ -0,0 +1,106 @@
#ifndef RABIT_LEARN_IO_FILE_INL_H_
#define RABIT_LEARN_IO_FILE_INL_H_
/*!
* \file file-inl.h
* \brief normal filesystem I/O
* \author Tianqi Chen
*/
#include <string>
#include <vector>
#include <cstdio>
#include "./io.h"
#include "./line_split-inl.h"
/*! \brief io interface */
namespace rabit {
namespace io {
/*! \brief implementation of file i/o stream */
class FileStream : public utils::ISeekStream {
public:
explicit FileStream(const char *fname, const char *mode)
: use_stdio(false) {
#ifndef RABIT_STRICT_CXX98_
if (!strcmp(fname, "stdin")) {
use_stdio = true; fp = stdin;
}
if (!strcmp(fname, "stdout")) {
use_stdio = true; fp = stdout;
}
#endif
if (!strncmp(fname, "file://", 7)) fname += 7;
if (!use_stdio) {
std::string flag = mode;
if (flag == "w") flag = "wb";
if (flag == "r") flag = "rb";
fp = utils::FopenCheck(fname, flag.c_str());
}
}
virtual ~FileStream(void) {
this->Close();
}
virtual size_t Read(void *ptr, size_t size) {
return std::fread(ptr, 1, size, fp);
}
virtual void Write(const void *ptr, size_t size) {
std::fwrite(ptr, size, 1, fp);
}
virtual void Seek(size_t pos) {
std::fseek(fp, static_cast<long>(pos), SEEK_SET);
}
virtual size_t Tell(void) {
return std::ftell(fp);
}
virtual bool AtEnd(void) const {
return feof(fp) != 0;
}
inline void Close(void) {
if (fp != NULL && !use_stdio) {
std::fclose(fp); fp = NULL;
}
}
private:
FILE *fp;
bool use_stdio;
};
/*! \brief line split from normal file system */
class FileSplit : public LineSplitBase {
public:
explicit FileSplit(const char *uri, unsigned rank, unsigned nsplit) {
LineSplitBase::SplitNames(&fnames_, uri, "#");
std::vector<size_t> fsize;
for (size_t i = 0; i < fnames_.size(); ++i) {
if (!strncmp(fnames_[i].c_str(), "file://", 7)) {
std::string tmp = fnames_[i].c_str() + 7;
fnames_[i] = tmp;
}
fsize.push_back(GetFileSize(fnames_[i].c_str()));
}
LineSplitBase::Init(fsize, rank, nsplit);
}
virtual ~FileSplit(void) {}
protected:
virtual utils::ISeekStream *GetFile(size_t file_index) {
utils::Assert(file_index < fnames_.size(), "file index exceed bound");
return new FileStream(fnames_[file_index].c_str(), "rb");
}
// get file size
inline static size_t GetFileSize(const char *fname) {
FILE *fp = utils::FopenCheck(fname, "rb");
// NOTE: fseek may not be good, but serves as ok solution
fseek(fp, 0, SEEK_END);
size_t fsize = static_cast<size_t>(ftell(fp));
fclose(fp);
return fsize;
}
private:
// file names
std::vector<std::string> fnames_;
};
} // namespace io
} // namespace rabit
#endif // RABIT_LEARN_IO_FILE_INL_H_

140
rabit-learn/io/hdfs-inl.h Normal file
View File

@@ -0,0 +1,140 @@
#ifndef RABIT_LEARN_IO_HDFS_INL_H_
#define RABIT_LEARN_IO_HDFS_INL_H_
/*!
* \file hdfs-inl.h
* \brief HDFS I/O
* \author Tianqi Chen
*/
#include <string>
#include <vector>
#include <hdfs.h>
#include <errno.h>
#include "./io.h"
#include "./line_split-inl.h"
/*! \brief io interface */
namespace rabit {
namespace io {
class HDFSStream : public utils::ISeekStream {
public:
HDFSStream(hdfsFS fs, const char *fname, const char *mode)
: fs_(fs), at_end_(false) {
int flag;
if (!strcmp(mode, "r")) {
flag = O_RDONLY;
} else if (!strcmp(mode, "w")) {
flag = O_WRONLY;
} else if (!strcmp(mode, "a")) {
flag = O_WRONLY | O_APPEND;
} else {
utils::Error("HDFSStream: unknown flag %s", mode);
}
fp_ = hdfsOpenFile(fs_, fname, flag, 0, 0, 0);
utils::Check(fp_ != NULL,
"HDFSStream: fail to open %s", fname);
}
virtual ~HDFSStream(void) {
this->Close();
}
virtual size_t Read(void *ptr, size_t size) {
tSize nread = hdfsRead(fs_, fp_, ptr, size);
if (nread == -1) {
int errsv = errno;
utils::Error("HDFSStream.Read Error:%s", strerror(errsv));
}
if (nread == 0) {
at_end_ = true;
}
return static_cast<size_t>(nread);
}
virtual void Write(const void *ptr, size_t size) {
const char *buf = reinterpret_cast<const char*>(ptr);
while (size != 0) {
tSize nwrite = hdfsWrite(fs_, fp_, buf, size);
if (nwrite == -1) {
int errsv = errno;
utils::Error("HDFSStream.Write Error:%s", strerror(errsv));
}
size_t sz = static_cast<size_t>(nwrite);
buf += sz; size -= sz;
}
}
virtual void Seek(size_t pos) {
if (hdfsSeek(fs_, fp_, pos) != 0) {
int errsv = errno;
utils::Error("HDFSStream.Seek Error:%s", strerror(errsv));
}
}
virtual size_t Tell(void) {
tOffset offset = hdfsTell(fs_, fp_);
if (offset == -1) {
int errsv = errno;
utils::Error("HDFSStream.Tell Error:%s", strerror(errsv));
}
return static_cast<size_t>(offset);
}
virtual bool AtEnd(void) const {
return at_end_;
}
inline void Close(void) {
if (fp_ != NULL) {
if (hdfsCloseFile(fs_, fp_) == -1) {
int errsv = errno;
utils::Error("HDFSStream.Close Error:%s", strerror(errsv));
}
fp_ = NULL;
}
}
private:
hdfsFS fs_;
hdfsFile fp_;
bool at_end_;
};
/*! \brief line split from normal file system */
class HDFSSplit : public LineSplitBase {
public:
explicit HDFSSplit(const char *uri, unsigned rank, unsigned nsplit) {
fs_ = hdfsConnect("default", 0);
std::vector<std::string> paths;
LineSplitBase::SplitNames(&paths, uri, "#");
// get the files
std::vector<size_t> fsize;
for (size_t i = 0; i < paths.size(); ++i) {
hdfsFileInfo *info = hdfsGetPathInfo(fs_, paths[i].c_str());
if (info->mKind == 'D') {
int nentry;
hdfsFileInfo *files = hdfsListDirectory(fs_, info->mName, &nentry);
for (int i = 0; i < nentry; ++i) {
if (files[i].mKind == 'F') {
fsize.push_back(files[i].mSize);
fnames_.push_back(std::string(files[i].mName));
}
}
hdfsFreeFileInfo(files, nentry);
} else {
fsize.push_back(info->mSize);
fnames_.push_back(std::string(info->mName));
}
hdfsFreeFileInfo(info, 1);
}
LineSplitBase::Init(fsize, rank, nsplit);
}
virtual ~HDFSSplit(void) {}
protected:
virtual utils::ISeekStream *GetFile(size_t file_index) {
utils::Assert(file_index < fnames_.size(), "file index exceed bound");
return new HDFSStream(fs_, fnames_[file_index].c_str(), "r");
}
private:
// hdfs handle
hdfsFS fs_;
// file names
std::vector<std::string> fnames_;
};
} // namespace io
} // namespace rabit
#endif // RABIT_LEARN_IO_HDFS_INL_H_

65
rabit-learn/io/io-inl.h Normal file
View File

@@ -0,0 +1,65 @@
#ifndef RABIT_LEARN_IO_IO_INL_H_
#define RABIT_LEARN_IO_IO_INL_H_
/*!
* \file io-inl.h
* \brief Input/Output utils that handles read/write
* of files in distrubuted enviroment
* \author Tianqi Chen
*/
#include <cstring>
#include "./io.h"
#if RABIT_USE_HDFS
#include "./hdfs-inl.h"
#endif
#include "./file-inl.h"
namespace rabit {
namespace io {
/*!
* \brief create input split given a uri
* \param uri the uri of the input, can contain hdfs prefix
* \param part the part id of current input
* \param nsplit total number of splits
*/
inline InputSplit *CreateInputSplit(const char *uri,
unsigned part,
unsigned nsplit) {
if (!strcmp(uri, "stdin")) {
return new SingleFileSplit(uri);
}
if (!strncmp(uri, "file://", 7)) {
return new FileSplit(uri, part, nsplit);
}
if (!strncmp(uri, "hdfs://", 7)) {
#if RABIT_USE_HDFS
return new HDFSSplit(uri, part, nsplit);
#else
utils::Error("Please compile with RABIT_USE_HDFS=1");
#endif
}
return new FileSplit(uri, part, nsplit);
}
/*!
* \brief create an stream, the stream must be able to close
* the underlying resources(files) when deleted
*
* \param uri the uri of the input, can contain hdfs prefix
* \param mode can be 'w' or 'r' for read or write
*/
inline IStream *CreateStream(const char *uri, const char *mode) {
if (!strncmp(uri, "file://", 7)) {
return new FileStream(uri + 7, mode);
}
if (!strncmp(uri, "hdfs://", 7)) {
#if RABIT_USE_HDFS
return new HDFSStream(hdfsConnect("default", 0), uri, mode);
#else
utils::Error("Please compile with RABIT_USE_HDFS=1");
#endif
}
return new FileStream(uri, mode);
}
} // namespace io
} // namespace rabit
#endif // RABIT_LEARN_IO_IO_INL_H_

61
rabit-learn/io/io.h Normal file
View File

@@ -0,0 +1,61 @@
#ifndef RABIT_LEARN_IO_IO_H_
#define RABIT_LEARN_IO_IO_H_
/*!
* \file io.h
* \brief Input/Output utils that handles read/write
* of files in distrubuted enviroment
* \author Tianqi Chen
*/
#include "../../include/rabit_serializable.h"
/*! \brief whether compile with HDFS support */
#ifndef RABIT_USE_HDFS
#define RABIT_USE_HDFS 0
#endif
/*! \brief io interface */
namespace rabit {
/*!
* \brief namespace to handle input split and filesystem interfacing
*/
namespace io {
typedef utils::ISeekStream ISeekStream;
/*!
* \brief user facing input split helper,
* can be used to get the partition of data used by current node
*/
class InputSplit {
public:
/*!
* \brief get next line, store into out_data
* \param out_data the string that stores the line data,
* \n is not included
* \return true of next line was found, false if we read all the lines
*/
virtual bool NextLine(std::string *out_data) = 0;
/*! \brief destructor*/
virtual ~InputSplit(void) {}
};
/*!
* \brief create input split given a uri
* \param uri the uri of the input, can contain hdfs prefix
* \param part the part id of current input
* \param nsplit total number of splits
*/
inline InputSplit *CreateInputSplit(const char *uri,
unsigned part,
unsigned nsplit);
/*!
* \brief create an stream, the stream must be able to close
* the underlying resources(files) when deleted
*
* \param uri the uri of the input, can contain hdfs prefix
* \param mode can be 'w' or 'r' for read or write
*/
inline IStream *CreateStream(const char *uri, const char *mode);
} // namespace io
} // namespace rabit
#include "./io-inl.h"
#include "./base64-inl.h"
#endif // RABIT_LEARN_IO_IO_H_

View File

@@ -0,0 +1,181 @@
#ifndef RABIT_LEARN_IO_LINE_SPLIT_INL_H_
#define RABIT_LEARN_IO_LINE_SPLIT_INL_H_
/*!
* \file line_split-inl.h
* \brief base implementation of line-spliter
* \author Tianqi Chen
*/
#include <vector>
#include <utility>
#include <cstring>
#include <string>
#include "../../include/rabit.h"
#include "./io.h"
#include "./buffer_reader-inl.h"
namespace rabit {
namespace io {
class LineSplitBase : public InputSplit {
public:
virtual ~LineSplitBase() {
if (fs_ != NULL) delete fs_;
}
virtual bool NextLine(std::string *out_data) {
if (file_ptr_ >= file_ptr_end_ &&
offset_curr_ >= offset_end_) return false;
out_data->clear();
while (true) {
char c = reader_.GetChar();
if (reader_.AtEnd()) {
if (out_data->length() != 0) return true;
file_ptr_ += 1;
if (offset_curr_ != file_offset_[file_ptr_]) {
utils::Error("warning:file size not calculated correctly\n");
offset_curr_ = file_offset_[file_ptr_];
}
if (offset_curr_ >= offset_end_) return false;
utils::Assert(file_ptr_ + 1 < file_offset_.size(),
"boundary check");
delete fs_;
fs_ = this->GetFile(file_ptr_);
reader_.set_stream(fs_);
} else {
++offset_curr_;
if (c != '\r' && c != '\n' && c != EOF) {
*out_data += c;
} else {
if (out_data->length() != 0) return true;
if (file_ptr_ >= file_ptr_end_ &&
offset_curr_ >= offset_end_) return false;
}
}
}
}
protected:
// constructor
LineSplitBase(void)
: fs_(NULL), reader_(kBufferSize) {
}
/*!
* \brief initialize the line spliter,
* \param file_size, size of each files
* \param rank the current rank of the data
* \param nsplit number of split we will divide the data into
*/
inline void Init(const std::vector<size_t> &file_size,
unsigned rank, unsigned nsplit) {
file_offset_.resize(file_size.size() + 1);
file_offset_[0] = 0;
for (size_t i = 0; i < file_size.size(); ++i) {
file_offset_[i + 1] = file_offset_[i] + file_size[i];
}
size_t ntotal = file_offset_.back();
size_t nstep = (ntotal + nsplit - 1) / nsplit;
offset_begin_ = std::min(nstep * rank, ntotal);
offset_end_ = std::min(nstep * (rank + 1), ntotal);
offset_curr_ = offset_begin_;
if (offset_begin_ == offset_end_) return;
file_ptr_ = std::upper_bound(file_offset_.begin(),
file_offset_.end(),
offset_begin_) - file_offset_.begin() - 1;
file_ptr_end_ = std::upper_bound(file_offset_.begin(),
file_offset_.end(),
offset_end_) - file_offset_.begin() - 1;
fs_ = GetFile(file_ptr_);
reader_.set_stream(fs_);
// try to set the starting position correctly
if (file_offset_[file_ptr_] != offset_begin_) {
fs_->Seek(offset_begin_ - file_offset_[file_ptr_]);
while (true) {
char c = reader_.GetChar();
if (!reader_.AtEnd()) ++offset_curr_;
if (c == '\n' || c == '\r' || c == EOF) return;
}
}
}
/*!
* \brief get the seek stream of given file_index
* \return the corresponding seek stream at head of file
*/
virtual utils::ISeekStream *GetFile(size_t file_index) = 0;
/*!
* \brief split names given
* \param out_fname output file names
* \param uri_ the iput uri file
* \param dlm deliminetr
*/
inline static void SplitNames(std::vector<std::string> *out_fname,
const char *uri_,
const char *dlm) {
std::string uri = uri_;
char *p = strtok(BeginPtr(uri), dlm);
while (p != NULL) {
out_fname->push_back(std::string(p));
p = strtok(NULL, dlm);
}
}
private:
/*! \brief current input stream */
utils::ISeekStream *fs_;
/*! \brief file pointer of which file to read on */
size_t file_ptr_;
/*! \brief file pointer where the end of file lies */
size_t file_ptr_end_;
/*! \brief get the current offset */
size_t offset_curr_;
/*! \brief beginning of offset */
size_t offset_begin_;
/*! \brief end of the offset */
size_t offset_end_;
/*! \brief byte-offset of each file */
std::vector<size_t> file_offset_;
/*! \brief buffer reader */
StreamBufferReader reader_;
/*! \brief buffer size */
const static size_t kBufferSize = 256;
};
/*! \brief line split from single file */
class SingleFileSplit : public InputSplit {
public:
explicit SingleFileSplit(const char *fname) {
if (!strcmp(fname, "stdin")) {
#ifndef RABIT_STRICT_CXX98_
use_stdin_ = true; fp_ = stdin;
#endif
}
if (!use_stdin_) {
fp_ = utils::FopenCheck(fname, "r");
}
end_of_file_ = false;
}
virtual ~SingleFileSplit(void) {
if (!use_stdin_) fclose(fp_);
}
virtual bool NextLine(std::string *out_data) {
if (end_of_file_) return false;
out_data->clear();
while (true) {
char c = fgetc(fp_);
if (c == EOF) {
end_of_file_ = true;
}
if (c != '\r' && c != '\n' && c != EOF) {
*out_data += c;
} else {
if (out_data->length() != 0) return true;
if (end_of_file_) return false;
}
}
return false;
}
private:
FILE *fp_;
bool use_stdin_;
bool end_of_file_;
};
} // namespace io
} // namespace rabit
#endif // RABIT_LEARN_IO_LINE_SPLIT_INL_H_