Skip to content

🎉 End-to-end test source: support stream duplication #10298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
- name: E2E Testing
sourceDefinitionId: d53f9084-fa6b-4a5a-976c-5b8392f4ad8a
dockerRepository: airbyte/source-e2e-test
dockerImageTag: 2.0.0
dockerImageTag: 2.1.0
documentationUrl: https://docs.airbyte.io/integrations/sources/e2e-test
icon: airbyte.svg
sourceType: api
Expand Down
22 changes: 17 additions & 5 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-e2e-test:2.0.0"
- dockerImage: "airbyte/source-e2e-test:2.1.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/e2e-test"
connectionSpecification:
Expand Down Expand Up @@ -1529,8 +1529,9 @@
type: "object"
order: 50
oneOf:
- title: "Single Stream"
description: "A catalog with one stream."
- title: "Single Schema"
description: "A catalog with one or multiple streams that share the same\
\ schema."
required:
- "type"
- "stream_name"
Expand All @@ -1554,8 +1555,19 @@
type: "string"
default: "{ \"type\": \"object\", \"properties\": { \"column1\": {\
\ \"type\": \"string\" } } }"
- title: "Multi-Stream"
description: "A catalog with multiple data streams."
stream_duplication:
title: "Duplicate the stream N times"
description: "Duplicate the stream for easy load testing. Each stream\
\ name will have a number suffix. For example, if the stream name\
\ is \"ds\", the duplicated streams will be \"ds_0\", \"ds_1\",\
\ etc."
type: "integer"
default: 1
minimum: 1
maximum: 10000
- title: "Multi Schema"
description: "A catalog with multiple data streams, each with a different\
\ schema."
required:
- "type"
- "stream_schemas"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,21 @@ public static <T, E extends Exception> T queryWithTracing(final String name,
}
}

/**
 * Creates a span for the given operation without attaching any metadata.
 *
 * <p>The parent is whatever {@code Sentry.getSpan()} currently returns; that value, the operation
 * name, and an empty metadata map are handed to {@code createChildSpan}, which builds the span.
 *
 * @param operationName name of the operation the span represents
 * @return the span produced by {@code createChildSpan}
 */
public static ISpan createSpan(final String operationName) {
  final ISpan parentSpan = Sentry.getSpan();
  return createChildSpan(parentSpan, operationName, Collections.emptyMap());
}

/**
 * Creates a span for the given operation and passes the supplied metadata along with it.
 *
 * <p>The parent is whatever {@code Sentry.getSpan()} currently returns; that value, the operation
 * name, and the metadata map are handed unchanged to {@code createChildSpan}.
 *
 * @param operationName name of the operation the span represents
 * @param metadata key/value pairs forwarded to {@code createChildSpan}
 * @return the span produced by {@code createChildSpan}
 */
public static ISpan createSpan(final String operationName, final Map<String, Object> metadata) {
  final ISpan parentSpan = Sentry.getSpan();
  return createChildSpan(parentSpan, operationName, metadata);
}

