 
 import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
 import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
+import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
+import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
+import static org.jooq.impl.SQLDataType.VARCHAR;
 
 import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
+import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
+import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.jooq.DSLContext;
 
 /**
@@ -49,4 +57,173 @@ public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext
         .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
   }
 
+  public static int numberOfPendingJobs(final DSLContext ctx) {
+    return ctx.selectCount().from(JOBS).where(JOBS.STATUS.eq(JobStatus.pending)).fetchOne(0, int.class);
+  }
+
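+  // jobs.scope stores the connection id as text, so CONNECTION.ID (a uuid) is cast to varchar in the
+  // joins below to match it against JOBS.SCOPE.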
+  public static int numberOfRunningJobs(final DSLContext ctx) {
+    return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
+        .where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active)))
+        .fetchOne(0, int.class);
+  }
+
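+  // An orphan running job is a job that is still running even though its connection is no longer active.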
+  public static int numberOfOrphanRunningJobs(final DSLContext ctx) {
+    return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
+        .where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.ne(StatusType.active)))
+        .fetchOne(0, int.class);
+  }
+
+  public static Long oldestPendingJobAgeSecs(final DSLContext ctx) {
+    return oldestJobAgeSecs(ctx, JobStatus.pending);
+  }
+
+  public static Long oldestRunningJobAgeSecs(final DSLContext ctx) {
+    return oldestJobAgeSecs(ctx, JobStatus.running);
+  }
+
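+  // Returns the age in seconds of the oldest job in the given status, or 0 when no such job exists.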
+  private static Long oldestJobAgeSecs(final DSLContext ctx, final JobStatus status) {
+    final var readableTimeField = "run_duration";
+    final var durationSecField = "run_duration_secs";
+    final var query = String.format("""
+        WITH
+        oldest_job AS (
+          SELECT id,
+                 age(current_timestamp, created_at) AS %s
+          FROM jobs
+          WHERE status = '%s'
+          ORDER BY run_duration DESC
+          LIMIT 1)
+        SELECT id,
+               run_duration,
+               extract(epoch from run_duration) as %s
+        FROM oldest_job""", readableTimeField, status.getLiteral(), durationSecField);
+    final var res = ctx.fetch(query);
+    // Unfortunately jOOQ has no good method for retrieving a single column of a single record,
+    // which forces the List-based access below.
+    final var duration = res.getValues(durationSecField, Double.class);
+
+    if (duration.isEmpty()) {
+      return 0L;
+    }
+    // .get(0) is safe below because the query's LIMIT 1 returns at most one row.
+    final var id = res.getValues("id", String.class).get(0);
+    final var readableTime = res.getValues(readableTimeField, String.class).get(0);
+    log.info("oldest job information - id: {}, readable time: {}", id, readableTime);
+
+    // As doubles can carry rounding errors, round down to remove noise.
+    return duration.get(0).longValue();
+  }
+
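+  // Counts active connections per workspace, joined through the source actor; tombstoned workspaces
+  // and actors are excluded.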
+  public static List<Long> numberOfActiveConnPerWorkspace(final DSLContext ctx) {
+    final var countField = "num_conn";
+    final var query = String.format("""
+        SELECT workspace_id, count(c.id) as %s
+        FROM actor
+          INNER JOIN workspace ws ON actor.workspace_id = ws.id
+          INNER JOIN connection c ON actor.id = c.source_id
+        WHERE ws.tombstone = false
+          AND actor.tombstone = false AND actor.actor_type = 'source'
+          AND c.status = 'active'
+        GROUP BY workspace_id;""", countField);
+    return ctx.fetch(query).getValues(countField, long.class);
+  }
+
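+  // Returns (status, runtime in seconds) pairs for jobs that reached a terminal state (failed,
+  // succeeded or cancelled) within the last hour.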
+  public static List<Pair<JobStatus, Double>> overallJobRuntimeForTerminalJobsInLastHour(final DSLContext ctx) {
+    final var statusField = "status";
+    final var timeField = "sec";
+    final var query =
+        String.format("""
+            SELECT %s, extract(epoch from age(updated_at, created_at)) AS %s FROM jobs
+            WHERE updated_at >= NOW() - INTERVAL '1 HOUR'
+              AND (jobs.status = 'failed' OR jobs.status = 'succeeded' OR jobs.status = 'cancelled');""", statusField, timeField);
+    // Fetch once and read both columns from the same result so the query is not executed twice.
+    final var res = ctx.fetch(query);
+    final var statuses = res.getValues(statusField, JobStatus.class);
+    final var times = res.getValues(timeField, double.class);
+
+    final var pairedRes = new ArrayList<Pair<JobStatus, Double>>();
+    for (int i = 0; i < statuses.size(); i++) {
+      pairedRes.add(new ImmutablePair<>(statuses.get(i), times.get(i)));
+    }
+
+    return pairedRes;
+  }
+
+  /*
+   * A connection is considered not to be running on schedule if, over the last 24 hours, its number
+   * of runs does not match the number of runs expected from its schedule settings. Refer to the
+   * runbook for a detailed discussion.
+   */
+  public static Long numberOfJobsNotRunningOnScheduleInLastDay(final DSLContext ctx) {
+    final var countField = "cnt";
+    // This query finds all connections that had sync jobs in the last 24 hours and counts how many
+    // times each one ran. Comparing that count to the expected number of runs (24 hours divided by
+    // the configured cadence in hours), a connection that ran fewer times than expected is counted
+    // as one abnormal instance. For example, a connection configured to run every 6 hours that only
+    // ran 3 times in the last 24 hours counts as 1 abnormal instance.
+    final var queryForAbnormalSyncInHoursInLastDay =
+        String.format("""
+            select count(1) as %s
+            from (
+              select
+                c.id,
+                count(*) as cnt
+              from
+                connection c
+              left join jobs j on
+                j.scope::uuid = c.id
+              where
+                c.schedule is not null
+                and c.schedule != 'null'
+                and j.created_at > now() - interval '24 hours 1 minutes'
+                and c.status = 'active'
+                and j.config_type = 'sync'
+                and c.updated_at < now() - interval '24 hours 1 minutes'
+                and cast(c.schedule::jsonb->'timeUnit' as text) = '"hours"'
+              group by 1
+              having count(*) < 24 / cast(c.schedule::jsonb->'units' as integer)) as abnormal_jobs
+            """, countField);
+
+    // Similar to the query above, but for connections whose cadence timeUnit is minutes; the expected
+    // number of runs is therefore 1440 (24 hours x 60 minutes) divided by the configured cadence.
+    final var queryForAbnormalSyncInMinutesInLastDay =
+        String.format("""
+            select count(1) as %s
+            from (
+              select
+                c.id,
+                count(*) as cnt
+              from
+                connection c
+              left join jobs j on
+                j.scope::uuid = c.id
+              where
+                c.schedule is not null
+                and c.schedule != 'null'
+                and j.created_at > now() - interval '24 hours 1 minutes'
+                and c.status = 'active'
+                and j.config_type = 'sync'
+                and c.updated_at < now() - interval '24 hours 1 minutes'
+                and cast(c.schedule::jsonb->'timeUnit' as text) = '"minutes"'
+              group by 1
+              having count(*) < 1440 / cast(c.schedule::jsonb->'units' as integer)) as abnormal_jobs
+            """, countField);
+    return ctx.fetch(queryForAbnormalSyncInHoursInLastDay).getValues(countField, long.class).get(0)
+        + ctx.fetch(queryForAbnormalSyncInMinutesInLastDay).getValues(countField, long.class).get(0);
+  }
+
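+  // Counts active connections with an hours- or minutes-based schedule whose configuration has not
+  // been updated within the last 24 hours.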
+  public static Long numScheduledActiveConnectionsInLastDay(final DSLContext ctx) {
+    final var countField = "cnt";
+    final var queryForTotalConnections = String.format("""
+        select count(1) as %s
+        from connection c
+        where
+          c.updated_at < now() - interval '24 hours 1 minutes'
+          and cast(c.schedule::jsonb->'timeUnit' as text) IN ('"hours"', '"minutes"')
+          and c.status = 'active'
+        """, countField);
+
+    return ctx.fetch(queryForTotalConnections).getValues(countField, long.class).get(0);
+  }
+
 }
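For reference, a minimal usage sketch of the helpers added above. The enclosing class name is not visible in this hunk and is assumed here to be MetricQueries; the JDBC URL and credentials are placeholders, not part of the change.

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

public class MetricQueriesUsageSketch {

  public static void main(final String[] args) {
    // Placeholder connection details; point this at the database holding the jobs and connection tables.
    final DSLContext ctx = DSL.using("jdbc:postgresql://localhost:5432/airbyte", "docker", "docker");

    System.out.println("pending jobs: " + MetricQueries.numberOfPendingJobs(ctx));
    System.out.println("running jobs: " + MetricQueries.numberOfRunningJobs(ctx));
    System.out.println("orphan running jobs: " + MetricQueries.numberOfOrphanRunningJobs(ctx));
    System.out.println("oldest pending job age (s): " + MetricQueries.oldestPendingJobAgeSecs(ctx));
    System.out.println("jobs off schedule in last day: " + MetricQueries.numberOfJobsNotRunningOnScheduleInLastDay(ctx));
  }

}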