Skip to content

Commit 20569e1

Browse files
Return the new invocation epoch in restart_invocation API
1 parent 474b66f commit 20569e1

File tree

4 files changed

+38
-26
lines changed

4 files changed

+38
-26
lines changed

crates/admin/src/rest_api/invocations.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,20 @@ use super::error::*;
1212
use crate::generate_meta_api_error;
1313
use crate::rest_api::create_envelope_header;
1414
use crate::state::AdminServiceState;
15+
use axum::Json;
1516
use axum::extract::{Path, Query, State};
1617
use axum::http::StatusCode;
1718
use okapi_operation::*;
1819
use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey};
1920
use restate_types::invocation::client::{
20-
CancelInvocationResponse, InvocationClient, KillInvocationResponse, PurgeInvocationResponse,
21-
RestartInvocationResponse,
21+
self, CancelInvocationResponse, InvocationClient, KillInvocationResponse,
22+
PurgeInvocationResponse,
2223
};
2324
use restate_types::invocation::{
2425
InvocationEpoch, InvocationTermination, PurgeInvocationRequest, TerminationFlavor, restart,
2526
};
2627
use restate_wal_protocol::{Command, Envelope};
27-
use serde::Deserialize;
28+
use serde::{Deserialize, Serialize};
2829
use std::sync::Arc;
2930
use std::time::Duration;
3031
use tracing::warn;
@@ -343,6 +344,12 @@ where
343344
Ok(())
344345
}
345346

