Skip to content

fix(substantial): recover from broken run + race conditions #1033

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

michael-0acf4
Copy link
Contributor

@michael-0acf4 michael-0acf4 commented Jun 18, 2025

  • Broken run
  1. Detect already handled schedules but interrupted before closing then close/skip them
  2. Try to recover from schedule dups such as
 [Start Start ...] or
 [... Stop Stop] or
 [ .... Event X Event X ... ]

These dups can occur when we crash at a given timing and the underlying event of the appointed schedule was not closed. The engine will happily append onto the operation log, we throw by default but realistically we can recover.
However 1. should make sure dups do not occur accross all nodes, this should mitigate unknown unknowns (timestamp identifies schedules so it should be safe).
WARN: Undesirable side effects can still happen if we crash before saving the Saved results.

  • Moved from setInterval to a custom blocking async interval
  • Changed lease renew logic

Migration notes

  • The change comes with new or modified tests
  • Hard-to-understand functions have explanatory comments
  • End-user documentation is updated to reflect the change

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features

    • Added a Redis Commander service for easier Redis database management via a web interface.
    • Introduced lease heartbeat renewal to automatically manage lease lifetimes.
    • Added asynchronous interval control to ensure sequential execution of recurring tasks.
  • Bug Fixes

    • Improved handling of duplicate operations and run state validation to prevent data inconsistencies and duplicate scheduling.
    • Enhanced logic to prevent appending operations to stopped runs.
    • Added run log integrity validation to detect corrupted or overlapping run states.
  • Refactor

    • Replaced and reorganized utility functions for checking run status and operation scheduling.
    • Updated type definitions and imports for better code clarity and maintainability.
    • Improved logging, debugging, and tracing for better observability and troubleshooting.
    • Replaced standard intervals with controlled async intervals to avoid concurrency issues.
  • Tests

    • Added assertions to verify lease expiration and run state consistency.
  • Style

    • Reformatted import statements for consistency and readability.
  • Documentation

    • Added comments and documentation to clarify lease handling, concurrency, and run integrity validation.

Copy link

linear bot commented Jun 18, 2025

Copy link
Contributor

coderabbitai bot commented Jun 18, 2025

📝 Walkthrough

Walkthrough

The changes span multiple components, introducing deduplication of operations in Rust backend logic, enhancing logging and debugging in Rust engine code, and refactoring TypeScript logic for run state validation and operation scheduling. Type definitions were centralized, utility functions updated, and new service configuration for Redis Commander was added. Minor test and import adjustments were also made.

Changes

File(s) Change Summary
src/substantial/src/converters.rs Added compact method to deduplicate operations by timestamp in Run; called in recover_from and persist_into; changed persist_into to take &mut self.
src/substantial/tests/mod.rs Added assertion to test that only one lease remains active after expiration in test_state_consistency_logic.
src/typegate/engine/src/runtimes/substantial.rs Added Debug (and some Clone) trait derivations to input/output structs; instrumented async functions with tracing::instrument for debug logging; added debug log for backend root; cloned input in op_sub_store_persist_run.
src/typegate/src/runtimes/substantial.ts Reformatted and clarified import statements; separated type and value imports; no logic changes.
src/typegate/src/runtimes/substantial/agent.ts Refactored Agent class: improved lease handling, replay logic, and run integrity validation; replaced appendIfOngoing with granular checks; added logging and comments; introduced prettyRun and validateRunIntegrity functions; removed old stop-check function; added lease heartbeat renewal mechanism and separate polling interval; introduced mustLockRunIds map to track lease renewals.
src/typegate/src/runtimes/substantial/common.ts Added ExecutionStatus type; removed appendIfOngoing; added runHasStarted, runHasStopped, and checkOperationHasBeenScheduled utility functions.
src/typegate/src/runtimes/substantial/deno_context.ts Replaced appendIfOngoing with direct runHasStopped check before appending operation in #appendOp.
src/typegate/src/runtimes/substantial/filter_utils.ts Removed local ExecutionStatus type; now imports from common.ts.
src/typegate/src/runtimes/substantial/worker.ts Changed import source for types to common.ts; reformatted import statement.
src/typegate/src/runtimes/substantial/workflow_worker_manager.ts Changed import source for types to common.ts; added new BlockingInterval class for async interval handling that waits for handler completion before next iteration; added logger import.
tests/runtimes/substantial/filter_utils_test.ts Changed import source for ExecutionStatus type to common.ts.
tools/compose/compose.subredis.yml Added new redis-commander service with image, restart policy, environment variable for Redis connection, and port mapping.

