This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Dataflow discarding massive amount of events due to Window object or inner processing #648

Open
turboT4 opened this issue Oct 8, 2019 · 5 comments


turboT4 commented Oct 8, 2019

I've recently been developing a Dataflow consumer that reads from a PubSub subscription and writes to Parquet files the combination of all the objects grouped within the same window.

While testing this without a heavy load, everything seemed to work fine.

However, after performing some heavy testing, I can see that of 1,000,000 events sent to that PubSub queue, only 1000 make it to Parquet!

According to the wall times across different stages, the one that parses the events prior to applying the window seems to last 58 minutes. The last stage, which writes to Parquet files, lasts 1 hour and 32 minutes.

I will now show the most relevant parts of the code. I hope you can shed some light on whether this is due to the logic that comes before the Window object definition or due to the Window object itself.

pipeline
    .apply("Reading PubSub Events",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(options.getSubscription()))
    .apply("Map to AvroSchemaRecord (GenericRecord)",
        ParDo.of(new PubsubMessageToGenericRecord()))
    .setCoder(AvroCoder.of(AVRO_SCHEMA))
    .apply("15m window",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
            .triggering(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
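(Editor's note: with a non-repeating processing-time trigger like the one above, the window fires its single pane and then closes, discarding everything that arrives afterwards. A minimal sketch of keeping the trigger firing, assuming the same pipeline and Beam's `Repeatedly.forever` trigger wrapper, not a definitive fix:)

```java
// Sketch only: wrapping the processing-time trigger in Repeatedly.forever(...)
// makes the window fire a pane for each new batch of elements instead of
// closing for good after the first firing. Same window/lateness as above.
.apply("15m window",
    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes())
```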

Also note that I'm running Beam 2.9.0.

Could the logic inside the second stage be too heavy, so that messages arrive too late and get discarded by the Window? The logic basically consists of reading the payload and parsing it into a POJO (reading inner Map attributes, filtering, and such).

However, if I sent a million events to PubSub, shouldn't all of those million events make it to the Parquet write? Does that make sense?

lukecwik (Contributor) commented Oct 8, 2019

Why do you want to use a processing time trigger vs using an event time trigger?

turboT4 (Author) commented Oct 8, 2019

Just for the sake of performance @lukecwik , but an event time trigger would require a different schema of data to be processed, right?

I tried that once but couldn't get it to fire when inferring the timestamp from the event itself.

What I currently have are bytes that materialize into JSON and do contain a timestamp, which I tried to force into a KV<Timestamp_Here, JsonString>, but no luck so far.

Could you please point me to some docs or a website to help me understand event time triggering better?

kennknowles (Contributor) commented
I have replied to your email on [email protected]. TL;DR the trigger is "closing" the window and dropping all further data.

@turboT4
Copy link
Author

turboT4 commented Oct 9, 2019

Thanks a lot for the fast response @kennknowles. I've replied to your reply but will also put it here so the GitHub issue is kept up to date.

I recall having tried what you wrote in the email before, and I fought with this exception that I couldn't wrap my head around:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 2019-10-08T19:29:14.247Z. Output timestamps must be no earlier than the timestamp of the current input (2019-10-08T19:29:14.283Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
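(Editor's note: the check behind that exception is simple: a DoFn may not output with a timestamp earlier than the input element's timestamp minus DoFn#getAllowedTimestampSkew(), which defaults to zero. A standalone sketch of that rule using java.time — the class and method names here are illustrative, not Beam's API:)

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheck {
    // Mirrors Beam's rule: an output timestamp must be no earlier than the
    // input element's timestamp minus the allowed skew.
    // (Hypothetical helper for illustration, not part of the Beam SDK.)
    static boolean isOutputTimestampAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // The two timestamps from the exception above: the output is 36 ms
        // earlier than the input, and the default allowed skew is zero.
        Instant input = Instant.parse("2019-10-08T19:29:14.283Z");
        Instant output = Instant.parse("2019-10-08T19:29:14.247Z");
        System.out.println(isOutputTimestampAllowed(input, output, Duration.ZERO));        // false
        System.out.println(isOutputTimestampAllowed(input, output, Duration.ofMillis(50))); // true
    }
}
```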

After applying changes into the pipeline it looks like this.

.apply(options.getWindowDuration() + "min Window",
    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        .withAllowedLateness(new Duration(Long.MAX_VALUE))
        .discardingFiredPanes())

I currently have data queued for +10 hours, but apart from that, the timestamp that I'm inferring is the one from the event that I'm parsing (JSON) itself, not the one coming from PubSub (no clue how to consume that? Attributes from PubSubMessage?).

pipeline
    .apply("Read PubSub Events",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(options.getSubscription()))
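(Editor's note: on consuming the event timestamp from PubSub — if the publisher sets the event time as a message attribute, PubsubIO can use that attribute as the element timestamp directly, which avoids calling outputWithTimestamp in a DoFn at all. A sketch, assuming a publisher-set attribute named "eventTimestamp"; the attribute name is an assumption:)

```java
// Sketch: withTimestampAttribute tells PubsubIO to take each element's
// timestamp from the named message attribute, so the watermark is derived
// from event time rather than the PubSub publish time.
// "eventTimestamp" is a hypothetical attribute name set by the publisher.
pipeline
    .apply("Read PubSub Events",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(options.getSubscription())
            .withTimestampAttribute("eventTimestamp"))
```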

kennknowles (Contributor) commented
This repository is no longer updated, so it's best to continue on the current and active Beam mailing list.
