[dask] Set dataframe index in predict. (#6944)

This commit is contained in:
Jiaming Yuan 2021-05-12 13:24:21 +08:00 committed by GitHub
parent 3e7e426b36
commit 05ac415780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 26 deletions

View File

@ -969,6 +969,29 @@ def _can_output_df(is_df: bool, output_shape: Tuple) -> bool:
return is_df and len(output_shape) <= 2
def _maybe_dataframe(
data: Any, prediction: Any, columns: List[int], is_df: bool
) -> Any:
"""Return dataframe for prediction when applicable."""
if _can_output_df(is_df, prediction.shape):
# Need to preserve the index for dataframe.
# See issue: https://github.com/dmlc/xgboost/issues/6939
# In older versions of dask, the partition is actually a numpy array when input is
# dataframe.
index = getattr(data, "index", None)
if lazy_isinstance(data, "cudf.core.dataframe", "DataFrame"):
import cudf
prediction = cudf.DataFrame(
prediction, columns=columns, dtype=numpy.float32, index=index
)
else:
prediction = DataFrame(
prediction, columns=columns, dtype=numpy.float32, index=index
)
return prediction
async def _direct_predict_impl( # pylint: disable=too-many-branches
mapped_predict: Callable,
booster: "distributed.Future",
@ -1125,13 +1148,7 @@ async def _predict_async(
iteration_range=iteration_range,
strict_shape=strict_shape,
)
if _can_output_df(is_df, predt.shape):
if lazy_isinstance(partition, "cudf", "core.dataframe.DataFrame"):
import cudf
predt = cudf.DataFrame(predt, columns=columns, dtype=numpy.float32)
else:
predt = DataFrame(predt, columns=columns, dtype=numpy.float32)
predt = _maybe_dataframe(partition, predt, columns, is_df)
return predt
# Predict on dask collection directly.
@ -1315,11 +1332,11 @@ async def _inplace_predict_async( # pylint: disable=too-many-branches
raise TypeError(_expect([da.Array, dd.DataFrame, dd.Series], type(base_margin)))
def mapped_predict(
booster: Booster, data: Any, is_df: bool, columns: List[int], base_margin: Any
booster: Booster, partition: Any, is_df: bool, columns: List[int], base_margin: Any
) -> Any:
with config.config_context(**global_config):
prediction = booster.inplace_predict(
data,
partition,
iteration_range=iteration_range,
predict_type=predict_type,
missing=missing,
@ -1327,17 +1344,9 @@ async def _inplace_predict_async( # pylint: disable=too-many-branches
validate_features=validate_features,
strict_shape=strict_shape,
)
if _can_output_df(is_df, prediction.shape):
if lazy_isinstance(data, "cudf.core.dataframe", "DataFrame"):
import cudf
prediction = cudf.DataFrame(
prediction, columns=columns, dtype=numpy.float32
)
else:
# If it's from pandas, the partition is a numpy array
prediction = DataFrame(prediction, columns=columns, dtype=numpy.float32)
prediction = _maybe_dataframe(partition, prediction, columns, is_df)
return prediction
# await turns future into value.
shape, meta = await client.compute(
client.submit(

View File

@ -62,19 +62,17 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
assert isinstance(out['booster'], dxgb.Booster)
assert len(out['history']['X']['rmse']) == 4
predictions = dxgb.predict(client, out, dtrain).compute()
assert isinstance(predictions, np.ndarray)
predictions = dxgb.predict(client, out, dtrain)
assert isinstance(predictions.compute(), np.ndarray)
series_predictions = dxgb.inplace_predict(client, out, X)
assert isinstance(series_predictions, dd.Series)
series_predictions = series_predictions.compute()
single_node = out['booster'].predict(
xgboost.DMatrix(X.compute()))
single_node = out['booster'].predict(xgboost.DMatrix(X.compute()))
cp.testing.assert_allclose(single_node, predictions)
cp.testing.assert_allclose(single_node, predictions.compute())
np.testing.assert_allclose(single_node,
series_predictions.to_array())
series_predictions.compute().to_array())
predt = dxgb.predict(client, out, X)
assert isinstance(predt, dd.Series)
@ -92,6 +90,13 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
cp.testing.assert_allclose(
predt.values.compute(), single_node)
# Make sure the output can be integrated back to original dataframe
X["predict"] = predictions
X["inplace_predict"] = series_predictions
has_null = X.isnull().values.any().compute()
assert bool(has_null) is False
def run_with_dask_array(DMatrixT: Type, client: Client) -> None:
import cupy as cp

View File

@ -100,6 +100,12 @@ def test_from_dask_dataframe() -> None:
np.testing.assert_allclose(series_predictions.compute().values,
from_dmatrix)
# Make sure the output can be integrated back to original dataframe
X["predict"] = prediction
X["inplace_predict"] = series_predictions
assert bool(X.isnull().values.any().compute()) is False
def test_from_dask_array() -> None:
with LocalCluster(n_workers=kWorkers, threads_per_worker=5) as cluster: