"""Compatibility shim for xgboost.rabit; to be removed in 2.0"""
import logging
import warnings
from enum import IntEnum, unique
from typing import Any, Callable, List, Optional, TypeVar
import numpy as np
from . import collective
LOGGER = logging.getLogger("[xgboost.rabit]")
def _deprecation_warning() -> str:
return (
"The xgboost.rabit submodule is marked as deprecated in 1.7 and will be removed "
"in 2.0. Please use xgboost.collective instead."
)
def init(args: Optional[List[bytes]] = None) -> None:
    """Initialize the rabit library with arguments."""
    warnings.warn(_deprecation_warning(), FutureWarning)

    # The legacy rabit API accepted a list of ``b"key=value"`` arguments; parse
    # them into keyword arguments for the collective communicator.
    parsed = {}
    if args:
        for arg in args:
            kv = arg.decode().split("=")
            if len(kv) == 2:
                parsed[kv[0]] = kv[1]
    collective.init(**parsed)


def finalize() -> None:
    """Finalize the process, notify tracker everything is done."""
    collective.finalize()
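

# A minimal sketch of the explicit init()/finalize() calling pattern this shim
# preserves, wrapped in an unused helper so that importing the module never
# starts a communicator. The specific ``b"key=value"`` arguments would normally
# come from the tracker, so none are assumed here.
def _example_init_finalize(args: Optional[List[bytes]] = None) -> None:
    init(args)  # forwards the parsed key/value pairs to collective.init()
    try:
        LOGGER.info("running as rank %d of %d", get_rank(), get_world_size())
    finally:
        finalize()  # forwards to collective.finalize()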
def get_rank() -> int:
    """Get rank of the current process.

    Returns
    -------
    rank : int
        Rank of the current process.
    """
    return collective.get_rank()


def get_world_size() -> int:
    """Get the total number of workers.

    Returns
    -------
    n : int
        Total number of processes.
    """
    return collective.get_world_size()


def is_distributed() -> int:
    """Whether rabit is running in distributed mode."""
    return collective.is_distributed()
def tracker_print(msg: Any) -> None:
    """Print a message to the tracker.

    This function can be used to communicate progress information to the
    tracker.

    Parameters
    ----------
    msg : str
        The message to be printed to the tracker.
    """
    collective.communicator_print(msg)


def get_processor_name() -> bytes:
    """Get the processor name.

    Returns
    -------
    name : bytes
        The name of the processor (host), encoded as bytes for backward
        compatibility with the old rabit API.
    """
    return collective.get_processor_name().encode()
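

# A small sketch of reporting progress through the tracker from a single
# worker, wrapped in an unused helper so nothing runs at import time. The
# message text is purely illustrative.
def _example_report_progress() -> None:
    if get_rank() == 0:
        tracker_print(f"host {get_processor_name().decode()} finished an epoch")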
T = TypeVar("T") # pylint:disable=invalid-name
def broadcast(data: T, root: int) -> T:
"""Broadcast object from one node to all other nodes.
Parameters
----------
data : any type that can be pickled
Input data, if current rank does not equal root, this can be None
root : int
Rank of the node to broadcast data from.
Returns
-------
object : int
the result of broadcast.
"""
return collective.broadcast(data, root)
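

# A sketch of the usual broadcast pattern: rank 0 owns a picklable payload and
# every other rank passes None; after the call all ranks hold the same object.
# Wrapped in an unused helper so nothing runs at import time; the payload
# contents are made up for the example.
def _example_broadcast() -> Any:
    payload = {"best_iteration": 32} if get_rank() == 0 else None
    return broadcast(payload, root=0)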
@unique
class Op(IntEnum):
    """Supported operations for rabit."""

    MAX = 0
    MIN = 1
    SUM = 2
    OR = 3
def allreduce(  # pylint:disable=invalid-name
    data: np.ndarray, op: Op, prepare_fun: Optional[Callable[[np.ndarray], None]] = None
) -> np.ndarray:
    """Perform allreduce and return the result.

    Parameters
    ----------
    data :
        Input data.
    op :
        Reduction operator; can be MIN, MAX, SUM, or OR (bit-wise OR).
    prepare_fun :
        Lazy preprocessing function. If it is not None, ``prepare_fun(data)``
        is called before the allreduce to initialize the data. This is no
        longer supported; passing it raises a ``ValueError``.

    Returns
    -------
    result :
        The result of the allreduce, with the same shape as ``data``.

    Notes
    -----
    This function is not thread-safe.
    """
    if prepare_fun is None:
        return collective.allreduce(data, collective.Op(op))
    raise ValueError("preprocessing function is no longer supported")
def version_number() -> int:
    """Return the version number of the currently stored model.

    This means how many calls to CheckPoint were made so far. The shim no
    longer supports checkpointing, so this always returns 0.

    Returns
    -------
    version : int
        Version number of the currently stored model.
    """
    return 0


class RabitContext:
    """A context manager controlling rabit initialization and finalization."""

    def __init__(self, args: Optional[List[bytes]] = None) -> None:
        if args is None:
            args = []
        self.args = args

    def __enter__(self) -> None:
        init(self.args)
        assert is_distributed()
        LOGGER.warning(_deprecation_warning())
        LOGGER.debug("-------------- rabit say hello ------------------")

    def __exit__(self, *args: List) -> None:
        finalize()
        LOGGER.debug("--------------- rabit say bye ------------------")