Skip to content

Commit bb84fac

Browse files
authored
Add new actorCatalogWithUpdatedAt endpoint
1 parent 6ec3967 commit bb84fac

File tree

10 files changed

+186
-31
lines changed

10 files changed

+186
-31
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,29 @@ paths:
599599
$ref: "#/components/responses/NotFoundResponse"
600600
"422":
601601
$ref: "#/components/responses/InvalidInputResponse"
602+
/v1/sources/most_recent_source_actor_catalog:
603+
post:
604+
tags:
605+
- source
606+
summary: Get most recent ActorCatalog for source
607+
operationId: getMostRecentSourceActorCatalog
608+
requestBody:
609+
content:
610+
application/json:
611+
schema:
612+
$ref: "#/components/schemas/SourceIdRequestBody"
613+
required: true
614+
responses:
615+
"200":
616+
description: Successful operation
617+
content:
618+
application/json:
619+
schema:
620+
$ref: "#/components/schemas/ActorCatalogWithUpdatedAt"
621+
"404":
622+
$ref: "#/components/responses/NotFoundResponse"
623+
"422":
624+
$ref: "#/components/responses/InvalidInputResponse"
602625
/v1/sources/search:
603626
post:
604627
tags:
@@ -3692,6 +3715,16 @@ components:
36923715
properties:
36933716
logType:
36943717
$ref: "#/components/schemas/LogType"
3718+
# ACTOR CATALOG
3719+
ActorCatalogWithUpdatedAt:
3720+
description: A source actor catalog with the timestamp it was most recently updated
3721+
type: object
3722+
properties:
3723+
updatedAt:
3724+
type: integer
3725+
format: int64
3726+
catalog:
3727+
type: object
36953728
# SCHEMA CATALOG
36963729
AirbyteCatalog:
36973730
description: describes the available schema (catalog).
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
"$schema": http://json-schema.org/draft-07/schema#
3+
title: ActorCatalogWithUpdatedAt
4+
description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp.
5+
type: object
6+
additionalProperties: false
7+
required:
8+
- id
9+
- catalog
10+
- catalogHash
11+
- updatedAt
12+
properties:
13+
id:
14+
type: string
15+
format: uuid
16+
catalog:
17+
type: object
18+
existingJavaType: com.fasterxml.jackson.databind.JsonNode
19+
catalogHash:
20+
type: string
21+
updatedAt:
22+
type: integer
23+
format: int64

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.airbyte.commons.version.Version;
3333
import io.airbyte.config.ActorCatalog;
3434
import io.airbyte.config.ActorCatalogFetchEvent;
35+
import io.airbyte.config.ActorCatalogWithUpdatedAt;
3536
import io.airbyte.config.ConfigSchema;
3637
import io.airbyte.config.DestinationConnection;
3738
import io.airbyte.config.DestinationOAuthParameter;
@@ -1314,6 +1315,16 @@ public Optional<ActorCatalog> getActorCatalog(final UUID actorId,
13141315
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
13151316
}
13161317

1318+
public Optional<ActorCatalogWithUpdatedAt> getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException {
1319+
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT)
1320+
.from(ACTOR_CATALOG)
1321+
.join(ACTOR_CATALOG_FETCH_EVENT)
1322+
.on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID))
1323+
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
1324+
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch());
1325+
return records.stream().findFirst().map(DbConverter::buildActorCatalogWithUpdatedAt);
1326+
}
1327+
13171328
public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException {
13181329
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
13191330
.from(ACTOR_CATALOG)

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.airbyte.commons.json.Jsons;
1818
import io.airbyte.config.ActorCatalog;
1919
import io.airbyte.config.ActorCatalogFetchEvent;
20+
import io.airbyte.config.ActorCatalogWithUpdatedAt;
2021
import io.airbyte.config.ActorDefinitionResourceRequirements;
2122
import io.airbyte.config.DestinationConnection;
2223
import io.airbyte.config.DestinationOAuthParameter;
@@ -218,6 +219,14 @@ public static ActorCatalog buildActorCatalog(final Record record) {
218219
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
219220
}
220221

222+
public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) {
223+
return new ActorCatalogWithUpdatedAt()
224+
.withId(record.get(ACTOR_CATALOG.ID))
225+
.withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString()))
226+
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH))
227+
.withUpdatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC));
228+
}
229+
221230
public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
222231
return new ActorCatalogFetchEvent()
223232
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID))

airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.server.apis;
66

77
import io.airbyte.api.generated.SourceApi;
8+
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
89
import io.airbyte.api.model.generated.CheckConnectionRead;
910
import io.airbyte.api.model.generated.SourceCloneRequestBody;
1011
import io.airbyte.api.model.generated.SourceCreate;
@@ -66,6 +67,11 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) {
6667
return ApiHelper.execute(() -> sourceHandler.getSource(sourceIdRequestBody));
6768
}
6869

70+
@Override
71+
public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) {
72+
return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(sourceIdRequestBody));
73+
}
74+
6975
@Override
7076
public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
7177
return ApiHelper.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody));

airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.collect.Lists;
9+
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
910
import io.airbyte.api.model.generated.ConnectionRead;
1011
import io.airbyte.api.model.generated.SourceCloneConfiguration;
1112
import io.airbyte.api.model.generated.SourceCloneRequestBody;
@@ -31,6 +32,7 @@
3132
import io.airbyte.validation.json.JsonValidationException;
3233
import java.io.IOException;
3334
import java.util.List;
35+
import java.util.Optional;
3436
import java.util.UUID;
3537
import java.util.function.Supplier;
3638

@@ -132,6 +134,17 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody)
132134
return buildSourceRead(sourceIdRequestBody.getSourceId());
133135
}
134136

137+
public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalogWithUpdatedAt(final SourceIdRequestBody sourceIdRequestBody)
138+
throws IOException {
139+
Optional<io.airbyte.config.ActorCatalogWithUpdatedAt> actorCatalog =
140+
configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId());
141+
if (actorCatalog.isEmpty()) {
142+
return new ActorCatalogWithUpdatedAt();
143+
} else {
144+
return new ActorCatalogWithUpdatedAt().updatedAt(actorCatalog.get().getUpdatedAt()).catalog(actorCatalog.get().getCatalog());
145+
}
146+
}
147+
135148
public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody)
136149
throws JsonValidationException, IOException, ConfigNotFoundException {
137150
// read source configuration from db

airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ class WebBackendConnectionsHandlerTest {
123123
private SchedulerHandler schedulerHandler;
124124
private StateHandler stateHandler;
125125
private WebBackendConnectionsHandler wbHandler;
126-
127126
private SourceRead sourceRead;
128127
private ConnectionRead connectionRead;
129128
private ConnectionRead brokenConnectionRead;
@@ -1090,6 +1089,7 @@ void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationExcep
10901089
new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId));
10911090

10921091
final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of());
1092+
10931093
when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize(
10941094
"{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}"))));
10951095
when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,33 @@
88

99
import datadog.trace.api.Trace;
1010
import io.airbyte.api.client.generated.SourceApi;
11+
import io.airbyte.api.client.invoker.generated.ApiException;
12+
import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt;
1113
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
14+
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
1215
import io.airbyte.commons.features.EnvVariableFeatureFlags;
13-
import io.airbyte.config.ActorCatalogFetchEvent;
14-
import io.airbyte.config.persistence.ConfigRepository;
1516
import jakarta.inject.Singleton;
16-
import java.io.IOException;
1717
import java.time.OffsetDateTime;
18-
import java.util.Optional;
1918
import java.util.UUID;
2019
import lombok.extern.slf4j.Slf4j;
2120