347+
#[derive(Debug, Serialize, JsonSchema)]
348+
pub struct RestartInvocationResponse {
349+
/// The new invocation epoch of the invocation.
350+
pub new_invocation_epoch: InvocationEpoch,
351+
}
352+
346353
/// What to do if the invocation is still running. By default, the running invocation will be killed.
347354
#[derive(Default, Debug, Deserialize, JsonSchema)]
348355
#[serde(rename_all = "snake_case")]
@@ -462,7 +469,7 @@ pub async fn restart_invocation<V, IC>(
462469
previous_attempt_retention,
463470
apply_to_workflow_run,
464471
}): Query<RestartInvocationParams>,
465-
) -> Result<(), RestartInvocationError>
472+
) -> Result<Json<RestartInvocationResponse>, RestartInvocationError>
466473
where
467474
IC: InvocationClient,
468475
{
@@ -482,23 +489,24 @@ where
482489
.await
483490
.map_err(InvocationClientError)?
484491
{
485-
RestartInvocationResponse::Ok => {}
486-
RestartInvocationResponse::NotFound => {
492+
client::RestartInvocationResponse::Ok { new_epoch } => Ok(RestartInvocationResponse {
493+
new_invocation_epoch: new_epoch,
494+
}
495+
.into()),
496+
client::RestartInvocationResponse::NotFound => {
487497
Err(InvocationNotFoundError(invocation_id.to_string()))?
488498
}
489-
RestartInvocationResponse::StillRunning => Err(RestartInvocationStillRunningError(
490-
invocation_id.to_string(),
491-
))?,
492-
RestartInvocationResponse::Unsupported => {
499+
client::RestartInvocationResponse::StillRunning => Err(
500+
RestartInvocationStillRunningError(invocation_id.to_string()),
501+
)?,
502+
client::RestartInvocationResponse::Unsupported => {
493503
Err(RestartInvocationUnsupportedError(invocation_id.to_string()))?
494504
}
495-
RestartInvocationResponse::MissingInput => Err(RestartInvocationMissingInputError(
496-
invocation_id.to_string(),
497-
))?,
498-
RestartInvocationResponse::NotStarted => {
505+
client::RestartInvocationResponse::MissingInput => Err(
506+
RestartInvocationMissingInputError(invocation_id.to_string()),
507+
)?,
508+
client::RestartInvocationResponse::NotStarted => {
499509
Err(RestartInvocationNotStartedError(invocation_id.to_string()))?
500510
}
501-
};
502-
503-
Ok(())
511+
}
504512
}

crates/types/src/invocation/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub enum PurgeInvocationResponse {
108108

109109
#[derive(Debug, Clone, PartialEq, Eq)]
110110
pub enum RestartInvocationResponse {
111-
Ok,
111+
Ok { new_epoch: InvocationEpoch },
112112
NotFound,
113113
StillRunning,
114114
Unsupported,

crates/types/src/net/partition_processor.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ impl From<PurgeInvocationResponse> for PurgeInvocationRpcResponse {
227227

228228
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229229
pub enum RestartInvocationRpcResponse {
230-
Ok,
230+
Ok { new_epoch: InvocationEpoch },
231231
NotFound,
232232
StillRunning,
233233
Unsupported,
@@ -238,7 +238,9 @@ pub enum RestartInvocationRpcResponse {
238238
impl From<RestartInvocationRpcResponse> for RestartInvocationResponse {
239239
fn from(value: RestartInvocationRpcResponse) -> Self {
240240
match value {
241-
RestartInvocationRpcResponse::Ok => RestartInvocationResponse::Ok,
241+
RestartInvocationRpcResponse::Ok { new_epoch } => {
242+
RestartInvocationResponse::Ok { new_epoch }
243+
}
242244
RestartInvocationRpcResponse::NotFound => RestartInvocationResponse::NotFound,
243245
RestartInvocationRpcResponse::StillRunning => RestartInvocationResponse::StillRunning,
244246
RestartInvocationRpcResponse::Unsupported => RestartInvocationResponse::Unsupported,
@@ -251,7 +253,9 @@ impl From<RestartInvocationRpcResponse> for RestartInvocationResponse {
251253
impl From<RestartInvocationResponse> for RestartInvocationRpcResponse {
252254
fn from(value: RestartInvocationResponse) -> Self {
253255
match value {
254-
RestartInvocationResponse::Ok => RestartInvocationRpcResponse::Ok,
256+
RestartInvocationResponse::Ok { new_epoch } => {
257+
RestartInvocationRpcResponse::Ok { new_epoch }
258+
}
255259
RestartInvocationResponse::NotFound => RestartInvocationRpcResponse::NotFound,
256260
RestartInvocationResponse::StillRunning => RestartInvocationRpcResponse::StillRunning,
257261
RestartInvocationResponse::Unsupported => RestartInvocationRpcResponse::Unsupported,

crates/worker/src/partition/state_machine/lifecycle/restart.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ where
315315
.await?;
316316

317317
// Reply to the listener, restart went well
318-
ctx.reply_to_restart_invocation(response_sink, RestartInvocationResponse::Ok);
318+
ctx.reply_to_restart_invocation(response_sink, RestartInvocationResponse::Ok { new_epoch });
319319

320320
Ok(())
321321
}
@@ -478,7 +478,7 @@ mod tests {
478478
all!(
479479
contains(pat!(Action::ForwardRestartInvocationResponse {
480480
request_id: eq(restart_request_id),
481-
response: eq(RestartInvocationResponse::Ok)
481+
response: eq(RestartInvocationResponse::Ok { new_epoch: 1 })
482482
})),
483483
contains(pat!(Action::Invoke {
484484
invocation_id: eq(invocation_id),
@@ -603,7 +603,7 @@ mod tests {
603603
// Verify the restart response is sent
604604
contains(pat!(Action::ForwardRestartInvocationResponse {
605605
request_id: eq(restart_request_id),
606-
response: eq(RestartInvocationResponse::Ok)
606+
response: eq(RestartInvocationResponse::Ok { new_epoch: 1 })
607607
})),
608608
// Verify the invocation is restarted
609609
contains(pat!(Action::Invoke {
@@ -787,7 +787,7 @@ mod tests {
787787
actions,
788788
contains(pat!(Action::ForwardRestartInvocationResponse {
789789
request_id: eq(restart_request_id),
790-
response: eq(RestartInvocationResponse::Ok)
790+
response: eq(RestartInvocationResponse::Ok { new_epoch: 1 })
791791
}))
792792
);
793793

@@ -927,7 +927,7 @@ mod tests {
927927
// Verify the restart response is sent
928928
contains(pat!(Action::ForwardRestartInvocationResponse {
929929
request_id: eq(restart_request_id),
930-
response: eq(RestartInvocationResponse::Ok)
930+
response: eq(RestartInvocationResponse::Ok { new_epoch: 1 })
931931
})),
932932
// Verify the invocation is restarted
933933
contains(pat!(Action::Invoke {

0 commit comments

Comments
 (0)