Skip to content

Commit

Permalink
Fix streaming dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
SajidAlamQB committed Aug 16, 2023
1 parent 7e06695 commit a6867c7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,11 @@ def _exists(self) -> bool:
load_path, self._file_format
)
except AnalysisException as exception:
if (
exception.desc.startswith("Path does not exist:")
or "is not a Streaming data" in exception.desc
):
# `AnalysisException.desc` is deprecated with pyspark >= 3.4
message = (
exception.desc if hasattr(exception, "desc") else exception.message
)
if "Path does not exist:" in message or "is not a Streaming data" in message:
return False
raise
return True
1 change: 0 additions & 1 deletion kedro-datasets/tests/spark/test_deltatable_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ def test_exists_raises_error(self, mocker):
with pytest.raises(DataSetError, match="Other Exception"):
delta_ds.exists()


@pytest.mark.parametrize("is_async", [False, True])
def test_parallel_runner(self, is_async):
"""Test ParallelRunner with SparkDataSet fails."""
Expand Down
15 changes: 10 additions & 5 deletions kedro-datasets/tests/spark/test_spark_streaming_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,16 @@ def test_exists_raises_error(self, mocker):
# exists should raise all errors except for
# AnalysisExceptions clearly indicating a missing file
spark_data_set = SparkStreamingDataSet(filepath="")
mocker.patch.object(
spark_data_set,
"_get_spark",
side_effect=AnalysisException("Other Exception", []),
)

if SPARK_VERSION.match(">=3.4.0"):
mocker.patch.object(
spark_data_set, "_get_spark", side_effect=AnalysisException("Other Exception")
)
else:
mocker.patch.object(
spark_data_set,
"_get_spark",
side_effect=AnalysisException("Other Exception", []),
)
with pytest.raises(DataSetError, match="Other Exception"):
spark_data_set.exists()

0 comments on commit a6867c7

Please sign in to comment.