2221
@Slf4j
2322
@Singleton
2423
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {
2524

26-
private final Optional<ConfigRepository> configRepository;
27-
2825
private final SourceApi sourceApi;
2926
private final EnvVariableFeatureFlags envVariableFeatureFlags;
3027

31-
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository,
32-
SourceApi sourceApi,
28+
public RefreshSchemaActivityImpl(SourceApi sourceApi,
3329
EnvVariableFeatureFlags envVariableFeatureFlags) {
34-
this.configRepository = configRepository;
3530
this.sourceApi = sourceApi;
3631
this.envVariableFeatureFlags = envVariableFeatureFlags;
3732
}
3833

3934
@Override
4035
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
4136
public boolean shouldRefreshSchema(UUID sourceCatalogId) {
42-
// if job persistence is unavailable, default to skipping the schema refresh
43-
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) {
37+
if (!envVariableFeatureFlags.autoDetectSchema()) {
4438
return false;
4539
}
4640

@@ -66,12 +60,13 @@ public void refreshSchema(UUID sourceCatalogId, UUID connectionId) {
6660

6761
private boolean schemaRefreshRanRecently(UUID sourceCatalogId) {
6862
try {
69-
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
70-
if (mostRecentFetchEvent.isEmpty()) {
63+
SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId);
64+
ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody);
65+
if (mostRecentFetchEvent.getUpdatedAt() == null) {
7166
return false;
7267
}
73-
return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
74-
} catch (IOException e) {
68+
return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
69+
} catch (ApiException e) {
7570
// catching this exception because we don't want to block replication due to a failed schema refresh
7671
log.info("Encountered an error fetching most recent actor catalog fetch event: ", e);
7772
return true;

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,19 @@
44

55
package io.airbyte.workers.temporal.scheduling.activities;
66

7+
import static org.mockito.ArgumentMatchers.any;
78
import static org.mockito.Mockito.mock;
89
import static org.mockito.Mockito.times;
910
import static org.mockito.Mockito.verify;
1011
import static org.mockito.Mockito.when;
1112

1213
import io.airbyte.api.client.generated.SourceApi;
1314
import io.airbyte.api.client.invoker.generated.ApiException;
15+
import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt;
1416
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
1517
import io.airbyte.commons.features.EnvVariableFeatureFlags;
16-
import io.airbyte.config.ActorCatalogFetchEvent;
17-
import io.airbyte.config.persistence.ConfigRepository;
1818
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
19-
import java.io.IOException;
2019
import java.time.OffsetDateTime;
21-
import java.util.Optional;
2220
import java.util.UUID;
2321
import org.assertj.core.api.Assertions;
2422
import org.junit.jupiter.api.BeforeEach;
@@ -29,7 +27,6 @@
2927
@ExtendWith(MockitoExtension.class)
3028
class RefreshSchemaActivityTest {
3129

32-
static private ConfigRepository mConfigRepository;
3330
static private SourceApi mSourceApi;
3431
static private EnvVariableFeatureFlags mEnvVariableFeatureFlags;
3532

@@ -40,32 +37,31 @@ class RefreshSchemaActivityTest {
4037
@BeforeEach
4138
void setUp() {
4239
mSourceApi = mock(SourceApi.class);
43-
mConfigRepository = mock(ConfigRepository.class);
4440
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
4541
mSourceApi = mock(SourceApi.class);
4642
when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true);
47-
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags);
43+
refreshSchemaActivity = new RefreshSchemaActivityImpl(mSourceApi, mEnvVariableFeatureFlags);
4844
}
4945

5046
@Test
51-
void testShouldRefreshSchemaNoRecentRefresh() throws IOException {
52-
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty());
47+
void testShouldRefreshSchemaNoRecentRefresh() throws ApiException {
48+
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt());
5349
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
5450
}
5551

5652
@Test
57-
void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException {
53+
void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException {
5854
Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond();
59-
ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo);
60-
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent));
55+
ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo);
56+
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
6157
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
6258
}
6359

6460
@Test
65-
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException {
61+
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException {
6662
Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond();
67-
ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo);
68-
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent));
63+
ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo);
64+
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
6965
Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
7066
}
7167

0 commit comments

Comments
 (0)