* Initial commit to support multi-node multi-gpu xgboost using dask
* Fixed NCCL initialization by not ignoring the opg parameter.
  - it now crashes on NCCL initialization, but at least we're attempting it properly
* At the root node, perform a rabit::Allreduce to get the initial sum_gradient across workers
* Synchronizing in a couple more places.
  - now the workers don't go down, but just hang
  - no more "wild" values of gradients
  - probably needs syncing in more places
* Added another missing max-allreduce operation inside BuildHistLeftRight
* Removed unnecessary collective operations.
* Simplified rabit::Allreduce() sync of gradient sums.
* Removed unnecessary rabit syncs around ncclAllReduce.
  - this improves performance _significantly_ (7x faster for overall training, 20x faster for xgboost proper)
* pulling in latest xgboost
* removing changes to updater_quantile_hist.cc
* changing use_nccl_opg initialization, removing unnecessary if statements
* added definition for opaque ncclUniqueId struct to properly encapsulate GetUniqueId
* placing struct definition in a guard to avoid duplicate-definition errors
* addressing linting errors
* removing
* removing additional arguments to AllReducer initialization
* removing distributed flag
* making comm init symmetric
* removing distributed flag
* changing ncclCommInit to support multiple modalities
* fix indenting
* updating ncclCommInitRank block with necessary group calls
* fix indenting
* adding print statement, and updating accessor in vector
* improving print statement to end line
* generalizing nccl_rank construction using rabit
* assume device_ordinals is the same for every node
* test, assume device_ordinals is identical for all nodes
* test, assume device_ordinals is unique for all nodes
* changing names of offset variable to be more descriptive, editing indenting
* wrapping ncclUniqueId GetUniqueId() and aesthetic changes
* adding synchronization, and tests for distributed
* adding to tests
* fixing broken #endif
* fixing initialization of gpu histograms, correcting errors in tests
* adding to contributors list
* adding distributed tests to jenkins
* fixing bad path in distributed test
* debugging
* adding kubernetes for distributed tests
* adding proper import for OrderedDict
* adding urllib3==1.22 to address ordered_dict import error
* added sleep to allow workers to save their models for comparison
* adding name to GPU contributors under docs
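The commits above push the gradient-sum synchronization into the C++ updater via rabit::Allreduce, but the same collective is exposed through xgboost's Python wrapper. A minimal sketch of the idea, assuming xgb.rabit.allreduce and the Op enum are available as in current releases (local_grad_hess is a hypothetical stand-in for a worker's local gradient/hessian sums):

import numpy as np
import xgboost as xgb

xgb.rabit.init()
# Each worker computes gradient/hessian sums over its own data shard...
local_grad_hess = np.array([1.5, 0.25])
# ...and a single Allreduce gives every worker the global sums, which is what
# keeps the root-node statistics identical across nodes.
global_grad_hess = xgb.rabit.allreduce(local_grad_hess, xgb.rabit.Op.SUM)
xgb.rabit.finalize()

The test script below exercises this end to end: each worker trains on its shard, and rank 0 verifies that all workers produced identical models.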
#!/usr/bin/python
import xgboost as xgb
import time
from collections import OrderedDict

# Always call this before using the distributed module
xgb.rabit.init()
rank = xgb.rabit.get_rank()
world = xgb.rabit.get_world_size()

# Load the data; files are automatically sharded across workers in distributed mode.
dtrain = xgb.DMatrix('../../demo/data/agaricus.txt.train')
dtest = xgb.DMatrix('../../demo/data/agaricus.txt.test')

# Specify parameters via a dict; definitions are the same as in the C++ version.
# With n_gpus=2, each worker drives two devices, so gpu_id is offset by 2*rank
# to give every worker a disjoint pair of GPUs.
param = {'n_gpus': 2, 'gpu_id': 2 * rank, 'tree_method': 'gpu_hist',
         'max_depth': 2, 'eta': 1, 'silent': 1,
         'objective': 'binary:logistic'}
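
# Sanity-check sketch (an addition, not part of the original test): print which
# GPUs this worker should drive, assuming device ordinals are identical on
# every node, as the commit notes above assume.
gpus = list(range(2 * rank, 2 * rank + 2))
xgb.rabit.tracker_print('worker {} -> GPUs {}\n'.format(rank, gpus))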

# Specify validation sets to watch performance
watchlist = [(dtest, 'eval'), (dtrain, 'train')]
num_round = 20

# Run training; all the features of the training API are available.
# Currently, this script only supports calling train once, for fault-recovery purposes.
bst = xgb.train(param, dtrain, num_round, watchlist, early_stopping_rounds=2)

# Have each worker save its model, then sleep so that every worker's dump is
# on disk before rank 0 starts comparing the files.
model_name = "test.model.2x2." + str(rank)
bst.dump_model(model_name, with_stats=True)
time.sleep(2)
xgb.rabit.tracker_print("Finished training\n")
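
# Note (an addition, not in the original test): the fixed sleep is a race-prone
# way to wait for the dumps. A trivial collective can serve as a barrier
# instead, since an Allreduce cannot complete until every worker reaches it.
# This assumes the Python wrapper exposes rabit's allreduce and Op enum, as
# current releases do:
#
#   import numpy as np
#   xgb.rabit.allreduce(np.zeros(1, dtype=np.float32), xgb.rabit.Op.SUM)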

# Rank 0 compares every pair of dumped models; if the distributed histogram
# synchronization worked, every worker's model is identical.
if rank == 0:
    for i in range(world):
        model_name_root = "test.model.2x2." + str(i)
        for j in range(world):
            if i == j:
                continue
            model_name_rank = "test.model.2x2." + str(j)
            with open(model_name_root, 'r') as model_root:
                with open(model_name_rank, 'r') as model_rank:
                    diff = set(model_root).difference(model_rank)
            if diff:
                xgb.rabit.finalize()
                raise Exception('Worker models diverged: {} differs from {}'
                                .format(model_name_root, model_name_rank))

# Notify the tracker that all training has completed successfully.
# This is only needed in distributed training.
xgb.rabit.finalize()
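
# Usage sketch (the paths and script name are assumptions; adjust to your
# checkout). The test is meant to be launched through dmlc-core's rabit
# tracker, one worker per node with two GPUs each:
#
#   python dmlc-core/tracker/dmlc-submit --cluster=local --num-workers=2 \
#       python distributed_gpu.py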