Skip to content

Query and monitor if we have skipped any scheduled jobs #15256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e31eb19
add error code to ManualOperationResult
xiaohansong Jul 12, 2022
ff9b519
fix a bug
xiaohansong Jul 13, 2022
f26a0b2
support temporal metrics
xiaohansong Jul 14, 2022
f872225
metrics in temporal
xiaohansong Jul 14, 2022
4e651ed
use statsd
xiaohansong Jul 14, 2022
f56233a
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 15, 2022
586aad9
wrap otel config to temporal metric export
xiaohansong Jul 15, 2022
5bf4e67
use http port 4318 for otlp exporter
xiaohansong Jul 18, 2022
33a18f5
simpilfy to support dd only
xiaohansong Jul 18, 2022
85a56d2
use /v1/metrics for endpoint
xiaohansong Jul 18, 2022
43bee0c
use statsd
xiaohansong Jul 19, 2022
1056890
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 19, 2022
1a113d8
fix
xiaohansong Jul 19, 2022
413cbce
remove unused func
xiaohansong Jul 19, 2022
e29815a
wrap up implementation to export temporal metrics to datadog
xiaohansong Jul 19, 2022
0453ac7
use deps.toml to wrap up the dependency
xiaohansong Jul 19, 2022
9d5b795
Merge branch 'master' into xiaohan/goofing
xiaohansong Jul 19, 2022
3666dd5
move to metric client factory
xiaohansong Jul 27, 2022
1eb22dc
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 27, 2022
ac97b12
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 28, 2022
b1d1984
fix pmd error
xiaohansong Jul 28, 2022
acf58da
pmd, comment fix
xiaohansong Jul 28, 2022
1b7fbf4
pr comment fix
xiaohansong Aug 1, 2022
12a422b
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Aug 1, 2022
9cae9bb
add a new metric to observe abnormal scheduled sycns
xiaohansong Aug 3, 2022
4732604
Merge remote-tracking branch 'origin/master' into xiaohan/metricsync
xiaohansong Aug 3, 2022
784ee2a
formatting
xiaohansong Aug 3, 2022
9bdcfb9
add javadoc
xiaohansong Aug 8, 2022
7cb8e08
Merge branch 'master' into xiaohan/metricsync
xiaohansong Aug 8, 2022
d3585ce
format fix
xiaohansong Aug 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,49 @@ public static List<Pair<JobStatus, Double>> overallJobRuntimeForTerminalJobsInLa
return pairedRes;
}

/*
* A connection that is not running on schedule is defined in last 24 hours if the number of runs
* are not matching with the number of expected runs according to the schedule settings. Refer to
* playbook for detailed discussion.
*/
public static Long numOfJobsNotRunningOnSchedule(final DSLContext ctx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any kind of testing for these queries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not in unit test I'm afraid... but I ran these manually on dbeaver.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, should we add a comment about what our logic is for determining if a job run was missed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense. I have a runbook coming in https://github.com/airbytehq/airbyte-cloud/pull/2272; I'll add a short summary here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ok perfect, thanks!

final var countField = "cnt";
final var query = """
SELECT count(1) as cnt FROM ((
SELECT
c.id,
count(*) as cnt
FROM
connection c
LEFT JOIN Jobs j ON j.scope::uuid = c.id
WHERE
c.schedule IS NOT null
AND c.schedule != 'null'
AND j.created_at > now() - interval '24 hours 1 minutes'
AND c.status = 'active'
AND j.config_type = 'sync'
AND c.updated_at < now() - interval '24 hours 1 minutes'
AND cast(c.schedule::jsonb->'timeUnit' as text) = '"hours"'
GROUP BY 1
HAVING count(*) < 24 / cast(c.schedule::jsonb->'units' as integer))
UNION (
SELECT
c.id,
count(*) as cnt
FROM connection c
LEFT JOIN Jobs j ON j.scope::uuid = c.id
WHERE
c.schedule IS NOT null
AND c.schedule != 'null'
AND j.created_at > now() - interval '1 hours 1 minutes'
AND c.status = 'active'
AND j.config_type = 'sync'
AND c.updated_at < now() - interval '1 hours 1 minutes'
AND cast(c.schedule::jsonb->'timeUnit' as text) = '"minutes"'
GROUP BY 1
HAVING count(*) < 60 / cast(c.schedule::jsonb->'units' as integer))) as abnormal_sync_jobs
""";
return ctx.fetch(query).getValues(countField, long.class).get(0).longValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
NUM_ABNORMAL_SCHEDULED_SYNCS(
MetricEmittingApps.METRICS_REPORTER,
"num_abnormal_scheduled_syncs",
"number of abnormal syncs that have skipped at least 1 scheduled run recently."),
OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public enum ToEmit {
MetricClientFactory.getMetricClient().distribution(OssMetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
}
})),
NUM_ABNORMAL_SCHEDULED_SYNCS(countMetricEmission(() -> {
final var count = ReporterApp.configDatabase.query(MetricQueries::numOfJobsNotRunningOnSchedule);
MetricClientFactory.getMetricClient().gauge(OssMetricsRegistry.NUM_ABNORMAL_SCHEDULED_SYNCS, count);
})),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(countMetricEmission(() -> {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
Expand Down