improve tracker script

This commit is contained in:
tqchen
2014-12-19 04:20:45 -08:00
parent 69d7f71ae8
commit 9f42b78a18
6 changed files with 88 additions and 48 deletions

44
tracker/rabit_hadoop.py Executable file
View File

@@ -0,0 +1,44 @@
#!/usr/bin/python
"""
This is a script to submit rabit job using hadoop streaming
submit the rabit process as mappers of MapReduce
"""
import argparse
import sys
import os
import time
import subprocess
import rabit_tracker as tracker
#!!! you can directly set hadoop binary path and hadoop streaming path here
hadoop_binary = 'hadoop'
hadoop_streaming_jar = None
parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using hadoop streaming')
parser.add_argument('-s', '--nslaves', required=True, type=int,
                    help = "number of slave processes to be launched")
# only ask for the hadoop binary / streaming jar on the command line
# when they were not hard-coded above
if hadoop_binary is None:
    parser.add_argument('-hb', '--hadoop_binary', required=True,
                        help="path-to-hadoop binary folder")
if hadoop_streaming_jar is None:
    parser.add_argument('-hs', '--hadoop_streaming_jar', required=True,
                        help='path-to hadoop streaming jar file')
parser.add_argument('-i', '--input', required=True)
parser.add_argument('-o', '--output', required=True)
parser.add_argument('-m', '--mapper', required=True)
parser.add_argument('-a', '--args', required=True)
args = parser.parse_args()
# hard-coded settings take precedence over (absent) command-line flags,
# so the rest of the script can always read them from the args namespace
if hadoop_binary is not None:
    args.hadoop_binary = hadoop_binary
if hadoop_streaming_jar is not None:
    args.hadoop_streaming_jar = hadoop_streaming_jar
def hadoop_streaming(nslaves, slave_args):
    """
    Submit function passed to tracker.submit: launches the rabit workers
    as mappers of a hadoop streaming job.

    Parameters
    nslaves     number of slave processes (the tracker launches this many mappers)
    slave_args  extra arguments from the tracker (rabit_tracker_uri/port),
                appended to the mapper command line
    """
    # NOTE(review): the original format string referenced args.nclusters and
    # args.iterations, neither of which is defined by the argument parser
    # above, so this always raised AttributeError.  Use the user-supplied
    # -a/--args string instead, which was previously parsed but never used.
    cmd = ('%s jar %s -input %s -output %s -mapper "%s %s %s" '
           '-reducer "/bin/cat" -file %s'
           % (args.hadoop_binary, args.hadoop_streaming_jar,
              args.input, args.output,
              args.mapper, args.args, ' '.join(slave_args),
              args.mapper))
    print(cmd)
    subprocess.check_call(cmd, shell = True)
# launch the job through the tracker and report total wall-clock time
begin_time = time.time()
tracker.submit(args.nslaves, [], fun_submit=hadoop_streaming)
elapsed = time.time() - begin_time
print('All run took %s' % elapsed)

40
tracker/rabit_mpi.py Executable file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/python
"""
This is an example script to create a customized job submit with mpi
script using rabit engine
"""
import sys
import os
import subprocess
# import the tcp_master.py
# add path to sync
sys.path.append(os.path.dirname(__file__)+'/src/')
import rabit_tracker as tracker
#
# Note: this submit script is only used for example purpose
# It does not have to be mpirun, it can be any job submission script that starts the job, qsub, hadoop streaming etc.
#
def mpi_submit(nslave, args):
    """
    Customized submit callback that launches nslave jobs through mpirun.
    Note: this can also be a lambda carrying additional captured parameters.

    Parameters
    nslave  number of slave processes to start up
    args    arguments used to launch each job; args[0] is the machine file
            (or 'local' for local mode), the remainder is the command,
            which already includes the master_uri parameters from submit
    """
    if args[0] == 'local':
        prefix = 'mpirun -n %d' % (nslave)
    else:
        prefix = 'mpirun -n %d --hostfile %s' % (nslave, args[0])
    full_cmd = ' '.join([prefix] + args[1:])
    print(full_cmd)
    subprocess.check_call(full_cmd, shell = True)
