Init estimation for regression. (#8272)

Author: Jiaming Yuan
Committed: 2023-01-11 02:04:56 +08:00 (via GitHub)
Commit: badeff1d74 (parent 1b58d81315)
29 changed files with 466 additions and 132 deletions


@@ -12,7 +12,7 @@ from itertools import starmap
from math import ceil
from operator import attrgetter, getitem
from pathlib import Path
-from typing import Any, Dict, Generator, Optional, Tuple, Type, Union
+from typing import Any, Dict, Generator, Optional, Tuple, Type, TypeVar, Union
import hypothesis
import numpy as np
@@ -32,7 +32,7 @@ from xgboost.testing.shared import (
import xgboost as xgb
from xgboost import testing as tm
-pytestmark = [tm.timeout(30), pytest.mark.skipif(**tm.no_dask())]
+pytestmark = [tm.timeout(60), pytest.mark.skipif(**tm.no_dask())]
import dask
import dask.array as da
@@ -40,6 +40,7 @@ import dask.dataframe as dd
from distributed import Client, LocalCluster
from toolz import sliding_window # dependency of dask
from xgboost.dask import DaskDMatrix
+from xgboost.testing.dask import check_init_estimation
dask.config.set({"distributed.scheduler.allowed-failures": False})
@@ -52,8 +53,10 @@ else:
@pytest.fixture(scope="module")
def cluster() -> Generator:
+    n_threads = os.cpu_count()
+    assert n_threads is not None
    with LocalCluster(
-        n_workers=2, threads_per_worker=2, dashboard_address=":0"
+        n_workers=2, threads_per_worker=n_threads // 2, dashboard_address=":0"
) as dask_cluster:
yield dask_cluster
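The fixture now splits the detected core count across its two workers instead of hardcoding two threads each; the assert narrows the `Optional[int]` that `os.cpu_count()` returns so mypy accepts the division. A minimal standalone sketch of the same sizing logic (the cluster parameters mirror the fixture above; everything else is illustrative):

import os

from distributed import Client, LocalCluster

n_threads = os.cpu_count()
assert n_threads is not None  # Optional[int]; narrow it for type checkers

# Two workers that together use all detected cores.
with LocalCluster(
    n_workers=2, threads_per_worker=n_threads // 2, dashboard_address=":0"
) as cluster, Client(cluster) as client:
    print(len(client.scheduler_info()["workers"]))  # -> 2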
@@ -151,12 +154,15 @@ def deterministic_persist_per_worker(df: dd.DataFrame, client: "Client") -> dd.D
return df2
+Margin = TypeVar("Margin", dd.DataFrame, dd.Series, None)
def deterministic_repartition(
client: Client,
X: dd.DataFrame,
y: dd.Series,
-    m: Optional[Union[dd.DataFrame, dd.Series]],
-) -> Tuple[dd.DataFrame, dd.Series, Optional[Union[dd.DataFrame, dd.Series]]]:
+    m: Margin,
+) -> Tuple[dd.DataFrame, dd.Series, Margin]:
# force repartition the data to avoid non-deterministic result
if any(X.map_partitions(lambda x: _is_cudf_df(x)).compute()):
# dask_cudf seems to be doing fine for now
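The old `Optional[Union[...]]` annotation lost information: a caller passing `None` for the margin still got back a value typed as a possible DataFrame. The value-constrained `Margin` TypeVar ties the third return element's type to whatever the caller passed in. A minimal sketch of the pattern outside the test suite (names are illustrative):

from typing import TypeVar

# Constrained TypeVar: T resolves to whichever single constraint the caller supplies.
T = TypeVar("T", str, bytes, None)

def passthrough(value: T) -> T:
    return value

margin = passthrough("0.5")  # checker infers str, not Union[str, bytes, None]
nothing = passthrough(None)  # checker infers None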
@@ -474,14 +480,20 @@ def run_boost_from_prediction(
X, y, margin = deterministic_repartition(client, X, y, margin)
predictions_1: dd.Series = model_1.predict(X, base_margin=margin)
-    cls_2 = xgb.dask.DaskXGBClassifier(
+    model_2 = xgb.dask.DaskXGBClassifier(
learning_rate=0.3, n_estimators=8, tree_method=tree_method, max_bin=512
)
X, y, _ = deterministic_repartition(client, X, y, None)
-    cls_2.fit(X=X, y=y)
-    predictions_2: dd.Series = cls_2.predict(X)
+    model_2.fit(X=X, y=y)
+    predictions_2: dd.Series = model_2.predict(X)
-    assert np.all(predictions_1.compute() == predictions_2.compute())
+    predt_1 = predictions_1.compute()
+    predt_2 = predictions_2.compute()
+    if hasattr(predt_1, "to_numpy"):
+        predt_1 = predt_1.to_numpy()
+    if hasattr(predt_2, "to_numpy"):
+        predt_2 = predt_2.to_numpy()
+    np.testing.assert_allclose(predt_1, predt_2, atol=1e-5)
margined = xgb.dask.DaskXGBClassifier(n_estimators=4)
X, y, margin = deterministic_repartition(client, X, y, margin)
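Exact elementwise equality between the two models' predictions no longer holds once the intercept is estimated from data, so the test compares within an absolute tolerance, and the `to_numpy` dance moves pandas or cuDF results onto the host first. A hedged helper capturing the inlined logic (`to_host` is my name, not the test suite's):

from typing import Any

import numpy as np

def to_host(predt: Any) -> np.ndarray:
    # pandas.Series and, as far as I know, cudf.Series both expose to_numpy();
    # for cuDF it copies device memory back to the host.
    if hasattr(predt, "to_numpy"):
        predt = predt.to_numpy()
    return np.asarray(predt)

# np.testing.assert_allclose(to_host(predt_1), to_host(predt_2), atol=1e-5)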
@@ -706,6 +718,7 @@ def run_dask_classifier(
def test_dask_classifier(model: str, client: "Client") -> None:
X, y, w = generate_array(with_weights=True)
y = (y * 10).astype(np.int32)
+    assert w is not None
run_dask_classifier(X, y, w, model, None, client, 10)
y_bin = y.copy()
@@ -1386,16 +1399,22 @@ class TestWithDask:
else:
w = None
-        m = xgb.dask.DaskDMatrix(
-            client, data=X, label=y, weight=w)
-        history = xgb.dask.train(client, params=params, dtrain=m,
-                                 num_boost_round=num_rounds,
-                                 evals=[(m, 'train')])['history']
+        m = xgb.dask.DaskDMatrix(client, data=X, label=y, weight=w)
+        history = xgb.dask.train(
+            client,
+            params=params,
+            dtrain=m,
+            num_boost_round=num_rounds,
+            evals=[(m, "train")],
+        )["history"]
note(history)
-        history = history['train'][dataset.metric]
+        history = history["train"][dataset.metric]
-        def is_stump() -> bool:
-            return params["max_depth"] == 1 or params["max_leaves"] == 1
+        def is_stump():
+            return (
+                params.get("max_depth", None) == 1
+                or params.get("max_leaves", None) == 1
+            )
def minimum_bin() -> bool:
return "max_bin" in params and params["max_bin"] == 2
@@ -1410,7 +1429,11 @@ class TestWithDask:
else:
assert tm.non_increasing(history)
# Make sure that it's decreasing
-            assert history[-1] < history[0]
+            if is_stump():
+                # we might have already got the best score with base_score.
+                assert history[-1] <= history[0]
+            else:
+                assert history[-1] < history[0]
@given(params=hist_parameter_strategy,
dataset=tm.dataset_strategy)
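With this PR the booster starts from a label-derived intercept rather than the fixed 0.5, so a stump model can already sit at its best score after round zero and never improve on it; the strict `<` check is kept only for non-stumps. Illustratively (values hypothetical):

# A stump whose estimated base_score already minimises the loss yields a flat curve.
history = [0.7071, 0.7071, 0.7071]   # hypothetical per-round rmse
assert history[-1] <= history[0]     # new stump check: passes
assert not history[-1] < history[0]  # the old strict check would have failed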
@@ -1646,13 +1669,17 @@ class TestWithDask:
results_custom = reg.evals_result()
-        reg = xgb.dask.DaskXGBRegressor(n_estimators=rounds, tree_method='hist')
+        reg = xgb.dask.DaskXGBRegressor(
+            n_estimators=rounds, tree_method="hist", base_score=0.5
+        )
reg.fit(X, y, eval_set=[(X, y)])
results_native = reg.evals_result()
-        np.testing.assert_allclose(results_custom['validation_0']['rmse'],
-                                   results_native['validation_0']['rmse'])
-        tm.non_increasing(results_native['validation_0']['rmse'])
+        np.testing.assert_allclose(
+            results_custom["validation_0"]["rmse"],
+            results_native["validation_0"]["rmse"],
+        )
+        tm.non_increasing(results_native["validation_0"]["rmse"])
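Pinning `base_score=0.5` keeps the comparison fair: the custom-objective run cannot use the new automatic intercept estimation and, as I understand the change, falls back to the historical 0.5 default, so the built-in run must start from the same constant for the rmse curves to line up. A hedged single-node sketch of the pairing:

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X, y = rng.normal(size=(128, 4)), rng.normal(size=128)
dtrain = xgb.DMatrix(X, label=y)

def sqerr(predt: np.ndarray, dtrain: xgb.DMatrix):
    """Hand-rolled squared error: per-sample gradient and hessian."""
    return predt - dtrain.get_label(), np.ones_like(predt)

# Custom objective: no intercept estimation, starts from base_score=0.5.
xgb.train({"base_score": 0.5}, dtrain, num_boost_round=4, obj=sqerr)
# Built-in objective: pin the same constant so the eval curves match.
xgb.train({"objective": "reg:squarederror", "base_score": 0.5}, dtrain, num_boost_round=4)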
def test_no_duplicated_partition(self) -> None:
'''Assert each worker has the correct amount of data, and DMatrix initialization doesn't
@@ -1994,6 +2021,10 @@ def test_parallel_submit_multi_clients() -> None:
assert f.result().get_booster().num_boosted_rounds() == i + 1
+def test_init_estimation(client: Client) -> None:
+    check_init_estimation("hist", client)
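`check_init_estimation` comes from `xgboost.testing.dask` and exercises the headline feature of this PR: `base_score` is fitted from the training labels before the first tree is built, instead of defaulting to 0.5. A hedged single-node probe of that behaviour (for squared error the estimate should land near the label mean; the JSON path follows `save_config()`'s layout):

import json

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X = rng.normal(size=(256, 4))
y = X.sum(axis=1) + 3.0  # shift the mean well away from 0.5

booster = xgb.train(
    {"tree_method": "hist", "objective": "reg:squarederror"},
    xgb.DMatrix(X, label=y),
    num_boost_round=1,
)
config = json.loads(booster.save_config())
base_score = float(config["learner"]["learner_model_param"]["base_score"])
print(base_score, y.mean())  # expected: close to each other, not 0.5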
class TestDaskCallbacks:
@pytest.mark.skipif(**tm.no_sklearn())
def test_early_stopping(self, client: "Client") -> None: