Skip to content

Commit 7097bc6

Browse files
authored
Merge pull request #18 from danielgerlag/saga-feature
Saga feature
2 parents 0d6612d + 2c6e265 commit 7097bc6

19 files changed

+641
-47
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
[![Build Status](https://travis-ci.org/danielgerlag/workflow-es.svg?branch=master)](https://travis-ci.org/danielgerlag/workflow-es)
44

5-
Workflow ES is a workflow / durable task library for Node.js (or modern browsers). It supports pluggable persistence and concurrency providers to allow for multi-node clusters.
5+
Workflow ES is a workflow / saga library for Node.js (or modern browsers). It supports pluggable persistence and concurrency providers to allow for multi-node clusters.
66

77
## Installing
88

core/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
[![Build Status](https://travis-ci.org/danielgerlag/workflow-es.svg?branch=master)](https://travis-ci.org/danielgerlag/workflow-es)
44

5-
Workflow ES is a workflow / durable task library for Node.js (or modern browsers). It supports pluggable persistence and concurrency providers to allow for multi-node clusters.
5+
Workflow ES is a workflow / saga library for Node.js (or modern browsers). It supports pluggable persistence and concurrency providers to allow for multi-node clusters.
66

77
## Installing
88

core/package.json

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "workflow-es",
3-
"version": "2.2.1",
3+
"version": "2.3.0",
44
"description": "A lightweight workflow engine for Node.js",
55
"main": "./build/src/index.js",
66
"typings": "./build/src/index.d.ts",
@@ -10,7 +10,7 @@
1010
"test": "jasmine"
1111
},
1212
"keywords": [
13-
"workflow"
13+
"workflow", "saga"
1414
],
1515
"author": {
1616
"email": "[email protected]",
@@ -30,7 +30,6 @@
3030
},
3131
"dependencies": {
3232
"inversify": "^4.1.0",
33-
"moment": "^2.21.0",
3433
"reflect-metadata": "^0.1.10"
3534
}
3635
}

core/src/abstractions/execution-pointer-factory.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ export interface IExecutionPointerFactory {
44
buildGenesisPointer(defintion: WorkflowDefinition): ExecutionPointer;
55
buildNextPointer(pointer: ExecutionPointer, outcomeTarget: StepOutcome): ExecutionPointer;
66
buildChildPointer(pointer: ExecutionPointer, childId: number, branch: any): ExecutionPointer;
7+
buildCompensationPointer(pointer: ExecutionPointer, exceptionPointer: ExecutionPointer, compensationStepId: number): ExecutionPointer;
78
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { WorkflowInstance, WorkflowExecutorResult, WorkflowStepBase, ExecutionPointer, ExecutionResult } from "../models";
1+
import { WorkflowInstance, WorkflowDefinition, WorkflowExecutorResult, WorkflowStepBase, ExecutionPointer, ExecutionResult } from "../models";
22

33
export interface IExecutionResultProcessor {
44
processExecutionResult(stepResult: ExecutionResult, pointer: ExecutionPointer, instance: WorkflowInstance, step: WorkflowStepBase, workflowResult: WorkflowExecutorResult);
5+
handleStepException(workflow: WorkflowInstance, definition: WorkflowDefinition, pointer: ExecutionPointer, step: WorkflowStepBase);
56
}

core/src/fluent-builders/step-builder.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { StepBody, InlineStepBody } from "../abstractions";
2-
import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models";
2+
import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling, SagaContainer } from "../models";
33
import { WaitFor, Foreach, While, If, Delay, Schedule, Sequence } from "../primitives";
44
import { WorkflowBuilder } from "./workflow-builder";
55
import { ReturnStepBuilder } from "./return-step-builder";
@@ -193,6 +193,20 @@ export class StepBuilder<TStepBody extends StepBody, TData> {
193193
return stepBuilder;
194194
}
195195

196+
public saga(builder: (then: WorkflowBuilder<TData>) => void): StepBuilder<Sequence, TData> {
197+
var newStep = new SagaContainer<Sequence>();
198+
newStep.body = Sequence;
199+
this.workflowBuilder.addStep(newStep);
200+
let stepBuilder = new StepBuilder<Sequence, TData>(this.workflowBuilder, newStep);
201+
let outcome = new StepOutcome();
202+
outcome.nextStep = newStep.id;
203+
this.step.outcomes.push(outcome);
204+
builder(this.workflowBuilder);
205+
stepBuilder.step.children.push(stepBuilder.step.id + 1); //TODO: make more elegant
206+
207+
return stepBuilder;
208+
}
209+
196210
public schedule(interval: (data :TData) => number): ReturnStepBuilder<TData, Schedule, TStepBody> {
197211
let newStep = new WorkflowStep<Schedule>();
198212
newStep.body = Schedule;
@@ -223,11 +237,39 @@ export class StepBuilder<TStepBody extends StepBody, TData> {
223237
return stepBuilder;
224238
}
225239

240+
public compensateWith<TNewStepBody extends StepBody>(body: { new(): TNewStepBody; }, setup: (step: StepBuilder<TNewStepBody, TData>) => void = null): StepBuilder<TStepBody, TData> {
241+
let newStep = new WorkflowStep<TNewStepBody>();
242+
newStep.body = body;
243+
this.workflowBuilder.addStep(newStep);
244+
let stepBuilder = new StepBuilder<TNewStepBody, TData>(this.workflowBuilder, newStep);
245+
246+
//setup
247+
if (setup) {
248+
setup(stepBuilder);
249+
}
250+
251+
this.step.compensationStepId = newStep.id;
252+
253+
return this;
254+
}
255+
256+
public compensateWithSequence(sequence: (then: WorkflowBuilder<TData>) => void): StepBuilder<TStepBody, TData> {
257+
let newStep = new WorkflowStep<Sequence>();
258+
newStep.body = Sequence;
259+
this.workflowBuilder.addStep(newStep);
260+
let stepBuilder = new StepBuilder<Sequence, TData>(this.workflowBuilder, newStep);
261+
this.step.compensationStepId = newStep.id;
262+
sequence(this.workflowBuilder);
263+
stepBuilder.step.children.push(stepBuilder.step.id + 1); //TODO: make more elegant
264+
265+
return this;
266+
}
267+
226268
public do(builder: (then: WorkflowBuilder<TData>) => void): StepBuilder<TStepBody, TData> {
227269
builder(this.workflowBuilder);
228-
this.step.children.push(this.step.id + 1); //TODO: make more elegant
270+
this.step.children.push(this.step.id + 1); //TODO: make more elegant
229271

230272
return this;
231273
}
232274

233-
}
275+
}

core/src/models.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ export * from "./models/workflow-error-handling";
1313
export * from "./models/execution-pipeline-directive";
1414
export * from "./models/workflow-executor-result";
1515
export * from "./models/container-data";
16-
export * from "./models/schedule-persistence-data";
16+
export * from "./models/schedule-persistence-data";
17+
export * from "./models/saga-container";

core/src/models/execution-pointer.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,21 @@ export class ExecutionPointer {
1414
public eventData: any;
1515
public outcome: any;
1616
public stepName: string;
17-
public retryCount: number;
17+
public retryCount: number = 0;
1818
public children: string[] = [];
1919
public contextItem: any;
2020
public predecessorId: string;
21-
21+
public scope: string[] = [];
22+
public status: number = 0;
23+
}
24+
25+
export var PointerStatus = {
26+
Legacy: 0,
27+
Pending: 1,
28+
Running: 2,
29+
Complete: 3,
30+
Sleeping: 4,
31+
WaitingForEvent: 5,
32+
Failed: 6,
33+
Compensated: 7
2234
}

core/src/models/saga-container.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { StepBody } from "../abstractions";
2+
import { ExecutionPointer } from "./execution-pointer";
3+
import { WorkflowStep } from "./workflow-step";
4+
5+
export class SagaContainer<T extends StepBody> extends WorkflowStep<T> {
6+
7+
public resumeChildrenAfterCompensation(): boolean {
8+
return false;
9+
}
10+
11+
public revertChildrenAfterCompensation(): boolean {
12+
return true;
13+
}
14+
15+
public primeForRetry(pointer: ExecutionPointer) {
16+
super.primeForRetry(pointer);
17+
pointer.persistenceData = null;
18+
}
19+
20+
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export var WorkflowErrorHandling = {
2-
Retry : 0,
3-
Suspend : 1,
4-
Terminate : 2
2+
Retry : 1,
3+
Suspend : 2,
4+
Terminate : 3,
5+
Compensate: 4
56
}

core/src/models/workflow-step.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ export abstract class WorkflowStepBase {
1616
public outcomes: Array<StepOutcome> = [];
1717
public children: Array<number> = [];
1818
public errorBehavior : number;
19-
public retryInterval : number;
19+
public retryInterval : number = 60000;
20+
public compensationStepId : number;
2021

2122
public inputs: Array<(step: StepBody, data: any) => void> = [];
2223
public outputs: Array<(step: StepBody, data: any) => void> = [];
@@ -31,6 +32,20 @@ export abstract class WorkflowStepBase {
3132

3233
public afterExecute(executorResult: WorkflowExecutorResult, context: StepExecutionContext, stepResult: ExecutionResult, executionPointer: ExecutionPointer) {
3334
}
35+
36+
public afterWorkflowIteration(executorResult: WorkflowExecutorResult, defintion: WorkflowDefinition, workflow: WorkflowInstance, executionPointer: ExecutionPointer) {
37+
}
38+
39+
public resumeChildrenAfterCompensation(): boolean {
40+
return true;
41+
}
42+
43+
public revertChildrenAfterCompensation(): boolean {
44+
return false;
45+
}
46+
47+
public primeForRetry(pointer: ExecutionPointer) {
48+
}
3449

3550
}
3651

core/src/services/execution-pointer-factory.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { injectable, inject } from "inversify";
22
import { IExecutionPointerFactory, ILogger, IWorkflowRegistry, IWorkflowExecutor, TYPES, IExecutionResultProcessor } from "../abstractions";
33
import { WorkflowHost } from "./workflow-host";
4-
import { WorkflowDefinition, ExecutionPointer, EventSubscription, StepOutcome, ExecutionResult, StepExecutionContext, WorkflowStepBase, WorkflowStatus, ExecutionError, WorkflowErrorHandling, ExecutionPipelineDirective, WorkflowExecutorResult } from "../models";
4+
import { WorkflowDefinition, ExecutionPointer, PointerStatus, EventSubscription, StepOutcome, ExecutionResult, StepExecutionContext, WorkflowStepBase, WorkflowStatus, ExecutionError, WorkflowErrorHandling, ExecutionPipelineDirective, WorkflowExecutorResult } from "../models";
55

66
@injectable()
77
export class ExecutionPointerFactory implements IExecutionPointerFactory {
@@ -10,6 +10,7 @@ export class ExecutionPointerFactory implements IExecutionPointerFactory {
1010
let result = new ExecutionPointer();
1111
result.active = true;
1212
result.stepId = 0;
13+
result.status = PointerStatus.Pending;
1314
result.id = this.generatePointerId();
1415

1516
return result;
@@ -19,9 +20,12 @@ export class ExecutionPointerFactory implements IExecutionPointerFactory {
1920
let result = new ExecutionPointer();
2021
result.active = true;
2122
result.stepId = outcomeTarget.nextStep;
23+
result.status = PointerStatus.Pending;
2224
result.id = this.generatePointerId();
2325
result.predecessorId = pointer.id;
24-
result.contextItem = pointer.contextItem;
26+
result.contextItem = pointer.contextItem;
27+
if (pointer.scope)
28+
result.scope = pointer.scope.slice();
2529

2630
return result;
2731
}
@@ -31,10 +35,28 @@ export class ExecutionPointerFactory implements IExecutionPointerFactory {
3135
result.active = true;
3236
result.id = this.generatePointerId();
3337
result.predecessorId = pointer.id;
38+
result.status = PointerStatus.Pending;
3439
result.stepId = childId;
3540
result.contextItem = branch;
41+
if (pointer.scope)
42+
result.scope = pointer.scope.slice();
3643

44+
result.scope.push(pointer.id);
3745
pointer.children.push(result.id);
46+
47+
return result;
48+
}
49+
50+
public buildCompensationPointer(pointer: ExecutionPointer, exceptionPointer: ExecutionPointer, compensationStepId: number): ExecutionPointer {
51+
let result = new ExecutionPointer();
52+
result.active = true;
53+
result.predecessorId = exceptionPointer.id;
54+
result.stepId = compensationStepId,
55+
result.status = PointerStatus.Pending;
56+
result.id = this.generatePointerId();
57+
result.contextItem = pointer.contextItem;
58+
if (pointer.scope)
59+
result.scope = pointer.scope.slice();
3860

3961
return result;
4062
}

0 commit comments

Comments
 (0)