if __name__ == '__main__':
    # need at least the slave count; machine file and command follow
    if len(sys.argv) < 2:
        print('Usage: <nslave> <machine_file> <cmd>')
        print('if <machine_file> == local, we will run using local mode')
        exit(0)
    # hand off to the tracker: slave count, per-job command, submit callback
    tracker.submit(int(sys.argv[1]), sys.argv[2:], fun_submit=mpi_submit)

253
tracker/rabit_tracker.py Normal file
View File

@@ -0,0 +1,253 @@
"""
Tracker script for rabit
Implements the tracker control protocol
- start rabit jobs
- help nodes to establish links with each other
Tianqi Chen
"""
import sys
import os
import socket
import struct
import subprocess
import random
from threading import Thread
"""
Extension of socket to handle recv and send of special data
"""
class ExSocket:
    """
    Thin wrapper over a TCP socket adding fixed-size, integer and
    length-prefixed string send/recv helpers used by the tracker protocol.
    """
    def __init__(self, sock):
        # underlying socket; every helper below delegates to it
        self.sock = sock
    def recvall(self, nbytes):
        """Receive exactly nbytes; raises RuntimeError if the peer closes early."""
        res = []
        nread = 0
        while nread < nbytes:
            chunk = self.sock.recv(min(nbytes - nread, 1024), socket.MSG_WAITALL)
            if not chunk:
                # BUG FIX: recv returns '' once the peer closes the
                # connection; without this check the loop spins forever
                raise RuntimeError('connection closed while expecting %d more bytes'
                                   % (nbytes - nread))
            nread += len(chunk)
            res.append(chunk)
        # b'' == '' under Python 2, so this join is identical there
        return b''.join(res)
    def recvint(self):
        """Receive one native-endian 32-bit signed integer."""
        return struct.unpack('@i', self.recvall(4))[0]
    def sendint(self, n):
        """Send one native-endian 32-bit signed integer."""
        self.sock.sendall(struct.pack('@i', n))
    def sendstr(self, s):
        """Send a string prefixed with its length as a 32-bit integer."""
        if not isinstance(s, bytes):
            s = s.encode()  # no-op under Python 2, where str is bytes
        self.sendint(len(s))
        self.sock.sendall(s)
    def recvstr(self):
        """Receive a length-prefixed string (raw bytes of the payload)."""
        slen = self.recvint()
        return self.recvall(slen)
