Skip to content

Commit 474b66f

Browse files
Mindless plumbing
1 parent 45dbc94 commit 474b66f

File tree

21 files changed

+448
-122
lines changed

21 files changed

+448
-122
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/worker_api/partition_processor_rpc_client.rs

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ use restate_types::identifiers::{
2121
use restate_types::invocation::client::{
2222
AttachInvocationResponse, CancelInvocationResponse, GetInvocationOutputResponse,
2323
InvocationClient, InvocationClientError, InvocationOutput, KillInvocationResponse,
24-
PurgeInvocationResponse, SubmittedInvocationNotification,
24+
PurgeInvocationResponse, RestartInvocationResponse, SubmittedInvocationNotification,
25+
};
26+
use restate_types::invocation::restart::{ApplyToWorkflowRun, IfRunning};
27+
use restate_types::invocation::{
28+
InvocationEpoch, InvocationQuery, InvocationRequest, InvocationResponse,
2529
};
26-
use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse};
2730
use restate_types::journal_v2::Signal;
2831
use restate_types::live::Live;
2932
use restate_types::net::codec::EncodeError;
@@ -32,6 +35,7 @@ use restate_types::net::partition_processor::{
3235
PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
3336
};
3437
use restate_types::partition_table::{FindPartition, PartitionTable, PartitionTableError};
38+
use std::time::Duration;
3539
use tracing::trace;
3640

3741
#[derive(Debug, thiserror::Error)]
@@ -450,11 +454,15 @@ where
450454
&self,
451455
request_id: PartitionProcessorRpcRequestId,
452456
invocation_id: InvocationId,
457+
invocation_epoch: InvocationEpoch,
453458
) -> Result<PurgeInvocationResponse, InvocationClientError> {
454459
let response = self
455460
.resolve_partition_id_and_send(
456461
request_id,
457-
PartitionProcessorRpcRequestInner::PurgeInvocation { invocation_id },
462+
PartitionProcessorRpcRequestInner::PurgeInvocation {
463+
invocation_id,
464+
invocation_epoch,
465+
},
458466
)
459467
.await?;
460468

@@ -472,11 +480,15 @@ where
472480
&self,
473481
request_id: PartitionProcessorRpcRequestId,
474482
invocation_id: InvocationId,
483+
invocation_epoch: InvocationEpoch,
475484
) -> Result<PurgeInvocationResponse, InvocationClientError> {
476485
let response = self
477486
.resolve_partition_id_and_send(
478487
request_id,
479-
PartitionProcessorRpcRequestInner::PurgeJournal { invocation_id },
488+
PartitionProcessorRpcRequestInner::PurgeJournal {
489+
invocation_id,
490+
invocation_epoch,
491+
},
480492
)
481493
.await?;
482494

@@ -489,4 +501,34 @@ where
489501
}
490502
})
491503
}
504+
505+
async fn restart_invocation(
506+
&self,
507+
request_id: PartitionProcessorRpcRequestId,
508+
invocation_id: InvocationId,
509+
if_running: IfRunning,
510+
previous_attempt_retention: Option<Duration>,
511+
apply_to_workflow_run: ApplyToWorkflowRun,
512+
) -> Result<RestartInvocationResponse, InvocationClientError> {
513+
let response = self
514+
.resolve_partition_id_and_send(
515+
request_id,
516+
PartitionProcessorRpcRequestInner::RestartInvocation {
517+
invocation_id,
518+
if_running,
519+
previous_attempt_retention,
520+
apply_to_workflow_run,
521+
},
522+
)
523+
.await?;
524+
525+
Ok(match response {
526+
PartitionProcessorRpcResponse::RestartInvocation(purge_invocation_response) => {
527+
purge_invocation_response.into()
528+
}
529+
_ => {
530+
panic!("Expecting RestartInvocation rpc response")
531+
}
532+
})
533+
}
492534
}

crates/types/src/errors.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ pub mod codes {
175175
PROTOCOL_VIOLATION 571 "Protocol violation",
176176
CONFLICT 409 "Conflict",
177177
NOT_READY 470 "Not ready",
178+
RESTARTED 471 "Restarted",
178179
);
179180
}
180181

