Skip to content

Restart invocation #3360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ restate-core = { workspace = true, features = ["options_schema"] }
restate-errors = { workspace = true }
restate-futures-util = { workspace = true }
restate-metadata-store = { workspace = true }
restate-serde-util = { workspace = true }
restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-storage-query-datafusion = { workspace = true }
Expand Down
24 changes: 24 additions & 0 deletions crates/admin/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,30 @@ impl_meta_api_error!(InvocationWasAlreadyCompletedError: CONFLICT "The invocatio
pub(crate) struct PurgeInvocationNotCompletedError(pub(crate) String);
impl_meta_api_error!(PurgeInvocationNotCompletedError: CONFLICT "The invocation is not yet completed. An invocation can be purged only when completed.");

#[derive(Debug, thiserror::Error)]
#[error("The invocation '{0}' is still running.")]
pub(crate) struct RestartInvocationStillRunningError(pub(crate) String);
impl_meta_api_error!(RestartInvocationStillRunningError: CONFLICT "The invocation is still running. An invocation can be restarted only when completed, or if the query parameter if_running=kill is provided.");

#[derive(Debug, thiserror::Error)]
#[error(
"Restarting the invocation '{0}' is not supported, because it was started using the old service protocol."
)]
pub(crate) struct RestartInvocationUnsupportedError(pub(crate) String);
impl_meta_api_error!(RestartInvocationUnsupportedError: UNPROCESSABLE_ENTITY "Restarting the invocation is not supported, because it was started using the old service protocol.");

#[derive(Debug, thiserror::Error)]
#[error(
"The invocation '{0}' cannot be restarted because the input is not available. This indicates that the journal was already purged, or not retained at all."
)]
pub(crate) struct RestartInvocationMissingInputError(pub(crate) String);
impl_meta_api_error!(RestartInvocationMissingInputError: GONE "The invocation cannot be restarted because the input is not available. In order to restart an invocation, the journal must be available in order to read the input again. Journal can be retained after completion by enabling journal retention.");

#[derive(Debug, thiserror::Error)]
#[error("The invocation '{0}' cannot be restarted because it's not running yet.")]
pub(crate) struct RestartInvocationNotStartedError(pub(crate) String);
impl_meta_api_error!(RestartInvocationNotStartedError: TOO_EARLY "The invocation cannot be restarted because it's not running yet, meaning it might have been scheduled or inboxed.");

// --- Old Meta API errors. Please don't use these anymore.

/// This error is used by handlers to propagate API errors,
Expand Down
243 changes: 225 additions & 18 deletions crates/admin/src/rest_api/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,25 @@
// by the Apache License, Version 2.0.

use super::error::*;
use std::sync::Arc;

use crate::generate_meta_api_error;
use crate::rest_api::create_envelope_header;
use crate::state::AdminServiceState;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use okapi_operation::*;
use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey};
use restate_types::invocation::client::{
CancelInvocationResponse, InvocationClient, KillInvocationResponse, PurgeInvocationResponse,
self, CancelInvocationResponse, InvocationClient, KillInvocationResponse,
PurgeInvocationResponse,
};
use restate_types::invocation::{
InvocationEpoch, InvocationTermination, PurgeInvocationRequest, TerminationFlavor, restart,
};
use restate_types::invocation::{InvocationTermination, PurgeInvocationRequest, TerminationFlavor};
use restate_wal_protocol::{Command, Envelope};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tracing::warn;

#[derive(Debug, Default, Deserialize, JsonSchema)]
Expand Down Expand Up @@ -98,6 +102,7 @@ pub async fn delete_invocation<V, IC>(
DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest {
invocation_id,
response_sink: None,
invocation_epoch: 0,
}),
};

Expand Down Expand Up @@ -224,21 +229,37 @@ where

generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, PurgeInvocationNotCompletedError]);

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct PurgeInvocationParams {
pub epoch: Option<InvocationEpoch>,
}

