-
Notifications
You must be signed in to change notification settings - Fork 106
Flush usages data after publishOperation stream is closed #9444
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: flush_e_later
Are you sure you want to change the base?
Conversation
699d43b
to
cf6a6f5
Compare
// The scheduler attempts to prevent concurrent execution updates via | ||
// task leasing, but this is not currently 100% reliable. If execution | ||
// progress appears to restart, ignore any future updates. | ||
if lastStage == int64(repb.ExecutionStage_COMPLETED) && event.GetStage() != int64(repb.ExecutionStage_COMPLETED) { | ||
break | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI we might (rarely) see some sequences like
(attempt 1) COMPLETED (execution completed)
(attempt 2) EXECUTING
(attempt 1) COMPLETED (recycling completed)
So we would drop the second COMPLETED event in this case. But it probably happens rarely enough that it's not worth worrying about for now.
@@ -333,17 +337,70 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin | |||
// Backwards-compatible fill of the execution with the ExecutionSummary for | |||
// now. The ExecutionSummary will be removed completely in the future. | |||
if statsUnset(md) { | |||
if decodedMetadata, err := decodeMetadataFromExecutionSummary(executeResponse); err == nil { | |||
if decodedMetadata, _ := decodeMetadataFromExecutionSummary(executeResponse); decodedMetadata != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC this was done in a different PR?
@@ -440,14 +443,11 @@ type publishTest struct { | |||
func testExecuteAndPublishOperation(t *testing.T, test publishTest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to add a test that does 2 different PublishOperation streams for the same execution, simulating a retry of a broken stream.
cf6a6f5
to
6b3985e
Compare
Rather than flushing the data when a COMPLETED operation is published, publish when the client requests to close the publishOperation stream. This guarantees that all executions data has been collected (including cleanup data, which may come after an initial COMPLETED operation) before the data is flushed.