From 8fb05c8c957817e730d311bc5f2a01dd3c5ea47c Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Sat, 20 Apr 2024 00:24:40 +0800
Subject: [PATCH] [pyspark] support stage-level for yarn/k8s (#10209)

---
 python-package/xgboost/spark/core.py          |  73 ++++---
 .../test_with_spark/test_spark_local.py       | 206 ++++++++++++++----
 2 files changed, 213 insertions(+), 66 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 741adcb03..2f24effe5 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -347,15 +347,14 @@ class _SparkXGBParams(
         predict_params[param.name] = self.getOrDefault(param)
         return predict_params
 
-    def _validate_gpu_params(self) -> None:
+    def _validate_gpu_params(
+        self, spark_version: str, conf: SparkConf, is_local: bool = False
+    ) -> None:
         """Validate the gpu parameters and gpu configurations"""
 
         if self._run_on_gpu():
-            ss = _get_spark_session()
-            sc = ss.sparkContext
-
-            if _is_local(sc):
-                # Support GPU training in Spark local mode is just for debugging
+            if is_local:
+                # Supporting GPU training in Spark local mode is just for debugging
                 # purposes, so it's okay for printing the below warning instead of
                 # checking the real gpu numbers and raising the exception.
                 get_logger(self.__class__.__name__).warning(
@@ -364,33 +363,41 @@ class _SparkXGBParams(
                     self.getOrDefault(self.num_workers),
                 )
             else:
-                executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
+                executor_gpus = conf.get("spark.executor.resource.gpu.amount")
                 if executor_gpus is None:
                     raise ValueError(
                         "The `spark.executor.resource.gpu.amount` is required for training"
                         " on GPU."
                     )
-
-                if not (
-                    ss.version >= "3.4.0"
-                    and _is_standalone_or_localcluster(sc.getConf())
+                gpu_per_task = conf.get("spark.task.resource.gpu.amount")
+                if gpu_per_task is not None and float(gpu_per_task) > 1.0:
+                    get_logger(self.__class__.__name__).warning(
+                        "The configuration assigns %s GPUs to each Spark task, but each "
+                        "XGBoost training task only utilizes 1 GPU, which will lead to "
+                        "unnecessary GPU waste",
+                        gpu_per_task,
+                    )
+                # For 3.5.1+, Spark supports task stage-level scheduling for
+                # Yarn/K8s/Standalone/Local cluster.
+                # From 3.4.0 ~ 3.5.0, Spark only supports task stage-level scheduling
+                # for Standalone/Local cluster.
+                # For spark below 3.4.0, task stage-level scheduling is not supported.
+                #
+                # With stage-level scheduling, spark.task.resource.gpu.amount does not
+                # need to be set explicitly. Otherwise, spark.task.resource.gpu.amount
+                # is required and must be set to 1.0.
+                if spark_version < "3.4.0" or (
+                    "3.4.0" <= spark_version < "3.5.1"
+                    and not _is_standalone_or_localcluster(conf)
                 ):
-                    # We will enable stage-level scheduling in spark 3.4.0+ which doesn't
-                    # require spark.task.resource.gpu.amount to be set explicitly
-                    gpu_per_task = sc.getConf().get("spark.task.resource.gpu.amount")
                     if gpu_per_task is not None:
                         if float(gpu_per_task) < 1.0:
                             raise ValueError(
-                                "XGBoost doesn't support GPU fractional configurations. "
-                                "Please set `spark.task.resource.gpu.amount=spark.executor"
-                                ".resource.gpu.amount`"
-                            )
-
-                        if float(gpu_per_task) > 1.0:
-                            get_logger(self.__class__.__name__).warning(
-                                "%s GPUs for each Spark task is configured, but each "
-                                "XGBoost training task uses only 1 GPU.",
-                                gpu_per_task,
+                                "XGBoost doesn't support GPU fractional configurations. Please set "
+                                "`spark.task.resource.gpu.amount=spark.executor.resource.gpu."
+                                "amount`. To enable GPU fractional configurations, you can try "
+                                "standalone/localcluster with spark 3.4.0+ and "
+                                "YARN/K8S with spark 3.5.1+"
                             )
                         else:
                             raise ValueError(
@@ -475,7 +482,9 @@ class _SparkXGBParams(
                 "`pyspark.ml.linalg.Vector` type."
             )
 
-        self._validate_gpu_params()
+        ss = _get_spark_session()
+        sc = ss.sparkContext
+        self._validate_gpu_params(ss.version, sc.getConf(), _is_local(sc))
 
     def _run_on_gpu(self) -> bool:
         """If train or transform on the gpu according to the parameters"""
@@ -925,10 +934,14 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
             )
             return True
 
-        if not _is_standalone_or_localcluster(conf):
+        if (
+            "3.4.0" <= spark_version < "3.5.1"
+            and not _is_standalone_or_localcluster(conf)
+        ):
             self.logger.info(
-                "Stage-level scheduling in xgboost requires spark standalone or "
-                "local-cluster mode"
+                "For %s, Stage-level scheduling in xgboost requires spark standalone "
+                "or local-cluster mode",
+                spark_version,
             )
             return True
 
@@ -980,7 +993,9 @@ class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
         """Try to enable stage-level scheduling"""
         ss = _get_spark_session()
         conf = ss.sparkContext.getConf()
-        if self._skip_stage_level_scheduling(ss.version, conf):
+        if _is_local(ss.sparkContext) or self._skip_stage_level_scheduling(
+            ss.version, conf
+        ):
             return rdd
 
         # executor_cores will not be None
diff --git a/tests/test_distributed/test_with_spark/test_spark_local.py b/tests/test_distributed/test_with_spark/test_spark_local.py
index b8c16ef1c..ab983c920 100644
--- a/tests/test_distributed/test_with_spark/test_spark_local.py
+++ b/tests/test_distributed/test_with_spark/test_spark_local.py
@@ -929,8 +929,127 @@ class TestPySparkLocal:
         model_loaded.set_device("cuda")
         assert model_loaded._run_on_gpu()
 
+    def test_validate_gpu_params(self) -> None:
+        # Standalone
+        standalone_conf = (
+            SparkConf()
+            .setMaster("spark://foo")
+            .set("spark.executor.cores", "12")
+            .set("spark.task.cpus", "1")
+            .set("spark.executor.resource.gpu.amount", "1")
+            .set("spark.task.resource.gpu.amount", "0.08")
+        )
+        classifer_on_cpu = SparkXGBClassifier(use_gpu=False)
+        classifer_on_gpu = SparkXGBClassifier(use_gpu=True)
+
+        # No exception for classifier on CPU
+        classifer_on_cpu._validate_gpu_params("3.4.0", standalone_conf)
+
+        with pytest.raises(
+            ValueError, match="XGBoost doesn't support GPU fractional configurations"
+        ):
+            classifer_on_gpu._validate_gpu_params("3.3.0", standalone_conf)
+
+        # No issues
+        classifer_on_gpu._validate_gpu_params("3.4.0", standalone_conf)
+        classifer_on_gpu._validate_gpu_params("3.4.1", standalone_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.0", standalone_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.1", standalone_conf)
+
+        # no spark.executor.resource.gpu.amount
+        standalone_bad_conf = (
+            SparkConf()
+            .setMaster("spark://foo")
+            .set("spark.executor.cores", "12")
+            .set("spark.task.cpus", "1")
+            .set("spark.task.resource.gpu.amount", "0.08")
+        )
+        msg_match = (
+            "The `spark.executor.resource.gpu.amount` is required for training on GPU"
+        )
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.3.0", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.4.0", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.4.1", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.5.0", standalone_bad_conf)
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.5.1", standalone_bad_conf)
+
+        standalone_bad_conf = (
+            SparkConf()
+            .setMaster("spark://foo")
+            .set("spark.executor.cores", "12")
+            .set("spark.task.cpus", "1")
+            .set("spark.executor.resource.gpu.amount", "1")
+        )
+        msg_match = (
+            "The `spark.task.resource.gpu.amount` is required for training on GPU"
+        )
+        with pytest.raises(ValueError, match=msg_match):
+            classifer_on_gpu._validate_gpu_params("3.3.0", standalone_bad_conf)
+
+        classifer_on_gpu._validate_gpu_params("3.4.0", standalone_bad_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.0", standalone_bad_conf)
+        classifer_on_gpu._validate_gpu_params("3.5.1", standalone_bad_conf)
+
+        # Yarn and K8s mode
+        for mode in ["yarn", "k8s://"]:
+            conf = (
+                SparkConf()
+                .setMaster(mode)
+                .set("spark.executor.cores", "12")
+                .set("spark.task.cpus", "1")
+                .set("spark.executor.resource.gpu.amount", "1")
+                .set("spark.task.resource.gpu.amount", "0.08")
+            )
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.3.0", conf)
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.4.0", conf)
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.4.1", conf)
+            with pytest.raises(
+                ValueError,
+                match="XGBoost doesn't support GPU fractional configurations",
+            ):
+                classifer_on_gpu._validate_gpu_params("3.5.0", conf)
+
+            classifer_on_gpu._validate_gpu_params("3.5.1", conf)
+
+        for mode in ["yarn", "k8s://"]:
+            bad_conf = (
+                SparkConf()
+                .setMaster(mode)
+                .set("spark.executor.cores", "12")
+                .set("spark.task.cpus", "1")
+                .set("spark.executor.resource.gpu.amount", "1")
+            )
+            msg_match = (
+                "The `spark.task.resource.gpu.amount` is required for training on GPU"
+            )
+            with pytest.raises(ValueError, match=msg_match):
+                classifer_on_gpu._validate_gpu_params("3.3.0", bad_conf)
+            with pytest.raises(ValueError, match=msg_match):
+                classifer_on_gpu._validate_gpu_params("3.4.0", bad_conf)
+            with pytest.raises(ValueError, match=msg_match):
+                classifer_on_gpu._validate_gpu_params("3.5.0", bad_conf)
+
+            classifer_on_gpu._validate_gpu_params("3.5.1", bad_conf)
+
     def test_skip_stage_level_scheduling(self) -> None:
-        conf = (
+        standalone_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
@@ -943,26 +1062,36 @@ class TestPySparkLocal:
         classifer_on_gpu = SparkXGBClassifier(use_gpu=True)
 
         # the correct configurations should not skip stage-level scheduling
-        assert not classifer_on_gpu._skip_stage_level_scheduling("3.4.0", conf)
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.4.0", standalone_conf
+        )
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.4.1", standalone_conf
+        )
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.5.0", standalone_conf
+        )
+        assert not classifer_on_gpu._skip_stage_level_scheduling(
+            "3.5.1", standalone_conf
+        )
 
         # spark version < 3.4.0
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.3.0", conf)
-
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.3.0", standalone_conf)
         # not run on GPU
-        assert classifer_on_cpu._skip_stage_level_scheduling("3.4.0", conf)
+        assert classifer_on_cpu._skip_stage_level_scheduling("3.4.0", standalone_conf)
 
         # spark.executor.cores is not set
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.executor.cores=1
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "1")
@@ -970,20 +1099,20 @@ class TestPySparkLocal:
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.executor.resource.gpu.amount is not set
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.executor.resource.gpu.amount>1
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "2")
             .set("spark.task.resource.gpu.amount", "0.08")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.task.resource.gpu.amount is not set
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
         )
-        assert not classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert not classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
         # spark.task.resource.gpu.amount=1
-        badConf = (
+        bad_conf = (
             SparkConf()
             .setMaster("spark://foo")
             .set("spark.executor.cores", "12")
@@ -1012,29 +1141,32 @@ class TestPySparkLocal:
             .set("spark.task.cpus", "1")
             .set("spark.executor.resource.gpu.amount", "1")
             .set("spark.task.resource.gpu.amount", "1")
         )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", bad_conf)
 
-        # yarn
-        badConf = (
-            SparkConf()
-            .setMaster("yarn")
-            .set("spark.executor.cores", "12")
-            .set("spark.task.cpus", "1")
-            .set("spark.executor.resource.gpu.amount", "1")
-            .set("spark.task.resource.gpu.amount", "1")
-        )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+        # For Yarn and K8S
+        for mode in ["yarn", "k8s://"]:
+            for gpu_amount in ["0.08", "0.2", "1.0"]:
+                conf = (
+                    SparkConf()
+                    .setMaster(mode)
+                    .set("spark.executor.cores", "12")
+                    .set("spark.task.cpus", "1")
+                    .set("spark.executor.resource.gpu.amount", "1")
+                    .set("spark.task.resource.gpu.amount", gpu_amount)
+                )
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.3.0", conf)
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", conf)
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.4.1", conf)
+                assert classifer_on_gpu._skip_stage_level_scheduling("3.5.0", conf)
 
-        # k8s
-        badConf = (
-            SparkConf()
-            .setMaster("k8s://")
-            .set("spark.executor.cores", "12")
-            .set("spark.task.cpus", "1")
-            .set("spark.executor.resource.gpu.amount", "1")
-            .set("spark.task.resource.gpu.amount", "1")
-        )
-        assert classifer_on_gpu._skip_stage_level_scheduling("3.4.0", badConf)
+
+                # This will be fixed when spark 4.0.0 is released.
+                if gpu_amount == "1.0":
+                    assert classifer_on_gpu._skip_stage_level_scheduling("3.5.1", conf)
+                else:
+                    # Starting from 3.5.1, stage-level scheduling works for Yarn and K8s
+                    assert not classifer_on_gpu._skip_stage_level_scheduling(
+                        "3.5.1", conf
+                    )
 
 class XgboostLocalTest(SparkTestCase):
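
The sketch below is added for illustration and is not part of the patch. It shows the kind of job configuration this change validates: on yarn/k8s with Spark 3.5.1+, spark.task.resource.gpu.amount may be fractional (or unset) because stage-level scheduling lets XGBoost request one whole GPU per training task, while on Spark 3.4.0 ~ 3.5.0 the same setup is only accepted on standalone/local-cluster masters. The session settings mirror the confs used in the tests above; df_train and its columns are hypothetical.

from pyspark.sql import SparkSession

from xgboost.spark import SparkXGBClassifier

# One whole GPU per executor, a fractional GPU per task: with Spark 3.5.1+
# on yarn/k8s this passes _validate_gpu_params, and XGBoost then relies on
# stage-level scheduling to claim a full GPU for each training task.
spark = (
    SparkSession.builder.master("yarn")
    .config("spark.executor.cores", "12")
    .config("spark.task.cpus", "1")
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "0.08")
    .getOrCreate()
)

# use_gpu=True matches the estimators constructed in the tests above.
classifier = SparkXGBClassifier(use_gpu=True, num_workers=2)

# Hypothetical training DataFrame with "features"/"label" columns:
# model = classifier.fit(df_train)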