Skip to content

Add Geography support to RouterService #17902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.config.Geography;

/**
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given
* Geography.
*/
public interface TaskQueueMapper {

/**
* Resolves the name of the Temporal Task Queue for the given geography.
*
* @param geography the geography to route; NOTE(review): whether null is allowed is up to the
* implementation - confirm against the implementation in use
* @return the name of the Task Queue that syncs for this geography should run on
* @throws IllegalArgumentException presumably when no mapping exists for the geography (this is
* what DefaultTaskQueueMapper does); other implementations may differ
*/
String getTaskQueue(Geography geography);

}
Original file line number Diff line number Diff line change
Expand Up @@ -579,16 +579,6 @@ public interface Configs {
*/
boolean shouldRunConnectionManagerWorkflows();

// Worker - Control Plane configs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


/**
* TEMPORARY: Define a set of connection IDs that should run in Airbyte's MVP Data Plane.
* <ul>
* <li>This should only be set on Control-plane workers, since those workers decide which Data
* Plane task queue to use based on connectionId.</li>
* <li>Will be removed in favor of the Routing Service in the future.</li>
* </ul>
* Internal-use only.
*
* @return the connection IDs that should run in the MVP Data Plane
*/
Set<String> connectionIdsForMvpDataPlane();

// Worker - Data Plane configs

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public class EnvConfigs implements Configs {

// Worker - Control plane configs
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
private static final String CONNECTION_IDS_FOR_MVP_DATA_PLANE = "CONNECTION_IDS_FOR_MVP_DATA_PLANE";

// Worker - Data Plane configs
private static final String DATA_SYNC_TASK_QUEUES = "DATA_SYNC_TASK_QUEUES";
Expand Down Expand Up @@ -947,17 +946,6 @@ public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

// Worker - Control plane

/**
 * Looks up the {@code CONNECTION_IDS_FOR_MVP_DATA_PLANE} setting via {@code getEnvOrDefault}
 * (defaulting to the empty string) and splits it on commas.
 *
 * @return the distinct comma-separated entries, or a new empty set when the value is empty
 */
@Override
public Set<String> connectionIdsForMvpDataPlane() {
  final var rawIds = getEnvOrDefault(CONNECTION_IDS_FOR_MVP_DATA_PLANE, "");
  if (!rawIds.isEmpty()) {
    // Entries are taken verbatim: no trimming or filtering is applied to the split pieces.
    return Arrays.stream(rawIds.split(",")).collect(Collectors.toSet());
  }
  return new HashSet<>();
}

// Worker - Data plane

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Geography;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -1122,4 +1123,12 @@ public ConfiguredAirbyteCatalog getConfiguredCatalogForConnection(final UUID con
return standardSync.getCatalog();
}

/**
 * Reads the geography column of the connection row with the given id.
 *
 * @param connectionId id of the connection to look up
 * @return the connection's {@link Geography}; NOTE(review): presumably null when no row matches
 *         (jOOQ {@code fetchOneInto} semantics) - confirm and make sure callers handle it
 * @throws IOException if the database query fails
 */
public Geography getGeographyForConnection(final UUID connectionId) throws IOException {
  // Build the single-row select through database.query(...); the fetch itself is issued on the
  // query object that call returns.
  final var geographyQuery = database.query(ctx -> ctx.select(CONNECTION.GEOGRAPHY)
      .from(CONNECTION)
      .where(CONNECTION.ID.eq(connectionId))
      .limit(1));

  return geographyQuery.fetchOneInto(Geography.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Geography;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -491,4 +492,13 @@ void testGetDestinationAndDefinitionsFromDestinationIds() throws IOException {
assertThat(actual).hasSameElementsAs(expected);
}

/**
 * The geography returned by the repository for a connection must equal the geography of the
 * corresponding mock sync. Assumes the suite's setup has already persisted
 * {@code MockData.standardSyncs()} - TODO confirm, setup is not visible here.
 */
@Test
void testGetGeographyForConnection() throws IOException {
  final StandardSync firstSync = MockData.standardSyncs().get(0);

  final Geography fetchedGeography = configRepository.getGeographyForConnection(firstSync.getConnectionId());

  assertEquals(firstSync.getGeography(), fetchedGeography);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import jakarta.inject.Singleton;
import java.util.Map;

/**
 * Default {@link TaskQueueMapper} implementation: every known {@link Geography} is routed to the
 * same default SYNC task queue.
 */
@Singleton
public class DefaultTaskQueueMapper implements TaskQueueMapper {

  @VisibleForTesting
  static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name();

  // By default, map every Geography value to the default task queue.
  // To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation.
  @VisibleForTesting
  static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of(
      Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE,
      Geography.US, DEFAULT_SYNC_TASK_QUEUE,
      Geography.EU, DEFAULT_SYNC_TASK_QUEUE);

  /**
   * Returns the task queue mapped to the given geography.
   *
   * @param geography the geography to route
   * @return the task queue name registered for {@code geography}
   * @throws IllegalArgumentException if {@code geography} is null or has no entry in
   *         {@link #GEOGRAPHY_TASK_QUEUE_MAP}
   */
  @Override
  public String getTaskQueue(final Geography geography) {
    // Maps created by Map.of(...) throw NullPointerException on null lookups, so a null geography
    // must be screened out first to surface the intended IllegalArgumentException instead.
    final String taskQueue = geography == null ? null : GEOGRAPHY_TASK_QUEUE_MAP.get(geography);
    if (taskQueue == null) {
      throw new IllegalArgumentException(String.format("Unexpected geography %s", geography));
    }

    return taskQueue;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This diff shows up as a brand new file because I moved the RouterService from `/sync/` to `/scheduling/`, since it's used by the ConnectionManagerWorkflow, not the SyncWorkflow.

This diff also contains the new functionality of reading the connection's Geography from the database instead of from the environment variable

* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

/**
 * Resolves the Task Queue on which a connection's sync operations should run, using the
 * connection's configured {@link Geography}.
 */
@Singleton
@Slf4j
public class RouterService {

  private final ConfigRepository configRepository;
  private final TaskQueueMapper taskQueueMapper;

  /**
   * @param configRepository used to read a connection's geography
   * @param taskQueueMapper translates a geography into a task queue name
   */
  public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) {
    this.configRepository = configRepository;
    this.taskQueueMapper = taskQueueMapper;
  }

  /**
   * Fetches the {@link Geography} configured for the connection from the config repository and
   * hands it to the {@link TaskQueueMapper} to pick the Task Queue for this connection's sync.
   *
   * @param connectionId id of the connection being routed
   * @return the task queue name chosen by the mapper
   * @throws IOException if the geography lookup fails
   */
  public String getTaskQueue(final UUID connectionId) throws IOException {
    return taskQueueMapper.getTaskQueue(configRepository.getGeographyForConnection(connectionId));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.workers.temporal.sync.RouterService;
import io.airbyte.workers.temporal.scheduling.RouterService;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity {

Expand All @@ -27,9 +31,14 @@ public RouteToSyncTaskQueueActivityImpl(final RouterService routerService) {
public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));

final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());
try {
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());

return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
} catch (final IOException e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the routerService queries the database, an IOException is possible. Throw a RetryableException if encountered

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remind me again, where do we catch the RetryableException? I can't look right now since my IntelliJ is pooping out on me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asked here: https://airbytehq-team.slack.com/archives/C03AS1GAQV6/p1666217128557719

I think it extends RuntimeException which means Temporal knows to retry activities that fail with it as a cause, but it isn't super clear

log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e);
throw new RetryableException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: log.warn here?

}
}

}

This file was deleted.

1 change: 0 additions & 1 deletion airbyte-workers/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ airbyte:
sync:
task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC}
plane:
connection-ids-mvp: ${CONNECTION_IDS_FOR_MVP_DATA_PLANE:}
service-account:
credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:}
email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

class DefaultTaskQueueMapperTest {

  /**
   * Out of the box, each of AUTO, US and EU must resolve to the default SYNC task queue.
   */
  @Test
  void testGetTaskQueue() {
    final TaskQueueMapper mapper = new DefaultTaskQueueMapper();

    for (final Geography geography : Arrays.asList(Geography.AUTO, Geography.US, Geography.EU)) {
      assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(geography));
    }
  }

  /**
   * Guards against a {@link Geography} value being added without a matching entry in
   * {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP}. A failure here most likely means the
   * enum grew; add the missing entry to the map to make the test pass again.
   */
  @Test
  void testAllGeographiesHaveAMapping() {
    final Set<Geography> everyGeography = Arrays.stream(Geography.values()).collect(Collectors.toSet());

    assertEquals(everyGeography, DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet());
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Unit tests for {@link RouterService}, run against mocked {@link ConfigRepository} and
 * {@link TaskQueueMapper} collaborators.
 */
@ExtendWith(MockitoExtension.class)
class RouterServiceTest {

  private static final UUID CONNECTION_ID = UUID.randomUUID();
  private static final String US_TASK_QUEUE = "US_TASK_QUEUE";
  private static final String EU_TASK_QUEUE = "EU_TASK_QUEUE";

  @Mock
  private ConfigRepository mConfigRepository;

  @Mock
  private TaskQueueMapper mTaskQueueMapper;

  private RouterService routerService;

  @BeforeEach
  void init() {
    // AUTO and US share the US queue; EU gets its own queue.
    Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE);
    Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE);
    Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE);

    routerService = new RouterService(mConfigRepository, mTaskQueueMapper);
  }

  @Test
  void testGetTaskQueue() throws IOException {
    assertRoutedTo(Geography.AUTO, US_TASK_QUEUE);
    assertRoutedTo(Geography.US, US_TASK_QUEUE);
    assertRoutedTo(Geography.EU, EU_TASK_QUEUE);
  }

  /**
   * Stubs the repository so the test connection reports the given geography, then checks that the
   * router resolves the connection to the expected task queue.
   */
  private void assertRoutedTo(final Geography configuredGeography, final String expectedTaskQueue) throws IOException {
    Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(configuredGeography);
    assertEquals(expectedTaskQueue, routerService.getTaskQueue(CONNECTION_ID));
  }

}
Loading