Sequence Diagram(s)

sequenceDiagram
    participant Backend
    participant Run (Rust)
    participant Engine (Rust)
    participant Agent (TS)
    participant Workflow

    %% Deduplication during persist/recover
    Engine->>Run (Rust): recover_from(backend)
    Run (Rust)->>Run (Rust): compact() (deduplicate operations)
    Run (Rust)-->>Engine: return deduped Run

    Agent (TS)->>Run (Rust): persist_into(backend)
    Run (Rust)->>Run (Rust): compact() (deduplicate before persist)
    Run (Rust)-->>Agent (TS): persist result

    %% Agent scheduling and validation
    Agent (TS)->>Agent (TS): validateRunIntegrity(run)
    Agent (TS)->>Agent (TS): checkOperationHasBeenScheduled(run, op)
    Agent (TS)->>Agent (TS): runHasStopped(run)
    Agent (TS)->>Workflow: schedule/append operation if valid
Loading

Suggested reviewers

  • Natoandro
  • luckasRanarison
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@michael-0acf4 michael-0acf4 requested a review from a team June 18, 2025 15:30
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🔭 Outside diff range comments (1)
src/typegate/src/runtimes/substantial/agent.ts (1)

136-146: Off-by-one in acquisition loop

while (replayRequests.length <= acquireMaxForThisAgent) allows one more run than the configured maxAcquirePerTick.
Change to strict < to respect the limit:

-while (replayRequests.length <= acquireMaxForThisAgent) {
+while (replayRequests.length < acquireMaxForThisAgent) {
🧹 Nitpick comments (6)
src/typegate/src/runtimes/substantial/common.ts (1)

66-71: Consider using a TypeScript enum for ExecutionStatus

Literal unions work, but an enum (or const enum) provides:
• Exhaustiveness checks in switch statements
• A single source of truth for serialisation (ExecutionStatus.COMPLETED instead of "COMPLETED")

Not critical, but improves maintainability and reduces typo risk.

src/typegate/src/runtimes/substantial/deno_context.ts (1)

35-44: Events silently dropped after stop – consider logging or throwing

#appendOp now silently ignores any operation once runHasStopped(this.run) is true.
When an unexpected component tries to append after a Stop, debugging becomes harder because the write vanishes without a trace.

-if (!runHasStopped(this.run)) {
-  this.run.operations.push({ at: new Date().toJSON(), event: op });
-}
+if (runHasStopped(this.run)) {
+  console.warn(
+    `[substantial] Ignored ${op.type} after Stop for run ${this.run.run_id}`,
+  );
+  return;
+}
+this.run.operations.push({ at: new Date().toJSON(), event: op });

Even a simple console.warn (or logger.warn) would greatly help troubleshooting.

src/substantial/src/converters.rs (1)

96-108: recover_from mutates the run but does not ensure ordering

self.compact() eliminates duplicates but keeps the first encounter order which depends on backend iteration order.
If backends ever return events unordered, the reconstructed run may differ from the original chronological order.

Consider sorting before deduplication:

self.operations.sort_by_key(|op| op.at);
self.compact();

Doing so once inside compact() avoids surprises.

src/typegate/src/runtimes/substantial.ts (1)

27-30: Minor: unused ExecutionStatus import?

ExecutionStatus is only referenced inside the local QueryCompletedWorkflowResult interface in the same file; if that interface is not exported it can be inlined with the import eliminated to keep the surface minimal.

src/typegate/engine/src/runtimes/substantial.rs (1)

123-148: Instrument macro still logs full input

op_sub_store_add_schedule now logs at debug level and only skips state; the whole schedule payload (including backend) will be attached to the span.
If you adopt the redaction strategy suggested above, remember to adjust this attribute as well:

-#[tracing::instrument(level = "debug", skip(state))]
+#[tracing::instrument(level = "debug", skip(state, input))]
src/typegate/src/runtimes/substantial/agent.ts (1)

517-518: Remove stray console.log debug dump

console.log("AAA", …) slipped into validateRunIntegrity; this pollutes stdout and bypasses the project logger.
Delete or replace with logger.debug.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 715ffc9 and 61255e1.

📒 Files selected for processing (12)
  • src/substantial/src/converters.rs (3 hunks)
  • src/substantial/tests/mod.rs (1 hunks)
  • src/typegate/engine/src/runtimes/substantial.rs (19 hunks)
  • src/typegate/src/runtimes/substantial.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/agent.ts (9 hunks)
  • src/typegate/src/runtimes/substantial/common.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/deno_context.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/filter_utils.ts (1 hunks)
  • src/typegate/src/runtimes/substantial/worker.ts (1 hunks)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1 hunks)
  • tests/runtimes/substantial/filter_utils_test.ts (1 hunks)
  • tools/compose/compose.subredis.yml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: pre-commit
🔇 Additional comments (5)
tools/compose/compose.subredis.yml (1)

10-18: Pin the image tag & avoid exposing credentials in plaintext

  1. image: ghcr.io/joeferner/redis-commander:latest
    Using latest makes builds non-deterministic and can silently introduce breaking changes. Pin to a digest or semver tag (v0.16.0, etc.).

  2. REDIS_HOSTS=local:subredis:6379:0:password
    The password is committed in plaintext. Consider:
    • Using Docker secrets / env-file
    • Supplying the password at deploy time via CI secrets

  3. Port 6390 is published to the host network. If this is meant only for internal debugging, restrict it (127.0.0.1:6390:8081) or guard it with network-level ACLs.

src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)

