Skip to content

Commit 174a740

Browse files
mfsiega-airbyteakashkulk
authored andcommitted
Add structured dbt cloud information to the operations api (#19395)
* add structured dbt cloud information to the operations api * remove unused webhook features, test updates * update tests to use structured dbt cloud operation api * add missing webhook operator type
1 parent 9781f83 commit 174a740

File tree

5 files changed

+269
-51
lines changed

5 files changed

+269
-51
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3778,19 +3778,35 @@ components:
37783778
type: string
37793779
OperatorWebhook:
37803780
type: object
3781-
required:
3782-
- executionUrl
37833781
properties:
3784-
executionUrl:
3785-
type: string
3786-
description: The URL to call to execute the webhook operation via POST request.
3787-
executionBody:
3788-
type: string
3789-
description: If populated, this will be sent with the POST request.
37903782
webhookConfigId:
37913783
type: string
37923784
format: uuid
37933785
description: The id of the webhook configs to use from the workspace.
3786+
webhookType:
3787+
type: string
3788+
enum:
3789+
- dbtCloud
3790+
dbtCloud:
3791+
type: object
3792+
required:
3793+
- accountId
3794+
- jobId
3795+
properties:
3796+
accountId:
3797+
type: integer
3798+
description: The account id associated with the job
3799+
jobId:
3800+
type: integer
3801+
description: The job id associated with the job
3802+
executionUrl:
3803+
type: string
3804+
description: DEPRECATED. Populate dbtCloud instead.
3805+
deprecated: true
3806+
executionBody:
3807+
type: string
3808+
description: DEPRECATED. Populate dbtCloud instead.
3809+
deprecated: true
37943810
CheckOperationRead:
37953811
type: object
37963812
required:

airbyte-server/src/main/java/io/airbyte/server/converters/OperationsConverter.java

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44

55
package io.airbyte.server.converters;
66

7+
import static io.airbyte.api.model.generated.OperatorWebhook.WebhookTypeEnum.DBTCLOUD;
8+
79
import com.google.common.base.Preconditions;
810
import io.airbyte.api.model.generated.OperationRead;
911
import io.airbyte.api.model.generated.OperatorConfiguration;
1012
import io.airbyte.api.model.generated.OperatorNormalization.OptionEnum;
13+
import io.airbyte.api.model.generated.OperatorWebhookDbtCloud;
1114
import io.airbyte.commons.enums.Enums;
1215
import io.airbyte.config.OperatorDbt;
1316
import io.airbyte.config.OperatorNormalization;
1417
import io.airbyte.config.OperatorNormalization.Option;
1518
import io.airbyte.config.OperatorWebhook;
1619
import io.airbyte.config.StandardSyncOperation;
1720
import io.airbyte.config.StandardSyncOperation.OperatorType;
21+
import java.util.regex.Matcher;
22+
import java.util.regex.Pattern;
1823

1924
public class OperationsConverter {
2025

@@ -44,10 +49,7 @@ public static void populateOperatorConfigFromApi(final OperatorConfiguration ope
4449
case WEBHOOK -> {
4550
Preconditions.checkArgument(operatorConfig.getWebhook() != null);
4651
// TODO(mfsiega-airbyte): check that the webhook config id references a real webhook config.
47-
standardSyncOperation.withOperatorWebhook(new OperatorWebhook()
48-
.withExecutionUrl(operatorConfig.getWebhook().getExecutionUrl())
49-
.withExecutionBody(operatorConfig.getWebhook().getExecutionBody())
50-
.withWebhookConfigId(operatorConfig.getWebhook().getWebhookConfigId()));
52+
standardSyncOperation.withOperatorWebhook(webhookOperatorFromConfig(operatorConfig.getWebhook()));
5153
// Null out the other configs, since it's mutually exclusive. We need to do this if it's an update.
5254
standardSyncOperation.withOperatorNormalization(null);
5355
standardSyncOperation.withOperatorDbt(null);
@@ -82,10 +84,7 @@ public static OperationRead operationReadFromPersistedOperation(final StandardSy
8284
}
8385
case WEBHOOK -> {
8486
Preconditions.checkArgument(standardSyncOperation.getOperatorWebhook() != null);
85-
operatorConfiguration.webhook(new io.airbyte.api.model.generated.OperatorWebhook()
86-
.webhookConfigId(standardSyncOperation.getOperatorWebhook().getWebhookConfigId())
87-
.executionUrl(standardSyncOperation.getOperatorWebhook().getExecutionUrl())
88-
.executionBody(standardSyncOperation.getOperatorWebhook().getExecutionBody()));
87+
operatorConfiguration.webhook(webhookOperatorFromPersistence(standardSyncOperation.getOperatorWebhook()));
8988
}
9089
}
9190
return new OperationRead()
@@ -95,4 +94,68 @@ public static OperationRead operationReadFromPersistedOperation(final StandardSy
9594
.operatorConfiguration(operatorConfiguration);
9695
}
9796

97+
private static OperatorWebhook webhookOperatorFromConfig(io.airbyte.api.model.generated.OperatorWebhook webhookConfig) {
98+
final var operatorWebhook = new OperatorWebhook().withWebhookConfigId(webhookConfig.getWebhookConfigId());
99+
// TODO(mfsiega-airbyte): remove this once the frontend is sending the new format.
100+
if (webhookConfig.getWebhookType() == null) {
101+
return operatorWebhook
102+
.withExecutionUrl(webhookConfig.getExecutionUrl())
103+
.withExecutionBody(webhookConfig.getExecutionBody());
104+
}
105+
switch (webhookConfig.getWebhookType()) {
106+
case DBTCLOUD -> {
107+
return operatorWebhook
108+
.withExecutionUrl(DbtCloudOperationConverter.getExecutionUrlFrom(webhookConfig.getDbtCloud()))
109+
.withExecutionBody(DbtCloudOperationConverter.getDbtCloudExecutionBody());
110+
}
111+
// Future webhook operator types added here.
112+
}
113+
throw new IllegalArgumentException("Unsupported webhook operation type");
114+
}
115+
116+
private static io.airbyte.api.model.generated.OperatorWebhook webhookOperatorFromPersistence(final OperatorWebhook persistedWebhook) {
117+
final io.airbyte.api.model.generated.OperatorWebhook webhookOperator = new io.airbyte.api.model.generated.OperatorWebhook()
118+
.webhookConfigId(persistedWebhook.getWebhookConfigId());
119+
OperatorWebhookDbtCloud dbtCloudOperator = DbtCloudOperationConverter.parseFrom(persistedWebhook);
120+
if (dbtCloudOperator != null) {
121+
webhookOperator.webhookType(DBTCLOUD).dbtCloud(dbtCloudOperator);
122+
// TODO(mfsiega-airbyte): remove once frontend switches to new format.
123+
// Dual-write deprecated webhook format.
124+
webhookOperator.executionUrl(DbtCloudOperationConverter.getExecutionUrlFrom(dbtCloudOperator));
125+
webhookOperator.executionBody(DbtCloudOperationConverter.getDbtCloudExecutionBody());
126+
} else {
127+
throw new IllegalArgumentException("Unexpected webhook operator config");
128+
}
129+
return webhookOperator;
130+
}
131+
132+
private static class DbtCloudOperationConverter {
133+
134+
// See https://docs.getdbt.com/dbt-cloud/api-v2 for documentation on dbt Cloud API endpoints.
135+
final static Pattern dbtUrlPattern = Pattern.compile("^https://cloud\\.getdbt\\.com/api/v2/accounts/(\\d+)/jobs/(\\d+)/run/$");
136+
private static final int ACCOUNT_REGEX_GROUP = 1;
137+
private static final int JOB_REGEX_GROUP = 2;
138+
139+
private static OperatorWebhookDbtCloud parseFrom(OperatorWebhook persistedWebhook) {
140+
Matcher dbtCloudUrlMatcher = dbtUrlPattern.matcher(persistedWebhook.getExecutionUrl());
141+
final var dbtCloudConfig = new OperatorWebhookDbtCloud();
142+
if (dbtCloudUrlMatcher.matches()) {
143+
dbtCloudConfig.setAccountId(Integer.valueOf(dbtCloudUrlMatcher.group(ACCOUNT_REGEX_GROUP)));
144+
dbtCloudConfig.setJobId(Integer.valueOf(dbtCloudUrlMatcher.group(JOB_REGEX_GROUP)));
145+
return dbtCloudConfig;
146+
}
147+
return null;
148+
}
149+
150+
private static String getExecutionUrlFrom(final OperatorWebhookDbtCloud dbtCloudConfig) {
151+
return String.format("https://cloud.getdbt.com/api/v2/accounts/%d/jobs/%d/run/", dbtCloudConfig.getAccountId(),
152+
dbtCloudConfig.getJobId());
153+
}
154+
155+
private static String getDbtCloudExecutionBody() {
156+
return "{\"cause\": \"airbyte\"}";
157+
}
158+
159+
}
160+
98161
}

airbyte-server/src/test/java/io/airbyte/server/handlers/OperationsHandlerTest.java

Lines changed: 87 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.server.handlers;
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertThrows;
89
import static org.junit.jupiter.api.Assertions.assertTrue;
910
import static org.mockito.ArgumentMatchers.eq;
1011
import static org.mockito.Mockito.mock;
@@ -24,6 +25,8 @@
2425
import io.airbyte.api.model.generated.OperatorNormalization.OptionEnum;
2526
import io.airbyte.api.model.generated.OperatorType;
2627
import io.airbyte.api.model.generated.OperatorWebhook;
28+
import io.airbyte.api.model.generated.OperatorWebhook.WebhookTypeEnum;
29+
import io.airbyte.api.model.generated.OperatorWebhookDbtCloud;
2730
import io.airbyte.commons.enums.Enums;
2831
import io.airbyte.config.OperatorNormalization.Option;
2932
import io.airbyte.config.StandardSync;
@@ -45,10 +48,12 @@ class OperationsHandlerTest {
4548

4649
private static final String WEBHOOK_OPERATION_NAME = "fake-operation-name";
4750
private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID();
48-
private static final String WEBHOOK_EXECUTION_URL = "fake-execution-url";
49-
private static final String WEBHOOK_EXECUTION_BODY = "fake-execution-body";
5051
private static final UUID WEBHOOK_OPERATION_ID = UUID.randomUUID();
51-
public static final String NEW_EXECUTION_URL = "new-execution-url";
52+
private static final Integer DBT_CLOUD_WEBHOOK_ACCOUNT_ID = 123;
53+
private static final Integer DBT_CLOUD_WEBHOOK_JOB_ID = 456;
54+
private static final Integer NEW_DBT_CLOUD_WEBHOOK_ACCOUNT_ID = 789;
55+
public static final String EXECUTION_BODY = "{\"cause\": \"airbyte\"}";
56+
public static final String EXECUTION_URL_TEMPLATE = "https://cloud.getdbt.com/api/v2/accounts/%d/jobs/%d/run/";
5257
private ConfigRepository configRepository;
5358
private Supplier<UUID> uuidGenerator;
5459
private OperationsHandler operationsHandler;
@@ -104,8 +109,10 @@ void testCreateWebhookOperation() throws JsonValidationException, ConfigNotFound
104109
when(uuidGenerator.get()).thenReturn(WEBHOOK_OPERATION_ID);
105110
final OperatorWebhook webhookConfig = new OperatorWebhook()
106111
.webhookConfigId(WEBHOOK_CONFIG_ID)
107-
.executionUrl(WEBHOOK_EXECUTION_URL)
108-
.executionBody(WEBHOOK_EXECUTION_BODY);
112+
.webhookType(WebhookTypeEnum.DBTCLOUD)
113+
.dbtCloud(new OperatorWebhookDbtCloud()
114+
.accountId(DBT_CLOUD_WEBHOOK_ACCOUNT_ID)
115+
.jobId(DBT_CLOUD_WEBHOOK_JOB_ID));
109116
final OperationCreate operationCreate = new OperationCreate()
110117
.workspaceId(standardSyncOperation.getWorkspaceId())
111118
.name(WEBHOOK_OPERATION_NAME)
@@ -119,8 +126,9 @@ void testCreateWebhookOperation() throws JsonValidationException, ConfigNotFound
119126
.withOperatorType(StandardSyncOperation.OperatorType.WEBHOOK)
120127
.withOperatorWebhook(new io.airbyte.config.OperatorWebhook()
121128
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
122-
.withExecutionUrl(WEBHOOK_EXECUTION_URL)
123-
.withExecutionBody(WEBHOOK_EXECUTION_BODY))
129+
.withExecutionUrl(String.format(EXECUTION_URL_TEMPLATE, DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
130+
DBT_CLOUD_WEBHOOK_JOB_ID))
131+
.withExecutionBody(EXECUTION_BODY))
124132
.withTombstone(false);
125133

126134
when(configRepository.getStandardSyncOperation(WEBHOOK_OPERATION_ID)).thenReturn(expectedPersistedOperation);
@@ -131,7 +139,12 @@ void testCreateWebhookOperation() throws JsonValidationException, ConfigNotFound
131139
assertEquals(WEBHOOK_OPERATION_ID, actualOperationRead.getOperationId());
132140
assertEquals(WEBHOOK_OPERATION_NAME, actualOperationRead.getName());
133141
assertEquals(OperatorType.WEBHOOK, actualOperationRead.getOperatorConfiguration().getOperatorType());
134-
assertEquals(webhookConfig, actualOperationRead.getOperatorConfiguration().getWebhook());
142+
143+
// NOTE: we expect the server to dual-write on read until the frontend moves to the new format.
144+
final OperatorWebhook expectedWebhookConfigRead =
145+
webhookConfig.executionUrl(String.format(EXECUTION_URL_TEMPLATE, DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
146+
DBT_CLOUD_WEBHOOK_JOB_ID)).executionBody(EXECUTION_BODY);
147+
assertEquals(expectedWebhookConfigRead, actualOperationRead.getOperatorConfiguration().getWebhook());
135148

136149
verify(configRepository).writeStandardSyncOperation(eq(expectedPersistedOperation));
137150
}
@@ -189,36 +202,58 @@ void testUpdateWebhookOperation() throws JsonValidationException, ConfigNotFound
189202
when(uuidGenerator.get()).thenReturn(WEBHOOK_OPERATION_ID);
190203
final OperatorWebhook webhookConfig = new OperatorWebhook()
191204
.webhookConfigId(WEBHOOK_CONFIG_ID)
192-
.executionUrl(NEW_EXECUTION_URL)
193-
.executionBody(WEBHOOK_EXECUTION_BODY);
205+
.webhookType(WebhookTypeEnum.DBTCLOUD)
206+
.dbtCloud(new OperatorWebhookDbtCloud()
207+
.accountId(NEW_DBT_CLOUD_WEBHOOK_ACCOUNT_ID)
208+
.jobId(DBT_CLOUD_WEBHOOK_JOB_ID));
194209
final OperationUpdate operationUpdate = new OperationUpdate()
195210
.name(WEBHOOK_OPERATION_NAME)
196211
.operationId(WEBHOOK_OPERATION_ID)
197212
.operatorConfiguration(new OperatorConfiguration()
198213
.operatorType(OperatorType.WEBHOOK).webhook(webhookConfig));
199214

215+
final var persistedWebhook = new io.airbyte.config.OperatorWebhook()
216+
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
217+
.withExecutionUrl(String.format(EXECUTION_URL_TEMPLATE, DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
218+
DBT_CLOUD_WEBHOOK_JOB_ID))
219+
.withExecutionBody(EXECUTION_BODY);
220+
221+
final var updatedWebhook = new io.airbyte.config.OperatorWebhook()
222+
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
223+
.withExecutionUrl(String.format(EXECUTION_URL_TEMPLATE, NEW_DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
224+
DBT_CLOUD_WEBHOOK_JOB_ID))
225+
.withExecutionBody(EXECUTION_BODY);
226+
200227
final StandardSyncOperation persistedOperation = new StandardSyncOperation()
201228
.withWorkspaceId(standardSyncOperation.getWorkspaceId())
202229
.withOperationId(WEBHOOK_OPERATION_ID)
203230
.withName(WEBHOOK_OPERATION_NAME)
204231
.withOperatorType(StandardSyncOperation.OperatorType.WEBHOOK)
205-
.withOperatorWebhook(new io.airbyte.config.OperatorWebhook()
206-
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
207-
.withExecutionUrl(WEBHOOK_EXECUTION_URL)
208-
.withExecutionBody(WEBHOOK_EXECUTION_BODY));
232+
.withOperatorWebhook(persistedWebhook);
209233

210-
when(configRepository.getStandardSyncOperation(WEBHOOK_OPERATION_ID)).thenReturn(persistedOperation);
234+
final StandardSyncOperation updatedOperation = new StandardSyncOperation()
235+
.withWorkspaceId(standardSyncOperation.getWorkspaceId())
236+
.withOperationId(WEBHOOK_OPERATION_ID)
237+
.withName(WEBHOOK_OPERATION_NAME)
238+
.withOperatorType(StandardSyncOperation.OperatorType.WEBHOOK)
239+
.withOperatorWebhook(updatedWebhook);
240+
241+
when(configRepository.getStandardSyncOperation(WEBHOOK_OPERATION_ID)).thenReturn(persistedOperation).thenReturn(updatedOperation);
211242

212243
final OperationRead actualOperationRead = operationsHandler.updateOperation(operationUpdate);
213244

214245
assertEquals(WEBHOOK_OPERATION_ID, actualOperationRead.getOperationId());
215246
assertEquals(WEBHOOK_OPERATION_NAME, actualOperationRead.getName());
216247
assertEquals(OperatorType.WEBHOOK, actualOperationRead.getOperatorConfiguration().getOperatorType());
217-
assertEquals(webhookConfig, actualOperationRead.getOperatorConfiguration().getWebhook());
248+
final OperatorWebhook expectedWebhookConfigRead =
249+
webhookConfig.executionUrl(String.format(EXECUTION_URL_TEMPLATE, NEW_DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
250+
DBT_CLOUD_WEBHOOK_JOB_ID)).executionBody(EXECUTION_BODY);
251+
assertEquals(expectedWebhookConfigRead, actualOperationRead.getOperatorConfiguration().getWebhook());
218252

219253
verify(configRepository)
220254
.writeStandardSyncOperation(persistedOperation.withOperatorWebhook(persistedOperation.getOperatorWebhook().withExecutionUrl(
221-
NEW_EXECUTION_URL)));
255+
String.format(EXECUTION_URL_TEMPLATE, NEW_DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
256+
DBT_CLOUD_WEBHOOK_JOB_ID))));
222257
}
223258

