Squashed 'subtree/rabit/' content from commit c7282ac

git-subtree-dir: subtree/rabit
git-subtree-split: c7282acb2a
This commit is contained in:
tqchen
2015-01-18 21:08:17 -08:00
commit d87691ec60
68 changed files with 9081 additions and 0 deletions

94
tracker/rabit_demo.py Executable file
View File

@@ -0,0 +1,94 @@
#!/usr/bin/python
"""
This is the demo submission script of rabit, it is created to
submit rabit jobs using hadoop streaming
"""
import argparse
import sys
import os
import subprocess
from threading import Thread
import rabit_tracker as tracker
if os.name == 'nt':
WRAPPER_PATH = os.path.dirname(__file__) + '\\..\\wrapper'
else:
WRAPPER_PATH = os.path.dirname(__file__) + '/../wrapper'
parser = argparse.ArgumentParser(description='Rabit script to submit rabit job locally using python subprocess')
parser.add_argument('-n', '--nworker', required=True, type=int,
help = 'number of worker proccess to be launched')
parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
help = 'print more messages into the console')
parser.add_argument('command', nargs='+',
help = 'command for rabit program')
args = parser.parse_args()
# bash script for keepalive
# use it so that python do not need to communicate with subprocess
echo="echo %s rabit_num_trial=$nrep;"
keepalive = """
nrep=0
rc=254
while [ $rc -eq 254 ];
do
%s
%s %s rabit_num_trial=$nrep
rc=$?;
nrep=$((nrep+1));
done
"""
def exec_cmd(cmd, taskid):
if cmd[0].find('/') == -1 and os.path.exists(cmd[0]) and os.name != 'nt':
cmd[0] = './' + cmd[0]
cmd = ' '.join(cmd)
arg = ' rabit_task_id=%d' % (taskid)
cmd = cmd + arg
ntrial = 0
while True:
if os.name == 'nt':
prep = 'SET PYTHONPATH=\"%s\"\n' % WRAPPER_PATH
ret = subprocess.call(prep + cmd + ('rabit_num_trial=%d' % ntrial), shell=True)
if ret == 254:
ntrial += 1
continue
else:
prep = 'PYTHONPATH=\"%s\" ' % WRAPPER_PATH
if args.verbose != 0:
bash = keepalive % (echo % cmd, prep, cmd)
else:
bash = keepalive % ('', prep, cmd)
ret = subprocess.call(bash, shell=True, executable='bash')
if ret == 0:
if args.verbose != 0:
print 'Thread %d exit with 0' % taskid
return
else:
if os.name == 'nt':
os.exit(-1)
else:
raise Exception('Get nonzero return code=%d' % ret)
#
# Note: this submit script is only used for demo purpose
# submission script using pyhton multi-threading
#
def mthread_submit(nslave, worker_args):
"""
customized submit script, that submit nslave jobs, each must contain args as parameter
note this can be a lambda function containing additional parameters in input
Parameters
nslave number of slave process to start up
args arguments to launch each job
this usually includes the parameters of master_uri and parameters passed into submit
"""
procs = {}
for i in range(nslave):
procs[i] = Thread(target = exec_cmd, args = (args.command + worker_args, i))
procs[i].daemon = True
procs[i].start()
for i in range(nslave):
procs[i].join()
# call submit, with nslave, the commands to run each job and submit function
tracker.submit(args.nworker, [], fun_submit = mthread_submit, verbose = args.verbose)

152
tracker/rabit_hadoop.py Executable file
View File