11-11: Import switch LGTM

Aligning imports to ./common.ts matches the refactor and removes the now-dead types.ts.

src/typegate/src/runtimes/substantial/worker.ts (1)

7-11: Import relocation acknowledged

No functional impact; keeps the module graph consistent after the refactor.

tests/runtimes/substantial/filter_utils_test.ts (1)

11-11: Test updated to new type location – OK

The test compiles against the refactored source; no further action needed.

src/typegate/src/runtimes/substantial/filter_utils.ts (1)

4-6: Verify ExecutionStatus covers all literal variants used below

SearchItem later assigns "ONGOING" | "COMPLETED" | "COMPLETED_WITH_ERROR" (lines 70-77).
Please ensure ExecutionStatus exported by common.ts is a union that includes those three exact string literals; otherwise TypeScript will widen the type to string or raise a mismatch.

Run a quick search to confirm:

rg --context 2 'export type ExecutionStatus' src/typegate/src/runtimes/substantial

@michael-0acf4 michael-0acf4 force-pushed the met-891-avoid-re-running-broken-workflows branch from 61255e1 to 43c5129 Compare June 18, 2025 15:49
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)

109-165: Excellent solution for async interval management with a minor optimization opportunity.

The BlockingInterval class effectively solves the race condition problem with setInterval and async handlers. The implementation is solid with proper state management and error handling.

However, the kill method could be optimized to avoid polling:

 export class BlockingInterval {
   #killed = false;
   #running = false;
+  #runningResolver?: () => void;

   constructor(private logger?: Logger) {
   }

   async start(delayMs: number, handler: () => Promise<void> | void) {
     if (this.#running) {
       throw new Error("Interval already running");
     }

     this.#killed = false;
     this.#running = true;

     while (!this.#killed) {
       try {
         await handler();
       } catch (err) {
         this.logger?.error("BlockingInterval iteration error:", err);
       }

       if (this.#killed) {
         break;
       }

       await new Promise((res) => setTimeout(res, delayMs));
     }

     this.#running = false;
+    this.#runningResolver?.();
   }

   async kill() {
     this.#killed = true;

     if (this.#running) {
-      const ensureKillMs = 60;
-
-      await new Promise((res) => {
-        const interval = setInterval(() => {
-          if (!this.#running) {
-            clearInterval(interval);
-            res(true);
-          }
-        }, ensureKillMs);
-      });
+      await new Promise<void>((res) => {
+        this.#runningResolver = res;
+      });
     }
   }
 }
