diff --git a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
index d2609748f..a28465f8b 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_streaming_dataset.py
@@ -149,10 +149,11 @@ def _exists(self) -> bool:
                 load_path, self._file_format
             )
         except AnalysisException as exception:
-            if (
-                exception.desc.startswith("Path does not exist:")
-                or "is not a Streaming data" in exception.desc
-            ):
+            # `AnalysisException.desc` is deprecated with pyspark >= 3.4
+            message = (
+                exception.desc if hasattr(exception, "desc") else exception.message
+            )
+            if "Path does not exist:" in message or "is not a Streaming data" in message:
                 return False
             raise
         return True
diff --git a/kedro-datasets/tests/spark/test_deltatable_dataset.py b/kedro-datasets/tests/spark/test_deltatable_dataset.py
index 8a79a04ab..e1de445e1 100644
--- a/kedro-datasets/tests/spark/test_deltatable_dataset.py
+++ b/kedro-datasets/tests/spark/test_deltatable_dataset.py
@@ -82,7 +82,6 @@ def test_exists_raises_error(self, mocker):
         with pytest.raises(DataSetError, match="Other Exception"):
             delta_ds.exists()
 
-
     @pytest.mark.parametrize("is_async", [False, True])
     def test_parallel_runner(self, is_async):
         """Test ParallelRunner with SparkDataSet fails."""
diff --git a/kedro-datasets/tests/spark/test_spark_streaming_dataset.py b/kedro-datasets/tests/spark/test_spark_streaming_dataset.py
index c4fb6c005..660e43a82 100644
--- a/kedro-datasets/tests/spark/test_spark_streaming_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_streaming_dataset.py
@@ -168,11 +168,16 @@ def test_exists_raises_error(self, mocker):
         # exists should raise all errors except for
         # AnalysisExceptions clearly indicating a missing file
         spark_data_set = SparkStreamingDataSet(filepath="")
-        mocker.patch.object(
-            spark_data_set,
-            "_get_spark",
-            side_effect=AnalysisException("Other Exception", []),
-        )
+        if SPARK_VERSION.match(">=3.4.0"):
+            mocker.patch.object(
+                spark_data_set, "_get_spark", side_effect=AnalysisException("Other Exception")
+            )
+        else:
+            mocker.patch.object(
+                spark_data_set,
+                "_get_spark",
+                side_effect=AnalysisException("Other Exception", []),
+            )
         with pytest.raises(DataSetError, match="Other Exception"):
             spark_data_set.exists()
 