Skip to content

Commit b03dcea

Browse files
fix failing tests
1 parent cc8ac8f commit b03dcea

File tree

12 files changed

+339
-151
lines changed

12 files changed

+339
-151
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
5959
logLineSuffix = "instance creation for %s".formatted(invocationContext.getTargetClass());
6060
} else if (methodMatcher.matches()) {
6161
String interceptedEvent = methodMatcher.group(1);
62-
logLineSuffix = "execution of @%s method %s.%s".formatted(invocationContext.getExecutable().getDeclaringClass().getSimpleName(),
63-
interceptedEvent, invocationContext.getExecutable().getName());
62+
logLineSuffix = "execution of @%s method %s.%s".formatted(interceptedEvent,
63+
invocationContext.getExecutable().getDeclaringClass().getSimpleName(),
64+
invocationContext.getExecutable().getName());
6465
} else {
6566
logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName);
6667
}

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.fasterxml.jackson.databind.JsonNode;
1111
import com.google.common.collect.ImmutableMap;
1212
import io.airbyte.cdk.integrations.util.HostPortResolver;
13+
import io.airbyte.cdk.testutils.ContainerFactory;
1314
import io.airbyte.commons.json.Jsons;
1415
import java.io.IOException;
1516
import java.util.List;
@@ -20,9 +21,29 @@
2021
import org.testcontainers.containers.JdbcDatabaseContainer;
2122
import org.testcontainers.containers.Network;
2223
import org.testcontainers.images.builder.ImageFromDockerfile;
24+
import org.testcontainers.utility.DockerImageName;
2325

