Merge commit '1e56ba86d9d3e44b14c0a8f5ff71369307dbe86c'

This commit is contained in:
tqchen 2015-04-27 15:58:57 -07:00
commit 59b96cdda5
2 changed files with 24 additions and 5 deletions

View File

@ -36,13 +36,17 @@ class Stream {
virtual ~Stream(void) {} virtual ~Stream(void) {}
/*! /*!
* \brief generic factory function * \brief generic factory function
* create an stream, the stream will close the underlying files * create an stream, the stream will close the underlying files upon deletion
* upon deletion *
* \param uri the uri of the input currently we support * \param uri the uri of the input currently we support
* hdfs://, s3://, and file:// by default file:// will be used * hdfs://, s3://, and file:// by default file:// will be used
* \param flag can be "w", "r", "a" * \param flag can be "w", "r", "a"
* \param allow_null whether NULL can be returned, or directly report error
* \return the created stream, can be NULL when allow_null == true and file do not exist
*/ */
static Stream *Create(const char *uri, const char* const flag); static Stream *Create(const char *uri,
const char* const flag,
bool allow_null = false);
// helper functions to write/read different data structures // helper functions to write/read different data structures
/*! /*!
* \brief writes a vector * \brief writes a vector
@ -80,7 +84,19 @@ class SeekStream: public Stream {
/*! \brief tell the position of the stream */ /*! \brief tell the position of the stream */
virtual size_t Tell(void) = 0; virtual size_t Tell(void) = 0;
/*! \return whether we are at end of file */ /*! \return whether we are at end of file */
virtual bool AtEnd(void) const = 0; virtual bool AtEnd(void) const = 0;
/*!
* \brief generic factory function
* create an SeekStream for read only,
* the stream will close the underlying files upon deletion
* error will be reported and the system will exit when create failed
* \param uri the uri of the input currently we support
* hdfs://, s3://, and file:// by default file:// will be used
* \param allow_null whether NULL can be returned, or directly report error
* \return the created stream, can be NULL when allow_null == true and file do not exist
*/
static SeekStream *CreateForRead(const char *uri,
bool allow_null = false);
}; };
/*! \brief interface for serializable objects */ /*! \brief interface for serializable objects */
@ -114,6 +130,8 @@ class InputSplit {
/*! \brief size of the memory region */ /*! \brief size of the memory region */
size_t size; size_t size;
}; };
/*! \brief reset the position of InputSplit to beginning */
virtual void BeforeFirst(void) = 0;
/*! /*!
* \brief get the next record, the returning value * \brief get the next record, the returning value
* is valid until next call to NextRecord or NextChunk * is valid until next call to NextRecord or NextChunk

View File

@ -38,6 +38,7 @@ else:
runscript = '%s/runrabit.sh' % args.logdir runscript = '%s/runrabit.sh' % args.logdir
fo = open(runscript, 'w') fo = open(runscript, 'w')
fo.write('source ~/.bashrc\n')
fo.write('\"$@\"\n') fo.write('\"$@\"\n')
fo.close() fo.close()
# #
@ -51,7 +52,7 @@ def sge_submit(nslave, worker_args, worker_envs):
nslave number of slave process to start up nslave number of slave process to start up
args arguments to launch each job args arguments to launch each job
this usually includes the parameters of master_uri and parameters passed into submit this usually includes the parameters of master_uri and parameters passed into submit
""" """
env_arg = ','.join(['%s=\"%s\"' % (k, str(v)) for k, v in worker_envs.items()]) env_arg = ','.join(['%s=\"%s\"' % (k, str(v)) for k, v in worker_envs.items()])
cmd = 'qsub -cwd -t 1-%d -S /bin/bash' % nslave cmd = 'qsub -cwd -t 1-%d -S /bin/bash' % nslave
if args.queue != 'default': if args.queue != 'default':