/// Purge an invocation
#[openapi(
summary = "Purge an invocation",
description = "Purge the given invocation. This cleanups all the state for the given invocation. This command applies only to completed invocations.",
description = "Purge the given invocation. This cleanups all the state for the given invocation, including its journal. This command applies only to completed invocations.",
operation_id = "purge_invocation",
tags = "invocation",
parameters(path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
))
parameters(
path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
),
query(
name = "epoch",
description = "Remove the specific epoch. If not provided, epoch 0 will be removed. When removing the latest epoch, all the previous epochs will be cleaned up as well.",
required = false,
style = "simple",
allow_empty_value = false,
schema = InvocationEpoch,
)
)
)]
pub async fn purge_invocation<V, IC>(
State(state): State<AdminServiceState<V, IC>>,
Path(invocation_id): Path<String>,
Query(PurgeInvocationParams { epoch }): Query<PurgeInvocationParams>,
) -> Result<(), PurgeInvocationError>
where
IC: InvocationClient,
Expand All @@ -249,7 +270,11 @@ where

match state
.invocation_client
.purge_invocation(PartitionProcessorRpcRequestId::new(), invocation_id)
.purge_invocation(
PartitionProcessorRpcRequestId::new(),
invocation_id,
epoch.unwrap_or_default(),
)
.await
.map_err(InvocationClientError)?
{
Expand All @@ -273,15 +298,26 @@ generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, Invocation
description = "Purge the given invocation journal. This cleanups only the journal for the given invocation, retaining the metadata. This command applies only to completed invocations.",
operation_id = "purge_journal",
tags = "invocation",
parameters(path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
))
parameters(
path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
),
query(
name = "epoch",
description = "Remove the specific epoch. If not provided, epoch 0 will be removed. When removing the latest epoch, all the previous epochs will be cleaned up as well.",
required = false,
style = "simple",
allow_empty_value = false,
schema = InvocationEpoch,
)
)
)]
pub async fn purge_journal<V, IC>(
State(state): State<AdminServiceState<V, IC>>,
Path(invocation_id): Path<String>,
Query(PurgeInvocationParams { epoch }): Query<PurgeInvocationParams>,
) -> Result<(), PurgeJournalError>
where
IC: InvocationClient,
Expand All @@ -292,7 +328,11 @@ where

match state
.invocation_client
.purge_journal(PartitionProcessorRpcRequestId::new(), invocation_id)
.purge_journal(
PartitionProcessorRpcRequestId::new(),
invocation_id,
epoch.unwrap_or_default(),
)
.await
.map_err(InvocationClientError)?
{
Expand All @@ -307,3 +347,170 @@ where

Ok(())
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct RestartInvocationResponse {
/// The new invocation epoch of the invocation.
pub new_invocation_epoch: InvocationEpoch,
}

/// What to do if the invocation is still running. By default, the running invocation will be killed.
#[derive(Default, Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum RestartInvocationIfRunning {
/// Kill the invocation, sending a failure to the waiting callers, then restart the invocation.
#[default]
Kill,
/// Fail the Restart operation if the invocation is still running.
Fail,
}

impl From<RestartInvocationIfRunning> for restart::IfRunning {
fn from(value: RestartInvocationIfRunning) -> Self {
match value {
RestartInvocationIfRunning::Kill => restart::IfRunning::Kill,
RestartInvocationIfRunning::Fail => restart::IfRunning::Fail,
}
}
}

/// What to do in case of restarting a workflow run. By default, clears all promises and state.
#[derive(Default, Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum RestartInvocationApplyToWorkflowRun {
Nothing,
/// Clear all the promises, retain the state
ClearAllPromises,
/// Clear all the state, retain the promises
ClearAllState,
/// Clear all the promises and state
#[default]
ClearAllPromisesAndState,
}