224259
@Test
@@ -313,4 +348,39 @@ void testEnumConversion() {
313348
io.airbyte.config.OperatorNormalization.Option.class));
314349
}
315350

351+
@Test
352+
void testDbtCloudRegex() {
353+
// Validate that a non-url is rejected.
354+
assertThrows(IllegalArgumentException.class, () -> checkDbtCloudUrl("not-a-url"));
355+
// Validate that the URL is anchored to the beginning.
356+
assertThrows(IllegalArgumentException.class,
357+
() -> checkDbtCloudUrl("some-nonsense-" + String.format(EXECUTION_URL_TEMPLATE, DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
358+
DBT_CLOUD_WEBHOOK_JOB_ID)));
359+
// Validate that the URL is anchored to the end.
360+
assertThrows(IllegalArgumentException.class,
361+
() -> checkDbtCloudUrl(String.format(EXECUTION_URL_TEMPLATE, DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
362+
DBT_CLOUD_WEBHOOK_JOB_ID) + "-some-nonsense"));
363+
// Validate that the account id must be an integer.
364+
assertThrows(IllegalArgumentException.class, () -> checkDbtCloudUrl("https://cloud.getdbt.com/api/v2/accounts/abc/jobs/123/run/"));
365+
// Validate that the job id must be an integer.
366+
assertThrows(IllegalArgumentException.class, () -> checkDbtCloudUrl("https://cloud.getdbt.com/api/v2/accounts/123/jobs/abc/run/"));
367+
}
368+
369+
private void checkDbtCloudUrl(final String urlToCheck) throws JsonValidationException, ConfigNotFoundException, IOException {
370+
final StandardSyncOperation persistedOperation = new StandardSyncOperation()
371+
.withWorkspaceId(standardSyncOperation.getWorkspaceId())
372+
.withOperationId(WEBHOOK_OPERATION_ID)
373+
.withName(WEBHOOK_OPERATION_NAME)
374+
.withOperatorType(StandardSyncOperation.OperatorType.WEBHOOK)
375+
.withOperatorWebhook(new io.airbyte.config.OperatorWebhook()
376+
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
377+
.withExecutionUrl(urlToCheck)
378+
.withExecutionBody(EXECUTION_BODY))
379+
.withTombstone(false);
380+
when(configRepository.getStandardSyncOperation(WEBHOOK_OPERATION_ID)).thenReturn(persistedOperation);
381+
382+
final OperationIdRequestBody operationIdRequestBody = new OperationIdRequestBody().operationId(WEBHOOK_OPERATION_ID);
383+
operationsHandler.getOperation(operationIdRequestBody);
384+
}
385+
316386
}

airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
import io.airbyte.api.client.model.generated.OperatorConfiguration;
6767
import io.airbyte.api.client.model.generated.OperatorType;
6868
import io.airbyte.api.client.model.generated.OperatorWebhook;
69+
import io.airbyte.api.client.model.generated.OperatorWebhook.WebhookTypeEnum;
70+
import io.airbyte.api.client.model.generated.OperatorWebhookDbtCloud;
6971
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
7072
import io.airbyte.api.client.model.generated.SourceDefinitionIdWithWorkspaceId;
7173
import io.airbyte.api.client.model.generated.SourceDefinitionRead;
@@ -465,10 +467,9 @@ void testWebhookOperationExecutesSuccessfully() throws Exception {
465467
.operatorType(OperatorType.WEBHOOK)
466468
.webhook(new OperatorWebhook()
467469
.webhookConfigId(workspaceRead.getWebhookConfigs().get(0).getId())
468-
// NOTE: reqres.in is free service that hosts a REST API intended for testing frontend/client code.
469-
// We use it here as an endpoint that will accept an HTTP POST.
470-
.executionUrl("https://reqres.in/api/users")
471-
.executionBody("{\"name\": \"morpheus\", \"job\": \"leader\"}"))));
470+
// NOTE: this dbt Cloud config won't actually work, but the sync should still succeed.
471+
.webhookType(WebhookTypeEnum.DBTCLOUD)
472+
.dbtCloud(new OperatorWebhookDbtCloud().accountId(123).jobId(456)))));
472473
// create a connection with the new operation.
473474
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
474475
final UUID destinationId = testHarness.createPostgresDestination().getDestinationId();

0 commit comments

Comments
 (0)