From ef2de29f068c0b22a4fb85ca556b7b77950073d6 Mon Sep 17 00:00:00 2001 From: tqchen Date: Tue, 3 Mar 2015 13:13:21 -0800 Subject: [PATCH] Squashed 'subtree/rabit/' changes from 4db0a62..d4ec037 d4ec037 fix rabit 6612fcf Merge branch 'master' of ssh://github.com/tqchen/rabit d29892c add mock option statis 4fa054e new tracker 75c647c update tracker for host IP e4ce8ef add hadoop linear example 76ecb4a add hadoop linear example 2e1c4c9 add hadoop linear example git-subtree-dir: subtree/rabit git-subtree-split: d4ec037f2e94c52fbbc5acfe14824454b4a26cec --- include/rabit/io.h | 8 ++-- rabit-learn/linear/README.md | 6 ++- rabit-learn/linear/run-hadoop.sh | 20 +++++++++ src/allreduce_mock.h | 77 +++++++++++++++++++++++++++++++- src/allreduce_robust.h | 2 +- tracker/rabit_hadoop.py | 4 +- tracker/rabit_tracker.py | 15 +++++-- 7 files changed, 119 insertions(+), 13 deletions(-) create mode 100755 rabit-learn/linear/run-hadoop.sh diff --git a/include/rabit/io.h b/include/rabit/io.h index d7597b915..b6b627b7f 100644 --- a/include/rabit/io.h +++ b/include/rabit/io.h @@ -38,7 +38,7 @@ struct MemoryFixSizeBuffer : public ISeekStream { utils::Assert(curr_ptr_ + size <= buffer_size_, "read can not have position excceed buffer length"); size_t nread = std::min(buffer_size_ - curr_ptr_, size); - if (nread != 0) memcpy(ptr, p_buffer_ + curr_ptr_, nread); + if (nread != 0) std::memcpy(ptr, p_buffer_ + curr_ptr_, nread); curr_ptr_ += nread; return nread; } @@ -46,7 +46,7 @@ struct MemoryFixSizeBuffer : public ISeekStream { if (size == 0) return; utils::Assert(curr_ptr_ + size <= buffer_size_, "write position exceed fixed buffer size"); - memcpy(p_buffer_ + curr_ptr_, ptr, size); + std::memcpy(p_buffer_ + curr_ptr_, ptr, size); curr_ptr_ += size; } virtual void Seek(size_t pos) { @@ -77,7 +77,7 @@ struct MemoryBufferStream : public ISeekStream { utils::Assert(curr_ptr_ <= p_buffer_->length(), "read can not have position excceed buffer length"); size_t nread = std::min(p_buffer_->length() - curr_ptr_, size); - if (nread != 0) memcpy(ptr, &(*p_buffer_)[0] + curr_ptr_, nread); + if (nread != 0) std::memcpy(ptr, &(*p_buffer_)[0] + curr_ptr_, nread); curr_ptr_ += nread; return nread; } @@ -86,7 +86,7 @@ struct MemoryBufferStream : public ISeekStream { if (curr_ptr_ + size > p_buffer_->length()) { p_buffer_->resize(curr_ptr_+size); } - memcpy(&(*p_buffer_)[0] + curr_ptr_, ptr, size); + std::memcpy(&(*p_buffer_)[0] + curr_ptr_, ptr, size); curr_ptr_ += size; } virtual void Seek(size_t pos) { diff --git a/rabit-learn/linear/README.md b/rabit-learn/linear/README.md index e94868155..68d132b66 100644 --- a/rabit-learn/linear/README.md +++ b/rabit-learn/linear/README.md @@ -1,8 +1,10 @@ Linear and Logistic Regression ==== * input format: LibSVM -* Example: [run-linear.sh](run-linear.sh) - +* Local Example: [run-linear.sh](run-linear.sh) +* Runnig on Hadoop: [run-hadoop.sh](run-hadoop.sh) + - Set input data to stdin, and model_out=stdout + Parameters === All the parameters can be set by param=value diff --git a/rabit-learn/linear/run-hadoop.sh b/rabit-learn/linear/run-hadoop.sh new file mode 100755 index 000000000..c090f3966 --- /dev/null +++ b/rabit-learn/linear/run-hadoop.sh @@ -0,0 +1,20 @@ +#!/bin/bash +if [ "$#" -lt 3 ]; +then + echo "Usage: [param=val]" + exit -1 +fi + +# put the local training file to HDFS +hadoop fs -rm -r -f $2/data +hadoop fs -rm -r -f $2/mushroom.linear.model +hadoop fs -mkdir $2/data +hadoop fs -put ../data/agaricus.txt.train $2/data + +# submit to hadoop +../../tracker/rabit_hadoop.py --host_ip ip -n $1 -i $2/data/agaricus.txt.train -o $2/mushroom.linear.model linear.rabit stdin model_out=stdout "${*:3}" + +# get the final model file +hadoop fs -get $2/mushroom.linear.model/part-00000 ./linear.model + +./linear.rabit ../data/agaricus.txt.test task=pred model_in=linear.model diff --git a/src/allreduce_mock.h b/src/allreduce_mock.h index 33b8b60ae..7a16c1393 100644 --- a/src/allreduce_mock.h +++ b/src/allreduce_mock.h @@ -9,7 +9,9 @@ #define RABIT_ALLREDUCE_MOCK_H #include #include +#include #include "../include/rabit/engine.h" +#include "../include/rabit/timer.h" #include "./allreduce_robust.h" namespace rabit { @@ -19,6 +21,9 @@ class AllreduceMock : public AllreduceRobust { // constructor AllreduceMock(void) { num_trial = 0; + force_local = 0; + report_stats = 0; + tsum_allreduce = 0.0; } // destructor virtual ~AllreduceMock(void) {} @@ -26,6 +31,8 @@ class AllreduceMock : public AllreduceRobust { AllreduceRobust::SetParam(name, val); // additional parameters if (!strcmp(name, "rabit_num_trial")) num_trial = atoi(val); + if (!strcmp(name, "report_stats")) report_stats = atoi(val); + if (!strcmp(name, "force_local")) force_local = atoi(val); if (!strcmp(name, "mock")) { MockKey k; utils::Check(sscanf(val, "%d,%d,%d,%d", @@ -41,25 +48,93 @@ class AllreduceMock : public AllreduceRobust { PreprocFunction prepare_fun, void *prepare_arg) { this->Verify(MockKey(rank, version_number, seq_counter, num_trial), "AllReduce"); + double tstart = utils::GetTime(); AllreduceRobust::Allreduce(sendrecvbuf_, type_nbytes, count, reducer, prepare_fun, prepare_arg); + tsum_allreduce += utils::GetTime() - tstart; } virtual void Broadcast(void *sendrecvbuf_, size_t total_size, int root) { this->Verify(MockKey(rank, version_number, seq_counter, num_trial), "Broadcast"); AllreduceRobust::Broadcast(sendrecvbuf_, total_size, root); } + virtual int LoadCheckPoint(ISerializable *global_model, + ISerializable *local_model) { + tsum_allreduce = 0.0; + time_checkpoint = utils::GetTime(); + if (force_local == 0) { + return AllreduceRobust::LoadCheckPoint(global_model, local_model); + } else { + DummySerializer dum; + ComboSerializer com(global_model, local_model); + return AllreduceRobust::LoadCheckPoint(&dum, &com); + } + } virtual void CheckPoint(const ISerializable *global_model, const ISerializable *local_model) { this->Verify(MockKey(rank, version_number, seq_counter, num_trial), "CheckPoint"); - AllreduceRobust::CheckPoint(global_model, local_model); + double tstart = utils::GetTime(); + double tbet_chkpt = tstart - time_checkpoint; + if (force_local == 0) { + AllreduceRobust::CheckPoint(global_model, local_model); + } else { + DummySerializer dum; + ComboSerializer com(global_model, local_model); + AllreduceRobust::CheckPoint(&dum, &com); + } + tsum_allreduce = 0.0; + time_checkpoint = utils::GetTime(); + double tcost = utils::GetTime() - tstart; + if (report_stats != 0 && rank == 0) { + std::stringstream ss; + ss << "[v" << version_number << "] global_size=" << global_checkpoint.length() + << "local_size=" << local_chkpt[local_chkpt_version].length() + << "check_tcost="<< tcost <<" sec," + << "allreduce_tcost=" << tsum_allreduce << " sec," + << "between_chpt=" << tbet_chkpt << "sec\n"; + this->TrackerPrint(ss.str()); + } } virtual void LazyCheckPoint(const ISerializable *global_model) { this->Verify(MockKey(rank, version_number, seq_counter, num_trial), "LazyCheckPoint"); AllreduceRobust::LazyCheckPoint(global_model); } + protected: + // force checkpoint to local + int force_local; + // whether report statistics + int report_stats; + // sum of allreduce + double tsum_allreduce; + double time_checkpoint; private: + struct DummySerializer : public ISerializable { + virtual void Load(IStream &fi) { + } + virtual void Save(IStream &fo) const { + } + }; + struct ComboSerializer : public ISerializable { + ISerializable *lhs; + ISerializable *rhs; + const ISerializable *c_lhs; + const ISerializable *c_rhs; + ComboSerializer(ISerializable *lhs, ISerializable *rhs) + : lhs(lhs), rhs(rhs), c_lhs(lhs), c_rhs(rhs) { + } + ComboSerializer(const ISerializable *lhs, const ISerializable *rhs) + : lhs(NULL), rhs(NULL), c_lhs(lhs), c_rhs(rhs) { + } + virtual void Load(IStream &fi) { + if (lhs != NULL) lhs->Load(fi); + if (rhs != NULL) rhs->Load(fi); + } + virtual void Save(IStream &fo) const { + if (c_lhs != NULL) c_lhs->Save(fo); + if (c_rhs != NULL) c_rhs->Save(fo); + } + }; // key to identify the mock stage struct MockKey { int rank; diff --git a/src/allreduce_robust.h b/src/allreduce_robust.h index ff5c046ac..34b740dfe 100644 --- a/src/allreduce_robust.h +++ b/src/allreduce_robust.h @@ -138,7 +138,7 @@ class AllreduceRobust : public AllreduceBase { ReConnectLinks("recover"); } - private: + protected: // constant one byte out of band message to indicate error happening // and mark for channel cleanup static const char kOOBReset = 95; diff --git a/tracker/rabit_hadoop.py b/tracker/rabit_hadoop.py index 682ec69a1..4c87460d8 100755 --- a/tracker/rabit_hadoop.py +++ b/tracker/rabit_hadoop.py @@ -37,6 +37,8 @@ parser = argparse.ArgumentParser(description='Rabit script to submit rabit jobs 'This script support both Hadoop 1.0 and Yarn(MRv2), Yarn is recommended') parser.add_argument('-n', '--nworker', required=True, type=int, help = 'number of worker proccess to be launched') +parser.add_argument('-hip', '--host_ip', default='auto', type=str, + help = 'host IP address if cannot be automatically guessed, specify the IP of submission machine') parser.add_argument('-nt', '--nthread', default = -1, type=int, help = 'number of thread in each mapper to be launched, set it if each rabit job is multi-threaded') parser.add_argument('-i', '--input', required=True, @@ -149,4 +151,4 @@ def hadoop_streaming(nworker, worker_args, use_yarn): subprocess.check_call(cmd, shell = True) fun_submit = lambda nworker, worker_args: hadoop_streaming(nworker, worker_args, int(hadoop_version[0]) >= 2) -tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose) +tracker.submit(args.nworker, [], fun_submit = fun_submit, verbose = args.verbose, hostIP = args.host_ip) diff --git a/tracker/rabit_tracker.py b/tracker/rabit_tracker.py index aa21973e5..fe1ec7eb8 100644 --- a/tracker/rabit_tracker.py +++ b/tracker/rabit_tracker.py @@ -122,7 +122,7 @@ class SlaveEntry: return rmset class Tracker: - def __init__(self, port = 9091, port_end = 9999, verbose = True): + def __init__(self, port = 9091, port_end = 9999, verbose = True, hostIP = 'auto'): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) for port in range(port, port_end): try: @@ -134,11 +134,18 @@ class Tracker: sock.listen(16) self.sock = sock self.verbose = verbose + self.hostIP = hostIP self.log_print('start listen on %s:%d' % (socket.gethostname(), self.port), 1) def __del__(self): self.sock.close() def slave_args(self): - return ['rabit_tracker_uri=%s' % socket.gethostname(), + if self.hostIP == 'auto': + host = socket.gethostname() + elif self.hostIP == 'ip': + host = socket.gethostbyname(socket.getfqdn()) + else: + host = self.hostIP + return ['rabit_tracker_uri=%s' % host, 'rabit_tracker_port=%s' % self.port] def get_neighbor(self, rank, nslave): rank = rank + 1 @@ -254,8 +261,8 @@ class Tracker: wait_conn[rank] = s self.log_print('@tracker All nodes finishes job', 2) -def submit(nslave, args, fun_submit, verbose): - master = Tracker(verbose = verbose) +def submit(nslave, args, fun_submit, verbose, hostIP): + master = Tracker(verbose = verbose, hostIP = hostIP) submit_thread = Thread(target = fun_submit, args = (nslave, args + master.slave_args())) submit_thread.daemon = True submit_thread.start()