Skip to content

Commit 51fadf2

Browse files
authored
🐛 Airbyte Kube will now error on start up if incorrect logs configuration is passed in. Fix scheduler counting bug. (#6188)
* Move the cloud client create call so this errors out on app start up instead of when they the log file is retrieved. * Add tests. * Fix code that was never being called.
1 parent fa0bd55 commit 51fadf2

File tree

4 files changed

+64
-14
lines changed

4 files changed

+64
-14
lines changed

airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ public static File getServerLogFile(Configs configs) {
9191
}
9292

9393
var logConfigs = new LogConfigDelegator(configs);
94-
createCloudClientIfNull(logConfigs);
9594
var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + logPathBase;
9695
try {
9796
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
@@ -107,7 +106,6 @@ public static File getSchedulerLogFile(Configs configs) {
107106
}
108107

109108
var logConfigs = new LogConfigDelegator(configs);
110-
createCloudClientIfNull(logConfigs);
111109
var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + logPathBase;
112110
try {
113111
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
@@ -122,7 +120,6 @@ public static List<String> getJobLogFile(Configs configs, Path logPath) throws I
122120
}
123121

124122
var logConfigs = new LogConfigDelegator(configs);
125-
createCloudClientIfNull(logConfigs);
126123
var cloudLogPath = JOB_LOGGING_CLOUD_PREFIX + logPath;
127124
return logClient.tailCloudLog(logConfigs, cloudLogPath, LOG_TAIL_SIZE);
128125
}
@@ -136,33 +133,38 @@ public static void deleteLogs(Configs configs, String logPath) {
136133
throw new NotImplementedException("Local log deletes not supported.");
137134
}
138135
var logConfigs = new LogConfigDelegator(configs);
139-
createCloudClientIfNull(logConfigs);
140136
var cloudLogPath = JOB_LOGGING_CLOUD_PREFIX + logPath;
141137
logClient.deleteLogs(logConfigs, cloudLogPath);
142138
}
143139

