diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 4a731bda3..131127e7e 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -171,10 +171,15 @@ jobs:
         architecture: 'x64'
     - name: Install Python packages
       run: |
-        python -m pip install wheel setuptools mypy
+        python -m pip install wheel setuptools mypy dask[complete] distributed
    - name: Run mypy
      run: |
-        cd python-package && mypy . || true
+        cd python-package
+        # dask is required to pass; the other modules are not.
+        mypy ./xgboost/dask.py ../tests/python/test_with_dask.py --follow-imports=silent
+        mypy ../tests/python-gpu/test_gpu_with_dask.py --follow-imports=silent
+        # If any of the above fail, the contributor won't see the next error.
+        mypy . || true

  doxygen:
    runs-on: ubuntu-latest
diff --git a/doc/conf.py b/doc/conf.py
index a8259022a..45237c853 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -94,6 +94,8 @@ extensions = [
     'recommonmark'
 ]

+autodoc_typehints = "description"
+
 graphviz_output_format = 'png'
 plot_formats = [('svg', 300), ('png', 100), ('hires.png', 300)]
 plot_html_show_source_link = False
diff --git a/python-package/xgboost/core.py b/python-package/xgboost/core.py
index 6288d8a29..b1469d936 100644
--- a/python-package/xgboost/core.py
+++ b/python-package/xgboost/core.py
@@ -7,6 +7,7 @@ import collections
 from collections.abc import Mapping
 from typing import List, Optional, Any, Union, Dict
 # pylint: enable=no-name-in-module,import-error
+from typing import Callable, Tuple
 import ctypes
 import os
 import re
@@ -991,6 +992,10 @@ class DeviceQuantileDMatrix(DMatrix):
         )


+Objective = Callable[[np.ndarray, DMatrix], Tuple[np.ndarray, np.ndarray]]
+Metric = Callable[[np.ndarray, DMatrix], Tuple[str, float]]
+
+
 class Booster(object):
     # pylint: disable=too-many-public-methods
     """A Booster of XGBoost.

diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py
index 585d7fea3..b80454797 100644
--- a/python-package/xgboost/dask.py
+++ b/python-package/xgboost/dask.py
@@ -1,6 +1,7 @@
-# pylint: disable=too-many-arguments, too-many-locals
+# pylint: disable=too-many-arguments, too-many-locals, no-name-in-module
 # pylint: disable=missing-class-docstring, invalid-name
 # pylint: disable=too-many-lines
+# pylint: disable=import-error
 """Dask extensions for distributed training. See
 https://xgboost.readthedocs.io/en/latest/tutorials/dask.html for simple
 tutorial.  Also xgboost/demo/dask for some examples.
@@ -18,18 +19,22 @@ import logging
 from collections import defaultdict
 from collections.abc import Sequence
 from threading import Thread
-from typing import List
+from typing import TYPE_CHECKING, List, Tuple, Callable, Optional, Any, Union, Dict, Set
+from typing import Awaitable, Generator, TypeVar

 import numpy

 from . import rabit, config
+from .callback import TrainingCallback
+
 from .compat import LazyLoader
 from .compat import sparse, scipy_sparse
 from .compat import PANDAS_INSTALLED, DataFrame, Series, pandas_concat
 from .compat import lazy_isinstance

 from .core import DMatrix, DeviceQuantileDMatrix, Booster, _expect, DataIter
+from .core import Objective, Metric
 from .core import _deprecate_positional_args
 from .training import train as worker_train
 from .tracker import RabitTracker, get_host_ip
@@ -37,34 +42,57 @@ from .sklearn import XGBModel, XGBRegressorBase, XGBClassifierBase, _objective_d
 from .sklearn import xgboost_model_doc


-dd = LazyLoader('dd', globals(), 'dask.dataframe')
-da = LazyLoader('da', globals(), 'dask.array')
-dask = LazyLoader('dask', globals(), 'dask')
-distributed = LazyLoader('distributed', globals(), 'dask.distributed')
+if TYPE_CHECKING:
+    from dask import dataframe as dd
+    from dask import array as da
+    import dask
+    import distributed
+else:
+    dd = LazyLoader('dd', globals(), 'dask.dataframe')
+    da = LazyLoader('da', globals(), 'dask.array')
+    dask = LazyLoader('dask', globals(), 'dask')
+    distributed = LazyLoader('distributed', globals(), 'dask.distributed')

-# Current status is considered as initial support, many features are
-# not properly supported yet.
+_DaskCollection = Union["da.Array", "dd.DataFrame", "dd.Series"]
+
+try:
+    from mypy_extensions import TypedDict
+    TrainReturnT = TypedDict('TrainReturnT', {
+        'booster': Booster,
+        'history': Dict,
+    })
+except ImportError:
+    TrainReturnT = Dict[str, Any]  # type:ignore
+
+# The current status is considered initial support; many features are not properly
+# supported yet.
 #
 # TODOs:
 #   - CV
 #   - Ranking
 #
 # Note for developers:
-
-# As of writing asyncio is still a new feature of Python and in depth
-# documentation is rare. Best examples of various asyncio tricks are in dask
-# (luckily). Classes like Client, Worker are awaitable. Some general rules
-# for the implementation here:
-# - Synchronous world is different from asynchronous one, and they don't
-#   mix well.
-# - Write everything with async, then use distributed Client sync function
-#   to do the switch.
+#
+# As of writing, asyncio is still a new feature of Python and in-depth documentation is
+# rare. The best examples of various asyncio tricks are in dask (luckily). Classes like
+# Client and Worker are awaitable. Some general rules for the implementation here:
+#
+# - The synchronous world is different from the asynchronous one, and they don't mix
+#   well.
+# - Write everything with async, then use the distributed `Client.sync` method to do
+#   the switch.
+# - Use Any as the type hint when the return value can be a union of Awaitable and a
+#   plain value, because `Client.sync` can return either depending on context. Right
+#   now there's no good way to silence:
+#
+#     await train(...)
+#
+#   if train returns a Union type.
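To make the note above concrete, here is a minimal sketch of the pattern (the `_do_work`/`do_work` names are illustrative and not part of this patch): all real logic lives in a coroutine, and a thin synchronous wrapper hands it to `Client.sync`.

```python
from typing import Any, Dict

import distributed


async def _do_work(client: "distributed.Client") -> Dict[str, Any]:
    # All real logic is written as coroutines so it can await dask objects
    # (Client, Worker, and futures are awaitable).
    who_has = await client.scheduler.who_has(keys=[])
    return {"who_has": who_has}


def do_work(client: "distributed.Client") -> Any:
    # Client.sync blocks and returns the result for a synchronous client,
    # but returns an awaitable for an asynchronous one -- hence `Any`.
    return client.sync(_do_work, client)
```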
 LOGGER = logging.getLogger('[xgboost.dask]')


-def _start_tracker(n_workers):
+def _start_tracker(n_workers: int) -> Dict[str, Any]:
     """Start Rabit tracker """
     env = {'DMLC_NUM_WORKER': n_workers}
     host = get_host_ip('auto')
@@ -78,7 +106,7 @@ def _start_tracker(n_workers):
     return env


-def _assert_dask_support():
+def _assert_dask_support() -> None:
     try:
         import dask  # pylint: disable=W0621,W0611
     except ImportError as e:
@@ -93,22 +121,22 @@ def _assert_dask_support():

 class RabitContext:
     '''A context controlling rabit initialization and finalization.'''
-    def __init__(self, args):
+    def __init__(self, args: List[bytes]) -> None:
         self.args = args
         worker = distributed.get_worker()
         self.args.append(
             ('DMLC_TASK_ID=[xgboost.dask]:' + str(worker.address)).encode())

-    def __enter__(self):
+    def __enter__(self) -> None:
         rabit.init(self.args)
         LOGGER.debug('-------------- rabit say hello ------------------')

-    def __exit__(self, *args):
+    def __exit__(self, *args: List) -> None:
         rabit.finalize()
         LOGGER.debug('--------------- rabit say bye ------------------')


-def concat(value):  # pylint: disable=too-many-return-statements
+def concat(value: Any) -> Any:  # pylint: disable=too-many-return-statements
     '''To be replaced with dask builtin.'''
     if isinstance(value[0], numpy.ndarray):
         return numpy.concatenate(value, axis=0)
@@ -123,7 +151,7 @@ def concat(value):  # pylint: disable=too-many-return-statements
         from cudf import concat as CUDF_concat  # pylint: disable=import-error
         return CUDF_concat(value, axis=0)
     if lazy_isinstance(value[0], 'cupy.core.core', 'ndarray'):
-        import cupy             # pylint: disable=import-error
+        import cupy  # pylint: disable=c-extension-no-member,no-member

         d = cupy.cuda.runtime.getDevice()
         for v in value:
@@ -133,7 +161,7 @@ def concat(value):  # pylint: disable=too-many-return-statements
     return dd.multi.concat(list(value), axis=0)


-def _xgb_get_client(client):
+def _xgb_get_client(client: Optional["distributed.Client"]) -> "distributed.Client":
     '''Simple wrapper around testing None.'''
     if not isinstance(client, (type(distributed.get_client()), type(None))):
         raise TypeError(
@@ -164,47 +192,49 @@ class DaskDMatrix:

     Parameters
     ----------
-    client: dask.distributed.Client
-        Specify the dask client used for training.  Use default client
-        returned from dask if it's set to None.
-    data : dask.array.Array/dask.dataframe.DataFrame
+    client :
+        Specify the dask client used for training. Use default client returned from dask
+        if it's set to None.
+    data :
         data source of DMatrix.
-    label: dask.array.Array/dask.dataframe.DataFrame
+    label :
         label used for training.
-    missing : float, optional
-        Value in the input data (e.g. `numpy.ndarray`) which needs
-        to be present as a missing value. If None, defaults to np.nan.
-    weight : dask.array.Array/dask.dataframe.DataFrame
+    missing :
+        Value in the input data (e.g. `numpy.ndarray`) which needs to be present as a
+        missing value. If None, defaults to np.nan.
+    weight :
         Weight for each instance.
-    base_margin : dask.array.Array/dask.dataframe.DataFrame
+    base_margin :
         Global bias for each instance.
-    label_lower_bound : dask.array.Array/dask.dataframe.DataFrame
+    label_lower_bound :
         Lower bound for survival training.
-    label_upper_bound : dask.array.Array/dask.dataframe.DataFrame
+    label_upper_bound :
         Upper bound for survival training.
-    feature_weights : dask.array.Array/dask.dataframe.DataFrame
+    feature_weights :
         Weight for features used in column sampling.
-    feature_names : list, optional
+    feature_names :
         Set names for features.
- feature_types : list, optional + feature_types : Set types for features ''' - def __init__(self, - client, - data, - label=None, - missing=None, - weight=None, - base_margin=None, - label_lower_bound=None, - label_upper_bound=None, - feature_weights=None, - feature_names=None, - feature_types=None): + def __init__( + self, + client: "distributed.Client", + data: _DaskCollection, + label: Optional[_DaskCollection] = None, + missing: float = None, + weight: Optional[_DaskCollection] = None, + base_margin: Optional[_DaskCollection] = None, + label_lower_bound: Optional[_DaskCollection] = None, + label_upper_bound: Optional[_DaskCollection] = None, + feature_weights: Optional[_DaskCollection] = None, + feature_names: Optional[Union[str, List[str]]] = None, + feature_types: Optional[Union[Any, List[Any]]] = None + ) -> None: _assert_dask_support() - client: distributed.Client = _xgb_get_client(client) + client = _xgb_get_client(client) self.feature_names = feature_names self.feature_types = feature_types @@ -222,8 +252,8 @@ class DaskDMatrix: raise TypeError( _expect((dd.DataFrame, da.Array, dd.Series), type(label))) - self.worker_map = None - self.is_quantile = False + self.worker_map: Dict[str, "distributed.Future"] = defaultdict(list) + self.is_quantile: bool = False self._init = client.sync(self.map_local_data, client, data, label=label, weights=weight, @@ -232,15 +262,25 @@ class DaskDMatrix: label_lower_bound=label_lower_bound, label_upper_bound=label_upper_bound) - def __await__(self): + def __await__(self) -> Generator: return self._init.__await__() - async def map_local_data(self, client, data, label=None, weights=None, - base_margin=None, feature_weights=None, - label_lower_bound=None, label_upper_bound=None): + async def map_local_data( + self, + client: "distributed.Client", + data: _DaskCollection, + label: Optional[_DaskCollection] = None, + weights: Optional[_DaskCollection] = None, + base_margin: Optional[_DaskCollection] = None, + feature_weights: Optional[_DaskCollection] = None, + label_lower_bound: Optional[_DaskCollection] = None, + label_upper_bound: Optional[_DaskCollection] = None + ) -> "DaskDMatrix": '''Obtain references to local data.''' - def inconsistent(left, left_name, right, right_name): + def inconsistent( + left: List[Any], left_name: str, right: List[Any], right_name: str + ) -> str: msg = 'Partitions between {a_name} and {b_name} are not ' \ 'consistent: {a_len} != {b_len}. ' \ 'Please try to repartition/rechunk your data.'.format( @@ -249,7 +289,7 @@ class DaskDMatrix: ) return msg - def check_columns(parts): + def check_columns(parts: Any) -> None: # x is required to be 2 dim in __init__ assert parts.ndim == 1 or parts.shape[1], 'Data should be' \ ' partitioned by row. To avoid this specify the number' \ @@ -270,7 +310,9 @@ class DaskDMatrix: check_columns(X_parts) X_parts = X_parts.flatten().tolist() - def flatten_meta(meta): + def flatten_meta( + meta: Optional[_DaskCollection] + ) -> "Optional[List[dask.delayed.Delayed]]": if meta is not None: meta_parts = meta.to_delayed() if isinstance(meta_parts, numpy.ndarray): @@ -288,7 +330,9 @@ class DaskDMatrix: parts = [X_parts] meta_names = [] - def append_meta(m_parts, name: str): + def append_meta( + m_parts: Optional[List["dask.delayed.delayed"]], name: str + ) -> None: if m_parts is not None: assert len(X_parts) == len( m_parts), inconsistent(X_parts, 'X', m_parts, name) @@ -304,7 +348,7 @@ class DaskDMatrix: # [(x0, x1, ..), (y0, y1, ..), ..] 
in delayed form # delay the zipped result - parts = list(map(dask.delayed, zip(*parts))) + parts = list(map(dask.delayed, zip(*parts))) # pylint: disable=no-member # At this point, the mental model should look like: # [(x0, y0, ..), (x1, y1, ..), ..] in delayed form @@ -322,7 +366,7 @@ class DaskDMatrix: key_to_partition = {part.key: part for part in parts} who_has = await client.scheduler.who_has(keys=[part.key for part in parts]) - worker_map = defaultdict(list) + worker_map: Dict[str, "distributed.Future"] = defaultdict(list) for key, workers in who_has.items(): worker_map[next(iter(workers))].append(key_to_partition[key]) @@ -337,7 +381,7 @@ class DaskDMatrix: return self - def create_fn_args(self, worker_addr: str): + def create_fn_args(self, worker_addr: str) -> Dict[str, Any]: '''Create a dictionary of objects that can be pickled for function arguments. @@ -351,7 +395,13 @@ class DaskDMatrix: 'is_quantile': self.is_quantile} -def _get_worker_parts_ordered(meta_names, list_of_parts): +_DataParts = List[Tuple[Any, Optional[Any], Optional[Any], Optional[Any], Optional[Any], + Optional[Any]]] + + +def _get_worker_parts_ordered( + meta_names: List[str], list_of_parts: _DataParts +) -> _DataParts: # List of partitions like: [(x3, y3, w3, m3, ..), ..], order is not preserved. assert isinstance(list_of_parts, list) @@ -384,22 +434,32 @@ def _get_worker_parts_ordered(meta_names, list_of_parts): return result -def _unzip(list_of_parts): +def _unzip(list_of_parts: _DataParts) -> List[Tuple[Any, ...]]: return list(zip(*list_of_parts)) -def _get_worker_parts(list_of_parts: List[tuple], meta_names): +def _get_worker_parts( + list_of_parts: _DataParts, meta_names: List[str] +) -> List[Tuple[Any, ...]]: partitions = _get_worker_parts_ordered(meta_names, list_of_parts) - partitions = _unzip(partitions) - return partitions + partitions_unzipped = _unzip(partitions) + return partitions_unzipped class DaskPartitionIter(DataIter): # pylint: disable=R0902 - '''A data iterator for `DaskDeviceQuantileDMatrix`. 
-    '''
-    def __init__(self, data, label=None, weight=None, base_margin=None,
-                 label_lower_bound=None, label_upper_bound=None,
-                 feature_names=None, feature_types=None):
+    """A data iterator for `DaskDeviceQuantileDMatrix`."""
+
+    def __init__(
+        self,
+        data: Tuple[Any, ...],
+        label: Optional[Tuple[Any, ...]] = None,
+        weight: Optional[Tuple[Any, ...]] = None,
+        base_margin: Optional[Tuple[Any, ...]] = None,
+        label_lower_bound: Optional[Tuple[Any, ...]] = None,
+        label_upper_bound: Optional[Tuple[Any, ...]] = None,
+        feature_names: Optional[Union[str, List[str]]] = None,
+        feature_types: Optional[Union[Any, List[Any]]] = None
+    ) -> None:
         self._data = data
         self._labels = label
         self._weights = weight
@@ -421,51 +481,52 @@ class DaskPartitionIter(DataIter):  # pylint: disable=R0902
         self._iter = 0             # set iterator to 0
         super().__init__()

-    def data(self):
+    def data(self) -> Any:
         '''Utility function for obtaining current batch of data.'''
         return self._data[self._iter]

-    def labels(self):
+    def labels(self) -> Any:
         '''Utility function for obtaining current batch of label.'''
         if self._labels is not None:
             return self._labels[self._iter]
         return None

-    def weights(self):
+    def weights(self) -> Any:
         '''Utility function for obtaining current batch of weight.'''
         if self._weights is not None:
             return self._weights[self._iter]
         return None

-    def base_margins(self):
+    def base_margins(self) -> Any:
         '''Utility function for obtaining current batch of base_margin.'''
         if self._base_margin is not None:
             return self._base_margin[self._iter]
         return None

-    def label_lower_bounds(self):
+    def label_lower_bounds(self) -> Any:
         '''Utility function for obtaining current batch of label_lower_bound.
         '''
         if self._label_lower_bound is not None:
             return self._label_lower_bound[self._iter]
         return None

-    def label_upper_bounds(self):
+    def label_upper_bounds(self) -> Any:
         '''Utility function for obtaining current batch of label_upper_bound.
         '''
         if self._label_upper_bound is not None:
             return self._label_upper_bound[self._iter]
         return None

-    def reset(self):
+    def reset(self) -> None:
         '''Reset the iterator'''
         self._iter = 0

-    def next(self, input_data):
+    def next(self, input_data: Callable) -> int:
         '''Yield next batch of data'''
         if self._iter == len(self._data):
             # Return 0 when there's no more batch.
             return 0
+        feature_names: Optional[Union[List[str], str]] = None
         if self._feature_names:
             feature_names = self._feature_names
         else:
@@ -484,31 +545,34 @@ class DaskPartitionIter(DataIter):  # pylint: disable=R0902


 class DaskDeviceQuantileDMatrix(DaskDMatrix):
-    '''Specialized data type for `gpu_hist` tree method.  This class is
-    used to reduce the memory usage by eliminating data copies.
-    Internally the data is merged by weighted GK sketching.  So the
-    number of partitions from dask may affect training accuracy as GK
-    generates error for each merge.
+    '''Specialized data type for `gpu_hist` tree method. This class is used to
+    reduce the memory usage by eliminating data copies. Internally all the
+    partitions/chunks of data are merged by weighted GK sketching. So the
+    number of partitions from dask may affect training accuracy as GK generates
+    bounded error for each merge.

     .. versionadded:: 1.2.0

     Parameters
     ----------
-    max_bin: Number of bins for histogram construction.
-
+    max_bin : Number of bins for histogram construction.
''' - def __init__(self, client, - data, - label=None, - missing=None, - weight=None, - base_margin=None, - label_lower_bound=None, - label_upper_bound=None, - feature_names=None, - feature_types=None, - max_bin=256): + def __init__( + self, + client: "distributed.Client", + data: _DaskCollection, + label: Optional[_DaskCollection] = None, + missing: float = None, + weight: Optional[_DaskCollection] = None, + base_margin: Optional[_DaskCollection] = None, + label_lower_bound: Optional[_DaskCollection] = None, + label_upper_bound: Optional[_DaskCollection] = None, + feature_weights: Optional[_DaskCollection] = None, + feature_names: Optional[Union[str, List[str]]] = None, + feature_types: Optional[Union[Any, List[Any]]] = None, + max_bin: int = 256 + ) -> None: super().__init__(client=client, data=data, label=label, missing=missing, weight=weight, base_margin=base_margin, @@ -519,22 +583,27 @@ class DaskDeviceQuantileDMatrix(DaskDMatrix): self.max_bin = max_bin self.is_quantile = True - def create_fn_args(self, worker_addr: str): + def create_fn_args(self, worker_addr: str) -> Dict[str, Any]: args = super().create_fn_args(worker_addr) args['max_bin'] = self.max_bin return args -def _create_device_quantile_dmatrix(feature_names, feature_types, - feature_weights, - meta_names, missing, parts, - max_bin): +def _create_device_quantile_dmatrix( + feature_names: Optional[Union[str, List[str]]], + feature_types: Optional[Union[Any, List[Any]]], + feature_weights: Optional[Any], + meta_names: List[str], + missing: float, + parts: Optional[_DataParts], + max_bin: int +) -> DeviceQuantileDMatrix: worker = distributed.get_worker() if parts is None: msg = 'worker {address} has an empty DMatrix. '.format( address=worker.address) LOGGER.warning(msg) - import cupy # pylint: disable=import-error + import cupy d = DeviceQuantileDMatrix(cupy.zeros((0, 0)), feature_names=feature_names, feature_types=feature_types, @@ -559,8 +628,14 @@ def _create_device_quantile_dmatrix(feature_names, feature_types, return dmatrix -def _create_dmatrix(feature_names, feature_types, feature_weights, meta_names, missing, - parts): +def _create_dmatrix( + feature_names: Optional[Union[str, List[str]]], + feature_types: Optional[Union[Any, List[Any]]], + feature_weights: Optional[Any], + meta_names: List[str], + missing: float, + parts: Optional[_DataParts] +) -> DMatrix: '''Get data that local to worker from DaskDMatrix. 
Returns @@ -578,7 +653,9 @@ def _create_dmatrix(feature_names, feature_types, feature_weights, meta_names, m feature_types=feature_types) return d - def concat_or_none(data): + T = TypeVar('T') + + def concat_or_none(data: Tuple[Optional[T], ...]) -> Optional[T]: if any([part is None for part in data]): return None return concat(data) @@ -586,33 +663,35 @@ def _create_dmatrix(feature_names, feature_types, feature_weights, meta_names, m (data, labels, weights, base_margin, label_lower_bound, label_upper_bound) = _get_worker_parts(list_of_parts, meta_names) - labels = concat_or_none(labels) - weights = concat_or_none(weights) - base_margin = concat_or_none(base_margin) - label_lower_bound = concat_or_none(label_lower_bound) - label_upper_bound = concat_or_none(label_upper_bound) + _labels = concat_or_none(labels) + _weights = concat_or_none(weights) + _base_margin = concat_or_none(base_margin) + _label_lower_bound = concat_or_none(label_lower_bound) + _label_upper_bound = concat_or_none(label_upper_bound) - data = concat(data) - dmatrix = DMatrix(data, - labels, + _data = concat(data) + dmatrix = DMatrix(_data, + _labels, missing=missing, feature_names=feature_names, feature_types=feature_types, nthread=worker.nthreads) - dmatrix.set_info(base_margin=base_margin, weight=weights, - label_lower_bound=label_lower_bound, - label_upper_bound=label_upper_bound, + dmatrix.set_info(base_margin=_base_margin, weight=_weights, + label_lower_bound=_label_lower_bound, + label_upper_bound=_label_upper_bound, feature_weights=feature_weights) return dmatrix -def _dmatrix_from_list_of_parts(is_quantile, **kwargs): +def _dmatrix_from_list_of_parts( + is_quantile: bool, **kwargs: Any +) -> Union[DMatrix, DeviceQuantileDMatrix]: if is_quantile: return _create_device_quantile_dmatrix(**kwargs) return _create_dmatrix(**kwargs) -async def _get_rabit_args(n_workers: int, client): +async def _get_rabit_args(n_workers: int, client: "distributed.Client") -> List[bytes]: '''Get rabit context arguments from data distribution in DaskDMatrix.''' env = await client.run_on_scheduler(_start_tracker, n_workers) rabit_args = [('%s=%s' % item).encode() for item in env.items()] @@ -625,8 +704,11 @@ async def _get_rabit_args(n_workers: int, client): # evaluation history is instead returned. 
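As a usage sketch of that return convention (a toy local cluster; the parameter values are arbitrary), the returned dictionary carries both the booster and the per-round evaluation history:

```python
import dask.array as da
from distributed import Client, LocalCluster

import xgboost as xgb

with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
    X = da.random.random((1000, 10), chunks=(100, 10))
    y = da.random.random(1000, chunks=100)
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    output = xgb.dask.train(client, {'tree_method': 'hist'}, dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])
    booster = output['booster']   # xgboost.Booster
    history = output['history']   # e.g. {'train': {'rmse': [...]}}
```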
-def _get_workers_from_data(dtrain: DaskDMatrix, evals=()): - X_worker_map = set(dtrain.worker_map.keys()) +def _get_workers_from_data( + dtrain: DaskDMatrix, + evals: Optional[List[Tuple[DaskDMatrix, str]]] +) -> Set[str]: + X_worker_map: Set[str] = set(dtrain.worker_map.keys()) if evals: for e in evals: assert len(e) == 2 @@ -636,22 +718,30 @@ def _get_workers_from_data(dtrain: DaskDMatrix, evals=()): return X_worker_map -async def _train_async(client, - global_config, - params, - dtrain, - num_boost_round, - evals, - obj, - feval, - early_stopping_rounds, - verbose_eval, - xgb_model, - callbacks): +async def _train_async( + client: "distributed.Client", + global_config: Dict[str, Any], + params: Dict[str, Any], + dtrain: DaskDMatrix, + num_boost_round: int, + evals: Optional[List[Tuple[DaskDMatrix, str]]], + obj: Optional[Objective], + feval: Optional[Metric], + early_stopping_rounds: Optional[int], + verbose_eval: Union[int, bool], + xgb_model: Optional[Booster], + callbacks: Optional[List[TrainingCallback]] +) -> Optional[TrainReturnT]: workers = list(_get_workers_from_data(dtrain, evals)) _rabit_args = await _get_rabit_args(len(workers), client) - def dispatched_train(worker_addr, rabit_args, dtrain_ref, dtrain_idt, evals_ref): + def dispatched_train( + worker_addr: str, + rabit_args: List[bytes], + dtrain_ref: Dict, + dtrain_idt: int, + evals_ref: Dict + ) -> Optional[Dict[str, Union[Booster, Dict]]]: '''Perform training on a single worker. A local function prevents pickling. ''' @@ -667,7 +757,7 @@ async def _train_async(client, continue local_evals.append((_dmatrix_from_list_of_parts(**ref), name)) - local_history = {} + local_history: Dict = {} local_param = params.copy() # just to be consistent msg = 'Overriding `nthreads` defined in dask worker.' override = ['nthread', 'n_jobs'] @@ -688,7 +778,8 @@ async def _train_async(client, verbose_eval=verbose_eval, xgb_model=xgb_model, callbacks=callbacks) - ret = {'booster': bst, 'history': local_history} + ret: Optional[Dict[str, Union[Booster, Dict]]] = { + 'booster': bst, 'history': local_history} if local_dtrain.num_row() == 0: ret = None return ret @@ -718,43 +809,45 @@ async def _train_async(client, return list(filter(lambda ret: ret is not None, results))[0] -def train(client, - params, - dtrain, - num_boost_round=10, - evals=(), - obj=None, - feval=None, - early_stopping_rounds=None, - xgb_model=None, - verbose_eval=True, - callbacks=None): +def train( + client: "distributed.Client", + params: Dict[str, Any], + dtrain: DaskDMatrix, + num_boost_round: int = 10, + evals: Optional[List[Tuple[DaskDMatrix, str]]] = None, + obj: Optional[Objective] = None, + feval: Optional[Metric] = None, + early_stopping_rounds: Optional[int] = None, + xgb_model: Optional[Booster] = None, + verbose_eval: Union[int, bool] = True, + callbacks: Optional[List[TrainingCallback]] = None +) -> Any: '''Train XGBoost model. .. versionadded:: 1.0.0 + .. note:: + + Other parameters are the same as `xgboost.train` except for `evals_result`, which + is returned as part of function return value instead of argument. + Parameters ---------- - client: dask.distributed.Client - Specify the dask client used for training. Use default client - returned from dask if it's set to None. - \\*\\*kwargs: - Other parameters are the same as `xgboost.train` except for - `evals_result`, which is returned as part of function return value - instead of argument. + client : + Specify the dask client used for training. Use default client returned from dask + if it's set to None. 
Returns ------- results: dict - A dictionary containing trained booster and evaluation history. - `history` field is the same as `eval_result` from `xgboost.train`. + A dictionary containing trained booster and evaluation history. `history` field + is the same as `eval_result` from `xgboost.train`. .. code-block:: python {'booster': xgboost.Booster, 'history': {'train': {'logloss': ['0.48253', '0.35953']}, 'eval': {'logloss': ['0.480385', '0.357756']}}} - ''' _assert_dask_support() client = _xgb_get_client(client) @@ -776,7 +869,11 @@ def train(client, callbacks=callbacks) -async def _direct_predict_impl(client, data, predict_fn): +async def _direct_predict_impl( + client: "distributed.Client", + data: _DaskCollection, + predict_fn: Callable +) -> _DaskCollection: if isinstance(data, da.Array): predictions = await client.submit( da.map_blocks, @@ -796,8 +893,19 @@ async def _direct_predict_impl(client, data, predict_fn): # pylint: disable=too-many-statements -async def _predict_async(client, global_config, model, data, missing, validate_features, - **kwargs): +async def _predict_async( + client: "distributed.Client", + global_config: Dict[str, Any], + model: Union[Booster, Dict], + data: _DaskCollection, + output_margin: bool, + missing: float, + pred_leaf: bool, + pred_contribs: bool, + approx_contribs: bool, + pred_interactions: bool, + validate_features: bool +) -> _DaskCollection: if isinstance(model, Booster): booster = model elif isinstance(model, dict): @@ -808,15 +916,23 @@ async def _predict_async(client, global_config, model, data, missing, validate_f raise TypeError(_expect([DaskDMatrix, da.Array, dd.DataFrame], type(data))) - def mapped_predict(partition, is_df): + def mapped_predict(partition: Any, is_df: bool) -> Any: worker = distributed.get_worker() with config.config_context(**global_config): booster.set_param({'nthread': worker.nthreads}) m = DMatrix(partition, missing=missing, nthread=worker.nthreads) - predt = booster.predict(m, validate_features=validate_features, **kwargs) + predt = booster.predict( + data=m, + output_margin=output_margin, + pred_leaf=pred_leaf, + pred_contribs=pred_contribs, + approx_contribs=approx_contribs, + pred_interactions=pred_interactions, + validate_features=validate_features + ) if is_df: if lazy_isinstance(partition, 'cudf', 'core.dataframe.DataFrame'): - import cudf # pylint: disable=import-error + import cudf predt = cudf.DataFrame(predt, columns=['prediction']) else: predt = DataFrame(predt, columns=['prediction']) @@ -833,7 +949,9 @@ async def _predict_async(client, global_config, model, data, missing, validate_f missing = data.missing meta_names = data.meta_names - def dispatched_predict(worker_id, list_of_orders, list_of_parts): + def dispatched_predict( + worker_id: int, list_of_orders: List[int], list_of_parts: _DataParts + ) -> List[Tuple[Tuple["dask.delayed.Delayed", int], int]]: '''Perform prediction on each worker.''' LOGGER.info('Predicting on %d', worker_id) with config.config_context(**global_config): @@ -855,15 +973,22 @@ async def _predict_async(client, global_config, model, data, missing, validate_f ) predt = booster.predict( data=local_part, - validate_features=validate_features, - **kwargs) + output_margin=output_margin, + pred_leaf=pred_leaf, + pred_contribs=pred_contribs, + approx_contribs=approx_contribs, + pred_interactions=pred_interactions, + validate_features=validate_features + ) columns = 1 if len(predt.shape) == 1 else predt.shape[1] - ret = ((dask.delayed(predt), columns), order) + ret = 
((dask.delayed(predt), columns), order) # pylint: disable=no-member predictions.append(ret) return predictions - def dispatched_get_shape(worker_id, list_of_orders, list_of_parts): + def dispatched_get_shape( + worker_id: int, list_of_orders: List[int], list_of_parts: _DataParts + ) -> List[Tuple[int, int]]: '''Get shape of data in each worker.''' LOGGER.info('Get shape on %d', worker_id) list_of_parts = _get_worker_parts_ordered(meta_names, list_of_parts) @@ -873,7 +998,9 @@ async def _predict_async(client, global_config, model, data, missing, validate_f shapes.append((data.shape, list_of_orders[i])) return shapes - async def map_function(func): + async def map_function( + func: Callable[[int, List[int], _DataParts], Any] + ) -> List[Any]: '''Run function for each part of the data.''' futures = [] workers_address = list(worker_map.keys()) @@ -912,7 +1039,18 @@ async def _predict_async(client, global_config, model, data, missing, validate_f return predictions -def predict(client, model, data, missing=numpy.nan, validate_features=True, **kwargs): +def predict( + client: "distributed.Client", + model: Union[TrainReturnT, Booster], + data: Union[DaskDMatrix, _DaskCollection], + output_margin: bool = False, + missing: float = numpy.nan, + pred_leaf: bool = False, + pred_contribs: bool = False, + approx_contribs: bool = False, + pred_interactions: bool = False, + validate_features: bool = True +) -> Any: '''Run prediction with a trained booster. .. note:: @@ -923,15 +1061,15 @@ def predict(client, model, data, missing=numpy.nan, validate_features=True, **kw Parameters ---------- - client: dask.distributed.Client + client: Specify the dask client used for training. Use default client returned from dask if it's set to None. - model: A Booster or a dictionary returned by `xgboost.dask.train`. + model: The trained model. - data: DaskDMatrix/dask.dataframe.DataFrame/dask.array.Array + data: Input data used for prediction. When input is a dataframe object, prediction output is a series. - missing: float + missing: Used when input data is not DaskDMatrix. Specify the value considered as missing. 
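Because the `**kwargs` forwarding is replaced by explicit keywords here, callers now spell out each prediction option. A sketch, reusing the `client`, `output`, and `X` names from the training sketch earlier:

```python
# Normal predictions accept either the returned dict or the booster itself.
predt = xgb.dask.predict(client, output, X)

# Feature contributions are requested through an explicit keyword rather
# than **kwargs; the result has one extra column for the bias term.
contribs = xgb.dask.predict(client, output['booster'], X,
                            pred_contribs=True).compute()
assert contribs.shape[1] == 10 + 1
```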
@@ -943,14 +1081,27 @@ def predict(client, model, data, missing=numpy.nan, validate_features=True, **kw _assert_dask_support() client = _xgb_get_client(client) global_config = config.get_config() - return client.sync(_predict_async, client, global_config, model, data, - missing=missing, validate_features=validate_features, **kwargs) + return client.sync( + _predict_async, client, global_config, model, data, + output_margin=output_margin, + missing=missing, + pred_leaf=pred_leaf, + pred_contribs=pred_contribs, + approx_contribs=approx_contribs, + pred_interactions=pred_interactions, + validate_features=validate_features + ) -async def _inplace_predict_async(client, global_config, model, data, - iteration_range=(0, 0), - predict_type='value', - missing=numpy.nan): +async def _inplace_predict_async( + client: "distributed.Client", + global_config: Dict[str, Any], + model: Union[Booster, Dict], + data: _DaskCollection, + iteration_range: Tuple[int, int] = (0, 0), + predict_type: str = 'value', + missing: float = numpy.nan +) -> _DaskCollection: client = _xgb_get_client(client) if isinstance(model, Booster): booster = model @@ -961,7 +1112,7 @@ async def _inplace_predict_async(client, global_config, model, data, if not isinstance(data, (da.Array, dd.DataFrame)): raise TypeError(_expect([da.Array, dd.DataFrame], type(data))) - def mapped_predict(data, is_df): + def mapped_predict(data: Any, is_df: bool) -> Any: worker = distributed.get_worker() config.set_config(**global_config) booster.set_param({'nthread': worker.nthreads}) @@ -972,7 +1123,7 @@ async def _inplace_predict_async(client, global_config, model, data, missing=missing) if is_df: if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'): - import cudf # pylint: disable=import-error + import cudf prediction = cudf.DataFrame({'prediction': prediction}, dtype=numpy.float32) else: @@ -984,32 +1135,37 @@ async def _inplace_predict_async(client, global_config, model, data, return await _direct_predict_impl(client, data, mapped_predict) -def inplace_predict(client, model, data, - iteration_range=(0, 0), - predict_type='value', - missing=numpy.nan): +def inplace_predict( + client: "distributed.Client", + model: Union[TrainReturnT, Booster], + data: _DaskCollection, + iteration_range: Tuple[int, int] = (0, 0), + predict_type: str = 'value', + missing: float = numpy.nan +) -> Any: '''Inplace prediction. .. versionadded:: 1.1.0 Parameters ---------- - client: dask.distributed.Client + client: Specify the dask client used for training. Use default client returned from dask if it's set to None. - model: Booster/dict + model: The trained model. - iteration_range: tuple + iteration_range: Specify the range of trees used for prediction. - predict_type: str + predict_type: * 'value': Normal prediction result. * 'margin': Output the raw untransformed margin value. - missing: float + missing: Value in the input data which needs to be present as a missing value. If None, defaults to np.nan. 
+ Returns ------- - prediction: dask.array.Array + prediction ''' _assert_dask_support() client = _xgb_get_client(client) @@ -1021,7 +1177,12 @@ def inplace_predict(client, model, data, missing=missing) -async def _evaluation_matrices(client, validation_set, sample_weight, missing): +async def _evaluation_matrices( + client: "distributed.Client", + validation_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]], + sample_weight: Optional[List[_DaskCollection]], + missing: float +) -> Optional[List[Tuple[DaskDMatrix, str]]]: ''' Parameters ---------- @@ -1040,13 +1201,14 @@ async def _evaluation_matrices(client, validation_set, sample_weight, missing): ------- evals: list of validation DMatrix ''' - evals = [] + evals: Optional[List[Tuple[DaskDMatrix, str]]] = [] if validation_set is not None: assert isinstance(validation_set, list) for i, e in enumerate(validation_set): w = (sample_weight[i] if sample_weight is not None else None) dmat = await DaskDMatrix(client=client, data=e[0], label=e[1], weight=w, missing=missing) + assert isinstance(evals, list) evals.append((dmat, 'validation_{}'.format(i))) else: evals = None @@ -1060,16 +1222,21 @@ class DaskScikitLearnBase(XGBModel): # pylint: disable=arguments-differ @_deprecate_positional_args - def fit(self, X, y, *, - sample_weight=None, - base_margin=None, - eval_set=None, - eval_metric=None, - sample_weight_eval_set=None, - early_stopping_rounds=None, - verbose=True, - feature_weights=None, - callbacks=None): + def fit( + self, + X: _DaskCollection, + y: _DaskCollection, + *, + sample_weight: Optional[_DaskCollection] = None, + base_margin: Optional[_DaskCollection] = None, + eval_set: List[Tuple[_DaskCollection, _DaskCollection]] = None, + eval_metric: Optional[Callable] = None, + sample_weight_eval_set: Optional[List[_DaskCollection]] = None, + early_stopping_rounds: Optional[int] = None, + verbose: bool = True, + feature_weights: Optional[_DaskCollection] = None, + callbacks: List[TrainingCallback] = None + ) -> "DaskScikitLearnBase": '''Fit gradient boosting model Parameters @@ -1111,30 +1278,44 @@ class DaskScikitLearnBase(XGBModel): ''' raise NotImplementedError - def predict(self, data): # pylint: disable=arguments-differ + def predict( + self, + data: _DaskCollection, + output_margin: bool = False, + ntree_limit: Optional[int] = None, + validate_features: bool = True, + base_margin: Optional[_DaskCollection] = None + ) -> Any: '''Predict with `data`. Parameters ---------- - data: data that can be used to construct a DaskDMatrix + data: data that can be used to construct a DaskDMatrix + output_margin : Whether to output the raw untransformed margin value. + ntree_limit : NOT supported on dask interface. + validate_features : + When this is True, validate that the Booster's and data's feature_names are + identical. Otherwise, it is assumed that the feature_names are the same. Returns ------- - prediction : dask.array.Array''' + prediction: + + ''' raise NotImplementedError - def __await__(self): + def __await__(self) -> Awaitable[Any]: # Generate a coroutine wrapper to make this class awaitable. 
-        async def _():
+        async def _() -> Awaitable[Any]:
             return self
         return self.client.sync(_).__await__()

     @property
-    def client(self):
+    def client(self) -> "distributed.Client":
         '''The dask client used in this model.'''
         client = _xgb_get_client(self._client)
         return client

     @client.setter
-    def client(self, clt):
+    def client(self, clt: "distributed.Client") -> None:
         self._client = clt


@@ -1142,10 +1323,19 @@ class DaskScikitLearnBase(XGBModel):
                   ['estimators', 'model'])
 class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
     # pylint: disable=missing-class-docstring
-    async def _fit_async(self, X, y, sample_weight, base_margin, eval_set,
-                         eval_metric, sample_weight_eval_set,
-                         early_stopping_rounds, verbose, feature_weights,
-                         callbacks):
+    async def _fit_async(
+        self, X: _DaskCollection,
+        y: _DaskCollection,
+        sample_weight: Optional[_DaskCollection],
+        base_margin: Optional[_DaskCollection],
+        eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]],
+        eval_metric: Optional[Union[str, List[str], Callable]],
+        sample_weight_eval_set: Optional[List[_DaskCollection]],
+        early_stopping_rounds: int,
+        verbose: bool,
+        feature_weights: Optional[_DaskCollection],
+        callbacks: Optional[List[TrainingCallback]]
+    ) -> "DaskXGBRegressor":
         dtrain = await DaskDMatrix(client=self.client,
                                    data=X,
                                    label=y,
@@ -1186,19 +1376,21 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):

     # pylint: disable=missing-docstring
     @_deprecate_positional_args
-    def fit(self,
-            X,
-            y,
-            *,
-            sample_weight=None,
-            base_margin=None,
-            eval_set=None,
-            eval_metric=None,
-            sample_weight_eval_set=None,
-            early_stopping_rounds=None,
-            verbose=True,
-            feature_weights=None,
-            callbacks=None):
+    def fit(
+        self,
+        X: _DaskCollection,
+        y: _DaskCollection,
+        *,
+        sample_weight: Optional[_DaskCollection] = None,
+        base_margin: Optional[_DaskCollection] = None,
+        eval_set: List[Tuple[_DaskCollection, _DaskCollection]] = None,
+        eval_metric: Optional[Callable] = None,
+        sample_weight_eval_set: Optional[List[_DaskCollection]] = None,
+        early_stopping_rounds: Optional[int] = None,
+        verbose: bool = True,
+        feature_weights: Optional[_DaskCollection] = None,
+        callbacks: List[TrainingCallback] = None
+    ) -> "DaskXGBRegressor":
         _assert_dask_support()
         return self.client.sync(self._fit_async,
                                 X=X,
@@ -1214,21 +1406,36 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase):
                                 callbacks=callbacks)

     async def _predict_async(
-            self, data, output_margin=False, base_margin=None):
+        self, data: _DaskCollection,
+        output_margin: bool = False,
+        validate_features: bool = True,
+        base_margin: Optional[_DaskCollection] = None
+    ) -> _DaskCollection:
         test_dmatrix = await DaskDMatrix(
             client=self.client, data=data, base_margin=base_margin,
             missing=self.missing
         )
         pred_probs = await predict(client=self.client,
                                    model=self.get_booster(),
                                    data=test_dmatrix,
-                                   output_margin=output_margin)
+                                   output_margin=output_margin,
+                                   validate_features=validate_features)
         return pred_probs

     # pylint: disable=arguments-differ
-    def predict(self, data, output_margin=False, base_margin=None):
+    def predict(
+        self,
+        data: _DaskCollection,
+        output_margin: bool = False,
+        ntree_limit: Optional[int] = None,
+        validate_features: bool = True,
+        base_margin: Optional[_DaskCollection] = None
+    ) -> Any:
         _assert_dask_support()
+        msg = '`ntree_limit` is not supported on dask, use model slicing instead.'
+ assert ntree_limit is None, msg return self.client.sync(self._predict_async, data, output_margin=output_margin, + validate_features=validate_features, base_margin=base_margin) @@ -1237,10 +1444,18 @@ class DaskXGBRegressor(DaskScikitLearnBase, XGBRegressorBase): ['estimators', 'model']) class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase): # pylint: disable=missing-class-docstring - async def _fit_async(self, X, y, sample_weight, base_margin, eval_set, - eval_metric, sample_weight_eval_set, - early_stopping_rounds, verbose, feature_weights, - callbacks): + async def _fit_async( + self, X: _DaskCollection, y: _DaskCollection, + sample_weight: Optional[_DaskCollection], + base_margin: Optional[_DaskCollection], + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]], + eval_metric: Optional[Union[str, List[str], Callable]], + sample_weight_eval_set: Optional[List[_DaskCollection]], + early_stopping_rounds: int, + verbose: bool, + feature_weights: Optional[_DaskCollection], + callbacks: Optional[List[TrainingCallback]] + ) -> "DaskXGBClassifier": dtrain = await DaskDMatrix(client=self.client, data=X, label=y, @@ -1294,19 +1509,21 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase): return self @_deprecate_positional_args - def fit(self, - X, - y, - *, - sample_weight=None, - base_margin=None, - eval_set=None, - eval_metric=None, - sample_weight_eval_set=None, - early_stopping_rounds=None, - verbose=True, - feature_weights=None, - callbacks=None): + def fit( + self, + X: _DaskCollection, + y: _DaskCollection, + *, + sample_weight: Optional[_DaskCollection] = None, + base_margin: Optional[_DaskCollection] = None, + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]] = None, + eval_metric: Optional[Union[str, List[str], Callable]] = None, + sample_weight_eval_set: Optional[List[_DaskCollection]] = None, + early_stopping_rounds: int = None, + verbose: bool = True, + feature_weights: _DaskCollection = None, + callbacks: Optional[List[TrainingCallback]] = None + ) -> "DaskXGBClassifier": _assert_dask_support() return self.client.sync(self._fit_async, X=X, @@ -1321,8 +1538,13 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase): feature_weights=feature_weights, callbacks=callbacks) - async def _predict_proba_async(self, X, output_margin=False, - base_margin=None): + async def _predict_proba_async( + self, + X: _DaskCollection, + validate_features: bool, + output_margin: bool, + base_margin: Optional[_DaskCollection] + ) -> _DaskCollection: test_dmatrix = await DaskDMatrix( client=self.client, data=X, base_margin=base_margin, missing=self.missing @@ -1330,28 +1552,47 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase): pred_probs = await predict(client=self.client, model=self.get_booster(), data=test_dmatrix, + validate_features=validate_features, output_margin=output_margin) return pred_probs # pylint: disable=arguments-differ,missing-docstring - def predict_proba(self, X, output_margin=False, base_margin=None): + def predict_proba( + self, + X: _DaskCollection, + ntree_limit: Optional[int] = None, + validate_features: bool = True, + output_margin: bool = False, + base_margin: Optional[_DaskCollection] = None + ) -> Any: _assert_dask_support() + msg = '`ntree_limit` is not supported on dask, use model slicing instead.' 
+ assert ntree_limit is None, msg return self.client.sync( self._predict_proba_async, X=X, + validate_features=validate_features, output_margin=output_margin, base_margin=base_margin ) - async def _predict_async(self, data, output_margin=False, base_margin=None): + async def _predict_async( + self, data: _DaskCollection, + output_margin: bool = False, + validate_features: bool = True, + base_margin: Optional[_DaskCollection] = None + ) -> _DaskCollection: test_dmatrix = await DaskDMatrix( client=self.client, data=data, base_margin=base_margin, missing=self.missing ) - pred_probs = await predict(client=self.client, - model=self.get_booster(), - data=test_dmatrix, - output_margin=output_margin) + pred_probs = await predict( + client=self.client, + model=self.get_booster(), + data=test_dmatrix, + output_margin=output_margin, + validate_features=validate_features + ) if self.n_classes_ == 2: preds = (pred_probs > 0.5).astype(int) @@ -1361,11 +1602,21 @@ class DaskXGBClassifier(DaskScikitLearnBase, XGBClassifierBase): return preds # pylint: disable=arguments-differ - def predict(self, data, output_margin=False, base_margin=None): + def predict( + self, + data: _DaskCollection, + output_margin: bool = False, + ntree_limit: Optional[int] = None, + validate_features: bool = True, + base_margin: Optional[_DaskCollection] = None + ) -> Any: _assert_dask_support() + msg = '`ntree_limit` is not supported on dask, use model slicing instead.' + assert ntree_limit is None, msg return self.client.sync( self._predict_async, data, output_margin=output_margin, + validate_features=validate_features, base_margin=base_margin ) diff --git a/python-package/xgboost/libpath.py b/python-package/xgboost/libpath.py index 8ca53fac6..f7a7d9cd3 100644 --- a/python-package/xgboost/libpath.py +++ b/python-package/xgboost/libpath.py @@ -3,6 +3,7 @@ import os import platform +from typing import List import sys @@ -10,12 +11,12 @@ class XGBoostLibraryNotFound(Exception): """Error thrown by when xgboost is not found""" -def find_lib_path(): +def find_lib_path() -> List[str]: """Find the path to xgboost dynamic library files. 
Returns ------- - lib_path: list(string) + lib_path List of all found library path to xgboost """ curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) diff --git a/python-package/xgboost/py.typed b/python-package/xgboost/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/python-package/xgboost/sklearn.py b/python-package/xgboost/sklearn.py index bf9da0a13..3a88aba2d 100644 --- a/python-package/xgboost/sklearn.py +++ b/python-package/xgboost/sklearn.py @@ -443,8 +443,8 @@ class XGBModel(XGBModelBase): except TypeError: warnings.warn(str(k) + ' is not saved in Scikit-Learn meta.') meta['type'] = type(self).__name__ - meta = json.dumps(meta) - self.get_booster().set_attr(scikit_learn=meta) + meta_str = json.dumps(meta) + self.get_booster().set_attr(scikit_learn=meta_str) self.get_booster().save_model(fname) # Delete the attribute after save self.get_booster().set_attr(scikit_learn=None) diff --git a/tests/python-gpu/test_gpu_with_dask.py b/tests/python-gpu/test_gpu_with_dask.py index 823536d18..dc5992859 100644 --- a/tests/python-gpu/test_gpu_with_dask.py +++ b/tests/python-gpu/test_gpu_with_dask.py @@ -1,5 +1,6 @@ import sys import os +from typing import Type, TypeVar, Any, Dict, List import pytest import numpy as np import asyncio @@ -25,15 +26,16 @@ try: from xgboost import dask as dxgb from dask.distributed import Client from dask import array as da + from dask_cuda import LocalCUDACluster import cudf except ImportError: pass -def run_with_dask_dataframe(DMatrixT, client): +def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None: import cupy as cp cp.cuda.runtime.setDevice(0) - X, y = generate_array() + X, y, _ = generate_array() X = dd.from_dask_array(X) y = dd.from_dask_array(y) @@ -68,7 +70,9 @@ def run_with_dask_dataframe(DMatrixT, client): predt = dxgb.predict(client, out, X) assert isinstance(predt, dd.Series) - def is_df(part): + T = TypeVar('T') + + def is_df(part: T) -> T: assert isinstance(part, cudf.DataFrame), part return part @@ -80,10 +84,10 @@ def run_with_dask_dataframe(DMatrixT, client): predt.values.compute(), single_node) -def run_with_dask_array(DMatrixT, client): +def run_with_dask_array(DMatrixT: Type, client: Client) -> None: import cupy as cp cp.cuda.runtime.setDevice(0) - X, y = generate_array() + X, y, _ = generate_array() X = X.map_blocks(cp.asarray) y = y.map_blocks(cp.asarray) @@ -108,7 +112,7 @@ def run_with_dask_array(DMatrixT, client): inplace_predictions) -def to_cp(x, DMatrixT): +def to_cp(x: Any, DMatrixT: Type) -> Any: import cupy if isinstance(x, np.ndarray) and \ DMatrixT is dxgb.DaskDeviceQuantileDMatrix: @@ -118,7 +122,13 @@ def to_cp(x, DMatrixT): return X -def run_gpu_hist(params, num_rounds, dataset, DMatrixT, client): +def run_gpu_hist( + params: Dict, + num_rounds: int, + dataset: tm.TestDataset, + DMatrixT: Type, + client: Client +) -> None: params['tree_method'] = 'gpu_hist' params = dataset.set_params(params) # It doesn't make sense to distribute a completely @@ -156,7 +166,7 @@ class TestDistributedGPU: @pytest.mark.skipif(**tm.no_dask_cudf()) @pytest.mark.skipif(**tm.no_dask_cuda()) @pytest.mark.mgpu - def test_dask_dataframe(self, local_cuda_cluster): + def test_dask_dataframe(self, local_cuda_cluster: LocalCUDACluster) -> None: with Client(local_cuda_cluster) as client: run_with_dask_dataframe(dxgb.DaskDMatrix, client) run_with_dask_dataframe(dxgb.DaskDeviceQuantileDMatrix, client) @@ -168,7 +178,13 @@ class TestDistributedGPU: @pytest.mark.skipif(**tm.no_dask_cuda()) 
@pytest.mark.parametrize('local_cuda_cluster', [{'n_workers': 2}], indirect=['local_cuda_cluster']) @pytest.mark.mgpu - def test_gpu_hist(self, params, num_rounds, dataset, local_cuda_cluster): + def test_gpu_hist( + self, + params: Dict, + num_rounds: int, + dataset: tm.TestDataset, + local_cuda_cluster: LocalCUDACluster + ) -> None: with Client(local_cuda_cluster) as client: run_gpu_hist(params, num_rounds, dataset, dxgb.DaskDMatrix, client) @@ -179,7 +195,7 @@ class TestDistributedGPU: @pytest.mark.skipif(**tm.no_dask()) @pytest.mark.skipif(**tm.no_dask_cuda()) @pytest.mark.mgpu - def test_dask_array(self, local_cuda_cluster): + def test_dask_array(self, local_cuda_cluster: LocalCUDACluster) -> None: with Client(local_cuda_cluster) as client: run_with_dask_array(dxgb.DaskDMatrix, client) run_with_dask_array(dxgb.DaskDeviceQuantileDMatrix, client) @@ -187,9 +203,8 @@ class TestDistributedGPU: @pytest.mark.skipif(**tm.no_cupy()) @pytest.mark.skipif(**tm.no_dask()) @pytest.mark.skipif(**tm.no_dask_cuda()) - def test_early_stopping(self, local_cuda_cluster): + def test_early_stopping(self, local_cuda_cluster: LocalCUDACluster) -> None: from sklearn.datasets import load_breast_cancer - import cupy with Client(local_cuda_cluster) as client: X, y = load_breast_cancer(return_X_y=True) X, y = da.from_array(X), da.from_array(y) @@ -224,14 +239,14 @@ class TestDistributedGPU: @pytest.mark.skipif(**tm.no_dask()) @pytest.mark.skipif(**tm.no_dask_cuda()) @pytest.mark.mgpu - def test_empty_dmatrix(self, local_cuda_cluster): + def test_empty_dmatrix(self, local_cuda_cluster: LocalCUDACluster) -> None: with Client(local_cuda_cluster) as client: parameters = {'tree_method': 'gpu_hist', 'debug_synchronize': True} run_empty_dmatrix_reg(client, parameters) run_empty_dmatrix_cls(client, parameters) - def run_quantile(self, name, local_cuda_cluster): + def run_quantile(self, name: str, local_cuda_cluster: LocalCUDACluster) -> None: if sys.platform.startswith("win"): pytest.skip("Skipping dask tests on Windows") @@ -243,16 +258,18 @@ class TestDistributedGPU: assert exe, 'No testxgboost executable found.' test = "--gtest_filter=GPUQuantile." + name - def runit(worker_addr, rabit_args): - port = None + def runit( + worker_addr: str, rabit_args: List[bytes] + ) -> subprocess.CompletedProcess: + port_env = '' # setup environment for running the c++ part. 
             for arg in rabit_args:
                 if arg.decode('utf-8').startswith('DMLC_TRACKER_PORT'):
-                    port = arg.decode('utf-8')
-            port = port.split('=')
+                    port_env = arg.decode('utf-8')
+            port = port_env.split('=')
             env = os.environ.copy()
             env[port[0]] = port[1]
-            return subprocess.run([exe, test], env=env, stdout=subprocess.PIPE)
+            return subprocess.run([str(exe), test], env=env, stdout=subprocess.PIPE)
 
         with Client(local_cuda_cluster) as client:
             workers = list(_get_client_workers(client).keys())
@@ -272,21 +289,23 @@ class TestDistributedGPU:
     @pytest.mark.skipif(**tm.no_dask_cuda())
     @pytest.mark.mgpu
     @pytest.mark.gtest
-    def test_quantile_basic(self, local_cuda_cluster):
+    def test_quantile_basic(self, local_cuda_cluster: LocalCUDACluster) -> None:
        self.run_quantile('AllReduceBasic', local_cuda_cluster)
 
     @pytest.mark.skipif(**tm.no_dask())
     @pytest.mark.skipif(**tm.no_dask_cuda())
     @pytest.mark.mgpu
     @pytest.mark.gtest
-    def test_quantile_same_on_all_workers(self, local_cuda_cluster):
+    def test_quantile_same_on_all_workers(
+        self, local_cuda_cluster: LocalCUDACluster
+    ) -> None:
         self.run_quantile('SameOnAllWorkers', local_cuda_cluster)
 
 
-async def run_from_dask_array_asyncio(scheduler_address):
+async def run_from_dask_array_asyncio(scheduler_address: str) -> dxgb.TrainReturnT:
     async with Client(scheduler_address, asynchronous=True) as client:
         import cupy as cp
-        X, y = generate_array()
+        X, y, _ = generate_array()
         X = X.map_blocks(cp.array)
         y = y.map_blocks(cp.array)
@@ -313,7 +332,7 @@ async def run_from_dask_array_asyncio(scheduler_address):
 @pytest.mark.skipif(**tm.no_dask())
 @pytest.mark.skipif(**tm.no_dask_cuda())
 @pytest.mark.mgpu
-def test_with_asyncio(local_cuda_cluster):
+def test_with_asyncio(local_cuda_cluster: LocalCUDACluster) -> None:
     with Client(local_cuda_cluster) as client:
         address = client.scheduler.address
         output = asyncio.run(run_from_dask_array_asyncio(address))
diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py
index ae0b5e8e1..9826ddbff 100644
--- a/tests/python/test_with_dask.py
+++ b/tests/python/test_with_dask.py
@@ -1,9 +1,12 @@
+from pathlib import Path
+
 import testing as tm
 import pytest
 import xgboost as xgb
 import sys
 import numpy as np
 import json
+from typing import List, Tuple, Union, Dict, Optional, Callable, Type
 import asyncio
 import tempfile
 from sklearn.datasets import make_classification
@@ -19,56 +22,46 @@ if tm.no_dask()['condition']:
     pytest.skip(msg=tm.no_dask()['reason'], allow_module_level=True)
 
 
-try:
-    from distributed import LocalCluster, Client, get_client
-    from distributed.utils_test import client, loop, cluster_fixture
-    import dask.dataframe as dd
-    import dask.array as da
-    from xgboost.dask import DaskDMatrix
-    import dask
-except ImportError:
-    LocalCluster = None
-    Client = None
-    get_client = None
-    client = None
-    loop = None
-    cluster_fixture = None
-    dd = None
-    da = None
-    DaskDMatrix = None
-    dask = None
+from distributed import LocalCluster, Client, get_client
+from distributed.utils_test import client, loop, cluster_fixture
+import dask.dataframe as dd
+import dask.array as da
+from xgboost.dask import DaskDMatrix
+
 
 kRows = 1000
 kCols = 10
 kWorkers = 5
 
 
-def _get_client_workers(client):
+def _get_client_workers(client: "Client") -> Dict[str, Dict]:
     workers = client.scheduler_info()['workers']
     return workers
 
 
-def generate_array(with_weights=False):
+def generate_array(
+    with_weights: bool = False
+) -> Tuple[xgb.dask._DaskCollection, xgb.dask._DaskCollection,
+           Optional[xgb.dask._DaskCollection]]:
     partition_size = 20
     X = da.random.random((kRows, kCols), partition_size)
     y = da.random.random(kRows, partition_size)
     if with_weights:
         w = da.random.random(kRows, partition_size)
         return X, y, w
-    return X, y
+    return X, y, None
 
 
-def test_from_dask_dataframe():
+def test_from_dask_dataframe() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
-            X, y = generate_array()
+            X, y, _ = generate_array()
 
             X = dd.from_dask_array(X)
             y = dd.from_dask_array(y)
 
             dtrain = DaskDMatrix(client, X, y)
-            booster = xgb.dask.train(
-                client, {}, dtrain, num_boost_round=2)['booster']
+            booster = xgb.dask.train(client, {}, dtrain, num_boost_round=2)['booster']
 
             prediction = xgb.dask.predict(client, model=booster, data=dtrain)
@@ -78,7 +71,7 @@ def test_from_dask_dataframe():
 
             with pytest.raises(TypeError):
                 # evals_result is not supported in dask interface.
-                xgb.dask.train(
+                xgb.dask.train(  # type:ignore
                     client, {}, dtrain, num_boost_round=2, evals_result={})
             # force prediction to be computed
             from_dmatrix = prediction.compute()
@@ -96,10 +89,10 @@ def test_from_dask_dataframe():
                                        from_dmatrix)
 
 
-def test_from_dask_array():
+def test_from_dask_array() -> None:
     with LocalCluster(n_workers=kWorkers, threads_per_worker=5) as cluster:
         with Client(cluster) as client:
-            X, y = generate_array()
+            X, y, _ = generate_array()
             dtrain = DaskDMatrix(client, X, y)
             # results is {'booster': Booster, 'history': {...}}
             result = xgb.dask.train(client, {}, dtrain)
@@ -111,7 +104,7 @@
             # force prediction to be computed
             prediction = prediction.compute()
 
-            booster = result['booster']
+            booster: xgb.Booster = result['booster']
             single_node_predt = booster.predict(
                 xgb.DMatrix(X.compute())
             )
@@ -127,7 +120,7 @@
             assert np.all(single_node_predt == from_arr.compute())
 
 
-def test_dask_predict_shape_infer():
+def test_dask_predict_shape_infer() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y = make_classification(n_samples=1000, n_informative=5,
@@ -148,7 +141,7 @@
 
 
 @pytest.mark.parametrize("tree_method", ["hist", "approx"])
-def test_boost_from_prediction(tree_method):
+def test_boost_from_prediction(tree_method: str) -> None:
     if tree_method == 'approx':
         pytest.xfail(reason='test_boost_from_prediction[approx] is flaky')
 
@@ -212,7 +205,7 @@
     np.testing.assert_almost_equal(proba_1.compute(), proba_2.compute())
 
 
-def test_dask_missing_value_reg():
+def test_dask_missing_value_reg() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X_0 = np.ones((20 // 2, kCols))
@@ -236,7 +229,7 @@
             np.testing.assert_allclose(np_predt, dd_predt)
 
 
-def test_dask_missing_value_cls():
+def test_dask_missing_value_cls() -> None:
     with LocalCluster() as cluster:
         with Client(cluster) as client:
             X_0 = np.ones((kRows // 2, kCols))
@@ -263,7 +256,7 @@
             assert hasattr(cls, 'missing')
 
 
-def test_dask_regressor():
+def test_dask_regressor() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y, w = generate_array(with_weights=True)
@@ -285,7 +278,7 @@
             assert len(history['validation_0']['rmse']) == 2
 
 
-def test_dask_classifier():
+def test_dask_classifier() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             X, y, w = generate_array(with_weights=True)
@@ -335,11 +328,11 @@ def test_dask_classifier():
 
 
 @pytest.mark.skipif(**tm.no_sklearn())
-def test_sklearn_grid_search():
+def test_sklearn_grid_search() -> None:
     from sklearn.model_selection import GridSearchCV
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
-            X, y = generate_array()
+            X, y, _ = generate_array()
             reg = xgb.dask.DaskXGBRegressor(learning_rate=0.1,
                                             tree_method='hist')
             reg.client = client
@@ -353,7 +346,7 @@
     assert len(means) == len(set(means))
 
 
-def test_empty_dmatrix_training_continuation(client):
+def test_empty_dmatrix_training_continuation(client: "Client") -> None:
     kRows, kCols = 1, 97
     X = dd.from_array(np.random.randn(kRows, kCols))
     y = dd.from_array(np.random.rand(kRows))
@@ -377,8 +370,8 @@
     assert xgb.dask.predict(client, out, dtrain).compute().shape[0] == 1
 
 
-def run_empty_dmatrix_reg(client, parameters):
-    def _check_outputs(out, predictions):
+def run_empty_dmatrix_reg(client: "Client", parameters: dict) -> None:
+    def _check_outputs(out: xgb.dask.TrainReturnT, predictions: np.ndarray) -> None:
         assert isinstance(out['booster'], xgb.dask.Booster)
         assert len(out['history']['validation']['rmse']) == 2
         assert isinstance(predictions, np.ndarray)
@@ -426,10 +419,10 @@ def run_empty_dmatrix_reg(client, parameters):
     _check_outputs(out, predictions)
 
 
-def run_empty_dmatrix_cls(client, parameters):
+def run_empty_dmatrix_cls(client: "Client", parameters: dict) -> None:
     n_classes = 4
 
-    def _check_outputs(out, predictions):
+    def _check_outputs(out: xgb.dask.TrainReturnT, predictions: np.ndarray) -> None:
         assert isinstance(out['booster'], xgb.dask.Booster)
         assert len(out['history']['validation']['merror']) == 2
         assert isinstance(predictions, np.ndarray)
@@ -472,7 +465,7 @@
 
 # No test for Exact, as empty DMatrix handling are mostly for distributed
 # environment and Exact doesn't support it.
-def test_empty_dmatrix_hist():
+def test_empty_dmatrix_hist() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             parameters = {'tree_method': 'hist'}
@@ -480,7 +473,7 @@
             run_empty_dmatrix_cls(client, parameters)
 
 
-def test_empty_dmatrix_approx():
+def test_empty_dmatrix_approx() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             parameters = {'tree_method': 'approx'}
@@ -488,9 +481,9 @@
             run_empty_dmatrix_cls(client, parameters)
 
 
-async def run_from_dask_array_asyncio(scheduler_address):
+async def run_from_dask_array_asyncio(scheduler_address: str) -> xgb.dask.TrainReturnT:
     async with Client(scheduler_address, asynchronous=True) as client:
-        X, y = generate_array()
+        X, y, _ = generate_array()
         m = await DaskDMatrix(client, X, y)
         output = await xgb.dask.train(client, {}, dtrain=m)
 
@@ -510,9 +503,9 @@
     return output
 
 
-async def run_dask_regressor_asyncio(scheduler_address):
+async def run_dask_regressor_asyncio(scheduler_address: str) -> None:
     async with Client(scheduler_address, asynchronous=True) as client:
-        X, y = generate_array()
+        X, y, _ = generate_array()
         regressor = await xgb.dask.DaskXGBRegressor(verbosity=1, n_estimators=2)
         regressor.set_params(tree_method='hist')
@@ -532,9 +525,9 @@
         assert len(history['validation_0']['rmse']) == 2
 
 
-async def run_dask_classifier_asyncio(scheduler_address):
+async def run_dask_classifier_asyncio(scheduler_address: str) -> None:
     async with Client(scheduler_address, asynchronous=True) as client:
-        X, y = generate_array()
+        X, y, _ = generate_array()
         y = (y * 10).astype(np.int32)
         classifier = await xgb.dask.DaskXGBClassifier(
             verbosity=1, n_estimators=2, eval_metric='merror')
@@ -574,7 +567,7 @@
     assert prediction.shape[0] == kRows
 
 
-def test_with_asyncio():
+def test_with_asyncio() -> None:
     with LocalCluster() as cluster:
         with Client(cluster) as client:
             address = client.scheduler.address
@@ -586,10 +579,10 @@
             asyncio.run(run_dask_classifier_asyncio(address))
 
 
-def test_predict():
+def test_predict() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
-            X, y = generate_array()
+            X, y, _ = generate_array()
             dtrain = DaskDMatrix(client, X, y)
             booster = xgb.dask.train(
                 client, {}, dtrain, num_boost_round=2)['booster']
@@ -610,13 +603,14 @@
     assert shap.shape[1] == kCols + 1
 
 
-def test_predict_with_meta(client):
+def test_predict_with_meta(client: "Client") -> None:
     X, y, w = generate_array(with_weights=True)
+    assert w is not None
     partition_size = 20
     margin = da.random.random(kRows, partition_size) + 1e4
 
     dtrain = DaskDMatrix(client, X, y, weight=w, base_margin=margin)
-    booster = xgb.dask.train(
+    booster: xgb.Booster = xgb.dask.train(
         client, {}, dtrain, num_boost_round=4)['booster']
 
     prediction = xgb.dask.predict(client, model=booster, data=dtrain)
@@ -632,7 +626,7 @@
     assert np.all(prediction == single)
 
 
-def run_aft_survival(client, dmatrix_t):
+def run_aft_survival(client: "Client", dmatrix_t: Type) -> None:
     df = dd.read_csv(os.path.join(tm.PROJECT_ROOT, 'demo', 'data',
                                   'veterans_lung_cancer.csv'))
     y_lower_bound = df['Survival_label_lower_bound']
@@ -669,39 +663,43 @@
     assert nloglik_rec['extreme'][-1] > 4.9
 
 
-def test_aft_survival():
+def test_aft_survival() -> None:
     with LocalCluster(n_workers=kWorkers) as cluster:
         with Client(cluster) as client:
             run_aft_survival(client, DaskDMatrix)
 
 
 class TestWithDask:
-    def test_global_config(self, client):
-        X, y = generate_array()
+    def test_global_config(self, client: "Client") -> None:
+        X, y, _ = generate_array()
         xgb.config.set_config(verbosity=0)
         dtrain = DaskDMatrix(client, X, y)
         before_fname = './before_training-test_global_config'
         after_fname = './after_training-test_global_config'
 
         class TestCallback(xgb.callback.TrainingCallback):
-            def write_file(self, fname):
+            def write_file(self, fname: str) -> None:
                 with open(fname, 'w') as fd:
                     fd.write(str(xgb.config.get_config()['verbosity']))
 
-            def before_training(self, model):
+            def before_training(self, model: xgb.Booster) -> xgb.Booster:
                 self.write_file(before_fname)
                 assert xgb.config.get_config()['verbosity'] == 0
                 return model
 
-            def after_training(self, model):
+            def after_training(self, model: xgb.Booster) -> xgb.Booster:
                 assert xgb.config.get_config()['verbosity'] == 0
                 return model
 
-            def before_iteration(self, model, epoch, evals_log):
+            def before_iteration(
+                self, model: xgb.Booster, epoch: int, evals_log: Dict
+            ) -> bool:
                 assert xgb.config.get_config()['verbosity'] == 0
                 return False
 
-            def after_iteration(self, model, epoch, evals_log):
+            def after_iteration(
+                self, model: xgb.Booster, epoch: int, evals_log: Dict
+            ) -> bool:
                 self.write_file(after_fname)
                 assert xgb.config.get_config()['verbosity'] == 0
                 return False
@@ -716,8 +714,14 @@ class TestWithDask:
         os.remove(before_fname)
         os.remove(after_fname)
 
-    def run_updater_test(self, client, params, num_rounds, dataset,
-                         tree_method):
+    def run_updater_test(
+        self,
+        client: "Client",
+        params: Dict,
+        num_rounds: int,
+        dataset: tm.TestDataset,
+        tree_method: str
+    ) -> None:
         params['tree_method'] = tree_method
         params = dataset.set_params(params)
         # It doesn't make sense to distribute a completely
@@ -748,22 +752,26 @@ class TestWithDask:
 
     @given(params=hist_parameter_strategy, dataset=tm.dataset_strategy)
     @settings(deadline=None)
-    def test_hist(self, params, dataset, client):
+    def test_hist(
+        self, params: Dict, dataset: tm.TestDataset, client: "Client"
+    ) -> None:
         num_rounds = 30
         self.run_updater_test(client, params, num_rounds, dataset, 'hist')
 
     @given(params=exact_parameter_strategy, dataset=tm.dataset_strategy)
     @settings(deadline=None)
-    def test_approx(self, client, params, dataset):
+    def test_approx(
+        self, client: "Client", params: Dict, dataset: tm.TestDataset
+    ) -> None:
         num_rounds = 30
         self.run_updater_test(client, params, num_rounds, dataset, 'approx')
 
-    def run_quantile(self, name):
+    def run_quantile(self, name: str) -> None:
         if sys.platform.startswith("win"):
             pytest.skip("Skipping dask tests on Windows")
 
-        exe = None
+        exe: Optional[str] = None
         for possible_path in {'./testxgboost', './build/testxgboost',
                               '../build/testxgboost',
                               '../cpu-build/testxgboost'}:
@@ -774,16 +782,16 @@ class TestWithDask:
 
         test = "--gtest_filter=Quantile." + name
 
-        def runit(worker_addr, rabit_args):
-            port = None
+        def runit(worker_addr: str, rabit_args: List[bytes]) -> subprocess.CompletedProcess:
+            port_env = ''
             # setup environment for running the c++ part.
             for arg in rabit_args:
                 if arg.decode('utf-8').startswith('DMLC_TRACKER_PORT'):
-                    port = arg.decode('utf-8')
-            port = port.split('=')
+                    port_env = arg.decode('utf-8')
+            port = port_env.split('=')
             env = os.environ.copy()
             env[port[0]] = port[1]
-            return subprocess.run([exe, test], env=env, capture_output=True)
+            return subprocess.run([str(exe), test], env=env, capture_output=True)
 
         with LocalCluster(n_workers=4) as cluster:
             with Client(cluster) as client:
@@ -804,20 +812,20 @@ class TestWithDask:
 
     @pytest.mark.skipif(**tm.no_dask())
     @pytest.mark.gtest
-    def test_quantile_basic(self):
+    def test_quantile_basic(self) -> None:
         self.run_quantile('DistributedBasic')
 
     @pytest.mark.skipif(**tm.no_dask())
     @pytest.mark.gtest
-    def test_quantile(self):
+    def test_quantile(self) -> None:
         self.run_quantile('Distributed')
 
     @pytest.mark.skipif(**tm.no_dask())
     @pytest.mark.gtest
-    def test_quantile_same_on_all_workers(self):
+    def test_quantile_same_on_all_workers(self) -> None:
         self.run_quantile('SameOnAllWorkers')
 
-    def test_n_workers(self):
+    def test_n_workers(self) -> None:
         with LocalCluster(n_workers=2) as cluster:
             with Client(cluster) as client:
                 workers = list(_get_client_workers(client).keys())
@@ -837,7 +845,7 @@ class TestWithDask:
         assert len(merged) == 2
 
     @pytest.mark.skipif(**tm.no_dask())
-    def test_feature_weights(self, client):
+    def test_feature_weights(self, client: "Client") -> None:
         kRows = 1024
         kCols = 64
 
@@ -863,7 +871,7 @@ class TestWithDask:
 
     @pytest.mark.skipif(**tm.no_dask())
     @pytest.mark.skipif(**tm.no_sklearn())
-    def test_custom_objective(self, client):
+    def test_custom_objective(self, client: "Client") -> None:
         from sklearn.datasets import load_boston
         X, y = load_boston(return_X_y=True)
         X, y = da.from_array(X), da.from_array(y)
@@ -872,7 +880,7 @@ class TestWithDask:
         with tempfile.TemporaryDirectory() as tmpdir:
             path = os.path.join(tmpdir, 'log')
 
-            def sqr(labels, predts):
+            def sqr(labels: np.ndarray, predts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
                 with open(path, 'a') as fd:
                     print('Running sqr', file=fd)
                 grad = predts - labels
@@ -898,21 +906,21 @@ class TestWithDask:
                                         results_native['validation_0']['rmse'])
             tm.non_increasing(results_native['validation_0']['rmse'])
 
-    def test_data_initialization(self):
+    def test_data_initialization(self) -> None:
         '''Assert each worker has the correct amount of data, and DMatrix
         initialization doesn't generate unnecessary copies of data.
         '''
         with LocalCluster(n_workers=2) as cluster:
             with Client(cluster) as client:
-                X, y = generate_array()
+                X, y, _ = generate_array()
                 n_partitions = X.npartitions
                 m = xgb.dask.DaskDMatrix(client, X, y)
                 workers = list(_get_client_workers(client).keys())
                 rabit_args = client.sync(xgb.dask._get_rabit_args, len(workers), client)
                 n_workers = len(workers)
 
-                def worker_fn(worker_addr, data_ref):
+                def worker_fn(worker_addr: str, data_ref: Dict) -> None:
                     with xgb.dask.RabitContext(rabit_args):
                         local_dtrain = xgb.dask._dmatrix_from_list_of_parts(**data_ref)
                         total = np.array([local_dtrain.num_row()])
@@ -941,7 +949,7 @@ class TestWithDask:
 
 class TestDaskCallbacks:
     @pytest.mark.skipif(**tm.no_sklearn())
-    def test_early_stopping(self, client):
+    def test_early_stopping(self, client: "Client") -> None:
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        X, y = da.from_array(X), da.from_array(y)
@@ -983,7 +991,7 @@ class TestDaskCallbacks:
         assert len(dump) - booster.best_iteration == early_stopping_rounds + 1
 
     @pytest.mark.skipif(**tm.no_sklearn())
-    def test_early_stopping_custom_eval(self, client):
+    def test_early_stopping_custom_eval(self, client: "Client") -> None:
         from sklearn.datasets import load_breast_cancer
         X, y = load_breast_cancer(return_X_y=True)
         X, y = da.from_array(X), da.from_array(y)
@@ -1015,7 +1023,7 @@ class TestDaskCallbacks:
         assert len(dump) - booster.best_iteration == early_stopping_rounds + 1
 
     @pytest.mark.skipif(**tm.no_sklearn())
-    def test_callback(self, client):
+    def test_callback(self, client: "Client") -> None:
         from sklearn.datasets import load_breast_cancer
         X, y = load_breast_cancer(return_X_y=True)
         X, y = da.from_array(X), da.from_array(y)
@@ -1025,9 +1033,11 @@ class TestDaskCallbacks:
         cls.client = client
 
         with tempfile.TemporaryDirectory() as tmpdir:
-            cls.fit(X, y, callbacks=[xgb.callback.TrainingCheckPoint(directory=tmpdir,
-                                                                     iterations=1,
-                                                                     name='model')])
+            cls.fit(X, y, callbacks=[xgb.callback.TrainingCheckPoint(
+                directory=Path(tmpdir),
+                iterations=1,
+                name='model'
+            )])
             for i in range(1, 10):
                 assert os.path.exists(
                     os.path.join(tmpdir, 'model_' + str(i) + '.json'))
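--

Editor's note, not part of the patch: the test changes above lean on two pieces of the
new typing surface, the TrainReturnT TypedDict returned by xgb.dask.train and typed
custom objectives like sqr in test_custom_objective. Below is a minimal sketch of the
user-side code these annotations let mypy check; the helper name fit_on_cluster is
illustrative, and it assumes an xgboost build with this patch applied:

    from typing import Tuple
    import numpy as np
    import dask.array as da
    import xgboost as xgb
    from distributed import Client, LocalCluster

    def sqr(labels: np.ndarray, predts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        # Squared-error objective: first-order gradient and a constant hessian,
        # mirroring the typed sqr objective used in test_custom_objective.
        return predts - labels, np.ones_like(predts)

    def fit_on_cluster(client: Client) -> "xgb.dask.TrainReturnT":
        X = da.random.random((1000, 10), chunks=100)
        y = da.random.random(1000, chunks=100)
        dtrain = xgb.dask.DaskDMatrix(client, X, y)
        return xgb.dask.train(client, {"tree_method": "hist"}, dtrain,
                              num_boost_round=2)

    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        out = fit_on_cluster(client)
        # With the TypedDict available, mypy narrows this access to Booster
        # rather than Any, which is what the new CI step exercises.
        booster: xgb.Booster = out["booster"]
        # The typed objective also flows through the sklearn-style wrapper:
        reg = xgb.dask.DaskXGBRegressor(n_estimators=2, objective=sqr)
        reg.client = client
        reg.fit(da.random.random((100, 10), chunks=50),
                da.random.random(100, chunks=50))

When mypy_extensions is absent, TrainReturnT degrades to Dict[str, Any] (see the
try/except in dask.py), so the sketch still type-checks, just less precisely.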