Skip to content

Migrations for scoped connectors #11305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.56.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.59.003", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ private YamlSeedConfigPersistence(final Class<?> seedResourceClass) throws IOExc
final Map<String, JsonNode> sourceSpecConfigs = getConfigs(seedResourceClass, SeedType.SOURCE_SPEC);
final Map<String, JsonNode> fullSourceDefinitionConfigs = sourceDefinitionConfigs.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> {
final JsonNode withTombstone = addMissingTombstoneField(e.getValue());
final JsonNode output = mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs);
final JsonNode withMissingFields =
addMissingCustomField(
addMissingPublicField(
addMissingTombstoneField(e.getValue())));
final JsonNode output = mergeSpecIntoDefinition(withMissingFields, sourceSpecConfigs);
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_SOURCE_DEFINITION, output);
return output;
}));
Expand All @@ -67,8 +70,11 @@ private YamlSeedConfigPersistence(final Class<?> seedResourceClass) throws IOExc
final Map<String, JsonNode> destinationSpecConfigs = getConfigs(seedResourceClass, SeedType.DESTINATION_SPEC);
final Map<String, JsonNode> fullDestinationDefinitionConfigs = destinationDefinitionConfigs.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> {
final JsonNode withTombstone = addMissingTombstoneField(e.getValue());
final JsonNode output = mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs);
final JsonNode withMissingFields =
addMissingCustomField(
addMissingPublicField(
addMissingTombstoneField(e.getValue())));
final JsonNode output = mergeSpecIntoDefinition(withMissingFields, destinationSpecConfigs);
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_DESTINATION_DEFINITION, output);
return output;
}));
Expand Down Expand Up @@ -106,6 +112,24 @@ private JsonNode addMissingTombstoneField(final JsonNode definitionJson) {
return definitionJson;
}

/**
 * Guarantees that the given definition carries a "public" entry.
 *
 * @param definitionJson definition node to inspect; it is cast to ObjectNode when a value has to be written
 * @return the same node instance, with "public" set to true if it was absent or JSON null
 */
private JsonNode addMissingPublicField(final JsonNode definitionJson) {
  final JsonNode existing = definitionJson.get("public");
  final boolean alreadyPresent = existing != null && !existing.isNull();
  if (alreadyPresent) {
    return definitionJson;
  }
  // Seed yaml definitions are public by definition, so the flag defaults to true.
  final ObjectNode mutableDefinition = (ObjectNode) definitionJson;
  mutableDefinition.set("public", BooleanNode.TRUE);
  return definitionJson;
}

/**
 * Guarantees that the given definition carries a "custom" entry.
 *
 * @param definitionJson definition node to inspect; it is cast to ObjectNode when a value has to be written
 * @return the same node instance, with "custom" set to false if it was absent or JSON null
 */
private JsonNode addMissingCustomField(final JsonNode definitionJson) {
  final JsonNode existing = definitionJson.get("custom");
  if (existing != null && !existing.isNull()) {
    // An explicit value is already there; leave it untouched.
    return definitionJson;
  }
  // Seed yaml definitions are never custom by definition, so the flag defaults to false.
  final ObjectNode mutableDefinition = (ObjectNode) definitionJson;
  mutableDefinition.set("custom", BooleanNode.FALSE);
  return definitionJson;
}

@SuppressWarnings("UnstableApiUsage")

