Skip to content

Add API source to tracking data #22320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 6, 2023
8 changes: 6 additions & 2 deletions airbyte-analytics/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
dependencies {
implementation 'com.segment.analytics.java:analytics:2.1.1'
plugins {
id 'java-library'
}

dependencies {
api libs.segment.java.analytics
api libs.micronaut.http

implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.StandardWorkspace;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -45,10 +48,12 @@ public class SegmentTrackingClient implements TrackingClient {

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentTrackingClient.class);

public static final String AIRBYTE_ANALYTIC_SOURCE_HEADER = "X-Airbyte-Analytic-Source";
public static final String CUSTOMER_ID_KEY = "user_id";
private static final String SEGMENT_WRITE_KEY = "7UDdp5K55CyiGgsauOr2pNNujGvmhaeu";
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
protected static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String AIRBYTE_ROLE = "airbyte_role";
protected static final String AIRBYTE_SOURCE = "airbyte_source";
private static final String AIRBYTE_TRACKED_AT = "tracked_at";

// Analytics is threadsafe.
Expand Down Expand Up @@ -123,6 +128,9 @@ public void track(@Nullable final UUID workspaceId, final String action, final M
}
final Map<String, Object> mapCopy = new HashMap<>(metadata);
final TrackingIdentity trackingIdentity = identityFetcher.apply(workspaceId);
final Optional<String> airbyteSource = getAirbyteSource();

airbyteSource.ifPresent(a -> mapCopy.put(AIRBYTE_SOURCE, a));

// Always add these traits.
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion().serialize());
Expand All @@ -138,4 +146,13 @@ public void track(@Nullable final UUID workspaceId, final String action, final M
.properties(mapCopy));
}

private Optional<String> getAirbyteSource() {
final Optional<HttpRequest<Object>> currentRequest = ServerRequestContext.currentRequest();
if (currentRequest.isPresent()) {
return Optional.ofNullable(currentRequest.get().getHeaders().get(AIRBYTE_ANALYTIC_SOURCE_HEADER));
}

return Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.analytics;

import static io.airbyte.analytics.SegmentTrackingClient.AIRBYTE_ANALYTIC_SOURCE_HEADER;
import static io.airbyte.analytics.SegmentTrackingClient.AIRBYTE_SOURCE;
import static io.airbyte.analytics.SegmentTrackingClient.AIRBYTE_VERSION_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -20,6 +23,9 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -37,8 +43,8 @@ class SegmentTrackingClientTest {
private static final TrackingIdentity IDENTITY = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final Function<UUID, TrackingIdentity> MOCK_TRACKING_IDENTITY = (workspaceId) -> IDENTITY;
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String JUMP = "jump";
private static final String EMAIL_KEY = "email";

private Analytics analytics;
private SegmentTrackingClient segmentTrackingClient;
Expand Down Expand Up @@ -69,7 +75,7 @@ void testIdentify() {
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("email", IDENTITY.getEmail().get())
.put(EMAIL_KEY, IDENTITY.getEmail().get())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.build();
Expand All @@ -96,7 +102,7 @@ void testIdentifyWithRole() {
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("email", IDENTITY.getEmail().get())
.put(EMAIL_KEY, IDENTITY.getEmail().get())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.build();
Expand Down Expand Up @@ -124,7 +130,7 @@ void testTrackWithMetadata() {
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
AIRBYTE_VERSION_KEY, AIRBYTE_VERSION.serialize(),
"email", EMAIL,
EMAIL_KEY, EMAIL,
"height", "80 meters",
"user_id", IDENTITY.getCustomerId());

Expand All @@ -144,6 +150,30 @@ void testTrackNullWorkspace() {
verify(analytics, never()).enqueue(any());
}

@Test
void testTrackAirbyteAnalyticSource() {
final String analyticSource = "test";
final HttpHeaders httpHeaders = mock(HttpHeaders.class);
final HttpRequest<?> httpRequest = mock(HttpRequest.class);

when(httpHeaders.get(AIRBYTE_ANALYTIC_SOURCE_HEADER)).thenReturn(analyticSource);
when(httpRequest.getHeaders()).thenReturn(httpHeaders);
ServerRequestContext.set(httpRequest);

final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
AIRBYTE_VERSION_KEY, AIRBYTE_VERSION.serialize(),
EMAIL_KEY, EMAIL,
"height", "80 meters",
"user_id", IDENTITY.getCustomerId());

segmentTrackingClient.track(WORKSPACE_ID, JUMP, metadata);

verify(analytics).enqueue(mockBuilder.capture());
final TrackMessage actual = mockBuilder.getValue().build();
assertEquals(analyticSource, actual.properties().get(AIRBYTE_SOURCE));
}

private static ImmutableMap<String, Object> filterTrackedAtProperty(final Map<String, ?> properties) {
final String trackedAtKey = "tracked_at";
assertTrue(properties.containsKey(trackedAtKey));
Expand Down
2 changes: 2 additions & 0 deletions deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ micronaut-test = "3.8.0"
platform-testcontainers = "1.17.3"
postgresql = "42.3.5"
reactor = "3.5.2"
segment = "2.1.1"
slf4j = "1.7.36"
temporal = "1.17.0"

Expand Down Expand Up @@ -108,6 +109,7 @@ quartz-scheduler = { module = "org.quartz-scheduler:quartz", version = "2.3.2" }
reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }
s3 = { module = "software.amazon.awssdk:s3", version = "2.16.84" }
segment-java-analytics = { module = "com.segment.analytics.java:analytics", version.ref = "segment" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
spotbugs-annotations = { module = "com.github.spotbugs:spotbugs-annotations", version = "4.7.3" }
temporal-sdk = { module = "io.temporal:temporal-sdk", version.ref = "temporal" }
Expand Down