Skip to content

Commit 0301e31

Browse files
committed
rename GeographyMapper to TaskQueueMapper
1 parent fade942 commit 0301e31

File tree

6 files changed

+26
-23
lines changed

6 files changed

+26
-23
lines changed

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/GeographyMapper.java renamed to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given
1111
* Geography.
1212
*/
13-
public interface GeographyMapper {
13+
public interface TaskQueueMapper {
1414

1515
String getTaskQueue(Geography geography);
1616

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapper.java renamed to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@
66

77
import com.google.common.annotations.VisibleForTesting;
88
import io.airbyte.commons.temporal.TemporalJobType;
9-
import io.airbyte.commons.temporal.scheduling.GeographyMapper;
9+
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
1010
import io.airbyte.config.Geography;
1111
import jakarta.inject.Singleton;
1212
import java.util.Map;
1313

1414
@Singleton
15-
public class DefaultGeographyMapper implements GeographyMapper {
15+
public class DefaultTaskQueueMapper implements TaskQueueMapper {
1616

1717
@VisibleForTesting
1818
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name();
1919

2020
// By default, map every Geography value to the default task queue.
21-
// To override this behavior, define a new GeographyMapper bean with the @Primary annotation.
21+
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation.
2222
@VisibleForTesting
2323
static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of(
2424
Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE,

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.airbyte.workers.temporal.scheduling;
66

7-
import io.airbyte.commons.temporal.scheduling.GeographyMapper;
7+
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
88
import io.airbyte.config.Geography;
99
import io.airbyte.config.persistence.ConfigRepository;
1010
import jakarta.inject.Singleton;
@@ -21,11 +21,11 @@
2121
public class RouterService {
2222

2323
private final ConfigRepository configRepository;
24-
private final GeographyMapper geographyMapper;
24+
private final TaskQueueMapper taskQueueMapper;
2525

26-
public RouterService(final ConfigRepository configRepository, final GeographyMapper geographyMapper) {
26+
public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) {
2727
this.configRepository = configRepository;
28-
this.geographyMapper = geographyMapper;
28+
this.taskQueueMapper = taskQueueMapper;
2929
}
3030

3131
/**
@@ -34,7 +34,7 @@ public RouterService(final ConfigRepository configRepository, final GeographyMap
3434
*/
3535
public String getTaskQueue(final UUID connectionId) throws IOException {
3636
final Geography geography = configRepository.getGeographyForConnection(connectionId);
37-
return geographyMapper.getTaskQueue(geography);
37+
return taskQueueMapper.getTaskQueue(geography);
3838
}
3939

4040
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import jakarta.inject.Singleton;
1515
import java.io.IOException;
1616
import java.util.Map;
17+
import lombok.extern.slf4j.Slf4j;
1718

19+
@Slf4j
1820
@Singleton
1921
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity {
2022

@@ -34,6 +36,7 @@ public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) {
3436

3537
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
3638
} catch (final IOException e) {
39+
log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e);
3740
throw new RetryableException(e);
3841
}
3942
}
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,34 @@
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
88

9-
import io.airbyte.commons.temporal.scheduling.GeographyMapper;
9+
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
1010
import io.airbyte.config.Geography;
1111
import java.util.Arrays;
1212
import java.util.Set;
1313
import java.util.stream.Collectors;
1414
import org.junit.jupiter.api.Test;
1515

16-
class DefaultGeographyMapperTest {
16+
class DefaultTaskQueueMapperTest {
1717

1818
@Test
1919
void testGetTaskQueue() {
2020
// By default, every Geography should map to the default SYNC task queue
21-
final GeographyMapper mapper = new DefaultGeographyMapper();
21+
final TaskQueueMapper mapper = new DefaultTaskQueueMapper();
2222

23-
assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO));
24-
assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US));
25-
assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU));
23+
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO));
24+
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US));
25+
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU));
2626
}
2727

2828
/**
2929
* If this test fails, it likely means that a new value was added to the {@link Geography} enum. A
30-
* new entry must be added to {@link DefaultGeographyMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this
30+
* new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this
3131
* test to pass.
3232
*/
3333
@Test
3434
void testAllGeographiesHaveAMapping() {
3535
final Set<Geography> allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet());
36-
final Set<Geography> mappedGeographies = DefaultGeographyMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet();
36+
final Set<Geography> mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet();
3737

3838
assertEquals(allGeographies, mappedGeographies);
3939
}

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
88

9-
import io.airbyte.commons.temporal.scheduling.GeographyMapper;
9+
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
1010
import io.airbyte.config.Geography;
1111
import io.airbyte.config.persistence.ConfigRepository;
1212
import java.io.IOException;
@@ -32,17 +32,17 @@ class RouterServiceTest {
3232
private ConfigRepository mConfigRepository;
3333

3434
@Mock
35-
private GeographyMapper mGeographyMapper;
35+
private TaskQueueMapper mTaskQueueMapper;
3636

3737
private RouterService routerService;
3838

3939
@BeforeEach
4040
void init() {
41-
routerService = new RouterService(mConfigRepository, mGeographyMapper);
41+
routerService = new RouterService(mConfigRepository, mTaskQueueMapper);
4242

43-
Mockito.when(mGeographyMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE);
44-
Mockito.when(mGeographyMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE);
45-
Mockito.when(mGeographyMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE);
43+
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE);
44+
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE);
45+
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE);
4646
}
4747

4848
@Test

0 commit comments

Comments
 (0)