Skip to content

Commit bde7fe9

Browse files
authored
Metric fixes on multi cloud (#19268)
* fix numpending secs * fix worker
1 parent c755439 commit bde7fe9

File tree

4 files changed

+20
-20
lines changed

4 files changed

+20
-20
lines changed

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/Emitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ final class OldestPendingJob extends Emitter {
8282
db.oldestPendingJobAgeSecsByGeography().forEach((geographyType, count) -> client.gauge(
8383
OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS,
8484
count,
85-
new MetricAttribute(MetricTags.GEOGRAPHY, geographyType.getLiteral())));
85+
new MetricAttribute(MetricTags.GEOGRAPHY, geographyType)));
8686
return null;
8787
});
8888
}

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/MetricRepository.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import static org.jooq.impl.DSL.count;
1212
import static org.jooq.impl.SQLDataType.VARCHAR;
1313

14-
import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
1514
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
1615
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
1716
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
@@ -65,7 +64,7 @@ int numberOfOrphanRunningJobs() {
6564
.fetchOne(0, int.class);
6665
}
6766

68-
Map<GeographyType, Double> oldestPendingJobAgeSecsByGeography() {
67+
Map<String, Double> oldestPendingJobAgeSecsByGeography() {
6968
final var query =
7069
"""
7170
SELECT cast(connection.geography as varchar) AS geography, MAX(EXTRACT(EPOCH FROM (current_timestamp - jobs.created_at))) AS run_duration_seconds
@@ -76,7 +75,7 @@ SELECT cast(connection.geography as varchar) AS geography, MAX(EXTRACT(EPOCH FRO
7675
GROUP BY geography;
7776
""";
7877
final var result = ctx.fetch(query);
79-
return (Map<GeographyType, Double>) result.intoMap(0, 1);
78+
return (Map<String, Double>) result.intoMap(0, 1);
8079
}
8180

8281
Map<String, Double> oldestRunningJobAgeSecsByTaskQueue() {

airbyte-metrics/reporter/src/test/java/io/airbyte/metrics/reporter/EmitterTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import static org.mockito.Mockito.verify;
1010
import static org.mockito.Mockito.when;
1111

12-
import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
1312
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
1413
import io.airbyte.metrics.lib.MetricAttribute;
1514
import io.airbyte.metrics.lib.MetricClient;
@@ -40,7 +39,7 @@ void setUp() {
4039

4140
@Test
4241
void TestNumPendingJobs() {
43-
final var value = Map.of("AUTO", 101, "EU", 20);
42+
final var value = Map.of(AUTO_REGION, 101, EU_REGION, 20);
4443
when(repo.numberOfPendingJobsByGeography()).thenReturn(value);
4544

4645
final var emitter = new NumPendingJobs(client, repo);
@@ -49,9 +48,9 @@ void TestNumPendingJobs() {
4948
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
5049
verify(repo).numberOfPendingJobsByGeography();
5150
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 101,
52-
new MetricAttribute(MetricTags.GEOGRAPHY, "AUTO"));
51+
new MetricAttribute(MetricTags.GEOGRAPHY, AUTO_REGION));
5352
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 20,
54-
new MetricAttribute(MetricTags.GEOGRAPHY, "EU"));
53+
new MetricAttribute(MetricTags.GEOGRAPHY, EU_REGION));
5554
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
5655
}
5756