src/typegate/src/runtimes/substantial/agent.ts (2)

254-283: Solid lease renewal logic with acknowledged race condition concerns.

The lease renewal mechanism is well-implemented with proper boundary checking and cleanup. The comments appropriately acknowledge the inherent race condition limitations.

Consider logging when leases are cleaned up for inactive runs:

       if (!isRunning) {
+        this.logger.debug(`Cleaning up lease tracking for completed run: ${runId}`);
         this.mustLockRunIds.delete(runId);
       }

533-537: Excellent run integrity validation with comprehensive error reporting.

The validateRunIntegrity function provides thorough validation of run log consistency, detecting overlapping runs and improper start/stop sequences. The prettyRun helper enhances debugging by providing readable operation summaries.

Minor typo fix needed:

         throw new Error(
-          `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start`,
+          `"${run.run_id}" has potentially corrupted logs, attempted stopping already closed run, or run with a missing Start`,
         );

Also applies to: 539-567

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 43c5129 and a79c8e6.

📒 Files selected for processing (4)
  • src/substantial/src/converters.rs (3 hunks)
  • src/typegate/engine/src/runtimes/substantial.rs (19 hunks)
  • src/typegate/src/runtimes/substantial/agent.ts (11 hunks)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/substantial/src/converters.rs
  • src/typegate/engine/src/runtimes/substantial.rs
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
  • GitHub Check: pre-commit
🔇 Additional comments (8)
src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (1)

5-5: LGTM! Clean import updates.

The import changes are well-structured - adding the Logger type and updating the module path to common.ts aligns with the codebase reorganization.

Also applies to: 11-11

src/typegate/src/runtimes/substantial/agent.ts (7)

9-9: LGTM! Import updates align with refactoring.

The import changes properly reflect the reorganization of types and addition of new utilities for operation checking and run validation.

Also applies to: 19-19, 21-21, 24-25


47-50: Well-structured lease management setup.

The addition of BlockingInterval instances and mustLockRunIds map provides a solid foundation for the new lease renewal mechanism.

Also applies to: 59-62


119-138: Excellent dual-interval approach for lease management.

The separation of polling and heartbeat intervals is well-designed. Running the heartbeat at half the lease lifespan is a smart approach to prevent lease expiration during operations.


299-299: Critical addition for run integrity validation.

Adding validateRunIntegrity(run) is essential for detecting corrupted run logs early in the replay process.


312-323: Smart duplicate operation prevention.

The logic to check and discard already persisted schedules effectively prevents duplicate operations from crashed/interrupted processes. This directly addresses the PR objectives for recovery from duplicated schedule events.


362-362: Proper lease tracking initialization.

Adding the run ID to mustLockRunIds when starting a worker ensures the lease renewal mechanism can track and maintain the lease for active runs.


493-507: Robust duplicate stop operation prevention.

The logic to check if a run has already stopped or if the stop operation is already scheduled prevents duplicate stop operations, which aligns with the PR's goal of recovering from duplicated schedule events.

@michael-0acf4 michael-0acf4 changed the title fix(substantial): try recover from broken run fix(substantial): recover from broken run + race conditions Jun 19, 2025
Copy link

codecov bot commented Jun 19, 2025

Codecov Report

Attention: Patch coverage is 77.69231% with 29 lines in your changes missing coverage. Please review.

Project coverage is 80.60%. Comparing base (b8b8820) to head (90a8390).

Files with missing lines Patch % Lines
src/typegate/src/runtimes/substantial/agent.ts 79.31% 17 Missing and 1 partial ⚠️
...rc/runtimes/substantial/workflow_worker_manager.ts 80.00% 6 Missing ⚠️
src/typegate/src/runtimes/substantial/common.ts 44.44% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1033      +/-   ##
==========================================
- Coverage   80.64%   80.60%   -0.04%     
==========================================
  Files         145      145              
  Lines       17545    17625      +80     
  Branches     1980     1990      +10     
