|
6 | 6 |
|
7 | 7 | import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
|
8 | 8 | import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
|
9 |
| -import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION; |
10 |
| -import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; |
11 |
| -import static org.jooq.impl.SQLDataType.VARCHAR; |
12 | 9 |
|
13 | 10 | import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
|
14 |
| -import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType; |
15 |
| -import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus; |
16 |
| -import java.util.ArrayList; |
17 | 11 | import java.util.List;
|
18 | 12 | import java.util.UUID;
|
19 | 13 | import lombok.extern.slf4j.Slf4j;
|
20 |
| -import org.apache.commons.lang3.tuple.ImmutablePair; |
21 |
| -import org.apache.commons.lang3.tuple.Pair; |
22 | 14 | import org.jooq.DSLContext;
|
23 | 15 |
|
24 | 16 | /**
|
@@ -57,173 +49,4 @@ public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext
|
57 | 49 | .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
|
58 | 50 | }
|
59 | 51 |
|
60 |
| - public static int numberOfPendingJobs(final DSLContext ctx) { |
61 |
| - return ctx.selectCount().from(JOBS).where(JOBS.STATUS.eq(JobStatus.pending)).fetchOne(0, int.class); |
62 |
| - } |
63 |
| - |
64 |
| - public static int numberOfRunningJobs(final DSLContext ctx) { |
65 |
| - return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE)) |
66 |
| - .where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active))) |
67 |
| - .fetchOne(0, int.class); |
68 |
| - } |
69 |
| - |
70 |
| - public static int numberOfOrphanRunningJobs(final DSLContext ctx) { |
71 |
| - return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE)) |
72 |
| - .where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.ne(StatusType.active))) |
73 |
| - .fetchOne(0, int.class); |
74 |
| - } |
75 |
| - |
76 |
| - public static Long oldestPendingJobAgeSecs(final DSLContext ctx) { |
77 |
| - return oldestJobAgeSecs(ctx, JobStatus.pending); |
78 |
| - } |
79 |
| - |
80 |
| - public static Long oldestRunningJobAgeSecs(final DSLContext ctx) { |
81 |
| - return oldestJobAgeSecs(ctx, JobStatus.running); |
82 |
| - } |
83 |
| - |
84 |
| - private static Long oldestJobAgeSecs(final DSLContext ctx, final JobStatus status) { |
85 |
| - final var readableTimeField = "run_duration"; |
86 |
| - final var durationSecField = "run_duration_secs"; |
87 |
| - final var query = String.format(""" |
88 |
| - WITH |
89 |
| - oldest_job AS ( |
90 |
| - SELECT id, |
91 |
| - age(current_timestamp, created_at) AS %s |
92 |
| - FROM jobs |
93 |
| - WHERE status = '%s' |
94 |
| - ORDER BY run_duration DESC |
95 |
| - LIMIT 1) |
96 |
| - SELECT id, |
97 |
| - run_duration, |
98 |
| - extract(epoch from run_duration) as %s |
99 |
| - FROM oldest_job""", readableTimeField, status.getLiteral(), durationSecField); |
100 |
| - final var res = ctx.fetch(query); |
101 |
| - // unfortunately there are no good Jooq methods for retrieving a single record of a single column |
102 |
| - // forcing the List cast. |
103 |
| - final var duration = res.getValues(durationSecField, Double.class); |
104 |
| - |
105 |
| - if (duration.size() == 0) { |
106 |
| - return 0L; |
107 |
| - } |
108 |
| - // .get(0) works in the following code due to the query's SELECT 1. |
109 |
| - final var id = res.getValues("id", String.class).get(0); |
110 |
| - final var readableTime = res.getValues(readableTimeField, String.class).get(0); |
111 |
| - log.info("oldest job information - id: {}, readable time: {}", id, readableTime); |
112 |
| - |
113 |
| - // as double can have rounding errors, round down to remove noise. |
114 |
| - return duration.get(0).longValue(); |
115 |
| - } |
116 |
| - |
117 |
| - public static List<Long> numberOfActiveConnPerWorkspace(final DSLContext ctx) { |
118 |
| - final var countField = "num_conn"; |
119 |
| - final var query = String.format(""" |
120 |
| - SELECT workspace_id, count(c.id) as %s |
121 |
| - FROM actor |
122 |
| - INNER JOIN workspace ws ON actor.workspace_id = ws.id |
123 |
| - INNER JOIN connection c ON actor.id = c.source_id |
124 |
| - WHERE ws.tombstone = false |
125 |
| - AND actor.tombstone = false AND actor.actor_type = 'source' |
126 |
| - AND c.status = 'active' |
127 |
| - GROUP BY workspace_id;""", countField); |
128 |
| - return ctx.fetch(query).getValues(countField, long.class); |
129 |
| - } |
130 |
| - |
131 |
| - public static List<Pair<JobStatus, Double>> overallJobRuntimeForTerminalJobsInLastHour(final DSLContext ctx) { |
132 |
| - final var statusField = "status"; |
133 |
| - final var timeField = "sec"; |
134 |
| - final var query = |
135 |
| - String.format(""" |
136 |
| - SELECT %s, extract(epoch from age(updated_at, created_at)) AS %s FROM jobs |
137 |
| - WHERE updated_at >= NOW() - INTERVAL '1 HOUR' |
138 |
| - AND (jobs.status = 'failed' OR jobs.status = 'succeeded' OR jobs.status = 'cancelled');""", statusField, timeField); |
139 |
| - final var statuses = ctx.fetch(query).getValues(statusField, JobStatus.class); |
140 |
| - final var times = ctx.fetch(query).getValues(timeField, double.class); |
141 |
| - |
142 |
| - final var pairedRes = new ArrayList<Pair<JobStatus, Double>>(); |
143 |
| - for (int i = 0; i < statuses.size(); i++) { |
144 |
| - final var pair = new ImmutablePair<>(statuses.get(i), times.get(i)); |
145 |
| - pairedRes.add(pair); |
146 |
| - } |
147 |
| - |
148 |
| - return pairedRes; |
149 |
| - } |
150 |
| - |
151 |
| - /* |
152 |
| - * A connection that is not running on schedule is defined in last 24 hours if the number of runs |
153 |
| - * are not matching with the number of expected runs according to the schedule settings. Refer to |
154 |
| - * runbook for detailed discussion. |
155 |
| - * |
156 |
| - */ |
157 |
| - public static Long numberOfJobsNotRunningOnScheduleInLastDay(final DSLContext ctx) { |
158 |
| - final var countField = "cnt"; |
159 |
| - // This query finds all sync jobs ran in last 24 hours and count how many times they have run. |
160 |
| - // Comparing this to the expected number of runs (24 hours divide by configured cadence in hours), |
161 |
| - // if it runs below that expected number it will be considered as abnormal instance. |
162 |
| - // For example, if it's configured to run every 6 hours but in last 24 hours it only has 3 runs, |
163 |
| - // it will be considered as 1 abnormal instance. |
164 |
| - final var queryForAbnormalSyncInHoursInLastDay = |
165 |
| - String.format(""" |
166 |
| - select count(1) as %s |
167 |
| - from |
168 |
| - ( |
169 |
| - select |
170 |
| - c.id, |
171 |
| - count(*) as cnt |
172 |
| - from |
173 |
| - connection c |
174 |
| - left join Jobs j on |
175 |
| - j.scope::uuid = c.id |
176 |
| - where |
177 |
| - c.schedule is not null |
178 |
| - and c.schedule != 'null' |
179 |
| - and j.created_at > now() - interval '24 hours 1 minutes' |
180 |
| - and c.status = 'active' |
181 |
| - and j.config_type = 'sync' |
182 |
| - and c.updated_at < now() - interval '24 hours 1 minutes' |
183 |
| - and cast(c.schedule::jsonb->'timeUnit' as text) = '"hours"' |
184 |
| - group by 1 |
185 |
| - having count(*) < 24 / cast(c.schedule::jsonb->'units' as integer)) as abnormal_jobs |
186 |
| - """, countField); |
187 |
| - |
188 |
| - // Similar to the query above, this finds if the connection cadence's timeUnit is minutes. |
189 |
| - // thus we use 1440 (=24 hours x 60 minutes) to divide the configured cadence. |
190 |
| - final var queryForAbnormalSyncInMinutesInLastDay = |
191 |
| - String.format(""" |
192 |
| - select count(1) as %s from ( |
193 |
| - select |
194 |
| - c.id, |
195 |
| - count(*) as cnt |
196 |
| - from |
197 |
| - connection c |
198 |
| - left join Jobs j on |
199 |
| - j.scope::uuid = c.id |
200 |
| - where |
201 |
| - c.schedule is not null |
202 |
| - and c.schedule != 'null' |
203 |
| - and j.created_at > now() - interval '24 hours 1 minutes' |
204 |
| - and c.status = 'active' |
205 |
| - and j.config_type = 'sync' |
206 |
| - and c.updated_at < now() - interval '24 hours 1 minutes' |
207 |
| - and cast(c.schedule::jsonb->'timeUnit' as text) = '"minutes"' |
208 |
| - group by 1 |
209 |
| - having count(*) < 1440 / cast(c.schedule::jsonb->'units' as integer)) as abnormal_jobs |
210 |
| - """, countField); |
211 |
| - return ctx.fetch(queryForAbnormalSyncInHoursInLastDay).getValues(countField, long.class).get(0) |
212 |
| - + ctx.fetch(queryForAbnormalSyncInMinutesInLastDay).getValues(countField, long.class).get(0); |
213 |
| - } |
214 |
| - |
215 |
| - public static Long numScheduledActiveConnectionsInLastDay(final DSLContext ctx) { |
216 |
| - final var countField = "cnt"; |
217 |
| - final var queryForTotalConnections = String.format(""" |
218 |
| - select count(1) as %s |
219 |
| - from connection c |
220 |
| - where |
221 |
| - c.updated_at < now() - interval '24 hours 1 minutes' |
222 |
| - and cast(c.schedule::jsonb->'timeUnit' as text) IN ('"hours"', '"minutes"') |
223 |
| - and c.status = 'active' |
224 |
| - """, countField); |
225 |
| - |
226 |
| - return ctx.fetch(queryForTotalConnections).getValues(countField, long.class).get(0); |
227 |
| - } |
228 |
| - |
229 | 52 | }
|
0 commit comments