Skip to content

Commit 537da16

Browse files
authored
configure temporal workflow execution ttl (#9838)
1 parent c51608a commit 537da16

File tree

3 files changed

+31
-1
lines changed

3 files changed

+31
-1
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java

+2
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ public static void main(final String[] args) throws IOException, InterruptedExce
323323

324324
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
325325

326+
TemporalUtils.configureTemporalNamespace(temporalService);
327+
326328
final Database configDatabase = new ConfigsDatabaseInstance(
327329
configs.getConfigDatabaseUser(),
328330
configs.getConfigDatabasePassword(),

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java

+28
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
import io.airbyte.scheduler.models.JobRunConfig;
1111
import io.temporal.activity.Activity;
1212
import io.temporal.api.common.v1.WorkflowExecution;
13+
import io.temporal.api.namespace.v1.NamespaceConfig;
1314
import io.temporal.api.namespace.v1.NamespaceInfo;
15+
import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest;
1416
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
1517
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
18+
import io.temporal.api.workflowservice.v1.UpdateNamespaceRequest;
1619
import io.temporal.client.ActivityCompletionException;
1720
import io.temporal.client.WorkflowClient;
1821
import io.temporal.client.WorkflowOptions;
@@ -30,6 +33,7 @@
3033
import java.util.concurrent.Executors;
3134
import java.util.concurrent.ScheduledExecutorService;
3235
import java.util.concurrent.TimeUnit;
36+
import org.apache.commons.lang3.time.DurationFormatUtils;
3337
import org.apache.commons.lang3.tuple.ImmutablePair;
3438
import org.slf4j.Logger;
3539
import org.slf4j.LoggerFactory;
@@ -56,6 +60,30 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo
5660

5761
public static final String DEFAULT_NAMESPACE = "default";
5862

63+
private static final Duration WORKFLOW_EXECUTION_TTL = Duration.ofDays(7);
64+
private static final String HUMAN_READABLE_WORKFLOW_EXECUTION_TTL =
65+
DurationFormatUtils.formatDurationWords(WORKFLOW_EXECUTION_TTL.toMillis(), true, true);
66+
67+
public static void configureTemporalNamespace(WorkflowServiceStubs temporalService) {
68+
final var client = temporalService.blockingStub();
69+
final var describeNamespaceRequest = DescribeNamespaceRequest.newBuilder().setNamespace(DEFAULT_NAMESPACE).build();
70+
final var currentRetentionGrpcDuration = client.describeNamespace(describeNamespaceRequest).getConfig().getWorkflowExecutionRetentionTtl();
71+
final var currentRetention = Duration.ofSeconds(currentRetentionGrpcDuration.getSeconds());
72+
73+
if (currentRetention.equals(WORKFLOW_EXECUTION_TTL)) {
74+
LOGGER.info("Workflow execution TTL already set for namespace " + DEFAULT_NAMESPACE + ". Remains unchanged as: "
75+
+ HUMAN_READABLE_WORKFLOW_EXECUTION_TTL);
76+
} else {
77+
final var newGrpcDuration = com.google.protobuf.Duration.newBuilder().setSeconds(WORKFLOW_EXECUTION_TTL.getSeconds()).build();
78+
final var humanReadableCurrentRetention = DurationFormatUtils.formatDurationWords(currentRetention.toMillis(), true, true);
79+
final var namespaceConfig = NamespaceConfig.newBuilder().setWorkflowExecutionRetentionTtl(newGrpcDuration).build();
80+
final var updateNamespaceRequest = UpdateNamespaceRequest.newBuilder().setNamespace(DEFAULT_NAMESPACE).setConfig(namespaceConfig).build();
81+
LOGGER.info("Workflow execution TTL differs for namespace " + DEFAULT_NAMESPACE + ". Changing from (" + humanReadableCurrentRetention + ") to ("
82+
+ HUMAN_READABLE_WORKFLOW_EXECUTION_TTL + "). ");
83+
client.updateNamespace(updateNamespaceRequest);
84+
}
85+
}
86+
5987
@FunctionalInterface
6088
public interface TemporalJobCreator<T extends Serializable> {
6189

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class ActivityConfiguration {
3232
.setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS))
3333
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
3434
.setRetryOptions(TemporalUtils.NO_RETRY)
35-
.setHeartbeatTimeout(Duration.ofSeconds(30))
35+
.setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT)
3636
.build();
3737

3838
}

0 commit comments

Comments
 (0)