diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py
index c8dda3ae9..0dc944e5d 100644
--- a/python-package/xgboost/dask.py
+++ b/python-package/xgboost/dask.py
@@ -969,6 +969,29 @@ def _can_output_df(is_df: bool, output_shape: Tuple) -> bool:
     return is_df and len(output_shape) <= 2
 
 
+def _maybe_dataframe(
+    data: Any, prediction: Any, columns: List[int], is_df: bool
+) -> Any:
+    """Return dataframe for prediction when applicable."""
+    if _can_output_df(is_df, prediction.shape):
+        # Need to preserve the index for dataframe.
+        # See issue: https://github.com/dmlc/xgboost/issues/6939
+        # In older versions of dask, the partition is actually a numpy array when input is
+        # dataframe.
+        index = getattr(data, "index", None)
+        if lazy_isinstance(data, "cudf.core.dataframe", "DataFrame"):
+            import cudf
+
+            prediction = cudf.DataFrame(
+                prediction, columns=columns, dtype=numpy.float32, index=index
+            )
+        else:
+            prediction = DataFrame(
+                prediction, columns=columns, dtype=numpy.float32, index=index
+            )
+    return prediction
+
+
 async def _direct_predict_impl(  # pylint: disable=too-many-branches
     mapped_predict: Callable,
     booster: "distributed.Future",
@@ -1125,13 +1148,7 @@ async def _predict_async(
                 iteration_range=iteration_range,
                 strict_shape=strict_shape,
             )
-            if _can_output_df(is_df, predt.shape):
-                if lazy_isinstance(partition, "cudf", "core.dataframe.DataFrame"):
-                    import cudf
-
-                    predt = cudf.DataFrame(predt, columns=columns, dtype=numpy.float32)
-                else:
-                    predt = DataFrame(predt, columns=columns, dtype=numpy.float32)
+            predt = _maybe_dataframe(partition, predt, columns, is_df)
             return predt
 
     # Predict on dask collection directly.
@@ -1315,11 +1332,11 @@ async def _inplace_predict_async(  # pylint: disable=too-many-branches
         raise TypeError(_expect([da.Array, dd.DataFrame, dd.Series], type(base_margin)))
 
     def mapped_predict(
-        booster: Booster, data: Any, is_df: bool, columns: List[int], base_margin: Any
+        booster: Booster, partition: Any, is_df: bool, columns: List[int], base_margin: Any
     ) -> Any:
         with config.config_context(**global_config):
             prediction = booster.inplace_predict(
-                data,
+                partition,
                 iteration_range=iteration_range,
                 predict_type=predict_type,
                 missing=missing,
@@ -1327,17 +1344,9 @@ async def _inplace_predict_async(  # pylint: disable=too-many-branches
                 validate_features=validate_features,
                 strict_shape=strict_shape,
             )
-            if _can_output_df(is_df, prediction.shape):
-                if lazy_isinstance(data, "cudf.core.dataframe", "DataFrame"):
-                    import cudf
-
-                    prediction = cudf.DataFrame(
-                        prediction, columns=columns, dtype=numpy.float32
-                    )
-                else:
-                    # If it's from pandas, the partition is a numpy array
-                    prediction = DataFrame(prediction, columns=columns, dtype=numpy.float32)
+            prediction = _maybe_dataframe(partition, prediction, columns, is_df)
             return prediction
 
+    # await turns future into value.
     shape, meta = await client.compute(
         client.submit(
diff --git a/tests/python-gpu/test_gpu_with_dask.py b/tests/python-gpu/test_gpu_with_dask.py
index 78b7fc088..69f200cf9 100644
--- a/tests/python-gpu/test_gpu_with_dask.py
+++ b/tests/python-gpu/test_gpu_with_dask.py
@@ -62,19 +62,17 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
     assert isinstance(out['booster'], dxgb.Booster)
     assert len(out['history']['X']['rmse']) == 4
 
-    predictions = dxgb.predict(client, out, dtrain).compute()
-    assert isinstance(predictions, np.ndarray)
+    predictions = dxgb.predict(client, out, dtrain)
+    assert isinstance(predictions.compute(), np.ndarray)
 
     series_predictions = dxgb.inplace_predict(client, out, X)
     assert isinstance(series_predictions, dd.Series)
-    series_predictions = series_predictions.compute()
 
-    single_node = out['booster'].predict(
-        xgboost.DMatrix(X.compute()))
+    single_node = out['booster'].predict(xgboost.DMatrix(X.compute()))
 
-    cp.testing.assert_allclose(single_node, predictions)
+    cp.testing.assert_allclose(single_node, predictions.compute())
     np.testing.assert_allclose(single_node,
-                               series_predictions.to_array())
+                               series_predictions.compute().to_array())
 
     predt = dxgb.predict(client, out, X)
     assert isinstance(predt, dd.Series)
@@ -92,6 +90,13 @@ def run_with_dask_dataframe(DMatrixT: Type, client: Client) -> None:
     cp.testing.assert_allclose(
         predt.values.compute(), single_node)
 
+    # Make sure the output can be integrated back to original dataframe
+    X["predict"] = predictions
+    X["inplace_predict"] = series_predictions
+
+    has_null = X.isnull().values.any().compute()
+    assert bool(has_null) is False
+
 
 def run_with_dask_array(DMatrixT: Type, client: Client) -> None:
     import cupy as cp
diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py
index eae0f54b1..b69e41b72 100644
--- a/tests/python/test_with_dask.py
+++ b/tests/python/test_with_dask.py
@@ -100,6 +100,12 @@ def test_from_dask_dataframe() -> None:
     np.testing.assert_allclose(series_predictions.compute().values,
                                from_dmatrix)
 
+    # Make sure the output can be integrated back to original dataframe
+    X["predict"] = prediction
+    X["inplace_predict"] = series_predictions
+
+    assert bool(X.isnull().values.any().compute()) is False
+
 
 def test_from_dask_array() -> None:
     with LocalCluster(n_workers=kWorkers, threads_per_worker=5) as cluster:
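
A minimal sketch of the behaviour the patch relies on, not part of the diff itself: it assumes only pandas and dask.dataframe, and toy_predict is a hypothetical stand-in for the booster's per-partition prediction. It illustrates why _maybe_dataframe now passes the partition's index through to the constructed frame: dask/pandas column assignment aligns on the index, so a prediction frame built without the source index could not be assigned back to the original dataframe without producing nulls, which is what the new test assertions check.

import numpy as np
import pandas as pd
import dask.dataframe as dd

# A partitioned dataframe whose index does not start at zero.
pdf = pd.DataFrame({"x": np.arange(8, dtype=np.float32)}, index=np.arange(100, 108))
X = dd.from_pandas(pdf, npartitions=2)

def toy_predict(part: pd.DataFrame) -> pd.DataFrame:
    # Mimic the fixed behaviour: build the output with the partition's own index.
    values = part["x"].to_numpy() * 2.0
    return pd.DataFrame(values, columns=[0], dtype=np.float32, index=part.index)

predt = X.map_partitions(toy_predict)[0]
X["predict"] = predt  # column assignment aligns on the preserved index
assert bool(X.isnull().values.any().compute()) is False

# Had toy_predict dropped part.index, each partition's output would carry a fresh
# RangeIndex, alignment against 100..107 would fail, and the column would be NaN.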