Skip to content

Commit d4417f0

Browse files
authored
fix: Fix duplicate error events. (#803)
## Proposed changes Fix issue with duplicate error events being emitted.
1 parent 31640c5 commit d4417f0

File tree

1 file changed

+25
-23
lines changed

1 file changed

+25
-23
lines changed

packages/gensx-core/src/component.ts

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
RunInContext,
2323
withContext,
2424
} from "./context.js";
25+
import { WorkflowExecutionContext } from "./workflow-context.js";
2526
import { WorkflowMessageListener } from "./workflow-state.js";
2627

2728
export { STREAMING_PLACEHOLDER };
@@ -220,36 +221,16 @@ export function Component<P extends object = {}, R = unknown>(
220221

221222
if (result instanceof Promise) {
222223
return result
223-
.then((value) => {
224-
return handleResultValue(value, runInContext);
225-
})
224+
.then((value) => handleResultValue(value, runInContext))
226225
.catch((error: unknown) => {
227-
if (error instanceof Error) {
228-
checkpointManager.addMetadata(nodeId, {
229-
error: serializeError(error),
230-
});
231-
checkpointManager.completeNode(nodeId, undefined);
232-
workflowContext.sendWorkflowMessage({
233-
type: "error",
234-
error: JSON.stringify(serializeError(error)),
235-
});
236-
}
226+
handleError(nodeId, error, workflowContext);
237227
throw error;
238228
}) as R;
239229
}
240230

241231
return handleResultValue(result, runInContext!) as R;
242232
} catch (error) {
243-
if (error instanceof Error) {
244-
checkpointManager.addMetadata(nodeId, {
245-
error: serializeError(error),
246-
});
247-
checkpointManager.completeNode(nodeId, undefined);
248-
workflowContext.sendWorkflowMessage({
249-
type: "error",
250-
error: JSON.stringify(serializeError(error)),
251-
});
252-
}
233+
handleError(nodeId, error, workflowContext);
253234
throw error;
254235
}
255236
};
@@ -265,6 +246,27 @@ export function Component<P extends object = {}, R = unknown>(
265246
return ComponentFn;
266247
}
267248

249+
function handleError(
250+
nodeId: string,
251+
error: unknown,
252+
workflowContext: WorkflowExecutionContext,
253+
) {
254+
const serializedError = serializeError(error);
255+
workflowContext.checkpointManager.addMetadata(nodeId, {
256+
error: serializedError,
257+
});
258+
workflowContext.checkpointManager.completeNode(nodeId, undefined);
259+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
260+
if (!(error as any).__gensxErrorEventEmitted) {
261+
workflowContext.sendWorkflowMessage({
262+
type: "error",
263+
error: JSON.stringify(serializedError),
264+
});
265+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
266+
(error as any).__gensxErrorEventEmitted = true;
267+
}
268+
}
269+
268270
type WorkflowRuntimeOpts = WorkflowOpts & {
269271
workflowExecutionId?: string;
270272
messageListener?: WorkflowMessageListener;

0 commit comments

Comments
 (0)