@@ -105,7 +104,7 @@ void TestOldestRunningJob() {
105104

106105
@Test
107106
void TestOldestPendingJob() {
108-
final var value = Map.of(GeographyType.AUTO, 101.0, GeographyType.EU, 20.0);
107+
final var value = Map.of(AUTO_REGION, 101.0, EU_REGION, 20.0);
109108
when(repo.oldestPendingJobAgeSecsByGeography()).thenReturn(value);
110109

111110
final var emitter = new OldestPendingJob(client, repo);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class TemporalAttemptExecution<INPUT, OUTPUT> implements Supplier<OUTPUT>
5151
private final Supplier<String> workflowIdProvider;
5252
private final AirbyteApiClient airbyteApiClient;
5353
private final String airbyteVersion;
54-
private final Optional<String> workflowTaskQueue;
54+
private final Optional<String> replicationTaskQueue;
5555

5656
public TemporalAttemptExecution(final Path workspaceRoot,
5757
final WorkerEnvironment workerEnvironment,
@@ -86,7 +86,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
8686
final AirbyteApiClient airbyteApiClient,
8787
final String airbyteVersion,
8888
final Supplier<ActivityExecutionContext> activityContext,
89-
final Optional<String> workflowTaskQueue) {
89+
final Optional<String> replicationTaskQueue) {
9090
this(
9191
workspaceRoot, workerEnvironment, logConfigs,
9292
jobRunConfig,
@@ -97,7 +97,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
9797
airbyteApiClient,
9898
() -> activityContext.get().getInfo().getWorkflowId(),
9999
airbyteVersion,
100-
workflowTaskQueue);
100+
replicationTaskQueue);
101101
}
102102

103103
@VisibleForTesting
@@ -112,7 +112,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
112112
final AirbyteApiClient airbyteApiClient,
113113
final Supplier<String> workflowIdProvider,
114114
final String airbyteVersion,
115-
final Optional<String> workflowTaskQueue) {
115+
final Optional<String> replicationTaskQueue) {
116116
this.jobRunConfig = jobRunConfig;
117117

118118
this.jobRoot = TemporalUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
@@ -124,7 +124,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
124124

125125
this.airbyteApiClient = airbyteApiClient;
126126
this.airbyteVersion = airbyteVersion;
127-
this.workflowTaskQueue = workflowTaskQueue;
127+
this.replicationTaskQueue = replicationTaskQueue;
128128
}
129129

130130
@Override
@@ -139,8 +139,6 @@ public OUTPUT get() {
139139
}
140140

141141
LOGGER.info("Executing worker wrapper. Airbyte version: {}", airbyteVersion);
142-
// TODO(Davin): This will eventually run into scaling problems, since it opens a DB connection per
143-
// workflow. See https://github.com/airbytehq/airbyte/issues/5936.
144142
saveWorkflowIdForCancellation(airbyteApiClient);
145143

146144
final Worker<INPUT, OUTPUT> worker = workerSupplier.get();
@@ -170,14 +168,18 @@ public OUTPUT get() {
170168
private void saveWorkflowIdForCancellation(final AirbyteApiClient airbyteApiClient) throws ApiException {
171169
// If the jobId is not a number, it means the job is a synchronous job. No attempt is created for
172170
// it, and it cannot be cancelled, so do not save the workflowId. See
173-
// SynchronousSchedulerClient.java
174-
// for info.
175-
if (NumberUtils.isCreatable(jobRunConfig.getJobId())) {
171+
// SynchronousSchedulerClient.java for info.
172+
//
173+
// At this moment(Nov 2022), we decide to save workflowId for cancellation purpose only at
174+
// replication activity level. We know now the only async workflow is SyncWorkflow,
175+
// and under the same workflow, the workflowId would stay the same,
176+
// so it's not needed to save it for multiple times.
177+
if (NumberUtils.isCreatable(jobRunConfig.getJobId()) && replicationTaskQueue.isPresent()) {
176178
final String workflowId = workflowIdProvider.get();
177179
airbyteApiClient.getAttemptApi().setWorkflowInAttempt(new SetWorkflowInAttemptRequestBody()
178180
.jobId(Long.parseLong(jobRunConfig.getJobId()))
179181
.attemptNumber(jobRunConfig.getAttemptId().intValue())
180-
.processingTaskQueue(workflowTaskQueue.orElse(""))
182+
.processingTaskQueue(replicationTaskQueue.get())
181183
.workflowId(workflowId));
182184
}
183185
}

0 commit comments

Comments
 (0)