Skip to content

Commit 92e76c9

Browse files
committed
feat: pagination on web_backend/connections/list and server-side filtering (#16836)
1 parent 3e1c772 commit 92e76c9

File tree

40 files changed

+2971
-577
lines changed

40 files changed

+2971
-577
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16825,6 +16825,65 @@ components:
1682516825
type: array
1682616826
items:
1682716827
$ref: "#/components/schemas/DestinationId"
16828+
cursor:
16829+
$ref: "#/components/schemas/ConnectionId"
16830+
sortKey:
16831+
$ref: "#/components/schemas/WebBackendConnectionListSortKey"
16832+
description: Sort key that determines the order of results
16833+
filters:
16834+
$ref: "#/components/schemas/WebBackendConnectionListFilters"
16835+
description: Filters for list of of results
16836+
pageSize:
16837+
type: integer
16838+
description: Number of connections to return in the list
16839+
WebBackendConnectionListSortKey:
16840+
type: string
16841+
enum:
16842+
- connectionName_asc
16843+
- connectionName_desc
16844+
- sourceName_asc
16845+
- sourceName_desc
16846+
- destinationName_asc
16847+
- destinationName_desc
16848+
- lastSync_asc
16849+
- lastSync_desc
16850+
description: Available sort keys for connection list pagination
16851+
WebBackendConnectionListFilters:
16852+
type: object
16853+
properties:
16854+
sourceDefinitionIds:
16855+
type: array
16856+
items:
16857+
$ref: "#/components/schemas/SourceDefinitionId"
16858+
description: Filter connections by source connector definition types
16859+
destinationDefinitionIds:
16860+
type: array
16861+
items:
16862+
$ref: "#/components/schemas/DestinationDefinitionId"
16863+
description: Filter connections by destination connector definition types
16864+
statuses:
16865+
type: array
16866+
items:
16867+
type: string
16868+
enum:
16869+
- healthy
16870+
- failed
16871+
- running
16872+
- paused
16873+
description: Filter connections by their current sync status
16874+
states:
16875+
type: array
16876+
items:
16877+
$ref: "#/components/schemas/ActorStatus"
16878+
description: Filter connections by their active/inactive state
16879+
tagIds:
16880+
type: array
16881+
items:
16882+
$ref: "#/components/schemas/TagId"
16883+
description: Filter connections by tags
16884+
searchTerm:
16885+
type: string
16886+
description: Search term to filter connections by name, source name, destination name, etc.
1682816887
WebBackendConnectionListItem:
1682916888
type: object
1683016889
description: Information about a connection that shows up in the connection list view.
@@ -17017,6 +17076,10 @@ components:
1701717076
type: array
1701817077
items:
1701917078
$ref: "#/components/schemas/WebBackendConnectionListItem"
17079+
page_size:
17080+
type: integer
17081+
num_connections:
17082+
type: integer
1702017083
SyncMode:
1702117084
type: string
1702217085
enum:

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,7 @@ public ConnectionReadList listConnectionsForWorkspaces(final ListConnectionsForW
12551255

12561256
final List<ConnectionRead> connectionReads = Lists.newArrayList();
12571257

1258-
final Map<UUID, List<StandardSync>> workspaceIdToStandardSyncsMap = connectionService.listWorkspaceStandardSyncsPaginated(
1258+
final Map<UUID, List<StandardSync>> workspaceIdToStandardSyncsMap = connectionService.listWorkspaceStandardSyncsLimitOffsetPaginated(
12591259
listConnectionsForWorkspacesRequestBody.getWorkspaceIds(),
12601260
listConnectionsForWorkspacesRequestBody.getTagIds(),
12611261
listConnectionsForWorkspacesRequestBody.getIncludeDeleted(),

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java

Lines changed: 124 additions & 55 deletions
Large diffs are not rendered by default.

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandlerTest.java

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.junit.jupiter.api.Assertions.assertFalse;
1010
import static org.junit.jupiter.api.Assertions.assertNotEquals;
11+
import static org.junit.jupiter.api.Assertions.assertNotNull;
12+
import static org.junit.jupiter.api.Assertions.assertNull;
1113
import static org.junit.jupiter.api.Assertions.assertTrue;
1214
import static org.mockito.ArgumentMatchers.any;
1315
import static org.mockito.ArgumentMatchers.eq;
@@ -23,6 +25,7 @@
2325
import com.fasterxml.jackson.databind.node.ObjectNode;
2426
import com.google.common.collect.Lists;
2527
import io.airbyte.api.model.generated.ActorDefinitionVersionRead;
28+
import io.airbyte.api.model.generated.ActorStatus;
2629
import io.airbyte.api.model.generated.AirbyteCatalog;
2730
import io.airbyte.api.model.generated.AirbyteStream;
2831
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
@@ -67,8 +70,10 @@
6770
import io.airbyte.api.model.generated.SynchronousJobRead;
6871
import io.airbyte.api.model.generated.Tag;
6972
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
73+
import io.airbyte.api.model.generated.WebBackendConnectionListFilters;
7074
import io.airbyte.api.model.generated.WebBackendConnectionListItem;
7175
import io.airbyte.api.model.generated.WebBackendConnectionListRequestBody;
76+
import io.airbyte.api.model.generated.WebBackendConnectionListSortKey;
7277
import io.airbyte.api.model.generated.WebBackendConnectionRead;
7378
import io.airbyte.api.model.generated.WebBackendConnectionReadList;
7479
import io.airbyte.api.model.generated.WebBackendConnectionRequestBody;
@@ -119,6 +124,8 @@
119124
import io.airbyte.data.services.PartialUserConfigService;
120125
import io.airbyte.data.services.SourceService;
121126
import io.airbyte.data.services.WorkspaceService;
127+
import io.airbyte.data.services.shared.ConnectionFilters;
128+
import io.airbyte.data.services.shared.ConnectionSortKey;
122129
import io.airbyte.data.services.shared.DestinationAndDefinition;
123130
import io.airbyte.data.services.shared.SourceAndDefinition;
124131
import io.airbyte.data.services.shared.StandardSyncQuery;
@@ -150,6 +157,7 @@
150157
import org.junit.jupiter.api.BeforeEach;
151158
import org.junit.jupiter.api.Test;
152159
import org.junit.jupiter.params.ParameterizedTest;
160+
import org.junit.jupiter.params.provider.MethodSource;
153161
import org.junit.jupiter.params.provider.ValueSource;
154162
import org.mockito.ArgumentCaptor;
155163
import org.mockito.InOrder;
@@ -331,8 +339,10 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio
331339
final StandardSync brokenStandardSync =
332340
ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true, Status.INACTIVE);
333341

334-
when(connectionService.listWorkspaceStandardSyncs(new StandardSyncQuery(sourceRead.getWorkspaceId(), List.of(), List.of(), false)))
342+
when(connectionService.listWorkspaceStandardSyncs(any(StandardSyncQuery.class)))
335343
.thenReturn(Collections.singletonList(standardSync));
344+
when(connectionService.countWorkspaceStandardSyncs(any(StandardSyncQuery.class), any()))
345+
.thenReturn(1);
336346
when(sourceService.getSourceAndDefinitionsFromSourceIds(Collections.singletonList(source.getSourceId())))
337347
.thenReturn(Collections.singletonList(new SourceAndDefinition(source, sourceDefinition)));
338348
when(destinationService.getDestinationAndDefinitionsFromDestinationIds(Collections.singletonList(destination.getDestinationId())))
@@ -375,6 +385,17 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio
375385
.name("Test Operation")));
376386

377387
final Instant now = Instant.now();
388+
389+
// Mock cursor pagination with job info using the fixed timestamp
390+
when(connectionService.listWorkspaceStandardSyncsCursorPaginated(any(StandardSyncQuery.class), any()))
391+
.thenReturn(Collections.singletonList(
392+
new io.airbyte.data.services.shared.ConnectionWithJobInfo(
393+
standardSync,
394+
"source",
395+
"destination",
396+
Optional.of(io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus.succeeded),
397+
Optional.of(java.time.OffsetDateTime.ofInstant(now, java.time.ZoneOffset.UTC)))));
398+
378399
final JobWithAttemptsRead jobRead = new JobWithAttemptsRead()
379400
.job(new JobRead()
380401
.configId(connectionRead.getConnectionId().toString())
@@ -1672,4 +1693,135 @@ void testGetSchemaChangeNotBreaking() {
16721693
Optional.of(catalogId), Optional.of(new ActorCatalogFetchEvent().withActorCatalogId(differentCatalogId))));
16731694
}
16741695

1696+
@Test
1697+
void testParseSortKeyAllValues() {
1698+
// Test null sort key defaults to CONNECTION_NAME ascending
1699+
final WebBackendConnectionsHandler.SortKeyInfo result = wbHandler.parseSortKey(null);
1700+
assertEquals(ConnectionSortKey.CONNECTION_NAME, result.sortKey());
1701+
assertTrue(result.ascending());
1702+
1703+
// Test all sort key enum values
1704+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.CONNECTION_NAME, true),
1705+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.CONNECTION_NAME_ASC));
1706+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.CONNECTION_NAME, false),
1707+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.CONNECTION_NAME_DESC));
1708+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.SOURCE_NAME, true),
1709+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.SOURCE_NAME_ASC));
1710+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.SOURCE_NAME, false),
1711+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.SOURCE_NAME_DESC));
1712+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.DESTINATION_NAME, true),
1713+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.DESTINATION_NAME_ASC));
1714+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.DESTINATION_NAME, false),
1715+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.DESTINATION_NAME_DESC));
1716+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.LAST_SYNC, true),
1717+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.LAST_SYNC_ASC));
1718+
assertEquals(new WebBackendConnectionsHandler.SortKeyInfo(ConnectionSortKey.LAST_SYNC, false),
1719+
wbHandler.parseSortKey(WebBackendConnectionListSortKey.LAST_SYNC_DESC));
1720+
}
1721+
1722+
@Test
1723+
void testBuildConnectionFiltersNull() {
1724+
assertNull(wbHandler.buildConnectionFilters(null));
1725+
}
1726+
1727+
@Test
1728+
void testBuildConnectionFiltersEmpty() {
1729+
final WebBackendConnectionListFilters filters = new WebBackendConnectionListFilters();
1730+
final ConnectionFilters result = wbHandler.buildConnectionFilters(filters);
1731+
1732+
assertNotNull(result);
1733+
assertNull(result.getSearchTerm());
1734+
1735+
// Empty lists are returned instead of null for collection fields
1736+
assertTrue(result.getSourceDefinitionIds().isEmpty());
1737+
assertTrue(result.getDestinationDefinitionIds().isEmpty());
1738+
assertTrue(result.getStatuses().isEmpty());
1739+
assertTrue(result.getStates().isEmpty());
1740+
assertTrue(result.getTagIds().isEmpty());
1741+
}
1742+
1743+
@ParameterizedTest
1744+
@MethodSource("provideFilterTestCases")
1745+
void testBuildConnectionFiltersWithAllFields(String testName,
1746+
WebBackendConnectionListFilters filters,
1747+
String expectedSearchTerm,
1748+
int expectedSourceDefIds,
1749+
int expectedDestDefIds,
1750+
int expectedStatuses,
1751+
int expectedStates,
1752+
int expectedTagIds) {
1753+
final ConnectionFilters result = wbHandler.buildConnectionFilters(filters);
1754+
1755+
assertNotNull(result);
1756+
assertEquals(expectedSearchTerm, result.getSearchTerm());
1757+
assertEquals(expectedSourceDefIds, result.getSourceDefinitionIds().size());
1758+
assertEquals(expectedDestDefIds, result.getDestinationDefinitionIds().size());
1759+
assertEquals(expectedStatuses, result.getStatuses().size());
1760+
assertEquals(expectedStates, result.getStates().size());
1761+
assertEquals(expectedTagIds, result.getTagIds().size());
1762+
}
1763+
1764+
static Object[][] provideFilterTestCases() {
1765+
final UUID sourceDefId1 = UUID.randomUUID();
1766+
final UUID sourceDefId2 = UUID.randomUUID();
1767+
final UUID destDefId = UUID.randomUUID();
1768+
final UUID tagId1 = UUID.randomUUID();
1769+
final UUID tagId2 = UUID.randomUUID();
1770+
1771+
return new Object[][] {
1772+
// Search term only
1773+
{
1774+
"Search term only",
1775+
new WebBackendConnectionListFilters().searchTerm("test search"),
1776+
"test search", 0, 0, 0, 0, 0
1777+
},
1778+
// Source definition IDs only
1779+
{
1780+
"Source definition IDs only",
1781+
new WebBackendConnectionListFilters().sourceDefinitionIds(List.of(sourceDefId1, sourceDefId2)),
1782+
null, 2, 0, 0, 0, 0
1783+
},
1784+
// Destination definition IDs only
1785+
{
1786+
"Destination definition IDs only",
1787+
new WebBackendConnectionListFilters().destinationDefinitionIds(List.of(destDefId)),
1788+
null, 0, 1, 0, 0, 0
1789+
},
1790+
// Tag IDs only
1791+
{
1792+
"Tag IDs only",
1793+
new WebBackendConnectionListFilters().tagIds(List.of(tagId1, tagId2)),
1794+
null, 0, 0, 0, 0, 2
1795+
},
1796+
// Statuses only
1797+
{
1798+
"Statuses only",
1799+
new WebBackendConnectionListFilters().statuses(List.of(
1800+
WebBackendConnectionListFilters.StatusesEnum.HEALTHY,
1801+
WebBackendConnectionListFilters.StatusesEnum.FAILED)),
1802+
null, 0, 0, 2, 0, 0
1803+
},
1804+
// States only
1805+
{
1806+
"States only",
1807+
new WebBackendConnectionListFilters().states(List.of(
1808+
ActorStatus.ACTIVE,
1809+
ActorStatus.INACTIVE)),
1810+
null, 0, 0, 0, 2, 0
1811+
},
1812+
// Multiple filters combined
1813+
{
1814+
"Multiple filters combined",
1815+
new WebBackendConnectionListFilters()
1816+
.searchTerm("production")
1817+
.sourceDefinitionIds(List.of(sourceDefId1))
1818+
.destinationDefinitionIds(List.of(destDefId))
1819+
.statuses(List.of(WebBackendConnectionListFilters.StatusesEnum.HEALTHY))
1820+
.states(List.of(ActorStatus.ACTIVE))
1821+
.tagIds(List.of(tagId1)),
1822+
"production", 1, 1, 1, 1, 1
1823+
}
1824+
};
1825+
}
1826+
16751827
}

airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionService.kt

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import io.airbyte.config.StandardSync
1010
import io.airbyte.config.StreamDescriptor
1111
import io.airbyte.config.StreamDescriptorForDestination
1212
import io.airbyte.data.ConfigNotFoundException
13+
import io.airbyte.data.services.shared.ConnectionFilters
14+
import io.airbyte.data.services.shared.ConnectionListCursorPagination
15+
import io.airbyte.data.services.shared.ConnectionSortKey
16+
import io.airbyte.data.services.shared.ConnectionWithJobInfo
1317
import io.airbyte.data.services.shared.StandardSyncQuery
1418
import io.airbyte.data.services.shared.StandardSyncsQueryPaginated
1519
import io.airbyte.validation.json.JsonValidationException
@@ -45,16 +49,38 @@ interface ConnectionService {
4549
fun listWorkspaceStandardSyncs(standardSyncQuery: StandardSyncQuery): List<StandardSync>
4650

4751
@Throws(IOException::class)
48-
fun listWorkspaceStandardSyncsPaginated(
52+
fun listWorkspaceStandardSyncsCursorPaginated(
53+
standardSyncQuery: StandardSyncQuery,
54+
connectionListCursorPagination: ConnectionListCursorPagination,
55+
): List<ConnectionWithJobInfo>
56+
57+
@Throws(IOException::class)
58+
fun countWorkspaceStandardSyncs(
59+
standardSyncQuery: StandardSyncQuery,
60+
filters: ConnectionFilters?,
61+
): Int
62+
63+
@Throws(IOException::class)
64+
fun listWorkspaceStandardSyncsLimitOffsetPaginated(
4965
workspaceIds: List<UUID>,
5066
tagIds: List<UUID>,
5167
includeDeleted: Boolean,
5268
pageSize: Int,
5369
rowOffset: Int,
5470
): Map<UUID, List<StandardSync>>
5571

72+
@Throws(IOException::class, ConfigNotFoundException::class, JsonValidationException::class)
73+
fun buildCursorPagination(
74+
cursor: UUID?,
75+
internalSortKey: ConnectionSortKey?,
76+
connectionFilters: ConnectionFilters?,
77+
query: StandardSyncQuery?,
78+
ascending: Boolean?,
79+
pageSize: Int?,
80+
): ConnectionListCursorPagination?
81+
5682
@Throws(IOException::class)
57-
fun listWorkspaceStandardSyncsPaginated(standardSyncsQueryPaginated: StandardSyncsQueryPaginated): Map<UUID, List<StandardSync>>
83+
fun listWorkspaceStandardSyncsLimitOffsetPaginated(standardSyncsQueryPaginated: StandardSyncsQueryPaginated): Map<UUID, List<StandardSync>>
5884

5985
@Throws(IOException::class)
6086
fun listConnectionsBySource(

0 commit comments

Comments
 (0)