-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-12260] Java - Backport Firestore connector's ramp-up throttling to Datastore connector #14713
[BEAM-12260] Java - Backport Firestore connector's ramp-up throttling to Datastore connector #14713
Conversation
…or-java/clientside-throttling
…or-java/clientside-throttling
@chamikaramj - feel free to delegate to someone else. @danthev - thank you! |
Run Java PreCommit |
Same questions as #14723 but we can do the conversation there. |
R: @nehsyc |
…or-java/clientside-throttling
…or-java/clientside-throttling
Run Java PreCommit |
There seems to be a broken Flink test in master, otherwise this is ready for review. |
…or-java/clientside-throttling
Tests are green, so change is ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Daniel! Generally looks good :)
...io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
Outdated
Show resolved
Hide resolved
...le-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
Outdated
Show resolved
Hide resolved
...le-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks :)
...le-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java
Outdated
Show resolved
Hide resolved
|
||
/** Tests for {@link RampupThrottlingFn}. */ | ||
@RunWith(JUnit4.class) | ||
public class RampupThrottlingFnTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any way to test throttling counter is set properly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. (Had to make throttlingMsecs
visible for testing)
Run Apache Beam Java SDK Quickstart - Direct |
retest this please |
Java Wordcount failure seems unrelated, but I can't get it to rerun the test. |
…or-java/clientside-throttling
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM other than a nit.
@@ -212,7 +220,7 @@ | |||
* <p>Testing has found that a batch of 200 entities will generally finish within the timeout even | |||
* in adverse conditions. | |||
*/ | |||
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200; | |||
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 50; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this updated ? If 50 is a better number could you also update the comment please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the comment. We found that a lower starting value helps with entities that have an extremely high number of indexes, these would sometimes start with load that had no realistic chance of succeeding. This is just a start value, so the size quickly corrects up to 500 if write throughput is enough after all.
@@ -225,7 +233,7 @@ | |||
* number of entities per request may be lower when we flush for the end of a bundle or if we hit | |||
* {@link DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT}. | |||
*/ | |||
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10; | |||
@VisibleForTesting static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto. Note that this could potentially double the number of RPCs to Datastore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somewhat similar issue, the limit of 10 was rather arbitrary and led to some deadlines on expensive writes. Write cost outweighs the cost of processing more RPCs, so we went down a little to accommodate that.
Both variables help with edge cases on complex entities, most traffic will be able to go to batch size 500.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarifications.
Thanks! Rerunning precommit, there's a FlinkSavePoint test that I've noticed more than once now, probably flaky. Run Java PreCommit |
Run Java PreCommit |
It looks like the test added in this change is flaky: https://issues.apache.org/jira/browse/BEAM-12858. @danthev would you by chance be able to take a look? |
A new connector for Cloud Firestore is being added to Beam and currently out for review (see #14261). Some of the features and adjustments are useful for Cloud Datastore as well, so they're being backported to this connector.
Ramp-up throttling
This follows the same implementation as the Firetore connector, though implemented as a subtransform in the Write/Delete PTransform for simplicity and so the change is more easily visible.
In essence, the ramp-up throttler is a generic rate limiter that emits elements only when there is sufficient "budget". This budget starts at 500 entities per second (split across workers) and increases by 50% every 5 minutes, as documented under Datastore Best Practices. Abiding by this rule is not always necessary, the throttling step can be turned off in that case.
Minor tweaks
There have been minor tweaks and adjustments to some of the constants along the way.
datastoreEntitiesMutated
anddatastoreLatencyMsPerMutation
.R: @chamikaramj
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
ValidatesRunner
compliance status (on master branch)Examples testing status on various runners
Post-Commit SDK/Transform Integration Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.