Skip to content

Commit 3bd1e98

Browse files
xiaohansong authored and drewrasm committed
query to include data plane attributes (airbytehq#18531)
* query to include data plane attributes * rename functions * fix java build * more renaming fix
1 parent 709610a commit 3bd1e98

File tree

5 files changed

+181
-85
lines changed

5 files changed

+181
-85
lines changed

airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class MetricTags {
2020
public static final String RELEASE_STAGE = "release_stage";
2121
public static final String RESET_WORKFLOW_FAILURE_CAUSE = "failure_cause";
2222
public static final String WORKFLOW_TYPE = "workflow_type";
23+
// Tag key used by the reporter's running-job gauges; the value is the attempt's processing task queue.
public static final String ATTEMPT_QUEUE = "attempt_queue";
24+
// Tag key used by the reporter's pending-job gauges; the value is the connection's geography.
public static final String GEOGRAPHY = "geography";
2325

2426
public static String getReleaseStage(final ReleaseStage stage) {
2527
return stage.getLiteral();

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/Emitter.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ final class NumPendingJobs extends Emitter {
2020

2121
/**
 * Builds an emitter that reports one NUM_PENDING_JOBS gauge per geography.
 *
 * @param client metric client the gauges are sent to
 * @param db repository queried for the pending job counts
 */
public NumPendingJobs(final MetricClient client, final MetricRepository db) {
  super(client, () -> {
    final var pendingByGeography = db.numberOfPendingJobsByGeography();
    for (final var entry : pendingByGeography.entrySet()) {
      // Each geography becomes its own tagged gauge sample.
      client.gauge(
          OssMetricsRegistry.NUM_PENDING_JOBS,
          entry.getValue(),
          new MetricAttribute(MetricTags.GEOGRAPHY, entry.getKey()));
    }
    return null;
  });
}
@@ -33,8 +36,10 @@ final class NumRunningJobs extends Emitter {
3336

3437
/**
 * Builds an emitter that reports one NUM_RUNNING_JOBS gauge per attempt task queue.
 *
 * @param client metric client the gauges are sent to
 * @param db repository queried for the running job counts
 */
public NumRunningJobs(final MetricClient client, final MetricRepository db) {
  super(client, () -> {
    final var runningByQueue = db.numberOfRunningJobsByTaskQueue();
    for (final var entry : runningByQueue.entrySet()) {
      // Each task queue becomes its own tagged gauge sample.
      client.gauge(
          OssMetricsRegistry.NUM_RUNNING_JOBS,
          entry.getValue(),
          new MetricAttribute(MetricTags.ATTEMPT_QUEUE, entry.getKey()));
    }
    return null;
  });
}
@@ -59,8 +64,10 @@ final class OldestRunningJob extends Emitter {
5964

6065
/**
 * Builds an emitter that reports one OLDEST_RUNNING_JOB_AGE_SECS gauge per attempt task queue.
 *
 * @param client metric client the gauges are sent to
 * @param db repository queried for the oldest running job ages
 */
OldestRunningJob(final MetricClient client, final MetricRepository db) {
  super(client, () -> {
    final var oldestAgeByQueue = db.oldestRunningJobAgeSecsByTaskQueue();
    for (final var entry : oldestAgeByQueue.entrySet()) {
      // The map value is an age in seconds, not a count.
      client.gauge(
          OssMetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS,
          entry.getValue(),
          new MetricAttribute(MetricTags.ATTEMPT_QUEUE, entry.getKey()));
    }
    return null;
  });
}
@@ -72,8 +79,10 @@ final class OldestPendingJob extends Emitter {
7279

7380
/**
 * Builds an emitter that reports one OLDEST_PENDING_JOB_AGE_SECS gauge per geography.
 *
 * @param client metric client the gauges are sent to
 * @param db repository queried for the oldest pending job ages
 */
OldestPendingJob(final MetricClient client, final MetricRepository db) {
  super(client, () -> {
    final var oldestAgeByGeography = db.oldestPendingJobAgeSecsByGeography();
    for (final var entry : oldestAgeByGeography.entrySet()) {
      // The geography enum's literal is used as the tag value; the map value is an age in seconds.
      client.gauge(
          OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS,
          entry.getValue(),
          new MetricAttribute(MetricTags.GEOGRAPHY, entry.getKey().getLiteral()));
    }
    return null;
  });
}

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/MetricRepository.java

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,15 @@
55
package io.airbyte.metrics.reporter;
66

77
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
8+
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
89
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
10+
import static org.jooq.impl.DSL.asterisk;
11+
import static org.jooq.impl.DSL.count;
912
import static org.jooq.impl.SQLDataType.VARCHAR;
1013

14+
import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
1115
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
16+
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
1217
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
1318
import jakarta.inject.Singleton;
1419
import java.util.HashMap;
@@ -25,22 +30,32 @@ class MetricRepository {
2530
this.ctx = ctx;
2631
}
2732

28-
int numberOfPendingJobs() {
29-
return ctx.selectCount()
33+
Map<String, Integer> numberOfPendingJobsByGeography() {
34+
var result = ctx.select(CONNECTION.GEOGRAPHY.cast(String.class), count(asterisk()).as("count"))
3035
.from(JOBS)
36+
.join(CONNECTION)
37+
.on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
3138
.where(JOBS.STATUS.eq(JobStatus.pending))
32-
.fetchOne(0, int.class);
39+
.groupBy(CONNECTION.GEOGRAPHY);
40+
return (Map<String, Integer>) result.fetchMap(0, 1);
3341
}
3442

35-
int numberOfRunningJobs() {
36-
return ctx.selectCount()
43+
Map<String, Integer> numberOfRunningJobsByTaskQueue() {
44+
var result = ctx.select(ATTEMPTS.PROCESSING_TASK_QUEUE, count(asterisk()).as("count"))
3745
.from(JOBS)
3846
.join(CONNECTION)
3947
.on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
48+
.join(ATTEMPTS)
49+
.on(ATTEMPTS.JOB_ID.eq(JOBS.ID))
4050
.where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active)))
41-
.fetchOne(0, int.class);
51+
.and(ATTEMPTS.STATUS.eq(AttemptStatus.running))
52+
.groupBy(ATTEMPTS.PROCESSING_TASK_QUEUE);
53+
return (Map<String, Integer>) result.fetchMap(0, 1);
54+
4255
}
4356

57+
// Orphan running jobs are rare and unlikely to be related to data planes, so we monitor them as a
58+
// whole.
4459
int numberOfOrphanRunningJobs() {
4560
return ctx.selectCount()
4661
.from(JOBS)
@@ -50,12 +65,32 @@ int numberOfOrphanRunningJobs() {
5065
.fetchOne(0, int.class);
5166
}
5267

53-
long oldestPendingJobAgeSecs() {
54-
return oldestJobAgeSecs(JobStatus.pending);
68+
Map<GeographyType, Double> oldestPendingJobAgeSecsByGeography() {
69+
final var query =
70+
"""
71+
SELECT cast(connection.geography as varchar) AS geography, MAX(EXTRACT(EPOCH FROM (current_timestamp - jobs.created_at))) AS run_duration_seconds
72+
FROM jobs
73+
JOIN connection
74+
ON jobs.scope::uuid = connection.id
75+
WHERE jobs.status = 'pending'
76+
GROUP BY geography;
77+
""";
78+
final var result = ctx.fetch(query);
79+
return (Map<GeographyType, Double>) result.intoMap(0, 1);
5580
}
5681

57-
long oldestRunningJobAgeSecs() {
58-
return oldestJobAgeSecs(JobStatus.running);
82+
Map<String, Double> oldestRunningJobAgeSecsByTaskQueue() {
83+
final var query =
84+
"""
85+
SELECT attempts.processing_task_queue AS task_queue, MAX(EXTRACT(EPOCH FROM (current_timestamp - jobs.created_at))) AS run_duration_seconds
86+
FROM jobs
87+
JOIN attempts
88+
ON jobs.id = attempts.job_id
89+
WHERE jobs.status = 'running' AND attempts.status = 'running'
90+
GROUP BY task_queue;
91+
""";
92+
final var result = ctx.fetch(query);
93+
return (Map<String, Double>) result.intoMap(0, 1);
5994
}
6095

6196
List<Long> numberOfActiveConnPerWorkspace() {
@@ -218,18 +253,4 @@ SELECT status, extract(epoch from age(updated_at, created_at)) AS sec FROM jobs
218253
return results;
219254
}
220255

221-
private long oldestJobAgeSecs(final JobStatus status) {
222-
final var query = """
223-
SELECT id, EXTRACT(EPOCH FROM (current_timestamp - created_at)) AS run_duration_seconds
224-
FROM jobs WHERE status = ?::job_status
225-
ORDER BY created_at ASC limit 1;
226-
""";
227-
final var result = ctx.fetchOne(query, status.getLiteral());
228-
if (result == null) {
229-
return 0L;
230-
}
231-
// as double can have rounding errors, round down to remove noise.
232-
return result.getValue("run_duration_seconds", Double.class).longValue();
233-
}
234-
235256
}

airbyte-metrics/reporter/src/test/java/io/airbyte/metrics/reporter/EmitterTest.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static org.mockito.Mockito.verify;
1010
import static org.mockito.Mockito.when;
1111

12+
import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
1213
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
1314
import io.airbyte.metrics.lib.MetricAttribute;
1415
import io.airbyte.metrics.lib.MetricClient;
@@ -25,6 +26,12 @@ class EmitterTest {
2526
private MetricClient client;
2627
private MetricRepository repo;
2728

29+
// Task queue names used as ATTEMPT_QUEUE tag values in the tests below.
private static final String SYNC_QUEUE = "SYNC";
30+
private static final String AWS_QUEUE = "AWS";
31+
32+
// Geography names expected as GEOGRAPHY tag values in the tests below.
private static final String EU_REGION = "EU";
33+
private static final String AUTO_REGION = "AUTO";
34+
2835
@BeforeEach
2936
void setUp() {
3037
client = mock(MetricClient.class);
@@ -33,29 +40,35 @@ void setUp() {
3340

3441
@Test
3542
void TestNumPendingJobs() {
36-
final var value = 101;
37-
when(repo.numberOfPendingJobs()).thenReturn(value);
43+
final var value = Map.of("AUTO", 101, "EU", 20);
44+
when(repo.numberOfPendingJobsByGeography()).thenReturn(value);
3845

3946
final var emitter = new NumPendingJobs(client, repo);
4047
emitter.Emit();
4148

4249
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
43-
verify(repo).numberOfPendingJobs();
44-
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, value);
50+
verify(repo).numberOfPendingJobsByGeography();
51+
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 101,
52+
new MetricAttribute(MetricTags.GEOGRAPHY, "AUTO"));
53+
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 20,
54+
new MetricAttribute(MetricTags.GEOGRAPHY, "EU"));
4555
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
4656
}
4757

4858
@Test
4959
void TestNumRunningJobs() {
50-
final var value = 101;
51-
when(repo.numberOfRunningJobs()).thenReturn(value);
60+
final var value = Map.of(SYNC_QUEUE, 101, AWS_QUEUE, 20);
61+
when(repo.numberOfRunningJobsByTaskQueue()).thenReturn(value);
5262

5363
final var emitter = new NumRunningJobs(client, repo);
5464
emitter.Emit();
5565

5666
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
57-
verify(repo).numberOfRunningJobs();
58-
verify(client).gauge(OssMetricsRegistry.NUM_RUNNING_JOBS, value);
67+
verify(repo).numberOfRunningJobsByTaskQueue();
68+
verify(client).gauge(OssMetricsRegistry.NUM_RUNNING_JOBS, 101,
69+
new MetricAttribute(MetricTags.ATTEMPT_QUEUE, SYNC_QUEUE));
70+
verify(client).gauge(OssMetricsRegistry.NUM_RUNNING_JOBS, 20,
71+
new MetricAttribute(MetricTags.ATTEMPT_QUEUE, AWS_QUEUE));
5972
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
6073
}
6174

@@ -75,29 +88,36 @@ void TestNumOrphanRunningJobs() {
7588

7689
@Test
7790
void TestOldestRunningJob() {
78-
final var value = 101;
79-
when(repo.oldestRunningJobAgeSecs()).thenReturn((long) value);
91+
final var value = Map.of(SYNC_QUEUE, 101.0, AWS_QUEUE, 20.0);
92+
when(repo.oldestRunningJobAgeSecsByTaskQueue()).thenReturn(value);
8093

8194
final var emitter = new OldestRunningJob(client, repo);
8295
emitter.Emit();
8396

8497
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
85-
verify(repo).oldestRunningJobAgeSecs();
86-
verify(client).gauge(OssMetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, value);
98+
verify(repo).oldestRunningJobAgeSecsByTaskQueue();
99+
verify(client).gauge(OssMetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, 101,
100+
new MetricAttribute(MetricTags.ATTEMPT_QUEUE, SYNC_QUEUE));
101+
verify(client).gauge(OssMetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, 20,
102+
new MetricAttribute(MetricTags.ATTEMPT_QUEUE, AWS_QUEUE));
87103
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
88104
}
89105

90106
@Test
91107
void TestOldestPendingJob() {
92-
final var value = 101;
93-
when(repo.oldestPendingJobAgeSecs()).thenReturn((long) value);
108+
final var value = Map.of(GeographyType.AUTO, 101.0, GeographyType.EU, 20.0);
109+
when(repo.oldestPendingJobAgeSecsByGeography()).thenReturn(value);
94110

95111
final var emitter = new OldestPendingJob(client, repo);
96112
emitter.Emit();
97113

98114
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
99-
verify(repo).oldestPendingJobAgeSecs();
100-
verify(client).gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, value);
115+
verify(repo).oldestPendingJobAgeSecsByGeography();
116+
verify(client).gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, 101,
117+
new MetricAttribute(MetricTags.GEOGRAPHY, AUTO_REGION));
118+
verify(client).gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, 20,
119+
new MetricAttribute(MetricTags.GEOGRAPHY, EU_REGION));
120+
101121
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
102122
}
103123

0 commit comments

Comments
 (0)