Skip to content

Add persistence function for discovered schema #10326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 17, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.28.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down Expand Up @@ -112,7 +112,7 @@ void testIsLegalUpgradePredicate() {

@Test
void testPostLoadExecutionExecutes() throws Exception {
var testTriggered = new AtomicBoolean();
final var testTriggered = new AtomicBoolean();

val container = new PostgreSQLContainer<>("postgres:13-alpine")
.withDatabaseName("public")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public enum ConfigSchema implements AirbyteConfig {

STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

ACTOR_CATALOG("ActorCatalog.yaml", ActorCatalog.class),
ACTOR_CATALOG_FETCH_EVENT("ActorCatalogFetchEvent.yaml", ActorCatalogFetchEvent.class),

// worker
STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class),
NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class),
Expand Down
20 changes: 20 additions & 0 deletions airbyte-config/models/src/main/resources/types/ActorCatalog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
# JSON schema for a stored actor catalog: the catalog document plus a hash of it.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ActorCatalog.yaml
title: ActorCatalog
description: Catalog of an actor.
type: object
additionalProperties: false
required:
  - id
  - catalog
  - catalogHash
properties:
  id:
    type: string
    format: uuid
  catalog:
    type: object
    existingJavaType: com.fasterxml.jackson.databind.JsonNode
  catalogHash:
    type: string
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
# JSON schema for a fetch event: links an actor, its config hash and connector version
# to the id of an actor catalog.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ActorCatalogFetchEvent.yaml
title: ActorCatalogFetchEvent
description: Link actor to their actual catalog
type: object
additionalProperties: false
required:
  - id
  - actorCatalogId
  - actorId
  - configHash
  - connectorVersion
properties:
  id:
    type: string
    format: uuid
  actorId:
    type: string
    format: uuid
  actorCatalogId:
    type: string
    format: uuid
  configHash:
    type: string
  connectorVersion:
    type: string
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
package io.airbyte.config.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
Expand All @@ -25,6 +30,7 @@
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -551,6 +557,95 @@ public void updateConnectionState(final UUID connectionId, final State state) th
}
}

/**
 * Looks up the catalog recorded for a source under a given configuration hash and connector
 * version.
 *
 * <p>
 * Every stored fetch event is scanned and the first one whose connector version, config hash and
 * actor id all equal the arguments is used; the catalog that event references is then loaded by
 * its id.
 *
 * @param sourceId id compared against each fetch event's actor id
 * @param configurationHash hash compared against each fetch event's config hash
 * @param connectorVersion version compared against each fetch event's connector version
 * @return the catalog referenced by the first matching fetch event, or empty when no event matches
 *         or the lookup by id yields nothing
 * @throws JsonValidationException propagated from listing the fetch events
 * @throws IOException propagated from the persistence layer
 */
public Optional<ActorCatalog> getSourceCatalog(final UUID sourceId,
                                               final String configurationHash,
                                               final String connectorVersion)
    throws JsonValidationException, IOException {
  // The comparison order (version, config hash, actor id) mirrors the short-circuit evaluation.
  final Optional<ActorCatalogFetchEvent> matchingEvent = listActorCatalogFetchEvents().stream()
      .filter(candidate -> candidate.getConnectorVersion().equals(connectorVersion)
          && candidate.getConfigHash().equals(configurationHash)
          && candidate.getActorId().equals(sourceId))
      .findFirst();

  if (matchingEvent.isPresent()) {
    return getCatalogById(matchingEvent.get().getActorCatalogId());
  }
  return Optional.empty();
}

/**
 * Returns all stored actor catalog fetch events.
 *
 * @return a new, mutable list holding every {@link ActorCatalogFetchEvent} returned by the
 *         persistence layer, in the order it returned them
 * @throws JsonValidationException propagated from the persistence layer
 * @throws IOException propagated from the persistence layer
 */
public List<ActorCatalogFetchEvent> listActorCatalogFetchEvents()
    throws JsonValidationException, IOException {
  final List<ActorCatalogFetchEvent> storedEvents =
      persistence.listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, ActorCatalogFetchEvent.class);
  // Hand back a copy so callers never hold the list instance owned by the persistence layer.
  return new ArrayList<>(storedEvents);
}

/**
 * Loads a single actor catalog by its id.
 *
 * @param catalogId id of the catalog; its string form is used as the persistence key
 * @return the stored catalog, or empty when the persistence layer reports it as not found
 * @throws IOException propagated from the persistence layer
 * @throws IllegalStateException wrapping a {@link JsonValidationException} raised while reading
 *         the stored catalog
 */
