Support dataframe data format in native XGBoost. (#9828)

- Implement a columnar adapter.
- Refactor Python pandas handling code to avoid converting into a single numpy array.
- Add support in R for transforming columns.
- Support R data.frame and factor type.
This commit is contained in:
Jiaming Yuan
2023-12-12 09:56:31 +08:00
committed by GitHub
parent b3700bbb3f
commit faf0f2df10
21 changed files with 718 additions and 221 deletions

View File

@@ -822,8 +822,7 @@ class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-m
.. note:: This parameter is experimental
Experimental support of specializing for categorical features. Do not set
to True unless you are interested in development. Also, JSON/UBJSON
Experimental support of specializing for categorical features. JSON/UBJSON
serialization format is required.
"""
@@ -1431,6 +1430,12 @@ class _ProxyDMatrix(DMatrix):
_LIB.XGProxyDMatrixSetDataDense(self.handle, _array_interface(data))
)
def _set_data_from_pandas(self, data: DataType) -> None:
"""Set data from a pandas DataFrame. The input is a PandasTransformed instance."""
_check_call(
_LIB.XGProxyDMatrixSetDataColumnar(self.handle, data.array_interface())
)
def _set_data_from_csr(self, csr: scipy.sparse.csr_matrix) -> None:
"""Set data from scipy csr"""
from .data import _array_interface
@@ -2440,6 +2445,7 @@ class Booster:
assert proxy is None or isinstance(proxy, _ProxyDMatrix)
from .data import (
PandasTransformed,
_array_interface,
_arrow_transform,
_is_arrow,
@@ -2494,6 +2500,19 @@ class Booster:
)
)
return _prediction_output(shape, dims, preds, False)
if isinstance(data, PandasTransformed):
_check_call(
_LIB.XGBoosterPredictFromColumnar(
self.handle,
data.array_interface(),
args,
p_handle,
ctypes.byref(shape),
ctypes.byref(dims),
ctypes.byref(preds),
)
)
return _prediction_output(shape, dims, preds, False)
if isinstance(data, scipy.sparse.csr_matrix):
from .data import transform_scipy_sparse

View File

@@ -65,13 +65,18 @@ def _is_scipy_csr(data: DataType) -> bool:
return isinstance(data, scipy.sparse.csr_matrix)
def _array_interface(data: np.ndarray) -> bytes:
def _array_interface_dict(data: np.ndarray) -> dict:
assert (
data.dtype.hasobject is False
), "Input data contains `object` dtype. Expecting numeric data."
interface = data.__array_interface__
if "mask" in interface:
interface["mask"] = interface["mask"].__array_interface__
return interface
def _array_interface(data: np.ndarray) -> bytes:
    """JSON-encode the array interface of *data* as UTF-8 bytes."""
    return json.dumps(_array_interface_dict(data)).encode("utf-8")
@@ -265,24 +270,24 @@ pandas_nullable_mapper = {
"Int16": "int",
"Int32": "int",
"Int64": "int",
"UInt8": "i",
"UInt16": "i",
"UInt32": "i",
"UInt64": "i",
"UInt8": "int",
"UInt16": "int",
"UInt32": "int",
"UInt64": "int",
"Float32": "float",
"Float64": "float",
"boolean": "i",
}
pandas_pyarrow_mapper = {
"int8[pyarrow]": "i",
"int16[pyarrow]": "i",
"int32[pyarrow]": "i",
"int64[pyarrow]": "i",
"uint8[pyarrow]": "i",
"uint16[pyarrow]": "i",
"uint32[pyarrow]": "i",
"uint64[pyarrow]": "i",
"int8[pyarrow]": "int",
"int16[pyarrow]": "int",
"int32[pyarrow]": "int",
"int64[pyarrow]": "int",
"uint8[pyarrow]": "int",
"uint16[pyarrow]": "int",
"uint32[pyarrow]": "int",
"uint64[pyarrow]": "int",
"float[pyarrow]": "float",
"float32[pyarrow]": "float",
"double[pyarrow]": "float",
@@ -295,7 +300,7 @@ _pandas_dtype_mapper.update(pandas_pyarrow_mapper)
_ENABLE_CAT_ERR = (
"When categorical type is supplied, The experimental DMatrix parameter"
"When categorical type is supplied, the experimental DMatrix parameter"
"`enable_categorical` must be set to `True`."
)
@@ -407,89 +412,122 @@ def is_pd_sparse_dtype(dtype: PandasDType) -> bool:
return is_sparse(dtype)
def pandas_cat_null(data: DataFrame) -> DataFrame:
    """Handle categorical dtype and nullable extension types from pandas.

    Categorical columns are replaced by their float32 category codes (missing
    becomes NaN) and nullable extension columns are cast to float32; other
    columns are passed through untouched.
    """
    import pandas as pd

    # handle category codes and nullable.
    cat_columns = []
    nul_columns = []
    # avoid an unnecessary conversion if possible
    for col, dtype in zip(data.columns, data.dtypes):
        if is_pd_cat_dtype(dtype):
            cat_columns.append(col)
        elif is_pa_ext_categorical_dtype(dtype):
            # Arrow dictionary columns are rejected outright; see cat_codes
            # below for why the encoding is not usable yet.
            raise ValueError(
                "pyarrow dictionary type is not supported. Use pandas category instead."
            )
        elif is_nullable_dtype(dtype):
            nul_columns.append(col)
    if cat_columns or nul_columns:
        # Avoid transformation due to: PerformanceWarning: DataFrame is highly
        # fragmented
        transformed = data.copy(deep=False)
    else:
        # Nothing to convert; reuse the caller's frame without copying.
        transformed = data

    def cat_codes(ser: pd.Series) -> pd.Series:
        # Map a single categorical column to its integer category codes.
        if is_pd_cat_dtype(ser.dtype):
            return ser.cat.codes
        assert is_pa_ext_categorical_dtype(ser.dtype)
        # Not yet supported, the index is not ordered for some reason. Alternately:
        # `combine_chunks().to_pandas().cat.codes`. The result is the same.
        return ser.array.__arrow_array__().combine_chunks().dictionary_encode().indices

    if cat_columns:
        # DF doesn't have the cat attribute, as a result, we use apply here
        # -1 is pandas' sentinel for a missing category; turn it into NaN.
        transformed[cat_columns] = (
            transformed[cat_columns]
            .apply(cat_codes)
            .astype(np.float32)
            .replace(-1.0, np.NaN)
        )
    if nul_columns:
        transformed[nul_columns] = transformed[nul_columns].astype(np.float32)

    # TODO(jiamingy): Investigate the possibility of using dataframe protocol or arrow
    # IPC format for pandas so that we can apply the data transformation inside XGBoost
    # for better memory efficiency.
    return transformed
def pandas_ext_num_types(data: DataFrame) -> DataFrame:
"""Experimental suppport for handling pyarrow extension numeric types."""
def pandas_pa_type(ser: Any) -> np.ndarray:
    """Convert a pandas series backed by a pyarrow extension array into a
    numpy array, avoiding copies where the arrow buffer layout permits.
    """
    import pandas as pd
    import pyarrow as pa

    # No copy, callstack:
    # pandas.core.internals.managers.SingleBlockManager.array_values()
    # pandas.core.internals.blocks.EABackedBlock.values
    d_array: pd.arrays.ArrowExtensionArray = ser.array
    # no copy in __arrow_array__
    # ArrowExtensionArray._data is a chunked array
    aa: pa.ChunkedArray = d_array.__arrow_array__()
    # combine_chunks takes the most significant amount of time
    chunk: pa.Array = aa.combine_chunks()
    # When there's null value, we have to use copy
    zero_copy = chunk.null_count == 0
    # Alternately, we can use chunk.buffers(), which returns a list of buffers and
    # we need to concatenate them ourselves.
    # FIXME(jiamingy): Is there a better way to access the arrow buffer along with
    # its mask?
    # Buffers from chunk.buffers() have the address attribute, but don't expose the
    # mask.
    arr: np.ndarray = chunk.to_numpy(zero_copy_only=zero_copy, writable=False)
    # _ensure_np_dtype is defined elsewhere in this module; presumably it maps
    # the result to a dtype XGBoost accepts — confirm against its definition.
    arr, _ = _ensure_np_dtype(arr, arr.dtype)
    return arr
def pandas_transform_data(data: DataFrame) -> List[np.ndarray]:
"""Handle categorical dtype and extension types from pandas."""
import pandas as pd
from pandas import Float32Dtype, Float64Dtype
result: List[np.ndarray] = []
def cat_codes(ser: pd.Series) -> np.ndarray:
if is_pd_cat_dtype(ser.dtype):
return _ensure_np_dtype(
ser.cat.codes.astype(np.float32)
.replace(-1.0, np.NaN)
.to_numpy(na_value=np.nan),
np.float32,
)[0]
# Not yet supported, the index is not ordered for some reason. Alternately:
# `combine_chunks().to_pandas().cat.codes`. The result is the same.
assert is_pa_ext_categorical_dtype(ser.dtype)
return (
ser.array.__arrow_array__()
.combine_chunks()
.dictionary_encode()
.indices.astype(np.float32)
.replace(-1.0, np.NaN)
)
def nu_type(ser: pd.Series) -> np.ndarray:
# Avoid conversion when possible
if isinstance(dtype, Float32Dtype):
res_dtype: NumpyDType = np.float32
elif isinstance(dtype, Float64Dtype):
res_dtype = np.float64
else:
res_dtype = np.float32
return _ensure_np_dtype(
ser.to_numpy(dtype=res_dtype, na_value=np.nan), res_dtype
)[0]
def oth_type(ser: pd.Series) -> np.ndarray:
# The dtypes module is added in 1.25.
npdtypes = np.lib.NumpyVersion(np.__version__) > np.lib.NumpyVersion("1.25.0")
npdtypes = npdtypes and isinstance(
ser.dtype,
(
# pylint: disable=no-member
np.dtypes.Float32DType, # type: ignore
# pylint: disable=no-member
np.dtypes.Float64DType, # type: ignore
),
)
if npdtypes or dtype in {np.float32, np.float64}:
array = ser.to_numpy()
else:
# Specifying the dtype can significantly slow down the conversion (about
# 15% slow down for dense inplace-predict)
array = ser.to_numpy(dtype=np.float32, na_value=np.nan)
return _ensure_np_dtype(array, array.dtype)[0]
for col, dtype in zip(data.columns, data.dtypes):
if not is_pa_ext_dtype(dtype):
continue
# No copy, callstack:
# pandas.core.internals.managers.SingleBlockManager.array_values()
# pandas.core.internals.blocks.EABackedBlock.values
d_array: pd.arrays.ArrowExtensionArray = data[col].array
# no copy in __arrow_array__
# ArrowExtensionArray._data is a chunked array
aa: pa.ChunkedArray = d_array.__arrow_array__()
chunk: pa.Array = aa.combine_chunks()
# Alternately, we can use chunk.buffers(), which returns a list of buffers and
# we need to concatenate them ourselves.
arr = chunk.__array__()
data[col] = arr
return data
if is_pa_ext_categorical_dtype(dtype):
raise ValueError(
"pyarrow dictionary type is not supported. Use pandas category instead."
)
if is_pd_cat_dtype(dtype):
result.append(cat_codes(data[col]))
elif is_pa_ext_dtype(dtype):
result.append(pandas_pa_type(data[col]))
elif is_nullable_dtype(dtype):
result.append(nu_type(data[col]))
elif is_pd_sparse_dtype(dtype):
arr = cast(pd.arrays.SparseArray, data[col].values)
arr = arr.to_dense()
if _is_np_array_like(arr):
arr, _ = _ensure_np_dtype(arr, arr.dtype)
result.append(arr)
else:
result.append(oth_type(data[col]))
# FIXME(jiamingy): Investigate the possibility of using dataframe protocol or arrow
# IPC format for pandas so that we can apply the data transformation inside XGBoost
# for better memory efficiency.
return result
def _transform_pandas_df(
data: DataFrame,
enable_categorical: bool,
feature_names: Optional[FeatureNames] = None,
feature_types: Optional[FeatureTypes] = None,
meta: Optional[str] = None,
meta_type: Optional[NumpyDType] = None,
) -> Tuple[np.ndarray, Optional[FeatureNames], Optional[FeatureTypes]]:
pyarrow_extension = False
def pandas_check_dtypes(data: DataFrame, enable_categorical: bool) -> None:
"""Validate the input types, returns True if the dataframe is backed by arrow."""
sparse_extension = False
for dtype in data.dtypes:
if not (
(dtype.name in _pandas_dtype_mapper)
@@ -498,27 +536,65 @@ def _transform_pandas_df(
or is_pa_ext_dtype(dtype)
):
_invalid_dataframe_dtype(data)
if is_pa_ext_dtype(dtype):
pyarrow_extension = True
if is_pd_sparse_dtype(dtype):
sparse_extension = True
if sparse_extension:
warnings.warn("Sparse arrays from pandas are converted into dense.")
class PandasTransformed:
    """A storage class for transformed pandas DataFrame.

    Holds one numpy array per column; the arrays are exchanged with native
    XGBoost through the array interface protocol.
    """

    def __init__(self, columns: List[np.ndarray]) -> None:
        self.columns = columns

    def array_interface(self) -> bytes:
        """Return a byte string for JSON encoded array interface."""
        interfaces = [_array_interface_dict(col) for col in self.columns]
        return json.dumps(interfaces).encode("utf-8")

    @property
    def shape(self) -> Tuple[int, int]:
        """Return shape of the transformed DataFrame."""
        n_rows = self.columns[0].shape[0]
        return n_rows, len(self.columns)
def _transform_pandas_df(
data: DataFrame,
enable_categorical: bool,
feature_names: Optional[FeatureNames] = None,
feature_types: Optional[FeatureTypes] = None,
meta: Optional[str] = None,
) -> Tuple[PandasTransformed, Optional[FeatureNames], Optional[FeatureTypes]]:
pandas_check_dtypes(data, enable_categorical)
if meta and len(data.columns) > 1 and meta not in _matrix_meta:
raise ValueError(f"DataFrame for {meta} cannot have multiple columns")
feature_names, feature_types = pandas_feature_info(
data, meta, feature_names, feature_types, enable_categorical
)
transformed = pandas_cat_null(data)
if pyarrow_extension:
if transformed is data:
transformed = data.copy(deep=False)
transformed = pandas_ext_num_types(transformed)
arrays = pandas_transform_data(data)
return PandasTransformed(arrays), feature_names, feature_types
if meta and len(data.columns) > 1 and meta not in _matrix_meta:
raise ValueError(f"DataFrame for {meta} cannot have multiple columns")
dtype = meta_type if meta_type else np.float32
arr: np.ndarray = transformed.values
if meta_type:
arr = arr.astype(dtype)
return arr, feature_names, feature_types
def _meta_from_pandas_df(
    data: DataType,
    name: str,
    dtype: Optional[NumpyDType],
    handle: ctypes.c_void_p,
) -> None:
    """Set DMatrix meta info *name* (label, weight, ...) from a pandas DataFrame.

    The frame is transformed column-by-column, flattened into one numpy array,
    then forwarded to ``_meta_from_numpy``.
    """
    transformed, _, _ = _transform_pandas_df(data, False, meta=name)
    cols = transformed.columns
    # One column is used directly; several (e.g. multi-target labels) are
    # stacked into a 2-d array with one sample per row.
    array = cols[0] if len(cols) == 1 else np.stack(cols).T
    array, dtype = _ensure_np_dtype(array, dtype)
    _meta_from_numpy(array, name, dtype, handle)
def _from_pandas_df(
@@ -530,12 +606,21 @@ def _from_pandas_df(
feature_types: Optional[FeatureTypes],
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> DispatchedDataBackendReturnType:
data, feature_names, feature_types = _transform_pandas_df(
df, feature_names, feature_types = _transform_pandas_df(
data, enable_categorical, feature_names, feature_types
)
return _from_numpy_array(
data, missing, nthread, feature_names, feature_types, data_split_mode
handle = ctypes.c_void_p()
_check_call(
_LIB.XGDMatrixCreateFromColumnar(
df.array_interface(),
make_jcargs(
nthread=nthread, missing=missing, data_split_mode=data_split_mode
),
ctypes.byref(handle),
)
)
return handle, feature_names, feature_types
def _is_pandas_series(data: DataType) -> bool:
@@ -550,7 +635,12 @@ def _meta_from_pandas_series(
data: DataType, name: str, dtype: Optional[NumpyDType], handle: ctypes.c_void_p
) -> None:
"""Help transform pandas series for meta data like labels"""
data = data.values.astype("float")
if is_pd_sparse_dtype(data.dtype):
data = data.values.to_dense().astype(np.float32)
elif is_pa_ext_dtype(data.dtype):
data = pandas_pa_type(data)
else:
data = data.to_numpy(np.float32, na_value=np.nan)
if is_pd_sparse_dtype(getattr(data, "dtype", data)):
data = data.to_dense() # type: ignore
@@ -732,6 +822,8 @@ def _arrow_transform(data: DataType) -> Any:
return pd.ArrowDtype(pa.bool_())
return None
# For common cases, this is zero-copy, can check with:
# pa.total_allocated_bytes()
df = data.to_pandas(types_mapper=type_mapper)
return df
@@ -859,11 +951,10 @@ def _from_cudf_df(
)
interfaces_str = _cudf_array_interfaces(data, cat_codes)
handle = ctypes.c_void_p()
config = bytes(json.dumps({"missing": missing, "nthread": nthread}), "utf-8")
_check_call(
_LIB.XGDMatrixCreateFromCudaColumnar(
interfaces_str,
config,
make_jcargs(nthread=nthread, missing=missing),
ctypes.byref(handle),
)
)
@@ -1221,8 +1312,7 @@ def dispatch_meta_backend(
if _is_arrow(data):
data = _arrow_transform(data)
if _is_pandas_df(data):
data, _, _ = _transform_pandas_df(data, False, meta=name, meta_type=dtype)
_meta_from_numpy(data, name, dtype, handle)
_meta_from_pandas_df(data, name, dtype=dtype, handle=handle)
return
if _is_pandas_series(data):
_meta_from_pandas_series(data, name, dtype, handle)
@@ -1244,8 +1334,7 @@ def dispatch_meta_backend(
_meta_from_dt(data, name, dtype, handle)
return
if _is_modin_df(data):
data, _, _ = _transform_pandas_df(data, False, meta=name, meta_type=dtype)
_meta_from_numpy(data, name, dtype, handle)
_meta_from_pandas_df(data, name, dtype=dtype, handle=handle)
return
if _is_modin_series(data):
data = data.values.astype("float")
@@ -1317,11 +1406,10 @@ def _proxy_transform(
if _is_arrow(data):
data = _arrow_transform(data)
if _is_pandas_df(data):
arr, feature_names, feature_types = _transform_pandas_df(
df, feature_names, feature_types = _transform_pandas_df(
data, enable_categorical, feature_names, feature_types
)
arr, _ = _ensure_np_dtype(arr, arr.dtype)
return arr, None, feature_names, feature_types
return df, None, feature_names, feature_types
raise TypeError("Value type is not supported for data iterator:" + str(type(data)))
@@ -1356,6 +1444,9 @@ def dispatch_proxy_set_data(
if not allow_host:
raise err
if isinstance(data, PandasTransformed):
proxy._set_data_from_pandas(data) # pylint: disable=W0212
return
if _is_np_array_like(data):
_check_data_shape(data)
proxy._set_data_from_array(data) # pylint: disable=W0212