Skip to content

Commit c9287cc

Browse files
authored
multi cloud gauge metric fix (#19460)
* multi cloud gauge metric fix * avoid casting Map * improve code comments
1 parent 9a8b058 commit c9287cc

File tree

2 files changed

+73
-20
lines changed

2 files changed

+73
-20
lines changed

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/MetricRepository.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
1010
import static org.jooq.impl.DSL.asterisk;
1111
import static org.jooq.impl.DSL.count;
12+
import static org.jooq.impl.DSL.name;
1213
import static org.jooq.impl.SQLDataType.VARCHAR;
1314

1415
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
@@ -19,28 +20,48 @@
1920
import java.util.List;
2021
import java.util.Map;
2122
import org.jooq.DSLContext;
23+
import org.jooq.Field;
24+
import org.jooq.impl.DSL;
2225

2326
@Singleton
2427
class MetricRepository {
2528

2629
private final DSLContext ctx;
2730

31+
// We have to report each gauge metric with value 0 when it does not show up in the DB;
32+
// otherwise Datadog will keep using the previously reported value.
33+
// Another option, which we did not use here, is to build this into the SQL query - that would make the SQL
34+
// much less readable without reducing any complexity.
35+
private final static List<String> REGISTERED_ATTEMPT_QUEUE = List.of("SYNC", "AWS_PARIS_SYNC", "null");
36+
private final static List<String> REGISTERED_GEOGRAPHY = List.of("US", "AUTO", "EU");
37+
2838
MetricRepository(final DSLContext ctx) {
2939
this.ctx = ctx;
3040
}
3141

3242
Map<String, Integer> numberOfPendingJobsByGeography() {
33-
var result = ctx.select(CONNECTION.GEOGRAPHY.cast(String.class), count(asterisk()).as("count"))
43+
String geographyResultAlias = "geography";
44+
String countResultAlias = "result";
45+
var result = ctx.select(CONNECTION.GEOGRAPHY.cast(String.class).as(geographyResultAlias), count(asterisk()).as(countResultAlias))
3446
.from(JOBS)
3547
.join(CONNECTION)
3648
.on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
3749
.where(JOBS.STATUS.eq(JobStatus.pending))
3850
.groupBy(CONNECTION.GEOGRAPHY);
39-
return (Map<String, Integer>) result.fetchMap(0, 1);
51+
Field<String> geographyResultField = DSL.field(name(geographyResultAlias), String.class);
52+
Field<Integer> countResultField = DSL.field(name(countResultAlias), Integer.class);
53+
Map<String, Integer> queriedMap = result.fetchMap(geographyResultField, countResultField);
54+
for (final String potentialGeography : REGISTERED_GEOGRAPHY) {
55+
if (!queriedMap.containsKey(potentialGeography)) {
56+
queriedMap.put(potentialGeography, 0);
57+
}
58+
}
59+
return queriedMap;
4060
}
4161

4262
Map<String, Integer> numberOfRunningJobsByTaskQueue() {
43-
var result = ctx.select(ATTEMPTS.PROCESSING_TASK_QUEUE, count(asterisk()).as("count"))
63+
String countFieldName = "count";
64+
var result = ctx.select(ATTEMPTS.PROCESSING_TASK_QUEUE, count(asterisk()).as(countFieldName))
4465
.from(JOBS)
4566
.join(CONNECTION)
4667
.on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
@@ -49,8 +70,15 @@ Map<String, Integer> numberOfRunningJobsByTaskQueue() {
4970
.where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active)))
5071
.and(ATTEMPTS.STATUS.eq(AttemptStatus.running))
5172
.groupBy(ATTEMPTS.PROCESSING_TASK_QUEUE);
52-
return (Map<String, Integer>) result.fetchMap(0, 1);
5373

74+
Field<Integer> countResultField = DSL.field(name(countFieldName), Integer.class);
75+
Map<String, Integer> queriedMap = result.fetchMap(ATTEMPTS.PROCESSING_TASK_QUEUE, countResultField);
76+
for (final String potentialAttemptQueue : REGISTERED_ATTEMPT_QUEUE) {
77+
if (!queriedMap.containsKey(potentialAttemptQueue)) {
78+
queriedMap.put(potentialAttemptQueue, 0);
79+
}
80+
}
81+
return queriedMap;
5482
}
5583