private static ISpan createChildSpan(final ISpan currentSpan, final String operationName, final Map<String, Object> metadata) {
final String name = Strings.isBlank(operationName) ? DEFAULT_UNKNOWN_OPERATION : operationName;
final String spanOperation = Strings.isBlank(operationName) ? DEFAULT_UNKNOWN_OPERATION : operationName;
final ISpan childSpan;
if (currentSpan == null) {
childSpan = Sentry.startTransaction(DEFAULT_ROOT_TRANSACTION, operationName);
childSpan = Sentry.startTransaction(DEFAULT_ROOT_TRANSACTION, spanOperation);
} else {
childSpan = currentSpan.startChild(operationName);
childSpan = currentSpan.startChild(spanOperation);
}
if (metadata != null && !metadata.isEmpty()) {
metadata.forEach(childSpan::setData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ FROM airbyte/integration-base-java:dev
WORKDIR /airbyte

ENV APPLICATION source-e2e-test-cloud
ENV APPLICATION_VERSION 2.0.0
ENV APPLICATION_VERSION 2.1.0
ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.version=2.1.0
LABEL io.airbyte.name=airbyte/source-e2e-test-cloud
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Since 2.0.0, the cloud version is the same as the OSS version. This connector should be removed.
*/
public class CloudTestingSources extends SpecModifyingSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(CloudTestingSources.class);
private static final String CLOUD_TESTING_SOURCES_TITLE = "Cloud E2E Test Source Spec";

public CloudTestingSources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
"order": 50,
"oneOf": [
{
"title": "Single Stream",
"description": "A catalog with one stream.",
"title": "Single Schema",
"description": "A catalog with one or multiple streams that share the same schema.",
"required": ["type", "stream_name", "stream_schema"],
"properties": {
"type": {
Expand All @@ -67,12 +67,20 @@
"description": "A Json schema for the stream. The schema should be compatible with <a href=\"https://json-schema.org/draft-07/json-schema-release-notes.html\">draft-07</a>. See <a href=\"https://cswr.github.io/JsonSchema/spec/introduction/\">this doc</a> for examples.",
"type": "string",
"default": "{ \"type\": \"object\", \"properties\": { \"column1\": { \"type\": \"string\" } } }"
},
"stream_duplication": {
"title": "Duplicate the stream N times",
"description": "Duplicate the stream for easy load testing. Each stream name will have a number suffix. For example, if the stream name is \"ds\", the duplicated streams will be \"ds_0\", \"ds_1\", etc.",
"type": "integer",
"default": 1,
"minimum": 1,
"maximum": 10000
}
}
},
{
"title": "Multi-Stream",
"description": "A catalog with multiple data streams.",
"title": "Multi Schema",
"description": "A catalog with multiple data streams, each with a different schema.",
"required": ["type", "stream_schemas"],
"properties": {
"type": {
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-e2e-test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ FROM airbyte/integration-base-java:dev
WORKDIR /airbyte

ENV APPLICATION source-e2e-test
ENV APPLICATION_VERSION=2.0.0
ENV APPLICATION_VERSION=2.1.0
ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.version=2.1.0
LABEL io.airbyte.name=airbyte/source-e2e-test
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package io.airbyte.integrations.source.e2e_test;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
Expand All @@ -30,7 +28,6 @@ public class ContinuousFeedConfig {

private static final JsonNode JSON_SCHEMA_DRAFT_07;
private static final JsonSchemaValidator SCHEMA_VALIDATOR = new JsonSchemaValidator();
private static final ObjectMapper MAPPER = MoreMappers.initMapper();

static {
try {
Expand Down Expand Up @@ -72,14 +69,27 @@ static AirbyteCatalog parseMockCatalog(final JsonNode config) throws JsonValidat
case SINGLE_STREAM -> {
final String streamName = mockCatalogConfig.get("stream_name").asText();
final String streamSchemaText = mockCatalogConfig.get("stream_schema").asText();
final int streamDuplication = mockCatalogConfig.has("stream_duplication")
? mockCatalogConfig.get("stream_duplication").asInt()
: 1;
final Optional<JsonNode> streamSchema = Jsons.tryDeserialize(streamSchemaText);
if (streamSchema.isEmpty()) {
throw new JsonValidationException(String.format("Stream \"%s\" has invalid schema: %s", streamName, streamSchemaText));
}
checkSchema(streamName, streamSchema.get());

final AirbyteStream stream = new AirbyteStream().withName(streamName).withJsonSchema(streamSchema.get());
return new AirbyteCatalog().withStreams(Collections.singletonList(stream));
if (streamDuplication == 1) {
final AirbyteStream stream = new AirbyteStream().withName(streamName).withJsonSchema(streamSchema.get());
return new AirbyteCatalog().withStreams(Collections.singletonList(stream));
} else {
final List<AirbyteStream> streams = new ArrayList<>(streamDuplication);
for (int i = 0; i < streamDuplication; ++i) {
streams.add(new AirbyteStream()
.withName(String.join("_", streamName, String.valueOf(i)))
.withJsonSchema(streamSchema.get()));
}
return new AirbyteCatalog().withStreams(streams);
}
}
case MULTI_STREAM -> {
final String streamSchemasText = mockCatalogConfig.get("stream_schemas").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
Expand All @@ -20,10 +21,13 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.sentry.ISpan;
import io.sentry.SpanStatus;
import java.time.Instant;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -68,17 +72,27 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode jsonConfig, fin

final Iterator<AirbyteMessage> streamIterator = new AbstractIterator<>() {

private ISpan span;

@CheckForNull
@Override
protected AirbyteMessage computeNext() {
if (span == null) {
span = AirbyteSentry.createSpan("ReadStream",
Map.of("stream", stream.getStream().getName(), "recordCount", feedConfig.getMaxMessages()));
}

if (emittedMessages.get() >= feedConfig.getMaxMessages()) {
span.finish(SpanStatus.OK);
return endOfData();
}

if (messageIntervalMs.isPresent() && emittedMessages.get() != 0) {
try {
Thread.sleep(messageIntervalMs.get());
} catch (final InterruptedException e) {
span.setThrowable(e);
span.finish(SpanStatus.INTERNAL_ERROR);
throw new RuntimeException(e);
}
}
Expand All @@ -87,6 +101,8 @@ protected AirbyteMessage computeNext() {
try {
data = Jsons.jsonNode(generator.generate(schema, ContinuousFeedConstants.MOCK_JSON_MAX_TREE_SIZE));
} catch (final JsonGeneratorException e) {
span.setThrowable(e);
span.finish(SpanStatus.INTERNAL_ERROR);
throw new RuntimeException(e);
}
emittedMessages.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestingSources extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(TestingSources.class);

private final Map<TestingSourceType, Source> sourceMap;

public enum TestingSourceType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
"order": 50,
"oneOf": [
{
"title": "Single Stream",
"description": "A catalog with one stream.",
"title": "Single Schema",
"description": "A catalog with one or multiple streams that share the same schema.",
"required": ["type", "stream_name", "stream_schema"],
"properties": {
"type": {
Expand All @@ -67,12 +67,20 @@
"description": "A Json schema for the stream. The schema should be compatible with <a href=\"https://json-schema.org/draft-07/json-schema-release-notes.html\">draft-07</a>. See <a href=\"https://cswr.github.io/JsonSchema/spec/introduction/\">this doc</a> for examples.",
"type": "string",
"default": "{ \"type\": \"object\", \"properties\": { \"column1\": { \"type\": \"string\" } } }"
},
"stream_duplication": {
"title": "Duplicate the stream N times",
"description": "Duplicate the stream for easy load testing. Each stream name will have a number suffix. For example, if the stream name is \"ds\", the duplicated streams will be \"ds_0\", \"ds_1\", etc.",
"type": "integer",
"default": 1,
"minimum": 1,
"maximum": 10000
}
}
},
{
"title": "Multi-Stream",
"description": "A catalog with multiple data streams.",
"title": "Multi Schema",
"description": "A catalog with multiple data streams, each with a different schema.",
"required": ["type", "stream_schemas"],
"properties": {
"type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ContinuousFeedConfigTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousFeedConfigTest.class);

private static final ObjectMapper MAPPER = MoreMappers.initMapper();

@Test
Expand Down Expand Up @@ -77,10 +73,10 @@ public void testParseMockCatalog(final String testCaseName,
final AirbyteCatalog expectedCatalog)
throws Exception {
if (expectedCatalog == null) {
assertThrows(JsonValidationException.class, () -> ContinuousFeedConfig.parseMockCatalog(mockConfig));
assertThrows(JsonValidationException.class, () -> ContinuousFeedConfig.parseMockCatalog(mockConfig), testCaseName);
} else {
final AirbyteCatalog actualCatalog = ContinuousFeedConfig.parseMockCatalog(mockConfig);
assertEquals(expectedCatalog.getStreams(), actualCatalog.getStreams());
assertEquals(expectedCatalog.getStreams(), actualCatalog.getStreams(), testCaseName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testComplexObjectGeneration(final String testCase, final JsonNode js
final Generator generator = new Generator(CONFIG, schemaStore, RANDOM);
for (int i = 0; i < 10; ++i) {
final JsonNode json = Jsons.jsonNode(generator.generate(schema, ContinuousFeedConstants.MOCK_JSON_MAX_TREE_SIZE));
assertTrue(JSON_VALIDATOR.test(jsonSchema, json));
assertTrue(JSON_VALIDATOR.test(jsonSchema, json), testCase);
}

}
Expand Down
Loading