Mock-failure recovery tests: launch workers through tracker/rabit_demo.py,
which restarts a worker that exits with the mock-failure code, instead of
going through rabit_mpi.py plus the test/keepalive.sh retry wrapper.

diff --git a/src/allreduce_mock.h b/src/allreduce_mock.h
index f46ee6885..96bc55800 100644
--- a/src/allreduce_mock.h
+++ b/src/allreduce_mock.h
@@ -85,7 +85,9 @@ class AllreduceMock : public AllreduceRobust {
   inline void Verify(const MockKey &key, const char *name) {
     if (mock_map.count(key) != 0) {
       num_trial += 1;
-      utils::Error("[%d]@@@Hit Mock Error:%s", rank, name);
+      // exit status -2 marks a mock failure; tracker/rabit_demo.py restarts the worker when it sees it
+      fprintf(stderr, "[%d]@@@Hit Mock Error:%s\n", rank, name);
+      exit(-2);
     }
   }
 };
diff --git a/test/keepalive.sh b/test/keepalive.sh
deleted file mode 100755
index c4df061a9..000000000
--- a/test/keepalive.sh
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/bash
-if [ "$#" -lt 1 ];
-then
-    echo "Usage: program parameters"
-    echo "Repeatively run program until success"
-    exit -1
-fi
-nrep=0
-echo ./$@ rabit_task_id=$OMPI_COMM_WORLD_RANK
-until ./$@ rabit_task_id=$OMPI_COMM_WORLD_RANK rabit_num_trial=$nrep; do
-    sleep 1
-    nrep=$((nrep+1))
-    echo ./$@ rabit_task_id=$OMPI_COMM_WORLD_RANK rabit_num_trial=$nrep
-done
diff --git a/test/test.mk b/test/test.mk
index b1dddb0b4..b3525b6d4 100644
--- a/test/test.mk
+++ b/test/test.mk
@@ -10,17 +10,17 @@ endif
 
 
 local_recover:
-	../tracker/rabit_mpi.py -n $(nslave) test_local_recover $(ndata) rabit_local_replica=1
+	../tracker/rabit_demo.py -n $(nslave) test_local_recover $(ndata) rabit_local_replica=1
 
 local_recover_10_10k:
-	../tracker/rabit_mpi.py -n 10 test_local_recover 10000 rabit_local_replica=1
+	../tracker/rabit_demo.py -n 10 test_local_recover 10000 rabit_local_replica=1
 
-# this experiment test recovery with actually process exit, use keepalive to keep program alive
+# this experiment tests recovery with actual process exit, rabit_demo.py restarts the workers that hit a mock failure
 model_recover_10_10k:
-	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
+	../tracker/rabit_demo.py -n 10 test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0
 
 model_recover_10_10k_die_same:
-	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
+	../tracker/rabit_demo.py -n 10 test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0
 
 model_recover_10_10k_die_hard:
-	../tracker/rabit_mpi.py -n 10 keepalive.sh test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
+	../tracker/rabit_demo.py -n 10 test_model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0
diff --git a/tracker/rabit_demo.py b/tracker/rabit_demo.py
new file mode 100755
index 000000000..aeeb6e9a3
--- /dev/null
+++ b/tracker/rabit_demo.py
@@ -0,0 +1,64 @@
+#!/usr/bin/python
+"""
+This is the demo submission script of rabit, it is created to
+submit rabit jobs locally using python subprocess and threads
+"""
+import argparse
+import sys
+import os
+import subprocess
+from threading import Thread
+import rabit_tracker as tracker
+
+parser = argparse.ArgumentParser(description='Rabit script to submit rabit job locally using python subprocess')
+parser.add_argument('-n', '--nworker', required=True, type=int,
+                    help = 'number of worker proccess to be launched')
+parser.add_argument('-v', '--verbose', default=0, choices=[0, 1], type=int,
+                    help = 'print more messages into the console')
+parser.add_argument('command', nargs='+',
+                    help = 'command for rabit program')
+args = parser.parse_args()
+
+def exec_cmd(cmd, taskid):
+    """
+      run one worker command through the shell, start it again after every mock failure
+      Parameters
+         cmd list of command tokens, rabit_task_id and rabit_num_trial are appended on each launch
+         taskid value passed to the worker as rabit_task_id
+    """
+    if cmd[0].find('/') == -1 and os.path.exists(cmd[0]):
+        cmd[0] = './' + cmd[0]
+    cmd = ' '.join(cmd)
+    ntrial = 0
+    while True:
+        arg = ' rabit_task_id=%d rabit_num_trial=%d' % (taskid, ntrial)
+        ret = subprocess.call(cmd + arg, shell = True)
+        # AllreduceMock::Verify calls exit(-2) on a mock hit; a POSIX parent sees that status as 254
+        if ret == 254 or ret == -2:
+            ntrial += 1
+            continue
+        if ret == 0:
+            return
+        raise Exception('Get nonzero return code=%d' % ret)
+#
+# Note: this submit script is only used for demo purpose
+# submission script using python multi-threading
+#
+def mthread_submit(nslave, slave_args):
+    """
+      customized submit script, that submit nslave jobs, each must contain args as parameter
+      note this can be a lambda function containing additional parameters in input
+      Parameters
+         nslave number of slave process to start up
+         slave_args arguments to launch each job
+              this usually includes the parameters of master_uri and parameters passed into submit
+    """
+    procs = {}
+    for i in range(nslave):
+        procs[i] = Thread(target = exec_cmd, args = (args.command + slave_args, i))
+        procs[i].start()
+    for i in range(nslave):
+        procs[i].join()
+
+# call submit, with nslave, the commands to run each job and submit function
+tracker.submit(args.nworker, [], fun_submit = mthread_submit, verbose = args.verbose)
diff --git a/tracker/rabit_mpi.py b/tracker/rabit_mpi.py
index 662c173bc..604ed3bf7 100755
--- a/tracker/rabit_mpi.py
+++ b/tracker/rabit_mpi.py
@@ -20,9 +20,7 @@ parser.add_argument('command', nargs='+',
                     help = 'command for rabit program')
 args = parser.parse_args()
 #
-# Note: this submit script is only used for demo purpose
-# It does not have to be mpirun, it can be any job submission
-# script that starts the job, qsub, hadoop streaming etc.
+# submission script using MPI
 #
 def mpi_submit(nslave, slave_args):
     """