|
9 | 9 | // - custom domains aren't yet supported
|
10 | 10 |
|
11 | 11 | import isEmpty from "lodash/isEmpty";
|
12 |
| -import { useMutation } from "react-query"; |
| 12 | +import { useMutation, useQuery } from "react-query"; |
13 | 13 |
|
14 | 14 | import { OperatorType, WebBackendConnectionRead, OperationRead, WebhookConfigRead } from "core/request/AirbyteClient";
|
15 | 15 | import { useWebConnectionService } from "hooks/services/useConnectionHook";
|
16 | 16 | import { useCurrentWorkspace } from "hooks/services/useWorkspace";
|
| 17 | +import { |
| 18 | + DbtCloudJobInfo, |
| 19 | + webBackendGetAvailableDbtJobsForWorkspace, |
| 20 | + WorkspaceGetDbtJobsResponse, |
| 21 | +} from "packages/cloud/lib/domain/dbtCloud/api"; |
| 22 | +import { useDefaultRequestMiddlewares } from "services/useDefaultRequestMiddlewares"; |
17 | 23 | import { useUpdateWorkspace } from "services/workspaces/WorkspacesService";
|
18 | 24 |
|
| 25 | +import { useConfig } from "./config"; |
| 26 | + |
19 | 27 | export interface DbtCloudJob {
|
20 | 28 | account: string;
|
21 | 29 | job: string;
|
22 | 30 | operationId?: string;
|
| 31 | + jobName?: string; |
23 | 32 | }
|
| 33 | +export type { DbtCloudJobInfo } from "packages/cloud/lib/domain/dbtCloud/api"; |
24 | 34 | const dbtCloudDomain = "https://cloud.getdbt.com";
|
25 | 35 | const webhookConfigName = "dbt cloud";
|
26 | 36 | const executionBody = `{"cause": "airbyte"}`;
|
27 | 37 | const jobName = (t: DbtCloudJob) => `${t.account}/${t.job}`;
|
28 | 38 |
|
29 | 39 | const isDbtWebhookConfig = (webhookConfig: WebhookConfigRead) => !!webhookConfig.name?.includes("dbt");
|
30 | 40 |
|
31 |
| -const toDbtCloudJob = (operation: OperationRead): DbtCloudJob => { |
32 |
| - const { operationId } = operation; |
33 |
| - const { executionUrl } = operation.operatorConfiguration.webhook || {}; |
| 41 | +export const toDbtCloudJob = (operationOrCloudJob: OperationRead | DbtCloudJobInfo): DbtCloudJob => { |
| 42 | + if ("operationId" in operationOrCloudJob) { |
| 43 | + const { operationId } = operationOrCloudJob; |
| 44 | + const { executionUrl } = operationOrCloudJob.operatorConfiguration.webhook || {}; |
34 | 45 |
|
35 |
| - const matches = (executionUrl || "").match(/\/accounts\/([^/]+)\/jobs\/([^]+)\/run/); |
36 |
| - if (!matches) { |
37 |
| - throw new Error(`Cannot extract dbt cloud job params from executionUrl ${executionUrl}`); |
38 |
| - } else { |
39 |
| - const [, account, job] = matches; |
| 46 | + const matches = (executionUrl || "").match(/\/accounts\/([^/]+)\/jobs\/([^]+)\/run/); |
| 47 | + if (!matches) { |
| 48 | + throw new Error(`Cannot extract dbt cloud job params from executionUrl ${executionUrl}`); |
| 49 | + } else { |
| 50 | + const [, account, job] = matches; |
40 | 51 |
|
41 |
| - return { |
42 |
| - account, |
43 |
| - job, |
44 |
| - operationId, |
45 |
| - }; |
| 52 | + return { |
| 53 | + account, |
| 54 | + job, |
| 55 | + operationId, |
| 56 | + }; |
| 57 | + } |
| 58 | + } else { |
| 59 | + const { accountId, jobId, jobName } = operationOrCloudJob; |
| 60 | + return { account: `${accountId}`, job: `${jobId}`, jobName }; |
46 | 61 | }
|
47 | 62 | };
|
| 63 | + |
48 | 64 | const isDbtCloudJob = (operation: OperationRead): boolean =>
|
49 | 65 | operation.operatorConfiguration.operatorType === OperatorType.webhook;
|
50 | 66 |
|
| 67 | +export const isSameJob = (remoteJob: DbtCloudJobInfo, savedJob: DbtCloudJob): boolean => |
| 68 | + savedJob.account === `${remoteJob.accountId}` && savedJob.job === `${remoteJob.jobId}`; |
| 69 | + |
51 | 70 | export const useSubmitDbtCloudIntegrationConfig = () => {
|
52 | 71 | const { workspaceId } = useCurrentWorkspace();
|
53 | 72 | const { mutateAsync: updateWorkspace } = useUpdateWorkspace();
|
@@ -78,35 +97,64 @@ export const useDbtIntegration = (connection: WebBackendConnectionRead) => {
|
78 | 97 | );
|
79 | 98 | const otherOperations = [...(connection.operations?.filter((operation) => !isDbtCloudJob(operation)) || [])];
|
80 | 99 |
|
81 |
| - const saveJobs = (jobs: DbtCloudJob[]) => { |
82 |
| - // TODO dynamically use the workspace's configured dbt cloud domain when it gets returned by backend |
83 |
| - const urlForJob = (job: DbtCloudJob) => `${dbtCloudDomain}/api/v2/accounts/${job.account}/jobs/${job.job}/run/`; |
84 |
| - |
85 |
| - return connectionService.update({ |
86 |
| - connectionId: connection.connectionId, |
87 |
| - operations: [ |
88 |
| - ...otherOperations, |
89 |
| - ...jobs.map((job) => ({ |
90 |
| - workspaceId, |
91 |
| - ...(job.operationId ? { operationId: job.operationId } : {}), |
92 |
| - name: jobName(job), |
93 |
| - operatorConfiguration: { |
94 |
| - operatorType: OperatorType.webhook, |
95 |
| - webhook: { |
96 |
| - executionUrl: urlForJob(job), |
97 |
| - // if `hasDbtIntegration` is true, webhookConfigId is guaranteed to exist |
98 |
| - ...(webhookConfigId ? { webhookConfigId } : {}), |
99 |
| - executionBody, |
| 100 | + const { mutateAsync, isLoading } = useMutation({ |
| 101 | + mutationFn: (jobs: DbtCloudJob[]) => { |
| 102 | + // TODO dynamically use the workspace's configured dbt cloud domain when it gets returned by backend |
| 103 | + const urlForJob = (job: DbtCloudJob) => `${dbtCloudDomain}/api/v2/accounts/${job.account}/jobs/${job.job}/run/`; |
| 104 | + |
| 105 | + return connectionService.update({ |
| 106 | + connectionId: connection.connectionId, |
| 107 | + operations: [ |
| 108 | + ...otherOperations, |
| 109 | + ...jobs.map((job) => ({ |
| 110 | + workspaceId, |
| 111 | + ...(job.operationId ? { operationId: job.operationId } : {}), |
| 112 | + name: jobName(job), |
| 113 | + operatorConfiguration: { |
| 114 | + operatorType: OperatorType.webhook, |
| 115 | + webhook: { |
| 116 | + executionUrl: urlForJob(job), |
| 117 | + // if `hasDbtIntegration` is true, webhookConfigId is guaranteed to exist |
| 118 | + ...(webhookConfigId ? { webhookConfigId } : {}), |
| 119 | + executionBody, |
| 120 | + }, |
100 | 121 | },
|
101 |
| - }, |
102 |
| - })), |
103 |
| - ], |
104 |
| - }); |
105 |
| - }; |
| 122 | + })), |
| 123 | + ], |
| 124 | + }); |
| 125 | + }, |
| 126 | + }); |
106 | 127 |
|
107 | 128 | return {
|
108 | 129 | hasDbtIntegration,
|
109 | 130 | dbtCloudJobs,
|
110 |
| - saveJobs, |
| 131 | + saveJobs: mutateAsync, |
| 132 | + isSaving: isLoading, |
111 | 133 | };
|
112 | 134 | };
|
| 135 | + |
| 136 | +export const useAvailableDbtJobs = () => { |
| 137 | + const { cloudApiUrl } = useConfig(); |
| 138 | + const config = { apiUrl: cloudApiUrl }; |
| 139 | + const middlewares = useDefaultRequestMiddlewares(); |
| 140 | + const requestOptions = { config, middlewares }; |
| 141 | + const workspace = useCurrentWorkspace(); |
| 142 | + const { workspaceId } = workspace; |
| 143 | + const dbtConfigId = workspace.webhookConfigs?.find((config) => config.name?.includes("dbt"))?.id; |
| 144 | + |
| 145 | + if (!dbtConfigId) { |
| 146 | + throw new Error("cannot request available dbt jobs for a workspace with no dbt cloud integration configured"); |
| 147 | + } |
| 148 | + |
| 149 | + const results = useQuery( |
| 150 | + ["dbtCloud", dbtConfigId, "list"], |
| 151 | + () => webBackendGetAvailableDbtJobsForWorkspace({ workspaceId, dbtConfigId }, requestOptions), |
| 152 | + { |
| 153 | + suspense: true, |
| 154 | + } |
| 155 | + ); |
| 156 | + |
| 157 | + // casting type to remove `| undefined`, since `suspense: true` will ensure the value |
| 158 | + // is, in fact, available |
| 159 | + return (results.data as WorkspaceGetDbtJobsResponse).availableDbtJobs; |
| 160 | +}; |
0 commit comments