2426
public class SshBastionContainer implements AutoCloseable {
2527

28+
public class SshBastionContainerFactory extends ContainerFactory<GenericContainer<?>> {
29+
30+
@Override
31+
protected GenericContainer<?> createNewContainer(DockerImageName imageName) {
32+
imageName = imageName.asCompatibleSubstituteFor("mcr.microsoft.com/mssql/server");
33+
var container = new GenericContainer(new ImageFromDockerfile("bastion-test")
34+
.withFileFromClasspath("Dockerfile", "bastion/Dockerfile"))
35+
.withExposedPorts(22);
36+
return container;
37+
}
38+
39+
public GenericContainer exclusive(final Network network) {
40+
var container = super.exclusive("bastion-test");
41+
container.withNetwork(network);
42+
return container;
43+
}
44+
45+
}
46+
2647
private static final String SSH_USER = "sshuser";
2748
private static final String SSH_PASSWORD = "secret";
2849
private GenericContainer bastion;

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import java.util.concurrent.ConcurrentHashMap;
1414
import java.util.concurrent.ConcurrentMap;
1515
import java.util.concurrent.atomic.AtomicInteger;
16+
import java.util.function.Consumer;
1617
import java.util.function.Supplier;
1718
import java.util.stream.Stream;
1819
import org.apache.commons.lang3.StringUtils;
20+
import org.apache.commons.lang3.tuple.Pair;
1921
import org.slf4j.Logger;
2022
import org.slf4j.LoggerFactory;
2123
import org.testcontainers.containers.GenericContainer;
@@ -28,11 +30,11 @@
2830
* ContainerFactory is the companion to {@link TestDatabase} and provides it with suitable
2931
* testcontainer instances.
3032
*/
31-
public abstract class ContainerFactory<C extends JdbcDatabaseContainer<?>> {
33+
public abstract class ContainerFactory<C extends GenericContainer<?>> {
3234

3335
static private final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class);
3436

35-
private record ContainerKey(Class<? extends ContainerFactory> clazz, DockerImageName imageName, List<String> methods) {};
37+
private record ContainerKey<C extends GenericContainer<?>>(Class<? extends ContainerFactory> clazz, DockerImageName imageName, List<NamedContainerModifier<C>> methods) {};
3638

3739
private static class ContainerOrException {
3840

@@ -67,12 +69,12 @@ GenericContainer<?> container() {
6769

6870
}
6971

70-
private static final ConcurrentMap<ContainerKey, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
72+
private final ConcurrentMap<ContainerKey<C>, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
7173
private static final AtomicInteger containerId = new AtomicInteger(0);
7274

73-
private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, List<String> methods) {
75+
private final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, List<NamedContainerModifier<C>> containerModifiers) {
7476
return new MdcScope.Builder()
75-
.setLogPrefix("testcontainer %s (%s[%s]):".formatted(containerId.incrementAndGet(), imageName, StringUtils.join(methods, ",")))
77+
.setLogPrefix("testcontainer %s (%s[%s]):".formatted(containerId.incrementAndGet(), imageName, StringUtils.join(containerModifiers, ",")))
7678
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);
7779
}
7880

@@ -87,7 +89,18 @@ private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageN
8789
*/
8890
@SuppressWarnings("unchecked")
8991
public final C shared(String imageName, String... methods) {
90-
final var containerKey = new ContainerKey(getClass(), DockerImageName.parse(imageName), Stream.of(methods).toList());
92+
return (C) shared(imageName, Stream.of(methods).map(n->new NamedContainerModifier<C>(n, null)).toList());
93+
}
94+
95+
public final C shared(String imageName, NamedContainerModifier<C>... namedContainerModifiers) {
96+
return (C) shared(imageName, List.of(namedContainerModifiers));
97+
}
98+
99+
public final C shared(String imageName) {
100+
return (C) shared(imageName, new ArrayList<>());
101+
}
102+
public final C shared(String imageName, List<NamedContainerModifier<C>> namedContainerModifiers) {
103+
final ContainerKey<C> containerKey = new ContainerKey<>(getClass(), DockerImageName.parse(imageName), namedContainerModifiers);
91104
// We deliberately avoid creating the container itself eagerly during the evaluation of the map
92105
// value.
93106
// Container creation can be exceedingly slow.
@@ -103,38 +116,69 @@ public final C shared(String imageName, String... methods) {
103116
*/
104117
@SuppressWarnings("unchecked")
105118
public final C exclusive(String imageName, String... methods) {
106-
return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(methods).toList());
119+
return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(methods).map(n->new NamedContainerModifier<C>(n, null)).toList());
107120
}
108121

109-
private GenericContainer<?> createAndStartContainer(DockerImageName imageName, List<String> methodNames) {
110-
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames);
111-
try {
112-
GenericContainer<?> container = createNewContainer(imageName);
113-
final var methods = new ArrayList<Method>();
114-
for (String methodName : methodNames) {
115-
methods.add(getClass().getMethod(methodName, container.getClass()));
122+
public final C exclusive(String imageName) {
123+
return (C) createAndStartContainer(DockerImageName.parse(imageName), new ArrayList<>());
124+
}
125+
126+
public final C exclusive(String imageName, NamedContainerModifier<C>... namedContainerModifiers) {
127+
return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(namedContainerModifiers).toList());
128+
}
129+
130+
public record NamedContainerModifier<C extends GenericContainer<?>>(String name, Consumer<C> modifier){
131+
132+
@Override
133+
public String toString() {
134+
return name;
135+
}
136+
}
137+
138+
private NamedContainerModifier<C> resolveModifierByName(C container, String methodName) {
139+
final ContainerFactory<C> self = this;
140+
Consumer<C> resolvedMethod = c->{
141+
try {
142+
Class<? extends GenericContainer> containerClass = container.getClass();
143+
Method method = self.getClass().getMethod(methodName, containerClass);
144+
method.invoke(self, c);
145+
} catch(NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
146+
throw new RuntimeException(e);
116147
}
117-
final var logConsumer = new Slf4jLogConsumer(LOGGER) {
148+
};
149+
return new NamedContainerModifier<>(methodName, resolvedMethod);
150+
}
118151

119-
public void accept(OutputFrame frame) {
120-
if (frame.getUtf8StringWithoutLineEnding().trim().length() > 0) {
121-
super.accept(frame);
122-
}
123-
}
124152

125-
};
126-
getTestContainerLogMdcBuilder(imageName, methodNames).produceMappings(logConsumer::withMdc);
127-
container.withLogConsumer(logConsumer);
128-
for (Method method : methods) {
129-
LOGGER.info("Calling {} in {} on new shared container based on {}.",
130-
method.getName(), getClass().getName(), imageName);
131-
method.invoke(this, container);
153+
private C createAndStartContainer(DockerImageName imageName, List<NamedContainerModifier<C>> namedContainerModifiers) {
154+
LOGGER.info("Creating new shared container based on {} with {}.", imageName, namedContainerModifiers);
155+
C container = createNewContainer(imageName);
156+
final List<NamedContainerModifier<C>> resolvedNamedContainerModifiers = new ArrayList<>();
157+
for (NamedContainerModifier<C> namedContainerModifier : namedContainerModifiers) {
158+
if (namedContainerModifier.modifier == null) {
159+
resolvedNamedContainerModifiers.add(resolveModifierByName(container, namedContainerModifier.name));
160+
} else {
161+
resolvedNamedContainerModifiers.add(namedContainerModifier);
132162
}
133-
container.start();
134-
return container;
135-
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
136-
throw new RuntimeException(e);
137163
}
164+
final var logConsumer = new Slf4jLogConsumer(LOGGER) {
165+
166+
public void accept(OutputFrame frame) {
167+
if (frame.getUtf8StringWithoutLineEnding().trim().length() > 0) {
168+
super.accept(frame);
169+
}
170+
}
171+
172+
};
173+
getTestContainerLogMdcBuilder(imageName, resolvedNamedContainerModifiers).produceMappings(logConsumer::withMdc);
174+
container.withLogConsumer(logConsumer);
175+
for (NamedContainerModifier<C> resolvedNamedContainerModifier : resolvedNamedContainerModifiers) {
176+
LOGGER.info("Calling {} in {} on new shared container based on {}.",
177+
resolvedNamedContainerModifier.name(), getClass().getName(), imageName);
178+
resolvedNamedContainerModifier.modifier.accept(container);
179+
}
180+
container.start();
181+
return container;
138182
}
139183

140184
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
142142
protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);
143143

144144
@BeforeEach
145-
protected void setup() {
145+
protected void setup() throws Exception {
146146
testdb = createTestDatabase();
147147

148148
// create and populate actual table
@@ -359,6 +359,7 @@ void testDelete() throws Exception {
359359
assertExpectedStateMessages(stateMessages1);
360360

361361
deleteMessageOnIdCol(MODELS_STREAM_NAME, COL_ID, 11);
362+
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, 1);
362363

363364
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
364365
final AutoCloseableIterator<AirbyteMessage> read2 = source()
@@ -388,6 +389,7 @@ void testUpdate() throws Exception {
388389
assertExpectedStateMessages(stateMessages1);
389390

390391
updateCommand(MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11);
392+
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, 1);
391393

392394
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
393395
final AutoCloseableIterator<AirbyteMessage> read2 = source()
@@ -408,7 +410,9 @@ void testUpdate() throws Exception {
408410
// Verify that when data is inserted into the database while a sync is happening and after the first
409411
// sync, it all gets replicated.
410412
protected void testRecordsProducedDuringAndAfterSync() throws Exception {
411-
413+
int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
414+
int expectedRecords = recordsCreatedBeforeTestCount;
415+
int expectedRecordsInCdc = 0;
412416
final int recordsToCreate = 20;
413417
// first batch of records. 20 created here and 6 created in setup method.
414418
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
@@ -418,6 +422,9 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception {
418422
"F-" + recordsCreated));
419423
writeModelRecord(record);
420424
}
425+
expectedRecords = expectedRecords + recordsToCreate;
426+
expectedRecordsInCdc = expectedRecordsInCdc + recordsToCreate;
427+
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, expectedRecordsInCdc);
421428

422429
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source()
423430
.read(config(), getConfiguredCatalog(), null);
@@ -427,7 +434,7 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception {
427434
assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(stateAfterFirstBatch);
428435
final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
429436
dataFromFirstBatch);
430-
assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size());
437+
assertEquals(expectedRecords, recordsFromFirstBatch.size());
431438

432439
// second batch of records again 20 being created
433440
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
@@ -437,6 +444,9 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception {
437444
"F-" + recordsCreated));
438445
writeModelRecord(record);
439446
}
447+
expectedRecords = expectedRecords + recordsToCreate;
448+
expectedRecordsInCdc = expectedRecordsInCdc + recordsToCreate;
449+
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, expectedRecordsInCdc);
440450

441451
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
442452
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = source()
@@ -459,10 +469,9 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception {
459469
final Set<AirbyteRecordMessage> recordsFromSecondBatchWithoutDuplicates = removeDuplicates(
460470
recordsFromSecondBatch);
461471

462-
final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
463472
assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(),
464473
"Expected first sync to include records created while the test was running.");
465-
assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount,
474+
assertEquals(expectedRecords,
466475
recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates
467476
.size());
468477
}
@@ -527,6 +536,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
527536
final JsonNode puntoRecord = Jsons
528537
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
529538
writeModelRecord(puntoRecord);
539+
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, 1);
530540

531541
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
532542
final AutoCloseableIterator<AirbyteMessage> read2 = source()
@@ -547,9 +557,10 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
547557

548558
@Test
549559
// When no records exist, no records are returned.
550-
void testNoData() throws Exception {
560+
public void testNoData() throws Exception {
551561

552562
deleteCommand(MODELS_STREAM_NAME);
563+
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, MODEL_RECORDS.size());
553564
final AutoCloseableIterator<AirbyteMessage> read = source().read(config(), getConfiguredCatalog(), null);
554565
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
555566

@@ -569,15 +580,23 @@ void testNoDataOnSecondSync() throws Exception {
569580
final AutoCloseableIterator<AirbyteMessage> read1 = source()
570581
.read(config(), getConfiguredCatalog(), null);
571582
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
583+
LOGGER.info("SGX actualRecords1 has " + actualRecords1.size() + " records:" + actualRecords1);
584+
final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
585+
LOGGER.info("SGX recordMessages1 has " + recordMessages1.size() + " records:" + recordMessages1);
572586
final List<AirbyteStateMessage> stateMessagesFromFirstSync = extractStateMessages(actualRecords1);
587+
LOGGER.info("SGX stateMessagesFromFirstSync has " + stateMessagesFromFirstSync.size() + " records:" + stateMessagesFromFirstSync);
573588
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessagesFromFirstSync.get(stateMessagesFromFirstSync.size() - 1)));
589+
LOGGER.info("SGX state=" + state);
574590