# magic number exchanged at connect time so tracker and slave can verify
# they are speaking the rabit tracker protocol
kMagic = 0xff99
class SlaveEntry:
    """
    Tracker-side record of one connected slave.

    The constructor performs the connect handshake (magic exchange followed
    by rank / world size / job id / command); assign_rank then runs the
    link-assignment protocol over the same socket.  The message order below
    mirrors the slave side exactly — do not reorder sends/recvs.
    """
    def __init__(self, sock, s_addr):
        slave = ExSocket(sock)
        self.sock = slave
        # remote host address, later handed to peers that must connect here
        self.host = s_addr[0]
        magic = slave.recvint()
        assert magic == kMagic, 'invalid magic number=%d from %s' % (magic, s_addr[0])
        slave.sendint(kMagic)
        # rank is -1 when the slave does not yet know its rank
        self.rank = slave.recvint()
        # world_size is -1 when the slave leaves it to the tracker
        self.world_size = slave.recvint()
        # jobid is the string 'NULL' when no job id was assigned
        self.jobid = slave.recvstr()
        # cmd is one of: 'start', 'recover', 'shutdown'
        self.cmd = slave.recvstr()
    def decide_rank(self, job_map):
        """
        Return the rank for this slave: its own claimed rank if known,
        else a previously recorded jobid->rank mapping, else -1.
        """
        if self.rank >= 0:
            return self.rank
        if self.jobid != 'NULL' and self.jobid in job_map:
            return job_map[self.jobid]
        return -1
    def assign_rank(self, rank, wait_conn, tree_map, parent_map, ring_map):
        """
        Assign `rank` to this slave and send it its tree/ring neighbors.

        Parameters
        rank        rank to assign
        wait_conn   dict rank -> SlaveEntry of slaves waiting for incoming
                    connections (mutated: entries removed once fully connected)
        tree_map    rank -> list of tree-neighbor ranks
        parent_map  rank -> parent rank (-1 for root)
        ring_map    rank -> (prev, next) ranks on the recovery ring

        Returns the list of ranks removed from wait_conn.
        """
        self.rank = rank
        nnset = set(tree_map[rank])
        rprev, rnext = ring_map[rank]
        self.sock.sendint(rank)
        # send parent rank
        self.sock.sendint(parent_map[rank])
        # send world size
        self.sock.sendint(len(tree_map))
        self.sock.sendint(len(nnset))
        # send the tree neighbor set
        for r in nnset:
            self.sock.sendint(r)
        # send prev ring link (-1 when absent or self)
        if rprev != -1 and rprev != rank:
            nnset.add(rprev)
            self.sock.sendint(rprev)
        else:
            self.sock.sendint(-1)
        # send next ring link (-1 when absent or self)
        if rnext != -1 and rnext != rank:
            nnset.add(rnext)
            self.sock.sendint(rnext)
        else:
            self.sock.sendint(-1)
        # negotiate connections until the slave reports zero errors
        while True:
            # slave reports the set of neighbor links it already has
            ngood = self.sock.recvint()
            goodset = set([])
            for i in xrange(ngood):
                goodset.add(self.sock.recvint())
            assert goodset.issubset(nnset)
            badset = nnset - goodset
            # neighbors this slave can connect to right now (they are listening)
            conset = []
            for r in badset:
                if r in wait_conn:
                    conset.append(r)
            self.sock.sendint(len(conset))
            # number of missing links the slave must instead accept()
            self.sock.sendint(len(badset) - len(conset))
            for r in conset:
                self.sock.sendstr(wait_conn[r].host)
                self.sock.sendint(wait_conn[r].port)
                self.sock.sendint(r)
            nerr = self.sock.recvint()
            if nerr != 0:
                continue
            # port this slave listens on, for later peers to connect to
            self.port = self.sock.recvint()
            rmset = []
            # all connections were successfully set up; drop fully-connected
            # peers from the wait list
            for r in conset:
                wait_conn[r].wait_accept -= 1
                if wait_conn[r].wait_accept == 0:
                    rmset.append(r)
            for r in rmset:
                wait_conn.pop(r, None)
            # how many peers still need to connect to this slave
            self.wait_accept = len(badset) - len(conset)
            return rmset