5684
// This is a rare case and not likely to be related to data planes; So we will monitor them as a
@@ -75,7 +103,15 @@ SELECT cast(connection.geography as varchar) AS geography, MAX(EXTRACT(EPOCH FRO
75103
GROUP BY geography;
76104
""";
77105
final var result = ctx.fetch(query);
78-
return (Map<String, Double>) result.intoMap(0, 1);
106+
Field<String> geographyResultField = DSL.field(name("geography"), String.class);
107+
Field<Double> runDurationSecondsField = DSL.field(name("run_duration_seconds"), Double.class);
108+
Map<String, Double> queriedMap = result.intoMap(geographyResultField, runDurationSecondsField);
109+
for (final String potentialGeography : REGISTERED_GEOGRAPHY) {
110+
if (!queriedMap.containsKey(potentialGeography)) {
111+
queriedMap.put(potentialGeography, 0.0);
112+
}
113+
}
114+
return queriedMap;
79115
}
80116

81117
Map<String, Double> oldestRunningJobAgeSecsByTaskQueue() {
@@ -89,7 +125,15 @@ SELECT attempts.processing_task_queue AS task_queue, MAX(EXTRACT(EPOCH FROM (cur
89125
GROUP BY task_queue;
90126
""";
91127
final var result = ctx.fetch(query);
92-
return (Map<String, Double>) result.intoMap(0, 1);
128+
Field<String> taskQueueResultField = DSL.field(name("task_queue"), String.class);
129+
Field<Double> runDurationSecondsField = DSL.field(name("run_duration_seconds"), Double.class);
130+
Map<String, Double> queriedMap = result.intoMap(taskQueueResultField, runDurationSecondsField);
131+
for (final String potentialAttemptQueue : REGISTERED_ATTEMPT_QUEUE) {
132+
if (!queriedMap.containsKey(potentialAttemptQueue)) {
133+
queriedMap.put(potentialAttemptQueue, 0.0);
134+
}
135+
}
136+
return queriedMap;
93137
}
94138

95139
List<Long> numberOfActiveConnPerWorkspace() {

airbyte-metrics/reporter/src/test/java/io/airbyte/metrics/reporter/MetricRepositoryTest.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static org.junit.jupiter.api.Assertions.assertEquals;
1919
import static org.junit.jupiter.api.Assertions.assertTrue;
2020

21-
import com.google.common.collect.Iterators;
2221
import io.airbyte.db.factory.DSLContextFactory;
2322
import io.airbyte.db.init.DatabaseInitializationException;
2423
import io.airbyte.db.instance.configs.jooq.generated.enums.ActorType;
@@ -54,6 +53,8 @@ class MetricRepositoryTest {
5453
private static final String DEST = "dst";
5554
private static final String CONN = "conn";
5655
private static final String SYNC_QUEUE = "SYNC";
56+
private static final String AWS_SYNC_QUEUE = "AWS_PARIS_SYNC";
57+
private static final String AUTO_REGION = "AUTO";
5758
private static final String EU_REGION = "EU";
5859

5960
private static final UUID SRC_DEF_ID = UUID.randomUUID();
@@ -114,7 +115,7 @@ class NumJobs {
114115
void shouldReturnReleaseStages() {
115116
ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.PROCESSING_TASK_QUEUE)
116117
.values(10L, 1L, AttemptStatus.running, SYNC_QUEUE).values(20L, 2L, AttemptStatus.running, SYNC_QUEUE)
117-
.values(30L, 3L, AttemptStatus.running, SYNC_QUEUE).values(40L, 4L, AttemptStatus.running, SYNC_QUEUE)
118+
.values(30L, 3L, AttemptStatus.running, SYNC_QUEUE).values(40L, 4L, AttemptStatus.running, AWS_SYNC_QUEUE)
118119
.values(50L, 5L, AttemptStatus.running, SYNC_QUEUE)
119120
.execute();
120121
final var srcId = UUID.randomUUID();
@@ -138,7 +139,10 @@ void shouldReturnReleaseStages() {
138139
.values(5L, inactiveConnectionId.toString(), JobStatus.running)
139140
.execute();
140141

141-
assertEquals(2, db.numberOfRunningJobsByTaskQueue().get(SYNC_QUEUE));
142+
assertEquals(1, db.numberOfRunningJobsByTaskQueue().get(SYNC_QUEUE));
143+
assertEquals(1, db.numberOfRunningJobsByTaskQueue().get(AWS_SYNC_QUEUE));
144+
// Verify that we send 0 for the 'null' queue so that it overwrites the value reported by the previous bug.
145+
assertEquals(0, db.numberOfRunningJobsByTaskQueue().get("null"));
142146
assertEquals(1, db.numberOfOrphanRunningJobs());
143147
}
144148

@@ -148,8 +152,9 @@ void runningJobsShouldReturnZero() throws SQLException {
148152
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.pending).execute();
149153
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute();
150154

151-
final var res = db.numberOfRunningJobsByTaskQueue();
152-
assertTrue(res.isEmpty());
155+
final var result = db.numberOfRunningJobsByTaskQueue();
156+
assertEquals(result.get(SYNC_QUEUE), 0);
157+
assertEquals(result.get(AWS_SYNC_QUEUE), 0);
153158
}
154159

155160
@Test
@@ -173,6 +178,7 @@ void pendingJobsShouldReturnCorrectCount() throws SQLException {
173178

174179
final var res = db.numberOfPendingJobsByGeography();
175180
assertEquals(2, res.get(EU_REGION));
181+
assertEquals(0, res.get(AUTO_REGION));
176182
}
177183

178184
@Test
@@ -192,8 +198,9 @@ void pendingJobsShouldReturnZero() throws SQLException {
192198
.values(2L, connectionUuid.toString(), JobStatus.failed)
193199
.execute();
194200

195-
final var res = db.numberOfPendingJobsByGeography();
196-
assertTrue(res.isEmpty());
201+
final var result = db.numberOfPendingJobsByGeography();
202+
assertEquals(result.get(AUTO_REGION), 0);
203+
assertEquals(result.get(EU_REGION), 0);
197204
}
198205

199206
}
@@ -248,8 +255,9 @@ void shouldReturnNothingIfNotApplicable() {
248255
.values(2L, connectionUuid.toString(), JobStatus.running)
249256
.values(3L, connectionUuid.toString(), JobStatus.failed).execute();
250257

251-
final var res = db.oldestPendingJobAgeSecsByGeography();
252-
assertTrue(res.isEmpty());
258+
final var result = db.oldestPendingJobAgeSecsByGeography();
259+
assertEquals(result.get(EU_REGION), 0.0);
260+
assertEquals(result.get(AUTO_REGION), 0.0);
253261
}
254262

255263
}
@@ -277,10 +285,10 @@ void shouldReturnOnlyRunningSeconds() {
277285
.values(4L, "", JobStatus.failed)
278286
.execute();
279287

280-
final var result = Iterators.getOnlyElement(db.oldestRunningJobAgeSecsByTaskQueue().entrySet().iterator());
281-
assertEquals(SYNC_QUEUE, result.getKey());
288+
final var result = db.oldestRunningJobAgeSecsByTaskQueue();
282289
// expected age is 10000 seconds, but allow for +/- 1 second to account for timing/rounding errors
283-
assertTrue(9999 < result.getValue() && result.getValue() < 10001L);
290+
assertTrue(9999 < result.get(SYNC_QUEUE) && result.get(SYNC_QUEUE) < 10001L);
291+
assertEquals(result.get(AWS_SYNC_QUEUE), 0.0);
284292
}
285293

286294
@Test
@@ -293,8 +301,9 @@ void shouldReturnNothingIfNotApplicable() {
293301
.values(3L, "", JobStatus.failed)
294302
.execute();
295303

296-
final var res = db.oldestRunningJobAgeSecsByTaskQueue();
297-
assertTrue(res.isEmpty());
304+
final var result = db.oldestRunningJobAgeSecsByTaskQueue();
305+
assertEquals(result.get(SYNC_QUEUE), 0.0);
306+
assertEquals(result.get(AWS_SYNC_QUEUE), 0.0);
298307
}
299308

300309
}

0 commit comments

Comments
 (0)