From a5e0818b3d52af760efa04f4e97d66264559134f Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Thu, 27 Jan 2022 07:29:30 -0800 Subject: [PATCH] configure temporal workflow execution ttl --- .../java/io/airbyte/workers/WorkerApp.java | 2 ++ .../workers/temporal/TemporalUtils.java | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index e27dd65e3130e..8aad702844c7b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -323,6 +323,8 @@ public static void main(final String[] args) throws IOException, InterruptedExce final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + TemporalUtils.configureTemporalNamespace(temporalService); + final Database configDatabase = new ConfigsDatabaseInstance( configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 2793f24550a2f..85a69ca141570 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -9,9 +9,12 @@ import io.airbyte.commons.lang.Exceptions; import io.airbyte.scheduler.models.JobRunConfig; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.namespace.v1.NamespaceConfig; import io.temporal.api.namespace.v1.NamespaceInfo; +import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest; import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse; import io.temporal.api.workflowservice.v1.ListNamespacesRequest; +import io.temporal.api.workflowservice.v1.UpdateNamespaceRequest; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; @@ -20,9 +23,11 @@ import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.workflow.Functions; import java.io.Serializable; +import java.time.Duration; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +51,30 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo public static final String DEFAULT_NAMESPACE = "default"; + private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(7); + private static final String HUMAN_READABLE_WORKFLOW_EXECUTION_TTL = + DurationFormatUtils.formatDurationWords(WORKFLOW_EXECUTION_TTL.toMillis(), true, true); + + public static void configureTemporalNamespace(WorkflowServiceStubs temporalService) { + final var client = temporalService.blockingStub(); + final var describeNamespaceRequest = DescribeNamespaceRequest.newBuilder().setNamespace(DEFAULT_NAMESPACE).build(); + final var currentRetentionGrpcDuration = client.describeNamespace(describeNamespaceRequest).getConfig().getWorkflowExecutionRetentionTtl(); + final var currentRetention = Duration.ofSeconds(currentRetentionGrpcDuration.getSeconds()); + + if (currentRetention.equals(WORKFLOW_EXECUTION_TTL)) { + LOGGER.info("Workflow execution TTL already set for namespace " + DEFAULT_NAMESPACE + ". Remains unchanged as: " + + HUMAN_READABLE_WORKFLOW_EXECUTION_TTL); + } else { + final var newGrpcDuration = com.google.protobuf.Duration.newBuilder().setSeconds(WORKFLOW_EXECUTION_TTL.getSeconds()).build(); + final var humanReadableCurrentRetention = DurationFormatUtils.formatDurationWords(currentRetention.toMillis(), true, true); + final var namespaceConfig = NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(newGrpcDuration).build(); + final var updateNamespaceRequest = UpdateNamespaceRequest.newBuilder().setNamespace(DEFAULT_NAMESPACE).setConfig(namespaceConfig).build(); + LOGGER.info("Workflow execution TTL differs for namespace " + DEFAULT_NAMESPACE + ". Changing from (" + humanReadableCurrentRetention + ") to (" + + HUMAN_READABLE_WORKFLOW_EXECUTION_TTL + "). "); + client.updateNamespace(updateNamespaceRequest); + } + } + @FunctionalInterface public interface TemporalJobCreator {