
Create some "schema safe append" functionality #50

Closed · MrPowers opened this issue Feb 9, 2023 · 17 comments · Fixed by #86

Comments

@MrPowers (Collaborator) commented Feb 9, 2023

The function should append the data if the append_df has a schema that matches the df exactly.

If the schema doesn't match exactly, then it should error out.

This "schema safe append" could prevent bad appends.

@puneetsharma04 (Contributor)

@MrPowers: Could you please check the function below, which compares the source schema with the target schema?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create (or reuse) a local SparkSession for the example
spark = SparkSession.builder.appName("schema_safe_append_example").getOrCreate()

def append_if_schema_identical(source_df, target_df):
    # Retrieve the schemas of the source and target dataframes
    source_schema = source_df.schema
    target_schema = target_df.schema

    # Convert the schemas to a list of tuples
    source_schema_list = [(field.name, str(field.dataType)) for field in source_schema]
    target_schema_list = [(field.name, str(field.dataType)) for field in target_schema]

    # Compare the two schema lists
    if source_schema_list == target_schema_list:
        # Append the dataframes if the schemas are identical
        appended_df = source_df.union(target_df)
        return appended_df
    else:
        # Raise an error if the schemas are different
        raise ValueError("The schemas of the source and target dataframes are not identical.")

# Create the source and target dataframes
source_data = [(1, "Alice", 25), (2, "Bob", 30)]
target_data = [(3, "Charlie", "New York"), (4, "Dave", "Los Angeles")]

source_df = spark.createDataFrame(source_data, schema=StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType())
]))

target_df = spark.createDataFrame(target_data, schema=StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("city", StringType())
]))

# Call the append_if_schema_identical function.
# Note: the example schemas differ (age vs. city), so this call raises ValueError.
appended_df = append_if_schema_identical(source_df, target_df)

Let me know if that's as per the expectation.

@MrPowers (Collaborator, Author)

@puneetsharma04 - I realized I specified this issue pretty poorly, sorry for the confusion.

Suppose there is an existing Parquet table. This schema_safe_append function should only append the data if the append_df matches the schema of the existing parquet table.

Something like this: schema_safe_append(destination_path, append_df).

spark.read.parquet(destination_path).schema should be the same as append_df.schema. This isn't necessary for Delta tables because they already have schema enforcement built in.
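For reference, here is a minimal sketch of what such a helper could look like. It assumes a Parquet destination; the function name schema_safe_append, the choice of ValueError, and the use of the append write mode are illustrative, not the final quinn API.

from pyspark.sql import DataFrame, SparkSession


def schema_safe_append(destination_path: str, append_df: DataFrame) -> None:
    # Reuse the already-running SparkSession.
    spark = SparkSession.builder.getOrCreate()

    # Read the schema of the existing Parquet table at the destination.
    existing_schema = spark.read.parquet(destination_path).schema

    # StructType equality compares field names, data types, nullability, and order.
    if existing_schema != append_df.schema:
        raise ValueError(
            f"append_df schema {append_df.schema.simpleString()} does not match "
            f"existing table schema {existing_schema.simpleString()}"
        )

    # Schemas match exactly, so the append is safe.
    append_df.write.mode("append").parquet(destination_path)

As noted above, Delta tables would not need this guard because the format already enforces the schema on append.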

@puneetsharma04 (Contributor)

@MrPowers: Thanks for sharing the update. I have made the changes as per your advice.

from pyspark.sql import SparkSession

def append_if_schema_identical(source_df, target_df):
    # Retrieve the schemas of the source and target dataframes
    source_schema = source_df.schema
    target_schema = target_df.schema

    # Convert the schemas to a list of tuples
    source_schema_list = [(field.name, str(field.dataType)) for field in source_schema]
    target_schema_list = [(field.name, str(field.dataType)) for field in target_schema]

    # Compare the two schema lists
    if source_schema_list == target_schema_list:
        # Append the dataframes if the schemas are identical
        appended_df = source_df.union(target_df)
        return appended_df
    else:
        # Raise an error if the schemas are different
        raise ValueError("The schemas of the source and target dataframes are not identical.")

# Create a SparkSession
spark = SparkSession.builder.appName("Schema Comparison").getOrCreate()

# Read the source and target parquet files
source_df = spark.read.parquet("source.parquet")
target_df = spark.read.parquet("target.parquet")

# Call the append_if_schema_identical function
appended_df = append_if_schema_identical(source_df, target_df)

# Write the result to a new parquet file
appended_df.write.mode("overwrite").parquet("result.parquet")

Could you please check if this looks as per the expectations, or whether you expect some more changes to it?

@puneetsharma04 (Contributor)

@MrPowers: Any updates on this issue, so that I can go ahead with creating the PR?

@puneetsharma04 (Contributor) commented Mar 17, 2023

@MrPowers & @cosmincatalin
I tried testing the code in PyCharm, but I'm getting the error below. If someone has faced it earlier, any help in this context would be appreciated.
Below is the code I am trying to test:
[screenshot: test code]

Error given below:
[screenshot: error]

Thanks in advance!

@cosmincatalin (Contributor)

@puneetsharma04 I'm not sure why you've tagged me; does the issue have anything to do with #44? I can otherwise do my best to diagnose the issue anyway 🙂, I just want to make sure I have the correct context.

@puneetsharma04 (Contributor)

@cosmincatalin: Thanks for getting back 🙂. Actually, the issue is not related to #44; however, since you are among the other contributors, you may have faced the same kind of issue while testing the code in PyCharm. I'm just checking in this forum whether someone can share tips or hints to resolve this, so that I can contribute to this issue.

@SemyonSinchenko (Collaborator) commented Mar 17, 2023

I tried testing the code in PyCharm, but I'm getting the error below. If someone has faced it earlier, any help in this context would be appreciated.

Could you please share the whole error? Also, maybe a stupid question, but did you configure Java properly? It looks like py4j failed to create a gateway. Are you getting this error on this test only, or on other tests too?

@puneetsharma04 (Contributor)

@SemyonSinchenko: Thanks for getting back on this issue. I have installed Java and tested running a Java program on macOS. The full error is given below; I am getting this error for all the tests.
/Users/puneet_sharma1/Documents/GitHub/quinn-fixing/venv/bin/python /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py --path /Users/puneet_sharma1/Documents/GitHub/quinn-fixing/tests/test_append_if_schema_identical.py
Testing started at 22:01 ...
Launching pytest with arguments /Users/puneet_sharma1/Documents/GitHub/quinn-fixing/tests/test_append_if_schema_identical.py in /Users/puneet_sharma1/Documents/GitHub/quinn-fixing/tests

============================= test session starts ==============================
platform darwin -- Python 3.7.5, pytest-3.2.2, py-1.11.0, pluggy-0.4.0 -- /Users/puneet_sharma1/Documents/GitHub/quinn-fixing/venv/bin/python
cachedir: .cache
rootdir: /Users/puneet_sharma1/Documents/GitHub/quinn-fixing/tests, inifile:
collecting ... collected 1 item

test_append_if_schema_identical.py::test_append_if_schema_identical ERRORException in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
at org.apache.spark.internal.config.package$.<init>(package.scala:1006)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
at org.apache.spark.deploy.SparkSubmitArguments.$anonfun$loadEnvironmentArguments$3(SparkSubmitArguments.scala:157)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:157)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:115)
at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:990)
at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:990)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module @4d740d85
at java.base/java.lang.reflect.AccessibleObject.throwInaccessibleObjectException(AccessibleObject.java:387)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:363)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:311)
at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:192)
at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:185)
at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
... 13 more

test setup failed
@pytest.fixture(scope="session")
def spark():
    spark = SparkProvider.set_up_spark(
      "Testing", "local[*]", extra_dependencies=[], conf=test_spark_conf()
    )

conftest.py:37:


../quinn/spark.py:49: in set_up_spark
.config(conf=conf)
../venv/lib/python3.7/site-packages/pyspark/sql/session.py:186: in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
../venv/lib/python3.7/site-packages/pyspark/context.py:378: in getOrCreate
SparkContext(conf=conf or SparkConf())
../venv/lib/python3.7/site-packages/pyspark/context.py:133: in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
../venv/lib/python3.7/site-packages/pyspark/context.py:327: in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)


conf = <pyspark.conf.SparkConf object at 0x7fbe2875e150>
popen_kwargs = {'env': {'COMMAND_MODE': 'unix2003', 'HOME': '/Users/puneet_sharma1', 'LC_CTYPE': 'en_US.UTF-8', 'LOGNAME': 'puneet_sharma1', ...}, 'preexec_fn': <function launch_gateway.<locals>.preexec_func at 0x7fbdf81cf5f0>, 'stdin': -1}

def launch_gateway(conf=None, popen_kwargs=None):
    """
    launch jvm gateway
    :param conf: spark configuration passed to spark-submit
    :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    :return:
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
              raise Exception("Java gateway process exited before sending its port number")

E Exception: Java gateway process exited before sending its port number

../venv/lib/python3.7/site-packages/pyspark/java_gateway.py:105: Exception

==================================== ERRORS ====================================
______________ ERROR at setup of test_append_if_schema_identical _______________

@pytest.fixture(scope="session")
def spark():
    spark = SparkProvider.set_up_spark(
      "Testing", "local[*]", extra_dependencies=[], conf=test_spark_conf()
    )

conftest.py:37:


../quinn/spark.py:49: in set_up_spark
.config(conf=conf)
../venv/lib/python3.7/site-packages/pyspark/sql/session.py:186: in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
../venv/lib/python3.7/site-packages/pyspark/context.py:378: in getOrCreate
SparkContext(conf=conf or SparkConf())
../venv/lib/python3.7/site-packages/pyspark/context.py:133: in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
../venv/lib/python3.7/site-packages/pyspark/context.py:327: in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)


conf = <pyspark.conf.SparkConf object at 0x7fbe2875e150>
popen_kwargs = {'env': {'COMMAND_MODE': 'unix2003', 'HOME': '/Users/puneet_sharma1', 'LC_CTYPE': 'en_US.UTF-8', 'LOGNAME': 'puneet_sharma1', ...}, 'preexec_fn': <function launch_gateway.<locals>.preexec_func at 0x7fbdf81cf5f0>, 'stdin': -1}

def launch_gateway(conf=None, popen_kwargs=None):
    """
    launch jvm gateway
    :param conf: spark configuration passed to spark-submit
    :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    :return:
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf', '%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

            # Launch the Java gateway.
            popen_kwargs = {} if popen_kwargs is None else popen_kwargs
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            popen_kwargs['stdin'] = PIPE
            # We always set the necessary environment variables.
            popen_kwargs['env'] = env
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                popen_kwargs['preexec_fn'] = preexec_func
                proc = Popen(command, **popen_kwargs)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, **popen_kwargs)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
              raise Exception("Java gateway process exited before sending its port number")

E Exception: Java gateway process exited before sending its port number

../venv/lib/python3.7/site-packages/pyspark/java_gateway.py:105: Exception
---------------------------- Captured stderr setup -----------------------------
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
at org.apache.spark.internal.config.package$.<init>(package.scala:1006)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
at org.apache.spark.deploy.SparkSubmitArguments.$anonfun$loadEnvironmentArguments$3(SparkSubmitArguments.scala:157)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:157)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:115)
at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:990)
at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:990)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module @4d740d85
at java.base/java.lang.reflect.AccessibleObject.throwInaccessibleObjectException(AccessibleObject.java:387)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:363)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:311)
at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:192)
at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:185)
at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
... 13 more
=========================== 1 error in 0.95 seconds ============================

Process finished with exit code 1


@SemyonSinchenko (Collaborator)

Which Java version are you using?

@puneetsharma04 (Contributor)

Details of Java version:
java version "19.0.2" 2023-01-17
Java(TM) SE Runtime Environment (build 19.0.2+7-44)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.2+7-44, mixed mode, sharing)

@SemyonSinchenko (Collaborator) commented Mar 17, 2023

Details of Java version:
java version "19.0.2" 2023-01-17
Java(TM) SE Runtime Environment (build 19.0.2+7-44)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.2+7-44, mixed mode, sharing)

You should use Java 8 (1.8). Spark is so old that it still works only with Java 8.
(Maybe the latest version will work with Java 11 too, but I'm not sure.)
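If switching the system Java is inconvenient, one workaround is to point PySpark at a Java 8 (or 11) installation through JAVA_HOME before the SparkSession is created; the spark-submit subprocess inherits the driver's environment (see the env handling in launch_gateway in the traceback above). This is only a sketch: the JDK path below is hypothetical and machine-specific.

import os

# Hypothetical Java 8 location on macOS; find yours with `/usr/libexec/java_home -v 1.8`.
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_351.jdk/Contents/Home"

from pyspark.sql import SparkSession

# launch_gateway copies os.environ into the spark-submit subprocess,
# so the JVM started here is the one JAVA_HOME points to.
spark = SparkSession.builder.master("local[*]").appName("Testing").getOrCreate()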

@MrPowers (Collaborator, Author)

@puneetsharma04 - feel free to open up a pull request and we can help you push this over the finish line ;)

@puneetsharma04 (Contributor) commented Mar 18, 2023

@MrPowers: Thanks for the assurance 👍🏻.
@SemyonSinchenko: I have installed Java 11 and got the error below.
One observation: while running the other tests, I am not getting the error.

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/puneet_sharma1/Documents/GitHub/quinn-fixing/venv/lib/python3.7/site-packages/pyspark/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
23/03/18 18:29:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
FAILED
test_append_if_schema_identical.py:13 (test_append_if_schema_identical)
spark = <pyspark.sql.session.SparkSession object at 0x7fc5485e2cd0>

    @auto_inject_fixtures("spark")
    def test_append_if_schema_identical(spark):
        source_data = [(1, "Alice", 25), (2, "Bob", 30)]
        target_data = [(3, "Charlie", "New York"), (4, "Dave", "Los Angeles")]
    
        source_df = spark.createDataFrame(source_data, schema=StructType([
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("age", IntegerType())
        ]))
    
        target_df = spark.createDataFrame(target_data, schema=StructType([
            StructField("id", IntegerType()),
            StructField("name", StringType()),
            StructField("city", StringType())
        ]))
    
        # Call the append_if_schema_identical function
>       appended_df = quinn.append_if_schema_identical(source_df, target_df)
E       AttributeError: module 'quinn' has no attribute 'append_if_schema_identical'

test_append_if_schema_identical.py:32: AttributeError

@SemyonSinchenko (Collaborator)

AttributeError: module 'quinn' has no attribute 'append_if_schema_identical'

This error message is clear now and doesn't relate to Spark or Java. Please check your code: make sure that such a method exists, that all the imports are written the right way, and that the quinn package is installed or placed on PYTHONPATH.
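In other words, the test fails simply because the new function is not yet importable from the top-level quinn package. A minimal sketch, assuming the implementation lives in a hypothetical module quinn/append_if_schema_identical.py in your fork:

# quinn/__init__.py (excerpt): re-export the new function at package level
from quinn.append_if_schema_identical import append_if_schema_identical

# test_append_if_schema_identical.py can then resolve the attribute:
import quinn

appended_df = quinn.append_if_schema_identical(source_df, target_df)

Installing the local checkout in editable mode (e.g. pip install -e ., if the project's build setup supports it) or putting the repo root on PYTHONPATH lets PyCharm's test runner pick the package up.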

@puneetsharma04 (Contributor)

@SemyonSinchenko: Thanks a lot, it's working now. I will work on creating the PR now.

@puneetsharma04 (Contributor)

Raised PR #86 for review.

@SemyonSinchenko linked a pull request on Apr 1, 2023 that will close this issue.