@@ -0,0 +1,152 @@
#!/usr/bin/python
"""
This is a script to submit rabit job using hadoop streaming.
It will submit the rabit process as mappers of MapReduce.
"""
import argparse
import sys
import os
import time
import subprocess
import warnings
import rabit_tracker as tracker
WRAPPER_PATH = os.path.dirname(__file__) + '/../wrapper'
#!!! Set path to hadoop and hadoop streaming jar here
hadoop_binary = 'hadoop'
hadoop_streaming_jar = None
# code
hadoop_home = os.getenv('HADOOP_HOME')
if hadoop_home != None:
if hadoop_binary == None:
hadoop_binary = hadoop_home + '/bin/hadoop'
assert os.path.exists(hadoop_binary), "HADOOP_HOME does not contain the hadoop binary"
if hadoop_streaming_jar == None:
hadoop_streaming_jar = hadoop_home + '/lib/hadoop-streaming.jar'
assert os.path.exists(hadoop_streaming_jar), "HADOOP_HOME does not contain the hadoop streaming jar"
if hadoop_binary == None or hadoop_streaming_jar == None:
warnings.warn('Warning: Cannot auto-detect path to hadoop or hadoop-streaming jar\n'\
'\tneed to set them via arguments -hs and -hb\n'\
'\tTo enable auto-detection, you can set enviroment variable HADOOP_HOME'\
', or modify rabit_hadoop.py line 16', stacklevel = 2)
parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs using Hadoop Streaming.'\
'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
parser.add_argument('-n', '--nworker', required=True, type=int,
help = 'number of worker proccess to be launched')
parser.add_argument('-nt', '--nthread', default = -1, type=int,
help = 'number of thread in each mapper to be launched, set it if each rabit job is multi-threaded')
parser.add_argument('-i', '--input', required=True,
help = 'input path in HDFS')
parser.add_argument('-o', '--output', required=True,
help = 'output path in HDFS')
parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
help = 'print more messages into the console')
parser.add_argument('-ac', '--auto_file_cache', default=1, choices=[0, 1], type=int,
help = 'whether automatically cache the files in the command to hadoop localfile, this is on by default')
parser.add_argument('-f', '--files', default = [], action='append',
help = 'the cached file list in mapreduce,'\
' the submission script will automatically cache all the files which appears in command'\
' This will also cause rewritten of all the file names in the command to current path,'\
' for example `../../kmeans ../kmeans.conf` will be rewritten to `./kmeans kmeans.conf`'\
' because the two files are cached to running folder.'\
' You may need this option to cache additional files.'\
' You can also use it to manually cache files when auto_file_cache is off')
parser.add_argument('--jobname', default='auto', help = 'customize jobname in tracker')
parser.add_argument('--timeout', default=600000000, type=int,
help = 'timeout (in million seconds) of each mapper job, automatically set to a very long time,'\
'normally you do not need to set this ')
parser.add_argument('-mem', '--memory_mb', default=-1, type=int,
help = 'maximum memory used by the process. Guide: set it large (near mapred.cluster.max.map.memory.mb)'\
'if you are running multi-threading rabit,'\
'so that each node can occupy all the mapper slots in a machine for maximum performance')
if hadoop_binary == None:
parser.add_argument('-hb', '--hadoop_binary', required = True,
help="path to hadoop binary file")
else:
parser.add_argument('-hb', '--hadoop_binary', default = hadoop_binary,
help="path to hadoop binary file")
if hadoop_streaming_jar == None:
parser.add_argument('-hs', '--hadoop_streaming_jar', required = True,
help='path to hadoop streamimg jar file')
else:
parser.add_argument('-hs', '--hadoop_streaming_jar', default = hadoop_streaming_jar,
help='path to hadoop streamimg jar file')
parser.add_argument('command', nargs='+',
help = 'command for rabit program')
args = parser.parse_args()
if args.jobname == 'auto':
args.jobname = ('Rabit[nworker=%d]:' % args.nworker) + args.command[0].split('/')[-1];
# detech hadoop version
(out, err) = subprocess.Popen('%s version' % args.hadoop_binary, shell = True, stdout=subprocess.PIPE).communicate()
out = out.split('\n')[0].split()
assert out[0] == 'Hadoop', 'cannot parse hadoop version string'
hadoop_version = out[1].split('.')
use_yarn = int(hadoop_version[0]) >= 2
print 'Current Hadoop Version is %s' % out[1]
def hadoop_streaming(nworker, worker_args, use_yarn):
fset = set()
if args.auto_file_cache:
for i in range(len(args.command)):
f = args.command[i]
if os.path.exists(f):
fset.add(f)
if i == 0:
args.command[i] = './' + args.command[i].split('/')[-1]
else:
args.command[i] = args.command[i].split('/')[-1]
if args.command[0].endswith('.py'):
flst = [WRAPPER_PATH + '/rabit.py',
WRAPPER_PATH + '/librabit_wrapper.so',
WRAPPER_PATH + '/librabit_wrapper_mock.so']
for f in flst:
if os.path.exists(f):
fset.add(f)
kmap = {}
# setup keymaps
if use_yarn:
kmap['nworker'] = 'mapreduce.job.maps'
kmap['jobname'] = 'mapreduce.job.name'
kmap['nthread'] = 'mapreduce.map.cpu.vcores'
kmap['timeout'] = 'mapreduce.task.timeout'
kmap['memory_mb'] = 'mapreduce.map.memory.mb'
else:
kmap['nworker'] = 'mapred.map.tasks'
kmap['jobname'] = 'mapred.job.name'
kmap['nthread'] = None
kmap['timeout'] = 'mapred.task.timeout'
kmap['memory_mb'] = 'mapred.job.map.memory.mb'
cmd = '%s jar %s' % (args.hadoop_binary, args.hadoop_streaming_jar)
cmd += ' -D%s=%d' % (kmap['nworker'], nworker)
cmd += ' -D%s=%s' % (kmap['jobname'], args.jobname)
if args.nthread != -1:
if kmap['nthread'] is None:
warnings.warn('nthread can only be set in Yarn(Hadoop version greater than 2.0),'\
'it is recommended to use Yarn to submit rabit jobs', stacklevel = 2)
else:
cmd += ' -D%s=%d' % (kmap['nthread'], args.nthread)
cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
if args.memory_mb != -1:
cmd += ' -D%s=%d' % (kmap['timeout'], args.timeout)
cmd += ' -input %s -output %s' % (args.input, args.output)
cmd += ' -mapper \"%s\" -reducer \"/bin/cat\" ' % (' '.join(args.command + worker_args))
if args.files != None:
for flst in args.files:
for f in flst.split('#'):
fset.add(f)
for f in fset:
cmd += ' -file %s' % f
print cmd
subprocess.check_call(cmd, shell = True)
fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2)
tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)

