This commit is contained in:
tqchen 2015-03-06 21:12:04 -08:00
parent a1bd3c64f0
commit 19be870562
2 changed files with 12 additions and 4 deletions

View File

@ -23,6 +23,8 @@ class ISeekStream: public IStream {
virtual void Seek(size_t pos) = 0; virtual void Seek(size_t pos) = 0;
/*! \brief tell the position of the stream */ /*! \brief tell the position of the stream */
virtual size_t Tell(void) = 0; virtual size_t Tell(void) = 0;
/*! \return whether we are at end of file */
virtual bool AtEnd(void) const = 0;
}; };
/*! \brief fixed size memory buffer */ /*! \brief fixed size memory buffer */
@ -55,7 +57,9 @@ struct MemoryFixSizeBuffer : public ISeekStream {
virtual size_t Tell(void) { virtual size_t Tell(void) {
return curr_ptr_; return curr_ptr_;
} }
virtual bool AtEnd(void) const {
return curr_ptr_ == buffer_size_;
}
private: private:
/*! \brief in memory buffer */ /*! \brief in memory buffer */
char *p_buffer_; char *p_buffer_;
@ -95,7 +99,9 @@ struct MemoryBufferStream : public ISeekStream {
virtual size_t Tell(void) { virtual size_t Tell(void) {
return curr_ptr_; return curr_ptr_;
} }
virtual bool AtEnd(void) const {
return curr_ptr_ == p_buffer_->length();
}
private: private:
/*! \brief in memory buffer */ /*! \brief in memory buffer */
std::string *p_buffer_; std::string *p_buffer_;

View File

@ -134,12 +134,14 @@ class Tracker:
sock.listen(16) sock.listen(16)
self.sock = sock self.sock = sock
self.verbose = verbose self.verbose = verbose
if hostIP == 'auto':
hostIP = 'dns'
self.hostIP = hostIP self.hostIP = hostIP
self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1) self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1)
def __del__(self): def __del__(self):
self.sock.close() self.sock.close()
def slave_args(self): def slave_args(self):
if self.hostIP == 'auto': if self.hostIP == 'dns':
host = socket.gethostname() host = socket.gethostname()
elif self.hostIP == 'ip': elif self.hostIP == 'ip':
host = socket.gethostbyname(socket.getfqdn()) host = socket.gethostbyname(socket.getfqdn())
@ -261,7 +263,7 @@ class Tracker:
wait_conn[rank] = s wait_conn[rank] = s
self.log_print('@tracker All nodes finishes job', 2) self.log_print('@tracker All nodes finishes job', 2)
def submit(nslave, args, fun_submit, verbose, hostIP): def submit(nslave, args, fun_submit, verbose, hostIP = 'auto'):
master = Tracker(verbose = verbose, hostIP = hostIP) master = Tracker(verbose = verbose, hostIP = hostIP)
submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args())) submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args()))
submit_thread.daemon = True submit_thread.daemon = True