[Bug]: Mongo to Bigquery fails if Mongo document contains BSON timestamp #1716

Open
ggprod opened this issue Jul 6, 2024 · 2 comments
Labels
bug Something isn't working needs triage p2

Comments

ggprod commented Jul 6, 2024

Related Template(s)

mongodb_to_bigquery

Template Version

2024-06-18-00_rc01

What happened?

When the template is run against a MongoDB collection whose documents contain a BSON timestamp field, the job fails with the error shown in the log output.

Relevant log output

Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'Document{{_id=664ca1a214e2486dcb269ff9, historyDoc=664ca1a214e2486dcb269ff3, opType=update, opDatedAt=Timestamp{value=7371444407255433218, seconds=1716298146, inc=2}, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024, change=Document{{newValue=Document{{isTrusted=true, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024}}}}, facilityNurse=664bbab3077870b0be2551ce, facility=62cd8ec60918180034f3fe51, nurse=5d8292819a5bca002ffc43f1, __v=0}}' with coder 'SerializableCoder(org.bson.Document)'.
	org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1794)
	org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
	org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2219)
	org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
	org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1100)
	org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:143)
	org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:659)
	org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:654)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
	org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
	org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
	org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
	org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'Document{{_id=664ca1a214e2486dcb269ff9, historyDoc=664ca1a214e2486dcb269ff3, opType=update, opDatedAt=Timestamp{value=7371444407255433218, seconds=1716298146, inc=2}, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024, change=Document{{newValue=Document{{isTrusted=true, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024}}}}, facilityNurse=664bbab3077870b0be2551ce, facility=62cd8ec60918180034f3fe51, nurse=5d8292819a5bca002ffc43f1, __v=0}}' with coder 'SerializableCoder(org.bson.Document)'.
	org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
	org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:509)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:336)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
Caused by: java.io.NotSerializableException: org.bson.BsonTimestamp
	java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
	java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
	java.base/java.util.LinkedHashMap.internalWriteEntries(LinkedHashMap.java:333)
	java.base/java.util.HashMap.writeObject(HashMap.java:1412)
	java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	java.base/java.lang.reflect.Method.invoke(Method.java:566)
	java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
	java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
	java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
	java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
	java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
	java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
	java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
	java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
	java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
	org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:192)
	org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:57)
	org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
	org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:509)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:336)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
	org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)
	org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
	org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2219)
	org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
	org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1100)
	org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:143)
	org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:659)
	org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:654)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
	org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
	org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
	org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
	org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
	org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:829)
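
The innermost `NotSerializableException` can be reproduced without Beam or MongoDB. The following is a minimal sketch under stated assumptions, not the template's code: `FakeTimestamp` is a hypothetical stand-in for `org.bson.BsonTimestamp` (a class that does not implement `java.io.Serializable`), and a plain `LinkedHashMap` stands in for the map that the trace suggests `org.bson.Document` serializes through `HashMap.writeObject`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class NotSerializableRepro {

    // Hypothetical stand-in for org.bson.BsonTimestamp: a plain class that
    // does not implement java.io.Serializable.
    static final class FakeTimestamp {
        final long value;

        FakeTimestamp(long value) {
            this.value = value;
        }
    }

    // Tries to Java-serialize the map. Returns the class name reported by
    // NotSerializableException, or null if serialization succeeds.
    static String failingClass(Map<String, Object> doc) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(doc);
            return null;
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("opType", "update");
        doc.put("opDatedAt", new FakeTimestamp(7371444407255433218L));
        System.out.println(failingClass(doc));
    }
}
```

The map itself is serializable, but Java serialization walks every entry, so a single non-serializable value is enough to fail the whole element. That matches the shape of the trace: `SerializableCoder.encode` calls `ObjectOutputStream.writeObject`, which reaches `HashMap.writeObject` and then rejects the timestamp value.
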
ggprod added the bug, needs triage, and p2 labels Jul 6, 2024
ggprod commented Jul 6, 2024

Should this instead be filed as a bug against MongoDbIO in the Apache Beam GitHub repository? Should MongoDbIO be stripping out BSON timestamps, or converting them to some serializable format?
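
To make the "convert to a serializable format" option concrete, here is a minimal sketch of one possible conversion. It is not how MongoDbIO or the template behaves today. `FakeTimestamp` and `TimestampSanitizer` are hypothetical names, and the output shape (a nested map with `time` and `inc` keys) is an assumption chosen for illustration. The decoding relies on the BSON timestamp layout: seconds since the epoch in the high 32 bits and an increment in the low 32 bits. With the real driver, the same fields should be available from `BsonTimestamp.getTime()` and `getInc()`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TimestampSanitizer {

    // Hypothetical stand-in for org.bson.BsonTimestamp (not Serializable).
    static final class FakeTimestamp {
        final long value;

        FakeTimestamp(long value) {
            this.value = value;
        }
    }

    // High 32 bits of a BSON timestamp: seconds since the Unix epoch.
    static long seconds(long value) {
        return value >>> 32;
    }

    // Low 32 bits of a BSON timestamp: ordinal within that second.
    static int increment(long value) {
        return (int) value;
    }

    // Recursively copies a document-like structure, replacing every timestamp
    // with a map that holds only serializable values (String and Integer).
    static Object sanitize(Object node) {
        if (node instanceof FakeTimestamp) {
            long v = ((FakeTimestamp) node).value;
            Map<String, Object> out = new LinkedHashMap<>();
            out.put("time", Instant.ofEpochSecond(seconds(v)).toString());
            out.put("inc", increment(v));
            return out;
        }
        if (node instanceof Map) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                out.put(String.valueOf(e.getKey()), sanitize(e.getValue()));
            }
            return out;
        }
        if (node instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object item : (List<?>) node) {
                out.add(sanitize(item));
            }
            return out;
        }
        return node; // Strings, numbers, dates, etc. pass through unchanged.
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("opType", "update");
        doc.put("opDatedAt", new FakeTimestamp(7371444407255433218L));
        System.out.println(sanitize(doc));
    }
}
```

For the value in the log, 7371444407255433218, this decodes to seconds=1716298146 and inc=2, the same numbers the driver prints in `Timestamp{...}`. Keeping `inc` alongside the time preserves ordering between operations that share a second; a conversion that kept only the time would lose it.
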

ggprod commented Jul 6, 2024

I found a similar error reported by a Spark user: https://www.mongodb.com/community/forums/t/timestamp-giving-error/229311

ggprod changed the title from "[Bug]: Mongo to Bigquery fails if Mongo document contains Bson timestamp" to "[Bug]: Mongo to Bigquery fails if Mongo document contains BSON timestamp" Jul 6, 2024