43
tracker/rabit_mpi.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/python
"""
This is the demo submission script of rabit, it is created to
submit rabit jobs using hadoop streaming
"""
import argparse
import sys
import os
import subprocess
import rabit_tracker as tracker
parser = argparse.ArgumentParser(description='Rabit script to submit rabit job using MPI')
parser.add_argument('-n', '--nworker', required=True, type=int,
help = 'number of worker proccess to be launched')
parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
help = 'print more messages into the console')
parser.add_argument('-H', '--hostfile', type=str,
help = 'the hostfile of mpi server')
parser.add_argument('command', nargs='+',
help = 'command for rabit program')
args = parser.parse_args()
#
# submission script using MPI
#
def mpi_submit(nslave, worker_args):
"""
customized submit script, that submit nslave jobs, each must contain args as parameter
note this can be a lambda function containing additional parameters in input
Parameters
nslave number of slave process to start up
args arguments to launch each job
this usually includes the parameters of master_uri and parameters passed into submit
"""
sargs = ' '.join(args.command + worker_args)
if args.hostfile is None:
cmd = ' '.join(['mpirun -n %d' % (nslave)] + args.command + worker_args)
else:
' '.join(['mpirun -n %d --hostfile %s' % (nslave, args.hostfile)] + args.command + worker_args)
print cmd
subprocess.check_call(cmd, shell = True)
# call submit, with nslave, the commands to run each job and submit function
tracker.submit(args.nworker, [], fun_submit = mpi_submit, verbose = args.verbose)