impl From<RestartInvocationApplyToWorkflowRun> for restart::ApplyToWorkflowRun {
fn from(value: RestartInvocationApplyToWorkflowRun) -> Self {
match value {
RestartInvocationApplyToWorkflowRun::Nothing => restart::ApplyToWorkflowRun::Nothing,
RestartInvocationApplyToWorkflowRun::ClearAllPromises => {
restart::ApplyToWorkflowRun::ClearOnlyPromises
}
RestartInvocationApplyToWorkflowRun::ClearAllState => {
restart::ApplyToWorkflowRun::ClearOnlyState
}
RestartInvocationApplyToWorkflowRun::ClearAllPromisesAndState => {
restart::ApplyToWorkflowRun::ClearAllPromisesAndState
}
}
}
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct RestartInvocationParams {
pub if_running: Option<RestartInvocationIfRunning>,
#[serde(
default,
with = "serde_with::As::<Option<restate_serde_util::DurationString>>"
)]
#[schemars(with = "Option<String>")]
pub previous_attempt_retention: Option<Duration>,
pub apply_to_workflow_run: Option<RestartInvocationApplyToWorkflowRun>,
}

generate_meta_api_error!(RestartInvocationError: [
InvocationNotFoundError,
InvocationClientError,
InvalidFieldError,
RestartInvocationStillRunningError,
RestartInvocationUnsupportedError,
RestartInvocationMissingInputError,
RestartInvocationNotStartedError
]);

/// Restart an invocation
#[openapi(
summary = "Restart an invocation",
description = "Restart the given invocation. This will restart the invocation, given its input is available.",
operation_id = "restart_invocation",
tags = "invocation",
parameters(
path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
),
query(
name = "if_running",
description = "What to do if the invocation is still running. By default, the running invocation will be killed.",
required = false,
style = "simple",
allow_empty_value = false,
schema = RestartInvocationIfRunning,
),
query(
name = "previous_attempt_retention",
description = "If set, it will override the configured completion_retention/journal_retention when the invocation was executed the first time. If none of the completion_retention/journal_retention are configured, and neither this previous_attempt_retention, then the previous attempt won't be retained at all. Can be configured using humantime format or ISO8601.",
required = false,
style = "simple",
allow_empty_value = false,
schema = String,
),
query(
name = "apply_to_workflow_run",
description = "What to do in case of restarting a workflow run. By default, clears all promises and state.",
required = false,
style = "simple",
allow_empty_value = false,
schema = RestartInvocationApplyToWorkflowRun,
)
)
)]
pub async fn restart_invocation<V, IC>(
State(state): State<AdminServiceState<V, IC>>,
Path(invocation_id): Path<String>,
Query(RestartInvocationParams {
if_running,
previous_attempt_retention,
apply_to_workflow_run,
}): Query<RestartInvocationParams>,
) -> Result<Json<RestartInvocationResponse>, RestartInvocationError>
where
IC: InvocationClient,
{
let invocation_id = invocation_id
.parse::<InvocationId>()
.map_err(|e| InvalidFieldError("invocation_id", e.to_string()))?;

match state
.invocation_client
.restart_invocation(
PartitionProcessorRpcRequestId::new(),
invocation_id,
if_running.unwrap_or_default().into(),
previous_attempt_retention,
apply_to_workflow_run.unwrap_or_default().into(),
)
.await
.map_err(InvocationClientError)?
{
client::RestartInvocationResponse::Ok { new_epoch } => Ok(RestartInvocationResponse {
new_invocation_epoch: new_epoch,
}
.into()),
client::RestartInvocationResponse::NotFound => {
Err(InvocationNotFoundError(invocation_id.to_string()))?
}
client::RestartInvocationResponse::StillRunning => Err(
RestartInvocationStillRunningError(invocation_id.to_string()),
)?,
client::RestartInvocationResponse::Unsupported => {
Err(RestartInvocationUnsupportedError(invocation_id.to_string()))?
}
client::RestartInvocationResponse::MissingInput => Err(
RestartInvocationMissingInputError(invocation_id.to_string()),
)?,
client::RestartInvocationResponse::NotStarted => {
Err(RestartInvocationNotStartedError(invocation_id.to_string()))?
}
}
}
4 changes: 4 additions & 0 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ where
"/invocations/:invocation_id/purge-journal",
patch(openapi_handler!(invocations::purge_journal)),
)
.route(
"/invocations/:invocation_id/restart",
patch(openapi_handler!(invocations::restart_invocation)),
)
.route(
"/subscriptions",
post(openapi_handler!(subscriptions::create_subscription)),
Expand Down
Loading
Loading