Squashed 'subtree/rabit/' changes from fed1683..e1ddcc2

e1ddcc2 Merge branch 'master' of ssh://github.com/dmlc/rabit
6745667 new dmlc io
c5b4610 sge scheduler change

git-subtree-dir: subtree/rabit
git-subtree-split: e1ddcc2eb70f4a3e34c6c1b67b4d9671bfe62b97
This commit is contained in:
tqchen 2015-04-27 15:58:57 -07:00
parent d16b2c9670
commit 1e56ba86d9
2 changed files with 24 additions and 5 deletions

View File

@ -36,13 +36,17 @@ class Stream {
virtual ~Stream(void) {} virtual ~Stream(void) {}
/*! /*!
* \brief generic factory function * \brief generic factory function
* create an stream, the stream will close the underlying files * create an stream, the stream will close the underlying files upon deletion
* upon deletion *
* \param uri the uri of the input currently we support * \param uri the uri of the input currently we support
* hdfs://, s3://, and file:// by default file:// will be used * hdfs://, s3://, and file:// by default file:// will be used
* \param flag can be "w", "r", "a" * \param flag can be "w", "r", "a"
* \param allow_null whether NULL may be returned on failure instead of directly reporting an error
* \return the created stream; can be NULL when allow_null == true and the file does not exist
*/ */
static Stream *Create(const char *uri, const char* const flag); static Stream *Create(const char *uri,
const char* const flag,
bool allow_null = false);
// helper functions to write/read different data structures // helper functions to write/read different data structures
/*! /*!
* \brief writes a vector * \brief writes a vector
@ -80,7 +84,19 @@ class SeekStream: public Stream {
/*! \brief tell the position of the stream */ /*! \brief tell the position of the stream */
virtual size_t Tell(void) = 0; virtual size_t Tell(void) = 0;
/*! \return whether we are at end of file */ /*! \return whether we are at end of file */
virtual bool AtEnd(void) const = 0; virtual bool AtEnd(void) const = 0;
/*!
* \brief generic factory function
* create a SeekStream for read only,
* the stream will close the underlying files upon deletion
* an error will be reported and the system will exit when creation fails
* \param uri the uri of the input currently we support
* hdfs://, s3://, and file:// by default file:// will be used
* \param allow_null whether NULL may be returned on failure instead of directly reporting an error
* \return the created stream; can be NULL when allow_null == true and the file does not exist
*/
static SeekStream *CreateForRead(const char *uri,
bool allow_null = false);
}; };
/*! \brief interface for serializable objects */ /*! \brief interface for serializable objects */
@ -114,6 +130,8 @@ class InputSplit {
/*! \brief size of the memory region */ /*! \brief size of the memory region */
size_t size; size_t size;
}; };
/*! \brief reset the position of InputSplit to beginning */
virtual void BeforeFirst(void) = 0;
/*! /*!
* \brief get the next record, the returning value * \brief get the next record, the returning value
* is valid until next call to NextRecord or NextChunk * is valid until next call to NextRecord or NextChunk

View File

@ -38,6 +38,7 @@ else:
runscript = '%s/runrabit.sh' % args.logdir runscript = '%s/runrabit.sh' % args.logdir
fo = open(runscript, 'w') fo = open(runscript, 'w')
fo.write('source ~/.bashrc\n')
fo.write('\"$@\"\n') fo.write('\"$@\"\n')
fo.close() fo.close()
# #
@ -51,7 +52,7 @@ def sge_submit(nslave, worker_args, worker_envs):
nslave number of slave process to start up nslave number of slave process to start up
args arguments to launch each job args arguments to launch each job
this usually includes the parameters of master_uri and parameters passed into submit this usually includes the parameters of master_uri and parameters passed into submit
""" """
env_arg = ','.join(['%s=\"%s\"' % (k, str(v)) for k, v in worker_envs.items()]) env_arg = ','.join(['%s=\"%s\"' % (k, str(v)) for k, v in worker_envs.items()])
cmd = 'qsub -cwd -t 1-%d -S /bin/bash' % nslave cmd = 'qsub -cwd -t 1-%d -S /bin/bash' % nslave
if args.queue != 'default': if args.queue != 'default':