Skip to content

Notifications Workflow #18735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e74764d
new notification workflow
alovew Oct 31, 2022
29004de
add schema change notification methods
alovew Oct 31, 2022
d37b8f4
add params
alovew Oct 31, 2022
e7e4f9e
add functionality
alovew Nov 1, 2022
9ec0f4b
add workflow
alovew Nov 3, 2022
53f3514
update tests
alovew Nov 3, 2022
71d35e1
updates
alovew Nov 4, 2022
778becf
add wip
alovew Nov 4, 2022
aaff7e8
use api endpoint
alovew Nov 7, 2022
3790d4f
remove logs
alovew Nov 7, 2022
cc0f1bb
formatting, removing loggin
alovew Nov 7, 2022
2740661
pmd and formatting
alovew Nov 7, 2022
3dd1f7c
Merge branch 'master' into anne/notifications-workflow
alovew Nov 7, 2022
08a2325
Merge branch 'master' into anne/notifications-workflow
alovew Nov 8, 2022
584a9f6
Merge branch 'master' into anne/notifications-workflow
alovew Nov 8, 2022
d10c20d
Merge branch 'master' into anne/notifications-workflow
alovew Nov 8, 2022
3000294
Merge branch 'master' into anne/notifications-workflow
alovew Nov 8, 2022
943a64b
ternary for rendering template
alovew Nov 8, 2022
aa1b182
formatting
alovew Nov 8, 2022
f682ab1
singleton annotation
alovew Nov 8, 2022
419f67a
Merge branch 'master' into anne/notifications-workflow
alovew Nov 8, 2022
7afb9e3
default env variable to false
alovew Nov 9, 2022
6719670
default to false
alovew Nov 9, 2022
848e999
add to docker compose
alovew Nov 10, 2022
5b8a45e
more places
alovew Nov 10, 2022
d41ab7c
add one more place
alovew Nov 10, 2022
4c164ed
fetch connection in activity
alovew Nov 14, 2022
7e69177
add config fetch activity to list
alovew Nov 14, 2022
f1224c7
formatting
alovew Nov 15, 2022
acb5ab5
fix test
alovew Nov 15, 2022
1dbfb64
add finals
alovew Nov 15, 2022
a4d68b1
format
alovew Nov 15, 2022
6f4c250
add max notify workers
alovew Nov 15, 2022
a32c0e3
Merge branch 'master' into anne/notifications-workflow
alovew Nov 15, 2022
751492a
Merge branch 'master' into anne/notifications-workflow
alovew Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
MAX_CHECK_WORKERS=5
MAX_DISCOVER_WORKERS=5
MAX_NOTIFY_WORKERS=5
SHOULD_RUN_NOTIFY_WORKFLOWS=false
# Temporal Activity configuration
ACTIVITY_MAX_ATTEMPT=
ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=
Expand Down
2 changes: 2 additions & 0 deletions airbyte-commons-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ dependencies {
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-worker-models')
implementation project(':airbyte-api')
implementation project(':airbyte-json-validation')

testImplementation libs.temporal.testing
// Needed to be able to mock final class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ public enum TemporalJobType {
SYNC,
RESET_CONNECTION,
CONNECTION_UPDATER,
REPLICATE
REPLICATE,
NOTIFY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.io.IOException;
import java.util.UUID;

@WorkflowInterface
public interface ConnectionNotificationWorkflow {

@WorkflowMethod
boolean sendSchemaChangeNotification(UUID connectionId)
throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,11 @@ public interface Configs {
*/
boolean shouldRunConnectionManagerWorkflows();

/**
* Define if the worker should run notification workflows. Defaults to true. Internal-use only.
*/
public boolean shouldRunNotifyWorkflows();

// Worker - Data Plane configs

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class EnvConfigs implements Configs {
public static final String MAX_CHECK_WORKERS = "MAX_CHECK_WORKERS";
public static final String MAX_DISCOVER_WORKERS = "MAX_DISCOVER_WORKERS";
public static final String MAX_SYNC_WORKERS = "MAX_SYNC_WORKERS";
public static final String MAX_NOTIFY_WORKERS = "MAX_NOTIFY_WORKERS";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS";
Expand Down Expand Up @@ -135,6 +136,7 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS";
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";
private static final String SHOULD_RUN_NOTIFY_WORKFLOWS = "SHOULD_RUN_NOTIFY_WORKFLOWS";

// Worker - Control plane configs
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
Expand Down Expand Up @@ -198,6 +200,7 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_CHECK_WORKERS = 5;
private static final long DEFAULT_MAX_DISCOVER_WORKERS = 5;
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
Expand Down Expand Up @@ -918,7 +921,8 @@ public MaxWorkersConfig getMaxWorkers() {
Math.toIntExact(getEnvOrDefault(MAX_SPEC_WORKERS, DEFAULT_MAX_SPEC_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_CHECK_WORKERS, DEFAULT_MAX_CHECK_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_DISCOVER_WORKERS, DEFAULT_MAX_DISCOVER_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS)));
Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_NOTIFY_WORKERS, DEFAULT_MAX_NOTIFY_WORKERS)));
}

