
Experiencing an issue where Apache Beam is unable to establish a connection to MySQL and consequently cannot write data to it #28498

Closed
2 of 15 tasks
kapilsingh421 opened this issue Sep 18, 2023 · 4 comments
Labels
bug done & done Issue has been reviewed after it was closed for verification, followups, etc. P2 python

Comments

@kapilsingh421

kapilsingh421 commented Sep 18, 2023

What happened?

Here is my code to write the data to a MySQL database:

    joined_data = (
        {'location': unique_location_ids, 'procedure': procedures}
        | 'CoGroupByKey' >> beam.CoGroupByKey()
        | 'ProcessJoinedData' >> beam.Map(process_joined_data)
    )
    joined_data = joined_data | 'SetTypeHint' >> beam.Map(lambda x: x).with_output_types(Dict[str, Any])
    # Writing to MySQL:
    write_jdbc_results = joined_data | 'WriteToJdbc' >> WriteToJdbc(
        table_name='testing',
        driver_class_name='com.mysql.jdbc.Driver',
        jdbc_url='jdbc:mysql://IP:9030/database',
        username='root',
        password='',
    ).with_output_types(Dict[str, Any])

Error

    679       response = service.Expand(request)
    680       if response.error:
--> 681         raise RuntimeError(response.error)
    682       self._expanded_components = response.components
    683       if any(env.dependencies

RuntimeError: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1, beam:coder:nullable:v1]
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
	at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:139)
	at org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$0(ExpansionService.java:526)
	at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:178)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1576)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1601)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:521)
	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:639)
	at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1, beam:coder:nullable:v1]

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Contributor

Abacn commented Sep 18, 2023

The error is:

Unknown Coder URN beam:coder:pickled_python:v1

This happens because a Python dict is passed to WriteToJdbc, which expects a Beam Row. See the example here, which uses a NamedTuple registered with RowCoder:

def test_xlang_jdbc_write_read(self, database):
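
A minimal sketch of that approach, assuming each joined element is a dict with two string values. The names `JoinedRecord`, `location_id`, `procedure_name`, `to_row`, and `as_rows` are hypothetical and not taken from the issue; the real fields depend on what `process_joined_data` returns, and they are assumed here to line up with the columns of the target table.

```python
import typing


class JoinedRecord(typing.NamedTuple):
    """Hypothetical schema; fields are assumed to match the target table."""
    location_id: str
    procedure_name: str


def to_row(record: typing.Dict[str, typing.Any]) -> JoinedRecord:
    """Convert one dict element into a NamedTuple with a fixed schema."""
    return JoinedRecord(
        location_id=str(record['location_id']),
        procedure_name=str(record['procedure_name']),
    )


def as_rows(joined_data):
    """Turn a PCollection of dicts into a PCollection of JoinedRecord rows."""
    # Imported lazily so the schema and converter can be tested without Beam.
    import apache_beam as beam
    from apache_beam import coders

    # Registering RowCoder gives the elements a portable encoding, so the
    # pipeline does not fall back to the Python-only pickle coder.
    coders.registry.register_coder(JoinedRecord, coders.RowCoder)
    return (
        joined_data
        | 'ToRow' >> beam.Map(to_row).with_output_types(JoinedRecord)
    )
```

The output of `as_rows(joined_data)` would then be passed to `WriteToJdbc` in place of the dict-typed collection, and the `Dict[str, Any]` type hints would be dropped.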

@kapilsingh421
Author

kapilsingh421 commented Sep 19, 2023

@Abacn which driver class do I need to use for MySQL?

    driver_class_name='com.mysql.jdbc.Driver'

or

    driver_class_name='com.mysql.cj.jdbc.Driver'

I am using the Dataflow runner and get a "cannot load JDBC driver class" error for both of the driver classes above:

 Caused by: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.cj.jdbc.Driver'
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.commons.dbcp2.DriverFactory.createDriver(DriverFactory.java:54)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.commons.dbcp2.BasicDataSource.createConnectionFactory(BasicDataSource.java:459)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:525)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:731)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.getConnection(JdbcIO.java:2503)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2555)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - 	at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2513)
[2023-09-19, 10:32:25 UTC] {beam.py:113} WARNING - Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
Caused by: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.Driver'

@kapilsilstone

kapilsilstone commented Sep 20, 2023

@Abacn Are there any additional dependencies or configurations I might be missing specifically for the MySQL connector?
Any input on why the MySQL connector isn't working while the PostgreSQL one works on the Dataflow runner?

@Abacn
Contributor

Abacn commented Sep 20, 2023

For the driver, you can also refer to the integration test linked above. It looks like you need to add the parameter classpath=['mysql:mysql-connector-java:8.0.28'] (or another version) to WriteToJdbc. For Postgres this is not required, likely because the driver is already included in the Beam SDK container (as a transitive dependency).
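
As a rough sketch of what that could look like, the helper below collects the `WriteToJdbc` arguments for MySQL in one place. The function names `mysql_write_kwargs` and `write_to_mysql` are hypothetical, the URL and credentials are the placeholders from the original post, and the driver class shown is the one shipped by Connector/J 8.x. It assumes the input collection already has a Row schema rather than plain dicts.

```python
import typing


def mysql_write_kwargs(
    table_name: str,
    jdbc_url: str,
    username: str,
    password: str,
    connector_version: str = '8.0.28',
) -> typing.Dict[str, typing.Any]:
    """Build keyword arguments for WriteToJdbc when targeting MySQL."""
    return {
        'table_name': table_name,
        # Driver class provided by MySQL Connector/J 8.x.
        'driver_class_name': 'com.mysql.cj.jdbc.Driver',
        'jdbc_url': jdbc_url,
        'username': username,
        'password': password,
        # Maven coordinate of the driver jar for the expansion service.
        'classpath': ['mysql:mysql-connector-java:' + connector_version],
    }


def write_to_mysql(rows, **kwargs):
    """Apply WriteToJdbc to a schema'd PCollection (hypothetical wrapper)."""
    # Imported lazily so the argument builder can be tested without Beam.
    from apache_beam.io.jdbc import WriteToJdbc

    return rows | 'WriteToJdbc' >> WriteToJdbc(**mysql_write_kwargs(**kwargs))
```

For example, `write_to_mysql(rows, table_name='testing', jdbc_url='jdbc:mysql://IP:9030/database', username='root', password='')` would mirror the call in the original post with the `classpath` argument added.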

@github-actions github-actions bot added this to the 2.52.0 Release milestone Sep 22, 2023
@jrmccluskey jrmccluskey added done & done Issue has been reviewed after it was closed for verification, followups, etc. and removed awaiting triage labels Oct 17, 2023