263
tracker/rabit_tracker.py Normal file
View File

@@ -0,0 +1,263 @@
"""
Tracker script for rabit
Implements the tracker control protocol
- start rabit jobs
- help nodes to establish links with each other
Tianqi Chen
"""
import sys
import os
import socket
import struct
import subprocess
import random
from threading import Thread
"""
Extension of socket to handle recv and send of special data
"""
class ExSocket:
def __init__(self, sock):
self.sock = sock
def recvall(self, nbytes):
res = []
sock = self.sock
nread = 0
while nread < nbytes:
chunk = self.sock.recv(min(nbytes - nread, 1024))
nread += len(chunk)
res.append(chunk)
return ''.join(res)
def recvint(self):
return struct.unpack('@i', self.recvall(4))[0]
def sendint(self, n):
self.sock.sendall(struct.pack('@i', n))
def sendstr(self, s):
self.sendint(len(s))
self.sock.sendall(s)
def recvstr(self):
slen = self.recvint()
return self.recvall(slen)
# magic number used to verify existence of data
kMagic = 0xff99
class SlaveEntry:
def __init__(self, sock, s_addr):
slave = ExSocket(sock)
self.sock = slave
self.host = s_addr[0]
magic = slave.recvint()
assert magic == kMagic, 'invalid magic number=%d from %s' % (magic, s_addr[0])
slave.sendint(kMagic)
self.rank = slave.recvint()
self.world_size = slave.recvint()
self.jobid = slave.recvstr()
self.cmd = slave.recvstr()
def decide_rank(self, job_map):
if self.rank >= 0:
return self.rank
if self.jobid != 'NULL' and self.jobid in job_map:
return job_map[self.jobid]
return -1
def assign_rank(self, rank, wait_conn, tree_map, parent_map, ring_map):
self.rank = rank
nnset = set(tree_map[rank])
rprev, rnext = ring_map[rank]
self.sock.sendint(rank)
# send parent rank
self.sock.sendint(parent_map[rank])
# send world size
self.sock.sendint(len(tree_map))
self.sock.sendint(len(nnset))
# send the rprev and next link
for r in nnset:
self.sock.sendint(r)
# send prev link
if rprev != -1 and rprev != rank:
nnset.add(rprev)
self.sock.sendint(rprev)
else:
self.sock.sendint(-1)
# send next link
if rnext != -1 and rnext != rank:
nnset.add(rnext)
self.sock.sendint(rnext)
else:
self.sock.sendint(-1)
while True:
ngood = self.sock.recvint()
goodset = set([])
for i in xrange(ngood):
goodset.add(self.sock.recvint())
assert goodset.issubset(nnset)
badset = nnset - goodset
conset = []
for r in badset:
if r in wait_conn:
conset.append(r)
self.sock.sendint(len(conset))
self.sock.sendint(len(badset) - len(conset))
for r in conset:
self.sock.sendstr(wait_conn[r].host)
self.sock.sendint(wait_conn[r].port)
self.sock.sendint(r)
nerr = self.sock.recvint()
if nerr != 0:
continue
self.port = self.sock.recvint()
rmset = []
# all connection was successuly setup
for r in conset:
wait_conn[r].wait_accept -= 1
if wait_conn[r].wait_accept == 0:
rmset.append(r)
for r in rmset:
wait_conn.pop(r, None)
self.wait_accept = len(badset) - len(conset)
return rmset
class Tracker:
def __init__(self, port = 9091, port_end = 9999, verbose = True):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
for port in range(port, port_end):
try:
sock.bind(('', port))
self.port = port
break
except socket.error:
continue
sock.listen(16)
self.sock = sock
self.verbose = verbose
self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1)
def __del__(self):
self.sock.close()
def slave_args(self):
return ['rabit_tracker_uri=%s' % socket.gethostname(),
'rabit_tracker_port=%s' % self.port]
def get_neighbor(self, rank, nslave):
rank = rank + 1
ret = []
if rank > 1:
ret.append(rank / 2 - 1)
if rank * 2 - 1 < nslave:
ret.append(rank * 2 - 1)
if rank * 2 < nslave:
ret.append(rank * 2)
return ret
def get_tree(self, nslave):
tree_map = {}
parent_map = {}
for r in range(nslave):
tree_map[r] = self.get_neighbor(r, nslave)
parent_map[r] = (r + 1) / 2 - 1
return tree_map, parent_map
def find_share_ring(self, tree_map, parent_map, r):
"""
get a ring structure that tends to share nodes with the tree
return a list starting from r
"""
nset = set(tree_map[r])
cset = nset - set([parent_map[r]])
if len(cset) == 0:
return [r]
rlst = [r]
cnt = 0
for v in cset:
vlst = self.find_share_ring(tree_map, parent_map, v)
cnt += 1
if cnt == len(cset):
vlst.reverse()
rlst += vlst
return rlst
def get_ring(self, tree_map, parent_map):
"""
get a ring connection used to recover local data
"""
assert parent_map[0] == -1
rlst = self.find_share_ring(tree_map, parent_map, 0)
assert len(rlst) == len(tree_map)
ring_map = {}
nslave = len(tree_map)
for r in range(nslave):
rprev = (r + nslave - 1) % nslave
rnext = (r + 1) % nslave
ring_map[rlst[r]] = (rlst[rprev], rlst[rnext])
return ring_map
def handle_print(self,slave, msg):
sys.stdout.write(msg)
def log_print(self, msg, level):
if level == 1:
if self.verbose:
sys.stderr.write(msg + '\n')
else:
sys.stderr.write(msg + '\n')
def accept_slaves(self, nslave):
# set of nodes that finishs the job
shutdown = {}
# set of nodes that is waiting for connections
wait_conn = {}
# maps job id to rank
job_map = {}
# list of workers that is pending to be assigned rank
pending = []
# lazy initialize tree_map
tree_map = None
while len(shutdown) != nslave:
fd, s_addr = self.sock.accept()
s = SlaveEntry(fd, s_addr)
if s.cmd == 'print':
msg = s.sock.recvstr()
self.handle_print(s, msg)
continue
if s.cmd == 'shutdown':
assert s.rank >= 0 and s.rank not in shutdown
assert s.rank not in wait_conn
shutdown[s.rank] = s
self.log_print('Recieve %s signal from %d' % (s.cmd, s.rank), 1)
continue
assert s.cmd == 'start' or s.cmd == 'recover'
# lazily initialize the slaves
if tree_map == None:
assert s.cmd == 'start'
if s.world_size > 0:
nslave = s.world_size
tree_map, parent_map = self.get_tree(nslave)
ring_map = self.get_ring(tree_map, parent_map)
# set of nodes that is pending for getting up
todo_nodes = range(nslave)
random.shuffle(todo_nodes)
else:
assert s.world_size == -1 or s.world_size == nslave
if s.cmd == 'recover':
assert s.rank >= 0
rank = s.decide_rank(job_map)
if rank == -1:
assert len(todo_nodes) != 0
rank = todo_nodes.pop(0)
if s.jobid != 'NULL':
job_map[s.jobid] = rank
if len(todo_nodes) == 0:
self.log_print('@tracker All of %d nodes getting started' % nslave, 2)
s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
if s.cmd != 'start':
self.log_print('Recieve %s signal from %d' % (s.cmd, s.rank), 1)
else:
self.log_print('Recieve %s signal from %s; assign rank %d' % (s.cmd, s.host, s.rank), 1)
if s.wait_accept > 0:
wait_conn[rank] = s
self.log_print('@tracker All nodes finishes job', 2)
def submit(nslave, args, fun_submit, verbose):
master = Tracker(verbose = verbose)
submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args()))
submit_thread.daemon = True
submit_thread.start()
master.accept_slaves(nslave)
submit_thread.join()