Skip to content

Flush usages data after publishOperation stream is closed #9444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: flush_e_later
Choose a base branch
from

Conversation

maggie-lou
Copy link
Collaborator

@maggie-lou maggie-lou commented May 22, 2025

Rather than flushing the data when a COMPLETED operation is published, publish when the client requests to close the publishOperation stream. This guarantees that all executions data has been collected (including cleanup data, which may come after an initial COMPLETED operation) before the data is flushed.

@maggie-lou maggie-lou force-pushed the flush_usage_later branch from 699d43b to cf6a6f5 Compare May 22, 2025 21:10
@maggie-lou maggie-lou requested review from bduffany and vanja-p May 22, 2025 21:32
@maggie-lou maggie-lou marked this pull request as ready for review May 23, 2025 17:29
Comment on lines 304 to 309
// The scheduler attempts to prevent concurrent execution updates via
// task leasing, but this is not currently 100% reliable. If execution
// progress appears to restart, ignore any future updates.
if lastStage == int64(repb.ExecutionStage_COMPLETED) && event.GetStage() != int64(repb.ExecutionStage_COMPLETED) {
break
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI we might (rarely) see some sequences like

(attempt 1) COMPLETED (execution completed)
(attempt 2) EXECUTING
(attempt 1) COMPLETED (recycling completed)

So we would drop the second COMPLETED event in this case. But it probably happens rarely enough that it's not worth worrying about for now.

@@ -333,17 +337,70 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin
// Backwards-compatible fill of the execution with the ExecutionSummary for
// now. The ExecutionSummary will be removed completely in the future.
if statsUnset(md) {
if decodedMetadata, err := decodeMetadataFromExecutionSummary(executeResponse); err == nil {
if decodedMetadata, _ := decodeMetadataFromExecutionSummary(executeResponse); decodedMetadata != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this was done in a different PR?

@@ -440,14 +443,11 @@ type publishTest struct {
func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add a test that does 2 different PublishOperation streams for the same execution, simulating a retry of a broken stream.

@maggie-lou maggie-lou force-pushed the flush_usage_later branch from cf6a6f5 to 6b3985e Compare May 28, 2025 18:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants