Skip to content

🐛 Save Temporal Workflow Id to the DB instead of the Workspace volume. #5850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db.instance.jobs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

public class V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts extends BaseJavaMigration {

@Override
public void migrate(Context context) throws Exception {
// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
DSLContext ctx = DSL.using(context.getConnection());
ctx.alterTable("attempts").addColumn(DSL.field("temporalWorkflowId", SQLDataType.VARCHAR.nullable(false).defaultValue("")))
.execute();
}

}
2 changes: 2 additions & 0 deletions airbyte-db/lib/src/main/resources/jobs_database/Attempts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ properties:
type: ["null", object]
status:
type: string
temporalWorkflowId:
type: string
created_at:
# todo should be datetime.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ create table "public"."attempts"(
"created_at" timestamptz(35) null,
"updated_at" timestamptz(35) null,
"ended_at" timestamptz(35) null,
"temporalworkflowid" varchar(2147483647) not null default '''''::character varying',
constraint "attempts_pkey"
primary key ("id")
);
Expand Down
1 change: 1 addition & 0 deletions airbyte-scheduler/persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ dependencies {
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:models')

testImplementation "org.flywaydb:flyway-core:7.14.0"
testImplementation "org.testcontainers:postgresql:1.15.1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,29 @@ public void succeedAttempt(long jobId, int attemptNumber) throws IOException {
});
}

@Override
public void setAttemptTemporalWorkflowId(long jobId, int attemptNumber, String temporalWorkflowId) throws IOException {
database.query(ctx -> ctx.execute(
" UPDATE attempts SET temporalWorkflowId = ? WHERE job_id = ? AND attempt_number = ?",
temporalWorkflowId,
jobId,
attemptNumber));
}

@Override
public String getAttemptTemporalWorkflowId(long jobId, int attemptNumber) throws IOException {
var result = database.query(ctx -> ctx.fetch(
" SELECT temporalWorkflowId from attempts WHERE job_id = ? AND attempt_number = ?",
jobId,
attemptNumber)).stream().findFirst();

if (result.isEmpty()) {
throw new RuntimeException("Unable to find attempt and retrieve temporalWorkflowId.");
}

return result.get().get("temporalworkflowid", String.class);
}

@Override
public <T> void writeOutput(long jobId, int attemptNumber, T output) throws IOException {
final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public interface JobPersistence {
// END OF LIFECYCLE
//

void setAttemptTemporalWorkflowId(long jobId, int attemptNumber, String temporalWorkflowId) throws IOException;

String getAttemptTemporalWorkflowId(long jobId, int attemptNumber) throws IOException;

<T> void writeOutput(long jobId, int attemptNumber, T output) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
Expand Down Expand Up @@ -161,6 +163,10 @@ public void setup() throws Exception {
database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
resetDb();

DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(database, "test");
jobDbMigrator.createBaseline();
jobDbMigrator.migrate();

timeSupplier = mock(Supplier.class);
when(timeSupplier.get()).thenReturn(NOW);

Expand Down Expand Up @@ -339,6 +345,43 @@ private long createJobAt(Instant created_at) throws IOException {
return jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
}

@Nested
class TemporalWorkflowId {

@Test
void testSuccessfulGet() throws IOException, SQLException {
var jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
var attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);

var defaultWorkflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, attemptNumber);
assertEquals("", defaultWorkflowId);

database.query(ctx -> ctx.execute(
"UPDATE attempts SET temporalWorkflowId = '56a81f3a-006c-42d7-bce2-29d675d08ea4' WHERE job_id = ? AND attempt_number =?", jobId,
attemptNumber));
var workflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, attemptNumber);
assertEquals(workflowId, "56a81f3a-006c-42d7-bce2-29d675d08ea4");
}

@Test
void testGetMissingAttempt() {
assertThrows(RuntimeException.class, () -> jobPersistence.getAttemptTemporalWorkflowId(0, 0));
}

@Test
void testSuccessfulSet() throws IOException {
long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
var attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
var temporalWorkflowId = "test-id-usually-uuid";

jobPersistence.setAttemptTemporalWorkflowId(jobId, attemptNumber, temporalWorkflowId);

var workflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, attemptNumber);
assertEquals(workflowId, temporalWorkflowId);
}

}

@Nested
class GetAndSetVersion {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ public JobInfoRead cancelJob(JobIdRequestBody jobIdRequestBody) throws IOExcepti
// behavior
final Path attemptParentDir = WorkerUtils.getJobRoot(workspaceRoot, String.valueOf(jobId), 0L).getParent();
final String workflowId = IOs.readFile(attemptParentDir, TemporalAttemptExecution.WORKFLOW_ID_FILENAME);

// instead of reading from a shared volume, pull this info from the database.

final WorkflowExecution workflowExecution = WorkflowExecution.newBuilder()
.setWorkflowId(workflowId)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public OUTPUT get() {
LOGGER.debug("Creating local workspace directory..");
jobRootDirCreator.accept(jobRoot);

// This is actually used in cancellation.
// Can we write this to the database? As part of the attempt?
// DefaultJobPersistence.setLatestAttemptWorkflowId(jobId, workflowId) to get the latest attempt
final String workflowId = workflowIdProvider.get();
final Path workflowIdFile = jobRoot.getParent().resolve(WORKFLOW_ID_FILENAME);
IOs.writeFile(workflowIdFile, workflowId);
Expand Down