Skip to content

Commit 4b73bb0

Browse files
authored
Move to DogStatsD. (#10238)
We want to move to DogStatsD to make it easier to do more complex metrics collection. The official DD library also does not double count metrics so using this is cheaper overall. Unfortunately unlike Prometheus, there is no good way to test that metrics are being reported in an automated test so I've omitted that for now. I've tested this manually in the dev env to make sure metrics are being emitted.
1 parent 910c598 commit 4b73bb0

File tree

6 files changed

+145
-85
lines changed

6 files changed

+145
-85
lines changed

airbyte-metrics/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ dependencies {
1010
implementation 'io.prometheus:simpleclient_hotspot:0.12.0' // basic client instrumentation
1111
implementation 'io.prometheus:simpleclient_httpserver:0.12.0' // basic server to serve prometheus port
1212
implementation 'io.prometheus:simpleclient_pushgateway:0.12.0' // push libs for basic server
13+
14+
implementation 'com.datadoghq:java-dogstatsd-client:4.0.0'
1315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.metrics;
6+
7+
import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
8+
import com.timgroup.statsd.StatsDClient;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
/**
12+
* Light wrapper around the DogsStatsD client to make using the client slightly more ergonomic.
13+
* <p>
14+
* This class mainly exists to help Airbyte instrument/debug application on Airbyte Cloud.
15+
* <p>
16+
* Open source users are free to turn this on and consume the same metrics.
17+
*/
18+
@Slf4j
19+
public class DogstatsdMetricSingleton {
20+
21+
private static DogstatsdMetricSingleton instance;
22+
private final StatsDClient statsDClient;
23+
private final boolean instancePublish;
24+
25+
public DogstatsdMetricSingleton(final String appName, final boolean publish) {
26+
instancePublish = publish;
27+
statsDClient = new NonBlockingStatsDClientBuilder()
28+
.prefix(appName)
29+
.hostname(System.getenv("DD_AGENT_HOST"))
30+
.port(Integer.parseInt(System.getenv("DD_DOGSTATSD_PORT")))
31+
.build();
32+
}
33+
34+
public static synchronized DogstatsdMetricSingleton getInstance() {
35+
if (instance == null) {
36+
throw new RuntimeException("You must initialize configuration with the initialize() method before getting an instance.");
37+
}
38+
return instance;
39+
}
40+
41+
public synchronized static void initialize(final String appName, final boolean publish) {
42+
if (instance != null) {
43+
throw new RuntimeException("You cannot initialize configuration more than once.");
44+
}
45+
if (publish) {
46+
log.info("Starting DogStatsD client..");
47+
// The second constructor argument ('true') makes this server start as a separate daemon thread.
48+
// http://prometheus.github.io/client_java/io/prometheus/client/exporter/HTTPServer.html#HTTPServer-int-boolean-
49+
instance = new DogstatsdMetricSingleton(appName, publish);
50+
}
51+
}
52+
53+
/**
54+
* Increment or decrement a counter.
55+
*
56+
* @param name of counter.
57+
* @param amt to adjust.
58+
* @param tags
59+
*/
60+
public void count(final String name, final double amt, final String... tags) {
61+
if (instancePublish) {
62+
log.info("publishing count, name: {}, value: {}", name, amt);
63+
statsDClient.count(name, amt, tags);
64+
}
65+
}
66+
67+
/**
68+
* Record the latest value for a gauge.
69+
*
70+
* @param name of gauge.
71+
* @param val to record.
72+
* @param tags
73+
*/
74+
public void gauge(final String name, final double val, final String... tags) {
75+
if (instancePublish) {
76+
log.info("publishing gauge, name: {}, value: {}", name, val);
77+
statsDClient.gauge(name, val, tags);
78+
}
79+
}
80+
81+
/**
82+
* Submit a single execution time aggregated locally by the Agent. Use this if approximate stats are
83+
* sufficient.
84+
*
85+
* @param name of histogram.
86+
* @param val of time to record.
87+
* @param tags
88+
*/
89+
public void recordTimeLocal(final String name, final double val, final String... tags) {
90+
if (instancePublish) {
91+
log.info("recording histogram, name: {}, value: {}", name, val);
92+
statsDClient.histogram(name, val, tags);
93+
}
94+
}
95+
96+
/**
97+
* Submit a single execution time aggregated globally by Datadog. Use this for precise stats.
98+
*
99+
* @param name of distribution.
100+
* @param val of time to record.
101+
* @param tags
102+
*/
103+
public void recordTimeGlobal(final String name, final double val, final String... tags) {
104+
if (instancePublish) {
105+
log.info("recording distribution, name: {}, value: {}", name, val);
106+
statsDClient.distribution(name, val, tags);
107+
}
108+
}
109+
110+
/**
111+
* Wrapper of {@link #recordTimeGlobal(String, double, String...)} with a runnable for convenience.
112+
*
113+
* @param name
114+
* @param runnable
115+
* @param tags
116+
*/
117+
public void recordTimeGlobal(final String name, final Runnable runnable, final String... tags) {
118+
final long start = System.currentTimeMillis();
119+
runnable.run();
120+
final long end = System.currentTimeMillis();
121+
final long val = end - start;
122+
recordTimeGlobal(name, val, tags);
123+
}
124+
125+
}

airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusBaseExample.java

-69
This file was deleted.

airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java renamed to airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusMetricSingleton.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,21 @@
2727
* <p>
2828
* Open source users are free to turn this on and consume the same metrics.
2929
*/
30-
public class MetricSingleton {
30+
@Deprecated
31+
public class PrometheusMetricSingleton {
3132

32-
private static final Logger LOGGER = LoggerFactory.getLogger(MetricSingleton.class);
33-
private static MetricSingleton instance;
33+
private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusMetricSingleton.class);
34+
private static PrometheusMetricSingleton instance;
3435

3536
private final Map<String, Gauge> nameToGauge = new HashMap<>();
3637
private final Map<String, Counter> nameToCounter = new HashMap<>();
3738
private final Map<String, Histogram> nameToHistogram = new HashMap<>();
3839

3940
private HTTPServer monitoringDaemon;
4041

41-
private MetricSingleton() {}
42+
private PrometheusMetricSingleton() {}
4243

43-
public static synchronized MetricSingleton getInstance() {
44+
public static synchronized PrometheusMetricSingleton getInstance() {
4445
if (instance == null) {
4546
throw new RuntimeException("You must initialize configuration with the initializeMonitoringServiceDaemon() method before getting an instance.");
4647
}
@@ -193,7 +194,7 @@ public synchronized static void initializeMonitoringServiceDaemon(final String m
193194
if (instance != null) {
194195
throw new RuntimeException("You cannot initialize configuration more than once.");
195196
}
196-
instance = new MetricSingleton();
197+
instance = new PrometheusMetricSingleton();
197198
if (publish) {
198199
try {
199200
MDC.setContextMap(mdc);

airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java renamed to airbyte-metrics/src/test/java/io/airbyte/metrics/PrometheusMetricSingletonTest.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* Since Prometheus publishes metrics at a specific port, we can test our wrapper by querying the
2626
* port and validating the response.
2727
*/
28-
public class MetricSingletonTest {
28+
public class PrometheusMetricSingletonTest {
2929

3030
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
3131
.version(HttpClient.Version.HTTP_1_1)
@@ -41,25 +41,26 @@ public static void setUp() throws IOException {
4141
availPort = socket.getLocalPort();
4242
}
4343

44-
MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true);
44+
PrometheusMetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true);
4545
}
4646

4747
@AfterAll
4848
public static void tearDown() {
49-
MetricSingleton.getInstance().closeMonitoringServiceDaemon();
49+
PrometheusMetricSingleton.getInstance().closeMonitoringServiceDaemon();
5050
}
5151

5252
@Nested
5353
class Validation {
5454

5555
@Test
5656
public void testNameWithDashFails() {
57-
assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed"));
57+
assertThrows(RuntimeException.class,
58+
() -> PrometheusMetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed"));
5859
}
5960

6061
@Test
6162
public void testNoDescriptionFails() {
62-
assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("good_name", 0.0, null));
63+
assertThrows(RuntimeException.class, () -> PrometheusMetricSingleton.getInstance().incrementCounter("good_name", 0.0, null));
6364
}
6465

6566
}
@@ -69,7 +70,7 @@ public void testCounter() throws InterruptedException, IOException {
6970
final var metricName = "test_counter";
7071
final var rand = new Random();
7172
for (int i = 0; i < 5; i++) {
72-
MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter");
73+
PrometheusMetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter");
7374
Thread.sleep(500);
7475
}
7576

@@ -82,7 +83,7 @@ public void testGauge() throws InterruptedException, IOException {
8283
final var metricName = "test_gauge";
8384
final var rand = new Random();
8485
for (int i = 0; i < 5; i++) {
85-
MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge");
86+
PrometheusMetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge");
8687
Thread.sleep(500);
8788
}
8889

@@ -95,7 +96,7 @@ public void testTimer() throws InterruptedException, IOException {
9596
final var metricName = "test_timer";
9697
final var rand = new Random();
9798
for (int i = 0; i < 5; i++) {
98-
MetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time");
99+
PrometheusMetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time");
99100
Thread.sleep(500);
100101
}
101102

airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.airbyte.db.Database;
3030
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
3131
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
32-
import io.airbyte.metrics.MetricSingleton;
32+
import io.airbyte.metrics.PrometheusMetricSingleton;
3333
import io.airbyte.scheduler.models.Job;
3434
import io.airbyte.scheduler.models.JobStatus;
3535
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
@@ -273,7 +273,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
273273
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot, configs);
274274

275275
final Map<String, String> mdc = MDC.getCopyOfContextMap();
276-
MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics());
276+
PrometheusMetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics());
277277

278278
LOGGER.info("Launching scheduler...");
279279
new SchedulerApp(

0 commit comments

Comments
 (0)