Add structured dbt cloud information to the operations api #19395

Merged
merged 6 commits into from
Nov 17, 2022
44 changes: 36 additions & 8 deletions airbyte-api/src/main/openapi/config.yaml
@@ -3778,19 +3778,47 @@ components:
type: string
OperatorWebhook:
type: object
required:
- executionUrl
properties:
executionUrl:
type: string
description: The URL to call to execute the webhook operation via POST request.
executionBody:
type: string
description: If populated, this will be sent with the POST request.
webhookConfigId:
type: string
format: uuid
description: The id of the webhook configs to use from the workspace.
webhookType:
Review comment (Contributor):

This is a nice little affordance: it lets the frontend use if (operatorWebhook.webhookType === "dbtCloud") to narrow the type checking to dbtCloud-specific fields.

type: string
enum:
- generic
- dbtCloud
generic:
Review comment (Contributor):

Curious: is the idea to remove generic once the webapp switches over to the dbtCloud one?

Reply (Contributor Author):

The idea was actually to leave it around, but that's a good point: it's basically dead code, since nothing will be using it.

I'm happy to eliminate it and just have structured dbtCloud (plus the deprecated fields, which will indeed be removed soon). I'm also happy to leave it as a marker to say "generic webhook operations are a possible thing".

Reply (Contributor, @davinchia, Nov 14, 2022):

I think at minimum we should remove the backend code to make it clear it's not executed.

We can leave the interface around with the above comment so we don't have to redesign it. I have a slight preference for removing it, but I'm mainly neutral here.

Reply (Contributor Author):

Yeah, I'm on board with removing it altogether.

type: object
required:
- executionUrl
properties:
executionUrl:
type: string
description: The URL to call to execute the webhook operation via POST request.
executionBody:
type: string
description: If populated, this will be sent with the POST request.
dbtCloud:
type: object
required:
- accountId
- jobId
properties:
accountId:
type: integer
description: The account id associated with the job
jobId:
type: integer
description: The job id associated with the job
Review comment on lines +3796 to +3801 (Contributor):

Using the same field names and types is the key thing to make the frontend DX smooth 👍. Using the same type through the whole "list -> select -> submit" pipeline would let the frontend code be even cleaner, but not by a big enough margin to justify a lot of backend hassle.

executionUrl:
type: string
description: DEPRECATED. Populate generic.executionUrl instead.
deprecated: true
executionBody:
type: string
description: DEPRECATED. Populate generic.executionBody instead.
deprecated: true
CheckOperationRead:
type: object
required:
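
The webhookType discriminator in the schema hunk above could be consumed along the following lines. This is only a minimal sketch: the WebhookPayloadSketch class and its nested records are hypothetical stand-ins for the generated OperatorWebhook model, not the real generated code, and the sketch assumes webhookType is always set (the converter in this PR falls back to the deprecated top-level fields when it is null).

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for the generated OperatorWebhook model. */
final class WebhookPayloadSketch {

  /** Mirrors the two enum values declared in the schema. */
  enum WebhookType { GENERIC, DBT_CLOUD }

  /** Stand-in for the "generic" sub-object. */
  record Generic(String executionUrl, String executionBody) {}

  /** Stand-in for the "dbtCloud" sub-object. */
  record DbtCloud(int accountId, int jobId) {}

  /** Stand-in for the webhook payload: a discriminator plus one sub-object per type. */
  record Webhook(WebhookType webhookType, Generic generic, DbtCloud dbtCloud) {}

  /** Reads only the sub-object selected by the discriminator. */
  static String describe(final Webhook webhook) {
    // Assumption of this sketch: the discriminator is mandatory.
    Objects.requireNonNull(webhook.webhookType(), "webhookType is required in this sketch");
    switch (webhook.webhookType()) {
      case DBT_CLOUD:
        Objects.requireNonNull(webhook.dbtCloud(), "dbtCloud must be set for DBT_CLOUD");
        return "dbtCloud account=" + webhook.dbtCloud().accountId()
            + " job=" + webhook.dbtCloud().jobId();
      case GENERIC:
        Objects.requireNonNull(webhook.generic(), "generic must be set for GENERIC");
        return "generic url=" + webhook.generic().executionUrl();
      default:
        throw new IllegalArgumentException("Unsupported webhook operation type");
    }
  }
}
```

Switching on the discriminator first means a caller never has to guess which sub-object is populated, which is the same property the frontend relies on for type narrowing.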
@@ -4,17 +4,24 @@

package io.airbyte.server.converters;

import static io.airbyte.api.model.generated.OperatorWebhook.WebhookTypeEnum.DBTCLOUD;
import static io.airbyte.api.model.generated.OperatorWebhook.WebhookTypeEnum.GENERIC;

import com.google.common.base.Preconditions;
import io.airbyte.api.model.generated.OperationRead;
import io.airbyte.api.model.generated.OperatorConfiguration;
import io.airbyte.api.model.generated.OperatorNormalization.OptionEnum;
import io.airbyte.api.model.generated.OperatorWebhookDbtCloud;
import io.airbyte.api.model.generated.OperatorWebhookGeneric;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OperationsConverter {

@@ -44,10 +51,7 @@ public static void populateOperatorConfigFromApi(final OperatorConfiguration ope
case WEBHOOK -> {
Preconditions.checkArgument(operatorConfig.getWebhook() != null);
// TODO(mfsiega-airbyte): check that the webhook config id references a real webhook config.
standardSyncOperation.withOperatorWebhook(new OperatorWebhook()
.withExecutionUrl(operatorConfig.getWebhook().getExecutionUrl())
.withExecutionBody(operatorConfig.getWebhook().getExecutionBody())
.withWebhookConfigId(operatorConfig.getWebhook().getWebhookConfigId()));
standardSyncOperation.withOperatorWebhook(webhookOperatorFromConfig(operatorConfig.getWebhook()));
// Null out the other configs, since it's mutually exclusive. We need to do this if it's an update.
standardSyncOperation.withOperatorNormalization(null);
standardSyncOperation.withOperatorDbt(null);
@@ -82,10 +86,7 @@ public static OperationRead operationReadFromPersistedOperation(final StandardSy
}
case WEBHOOK -> {
Preconditions.checkArgument(standardSyncOperation.getOperatorWebhook() != null);
operatorConfiguration.webhook(new io.airbyte.api.model.generated.OperatorWebhook()
.webhookConfigId(standardSyncOperation.getOperatorWebhook().getWebhookConfigId())
.executionUrl(standardSyncOperation.getOperatorWebhook().getExecutionUrl())
.executionBody(standardSyncOperation.getOperatorWebhook().getExecutionBody()));
operatorConfiguration.webhook(webhookOperatorFromPersistence(standardSyncOperation.getOperatorWebhook()));
}
}
return new OperationRead()
@@ -95,4 +96,62 @@ public static OperationRead operationReadFromPersistedOperation(final StandardSy
.operatorConfiguration(operatorConfiguration);
}

private static OperatorWebhook webhookOperatorFromConfig(io.airbyte.api.model.generated.OperatorWebhook webhookConfig) {
final var operatorWebhook = new OperatorWebhook().withWebhookConfigId(webhookConfig.getWebhookConfigId());
if (webhookConfig.getWebhookType() == null) {
return operatorWebhook
.withExecutionUrl(webhookConfig.getExecutionUrl())
.withExecutionBody(webhookConfig.getExecutionBody());
}
switch (webhookConfig.getWebhookType()) {
case DBTCLOUD -> {
return operatorWebhook
.withExecutionUrl(String.format("https://cloud.getdbt.com/api/v2/accounts/%d/jobs/%d/run/", webhookConfig.getDbtCloud().getAccountId(),
webhookConfig.getDbtCloud().getJobId()))
.withExecutionBody("{\"cause\": \"airbyte\"}");
}
case GENERIC -> {
return operatorWebhook
.withExecutionUrl(webhookConfig.getGeneric().getExecutionUrl())
.withExecutionBody(webhookConfig.getGeneric().getExecutionBody());
}
}
throw new IllegalArgumentException("Unsupported webhook operation type");
}

private static io.airbyte.api.model.generated.OperatorWebhook webhookOperatorFromPersistence(final OperatorWebhook persistedWebhook) {
final io.airbyte.api.model.generated.OperatorWebhook webhookOperator = new io.airbyte.api.model.generated.OperatorWebhook()
.webhookConfigId(persistedWebhook.getWebhookConfigId());
OperatorWebhookDbtCloud dbtCloudOperator = DbtCloudOperationConverter.parseFrom(persistedWebhook);
if (dbtCloudOperator != null) {
webhookOperator.webhookType(DBTCLOUD).dbtCloud(DbtCloudOperationConverter.parseFrom(persistedWebhook));
} else {
webhookOperator.webhookType(GENERIC).generic(new OperatorWebhookGeneric()
.executionBody(persistedWebhook.getExecutionBody())
.executionUrl(persistedWebhook.getExecutionUrl()));
// NOTE: double-write until the frontend starts using the new fields.
webhookOperator.executionUrl(persistedWebhook.getExecutionUrl()).executionBody(persistedWebhook.getExecutionBody());
}
return webhookOperator;
}

private static class DbtCloudOperationConverter {

final static Pattern dbtUrlPattern = Pattern.compile("^https://cloud\\.getdbt\\.com/api/v2/accounts/(\\d+)/jobs/(\\d+)/run/$");
private static final int ACCOUNT_REGEX_GROUP = 1;
private static final int JOB_REGEX_GROUP = 2;

private static OperatorWebhookDbtCloud parseFrom(OperatorWebhook persistedWebhook) {
Matcher dbtCloudUrlMatcher = dbtUrlPattern.matcher(persistedWebhook.getExecutionUrl());
final var dbtCloudConfig = new OperatorWebhookDbtCloud();
if (dbtCloudUrlMatcher.matches()) {
dbtCloudConfig.setAccountId(Integer.valueOf(dbtCloudUrlMatcher.group(ACCOUNT_REGEX_GROUP)));
dbtCloudConfig.setJobId(Integer.valueOf(dbtCloudUrlMatcher.group(JOB_REGEX_GROUP)));
return dbtCloudConfig;
}
return null;
}

}

}
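
The converter above stores a dbt Cloud operation as a plain execution URL and recovers the structured fields by matching that URL on the way out. The round trip can be sketched in isolation as follows. The URL format and regular expression are copied from the diff; the DbtCloudUrlSketch class, the JobRef record, the Optional return type (instead of null), the explicit Locale.ROOT, and the null and overflow handling are assumptions of this sketch rather than what the PR does.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the build/parse round trip for dbt Cloud run URLs. */
final class DbtCloudUrlSketch {

  private static final String RUN_URL_FORMAT =
      "https://cloud.getdbt.com/api/v2/accounts/%d/jobs/%d/run/";
  private static final Pattern RUN_URL_PATTERN =
      Pattern.compile("^https://cloud\\.getdbt\\.com/api/v2/accounts/(\\d+)/jobs/(\\d+)/run/$");

  /** Hypothetical value type holding the two ids encoded in the URL. */
  record JobRef(int accountId, int jobId) {}

  /** Builds the execution URL; Locale.ROOT keeps the digits ASCII regardless of the default locale. */
  static String buildRunUrl(final int accountId, final int jobId) {
    return String.format(Locale.ROOT, RUN_URL_FORMAT, accountId, jobId);
  }

  /** Returns the ids if the URL has the dbt Cloud run shape, otherwise an empty Optional. */
  static Optional<JobRef> parseRunUrl(final String executionUrl) {
    if (executionUrl == null) {
      return Optional.empty();
    }
    final Matcher matcher = RUN_URL_PATTERN.matcher(executionUrl);
    if (!matcher.matches()) {
      return Optional.empty();
    }
    try {
      return Optional.of(new JobRef(
          Integer.parseInt(matcher.group(1)),
          Integer.parseInt(matcher.group(2))));
    } catch (final NumberFormatException e) {
      // The digits matched but do not fit in an int; treat the URL as not a dbt Cloud URL.
      return Optional.empty();
    }
  }
}
```

Under these assumptions, parseRunUrl(buildRunUrl(123, 456)) yields a JobRef with accountId 123 and jobId 456, while any URL of a different shape yields an empty Optional and would be treated as a generic webhook.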
@@ -24,6 +24,9 @@
import io.airbyte.api.model.generated.OperatorNormalization.OptionEnum;
import io.airbyte.api.model.generated.OperatorType;
import io.airbyte.api.model.generated.OperatorWebhook;
import io.airbyte.api.model.generated.OperatorWebhook.WebhookTypeEnum;
import io.airbyte.api.model.generated.OperatorWebhookDbtCloud;
import io.airbyte.api.model.generated.OperatorWebhookGeneric;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.StandardSync;
@@ -49,6 +52,8 @@ class OperationsHandlerTest {
private static final String WEBHOOK_EXECUTION_BODY = "fake-execution-body";
private static final UUID WEBHOOK_OPERATION_ID = UUID.randomUUID();
public static final String NEW_EXECUTION_URL = "new-execution-url";
private static final Integer DBT_CLOUD_WEBHOOK_ACCOUNT_ID = 123;
private static final Integer DBT_CLOUD_WEBHOOK_JOB_ID = 456;
private ConfigRepository configRepository;
private Supplier<UUID> uuidGenerator;
private OperationsHandler operationsHandler;
@@ -105,7 +110,12 @@ void testCreateWebhookOperation() throws JsonValidationException, ConfigNotFound
final OperatorWebhook webhookConfig = new OperatorWebhook()
.webhookConfigId(WEBHOOK_CONFIG_ID)
.executionUrl(WEBHOOK_EXECUTION_URL)
.executionBody(WEBHOOK_EXECUTION_BODY);
.executionBody(WEBHOOK_EXECUTION_BODY)
.webhookType(WebhookTypeEnum.GENERIC)
// NOTE: we double-write until the frontend moves to the new format, so we expect it in both places.
.generic(new OperatorWebhookGeneric()
.executionUrl(WEBHOOK_EXECUTION_URL)
.executionBody(WEBHOOK_EXECUTION_BODY));
final OperationCreate operationCreate = new OperationCreate()
.workspaceId(standardSyncOperation.getWorkspaceId())
.name(WEBHOOK_OPERATION_NAME)
@@ -136,6 +146,46 @@ void testCreateWebhookOperation() throws JsonValidationException, ConfigNotFound
verify(configRepository).writeStandardSyncOperation(eq(expectedPersistedOperation));
}

@Test
void testCreateDbtCloudOperation() throws JsonValidationException, ConfigNotFoundException, IOException {
when(uuidGenerator.get()).thenReturn(WEBHOOK_OPERATION_ID);
final OperatorWebhook webhookConfig = new OperatorWebhook()
.webhookConfigId(WEBHOOK_CONFIG_ID)
.webhookType(WebhookTypeEnum.DBTCLOUD)
.dbtCloud(new OperatorWebhookDbtCloud()
.accountId(DBT_CLOUD_WEBHOOK_ACCOUNT_ID)
.jobId(DBT_CLOUD_WEBHOOK_JOB_ID));
final OperationCreate operationCreate = new OperationCreate()
.workspaceId(standardSyncOperation.getWorkspaceId())
.name(WEBHOOK_OPERATION_NAME)
.operatorConfiguration(new OperatorConfiguration()
.operatorType(OperatorType.WEBHOOK).webhook(webhookConfig));

final StandardSyncOperation expectedPersistedOperation = new StandardSyncOperation()
.withWorkspaceId(standardSyncOperation.getWorkspaceId())
.withOperationId(WEBHOOK_OPERATION_ID)
.withName(WEBHOOK_OPERATION_NAME)
.withOperatorType(StandardSyncOperation.OperatorType.WEBHOOK)
.withOperatorWebhook(new io.airbyte.config.OperatorWebhook()
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
.withExecutionUrl(String.format("https://cloud.getdbt.com/api/v2/accounts/%d/jobs/%d/run/", DBT_CLOUD_WEBHOOK_ACCOUNT_ID,
DBT_CLOUD_WEBHOOK_JOB_ID))
.withExecutionBody("{\"cause\": \"airbyte\"}"))
.withTombstone(false);

when(configRepository.getStandardSyncOperation(WEBHOOK_OPERATION_ID)).thenReturn(expectedPersistedOperation);

final OperationRead actualOperationRead = operationsHandler.createOperation(operationCreate);

assertEquals(operationCreate.getWorkspaceId(), actualOperationRead.getWorkspaceId());
assertEquals(WEBHOOK_OPERATION_ID, actualOperationRead.getOperationId());
assertEquals(WEBHOOK_OPERATION_NAME, actualOperationRead.getName());
assertEquals(OperatorType.WEBHOOK, actualOperationRead.getOperatorConfiguration().getOperatorType());
assertEquals(webhookConfig, actualOperationRead.getOperatorConfiguration().getWebhook());

verify(configRepository).writeStandardSyncOperation(eq(expectedPersistedOperation));
}

@Test
void testUpdateOperation() throws JsonValidationException, ConfigNotFoundException, IOException {
final OperationUpdate operationUpdate = new OperationUpdate()
@@ -190,7 +240,12 @@ void testUpdateWebhookOperation() throws JsonValidationException, ConfigNotFound
final OperatorWebhook webhookConfig = new OperatorWebhook()
.webhookConfigId(WEBHOOK_CONFIG_ID)
.executionUrl(NEW_EXECUTION_URL)
.executionBody(WEBHOOK_EXECUTION_BODY);
.executionBody(WEBHOOK_EXECUTION_BODY)
.webhookType(WebhookTypeEnum.GENERIC)
// NOTE: we double-write until the frontend moves to the new format, so we expect it in both places.
.generic(new OperatorWebhookGeneric()
.executionUrl(NEW_EXECUTION_URL)
.executionBody(WEBHOOK_EXECUTION_BODY));
final OperationUpdate operationUpdate = new OperationUpdate()
.name(WEBHOOK_OPERATION_NAME)
.operationId(WEBHOOK_OPERATION_ID)
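
The double-write that the generic webhook tests above expect (the same values in both the deprecated top-level fields and the nested generic object) can be illustrated with a small standalone sketch. Everything here is hypothetical: DoubleWriteSketch, its records, and effectiveUrl are illustrative names, and the prefer-nested-then-fall-back read is one possible way a client might migrate, not something this PR defines.

```java
/** Hypothetical illustration of double-writing during a field migration. */
final class DoubleWriteSketch {

  /** Stand-in for the nested generic webhook fields. */
  record Generic(String executionUrl, String executionBody) {}

  /** Stand-in for the API read model: the nested object plus the deprecated top-level fields. */
  record WebhookRead(Generic generic, String executionUrl, String executionBody) {}

  /** Writes the persisted values to both the nested and the deprecated top-level fields. */
  static WebhookRead doubleWrite(final String persistedUrl, final String persistedBody) {
    return new WebhookRead(new Generic(persistedUrl, persistedBody), persistedUrl, persistedBody);
  }

  /** Client-side read that prefers the nested field and falls back to the deprecated one. */
  static String effectiveUrl(final WebhookRead read) {
    return read.generic() != null ? read.generic().executionUrl() : read.executionUrl();
  }
}
```

With both locations populated, old and new clients read the same value, so the deprecated fields can be dropped once no client depends on them.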