Add decorator functionality to validate_schema function #255

Open. Wants to merge 20 commits into base: planning-1.0-release.

Commits (20):
- 3a36964: All changes to make validate_schema behave both as a function and as … (kunaljubce, Jul 16, 2024)
- aafb6f8: update column extension function names and desc in readme (Jul 12, 2024)
- 40a62b1: DOC: CONTRIBUTING.md - add details on precommit & local GitHub Action… (nijanthanvijayakumar, Jul 15, 2024)
- 5e4eafa: Update the CONTRIBUTING.md regarding auto-assign issues (nijanthanvijayakumar, Jul 16, 2024)
- ca75b36: Change pip to poetry for pre-commit installation (nijanthanvijayakumar, Jul 16, 2024)
- 35234bb: Bump jupyterlab from 3.6.7 to 3.6.8 (dependabot[bot], Aug 29, 2024)
- cd42f32: Drop Spark-2 support and update dependencies (SemyonSinchenko, Jul 14, 2024)
- 3afa41f: Remove extension functions (nijanthanvijayakumar, Jul 11, 2024)
- 34fa8e5: Restore SparkSession extension module & delete patching row 48 (nijanthanvijayakumar, Jul 11, 2024)
- a5fb013: Move the create_df function to dataframe_helpers (nijanthanvijayakumar, Jul 14, 2024)
- 7305223: Updates from review (SemyonSinchenko, Jul 14, 2024)
- 501f3b6: Update the poetry & pyproject with the dependencies for Spark-Connect (nijanthanvijayakumar, Jul 15, 2024)
- f7cc2c2: update column extension function names and desc in readme (Jul 12, 2024)
- 398198c: improve documentation of makefile (fpgmaas, Jul 15, 2024)
- 8345d1f: Update the files according to the review comments (nijanthanvijayakumar, Jul 15, 2024)
- 49c9ca1: apply hotfix (fpgmaas, Jul 15, 2024)
- ff48cb6: improve use of ruff (fpgmaas, Jul 15, 2024)
- 9d034d4: relax pre-commit version (fpgmaas, Jul 15, 2024)
- 086a5c3: Rebasing branch with latest 1.0 upstream (kunaljubce, Sep 7, 2024)
- 1653984: Rebasing branch with latest 1.0 upstream (kunaljubce, Sep 7, 2024)
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -74,9 +74,9 @@ jobs:
run: |
if [[ "${SPARK_VERSION}" > "3.4" ]]; then
sh scripts/run_spark_connect_server.sh
# The tests should be called from here.
make test
else
echo "Skipping Spark-Connect tests for Spark version <= 3.4"
echo "Skipping Spark-Connect tests for Spark version ${SPARK_VERSION}, which is <= 3.4"
fi

check-license-headers:
98 changes: 98 additions & 0 deletions CONTRIBUTING.md
@@ -14,6 +14,10 @@ Scan through our [existing issues](https://github.com/MrPowers/quinn/issues) to

You can find a list of [good first issues](https://github.com/MrPowers/quinn/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) which can help you better understand the code base of the project.

### Auto-assigning issues

We have a workflow that automatically assigns issues to users who comment `take` on an issue. This is configured in the `.github/workflows/assign-on-comment.yml` file: when a user comments `take`, a GitHub Action runs and assigns the issue to that user if it is not already assigned.

## Contributing

### Fork the repository
@@ -49,6 +53,17 @@ make install_deps

To run spark tests you need to have properly configured Java. Apache Spark currently supports mainly Java 8 (1.8). You can find instructions on how to set up Java [here](https://www.java.com/en/download/help/download_options.html). When you are running spark tests you should have the `JAVA_HOME` variable in your environment pointing to the installation of Java 8.

### Pre-commit installation and execution

We use pre-commit hooks to ensure code quality. The configuration for pre-commit hooks is in the `.pre-commit-config.yaml` file. To install pre-commit, run:
```shell
poetry shell
poetry run pre-commit install
```
To run pre-commit hooks manually, use:
```shell
pre-commit run --all-files
```

### Running Tests

@@ -57,6 +72,89 @@ You can run test as following:
```shell
make test
```

#### GitHub Actions local setup using 'act'

You can run GitHub Actions locally using the `act` tool. The configuration for GitHub Actions is in
the `.github/workflows/ci.yml` file. To install `act`, follow the
instructions [here](https://github.com/nektos/act#installation). To run a specific job, use:

```shell
act -j <job-name>
```

For example, to run the `test` job, use:

```shell
act -j test
```

If you need help with `act`, use:

```shell
act --help
```

For MacBooks with M1 processors, you might have to add the `--container-architecture` flag:

```shell
act -j <job-name> --container-architecture linux/arm64
```

#### Running Spark-Connect tests locally

To run the Spark-Connect tests locally, follow the steps below. Please note that this only works on Mac/UNIX-based systems.

1. **Set up the required environment variables:** The following variables need to be set so that the shell script that installs the Spark-Connect binary and starts the server picks up the version.

The version can either be `3.5.1` or `3.4.3`, as those are the ones used in our CI.

```shell
export SPARK_VERSION=3.5.1
export SPARK_CONNECT_MODE_ENABLED=1
```

2. **Check that the required environment variables are set:** Run the commands below to confirm that the variables are set.

```shell
echo $SPARK_VERSION
echo $SPARK_CONNECT_MODE_ENABLED
```

3. **Install required system packages:** Run the command below to install `wget`.

For Mac users:
```shell
brew install wget
```

For Ubuntu users:
```shell
sudo apt-get install wget
```

4. **Execute the shell script:** Run the command below to execute the shell script that installs Spark-Connect and starts the server.

```shell
sh scripts/run_spark_connect_server.sh
```

5. **Run the tests:** Run the command below to execute the tests using Spark-Connect.

```shell
make test
```

6. **Clean up:** After running the tests, stop the Spark-Connect server and unset the environment variables.

```shell
unset SPARK_VERSION
unset SPARK_CONNECT_MODE_ENABLED
```

### Code style

This project follows the [PySpark style guide](https://github.com/MrPowers/spark-style-guide/blob/main/PYSPARK_STYLE_GUIDE.md). All public functions and methods should be documented in `README.md` and also should have docstrings in `sphinx format`:
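
The docstring example that this sentence introduces is cut off by the diff view. Purely for illustration, a docstring in that style could look like the sketch below (a generic example written for this guide, not an excerpt from the repository):

```python
from pyspark.sql import DataFrame
import pyspark.sql.functions as F


def with_greeting(df: DataFrame, greeting_col: str = "greeting") -> DataFrame:
    """Append a constant greeting column to the given DataFrame.

    :param df: DataFrame to transform
    :type df: DataFrame
    :param greeting_col: name of the column to add
    :type greeting_col: str
    :return: DataFrame with the additional greeting column
    :rtype: DataFrame
    """
    return df.withColumn(greeting_col, F.lit("hello"))
```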
20 changes: 18 additions & 2 deletions README.md
@@ -41,12 +41,28 @@ quinn.validate_presence_of_columns(source_df, ["name", "age", "fun"])

**validate_schema()**

Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`.
Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`. By default, `ignore_nullable` is set to False, so an exception will be raised even if the column names and data types match but the nullability does not.

```python
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, df_to_be_validated=source_df)
```

You can also set `ignore_nullable` to True so that validation only checks column names and data types, not nullability.

```python
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)
```

> [!TIP]
> This function can also be used as a decorator on other functions that return a DataFrame, which helps validate the schema of the returned DataFrame. When used as a decorator, you don't need to pass the `df_to_be_validated` argument; the validation is performed on the DataFrame returned by the function the decorator is applied to.
>
> ```python
> @quinn.validate_schema(required_schema, ignore_nullable=True)
> def get_df():
> return df
> ```
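
For a fuller picture, here is a self-contained sketch of both call styles against the signature shown above (the schema, data, and function name are invented for illustration):

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

import quinn

spark = SparkSession.builder.getOrCreate()

required_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
])

source_df = spark.createDataFrame([("jose", 1), ("li", 2)], ["name", "age"])

# Direct call: validates source_df immediately.
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)


# Decorator: validates the DataFrame returned by the decorated function.
@quinn.validate_schema(required_schema, ignore_nullable=True)
def load_people():
    return source_df


people = load_people()
```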


**validate_absence_of_columns()**

Raises an exception if `source_df` contains `age` or `cool` columns.
56 changes: 39 additions & 17 deletions quinn/dataframe_validator.py
@@ -11,10 +11,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from __future__ import annotations # noqa: I001

import copy
from typing import TYPE_CHECKING
from typing import Any, Callable, TYPE_CHECKING

if TYPE_CHECKING:
from pyspark.sql import DataFrame
@@ -52,38 +52,60 @@ def validate_presence_of_columns(df: DataFrame, required_col_names: list[str]) -


def validate_schema(
df: DataFrame,
required_schema: StructType,
ignore_nullable: bool = False,
) -> None:
df_to_be_validated: DataFrame = None,
) -> Callable[[Any, Any], Any]:
"""Function that validate if a given DataFrame has a given StructType as its schema.
Implemented as a decorator factory so can be used both as a standalone function or as
a decorator to another function.

:param df: DataFrame to validate
:type df: DataFrame
:param required_schema: StructType required for the DataFrame
:type required_schema: StructType
:param ignore_nullable: (Optional) A flag for if nullable fields should be
ignored during validation
:type ignore_nullable: bool, optional
:param df_to_be_validated: DataFrame to validate; mandatory when called directly as a function,
not required when used as a decorator
:type df_to_be_validated: DataFrame

:raises DataFrameMissingStructFieldError: if any StructFields from the required
schema are not included in the DataFrame schema
"""
_all_struct_fields = copy.deepcopy(df.schema)
_required_schema = copy.deepcopy(required_schema)

if ignore_nullable:
for x in _all_struct_fields:
x.nullable = None
def decorator(func: Callable[..., DataFrame]) -> Callable[..., DataFrame]:
def wrapper(*args: object, **kwargs: object) -> DataFrame:
dataframe = func(*args, **kwargs)
_all_struct_fields = copy.deepcopy(dataframe.schema)
Review thread on this line:

- **Collaborator:** We are doing a deep copy of two potentially long schemas on each call. I do not like it.
- **Contributor (author):** @SemyonSinchenko I took a step back and re-evaluated the implementation. The question is: do we even need deepcopy here? We are not changing the original dfs or their schemas. Is a shallow copy a better idea here? Lemme know your thoughts!
- **Collaborator:** Yes, sounds good!
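
To make the alternative concrete, here is a minimal sketch of the shallow-copy idea discussed in this thread: copy only the top-level `StructField`s, whose `nullable` flag is the only thing being overwritten, instead of deep-copying whole (possibly nested) schemas. The helper name is invented; this is an illustration, not code from the PR.

```python
from __future__ import annotations

import copy

from pyspark.sql.types import StructField, StructType


def comparable_fields(schema: StructType, ignore_nullable: bool) -> list[StructField]:
    """Shallow-copy the top-level fields so the caller's schema is never mutated
    when the nullability flag is blanked out for comparison.
    """
    fields = [copy.copy(field) for field in schema.fields]
    if ignore_nullable:
        for field in fields:
            field.nullable = None
    return fields
```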

_required_schema = copy.deepcopy(required_schema)

Review thread on this line:

- **Collaborator:** Why not check that the lengths of both schemas are the same? I mean, before doing the deepcopy.
- **Contributor (author):** @SemyonSinchenko Are you suggesting that we check whether the lengths match, and only then do the deepcopy?
- **Collaborator:** Exactly! Deep copy may be quite expensive for big objects.
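
One way to read that suggestion is as a cheap guard that runs before any copying. The sketch below is hypothetical and phrased for the "required schema must be contained in the DataFrame schema" semantics of `validate_schema`, so it checks `<=` rather than strict equality:

```python
from pyspark.sql.types import StructType


def schema_size_precheck(required_schema: StructType, actual_schema: StructType) -> bool:
    """Return False when the required schema has more top-level fields than the
    DataFrame schema, in which case it can never be contained in it.
    """
    return len(required_schema.fields) <= len(actual_schema.fields)
```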

for x in _required_schema:
x.nullable = None
if ignore_nullable:
for x in _all_struct_fields:
x.nullable = None

missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
error_message = f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"
for x in _required_schema:
x.nullable = None

if missing_struct_fields:
raise DataFrameMissingStructFieldError(error_message)
missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
Review thread on this line:

- **Collaborator:** I would like to transform this comprehension into a loop, like:

      for field_name in _required_schema.fieldNames():
          if field_name not in ...:
              ...
          else:
              if ignore_nullable:
                  ...  # compare name, dataType
              else:
                  ...  # compare name, dataType, nullable
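
Spelled out, and with hypothetical names for the helper and its arguments, such a loop might look like this:

```python
from __future__ import annotations

from pyspark.sql.types import StructType


def find_missing_fields(
    required_schema: StructType,
    actual_schema: StructType,
    ignore_nullable: bool,
) -> list[str]:
    """Return names of required fields that are absent from, or mismatched in, the actual schema."""
    actual_by_name = {field.name: field for field in actual_schema.fields}
    missing: list[str] = []
    for field in required_schema.fields:
        actual = actual_by_name.get(field.name)
        if actual is None:
            missing.append(field.name)
        elif ignore_nullable:
            if actual.dataType != field.dataType:
                missing.append(field.name)
        elif (actual.dataType, actual.nullable) != (field.dataType, field.nullable):
            missing.append(field.name)
    return missing
```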

error_message = (
f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"
Review thread on this line:

- **Collaborator:** In the case of nested schemas this error message will be unreadable.
- **Contributor (author):** @SemyonSinchenko How do you suggest we put this?
- **Collaborator:** I would suggest flattening all fields first into the form parent.parent.field, and only after that rendering all the missing fields. I would put the flattening logic in a separate function.
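
A possible shape for the flattening helper suggested above, producing dotted `parent.child.field` paths for nested structs (the function name and exact behaviour are assumptions, not part of this PR):

```python
from __future__ import annotations

from pyspark.sql.types import StructType


def flatten_field_names(schema: StructType, prefix: str = "") -> list[str]:
    """Recursively collect leaf field names as dotted paths, e.g. ``address.geo.lat``."""
    names: list[str] = []
    for field in schema.fields:
        path = f"{prefix}{field.name}"
        if isinstance(field.dataType, StructType):
            names.extend(flatten_field_names(field.dataType, prefix=f"{path}."))
        else:
            names.append(path)
    return names
```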

)

if missing_struct_fields:
raise DataFrameMissingStructFieldError(error_message)

print("Success! DataFrame matches the required schema!")

return dataframe

return wrapper

if df_to_be_validated is None:
# This means the function is being used as a decorator
return decorator

# This means the function is being called directly with a DataFrame
return decorator(lambda: df_to_be_validated)()


def validate_absence_of_columns(df: DataFrame, prohibited_col_names: list[str]) -> None:
6 changes: 3 additions & 3 deletions tests/test_dataframe_validator.py
@@ -34,7 +34,7 @@ def it_raises_when_struct_field_is_missing1():
]
)
with pytest.raises(quinn.DataFrameMissingStructFieldError) as excinfo:
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, df_to_be_validated=source_df)

current_spark_version = semver.Version.parse(spark.version)
spark_330 = semver.Version.parse("3.3.0")
Expand All @@ -53,7 +53,7 @@ def it_does_nothing_when_the_schema_matches():
StructField("age", LongType(), True),
]
)
quinn.validate_schema(source_df, required_schema)
quinn.validate_schema(required_schema, df_to_be_validated=source_df)

def nullable_column_mismatches_are_ignored():
data = [("jose", 1), ("li", 2), ("luisa", 3)]
@@ -64,7 +64,7 @@ def nullable_column_mismatches_are_ignored():
StructField("age", LongType(), False),
]
)
quinn.validate_schema(source_df, required_schema, ignore_nullable=True)
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)


def describe_validate_absence_of_columns():