Update base margin dask (#6155)

* Add `base_margin` to `fit` and `predict` in the Dask interface.
* Add `output_margin` to the regressor and classifier `predict` methods.

Co-authored-by: fis <jm.yuan@outlook.com>
Author: Kyle Nicholson
Date: 2020-09-26 09:30:52 -04:00
Committed by: GitHub
Parent: 03b8fdec74
Commit: e6a238c020
2 changed files with 160 additions and 48 deletions
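For reference, a minimal usage sketch of what the new parameters enable on the Dask interface. This is not code from the commit; the LocalCluster setup, synthetic dask arrays, and hyperparameter values below are illustrative assumptions.

import dask.array as da
from dask.distributed import Client, LocalCluster
from xgboost.dask import DaskXGBRegressor

if __name__ == "__main__":
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        X = da.random.random((1000, 10), chunks=(100, 10))
        y = da.random.random(1000, chunks=100)
        # Per-row offsets added to the raw prediction before the objective's
        # transformation; random values here purely for illustration.
        margin = da.random.random(1000, chunks=100)

        # The estimator picks up the active client from the surrounding
        # Client context.
        reg = DaskXGBRegressor(n_estimators=10)
        reg.fit(X, y, base_margin=margin)

        # output_margin=True returns raw (untransformed) scores; the same
        # offsets are supplied again at prediction time.
        raw = reg.predict(X, output_margin=True, base_margin=margin)
        print(raw.compute()[:5])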


@@ -337,14 +337,22 @@ class DaskDMatrix:
                 'is_quantile': self.is_quantile}

-def _get_worker_x_ordered(worker_map, partition_order, worker):
+def _get_worker_parts_ordered(has_base_margin, worker_map, partition_order,
+                              worker):
     list_of_parts = worker_map[worker.address]
     client = get_client()
     list_of_parts_value = client.gather(list_of_parts)
     result = []
     for i, part in enumerate(list_of_parts):
-        result.append((list_of_parts_value[i][0],
-                       partition_order[part.key]))
+        data = list_of_parts_value[i][0]
+        if has_base_margin:
+            base_margin = list_of_parts_value[i][1]
+        else:
+            base_margin = None
+        result.append((data, base_margin, partition_order[part.key]))
     return result
@@ -740,9 +748,7 @@ async def _direct_predict_impl(client, data, predict_fn):
 # pylint: disable=too-many-statements
-async def _predict_async(client: Client, model, data, missing=numpy.nan,
-                         **kwargs):
+async def _predict_async(client: Client, model, data, missing=numpy.nan, **kwargs):
     if isinstance(model, Booster):
         booster = model
     elif isinstance(model, dict):
@@ -775,22 +781,30 @@ async def _predict_async(client: Client, model, data, missing=numpy.nan,
     feature_names = data.feature_names
     feature_types = data.feature_types
     missing = data.missing
+    has_margin = "base_margin" in data.meta_names

     def dispatched_predict(worker_id):
         '''Perform prediction on each worker.'''
         LOGGER.info('Predicting on %d', worker_id)
         worker = distributed_get_worker()
-        list_of_parts = _get_worker_x_ordered(worker_map, partition_order,
-                                              worker)
+        list_of_parts = _get_worker_parts_ordered(
+            has_margin, worker_map, partition_order, worker)
         predictions = []
         booster.set_param({'nthread': worker.nthreads})
-        for part, order in list_of_parts:
-            local_x = DMatrix(part, feature_names=feature_names,
-                              feature_types=feature_types,
-                              missing=missing, nthread=worker.nthreads)
-            predt = booster.predict(data=local_x,
-                                    validate_features=local_x.num_row() != 0,
-                                    **kwargs)
+        for data, base_margin, order in list_of_parts:
+            local_part = DMatrix(
+                data,
+                base_margin=base_margin,
+                feature_names=feature_names,
+                feature_types=feature_types,
+                missing=missing,
+                nthread=worker.nthreads
+            )
+            predt = booster.predict(
+                data=local_part,
+                validate_features=local_part.num_row() != 0,
+                **kwargs)
             columns = 1 if len(predt.shape) == 1 else predt.shape[1]
             ret = ((delayed(predt), columns), order)
             predictions.append(ret)
@@ -800,9 +814,13 @@ async def _predict_async(client: Client, model, data, missing=numpy.nan,
         '''Get shape of data in each worker.'''
         LOGGER.info('Get shape on %d', worker_id)
         worker = distributed_get_worker()
-        list_of_parts = _get_worker_x_ordered(worker_map,
-                                              partition_order, worker)
-        shapes = [(part.shape, order) for part, order in list_of_parts]
+        list_of_parts = _get_worker_parts_ordered(
+            False,
+            worker_map,
+            partition_order,
+            worker
+        )
+        shapes = [(part.shape, order) for part, _, order in list_of_parts]
         return shapes

     async def map_function(func):
@@ -984,6 +1002,7 @@ class DaskScikitLearnBase(XGBModel):
     # pylint: disable=arguments-differ
     def fit(self, X, y,
             sample_weights=None,
+            base_margin=None,
             eval_set=None,
             sample_weight_eval_set=None,
             verbose=True):
@@ -1044,12 +1063,14 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
                          X,
                          y,
                          sample_weights=None,
+                         base_margin=None,
                          eval_set=None,
                          sample_weight_eval_set=None,
                          verbose=True):
-        dtrain = await DaskDMatrix(client=self.client,
-                                   data=X, label=y, weight=sample_weights,
-                                   missing=self.missing)
+        dtrain = await DaskDMatrix(
+            client=self.client, data=X, label=y, weight=sample_weights,
+            base_margin=base_margin, missing=self.missing
+        )
         params = self.get_xgb_params()
         evals = await _evaluation_matrices(self.client,
                                            eval_set, sample_weight_eval_set,
@@ -1065,24 +1086,33 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
     # pylint: disable=missing-docstring
     def fit(self, X, y,
             sample_weights=None,
+            base_margin=None,
             eval_set=None,
             sample_weight_eval_set=None,
             verbose=True):
         _assert_dask_support()
-        return self.client.sync(self._fit_async, X, y, sample_weights,
-                                eval_set, sample_weight_eval_set,
-                                verbose)
+        return self.client.sync(
+            self._fit_async, X, y, sample_weights, base_margin,
+            eval_set, sample_weight_eval_set, verbose
+        )

-    async def _predict_async(self, data):  # pylint: disable=arguments-differ
-        test_dmatrix = await DaskDMatrix(client=self.client, data=data,
-                                         missing=self.missing)
+    async def _predict_async(
+            self, data, output_margin=False, base_margin=None):
+        test_dmatrix = await DaskDMatrix(
+            client=self.client, data=data, base_margin=base_margin,
+            missing=self.missing
+        )
         pred_probs = await predict(client=self.client,
-                                   model=self.get_booster(), data=test_dmatrix)
+                                   model=self.get_booster(), data=test_dmatrix,
+                                   output_margin=output_margin)
         return pred_probs

-    def predict(self, data):
+    # pylint: disable=arguments-differ
+    def predict(self, data, output_margin=False, base_margin=None):
         _assert_dask_support()
-        return self.client.sync(self._predict_async, data)
+        return self.client.sync(self._predict_async, data,
+                                output_margin=output_margin,
+                                base_margin=base_margin)

 @xgboost_model_doc(
@@ -1092,11 +1122,13 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
 class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
     async def _fit_async(self, X, y,
                          sample_weights=None,
+                         base_margin=None,
                          eval_set=None,
                          sample_weight_eval_set=None,
                          verbose=True):
         dtrain = await DaskDMatrix(client=self.client,
                                    data=X, label=y, weight=sample_weights,
+                                   base_margin=base_margin,
                                    missing=self.missing)
         params = self.get_xgb_params()
@@ -1126,33 +1158,46 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
     def fit(self, X, y,
             sample_weights=None,
+            base_margin=None,
             eval_set=None,
             sample_weight_eval_set=None,
             verbose=True):
         _assert_dask_support()
-        return self.client.sync(self._fit_async, X, y, sample_weights,
-                                eval_set, sample_weight_eval_set, verbose)
+        return self.client.sync(
+            self._fit_async, X, y, sample_weights, base_margin, eval_set,
+            sample_weight_eval_set, verbose
+        )

-    async def _predict_proba_async(self, data):
-        _assert_dask_support()
-        test_dmatrix = await DaskDMatrix(client=self.client, data=data,
-                                         missing=self.missing)
+    async def _predict_proba_async(self, data, output_margin=False,
+                                   base_margin=None):
+        test_dmatrix = await DaskDMatrix(
+            client=self.client, data=data, base_margin=base_margin,
+            missing=self.missing
+        )
         pred_probs = await predict(client=self.client,
-                                   model=self.get_booster(), data=test_dmatrix)
+                                   model=self.get_booster(),
+                                   data=test_dmatrix,
+                                   output_margin=output_margin)
         return pred_probs

-    def predict_proba(self, data):  # pylint: disable=arguments-differ,missing-docstring
+    def predict_proba(self, data, output_margin=False, base_margin=None):  # pylint: disable=arguments-differ,missing-docstring
         _assert_dask_support()
-        return self.client.sync(self._predict_proba_async, data)
+        return self.client.sync(
+            self._predict_proba_async,
+            data,
+            output_margin=output_margin,
+            base_margin=base_margin
+        )

-    async def _predict_async(self, data):
-        _assert_dask_support()
-        test_dmatrix = await DaskDMatrix(client=self.client, data=data,
-                                         missing=self.missing)
+    async def _predict_async(self, data, output_margin=False, base_margin=None):
+        test_dmatrix = await DaskDMatrix(
+            client=self.client, data=data, base_margin=base_margin,
+            missing=self.missing
+        )
         pred_probs = await predict(client=self.client,
-                                   model=self.get_booster(), data=test_dmatrix)
+                                   model=self.get_booster(),
+                                   data=test_dmatrix,
+                                   output_margin=output_margin)

         if self.n_classes_ == 2:
             preds = (pred_probs > 0.5).astype(int)
@@ -1161,6 +1206,11 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase):
         return preds

-    def predict(self, data):  # pylint: disable=arguments-differ
+    def predict(self, data, output_margin=False, base_margin=None):  # pylint: disable=arguments-differ
         _assert_dask_support()
-        return self.client.sync(self._predict_async, data)
+        return self.client.sync(
+            self._predict_async,
+            data,
+            output_margin=output_margin,
+            base_margin=base_margin
+        )
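A comparable sketch for the classifier side, again not part of the commit and using the same illustrative assumptions (synthetic dask arrays, a local cluster, aligned chunking between data and margins): `predict_proba(..., output_margin=True)` yields raw margin scores, which can then be passed back as `base_margin` to continue boosting from an existing model.

import dask.array as da
from dask.distributed import Client, LocalCluster
from xgboost.dask import DaskXGBClassifier

if __name__ == "__main__":
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        X = da.random.random((1000, 10), chunks=(100, 10))
        y = (da.random.random(1000, chunks=100) > 0.5).astype(int)

        clf = DaskXGBClassifier(n_estimators=5)
        clf.fit(X, y)
        # Raw margin scores instead of probabilities (new output_margin flag).
        margins = clf.predict_proba(X, output_margin=True)

        # Continue boosting from those margins by passing them back in as
        # base_margin, and supply the same offsets when predicting.
        clf2 = DaskXGBClassifier(n_estimators=5)
        clf2.fit(X, y, base_margin=margins)
        preds = clf2.predict(X, base_margin=margins)
        print(preds.compute()[:5])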