Skip to content

Commit 7863845

Browse files
committed
Add persistence function for discovered schema
1 parent 3d8a0dc commit 7863845

File tree

9 files changed

+354
-2
lines changed

9 files changed

+354
-2
lines changed

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
8080
mockedConfigs.getConfigDatabaseUrl())
8181
.getAndInitialize();
8282
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
83-
assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());
83+
assertEquals("0.35.28.001", configsMigrator.getLatestMigration().getVersion().getVersion());
8484

8585
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
8686
assertEquals(version, jobsPersistence.getVersion().get());
@@ -112,7 +112,7 @@ void testIsLegalUpgradePredicate() {
112112

113113
@Test
114114
void testPostLoadExecutionExecutes() throws Exception {
115-
var testTriggered = new AtomicBoolean();
115+
final var testTriggered = new AtomicBoolean();
116116

117117
val container = new PostgreSQLContainer<>("postgres:13-alpine")
118118
.withDatabaseName("public")

airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public enum ConfigSchema implements AirbyteConfig {
6060

6161
STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),
6262

63+
ACTOR_CATALOG("ActorCatalog.yaml", ActorCatalog.class),
64+
ACTOR_CATALOG_FETCH_EVENT("ActorCatalogFetchEvent.yaml", ActorCatalogFetchEvent.class),
65+
6366
// worker
6467
STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class),
6568
NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
---
2+
"$schema": http://json-schema.org/draft-07/schema#
3+
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
4+
title: ActorCatalog
5+
description: Catalog of an actor.
6+
type: object
7+
additionalProperties: false
8+
required:
9+
- id
10+
- catalog
11+
- catalogHash
12+
properties:
13+
id:
14+
type: string
15+
format: uuid
16+
catalog:
17+
type: object
18+
existingJavaType: com.fasterxml.jackson.databind.JsonNode
19+
catalogHash:
20+
type: string
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
---
2+
"$schema": http://json-schema.org/draft-07/schema#
3+
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
4+
title: ActorCatalogFetchEvent
5+
description: Link actor to their actual catalog
6+
type: object
7+
additionalProperties: false
8+
required:
9+
- id
10+
- actorCatalogId
11+
- actorId
12+
- configHash
13+
- connectorVersion
14+
properties:
15+
id:
16+
type: string
17+
format: uuid
18+
actorId:
19+
type: string
20+
format: uuid
21+
actorCatalogId:
22+
type: string
23+
format: uuid
24+
configHash:
25+
type: string
26+
connectorVersion:
27+
type: string

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

+69
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
package io.airbyte.config.persistence;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.google.common.base.Charsets;
9+
import com.google.common.hash.HashFunction;
10+
import com.google.common.hash.Hashing;
811
import io.airbyte.commons.json.Jsons;
912
import io.airbyte.commons.lang.Exceptions;
1013
import io.airbyte.commons.lang.MoreBooleans;
14+
import io.airbyte.config.ActorCatalog;
15+
import io.airbyte.config.ActorCatalogFetchEvent;
1116
import io.airbyte.config.AirbyteConfig;
1217
import io.airbyte.config.ConfigSchema;
1318
import io.airbyte.config.DestinationConnection;
@@ -25,6 +30,7 @@
2530
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
2631
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
2732
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
33+
import io.airbyte.protocol.models.AirbyteCatalog;
2834
import io.airbyte.protocol.models.ConnectorSpecification;
2935
import io.airbyte.validation.json.JsonSchemaValidator;
3036
import io.airbyte.validation.json.JsonValidationException;
@@ -551,6 +557,69 @@ public void updateConnectionState(final UUID connectionId, final State state) th
551557
}
552558
}
553559