144140
public static void setJobMdc(Path path) {
145-
if (shouldUseLocalLogs(new EnvConfigs())) {
141+
var configs = new EnvConfigs();
142+
if (shouldUseLocalLogs(configs)) {
146143
LOGGER.debug("Setting docker job mdc");
147144
MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
148145
} else {
149146
LOGGER.debug("Setting kube job mdc");
147+
var logConfigs = new LogConfigDelegator(configs);
148+
createCloudClientIfNull(logConfigs);
150149
MDC.put(LogClientSingleton.CLOUD_JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
151150
}
152151
}
153152

154153
public static void setWorkspaceMdc(Path path) {
155-
if (shouldUseLocalLogs(new EnvConfigs())) {
154+
var configs = new EnvConfigs();
155+
if (shouldUseLocalLogs(configs)) {
156156
LOGGER.debug("Setting docker workspace mdc");
157157
MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, path.toString());
158158
} else {
159159
LOGGER.debug("Setting kube workspace mdc");
160+
var logConfigs = new LogConfigDelegator(configs);
161+
createCloudClientIfNull(logConfigs);
160162
MDC.put(LogClientSingleton.CLOUD_WORKSPACE_MDC_KEY, path.toString());
161163
}
162164
}
163165

164166
private static boolean shouldUseLocalLogs(Configs configs) {
165-
return configs.getWorkerEnvironment().equals(WorkerEnvironment.DOCKER) || CloudLogs.hasEmptyConfigs(new LogConfigDelegator(configs));
167+
return configs.getWorkerEnvironment().equals(WorkerEnvironment.DOCKER);
166168
}
167169

168170
private static void createCloudClientIfNull(LogConfigs configs) {

airbyte-config/models/src/test/java/io/airbyte/config/helpers/CloudLogsTest.java renamed to airbyte-config/models/src/test/java/io/airbyte/config/helpers/CloudLogsClientTest.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,59 @@
2525
package io.airbyte.config.helpers;
2626

2727
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.assertThrows;
2829

30+
import org.junit.jupiter.api.Nested;
2931
import org.junit.jupiter.api.Test;
3032
import org.mockito.Mockito;
3133

32-
public class CloudLogsTest {
34+
public class CloudLogsClientTest {
35+
36+
@Nested
37+
class CloudLogClientMissingConfiguration {
38+
39+
@Test
40+
public void testMinio() {
41+
var configs = Mockito.mock(LogConfigs.class);
42+
// Mising bucket.
43+
Mockito.when(configs.getS3MinioEndpoint()).thenReturn("minio-endpoint");
44+
Mockito.when(configs.getAwsAccessKey()).thenReturn("access-key");
45+
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret");
46+
Mockito.when(configs.getS3LogBucket()).thenReturn("");
47+
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");
48+
49+
assertThrows(RuntimeException.class, () -> CloudLogs.createCloudLogClient(configs));
50+
}
51+
52+
@Test
53+
public void testAws() {
54+
var configs = Mockito.mock(LogConfigs.class);
55+
// Missing bucket and access key.
56+
Mockito.when(configs.getS3MinioEndpoint()).thenReturn("");
57+
Mockito.when(configs.getAwsAccessKey()).thenReturn("");
58+
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("access-key-secret");
59+
Mockito.when(configs.getS3LogBucket()).thenReturn("");
60+
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");
61+
62+
assertThrows(RuntimeException.class, () -> CloudLogs.createCloudLogClient(configs));
63+
}
64+
65+
@Test
66+
public void testGcs() {
67+
var configs = Mockito.mock(LogConfigs.class);
68+
Mockito.when(configs.getAwsAccessKey()).thenReturn("");
69+
Mockito.when(configs.getAwsSecretAccessKey()).thenReturn("");
70+
Mockito.when(configs.getS3LogBucket()).thenReturn("");
71+
Mockito.when(configs.getS3LogBucketRegion()).thenReturn("");
72+
73+
// Missing bucket.
74+
Mockito.when(configs.getGcpStorageBucket()).thenReturn("");
75+
Mockito.when(configs.getGoogleApplicationCredentials()).thenReturn("path/to/google/secret");
76+
77+
assertThrows(RuntimeException.class, () -> CloudLogs.createCloudLogClient(configs));
78+
}
79+
80+
}
3381

3482
@Test
3583
public void createCloudLogClientTestMinio() {

airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.time.Instant;
4141
import java.util.List;
4242
import java.util.Optional;
43-
import java.util.concurrent.atomic.AtomicInteger;
4443
import java.util.function.BiPredicate;
4544
import java.util.stream.Collectors;
4645
import org.slf4j.Logger;
@@ -92,19 +91,20 @@ public void run() {
9291
}
9392

9493
private void scheduleSyncJobs() throws IOException {
95-
final AtomicInteger jobsScheduled = new AtomicInteger();
94+
int jobsScheduled = 0;
9695
final List<StandardSync> activeConnections = getAllActiveConnections();
9796

9897
for (StandardSync connection : activeConnections) {
9998
final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connection.getConnectionId());
10099

101100
if (scheduleJobPredicate.test(previousJobOptional, connection)) {
102101
jobFactory.create(connection.getConnectionId());
102+
jobsScheduled++;
103103
}
104104
}
105-
int jobsScheduledCount = jobsScheduled.get();
106-
if (jobsScheduledCount > 0) {
107-
LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduler: {}", activeConnections.size(), jobsScheduled.get());
105+
106+
if (jobsScheduled > 0) {
107+
LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduled this cycle: {}", activeConnections.size(), jobsScheduled);
108108
}
109109
}
110110

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def createSpotlessTarget = { pattern ->
7171
'normalization_test_output',
7272
'tools',
7373
'secrets',
74-
'charts'
74+
'charts' // Helm charts often have injected template strings that will fail general linting. Helm linting is done separately.
7575
]
7676

7777
if (System.getenv().containsKey("SUB_BUILD")) {

0 commit comments

Comments
 (0)