
Add Option to run Incremental Export From Bigtable (Timestamp Filtering) #2233

Open
maheshgoyal15 wants to merge 9 commits into base: release_2024-11-26-00_RC00

Conversation

maheshgoyal15

Description:

This pull request introduces a new feature to enable incremental data exports from Google Cloud Bigtable based on timestamp filtering. This addresses a specific customer requirement for efficiently extracting daily data subsets for transfer to a non-production Bigtable instance.

Problem:

The customer needs to export data from their production Bigtable database on a daily basis, filtered by a timestamp criterion. The existing export functionality lacks support for incremental, timestamp-based exports, forcing them to perform full table scans or implement complex custom solutions.

Solution:

This PR implements the following changes to enable incremental exports based on timestamps:

  • Introduce Timestamp Filtering Mechanism:
    • Adds two new command-line options: --startTimestamp and --endTimestamp.
    • These options allow users to specify a time range in UTC format (YYYY-MM-DDTHH:MM:SSZ) to filter the data during export.
    • The export process will then only retrieve and export rows that fall within the specified time range.
  • Leverage Bigtable Timestamp Filtering Capabilities:
    • Utilizes Bigtable's TimestampRangeFilter to efficiently retrieve only the required data.
    • This ensures that only the data within the specified time range is scanned and exported, minimizing resource consumption and export time.
  • Implementation Details:
    • The BigtableToAvro.Options interface has been extended to include getStartTimestamp() and getEndTimestamp() methods.
    • A timestampConverter function is implemented to convert the provided UTC timestamp strings into microseconds.
    • A filterProvider ValueProvider is used to construct the RowFilter based on the provided start and end timestamps.
    • The BigtableIO.Read operation now includes a withRowFilter(filterProvider) call to apply the timestamp filter.
    • Validation is added to ensure that if one timestamp is provided, the other must be provided as well.
  • Code Snippet (RowFilter Construction):
ValueProvider<RowFilter> filterProvider = new DualInputNestedValueProvider<>(
        options.getStartTimestamp(),
        options.getEndTimestamp(),
        (TranslatorInput<String, String> input) -> {
            String startTimestamp = input.getX();
            String endTimestamp = input.getY();

            boolean hasStart = startTimestamp != null && !startTimestamp.isEmpty();
            boolean hasEnd = endTimestamp != null && !endTimestamp.isEmpty();

            // Check if exactly one timestamp is provided (which is invalid)
            if ((hasStart && !hasEnd) || (!hasStart && hasEnd)) {
                throw new IllegalArgumentException(
                        "Both startTimestamp and endTimestamp must be provided together, or neither should be provided.");
            }

            // If neither timestamp is provided, return null (no filter)
            if (!hasStart && !hasEnd) {
                return null;
            }

            // Convert timestamps to microseconds
            Long startMicros = timestampConverter.apply(startTimestamp);
            Long endMicros = timestampConverter.apply(endTimestamp);

            // Build the timestamp filter
            com.google.cloud.bigtable.data.v2.models.Filters.TimestampRangeFilter filterBuilder =
                    FILTERS.timestamp().range();

            if (startMicros != null) {
                filterBuilder.startClosed(startMicros);
            }
            if (endMicros != null) {
                filterBuilder.endOpen(endMicros);
            }

            return filterBuilder.toProto();
        });

read = read.withRowFilter(filterProvider);
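The timestampConverter referenced above is not shown in the snippet. A minimal sketch of such a converter, assuming plain java.time (the actual PR may use Beam's SerializableFunction rather than java.util.function.Function, and the class name here is illustrative):

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical sketch of the timestampConverter used above: parses an
// ISO-8601 UTC string such as "2024-10-27T10:15:30.00Z" and returns the
// epoch time in microseconds, the unit Bigtable uses for cell timestamps.
public class TimestampConverterSketch {
  public static final Function<String, Long> timestampConverter =
      ts -> {
        Instant instant = Instant.parse(ts);
        return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
      };
}
```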

@maheshgoyal15 maheshgoyal15 requested a review from a team as a code owner March 4, 2025 17:07
@ron-gal ron-gal (Contributor) left a comment

It's very difficult to see the diffs =\ is there anything we can do about it? seems formatting is wrong...

v1/pom.xml Outdated
@@ -852,7 +852,7 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
Contributor
was this done on purpose?

@maheshgoyal15 (Author)

R: @svetakvsundhar

@pull-request-size pull-request-size bot added size/XS and removed size/L labels Mar 4, 2025
@pull-request-size pull-request-size bot added size/M and removed size/XS labels Mar 4, 2025
@ron-gal ron-gal (Contributor) left a comment

Please also update the integration test to use the filter, and maybe add unit tests for edge cases like start without end, etc.
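For the start-without-end edge case mentioned here, the both-or-neither check could be pulled into a plain helper so it is unit-testable without running a pipeline. This is an illustrative sketch, not code from the PR; the class and method names are hypothetical:

```java
// Hypothetical helper mirroring the both-or-neither validation from the
// RowFilter construction, extracted so edge cases can be unit tested.
public class TimestampPairValidator {
  public static void validate(String startTimestamp, String endTimestamp) {
    boolean hasStart = startTimestamp != null && !startTimestamp.isEmpty();
    boolean hasEnd = endTimestamp != null && !endTimestamp.isEmpty();
    // Exactly one timestamp present is the invalid configuration.
    if (hasStart != hasEnd) {
      throw new IllegalArgumentException(
          "Both startTimestamp and endTimestamp must be provided together, or neither should be provided.");
    }
  }
}
```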

- * Dataflow pipeline that exports data from a Cloud Bigtable table to Avro files in GCS. Currently,
- * filtering on Cloud Bigtable table is not supported.
+ * Dataflow pipeline that exports data from a Cloud Bigtable table to Avro files in GCS.
+ * 2/25 Add filtering rows based on timestamp.
Contributor
2/25?

groupName = "Source",
optional = true,
regexes = {"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?Z"},
description = "Start Timestamp in UTC Format (YYYY-MM-DDTHH:MM:SSZ) for exporting ",
Contributor
something in this line seems broken. If not, please remove the whitespace after "Exporting".

optional = true,
regexes = {"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?Z"},
description = "End Timestamp in UTC Format (YYYY-MM-DDTHH:MM:SSZ)",
helpText = " Example UTC timestamp 2024-10-27T10:15:30.00Z"
Contributor
missing text in the beginning
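As a quick sanity check, the timestamp regex from the parameter annotations above can be exercised directly. This sketch just copies the pattern string from the annotation; the class and method names are illustrative:

```java
import java.util.regex.Pattern;

// Compiles the timestamp regex from the @TemplateParameter annotations above
// so sample inputs can be checked against it.
public class TimestampRegexCheck {
  public static final Pattern UTC_TIMESTAMP = Pattern.compile(
      "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?Z");

  public static boolean isValid(String ts) {
    return UTC_TIMESTAMP.matcher(ts).matches();
  }
}
```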

@svetakvsundhar svetakvsundhar (Contributor) left a comment

Thanks @maheshgoyal15 ! Could you add some e2e tests here to ensure this WAI?

Added test cases for edge cases and for filtering the rows.
Added optional parameter section in README.
@pull-request-size pull-request-size bot added size/L and removed size/M labels Apr 2, 2025
@maheshgoyal15
Copy link
Author

@ron-gal There were a few delays, but I was able to incorporate all the recommendations you provided. Can you take a look at this PR?