560+
public Optional<ActorCatalog> getSourceCatalog(final UUID sourceId,
561+
final String configurationHash,
562+
final String connectorVersion)
563+
throws JsonValidationException, IOException {
564+
for (final ActorCatalogFetchEvent event : listActorCatalogFetchEvents()) {
565+
if (event.getConnectorVersion().equals(connectorVersion)
566+
&& event.getConfigHash().equals(configurationHash)
567+
&& event.getActorId().equals(sourceId)) {
568+
return getCatalogById(event.getActorCatalogId());
569+
}
570+
}
571+
return Optional.empty();
572+
}
573+
574+
public List<ActorCatalogFetchEvent> listActorCatalogFetchEvents()
575+
throws JsonValidationException, IOException {
576+
final List<ActorCatalogFetchEvent> actorCatalogFetchEvents = new ArrayList<>();
577+
578+
for (final ActorCatalogFetchEvent event : persistence.listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
579+
ActorCatalogFetchEvent.class)) {
580+
actorCatalogFetchEvents.add(event);
581+
}
582+
return actorCatalogFetchEvents;
583+
}
584+
585+
public Optional<ActorCatalog> getCatalogById(final UUID catalogId)
586+
throws IOException {
587+
try {
588+
return Optional.of(persistence.getConfig(ConfigSchema.ACTOR_CATALOG, catalogId.toString(),
589+
ActorCatalog.class));
590+
} catch (final ConfigNotFoundException e) {
591+
return Optional.empty();
592+
} catch (final JsonValidationException e) {
593+
throw new IllegalStateException(e);
594+
}
595+
}
596+
597+
public void writeCatalog(final AirbyteCatalog catalog,
598+
final UUID sourceId,
599+
final String configurationHash,
600+
final String connectorVersion)
601+
throws JsonValidationException, IOException {
602+
final HashFunction hashFunction = Hashing.murmur3_32_fixed();
603+
final String configHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes(
604+
Charsets.UTF_8)).toString();
605+
final ActorCatalog actorCatalog = new ActorCatalog()
606+
.withCatalog(Jsons.jsonNode(catalog))
607+
.withId(UUID.randomUUID())
608+
.withCatalogHash(configHash);
609+
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG,
610+
actorCatalog.getId().toString(),
611+
actorCatalog);
612+
final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent()
613+
.withActorCatalogId(actorCatalog.getId())
614+
.withId(UUID.randomUUID())
615+
.withConfigHash(configurationHash)
616+
.withConnectorVersion(connectorVersion)
617+
.withActorId(sourceId);
618+
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
619+
actorCatalogFetchEvent.getId().toString(),
620+
actorCatalogFetchEvent);
621+
}
622+
554623
/**
555624
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
556625
* string/jsonnode into the AirbyteConfig, Stream&lt;Object&lt;AirbyteConfig.getClassName()&gt;&gt;

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

+156
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package io.airbyte.config.persistence;
66

77
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
8+
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
9+
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;
810
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;
911
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
1012
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
@@ -23,6 +25,8 @@
2325
import io.airbyte.commons.json.Jsons;
2426
import io.airbyte.commons.util.MoreIterators;
2527
import io.airbyte.commons.version.AirbyteVersion;
28+
import io.airbyte.config.ActorCatalog;
29+
import io.airbyte.config.ActorCatalogFetchEvent;
2630
import io.airbyte.config.AirbyteConfig;
2731
import io.airbyte.config.ConfigSchema;
2832
import io.airbyte.config.ConfigWithMetadata;
@@ -114,6 +118,10 @@ public <T> T getConfig(final AirbyteConfig configType, final String configId, fi
114118
return (T) getStandardSync(configId);
115119
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
116120
return (T) getStandardSyncState(configId);
121+
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
122+
return (T) getActorCatalog(configId);
123+
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
124+
return (T) getActorCatalogFetchEvent(configId);
117125
} else {
118126
throw new IllegalArgumentException("Unknown Config Type " + configType);
119127
}
@@ -181,6 +189,18 @@ private StandardSyncState getStandardSyncState(final String configId) throws IOE
181189
return result.get(0).getConfig();
182190
}
183191

192+
private ActorCatalog getActorCatalog(final String configId) throws IOException, ConfigNotFoundException {
193+
final List<ConfigWithMetadata<ActorCatalog>> result = listActorCatalogWithMetadata(Optional.of(UUID.fromString(configId)));
194+
validate(configId, result, ConfigSchema.ACTOR_CATALOG);
195+
return result.get(0).getConfig();
196+
}
197+
198+
private ActorCatalogFetchEvent getActorCatalogFetchEvent(final String configId) throws IOException, ConfigNotFoundException {
199+
final List<ConfigWithMetadata<ActorCatalogFetchEvent>> result = listActorCatalogFetchEventWithMetadata(Optional.of(UUID.fromString(configId)));
200+
validate(configId, result, ConfigSchema.ACTOR_CATALOG_FETCH_EVENT);
201+
return result.get(0).getConfig();
202+
}
203+
184204
private List<UUID> connectionOperationIds(final UUID connectionId) throws IOException {
185205
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
186206
.from(CONNECTION_OPERATION)
@@ -243,6 +263,10 @@ public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig confi
243263
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSyncWithMetadata(configIdOpt), configType);
244264
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
245265
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSyncStateWithMetadata(configIdOpt), configType);
266+
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
267+
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogWithMetadata(configIdOpt), configType);
268+
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
269+
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogFetchEventWithMetadata(configIdOpt), configType);
246270
} else {
247271
throw new IllegalArgumentException("Unknown Config Type " + configType);
248272
}
@@ -271,6 +295,10 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf
271295
listStandardSyncWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
272296
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
273297
listStandardSyncStateWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
298+
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
299+
listActorCatalogWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
300+
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
301+
listActorCatalogFetchEventWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
274302
} else {
275303
throw new IllegalArgumentException("Unknown Config Type " + configType);
276304
}
@@ -672,6 +700,72 @@ private StandardSyncState buildStandardSyncState(final Record record) {
672700
.withState(Jsons.deserialize(record.get(STATE.STATE_).data(), State.class));
673701
}
674702

703+
private List<ConfigWithMetadata<ActorCatalog>> listActorCatalogWithMetadata() throws IOException {
704+
return listActorCatalogWithMetadata(Optional.empty());
705+
}
706+
707+
private List<ConfigWithMetadata<ActorCatalog>> listActorCatalogWithMetadata(final Optional<UUID> configId) throws IOException {
708+
final Result<Record> result = database.query(ctx -> {
709+
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(ACTOR_CATALOG);
710+
if (configId.isPresent()) {
711+
return query.where(ACTOR_CATALOG.ID.eq(configId.get())).fetch();
712+
}
713+
return query.fetch();
714+
});
715+
final List<ConfigWithMetadata<ActorCatalog>> actorCatalogs = new ArrayList<>();
716+
for (final Record record : result) {
717+
final ActorCatalog actorCatalog = buildActorCatalog(record);
718+
actorCatalogs.add(new ConfigWithMetadata<>(
719+
record.get(ACTOR_CATALOG.ID).toString(),
720+
ConfigSchema.ACTOR_CATALOG.name(),
721+
record.get(ACTOR_CATALOG.CREATED_AT).toInstant(),
722+
record.get(ACTOR_CATALOG.MODIFIED_AT).toInstant(),
723+
actorCatalog));
724+
}
725+
return actorCatalogs;
726+
}
727+
728+
private ActorCatalog buildActorCatalog(final Record record) {
729+
return new ActorCatalog()
730+
.withId(record.get(ACTOR_CATALOG.ID))
731+
.withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString()))
732+
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
733+
}
734+
735+
private List<ConfigWithMetadata<ActorCatalogFetchEvent>> listActorCatalogFetchEventWithMetadata() throws IOException {
736+
return listActorCatalogFetchEventWithMetadata(Optional.empty());
737+
}
738+
739+
private List<ConfigWithMetadata<ActorCatalogFetchEvent>> listActorCatalogFetchEventWithMetadata(final Optional<UUID> configId) throws IOException {
740+
final Result<Record> result = database.query(ctx -> {
741+
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(ACTOR_CATALOG_FETCH_EVENT);
742+
if (configId.isPresent()) {
743+
return query.where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(configId.get())).fetch();
744+
}
745+
return query.fetch();
746+
});
747+
final List<ConfigWithMetadata<ActorCatalogFetchEvent>> actorCatalogFetchEvents = new ArrayList<>();
748+
for (final Record record : result) {
749+
final ActorCatalogFetchEvent actorCatalogFetchEvent = buildActorCatalogFetchEvent(record);
750+
actorCatalogFetchEvents.add(new ConfigWithMetadata<>(
751+
record.get(ACTOR_CATALOG_FETCH_EVENT.ID).toString(),
752+
ConfigSchema.ACTOR_CATALOG_FETCH_EVENT.name(),
753+
record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT).toInstant(),
754+
record.get(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT).toInstant(),
755+
actorCatalogFetchEvent));
756+
}
757+
return actorCatalogFetchEvents;
758+
}
759+
760+
private ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
761+
return new ActorCatalogFetchEvent()
762+
.withId(record.get(ACTOR_CATALOG_FETCH_EVENT.ID))
763+
.withActorCatalogId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID))
764+
.withConfigHash(record.get(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH))
765+
.withConnectorVersion(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION))
766+
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID));
767+
}
768+
675769
@Override
676770
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException {
677771
if (configType == ConfigSchema.STANDARD_WORKSPACE) {
@@ -694,6 +788,10 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
694788
writeStandardSync(Collections.singletonList((StandardSync) config));
695789
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
696790
writeStandardSyncState(Collections.singletonList((StandardSyncState) config));
791+
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
792+
writeActorCatalog(Collections.singletonList((ActorCatalog) config));
793+
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
794+
writeActorCatalogFetchEvent(Collections.singletonList((ActorCatalogFetchEvent) config));
697795
} else {
698796
throw new IllegalArgumentException("Unknown Config Type " + configType);
699797
}
@@ -1200,6 +1298,60 @@ private void writeStandardSyncState(final List<StandardSyncState> configs, final
12001298
});
12011299
}
12021300

1301+
private void writeActorCatalog(final List<ActorCatalog> configs) throws IOException {
1302+
database.transaction(ctx -> {
1303+
writeActorCatalog(configs, ctx);
1304+
return null;
1305+
});
1306+
}
1307+
1308+
private void writeActorCatalog(final List<ActorCatalog> configs, final DSLContext ctx) {
1309+
final OffsetDateTime timestamp = OffsetDateTime.now();
1310+
configs.forEach((actorCatalog) -> {
1311+
final boolean isExistingConfig = ctx.fetchExists(select()
1312+
.from(ACTOR_CATALOG)
1313+
.where(ACTOR_CATALOG.ID.eq(actorCatalog.getId())));
1314+
1315+
if (isExistingConfig) {} else {
1316+
ctx.insertInto(ACTOR_CATALOG)
1317+
.set(ACTOR_CATALOG.ID, actorCatalog.getId())
1318+
.set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(actorCatalog.getCatalog())))
1319+
.set(ACTOR_CATALOG.CATALOG_HASH, actorCatalog.getCatalogHash())
1320+
.set(ACTOR_CATALOG.CREATED_AT, timestamp)
1321+
.set(ACTOR_CATALOG.MODIFIED_AT, timestamp)
1322+
.execute();
1323+
}
1324+
});
1325+
}
1326+
1327+
private void writeActorCatalogFetchEvent(final List<ActorCatalogFetchEvent> configs) throws IOException {
1328+
database.transaction(ctx -> {
1329+
writeActorCatalogFetchEvent(configs, ctx);
1330+
return null;
1331+
});
1332+
}
1333+
1334+
private void writeActorCatalogFetchEvent(final List<ActorCatalogFetchEvent> configs, final DSLContext ctx) {
1335+
final OffsetDateTime timestamp = OffsetDateTime.now();
1336+
configs.forEach((actorCatalogFetchEvent) -> {
1337+
final boolean isExistingConfig = ctx.fetchExists(select()
1338+
.from(ACTOR_CATALOG_FETCH_EVENT)
1339+
.where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(actorCatalogFetchEvent.getId())));
1340+
1341+
if (isExistingConfig) {} else {
1342+
ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
1343+
.set(ACTOR_CATALOG_FETCH_EVENT.ID, actorCatalogFetchEvent.getId())
1344+
.set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, actorCatalogFetchEvent.getConfigHash())
1345+
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, actorCatalogFetchEvent.getActorCatalogId())
1346+
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorCatalogFetchEvent.getActorId())
1347+
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, actorCatalogFetchEvent.getConnectorVersion())
1348+
.set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp)
1349+
.set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp)
1350+
.execute();
1351+
}
1352+
});
1353+
}
1354+
12031355
@Override
12041356
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException, JsonValidationException {
12051357
if (configType == ConfigSchema.STANDARD_WORKSPACE) {
@@ -1222,6 +1374,10 @@ public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T
12221374
writeStandardSync(configs.values().stream().map(c -> (StandardSync) c).collect(Collectors.toList()));
12231375
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
12241376
writeStandardSyncState(configs.values().stream().map(c -> (StandardSyncState) c).collect(Collectors.toList()));
1377+
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
1378+
writeActorCatalog(configs.values().stream().map(c -> (ActorCatalog) c).collect(Collectors.toList()));
1379+
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
1380+
writeActorCatalogFetchEvent(configs.values().stream().map(c -> (ActorCatalogFetchEvent) c).collect(Collectors.toList()));
12251381
} else {
12261382
throw new IllegalArgumentException("Unknown Config Type " + configType);
12271383
}

0 commit comments

Comments
 (0)