-
Notifications
You must be signed in to change notification settings - Fork 13
fix(substantial): recover from broken run + race conditions #1033
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
📝 WalkthroughWalkthroughThe changes span multiple components, introducing deduplication of operations in Rust backend logic, enhancing logging and debugging in Rust engine code, and refactoring TypeScript logic for run state validation and operation scheduling. Type definitions were centralized, utility functions updated, and new service configuration for Redis Commander was added. Minor test and import adjustments were also made. Changes
Sequence Diagram(s)sequenceDiagram
participant Backend
participant Run (Rust)
participant Engine (Rust)
participant Agent (TS)
participant Workflow
%% Deduplication during persist/recover
Engine->>Run (Rust): recover_from(backend)
Run (Rust)->>Run (Rust): compact() (deduplicate operations)
Run (Rust)-->>Engine: return deduped Run
Agent (TS)->>Run (Rust): persist_into(backend)
Run (Rust)->>Run (Rust): compact() (deduplicate before persist)
Run (Rust)-->>Agent (TS): persist result
%% Agent scheduling and validation
Agent (TS)->>Agent (TS): validateRunIntegrity(run)
Agent (TS)->>Agent (TS): checkOperationHasBeenScheduled(run, op)
Agent (TS)->>Agent (TS): runHasStopped(run)
Agent (TS)->>Workflow: schedule/append operation if valid
Suggested reviewers
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🔭 Outside diff range comments (1)
src/typegate/src/runtimes/substantial/agent.ts (1)
136-146
: Off-by-one in acquisition loop
while (replayRequests.length <= acquireMaxForThisAgent)
allows one more run than the configuredmaxAcquirePerTick
.
Change to strict<
to respect the limit:-while (replayRequests.length <= acquireMaxForThisAgent) { +while (replayRequests.length < acquireMaxForThisAgent) {
🧹 Nitpick comments (6)
src/typegate/src/runtimes/substantial/common.ts (1)
66-71
: Consider using a TypeScript enum forExecutionStatus
Literal unions work, but an
enum
(orconst enum
) provides:
• Exhaustiveness checks in switch statements
• A single source of truth for serialisation (ExecutionStatus.COMPLETED
instead of"COMPLETED"
)Not critical, but improves maintainability and reduces typo risk.
src/typegate/src/runtimes/substantial/deno_context.ts (1)
35-44
: Events silently dropped after stop – consider logging or throwing
#appendOp
now silently ignores any operation oncerunHasStopped(this.run)
is true.
When an unexpected component tries to append after a Stop, debugging becomes harder because the write vanishes without a trace.-if (!runHasStopped(this.run)) { - this.run.operations.push({ at: new Date().toJSON(), event: op }); -} +if (runHasStopped(this.run)) { + console.warn( + `[substantial] Ignored ${op.type} after Stop for run ${this.run.run_id}`, + ); + return; +} +this.run.operations.push({ at: new Date().toJSON(), event: op });Even a simple
console.warn
(orlogger.warn
) would greatly help troubleshooting.src/substantial/src/converters.rs (1)
96-108
:recover_from
mutates the run but does not ensure ordering
self.compact()
eliminates duplicates but keeps the first encounter order which depends on backend iteration order.
If backends ever return events unordered, the reconstructed run may differ from the original chronological order.Consider sorting before deduplication:
self.operations.sort_by_key(|op| op.at); self.compact();Doing so once inside
compact()
avoids surprises.src/typegate/src/runtimes/substantial.ts (1)
27-30
: Minor: unusedExecutionStatus
import?
ExecutionStatus
is only referenced inside the localQueryCompletedWorkflowResult
interface in the same file; if that interface is not exported it can be inlined with the import eliminated to keep the surface minimal.src/typegate/engine/src/runtimes/substantial.rs (1)
123-148
: Instrument macro still logs fullinput
op_sub_store_add_schedule
now logs at debug level and only skipsstate
; the whole schedule payload (includingbackend
) will be attached to the span.
If you adopt the redaction strategy suggested above, remember to adjust this attribute as well:-#[tracing::instrument(level = "debug", skip(state))] +#[tracing::instrument(level = "debug", skip(state, input))]src/typegate/src/runtimes/substantial/agent.ts (1)
517-518
: Remove strayconsole.log
debug dump
console.log("AAA", …)
slipped intovalidateRunIntegrity
; this pollutes stdout and bypasses the project logger.
Delete or replace withlogger.debug
.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
src/substantial/src/converters.rs
(3 hunks)src/substantial/tests/mod.rs
(1 hunks)src/typegate/engine/src/runtimes/substantial.rs
(19 hunks)src/typegate/src/runtimes/substantial.ts
(2 hunks)src/typegate/src/runtimes/substantial/agent.ts
(9 hunks)src/typegate/src/runtimes/substantial/common.ts
(2 hunks)src/typegate/src/runtimes/substantial/deno_context.ts
(2 hunks)src/typegate/src/runtimes/substantial/filter_utils.ts
(1 hunks)src/typegate/src/runtimes/substantial/worker.ts
(1 hunks)src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
(1 hunks)tests/runtimes/substantial/filter_utils_test.ts
(1 hunks)tools/compose/compose.subredis.yml
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
- GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
- GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
- GitHub Check: test-full
- GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
- GitHub Check: pre-commit
🔇 Additional comments (5)
tools/compose/compose.subredis.yml (1)
10-18
: Pin the image tag & avoid exposing credentials in plaintext
image: ghcr.io/joeferner/redis-commander:latest
Usinglatest
makes builds non-deterministic and can silently introduce breaking changes. Pin to a digest or semver tag (v0.16.0
, etc.).
REDIS_HOSTS=local:subredis:6379:0:password
The password is committed in plaintext. Consider:
• Using Docker secrets / env-file
• Supplying the password at deploy time via CI secretsPort
6390
is published to the host network. If this is meant only for internal debugging, restrict it (127.0.0.1:6390:8081
) or guard it with network-level ACLs.src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)
11-11
: Import switch LGTMAligning imports to
./common.ts
matches the refactor and removes the now-deadtypes.ts
.src/typegate/src/runtimes/substantial/worker.ts (1)
7-11
: Import relocation acknowledgedNo functional impact; keeps the module graph consistent after the refactor.
tests/runtimes/substantial/filter_utils_test.ts (1)
11-11
: Test updated to new type location – OKThe test compiles against the refactored source; no further action needed.
src/typegate/src/runtimes/substantial/filter_utils.ts (1)
4-6
: VerifyExecutionStatus
covers all literal variants used below
SearchItem
later assigns"ONGOING" | "COMPLETED" | "COMPLETED_WITH_ERROR"
(lines 70-77).
Please ensureExecutionStatus
exported bycommon.ts
is a union that includes those three exact string literals; otherwise TypeScript will widen the type tostring
or raise a mismatch.Run a quick search to confirm:
rg --context 2 'export type ExecutionStatus' src/typegate/src/runtimes/substantial
61255e1
to
43c5129
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)
109-165
: Excellent solution for async interval management with a minor optimization opportunity.The
BlockingInterval
class effectively solves the race condition problem withsetInterval
and async handlers. The implementation is solid with proper state management and error handling.However, the
kill
method could be optimized to avoid polling:export class BlockingInterval { #killed = false; #running = false; + #runningResolver?: () => void; constructor(private logger?: Logger) { } async start(delayMs: number, handler: () => Promise<void> | void) { if (this.#running) { throw new Error("Interval already running"); } this.#killed = false; this.#running = true; while (!this.#killed) { try { await handler(); } catch (err) { this.logger?.error("BlockingInterval iteration error:", err); } if (this.#killed) { break; } await new Promise((res) => setTimeout(res, delayMs)); } this.#running = false; + this.#runningResolver?.(); } async kill() { this.#killed = true; if (this.#running) { - const ensureKillMs = 60; - - await new Promise((res) => { - const interval = setInterval(() => { - if (!this.#running) { - clearInterval(interval); - res(true); - } - }, ensureKillMs); - }); + await new Promise<void>((res) => { + this.#runningResolver = res; + }); } } }src/typegate/src/runtimes/substantial/agent.ts (2)
254-283
: Solid lease renewal logic with acknowledged race condition concerns.The lease renewal mechanism is well-implemented with proper boundary checking and cleanup. The comments appropriately acknowledge the inherent race condition limitations.
Consider logging when leases are cleaned up for inactive runs:
if (!isRunning) { + this.logger.debug(`Cleaning up lease tracking for completed run: ${runId}`); this.mustLockRunIds.delete(runId); }
533-537
: Excellent run integrity validation with comprehensive error reporting.The
validateRunIntegrity
function provides thorough validation of run log consistency, detecting overlapping runs and improper start/stop sequences. TheprettyRun
helper enhances debugging by providing readable operation summaries.Minor typo fix needed:
throw new Error( - `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`, + `"${run.run_id}" has potentially corrupted logs, attempted stopping already closed run, or run with a missing Start`, );Also applies to: 539-567
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
src/substantial/src/converters.rs
(3 hunks)src/typegate/engine/src/runtimes/substantial.rs
(19 hunks)src/typegate/src/runtimes/substantial/agent.ts
(11 hunks)src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/substantial/src/converters.rs
- src/typegate/engine/src/runtimes/substantial.rs
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
- GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
- GitHub Check: test-full
- GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
- GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
- GitHub Check: pre-commit
🔇 Additional comments (8)
src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)
5-5
: LGTM! Clean import updates.The import changes are well-structured - adding the Logger type and updating the module path to common.ts aligns with the codebase reorganization.
Also applies to: 11-11
src/typegate/src/runtimes/substantial/agent.ts (7)
9-9
: LGTM! Import updates align with refactoring.The import changes properly reflect the reorganization of types and addition of new utilities for operation checking and run validation.
Also applies to: 19-19, 21-21, 24-25
47-50
: Well-structured lease management setup.The addition of
BlockingInterval
instances andmustLockRunIds
map provides a solid foundation for the new lease renewal mechanism.Also applies to: 59-62
119-138
: Excellent dual-interval approach for lease management.The separation of polling and heartbeat intervals is well-designed. Running the heartbeat at half the lease lifespan is a smart approach to prevent lease expiration during operations.
299-299
: Critical addition for run integrity validation.Adding
validateRunIntegrity(run)
is essential for detecting corrupted run logs early in the replay process.
312-323
: Smart duplicate operation prevention.The logic to check and discard already persisted schedules effectively prevents duplicate operations from crashed/interrupted processes. This directly addresses the PR objectives for recovery from duplicated schedule events.
362-362
: Proper lease tracking initialization.Adding the run ID to
mustLockRunIds
when starting a worker ensures the lease renewal mechanism can track and maintain the lease for active runs.
493-507
: Robust duplicate stop operation prevention.The logic to check if a run has already stopped or if the stop operation is already scheduled prevents duplicate stop operations, which aligns with the PR's goal of recovering from duplicated schedule events.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1033 +/- ##
==========================================
- Coverage 80.64% 80.60% -0.04%
==========================================
Files 145 145
Lines 17545 17625 +80
Branches 1980 1990 +10
==========================================
+ Hits 14149 14207 +58
- Misses 3378 3398 +20
- Partials 18 20 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/typegate/src/runtimes/substantial/agent.ts (2)
112-117
: Minor improvement: Consider extracting workflow name formatting.The workflow name formatting logic could be extracted for better readability, though the current implementation is functional.
+ const workflowNames = workflows.map(({ name }) => name).join(", "); this.logger.warn( - `Initializing agent to handle ${ - workflows - .map(({ name }) => name) - .join(", ") - }`, + `Initializing agent to handle ${workflowNames}`, );
143-145
: Add proper error handling for interval cleanup.While the BlockingInterval.kill() calls are appropriate, consider adding error handling for cleanup failures.
async stop() { await this.workerManager.deinit(); - // TODO: how are we so sure that deinit == worker doing no more work - await this.pollInterval.kill(); - await this.hearbeatInterval.kill(); + // TODO: how are we so sure that deinit == worker doing no more work + try { + await this.pollInterval.kill(); + await this.hearbeatInterval.kill(); + } catch (err) { + this.logger.warn(`Failed to clean up intervals: ${err}`); + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/typegate/src/runtimes/substantial/agent.ts
(11 hunks)src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
- GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
- GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
- GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
- GitHub Check: test-full
- GitHub Check: pre-commit
🔇 Additional comments (15)
src/typegate/src/runtimes/substantial/agent.ts (15)
9-9
: LGTM! Import changes align with enhanced functionality.The new imports properly support the enhanced deduplication logic (
Operation
,checkOperationHasBeenScheduled
,runHasStopped
) and asynchronous interval management (BlockingInterval
).Also applies to: 19-19, 21-21, 24-25
59-63
: LGTM! Proper initialization of new data structures.The Map initialization and BlockingInterval instances are correctly set up to support the enhanced lease management and asynchronous polling mechanisms.
119-125
: LGTM! BlockingInterval prevents overlapping poll iterations.The replacement of
setInterval
withBlockingInterval
effectively addresses the timing and concurrency issues mentioned in the PR objectives. This ensures each polling iteration completes before the next one begins.
127-138
: Excellent lease renewal heartbeat implementation.The heartbeat interval with a frequency of
leaseLifespanSec / 2
provides good safety margin for lease renewal before expiration. The error handling ensures robustness.
254-290
: Robust lease renewal mechanism with proper race condition mitigation.The
#renewLeases()
method effectively addresses the lease expiration race conditions mentioned in the PR. The boundary-based renewal logic (durationMs / 3
) provides good safety margin, and the cleanup of inactive runs prevents memory leaks.
306-306
: Enhanced run integrity validation improves robustness.The addition of
validateRunIntegrity(run)
provides better detection of corrupted run logs, addressing the recovery scenarios outlined in the PR objectives.
308-317
: LGTM! Proper handling of already stopped runs.The early return for stopped runs prevents unnecessary processing and ensures schedules are properly closed, addressing the interrupted schedule recovery mentioned in the PR.
319-326
: Excellent deduplication logic for crash recovery.This addition directly addresses the PR objective of recovering from duplicated schedule events. The logic properly detects already persisted schedules and closes them to prevent inconsistent states.
369-369
: Proper lease timestamp tracking for renewal logic.Setting the lease acquisition timestamp enables the heartbeat mechanism to determine when renewal is needed. This integrates well with the
#renewLeases()
method.
500-514
: Enhanced stop operation deduplication prevents duplicate completions.The logic properly checks for already stopped runs and duplicate stop operations before adding them to the operations list, addressing the race conditions mentioned in the PR objectives.
538-538
: Proper cleanup of lease tracking on completion.Removing the runId from
mustLockRunIds
on completion prevents memory leaks and ensures accurate lease tracking.
542-546
: Useful debugging utility for run visualization.The
prettyRun
function provides valuable debugging information by showing the operation timeline, which aids in troubleshooting the complex run lifecycle scenarios.
548-576
: Comprehensive run integrity validation logic.The
validateRunIntegrity
function effectively detects corrupted run logs by tracking the lifecycle balance (Start/Stop events). The detailed error messages withprettyRun
output will help diagnose issues in production.
329-329
: Verify the removal of appendIfOngoing function.Direct manipulation of the operations array is cleaner, but ensure that the removed
appendIfOngoing
function's logic is fully replaced by the new deduplication checks.#!/bin/bash # Verify that appendIfOngoing is no longer used anywhere in the codebase rg -A 3 'appendIfOngoing'
47-50
: ```shell
#!/bin/bashSearch for all references to mustLockRunIds and its Map operations
rg -n "mustLockRunIds" .
rg -n "mustLockRunIds.set" .
rg -n "mustLockRunIds.get" .
rg -n "mustLockRunIds.has" .
rg -n "mustLockRunIds.delete" .</details> </blockquote></details> </details> <!-- This is an auto-generated comment by CodeRabbit for review status -->
90a8390
to
166226f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
src/substantial/src/converters.rs
(3 hunks)src/substantial/tests/mod.rs
(1 hunks)src/typegate/engine/src/runtimes/substantial.rs
(19 hunks)src/typegate/src/runtimes/substantial.ts
(2 hunks)src/typegate/src/runtimes/substantial/agent.ts
(11 hunks)src/typegate/src/runtimes/substantial/common.ts
(2 hunks)src/typegate/src/runtimes/substantial/deno_context.ts
(2 hunks)src/typegate/src/runtimes/substantial/filter_utils.ts
(1 hunks)src/typegate/src/runtimes/substantial/worker.ts
(1 hunks)src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
(2 hunks)tests/runtimes/substantial/filter_utils_test.ts
(1 hunks)tools/compose/compose.subredis.yml
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
- src/typegate/src/runtimes/substantial/worker.ts
- src/typegate/src/runtimes/substantial/filter_utils.ts
- tests/runtimes/substantial/filter_utils_test.ts
- src/substantial/tests/mod.rs
- src/typegate/src/runtimes/substantial.ts
- src/typegate/src/runtimes/substantial/deno_context.ts
- src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
- src/substantial/src/converters.rs
- src/typegate/src/runtimes/substantial/common.ts
- tools/compose/compose.subredis.yml
- src/typegate/engine/src/runtimes/substantial.rs
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
- GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
- GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
- GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
- GitHub Check: test-full
- GitHub Check: pre-commit
🔇 Additional comments (6)
src/typegate/src/runtimes/substantial/agent.ts (6)
254-290
: Excellent lease renewal mechanism with proper race condition mitigation.The
#renewLeases
method effectively addresses the race condition concerns mentioned in the PR objectives. The approach of using a heartbeat interval that's smaller than the lease lifespan (half the duration) and checking if runs are about to expire before renewal is well-designed.
306-326
: Robust deduplication and validation logic.The addition of
validateRunIntegrity
andcheckOperationHasBeenScheduled
calls effectively prevents duplicate scheduling and corrupted run states, which aligns with the PR's goal of recovering from interrupted or duplicated schedule events.
369-369
: Proper lease tracking integration.The addition of run IDs to
mustLockRunIds
map correctly integrates with the new lease renewal mechanism.
509-514
: Smart duplicate prevention in graceful completion.The logic to check if a run has already stopped or if the stop operation has been scheduled prevents duplicate stop operations, which is crucial for maintaining run log integrity.
548-576
: Well-implemented run integrity validation.The
validateRunIntegrity
function effectively detects corrupted run logs by tracking the lifecycle state (life counter) and ensuring proper Start/Stop event ordering. The error messages are descriptive and include run details for debugging.
542-546
: Useful debugging utility.The
prettyRun
function provides a clean way to visualize run operations for logging and debugging purposes.
These dups can occur when we crash at a given timing and the underlying event of the appointed schedule was not closed. The engine will happily append onto the operation log, we throw by default but realistically we can recover.
However 1. should make sure dups do not occur accross all nodes, this should mitigate unknown unknowns (timestamp identifies schedules so it should be safe).
WARN: Undesirable side effects can still happen if we crash before saving the Saved results.
Migration notes
Summary by CodeRabbit
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests
Style
Documentation