Add decorator functionality to validate_schema function #255

Open

wants to merge 20 commits into base: planning-1.0-release

Commits (20)
3a36964
All changes to make validate_schema behave both as a function and as …
kunaljubce Jul 16, 2024
aafb6f8
update column extension function names and desc in readme
Jul 12, 2024
40a62b1
DOC: CONTRIBUTING.md - add details on precommit & local GitHub Action…
nijanthanvijayakumar Jul 15, 2024
5e4eafa
Update the CONTRIBUTING.md regarding auto-assign issues
nijanthanvijayakumar Jul 16, 2024
ca75b36
Change pip to poetry for pre-commit installation
nijanthanvijayakumar Jul 16, 2024
35234bb
Bump jupyterlab from 3.6.7 to 3.6.8
dependabot[bot] Aug 29, 2024
cd42f32
Drop Spark-2 support and update dependencies
SemyonSinchenko Jul 14, 2024
3afa41f
Remove extension functions
nijanthanvijayakumar Jul 11, 2024
34fa8e5
Restore SparkSession extension module & delete patching row 48
nijanthanvijayakumar Jul 11, 2024
a5fb013
Move the create_df function to dataframe_helpers
nijanthanvijayakumar Jul 14, 2024
7305223
Updates from review
SemyonSinchenko Jul 14, 2024
501f3b6
Update the poetry & pyproject with the dependencies for Spark-Connect
nijanthanvijayakumar Jul 15, 2024
f7cc2c2
update column extension function names and desc in readme
Jul 12, 2024
398198c
improve documentation of makefile
fpgmaas Jul 15, 2024
8345d1f
Update the files according to the review comments
nijanthanvijayakumar Jul 15, 2024
49c9ca1
apply hotfix
fpgmaas Jul 15, 2024
ff48cb6
improve use of ruff
fpgmaas Jul 15, 2024
9d034d4
relax pre-commit version
fpgmaas Jul 15, 2024
086a5c3
Rebasing branch with latest 1.0 upstream
kunaljubce Sep 7, 2024
1653984
Rebasing branch with latest 1.0 upstream
kunaljubce Sep 7, 2024
24 changes: 20 additions & 4 deletions README.md
@@ -41,12 +41,28 @@ quinn.validate_presence_of_columns(source_df, ["name", "age", "fun"])

**validate_schema()**

Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`.
Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`. By default, `ignore_nullable` is set to `False`, so an exception is raised even if the column names and data types match but the nullability does not.

```python
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, df_to_be_validated=source_df)
```

You can also set `ignore_nullable` to `True`, so validation checks only column names and data types, not nullability.

```python
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)
```
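
For example, here is a minimal sketch of the nullability behavior (the schemas, data, and an existing `spark` session are assumptions for illustration):

```python
from pyspark.sql.types import StructType, StructField, StringType, LongType

# DataFrame whose "age" column is nullable
source_df = spark.createDataFrame(
    [("jose", 1), ("li", 2)],
    StructType([
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
    ]),
)

# Required schema declares "age" as non-nullable
required_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), False),
])

# Raises quinn.DataFrameMissingStructFieldError: nullability differs
quinn.validate_schema(required_schema, df_to_be_validated=source_df)

# Passes: column names and data types match, nullability is ignored
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)
```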

> [!TIP]
> This function can also be used as a decorator on functions that return a DataFrame, which helps validate the schema of the returned DataFrame. When used as a decorator, you don't need to pass the `df_to_be_validated` argument, since validation is performed on the DataFrame returned by the decorated function.
>
> ```python
> @quinn.validate_schema(required_schema, ignore_nullable=True)
> def get_df():
> return df
> ```


**validate_absence_of_columns()**

Raises an exception if `source_df` contains `age` or `cool` columns.
@@ -478,15 +494,15 @@ from quinn.extensions import *

**is_falsy()**

Returns a Column indicating whether all values in the Column are False or NULL: `True` if `has_stuff` is `None` or `False`.
Returns a column indicating whether each value in the column is False or NULL: `True` if `has_stuff` is `None` or `False`.

```python
source_df.withColumn("is_stuff_falsy", F.col("has_stuff").isFalsy())
```

**is_truthy()**

Calculates a boolean expression that is the opposite of is_falsy for the given Column: `True` unless `has_stuff` is `None` or `False`.
Calculates a boolean expression that is the opposite of is_falsy for the given column: `True` unless `has_stuff` is `None` or `False`.

```python
source_df.withColumn("is_stuff_truthy", F.col("has_stuff").isTruthy())
56 changes: 39 additions & 17 deletions quinn/dataframe_validator.py
@@ -11,10 +11,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from __future__ import annotations # noqa: I001

import copy
from typing import TYPE_CHECKING
from typing import Any, Callable, TYPE_CHECKING

if TYPE_CHECKING:
from pyspark.sql import DataFrame
@@ -52,38 +52,60 @@ def validate_presence_of_columns(df: DataFrame, required_col_names: list[str]) -


def validate_schema(
df: DataFrame,
required_schema: StructType,
ignore_nullable: bool = False,
) -> None:
df_to_be_validated: DataFrame = None,
) -> Callable[[Any, Any], Any]:
"""Function that validate if a given DataFrame has a given StructType as its schema.
Implemented as a decorator factory so can be used both as a standalone function or as
a decorator to another function.

:param df: DataFrame to validate
:type df: DataFrame
:param required_schema: StructType required for the DataFrame
:type required_schema: StructType
:param ignore_nullable: (Optional) A flag for if nullable fields should be
ignored during validation
:type ignore_nullable: bool, optional
:param df_to_be_validated: DataFrame to validate, mandatory when called as a function. Not required
when called as a decorator
:type df_to_be_validated: DataFrame

:raises DataFrameMissingStructFieldError: if any StructFields from the required
schema are not included in the DataFrame schema
"""
_all_struct_fields = copy.deepcopy(df.schema)
_required_schema = copy.deepcopy(required_schema)

if ignore_nullable:
for x in _all_struct_fields:
x.nullable = None
def decorator(func: Callable[..., DataFrame]) -> Callable[..., DataFrame]:
def wrapper(*args: object, **kwargs: object) -> DataFrame:
dataframe = func(*args, **kwargs)
_all_struct_fields = copy.deepcopy(dataframe.schema)
Collaborator:

We are doing a deep copy of two potentially long schemas on each call. I do not like it.

Contributor Author:

@SemyonSinchenko I took a step back and re-evaluated the implementation. The question is - do we even need deepcopy here? We are not changing the original dfs or their schemas. Is shallow copy a better idea here? Lemme know your thoughts!

Collaborator:

Yes, sound good!

_required_schema = copy.deepcopy(required_schema)

Collaborator:

Why not to check that lengths of both schemas are the same? I mean before do the deepcopy.

Contributor Author:

@SemyonSinchenko Are you suggesting that we check if the length matches, only then we do the deepcopy?

Collaborator:

Exactly! Deep copy may be quite expensive for big objects.
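
For illustration, a rough sketch of how that suggestion might look (a hypothetical helper, not code from this PR; only `quinn.DataFrameMissingStructFieldError` is taken from the library): compare field counts before copying anything, and copy only when the nullability flags have to be masked out.

```python
import copy

from pyspark.sql.types import StructType

import quinn


def validate_schema_sketch(
    dataframe_schema: StructType,
    required_schema: StructType,
    ignore_nullable: bool = False,
) -> None:
    # Fail fast: if the required schema has more fields than the DataFrame's,
    # some StructFields must be missing, so the copies can be skipped entirely.
    if len(required_schema.fields) > len(dataframe_schema.fields):
        raise quinn.DataFrameMissingStructFieldError(
            f"The DataFrame has {len(dataframe_schema.fields)} StructFields but "
            f"the required schema has {len(required_schema.fields)}",
        )

    # Copy only when the nullability flags actually have to be masked out.
    if ignore_nullable:
        actual, required = copy.deepcopy(dataframe_schema), copy.deepcopy(required_schema)
        for field in list(actual) + list(required):
            field.nullable = None
    else:
        actual, required = dataframe_schema, required_schema

    missing = [field for field in required if field not in actual]
    if missing:
        raise quinn.DataFrameMissingStructFieldError(
            f"The {missing} StructFields are not included in the DataFrame schema",
        )
```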

for x in _required_schema:
x.nullable = None
if ignore_nullable:
for x in _all_struct_fields:
x.nullable = None

missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
error_message = f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"
for x in _required_schema:
x.nullable = None

if missing_struct_fields:
raise DataFrameMissingStructFieldError(error_message)
missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
Collaborator:

I would like to transform this comprehension into a loop, like:

for field_name in _required_schema.fieldNames():
   if field_name not in ...
   else:
       if ignore_nullable:
           %% compare name, dataType %%
       else:
           %% compare name, dataType, nullable %%
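
One way that loop might be fleshed out (a sketch following the reviewer's outline, not code from this PR; it also avoids the nullability-masking copies, so `_all_struct_fields` can simply be the DataFrame's schema):

```python
missing_struct_fields = []
for required_field in _required_schema.fields:
    # Look up the DataFrame field with the same name, if any.
    actual_field = next(
        (f for f in _all_struct_fields.fields if f.name == required_field.name),
        None,
    )
    if actual_field is None:
        # The field name is absent from the DataFrame schema altogether.
        missing_struct_fields.append(required_field)
    elif ignore_nullable:
        # Compare name and dataType only.
        if actual_field.dataType != required_field.dataType:
            missing_struct_fields.append(required_field)
    else:
        # Compare name, dataType, and nullable.
        if (actual_field.dataType != required_field.dataType
                or actual_field.nullable != required_field.nullable):
            missing_struct_fields.append(required_field)
```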

error_message = (
f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"
Collaborator:

In the case of nested schemas this error message will be unreadable.

Contributor Author:

@SemyonSinchenko How do you suggest we put this?

Collaborator:

I would suggest flattening all fields first to the form parent.parent.field, and only after that rendering all the missing fields. I would suggest putting the flattening logic in a separate function.

)
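
A possible shape for that flattening helper (an illustrative sketch, not part of this PR; the function name is hypothetical):

```python
from pyspark.sql.types import StructType


def flatten_field_names(schema: StructType, prefix: str = "") -> list[str]:
    # Render every leaf field as a dotted path such as "parent.child.field",
    # so missing-field error messages stay readable for nested schemas.
    flattened: list[str] = []
    for field in schema.fields:
        name = f"{prefix}{field.name}"
        if isinstance(field.dataType, StructType):
            flattened.extend(flatten_field_names(field.dataType, prefix=f"{name}."))
        else:
            flattened.append(name)
    return flattened
```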

if missing_struct_fields:
raise DataFrameMissingStructFieldError(error_message)

print("Success! DataFrame matches the required schema!")

return dataframe

return wrapper

if df_to_be_validated is None:
# This means the function is being used as a decorator
return decorator

# This means the function is being called directly with a DataFrame
return decorator(lambda: df_to_be_validated)()
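
For reference, the two call styles the factory supports (a usage sketch; `spark`, `required_schema`, and the function name are assumptions for illustration):

```python
import quinn
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()
required_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
])

# As a decorator: df_to_be_validated is omitted, so the factory returns `decorator`
# and validation runs on whatever the wrapped function returns.
@quinn.validate_schema(required_schema, ignore_nullable=True)
def load_people():
    return spark.createDataFrame([("jose", 1), ("li", 2)], ["name", "age"])

df = load_people()

# As a plain function: the DataFrame is wrapped in a lambda internally and
# validated immediately.
quinn.validate_schema(required_schema, df_to_be_validated=df)
```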


def validate_absence_of_columns(df: DataFrame, prohibited_col_names: list[str]) -> None:
6 changes: 3 additions & 3 deletions tests/test_dataframe_validator.py
@@ -34,7 +34,7 @@ def it_raises_when_struct_field_is_missing1():
]
)
with pytest.raises(quinn.DataFrameMissingStructFieldError) as excinfo:
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, df_to_be_validated=source_df)

current_spark_version = semver.Version.parse(spark.version)
spark_330 = semver.Version.parse("3.3.0")
@@ -53,7 +53,7 @@ def it_does_nothing_when_the_schema_matches():
StructField("age", LongType(), True),
]
)
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, df_to_be_validated=source_df)

def nullable_column_mismatches_are_ignored():
data = [("jose", 1), ("li", 2), ("luisa", 3)]
@@ -64,7 +64,7 @@ def nullable_column_mismatches_are_ignored():
StructField("age", LongType(), False),
]
)
quinn.validate_schema(source_df, required_schema, ignore_nullable=True)
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)


def describe_validate_absence_of_columns():