[breaking] Bump Python requirement to 3.10. (#10434)

- Bump the Python requirement.
- Fix type hints.
- Use loky to avoid deadlock.
- Work around the cupy-numpy compatibility issue on Windows caused by the `safe` casting rule.
- Simplify the repartitioning logic to avoid dask errors.
This commit is contained in:
Jiaming Yuan
2024-07-30 17:31:06 +08:00
committed by GitHub
parent 757aafc131
commit 827d0e8edb
33 changed files with 284 additions and 286 deletions

View File

@@ -1,4 +1,4 @@
"""Copyright 2019-2022 XGBoost contributors"""
"""Copyright 2019-2024, XGBoost contributors"""
import asyncio
import json
@@ -7,12 +7,24 @@ import pickle
import socket
import tempfile
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from functools import partial
from itertools import starmap
from math import ceil
from operator import attrgetter, getitem
from pathlib import Path
from typing import Any, Dict, Generator, Literal, Optional, Tuple, Type, TypeVar, Union
from typing import (
Any,
Dict,
Generator,
List,
Literal,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
import hypothesis
import numpy as np
@@ -133,34 +145,6 @@ def generate_array(
return X, y, None
def deterministic_persist_per_worker(
    df: dd.DataFrame, client: "Client"
) -> dd.DataFrame:
    """Persist ``df`` so that each worker holds one contiguous slice of it.

    The dataframe's partitions are split into roughly equal contiguous
    groups, each group is pinned to a single worker via ``persist``, and the
    persisted pieces are concatenated back into one dataframe.  This makes
    data placement deterministic across runs.
    """
    # Got this script from https://github.com/dmlc/xgboost/issues/7927
    # Query workers
    n_workers = len(client.cluster.workers)
    workers = map(attrgetter("worker_address"), client.cluster.workers.values())
    # Slice data into roughly equal partitions
    subpartition_size = ceil(df.npartitions / n_workers)
    # Boundaries for the slices; the extra `subpartition_size` in the stop
    # value lets the final (possibly shorter) slice reach df.npartitions.
    subpartition_divisions = range(
        0, df.npartitions + subpartition_size, subpartition_size
    )
    # NOTE(review): `sliding_window` is presumably toolz.sliding_window — it
    # is not among the visible imports; confirm against the full file.
    subpartition_slices = starmap(slice, sliding_window(2, subpartition_divisions))
    subpartitions = map(partial(getitem, df.partitions), subpartition_slices)
    # Persist each subpartition on each worker
    # Rebuild dataframe from persisted subpartitions
    df2 = dd.concat(
        [
            # allow_other_workers=False pins each slice to exactly one
            # worker, which is what makes the placement deterministic.
            sp.persist(workers=w, allow_other_workers=False)
            for sp, w in zip(subpartitions, workers)
        ]
    )
    return df2
Margin = TypeVar("Margin", dd.DataFrame, dd.Series, None)
@@ -169,30 +153,14 @@ def deterministic_repartition(
X: dd.DataFrame,
y: dd.Series,
m: Margin,
divisions,
) -> Tuple[dd.DataFrame, dd.Series, Margin]:
# force repartition the data to avoid non-deterministic result
if any(X.map_partitions(lambda x: _is_cudf_df(x)).compute()):
# dask_cudf seems to be doing fine for now
return X, y, m
X["_y"] = y
if m is not None:
if isinstance(m, dd.DataFrame):
m_columns = m.columns
X = dd.concat([X, m], join="outer", axis=1)
else:
m_columns = ["_m"]
X["_m"] = m
X = deterministic_persist_per_worker(X, client)
y = X["_y"]
X = X[X.columns.difference(["_y"])]
if m is not None:
m = X[m_columns]
X = X[X.columns.difference(m_columns)]
return X, y, m
X, y, margin = (
dd.repartition(X, divisions=divisions, force=True),
dd.repartition(y, divisions=divisions, force=True),
dd.repartition(m, divisions=divisions, force=True) if m is not None else None,
)
return X, y, margin
@pytest.mark.parametrize("to_frame", [True, False])
@@ -218,10 +186,10 @@ def test_xgbclassifier_classes_type_and_value(to_frame: bool, client: "Client"):
def test_from_dask_dataframe() -> None:
with LocalCluster(n_workers=kWorkers, dashboard_address=":0") as cluster:
with Client(cluster) as client:
X, y, _ = generate_array()
X_, y_, _ = generate_array()
X = dd.from_dask_array(X)
y = dd.from_dask_array(y)
X = dd.from_dask_array(X_)
y = dd.from_dask_array(y_)
dtrain = DaskDMatrix(client, X, y)
booster = xgb.dask.train(client, {}, dtrain, num_boost_round=2)["booster"]
@@ -456,6 +424,7 @@ def run_boost_from_prediction_multi_class(
tree_method: str,
device: str,
client: "Client",
divisions: List[int],
) -> None:
model_0 = xgb.dask.DaskXGBClassifier(
learning_rate=0.3,
@@ -464,7 +433,7 @@ def run_boost_from_prediction_multi_class(
max_bin=768,
device=device,
)
X, y, _ = deterministic_repartition(client, X, y, None)
X, y, _ = deterministic_repartition(client, X, y, None, divisions)
model_0.fit(X=X, y=y)
margin = xgb.dask.inplace_predict(
client, model_0.get_booster(), X, predict_type="margin"
@@ -478,7 +447,7 @@ def run_boost_from_prediction_multi_class(
max_bin=768,
device=device,
)
X, y, margin = deterministic_repartition(client, X, y, margin)
X, y, margin = deterministic_repartition(client, X, y, margin, divisions)
model_1.fit(X=X, y=y, base_margin=margin)
predictions_1 = xgb.dask.predict(
client,
@@ -494,7 +463,7 @@ def run_boost_from_prediction_multi_class(
max_bin=768,
device=device,
)
X, y, _ = deterministic_repartition(client, X, y, None)
X, y, _ = deterministic_repartition(client, X, y, None, divisions)
model_2.fit(X=X, y=y)
predictions_2 = xgb.dask.inplace_predict(
client, model_2.get_booster(), X, predict_type="margin"
@@ -517,6 +486,7 @@ def run_boost_from_prediction(
tree_method: str,
device: str,
client: "Client",
divisions: List[int],
) -> None:
X, y = client.persist([X, y])
@@ -527,7 +497,7 @@ def run_boost_from_prediction(
max_bin=512,
device=device,
)
X, y, _ = deterministic_repartition(client, X, y, None)
X, y, _ = deterministic_repartition(client, X, y, None, divisions)
model_0.fit(X=X, y=y)
margin: dd.Series = model_0.predict(X, output_margin=True)
@@ -538,9 +508,9 @@ def run_boost_from_prediction(
max_bin=512,
device=device,
)
X, y, margin = deterministic_repartition(client, X, y, margin)
X, y, margin = deterministic_repartition(client, X, y, margin, divisions)
model_1.fit(X=X, y=y, base_margin=margin)
X, y, margin = deterministic_repartition(client, X, y, margin)
X, y, margin = deterministic_repartition(client, X, y, margin, divisions)
predictions_1: dd.Series = model_1.predict(X, base_margin=margin)
model_2 = xgb.dask.DaskXGBClassifier(
@@ -550,7 +520,7 @@ def run_boost_from_prediction(
max_bin=512,
device=device,
)
X, y, _ = deterministic_repartition(client, X, y, None)
X, y, _ = deterministic_repartition(client, X, y, None, divisions)
model_2.fit(X=X, y=y)
predictions_2: dd.Series = model_2.predict(X)
@@ -563,13 +533,13 @@ def run_boost_from_prediction(
np.testing.assert_allclose(predt_1, predt_2, atol=1e-5)
margined = xgb.dask.DaskXGBClassifier(n_estimators=4)
X, y, margin = deterministic_repartition(client, X, y, margin)
X, y, margin = deterministic_repartition(client, X, y, margin, divisions)
margined.fit(
X=X, y=y, base_margin=margin, eval_set=[(X, y)], base_margin_eval_set=[margin]
)
unmargined = xgb.dask.DaskXGBClassifier(n_estimators=4)
X, y, margin = deterministic_repartition(client, X, y, margin)
X, y, margin = deterministic_repartition(client, X, y, margin, divisions)
unmargined.fit(X=X, y=y, eval_set=[(X, y)], base_margin=margin)
margined_res = margined.evals_result()["validation_0"]["logloss"]
@@ -587,11 +557,13 @@ def test_boost_from_prediction(tree_method: str, client: "Client") -> None:
X_, y_ = load_breast_cancer(return_X_y=True)
X, y = dd.from_array(X_, chunksize=200), dd.from_array(y_, chunksize=200)
run_boost_from_prediction(X, y, tree_method, "cpu", client)
divisions = copy(X.divisions)
run_boost_from_prediction(X, y, tree_method, "cpu", client, divisions)
X_, y_ = load_digits(return_X_y=True)
X, y = dd.from_array(X_, chunksize=100), dd.from_array(y_, chunksize=100)
run_boost_from_prediction_multi_class(X, y, tree_method, "cpu", client)
divisions = copy(X.divisions)
run_boost_from_prediction_multi_class(X, y, tree_method, "cpu", client, divisions)
def test_inplace_predict(client: "Client") -> None:
@@ -1594,7 +1566,7 @@ class TestWithDask:
def test_empty_quantile_dmatrix(self, client: Client) -> None:
X, y = make_categorical(client, 2, 30, 13)
X_valid, y_valid = make_categorical(client, 10000, 30, 13)
X_valid, y_valid, _ = deterministic_repartition(client, X_valid, y_valid, None)
divisions = copy(X_valid.divisions)
Xy = xgb.dask.DaskQuantileDMatrix(client, X, y, enable_categorical=True)
Xy_valid = xgb.dask.DaskQuantileDMatrix(