From b97331de027abacb516d73ed759bb9a52dadbff5 Mon Sep 17 00:00:00 2001 From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> Date: Tue, 17 Oct 2023 15:19:43 +0100 Subject: [PATCH 01/12] remove partitioned and incremental Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> --- docs/source/kedro.io.rst | 2 - kedro/io/__init__.py | 6 - kedro/io/partitioned_dataset.py | 552 -------------------------- tests/io/test_data_catalog.py | 5 +- tests/io/test_incremental_dataset.py | 505 ------------------------ tests/io/test_partitioned_dataset.py | 553 --------------------------- 6 files changed, 3 insertions(+), 1620 deletions(-) delete mode 100644 kedro/io/partitioned_dataset.py delete mode 100644 tests/io/test_incremental_dataset.py delete mode 100644 tests/io/test_partitioned_dataset.py diff --git a/docs/source/kedro.io.rst b/docs/source/kedro.io.rst index e86af85047..4d9b59138e 100644 --- a/docs/source/kedro.io.rst +++ b/docs/source/kedro.io.rst @@ -15,10 +15,8 @@ kedro.io kedro.io.AbstractVersionedDataset kedro.io.CachedDataset kedro.io.DataCatalog - kedro.io.IncrementalDataset kedro.io.LambdaDataset kedro.io.MemoryDataset - kedro.io.PartitionedDataset kedro.io.Version .. rubric:: Exceptions diff --git a/kedro/io/__init__.py b/kedro/io/__init__.py index ad1fc1f99f..32b7b86fcb 100644 --- a/kedro/io/__init__.py +++ b/kedro/io/__init__.py @@ -15,10 +15,6 @@ from .data_catalog import DataCatalog from .lambda_dataset import LambdaDataset from .memory_dataset import MemoryDataset -from .partitioned_dataset import ( - IncrementalDataset, - PartitionedDataset, -) __all__ = [ "AbstractDataset", @@ -28,9 +24,7 @@ "DatasetAlreadyExistsError", "DatasetError", "DatasetNotFoundError", - "IncrementalDataset", "LambdaDataset", "MemoryDataset", - "PartitionedDataset", "Version", ] diff --git a/kedro/io/partitioned_dataset.py b/kedro/io/partitioned_dataset.py deleted file mode 100644 index 08a0a8569b..0000000000 --- a/kedro/io/partitioned_dataset.py +++ /dev/null @@ -1,552 +0,0 @@ -"""``PartitionedDataset`` loads and saves partitioned file-like data using the -underlying dataset definition. It also uses `fsspec` for filesystem level operations. -""" -from __future__ import annotations - -import operator -import warnings -from copy import deepcopy -from typing import Any, Callable -from urllib.parse import urlparse - -from cachetools import Cache, cachedmethod - -from kedro.io.core import ( - VERSION_KEY, - VERSIONED_FLAG_KEY, - AbstractDataset, - DatasetError, - parse_dataset_definition, -) -from kedro.io.data_catalog import CREDENTIALS_KEY -from kedro.utils import load_obj - -DATASET_CREDENTIALS_KEY = "dataset_credentials" -CHECKPOINT_CREDENTIALS_KEY = "checkpoint_credentials" - -KEY_PROPAGATION_WARNING = ( - "Top-level %(keys)s will not propagate into the %(target)s since " - "%(keys)s were explicitly defined in the %(target)s config." -) - -S3_PROTOCOLS = ("s3", "s3a", "s3n") - - -class PartitionedDataset(AbstractDataset): - # noqa: too-many-instance-attributes,protected-access - """``PartitionedDataset`` loads and saves partitioned file-like data using the - underlying dataset definition. For filesystem level operations it uses `fsspec`: - https://github.com/intake/filesystem_spec. - - It also supports advanced features like - `lazy saving `_. - - Example usage for the - `YAML API `_: - - - .. 
code-block:: yaml - - station_data: - type: PartitionedDataset - path: data/03_primary/station_data - dataset: - type: pandas.CSVDataset - load_args: - sep: '\\t' - save_args: - sep: '\\t' - index: true - filename_suffix: '.dat' - - Example usage for the - `Python API `_: - :: - - >>> import pandas as pd - >>> from kedro.io import PartitionedDataset - >>> - >>> # Create a fake pandas dataframe with 10 rows of data - >>> df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)]) - >>> - >>> # Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key - >>> dict_df = { - day_of_month: df[df["DAY_OF_MONTH"] == day_of_month] - for day_of_month in df["DAY_OF_MONTH"] - } - >>> - >>> # Save it as small paritions with DAY_OF_MONTH as the partition key - >>> dataset = PartitionedDataset( - path="df_with_partition", - dataset="pandas.CSVDataset", - filename_suffix=".csv" - ) - >>> # This will create a folder `df_with_partition` and save multiple files - >>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc. - >>> dataset.save(dict_df) - >>> - >>> # This will create lazy load functions instead of loading data into memory immediately. - >>> loaded = dataset.load() - >>> - >>> # Load all the partitions - >>> for partition_id, partition_load_func in loaded.items(): - # The actual function that loads the data - partition_data = partition_load_func() - >>> - >>> # Add the processing logic for individual partition HERE - >>> print(partition_data) - - You can also load multiple partitions from a remote storage and combine them - like this: - :: - - >>> import pandas as pd - >>> from kedro.io import PartitionedDataset - >>> - >>> # these credentials will be passed to both 'fsspec.filesystem()' call - >>> # and the dataset initializer - >>> credentials = {"key1": "secret1", "key2": "secret2"} - >>> - >>> dataset = PartitionedDataset( - path="s3://bucket-name/path/to/folder", - dataset="pandas.CSVDataset", - credentials=credentials - ) - >>> loaded = dataset.load() - >>> # assert isinstance(loaded, dict) - >>> - >>> combine_all = pd.DataFrame() - >>> - >>> for partition_id, partition_load_func in loaded.items(): - partition_data = partition_load_func() - combine_all = pd.concat( - [combine_all, partition_data], ignore_index=True, sort=True - ) - >>> - >>> new_data = pd.DataFrame({"new": [1, 2]}) - >>> # creates "s3://bucket-name/path/to/folder/new/partition.csv" - >>> dataset.save({"new/partition.csv": new_data}) - - """ - - def __init__( # noqa: too-many-arguments - self, - path: str, - dataset: str | type[AbstractDataset] | dict[str, Any], - filepath_arg: str = "filepath", - filename_suffix: str = "", - credentials: dict[str, Any] = None, - load_args: dict[str, Any] = None, - fs_args: dict[str, Any] = None, - overwrite: bool = False, - metadata: dict[str, Any] = None, - ): - """Creates a new instance of ``PartitionedDataset``. - - Args: - path: Path to the folder containing partitioned data. - If path starts with the protocol (e.g., ``s3://``) then the - corresponding ``fsspec`` concrete filesystem implementation will - be used. If protocol is not specified, - ``fsspec.implementations.local.LocalFileSystem`` will be used. - **Note:** Some concrete implementations are bundled with ``fsspec``, - while others (like ``s3`` or ``gcs``) must be installed separately - prior to usage of the ``PartitionedDataset``. - dataset: Underlying dataset definition. This is used to instantiate - the dataset for each file located inside the ``path``. 
- Accepted formats are: - a) object of a class that inherits from ``AbstractDataset`` - b) a string representing a fully qualified class name to such class - c) a dictionary with ``type`` key pointing to a string from b), - other keys are passed to the Dataset initializer. - Credentials for the dataset can be explicitly specified in - this configuration. - filepath_arg: Underlying dataset initializer argument that will - contain a path to each corresponding partition file. - If unspecified, defaults to "filepath". - filename_suffix: If specified, only partitions that end with this - string will be processed. - credentials: Protocol-specific options that will be passed to - ``fsspec.filesystem`` - https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem - and the dataset initializer. If the dataset config contains - explicit credentials spec, then such spec will take precedence. - All possible credentials management scenarios are documented here: - https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#partitioned-dataset-credentials - load_args: Keyword arguments to be passed into ``find()`` method of - the filesystem implementation. - fs_args: Extra arguments to pass into underlying filesystem class constructor - (e.g. `{"project": "my-project"}` for ``GCSFileSystem``) - overwrite: If True, any existing partitions will be removed. - metadata: Any arbitrary metadata. - This is ignored by Kedro, but may be consumed by users or external plugins. - - Raises: - DatasetError: If versioning is enabled for the underlying dataset. - """ - # noqa: import-outside-toplevel - from fsspec.utils import infer_storage_options # for performance reasons - - super().__init__() - - self._path = path - self._filename_suffix = filename_suffix - self._overwrite = overwrite - self._protocol = infer_storage_options(self._path)["protocol"] - self._partition_cache: Cache = Cache(maxsize=1) - self.metadata = metadata - - dataset = dataset if isinstance(dataset, dict) else {"type": dataset} - self._dataset_type, self._dataset_config = parse_dataset_definition(dataset) - if VERSION_KEY in self._dataset_config: - raise DatasetError( - f"'{self.__class__.__name__}' does not support versioning of the " - f"underlying dataset. Please remove '{VERSIONED_FLAG_KEY}' flag from " - f"the dataset definition." 
- ) - - if credentials: - if CREDENTIALS_KEY in self._dataset_config: - self._logger.warning( - KEY_PROPAGATION_WARNING, - {"keys": CREDENTIALS_KEY, "target": "underlying dataset"}, - ) - else: - self._dataset_config[CREDENTIALS_KEY] = deepcopy(credentials) - - self._credentials = deepcopy(credentials) or {} - - self._fs_args = deepcopy(fs_args) or {} - if self._fs_args: - if "fs_args" in self._dataset_config: - self._logger.warning( - KEY_PROPAGATION_WARNING, - {"keys": "filesystem arguments", "target": "underlying dataset"}, - ) - else: - self._dataset_config["fs_args"] = deepcopy(self._fs_args) - - self._filepath_arg = filepath_arg - if self._filepath_arg in self._dataset_config: - warnings.warn( - f"'{self._filepath_arg}' key must not be specified in the dataset " - f"definition as it will be overwritten by partition path" - ) - - self._load_args = deepcopy(load_args) or {} - self._sep = self._filesystem.sep - # since some filesystem implementations may implement a global cache - self._invalidate_caches() - - @property - def _filesystem(self): - # for performance reasons - import fsspec # noqa: import-outside-toplevel - - protocol = "s3" if self._protocol in S3_PROTOCOLS else self._protocol - return fsspec.filesystem(protocol, **self._credentials, **self._fs_args) - - @property - def _normalized_path(self) -> str: - if self._protocol in S3_PROTOCOLS: - return urlparse(self._path)._replace(scheme="s3").geturl() - return self._path - - @cachedmethod(cache=operator.attrgetter("_partition_cache")) - def _list_partitions(self) -> list[str]: - return [ - path - for path in self._filesystem.find(self._normalized_path, **self._load_args) - if path.endswith(self._filename_suffix) - ] - - def _join_protocol(self, path: str) -> str: - protocol_prefix = f"{self._protocol}://" - if self._path.startswith(protocol_prefix) and not path.startswith( - protocol_prefix - ): - return f"{protocol_prefix}{path}" - return path - - def _partition_to_path(self, path: str): - dir_path = self._path.rstrip(self._sep) - path = path.lstrip(self._sep) - full_path = self._sep.join([dir_path, path]) + self._filename_suffix - return full_path - - def _path_to_partition(self, path: str) -> str: - dir_path = self._filesystem._strip_protocol(self._normalized_path) - path = path.split(dir_path, 1).pop().lstrip(self._sep) - if self._filename_suffix and path.endswith(self._filename_suffix): - path = path[: -len(self._filename_suffix)] - return path - - def _load(self) -> dict[str, Callable[[], Any]]: - partitions = {} - - for partition in self._list_partitions(): - kwargs = deepcopy(self._dataset_config) - # join the protocol back since PySpark may rely on it - kwargs[self._filepath_arg] = self._join_protocol(partition) - dataset = self._dataset_type(**kwargs) # type: ignore - partition_id = self._path_to_partition(partition) - partitions[partition_id] = dataset.load - - if not partitions: - raise DatasetError(f"No partitions found in '{self._path}'") - - return partitions - - def _save(self, data: dict[str, Any]) -> None: - if self._overwrite and self._filesystem.exists(self._normalized_path): - self._filesystem.rm(self._normalized_path, recursive=True) - - for partition_id, partition_data in sorted(data.items()): - kwargs = deepcopy(self._dataset_config) - partition = self._partition_to_path(partition_id) - # join the protocol back since tools like PySpark may rely on it - kwargs[self._filepath_arg] = self._join_protocol(partition) - dataset = self._dataset_type(**kwargs) # type: ignore - if callable(partition_data): - 
partition_data = partition_data() # noqa: redefined-loop-name - dataset.save(partition_data) - self._invalidate_caches() - - def _describe(self) -> dict[str, Any]: - clean_dataset_config = ( - {k: v for k, v in self._dataset_config.items() if k != CREDENTIALS_KEY} - if isinstance(self._dataset_config, dict) - else self._dataset_config - ) - return { - "path": self._path, - "dataset_type": self._dataset_type.__name__, - "dataset_config": clean_dataset_config, - } - - def _invalidate_caches(self): - self._partition_cache.clear() - self._filesystem.invalidate_cache(self._normalized_path) - - def _exists(self) -> bool: - return bool(self._list_partitions()) - - def _release(self) -> None: - super()._release() - self._invalidate_caches() - - -class IncrementalDataset(PartitionedDataset): - """``IncrementalDataset`` inherits from ``PartitionedDataset``, which loads - and saves partitioned file-like data using the underlying dataset - definition. For filesystem level operations it uses `fsspec`: - https://github.com/intake/filesystem_spec. ``IncrementalDataset`` also stores - the information about the last processed partition in so-called `checkpoint` - that is persisted to the location of the data partitions by default, so that - subsequent pipeline run loads only new partitions past the checkpoint. - - Example: - :: - - >>> from kedro.io import IncrementalDataset - >>> - >>> # these credentials will be passed to: - >>> # a) 'fsspec.filesystem()' call, - >>> # b) the dataset initializer, - >>> # c) the checkpoint initializer - >>> credentials = {"key1": "secret1", "key2": "secret2"} - >>> - >>> dataset = IncrementalDataset( - >>> path="s3://bucket-name/path/to/folder", - >>> dataset="pandas.CSVDataset", - >>> credentials=credentials - >>> ) - >>> loaded = dataset.load() # loads all available partitions - >>> # assert isinstance(loaded, dict) - >>> - >>> dataset.confirm() # update checkpoint value to the last processed partition ID - >>> reloaded = dataset.load() # still loads all available partitions - >>> - >>> dataset.release() # clears load cache - >>> # returns an empty dictionary as no new partitions were added - >>> dataset.load() - """ - - DEFAULT_CHECKPOINT_TYPE = "kedro_datasets.text.TextDataset" - DEFAULT_CHECKPOINT_FILENAME = "CHECKPOINT" - - def __init__( # noqa: too-many-arguments - self, - path: str, - dataset: str | type[AbstractDataset] | dict[str, Any], - checkpoint: str | dict[str, Any] | None = None, - filepath_arg: str = "filepath", - filename_suffix: str = "", - credentials: dict[str, Any] = None, - load_args: dict[str, Any] = None, - fs_args: dict[str, Any] = None, - metadata: dict[str, Any] = None, - ): - - """Creates a new instance of ``IncrementalDataset``. - - Args: - path: Path to the folder containing partitioned data. - If path starts with the protocol (e.g., ``s3://``) then the - corresponding ``fsspec`` concrete filesystem implementation will - be used. If protocol is not specified, - ``fsspec.implementations.local.LocalFileSystem`` will be used. - **Note:** Some concrete implementations are bundled with ``fsspec``, - while others (like ``s3`` or ``gcs``) must be installed separately - prior to usage of the ``PartitionedDataset``. - dataset: Underlying dataset definition. This is used to instantiate - the dataset for each file located inside the ``path``. 
- Accepted formats are: - a) object of a class that inherits from ``AbstractDataset`` - b) a string representing a fully qualified class name to such class - c) a dictionary with ``type`` key pointing to a string from b), - other keys are passed to the Dataset initializer. - Credentials for the dataset can be explicitly specified in - this configuration. - checkpoint: Optional checkpoint configuration. Accepts a dictionary - with the corresponding dataset definition including ``filepath`` - (unlike ``dataset`` argument). Checkpoint configuration is - described here: - https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#checkpoint-configuration - Credentials for the checkpoint can be explicitly specified - in this configuration. - filepath_arg: Underlying dataset initializer argument that will - contain a path to each corresponding partition file. - If unspecified, defaults to "filepath". - filename_suffix: If specified, only partitions that end with this - string will be processed. - credentials: Protocol-specific options that will be passed to - ``fsspec.filesystem`` - https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem, - the dataset dataset initializer and the checkpoint. If - the dataset or the checkpoint configuration contains explicit - credentials spec, then such spec will take precedence. - All possible credentials management scenarios are documented here: - https://docs.kedro.org/en/stable/data/partitioned_and_incremental_datasets.html#checkpoint-configuration - load_args: Keyword arguments to be passed into ``find()`` method of - the filesystem implementation. - fs_args: Extra arguments to pass into underlying filesystem class constructor - (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). - metadata: Any arbitrary metadata. - This is ignored by Kedro, but may be consumed by users or external plugins. - - Raises: - DatasetError: If versioning is enabled for the underlying dataset. - """ - - super().__init__( - path=path, - dataset=dataset, - filepath_arg=filepath_arg, - filename_suffix=filename_suffix, - credentials=credentials, - load_args=load_args, - fs_args=fs_args, - ) - - self._checkpoint_config = self._parse_checkpoint_config(checkpoint) - self._force_checkpoint = self._checkpoint_config.pop("force_checkpoint", None) - self.metadata = metadata - - comparison_func = self._checkpoint_config.pop("comparison_func", operator.gt) - if isinstance(comparison_func, str): - comparison_func = load_obj(comparison_func) - self._comparison_func = comparison_func - - def _parse_checkpoint_config( - self, checkpoint_config: str | dict[str, Any] | None - ) -> dict[str, Any]: - checkpoint_config = deepcopy(checkpoint_config) - if isinstance(checkpoint_config, str): - checkpoint_config = {"force_checkpoint": checkpoint_config} - checkpoint_config = checkpoint_config or {} - - for key in {VERSION_KEY, VERSIONED_FLAG_KEY} & checkpoint_config.keys(): - raise DatasetError( - f"'{self.__class__.__name__}' does not support versioning of the " - f"checkpoint. Please remove '{key}' key from the checkpoint definition." 
- ) - - default_checkpoint_path = self._sep.join( - [self._normalized_path.rstrip(self._sep), self.DEFAULT_CHECKPOINT_FILENAME] - ) - default_config = { - "type": self.DEFAULT_CHECKPOINT_TYPE, - self._filepath_arg: default_checkpoint_path, - } - if self._credentials: - default_config[CREDENTIALS_KEY] = deepcopy(self._credentials) - - if CREDENTIALS_KEY in default_config.keys() & checkpoint_config.keys(): - self._logger.warning( - KEY_PROPAGATION_WARNING, - {"keys": CREDENTIALS_KEY, "target": "checkpoint"}, - ) - - return {**default_config, **checkpoint_config} - - @cachedmethod(cache=operator.attrgetter("_partition_cache")) - def _list_partitions(self) -> list[str]: - checkpoint = self._read_checkpoint() - checkpoint_path = self._filesystem._strip_protocol( # noqa: protected-access - self._checkpoint_config[self._filepath_arg] - ) - - def _is_valid_partition(partition) -> bool: - if not partition.endswith(self._filename_suffix): - return False - if partition == checkpoint_path: - return False - if checkpoint is None: - # nothing was processed yet - return True - partition_id = self._path_to_partition(partition) - return self._comparison_func(partition_id, checkpoint) - - return sorted( - part - for part in self._filesystem.find(self._normalized_path, **self._load_args) - if _is_valid_partition(part) - ) - - @property - def _checkpoint(self) -> AbstractDataset: - type_, kwargs = parse_dataset_definition(self._checkpoint_config) - return type_(**kwargs) # type: ignore - - def _read_checkpoint(self) -> str | None: - if self._force_checkpoint is not None: - return self._force_checkpoint - try: - return self._checkpoint.load() - except DatasetError: - return None - - def _load(self) -> dict[str, Callable[[], Any]]: - partitions = {} - - for partition in self._list_partitions(): - partition_id = self._path_to_partition(partition) - kwargs = deepcopy(self._dataset_config) - # join the protocol back since PySpark may rely on it - kwargs[self._filepath_arg] = self._join_protocol(partition) - partitions[partition_id] = self._dataset_type( # type: ignore - **kwargs - ).load() - - return partitions - - def confirm(self) -> None: - """Confirm the dataset by updating the checkpoint value to the latest - processed partition ID""" - partition_ids = [self._path_to_partition(p) for p in self._list_partitions()] - if partition_ids: - self._checkpoint.save(partition_ids[-1]) # checkpoint to last partition diff --git a/tests/io/test_data_catalog.py b/tests/io/test_data_catalog.py index afd1707bea..00228c4c53 100644 --- a/tests/io/test_data_catalog.py +++ b/tests/io/test_data_catalog.py @@ -8,6 +8,7 @@ import pandas as pd import pytest from kedro_datasets.pandas import CSVDataset, ParquetDataset +from kedro_datasets.partitions import IncrementalDataset from pandas.testing import assert_frame_equal from kedro.io import ( @@ -594,10 +595,10 @@ def test_error_dataset_init(self, bad_config): def test_confirm(self, tmp_path, caplog, mocker): """Confirm the dataset""" with caplog.at_level(logging.INFO): - mock_confirm = mocker.patch("kedro.io.IncrementalDataset.confirm") + mock_confirm = mocker.patch("kedro_datasets.partitions.incremental_dataset.IncrementalDataset.confirm") catalog = { "ds_to_confirm": { - "type": "IncrementalDataset", + "type": "kedro_datasets.partitions.incremental_dataset.IncrementalDataset", "dataset": "pandas.CSVDataset", "path": str(tmp_path), } diff --git a/tests/io/test_incremental_dataset.py b/tests/io/test_incremental_dataset.py deleted file mode 100644 index c36c6b62f9..0000000000 --- 
a/tests/io/test_incremental_dataset.py +++ /dev/null @@ -1,505 +0,0 @@ -from __future__ import annotations - -import os -import re -from pathlib import Path -from typing import Any - -import boto3 -import pandas as pd -import pytest -from kedro_datasets.pickle import PickleDataset -from kedro_datasets.text import TextDataset -from moto import mock_s3 -from pandas.testing import assert_frame_equal - -from kedro.io import AbstractDataset, DatasetError, IncrementalDataset -from kedro.io.data_catalog import CREDENTIALS_KEY - -DATASET = "kedro_datasets.pandas.CSVDataset" - - -@pytest.fixture -def partitioned_data_pandas(): - return { - f"p{counter:02d}/data.csv": pd.DataFrame( - {"part": counter, "col": list(range(counter + 1))} - ) - for counter in range(5) - } - - -@pytest.fixture -def local_csvs(tmp_path, partitioned_data_pandas): - local_dir = Path(tmp_path / "csvs") - local_dir.mkdir() - - for k, data in partitioned_data_pandas.items(): - path = local_dir / k - path.parent.mkdir(parents=True) - data.to_csv(str(path), index=False) - return local_dir - - -class DummyDataset(AbstractDataset): # pragma: no cover - def __init__(self, filepath): - pass - - def _describe(self) -> dict[str, Any]: - return {"dummy": True} - - def _load(self) -> Any: - pass - - def _save(self, data: Any) -> None: - pass - - -def dummy_gt_func(value1: str, value2: str): - return value1 > value2 - - -def dummy_lt_func(value1: str, value2: str): - return value1 < value2 - - -class TestIncrementalDatasetLocal: - def test_load_and_confirm(self, local_csvs, partitioned_data_pandas): - """Test the standard flow for loading, confirming and reloading - an IncrementalDataset""" - pds = IncrementalDataset(str(local_csvs), DATASET) - loaded = pds.load() - assert loaded.keys() == partitioned_data_pandas.keys() - for partition_id, data in loaded.items(): - assert_frame_equal(data, partitioned_data_pandas[partition_id]) - - checkpoint_path = local_csvs / pds.DEFAULT_CHECKPOINT_FILENAME - assert not checkpoint_path.exists() - pds.confirm() - assert checkpoint_path.is_file() - assert checkpoint_path.read_text() == pds._read_checkpoint() == "p04/data.csv" - - reloaded = pds.load() - assert reloaded.keys() == loaded.keys() - - pds.release() - reloaded_after_release = pds.load() - assert reloaded_after_release == {} - - def test_save(self, local_csvs): - """Test saving a new partition into an IncrementalDataset""" - df = pd.DataFrame({"dummy": [1, 2, 3]}) - new_partition_key = "p05/data.csv" - new_partition_path = local_csvs / new_partition_key - pds = IncrementalDataset(str(local_csvs), DATASET) - - assert not new_partition_path.exists() - assert new_partition_key not in pds.load() - - pds.save({new_partition_key: df}) - assert new_partition_path.exists() - loaded = pds.load() - assert_frame_equal(loaded[new_partition_key], df) - - @pytest.mark.parametrize( - "filename_suffix,expected_partitions", - [ - ( - "", - { - "p00/data.csv", - "p01/data.csv", - "p02/data.csv", - "p03/data.csv", - "p04/data.csv", - }, - ), - (".csv", {"p00/data", "p01/data", "p02/data", "p03/data", "p04/data"}), - (".fake", set()), - ], - ) - def test_filename_suffix(self, filename_suffix, expected_partitions, local_csvs): - """Test how specifying filename_suffix affects the available - partitions and their names""" - pds = IncrementalDataset( - str(local_csvs), DATASET, filename_suffix=filename_suffix - ) - loaded = pds.load() - assert loaded.keys() == expected_partitions - - @pytest.mark.parametrize( - "forced_checkpoint,expected_partitions", - [ - ( - "", - 
{ - "p00/data.csv", - "p01/data.csv", - "p02/data.csv", - "p03/data.csv", - "p04/data.csv", - }, - ), - ( - "p00/data.csv", - {"p01/data.csv", "p02/data.csv", "p03/data.csv", "p04/data.csv"}, - ), - ("p03/data.csv", {"p04/data.csv"}), - ], - ) - def test_force_checkpoint_no_checkpoint_file( - self, forced_checkpoint, expected_partitions, local_csvs - ): - """Test how forcing checkpoint value affects the available partitions - if the checkpoint file does not exist""" - pds = IncrementalDataset(str(local_csvs), DATASET, checkpoint=forced_checkpoint) - loaded = pds.load() - assert loaded.keys() == expected_partitions - - confirm_path = local_csvs / pds.DEFAULT_CHECKPOINT_FILENAME - assert not confirm_path.exists() - pds.confirm() - assert confirm_path.is_file() - assert confirm_path.read_text() == max(expected_partitions) - - @pytest.mark.parametrize( - "forced_checkpoint,expected_partitions", - [ - ( - "", - { - "p00/data.csv", - "p01/data.csv", - "p02/data.csv", - "p03/data.csv", - "p04/data.csv", - }, - ), - ( - "p00/data.csv", - {"p01/data.csv", "p02/data.csv", "p03/data.csv", "p04/data.csv"}, - ), - ("p03/data.csv", {"p04/data.csv"}), - ], - ) - def test_force_checkpoint_checkpoint_file_exists( - self, forced_checkpoint, expected_partitions, local_csvs - ): - """Test how forcing checkpoint value affects the available partitions - if the checkpoint file exists""" - IncrementalDataset(str(local_csvs), DATASET).confirm() - checkpoint = local_csvs / IncrementalDataset.DEFAULT_CHECKPOINT_FILENAME - assert checkpoint.read_text() == "p04/data.csv" - - pds = IncrementalDataset(str(local_csvs), DATASET, checkpoint=forced_checkpoint) - assert pds._checkpoint.exists() - loaded = pds.load() - assert loaded.keys() == expected_partitions - - @pytest.mark.parametrize( - "forced_checkpoint", ["p04/data.csv", "p10/data.csv", "p100/data.csv"] - ) - def test_force_checkpoint_no_partitions(self, forced_checkpoint, local_csvs): - """Test that forcing the checkpoint to certain values results in no - partitions being returned""" - pds = IncrementalDataset(str(local_csvs), DATASET, checkpoint=forced_checkpoint) - loaded = pds.load() - assert loaded == {} - - confirm_path = local_csvs / pds.DEFAULT_CHECKPOINT_FILENAME - assert not confirm_path.exists() - pds.confirm() - # confirming with no partitions available must have no effect - assert not confirm_path.exists() - - def test_checkpoint_path(self, local_csvs, partitioned_data_pandas): - """Test configuring a different checkpoint path""" - checkpoint_path = local_csvs / "checkpoint_folder" / "checkpoint_file" - assert not checkpoint_path.exists() - - IncrementalDataset( - str(local_csvs), DATASET, checkpoint={"filepath": str(checkpoint_path)} - ).confirm() - assert checkpoint_path.is_file() - assert checkpoint_path.read_text() == max(partitioned_data_pandas) - - @pytest.mark.parametrize( - "checkpoint_config,expected_checkpoint_class", - [ - (None, TextDataset), - ({"type": "kedro_datasets.pickle.PickleDataset"}, PickleDataset), - ({"type": "tests.io.test_incremental_dataset.DummyDataset"}, DummyDataset), - ], - ) - def test_checkpoint_type( - self, tmp_path, checkpoint_config, expected_checkpoint_class - ): - """Test configuring a different checkpoint dataset type""" - pds = IncrementalDataset(str(tmp_path), DATASET, checkpoint=checkpoint_config) - assert isinstance(pds._checkpoint, expected_checkpoint_class) - - @pytest.mark.parametrize( - "checkpoint_config,error_pattern", - [ - ( - {"versioned": True}, - "'IncrementalDataset' does not support versioning " - 
"of the checkpoint. Please remove 'versioned' key from the " - "checkpoint definition.", - ), - ( - {"version": None}, - "'IncrementalDataset' does not support versioning " - "of the checkpoint. Please remove 'version' key from the " - "checkpoint definition.", - ), - ], - ) - def test_version_not_allowed(self, tmp_path, checkpoint_config, error_pattern): - """Test that invalid checkpoint configurations raise expected errors""" - with pytest.raises(DatasetError, match=re.escape(error_pattern)): - IncrementalDataset(str(tmp_path), DATASET, checkpoint=checkpoint_config) - - @pytest.mark.parametrize( - "pds_config,fs_creds,dataset_creds,checkpoint_creds", - [ - ( - {"dataset": DATASET, "credentials": {"cred": "common"}}, - {"cred": "common"}, - {"cred": "common"}, - {"cred": "common"}, - ), - ( - { - "dataset": {"type": DATASET, "credentials": {"ds": "only"}}, - "credentials": {"cred": "common"}, - }, - {"cred": "common"}, - {"ds": "only"}, - {"cred": "common"}, - ), - ( - { - "dataset": DATASET, - "credentials": {"cred": "common"}, - "checkpoint": {"credentials": {"cp": "only"}}, - }, - {"cred": "common"}, - {"cred": "common"}, - {"cp": "only"}, - ), - ( - { - "dataset": {"type": DATASET, "credentials": {"ds": "only"}}, - "checkpoint": {"credentials": {"cp": "only"}}, - }, - {}, - {"ds": "only"}, - {"cp": "only"}, - ), - ( - { - "dataset": {"type": DATASET, "credentials": None}, - "credentials": {"cred": "common"}, - "checkpoint": {"credentials": None}, - }, - {"cred": "common"}, - None, - None, - ), - ], - ) - def test_credentials(self, pds_config, fs_creds, dataset_creds, checkpoint_creds): - """Test correctness of credentials propagation into the dataset and - checkpoint constructors""" - pds = IncrementalDataset(str(Path.cwd()), **pds_config) - assert pds._credentials == fs_creds - assert pds._dataset_config[CREDENTIALS_KEY] == dataset_creds - assert pds._checkpoint_config[CREDENTIALS_KEY] == checkpoint_creds - - @pytest.mark.parametrize( - "comparison_func,expected_partitions", - [ - ( - "tests.io.test_incremental_dataset.dummy_gt_func", - {"p03/data.csv", "p04/data.csv"}, - ), - (dummy_gt_func, {"p03/data.csv", "p04/data.csv"}), - ( - "tests.io.test_incremental_dataset.dummy_lt_func", - {"p00/data.csv", "p01/data.csv"}, - ), - (dummy_lt_func, {"p00/data.csv", "p01/data.csv"}), - ], - ) - def test_comparison_func(self, comparison_func, expected_partitions, local_csvs): - """Test that specifying a custom function for comparing the checkpoint value - to a partition id results in expected partitions being returned on load""" - checkpoint_config = { - "force_checkpoint": "p02/data.csv", - "comparison_func": comparison_func, - } - pds = IncrementalDataset(str(local_csvs), DATASET, checkpoint=checkpoint_config) - assert pds.load().keys() == expected_partitions - - -BUCKET_NAME = "fake_bucket_name" - - -@pytest.fixture -def mocked_s3_bucket(): - """Create a bucket for testing using moto.""" - with mock_s3(): - conn = boto3.client( - "s3", - region_name="us-east-1", - aws_access_key_id="fake_access_key", - aws_secret_access_key="fake_secret_key", - ) - conn.create_bucket(Bucket=BUCKET_NAME) - yield conn - - -@pytest.fixture -def mocked_csvs_in_s3(mocked_s3_bucket, partitioned_data_pandas): - prefix = "csvs" - for key, data in partitioned_data_pandas.items(): - mocked_s3_bucket.put_object( - Bucket=BUCKET_NAME, - Key=f"{prefix}/{key}", - Body=data.to_csv(index=False), - ) - return f"s3://{BUCKET_NAME}/{prefix}" - - -class TestPartitionedDatasetS3: - os.environ["AWS_ACCESS_KEY_ID"] = 
"FAKE_ACCESS_KEY" - os.environ["AWS_SECRET_ACCESS_KEY"] = "FAKE_SECRET_KEY" - - def test_load_and_confirm(self, mocked_csvs_in_s3, partitioned_data_pandas): - """Test the standard flow for loading, confirming and reloading - a IncrementalDataset in S3""" - pds = IncrementalDataset(mocked_csvs_in_s3, DATASET) - assert pds._checkpoint._protocol == "s3" - loaded = pds.load() - assert loaded.keys() == partitioned_data_pandas.keys() - for partition_id, data in loaded.items(): - assert_frame_equal(data, partitioned_data_pandas[partition_id]) - - assert not pds._checkpoint.exists() - assert pds._read_checkpoint() is None - pds.confirm() - assert pds._checkpoint.exists() - assert pds._read_checkpoint() == max(partitioned_data_pandas) - - def test_load_and_confirm_s3a( - self, mocked_csvs_in_s3, partitioned_data_pandas, mocker - ): - s3a_path = f"s3a://{mocked_csvs_in_s3.split('://', 1)[1]}" - pds = IncrementalDataset(s3a_path, DATASET) - assert pds._protocol == "s3a" - assert pds._checkpoint._protocol == "s3" - - mocked_ds = mocker.patch.object(pds, "_dataset_type") - mocked_ds.__name__ = "mocked" - loaded = pds.load() - - assert loaded.keys() == partitioned_data_pandas.keys() - assert not pds._checkpoint.exists() - assert pds._read_checkpoint() is None - pds.confirm() - assert pds._checkpoint.exists() - assert pds._read_checkpoint() == max(partitioned_data_pandas) - - @pytest.mark.parametrize( - "forced_checkpoint,expected_partitions", - [ - ( - "", - { - "p00/data.csv", - "p01/data.csv", - "p02/data.csv", - "p03/data.csv", - "p04/data.csv", - }, - ), - ( - "p00/data.csv", - {"p01/data.csv", "p02/data.csv", "p03/data.csv", "p04/data.csv"}, - ), - ("p03/data.csv", {"p04/data.csv"}), - ], - ) - def test_force_checkpoint_no_checkpoint_file( - self, forced_checkpoint, expected_partitions, mocked_csvs_in_s3 - ): - """Test how forcing checkpoint value affects the available partitions - in S3 if the checkpoint file does not exist""" - pds = IncrementalDataset( - mocked_csvs_in_s3, DATASET, checkpoint=forced_checkpoint - ) - loaded = pds.load() - assert loaded.keys() == expected_partitions - - assert not pds._checkpoint.exists() - pds.confirm() - assert pds._checkpoint.exists() - assert pds._checkpoint.load() == max(expected_partitions) - - @pytest.mark.parametrize( - "forced_checkpoint,expected_partitions", - [ - ( - "", - { - "p00/data.csv", - "p01/data.csv", - "p02/data.csv", - "p03/data.csv", - "p04/data.csv", - }, - ), - ( - "p00/data.csv", - {"p01/data.csv", "p02/data.csv", "p03/data.csv", "p04/data.csv"}, - ), - ("p03/data.csv", {"p04/data.csv"}), - ], - ) - def test_force_checkpoint_checkpoint_file_exists( - self, forced_checkpoint, expected_partitions, mocked_csvs_in_s3 - ): - """Test how forcing checkpoint value affects the available partitions - in S3 if the checkpoint file exists""" - # create checkpoint and assert that it exists - IncrementalDataset(mocked_csvs_in_s3, DATASET).confirm() - checkpoint_path = ( - f"{mocked_csvs_in_s3}/{IncrementalDataset.DEFAULT_CHECKPOINT_FILENAME}" - ) - checkpoint_value = TextDataset(checkpoint_path).load() - assert checkpoint_value == "p04/data.csv" - - pds = IncrementalDataset( - mocked_csvs_in_s3, DATASET, checkpoint=forced_checkpoint - ) - assert pds._checkpoint.exists() - loaded = pds.load() - assert loaded.keys() == expected_partitions - - @pytest.mark.parametrize( - "forced_checkpoint", ["p04/data.csv", "p10/data.csv", "p100/data.csv"] - ) - def test_force_checkpoint_no_partitions(self, forced_checkpoint, mocked_csvs_in_s3): - """Test that forcing the 
checkpoint to certain values results in no - partitions returned from S3""" - pds = IncrementalDataset( - mocked_csvs_in_s3, DATASET, checkpoint=forced_checkpoint - ) - loaded = pds.load() - assert loaded == {} - - assert not pds._checkpoint.exists() - pds.confirm() - # confirming with no partitions available must have no effect - assert not pds._checkpoint.exists() diff --git a/tests/io/test_partitioned_dataset.py b/tests/io/test_partitioned_dataset.py deleted file mode 100644 index 0acece1eab..0000000000 --- a/tests/io/test_partitioned_dataset.py +++ /dev/null @@ -1,553 +0,0 @@ -import logging -import os -import re -from pathlib import Path - -import boto3 -import pandas as pd -import pytest -import s3fs -from kedro_datasets.pandas import CSVDataset, ParquetDataset -from moto import mock_s3 -from pandas.testing import assert_frame_equal - -from kedro.io import DatasetError, PartitionedDataset -from kedro.io.data_catalog import CREDENTIALS_KEY -from kedro.io.partitioned_dataset import KEY_PROPAGATION_WARNING - - -@pytest.fixture -def partitioned_data_pandas(): - keys = ("p1/data1.csv", "p2.csv", "p1/data2.csv", "p3", "_p4") - return { - k: pd.DataFrame({"part": k, "counter": list(range(counter))}) - for counter, k in enumerate(keys, 1) - } - - -@pytest.fixture -def local_csvs(tmp_path, partitioned_data_pandas): - local_dir = Path(str(tmp_path / "csvs")) - local_dir.mkdir() - - for k, data in partitioned_data_pandas.items(): - path = local_dir / k - path.parent.mkdir(parents=True, exist_ok=True) - data.to_csv(str(path), index=False) - return local_dir - - -LOCAL_DATASET_DEFINITION = [ - "pandas.CSVDataset", - "kedro_datasets.pandas.CSVDataset", - CSVDataset, - {"type": "pandas.CSVDataset", "save_args": {"index": False}}, - {"type": CSVDataset}, -] - - -class FakeDataset: - pass - - -class TestPartitionedDatasetLocal: - @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) - @pytest.mark.parametrize( - "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)] - ) - def test_load( - self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts - ): - pds = PartitionedDataset(str(local_csvs), dataset, filename_suffix=suffix) - loaded_partitions = pds.load() - - assert len(loaded_partitions.keys()) == expected_num_parts - for partition_id, load_func in loaded_partitions.items(): - df = load_func() - assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix]) - if suffix: - assert not partition_id.endswith(suffix) - - @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) - @pytest.mark.parametrize("suffix", ["", ".csv"]) - def test_save(self, dataset, local_csvs, suffix): - pds = PartitionedDataset(str(local_csvs), dataset, filename_suffix=suffix) - original_data = pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) - part_id = "new/data" - pds.save({part_id: original_data}) - - assert (local_csvs / "new" / ("data" + suffix)).is_file() - loaded_partitions = pds.load() - assert part_id in loaded_partitions - reloaded_data = loaded_partitions[part_id]() - assert_frame_equal(reloaded_data, original_data) - - @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) - @pytest.mark.parametrize("suffix", ["", ".csv"]) - def test_lazy_save(self, dataset, local_csvs, suffix): - pds = PartitionedDataset(str(local_csvs), dataset, filename_suffix=suffix) - - def original_data(): - return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) - - part_id = "new/data" - pds.save({part_id: original_data}) - - assert (local_csvs / "new" / ("data" + 
suffix)).is_file() - loaded_partitions = pds.load() - assert part_id in loaded_partitions - reloaded_data = loaded_partitions[part_id]() - assert_frame_equal(reloaded_data, original_data()) - - def test_save_invalidates_cache(self, local_csvs, mocker): - """Test that save calls invalidate partition cache""" - pds = PartitionedDataset(str(local_csvs), "pandas.CSVDataset") - mocked_fs_invalidate = mocker.patch.object(pds._filesystem, "invalidate_cache") - first_load = pds.load() - assert pds._partition_cache.currsize == 1 - mocked_fs_invalidate.assert_not_called() - - # save clears cache - data = pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) - new_partition = "new/data.csv" - pds.save({new_partition: data}) - assert pds._partition_cache.currsize == 0 - # it seems that `_filesystem.invalidate_cache` calls itself inside, - # resulting in not one, but 2 mock calls - # hence using `assert_any_call` instead of `assert_called_once_with` - mocked_fs_invalidate.assert_any_call(pds._normalized_path) - - # new load returns new partition too - second_load = pds.load() - assert new_partition not in first_load - assert new_partition in second_load - - @pytest.mark.parametrize("overwrite,expected_num_parts", [(False, 6), (True, 1)]) - def test_overwrite(self, local_csvs, overwrite, expected_num_parts): - pds = PartitionedDataset( - str(local_csvs), "pandas.CSVDataset", overwrite=overwrite - ) - original_data = pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) - part_id = "new/data" - pds.save({part_id: original_data}) - loaded_partitions = pds.load() - - assert part_id in loaded_partitions - assert len(loaded_partitions.keys()) == expected_num_parts - - def test_release_instance_cache(self, local_csvs): - """Test that cache invalidation does not affect other instances""" - ds_a = PartitionedDataset(str(local_csvs), "pandas.CSVDataset") - ds_a.load() - ds_b = PartitionedDataset(str(local_csvs), "pandas.CSVDataset") - ds_b.load() - - assert ds_a._partition_cache.currsize == 1 - assert ds_b._partition_cache.currsize == 1 - - # invalidate cache of the dataset A - ds_a.release() - assert ds_a._partition_cache.currsize == 0 - # cache of the dataset B is unaffected - assert ds_b._partition_cache.currsize == 1 - - @pytest.mark.parametrize("dataset", ["pandas.CSVDataset", "pandas.ParquetDataset"]) - def test_exists(self, local_csvs, dataset): - assert PartitionedDataset(str(local_csvs), dataset).exists() - - empty_folder = local_csvs / "empty" / "folder" - assert not PartitionedDataset(str(empty_folder), dataset).exists() - empty_folder.mkdir(parents=True) - assert not PartitionedDataset(str(empty_folder), dataset).exists() - - @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) - def test_release(self, dataset, local_csvs): - partition_to_remove = "p2.csv" - pds = PartitionedDataset(str(local_csvs), dataset) - initial_load = pds.load() - assert partition_to_remove in initial_load - - (local_csvs / partition_to_remove).unlink() - cached_load = pds.load() - assert initial_load.keys() == cached_load.keys() - - pds.release() - load_after_release = pds.load() - assert initial_load.keys() ^ load_after_release.keys() == {partition_to_remove} - - @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) - def test_describe(self, dataset): - path = str(Path.cwd()) - pds = PartitionedDataset(path, dataset) - - assert f"path={path}" in str(pds) - assert "dataset_type=CSVDataset" in str(pds) - assert "dataset_config" in str(pds) - - def test_load_args(self, mocker): - fake_partition_name = 
"fake_partition" - mocked_filesystem = mocker.patch("fsspec.filesystem") - mocked_find = mocked_filesystem.return_value.find - mocked_find.return_value = [fake_partition_name] - - path = str(Path.cwd()) - load_args = {"maxdepth": 42, "withdirs": True} - pds = PartitionedDataset(path, "pandas.CSVDataset", load_args=load_args) - mocker.patch.object(pds, "_path_to_partition", return_value=fake_partition_name) - - assert pds.load().keys() == {fake_partition_name} - mocked_find.assert_called_once_with(path, **load_args) - - @pytest.mark.parametrize( - "credentials,expected_pds_creds,expected_dataset_creds", - [({"cred": "common"}, {"cred": "common"}, {"cred": "common"}), (None, {}, {})], - ) - def test_credentials( - self, mocker, credentials, expected_pds_creds, expected_dataset_creds - ): - mocked_filesystem = mocker.patch("fsspec.filesystem") - path = str(Path.cwd()) - pds = PartitionedDataset(path, "pandas.CSVDataset", credentials=credentials) - - assert mocked_filesystem.call_count == 2 - mocked_filesystem.assert_called_with("file", **expected_pds_creds) - if expected_dataset_creds: - assert pds._dataset_config[CREDENTIALS_KEY] == expected_dataset_creds - else: - assert CREDENTIALS_KEY not in pds._dataset_config - - str_repr = str(pds) - - def _assert_not_in_repr(value): - if isinstance(value, dict): - for k_, v_ in value.items(): - _assert_not_in_repr(k_) - _assert_not_in_repr(v_) - if value is not None: - assert str(value) not in str_repr - - _assert_not_in_repr(credentials) - - def test_fs_args(self, mocker): - fs_args = {"foo": "bar"} - - mocked_filesystem = mocker.patch("fsspec.filesystem") - path = str(Path.cwd()) - pds = PartitionedDataset(path, "pandas.CSVDataset", fs_args=fs_args) - - assert mocked_filesystem.call_count == 2 - mocked_filesystem.assert_called_with("file", **fs_args) - assert pds._dataset_config["fs_args"] == fs_args - - @pytest.mark.parametrize("dataset", ["pandas.ParquetDataset", ParquetDataset]) - def test_invalid_dataset(self, dataset, local_csvs): - pds = PartitionedDataset(str(local_csvs), dataset) - loaded_partitions = pds.load() - - for partition, df_loader in loaded_partitions.items(): - pattern = r"Failed while loading data from data set ParquetDataset(.*)" - with pytest.raises(DatasetError, match=pattern) as exc_info: - df_loader() - error_message = str(exc_info.value) - assert ( - "Either the file is corrupted or this is not a parquet file" - in error_message - ) - assert str(partition) in error_message - - @pytest.mark.parametrize( - "dataset_config,error_pattern", - [ - ("UndefinedDatasetType", "Class 'UndefinedDatasetType' not found"), - ( - "missing.module.UndefinedDatasetType", - r"Class 'missing\.module\.UndefinedDatasetType' not found", - ), - ( - FakeDataset, - r"Dataset type 'tests\.io\.test_partitioned_dataset\.FakeDataset' " - r"is invalid\: all data set types must extend 'AbstractDataset'", - ), - ({}, "'type' is missing from dataset catalog configuration"), - ], - ) - def test_invalid_dataset_config(self, dataset_config, error_pattern): - with pytest.raises(DatasetError, match=error_pattern): - PartitionedDataset(str(Path.cwd()), dataset_config) - - @pytest.mark.parametrize( - "dataset_config", - [ - {"type": CSVDataset, "versioned": True}, - {"type": "pandas.CSVDataset", "versioned": True}, - ], - ) - def test_versioned_dataset_not_allowed(self, dataset_config): - pattern = ( - "'PartitionedDataset' does not support versioning of the underlying " - "dataset. Please remove 'versioned' flag from the dataset definition." 
- ) - with pytest.raises(DatasetError, match=re.escape(pattern)): - PartitionedDataset(str(Path.cwd()), dataset_config) - - def test_no_partitions(self, tmpdir): - pds = PartitionedDataset(str(tmpdir), "pandas.CSVDataset") - - pattern = re.escape(f"No partitions found in '{tmpdir}'") - with pytest.raises(DatasetError, match=pattern): - pds.load() - - @pytest.mark.parametrize( - "pds_config,filepath_arg", - [ - ( - { - "path": str(Path.cwd()), - "dataset": {"type": CSVDataset, "filepath": "fake_path"}, - }, - "filepath", - ), - ( - { - "path": str(Path.cwd()), - "dataset": {"type": CSVDataset, "other_arg": "fake_path"}, - "filepath_arg": "other_arg", - }, - "other_arg", - ), - ], - ) - def test_filepath_arg_warning(self, pds_config, filepath_arg): - pattern = ( - f"'{filepath_arg}' key must not be specified in the dataset definition as it " - f"will be overwritten by partition path" - ) - with pytest.warns(UserWarning, match=re.escape(pattern)): - PartitionedDataset(**pds_config) - - def test_credentials_log_warning(self, caplog): - """Check that the warning is logged if the dataset credentials will overwrite - the top-level ones""" - pds = PartitionedDataset( - path=str(Path.cwd()), - dataset={"type": CSVDataset, "credentials": {"secret": "dataset"}}, - credentials={"secret": "global"}, - ) - log_message = KEY_PROPAGATION_WARNING % { - "keys": "credentials", - "target": "underlying dataset", - } - assert caplog.record_tuples == [("kedro.io.core", logging.WARNING, log_message)] - assert pds._dataset_config["credentials"] == {"secret": "dataset"} - - def test_fs_args_log_warning(self, caplog): - """Check that the warning is logged if the dataset filesystem - arguments will overwrite the top-level ones""" - pds = PartitionedDataset( - path=str(Path.cwd()), - dataset={"type": CSVDataset, "fs_args": {"args": "dataset"}}, - fs_args={"args": "dataset"}, - ) - log_message = KEY_PROPAGATION_WARNING % { - "keys": "filesystem arguments", - "target": "underlying dataset", - } - assert caplog.record_tuples == [("kedro.io.core", logging.WARNING, log_message)] - assert pds._dataset_config["fs_args"] == {"args": "dataset"} - - @pytest.mark.parametrize( - "pds_config,expected_ds_creds,global_creds", - [ - ( - {"dataset": "pandas.CSVDataset", "credentials": {"secret": "global"}}, - {"secret": "global"}, - {"secret": "global"}, - ), - ( - { - "dataset": { - "type": CSVDataset, - "credentials": {"secret": "expected"}, - }, - }, - {"secret": "expected"}, - {}, - ), - ( - { - "dataset": {"type": CSVDataset, "credentials": None}, - "credentials": {"secret": "global"}, - }, - None, - {"secret": "global"}, - ), - ( - { - "dataset": { - "type": CSVDataset, - "credentials": {"secret": "expected"}, - }, - "credentials": {"secret": "global"}, - }, - {"secret": "expected"}, - {"secret": "global"}, - ), - ], - ) - def test_dataset_creds(self, pds_config, expected_ds_creds, global_creds): - """Check that global credentials do not interfere dataset credentials.""" - pds = PartitionedDataset(path=str(Path.cwd()), **pds_config) - assert pds._dataset_config["credentials"] == expected_ds_creds - assert pds._credentials == global_creds - - -BUCKET_NAME = "fake_bucket_name" -S3_DATASET_DEFINITION = [ - "pandas.CSVDataset", - "kedro_datasets.pandas.CSVDataset", - CSVDataset, - {"type": "pandas.CSVDataset", "save_args": {"index": False}}, - {"type": CSVDataset}, -] - - -@pytest.fixture -def mocked_s3_bucket(): - """Create a bucket for testing using moto.""" - with mock_s3(): - conn = boto3.client( - "s3", - 
region_name="us-east-1", - aws_access_key_id="fake_access_key", - aws_secret_access_key="fake_secret_key", - ) - conn.create_bucket(Bucket=BUCKET_NAME) - yield conn - - -@pytest.fixture -def mocked_csvs_in_s3(mocked_s3_bucket, partitioned_data_pandas): - prefix = "csvs" - for key, data in partitioned_data_pandas.items(): - mocked_s3_bucket.put_object( - Bucket=BUCKET_NAME, - Key=f"{prefix}/{key}", - Body=data.to_csv(index=False), - ) - return f"s3://{BUCKET_NAME}/{prefix}" - - -class TestPartitionedDatasetS3: - os.environ["AWS_ACCESS_KEY_ID"] = "FAKE_ACCESS_KEY" - os.environ["AWS_SECRET_ACCESS_KEY"] = "FAKE_SECRET_KEY" - - @pytest.mark.parametrize("dataset", S3_DATASET_DEFINITION) - def test_load(self, dataset, mocked_csvs_in_s3, partitioned_data_pandas): - pds = PartitionedDataset(mocked_csvs_in_s3, dataset) - loaded_partitions = pds.load() - - assert loaded_partitions.keys() == partitioned_data_pandas.keys() - for partition_id, load_func in loaded_partitions.items(): - df = load_func() - assert_frame_equal(df, partitioned_data_pandas[partition_id]) - - def test_load_s3a(self, mocked_csvs_in_s3, partitioned_data_pandas, mocker): - path = mocked_csvs_in_s3.split("://", 1)[1] - s3a_path = f"s3a://{path}" - # any type is fine as long as it passes isinstance check - # since _dataset_type is mocked later anyways - pds = PartitionedDataset(s3a_path, "pandas.CSVDataset") - assert pds._protocol == "s3a" - - mocked_ds = mocker.patch.object(pds, "_dataset_type") - mocked_ds.__name__ = "mocked" - loaded_partitions = pds.load() - - assert loaded_partitions.keys() == partitioned_data_pandas.keys() - assert mocked_ds.call_count == len(loaded_partitions) - expected = [ - mocker.call(filepath=f"{s3a_path}/{partition_id}") - for partition_id in loaded_partitions - ] - mocked_ds.assert_has_calls(expected, any_order=True) - - @pytest.mark.parametrize( - "partition_path", ["s3_bucket/dummy.csv", "fake_bucket/dummy.csv"] - ) - def test_join_protocol_with_bucket_name_startswith_protocol( - self, mocked_csvs_in_s3, partition_path - ): - """Make sure protocol is joined correctly for the edge case when - bucket name starts with the protocol name, i.e. 
`s3://s3_bucket/dummy_.txt` - """ - - pds = PartitionedDataset(mocked_csvs_in_s3, "pandas.CSVDataset") - assert pds._join_protocol(partition_path) == f"s3://{partition_path}" - - @pytest.mark.parametrize("dataset", S3_DATASET_DEFINITION) - def test_save(self, dataset, mocked_csvs_in_s3): - pds = PartitionedDataset(mocked_csvs_in_s3, dataset) - original_data = pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) - part_id = "new/data.csv" - pds.save({part_id: original_data}) - - s3 = s3fs.S3FileSystem() - assert s3.exists("/".join([mocked_csvs_in_s3, part_id])) - - loaded_partitions = pds.load() - assert part_id in loaded_partitions - reloaded_data = loaded_partitions[part_id]() - assert_frame_equal(reloaded_data, original_data) - - def test_save_s3a(self, mocked_csvs_in_s3, mocker): - """Test that save works in case of s3a protocol""" - path = mocked_csvs_in_s3.split("://", 1)[1] - s3a_path = f"s3a://{path}" - # any type is fine as long as it passes isinstance check - # since _dataset_type is mocked later anyways - pds = PartitionedDataset(s3a_path, "pandas.CSVDataset", filename_suffix=".csv") - assert pds._protocol == "s3a" - - mocked_ds = mocker.patch.object(pds, "_dataset_type") - mocked_ds.__name__ = "mocked" - new_partition = "new/data" - data = "data" - - pds.save({new_partition: data}) - mocked_ds.assert_called_once_with(filepath=f"{s3a_path}/{new_partition}.csv") - mocked_ds.return_value.save.assert_called_once_with(data) - - @pytest.mark.parametrize("dataset", ["pandas.CSVDataset", "pandas.HDFDataset"]) - def test_exists(self, dataset, mocked_csvs_in_s3): - assert PartitionedDataset(mocked_csvs_in_s3, dataset).exists() - - empty_folder = "/".join([mocked_csvs_in_s3, "empty", "folder"]) - assert not PartitionedDataset(empty_folder, dataset).exists() - - s3fs.S3FileSystem().mkdir(empty_folder) - assert not PartitionedDataset(empty_folder, dataset).exists() - - @pytest.mark.parametrize("dataset", S3_DATASET_DEFINITION) - def test_release(self, dataset, mocked_csvs_in_s3): - partition_to_remove = "p2.csv" - pds = PartitionedDataset(mocked_csvs_in_s3, dataset) - initial_load = pds.load() - assert partition_to_remove in initial_load - - s3 = s3fs.S3FileSystem() - s3.rm("/".join([mocked_csvs_in_s3, partition_to_remove])) - cached_load = pds.load() - assert initial_load.keys() == cached_load.keys() - - pds.release() - load_after_release = pds.load() - assert initial_load.keys() ^ load_after_release.keys() == {partition_to_remove} - - @pytest.mark.parametrize("dataset", S3_DATASET_DEFINITION) - def test_describe(self, dataset): - path = f"s3://{BUCKET_NAME}/foo/bar" - pds = PartitionedDataset(path, dataset) - - assert f"path={path}" in str(pds) - assert "dataset_type=CSVDataset" in str(pds) - assert "dataset_config" in str(pds) From 5e3fc120a8e84aca01767c00a84a3c0a3731c65c Mon Sep 17 00:00:00 2001 From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> Date: Tue, 17 Oct 2023 15:24:04 +0100 Subject: [PATCH 02/12] lint Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> --- tests/io/test_data_catalog.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/io/test_data_catalog.py b/tests/io/test_data_catalog.py index 00228c4c53..b7d9369cba 100644 --- a/tests/io/test_data_catalog.py +++ b/tests/io/test_data_catalog.py @@ -8,7 +8,6 @@ import pandas as pd import pytest from kedro_datasets.pandas import CSVDataset, ParquetDataset -from kedro_datasets.partitions import IncrementalDataset from pandas.testing import assert_frame_equal from 
kedro.io import ( @@ -595,7 +594,9 @@ def test_error_dataset_init(self, bad_config): def test_confirm(self, tmp_path, caplog, mocker): """Confirm the dataset""" with caplog.at_level(logging.INFO): - mock_confirm = mocker.patch("kedro_datasets.partitions.incremental_dataset.IncrementalDataset.confirm") + mock_confirm = mocker.patch( + "kedro_datasets.partitions.incremental_dataset.IncrementalDataset.confirm" + ) catalog = { "ds_to_confirm": { "type": "kedro_datasets.partitions.incremental_dataset.IncrementalDataset", From c8be77fb0a81a0e5d7f29cf0e0fa400918e37b8d Mon Sep 17 00:00:00 2001 From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> Date: Tue, 17 Oct 2023 15:35:57 +0100 Subject: [PATCH 03/12] update docs Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> --- docs/source/data/how_to_create_a_custom_dataset.md | 2 +- docs/source/data/partitioned_and_incremental_datasets.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/data/how_to_create_a_custom_dataset.md b/docs/source/data/how_to_create_a_custom_dataset.md index 56d5e781f0..c1e4d81ced 100644 --- a/docs/source/data/how_to_create_a_custom_dataset.md +++ b/docs/source/data/how_to_create_a_custom_dataset.md @@ -271,7 +271,7 @@ class ImageDataset(AbstractDataset[np.ndarray, np.ndarray]): Currently, the `ImageDataset` only works with a single image, but this example needs to load all Pokemon images from the raw data directory for further processing. -Kedro's [`PartitionedDataset`](/kedro.io.PartitionedDataset) is a convenient way to load multiple separate data files of the same underlying dataset type into a directory. +Kedro's [`PartitionedDataset`](/kedro_datasets.partitions.PartitionedDataset) is a convenient way to load multiple separate data files of the same underlying dataset type into a directory. To use `PartitionedDataset` with `ImageDataset` to load all Pokemon PNG images, add this to the data catalog YAML so that `PartitionedDataset` loads all PNG files from the data directory using `ImageDataset`: diff --git a/docs/source/data/partitioned_and_incremental_datasets.md b/docs/source/data/partitioned_and_incremental_datasets.md index 6fe9796b35..fd9655fbe0 100644 --- a/docs/source/data/partitioned_and_incremental_datasets.md +++ b/docs/source/data/partitioned_and_incremental_datasets.md @@ -4,7 +4,7 @@ Distributed systems play an increasingly important role in ETL data pipelines. They increase the processing throughput, enabling us to work with much larger volumes of input data. A situation may arise where your Kedro node needs to read the data from a directory full of uniform files of the same type like JSON or CSV. Tools like `PySpark` and the corresponding [SparkDataset](/kedro_datasets.spark.SparkDataset) cater for such use cases but may not always be possible. -This is why Kedro provides a built-in [PartitionedDataset](/kedro.io.PartitionedDataset), with the following features: +This is why Kedro provides a built-in [PartitionedDataset](/kedro_datasets.partitions.PartitionedDataset), with the following features: * `PartitionedDataset` can recursively load/save all or specific files from a given location. * It is platform agnostic, and can work with any filesystem implementation supported by [fsspec](https://filesystem-spec.readthedocs.io/) including local, S3, GCS, and many more. 
@@ -240,7 +240,7 @@ When using lazy saving, the dataset will be written _after_ the `after_node_run` ## Incremental datasets -[IncrementalDataset](/kedro.io.IncrementalDataset) is a subclass of `PartitionedDataset`, which stores the information about the last processed partition in the so-called `checkpoint`. `IncrementalDataset` addresses the use case when partitions have to be processed incrementally, i.e. each subsequent pipeline run should only process the partitions which were not processed by the previous runs. +[IncrementalDataset](/kedro_datasets.partitions.IncrementalDataset) is a subclass of `PartitionedDataset`, which stores the information about the last processed partition in the so-called `checkpoint`. `IncrementalDataset` addresses the use case when partitions have to be processed incrementally, i.e. each subsequent pipeline run should only process the partitions which were not processed by the previous runs. This checkpoint, by default, is persisted to the location of the data partitions. For example, for `IncrementalDataset` instantiated with path `s3://my-bucket-name/path/to/folder`, the checkpoint will be saved to `s3://my-bucket-name/path/to/folder/CHECKPOINT`, unless [the checkpoint configuration is explicitly overwritten](#checkpoint-configuration). @@ -309,7 +309,7 @@ pipeline( Important notes about the confirmation operation: -* Confirming a partitioned dataset does not affect any subsequent loads within the same run. All downstream nodes that input the same partitioned dataset as input will all receive the _same_ partitions. Partitions that are created externally during the run will also not affect the dataset loads and won't appear in the list of loaded partitions until the next run or until the [`release()`](/kedro.io.IncrementalDataset) method is called on the dataset object. +* Confirming a partitioned dataset does not affect any subsequent loads within the same run. All downstream nodes that input the same partitioned dataset as input will all receive the _same_ partitions. Partitions that are created externally during the run will also not affect the dataset loads and won't appear in the list of loaded partitions until the next run or until the [`release()`](/kedro_datasets.partitions.IncrementalDataset) method is called on the dataset object. * A pipeline cannot contain more than one node confirming the same dataset. From 6a38eec6664f25deee5b38f4129f6db2f2b1e651 Mon Sep 17 00:00:00 2001 From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> Date: Wed, 18 Oct 2023 10:34:25 +0100 Subject: [PATCH 04/12] changes based on review Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> --- docs/source/data/partitioned_and_incremental_datasets.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/data/partitioned_and_incremental_datasets.md b/docs/source/data/partitioned_and_incremental_datasets.md index fd9655fbe0..2cb2f8d370 100644 --- a/docs/source/data/partitioned_and_incremental_datasets.md +++ b/docs/source/data/partitioned_and_incremental_datasets.md @@ -240,7 +240,7 @@ When using lazy saving, the dataset will be written _after_ the `after_node_run` ## Incremental datasets -[IncrementalDataset](/kedro_datasets.partitions.IncrementalDataset) is a subclass of `PartitionedDataset`, which stores the information about the last processed partition in the so-called `checkpoint`. `IncrementalDataset` addresses the use case when partitions have to be processed incrementally, i.e. 
each subsequent pipeline run should only process the partitions which were not processed by the previous runs. +[IncrementalDataset](/kedro_datasets.partitions.IncrementalDataset) is a subclass of `PartitionedDataset`, which stores the information about the last processed partition in the so-called `checkpoint`. `IncrementalDataset` addresses the use case when partitions have to be processed incrementally, that is, each subsequent pipeline run should process just the partitions which were not processed by the previous runs. This checkpoint, by default, is persisted to the location of the data partitions. For example, for `IncrementalDataset` instantiated with path `s3://my-bucket-name/path/to/folder`, the checkpoint will be saved to `s3://my-bucket-name/path/to/folder/CHECKPOINT`, unless [the checkpoint configuration is explicitly overwritten](#checkpoint-configuration). From 1a9755cbe935a000021315d9abca95516c4f7fbe Mon Sep 17 00:00:00 2001 From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> Date: Tue, 24 Oct 2023 13:13:28 +0100 Subject: [PATCH 05/12] Update RELEASE.md Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> --- RELEASE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE.md b/RELEASE.md index d693e468fa..d0ee688e6d 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -5,6 +5,7 @@ * Introduced add-ons to the `kedro new` CLI flow. ## Bug fixes and other changes +* Removed `PartitionedDataset` and `IncrementalDataset` from `kedro.io` ## Breaking changes to the API * Renamed the `data_sets` argument and the `_data_sets` attribute in `Catalog` and their references to `datasets` and `_datasets` respectively. From f14c81bb5bbac7e4abb12a8191045c6fdb766a30 Mon Sep 17 00:00:00 2001 From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> Date: Tue, 24 Oct 2023 13:16:01 +0100 Subject: [PATCH 06/12] Update RELEASE.md Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com> --- RELEASE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE.md b/RELEASE.md index d0ee688e6d..0ede3945e7 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -5,7 +5,6 @@ * Introduced add-ons to the `kedro new` CLI flow. ## Bug fixes and other changes -* Removed `PartitionedDataset` and `IncrementalDataset` from `kedro.io` ## Breaking changes to the API * Renamed the `data_sets` argument and the `_data_sets` attribute in `Catalog` and their references to `datasets` and `_datasets` respectively. @@ -16,6 +15,7 @@ ### DataSets * Removed `kedro.extras.datasets` and tests. * Reduced constructor arguments for `APIDataSet` by replacing most arguments with a single constructor argument `load_args`. This makes it more consistent with other Kedro DataSets and the underlying `requests` API, and automatically enables the full configuration domain: stream, certificates, proxies, and more. +* Removed `PartitionedDataset` and `IncrementalDataset` from `kedro.io` ### CLI * Removed deprecated `kedro docs` command. 
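For reference, the import-side effect of the breaking change recorded above is small: downstream code keeps the same constructor arguments and only switches to the plugin's module path. The snippet below is a minimal sketch and not part of the patch series; it assumes `kedro-datasets>=1.8.0` (the pin introduced later in this series) and uses a hypothetical `data/01_raw/orders` folder of CSV partitions purely for illustration.

```python
# Previously these classes were importable from kedro.io; after this change they
# come from the kedro-datasets plugin instead (assumes kedro-datasets>=1.8.0).
from kedro_datasets.partitions import PartitionedDataset

# Hypothetical folder of CSV partitions, used only for illustration.
orders = PartitionedDataset(
    path="data/01_raw/orders",
    dataset="pandas.CSVDataset",
    filename_suffix=".csv",
)

# load() returns a {partition_id: load_function} mapping, so each partition can
# be materialised lazily, one at a time.
for partition_id, load_partition in orders.load().items():
    df = load_partition()
    print(partition_id, len(df))
```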
From 6433aae5913989b7b7f0d8902b8c56f41d273d41 Mon Sep 17 00:00:00 2001
From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
Date: Tue, 24 Oct 2023 14:56:30 +0100
Subject: [PATCH 07/12] update test config type for PartitionedDataset test

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 tests/io/test_data_catalog.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/io/test_data_catalog.py b/tests/io/test_data_catalog.py
index b7d9369cba..0bd2bf6973 100644
--- a/tests/io/test_data_catalog.py
+++ b/tests/io/test_data_catalog.py
@@ -112,7 +112,7 @@ def config_with_dataset_factories_nested():
     return {
         "catalog": {
             "{brand}_cars": {
-                "type": "PartitionedDataset",
+                "type": "kedro_datasets.partitions.PartitionedDataset",
                 "path": "data/01_raw",
                 "dataset": "pandas.CSVDataset",
                 "metadata": {

From 83a2d0814b7bbfe211fac5560bb5cdda88ba4e91 Mon Sep 17 00:00:00 2001
From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
Date: Tue, 24 Oct 2023 15:29:37 +0100
Subject: [PATCH 08/12] Update kedro_datasets.rst

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 docs/source/kedro_datasets.rst | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/source/kedro_datasets.rst b/docs/source/kedro_datasets.rst
index 6d3077c338..3df2a476ca 100644
--- a/docs/source/kedro_datasets.rst
+++ b/docs/source/kedro_datasets.rst
@@ -36,6 +36,8 @@ kedro_datasets
    kedro_datasets.pandas.SQLQueryDataset
    kedro_datasets.pandas.SQLTableDataset
    kedro_datasets.pandas.XMLDataset
+   kedro_datasets.partition.IncrementalDataset
+   kedro_datasets.partition.PartitionedDataset
    kedro_datasets.pickle.PickleDataset
    kedro_datasets.pillow.ImageDataset
    kedro_datasets.plotly.JSONDataset

From 25057d9731cf1a5649d91d9549776e8f7521d17a Mon Sep 17 00:00:00 2001
From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
Date: Tue, 24 Oct 2023 15:46:54 +0100
Subject: [PATCH 09/12] Update kedro_datasets.rst

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 docs/source/kedro_datasets.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/source/kedro_datasets.rst b/docs/source/kedro_datasets.rst
index 3df2a476ca..b58b4177ae 100644
--- a/docs/source/kedro_datasets.rst
+++ b/docs/source/kedro_datasets.rst
@@ -36,8 +36,8 @@ kedro_datasets
    kedro_datasets.pandas.SQLQueryDataset
    kedro_datasets.pandas.SQLTableDataset
    kedro_datasets.pandas.XMLDataset
-   kedro_datasets.partition.IncrementalDataset
-   kedro_datasets.partition.PartitionedDataset
+   kedro_datasets.partitions.IncrementalDataset
+   kedro_datasets.partitions.PartitionedDataset
    kedro_datasets.pickle.PickleDataset
    kedro_datasets.pillow.ImageDataset
    kedro_datasets.plotly.JSONDataset

From 6b9a2a306b083a9281819bea8a327d520e160aae Mon Sep 17 00:00:00 2001
From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
Date: Tue, 24 Oct 2023 16:06:41 +0100
Subject: [PATCH 10/12] update kedro-datasets for docs

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 5a21556c5e..00810d6600 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -99,7 +99,7 @@ docs = [
     "sphinxcontrib-mermaid~=0.7.1",
     "myst-parser~=1.0.0",
     "Jinja2<3.1.0",
-    "kedro-datasets[all]~=1.7.0",
+    "kedro-datasets[all]~=1.8.0",
 ]
 all = [ "kedro[test,docs]" ]
 

From b9fc220a17de865c90b006e0c64b531e52ac7f87 Mon Sep 17 00:00:00 2001
From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
Date: Tue, 24 Oct 2023 16:20:08 +0100
Subject: [PATCH 11/12] add eager and lazy datasets to rst

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 docs/source/kedro_datasets.rst | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/source/kedro_datasets.rst b/docs/source/kedro_datasets.rst
index b58b4177ae..f9c144aa81 100644
--- a/docs/source/kedro_datasets.rst
+++ b/docs/source/kedro_datasets.rst
@@ -44,6 +44,8 @@ kedro_datasets
    kedro_datasets.plotly.PlotlyDataset
    kedro_datasets.polars.CSVDataset
    kedro_datasets.polars.GenericDataset
+   kedro_datasets.polars.EagerPolarsDataset
+   kedro_datasets.polars.LazyPolarsDataset
    kedro_datasets.redis.PickleDataset
    kedro_datasets.snowflake.SnowparkTableDataset
    kedro_datasets.spark.DeltaTableDataset

From 3b9adfe1246363da22680e5da927a54c7731436e Mon Sep 17 00:00:00 2001
From: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
Date: Tue, 24 Oct 2023 17:38:23 +0100
Subject: [PATCH 12/12] remove LazyPolarsDataset from rst temporarily

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 docs/source/kedro_datasets.rst | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docs/source/kedro_datasets.rst b/docs/source/kedro_datasets.rst
index f9c144aa81..7f0459a8e0 100644
--- a/docs/source/kedro_datasets.rst
+++ b/docs/source/kedro_datasets.rst
@@ -45,7 +45,6 @@ kedro_datasets
    kedro_datasets.polars.CSVDataset
    kedro_datasets.polars.GenericDataset
    kedro_datasets.polars.EagerPolarsDataset
-   kedro_datasets.polars.LazyPolarsDataset
    kedro_datasets.redis.PickleDataset
    kedro_datasets.snowflake.SnowparkTableDataset
    kedro_datasets.spark.DeltaTableDataset
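Catalog entries follow the same pattern as the test fixture updated in PATCH 07: the `type` field has to spell out the plugin path. The example below is a minimal sketch rather than part of the patch series; the dataset name `reviews` and its paths are hypothetical, and it assumes `kedro-datasets>=1.8.0` is installed.

```python
from kedro.io import DataCatalog

# Hypothetical catalog entry; the fully qualified "type" string is the point here.
catalog = DataCatalog.from_config(
    {
        "reviews": {
            "type": "kedro_datasets.partitions.PartitionedDataset",
            "path": "data/01_raw/reviews",
            "dataset": "pandas.CSVDataset",
        }
    }
)

partitions = catalog.load("reviews")  # {partition_id: load_function}
```

In a YAML catalog the same fully qualified string goes in the `type:` key.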