-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Create new tables for catalog storage #10226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.instance.configs.migrations; | ||
|
||
import static org.jooq.impl.DSL.constraint; | ||
import static org.jooq.impl.DSL.foreignKey; | ||
import static org.jooq.impl.DSL.primaryKey; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import java.time.OffsetDateTime; | ||
import java.util.UUID; | ||
import org.flywaydb.core.api.migration.BaseJavaMigration; | ||
import org.flywaydb.core.api.migration.Context; | ||
import org.jooq.DSLContext; | ||
import org.jooq.Field; | ||
import org.jooq.JSONB; | ||
import org.jooq.impl.DSL; | ||
import org.jooq.impl.SQLDataType; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class V0_35_26_001__PersistDiscoveredCatalog extends BaseJavaMigration { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_26_001__PersistDiscoveredCatalog.class); | ||
|
||
@Override | ||
public void migrate(final Context context) throws Exception { | ||
LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); | ||
|
||
// Warning: please do not use any jOOQ generated code to write a migration. | ||
// As database schema changes, the generated jOOQ code can be deprecated. So | ||
// old migration may not compile if there is any generated code. | ||
final DSLContext ctx = DSL.using(context.getConnection()); | ||
migrate(ctx); | ||
} | ||
|
||
@VisibleForTesting | ||
public static void migrate(final DSLContext ctx) { | ||
createActorCatalog(ctx); | ||
createCatalogFetchEvent(ctx); | ||
addConnectionTableForeignKey(ctx); | ||
} | ||
|
||
private static void createActorCatalog(final DSLContext ctx) { | ||
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false)); | ||
final Field<JSONB> catalog = DSL.field("catalog", SQLDataType.JSONB.nullable(false)); | ||
final Field<String> catalogHash = DSL.field("catalog_hash", SQLDataType.VARCHAR(32).nullable(false)); | ||
final Field<OffsetDateTime> createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)); | ||
ctx.createTableIfNotExists("actor_catalog") | ||
.columns(id, | ||
catalog, | ||
catalogHash, | ||
createdAt) | ||
.constraints(primaryKey(id)) | ||
.execute(); | ||
LOGGER.info("actor_catalog table created"); | ||
ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on("actor_catalog", "catalog_hash").execute(); | ||
} | ||
|
||
private static void createCatalogFetchEvent(final DSLContext ctx) { | ||
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false)); | ||
final Field<UUID> actorCatalogId = DSL.field("actor_catalog_id", SQLDataType.UUID.nullable(false)); | ||
final Field<UUID> actorId = DSL.field("actor_id", SQLDataType.UUID.nullable(false)); | ||
final Field<String> configHash = DSL.field("config_hash", SQLDataType.VARCHAR(32).nullable(false)); | ||
final Field<String> actorVersion = DSL.field("actor_version", SQLDataType.VARCHAR(256).nullable(false)); | ||
|
||
ctx.createTableIfNotExists("actor_catalog_fetch_event") | ||
.columns(id, | ||
actorCatalogId, | ||
actorId, | ||
configHash, | ||
actorVersion) | ||
.constraints(primaryKey(id), | ||
foreignKey(actorCatalogId).references("actor_catalog", "id").onDeleteCascade(), | ||
foreignKey(actorId).references("actor", "id").onDeleteCascade()) | ||
.execute(); | ||
LOGGER.info("actor_catalog_fetch_event table created"); | ||
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_id_idx").on("actor_catalog_fetch_event", "actor_id").execute(); | ||
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_catalog_id_idx").on("actor_catalog_fetch_event", "actor_catalog_id").execute(); | ||
} | ||
|
||
private static void addConnectionTableForeignKey(final DSLContext ctx) { | ||
final Field<UUID> sourceCatalogId = DSL.field("source_catalog_id", SQLDataType.UUID.nullable(true)); | ||
ctx.alterTable("connection") | ||
.addIfNotExists(sourceCatalogId).execute(); | ||
ctx.alterTable("connection") | ||
.dropConstraintIfExists("connection_actor_catalog_id_fk"); | ||
ctx.alterTable("connection") | ||
.add(constraint("connection_actor_catalog_id_fk").foreignKey(sourceCatalogId) | ||
.references("actor_catalog", "id").onDeleteCascade()) | ||
.execute(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,23 @@ create table "public"."actor"( | |
constraint "actor_pkey" | ||
primary key ("id") | ||
); | ||
create table "public"."actor_catalog"( | ||
"id" uuid not null, | ||
"catalog" jsonb not null, | ||
"catalog_hash" varchar(32) not null, | ||
"created_at" timestamptz(35) not null, | ||
constraint "actor_catalog_pkey" | ||
primary key ("id") | ||
); | ||
create table "public"."actor_catalog_fetch_event"( | ||
"id" uuid not null, | ||
"actor_catalog_id" uuid not null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think it would make sense to add an index on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
"actor_id" uuid not null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would make sense to do an index on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
"config_hash" varchar(32) not null, | ||
"actor_version" varchar(256) not null, | ||
constraint "actor_catalog_fetch_event_pkey" | ||
primary key ("id") | ||
); | ||
create table "public"."actor_definition"( | ||
"id" uuid not null, | ||
"name" varchar(256) not null, | ||
|
@@ -73,6 +90,7 @@ create table "public"."connection"( | |
"resource_requirements" jsonb null, | ||
"created_at" timestamptz(35) not null default null, | ||
"updated_at" timestamptz(35) not null default null, | ||
"source_catalog_id" uuid null, | ||
constraint "connection_pkey" | ||
primary key ("id") | ||
); | ||
|
@@ -142,10 +160,22 @@ alter table "public"."actor" | |
add constraint "actor_workspace_id_fkey" | ||
foreign key ("workspace_id") | ||
references "public"."workspace" ("id"); | ||
alter table "public"."actor_catalog_fetch_event" | ||
add constraint "actor_catalog_fetch_event_actor_catalog_id_fkey" | ||
foreign key ("actor_catalog_id") | ||
references "public"."actor_catalog" ("id"); | ||
alter table "public"."actor_catalog_fetch_event" | ||
add constraint "actor_catalog_fetch_event_actor_id_fkey" | ||
foreign key ("actor_id") | ||
references "public"."actor" ("id"); | ||
alter table "public"."actor_oauth_parameter" | ||
add constraint "actor_oauth_parameter_actor_definition_id_fkey" | ||
foreign key ("actor_definition_id") | ||
references "public"."actor_definition" ("id"); | ||
alter table "public"."connection" | ||
add constraint "connection_actor_catalog_id_fk" | ||
foreign key ("source_catalog_id") | ||
references "public"."actor_catalog" ("id"); | ||
alter table "public"."connection" | ||
add constraint "connection_destination_id_fkey" | ||
foreign key ("destination_id") | ||
|
@@ -172,6 +202,11 @@ alter table "public"."state" | |
references "public"."connection" ("id"); | ||
create index "actor_actor_definition_id_idx" on "public"."actor"("actor_definition_id" asc); | ||
create unique index "actor_pkey" on "public"."actor"("id" asc); | ||
create index "actor_catalog_catalog_hash_id_idx" on "public"."actor_catalog"("catalog_hash" asc); | ||
create unique index "actor_catalog_pkey" on "public"."actor_catalog"("id" asc); | ||
create index "actor_catalog_fetch_event_actor_catalog_id_idx" on "public"."actor_catalog_fetch_event"("actor_catalog_id" asc); | ||
create index "actor_catalog_fetch_event_actor_id_idx" on "public"."actor_catalog_fetch_event"("actor_id" asc); | ||
create unique index "actor_catalog_fetch_event_pkey" on "public"."actor_catalog_fetch_event"("id" asc); | ||
create unique index "actor_definition_pkey" on "public"."actor_definition"("id" asc); | ||
create unique index "actor_oauth_parameter_pkey" on "public"."actor_oauth_parameter"("id" asc); | ||
create unique index "airbyte_configs_migrations_pk" on "public"."airbyte_configs_migrations"("installed_rank" asc); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.instance.configs.migrations; | ||
|
||
import io.airbyte.db.Database; | ||
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; | ||
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType; | ||
import java.io.IOException; | ||
import java.sql.SQLException; | ||
import java.time.OffsetDateTime; | ||
import java.util.UUID; | ||
import org.jooq.DSLContext; | ||
import org.jooq.JSONB; | ||
import org.jooq.impl.DSL; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class V0_35_26_001__PersistDiscoveredCatalogTest extends AbstractConfigsDatabaseTest { | ||
|
||
@Test | ||
public void test() throws SQLException, IOException { | ||
|
||
final Database database = getDatabase(); | ||
final DSLContext context = DSL.using(database.getDataSource().getConnection()); | ||
V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context); | ||
V0_35_26_001__PersistDiscoveredCatalog.migrate(context); | ||
assertCanInsertData(context); | ||
} | ||
|
||
private void assertCanInsertData(final DSLContext ctx) { | ||
Assertions.assertDoesNotThrow(() -> { | ||
final UUID catalogId = UUID.randomUUID(); | ||
final UUID actorId = UUID.randomUUID(); | ||
final UUID actorDefinitionId = UUID.randomUUID(); | ||
final UUID workspaceId = UUID.randomUUID(); | ||
|
||
ctx.insertInto(DSL.table("workspace")) | ||
.columns( | ||
DSL.field("id"), | ||
DSL.field("name"), | ||
DSL.field("slug"), | ||
DSL.field("initial_setup_complete")) | ||
.values( | ||
workspaceId, | ||
"default", | ||
"default", | ||
true) | ||
.execute(); | ||
ctx.insertInto(DSL.table("actor_definition")) | ||
.columns( | ||
DSL.field("id"), | ||
DSL.field("name"), | ||
DSL.field("docker_repository"), | ||
DSL.field("docker_image_tag"), | ||
DSL.field("actor_type"), | ||
DSL.field("spec")) | ||
.values( | ||
actorDefinitionId, | ||
"name", | ||
"repo", | ||
"1.0.0", | ||
ActorType.source, | ||
JSONB.valueOf("{}")) | ||
.execute(); | ||
ctx.insertInto(DSL.table("actor")) | ||
.columns( | ||
DSL.field("id"), | ||
DSL.field("workspace_id"), | ||
DSL.field("actor_definition_id"), | ||
DSL.field("name"), | ||
DSL.field("configuration"), | ||
DSL.field("actor_type"), | ||
DSL.field("created_at"), | ||
DSL.field("updated_at")) | ||
.values( | ||
actorId, | ||
workspaceId, | ||
actorDefinitionId, | ||
"some actor", | ||
JSONB.valueOf("{}"), | ||
ActorType.source, | ||
OffsetDateTime.now(), | ||
OffsetDateTime.now()) | ||
.execute(); | ||
ctx.insertInto(DSL.table("actor_catalog")) | ||
.columns( | ||
DSL.field("id"), | ||
DSL.field("catalog"), | ||
DSL.field("catalog_hash"), | ||
DSL.field("created_at")) | ||
.values( | ||
catalogId, | ||
JSONB.valueOf("{}"), | ||
"", | ||
OffsetDateTime.now()) | ||
.execute(); | ||
ctx.insertInto(DSL.table("actor_catalog_fetch_event")) | ||
.columns( | ||
DSL.field("id"), | ||
DSL.field("actor_catalog_id"), | ||
DSL.field("actor_id"), | ||
DSL.field("config_hash"), | ||
DSL.field("actor_version")) | ||
.values( | ||
UUID.randomUUID(), | ||
catalogId, | ||
actorId, | ||
"", | ||
"2.0.1") | ||
.execute(); | ||
}); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there some helper we should right for this? It seems like we have the same truncate query in a bunch of different tests and have to update it in a bunch places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely agree with that. I haven't seen any existing helper function for that. I'll add one in
BaseDatabaseConfigPersistenceTest
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have that ready in a separate commit.