[dask] Support more meta data on functional interface. (#6132)

* Add base_margin, label_(lower|upper)_bound.
* Test survival training with dask.
This commit is contained in:
Jiaming Yuan
2020-09-21 16:56:37 +08:00
committed by GitHub
parent 7065779afa
commit 33d80ffad0
4 changed files with 154 additions and 48 deletions

View File

@@ -320,7 +320,8 @@ class DataIter:
def data_handle(data, label=None, weight=None, base_margin=None,
group=None,
label_lower_bound=None, label_upper_bound=None,
feature_names=None, feature_types=None):
feature_names=None, feature_types=None,
feature_weights=None):
from .data import dispatch_device_quantile_dmatrix_set_data
from .data import _device_quantile_transform
data, feature_names, feature_types = _device_quantile_transform(
@@ -333,7 +334,8 @@ class DataIter:
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound,
feature_names=feature_names,
feature_types=feature_types)
feature_types=feature_types,
feature_weights=feature_weights)
try:
# Differ the exception in order to return 0 and stop the iteration.
# Exception inside a ctype callback function has no effect except

View File

@@ -178,6 +178,12 @@ class DaskDMatrix:
to be present as a missing value. If None, defaults to np.nan.
weight : dask.array.Array/dask.dataframe.DataFrame
Weight for each instance.
base_margin : dask.array.Array/dask.dataframe.DataFrame
Global bias for each instance.
label_lower_bound : dask.array.Array/dask.dataframe.DataFrame
Upper bound for survival training.
label_upper_bound : dask.array.Array/dask.dataframe.DataFrame
Lower bound for survival training.
feature_names : list, optional
Set names for features.
feature_types : list, optional
@@ -191,6 +197,9 @@ class DaskDMatrix:
label=None,
missing=None,
weight=None,
base_margin=None,
label_lower_bound=None,
label_upper_bound=None,
feature_names=None,
feature_types=None):
_assert_dask_support()
@@ -216,12 +225,17 @@ class DaskDMatrix:
self.is_quantile = False
self._init = client.sync(self.map_local_data,
client, data, label, weight)
client, data, label=label, weights=weight,
base_margin=base_margin,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)
def __await__(self):
return self._init.__await__()
async def map_local_data(self, client, data, label=None, weights=None):
async def map_local_data(self, client, data, label=None, weights=None,
base_margin=None,
label_lower_bound=None, label_upper_bound=None):
'''Obtain references to local data.'''
def inconsistent(left, left_name, right, right_name):
@@ -241,10 +255,10 @@ class DaskDMatrix:
' chunks=(partition_size, X.shape[1])'
data = data.persist()
if label is not None:
label = label.persist()
if weights is not None:
weights = weights.persist()
for meta in [label, weights, base_margin, label_lower_bound,
label_upper_bound]:
if meta is not None:
meta = meta.persist()
# Breaking data into partitions, a trick borrowed from dask_xgboost.
# `to_delayed` downgrades high-level objects into numpy or pandas
@@ -254,29 +268,37 @@ class DaskDMatrix:
check_columns(X_parts)
X_parts = X_parts.flatten().tolist()
if label is not None:
y_parts = label.to_delayed()
if isinstance(y_parts, numpy.ndarray):
check_columns(y_parts)
y_parts = y_parts.flatten().tolist()
if weights is not None:
w_parts = weights.to_delayed()
if isinstance(w_parts, numpy.ndarray):
check_columns(w_parts)
w_parts = w_parts.flatten().tolist()
def flatten_meta(meta):
if meta is not None:
meta_parts = meta.to_delayed()
if isinstance(meta_parts, numpy.ndarray):
check_columns(meta_parts)
meta_parts = meta_parts.flatten().tolist()
return meta_parts
return None
y_parts = flatten_meta(label)
w_parts = flatten_meta(weights)
margin_parts = flatten_meta(base_margin)
ll_parts = flatten_meta(label_lower_bound)
lu_parts = flatten_meta(label_upper_bound)
parts = [X_parts]
meta_names = []
if label is not None:
assert len(X_parts) == len(
y_parts), inconsistent(X_parts, 'X', y_parts, 'labels')
parts.append(y_parts)
meta_names.append('labels')
if weights is not None:
assert len(X_parts) == len(
w_parts), inconsistent(X_parts, 'X', w_parts, 'weights')
parts.append(w_parts)
meta_names.append('weights')
def append_meta(m_parts, name: str):
if m_parts is not None:
assert len(X_parts) == len(
m_parts), inconsistent(X_parts, 'X', m_parts, name)
parts.append(m_parts)
meta_names.append(name)
append_meta(y_parts, 'labels')
append_meta(w_parts, 'weights')
append_meta(margin_parts, 'base_margin')
append_meta(ll_parts, 'label_lower_bound')
append_meta(lu_parts, 'label_upper_bound')
parts = list(map(delayed, zip(*parts)))
parts = client.compute(parts)
@@ -339,6 +361,9 @@ def _get_worker_parts(worker_map, meta_names, worker):
data = None
labels = None
weights = None
base_margin = None
label_lower_bound = None
label_upper_bound = None
local_data = list(zip(*list_of_parts))
data = local_data[0]
@@ -348,8 +373,15 @@ def _get_worker_parts(worker_map, meta_names, worker):
labels = part
if meta_names[i] == 'weights':
weights = part
if meta_names[i] == 'base_margin':
base_margin = part
if meta_names[i] == 'label_lower_bound':
label_lower_bound = part
if meta_names[i] == 'label_upper_bound':
label_upper_bound = part
return data, labels, weights
return (data, labels, weights, base_margin, label_lower_bound,
label_upper_bound)
class DaskPartitionIter(DataIter): # pylint: disable=R0902
@@ -456,13 +488,22 @@ class DaskDeviceQuantileDMatrix(DaskDMatrix):
'''
def __init__(self, client, data, label=None, weight=None,
def __init__(self, client,
data,
label=None,
missing=None,
weight=None,
base_margin=None,
label_lower_bound=None,
label_upper_bound=None,
feature_names=None,
feature_types=None,
max_bin=256):
super().__init__(client=client, data=data, label=label, weight=weight,
super().__init__(client=client, data=data, label=label,
missing=missing,
weight=weight, base_margin=base_margin,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound,
feature_names=feature_names,
feature_types=feature_types)
self.max_bin = max_bin
@@ -491,8 +532,13 @@ def _create_device_quantile_dmatrix(feature_names, feature_types,
max_bin=max_bin)
return d
data, labels, weights = _get_worker_parts(worker_map, meta_names, worker)
it = DaskPartitionIter(data=data, label=labels, weight=weights)
(data, labels, weights, base_margin,
label_lower_bound, label_upper_bound) = _get_worker_parts(
worker_map, meta_names, worker)
it = DaskPartitionIter(data=data, label=labels, weight=weights,
base_margin=base_margin,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)
dmatrix = DeviceQuantileDMatrix(it,
missing=missing,
@@ -524,20 +570,31 @@ def _create_dmatrix(feature_names, feature_types, meta_names, missing,
feature_types=feature_types)
return d
data, labels, weights = _get_worker_parts(worker_map, meta_names, worker)
data = concat(data)
def concat_or_none(data):
if data is not None:
return concat(data)
return data
if labels:
labels = concat(labels)
if weights:
weights = concat(weights)
(data, labels, weights, base_margin,
label_lower_bound, label_upper_bound) = _get_worker_parts(
worker_map, meta_names, worker)
labels = concat_or_none(labels)
weights = concat_or_none(weights)
base_margin = concat_or_none(base_margin)
label_lower_bound = concat_or_none(label_lower_bound)
label_upper_bound = concat_or_none(label_upper_bound)
data = concat(data)
dmatrix = DMatrix(data,
labels,
weight=weights,
missing=missing,
feature_names=feature_names,
feature_types=feature_types,
nthread=worker.nthreads)
dmatrix.set_info(base_margin=base_margin, weight=weights,
label_lower_bound=label_lower_bound,
label_upper_bound=label_upper_bound)
return dmatrix
@@ -683,7 +740,8 @@ async def _direct_predict_impl(client, data, predict_fn):
# pylint: disable=too-many-statements
async def _predict_async(client: Client, model, data, missing=numpy.nan, **kwargs):
async def _predict_async(client: Client, model, data, missing=numpy.nan,
**kwargs):
if isinstance(model, Booster):
booster = model