merge latest change from upstream

Hui Liu
2024-04-22 09:35:31 -07:00
146 changed files with 3111 additions and 1027 deletions

View File

@@ -909,9 +909,19 @@ def _transform_cudf_df(
enable_categorical: bool,
) -> Tuple[ctypes.c_void_p, list, Optional[FeatureNames], Optional[FeatureTypes]]:
try:
from cudf.api.types import is_categorical_dtype
from cudf.api.types import is_bool_dtype, is_categorical_dtype
except ImportError:
from cudf.utils.dtypes import is_categorical_dtype
from pandas.api.types import is_bool_dtype
# Work around https://github.com/dmlc/xgboost/issues/10181
if _is_cudf_ser(data):
if is_bool_dtype(data.dtype):
data = data.astype(np.uint8)
else:
data = data.astype(
{col: np.uint8 for col in data.select_dtypes(include="bool")}
)
if _is_cudf_ser(data):
dtypes = [data.dtype]

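The hunk above casts boolean inputs to uint8 before XGBoost sees them. As a minimal sketch of the same dict-based astype pattern, shown with pandas since cudf mirrors this interface (the frame here is illustrative):

import numpy as np
import pandas as pd  # cudf exposes the same astype/select_dtypes interface

df = pd.DataFrame({"flag": [True, False, True], "x": [0.1, 0.2, 0.3]})
# Cast every boolean column to uint8; non-boolean columns are left untouched.
df = df.astype({col: np.uint8 for col in df.select_dtypes(include="bool")})
print(df.dtypes)  # flag -> uint8, x -> float64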
View File

@@ -347,15 +347,14 @@ class _SparkXGBParams(
predict_params[param.name] = self.getOrDefault(param)
return predict_params
def _validate_gpu_params(self) -> None:
def _validate_gpu_params(
self, spark_version: str, conf: SparkConf, is_local: bool = False
) -> None:
"""Validate the gpu parameters and gpu configurations"""
if self._run_on_gpu():
ss = _get_spark_session()
sc = ss.sparkContext
if _is_local(sc):
# Support GPU training in Spark local mode is just for debugging
if is_local:
# Supporting GPU training in Spark local mode is only for debugging
# purposes, so it's okay to print the warning below instead of
# checking the real GPU count and raising an exception.
get_logger(self.__class__.__name__).warning(
@@ -364,33 +363,41 @@ class _SparkXGBParams(
self.getOrDefault(self.num_workers),
)
else:
executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
executor_gpus = conf.get("spark.executor.resource.gpu.amount")
if executor_gpus is None:
raise ValueError(
"The `spark.executor.resource.gpu.amount` is required for training"
" on GPU."
)
if not (
ss.version >= "3.4.0"
and _is_standalone_or_localcluster(sc.getConf())
gpu_per_task = conf.get("spark.task.resource.gpu.amount")
if gpu_per_task is not None and float(gpu_per_task) > 1.0:
get_logger(self.__class__.__name__).warning(
"The configuration assigns %s GPUs to each Spark task, but each "
"XGBoost training task only utilizes 1 GPU, which will lead to "
"unnecessary GPU waste",
gpu_per_task,
)
# For 3.5.1+, Spark supports task stage-level scheduling for
# Yarn/K8s/Standalone/Local cluster.
# From 3.4.0 to 3.5.0, Spark only supports task stage-level scheduling for
# Standalone/Local cluster.
# For Spark below 3.4.0, task stage-level scheduling is not supported.
#
# With stage-level scheduling, spark.task.resource.gpu.amount does not need
# to be set explicitly. Otherwise, spark.task.resource.gpu.amount is required
# and must be set to 1.0.
if spark_version < "3.4.0" or (
"3.4.0" <= spark_version < "3.5.1"
and not _is_standalone_or_localcluster(conf)
):
# We will enable stage-level scheduling in spark 3.4.0+ which doesn't
# require spark.task.resource.gpu.amount to be set explicitly
gpu_per_task = sc.getConf().get("spark.task.resource.gpu.amount")
if gpu_per_task is not None:
if float(gpu_per_task) < 1.0:
raise ValueError(
"XGBoost doesn't support GPU fractional configurations. "
"Please set `spark.task.resource.gpu.amount=spark.executor"
".resource.gpu.amount`"
)
if float(gpu_per_task) > 1.0:
get_logger(self.__class__.__name__).warning(
"%s GPUs for each Spark task is configured, but each "
"XGBoost training task uses only 1 GPU.",
gpu_per_task,
"XGBoost doesn't support GPU fractional configurations. Please set "
"`spark.task.resource.gpu.amount=spark.executor.resource.gpu."
"amount`. To enable GPU fractional configurations, you can try "
"standalone/localcluster with spark 3.4.0+ and"
"YARN/K8S with spark 3.5.1+"
)
else:
raise ValueError(
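The version gate above reduces to a small predicate. A hedged sketch (the function name is hypothetical; the lexicographic string comparison mirrors the diff's own convention):

def needs_explicit_gpu_amount(spark_version: str, standalone_or_localcluster: bool) -> bool:
    # Stage-level scheduling availability, per the comment in the diff:
    #   < 3.4.0        -> never; spark.task.resource.gpu.amount must be 1.0
    #   3.4.0 - 3.5.0  -> Standalone / Local cluster only
    #   >= 3.5.1       -> also Yarn / K8s
    if spark_version < "3.4.0":
        return True
    return spark_version < "3.5.1" and not standalone_or_localcluster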
@@ -475,7 +482,9 @@ class _SparkXGBParams(
"`pyspark.ml.linalg.Vector` type."
)
self._validate_gpu_params()
ss = _get_spark_session()
sc = ss.sparkContext
self._validate_gpu_params(ss.version, sc.getConf(), _is_local(sc))
def _run_on_gpu(self) -> bool:
"""If train or transform on the gpu according to the parameters"""
@@ -925,10 +934,14 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
)
return True
if not _is_standalone_or_localcluster(conf):
if (
"3.4.0" <= spark_version < "3.5.1"
and not _is_standalone_or_localcluster(conf)
):
self.logger.info(
"Stage-level scheduling in xgboost requires spark standalone or "
"local-cluster mode"
"For %s, Stage-level scheduling in xgboost requires spark standalone "
"or local-cluster mode",
spark_version,
)
return True
@@ -980,7 +993,9 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
"""Try to enable stage-level scheduling"""
ss = _get_spark_session()
conf = ss.sparkContext.getConf()
if self._skip_stage_level_scheduling(ss.version, conf):
if _is_local(ss.sparkContext) or self._skip_stage_level_scheduling(
ss.version, conf
):
return rdd
# executor_cores will not be None
@@ -1052,6 +1067,7 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
dev_ordinal = None
use_qdm = _can_use_qdm(booster_params.get("tree_method", None))
verbosity = booster_params.get("verbosity", 1)
msg = "Training on CPUs"
if run_on_gpu:
dev_ordinal = (
@@ -1089,15 +1105,16 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
evals_result: Dict[str, Any] = {}
with CommunicatorContext(context, **_rabit_args):
dtrain, dvalid = create_dmatrix_from_partitions(
pandas_df_iter,
feature_prop.features_cols_names,
dev_ordinal,
use_qdm,
dmatrix_kwargs,
enable_sparse_data_optim=feature_prop.enable_sparse_data_optim,
has_validation_col=feature_prop.has_validation_col,
)
with xgboost.config_context(verbosity=verbosity):
dtrain, dvalid = create_dmatrix_from_partitions(
pandas_df_iter,
feature_prop.features_cols_names,
dev_ordinal,
use_qdm,
dmatrix_kwargs,
enable_sparse_data_optim=feature_prop.enable_sparse_data_optim,
has_validation_col=feature_prop.has_validation_col,
)
if dvalid is not None:
dval = [(dtrain, "training"), (dvalid, "validation")]
else:

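xgboost.config_context is the public API used above; a minimal self-contained sketch of the scoping behavior the diff relies on (the data is illustrative):

import numpy as np
import xgboost

X, y = np.random.rand(16, 4), np.random.rand(16)
# Settings hold only inside the block, so DMatrix construction and training
# inherit the requested verbosity, then the global config is restored.
with xgboost.config_context(verbosity=0):
    dtrain = xgboost.DMatrix(X, label=y)
    booster = xgboost.train({"objective": "reg:squarederror"}, dtrain, num_boost_round=2)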
View File

@@ -14,7 +14,8 @@ import pyspark
from pyspark import BarrierTaskContext, SparkConf, SparkContext, SparkFiles, TaskContext
from pyspark.sql.session import SparkSession
from xgboost import Booster, XGBModel, collective
from xgboost import Booster, XGBModel
from xgboost.collective import CommunicatorContext as CCtx
from xgboost.tracker import RabitTracker
@@ -42,22 +43,12 @@ def _get_default_params_from_func(
return filtered_params_dict
class CommunicatorContext:
"""A context controlling collective communicator initialization and finalization.
This isn't specifically necessary (see Part 3), but it makes the code
easier to follow.
"""
class CommunicatorContext(CCtx): # pylint: disable=too-few-public-methods
"""Context with PySpark specific task ID."""
def __init__(self, context: BarrierTaskContext, **args: Any) -> None:
self.args = args
self.args["DMLC_TASK_ID"] = str(context.partitionId())
def __enter__(self) -> None:
collective.init(**self.args)
def __exit__(self, *args: Any) -> None:
collective.finalize()
args["DMLC_TASK_ID"] = str(context.partitionId())
super().__init__(**args)
def _start_tracker(context: BarrierTaskContext, n_workers: int) -> Dict[str, Any]:

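With __enter__/__exit__ now inherited from xgboost.collective.CommunicatorContext, the subclass only injects the Spark partition ID, so call sites are unchanged. A schematic fragment, assuming the hypothetical train_fn runs inside a barrier stage and rabit_args come from the driver-side tracker:

import xgboost
from pyspark import BarrierTaskContext

def train_fn(pandas_df_iter, params, rabit_args):
    context = BarrierTaskContext.get()
    # Entering initializes the collective communicator, exiting finalizes it,
    # exactly what the removed __enter__/__exit__ pair did by hand.
    with CommunicatorContext(context, **rabit_args):
        dtrain = ...  # build the DMatrix from pandas_df_iter (elided)
        return xgboost.train(params, dtrain)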
View File

@@ -429,8 +429,8 @@ def make_categorical(
categories = np.arange(0, n_categories)
for col in df.columns:
if rng.binomial(1, cat_ratio, size=1)[0] == 1:
df.loc[:, col] = df[col].astype("category")
df.loc[:, col] = df[col].cat.set_categories(categories)
df[col] = df[col].astype("category")
df[col] = df[col].cat.set_categories(categories)
if sparsity > 0.0:
for i in range(n_features):
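The switch from .loc[:, col] to plain column assignment matters for dtype: .loc writes values into the existing column and keeps its old dtype, while df[col] = ... replaces the column wholesale. A small self-contained illustration:

import numpy as np
import pandas as pd

df = pd.DataFrame({"f0": [0, 1, 2, 1]})
df["f0"] = df["f0"].astype("category")  # replaces the column: dtype sticks
df["f0"] = df["f0"].cat.set_categories(np.arange(0, 4))
print(df["f0"].dtype)  # category, with categories [0, 1, 2, 3]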