Skip to content

Make MetricsRegistry as an interface #12487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void count(final MetricsRegistry metric, final double amt, final S
}

log.info("publishing count, name: {}, value: {}, tags: {}", metric, amt, tags);
statsDClient.count(metric.metricName, amt, tags);
statsDClient.count(metric.getMetricName(), amt, tags);
}
}

Expand All @@ -91,7 +91,7 @@ public static void gauge(final MetricsRegistry metric, final double val, final S
}

log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.metricName, val, tags);
statsDClient.gauge(metric.getMetricName(), val, tags);
}
}

Expand All @@ -118,7 +118,7 @@ public static void recordTimeLocal(final MetricsRegistry metric, final double va
}

log.info("recording histogram, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.histogram(metric.metricName, val, tags);
statsDClient.histogram(metric.getMetricName(), val, tags);
}
}

Expand All @@ -139,7 +139,7 @@ public static void recordTimeGlobal(final MetricsRegistry metric, final double v
}

log.info("recording distribution, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.distribution(metric.metricName, val, tags);
statsDClient.distribution(metric.getMetricName(), val, tags);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,106 +4,16 @@

package io.airbyte.metrics.lib;

import com.google.api.client.util.Preconditions;

/**
* Enum source of truth of all Airbyte metrics. Each enum value represent a metric and is linked to
* an application and contains a description to make it easier to understand.
*
* Each object of the enum actually represent a metric, so the Registry name is misleading. The
* reason 'Registry' is in the name is to emphasize this enum's purpose as a source of truth for all
* metrics. This also helps code readability i.e. AirbyteMetricsRegistry.metricA.
*
* Metric Name Convention (adapted from
* https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/):
* <p>
* - Use lowercase. Metric names are case sensitive.
* <p>
* - Use underscore to delimit names with multiple words.
* <p>
* - No spaces. This makes the metric confusing to read.
* <p>
* - Avoid numbers. This makes the metric confusing to read. Numbers should only be used as a
* <p>
* - Add units at name end if applicable. This is especially relevant for time units.
* <p>
* - Include the time period in the name if the metric is meant to be run at a certain interval.
* Interface representing metrics collected an Airbyte Application. This interface is present as
* Java doesn't support enum inheritance as of Java 17.
*/
public enum MetricsRegistry {

ATTEMPT_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_created_by_release_stage",
"increments when a new attempt is created. attempts are double counted as this is tagged by release stage."),
ATTEMPT_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_failed_by_release_stage",
"increments when an attempt fails. attempts are double counted as this is tagged by release stage."),
ATTEMPT_FAILED_BY_FAILURE_ORIGIN(
MetricEmittingApps.WORKER,
"attempt_failed_by_failure_origin",
"increments for every failure origin a failed attempt has. since a failure can have multiple origins, a single failure can be counted more than once. tagged by failure origin."),
ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_succeeded_by_release_stage",
"increments when an attempts succeeds. attempts are double counted as this is tagged by release stage."),
EST_NUM_METRICS_EMITTED_BY_REPORTER(
MetricEmittingApps.METRICS_REPORTER,
"est_num_metrics_emitted_by_reporter",
"estimated metrics emitted by the reporter in the last interval. this is estimated since the count is not precise."),
JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
"increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
JOB_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_created_by_release_stage",
"increments when a new job is created. jobs are double counted as this is tagged by release stage."),
JOB_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_failed_by_release_stage",
"increments when a job fails. jobs are double counted as this is tagged by release stage."),
JOB_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
NUM_PENDING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_pending_jobs",
"number of pending jobs"),
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_running_jobs",
"number of running jobs"),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
OLDEST_RUNNING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_running_job_age_secs",
"oldest running job in seconds"),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");
public interface MetricsRegistry {

public final MetricEmittingApp application;
public final String metricName;
public final String metricDescription;
MetricEmittingApp getApplication();

MetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
Preconditions.checkNotNull(metricDescription);
Preconditions.checkNotNull(application);
String getMetricName();

this.application = application;
this.metricName = metricName;
this.metricDescription = metricDescription;
}
String getMetricDescription();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import com.google.api.client.util.Preconditions;

