This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Dataflow discarding massive amount of events due to Window object or inner processing #648

Open
@turboT4

Description

I've recently been developing a Dataflow consumer that reads from a PubSub subscription and outputs to Parquet files the combination of all the objects grouped within the same window.

While testing this without a heavy load, everything seemed to work fine.

However, after performing some heavy testing, I can see that of 1,000,000 events sent to that PubSub queue, only 1,000 make it to Parquet!

According to the wall times across the different stages, the stage that parses the events prior to applying the window takes 58 minutes, and the final stage that writes the Parquet files takes 1 hour and 32 minutes.

I'll now show the most relevant parts of the code. I hope you can shed some light on whether the problem is due to the logic that comes before the Window object definition or the Window object itself.

pipeline
        .apply("Reading PubSub Events",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getSubscription()))
        .apply("Map to AvroSchemaRecord (GenericRecord)",
            ParDo.of(new PubsubMessageToGenericRecord()))
        .setCoder(AvroCoder.of(AVRO_SCHEMA))
        .apply("15m window",
            // Fixed 15-minute event-time windows
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
                // Fire a pane 1 second of processing time after the first element arrives
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                // No allowed lateness: data behind the watermark is dropped
                .withAllowedLateness(Duration.ZERO)
                // Each firing re-emits all elements seen so far in the window
                .accumulatingFiredPanes()
        )
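
The Parquet write stage mentioned above is not included in the snippet. For context, here is a minimal sketch of what a windowed Parquet write typically looks like in Beam; the output path option, suffix, and shard count are assumptions for illustration, not the actual code:

        // Hypothetical continuation of the pipeline (assumed details, not the real write stage):
        .apply("Write Parquet Files",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(AVRO_SCHEMA))      // serialize GenericRecords as Parquet
                .to(options.getOutputDirectory())      // assumed pipeline option
                .withSuffix(".parquet")
                .withNumShards(1));                    // unbounded input requires explicit sharding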

Also note that I'm running Beam 2.9.0.

Could the logic inside the second stage be too heavy, so that messages arrive too late and get discarded by the Window? The logic basically consists of reading the payload, parsing it into a POJO (reading inner Map attributes, filtering and so on).
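
For reference, here is a minimal sketch of what such a parsing DoFn might look like; the EventPojo type, parsePayload helper, attribute names, and filter condition are hypothetical and only illustrate the described steps, not the actual PubsubMessageToGenericRecord implementation:

    // Hypothetical sketch of the parsing stage (assumed details, not the real DoFn):
    public class PubsubMessageToGenericRecord extends DoFn<PubsubMessage, GenericRecord> {

        @ProcessElement
        public void processElement(@Element PubsubMessage message,
                                   OutputReceiver<GenericRecord> out) {
            // Parse the payload into a POJO (EventPojo and parsePayload are assumed helpers)
            EventPojo event = parsePayload(new String(message.getPayload(), StandardCharsets.UTF_8));

            // Read inner Map attributes and filter out events that don't qualify (condition assumed)
            Map<String, String> inner = event.getInnerAttributes();
            if (inner == null || !"valid".equals(inner.get("status"))) {
                return; // element is dropped here, before it ever reaches the Window
            }

            // Build the GenericRecord against the shared Avro schema
            GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
            record.put("id", event.getId());
            out.output(record);
        }
    }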

However, if I send a million events to PubSub, all of those million events do make it as far as the Parquet write stage. Does that make sense?
