Skip to content

Commit b068ce9

Browse files
authored
Stop exposing database objects in ConfigRepository. (#11282)
Restore DB abstraction layer by removing the DB exposure.
1 parent eaf364e commit b068ce9

File tree

4 files changed

+23
-28
lines changed

4 files changed

+23
-28
lines changed

airbyte-config/persistence/build.gradle

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@ plugins {
44
}
55

66
dependencies {
7-
implementation 'commons-io:commons-io:2.7'
8-
97
implementation project(':airbyte-commons-docker')
8+
implementation project(':airbyte-config:models')
109
implementation project(':airbyte-db:lib')
1110
implementation project(':airbyte-db:jooq')
12-
implementation project(':airbyte-protocol:models')
13-
implementation project(':airbyte-config:models')
1411
implementation project(':airbyte-json-validation')
12+
implementation project(':airbyte-protocol:models')
13+
implementation project(':airbyte-metrics:lib')
14+
15+
implementation 'commons-io:commons-io:2.7'
1516
implementation 'com.google.cloud:google-cloud-secretmanager:2.0.5'
17+
1618
testImplementation "org.testcontainers:postgresql:1.15.3"
1719
testImplementation project(':airbyte-test-utils')
1820
integrationTestJavaImplementation project(':airbyte-config:persistence')

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.airbyte.db.Database;
3838
import io.airbyte.db.ExceptionWrappingDatabase;
3939
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
40+
import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
41+
import io.airbyte.metrics.lib.MetricQueries;
4042
import io.airbyte.protocol.models.AirbyteCatalog;
4143
import io.airbyte.validation.json.JsonValidationException;
4244
import java.io.IOException;
@@ -68,12 +70,6 @@ public class ConfigRepository {
6870
private final ConfigPersistence persistence;
6971
private final ExceptionWrappingDatabase database;
7072

71-
// todo (cgardens) - very bad that this exposed. usages should be removed. do not use it.
72-
@Deprecated
73-
public ExceptionWrappingDatabase getDatabase() {
74-
return database;
75-
}
76-
7773
public ConfigRepository(final ConfigPersistence persistence, final Database database) {
7874
this.persistence = persistence;
7975
this.database = new ExceptionWrappingDatabase(database);;
@@ -778,4 +774,17 @@ public void loadDataNoSecrets(final ConfigPersistence seedPersistenceWithoutSecr
778774
persistence.loadData(seedPersistenceWithoutSecrets);
779775
}
780776

777+
/**
778+
* The following methods are present to allow the JobCreationAndStatusUpdateActivity class to emit
779+
* metrics without exposing the underlying database connection.
780+
*/
781+
782+
public List<ReleaseStage> getSrcIdAndDestIdToReleaseStages(final UUID srcId, final UUID dstId) throws IOException {
783+
return database.query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId));
784+
}
785+
786+
public List<ReleaseStage> getJobIdToReleaseStages(final long jobId) throws IOException {
787+
return database.query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId));
788+
}
789+
781790
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.airbyte.config.persistence.ConfigRepository;
2222
import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
2323
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
24-
import io.airbyte.metrics.lib.MetricQueries;
2524
import io.airbyte.metrics.lib.MetricTags;
2625
import io.airbyte.metrics.lib.MetricsRegistry;
2726
import io.airbyte.scheduler.models.Job;
@@ -98,7 +97,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) {
9897
}
9998

10099
private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID dstId) throws IOException {
101-
final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.srcIdAndDestIdToReleaseStages(ctx, srcId, dstId));
100+
final var releaseStages = configRepository.getSrcIdAndDestIdToReleaseStages(srcId, dstId);
102101
if (releaseStages == null || releaseStages.size() == 0) {
103102
return;
104103
}
@@ -263,7 +262,7 @@ public void reportJobStart(final ReportJobStartInput input) {
263262
}
264263

265264
private void emitJobIdToReleaseStagesMetric(final MetricsRegistry metric, final long jobId) throws IOException {
266-
final var releaseStages = configRepository.getDatabase().query(ctx -> MetricQueries.jobIdToReleaseStages(ctx, jobId));
265+
final var releaseStages = configRepository.getJobIdToReleaseStages(jobId);
267266
if (releaseStages == null || releaseStages.size() == 0) {
268267
return;
269268
}

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.airbyte.config.helpers.LogConfigs;
1818
import io.airbyte.config.persistence.ConfigNotFoundException;
1919
import io.airbyte.config.persistence.ConfigRepository;
20-
import io.airbyte.db.ExceptionWrappingDatabase;
2120
import io.airbyte.scheduler.models.Job;
2221
import io.airbyte.scheduler.persistence.JobNotifier;
2322
import io.airbyte.scheduler.persistence.JobPersistence;
@@ -108,8 +107,6 @@ public void createJob() throws JsonValidationException, ConfigNotFoundException,
108107
.thenReturn(JOB_ID);
109108
Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID))
110109
.thenReturn(Mockito.mock(StandardSync.class));
111-
Mockito.when(mConfigRepository.getDatabase())
112-
.thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
113110

114111
final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID, false));
115112

@@ -119,8 +116,6 @@ public void createJob() throws JsonValidationException, ConfigNotFoundException,
119116
@Test
120117
@DisplayName("Test attempt creation")
121118
public void createAttempt() throws IOException {
122-
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
123-
124119
final Job mJob = Mockito.mock(Job.class);
125120

126121
Mockito.when(mJobPersistence.getJob(JOB_ID))
@@ -169,8 +164,6 @@ public void createAttemptThrowException() throws IOException {
169164
@Test
170165
@DisplayName("Test attempt creation")
171166
public void createAttemptNumber() throws IOException {
172-
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
173-
174167
final Job mJob = Mockito.mock(Job.class);
175168

176169
Mockito.when(mJobPersistence.getJob(JOB_ID))
@@ -223,8 +216,6 @@ class Update {
223216

224217
@Test
225218
public void setJobSuccess() throws IOException {
226-
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
227-
228219
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));
229220

230221
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
@@ -245,8 +236,6 @@ public void setJobSuccessWrapException() throws IOException {
245236

246237
@Test
247238
public void setJobFailure() throws IOException {
248-
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
249-
250239
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason"));
251240

252241
Mockito.verify(mJobPersistence).failJob(JOB_ID);
@@ -265,8 +254,6 @@ public void setJobFailureWrapException() throws IOException {
265254

266255
@Test
267256
public void setAttemptFailure() throws IOException {
268-
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
269-
270257
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, standardSyncOutput, failureSummary));
271258

272259
Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
@@ -288,8 +275,6 @@ public void setAttemptFailureWrapException() throws IOException {
288275

289276
@Test
290277
public void setJobCancelled() throws IOException {
291-
Mockito.when(mConfigRepository.getDatabase()).thenReturn(Mockito.mock(ExceptionWrappingDatabase.class));
292-
293278
jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, failureSummary));
294279

295280
Mockito.verify(mJobPersistence).cancelJob(JOB_ID);

0 commit comments

Comments
 (0)