Skip to content

Commit 0cbf121

Browse files
authored
Trace refresh schema operations (#22326)
1 parent bb4d777 commit 0cbf121

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package io.airbyte.workers.temporal.sync;
66

77
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
8+
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
9+
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY;
810

911
import datadog.trace.api.Trace;
1012
import io.airbyte.api.client.generated.SourceApi;
@@ -13,8 +15,10 @@
1315
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
1416
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
1517
import io.airbyte.commons.features.EnvVariableFeatureFlags;
18+
import io.airbyte.metrics.lib.ApmTraceUtils;
1619
import jakarta.inject.Singleton;
1720
import java.time.OffsetDateTime;
21+
import java.util.Map;
1822
import java.util.UUID;
1923
import lombok.extern.slf4j.Slf4j;
2024

@@ -25,48 +29,54 @@ public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {
2529
private final SourceApi sourceApi;
2630
private final EnvVariableFeatureFlags envVariableFeatureFlags;
2731

28-
public RefreshSchemaActivityImpl(SourceApi sourceApi,
29-
EnvVariableFeatureFlags envVariableFeatureFlags) {
32+
public RefreshSchemaActivityImpl(final SourceApi sourceApi,
33+
final EnvVariableFeatureFlags envVariableFeatureFlags) {
3034
this.sourceApi = sourceApi;
3135
this.envVariableFeatureFlags = envVariableFeatureFlags;
3236
}
3337

3438
@Override
3539
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
36-
public boolean shouldRefreshSchema(UUID sourceCatalogId) {
40+
public boolean shouldRefreshSchema(final UUID sourceCatalogId) {
3741
if (!envVariableFeatureFlags.autoDetectSchema()) {
3842
return false;
3943
}
4044

45+
ApmTraceUtils.addTagsToTrace(Map.of(SOURCE_ID_KEY, sourceCatalogId));
4146
return !schemaRefreshRanRecently(sourceCatalogId);
4247
}
4348

4449
@Override
45-
public void refreshSchema(UUID sourceCatalogId, UUID connectionId) {
50+
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
51+
public void refreshSchema(final UUID sourceCatalogId, final UUID connectionId) {
4652
if (!envVariableFeatureFlags.autoDetectSchema()) {
4753
return;
4854
}
4955

50-
SourceDiscoverSchemaRequestBody requestBody =
56+
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, SOURCE_ID_KEY, sourceCatalogId));
57+
58+
final SourceDiscoverSchemaRequestBody requestBody =
5159
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId);
5260

5361
try {
5462
sourceApi.discoverSchemaForSource(requestBody);
5563
} catch (final Exception e) {
64+
ApmTraceUtils.addExceptionToTrace(e);
5665
// catching this exception because we don't want to block replication due to a failed schema refresh
5766
log.error("Attempted schema refresh, but failed with error: ", e);
5867
}
5968
}
6069

61-
private boolean schemaRefreshRanRecently(UUID sourceCatalogId) {
70+
private boolean schemaRefreshRanRecently(final UUID sourceCatalogId) {
6271
try {
63-
SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId);
64-
ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody);
72+
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId);
73+
final ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody);
6574
if (mostRecentFetchEvent.getUpdatedAt() == null) {
6675
return false;
6776
}
6877
return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
69-
} catch (ApiException e) {
78+
} catch (final ApiException e) {
79+
ApmTraceUtils.addExceptionToTrace(e);
7080
// catching this exception because we don't want to block replication due to a failed schema refresh
7181
log.info("Encountered an error fetching most recent actor catalog fetch event: ", e);
7282
return true;

0 commit comments

Comments
 (0)