@@ -295,6 +296,11 @@ impl From<anyhow::Error> for InvocationError {
295296
pub const KILLED_INVOCATION_ERROR: InvocationError =
296297
InvocationError::new_static(codes::ABORTED, "killed");
297298

299+
/// Static [`InvocationError`] carrying the `codes::RESTARTED` code. Its message tells the
/// caller that the invocation was restarted and that re-attaching retrieves the new result.
pub const RESTARTED_INVOCATION_ERROR: InvocationError = InvocationError::new_static(
300+
codes::RESTARTED,
301+
"The invocation was restarted. You can re-attach to it to retrieve the new result.",
302+
);
303+
298304
// TODO: Once we want to distinguish server side cancellations from user code returning the
299305
// UserErrorCode::Cancelled, we need to add a new RestateErrorCode.
300306
pub const CANCELED_INVOCATION_ERROR: InvocationError =

crates/types/src/invocation/client.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@
1010

1111
use crate::errors::InvocationError;
1212
use crate::identifiers::{InvocationId, PartitionProcessorRpcRequestId};
13-
use crate::invocation::{InvocationQuery, InvocationRequest, InvocationResponse, InvocationTarget};
13+
use crate::invocation::restart::{ApplyToWorkflowRun, IfRunning};
14+
use crate::invocation::{
15+
InvocationEpoch, InvocationQuery, InvocationRequest, InvocationResponse, InvocationTarget,
16+
};
1417
use crate::journal_v2::Signal;
1518
use crate::time::MillisSinceEpoch;
1619
use bytes::Bytes;
20+
use std::time::Duration;
1721

1822
#[derive(Debug, thiserror::Error)]
1923
#[error("{inner}")]
@@ -102,6 +106,16 @@ pub enum PurgeInvocationResponse {
102106
NotCompleted,
103107
}
104108

109+
/// Outcome returned by [`InvocationClient::restart_invocation`].
///
/// NOTE(review): the exact meaning of each variant is not spelled out here; presumably it is
/// described alongside `crate::invocation::restart::Request` — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
110+
pub enum RestartInvocationResponse {
111+
Ok,
112+
NotFound,
113+
StillRunning,
114+
Unsupported,
115+
MissingInput,
116+
NotStarted,
117+
}
118+
105119
/// This trait provides the functionalities to interact with Restate invocations.
106120
pub trait InvocationClient {
107121
/// Append the invocation to the log, waiting for the PP to emit [`SubmittedInvocationNotification`] when the command is processed.
@@ -161,17 +175,29 @@ pub trait InvocationClient {
161175
invocation_id: InvocationId,
162176
) -> impl Future<Output = Result<KillInvocationResponse, InvocationClientError>> + Send;
163177

164-
/// Purge the given invocation.
178+
/// Purge the given invocation. If `invocation_epoch` is the latest epoch, all previous epochs are purged as well.
165179
fn purge_invocation(
166180
&self,
167181
request_id: PartitionProcessorRpcRequestId,
168182
invocation_id: InvocationId,
183+
invocation_epoch: InvocationEpoch,
169184
) -> impl Future<Output = Result<PurgeInvocationResponse, InvocationClientError>> + Send;
170185

171-
/// Purge the given invocation journal.
186+
/// Purge the given invocation journal. If `invocation_epoch` is the latest epoch, all previous epochs are purged as well.
172187
fn purge_journal(
173188
&self,
174189
request_id: PartitionProcessorRpcRequestId,
175190
invocation_id: InvocationId,
191+
invocation_epoch: InvocationEpoch,
176192
) -> impl Future<Output = Result<PurgeInvocationResponse, InvocationClientError>> + Send;
193+
194+
/// See [`crate::invocation::restart::Request`].
195+
fn restart_invocation(
196+
&self,
197+
request_id: PartitionProcessorRpcRequestId,
198+
invocation_id: InvocationId,
199+
if_running: IfRunning,
200+
previous_attempt_retention: Option<Duration>,
201+
apply_to_workflow_run: ApplyToWorkflowRun,
202+
) -> impl Future<Output = Result<RestartInvocationResponse, InvocationClientError>> + Send;
177203
}

crates/types/src/journal_v2/notification.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ impl From<CompletionType> for NotificationType {
7575
}
7676
}
7777

