Skip to content

Commit 81417e6

Browse files
authored
Add connector metadata as sentry tags (#10475)
* Pass worker metadata to connector * Fix compilation * Pass in job id and image from worker * Remove application version * Add default job environment variables * Add back removed comment * Rename env map to job metadata * Fix env configs * Read connector from application * Use empty string * Remove println * Fix unit test * Fix compilation error * Introduce constants for worker env * Add worker env to ENV_VARS_TO_TRANSFER * Pass into getWorkerMetadata map to all constructions * Format code * Format octavia cli * Fix test compilation * Fix typos
1 parent 2eab2da commit 81417e6

File tree

30 files changed

+228
-88
lines changed

30 files changed

+228
-88
lines changed

airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ class Config:
240240
)
241241
spec: Optional[ConnectorSpecification] = None
242242
connectionStatus: Optional[AirbyteConnectionStatus] = None
243-
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the calalog")
243+
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the catalog")
244244
record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record")
245245
state: Optional[AirbyteStateMessage] = Field(
246246
None,

airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteVersion.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
public class AirbyteVersion {
1414

15-
private static final String DEV_VERSION = "dev";
15+
public static final String DEV_VERSION = "dev";
1616
public static final String AIRBYTE_VERSION_KEY_NAME = "airbyte_version";
1717

1818
private final String version;

airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.base.Preconditions;
88
import com.google.common.base.Splitter;
99
import com.google.common.base.Strings;
10+
import io.airbyte.commons.map.MoreMaps;
1011
import io.airbyte.commons.version.AirbyteVersion;
1112
import io.airbyte.config.helpers.LogClientSingleton;
1213
import io.airbyte.config.helpers.LogConfigs;
@@ -20,6 +21,7 @@
2021
import java.util.HashSet;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Map.Entry;
2325
import java.util.Objects;
2426
import java.util.Optional;
2527
import java.util.Set;
@@ -168,6 +170,11 @@ public class EnvConfigs implements Configs {
168170

169171
public static final String DEFAULT_NETWORK = "host";
170172

173+
public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
174+
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
175+
AIRBYTE_ROLE, EnvConfigs::getAirbyteRole,
176+
WORKER_ENVIRONMENT, (instance) -> instance.getWorkerEnvironment().name());
177+
171178
public static final int DEFAULT_TEMPORAL_HISTORY_RETENTION_IN_DAYS = 30;
172179

173180
private final Function<String, String> getEnv;
@@ -632,11 +639,22 @@ public String getJobMainContainerMemoryLimit() {
632639
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_MEMORY_REQUIREMENT);
633640
}
634641

642+
/**
643+
* There are two types of environment variables available to the job container:
644+
* <ul>
645+
* <li>Exclusive variables prefixed with JOB_DEFAULT_ENV_PREFIX</li>
646+
* <li>Shared variables defined in JOB_SHARED_ENVS</li>
647+
* </ul>
648+
*/
635649
@Override
636650
public Map<String, String> getJobDefaultEnvMap() {
637-
return getAllEnvKeys.get().stream()
651+
final Map<String, String> jobPrefixedEnvMap = getAllEnvKeys.get().stream()
638652
.filter(key -> key.startsWith(JOB_DEFAULT_ENV_PREFIX))
639653
.collect(Collectors.toMap(key -> key.replace(JOB_DEFAULT_ENV_PREFIX, ""), getEnv));
654+
final Map<String, String> jobSharedEnvMap = JOB_SHARED_ENVS.entrySet().stream().collect(Collectors.toMap(
655+
Entry::getKey,
656+
entry -> Objects.requireNonNullElse(entry.getValue().apply(this), "")));
657+
return MoreMaps.merge(jobPrefixedEnvMap, jobSharedEnvMap);
640658
}
641659

642660
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.config;
6+
7+
/**
8+
* These extra env variables are created on the fly and passed to the connector.
9+
*/
10+
public class WorkerEnvConstants {
11+
12+
public static final String WORKER_CONNECTOR_IMAGE = "WORKER_CONNECTOR_IMAGE";
13+
public static final String WORKER_JOB_ID = "WORKER_JOB_ID";
14+
public static final String WORKER_JOB_ATTEMPT = "WORKER_JOB_ATTEMPT";
15+
16+
}

airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.junit.jupiter.api.Assertions.*;
88

99
import io.airbyte.commons.version.AirbyteVersion;
10+
import io.airbyte.config.Configs.WorkerEnvironment;
1011
import java.nio.file.Paths;
1112
import java.util.HashMap;
1213
import java.util.List;
@@ -343,16 +344,27 @@ void testCheckJobMemoryLimitTakePrecedentIfSet() {
343344
}
344345

345346
@Test
346-
void testEmptyEnvMapRetrieval() {
347-
assertEquals(Map.of(), config.getJobDefaultEnvMap());
347+
void testSharedJobEnvMapRetrieval() {
348+
envMap.put(EnvConfigs.AIRBYTE_VERSION, "dev");
349+
envMap.put(EnvConfigs.WORKER_ENVIRONMENT, WorkerEnvironment.KUBERNETES.name());
350+
final Map<String, String> expected = Map.of("AIRBYTE_VERSION", "dev",
351+
"AIRBYTE_ROLE", "",
352+
"WORKER_ENVIRONMENT", "KUBERNETES");
353+
assertEquals(expected, config.getJobDefaultEnvMap());
348354
}
349355

350356
@Test
351-
void testEnvMapRetrieval() {
357+
void testAllJobEnvMapRetrieval() {
358+
envMap.put(EnvConfigs.AIRBYTE_VERSION, "dev");
359+
envMap.put(EnvConfigs.AIRBYTE_ROLE, "UNIT_TEST");
352360
envMap.put(EnvConfigs.JOB_DEFAULT_ENV_PREFIX + "ENV1", "VAL1");
353361
envMap.put(EnvConfigs.JOB_DEFAULT_ENV_PREFIX + "ENV2", "VAL\"2WithQuotesand$ymbols");
354362

355-
final var expected = Map.of("ENV1", "VAL1", "ENV2", "VAL\"2WithQuotesand$ymbols");
363+
final Map<String, String> expected = Map.of("ENV1", "VAL1",
364+
"ENV2", "VAL\"2WithQuotesand$ymbols",
365+
"AIRBYTE_VERSION", "dev",
366+
"AIRBYTE_ROLE", "UNIT_TEST",
367+
"WORKER_ENVIRONMENT", "DOCKER");
356368
assertEquals(expected, config.getJobDefaultEnvMap());
357369
}
358370

airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ class Config:
226226
log: Optional[AirbyteLogMessage] = Field(None, description="log message: any kind of logging you want the platform to know about.")
227227
spec: Optional[ConnectorSpecification] = None
228228
connectionStatus: Optional[AirbyteConnectionStatus] = None
229-
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the calalog")
229+
catalog: Optional[AirbyteCatalog] = Field(None, description="catalog message: the catalog")
230230
record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record")
231231
state: Optional[AirbyteStateMessage] = Field(
232232
None, description="schema message: the state. Must be the last message produced. The platform uses this information"

airbyte-integrations/bases/base-java/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ plugins {
55

66
dependencies {
77
implementation project(':airbyte-protocol:models')
8+
implementation project(':airbyte-config:models')
89
implementation project(':airbyte-commons-cli')
910
implementation project(':airbyte-json-validation')
1011
api 'io.sentry:sentry:5.6.0'

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java

+68-14
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,18 @@
1212
import io.airbyte.commons.lang.Exceptions.Procedure;
1313
import io.airbyte.commons.string.Strings;
1414
import io.airbyte.commons.util.AutoCloseableIterator;
15+
import io.airbyte.commons.version.AirbyteVersion;
16+
import io.airbyte.config.Configs.WorkerEnvironment;
17+
import io.airbyte.config.EnvConfigs;
18+
import io.airbyte.config.WorkerEnvConstants;
1519
import io.airbyte.integrations.base.sentry.AirbyteSentry;
1620
import io.airbyte.protocol.models.AirbyteConnectionStatus;
1721
import io.airbyte.protocol.models.AirbyteMessage;
1822
import io.airbyte.protocol.models.AirbyteMessage.Type;
1923
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
2024
import io.airbyte.validation.json.JsonSchemaValidator;
2125
import io.sentry.ITransaction;
26+
import io.sentry.NoOpTransaction;
2227
import io.sentry.Sentry;
2328
import io.sentry.SentryLevel;
2429
import io.sentry.SpanStatus;
@@ -93,16 +98,10 @@ public IntegrationRunner(final Source source) {
9398
}
9499

95100
public void run(final String[] args) throws Exception {
96-
initSentry();
97-
98101
final IntegrationConfig parsed = cliParser.parse(args);
99-
final ITransaction transaction = Sentry.startTransaction(
100-
integration.getClass().getSimpleName(),
101-
parsed.getCommand().toString(),
102-
true);
103-
LOGGER.info("Sentry transaction event: {}", transaction.getEventId());
102+
final ITransaction transaction = createSentryTransaction(integration.getClass(), parsed.getCommand());
104103
try {
105-
runInternal(transaction, parsed);
104+
runInternal(parsed);
106105
transaction.finish(SpanStatus.OK);
107106
} catch (final Exception e) {
108107
transaction.setThrowable(e);
@@ -117,7 +116,7 @@ public void run(final String[] args) throws Exception {
117116
}
118117
}
119118

120-
public void runInternal(final ITransaction transaction, final IntegrationConfig parsed) throws Exception {
119+
private void runInternal(final IntegrationConfig parsed) throws Exception {
121120
LOGGER.info("Running integration: {}", integration.getClass().getName());
122121
LOGGER.info("Command: {}", parsed.getCommand());
123122
LOGGER.info("Integration config: {}", parsed);
@@ -285,21 +284,76 @@ private static <T> T parseConfig(final Path path, final Class<T> klass) {
285284
return Jsons.object(jsonNode, klass);
286285
}
287286

288-
private static void initSentry() {
287+
private static ITransaction createSentryTransaction(final Class<?> connectorClass, final Command command) {
288+
if (command == Command.SPEC) {
289+
return NoOpTransaction.getInstance();
290+
}
291+
289292
final Map<String, String> env = System.getenv();
290-
final String connector = env.getOrDefault("APPLICATION", "unknown");
291-
final String version = env.getOrDefault("APPLICATION_VERSION", "unknown");
292293
final boolean enableSentry = Boolean.parseBoolean(env.getOrDefault("ENABLE_SENTRY", "false"));
294+
final String sentryDsn = env.getOrDefault("SENTRY_DSN", "");
295+
if (!enableSentry || sentryDsn.equals("")) {
296+
LOGGER.debug("Skip Sentry transaction because DSN is not available");
297+
return NoOpTransaction.getInstance();
298+
}
299+
300+
final String version = parseConnectorVersion(env.getOrDefault("WORKER_CONNECTOR_IMAGE", ""));
301+
final String airbyteVersion = env.getOrDefault(EnvConfigs.AIRBYTE_VERSION, "");
302+
final String airbyteRole = env.getOrDefault(EnvConfigs.AIRBYTE_ROLE, "");
303+
final boolean isDev = version.equals(AirbyteVersion.DEV_VERSION)
304+
|| airbyteVersion.equals(AirbyteVersion.DEV_VERSION)
305+
|| airbyteRole.equals("airbyter");
306+
if (isDev) {
307+
LOGGER.debug("Skip Sentry transaction for dev environment");
308+
return NoOpTransaction.getInstance();
309+
}
310+
final String connector = env.getOrDefault("APPLICATION", "");
311+
if (connector.equals("")) {
312+
LOGGER.debug("Skip Sentry transaction for unknown connector");
313+
return NoOpTransaction.getInstance();
314+
}
315+
final String workerEnvironment = env.getOrDefault("WORKER_ENVIRONMENT", "");
316+
final String workerJobId = env.getOrDefault(WorkerEnvConstants.WORKER_JOB_ID, "");
317+
final String workerJobAttempt = env.getOrDefault(WorkerEnvConstants.WORKER_JOB_ATTEMPT, "");
318+
if (workerEnvironment.equals("") || workerEnvironment.equals(WorkerEnvironment.DOCKER.name())) {
319+
LOGGER.debug("Skip Sentry transaction for unknown or docker deployment");
320+
return NoOpTransaction.getInstance();
321+
}
293322

294323
// https://docs.sentry.io/platforms/java/configuration/
295324
Sentry.init(options -> {
296-
options.setDsn(enableSentry ? env.getOrDefault("SENTRY_DSN", "") : "");
325+
options.setDsn(sentryDsn);
297326
options.setEnableExternalConfiguration(true);
298-
options.setTracesSampleRate(enableSentry ? 1.0 : 0.0);
327+
options.setTracesSampleRate(1.0);
299328
options.setRelease(String.format("%s@%s", connector, version));
329+
options.setEnvironment(isDev ? "dev" : "production");
300330
options.setTag("connector", connector);
301331
options.setTag("connector_version", version);
332+
options.setTag("job_id", workerJobId);
333+
options.setTag("job_attempt", workerJobAttempt);
334+
options.setTag("airbyte_version", airbyteVersion);
335+
options.setTag("worker_environment", workerEnvironment);
302336
});
337+
338+
final ITransaction transaction = Sentry.startTransaction(
339+
connectorClass.getSimpleName(),
340+
command.toString(),
341+
true);
342+
LOGGER.info("Sentry transaction event: {}", transaction.getEventId());
343+
return transaction;
344+
}
345+
346+
/**
347+
* @param connectorImage Expected format: [organization/]image[:version]
348+
*/
349+
@VisibleForTesting
350+
static String parseConnectorVersion(final String connectorImage) {
351+
if (connectorImage == null || connectorImage.equals("")) {
352+
return "unknown";
353+
}
354+
355+
final String[] tokens = connectorImage.split(":");
356+
return tokens[tokens.length - 1];
303357
}
304358

305359
}

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@
44

55
package io.airbyte.integrations.base;
66

7-
public class JavaBaseConstants {
7+
public final class JavaBaseConstants {
88

9-
public static String ARGS_CONFIG_KEY = "config";
10-
public static String ARGS_CATALOG_KEY = "catalog";
11-
public static String ARGS_STATE_KEY = "state";
9+
private JavaBaseConstants() {}
1210

13-
public static String ARGS_CONFIG_DESC = "path to the json configuration file";
14-
public static String ARGS_CATALOG_DESC = "input path for the catalog";
15-
public static String ARGS_PATH_DESC = "path to the json-encoded state file";
11+
public static final String ARGS_CONFIG_KEY = "config";
12+
public static final String ARGS_CATALOG_KEY = "catalog";
13+
public static final String ARGS_STATE_KEY = "state";
1614

17-
public static String COLUMN_NAME_AB_ID = "_airbyte_ab_id";
18-
public static String COLUMN_NAME_EMITTED_AT = "_airbyte_emitted_at";
19-
public static String COLUMN_NAME_DATA = "_airbyte_data";
15+
public static final String ARGS_CONFIG_DESC = "path to the json configuration file";
16+
public static final String ARGS_CATALOG_DESC = "input path for the catalog";
17+
public static final String ARGS_PATH_DESC = "path to the json-encoded state file";
18+
19+
public static final String COLUMN_NAME_AB_ID = "_airbyte_ab_id";
20+
public static final String COLUMN_NAME_EMITTED_AT = "_airbyte_emitted_at";
21+
public static final String COLUMN_NAME_DATA = "_airbyte_data";
2022

2123
}

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java

+10
Original file line numberDiff line numberDiff line change
@@ -368,4 +368,14 @@ private void startSleepingThread(final List<Exception> caughtExceptions, final b
368368
});
369369
}
370370

371+
@Test
372+
void testParseConnectorImage() {
373+
assertEquals("unknown", IntegrationRunner.parseConnectorVersion(null));
374+
assertEquals("unknown", IntegrationRunner.parseConnectorVersion(""));
375+
assertEquals("1.0.1-alpha", IntegrationRunner.parseConnectorVersion("airbyte/destination-test:1.0.1-alpha"));
376+
assertEquals("dev", IntegrationRunner.parseConnectorVersion("airbyte/destination-test:dev"));
377+
assertEquals("1.0.1-alpha", IntegrationRunner.parseConnectorVersion("destination-test:1.0.1-alpha"));
378+
assertEquals("1.0.1-alpha", IntegrationRunner.parseConnectorVersion(":1.0.1-alpha"));
379+
}
380+
371381
}

airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev
1313
WORKDIR /airbyte
1414

1515
ENV APPLICATION destination-bigquery-denormalized
16-
ENV APPLICATION_VERSION 0.2.10
1716
ENV ENABLE_SENTRY true
1817

1918
COPY --from=build /airbyte /airbyte

airbyte-integrations/connectors/destination-bigquery/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev
1313
WORKDIR /airbyte
1414

1515
ENV APPLICATION destination-bigquery
16-
ENV APPLICATION_VERSION 0.6.11
1716
ENV ENABLE_SENTRY true
1817

1918
COPY --from=build /airbyte /airbyte

airbyte-integrations/connectors/destination-dev-null/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev
1313
WORKDIR /airbyte
1414

1515
ENV APPLICATION destination-dev-null
16-
ENV APPLICATION_VERSION 0.2.3
1716
ENV ENABLE_SENTRY true
1817

1918
COPY --from=build /airbyte /airbyte

airbyte-integrations/connectors/destination-e2e-test/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev
1313
WORKDIR /airbyte
1414

1515
ENV APPLICATION destination-e2e-test
16-
ENV APPLICATION_VERSION 0.2.2
1716
ENV ENABLE_SENTRY true
1817

1918
COPY --from=build /airbyte /airbyte

airbyte-integrations/connectors/destination-snowflake/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
1818

1919
RUN tar xf ${APPLICATION}.tar --strip-components=1
2020

21-
ENV APPLICATION_VERSION 0.4.17
2221
ENV ENABLE_SENTRY true
2322

2423
LABEL io.airbyte.version=0.4.17

airbyte-integrations/connectors/source-e2e-test-cloud/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev
1313
WORKDIR /airbyte
1414

1515
ENV APPLICATION source-e2e-test-cloud
16-
ENV APPLICATION_VERSION 2.1.0
1716
ENV ENABLE_SENTRY true
1817

1918
COPY --from=build /airbyte /airbyte

airbyte-integrations/connectors/source-e2e-test/Dockerfile

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev
1313
WORKDIR /airbyte
1414

1515
ENV APPLICATION source-e2e-test
16-
ENV APPLICATION_VERSION=2.1.0
1716
ENV ENABLE_SENTRY true
1817

1918
COPY --from=build /airbyte /airbyte

airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public boolean transform(final String jobId,
104104
resourceRequirements,
105105
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP),
106106
Collections.emptyMap(),
107+
Collections.emptyMap(),
107108
dbtArguments.toArray(new String[0]));
108109
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC_BUILDER);
109110
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC_BUILDER);

airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java

+1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ private boolean runProcess(final String jobId,
130130
resourceRequirements,
131131
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP),
132132
Collections.emptyMap(),
133+
Collections.emptyMap(),
133134
args);
134135

135136
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC_BUILDER);

0 commit comments

Comments
 (0)