* implement broadcast for federated communicator
* implement allreduce
* add communicator factory
* add device adapter
* add device communicator to factory
* add rabit communicator
* add rabit communicator to the factory
* add nccl device communicator
* add synchronize to device communicator
* add back print and getprocessorname
* add python wrapper and c api
* clean up types
* fix non-gpu build
* try to fix ci
* fix std::size_t
* portable string compare ignore case
* c style size_t
* fix lint errors
* cross platform setenv
* fix memory leak
* fix lint errors
* address review feedback
* add python test for rabit communicator
* fix failing gtest
* use json to configure communicators
* fix lint error
* get rid of factories
* fix cpu build
* fix include
* fix python import
* don't export collective.py yet
* skip collective communicator pytest on windows
* add review feedback
* update documentation
* remove mpi communicator type
* fix tests
* shutdown the communicator separately

Co-authored-by: Hyunsu Cho <chohyu01@cs.washington.edu>
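The commit list above adds a Python wrapper (`xgboost.collective`) around the new communicator, configured through key/value pairs serialized to JSON and shut down separately from initialization. The test file below only exercises the `CommunicatorContext` context-manager form, so the following is a rough sketch of what the explicit form might look like for a single local worker. Treat `collective.init`, `collective.finalize`, and `collective.get_world_size` as assumptions drawn from the commit bullets rather than calls demonstrated in this file.

# Minimal sketch (not taken from the test below): explicit init/finalize with a
# single local worker. Assumes xgboost.collective exposes init(), finalize(),
# and get_world_size(); configuration kwargs are serialized to JSON internally
# per the "use json to configure communicators" change.
from xgboost import RabitTracker, collective

tracker = RabitTracker(host_ip="127.0.0.1", n_workers=1)
tracker.start(1)

collective.init(**tracker.worker_envs())
try:
    assert collective.get_world_size() == 1
finally:
    # "shutdown the communicator separately": finalize is an explicit call,
    # which CommunicatorContext would otherwise issue automatically on exit.
    collective.finalize()

The test below uses the context-manager form instead, which pairs initialization and shutdown automatically around the worker body.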
import multiprocessing
import socket
import sys

import numpy as np
import pytest

import xgboost as xgb
from xgboost import RabitTracker
from xgboost import collective

if sys.platform.startswith("win"):
    pytest.skip("Skipping collective tests on Windows", allow_module_level=True)


def run_rabit_worker(rabit_env, world_size):
    # Each worker joins the rabit communicator via the context manager and
    # checks the basic collective primitives.
    with xgb.collective.CommunicatorContext(**rabit_env):
        assert xgb.collective.get_world_size() == world_size
        assert xgb.collective.is_distributed()
        assert xgb.collective.get_processor_name() == socket.gethostname()
        ret = xgb.collective.broadcast('test1234', 0)
        assert str(ret) == 'test1234'
        # Every worker contributes [1, 2, 3]; with two workers the element-wise
        # SUM allreduce yields [2, 4, 6].
        ret = xgb.collective.allreduce(np.asarray([1, 2, 3]), xgb.collective.Op.SUM)
        assert np.array_equal(ret, np.asarray([2, 4, 6]))


def test_rabit_communicator():
    world_size = 2
    tracker = RabitTracker(host_ip='127.0.0.1', n_workers=world_size)
    tracker.start(world_size)
    workers = []
    for _ in range(world_size):
        worker = multiprocessing.Process(target=run_rabit_worker,
                                         args=(tracker.worker_envs(), world_size))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
        assert worker.exitcode == 0