Skip to content

Commit 4e236b5

Browse files
authored
Add Geography support to RouterService (#17902)
* router service uses geography in database instead of env var move geography map to a helper that can be overridden with a separate implementation in cloud format pmd fix import move geography mapper interface to airbyte-commons-temporal * add DefaultGeographyMapper back in * remove all args constructor and extranneous import * rename GeographyMapper to TaskQueueMapper
1 parent e232ffa commit 4e236b5

File tree

13 files changed

+226
-134
lines changed

13 files changed

+226
-134
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.temporal.scheduling;
6+
7+
import io.airbyte.config.Geography;
8+
9+
/**
10+
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given
11+
* Geography.
12+
*/
13+
public interface TaskQueueMapper {
14+
15+
String getTaskQueue(Geography geography);
16+
17+
}

airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -579,16 +579,6 @@ public interface Configs {
579579
*/
580580
boolean shouldRunConnectionManagerWorkflows();
581581

582-
// Worker - Control Plane configs
583-
584-
/**
585-
* TEMPORARY: Define a set of connection IDs that should run in Airbyte's MVP Data Plane. - This
586-
* should only be set on Control-plane workers, since those workers decide which Data Plane task
587-
* queue to use based on connectionId. - Will be removed in favor of the Routing Service in the
588-
* future. Internal-use only.
589-
*/
590-
Set<String> connectionIdsForMvpDataPlane();
591-
592582
// Worker - Data Plane configs
593583

594584
/**

airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ public class EnvConfigs implements Configs {
138138

139139
// Worker - Control plane configs
140140
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
141-
private static final String CONNECTION_IDS_FOR_MVP_DATA_PLANE = "CONNECTION_IDS_FOR_MVP_DATA_PLANE";
142141

143142
// Worker - Data Plane configs
144143
private static final String DATA_SYNC_TASK_QUEUES = "DATA_SYNC_TASK_QUEUES";
@@ -947,17 +946,6 @@ public boolean shouldRunConnectionManagerWorkflows() {
947946
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
948947
}
949948

950-
// Worker - Control plane
951-
952-
@Override
953-
public Set<String> connectionIdsForMvpDataPlane() {
954-
final var connectionIds = getEnvOrDefault(CONNECTION_IDS_FOR_MVP_DATA_PLANE, "");
955-
if (connectionIds.isEmpty()) {
956-
return new HashSet<>();
957-
}
958-
return Arrays.stream(connectionIds.split(",")).collect(Collectors.toSet());
959-
}
960-
961949
// Worker - Data plane
962950

963951
@Override

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.airbyte.config.ConfigSchema;
3434
import io.airbyte.config.DestinationConnection;
3535
import io.airbyte.config.DestinationOAuthParameter;
36+
import io.airbyte.config.Geography;
3637
import io.airbyte.config.SourceConnection;
3738
import io.airbyte.config.SourceOAuthParameter;
3839
import io.airbyte.config.StandardDestinationDefinition;
@@ -1122,4 +1123,12 @@ public ConfiguredAirbyteCatalog getConfiguredCatalogForConnection(final UUID con
11221123
return standardSync.getCatalog();
11231124
}
11241125

1126+
public Geography getGeographyForConnection(final UUID connectionId) throws IOException {
1127+
return database.query(ctx -> ctx.select(CONNECTION.GEOGRAPHY)
1128+
.from(CONNECTION)
1129+
.where(CONNECTION.ID.eq(connectionId))
1130+
.limit(1))
1131+
.fetchOneInto(Geography.class);
1132+
}
1133+
11251134
}

airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.airbyte.config.ActorCatalog;
1919
import io.airbyte.config.DestinationConnection;
2020
import io.airbyte.config.DestinationOAuthParameter;
21+
import io.airbyte.config.Geography;
2122
import io.airbyte.config.SourceConnection;
2223
import io.airbyte.config.SourceOAuthParameter;
2324
import io.airbyte.config.StandardDestinationDefinition;
@@ -491,4 +492,13 @@ void testGetDestinationAndDefinitionsFromDestinationIds() throws IOException {
491492
assertThat(actual).hasSameElementsAs(expected);
492493
}
493494

495+
@Test
496+
void testGetGeographyForConnection() throws IOException {
497+
final StandardSync sync = MockData.standardSyncs().get(0);
498+
final Geography expected = sync.getGeography();
499+
final Geography actual = configRepository.getGeographyForConnection(sync.getConnectionId());
500+
501+
assertEquals(expected, actual);
502+
}
503+
494504
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.temporal.scheduling;
6+
7+
import com.google.common.annotations.VisibleForTesting;
8+
import io.airbyte.commons.temporal.TemporalJobType;
9+
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
10+
import io.airbyte.config.Geography;
11+
import jakarta.inject.Singleton;
12+
import java.util.Map;
13+
14+
@Singleton
15+
public class DefaultTaskQueueMapper implements TaskQueueMapper {
16+
17+
@VisibleForTesting
18+
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name();
19+
20+
// By default, map every Geography value to the default task queue.
21+
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation.
22+
@VisibleForTesting
23+
static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of(
24+
Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE,
25+
Geography.US, DEFAULT_SYNC_TASK_QUEUE,
26+
Geography.EU, DEFAULT_SYNC_TASK_QUEUE);
27+
28+
@Override
29+
public String getTaskQueue(final Geography geography) {
30+
if (GEOGRAPHY_TASK_QUEUE_MAP.containsKey(geography)) {
31+
return GEOGRAPHY_TASK_QUEUE_MAP.get(geography);
32+
}
33+
34+
throw new IllegalArgumentException(String.format("Unexpected geography %s", geography));
35+
}
36+
37+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.temporal.scheduling;
6+
7+
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
8+
import io.airbyte.config.Geography;
9+
import io.airbyte.config.persistence.ConfigRepository;
10+
import jakarta.inject.Singleton;
11+
import java.io.IOException;
12+
import java.util.UUID;
13+
import lombok.extern.slf4j.Slf4j;
14+
15+
/**
16+
* Decides which Task Queue should be used for a given connection's sync operations, based on the
17+
* configured {@link Geography}
18+
*/
19+
@Singleton
20+
@Slf4j
21+
public class RouterService {
22+
23+
private final ConfigRepository configRepository;
24+
private final TaskQueueMapper taskQueueMapper;
25+
26+
public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) {
27+
this.configRepository = configRepository;
28+
this.taskQueueMapper = taskQueueMapper;
29+
}
30+
31+
/**
32+
* Given a connectionId, look up the connection's configured {@link Geography} in the config DB and
33+
* use it to determine which Task Queue should be used for this connection's sync.
34+
*/
35+
public String getTaskQueue(final UUID connectionId) throws IOException {
36+
final Geography geography = configRepository.getGeographyForConnection(connectionId);
37+
return taskQueueMapper.getTaskQueue(geography);
38+
}
39+
40+
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@
88
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY;
99

1010
import datadog.trace.api.Trace;
11+
import io.airbyte.commons.temporal.exception.RetryableException;
1112
import io.airbyte.metrics.lib.ApmTraceUtils;
12-
import io.airbyte.workers.temporal.sync.RouterService;
13+
import io.airbyte.workers.temporal.scheduling.RouterService;
1314
import jakarta.inject.Singleton;
15+
import java.io.IOException;
1416
import java.util.Map;
17+
import lombok.extern.slf4j.Slf4j;
1518

19+
@Slf4j
1620
@Singleton
1721
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity {
1822

@@ -27,9 +31,14 @@ public RouteToSyncTaskQueueActivityImpl(final RouterService routerService) {
2731
public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) {
2832
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));
2933

30-
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());
34+
try {
35+
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());
3136

32-
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
37+
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
38+
} catch (final IOException e) {
39+
log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e);
40+
throw new RetryableException(e);
41+
}
3342
}
3443

3544
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

airbyte-workers/src/main/resources/application.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ airbyte:
6464
sync:
6565
task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC}
6666
plane:
67-
connection-ids-mvp: ${CONNECTION_IDS_FOR_MVP_DATA_PLANE:}
6867
service-account:
6968
credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:}
7069
email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:}

0 commit comments

Comments
 (0)