Skip to content

Commit 8a0d12f

Browse files
authored
Add migrations to support progress bar. (#19191)
Follow up to #18953. Implement all the DB migrations required for a progress bar. The main change here is to support saving: the estimated records/bytes at the sync level, and the estimated records/bytes and emitted records/bytes at the stream level. After this, I'll put up a PR for the persistence layer changes, which will handle writing to and reading from these columns. Finally, I'll wire this into the API changes, which are currently stubs. - add the estimated_records and estimated_bytes columns to the SyncStats table. - create a stream_stats table - estimated and emitted records/bytes columns - contains attempt_id and stream_name columns, with a unique constraint on these two columns. - foreign key to the attempts table. - this table hopefully sets us up for the parallel sync work.
1 parent 33227f5 commit 8a0d12f

File tree

3 files changed

+117
-1
lines changed

3 files changed

+117
-1
lines changed

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ void testBootloaderAppBlankDb() throws Exception {
136136
bootloader.load();
137137

138138
val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
139-
assertEquals("0.40.18.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
139+
assertEquals("0.40.18.002", jobsMigrator.getLatestMigration().getVersion().getVersion());
140140

141141
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
142142
// this line should change with every new migration
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.instance.jobs.migrations;
6+
7+
import static org.jooq.impl.DSL.currentOffsetDateTime;
8+
import static org.jooq.impl.DSL.field;
9+
import static org.jooq.impl.DSL.foreignKey;
10+
import static org.jooq.impl.DSL.primaryKey;
11+
import static org.jooq.impl.DSL.unique;
12+
13+
import java.time.OffsetDateTime;
14+
import java.util.UUID;
15+
import org.flywaydb.core.api.migration.BaseJavaMigration;
16+
import org.flywaydb.core.api.migration.Context;
17+
import org.jooq.DSLContext;
18+
import org.jooq.Field;
19+
import org.jooq.impl.DSL;
20+
import org.jooq.impl.SQLDataType;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* The estimated columns contains the overall estimated records and bytes for an attempt.
26+
* <p>
27+
* The new stream_stats table contains the estimated and emitted records/bytes for an attempt at the
28+
* per-stream level. This lets us track per-stream stats as an attempt is in progress.
29+
*/
30+
public class V0_40_18_002__AddProgressBarStats extends BaseJavaMigration {
31+
32+
private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_18_002__AddProgressBarStats.class);
33+
34+
@Override
35+
public void migrate(final Context context) throws Exception {
36+
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
37+
38+
// Warning: please do not use any jOOQ generated code to write a migration.
39+
// As database schema changes, the generated jOOQ code can be deprecated. So
40+
// old migration may not compile if there is any generated code.
41+
try (final DSLContext ctx = DSL.using(context.getConnection())) {
42+
addEstimatedColumnsToSyncStats(ctx);
43+
addStreamStatsTable(ctx);
44+
}
45+
}
46+
47+
private static void addEstimatedColumnsToSyncStats(final DSLContext ctx) {
48+
ctx.alterTable("sync_stats")
49+
.add(
50+
field("estimated_records", SQLDataType.BIGINT.nullable(true)),
51+
field("estimated_bytes", SQLDataType.BIGINT.nullable(true)))
52+
.execute();
53+
}
54+
55+
private static void addStreamStatsTable(final DSLContext ctx) {
56+
// Metadata Columns
57+
final Field<UUID> id = field("id", SQLDataType.UUID.nullable(false));
58+
final Field<Integer> attemptId = field("attempt_id", SQLDataType.INTEGER.nullable(false));
59+
final Field<String> streamNamespace = field("stream_namespace", SQLDataType.VARCHAR.nullable(false));
60+
final Field<String> streamName = field("stream_name", SQLDataType.VARCHAR.nullable(false));
61+
62+
// Stats Columns
63+
final Field<Long> recordsEmitted = field("records_emitted", SQLDataType.BIGINT.nullable(true));
64+
final Field<Long> bytesEmitted = field("bytes_emitted", SQLDataType.BIGINT.nullable(true));
65+
final Field<Long> estimatedRecords = field("estimated_records", SQLDataType.BIGINT.nullable(true));
66+
final Field<Long> estimatedBytes = field("estimated_bytes", SQLDataType.BIGINT.nullable(true));
67+
68+
// Time Columns
69+
final Field<OffsetDateTime> createdAt =
70+
field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
71+
final Field<OffsetDateTime> updatedAt =
72+
field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
73+
74+
ctx.createTableIfNotExists("stream_stats")
75+
.columns(
76+
id, attemptId, streamNamespace, streamName, recordsEmitted, bytesEmitted, estimatedRecords, estimatedBytes, createdAt, updatedAt)
77+
.constraints(
78+
primaryKey(id),
79+
foreignKey(attemptId).references("attempts", "id").onDeleteCascade(),
80+
// Prevent duplicate stat records of the same stream and attempt.
81+
unique("attempt_id", "stream_name"))
82+
.execute();
83+
84+
// Create an index on attempt_id, since all read queries on this table as of this migration will be
85+
// WHERE clauses on the attempt id.
86+
ctx.createIndex("index").on("stream_stats", "attempt_id").execute();
87+
88+
}
89+
90+
}

airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,20 @@ create table "public"."normalization_summaries"(
6161
constraint "normalization_summaries_pkey"
6262
primary key ("id")
6363
);
64+
create table "public"."stream_stats"(
65+
"id" uuid not null,
66+
"attempt_id" int4 not null,
67+
"stream_namespace" varchar(2147483647) not null,
68+
"stream_name" varchar(2147483647) not null,
69+
"records_emitted" int8 null,
70+
"bytes_emitted" int8 null,
71+
"estimated_records" int8 null,
72+
"estimated_bytes" int8 null,
73+
"created_at" timestamptz(35) not null default null,
74+
"updated_at" timestamptz(35) not null default null,
75+
constraint "stream_stats_pkey"
76+
primary key ("id")
77+
);
6478
create table "public"."sync_stats"(
6579
"id" uuid not null,
6680
"attempt_id" int8 not null,
@@ -75,13 +89,19 @@ create table "public"."sync_stats"(
7589
"max_seconds_between_state_message_emitted_and_committed" int8 null,
7690
"created_at" timestamptz(35) not null default null,
7791
"updated_at" timestamptz(35) not null default null,
92+
"estimated_records" int8 null,
93+
"estimated_bytes" int8 null,
7894
constraint "sync_stats_pkey"
7995
primary key ("id")
8096
);
8197
alter table "public"."normalization_summaries"
8298
add constraint "normalization_summaries_attempt_id_fkey"
8399
foreign key ("attempt_id")
84100
references "public"."attempts" ("id");
101+
alter table "public"."stream_stats"
102+
add constraint "stream_stats_attempt_id_fkey"
103+
foreign key ("attempt_id")
104+
references "public"."attempts" ("id");
85105
alter table "public"."sync_stats"
86106
add constraint "sync_stats_attempt_id_fkey"
87107
foreign key ("attempt_id")
@@ -101,5 +121,11 @@ create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
101121
create index "jobs_status_idx" on "public"."jobs"("status" asc);
102122
create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc);
103123
create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
124+
create index "index" on "public"."stream_stats"("attempt_id" asc);
125+
create unique index "stream_stats_attempt_id_stream_name_key" on "public"."stream_stats"(
126+
"attempt_id" asc,
127+
"stream_name" asc
128+
);
129+
create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
104130
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
105131
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);

0 commit comments

Comments
 (0)