class Tracker:
    """
    Rabit tracker: listens for slave connections, assigns ranks, and hands
    each slave its tree/ring neighbor links for allreduce and recovery.

    NOTE(review): this file is Python 2 (print statements, xrange, integer
    division with /) — porting requires touching several spots below.
    """
    def __init__(self, port = 9091, port_end = 9999):
        # bind to the first free port in [port, port_end)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        for port in range(port, port_end):
            try:
                sock.bind(('', port))
                self.port = port
                break
            except socket.error:
                continue
        # NOTE(review): if every port in the range is busy, self.port is
        # never set and listen() is called on an unbound socket
        sock.listen(16)
        self.sock = sock
        print 'start listen on %s:%d' % (socket.gethostname(), self.port)
    def __del__(self):
        # close the listening socket when the tracker object is collected
        self.sock.close()
    def slave_args(self):
        """Return the extra command-line args each slave needs to find us."""
        return ['rabit_tracker_uri=%s' % socket.gethostname(),
                'rabit_tracker_port=%s' % self.port]
    def get_neighbor(self, rank, nslave):
        """
        Return the binary-tree neighbors (parent and children) of `rank`
        in a complete binary tree laid out over 1-based rank+1 indices.
        """
        rank = rank + 1
        ret = []
        if rank > 1:
            # parent; Python 2 integer division (// under Python 3)
            ret.append(rank / 2 - 1)
        if rank * 2 - 1  < nslave:
            ret.append(rank * 2 - 1)
        if rank * 2 < nslave:
            ret.append(rank * 2)
        return ret
    def get_tree(self, nslave):
        """
        Build the binary-tree topology for nslave nodes.
        Returns (tree_map: rank -> neighbor list, parent_map: rank -> parent,
        with -1 as the root's parent).
        """
        tree_map = {}
        parent_map = {}
        for r in range(nslave):
            tree_map[r] = self.get_neighbor(r, nslave)
            # Python 2 integer division; root (r=0) gets parent -1
            parent_map[r] = (r + 1) / 2 - 1
        return tree_map, parent_map
    def find_share_ring(self, tree_map, parent_map, r):
        """
        get a ring structure that tends to share nodes with the tree
        return a list starting from r
        """
        nset = set(tree_map[r])
        # children only: drop the parent from the neighbor set
        cset = nset - set([parent_map[r]])
        if len(cset) == 0:
            return [r]
        rlst = [r]
        cnt = 0
        for v in cset:
            vlst = self.find_share_ring(tree_map, parent_map, v)
            cnt += 1
            # reverse the last child's sub-ring so the ring closes back up
            if cnt == len(cset):
                vlst.reverse()
            rlst += vlst
        return rlst
    def get_ring(self, tree_map, parent_map):
        """
        get a ring connection used to recover local data
        """
        assert parent_map[0] == -1
        rlst = self.find_share_ring(tree_map, parent_map, 0)
        assert len(rlst) == len(tree_map)
        ring_map = {}
        nslave = len(tree_map)
        for r in range(nslave):
            rprev = (r + nslave - 1) % nslave
            rnext = (r + 1) % nslave
            ring_map[rlst[r]] = (rlst[rprev], rlst[rnext])
        return ring_map
    def accept_slaves(self, nslave):
        """
        Main tracker loop: accept slave connections, assign ranks and
        neighbor links, and return once all nslave slaves sent 'shutdown'.
        """
        # set of nodes that finishs the job
        shutdown = {}
        # set of nodes that is waiting for connections
        wait_conn = {}
        # maps job id to rank
        job_map = {}
        # list of workers that is pending to be assigned rank
        pending = []
        # lazy initialize tree_map
        tree_map = None
        while len(shutdown) != nslave:
            fd, s_addr = self.sock.accept()
            # handshake happens inside the SlaveEntry constructor
            s = SlaveEntry(fd, s_addr)
            if s.cmd == 'shutdown':
                assert s.rank >= 0 and s.rank not in shutdown
                assert s.rank not in wait_conn
                shutdown[s.rank] = s
                print 'Recieve %s signal from %d' % (s.cmd, s.rank)
                continue
            assert s.cmd == 'start' or s.cmd == 'recover'
            # lazily initialize the topology on the first 'start' command;
            # the first slave may override nslave via its world_size
            if tree_map == None:
                assert s.cmd == 'start'
                print s.world_size
                if s.world_size > 0:
                    nslave = s.world_size
                tree_map, parent_map = self.get_tree(nslave)
                ring_map = self.get_ring(tree_map, parent_map)
                # ranks not yet handed out, in random order
                todo_nodes = range(nslave)
                random.shuffle(todo_nodes)
            else:
                assert s.world_size == -1 or s.world_size == nslave
            if s.cmd == 'recover':
                assert s.rank >= 0
            rank = s.decide_rank(job_map)
            if rank == -1:
                # hand out a fresh rank and remember it per job id
                assert len(todo_nodes) != 0
                rank = todo_nodes.pop(0)
                if s.jobid != 'NULL':
                    job_map[s.jobid] = rank
            s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
            if s.cmd != 'start':
                print 'Recieve %s signal from %d' % (s.cmd, s.rank)
            else:
                print 'Recieve %s signal from %s assign rank %d' % (s.cmd, s.host, s.rank)
            # keep the slave in the wait list until all peers connected to it
            if s.wait_accept > 0:
                wait_conn[rank] = s
        print 'All nodes finishes job'
def mpi_submit(nslave, args):
    """Default submit function: launch nslave copies of the job via mpirun."""
    launch_prefix = 'mpirun -n %d' % nslave
    full_cmd = ' '.join([launch_prefix] + args)
    print(full_cmd)
    return subprocess.check_call(full_cmd, shell = True)
def submit(nslave, args, fun_submit = mpi_submit):
    """
    Submit a rabit job: start the tracker, launch the slaves through
    fun_submit in a background thread, then serve the tracker protocol
    until every slave has reported shutdown.

    Parameters
    nslave      number of slave processes to launch
    args        arguments to pass to each job; the tracker uri/port
                arguments are appended automatically
    fun_submit  callable(nslave, args) that actually starts the jobs
    """
    master = Tracker()
    submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args()))
    submit_thread.start()
    # BUG FIX: was master.accept_slaves(nslaves) — 'nslaves' is undefined
    # (NameError at runtime); the parameter is named 'nslave'
    master.accept_slaves(nslave)
    submit_thread.join()