From 1e56ba86d9d3e44b14c0a8f5ff71369307dbe86c Mon Sep 17 00:00:00 2001 From: tqchen Date: Mon, 27 Apr 2015 15:58:57 -0700 Subject: [PATCH] Squashed 'subtree/rabit/' changes from fed1683..e1ddcc2 e1ddcc2 Merge branch 'master' of ssh://github.com/dmlc/rabit 6745667 new dmlc io c5b4610 sge scheduler change git-subtree-dir: subtree/rabit git-subtree-split: e1ddcc2eb70f4a3e34c6c1b67b4d9671bfe62b97 --- include/dmlc/io.h | 26 ++++++++++++++++++++++---- tracker/rabit_sge.py | 3 ++- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/include/dmlc/io.h b/include/dmlc/io.h index 3db41c109..e42585cdf 100644 --- a/include/dmlc/io.h +++ b/include/dmlc/io.h @@ -36,13 +36,17 @@ class Stream { virtual ~Stream(void) {} /*! * \brief generic factory function - * create an stream, the stream will close the underlying files - * upon deletion + * create a stream, the stream will close the underlying files upon deletion + * * \param uri the uri of the input currently we support * hdfs://, s3://, and file:// by default file:// will be used * \param flag can be "w", "r", "a" + * \param allow_null whether NULL can be returned, or directly report error + * \return the created stream, can be NULL when allow_null == true and the file does not exist */ - static Stream *Create(const char *uri, const char* const flag); + static Stream *Create(const char *uri, + const char* const flag, + bool allow_null = false); // helper functions to write/read different data structures /*! * \brief writes a vector @@ -80,7 +84,19 @@ class SeekStream: public Stream { /*! \brief tell the position of the stream */ virtual size_t Tell(void) = 0; /*! \return whether we are at end of file */ - virtual bool AtEnd(void) const = 0; + virtual bool AtEnd(void) const = 0; + /*!
+ * \brief generic factory function + * create a SeekStream for read only, + * the stream will close the underlying files upon deletion + * an error will be reported and the system will exit when creation fails (unless allow_null is true) + * \param uri the uri of the input; currently we support + * hdfs://, s3://, and file://; by default file:// will be used + * \param allow_null whether NULL can be returned, or directly report error + * \return the created stream, can be NULL when allow_null == true and the file does not exist + */ + static SeekStream *CreateForRead(const char *uri, + bool allow_null = false); }; /*! \brief interface for serializable objects */ @@ -114,6 +130,8 @@ class InputSplit { /*! \brief size of the memory region */ size_t size; }; + /*! \brief reset the position of InputSplit to the beginning */ + virtual void BeforeFirst(void) = 0; /*! * \brief get the next record, the returning value * is valid until next call to NextRecord or NextChunk diff --git a/tracker/rabit_sge.py b/tracker/rabit_sge.py index 0a6130639..3026a4fcb 100755 --- a/tracker/rabit_sge.py +++ b/tracker/rabit_sge.py @@ -38,6 +38,7 @@ else: runscript = '%s/runrabit.sh' % args.logdir fo = open(runscript, 'w') +fo.write('source ~/.bashrc\n') fo.write('\"$@\"\n') fo.close() # @@ -51,7 +52,7 @@ def sge_submit(nslave, worker_args, worker_envs): nslave number of slave process to start up args arguments to launch each job this usually includes the parameters of master_uri and parameters passed into submit - """ + """ env_arg = ','.join(['%s=\"%s\"' % (k, str(v)) for k, v in worker_envs.items()]) cmd = 'qsub -cwd -t 1-%d -S /bin/bash' % nslave if args.queue != 'default':