Skip to content

Commit 295f985

Browse files
committed
Create new tables for catalog storage (airbytehq#9895)
closes airbytehq#9895
1 parent 3c5221d commit 295f985

File tree

8 files changed

+256
-5
lines changed

8 files changed

+256
-5
lines changed

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
8080
mockedConfigs.getConfigDatabaseUrl())
8181
.getAndInitialize();
8282
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
83-
assertEquals("0.35.15.001", configsMigrator.getLatestMigration().getVersion().getVersion());
83+
assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());
8484

8585
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
8686
assertEquals(version, jobsPersistence.getVersion().get());

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public void setup() throws Exception {
4343
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
4444
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
4545
database.query(ctx -> ctx
46-
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
46+
.execute(
47+
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
4748
}
4849

4950
@AfterEach

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public static void setup() throws Exception {
5454
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
5555
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
5656
database.query(ctx -> ctx
57-
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
57+
.execute(
58+
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
5859
}
5960

6061
@AfterAll

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public void setup() throws Exception {
5757
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
5858
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
5959
database.query(ctx -> ctx
60-
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
60+
.execute(
61+
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
6162
}
6263

6364
@AfterEach

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public static void tearDown() throws Exception {
5454
@BeforeEach
5555
public void resetDatabase() throws SQLException {
5656
database.query(ctx -> ctx
57-
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
57+
.execute(
58+
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
5859
}
5960

6061
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.instance.configs.migrations;
6+
7+
import static org.jooq.impl.DSL.constraint;
8+
import static org.jooq.impl.DSL.foreignKey;
9+
import static org.jooq.impl.DSL.primaryKey;
10+
11+
import com.google.common.annotations.VisibleForTesting;
12+
import java.time.OffsetDateTime;
13+
import java.util.UUID;
14+
import org.flywaydb.core.api.migration.BaseJavaMigration;
15+
import org.flywaydb.core.api.migration.Context;
16+
import org.jooq.DSLContext;
17+
import org.jooq.Field;
18+
import org.jooq.JSONB;
19+
import org.jooq.impl.DSL;
20+
import org.jooq.impl.SQLDataType;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
public class V0_35_26_001__PersistDiscoveredCatalog extends BaseJavaMigration {
25+
26+
private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_26_001__PersistDiscoveredCatalog.class);
27+
28+
@Override
29+
public void migrate(final Context context) throws Exception {
30+
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
31+
32+
// Warning: please do not use any jOOQ generated code to write a migration.
33+
// As database schema changes, the generated jOOQ code can be deprecated. So
34+
// old migration may not compile if there is any generated code.
35+
final DSLContext ctx = DSL.using(context.getConnection());
36+
migrate(ctx);
37+
}
38+
39+
@VisibleForTesting
40+
public static void migrate(final DSLContext ctx) {
41+
createActorCatalog(ctx);
42+
createCatalogFetchEvent(ctx);
43+
addConnectionTableForeignKey(ctx);
44+
}
45+
46+
private static void createActorCatalog(final DSLContext ctx) {
47+
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
48+
final Field<JSONB> catalog = DSL.field("catalog", SQLDataType.JSONB.nullable(false));
49+
final Field<String> catalogHash = DSL.field("catalog_hash", SQLDataType.VARCHAR(32).nullable(false));
50+
final Field<OffsetDateTime> createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false));
51+
ctx.createTableIfNotExists("actor_catalog")
52+
.columns(id,
53+
catalog,
54+
catalogHash,
55+
createdAt)
56+
.constraints(primaryKey(id))
57+
.execute();
58+
LOGGER.info("actor_catalog table created");
59+
ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on("actor_catalog", "catalog_hash").execute();
60+
}
61+
62+
private static void createCatalogFetchEvent(final DSLContext ctx) {
63+
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
64+
final Field<UUID> actorCatalogId = DSL.field("actor_catalog_id", SQLDataType.UUID.nullable(false));
65+
final Field<UUID> actorId = DSL.field("actor_id", SQLDataType.UUID.nullable(false));
66+
final Field<String> configHash = DSL.field("config_hash", SQLDataType.VARCHAR(32).nullable(false));
67+
final Field<String> actorVersion = DSL.field("actor_version", SQLDataType.VARCHAR(256).nullable(false));
68+
69+
ctx.createTableIfNotExists("actor_catalog_fetch_event")
70+
.columns(id,
71+
actorCatalogId,
72+
actorId,
73+
configHash,
74+
actorVersion)
75+
.constraints(primaryKey(id),
76+
foreignKey(actorCatalogId).references("actor_catalog", "id").onDeleteCascade(),
77+
foreignKey(actorId).references("actor", "id").onDeleteCascade())
78+
.execute();
79+
LOGGER.info("actor_catalog_fetch_event table created");
80+
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_id_idx").on("actor_catalog_fetch_event", "actor_id").execute();
81+
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_catalog_id_idx").on("actor_catalog_fetch_event", "actor_catalog_id").execute();
82+
}
83+
84+
private static void addConnectionTableForeignKey(final DSLContext ctx) {
85+
final Field<UUID> sourceCatalogId = DSL.field("source_catalog_id", SQLDataType.UUID.nullable(true));
86+
ctx.alterTable("connection")
87+
.addIfNotExists(sourceCatalogId).execute();
88+
ctx.alterTable("connection")
89+
.dropConstraintIfExists("connection_actor_catalog_id_fk");
90+
ctx.alterTable("connection")
91+
.add(constraint("connection_actor_catalog_id_fk").foreignKey(sourceCatalogId)
92+
.references("actor_catalog", "id").onDeleteCascade())
93+
.execute();
94+
}
95+
96+
}

airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt

+35
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,23 @@ create table "public"."actor"(
1515
constraint "actor_pkey"
1616
primary key ("id")
1717
);
18+
create table "public"."actor_catalog"(
19+
"id" uuid not null,
20+
"catalog" jsonb not null,
21+
"catalog_hash" varchar(32) not null,
22+
"created_at" timestamptz(35) not null,
23+
constraint "actor_catalog_pkey"
24+
primary key ("id")
25+
);
26+
create table "public"."actor_catalog_fetch_event"(
27+
"id" uuid not null,
28+
"actor_catalog_id" uuid not null,
29+
"actor_id" uuid not null,
30+
"config_hash" varchar(32) not null,
31+
"actor_version" varchar(256) not null,
32+
constraint "actor_catalog_fetch_event_pkey"
33+
primary key ("id")
34+
);
1835
create table "public"."actor_definition"(
1936
"id" uuid not null,
2037
"name" varchar(256) not null,
@@ -73,6 +90,7 @@ create table "public"."connection"(
7390
"resource_requirements" jsonb null,
7491
"created_at" timestamptz(35) not null default null,
7592
"updated_at" timestamptz(35) not null default null,
93+
"source_catalog_id" uuid null,
7694
constraint "connection_pkey"
7795
primary key ("id")
7896
);
@@ -142,10 +160,22 @@ alter table "public"."actor"
142160
add constraint "actor_workspace_id_fkey"
143161
foreign key ("workspace_id")
144162
references "public"."workspace" ("id");
163+
alter table "public"."actor_catalog_fetch_event"
164+
add constraint "actor_catalog_fetch_event_actor_catalog_id_fkey"
165+
foreign key ("actor_catalog_id")
166+
references "public"."actor_catalog" ("id");
167+
alter table "public"."actor_catalog_fetch_event"
168+
add constraint "actor_catalog_fetch_event_actor_id_fkey"
169+
foreign key ("actor_id")
170+
references "public"."actor" ("id");
145171
alter table "public"."actor_oauth_parameter"
146172
add constraint "actor_oauth_parameter_actor_definition_id_fkey"
147173
foreign key ("actor_definition_id")
148174
references "public"."actor_definition" ("id");
175+
alter table "public"."connection"
176+
add constraint "connection_actor_catalog_id_fk"
177+
foreign key ("source_catalog_id")
178+
references "public"."actor_catalog" ("id");
149179
alter table "public"."connection"
150180
add constraint "connection_destination_id_fkey"
151181
foreign key ("destination_id")
@@ -172,6 +202,11 @@ alter table "public"."state"
172202
references "public"."connection" ("id");
173203
create index "actor_actor_definition_id_idx" on "public"."actor"("actor_definition_id" asc);
174204
create unique index "actor_pkey" on "public"."actor"("id" asc);
205+
create index "actor_catalog_catalog_hash_id_idx" on "public"."actor_catalog"("catalog_hash" asc);
206+
create unique index "actor_catalog_pkey" on "public"."actor_catalog"("id" asc);
207+
create index "actor_catalog_fetch_event_actor_catalog_id_idx" on "public"."actor_catalog_fetch_event"("actor_catalog_id" asc);
208+
create index "actor_catalog_fetch_event_actor_id_idx" on "public"."actor_catalog_fetch_event"("actor_id" asc);
209+
create unique index "actor_catalog_fetch_event_pkey" on "public"."actor_catalog_fetch_event"("id" asc);
175210
create unique index "actor_definition_pkey" on "public"."actor_definition"("id" asc);
176211
create unique index "actor_oauth_parameter_pkey" on "public"."actor_oauth_parameter"("id" asc);
177212
create unique index "airbyte_configs_migrations_pk" on "public"."airbyte_configs_migrations"("installed_rank" asc);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.instance.configs.migrations;
6+
7+
import io.airbyte.db.Database;
8+
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
9+
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
10+
import java.io.IOException;
11+
import java.sql.SQLException;
12+
import java.time.OffsetDateTime;
13+
import java.util.UUID;
14+
import org.jooq.DSLContext;
15+
import org.jooq.JSONB;
16+
import org.jooq.impl.DSL;
17+
import org.junit.jupiter.api.Assertions;
18+
import org.junit.jupiter.api.Test;
19+
20+
class V0_35_26_001__PersistDiscoveredCatalogTest extends AbstractConfigsDatabaseTest {
21+
22+
@Test
23+
public void test() throws SQLException, IOException {
24+
25+
final Database database = getDatabase();
26+
final DSLContext context = DSL.using(database.getDataSource().getConnection());
27+
V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context);
28+
V0_35_26_001__PersistDiscoveredCatalog.migrate(context);
29+
assertCanInsertData(context);
30+
}
31+
32+
private void assertCanInsertData(final DSLContext ctx) {
33+
Assertions.assertDoesNotThrow(() -> {
34+
final UUID catalogId = UUID.randomUUID();
35+
final UUID actorId = UUID.randomUUID();
36+
final UUID actorDefinitionId = UUID.randomUUID();
37+
final UUID workspaceId = UUID.randomUUID();
38+
39+
ctx.insertInto(DSL.table("workspace"))
40+
.columns(
41+
DSL.field("id"),
42+
DSL.field("name"),
43+
DSL.field("slug"),
44+
DSL.field("initial_setup_complete"))
45+
.values(
46+
workspaceId,
47+
"default",
48+
"default",
49+
true)
50+
.execute();
51+
ctx.insertInto(DSL.table("actor_definition"))
52+
.columns(
53+
DSL.field("id"),
54+
DSL.field("name"),
55+
DSL.field("docker_repository"),
56+
DSL.field("docker_image_tag"),
57+
DSL.field("actor_type"),
58+
DSL.field("spec"))
59+
.values(
60+
actorDefinitionId,
61+
"name",
62+
"repo",
63+
"1.0.0",
64+
ActorType.source,
65+
JSONB.valueOf("{}"))
66+
.execute();
67+
ctx.insertInto(DSL.table("actor"))
68+
.columns(
69+
DSL.field("id"),
70+
DSL.field("workspace_id"),
71+
DSL.field("actor_definition_id"),
72+
DSL.field("name"),
73+
DSL.field("configuration"),
74+
DSL.field("actor_type"),
75+
DSL.field("created_at"),
76+
DSL.field("updated_at"))
77+
.values(
78+
actorId,
79+
workspaceId,
80+
actorDefinitionId,
81+
"some actor",
82+
JSONB.valueOf("{}"),
83+
ActorType.source,
84+
OffsetDateTime.now(),
85+
OffsetDateTime.now())
86+
.execute();
87+
ctx.insertInto(DSL.table("actor_catalog"))
88+
.columns(
89+
DSL.field("id"),
90+
DSL.field("catalog"),
91+
DSL.field("catalog_hash"),
92+
DSL.field("created_at"))
93+
.values(
94+
catalogId,
95+
JSONB.valueOf("{}"),
96+
"",
97+
OffsetDateTime.now())
98+
.execute();
99+
ctx.insertInto(DSL.table("actor_catalog_fetch_event"))
100+
.columns(
101+
DSL.field("id"),
102+
DSL.field("actor_catalog_id"),
103+
DSL.field("actor_id"),
104+
DSL.field("config_hash"),
105+
DSL.field("actor_version"))
106+
.values(
107+
UUID.randomUUID(),
108+
catalogId,
109+
actorId,
110+
"",
111+
"2.0.1")
112+
.execute();
113+
});
114+
}
115+
116+
}

0 commit comments

Comments
 (0)