
Add migrations to support progress bar. #19191


Merged · 9 commits · Nov 11, 2022
Changes from 4 commits
@@ -136,7 +136,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.14.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.18.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;
import static org.jooq.impl.DSL.unique;

import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The estimated columns contain the overall estimated records and bytes for an attempt.
* <p>
* The new stream_stats table contains the estimated and emitted records/bytes for an attempt at the
* per-stream level. This lets us track per-stream stats as an attempt is in progress.
*/
public class V0_40_18_001__AddProgressBarStats extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_18_001__AddProgressBarStats.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As the database schema changes, the generated jOOQ code can become outdated,
// so old migrations may not compile if they reference any generated code.
try (final DSLContext ctx = DSL.using(context.getConnection())) {
addEstimatedColumnsToSyncStats(ctx);
addStreamStatsTable(ctx);
Contributor Author:
Heads up @gosusnp: I spoke to @benmoriceau, and we decided that creating a new table now is lower effort than trying to sidestep a potential migration once we have parallel syncs. Letting you know in case you want to re-review.

}
}

private static void addEstimatedColumnsToSyncStats(final DSLContext ctx) {
ctx.alterTable("sync_stats")
.add(
field("estimated_records", SQLDataType.BIGINT.nullable(true)),
field("estimated_bytes", SQLDataType.BIGINT.nullable(true)))
.execute();
}

private static void addStreamStatsTable(final DSLContext ctx) {
// Metadata Columns
final Field<UUID> id = field("id", SQLDataType.UUID.nullable(false));
final Field<Integer> attemptId = field("attempt_id", SQLDataType.INTEGER.nullable(false));
final Field<String> streamName = field("stream_name", SQLDataType.VARCHAR.nullable(false));
Contributor: We should add stream_namespace as well, since stream_name could collide.

Contributor Author: Good point! @evantahler, stream_namespace is something we might need to change on the FE too.
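For illustration, a follow-up along the lines suggested above might look like the sketch below. The column definition, its nullability, and the Postgres-generated constraint name are assumptions; this change is not part of the commits shown in this diff.

    // Hypothetical follow-up sketch (assumed names, not part of this migration):
    // add a namespace column and widen the uniqueness constraint so two streams
    // that share a name but live in different namespaces no longer collide.
    private static void addStreamNamespaceToStreamStats(final DSLContext ctx) {
      ctx.alterTable("stream_stats")
          .add(field("stream_namespace", SQLDataType.VARCHAR.nullable(true)))
          .execute();
      // Assumed auto-generated name of the unique("attempt_id", "stream_name") constraint.
      ctx.alterTable("stream_stats")
          .dropConstraint("stream_stats_attempt_id_stream_name_key")
          .execute();
      ctx.alterTable("stream_stats")
          .add(unique("attempt_id", "stream_name", "stream_namespace"))
          .execute();
    }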


// Stats Columns
final Field<Long> recordsEmitted = field("records_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> bytesEmitted = field("bytes_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> estimatedRecords = field("estimated_records", SQLDataType.BIGINT.nullable(true));
final Field<Long> estimatedBytes = field("estimated_bytes", SQLDataType.BIGINT.nullable(true));

// Time Columns
final Field<OffsetDateTime> createdAt =
field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
final Field<OffsetDateTime> updatedAt =
field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));

ctx.createTableIfNotExists("stream_stats")
.columns(
id, attemptId, streamName, recordsEmitted, bytesEmitted, estimatedRecords, estimatedBytes, createdAt, updatedAt)
.constraints(
primaryKey(id),
foreignKey(attemptId).references("attempts", "id").onDeleteCascade(),
// Prevent duplicate stat records of the same stream and attempt.
unique("attempt_id", "stream_name"))
.execute();

// Create an index on attempt_id, since as of this migration every read query on
// this table filters on the attempt id in its WHERE clause.
ctx.createIndex("index").on("stream_stats", "attempt_id").execute();
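As a usage note, not part of this diff: the read pattern the index above serves would look roughly like the sketch below; the helper name and query shape are assumptions for illustration.

    // Illustrative read sketch (assumed helper, not part of this migration): fetch
    // all per-stream stat rows for one attempt; the index lets this WHERE clause
    // use an index scan rather than a sequential scan.
    static org.jooq.Result<org.jooq.Record> fetchStatsForAttempt(final DSLContext ctx, final int attemptId) {
      return ctx.select()
          .from("stream_stats")
          .where(field("attempt_id", SQLDataType.INTEGER).eq(attemptId))
          .fetch();
    }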

}

}
25 changes: 25 additions & 0 deletions airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt
@@ -61,6 +61,19 @@ create table "public"."normalization_summaries"(
constraint "normalization_summaries_pkey"
primary key ("id")
);
create table "public"."stream_stats"(
"id" uuid not null,
"attempt_id" int4 not null,
"stream_name" varchar(2147483647) not null,
"records_emitted" int8 null,
"bytes_emitted" int8 null,
"estimated_records" int8 null,
"estimated_bytes" int8 null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
constraint "stream_stats_pkey"
primary key ("id")
);
create table "public"."sync_stats"(
"id" uuid not null,
"attempt_id" int8 not null,
@@ -75,13 +88,19 @@ create table "public"."sync_stats"(
"max_seconds_between_state_message_emitted_and_committed" int8 null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
"estimated_records" int8 null,
"estimated_bytes" int8 null,
constraint "sync_stats_pkey"
primary key ("id")
);
alter table "public"."normalization_summaries"
add constraint "normalization_summaries_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id");
alter table "public"."stream_stats"
add constraint "stream_stats_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id");
alter table "public"."sync_stats"
add constraint "sync_stats_attempt_id_fkey"
foreign key ("attempt_id")
@@ -99,5 +118,11 @@ create unique index "jobs_pkey" on "public"."jobs"("id" asc);
create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc);
create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
create index "index" on "public"."stream_stats"("attempt_id" asc);
create unique index "stream_stats_attempt_id_stream_name_key" on "public"."stream_stats"(
"attempt_id" asc,
"stream_name" asc
);
create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);