575591
final AutoCloseableIterator<AirbyteMessage> read2 = source()
576592
.read(config(), getConfiguredCatalog(), state);
577593
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
594+
LOGGER.info("SGX actualRecords2 has " + actualRecords2.size() + " records:" + actualRecords2);
578595

579596
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
580597
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
598+
LOGGER.info("SGX recordMessages2 has " + recordMessages2.size() + " records:" + recordMessages2);
599+
LOGGER.info("SGX stateMessages2 has " + stateMessages2.size() + " records:" + stateMessages2);
581600

582601
assertExpectedRecords(Collections.emptySet(), recordMessages2);
583602
assertExpectedStateMessagesFromIncrementalSync(stateMessages2);
@@ -823,4 +842,7 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
823842
return expectedCatalog;
824843
}
825844

845+
protected void waitForCdcRecords(String schemaName, String tableName, int recordCount)
846+
throws Exception {}
847+
826848
}

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
airbyteJavaConnector {
66
cdkVersionRequired = '0.23.6'
77
features = ['db-sources']
8-
useLocalCdk = false
8+
useLocalCdk = true
99
}
1010

1111
java {

airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,10 @@ protected void initTests() {
123123
.createTablePatternSql(CREATE_TABLE_SQL)
124124
.build());
125125
// TODO SGX re-enable
126-
/*
127-
* addDataTypeTestData( TestDataHolder.builder() .sourceType("real")
128-
* .airbyteType(JsonSchemaType.NUMBER) .addInsertValues("'123'", "'1234567890.1234567'", "null")
129-
* .addExpectedValues("123.0", "1.23456794E9", null) .createTablePatternSql(CREATE_TABLE_SQL)
130-
* .build());
131-
*/
126+
addDataTypeTestData(TestDataHolder.builder().sourceType("real")
127+
.airbyteType(JsonSchemaType.NUMBER).addInsertValues("'123'", "'1234567890.1234567'", "null")
128+
.addExpectedValues("123.0", "1.234568E9", null).createTablePatternSql(CREATE_TABLE_SQL)
129+
.build());
132130
addDataTypeTestData(
133131
TestDataHolder.builder()
134132
.sourceType("date")

0 commit comments

Comments
 (0)