Skip to content

Commit 4ef54ad

Browse files
authored
Sync stats migration (#16285)
* Sync stats migration
1 parent bc0d7cc commit 4ef54ad

File tree

5 files changed

+93
-6
lines changed

5 files changed

+93
-6
lines changed

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void testBootloaderAppBlankDb() throws Exception {
128128
bootloader.load();
129129

130130
val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
131-
assertEquals("0.35.62.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
131+
assertEquals("0.40.3.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
132132

133133
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
134134
// this line should change with every new migration
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.instance.jobs.migrations;
6+
7+
import static org.jooq.impl.DSL.currentOffsetDateTime;
8+
import static org.jooq.impl.DSL.foreignKey;
9+
import static org.jooq.impl.DSL.primaryKey;
10+
11+
import java.time.OffsetDateTime;
12+
import java.util.UUID;
13+
import org.flywaydb.core.api.migration.BaseJavaMigration;
14+
import org.flywaydb.core.api.migration.Context;
15+
import org.jooq.DSLContext;
16+
import org.jooq.Field;
17+
import org.jooq.impl.DSL;
18+
import org.jooq.impl.SQLDataType;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
public class V0_40_3_001__CreateSyncStats extends BaseJavaMigration {
23+
24+
private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_3_001__CreateSyncStats.class);
25+
26+
@Override
27+
public void migrate(final Context context) throws Exception {
28+
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
29+
final DSLContext ctx = DSL.using(context.getConnection());
30+
createSyncStatsTable(ctx);
31+
}
32+
33+
private static void createSyncStatsTable(final DSLContext ctx) {
34+
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
35+
final Field<Integer> attemptId = DSL.field("attempt_id", SQLDataType.INTEGER.nullable(false));
36+
final Field<Long> recordsEmitted = DSL.field("records_emitted", SQLDataType.BIGINT.nullable(true));
37+
final Field<Long> bytesEmitted = DSL.field("bytes_emitted", SQLDataType.BIGINT.nullable(true));
38+
final Field<Long> sourceStateMessagesEmitted = DSL.field("source_state_messages_emitted", SQLDataType.BIGINT.nullable(true));
39+
final Field<Long> destinationStateMessagesEmitted = DSL.field("destination_state_messages_emitted", SQLDataType.BIGINT.nullable(true));
40+
final Field<Long> recordsCommitted = DSL.field("records_committed", SQLDataType.BIGINT.nullable(true));
41+
final Field<Long> meanSecondsBeforeSourceStateMessageEmitted =
42+
DSL.field("mean_seconds_before_source_state_message_emitted", SQLDataType.BIGINT.nullable(true));
43+
final Field<Long> maxSecondsBeforeSourceStateMessageEmitted =
44+
DSL.field("max_seconds_before_source_state_message_emitted", SQLDataType.BIGINT.nullable(true));
45+
final Field<Long> meanSecondsBetweenStateMessageEmittedandCommitted =
46+
DSL.field("mean_seconds_between_state_message_emitted_and_committed", SQLDataType.BIGINT.nullable(true));
47+
final Field<Long> maxSecondsBetweenStateMessageEmittedandCommitted =
48+
DSL.field("max_seconds_between_state_message_emitted_and_committed", SQLDataType.BIGINT.nullable(true));
49+
final Field<OffsetDateTime> createdAt =
50+
DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
51+
final Field<OffsetDateTime> updatedAt =
52+
DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
53+
54+
ctx.createTableIfNotExists("sync_stats")
55+
.columns(id, attemptId, recordsEmitted, bytesEmitted, sourceStateMessagesEmitted, destinationStateMessagesEmitted, recordsCommitted,
56+
meanSecondsBeforeSourceStateMessageEmitted, maxSecondsBeforeSourceStateMessageEmitted, meanSecondsBetweenStateMessageEmittedandCommitted,
57+
maxSecondsBetweenStateMessageEmittedandCommitted, createdAt, updatedAt)
58+
.constraints(primaryKey(id), foreignKey(attemptId).references("attempts", "id").onDeleteCascade())
59+
.execute();
60+
61+
ctx.createIndex("attempt_id_idx").on("sync_stats", "attempt_id").execute();
62+
}
63+
64+
}

airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,27 @@ create table "public"."jobs"(
4949
constraint "jobs_pkey"
5050
primary key ("id")
5151
);
52+
create table "public"."sync_stats"(
53+
"id" uuid not null,
54+
"attempt_id" int4 not null,
55+
"records_emitted" int8 null,
56+
"bytes_emitted" int8 null,
57+
"source_state_messages_emitted" int8 null,
58+
"destination_state_messages_emitted" int8 null,
59+
"records_committed" int8 null,
60+
"mean_seconds_before_source_state_message_emitted" int8 null,
61+
"max_seconds_before_source_state_message_emitted" int8 null,
62+
"mean_seconds_between_state_message_emitted_and_committed" int8 null,
63+
"max_seconds_between_state_message_emitted_and_committed" int8 null,
64+
"created_at" timestamptz(35) not null default null,
65+
"updated_at" timestamptz(35) not null default null,
66+
constraint "sync_stats_pkey"
67+
primary key ("id")
68+
);
69+
alter table "public"."sync_stats"
70+
add constraint "sync_stats_attempt_id_fkey"
71+
foreign key ("attempt_id")
72+
references "public"."attempts" ("id");
5273
create unique index "airbyte_jobs_migrations_pk" on "public"."airbyte_jobs_migrations"("installed_rank" asc);
5374
create index "airbyte_jobs_migrations_s_idx" on "public"."airbyte_jobs_migrations"("success" asc);
5475
create unique index "airbyte_metadata_pkey" on "public"."airbyte_metadata"("key" asc);
@@ -60,3 +81,5 @@ create unique index "job_attempt_idx" on "public"."attempts"(
6081
create index "jobs_config_type_idx" on "public"."jobs"("config_type" asc);
6182
create unique index "jobs_pkey" on "public"."jobs"("id" asc);
6283
create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
84+
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
85+
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);

airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public DefaultJobPersistence(final Database jobDatabase) {
118118
this(jobDatabase, Instant::now, 30, 500, 10);
119119
}
120120

121-
private static String jobSelectAndJoin(String jobsSubquery) {
121+
private static String jobSelectAndJoin(final String jobsSubquery) {
122122
return "SELECT\n"
123123
+ "jobs.id AS job_id,\n"
124124
+ "jobs.config_type AS config_type,\n"
@@ -802,7 +802,7 @@ private static void truncateTable(final DSLContext ctx, final String schema, fin
802802
final Table<Record> backupTableSql = getTable(backupSchema, tableName);
803803
ctx.dropTableIfExists(backupTableSql).execute();
804804
ctx.createTable(backupTableSql).as(DSL.select(DSL.asterisk()).from(tableSql)).withData().execute();
805-
ctx.truncateTable(tableSql).restartIdentity().execute();
805+
ctx.truncateTable(tableSql).restartIdentity().cascade().execute();
806806
}
807807

808808
/**

airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ void tearDown() throws Exception {
198198

199199
private void resetDb() throws SQLException {
200200
// todo (cgardens) - truncate whole db.
201-
jobDatabase.query(ctx -> ctx.truncateTable(JOBS).execute());
202-
jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).execute());
203-
jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).execute());
201+
jobDatabase.query(ctx -> ctx.truncateTable(JOBS).cascade().execute());
202+
jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).cascade().execute());
203+
jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).cascade().execute());
204204
}
205205

206206
private Result<Record> getJobRecord(final long jobId) throws SQLException {

0 commit comments

Comments
 (0)