/**
* Enum source of truth of all Airbyte metrics. Each enum value represent a metric and is linked to
* an application and contains a description to make it easier to understand.
*
* Each object of the enum actually represent a metric, so the Registry name is misleading. The
* reason 'Registry' is in the name is to emphasize this enum's purpose as a source of truth for all
* metrics. This also helps code readability i.e. AirbyteMetricsRegistry.metricA.
*
* Metric Name Convention (adapted from
* https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/):
* <p>
* - Use lowercase. Metric names are case sensitive.
* <p>
* - Use underscore to delimit names with multiple words.
* <p>
* - No spaces. This makes the metric confusing to read.
* <p>
* - Avoid numbers. This makes the metric confusing to read. Numbers should only be used as a
* <p>
* - Add units at name end if applicable. This is especially relevant for time units.
* <p>
* - Include the time period in the name if the metric is meant to be run at a certain interval.
*/
public enum OssMetricsRegistry implements MetricsRegistry {

ATTEMPT_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_created_by_release_stage",
"increments when a new attempt is created. attempts are double counted as this is tagged by release stage."),
ATTEMPT_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_failed_by_release_stage",
"increments when an attempt fails. attempts are double counted as this is tagged by release stage."),
ATTEMPT_FAILED_BY_FAILURE_ORIGIN(
MetricEmittingApps.WORKER,
"attempt_failed_by_failure_origin",
"increments for every failure origin a failed attempt has. since a failure can have multiple origins, a single failure can be counted more than once. tagged by failure origin."),
ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_succeeded_by_release_stage",
"increments when an attempts succeeds. attempts are double counted as this is tagged by release stage."),
EST_NUM_METRICS_EMITTED_BY_REPORTER(
MetricEmittingApps.METRICS_REPORTER,
"est_num_metrics_emitted_by_reporter",
"estimated metrics emitted by the reporter in the last interval. this is estimated since the count is not precise."),
JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
"increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
JOB_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_created_by_release_stage",
"increments when a new job is created. jobs are double counted as this is tagged by release stage."),
JOB_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_failed_by_release_stage",
"increments when a job fails. jobs are double counted as this is tagged by release stage."),
JOB_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
NUM_PENDING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_pending_jobs",
"number of pending jobs"),
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_running_jobs",
"number of running jobs"),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
OLDEST_RUNNING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_running_job_age_secs",
"oldest running job in seconds"),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");

private final MetricEmittingApp application;
private final String metricName;
private final String metricDescription;

OssMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
Preconditions.checkNotNull(metricDescription);
Preconditions.checkNotNull(application);

this.application = application;
this.metricName = metricName;
this.metricDescription = metricDescription;
}

@Override
public MetricEmittingApp getApplication() {
return application;
}

@Override
public String getMetricName() {
return metricName;
}

@Override
public String getMetricDescription() {
return metricDescription;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void tearDown() {
public void testPublishTrueNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", false));
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand All @@ -30,15 +30,15 @@ public void testPublishTrueNoEmitError() {
public void testPublishFalseNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", true));
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

@Test
@DisplayName("there should be no exception if we attempt to emit metrics without initializing")
public void testNoInitializeNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.MetricsRegistry;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -24,31 +24,31 @@ public enum ToEmit {

NUM_PENDING_JOBS(countMetricEmission(() -> {
final var pendingJobs = ReporterApp.configDatabase.query(MetricQueries::numberOfPendingJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_PENDING_JOBS, pendingJobs);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.NUM_PENDING_JOBS, pendingJobs);
})),
NUM_RUNNING_JOBS(countMetricEmission(() -> {
final var runningJobs = ReporterApp.configDatabase.query(MetricQueries::numberOfRunningJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_RUNNING_JOBS, runningJobs);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.NUM_RUNNING_JOBS, runningJobs);
})),
OLDEST_RUNNING_JOB_AGE_SECS(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::oldestRunningJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age);
})),
OLDEST_PENDING_JOB_AGE_SECS(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::oldestPendingJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age);
})),
NUM_ACTIVE_CONN_PER_WORKSPACE(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::numberOfActiveConnPerWorkspace);
for (long count : age) {
DogStatsDMetricSingleton.percentile(MetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
DogStatsDMetricSingleton.percentile(OssMetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
}
})),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(countMetricEmission(() -> {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
DogStatsDMetricSingleton.recordTimeGlobal(
MetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft()));
OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft()));
}
}), 1, TimeUnit.HOURS);

Expand All @@ -72,7 +72,7 @@ private static Runnable countMetricEmission(Procedure metricQuery) {
return () -> {
try {
metricQuery.call();
DogStatsDMetricSingleton.count(MetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
DogStatsDMetricSingleton.count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
} catch (Exception e) {
log.error("Exception querying database for metric: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.TolerationPOJO;
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricsRegistry;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -555,7 +555,7 @@ public KubePodProcess(final boolean isOrchestrator,
final boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p);
return isReady || KubePodResourceHelper.isTerminal(p);
}, 20, TimeUnit.MINUTES);
DogStatsDMetricSingleton.recordTimeGlobal(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start);
DogStatsDMetricSingleton.recordTimeGlobal(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start);

// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
Expand Down
Loading