78+
/// Wraps a [`CompletionType`] into the `Notification` variant of [`EntryType`], converting the
/// payload with `Into`.
impl From<CompletionType> for EntryType {
    fn from(completion_type: CompletionType) -> Self {
        Self::Notification(completion_type.into())
    }
}
83+
7884
#[enum_dispatch]
7985
pub trait NotificationMetadata {
8086
fn id(&self) -> NotificationId;

crates/types/src/net/partition_processor.rs

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ use crate::identifiers::{
1313
};
1414
use crate::invocation::client::{
1515
CancelInvocationResponse, InvocationOutput, KillInvocationResponse, PurgeInvocationResponse,
16-
SubmittedInvocationNotification,
16+
RestartInvocationResponse, SubmittedInvocationNotification,
1717
};
18-
use crate::invocation::{InvocationQuery, InvocationRequest, InvocationResponse};
18+
use crate::invocation::restart::{ApplyToWorkflowRun, IfRunning};
19+
use crate::invocation::{InvocationEpoch, InvocationQuery, InvocationRequest, InvocationResponse};
1920
use crate::journal_v2::Signal;
2021
use crate::net::ServiceTag;
2122
use crate::net::{default_wire_codec, define_rpc, define_service};
2223
use serde::{Deserialize, Serialize};
24+
use std::time::Duration;
2325

2426
pub struct PartitionLeaderService;
2527

@@ -68,10 +70,26 @@ pub enum PartitionProcessorRpcRequestInner {
6870
GetInvocationOutput(InvocationQuery, GetInvocationOutputResponseMode),
6971
AppendInvocationResponse(InvocationResponse),
7072
AppendSignal(InvocationId, Signal),
71-
CancelInvocation { invocation_id: InvocationId },
72-
KillInvocation { invocation_id: InvocationId },
73-
PurgeInvocation { invocation_id: InvocationId },
74-
PurgeJournal { invocation_id: InvocationId },
73+
CancelInvocation {
74+
invocation_id: InvocationId,
75+
},
76+
KillInvocation {
77+
invocation_id: InvocationId,
78+
},
79+
PurgeInvocation {
80+
invocation_id: InvocationId,
81+
invocation_epoch: InvocationEpoch,
82+
},
83+
PurgeJournal {
84+
invocation_id: InvocationId,
85+
invocation_epoch: InvocationEpoch,
86+
},
87+
RestartInvocation {
88+
invocation_id: InvocationId,
89+
if_running: IfRunning,
90+
previous_attempt_retention: Option<Duration>,
91+
apply_to_workflow_run: ApplyToWorkflowRun,
92+
},
7593
}
7694

7795
impl WithPartitionKey for PartitionProcessorRpcRequestInner {
@@ -87,10 +105,13 @@ impl WithPartitionKey for PartitionProcessorRpcRequestInner {
87105
PartitionProcessorRpcRequestInner::KillInvocation { invocation_id } => {
88106
invocation_id.partition_key()
89107
}
90-
PartitionProcessorRpcRequestInner::PurgeInvocation { invocation_id } => {
108+
PartitionProcessorRpcRequestInner::PurgeInvocation { invocation_id, .. } => {
91109
invocation_id.partition_key()
92110
}
93-
PartitionProcessorRpcRequestInner::PurgeJournal { invocation_id } => {
111+
PartitionProcessorRpcRequestInner::PurgeJournal { invocation_id, .. } => {
112+
invocation_id.partition_key()
113+
}
114+
PartitionProcessorRpcRequestInner::RestartInvocation { invocation_id, .. } => {
94115
invocation_id.partition_key()
95116
}
96117
}
@@ -204,6 +225,42 @@ impl From<PurgeInvocationResponse> for PurgeInvocationRpcResponse {
204225
}
205226
}
206227

228+
/// Serializable counterpart of [`RestartInvocationResponse`], carried by
/// [`PartitionProcessorRpcResponse::RestartInvocation`]. The variants map one-to-one onto
/// [`RestartInvocationResponse`] through the `From` impls in this module.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
229+
pub enum RestartInvocationRpcResponse {
230+
Ok,
231+
NotFound,
232+
StillRunning,
233+
Unsupported,
234+
MissingInput,
235+
NotStarted,
236+
}
237+
238+
impl From<RestartInvocationRpcResponse> for RestartInvocationResponse {
239+
fn from(value: RestartInvocationRpcResponse) -> Self {
240+
match value {
241+
RestartInvocationRpcResponse::Ok => RestartInvocationResponse::Ok,
242+
RestartInvocationRpcResponse::NotFound => RestartInvocationResponse::NotFound,
243+
RestartInvocationRpcResponse::StillRunning => RestartInvocationResponse::StillRunning,
244+
RestartInvocationRpcResponse::Unsupported => RestartInvocationResponse::Unsupported,
245+
RestartInvocationRpcResponse::MissingInput => RestartInvocationResponse::MissingInput,
246+
RestartInvocationRpcResponse::NotStarted => RestartInvocationResponse::NotStarted,
247+
}
248+
}
249+
}
250+
251+
impl From<RestartInvocationResponse> for RestartInvocationRpcResponse {
252+
fn from(value: RestartInvocationResponse) -> Self {
253+
match value {
254+
RestartInvocationResponse::Ok => RestartInvocationRpcResponse::Ok,
255+
RestartInvocationResponse::NotFound => RestartInvocationRpcResponse::NotFound,
256+
RestartInvocationResponse::StillRunning => RestartInvocationRpcResponse::StillRunning,
257+
RestartInvocationResponse::Unsupported => RestartInvocationRpcResponse::Unsupported,
258+
RestartInvocationResponse::MissingInput => RestartInvocationRpcResponse::MissingInput,
259+
RestartInvocationResponse::NotStarted => RestartInvocationRpcResponse::NotStarted,
260+
}
261+
}
262+
}
263+
207264
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
208265
pub enum PartitionProcessorRpcResponse {
209266
Appended,
@@ -216,4 +273,5 @@ pub enum PartitionProcessorRpcResponse {
216273
KillInvocation(KillInvocationRpcResponse),
217274
PurgeInvocation(PurgeInvocationRpcResponse),
218275
PurgeJournal(PurgeInvocationRpcResponse),
276+
RestartInvocation(RestartInvocationRpcResponse),
219277
}

crates/wal-protocol/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use restate_types::GenerationalNodeId;
1313
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
1414
use restate_types::invocation::{
1515
AttachInvocationRequest, GetInvocationOutputResponse, InvocationResponse,
16-
InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, ServiceInvocation,
16+
InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, ServiceInvocation, restart,
1717
};
1818
use restate_types::logs::{HasRecordKeys, Keys, MatchKeyQuery};
1919
use restate_types::message::MessageIndex;
@@ -148,6 +148,8 @@ pub enum Command {
148148
ProxyThrough(ServiceInvocation),
149149
/// Attach to an existing invocation
150150
AttachInvocation(AttachInvocationRequest),
151+
/// Restart an invocation
152+
RestartInvocation(restart::Request),
151153

152154
// -- Partition processor events for PP
153155
/// Invoker is reporting effect(s) from an ongoing invocation.
@@ -203,6 +205,7 @@ impl HasRecordKeys for Envelope {
203205
}
204206
Command::PurgeInvocation(purge) => Keys::Single(purge.invocation_id.partition_key()),
205207
Command::PurgeJournal(purge) => Keys::Single(purge.invocation_id.partition_key()),
208+
Command::RestartInvocation(restart) => Keys::Single(restart.partition_key()),
206209
Command::Invoke(invoke) => Keys::Single(invoke.partition_key()),
207210
// todo: Remove this, or pass the partition key range but filter based on partition-id
208211
// on read if needed.
@@ -316,6 +319,7 @@ mod envelope {
316319
NotifyGetInvocationOutputResponse = 13, // bilrost
317320
NotifySignal = 14, // protobuf
318321
PurgeJournal = 15, // bilrost
322+
RestartInvocation = 16, // serde
319323
}
320324

321325
#[derive(bilrost::Message)]
@@ -439,6 +443,10 @@ mod envelope {
439443
Command::PurgeJournal(value) => {
440444
(CommandKind::PurgeJournal, Field::encode_bilrost(value))
441445
}
446+
Command::RestartInvocation(value) => (
447+
CommandKind::RestartInvocation,
448+
Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value),
449+
),
442450
Command::Invoke(value) => {
443451
let value = protobuf::ServiceInvocation::from(value.clone());
444452
(CommandKind::Invoke, Field::encode_protobuf(&value))
@@ -528,6 +536,10 @@ mod envelope {
528536
codec_or_error!(envelope.command, StorageCodecKind::Bilrost);
529537
Command::PurgeJournal(envelope.command.decode_bilrost()?)
530538
}
539+
CommandKind::RestartInvocation => {
540+
codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde);
541+
Command::RestartInvocation(envelope.command.decode_serde()?)
542+
}
531543
CommandKind::Invoke => {
532544
codec_or_error!(envelope.command, StorageCodecKind::Protobuf);
533545
let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?;

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,16 @@ impl LeaderState {
501501
)));
502502
}
503503
}
504+
Action::ForwardRestartInvocationResponse {
505+
request_id,
506+
response,
507+
} => {
508+
if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) {
509+
response_tx.send(Ok(PartitionProcessorRpcResponse::RestartInvocation(
510+
response.into(),
511+
)));
512+
}
513+
}
504514
}
505515

506516
Ok(())

0 commit comments

Comments
 (0)