Skip to content

Commit 6b92151

Browse files
authored
Filter out Temporal errors from SyncWorkflowImpl (#19293)
1 parent cc93c46 commit 6b92151

File tree

2 files changed

+31
-12
lines changed

2 files changed

+31
-12
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/tracing/TemporalSdkInterceptor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222
public class TemporalSdkInterceptor implements TraceInterceptor {
2323

2424
/**
25-
* Trace resource name used to scope the filtering performed by this interceptor.
25+
* Connection Manager trace resource name used to scope the filtering performed by this interceptor.
2626
*/
2727
static final String CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME = "ConnectionManagerWorkflowImpl.run";
2828

29+
/**
30+
* Sync Workflow trace resource name used to scope the filtering performed by this interceptor.
31+
*/
32+
static final String SYNC_WORKFLOW_IMPL_RESOURCE_NAME = "SyncWorkflowImpl.run";
33+
2934
/**
3035
* Error message tag key name that contains the Temporal exit error message.
3136
*/
@@ -79,7 +84,8 @@ boolean isExitTrace(final MutableSpan trace) {
7984
return trace.isError() &&
8085
EXIT_ERROR_MESSAGE.equalsIgnoreCase(trace.getTags().getOrDefault(ERROR_MESSAGE_TAG_KEY, "").toString()) &&
8186
(safeEquals(trace.getOperationName(), WORKFLOW_TRACE_OPERATION_NAME)
82-
|| safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME));
87+
|| safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME)
88+
|| safeEquals(trace.getResourceName(), SYNC_WORKFLOW_IMPL_RESOURCE_NAME));
8389
}
8490

8591
/**

airbyte-workers/src/test/java/io/airbyte/workers/tracing/TemporalSdkInterceptorTest.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME;
99
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG_KEY;
1010
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.EXIT_ERROR_MESSAGE;
11+
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.SYNC_WORKFLOW_IMPL_RESOURCE_NAME;
1112
import static org.junit.jupiter.api.Assertions.assertEquals;
1213
import static org.junit.jupiter.api.Assertions.assertFalse;
1314
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -39,10 +40,15 @@ void testOnTraceComplete() {
3940
temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
4041
temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
4142

42-
final var temporalExitMsgResourceNameError = new DummySpan();
43-
temporalExitMsgResourceNameError.setError(true);
44-
temporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
45-
temporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
43+
final var connectionManagerTemporalExitMsgResourceNameError = new DummySpan();
44+
connectionManagerTemporalExitMsgResourceNameError.setError(true);
45+
connectionManagerTemporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
46+
connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
47+
48+
final var syncWorkflowTemporalExitMsgResourceNameError = new DummySpan();
49+
syncWorkflowTemporalExitMsgResourceNameError.setError(true);
50+
syncWorkflowTemporalExitMsgResourceNameError.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
51+
syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
4652

4753
final var temporalExitMsgOtherOperationError = new DummySpan();
4854
temporalExitMsgOtherOperationError.setError(true);
@@ -55,7 +61,8 @@ void testOnTraceComplete() {
5561
temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
5662

5763
final var spans = List.of(
58-
simple, noError, otherError, temporalExitMsgOperationNameError, temporalExitMsgResourceNameError, temporalExitMsgOtherOperationError,
64+
simple, noError, otherError, temporalExitMsgOperationNameError, connectionManagerTemporalExitMsgResourceNameError,
65+
syncWorkflowTemporalExitMsgResourceNameError, temporalExitMsgOtherOperationError,
5966
temporalExitMsgOtherResourceError);
6067

6168
final var interceptor = new TemporalSdkInterceptor();
@@ -66,7 +73,8 @@ void testOnTraceComplete() {
6673
assertFalse(noError.isError());
6774
assertTrue(otherError.isError());
6875
assertFalse(temporalExitMsgOperationNameError.isError());
69-
assertFalse(temporalExitMsgResourceNameError.isError());
76+
assertFalse(connectionManagerTemporalExitMsgResourceNameError.isError());
77+
assertFalse(syncWorkflowTemporalExitMsgResourceNameError.isError());
7078
assertTrue(temporalExitMsgOtherOperationError.isError());
7179
assertTrue(temporalExitMsgOtherResourceError.isError());
7280
}
@@ -91,10 +99,15 @@ void testIsExitTrace() {
9199
temporalTraceWithErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
92100
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndOperationName));
93101

94-
final var temporalTraceWithErrorAndResourceName = new DummySpan();
95-
temporalTraceWithErrorAndResourceName.setError(true);
96-
temporalTraceWithErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
97-
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndResourceName));
102+
final var temporalTraceWithErrorAndConnectionManagerResourceName = new DummySpan();
103+
temporalTraceWithErrorAndConnectionManagerResourceName.setError(true);
104+
temporalTraceWithErrorAndConnectionManagerResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
105+
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndConnectionManagerResourceName));
106+
107+
final var temporalTraceWithErrorAndSyncWorkflowResourceName = new DummySpan();
108+
temporalTraceWithErrorAndSyncWorkflowResourceName.setError(true);
109+
temporalTraceWithErrorAndSyncWorkflowResourceName.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
110+
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndSyncWorkflowResourceName));
98111

99112
final var temporalTraceWithExitErrorAndOperationName = new DummySpan();
100113
temporalTraceWithExitErrorAndOperationName.setError(true);

0 commit comments

Comments
 (0)