This repository has been archived by the owner on Nov 11, 2022. It is now read-only.
I took the title from a previous issue, #105, which has been resolved and sounds just like my issue.
I have a pipeline that reads from an unbounded PubsubIO source and writes to PubsubIO.
Testing it locally, reading from a TextIO file and writing to PubsubIO, works great. But when I switch the source to PubsubIO (with a large amount of data) and deploy it to Google's Dataflow, it runs fine until data needs to be written out to PubsubIO, and then I get the following NullPointerException (it seems related to creating the protobuf that wraps the message sent to Pub/Sub):
I'm using Apache Beam 2.0.0, and the only unusual thing about my linear pipeline is that it contains two different windows: first a sliding window, then a fixed window.
OK, I found the problem: the Google Dataflow worker cannot handle a PubsubMessage with no attributes. The local runner handles it fine, but when deployed to Dataflow the job crashes, so I had to pass an attributes HashMap instead of just passing null:
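import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
Map<String, String> attributes = new HashMap<>(); // must be non-null on the Dataflow worker
attributes.put("carlos", "carlos"); // placeholder attribute
PubsubMessage m = new PubsubMessage(data, attributes); // data is the byte[] payload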
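A minimal, stdlib-only sketch of the defensive pattern behind this workaround: normalize a possibly-null attributes map to a non-null one before handing it to `PubsubMessage`. The helper name `nonNullAttributes` is hypothetical and not part of Beam. An empty map should avoid the `putAll(null)` call that fails in the trace below, but whether the Dataflow worker accepts an empty map, rather than one with a placeholder entry as above, is an assumption here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AttributesSketch {
    // Hypothetical helper (not a Beam API): returns a non-null, mutable copy of
    // the attributes so downstream code calling Map.putAll(...) never sees null.
    static Map<String, String> nonNullAttributes(Map<String, String> attributes) {
        return attributes == null ? new HashMap<>() : new HashMap<>(attributes);
    }

    public static void main(String[] args) {
        // A null input becomes an empty map instead of propagating null.
        Map<String, String> fromNull = nonNullAttributes(null);
        System.out.println(fromNull.isEmpty()); // true

        // Existing entries are preserved in the copy.
        Map<String, String> withEntry =
                nonNullAttributes(Collections.singletonMap("carlos", "carlos"));
        System.out.println(withEntry.get("carlos")); // carlos
    }
}
```

In a pipeline, the idea would be to call such a helper wherever a `PubsubMessage` is constructed, e.g. `new PubsubMessage(data, nonNullAttributes(attrs))`, so that no code path can pass null.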
Note that I write to PubsubIO with `writeMessages` and no attributes (I pass null), yet the code seems to die when trying to write out attributes.
Carlos.
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
org.apache.beam.sdk.transforms.MapElements$1$auxiliary$u4qfc9DK.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:246)
com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:194)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629)
org.apache.beam.examples.GaugeKVToMessage.processElement(GaugeDebug.java:396)
org.apache.beam.examples.GaugeKVToMessage$auxiliary$xI4IkwZo.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:246)
com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:149)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:266)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner$DoFnProcessContext$1.outputWindowedValue(SimpleOldDoFnRunner.java:463)
com.google.cloud.dataflow.worker.runners.worker.WindowingInternalsAdapters$2.outputWindowedValue(WindowingInternalsAdapters.java:63)
org.apache.beam.runners.core.ReduceFnRunner$2.output(ReduceFnRunner.java:1014)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:428)
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:118)
org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1019)
org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:893)
org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:758)
com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowViaWindowSetDoFn.processElement(GroupAlsoByWindowViaWindowSetDoFn.java:90)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:122)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:101)
org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:73)
com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:106)
com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:198)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:791)
com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.access$600(StreamingDataflowWorker.java:104)
com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker$9.run(StreamingDataflowWorker.java:873)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
java.util.HashMap.putMapEntries(HashMap.java:500)
java.util.HashMap.putAll(HashMap.java:784)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.MapField$MutatabilityAwareMap.putAll(MapField.java:343)
com.google.cloud.dataflow.worker.repackaged.com.google.pubsub.v1.PubsubMessage$Builder.putAllAttributes(PubsubMessage.java:880)
com.google.cloud.dataflow.worker.runners.worker.PubsubSink$PubsubWriter.add(PubsubSink.java:131)
com.google.cloud.dataflow.worker.runners.worker.PubsubSink$PubsubWriter.add(PubsubSink.java:111)
com.google.cloud.dataflow.worker.util.common.worker.WriteOperation.process(WriteOperation.java:82)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:194)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629)
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
java.lang.RuntimeException: org.apache.beam.sdk.util.
Aug 04, 2017 1:50:32 PM org.apache.beam.runners.dataflow.DataflowPipelineJob$1 run
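The innermost "Caused by" section shows the NPE originating in `HashMap.putAll` / `HashMap.putMapEntries`, called from the worker's repackaged `PubsubMessage$Builder.putAllAttributes`. The plain-Java sketch below reproduces only that JDK behavior, with no Beam or Dataflow classes involved, to show why a null attributes map fails at that point while an empty one does not. `copyAttributes` is a hypothetical stand-in for the builder call, not the worker's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class PutAllNullDemo {
    // Hypothetical stand-in for a builder that copies attributes with putAll,
    // analogous to the putAllAttributes frame in the trace.
    static Map<String, String> copyAttributes(Map<String, String> attributes) {
        Map<String, String> target = new HashMap<>();
        target.putAll(attributes); // HashMap.putAll throws NullPointerException on null
        return target;
    }

    // Returns true if copyAttributes fails with an NPE for the given input.
    static boolean throwsNpe(Map<String, String> attributes) {
        try {
            copyAttributes(attributes);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsNpe(null));            // true
        System.out.println(throwsNpe(new HashMap<>())); // false
    }
}
```

This is consistent with the local runner accepting null attributes while the Dataflow worker's sink, which copies them into a protobuf builder, does not.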