Skip to content

Commit ff1ebb0

Browse files
gosusnpdizel852krishnaglick
authored
Improve performance of list connection operations (#20264)
* Enable source/destination filtering in /web_backend/connection/list * Add tests on connection filtering * Remove redundant DB call * Make some methods static to avoid sneaky db calls * Minor refactor * Small refactoring + add filtering by source for catalog fetch events * Add comment * Trim WebBackendConnectionList response payload * fix build errors * Remove requests to list_by_workspace and list_latest * Add sourcedefid and destdefid to source/dest snippets read * fixed sourceDefinitionId and destinationDefinitionId * Fix webbackend handler tests Co-authored-by: Volodymyr Petrov <[email protected]> Co-authored-by: KC <[email protected]>
1 parent dddaad5 commit ff1ebb0

File tree

17 files changed

+260
-135
lines changed

17 files changed

+260
-135
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1771,7 +1771,7 @@ paths:
17711771
content:
17721772
application/json:
17731773
schema:
1774-
$ref: "#/components/schemas/WorkspaceIdRequestBody"
1774+
$ref: "#/components/schemas/WebBackendConnectionListRequestBody"
17751775
required: true
17761776
responses:
17771777
"200":
@@ -2738,6 +2738,24 @@ components:
27382738
type: string
27392739
icon:
27402740
type: string
2741+
SourceSnippetRead:
2742+
type: object
2743+
required:
2744+
- sourceId
2745+
- name
2746+
- sourceDefinitionId
2747+
- sourceName
2748+
properties:
2749+
sourceId:
2750+
$ref: "#/components/schemas/SourceId"
2751+
name:
2752+
type: string
2753+
sourceDefinitionId:
2754+
$ref: "#/components/schemas/SourceDefinitionId"
2755+
sourceName:
2756+
type: string
2757+
icon:
2758+
type: string
27412759
SourceReadList:
27422760
type: object
27432761
required:
@@ -3035,6 +3053,24 @@ components:
30353053
type: string
30363054
icon:
30373055
type: string
3056+
DestinationSnippetRead:
3057+
type: object
3058+
required:
3059+
- destinationId
3060+
- name
3061+
- destinationDefinitionId
3062+
- destinationName
3063+
properties:
3064+
destinationId:
3065+
$ref: "#/components/schemas/DestinationId"
3066+
name:
3067+
type: string
3068+
destinationDefinitionId:
3069+
$ref: "#/components/schemas/DestinationDefinitionId"
3070+
destinationName:
3071+
type: string
3072+
icon:
3073+
type: string
30383074
DestinationReadList:
30393075
type: object
30403076
required:
@@ -4624,14 +4660,23 @@ components:
46244660
type: integer
46254661
sourceDefinitions:
46264662
type: integer
4663+
WebBackendConnectionListRequestBody:
4664+
type: object
4665+
required:
4666+
- workspaceId
4667+
properties:
4668+
workspaceId:
4669+
$ref: "#/components/schemas/WorkspaceId"
4670+
sourceId:
4671+
$ref: "#/components/schemas/SourceId"
4672+
destinationId:
4673+
$ref: "#/components/schemas/DestinationId"
46274674
WebBackendConnectionListItem:
46284675
type: object
46294676
description: Information about a connection that shows up in the connection list view.
46304677
required:
46314678
- connectionId
46324679
- name
4633-
- sourceId
4634-
- destinationId
46354680
- source
46364681
- destination
46374682
- status
@@ -4642,20 +4687,16 @@ components:
46424687
$ref: "#/components/schemas/ConnectionId"
46434688
name:
46444689
type: string
4645-
sourceId:
4646-
$ref: "#/components/schemas/SourceId"
4647-
destinationId:
4648-
$ref: "#/components/schemas/DestinationId"
46494690
scheduleType:
46504691
$ref: "#/components/schemas/ConnectionScheduleType"
46514692
scheduleData:
46524693
$ref: "#/components/schemas/ConnectionScheduleData"
46534694
status:
46544695
$ref: "#/components/schemas/ConnectionStatus"
46554696
source:
4656-
$ref: "#/components/schemas/SourceRead"
4697+
$ref: "#/components/schemas/SourceSnippetRead"
46574698
destination:
4658-
$ref: "#/components/schemas/DestinationRead"
4699+
$ref: "#/components/schemas/DestinationSnippetRead"
46594700
latestSyncJobCreatedAt:
46604701
$ref: "#/components/schemas/JobCreatedAt"
46614702
latestSyncJobStatus:

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
5959
import io.airbyte.protocol.models.StreamDescriptor;
6060
import io.airbyte.validation.json.JsonValidationException;
61+
import jakarta.annotation.Nonnull;
6162
import java.io.IOException;
6263
import java.time.OffsetDateTime;
6364
import java.util.ArrayList;
@@ -92,6 +93,8 @@
9293
"OptionalUsedAsFieldOrParameterType"})
9394
public class ConfigRepository {
9495

96+
public record StandardSyncQuery(@Nonnull UUID workspaceId, UUID sourceId, UUID destinationId, boolean includeDeleted) {}
97+
9598
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class);
9699
private static final String OPERATION_IDS_AGG_FIELD = "operation_ids_agg";
97100
private static final String OPERATION_IDS_AGG_DELIMITER = ",";
@@ -843,6 +846,10 @@ public List<StandardSync> listStandardSyncsUsingOperation(final UUID operationId
843846
}
844847

