Skip to content

Commit 016a401

Browse files
authored
Add count connection functions (#10568)
* Add count connection functions * Fix new configRepository queries - Remove unnecessary joins - Fix countConnection * Use existing mock data for tests
1 parent a60aa5f commit 016a401

File tree

2 files changed

+128
-0
lines changed

2 files changed

+128
-0
lines changed

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
package io.airbyte.config.persistence;
66

7+
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
8+
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
9+
710
import com.fasterxml.jackson.databind.JsonNode;
811
import com.google.common.base.Charsets;
912
import com.google.common.hash.HashFunction;
@@ -32,6 +35,7 @@
3235
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
3336
import io.airbyte.db.Database;
3437
import io.airbyte.db.ExceptionWrappingDatabase;
38+
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
3539
import io.airbyte.protocol.models.AirbyteCatalog;
3640
import io.airbyte.protocol.models.ConnectorSpecification;
3741
import io.airbyte.validation.json.JsonSchemaValidator;
@@ -654,6 +658,30 @@ public void writeCatalog(final AirbyteCatalog catalog,
654658
actorCatalogFetchEvent);
655659
}
656660

661+
public int countConnectionsForWorkspace(final UUID workspaceId) throws IOException {
662+
return database.query(ctx -> ctx.selectCount()
663+
.from(CONNECTION)
664+
.join(ACTOR).on(CONNECTION.SOURCE_ID.eq(ACTOR.ID))
665+
.where(ACTOR.WORKSPACE_ID.eq(workspaceId))
666+
.andNot(ACTOR.TOMBSTONE)).fetchOne().into(int.class);
667+
}
668+
669+
public int countSourcesForWorkspace(final UUID workspaceId) throws IOException {
670+
return database.query(ctx -> ctx.selectCount()
671+
.from(ACTOR)
672+
.where(ACTOR.WORKSPACE_ID.equal(workspaceId))
673+
.and(ACTOR.ACTOR_TYPE.eq(ActorType.source))
674+
.andNot(ACTOR.TOMBSTONE)).fetchOne().into(int.class);
675+
}
676+
677+
public int countDestinationsForWorkspace(final UUID workspaceId) throws IOException {
678+
return database.query(ctx -> ctx.selectCount()
679+
.from(ACTOR)
680+
.where(ACTOR.WORKSPACE_ID.equal(workspaceId))
681+
.and(ACTOR.ACTOR_TYPE.eq(ActorType.destination))
682+
.andNot(ACTOR.TOMBSTONE)).fetchOne().into(int.class);
683+
}
684+
657685
/**
658686
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
659687
* string/jsonnode into the AirbyteConfig, Stream<Object<AirbyteConfig.getClassName()>>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.config.persistence;
6+
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.Mockito.spy;
9+
10+
import io.airbyte.commons.json.Jsons;
11+
import io.airbyte.config.DestinationConnection;
12+
import io.airbyte.config.SourceConnection;
13+
import io.airbyte.config.StandardDestinationDefinition;
14+
import io.airbyte.config.StandardSourceDefinition;
15+
import io.airbyte.config.StandardSync;
16+
import io.airbyte.config.StandardSyncOperation;
17+
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
18+
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
19+
import io.airbyte.db.Database;
20+
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
21+
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
22+
import io.airbyte.db.instance.development.DevDatabaseMigrator;
23+
import io.airbyte.db.instance.development.MigrationDevHelper;
24+
import io.airbyte.protocol.models.ConnectorSpecification;
25+
import io.airbyte.validation.json.JsonValidationException;
26+
import java.io.IOException;
27+
import java.util.Optional;
28+
import java.util.UUID;
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.BeforeAll;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.testcontainers.containers.PostgreSQLContainer;
34+
35+
public class ConfigRepositoryE2EReadWriteTest {
36+
37+
private static PostgreSQLContainer<?> container;
38+
private Database database;
39+
private ConfigRepository configRepository;
40+
private DatabaseConfigPersistence configPersistence;
41+
42+
@BeforeAll
43+
public static void dbSetup() {
44+
container = new PostgreSQLContainer<>("postgres:13-alpine")
45+
.withDatabaseName("airbyte")
46+
.withUsername("docker")
47+
.withPassword("docker");
48+
container.start();
49+
}
50+
51+
@BeforeEach
52+
void setup() throws IOException, JsonValidationException {
53+
final var secretPersistence = new MemorySecretPersistence();
54+
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
55+
configPersistence = spy(new DatabaseConfigPersistence(database));
56+
configRepository =
57+
spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence),
58+
database));
59+
final ConfigsDatabaseMigrator configsDatabaseMigrator =
60+
new ConfigsDatabaseMigrator(database, DatabaseConfigPersistenceLoadDataTest.class.getName());
61+
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
62+
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
63+
configRepository.writeStandardWorkspace(MockData.standardWorkspace());
64+
for (final StandardSourceDefinition sourceDefinition : MockData.standardSourceDefinitions()) {
65+
configRepository.writeStandardSourceDefinition(sourceDefinition);
66+
}
67+
for (final StandardDestinationDefinition destinationDefinition : MockData.standardDestinationDefinitions()) {
68+
configRepository.writeStandardDestinationDefinition(destinationDefinition);
69+
}
70+
final ConnectorSpecification specification = new ConnectorSpecification()
71+
.withConnectionSpecification(Jsons.deserialize("{}"));
72+
for (final SourceConnection connection : MockData.sourceConnections()) {
73+
configRepository.writeSourceConnection(connection, specification);
74+
}
75+
for (final DestinationConnection connection : MockData.destinationConnections()) {
76+
configRepository.writeDestinationConnection(connection, specification);
77+
}
78+
for (final StandardSyncOperation operation : MockData.standardSyncOperations()) {
79+
configRepository.writeStandardSyncOperation(operation);
80+
}
81+
for (final StandardSync sync : MockData.standardSyncs()) {
82+
configRepository.writeStandardSync(sync);
83+
}
84+
}
85+
86+
@AfterAll
87+
public static void dbDown() {
88+
container.close();
89+
}
90+
91+
@Test
92+
void testWorkspaceCountConnections() throws IOException {
93+
94+
final UUID workspaceId = MockData.standardWorkspace().getWorkspaceId();
95+
assertEquals(MockData.standardSyncs().size(), configRepository.countConnectionsForWorkspace(workspaceId));
96+
assertEquals(MockData.destinationConnections().size(), configRepository.countDestinationsForWorkspace(workspaceId));
97+
assertEquals(MockData.sourceConnections().size(), configRepository.countSourcesForWorkspace(workspaceId));
98+
}
99+
100+
}

0 commit comments

Comments
 (0)