Run training with empty DMatrix. (#4990)

This makes GPU Hist robust in distributed environment as some workers might not
be associated with any data in either training or evaluation.

* Disable rabit mock test for now: See #5012 .

* Disable dask-cudf test at prediction for now: See #5003

* Launch dask jobs for all workers, even though some of them might not have any data.
* Check 0 rows in elementwise evaluation metrics.

   Using AUC and AUC-PR still throws an error.  See #4663 for a robust fix.

* Add tests for edge cases.
* Add `LaunchKernel` wrapper handling zero sized grid.
* Move some parts of allreducer into a cu file.
* Don't validate feature names when the booster is empty.

* Sync number of columns in DMatrix.

  As num_feature is required to be the same across all workers in data split
  mode.

* Filtering in the dask interface now by default syncs all boosters that are not
empty, instead of using rank 0.

* Fix Jenkins' GPU tests.

* Install dask-cuda from source in Jenkins' test.

  Now all tests are actually running.

* Restore GPU Hist tree synchronization test.

* Check UUID of running devices.

  The check is only performed on CUDA version >= 10.x, as 9.x doesn't have UUID field.

* Fix CMake policy and project variables.

  Use xgboost_SOURCE_DIR uniformly, add policy for CMake >= 3.13.

* Fix copying data to CPU

* Fix race condition in cpu predictor.

* Fix duplicated DMatrix construction.

* Don't download extra nccl in CI script.
This commit is contained in:
Jiaming Yuan
2019-11-06 16:13:13 +08:00
committed by GitHub
parent 807a244517
commit 7663de956c
44 changed files with 603 additions and 272 deletions

View File

@@ -513,7 +513,7 @@ class DMatrix(object):
try:
csr = scipy.sparse.csr_matrix(data)
self._init_from_csr(csr)
except:
except Exception:
raise TypeError('can not initialize DMatrix from'
' {}'.format(type(data).__name__))
@@ -577,9 +577,9 @@ class DMatrix(object):
if len(mat.shape) != 2:
raise ValueError('Expecting 2 dimensional numpy.ndarray, got: ',
mat.shape)
# flatten the array by rows and ensure it is float32.
# we try to avoid data copies if possible (reshape returns a view when possible
# and we explicitly tell np.array to try and avoid copying)
# flatten the array by rows and ensure it is float32. we try to avoid
# data copies if possible (reshape returns a view when possible and we
# explicitly tell np.array to try and avoid copying)
data = np.array(mat.reshape(mat.size), copy=False, dtype=np.float32)
handle = ctypes.c_void_p()
missing = missing if missing is not None else np.nan
@@ -1391,8 +1391,9 @@ class Booster(object):
value of the prediction. Note the last row and column correspond to the bias term.
validate_features : bool
When this is True, validate that the Booster's and data's feature_names are identical.
Otherwise, it is assumed that the feature_names are the same.
When this is True, validate that the Booster's and data's
feature_names are identical. Otherwise, it is assumed that the
feature_names are the same.
Returns
-------
@@ -1811,8 +1812,8 @@ class Booster(object):
msg = 'feature_names mismatch: {0} {1}'
if dat_missing:
msg += ('\nexpected ' + ', '.join(str(s) for s in dat_missing) +
' in input data')
msg += ('\nexpected ' + ', '.join(
str(s) for s in dat_missing) + ' in input data')
if my_missing:
msg += ('\ntraining data did not have the following fields: ' +
@@ -1821,7 +1822,8 @@ class Booster(object):
raise ValueError(msg.format(self.feature_names,
data.feature_names))
def get_split_value_histogram(self, feature, fmap='', bins=None, as_pandas=True):
def get_split_value_histogram(self, feature, fmap='', bins=None,
as_pandas=True):
"""Get split value histogram of a feature
Parameters

View File

@@ -55,10 +55,14 @@ def _start_tracker(host, n_workers):
return env
def _assert_dask_support():
    '''Ensure the dask module is usable on this platform.

    Raises
    ------
    ImportError
        If dask is not installed.
    '''
    if not DASK_INSTALLED:
        raise ImportError(
            'Dask needs to be installed in order to use this module')
    if platform.system() == 'Windows':
        # Windows is untested for dask/xgboost: warn but do not fail.
        msg = 'Windows is not officially supported for dask/xgboost,'
        msg += ' contribution are welcomed.'
        logging.warning(msg)
class RabitContext:
@@ -96,6 +100,11 @@ def _xgb_get_client(client):
return ret
def _get_client_workers(client):
workers = client.scheduler_info()['workers']
return workers
class DaskDMatrix:
# pylint: disable=missing-docstring, too-many-instance-attributes
'''DMatrix holding on references to Dask DataFrame or Dask Array.
@@ -132,7 +141,7 @@ class DaskDMatrix:
weight=None,
feature_names=None,
feature_types=None):
_assert_dask_installed()
_assert_dask_support()
self._feature_names = feature_names
self._feature_types = feature_types
@@ -263,6 +272,17 @@ class DaskDMatrix:
A DMatrix object.
'''
if worker.address not in set(self.worker_map.keys()):
msg = 'worker {address} has an empty DMatrix. ' \
'All workers associated with this DMatrix: {workers}'.format(
address=worker.address,
workers=set(self.worker_map.keys()))
logging.warning(msg)
d = DMatrix(numpy.empty((0, 0)),
feature_names=self._feature_names,
feature_types=self._feature_types)
return d
data, labels, weights = self.get_worker_parts(worker)
data = concat(data)
@@ -275,7 +295,6 @@ class DaskDMatrix:
weights = concat(weights)
else:
weights = None
dmatrix = DMatrix(data,
labels,
weight=weights,
@@ -342,35 +361,33 @@ def train(client, params, dtrain, *args, evals=(), **kwargs):
'eval': {'logloss': ['0.480385', '0.357756']}}}
'''
_assert_dask_installed()
if platform.system() == 'Windows':
msg = 'Windows is not officially supported for dask/xgboost,'
msg += ' contribution are welcomed.'
logging.warning(msg)
_assert_dask_support()
if 'evals_result' in kwargs.keys():
raise ValueError(
'evals_result is not supported in dask interface.',
'The evaluation history is returned as result of training.')
client = _xgb_get_client(client)
workers = list(_get_client_workers(client).keys())
worker_map = dtrain.worker_map
rabit_args = _get_rabit_args(worker_map, client)
rabit_args = _get_rabit_args(workers, client)
def dispatched_train(worker_id):
'''Perform training on worker.'''
logging.info('Training on %d', worker_id)
def dispatched_train(worker_addr):
'''Perform training on a single worker.'''
logging.info('Training on %s', str(worker_addr))
worker = distributed_get_worker()
local_dtrain = dtrain.get_worker_data(worker)
local_evals = []
if evals:
for mat, name in evals:
local_mat = mat.get_worker_data(worker)
local_evals.append((local_mat, name))
with RabitContext(rabit_args):
local_dtrain = dtrain.get_worker_data(worker)
local_evals = []
if evals:
for mat, name in evals:
if mat is dtrain:
local_evals.append((local_dtrain, name))
continue
local_mat = mat.get_worker_data(worker)
local_evals.append((local_mat, name))
local_history = {}
local_param = params.copy() # just to be consistent
bst = worker_train(params=local_param,
@@ -380,14 +397,14 @@ def train(client, params, dtrain, *args, evals=(), **kwargs):
evals=local_evals,
**kwargs)
ret = {'booster': bst, 'history': local_history}
if rabit.get_rank() != 0:
if local_dtrain.num_row() == 0:
ret = None
return ret
futures = client.map(dispatched_train,
range(len(worker_map)),
workers,
pure=False,
workers=list(worker_map.keys()))
workers=workers)
results = client.gather(futures)
return list(filter(lambda ret: ret is not None, results))[0]
@@ -414,7 +431,7 @@ def predict(client, model, data, *args):
prediction: dask.array.Array
'''
_assert_dask_installed()
_assert_dask_support()
if isinstance(model, Booster):
booster = model
elif isinstance(model, dict):
@@ -437,7 +454,8 @@ def predict(client, model, data, *args):
local_x = data.get_worker_data(worker)
with RabitContext(rabit_args):
local_predictions = booster.predict(data=local_x, *args)
local_predictions = booster.predict(
data=local_x, validate_features=local_x.num_row() != 0, *args)
return local_predictions
futures = client.map(dispatched_predict,
@@ -563,7 +581,7 @@ class DaskXGBRegressor(DaskScikitLearnBase):
sample_weights=None,
eval_set=None,
sample_weight_eval_set=None):
_assert_dask_installed()
_assert_dask_support()
dtrain = DaskDMatrix(client=self.client,
data=X, label=y, weight=sample_weights)
params = self.get_xgb_params()
@@ -579,7 +597,7 @@ class DaskXGBRegressor(DaskScikitLearnBase):
return self
def predict(self, data): # pylint: disable=arguments-differ
_assert_dask_installed()
_assert_dask_support()
test_dmatrix = DaskDMatrix(client=self.client, data=data)
pred_probs = predict(client=self.client,
model=self.get_booster(), data=test_dmatrix)
@@ -599,7 +617,7 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
sample_weights=None,
eval_set=None,
sample_weight_eval_set=None):
_assert_dask_installed()
_assert_dask_support()
dtrain = DaskDMatrix(client=self.client,
data=X, label=y, weight=sample_weights)
params = self.get_xgb_params()
@@ -626,7 +644,7 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
return self
def predict(self, data): # pylint: disable=arguments-differ
_assert_dask_installed()
_assert_dask_support()
test_dmatrix = DaskDMatrix(client=self.client, data=data)
pred_probs = predict(client=self.client,
model=self.get_booster(), data=test_dmatrix)

View File

@@ -332,7 +332,7 @@ class RabitTracker(object):
self.thread.start()
def join(self):
    '''Wait for the tracker thread to terminate.

    Joins with a timeout in a loop rather than blocking indefinitely,
    so the waiting thread can still observe interrupts between polls.
    '''
    # Thread.isAlive() was removed in Python 3.9; use is_alive().
    while self.thread.is_alive():
        self.thread.join(100)
def alive(self):