Skip to content

Commit 43c5129

Browse files
committed
fix: recover instead of throw on dup schedules
1 parent e5392c4 commit 43c5129

File tree

5 files changed

+62
-8
lines changed

5 files changed

+62
-8
lines changed

src/substantial/src/converters.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
22
// SPDX-License-Identifier: MPL-2.0
33

4-
use std::collections::HashMap;
4+
use std::collections::{HashMap, HashSet};
55

66
use anyhow::{bail, Context, Ok, Result};
77
use chrono::{DateTime, Utc};
@@ -104,12 +104,15 @@ impl Run {
104104
format!("Recovering operations from backend for {}", self.run_id)
105105
})?;
106106
self.operations = operations;
107+
self.compact();
107108
}
108109

109110
Ok(())
110111
}
111112

112-
pub fn persist_into(&self, backend: &dyn Backend) -> Result<()> {
113+
pub fn persist_into(&mut self, backend: &dyn Backend) -> Result<()> {
114+
self.compact();
115+
113116
let mut records = Records::new();
114117
records.events = self
115118
.operations
@@ -128,6 +131,35 @@ impl Run {
128131
pub fn reset(&mut self) {
129132
self.operations = vec![];
130133
}
134+
135+
/// Try to recover from dups such as
136+
///
137+
/// ```
138+
/// [Start Start ...] or
139+
/// [... Stop Stop] or
140+
/// [ .... Event X Event X ... ]
141+
/// ```
142+
///
143+
/// These dups can occur when we crash at a given timing
144+
/// and the underlying event of the appointed schedule was not closed.
145+
/// The engine will happily append onto the operation log,
146+
/// we throw by default but realistically we can recover.
147+
///
148+
/// WARN: undesirable side effects can still happen if we crash before saving the Saved results.
149+
pub fn compact(&mut self) {
150+
let mut operations = Vec::new();
151+
let mut dup_schedules = HashSet::new();
152+
for operation in &self.operations {
153+
if dup_schedules.contains(&operation.at) {
154+
continue;
155+
}
156+
157+
operations.push(operation.clone());
158+
dup_schedules.insert(operation.at);
159+
}
160+
161+
self.operations = operations;
162+
}
131163
}
132164

133165
impl TryFrom<Event> for Operation {

src/typegate/engine/src/runtimes/substantial.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub async fn op_sub_store_create_or_get_run(
7777
Ok(CreateOrGetOutput { run })
7878
}
7979

80-
#[derive(Deserialize, Debug)]
80+
#[derive(Deserialize, Debug, Clone)]
8181
pub struct PersistRunInput {
8282
pub run: Run,
8383
pub backend: SubstantialBackend,
@@ -95,6 +95,8 @@ pub async fn op_sub_store_persist_run(
9595
state.borrow::<Ctx>().clone()
9696
};
9797

98+
let mut input = input.clone();
99+
98100
let backend = ctx
99101
.backends
100102
.entry(input.backend.as_key())

src/typegate/src/runtimes/substantial/agent.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,22 @@ export class Agent {
227227
// worker matching the runId.
228228
if (this.workerManager.isOngoing(next.run_id)) {
229229
this.logger.warn(
230-
`skip triggering ${next.run_id} for the current tick as it is still ongoing`,
230+
`skip triggering ${next.run_id} for the current tick as it is still ongoing, will renew the lease`,
231231
);
232232

233+
// REMOVE_ME: once an a better lock approach is devised
234+
// Ideally, we want to renew the lease but this might hide other timing bugs
235+
// Hypothesis:
236+
// lease timing is such that when 2+ agents fights for it, one holds onto it and runs,
237+
// but one amoung the others, there is one that is still in process of freeing up its ressource since deallocation != un-leased
238+
// FIX: hearbeat lease, but even with that we need a strong guarantee that ressource-freeing up == un-leased
239+
// timing bugs can still occur but more rare
240+
// await Meta.substantial.agentRenewLease({
241+
// backend: this.backend,
242+
// lease_seconds: this.config.leaseLifespanSec,
243+
// run_id: next.run_id,
244+
// });
245+
233246
return;
234247
}
235248

@@ -419,7 +432,7 @@ export class Agent {
419432
schedule: newSchedule,
420433
});
421434

422-
// FIXME:
435+
// FIXME(see REMOVE_ME above):
423436
// renew lease as a heartbeat side effect of the worker
424437
// this is fine most of the time though (assuming worker execution is within the lease lifespan)
425438
this.logger.info(`Renew lease ${runId}`);
@@ -501,7 +514,6 @@ function prettyRun(run: Run) {
501514

502515
function validateRunIntegrity(run: Run) {
503516
const logger = getLoggerByAddress(import.meta, "substantial");
504-
console.log("AAA", prettyRun(run));
505517

506518
let life = 0;
507519

src/typegate/src/runtimes/substantial/common.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,16 @@ export class Interrupt extends Error {
109109
}
110110
}
111111

112+
export function runHasStarted(run: Run) {
113+
return run.operations.some(({ event }) => event.type == "Start");
114+
}
115+
112116
export function runHasStopped(run: Run) {
113117
return run.operations.some(({ event }) => event.type == "Stop");
114118
}
115119

116120
export function checkOperationHasBeenScheduled(run: Run, operation: Operation) {
117121
return run.operations.some(({ at, event }) =>
118-
at == operation.at && event == operation.event
122+
at == operation.at && event.type == operation.event.type
119123
);
120124
}

src/typegate/src/runtimes/substantial/deno_context.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ export class Context {
3434

3535
#appendOp(op: OperationEvent) {
3636
if (!runHasStopped(this.run)) {
37-
console.log("Append context", op.type);
37+
// console.log(
38+
// "Append context",
39+
// op.type,
40+
// this.run.operations.map((o) => o.event.type),
41+
// );
3842
this.run.operations.push({ at: new Date().toJSON(), event: op });
3943
}
4044
}

0 commit comments

Comments
 (0)