update tracker for host IP
This commit is contained in:
parent
e4ce8efab5
commit
75c647cd84
@ -37,6 +37,8 @@ parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs
|
|||||||
'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
|
'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended')
|
||||||
parser.add_argument('-n', '--nworker', required=True, type=int,
|
parser.add_argument('-n', '--nworker', required=True, type=int,
|
||||||
help = 'number of worker proccess to be launched')
|
help = 'number of worker proccess to be launched')
|
||||||
|
parser.add_argument('-hip', '--host_ip', default='auto', type=str,
|
||||||
|
help = 'host IP address if cannot be automatically guessed, specify the IP of submission machine')
|
||||||
parser.add_argument('-nt', '--nthread', default = -1, type=int,
|
parser.add_argument('-nt', '--nthread', default = -1, type=int,
|
||||||
help = 'number of thread in each mapper to be launched, set it if each rabit job is multi-threaded')
|
help = 'number of thread in each mapper to be launched, set it if each rabit job is multi-threaded')
|
||||||
parser.add_argument('-i', '--input', required=True,
|
parser.add_argument('-i', '--input', required=True,
|
||||||
@ -149,4 +151,4 @@ def hadoop_streaming(nworker, worker_args, use_yarn):
|
|||||||
subprocess.check_call(cmd, shell = True)
|
subprocess.check_call(cmd, shell = True)
|
||||||
|
|
||||||
fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2)
|
fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2)
|
||||||
tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose)
|
tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose, hostIP = args.host_ip)
|
||||||
|
|||||||
@ -122,7 +122,7 @@ class SlaveEntry:
|
|||||||
return rmset
|
return rmset
|
||||||
|
|
||||||
class Tracker:
|
class Tracker:
|
||||||
def __init__(self, port = 9091, port_end = 9999, verbose = True):
|
def __init__(self, port = 9091, port_end = 9999, verbose = True, hostIP = 'auto'):
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
for port in range(port, port_end):
|
for port in range(port, port_end):
|
||||||
try:
|
try:
|
||||||
@ -134,11 +134,18 @@ class Tracker:
|
|||||||
sock.listen(16)
|
sock.listen(16)
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.verbose = verbose
|
self.verbose = verbose
|
||||||
|
self.hostIP = hostIP
|
||||||
self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1)
|
self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1)
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
def slave_args(self):
|
def slave_args(self):
|
||||||
return ['rabit_tracker_uri=%s' % socket.gethostname(),
|
if self.hostIP == 'auto':
|
||||||
|
host = socket.gethostname()
|
||||||
|
elif self.hostIP = 'ip':
|
||||||
|
host = socket.gethostbyname(socket.getfqdn())
|
||||||
|
else:
|
||||||
|
host = hostIP
|
||||||
|
return ['rabit_tracker_uri=%s' % hostIP,
|
||||||
'rabit_tracker_port=%s' % self.port]
|
'rabit_tracker_port=%s' % self.port]
|
||||||
def get_neighbor(self, rank, nslave):
|
def get_neighbor(self, rank, nslave):
|
||||||
rank = rank + 1
|
rank = rank + 1
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user