==========================================
+ Hits        14149    14207      +58     
- Misses       3378     3398      +20     
- Partials       18       20       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/typegate/src/runtimes/substantial/agent.ts (2)

112-117: Minor improvement: Consider extracting workflow name formatting.

The workflow name formatting logic could be extracted for better readability, though the current implementation is functional.

+    const workflowNames = workflows.map(({ name }) => name).join(", ");
     this.logger.warn(
-      `Initializing agent to handle ${
-        workflows
-          .map(({ name }) => name)
-          .join(", ")
-      }`,
+      `Initializing agent to handle ${workflowNames}`,
     );

143-145: Add proper error handling for interval cleanup.

While the BlockingInterval.kill() calls are appropriate, consider adding error handling for cleanup failures.

   async stop() {
     await this.workerManager.deinit();
-    // TODO: how are we so sure that deinit == worker doing no more work
-    await this.pollInterval.kill();
-    await this.hearbeatInterval.kill();
+    // TODO: how are we so sure that deinit == worker doing no more work
+    try {
+      await this.pollInterval.kill();
+      await this.hearbeatInterval.kill();
+    } catch (err) {
+      this.logger.warn(`Failed to clean up intervals: ${err}`);
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eecde6c and 90a8390.

📒 Files selected for processing (2)
  • src/typegate/src/runtimes/substantial/agent.ts (11 hunks)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: pre-commit
🔇 Additional comments (15)
src/typegate/src/runtimes/substantial/agent.ts (15)

9-9: LGTM! Import changes align with enhanced functionality.

The new imports properly support the enhanced deduplication logic (Operation, checkOperationHasBeenScheduled, runHasStopped) and asynchronous interval management (BlockingInterval).

Also applies to: 19-19, 21-21, 24-25


59-63: LGTM! Proper initialization of new data structures.

The Map initialization and BlockingInterval instances are correctly set up to support the enhanced lease management and asynchronous polling mechanisms.


119-125: LGTM! BlockingInterval prevents overlapping poll iterations.

The replacement of setInterval with BlockingInterval effectively addresses the timing and concurrency issues mentioned in the PR objectives. This ensures each polling iteration completes before the next one begins.


127-138: Excellent lease renewal heartbeat implementation.

The heartbeat interval with a frequency of leaseLifespanSec / 2 provides good safety margin for lease renewal before expiration. The error handling ensures robustness.


254-290: Robust lease renewal mechanism with proper race condition mitigation.

The #renewLeases() method effectively addresses the lease expiration race conditions mentioned in the PR. The boundary-based renewal logic (durationMs / 3) provides good safety margin, and the cleanup of inactive runs prevents memory leaks.


306-306: Enhanced run integrity validation improves robustness.

The addition of validateRunIntegrity(run) provides better detection of corrupted run logs, addressing the recovery scenarios outlined in the PR objectives.


308-317: LGTM! Proper handling of already stopped runs.

The early return for stopped runs prevents unnecessary processing and ensures schedules are properly closed, addressing the interrupted schedule recovery mentioned in the PR.


319-326: Excellent deduplication logic for crash recovery.

This addition directly addresses the PR objective of recovering from duplicated schedule events. The logic properly detects already persisted schedules and closes them to prevent inconsistent states.


369-369: Proper lease timestamp tracking for renewal logic.

Setting the lease acquisition timestamp enables the heartbeat mechanism to determine when renewal is needed. This integrates well with the #renewLeases() method.


500-514: Enhanced stop operation deduplication prevents duplicate completions.

The logic properly checks for already stopped runs and duplicate stop operations before adding them to the operations list, addressing the race conditions mentioned in the PR objectives.


538-538: Proper cleanup of lease tracking on completion.

Removing the runId from mustLockRunIds on completion prevents memory leaks and ensures accurate lease tracking.


542-546: Useful debugging utility for run visualization.

The prettyRun function provides valuable debugging information by showing the operation timeline, which aids in troubleshooting the complex run lifecycle scenarios.


548-576: Comprehensive run integrity validation logic.

The validateRunIntegrity function effectively detects corrupted run logs by tracking the lifecycle balance (Start/Stop events). The detailed error messages with prettyRun output will help diagnose issues in production.


329-329: Verify the removal of appendIfOngoing function.

Direct manipulation of the operations array is cleaner, but ensure that the removed appendIfOngoing function's logic is fully replaced by the new deduplication checks.

#!/bin/bash
# Verify that appendIfOngoing is no longer used anywhere in the codebase
rg -A 3 'appendIfOngoing'

47-50: ```shell
#!/bin/bash

Search for all references to mustLockRunIds and its Map operations

rg -n "mustLockRunIds" .
rg -n "mustLockRunIds.set" .
rg -n "mustLockRunIds.get" .
rg -n "mustLockRunIds.has" .
rg -n "mustLockRunIds.delete" .


</details>

</blockquote></details>

</details>

<!-- This is an auto-generated comment by CodeRabbit for review status -->

@michael-0acf4 michael-0acf4 force-pushed the met-891-avoid-re-running-broken-workflows branch from 90a8390 to 166226f Compare June 25, 2025 16:20
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 90a8390 and 166226f.

📒 Files selected for processing (12)
  • src/substantial/src/converters.rs (3 hunks)
  • src/substantial/tests/mod.rs (1 hunks)
  • src/typegate/engine/src/runtimes/substantial.rs (19 hunks)
  • src/typegate/src/runtimes/substantial.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/agent.ts (11 hunks)
  • src/typegate/src/runtimes/substantial/common.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/deno_context.ts (2 hunks)
  • src/typegate/src/runtimes/substantial/filter_utils.ts (1 hunks)
  • src/typegate/src/runtimes/substantial/worker.ts (1 hunks)
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts (2 hunks)
  • tests/runtimes/substantial/filter_utils_test.ts (1 hunks)
  • tools/compose/compose.subredis.yml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
  • src/typegate/src/runtimes/substantial/worker.ts
  • src/typegate/src/runtimes/substantial/filter_utils.ts
  • tests/runtimes/substantial/filter_utils_test.ts
  • src/substantial/tests/mod.rs
  • src/typegate/src/runtimes/substantial.ts
  • src/typegate/src/runtimes/substantial/deno_context.ts
  • src/typegate/src/runtimes/substantial/workflow_worker_manager.ts
  • src/substantial/src/converters.rs
  • src/typegate/src/runtimes/substantial/common.ts
  • tools/compose/compose.subredis.yml
  • src/typegate/engine/src/runtimes/substantial.rs
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: lint-compat (macos-13, x86_64-apple-darwin, false)
  • GitHub Check: build-docker (linux/amd64, custom-ubuntu-large)
  • GitHub Check: build-docker (linux/arm64, ubuntu-22.04-arm)
  • GitHub Check: lint-compat (macos-14, aarch64-apple-darwin, false)
  • GitHub Check: test-full
  • GitHub Check: pre-commit
🔇 Additional comments (6)
src/typegate/src/runtimes/substantial/agent.ts (6)

254-290: Excellent lease renewal mechanism with proper race condition mitigation.

The #renewLeases method effectively addresses the race condition concerns mentioned in the PR objectives. The approach of using a heartbeat interval that's smaller than the lease lifespan (half the duration) and checking if runs are about to expire before renewal is well-designed.


306-326: Robust deduplication and validation logic.

The addition of validateRunIntegrity and checkOperationHasBeenScheduled calls effectively prevents duplicate scheduling and corrupted run states, which aligns with the PR's goal of recovering from interrupted or duplicated schedule events.


369-369: Proper lease tracking integration.

The addition of run IDs to mustLockRunIds map correctly integrates with the new lease renewal mechanism.


509-514: Smart duplicate prevention in graceful completion.

The logic to check if a run has already stopped or if the stop operation has been scheduled prevents duplicate stop operations, which is crucial for maintaining run log integrity.


548-576: Well-implemented run integrity validation.

The validateRunIntegrity function effectively detects corrupted run logs by tracking the lifecycle state (life counter) and ensuring proper Start/Stop event ordering. The error messages are descriptive and include run details for debugging.


542-546: Useful debugging utility.

The prettyRun function provides a clean way to visualize run operations for logging and debugging purposes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant