Skip to content

Commit 328d23a

Browse files
authored
feat(orchestrator): add endpoint to retrigger workflow in error state (#1343)
* feat(orchestrator): add endpoint to retrigger workflow in error state * Minor improvements
1 parent ff1629a commit 328d23a

File tree

10 files changed

+473
-58
lines changed

10 files changed

+473
-58
lines changed

plugins/orchestrator-backend/src/service/OrchestratorService.test.ts

Lines changed: 133 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ const workflowOverview = createWorkflowOverviewMock(1);
5252
const workflowOverviews = createWorkflowOverviewsMock(3);
5353
const instance = createInstanceMock(1);
5454
const instances = createInstancesMock(3);
55+
const serviceUrl = 'http://localhost';
56+
const inputData = { foo: 'bar' };
5557

5658
// Mocked dependencies
5759
const sonataFlowServiceMock = {} as SonataFlowService;
5860
const workflowCacheServiceMock = {} as WorkflowCacheService;
5961
const dataIndexServiceMock = {} as DataIndexService;
6062

61-
// Target service
63+
// Target
6264
const orchestratorService = new OrchestratorService(
6365
sonataFlowServiceMock,
6466
dataIndexServiceMock,
@@ -87,10 +89,8 @@ describe('OrchestratorService', () => {
8789

8890
expect(
8991
dataIndexServiceMock.fetchDefinitionIdByInstanceId,
90-
).toHaveBeenCalledWith(instanceId);
91-
expect(dataIndexServiceMock.abortWorkflowInstance).toHaveBeenCalledWith(
92-
instanceId,
93-
);
92+
).toHaveBeenCalled();
93+
expect(dataIndexServiceMock.abortWorkflowInstance).toHaveBeenCalled();
9494
});
9595

9696
it('should skip and not execute the operation when the workflow is not available', async () => {
@@ -106,7 +106,7 @@ describe('OrchestratorService', () => {
106106

107107
expect(
108108
dataIndexServiceMock.fetchDefinitionIdByInstanceId,
109-
).toHaveBeenCalledWith(instanceId);
109+
).toHaveBeenCalled();
110110
expect(dataIndexServiceMock.abortWorkflowInstance).not.toHaveBeenCalled();
111111
});
112112

@@ -129,11 +129,8 @@ describe('OrchestratorService', () => {
129129

130130
expect(
131131
dataIndexServiceMock.fetchDefinitionIdByInstanceId,
132-
).toHaveBeenCalledWith(instanceId);
133-
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalledWith(
134-
definitionId,
135-
'throw',
136-
);
132+
).toHaveBeenCalled();
133+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
137134
expect(dataIndexServiceMock.abortWorkflowInstance).not.toHaveBeenCalled();
138135
});
139136
});
@@ -433,7 +430,6 @@ describe('OrchestratorService', () => {
433430
});
434431

435432
describe('fetchWorkflowInfoOnService', () => {
436-
const serviceUrl = 'http://localhost';
437433
beforeEach(() => {
438434
jest.clearAllMocks();
439435
});
@@ -602,8 +598,6 @@ describe('OrchestratorService', () => {
602598
});
603599

604600
describe('executeWorkflow', () => {
605-
const serviceUrl = 'http://localhost';
606-
const inputData = {};
607601
const executeResponse: WorkflowExecutionResponse = {
608602
id: createInstanceIdMock(1),
609603
};
@@ -661,4 +655,129 @@ describe('OrchestratorService', () => {
661655
expect(sonataFlowServiceMock.executeWorkflow).not.toHaveBeenCalled();
662656
});
663657
});
658+
659+
describe('retriggerInstanceInError', () => {
660+
beforeEach(() => {
661+
jest.clearAllMocks();
662+
});
663+
664+
it('should execute the operation when the workflow is available', async () => {
665+
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(true);
666+
sonataFlowServiceMock.retriggerInstanceInError = jest
667+
.fn()
668+
.mockResolvedValue(true);
669+
670+
await orchestratorService.retriggerInstanceInError({
671+
definitionId,
672+
serviceUrl,
673+
instanceId,
674+
cacheHandler: 'skip',
675+
});
676+
677+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
678+
expect(sonataFlowServiceMock.retriggerInstanceInError).toHaveBeenCalled();
679+
});
680+
681+
it('should skip and not execute the operation when the workflow is not available', async () => {
682+
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(false);
683+
684+
await orchestratorService.retriggerInstanceInError({
685+
definitionId,
686+
serviceUrl,
687+
instanceId,
688+
cacheHandler: 'skip',
689+
});
690+
691+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
692+
expect(
693+
sonataFlowServiceMock.retriggerInstanceInError,
694+
).not.toHaveBeenCalled();
695+
});
696+
697+
it('should throw an error and not execute the operation when the workflow is not available', async () => {
698+
workflowCacheServiceMock.isAvailable = jest
699+
.fn()
700+
.mockImplementation(() => {
701+
throw new Error();
702+
});
703+
704+
const promise = orchestratorService.retriggerInstanceInError({
705+
definitionId,
706+
serviceUrl,
707+
instanceId,
708+
cacheHandler: 'throw',
709+
});
710+
711+
await expect(promise).rejects.toThrow();
712+
713+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
714+
expect(
715+
sonataFlowServiceMock.retriggerInstanceInError,
716+
).not.toHaveBeenCalled();
717+
});
718+
});
719+
720+
describe('updateInstanceInputData', () => {
721+
beforeEach(() => {
722+
jest.clearAllMocks();
723+
});
724+
725+
it('should execute the operation when the workflow is available', async () => {
726+
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(true);
727+
sonataFlowServiceMock.updateInstanceInputData = jest
728+
.fn()
729+
.mockResolvedValue(true);
730+
731+
await orchestratorService.updateInstanceInputData({
732+
definitionId,
733+
serviceUrl,
734+
instanceId,
735+
inputData,
736+
cacheHandler: 'skip',
737+
});
738+
739+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
740+
expect(sonataFlowServiceMock.updateInstanceInputData).toHaveBeenCalled();
741+
});
742+
743+
it('should skip and not execute the operation when the workflow is not available', async () => {
744+
workflowCacheServiceMock.isAvailable = jest.fn().mockReturnValue(false);
745+
746+
await orchestratorService.updateInstanceInputData({
747+
definitionId,
748+
serviceUrl,
749+
instanceId,
750+
inputData,
751+
cacheHandler: 'skip',
752+
});
753+
754+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
755+
expect(
756+
sonataFlowServiceMock.updateInstanceInputData,
757+
).not.toHaveBeenCalled();
758+
});
759+
760+
it('should throw an error and not execute the operation when the workflow is not available', async () => {
761+
workflowCacheServiceMock.isAvailable = jest
762+
.fn()
763+
.mockImplementation(() => {
764+
throw new Error();
765+
});
766+
767+
const promise = orchestratorService.updateInstanceInputData({
768+
definitionId,
769+
serviceUrl,
770+
instanceId,
771+
inputData,
772+
cacheHandler: 'throw',
773+
});
774+
775+
await expect(promise).rejects.toThrow();
776+
777+
expect(workflowCacheServiceMock.isAvailable).toHaveBeenCalled();
778+
expect(
779+
sonataFlowServiceMock.updateInstanceInputData,
780+
).not.toHaveBeenCalled();
781+
});
782+
});
664783
});