public Optional<ActorCatalog> getCatalogById(final UUID catalogId)
    throws IOException {
  final ActorCatalog storedCatalog;
  try {
    storedCatalog = persistence.getConfig(ConfigSchema.ACTOR_CATALOG, catalogId.toString(), ActorCatalog.class);
  } catch (final ConfigNotFoundException notFound) {
    // A missing catalog is an expected outcome, reported to the caller as an empty result.
    return Optional.empty();
  } catch (final JsonValidationException invalid) {
    throw new IllegalStateException(invalid);
  }
  return Optional.of(storedCatalog);
}

/**
 * Searches the stored catalogs for one with the same catalog hash as the given catalog.
 *
 * <p>
 * Only the hash strings are compared; the catalog contents themselves are not inspected.
 *
 * @param actorCatalog catalog whose hash is looked for
 * @return the first stored catalog with an equal hash, or empty when none has one
 * @throws JsonValidationException propagated from listing the stored catalogs
 * @throws IOException propagated from the persistence layer
 */
public Optional<ActorCatalog> findExistingCatalog(final ActorCatalog actorCatalog)
    throws JsonValidationException, IOException {
  return listActorCatalogs().stream()
      .filter(stored -> actorCatalog.getCatalogHash().equals(stored.getCatalogHash()))
      .findFirst();
}

/**
 * Returns all stored actor catalogs.
 *
 * @return a new, mutable list holding every {@link ActorCatalog} returned by the persistence
 *         layer, in the order it returned them
 * @throws JsonValidationException propagated from the persistence layer
 * @throws IOException propagated from the persistence layer
 */
public List<ActorCatalog> listActorCatalogs()
    throws JsonValidationException, IOException {
  final List<ActorCatalog> storedCatalogs =
      persistence.listConfigs(ConfigSchema.ACTOR_CATALOG, ActorCatalog.class);
  // Hand back a copy so callers never hold the list instance owned by the persistence layer.
  return new ArrayList<>(storedCatalogs);
}

/**
 * Records a discovered catalog for a source and links it to the configuration and connector
 * version it was fetched with.
 *
 * <p>
 * The catalog is serialized to JSON and hashed with murmur3 (32-bit) over its UTF-8 bytes. If a
 * stored catalog with the same hash already exists it is reused; otherwise the new catalog is
 * written under a freshly generated id. In both cases a new fetch event with its own generated id
 * is written, pointing at the catalog that ends up being used.
 *
 * @param catalog catalog to store
 * @param sourceId id recorded as the fetch event's actor id
 * @param configurationHash hash recorded as the fetch event's config hash
 * @param connectorVersion version recorded on the fetch event
 * @throws JsonValidationException propagated from the persistence layer
 * @throws IOException propagated from the persistence layer
 */
public void writeCatalog(final AirbyteCatalog catalog,
                         final UUID sourceId,
                         final String configurationHash,
                         final String connectorVersion)
    throws JsonValidationException, IOException {
  final byte[] serializedCatalog = Jsons.serialize(catalog).getBytes(Charsets.UTF_8);
  final String catalogHash = Hashing.murmur3_32_fixed().hashBytes(serializedCatalog).toString();

  final ActorCatalog candidate = new ActorCatalog()
      .withCatalog(Jsons.jsonNode(catalog))
      .withId(UUID.randomUUID())
      .withCatalogHash(catalogHash);

  // Reuse an already stored catalog with the same hash instead of writing a duplicate.
  final Optional<ActorCatalog> alreadyStored = findExistingCatalog(candidate);
  final UUID actorCatalogId;
  if (alreadyStored.isPresent()) {
    actorCatalogId = alreadyStored.get().getId();
  } else {
    persistence.writeConfig(ConfigSchema.ACTOR_CATALOG, candidate.getId().toString(), candidate);
    actorCatalogId = candidate.getId();
  }

  // A fetch event is written on every call, whether or not the catalog itself was new.
  final ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent()
      .withActorCatalogId(actorCatalogId)
      .withId(UUID.randomUUID())
      .withConfigHash(configurationHash)
      .withConnectorVersion(connectorVersion)
      .withActorId(sourceId);
  persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, fetchEvent.getId().toString(), fetchEvent);
}

/**
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
* string/jsonnode into the AirbyteConfig, Stream&lt;Object&lt;AirbyteConfig.getClassName()&gt;&gt;
Expand Down
Loading