From 15e085cd32c7a3f1d4f3b7007569205cea244ee3 Mon Sep 17 00:00:00 2001 From: tqchen Date: Mon, 12 Jan 2015 22:59:36 -0800 Subject: [PATCH] basic allreduce lib ready --- guide/basic.py | 23 ++++++++++++++++ guide/broadcast.py | 22 ++++++++++++++++ wrapper/rabit.py | 65 ++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 105 insertions(+), 5 deletions(-) create mode 100755 guide/basic.py create mode 100755 guide/broadcast.py diff --git a/guide/basic.py b/guide/basic.py new file mode 100755 index 000000000..32f4e018b --- /dev/null +++ b/guide/basic.py @@ -0,0 +1,23 @@ +#!/usr/bin/python +""" +demo python script of rabit +""" +import os +import sys +import numpy as np +# add path to wrapper +sys.path.append(os.path.dirname(__file__) + '/../wrapper') +import rabit + +rabit.init() +n = 3 +rank = rabit.get_rank() +a = np.zeros(n) +for i in xrange(n): + a[i] = rank + i + +print '@node[%d] before-allreduce: a=%s' % (rank, str(a)) +a = rabit.allreduce(a, rabit.MAX) +print '@node[%d] after-allreduce: a=%s' % (rank, str(a)) +rabit.finalize() + diff --git a/guide/broadcast.py b/guide/broadcast.py new file mode 100755 index 000000000..cb2f45088 --- /dev/null +++ b/guide/broadcast.py @@ -0,0 +1,22 @@ +#!/usr/bin/python +""" +demo python script of rabit +""" +import os +import sys +# add path to wrapper +sys.path.append(os.path.dirname(__file__) + '/../wrapper') +import rabit + +rabit.init() +n = 3 +rank = rabit.get_rank() +s = None +if rank == 0: + s = {'hello world':100, 2:3} +print '@node[%d] before-broadcast: s=\"%s\"' % (rank, str(s)) +s = rabit.broadcast(s, 0) + +print '@node[%d] after-broadcast: s=\"%s\"' % (rank, str(s)) +rabit.finalize() + diff --git a/wrapper/rabit.py b/wrapper/rabit.py index 19e7b2f9d..5458c339c 100644 --- a/wrapper/rabit.py +++ b/wrapper/rabit.py @@ -7,6 +7,8 @@ import cPickle as pickle import ctypes import os import sys +import numpy as np + if os.name == 'nt': assert False, "Rabit windows is not yet compiled" else: @@ -17,6 +19,12 @@ rbtlib = ctypes.cdll.LoadLibrary(RABIT_PATH) rbtlib.RabitGetRank.restype = ctypes.c_int rbtlib.RabitGetWorldSize.restype = ctypes.c_int +# reduction operators +MAX = 0 +MIN = 1 +SUM = 2 +BITOR = 3 + def check_err__(): """ reserved function used to check error @@ -89,12 +97,15 @@ def broadcast(data, root): Example: the following example broadcast hello from rank 0 to all other nodes ```python - import rabit rabit.init() - if rabit.get_rank() == 0: - res = rabit.broadcast('hello', 0) - else: - res = rabit.broadcast(None, 0) + n = 3 + rank = rabit.get_rank() + s = None + if rank == 0: + s = {'hello world':100, 2:3} + print '@node[%d] before-broadcast: s=\"%s\"' % (rank, str(s)) + s = rabit.broadcast(s, 0) + print '@node[%d] after-broadcast: s=\"%s\"' % (rank, str(s)) rabit.finalize() ``` @@ -103,6 +114,8 @@ def broadcast(data, root): input data, if current rank does not equal root, this can be None root: int rank of the node to broadcast data from + Returns: + the result of broadcast """ rank = get_rank() length = ctypes.c_ulong() @@ -124,3 +137,45 @@ def broadcast(data, root): length.value, root) check_err__() return pickle.loads(dptr.value) + +def allreduce(data, op, prepare_fun = None): + """ + perform allreduce, return the result, this function is not thread-safe + Arguments: + data: numpy ndarray + input data + op: reduction operators, can be MIN, MAX, SUM, BITOR + prepare_fun: lambda : + Lazy preprocessing function, if it is not None, prepare_fun() + will be called by the function before performing allreduce, to intialize the data + If the result of Allreduce can be recovered directly, then prepare_fun will NOT be called + Returns: + the result of allreduce, have same shape as data + """ + if not isinstance(data, np.ndarray): + raise Exception('allreduce only takes in numpy.ndarray') + buf = data.ravel() + if buf.base is data.base: + buf = buf.copy() + if buf.dtype is np.dtype('int8'): + dtype = 0 + elif buf.dtype is np.dtype('uint8'): + dtype = 1 + elif buf.dtype is np.dtype('int32'): + dtype = 2 + elif buf.dtype is np.dtype('uint32'): + dtype = 3 + elif buf.dtype is np.dtype('int64'): + dtype = 4 + elif buf.dtype is np.dtype('uint64'): + dtype = 5 + elif buf.dtype is np.dtype('float32'): + dtype = 6 + elif buf.dtype is np.dtype('float64'): + dtype = 7 + else: + raise Exception('data type %s not supported' % str(buf.dtype)) + rbtlib.RabitAllreduce(buf.ctypes.data_as(ctypes.c_void_p), + buf.size, dtype, op, None, None); + check_err__() + return buf