plugins/orchestrator-backend/src/service/OrchestratorService.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ export class OrchestratorService {
161161
public async executeWorkflow(args: {
162162
definitionId: string;
163163
serviceUrl: string;
164-
inputData: Record<string, string>;
164+
inputData: ProcessInstanceVariables;
165165
businessKey?: string;
166166
cacheHandler?: CacheHandler;
167167
}): Promise<WorkflowExecutionResponse | undefined> {
@@ -188,4 +188,41 @@ export class OrchestratorService {
188188
? await this.sonataFlowService.fetchWorkflowOverview(definitionId)
189189
: undefined;
190190
}
191+
192+
public async retriggerInstanceInError(args: {
193+
definitionId: string;
194+
serviceUrl: string;
195+
instanceId: string;
196+
cacheHandler?: CacheHandler;
197+
}): Promise<boolean> {
198+
const { definitionId, cacheHandler } = args;
199+
200+
const isWorkflowAvailable = this.workflowCacheService.isAvailable(
201+
definitionId,
202+
cacheHandler,
203+
);
204+
205+
return isWorkflowAvailable
206+
? await this.sonataFlowService.retriggerInstanceInError(args)
207+
: false;
208+
}
209+
210+
public async updateInstanceInputData(args: {
211+
definitionId: string;
212+
serviceUrl: string;
213+
instanceId: string;
214+
inputData: ProcessInstanceVariables;
215+
cacheHandler?: CacheHandler;
216+
}): Promise<boolean> {
217+
const { definitionId, cacheHandler } = args;
218+
219+
const isWorkflowAvailable = this.workflowCacheService.isAvailable(
220+
definitionId,
221+
cacheHandler,
222+
);
223+
224+
return isWorkflowAvailable
225+
? await this.sonataFlowService.updateInstanceInputData(args)
226+
: false;
227+
}
191228
}

plugins/orchestrator-backend/src/service/SonataFlowService.ts

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
getWorkflowCategory,
77
ProcessInstance,
88
ProcessInstanceStateValues,
9+
ProcessInstanceVariables,
910
WorkflowDefinition,
1011
WorkflowExecutionResponse,
1112
WorkflowInfo,
@@ -14,7 +15,6 @@ import {
1415

1516
import { Pagination } from '../types/pagination';
1617
import { DataIndexService } from './DataIndexService';
17-
import { executeWithRetry } from './Helper';
1818

1919
export class SonataFlowService {
2020
constructor(
@@ -28,7 +28,7 @@ export class SonataFlowService {
2828
}): Promise<WorkflowInfo | undefined> {
2929
try {
3030
const urlToFetch = `${args.serviceUrl}/management/processes/${args.definitionId}`;
31-
const response = await executeWithRetry(() => fetch(urlToFetch));
31+
const response = await fetch(urlToFetch);
3232

3333
if (response.ok) {
3434
const json = await response.json();
@@ -93,15 +93,15 @@ export class SonataFlowService {
9393
public async executeWorkflow(args: {
9494
definitionId: string;
9595
serviceUrl: string;
96-
inputData: Record<string, string>;
96+
inputData: ProcessInstanceVariables;
9797
businessKey?: string;
9898
}): Promise<WorkflowExecutionResponse | undefined> {
9999
try {
100-
const workflowEndpoint = args.businessKey
100+
const urlToFetch = args.businessKey
101101
? `${args.serviceUrl}/${args.definitionId}?businessKey=${args.businessKey}`
102102
: `${args.serviceUrl}/${args.definitionId}`;
103103

104-
const result = await fetch(workflowEndpoint, {
104+
const result = await fetch(urlToFetch, {
105105
method: 'POST',
106106
body: JSON.stringify(args.inputData),
107107
headers: { 'content-type': 'application/json' },
@@ -195,4 +195,43 @@ export class SonataFlowService {
195195
}
196196
return false;
197197
}
198+
199+
public async retriggerInstanceInError(args: {
200+
definitionId: string;
201+
serviceUrl: string;
202+
instanceId: string;
203+
}): Promise<boolean> {
204+
const { definitionId, serviceUrl, instanceId } = args;
205+
try {
206+
const urlToFetch = `${serviceUrl}/management/processes/${definitionId}/instances/${instanceId}/retrigger`;
207+
const response = await fetch(urlToFetch, {
208+
method: 'POST',
209+
});
210+
return response.ok;
211+
} catch (error) {
212+
this.logger.error(`Error when retriggering workflow in error: ${error}`);
213+
}
214+
return false;
215+
}
216+
217+
public async updateInstanceInputData(args: {
218+
definitionId: string;
219+
serviceUrl: string;
220+
instanceId: string;
221+
inputData: ProcessInstanceVariables;
222+
}): Promise<boolean> {
223+
const { definitionId, serviceUrl, instanceId, inputData } = args;
224+
try {
225+
const urlToFetch = `${serviceUrl}/${definitionId}/${instanceId}`;
226+
const response = await fetch(urlToFetch, {
227+
method: 'PATCH',
228+
body: JSON.stringify(inputData),
229+
headers: { 'content-type': 'application/json' },
230+
});
231+
return response.ok;
232+
} catch (error) {
233+
this.logger.error(`Error when updating instance input data: ${error}`);
234+
}
235+
return false;
236+
}
198237
}

0 commit comments

Comments
 (0)