feat(datasets): Improved compatibility, functionality and testing for SnowflakeTableDataset #881

Merged · 97 commits · Oct 28, 2024
Changes from all commits (97 commits)
2305c3d
check and convert pd df to snowflake df
tdhooghe Oct 10, 2024
07ef67b
add tests
tdhooghe Oct 10, 2024
04928ba
revert import to previous state
tdhooghe Oct 10, 2024
77551d6
relax snowpark version
tdhooghe Oct 10, 2024
fbb3374
revert pyproject.toml
tdhooghe Oct 15, 2024
fb621ae
bump snowpark version
tdhooghe Oct 15, 2024
cb320bd
relax python version for snowpark
tdhooghe Oct 15, 2024
628def6
downgrade snowpark to 1.22
tdhooghe Oct 15, 2024
136fb09
upgrade cloudpickle to 2.2.1
tdhooghe Oct 15, 2024
fe3bfbe
feat(datasets): create separate `ibis.FileDataset` (#842)
deepyaman Oct 11, 2024
fce2974
ci(docker): Update error code in e2e test (#888)
ankatiyar Oct 14, 2024
60b4a86
update docstring
tdhooghe Oct 15, 2024
3e578c6
test ci/cd without python version 3.12
tdhooghe Oct 15, 2024
a161191
fix linting error
tdhooghe Oct 15, 2024
e4509bc
add max python version for compatibility
tdhooghe Oct 16, 2024
c98aef8
fix error
tdhooghe Oct 17, 2024
e4f1c3c
chore: Update PR template with checkbox for core dataset contribution…
merelcht Oct 15, 2024
bcec735
ci(datasets): Fix mypy errors (#893)
ankatiyar Oct 17, 2024
c243c6e
fix(datasets): default to DuckDB in in-memory mode (#897)
deepyaman Oct 18, 2024
388f455
build(datasets): make Kedro-Datasets 5.1.0 release (#887)
deepyaman Oct 18, 2024
3c85b1f
ci: Add GH action to check for TSC votes on core dataset changes (#896)
merelcht Oct 18, 2024
101696c
update testing framework and docstrings
tdhooghe Oct 21, 2024
c1eeb77
Update kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py
tdhooghe Oct 16, 2024
b9bf73e
validate and check table name for nones
tdhooghe Oct 21, 2024
2c804c9
remove breakpoint
tdhooghe Oct 21, 2024
7b7811d
change doc type hinting for tests to pass
tdhooghe Oct 21, 2024
23b056a
feat(datasets): create separate `ibis.FileDataset` (#842)
deepyaman Oct 11, 2024
27e58cd
ci(datasets): Fix mypy errors (#893)
ankatiyar Oct 17, 2024
178f0f4
more strict type hinting and casting for linting
tdhooghe Oct 21, 2024
0133025
add back python 3.12 to see if it is compatible
tdhooghe Oct 21, 2024
fc2b3b8
update testing documentation
tdhooghe Oct 21, 2024
c18fc13
Merge branch 'main' into feature/save-pd-to-snowflaketable
merelcht Oct 21, 2024
7efd162
remove uv lock
tdhooghe Oct 21, 2024
f9f3d40
Merge remote-tracking branch 'refs/remotes/origin/feature/save-pd-to-…
tdhooghe Oct 21, 2024
620b27a
remove unnecessary change to ibis file
tdhooghe Oct 21, 2024
4d514d9
Update kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py
tdhooghe Oct 21, 2024
2fddf19
Update kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py
tdhooghe Oct 21, 2024
e25be21
touch as little reqs as possible
tdhooghe Oct 21, 2024
e3bf820
update changelog
tdhooghe Oct 21, 2024
1e0f0c4
bump cloudpickle version to enable snowpark 1.23
tdhooghe Oct 21, 2024
00832f7
better formatting of release
tdhooghe Oct 22, 2024
db5554e
skip tests if python geq 3.12 as not supported by snowpark
tdhooghe Oct 22, 2024
6230dfb
try except block for snowpark imports
tdhooghe Oct 22, 2024
edd2703
put name in right place
tdhooghe Oct 22, 2024
94d7360
add import of snowflaketabledataset in try except block as well
tdhooghe Oct 22, 2024
7a014c4
add if statement to only execute code when snowpark is available
tdhooghe Oct 22, 2024
57ccd10
remove ignore of snowflake in makefile and update test execution logic
tdhooghe Oct 22, 2024
f09c96e
change makefile to include snowflake doctest
tdhooghe Oct 22, 2024
698167d
add if statement also in original code
tdhooghe Oct 22, 2024
5a0dcf8
add snowflake available after successful import
tdhooghe Oct 22, 2024
076921e
remove try except blocks in code
tdhooghe Oct 22, 2024
004c5e5
echo python version for debugging
tdhooghe Oct 22, 2024
a25b693
also skipifs for fixtures
tdhooghe Oct 22, 2024
fa00b6e
echo python version in cicd
tdhooghe Oct 22, 2024
5e0c92a
try with different make rule
tdhooghe Oct 22, 2024
fbc275d
revert changes to makefile
tdhooghe Oct 22, 2024
6b3e3f3
skip entire module pytest
tdhooghe Oct 22, 2024
d464909
revert local changes
tdhooghe Oct 22, 2024
4810dc9
make docstrings look nicer
tdhooghe Oct 22, 2024
0886454
improve docstring and remove ignore from dataset-doctest
tdhooghe Oct 22, 2024
25c28c4
Merge branch 'main' into feature/save-pd-to-snowflaketable
tdhooghe Oct 22, 2024
624baa7
add back ignore snowflake for doctesting and remove unexpected additi…
tdhooghe Oct 22, 2024
80f7765
only skip for python 3.12 or higher
tdhooghe Oct 22, 2024
ba0a030
remove separate pytest skips as we have a module wide one
tdhooghe Oct 22, 2024
36264c2
remove test from omit in pyproject.toml
tdhooghe Oct 22, 2024
fe43b5c
100% test coverage
tdhooghe Oct 22, 2024
10829ed
bump matplotlib to prevent installing test requirements from failing
tdhooghe Oct 23, 2024
435159b
revert matplotlib version bump
tdhooghe Oct 23, 2024
ba6975c
exclude lines from coverage
tdhooghe Oct 23, 2024
52ce490
only test python 3.12
tdhooghe Oct 23, 2024
71a3801
omit snowflake testing from python 3.12
tdhooghe Oct 23, 2024
b89a552
add python version check
tdhooghe Oct 23, 2024
d967ff5
fix bug in ignoreopts and add back python versions 3.10 and 3.11 in m…
tdhooghe Oct 23, 2024
233b4f0
fix bug in ignoreopts and add back python versions 3.10 and 3.11 in m…
tdhooghe Oct 23, 2024
7349fac
revert requires-python to 3.10
tdhooghe Oct 23, 2024
e64551e
ignore entire folder
tdhooghe Oct 23, 2024
ceacc8c
add back omit for snowflake as python 3.12 is not supported, hence ci…
tdhooghe Oct 23, 2024
e0735ad
revert makefile to original state
tdhooghe Oct 23, 2024
d17b8a6
remove commented out setters
tdhooghe Oct 23, 2024
c5c9e4d
remove commented out fixtures
tdhooghe Oct 23, 2024
fbc2545
Merge branch 'main' into feature/save-pd-to-snowflaketable
tdhooghe Oct 24, 2024
67f24c0
remove commented out fixtures
tdhooghe Oct 23, 2024
be5fb6f
skip test if import is missing
tdhooghe Oct 24, 2024
217e60b
build: Bump matplotlib test dependency (#904)
merelcht Oct 23, 2024
f288ab7
remove snowpark from omit
tdhooghe Oct 24, 2024
6a21d1f
change snowpark to snowflake
tdhooghe Oct 24, 2024
0e2abe3
fix error in import or skip and add noqa for ruff
tdhooghe Oct 24, 2024
c0c42a8
implement pr feedback to raise if data is not pandas or snowpark data…
tdhooghe Oct 24, 2024
41dfa22
add back omit for snowflake as python 3.12 is not supported, hence ci…
tdhooghe Oct 24, 2024
0f3d838
add test for new raising error
tdhooghe Oct 24, 2024
2a688d8
process final pr comments
tdhooghe Oct 24, 2024
a974010
Merge branch 'main' into feature/save-pd-to-snowflaketable
merelcht Oct 25, 2024
660a428
Merge branch 'main' into feature/save-pd-to-snowflaketable
merelcht Oct 25, 2024
ecabf7d
Merge branch 'feature/save-pd-to-snowflaketable' of https://github.co…
merelcht Oct 28, 2024
fb6d73e
Merge branch 'main' into feature/save-pd-to-snowflaketable
merelcht Oct 28, 2024
b6643a2
Merge branch 'feature/save-pd-to-snowflaketable' of https://github.co…
merelcht Oct 28, 2024
77567b7
Fix lint by checking session for None
merelcht Oct 28, 2024
5 changes: 5 additions & 0 deletions kedro-datasets/RELEASE.md
@@ -2,20 +2,25 @@

## Major features and improvements

- Added functionality to save pandas DataFrames directly to Snowflake, facilitating seamless `.csv` ingestion
- Added Python 3.9, 3.10 and 3.11 support for SnowflakeTableDataset
- Added the following new **experimental** datasets:

| Type | Description | Location |
| --------------------------------- | ------------------------------------------------------ | ---------------------------------------- |
| `databricks.ExternalTableDataset` | A dataset for accessing external tables in Databricks. | `kedro_datasets_experimental.databricks` |


## Bug fixes and other changes
- Implemented Snowflake's [local testing framework](https://docs.snowflake.com/en/developer-guide/snowpark/python/testing-locally) for testing purposes
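
  The local testing framework runs Snowpark code against an in-process emulation instead of a live Snowflake account. A minimal sketch, assuming `snowflake-snowpark-python>=1.23` is installed with its local-testing dependencies (this snippet is an illustration, not part of the PR):

  ```python
  from snowflake.snowpark import Session

  # A session backed by the local testing framework; no Snowflake account needed.
  session = Session.builder.config("local_testing", True).create()
  df = session.create_dataframe([[1, "a"], [2, "b"]], schema=["id", "label"])
  df.show()
  ```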

## Breaking Changes

## Community contributions

Many thanks to the following Kedroids for contributing PRs to this release:

- [Thomas d'Hooghe](https://github.com/tdhooghe)
- [Minura Punchihewa](https://github.com/MinuraPunchihewa)

# Release 5.1.0
250 changes: 155 additions & 95 deletions kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py
@@ -1,95 +1,99 @@
"""``AbstractDataset`` implementation to access Snowflake using Snowpark dataframes
"""
"""``AbstractDataset`` implementation to access Snowflake using Snowpark dataframes"""

from __future__ import annotations

import logging
from typing import Any
from typing import Any, cast

import snowflake.snowpark as sp
import pandas as pd
from kedro.io.core import AbstractDataset, DatasetError
from snowflake.snowpark import DataFrame, Session
from snowflake.snowpark import context as sp_context
from snowflake.snowpark import exceptions as sp_exceptions

logger = logging.getLogger(__name__)


class SnowparkTableDataset(AbstractDataset):
"""``SnowparkTableDataset`` loads and saves Snowpark dataframes.
"""``SnowparkTableDataset`` loads and saves Snowpark DataFrames.

As of Mar-2023, the snowpark connector only works with Python 3.8.
As of October 2024, the Snowpark connector works with Python 3.9, 3.10, and 3.11.
Python 3.12 is not supported yet.

Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/\
data_catalog_yaml_examples.html>`_:

.. code-block:: yaml

weather:
type: kedro_datasets.snowflake.SnowparkTableDataset
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: db_credentials
save_args:
mode: overwrite
column_order: name
table_type: ''

You can skip everything but "table_name" if the database and
schema are provided via credentials. That way catalog entries can be shorter
if, for example, all used Snowflake tables live in same database/schema.
Values in the dataset definition take priority over those defined in credentials.
weather:
type: kedro_datasets.snowflake.SnowparkTableDataset
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: db_credentials
save_args:
mode: overwrite
column_order: name
table_type: ''

You can skip everything but "table_name" if the database and schema are
provided via credentials. This allows catalog entries to be shorter when
all Snowflake tables are in the same database and schema. Values in the
dataset definition take priority over those defined in credentials.

Example:
Credentials file provides all connection attributes, catalog entry
"weather" reuses credentials parameters, "polygons" catalog entry reuses
all credentials parameters except providing a different schema name.
Second example of credentials file uses ``externalbrowser`` authentication.
The credentials file provides all connection attributes. The catalog entry
for "weather" reuses the credentials parameters, while the "polygons" catalog
entry reuses all credentials parameters except for specifying a different
schema. The second example demonstrates the use of ``externalbrowser`` authentication.

catalog.yml
catalog.yml:

.. code-block:: yaml

weather:
type: kedro_datasets.snowflake.SnowparkTableDataset
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: snowflake_client
save_args:
mode: overwrite
column_order: name
table_type: ''

polygons:
type: kedro_datasets.snowflake.SnowparkTableDataset
table_name: "geopolygons"
credentials: snowflake_client
schema: "geodata"

credentials.yml
weather:
type: kedro_datasets.snowflake.SnowparkTableDataset
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: snowflake_client
save_args:
mode: overwrite
column_order: name
table_type: ''

polygons:
type: kedro_datasets.snowflake.SnowparkTableDataset
table_name: "geopolygons"
credentials: snowflake_client
schema: "geodata"

credentials.yml:

.. code-block:: yaml

snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "service_account_abc"
password: "supersecret"
snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "service_account_abc"
password: "supersecret"

credentials.yml (with externalbrowser authenticator)
credentials.yml (with externalbrowser authentication):

.. code-block:: yaml

snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "[email protected]"
authenticator: "externalbrowser"
snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "[email protected]"
authenticator: "externalbrowser"

"""

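In addition to the YAML examples above, the dataset can be constructed directly in Python. A minimal sketch, assuming valid Snowflake credentials (all values below are placeholders):

```python
from kedro_datasets.snowflake import SnowparkTableDataset

# Placeholder connection values; in a Kedro project these live in credentials.yml.
credentials = {
    "account": "ab12345.eu-central-1",
    "user": "service_account_abc",
    "password": "supersecret",
    "warehouse": "datascience_wh",
    "database": "meteorology",
    "schema": "observations",
}

weather = SnowparkTableDataset(
    table_name="weather_data",
    credentials=credentials,
    save_args={"mode": "overwrite"},
)
```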
@@ -110,9 +114,11 @@ def __init__(  # noqa: PLR0913

        load_args: dict[str, Any] | None = None,
        save_args: dict[str, Any] | None = None,
        credentials: dict[str, Any] | None = None,
+        session: Session | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
-        """Creates a new instance of ``SnowparkTableDataset``.
+        """
+        Creates a new instance of ``SnowparkTableDataset``.

        Args:
            table_name: The table name to load or save data to.

@@ -154,6 +160,7 @@ def __init__(  # noqa: PLR0913

                    "'schema' must be provided by credentials or dataset."
                )
            schema = credentials["schema"]
+
        # Handle default load and save arguments
        self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
        self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

@@ -167,6 +174,7 @@ def __init__(  # noqa: PLR0913

            {"database": self._database, "schema": self._schema}
        )
        self._connection_parameters = connection_parameters
+        self._session = session

        self.metadata = metadata

@@ -178,8 +186,9 @@ def _describe(self) -> dict[str, Any]:

        }

    @staticmethod
-    def _get_session(connection_parameters) -> sp.Session:
-        """Given a connection string, create singleton connection
+    def _get_session(connection_parameters) -> Session:
+        """
+        Given a connection string, create singleton connection
        to be used across all instances of `SnowparkTableDataset` that
        need to connect to the same source.
        connection_parameters is a dictionary of any values

@@ -199,45 +208,96 @@ def _get_session(connection_parameters) -> Session:

        """
        try:
            logger.debug("Trying to reuse active snowpark session...")
-            session = sp.context.get_active_session()
-        except sp.exceptions.SnowparkSessionException:
+            session = sp_context.get_active_session()
+        except sp_exceptions.SnowparkSessionException:
            logger.debug("No active snowpark session found. Creating...")
-            session = sp.Session.builder.configs(connection_parameters).create()
+            session = Session.builder.configs(connection_parameters).create()
        return session

    @property
-    def _session(self) -> sp.Session:
-        return self._get_session(self._connection_parameters)
+    def session(self) -> Session:
+        """
+        Retrieve or create a session.
+
+        Returns:
+            Session: The current session associated with the object.
+        """
+        if not self._session:
+            self._session = self._get_session(self._connection_parameters)
+        return self._session

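The new `session` constructor argument, together with this property, lets callers inject an existing Snowpark session (for example one created by the local testing framework) instead of having the dataset build one from connection parameters. A sketch with placeholder values, assuming the constructor's usual credential checks are satisfied:

```python
from snowflake.snowpark import Session

# Placeholder session; any dict accepted by Session.builder.configs() works too.
existing = Session.builder.config("local_testing", True).create()

dataset = SnowparkTableDataset(
    table_name="weather_data",
    credentials={"database": "meteorology", "schema": "observations"},
    session=existing,
)
assert dataset.session is existing  # the property returns the injected session
```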
-    def load(self) -> sp.DataFrame:
-        table_name: list = [
-            self._database,
-            self._schema,
-            self._table_name,
-        ]
+    def load(self) -> DataFrame:
+        """
+        Load data from a specified database table.

-        sp_df = self._session.table(".".join(table_name))
-        return sp_df
+        Returns:
+            DataFrame: The loaded data as a Snowpark DataFrame.
+        """
+        if self._session is None:
+            raise DatasetError(
+                "No active session. Please initialise a Snowpark session before loading data."
+            )
+        return self._session.table(self._validate_and_get_table_name())
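
A short usage sketch, reusing the hypothetical `dataset` from the earlier snippet and assuming the table exists. `load()` returns a lazy Snowpark DataFrame, so no rows move until an action runs:

```python
weather_df = dataset.load()        # snowflake.snowpark.DataFrame, evaluated lazily
print(weather_df.count())          # an action triggers execution in Snowflake
local_df = weather_df.to_pandas()  # materialise the table locally as pandas
```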

-    def save(self, data: sp.DataFrame) -> None:
-        table_name = [
-            self._database,
-            self._schema,
-            self._table_name,
-        ]
+    def save(self, data: pd.DataFrame | DataFrame) -> None:
+        """
+        Check if the data is a Snowpark DataFrame or a Pandas DataFrame,
+        convert it to a Snowpark DataFrame if needed, and save it to the specified table.

+        Args:
+            data (pd.DataFrame | DataFrame): The data to save.
+        """
+        if self._session is None:
+            raise DatasetError(
+                "No active session. Please initialise a Snowpark session before loading data."
+            )
+        if isinstance(data, pd.DataFrame):
+            snowpark_df = self._session.create_dataframe(data)
+        elif isinstance(data, DataFrame):
+            snowpark_df = data
+        else:
+            raise DatasetError(
+                f"Data of type {type(data)} is not supported for saving."
+            )

-        data.write.save_as_table(table_name, **self._save_args)
+        snowpark_df.write.save_as_table(
+            self._validate_and_get_table_name(), **self._save_args
+        )
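
Both input types from the signature are accepted. A sketch, again reusing the hypothetical `dataset` from above:

```python
import pandas as pd

# pandas input is first converted with session.create_dataframe(), then written.
new_rows = pd.DataFrame({"city": ["Leuven"], "temp_c": [21.5]})
dataset.save(new_rows)

# A Snowpark DataFrame is written as-is; any other type raises a DatasetError.
dataset.save(dataset.load())
```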

    def _exists(self) -> bool:
-        session = self._session
-        query = "SELECT COUNT(*) FROM {database}.INFORMATION_SCHEMA.TABLES \
-            WHERE TABLE_SCHEMA = '{schema}' \
-            AND TABLE_NAME = '{table_name}'"
-        rows = session.sql(
-            query.format(
-                database=self._database,
-                schema=self._schema,
-                table_name=self._table_name,
-            )
-        ).collect()
-        return rows[0][0] == 1
+        """
+        Check if a specified table exists in the database.
+
+        Returns:
+            bool: True if the table exists, False otherwise.
+        """
+        if self._session is None:
+            raise DatasetError(
+                "No active session. Please initialise a Snowpark session before loading data."
+            )
+        try:
+            self._session.table(
+                f"{self._database}.{self._schema}.{self._table_name}"
+            ).show()
+            return True
+        except Exception as e:
+            logger.debug(f"Table {self._table_name} does not exist: {e}")
+            return False

+    def _validate_and_get_table_name(self) -> str:
+        """
+        Validate that the database, schema, and table name are not None or empty,
+        then join them into a single string.
+
+        Returns:
+            str: The joined table name in the format 'database.schema.table'.
+
+        Raises:
+            DatasetError: If any part of the table name is None or empty.
+        """
+        parts: list[str | None] = [self._database, self._schema, self._table_name]
+        if any(part is None or part == "" for part in parts):
+            raise DatasetError("Database, schema or table name cannot be None or empty")
+        parts_str = cast(list[str], parts)  # make linting happy
+        return ".".join(parts_str)
8 changes: 4 additions & 4 deletions kedro-datasets/pyproject.toml
@@ -138,7 +138,7 @@ polars = [
redis-pickledataset = ["redis~=4.1"]
redis = ["kedro-datasets[redis-pickledataset]"]

-snowflake-snowparktabledataset = ["snowflake-snowpark-python~=1.0"]
+snowflake-snowparktabledataset = ["snowflake-snowpark-python>=1.23"]
snowflake = ["kedro-datasets[snowflake-snowparktabledataset]"]

spark-deltatabledataset = ["kedro-datasets[spark-base,hdfs-base,s3fs-base,delta-base]"]
@@ -205,7 +205,7 @@ test = [
"adlfs~=2023.1",
"behave==1.2.6",
"biopython~=1.73",
"cloudpickle<=2.0.0",
"cloudpickle~=2.2.1",
"compress-pickle[lz4]~=2.1.0",
"coverage>=7.2.0",
"dask[complete]>=2021.10",
@@ -250,7 +250,7 @@ test = [
"requests-mock~=1.6",
"requests~=2.20",
"s3fs>=2021.04",
"snowflake-snowpark-python~=1.0; python_version < '3.11'",
"snowflake-snowpark-python>=1.23; python_version < '3.12'",
"scikit-learn>=1.0.2,<2",
"scipy>=1.7.3",
"packaging",
@@ -320,7 +320,7 @@ version = {attr = "kedro_datasets.__version__"}
fail_under = 100
show_missing = true
# temporarily ignore kedro_datasets/__init__.py in coverage report
-omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*", "kedro_datasets/tensorflow/*", "kedro_datasets/__init__.py", "kedro_datasets/conftest.py"]
+omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/tensorflow/*", "kedro_datasets/snowflake/*", "kedro_datasets/__init__.py", "kedro_datasets/conftest.py"]
exclude_also = ["raise NotImplementedError", "if TYPE_CHECKING:"]

[tool.pytest.ini_options]