-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Add Geography support to RouterService #17902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7a0eedd
2c87d34
05518c9
53045c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.commons.temporal.scheduling; | ||
|
||
import io.airbyte.config.Geography; | ||
|
||
/** | ||
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given | ||
* Geography. | ||
*/ | ||
public interface TaskQueueMapper { | ||
|
||
String getTaskQueue(Geography geography); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import io.airbyte.commons.temporal.TemporalJobType; | ||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import jakarta.inject.Singleton; | ||
import java.util.Map; | ||
|
||
@Singleton | ||
public class DefaultTaskQueueMapper implements TaskQueueMapper { | ||
|
||
@VisibleForTesting | ||
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name(); | ||
|
||
// By default, map every Geography value to the default task queue. | ||
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation. | ||
@VisibleForTesting | ||
static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of( | ||
Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE, | ||
Geography.US, DEFAULT_SYNC_TASK_QUEUE, | ||
Geography.EU, DEFAULT_SYNC_TASK_QUEUE); | ||
|
||
@Override | ||
public String getTaskQueue(final Geography geography) { | ||
if (GEOGRAPHY_TASK_QUEUE_MAP.containsKey(geography)) { | ||
return GEOGRAPHY_TASK_QUEUE_MAP.get(geography); | ||
} | ||
|
||
throw new IllegalArgumentException(String.format("Unexpected geography %s", geography)); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This diff shows up as a brand new file because I moved the RouterService from This diff also contains the new functionality of reading the connection's Geography from the database instead of from the environment variable |
||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import jakarta.inject.Singleton; | ||
import java.io.IOException; | ||
import java.util.UUID; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* Decides which Task Queue should be used for a given connection's sync operations, based on the | ||
* configured {@link Geography} | ||
*/ | ||
@Singleton | ||
@Slf4j | ||
public class RouterService { | ||
|
||
private final ConfigRepository configRepository; | ||
private final TaskQueueMapper taskQueueMapper; | ||
|
||
public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) { | ||
this.configRepository = configRepository; | ||
this.taskQueueMapper = taskQueueMapper; | ||
} | ||
|
||
/** | ||
* Given a connectionId, look up the connection's configured {@link Geography} in the config DB and | ||
* use it to determine which Task Queue should be used for this connection's sync. | ||
*/ | ||
public String getTaskQueue(final UUID connectionId) throws IOException { | ||
final Geography geography = configRepository.getGeographyForConnection(connectionId); | ||
return taskQueueMapper.getTaskQueue(geography); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,11 +8,15 @@ | |
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; | ||
|
||
import datadog.trace.api.Trace; | ||
import io.airbyte.commons.temporal.exception.RetryableException; | ||
import io.airbyte.metrics.lib.ApmTraceUtils; | ||
import io.airbyte.workers.temporal.sync.RouterService; | ||
import io.airbyte.workers.temporal.scheduling.RouterService; | ||
import jakarta.inject.Singleton; | ||
import java.io.IOException; | ||
import java.util.Map; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
@Singleton | ||
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity { | ||
|
||
|
@@ -27,9 +31,14 @@ public RouteToSyncTaskQueueActivityImpl(final RouterService routerService) { | |
public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) { | ||
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId())); | ||
|
||
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId()); | ||
try { | ||
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId()); | ||
|
||
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); | ||
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); | ||
} catch (final IOException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that the routerService queries the database, an IOException is possible. Throw a RetryableException if encountered There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remind me again, where do we catch the RetryableException? I can't look since as my IntelliJ is pooping out on me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Asked here: https://airbytehq-team.slack.com/archives/C03AS1GAQV6/p1666217128557719 I think it extends |
||
log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e); | ||
throw new RetryableException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: log.warn here? |
||
} | ||
} | ||
|
||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import java.util.Arrays; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class DefaultTaskQueueMapperTest { | ||
|
||
@Test | ||
void testGetTaskQueue() { | ||
// By default, every Geography should map to the default SYNC task queue | ||
final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); | ||
|
||
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO)); | ||
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US)); | ||
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU)); | ||
} | ||
|
||
/** | ||
* If this test fails, it likely means that a new value was added to the {@link Geography} enum. A | ||
* new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this | ||
* test to pass. | ||
*/ | ||
@Test | ||
void testAllGeographiesHaveAMapping() { | ||
final Set<Geography> allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet()); | ||
final Set<Geography> mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet(); | ||
|
||
assertEquals(allGeographies, mappedGeographies); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import java.io.IOException; | ||
import java.util.UUID; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.Mockito; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
|
||
/** | ||
* Test suite for the {@link RouterService} class. | ||
*/ | ||
@ExtendWith(MockitoExtension.class) | ||
class RouterServiceTest { | ||
|
||
private static final UUID CONNECTION_ID = UUID.randomUUID(); | ||
private static final String US_TASK_QUEUE = "US_TASK_QUEUE"; | ||
private static final String EU_TASK_QUEUE = "EU_TASK_QUEUE"; | ||
|
||
@Mock | ||
private ConfigRepository mConfigRepository; | ||
|
||
@Mock | ||
private TaskQueueMapper mTaskQueueMapper; | ||
|
||
private RouterService routerService; | ||
|
||
@BeforeEach | ||
void init() { | ||
routerService = new RouterService(mConfigRepository, mTaskQueueMapper); | ||
|
||
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE); | ||
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE); | ||
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE); | ||
} | ||
|
||
@Test | ||
void testGetTaskQueue() throws IOException { | ||
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO); | ||
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); | ||
|
||
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.US); | ||
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); | ||
|
||
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.EU); | ||
assertEquals(EU_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1