845848
public List<StandardSync> listWorkspaceStandardSyncs(final UUID workspaceId, final boolean includeDeleted) throws IOException {
849+
return listWorkspaceStandardSyncs(new StandardSyncQuery(workspaceId, null, null, includeDeleted));
850+
}
851+
852+
public List<StandardSync> listWorkspaceStandardSyncs(final StandardSyncQuery standardSyncQuery) throws IOException {
846853
final Result<Record> connectionAndOperationIdsResult = database.query(ctx -> ctx
847854
// SELECT connection.* plus the connection's associated operationIds as a concatenated list
848855
.select(
@@ -856,8 +863,10 @@ public List<StandardSync> listWorkspaceStandardSyncs(final UUID workspaceId, fin
856863

857864
// join with source actors so that we can filter by workspaceId
858865
.join(ACTOR).on(CONNECTION.SOURCE_ID.eq(ACTOR.ID))
859-
.where(ACTOR.WORKSPACE_ID.eq(workspaceId)
860-
.and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))
866+
.where(ACTOR.WORKSPACE_ID.eq(standardSyncQuery.workspaceId)
867+
.and(standardSyncQuery.destinationId == null ? noCondition() : CONNECTION.DESTINATION_ID.eq(standardSyncQuery.destinationId))
868+
.and(standardSyncQuery.sourceId == null ? noCondition() : CONNECTION.SOURCE_ID.eq(standardSyncQuery.sourceId))
869+
.and(standardSyncQuery.includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))
861870

862871
// group by connection.id so that the groupConcat above works
863872
.groupBy(CONNECTION.ID)).fetch();

airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.airbyte.config.StandardWorkspace;
3232
import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition;
3333
import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition;
34+
import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery;
3435
import io.airbyte.db.Database;
3536
import io.airbyte.db.ExceptionWrappingDatabase;
3637
import io.airbyte.protocol.models.AirbyteCatalog;
@@ -188,7 +189,45 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
188189
@Test
189190
void testListWorkspaceStandardSyncAll() throws IOException {
190191
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 4);
191-
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(MockData.standardWorkspaces().get(0).getWorkspaceId(), true);
192+
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(
193+
MockData.standardWorkspaces().get(0).getWorkspaceId(), true);
194+
195+
assertSyncsMatch(expectedSyncs, actualSyncs);
196+
}
197+
198+
@Test
199+
void testListWorkspaceStandardSyncWithAllFiltering() throws IOException {
200+
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
201+
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, MockData.SOURCE_ID_1, MockData.DESTINATION_ID_1, false);
202+
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
203+
.filter(sync -> sync.getDestinationId().equals(query.destinationId()))
204+
.filter(sync -> sync.getSourceId().equals(query.sourceId()))
205+
.toList();
206+
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);
207+
208+
assertSyncsMatch(expectedSyncs, actualSyncs);
209+
}
210+
211+
@Test
212+
void testListWorkspaceStandardSyncDestinationFiltering() throws IOException {
213+
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
214+
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, null, MockData.DESTINATION_ID_1, false);
215+
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
216+
.filter(sync -> sync.getDestinationId().equals(query.destinationId()))
217+
.toList();
218+
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);
219+
220+
assertSyncsMatch(expectedSyncs, actualSyncs);
221+
}
222+
223+
@Test
224+
void testListWorkspaceStandardSyncSourceFiltering() throws IOException {
225+
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
226+
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, MockData.SOURCE_ID_2, null, false);
227+
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
228+
.filter(sync -> sync.getSourceId().equals(query.sourceId()))
229+
.toList();
230+
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);
192231

193232
assertSyncsMatch(expectedSyncs, actualSyncs);
194233
}

airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ public class MockData {
7979
public static final UUID SOURCE_ID_1 = UUID.randomUUID();
8080
public static final UUID SOURCE_ID_2 = UUID.randomUUID();
8181
private static final UUID SOURCE_ID_3 = UUID.randomUUID();
82-
private static final UUID DESTINATION_ID_1 = UUID.randomUUID();
83-
private static final UUID DESTINATION_ID_2 = UUID.randomUUID();
84-
private static final UUID DESTINATION_ID_3 = UUID.randomUUID();
82+
public static final UUID DESTINATION_ID_1 = UUID.randomUUID();
83+
public static final UUID DESTINATION_ID_2 = UUID.randomUUID();
84+
public static final UUID DESTINATION_ID_3 = UUID.randomUUID();
8585
private static final UUID OPERATION_ID_1 = UUID.randomUUID();
8686
private static final UUID OPERATION_ID_2 = UUID.randomUUID();
8787
private static final UUID OPERATION_ID_3 = UUID.randomUUID();

airbyte-server/src/main/java/io/airbyte/server/apis/WebBackendApiController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
import io.airbyte.api.model.generated.ConnectionStateType;
1010
import io.airbyte.api.model.generated.WebBackendCheckUpdatesRead;
1111
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
12+
import io.airbyte.api.model.generated.WebBackendConnectionListRequestBody;
1213
import io.airbyte.api.model.generated.WebBackendConnectionRead;
1314
import io.airbyte.api.model.generated.WebBackendConnectionReadList;
1415
import io.airbyte.api.model.generated.WebBackendConnectionRequestBody;
1516
import io.airbyte.api.model.generated.WebBackendConnectionUpdate;
1617
import io.airbyte.api.model.generated.WebBackendGeographiesListResult;
1718
import io.airbyte.api.model.generated.WebBackendWorkspaceState;
1819
import io.airbyte.api.model.generated.WebBackendWorkspaceStateResult;
19-
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
2020
import io.airbyte.server.handlers.WebBackendCheckUpdatesHandler;
2121
import io.airbyte.server.handlers.WebBackendConnectionsHandler;
2222
import io.airbyte.server.handlers.WebBackendGeographiesHandler;
@@ -57,8 +57,8 @@ public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBacke
5757
}
5858

5959
@Override
60-
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
61-
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
60+
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WebBackendConnectionListRequestBody webBackendConnectionListRequestBody) {
61+
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(webBackendConnectionListRequestBody));
6262
}
6363

6464
@Override

airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.airbyte.api.model.generated.DestinationRead;
1717
import io.airbyte.api.model.generated.DestinationReadList;
1818
import io.airbyte.api.model.generated.DestinationSearch;
19+
import io.airbyte.api.model.generated.DestinationSnippetRead;
1920
import io.airbyte.api.model.generated.DestinationUpdate;
2021
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
2122
import io.airbyte.commons.json.Jsons;
@@ -303,4 +304,14 @@ protected static DestinationRead toDestinationRead(final DestinationConnection d
303304
.icon(DestinationDefinitionsHandler.loadIcon(standardDestinationDefinition.getIcon()));
304305
}
305306

307+
protected static DestinationSnippetRead toDestinationSnippetRead(final DestinationConnection destinationConnection,
308+
final StandardDestinationDefinition standardDestinationDefinition) {
309+
return new DestinationSnippetRead()
310+
.destinationId(destinationConnection.getDestinationId())
311+
.name(destinationConnection.getName())
312+
.destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId())
313+
.destinationName(standardDestinationDefinition.getName())
314+
.icon(DestinationDefinitionsHandler.loadIcon(standardDestinationDefinition.getIcon()));
315+
}
316+
306317
}

airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.airbyte.api.model.generated.SourceRead;
1616
import io.airbyte.api.model.generated.SourceReadList;
1717
import io.airbyte.api.model.generated.SourceSearch;
18+
import io.airbyte.api.model.generated.SourceSnippetRead;
1819
import io.airbyte.api.model.generated.SourceUpdate;
1920
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
2021
import io.airbyte.config.SourceConnection;
@@ -319,4 +320,13 @@ protected static SourceRead toSourceRead(final SourceConnection sourceConnection
319320
.icon(SourceDefinitionsHandler.loadIcon(standardSourceDefinition.getIcon()));
320321
}
321322

323+
protected static SourceSnippetRead toSourceSnippetRead(final SourceConnection source, final StandardSourceDefinition sourceDefinition) {
324+
return new SourceSnippetRead()
325+
.sourceId(source.getSourceId())
326+
.name(source.getName())
327+
.sourceDefinitionId(sourceDefinition.getSourceDefinitionId())
328+
.sourceName(sourceDefinition.getName())
329+
.icon(SourceDefinitionsHandler.loadIcon(sourceDefinition.getIcon()));
330+
}
331+
322332
}

0 commit comments

Comments
 (0)