private static Map<String, JsonNode> getConfigs(final Class<?> seedDefinitionsResourceClass, final SeedType seedType) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testGetConfig() throws Exception {
assertEquals("https://docs.airbyte.io/integrations/sources/mysql", mysqlSource.getDocumentationUrl());
assertEquals("mysql.svg", mysqlSource.getIcon());
assertEquals(URI.create("https://docs.airbyte.io/integrations/sources/mysql"), mysqlSource.getSpec().getDocumentationUrl());
assertEquals(true, mysqlSource.getPublic());

// destination
final String s3DestinationId = "4816b78f-1489-44c1-9060-4b19d5fa9362";
Expand All @@ -54,6 +55,7 @@ public void testGetConfig() throws Exception {
assertEquals("airbyte/destination-s3", s3Destination.getDockerRepository());
assertEquals("https://docs.airbyte.io/integrations/destinations/s3", s3Destination.getDocumentationUrl());
assertEquals(URI.create("https://docs.airbyte.io/integrations/destinations/s3"), s3Destination.getSpec().getDocumentationUrl());
assertEquals(true, s3Destination.getPublic());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ properties:
if not set or false, the configuration is active. if true, then this
configuration is permanently off.
type: boolean
public:
description: true if this connector definition is available to all workspaces
type: boolean
default: false
custom:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from @michel-tricot in https://github.com/airbytehq/airbyte/pull/11339/files#r832831845

What is the definition of custom vs public? I would be careful before adding that kind of specific flags on the data model. They risk proliferating.
Are they mutually exclusive? If so, it should be an enum; and if we foresee that there will only ever be these two states, then a single boolean is enough.
Can it be true on both? false on both? what is the expected behavior in these cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my take on what the possible combinations mean:

Connector Definition

| public | custom | Valid? | Meaning |
| --- | --- | --- | --- |
| true | true | no | |
| true | false | yes | Part of Airbyte’s catalog, available by default |
| false | true | yes | User-uploaded custom connector |
| false | false | yes | Part of Airbyte’s catalog, opt-in via workspace grant |

Originally, the plan was just to introduce the public boolean on its own. But, we want to support an endpoint that returns all public: false connector definitions that are eligible for opt-in grants. The use case here is an 'early access' connector that is in the Airbyte Catalog, but not available by default.

We need to clearly differentiate these private, opt-in connectors from user-uploaded custom connectors so that an instance admin doesn't accidentally grant a custom connector to a different customer. The data model doesn't currently support this differentiation, so we decided that adding a 'custom' boolean in addition to the 'public' boolean made sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. It also seems possible that public: true && custom: true could map to something valid in the future (for example third-party supported and maintained connectors that are approved by Airbyte as available to everyone).

I do agree though that we should be wary of flag proliferation and the current states can be conveniently captured in a single enum as well.

fyi the original discussion about whether to use an enum for this is here

Copy link
Contributor

@pmossman pmossman Mar 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cgardens fyi in case you want to weigh in, since you were part of the original discussion about boolean vs enum tradeoffs.

I think the original resistance to an enum came from the fact that we'd be duplicating information that we can already glean from the source_type enum, which already has a 'custom' value.

However, this source_type enum wouldn't be appropriate for describing destination definitions, and existing use-cases of source_type: custom would IMO be better served by querying for a new custom: true boolean.

In general, I think conflating custom with public visibility at the database level by trying to define an enum that describes both feels a bit inflexible to me. I think there are independent use-cases of each, so defining two booleans feels like the cleanest approach for supporting current needs while remaining flexible to the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with @pmossman's comment. The original source_type enum (originated for billing purposes) was misleading, e.g. we had api, file, db, and custom in a single enum. One of these things is not like the others. Custom is a different concept from the other enum values and concepts we've discussed, and keeping it clearly separate makes that explicit. Private / public are also totally disjoint concepts from the custom idea. Parker's table lays it out well (and we should probably integrate it into how we document the project).

description: whether this is a custom connector definition
type: boolean
default: false
releaseStage:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ properties:
if not set or false, the configuration is active. if true, then this
configuration is permanently off.
type: boolean
public:
description: true if this connector definition is available to all workspaces
type: boolean
default: false
custom:
description: whether this is a custom connector definition
type: boolean
default: false
releaseStage:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Result;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -778,4 +781,12 @@ public void loadDataNoSecrets(final ConfigPersistence seedPersistenceWithoutSecr
persistence.loadData(seedPersistenceWithoutSecrets);
}

/**
 * Builds the jOOQ condition used to filter rows on a tombstone column.
 *
 * @param tombstoneField boolean tombstone column to compare against
 * @param includeTombstones when true, no tombstone filtering is applied
 * @return an always-true condition when tombstones are included; otherwise a condition
 *         requiring the tombstone column to equal false
 */
private Condition includeTombstones(final Field<Boolean> tombstoneField, final boolean includeTombstones) {
  return includeTombstones
      ? DSL.trueCondition()
      : tombstoneField.eq(false);
}

Comment on lines +793 to +800
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one doesn't fit in this PR thematically but I wanted to use this convenience method in multiple upcoming PRs

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
Expand Down Expand Up @@ -344,7 +343,7 @@ private List<ConfigWithMetadata<StandardSourceDefinition>> listStandardSourceDef
final List<ConfigWithMetadata<StandardSourceDefinition>> standardSourceDefinitions = new ArrayList<>();

for (final Record record : result) {
final StandardSourceDefinition standardSourceDefinition = buildStandardSourceDefinition(record);
final StandardSourceDefinition standardSourceDefinition = DbConverter.buildStandardSourceDefinition(record);
standardSourceDefinitions.add(new ConfigWithMetadata<>(
record.get(ACTOR_DEFINITION.ID).toString(),
ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
Expand All @@ -355,27 +354,6 @@ private List<ConfigWithMetadata<StandardSourceDefinition>> listStandardSourceDef
return standardSourceDefinitions;
}

/**
 * Maps one ACTOR_DEFINITION row onto a StandardSourceDefinition.
 *
 * Nullable columns (source type, release stage, release date, resource requirements) are passed
 * through as null; the spec column is always deserialized.
 *
 * @param record row that exposes the ACTOR_DEFINITION columns read below
 * @return the populated source definition
 */
private StandardSourceDefinition buildStandardSourceDefinition(final Record record) {
  // Resolve the columns that need conversion first, then assemble the definition in one chain.
  final SourceType sourceType;
  if (record.get(ACTOR_DEFINITION.SOURCE_TYPE) == null) {
    sourceType = null;
  } else {
    sourceType = Enums.toEnum(record.get(ACTOR_DEFINITION.SOURCE_TYPE, String.class), SourceType.class).orElseThrow();
  }

  final ConnectorSpecification spec =
      Jsons.deserialize(record.get(ACTOR_DEFINITION.SPEC).data(), ConnectorSpecification.class);

  final StandardSourceDefinition.ReleaseStage releaseStage;
  if (record.get(ACTOR_DEFINITION.RELEASE_STAGE) == null) {
    releaseStage = null;
  } else {
    releaseStage = Enums
        .toEnum(record.get(ACTOR_DEFINITION.RELEASE_STAGE, String.class), StandardSourceDefinition.ReleaseStage.class)
        .orElseThrow();
  }

  final String releaseDate;
  if (record.get(ACTOR_DEFINITION.RELEASE_DATE) == null) {
    releaseDate = null;
  } else {
    releaseDate = record.get(ACTOR_DEFINITION.RELEASE_DATE).toString();
  }

  final ActorDefinitionResourceRequirements resourceRequirements;
  if (record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null) {
    resourceRequirements = null;
  } else {
    resourceRequirements = Jsons.deserialize(
        record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(),
        ActorDefinitionResourceRequirements.class);
  }

  return new StandardSourceDefinition()
      .withSourceDefinitionId(record.get(ACTOR_DEFINITION.ID))
      .withDockerImageTag(record.get(ACTOR_DEFINITION.DOCKER_IMAGE_TAG))
      .withIcon(record.get(ACTOR_DEFINITION.ICON))
      .withDockerRepository(record.get(ACTOR_DEFINITION.DOCKER_REPOSITORY))
      .withDocumentationUrl(record.get(ACTOR_DEFINITION.DOCUMENTATION_URL))
      .withName(record.get(ACTOR_DEFINITION.NAME))
      .withSourceType(sourceType)
      .withSpec(spec)
      .withTombstone(record.get(ACTOR_DEFINITION.TOMBSTONE))
      .withReleaseStage(releaseStage)
      .withReleaseDate(releaseDate)
      .withResourceRequirements(resourceRequirements);
}

private List<ConfigWithMetadata<StandardDestinationDefinition>> listStandardDestinationDefinitionWithMetadata() throws IOException {
return listStandardDestinationDefinitionWithMetadata(Optional.empty());
}
Expand All @@ -393,7 +371,7 @@ private List<ConfigWithMetadata<StandardDestinationDefinition>> listStandardDest
final List<ConfigWithMetadata<StandardDestinationDefinition>> standardDestinationDefinitions = new ArrayList<>();

for (final Record record : result) {
final StandardDestinationDefinition standardDestinationDefinition = buildStandardDestinationDefinition(record);
final StandardDestinationDefinition standardDestinationDefinition = DbConverter.buildStandardDestinationDefinition(record);
standardDestinationDefinitions.add(new ConfigWithMetadata<>(
record.get(ACTOR_DEFINITION.ID).toString(),
ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(),
Expand All @@ -404,25 +382,6 @@ private List<ConfigWithMetadata<StandardDestinationDefinition>> listStandardDest
return standardDestinationDefinitions;
}

/**
 * Maps one ACTOR_DEFINITION row onto a StandardDestinationDefinition.
 *
 * Nullable columns (release stage, release date, resource requirements) are passed through as
 * null; the spec column is always deserialized.
 *
 * @param record row that exposes the ACTOR_DEFINITION columns read below
 * @return the populated destination definition
 */
private StandardDestinationDefinition buildStandardDestinationDefinition(final Record record) {
  // Resolve the columns that need conversion first, then assemble the definition in one chain.
  final ConnectorSpecification spec =
      Jsons.deserialize(record.get(ACTOR_DEFINITION.SPEC).data(), ConnectorSpecification.class);

  final StandardDestinationDefinition.ReleaseStage releaseStage;
  if (record.get(ACTOR_DEFINITION.RELEASE_STAGE) == null) {
    releaseStage = null;
  } else {
    releaseStage = Enums
        .toEnum(record.get(ACTOR_DEFINITION.RELEASE_STAGE, String.class), StandardDestinationDefinition.ReleaseStage.class)
        .orElseThrow();
  }

  final String releaseDate;
  if (record.get(ACTOR_DEFINITION.RELEASE_DATE) == null) {
    releaseDate = null;
  } else {
    releaseDate = record.get(ACTOR_DEFINITION.RELEASE_DATE).toString();
  }

  final ActorDefinitionResourceRequirements resourceRequirements;
  if (record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null) {
    resourceRequirements = null;
  } else {
    resourceRequirements = Jsons.deserialize(
        record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(),
        ActorDefinitionResourceRequirements.class);
  }

  return new StandardDestinationDefinition()
      .withDestinationDefinitionId(record.get(ACTOR_DEFINITION.ID))
      .withDockerImageTag(record.get(ACTOR_DEFINITION.DOCKER_IMAGE_TAG))
      .withIcon(record.get(ACTOR_DEFINITION.ICON))
      .withDockerRepository(record.get(ACTOR_DEFINITION.DOCKER_REPOSITORY))
      .withDocumentationUrl(record.get(ACTOR_DEFINITION.DOCUMENTATION_URL))
      .withName(record.get(ACTOR_DEFINITION.NAME))
      .withSpec(spec)
      .withTombstone(record.get(ACTOR_DEFINITION.TOMBSTONE))
      .withReleaseStage(releaseStage)
      .withReleaseDate(releaseDate)
      .withResourceRequirements(resourceRequirements);
}

private List<ConfigWithMetadata<SourceConnection>> listSourceConnectionWithMetadata() throws IOException {
return listSourceConnectionWithMetadata(Optional.empty());
}
Expand Down Expand Up @@ -840,6 +799,8 @@ private void writeStandardSourceDefinition(final List<StandardSourceDefinition>
io.airbyte.db.instance.configs.jooq.enums.SourceType.class).orElseThrow())
.set(ACTOR_DEFINITION.SPEC, JSONB.valueOf(Jsons.serialize(standardSourceDefinition.getSpec())))
.set(ACTOR_DEFINITION.TOMBSTONE, standardSourceDefinition.getTombstone())
.set(ACTOR_DEFINITION.PUBLIC, standardSourceDefinition.getPublic())
.set(ACTOR_DEFINITION.CUSTOM, standardSourceDefinition.getCustom())
.set(ACTOR_DEFINITION.RELEASE_STAGE, standardSourceDefinition.getReleaseStage() == null ? null
: Enums.toEnum(standardSourceDefinition.getReleaseStage().value(),
io.airbyte.db.instance.configs.jooq.enums.ReleaseStage.class).orElseThrow())
Expand Down Expand Up @@ -867,6 +828,8 @@ private void writeStandardSourceDefinition(final List<StandardSourceDefinition>
io.airbyte.db.instance.configs.jooq.enums.SourceType.class).orElseThrow())
.set(ACTOR_DEFINITION.SPEC, JSONB.valueOf(Jsons.serialize(standardSourceDefinition.getSpec())))
.set(ACTOR_DEFINITION.TOMBSTONE, standardSourceDefinition.getTombstone() != null && standardSourceDefinition.getTombstone())
.set(ACTOR_DEFINITION.PUBLIC, standardSourceDefinition.getPublic())
.set(ACTOR_DEFINITION.CUSTOM, standardSourceDefinition.getCustom())
.set(ACTOR_DEFINITION.RELEASE_STAGE,
standardSourceDefinition.getReleaseStage() == null ? null
: Enums.toEnum(standardSourceDefinition.getReleaseStage().value(),
Expand Down Expand Up @@ -908,6 +871,8 @@ private void writeStandardDestinationDefinition(final List<StandardDestinationDe
.set(ACTOR_DEFINITION.ACTOR_TYPE, ActorType.destination)
.set(ACTOR_DEFINITION.SPEC, JSONB.valueOf(Jsons.serialize(standardDestinationDefinition.getSpec())))
.set(ACTOR_DEFINITION.TOMBSTONE, standardDestinationDefinition.getTombstone())
.set(ACTOR_DEFINITION.PUBLIC, standardDestinationDefinition.getPublic())
.set(ACTOR_DEFINITION.CUSTOM, standardDestinationDefinition.getCustom())
.set(ACTOR_DEFINITION.RELEASE_STAGE, standardDestinationDefinition.getReleaseStage() == null ? null
: Enums.toEnum(standardDestinationDefinition.getReleaseStage().value(),
io.airbyte.db.instance.configs.jooq.enums.ReleaseStage.class).orElseThrow())
Expand All @@ -931,6 +896,8 @@ private void writeStandardDestinationDefinition(final List<StandardDestinationDe
.set(ACTOR_DEFINITION.ACTOR_TYPE, ActorType.destination)
.set(ACTOR_DEFINITION.SPEC, JSONB.valueOf(Jsons.serialize(standardDestinationDefinition.getSpec())))
.set(ACTOR_DEFINITION.TOMBSTONE, standardDestinationDefinition.getTombstone() != null && standardDestinationDefinition.getTombstone())
.set(ACTOR_DEFINITION.PUBLIC, standardDestinationDefinition.getPublic())
.set(ACTOR_DEFINITION.CUSTOM, standardDestinationDefinition.getCustom())
.set(ACTOR_DEFINITION.RELEASE_STAGE,
standardDestinationDefinition.getReleaseStage() == null ? null
: Enums.toEnum(standardDestinationDefinition.getReleaseStage().value(),
Expand Down Expand Up @@ -1752,6 +1719,8 @@ Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(final DSLContext ctx)
.withDockerRepository(row.get(ACTOR_DEFINITION.DOCKER_REPOSITORY))
.withDocumentationUrl(row.get(ACTOR_DEFINITION.DOCUMENTATION_URL))
.withName(row.get(ACTOR_DEFINITION.NAME))
.withPublic(row.get(ACTOR_DEFINITION.PUBLIC))
.withCustom(row.get(ACTOR_DEFINITION.CUSTOM))
.withSourceType(row.get(ACTOR_DEFINITION.SOURCE_TYPE) == null ? null
: Enums.toEnum(row.get(ACTOR_DEFINITION.SOURCE_TYPE, String.class), SourceType.class).orElseThrow())
.withSpec(Jsons.deserialize(row.get(ACTOR_DEFINITION.SPEC).data(), ConnectorSpecification.class)));
Expand All @@ -1763,6 +1732,8 @@ Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(final DSLContext ctx)
.withDockerRepository(row.get(ACTOR_DEFINITION.DOCKER_REPOSITORY))
.withDocumentationUrl(row.get(ACTOR_DEFINITION.DOCUMENTATION_URL))
.withName(row.get(ACTOR_DEFINITION.NAME))
.withPublic(row.get(ACTOR_DEFINITION.PUBLIC))
.withCustom(row.get(ACTOR_DEFINITION.CUSTOM))
.withSpec(Jsons.deserialize(row.get(ACTOR_DEFINITION.SPEC).data(), ConnectorSpecification.class)));
} else {
throw new RuntimeException("Unknown Actor Type " + row.get(ACTOR_DEFINITION.ACTOR_TYPE));
Expand Down
Loading