@Override
Expand Down Expand Up @@ -946,6 +950,11 @@ public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

@Override
public boolean shouldRunNotifyWorkflows() {
return getEnvOrDefault(SHOULD_RUN_NOTIFY_WORKFLOWS, false);
}

// Worker - Data plane

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ public class MaxWorkersConfig {
private final int maxCheckWorkers;
private final int maxDiscoverWorkers;
private final int maxSyncWorkers;
private final int maxNotifyWorkers;

public MaxWorkersConfig(final int maxSpecWorkers, final int maxCheckWorkers, final int maxDiscoverWorkers, final int maxSyncWorkers) {
public MaxWorkersConfig(final int maxSpecWorkers,
final int maxCheckWorkers,
final int maxDiscoverWorkers,
final int maxSyncWorkers,
final int maxNotifyWorkers) {
this.maxSpecWorkers = maxSpecWorkers;
this.maxCheckWorkers = maxCheckWorkers;
this.maxDiscoverWorkers = maxDiscoverWorkers;
this.maxSyncWorkers = maxSyncWorkers;
this.maxNotifyWorkers = maxNotifyWorkers;
}

public int getMaxSpecWorkers() {
Expand All @@ -34,13 +40,18 @@ public int getMaxSyncWorkers() {
return maxSyncWorkers;
}

public int getMaxNotifyWorkers() {
return maxNotifyWorkers;
}

@Override
public String toString() {
return "MaxWorkersConfig{" +
"maxSpecWorkers=" + maxSpecWorkers +
", maxCheckWorkers=" + maxCheckWorkers +
", maxDiscoverWorkers=" + maxDiscoverWorkers +
", maxSyncWorkers=" + maxSyncWorkers +
", maxNotifyWorkers=" + maxNotifyWorkers +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public boolean notifyFailure(final String message) throws IOException, Interrupt
throw new NotImplementedException();
}

@Override
public boolean notifySchemaChange(final UUID connectionId, final boolean isBreaking) {
throw new NotImplementedException();
}

private boolean notifyByEmail(final String requestBody) throws IOException, InterruptedException {
final HttpRequest request = HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract boolean notifyConnectionDisableWarning(String receiverEmail,

public abstract boolean notifyFailure(String message) throws IOException, InterruptedException;

public abstract boolean notifySchemaChange(UUID connectionId, boolean isBreaking) throws IOException, InterruptedException;

public static NotificationClient createNotificationClient(final Notification notification) {
return switch (notification.getNotificationType()) {
case SLACK -> new SlackNotificationClient(notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.notification;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -33,11 +34,9 @@ public class SlackNotificationClient extends NotificationClient {

private static final Logger LOGGER = LoggerFactory.getLogger(SlackNotificationClient.class);

private final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
private final SlackNotificationConfiguration config;

@JsonCreator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was throwing an error when I tried to construct the SlackNotificationClient without this:

Notification notification = new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
          .withSlackConfiguration(slackConfig.get());
      SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
      return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, isBreaking);

public SlackNotificationClient(final Notification notification) {
super(notification);
this.config = notification.getSlackConfiguration();
Expand Down Expand Up @@ -121,7 +120,22 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail,
return false;
}

@Override
public boolean notifySchemaChange(UUID connectionId, boolean isBreaking) throws IOException, InterruptedException {
final String message = renderTemplate(
isBreaking ? "slack/breaking_schema_change_notification_template.txt" : "slack/non_breaking_schema_change_notification_template.txt",
connectionId.toString());
final String webhookUrl = config.getWebhook();
if (!Strings.isEmpty(webhookUrl)) {
return notify(message);
}
return false;
}

private boolean notify(final String message) throws IOException, InterruptedException {
final HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
final ImmutableMap<String, String> body = new Builder<String, String>()
.put("text", message)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Your source schema has changed for connection ID: %s

Airbyte has disabled this connection because this source schema change will cause broken syncs. Visit your connection page, refresh your source schema, and reset your data in order to fix this connection.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Your source schema has changed for connection ID: %s

Visit your connection page, refresh your source schema, and reset your data in order to update this connection.
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-notification')
implementation (project(':airbyte-persistence:job-persistence')) {
// Temporary hack to avoid dependency conflicts
exclude group: 'io.micronaut'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.ConnectionNotificationWorkflowImpl;
import io.airbyte.workers.temporal.spec.SpecWorkflowImpl;
import io.airbyte.workers.temporal.support.TemporalProxyHelper;
import io.airbyte.workers.temporal.sync.SyncWorkflowImpl;
Expand Down Expand Up @@ -77,6 +78,11 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
@Inject
@Named("discoverActivities")
private Optional<List<Object>> discoverActivities;

@Inject
@Named("notifyActivities")
private Optional<List<Object>> notifyActivities;

@Inject
@Named(TaskExecutors.IO)
private ExecutorService executorService;
Expand All @@ -91,6 +97,8 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private Optional<LogConfigs> logConfigs;
@Value("${airbyte.worker.check.max-workers}")
private Integer maxCheckWorkers;
@Value("${airbyte.worker.notify.max-workers}")
private Integer maxNotifyWorkers;
@Value("${airbyte.worker.discover.max-workers}")
private Integer maxDiscoverWorkers;
@Value("${airbyte.worker.spec.max-workers}")
Expand All @@ -107,6 +115,9 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private boolean shouldRunGetSpecWorkflows;
@Value("${airbyte.worker.sync.enabled}")
private boolean shouldRunSyncWorkflows;
@Value("${airbyte.worker.sync.enabled}")
private boolean shouldRunNotifyWorkflows;

@Inject
@Named("specActivities")
private Optional<List<Object>> specActivities;
Expand Down Expand Up @@ -148,7 +159,7 @@ public void onApplicationEvent(final ServiceReadyEvent event) {

registerWorkerFactory(workerFactory,
new MaxWorkersConfig(maxCheckWorkers, maxDiscoverWorkers, maxSpecWorkers,
maxSyncWorkers));
maxSyncWorkers, maxNotifyWorkers));

log.info("Starting worker factory...");
workerFactory.start();
Expand Down Expand Up @@ -220,6 +231,18 @@ private void registerWorkerFactory(final WorkerFactory workerFactory,
if (shouldRunConnectionManagerWorkflows) {
registerConnectionManager(workerFactory, maxWorkersConfiguration);
}

if (shouldRunNotifyWorkflows) {
registerConnectionNotification(workerFactory, maxWorkersConfiguration);
}
}

private void registerConnectionNotification(final WorkerFactory factory, final MaxWorkersConfig maxWorkersConfig) {
final Worker notifyWorker = factory.newWorker(TemporalJobType.NOTIFY.name(), getWorkerOptions(maxWorkersConfig.getMaxNotifyWorkers()));
final WorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder().setFailWorkflowExceptionTypes(NonDeterministicException.class).build();
notifyWorker.registerWorkflowImplementationTypes(options, temporalProxyHelper.proxyWorkflowClass(ConnectionNotificationWorkflowImpl.class));
notifyWorker.registerActivitiesImplementations(notifyActivities.orElseThrow().toArray(new Object[] {}));
}

private void registerCheckConnection(final WorkerFactory factory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity;
import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity;
import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity;
import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity;
import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity;
import io.airbyte.workers.temporal.spec.SpecActivity;
Expand Down Expand Up @@ -52,6 +54,15 @@ public List<Object> checkConnectionActivities(
return List.of(checkConnectionActivity);
}

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("notifyActivities")
public List<Object> notifyActivities(final NotifySchemaChangeActivity notifySchemaChangeActivity,
SlackConfigActivity slackConfigActivity,
ConfigFetchActivity configFetchActivity) {
return List.of(notifySchemaChangeActivity, slackConfigActivity, configFetchActivity);
}

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("connectionManagerActivities")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow;
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.SlackNotificationConfiguration;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConnectionNotificationWorkflowImpl implements ConnectionNotificationWorkflow {

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private NotifySchemaChangeActivity notifySchemaChangeActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private SlackConfigActivity slackConfigActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Override
public boolean sendSchemaChangeNotification(final UUID connectionId)
throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException {
final StandardSync standardSync = configFetchActivity.getStandardSync(connectionId);
final Optional<SlackNotificationConfiguration> slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId);
if (slackConfig.isPresent()) {
final Notification notification =
new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
.withSlackConfiguration(slackConfig.get());
final SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, standardSync.getBreakingChange());
} else {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -33,6 +37,8 @@ class ScheduleRetrieverOutput {

}

StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException;

/**
* Return how much time to wait before running the next sync. It will query the DB to get the last
* starting time of the latest terminal job (Failed, canceled or successful) and return the amount
Expand Down
Loading