diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 4bec521d..939b5c86 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -74,9 +74,9 @@ jobs:
       run: |
         if [[ "${SPARK_VERSION}" > "3.4" ]]; then
           sh scripts/run_spark_connect_server.sh
-          # The tests should be called from here.
+          make test
        else
-          echo "Skipping Spark-Connect tests for Spark version <= 3.4"
+          echo "Skipping Spark-Connect tests for Spark version ${SPARK_VERSION}, which is <= 3.4"
        fi
 
   check-license-headers:
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 2fbcb3d4..7827d2d2 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -14,6 +14,10 @@ Scan through our [existing issues](https://github.com/MrPowers/quinn/issues) to
 You can find a list of [good first issues](https://github.com/MrPowers/quinn/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) which can help you better understand code base of the project.
 
+### Auto-assigning issues
+
+We have a workflow that automatically assigns issues to users who comment 'take' on an issue. It is configured in the `.github/workflows/assign-on-comment.yml` file. When a user comments `take` on an issue, a GitHub Action assigns the issue to that user if it is not already assigned.
+
 ## Contributing
 
 ### Fork the repository
@@ -49,6 +53,17 @@ make install_deps
 
 To run spark tests you need to have properly configured Java. Apache Spark currently supports mainly only Java 8 (1.8). You can find an instruction on how to set up Java [here](https://www.java.com/en/download/help/download_options.html). When you are running spark tests you should have `JAVA_HOME` variable in your environment which points to the installation of Java 8.
 
+### Pre-commit installation and execution
+
+We use pre-commit hooks to ensure code quality. The configuration for the pre-commit hooks is in the `.pre-commit-config.yaml` file. To install pre-commit, run:
+```shell
+poetry shell
+poetry run pre-commit install
+```
+To run the pre-commit hooks manually, use:
+```shell
+pre-commit run --all-files
+```
 
 ### Running Tests
 
@@ -57,6 +72,89 @@ You can run test as following:
 ```shell
 make test
 ```
+
+#### GitHub Actions local setup using 'act'
+
+You can run GitHub Actions locally using the `act` tool. The configuration for GitHub Actions is in
+the `.github/workflows/ci.yml` file. To install `act`, follow the
+instructions [here](https://github.com/nektos/act#installation). To run a specific job, use:
+
+```shell
+act -j <job_name>
+```
+
+For example, to run the `test` job, use:
+
+```shell
+act -j test
+```
+
+If you need help with `act`, use:
+
+```shell
+act --help
+```
+
+For MacBooks with M1 processors, you might have to add the `--container-architecture` flag:
+
+```shell
+act -j <job_name> --container-architecture linux/arm64
+```
+
+#### Running Spark-Connect tests locally
+
+To run the Spark-Connect tests locally, follow the steps below. Note that this only works on macOS/UNIX-based systems.
+
+1. **Set up the required environment variables:** The following variables need to be set so that the shell script that
+   installs the Spark-Connect binary and starts the server picks up the version.
+
+   The version can either be `3.5.1` or `3.4.3`, as those are the ones used in our CI.
+
+   ```shell
+   export SPARK_VERSION=3.5.1
+   export SPARK_CONNECT_MODE_ENABLED=1
+   ```
+
+2. **Check that the required environment variables are set:** Run the commands below to confirm that the required
+   environment variables are set.
+
+   ```shell
+   echo $SPARK_VERSION
+   echo $SPARK_CONNECT_MODE_ENABLED
+   ```
+
+3. **Install required system packages:** Run the command below to install `wget`.
+
+   For Mac users:
+   ```shell
+   brew install wget
+   ```
+
+   For Ubuntu users:
+   ```shell
+   sudo apt-get install wget
+   ```
+
+4. **Execute the shell script:** Run the command below to execute the shell script that installs Spark-Connect and
+   starts the server.
+
+   ```shell
+   sh scripts/run_spark_connect_server.sh
+   ```
+
+5. **Run the tests:** Run the command below to execute the tests using Spark-Connect.
+
+   ```shell
+   make test
+   ```
+
+6. **Clean up:** After running the tests, you can stop the Spark-Connect server and unset the environment variables.
+
+   ```shell
+   unset SPARK_VERSION
+   unset SPARK_CONNECT_MODE_ENABLED
+   ```
+
 ### Code style
 
 This project follows the [PySpark style guide](https://github.com/MrPowers/spark-style-guide/blob/main/PYSPARK_STYLE_GUIDE.md). All public functions and methods should be documented in `README.md` and also should have docstrings in `sphinx format`:
diff --git a/README.md b/README.md
index 1fbaa060..c9513c3a 100644
--- a/README.md
+++ b/README.md
@@ -41,12 +41,28 @@ quinn.validate_presence_of_columns(source_df, ["name", "age", "fun"])
 
 **validate_schema()**
 
-Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`.
+Raises an exception unless `source_df` contains all the `StructFields` defined in the `required_schema`. By default, `ignore_nullable` is set to False, so an exception is raised even when the column names and data types match but the nullability does not.
 
 ```python
-quinn.validate_schema(source_df, required_schema)
+quinn.validate_schema(required_schema, df_to_be_validated=source_df)
 ```
 
+You can also set `ignore_nullable` to True, so that validation checks only column names and data types, not nullability.
+
+```python
+quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)
+```
+
+> [!TIP]
+> This function can also be used as a decorator on other functions that return a DataFrame, which helps validate the schema of the returned DataFrame. When used as a decorator, you don't need to pass the `df_to_be_validated` argument; the validation is performed on the DataFrame returned by the function the decorator is applied to.
+>
+> ```python
+> @quinn.validate_schema(required_schema, ignore_nullable=True)
+> def get_df():
+>     return df
+> ```
+
+
 **validate_absence_of_columns()**
 
 Raises an exception if `source_df` contains `age` or `cool` columns.
diff --git a/quinn/dataframe_validator.py b/quinn/dataframe_validator.py
index 54004850..a3a07697 100644
--- a/quinn/dataframe_validator.py
+++ b/quinn/dataframe_validator.py
@@ -11,10 +11,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from __future__ import annotations
+from __future__ import annotations  # noqa: I001
 
 import copy
-from typing import TYPE_CHECKING
+from typing import Any, Callable, TYPE_CHECKING
 
 if TYPE_CHECKING:
     from pyspark.sql import DataFrame
@@ -52,38 +52,60 @@ def validate_presence_of_columns(df: DataFrame, required_col_names: list[str]) -> None:
 
 
 def validate_schema(
-    df: DataFrame,
     required_schema: StructType,
     ignore_nullable: bool = False,
-) -> None:
+    df_to_be_validated: DataFrame = None,
+) -> Callable[[Any, Any], Any]:
     """Function that validate if a given DataFrame has a given StructType as its schema.
 
+    Implemented as a decorator factory so it can be used both as a standalone function and as
+    a decorator on another function.
-    :param df: DataFrame to validate
-    :type df: DataFrame
     :param required_schema: StructType required for the DataFrame
     :type required_schema: StructType
     :param ignore_nullable: (Optional) A flag for if nullable fields should be ignored during validation
     :type ignore_nullable: bool, optional
+    :param df_to_be_validated: DataFrame to validate; mandatory when called as a function, not required
+        when used as a decorator
+    :type df_to_be_validated: DataFrame
 
     :raises DataFrameMissingStructFieldError: if any StructFields from the required schema are not included in the
         DataFrame schema
     """
-    _all_struct_fields = copy.deepcopy(df.schema)
-    _required_schema = copy.deepcopy(required_schema)
+    def decorator(func: Callable[..., DataFrame]) -> Callable[..., DataFrame]:
+        def wrapper(*args: object, **kwargs: object) -> DataFrame:
+            dataframe = func(*args, **kwargs)
+            _all_struct_fields = copy.deepcopy(dataframe.schema)
+            _required_schema = copy.deepcopy(required_schema)
 
-    if ignore_nullable:
-        for x in _all_struct_fields:
-            x.nullable = None
+            if ignore_nullable:
+                for x in _all_struct_fields:
+                    x.nullable = None
 
-        for x in _required_schema:
-            x.nullable = None
+                for x in _required_schema:
+                    x.nullable = None
 
-    missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
-    error_message = f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"
+            missing_struct_fields = [x for x in _required_schema if x not in _all_struct_fields]
+            error_message = (
+                f"The {missing_struct_fields} StructFields are not included in the DataFrame with the following StructFields {_all_struct_fields}"
+            )
 
-    if missing_struct_fields:
-        raise DataFrameMissingStructFieldError(error_message)
+            if missing_struct_fields:
+                raise DataFrameMissingStructFieldError(error_message)
+
+            print("Success! DataFrame matches the required schema!")
+
+            return dataframe
+
+        return wrapper
+
+    if df_to_be_validated is None:
+        # This means the function is being used as a decorator
+        return decorator
+
+    # This means the function is being called directly with a DataFrame
+    return decorator(lambda: df_to_be_validated)()
 
 
 def validate_absence_of_columns(df: DataFrame, prohibited_col_names: list[str]) -> None:
diff --git a/tests/test_dataframe_validator.py b/tests/test_dataframe_validator.py
index 19ac7b06..a71a55f3 100644
--- a/tests/test_dataframe_validator.py
+++ b/tests/test_dataframe_validator.py
@@ -34,7 +34,7 @@ def it_raises_when_struct_field_is_missing1():
             ]
         )
         with pytest.raises(quinn.DataFrameMissingStructFieldError) as excinfo:
-            quinn.validate_schema(source_df, required_schema)
+            quinn.validate_schema(required_schema, df_to_be_validated=source_df)
         current_spark_version = semver.Version.parse(spark.version)
         spark_330 = semver.Version.parse("3.3.0")
 
@@ -53,7 +53,7 @@ def it_does_nothing_when_the_schema_matches():
                 StructField("age", LongType(), True),
             ]
         )
-        quinn.validate_schema(source_df, required_schema)
+        quinn.validate_schema(required_schema, df_to_be_validated=source_df)
 
     def nullable_column_mismatches_are_ignored():
         data = [("jose", 1), ("li", 2), ("luisa", 3)]
@@ -64,7 +64,7 @@ def nullable_column_mismatches_are_ignored():
                 StructField("age", LongType(), False),
            ]
        )
-        quinn.validate_schema(source_df, required_schema, ignore_nullable=True)
+        quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=source_df)
 
 
 def describe_validate_absence_of_columns():
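
The updated tests above only exercise the direct-call form of the new API. Below is a minimal sketch of how the decorator form could be exercised locally once this change is applied; the names `spark` and `get_df` are illustrative, and the snippet assumes a local `SparkSession` plus the refactored `quinn.validate_schema` from this diff.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

import quinn

# Local session for a quick check (illustrative; any SparkSession works).
spark = SparkSession.builder.master("local[*]").getOrCreate()

required_schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
    ]
)


# Decorator form: no df_to_be_validated argument is passed; the DataFrame
# returned by get_df() is validated against required_schema.
@quinn.validate_schema(required_schema, ignore_nullable=True)
def get_df():
    return spark.createDataFrame([("jose", 1), ("li", 2)], ["name", "age"])


df = get_df()  # raises DataFrameMissingStructFieldError on a schema mismatch

# Direct-call form: the DataFrame is passed explicitly as a keyword argument.
quinn.validate_schema(required_schema, ignore_nullable=True, df_to_be_validated=df)
```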