diff --git a/airbyte-cdk/java/airbyte-cdk/core/build.gradle b/airbyte-cdk/java/airbyte-cdk/core/build.gradle index 00064ec2dfbcc..6d04fbedc7548 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/core/build.gradle @@ -1,3 +1,6 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.dsl.KotlinVersion + java { // TODO: rewrite code to avoid javac wornings in the first place compileJava { @@ -11,6 +14,18 @@ java { } } +compileTestFixturesKotlin { + compilerOptions { + jvmTarget = JvmTarget.JVM_21 + languageVersion = KotlinVersion.KOTLIN_1_9 + allWarningsAsErrors = false + freeCompilerArgs = ["-Xjvm-default=all"] + } + dependsOn { + tasks.matching { it.name == 'generate' } + } +} + dependencies { api 'com.datadoghq:dd-trace-api:1.28.0' diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 0f84cfd8bf529..1f3888a7ce784 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.28.0 +version=0.28.1 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.java deleted file mode 100644 index a62788dc1ad44..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.extensions; - -import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.HOURS; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.regex.Pattern.CASE_INSENSITIVE; -import static java.util.regex.Pattern.UNICODE_CASE; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.Proxy; -import java.time.Duration; -import java.time.Instant; -import java.time.format.DateTimeParseException; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.extension.DynamicTestInvocationContext; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.InvocationInterceptor; -import org.junit.jupiter.api.extension.ReflectiveInvocationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * By default, junit only output logs to the console, and nothing makes it into log4j logs. This - * class fixes that by using the interceptor facility to print progress and timing information. This - * allows us to have junit loglines in our test logs. This is instanciated via Java's - * ServiceLoader The declaration can be found in - * resources/META-INF/services/org.junit.jupiter.api.extension.Extension - */ -public class LoggingInvocationInterceptor implements InvocationInterceptor { - - private static final Logger LOGGER = LoggerFactory.getLogger(LoggingInvocationInterceptor.class); - private static final String JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME = "JunitMethodExecutionTimeout"; - - private static final class LoggingInvocationInterceptorHandler implements InvocationHandler { - - private static final Pattern methodPattern = Pattern.compile("intercept(.*)Method"); - - @Override - @SuppressWarnings("unchecked") - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - if (LoggingInvocationInterceptor.class.getDeclaredMethod(method.getName(), Invocation.class, ReflectiveInvocationContext.class, - ExtensionContext.class) == null) { - LOGGER.error("Junit LoggingInvocationInterceptor executing unknown interception point {}", method.getName()); - return method.invoke(proxy, args); - } - var invocation = (Invocation) args[0]; - var invocationContext = (ReflectiveInvocationContext) args[1]; - var extensionContext = (ExtensionContext) args[2]; - String methodName = method.getName(); - String logLineSuffix; - Matcher methodMatcher = methodPattern.matcher(methodName); - if (methodName.equals("interceptDynamicTest")) { - logLineSuffix = "execution of DynamicTest %s".formatted(extensionContext.getDisplayName()); - } else if (methodName.equals("interceptTestClassConstructor")) { - logLineSuffix = "instance creation for %s".formatted(invocationContext.getTargetClass()); - } else if (methodMatcher.matches()) { - String interceptedEvent = methodMatcher.group(1); - logLineSuffix = "execution of @%s method %s.%s".formatted(interceptedEvent, - invocationContext.getExecutable().getDeclaringClass().getSimpleName(), - invocationContext.getExecutable().getName()); - } else { - logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName); - } - Thread currentThread = Thread.currentThread(); - TimeoutInteruptor timeoutTask = new TimeoutInteruptor(currentThread); - Instant start = Instant.now(); - try { - final Object retVal; - Duration timeout = getTimeout(invocationContext); - if (timeout != null) { - LOGGER.info("Junit starting {} with timeout of {}", logLineSuffix, DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true)); - new Timer("TimeoutTimer-" + currentThread.getName(), true).schedule(timeoutTask, timeout.toMillis()); - } else { - LOGGER.warn("Junit starting {} with no timeout", logLineSuffix); - } - retVal = invocation.proceed(); - long elapsedMs = Duration.between(start, Instant.now()).toMillis(); - LOGGER.info("Junit completed {} in {}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true)); - return retVal; - } catch (Throwable t) { - timeoutTask.cancel(); - long elapsedMs = Duration.between(start, Instant.now()).toMillis(); - if (timeoutTask.wasTriggered) { - Throwable t1 = t; - t = new TimeoutException( - "Execution was cancelled after %s. If you think your test should be given more time to complete, you can use the @Timeout annotation. If all the test of a connector are slow, " - + " you can override the property 'JunitMethodExecutionTimeout' in your gradle.properties." - .formatted(DurationFormatUtils.formatDurationWords(elapsedMs, true, true))); - t.initCause(t1); - } - boolean belowCurrentCall = false; - List stackToDisplay = new LinkedList<>(); - for (String stackString : ExceptionUtils.getStackFrames(t)) { - if (stackString.startsWith("\tat ")) { - if (!belowCurrentCall && stackString.contains(LoggingInvocationInterceptor.class.getCanonicalName())) { - belowCurrentCall = true; - } - } else { - belowCurrentCall = false; - } - if (!belowCurrentCall) { - stackToDisplay.add(stackString); - } - } - String stackTrace = StringUtils.join(stackToDisplay, "\n "); - LOGGER.error("Junit exception throw during {} after {}:\n{}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true), - stackTrace); - throw t; - } finally { - timeoutTask.cancel(); - } - } - - private static class TimeoutInteruptor extends TimerTask { - - private final Thread parentThread; - volatile boolean wasTriggered = false; - - TimeoutInteruptor(Thread parentThread) { - this.parentThread = parentThread; - } - - @Override - public void run() { - wasTriggered = true; - parentThread.interrupt(); - } - - public boolean cancel() { - return super.cancel(); - } - - } - - private static final Pattern PATTERN = Pattern.compile("([1-9]\\d*) *((?:[nμm]?s)|m|h|d)?", - CASE_INSENSITIVE | UNICODE_CASE); - private static final Map UNITS_BY_ABBREVIATION; - - static { - Map unitsByAbbreviation = new HashMap<>(); - unitsByAbbreviation.put("ns", NANOSECONDS); - unitsByAbbreviation.put("μs", MICROSECONDS); - unitsByAbbreviation.put("ms", MILLISECONDS); - unitsByAbbreviation.put("s", SECONDS); - unitsByAbbreviation.put("m", MINUTES); - unitsByAbbreviation.put("h", HOURS); - unitsByAbbreviation.put("d", DAYS); - UNITS_BY_ABBREVIATION = Collections.unmodifiableMap(unitsByAbbreviation); - } - - static Duration parseDuration(String text) throws DateTimeParseException { - Matcher matcher = PATTERN.matcher(text.trim()); - if (matcher.matches()) { - long value = Long.parseLong(matcher.group(1)); - String unitAbbreviation = matcher.group(2); - TimeUnit unit = unitAbbreviation == null ? SECONDS - : UNITS_BY_ABBREVIATION.get(unitAbbreviation.toLowerCase(Locale.ENGLISH)); - return Duration.ofSeconds(unit.toSeconds(value)); - } - throw new DateTimeParseException("Timeout duration is not in the expected format ( [ns|μs|ms|s|m|h|d])", - text, 0); - } - - private static Duration getTimeout(ReflectiveInvocationContext invocationContext) { - Duration timeout = null; - if (invocationContext.getExecutable()instanceof Method m) { - Timeout timeoutAnnotation = m.getAnnotation(Timeout.class); - if (timeoutAnnotation == null) { - timeoutAnnotation = invocationContext.getTargetClass().getAnnotation(Timeout.class); - } - if (timeoutAnnotation != null) { - timeout = Duration.ofMillis(timeoutAnnotation.unit().toMillis(timeoutAnnotation.value())); - } - } - if (timeout == null) { - timeout = parseDuration(System.getProperty(JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME)); - } - return timeout; - } - - } - - private final InvocationInterceptor proxy = (InvocationInterceptor) Proxy.newProxyInstance( - getClass().getClassLoader(), - new Class[] {InvocationInterceptor.class}, - new LoggingInvocationInterceptorHandler()); - - @Override - public void interceptAfterAllMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - proxy.interceptAfterAllMethod(invocation, invocationContext, extensionContext); - } - - @Override - public void interceptAfterEachMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - proxy.interceptAfterEachMethod(invocation, invocationContext, extensionContext); - } - - @Override - public void interceptBeforeAllMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - proxy.interceptBeforeAllMethod(invocation, invocationContext, extensionContext); - } - - @Override - public void interceptBeforeEachMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - proxy.interceptBeforeEachMethod(invocation, invocationContext, extensionContext); - } - - @Override - public void interceptDynamicTest(Invocation invocation, - DynamicTestInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - proxy.interceptDynamicTest(invocation, invocationContext, extensionContext); - } - - @Override - public void interceptTestMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - if (!Modifier.isPublic(invocationContext.getExecutable().getModifiers())) { - LOGGER.warn("Junit method {}.{} is not declared as public", invocationContext.getExecutable().getDeclaringClass().getCanonicalName(), - invocationContext.getExecutable().getName()); - } - proxy.interceptTestMethod(invocation, invocationContext, extensionContext); - } - - @Override - public void interceptTestTemplateMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - proxy.interceptTestTemplateMethod(invocation, invocationContext, extensionContext); - } - - @Override - public T interceptTestFactoryMethod(Invocation invocation, - ReflectiveInvocationContext invocationContext, - ExtensionContext extensionContext) - throws Throwable { - return proxy.interceptTestFactoryMethod(invocation, invocationContext, extensionContext); - } - - @Override - public T interceptTestClassConstructor(Invocation invocation, - ReflectiveInvocationContext> invocationContext, - ExtensionContext extensionContext) - throws Throwable { - return proxy.interceptTestClassConstructor(invocation, invocationContext, extensionContext); - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.java deleted file mode 100644 index 1770dca4905e8..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.base.ssh; - -import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH; -import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.integrations.util.HostPortResolver; -import io.airbyte.cdk.testutils.ContainerFactory; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.function.Consumer; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.JdbcDatabaseContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.images.builder.ImageFromDockerfile; -import org.testcontainers.utility.DockerImageName; - -public class SshBastionContainer implements AutoCloseable { - - public static class SshBastionContainerFactory extends ContainerFactory> { - - @Override - protected GenericContainer createNewContainer(DockerImageName imageName) { - var container = new GenericContainer(new ImageFromDockerfile("bastion-test") - .withFileFromClasspath("Dockerfile", "bastion/Dockerfile")) - .withExposedPorts(22); - return container; - } - - public GenericContainer exclusive(final Network network) { - Consumer> imageModifier = c -> { - c.withNetwork(network); - }; - var container = super.exclusive("bastion-test", new NamedContainerModifierImpl<>("withNetwork", imageModifier)); - return container; - } - - } - - private static final SshBastionContainerFactory factory = new SshBastionContainerFactory(); - - private static final String SSH_USER = "sshuser"; - private static final String SSH_PASSWORD = "secret"; - private GenericContainer bastion; - - public void initAndStartBastion(final Network network) { - bastion = factory.exclusive(network); - bastion.start(); - } - - public JsonNode getTunnelMethod(final SshTunnel.TunnelMethod tunnelMethod, - final boolean innerAddress) - throws IOException, InterruptedException { - final var containerAddress = innerAddress ? getInnerContainerAddress(bastion) : getOuterContainerAddress(bastion); - return Jsons.jsonNode(ImmutableMap.builder() - .put("tunnel_host", - Objects.requireNonNull(containerAddress.left)) - .put("tunnel_method", tunnelMethod) - .put("tunnel_port", containerAddress.right) - .put("tunnel_user", SSH_USER) - .put("tunnel_user_password", tunnelMethod.equals(SSH_PASSWORD_AUTH) ? SSH_PASSWORD : "") - .put("ssh_key", tunnelMethod.equals(SSH_KEY_AUTH) ? bastion.execInContainer("cat", "var/bastion/id_rsa").getStdout() : "") - .build()); - } - - public JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, - final ImmutableMap.Builder builderWithSchema, - final boolean innerAddress) - throws IOException, InterruptedException { - return Jsons.jsonNode(builderWithSchema - .put("tunnel_method", getTunnelMethod(tunnelMethod, innerAddress)) - .build()); - } - - public ImmutableMap.Builder getBasicDbConfigBuider(final JdbcDatabaseContainer db) { - return getBasicDbConfigBuider(db, db.getDatabaseName()); - } - - public ImmutableMap.Builder getBasicDbConfigBuider(final JdbcDatabaseContainer db, final List schemas) { - return getBasicDbConfigBuider(db, db.getDatabaseName()).put("schemas", schemas); - } - - public ImmutableMap.Builder getBasicDbConfigBuider(final JdbcDatabaseContainer db, final String schemaName) { - return ImmutableMap.builder() - .put("host", Objects.requireNonNull(HostPortResolver.resolveHost(db))) - .put("username", db.getUsername()) - .put("password", db.getPassword()) - .put("port", HostPortResolver.resolvePort(db)) - .put("database", schemaName) - .put("ssl", false); - } - - public void stopAndCloseContainers(final JdbcDatabaseContainer db) { - bastion.stop(); - bastion.close(); - db.stop(); - db.close(); - } - - public void stopAndClose() { - bastion.close(); - } - - @Override - public void close() { - stopAndClose(); - } - - public GenericContainer getContainer() { - return bastion; - } - - /** - * Returns the inner docker network ip address and port of a container. This can be used to reach a - * container from another container running on the same network - * - * @param container container - * @return a pair of host and port - */ - public static ImmutablePair getInnerContainerAddress(final Container container) { - return ImmutablePair.of( - container.getContainerInfo().getNetworkSettings().getNetworks().entrySet().stream().findFirst().get().getValue().getIpAddress(), - (Integer) container.getExposedPorts().stream().findFirst().get()); - } - - /** - * Returns the outer docker network ip address and port of a container. This can be used to reach a - * container from the host machine - * - * @param container container - * @return a pair of host and port - */ - public static ImmutablePair getOuterContainerAddress(final Container container) { - return ImmutablePair.of( - container.getHost(), - container.getFirstMappedPort()); - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/util/HostPortResolver.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/util/HostPortResolver.java deleted file mode 100644 index 4d29d36c98484..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/integrations/util/HostPortResolver.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.util; - -import java.util.Objects; -import org.testcontainers.containers.GenericContainer; - -public class HostPortResolver { - - public static String resolveHost(GenericContainer container) { - return getIpAddress(container); - } - - public static int resolvePort(GenericContainer container) { - return (Integer) container.getExposedPorts().stream().findFirst().get(); - } - - public static String resolveIpAddress(GenericContainer container) { - return getIpAddress(container); - } - - private static String getIpAddress(GenericContainer container) { - return Objects.requireNonNull(container.getContainerInfo() - .getNetworkSettings() - .getNetworks() - .entrySet().stream() - .findFirst() - .get().getValue().getIpAddress()); - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java deleted file mode 100644 index 0cc6977641061..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.testutils; - -import com.google.common.collect.Lists; -import io.airbyte.commons.logging.LoggingHelper; -import io.airbyte.commons.logging.MdcScope; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.OutputFrame; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.utility.DockerImageName; - -/** - * ContainerFactory is the companion to {@link TestDatabase} and provides it with suitable - * testcontainer instances. - */ -public abstract class ContainerFactory> { - - static private final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class); - - private record ContainerKey> (Class clazz, - DockerImageName imageName, - List> methods) {} - - ; - - private static class ContainerOrException { - - private final Supplier> containerSupplier; - private volatile RuntimeException _exception = null; - private volatile GenericContainer _container = null; - - ContainerOrException(Supplier> containerSupplier) { - this.containerSupplier = containerSupplier; - } - - GenericContainer container() { - if (_exception == null && _container == null) { - synchronized (this) { - if (_container == null && _exception == null) { - try { - _container = containerSupplier.get(); - if (_container == null) { - throw new IllegalStateException("testcontainer instance was not constructed"); - } - } catch (RuntimeException e) { - _exception = e; - } - } - } - } - if (_exception != null) { - throw _exception; - } - return _container; - } - - } - - private static final ConcurrentMap, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>(); - private static final AtomicInteger containerId = new AtomicInteger(0); - - private final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, - List> containerModifiers) { - return new MdcScope.Builder() - .setLogPrefix("testcontainer %s (%s[%s]):".formatted(containerId.incrementAndGet(), imageName, StringUtils.join(containerModifiers, ","))) - .setPrefixColor(LoggingHelper.Color.RED_BACKGROUND); - } - - /** - * Creates a new, unshared testcontainer instance. This usually wraps the default constructor for - * the testcontainer type. - */ - protected abstract C createNewContainer(DockerImageName imageName); - - /** - * Returns a shared instance of the testcontainer. - * - * @Deprecated use shared(String, NamedContainerModifier) instead - */ - @Deprecated - public final C shared(String imageName, String... methods) { - return shared(imageName, - Stream.of(methods).map(n -> new NamedContainerModifierImpl(n, resolveModifierByName(n))).toList()); - } - - public final C shared(String imageName, NamedContainerModifier... namedContainerModifiers) { - return shared(imageName, List.of(namedContainerModifiers)); - } - - public final C shared(String imageName) { - return shared(imageName, new ArrayList<>()); - } - - public final C shared(String imageName, List> namedContainerModifiers) { - final ContainerKey containerKey = new ContainerKey<>(getClass(), DockerImageName.parse(imageName), namedContainerModifiers); - // We deliberately avoid creating the container itself eagerly during the evaluation of the map - // value. - // Container creation can be exceedingly slow. - // Furthermore, we need to handle exceptions raised during container creation. - ContainerOrException containerOrError = SHARED_CONTAINERS.computeIfAbsent(containerKey, - key -> new ContainerOrException(() -> createAndStartContainer(key.imageName(), ((ContainerKey) key).methods()))); - // Instead, the container creation (if applicable) is deferred to here. - return (C) containerOrError.container(); - } - - /** - * Returns an exclusive instance of the testcontainer. - * - * @Deprecated use exclusive(String, NamedContainerModifier) instead - */ - @SuppressWarnings("unchecked") - @Deprecated - public final C exclusive(String imageName, String... methods) { - return exclusive(imageName, Stream.of(methods).map(n -> new NamedContainerModifierImpl(n, resolveModifierByName(n))).toList()); - } - - public final C exclusive(String imageName) { - return exclusive(imageName, new ArrayList<>()); - } - - public final C exclusive(String imageName, NamedContainerModifier... namedContainerModifiers) { - return exclusive(imageName, List.of(namedContainerModifiers)); - } - - public final C exclusive(String imageName, List> namedContainerModifiers) { - return (C) createAndStartContainer(DockerImageName.parse(imageName), namedContainerModifiers); - } - - public interface NamedContainerModifier> { - - String name(); - - Consumer modifier(); - - } - - public record NamedContainerModifierImpl> (String name, Consumer method) implements NamedContainerModifier { - - public String name() { - return name; - } - - public Consumer modifier() { - return method; - } - - } - - private Consumer resolveModifierByName(String methodName) { - final ContainerFactory self = this; - Consumer resolvedMethod = c -> { - try { - Class containerClass = c.getClass(); - Method method = self.getClass().getMethod(methodName, containerClass); - method.invoke(self, c); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - }; - return resolvedMethod; - } - - private C createAndStartContainer(DockerImageName imageName, List> namedContainerModifiers) { - LOGGER.info("Creating new container based on {} with {}.", imageName, Lists.transform(namedContainerModifiers, c -> c.name())); - C container = createNewContainer(imageName); - final var logConsumer = new Slf4jLogConsumer(LOGGER) { - - public void accept(OutputFrame frame) { - if (frame.getUtf8StringWithoutLineEnding().trim().length() > 0) { - super.accept(frame); - } - } - - }; - getTestContainerLogMdcBuilder(imageName, namedContainerModifiers).produceMappings(logConsumer::withMdc); - container.withLogConsumer(logConsumer); - for (NamedContainerModifier resolvedNamedContainerModifier : namedContainerModifiers) { - LOGGER.info("Calling {} in {} on new container based on {}.", - resolvedNamedContainerModifier.name(), getClass().getName(), imageName); - resolvedNamedContainerModifier.modifier().accept(container); - } - container.start(); - return container; - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/DatabaseConnectionHelper.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/DatabaseConnectionHelper.java deleted file mode 100644 index da503eb21dfbb..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/DatabaseConnectionHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.testutils; - -import io.airbyte.cdk.db.factory.DSLContextFactory; -import io.airbyte.cdk.db.factory.DataSourceFactory; -import javax.sql.DataSource; -import org.jooq.DSLContext; -import org.jooq.SQLDialect; -import org.testcontainers.containers.JdbcDatabaseContainer; - -/** - * Helper class that facilitates the creation of database connection objects for testing purposes. - */ -public class DatabaseConnectionHelper { - - /** - * Constructs a new {@link DataSource} using the provided configuration. - * - * @param container A JDBC Test Container instance. - * @return The configured {@link DataSource}. - */ - public static DataSource createDataSource(final JdbcDatabaseContainer container) { - return DataSourceFactory.create(container.getUsername(), - container.getPassword(), - container.getDriverClassName(), - container.getJdbcUrl()); - } - - /** - * Constructs a configured {@link DSLContext} instance using the provided configuration. - * - * @param container A JDBC Test Container instance. - * @param dialect The SQL dialect to use with objects created from this context. - * @return The configured {@link DSLContext}. - */ - public static DSLContext createDslContext(final JdbcDatabaseContainer container, final SQLDialect dialect) { - return DSLContextFactory.create( - container.getUsername(), - container.getPassword(), - container.getDriverClassName(), - container.getJdbcUrl(), - dialect); - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/NonContainer.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/NonContainer.java deleted file mode 100644 index badf004d4f990..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/NonContainer.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.testutils; - -import org.testcontainers.containers.JdbcDatabaseContainer; - -/** - * This is used when a source (such as Snowflake) relies on an always-on resource and therefore - * doesn't need an actual container. compatible - */ -public class NonContainer extends JdbcDatabaseContainer { - - private final String username; - private final String password; - private final String jdbcUrl; - - private final String driverClassName; - - public NonContainer(final String userName, - final String password, - final String jdbcUrl, - final String driverClassName, - final String dockerImageName) { - super(dockerImageName); - this.username = userName; - this.password = password; - this.jdbcUrl = jdbcUrl; - this.driverClassName = driverClassName; - } - - @Override - public String getDriverClassName() { - return driverClassName; - } - - @Override - public String getJdbcUrl() { - return jdbcUrl; - } - - @Override - public String getUsername() { - return username; - } - - @Override - public String getPassword() { - return password; - } - - @Override - protected String getTestQueryString() { - return "SELECT 1"; - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java deleted file mode 100644 index 3ee1d0e9b0d1c..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.testutils; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.db.ContextQueryFunction; -import io.airbyte.cdk.db.Database; -import io.airbyte.cdk.db.factory.DSLContextFactory; -import io.airbyte.cdk.db.factory.DataSourceFactory; -import io.airbyte.cdk.db.factory.DatabaseDriver; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.JdbcConnector; -import io.airbyte.cdk.integrations.util.HostPortResolver; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.sql.SQLException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; -import javax.sql.DataSource; -import org.jooq.DSLContext; -import org.jooq.SQLDialect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.JdbcDatabaseContainer; - -/** - * TestDatabase provides a convenient pattern for interacting with databases when testing SQL - * database sources. The basic idea is to share the same database testcontainer instance for all - * tests and to use SQL constructs such as DATABASE and USER to isolate each test case's state. - * - * @param the type of the backing testcontainer. - * @param itself - * @param the type of the object returned by {@link #configBuilder()} - */ -abstract public class TestDatabase, T extends TestDatabase, B extends TestDatabase.ConfigBuilder> - implements AutoCloseable { - - static private final Logger LOGGER = LoggerFactory.getLogger(TestDatabase.class); - - final private C container; - final private String suffix; - final private ArrayList cleanupSQL = new ArrayList<>(); - final private Map connectionProperties = new HashMap<>(); - - private volatile DataSource dataSource; - private volatile DSLContext dslContext; - - protected final int databaseId; - private static final AtomicInteger nextDatabaseId = new AtomicInteger(0); - - protected final int containerId; - private static final AtomicInteger nextContainerId = new AtomicInteger(0); - private static final Map containerUidToId = new ConcurrentHashMap<>(); - - @SuppressWarnings("this-escape") - protected TestDatabase(C container) { - this.container = container; - this.suffix = Strings.addRandomSuffix("", "_", 10); - this.databaseId = nextDatabaseId.getAndIncrement(); - this.containerId = containerUidToId.computeIfAbsent(container.getContainerId(), k -> nextContainerId.getAndIncrement()); - LOGGER.info(formatLogLine("creating database " + getDatabaseName())); - } - - private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); - - protected String formatLogLine(String logLine) { - String retVal = "TestDatabase databaseId=" + databaseId + ", containerId=" + containerId + " - " + logLine; - return retVal; - } - - @SuppressWarnings("unchecked") - protected T self() { - return (T) this; - } - - /** - * Adds a key-value pair to the JDBC URL's query parameters. - */ - public T withConnectionProperty(String key, String value) { - if (isInitialized()) { - throw new RuntimeException("TestDatabase instance is already initialized"); - } - connectionProperties.put(key, value); - return self(); - } - - /** - * Enqueues a SQL statement to be executed when this object is closed. - */ - public T onClose(String fmtSql, Object... fmtArgs) { - cleanupSQL.add(String.format(fmtSql, fmtArgs)); - return self(); - } - - /** - * Executes a SQL statement after calling String.format on the arguments. - */ - public T with(String fmtSql, Object... fmtArgs) { - execSQL(Stream.of(String.format(fmtSql, fmtArgs))); - return self(); - } - - /** - * Executes SQL statements as root to provide the necessary isolation for the lifetime of this - * object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes the - * {@link DataSource} and {@link DSLContext} owned by this object. - */ - public T initialized() { - inContainerBootstrapCmd().forEach(this::execInContainer); - this.dataSource = DataSourceFactory.create( - getUserName(), - getPassword(), - getDatabaseDriver().getDriverClassName(), - getJdbcUrl(), - connectionProperties, - JdbcConnector.getConnectionTimeout(connectionProperties, getDatabaseDriver().getDriverClassName())); - this.dslContext = DSLContextFactory.create(dataSource, getSqlDialect()); - return self(); - } - - final public boolean isInitialized() { - return dslContext != null; - } - - abstract protected Stream> inContainerBootstrapCmd(); - - abstract protected Stream inContainerUndoBootstrapCmd(); - - abstract public DatabaseDriver getDatabaseDriver(); - - abstract public SQLDialect getSqlDialect(); - - final public C getContainer() { - return container; - } - - public String withNamespace(String name) { - return name + suffix; - } - - public String getDatabaseName() { - return withNamespace("db"); - } - - public String getUserName() { - return withNamespace("user"); - } - - public String getPassword() { - return "password"; - } - - public DataSource getDataSource() { - if (!isInitialized()) { - throw new RuntimeException("TestDatabase instance is not yet initialized"); - } - return dataSource; - } - - final public DSLContext getDslContext() { - if (!isInitialized()) { - throw new RuntimeException("TestDatabase instance is not yet initialized"); - } - return dslContext; - } - - public String getJdbcUrl() { - return String.format( - getDatabaseDriver().getUrlFormatString(), - getContainer().getHost(), - getContainer().getFirstMappedPort(), - getDatabaseName()); - } - - public Database getDatabase() { - return new Database(getDslContext()); - } - - protected void execSQL(final Stream sql) { - try { - getDatabase().query(ctx -> { - sql.forEach(statement -> { - LOGGER.info("executing SQL statement {}", statement); - ctx.execute(statement); - }); - return null; - }); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - protected void execInContainer(Stream cmds) { - final List cmd = cmds.toList(); - if (cmd.isEmpty()) { - return; - } - try { - LOGGER.info(formatLogLine(String.format("executing command %s", Strings.join(cmd, " ")))); - final var exec = getContainer().execInContainer(cmd.toArray(new String[0])); - if (exec.getExitCode() == 0) { - LOGGER.info(formatLogLine(String.format("execution success\nstdout:\n%s\nstderr:\n%s", exec.getStdout(), exec.getStderr()))); - } else { - LOGGER.error(formatLogLine( - String.format("execution failure, code %s\nstdout:\n%s\nstderr:\n%s", exec.getExitCode(), exec.getStdout(), exec.getStderr()))); - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - public X query(final ContextQueryFunction transform) throws SQLException { - return getDatabase().query(transform); - } - - public X transaction(final ContextQueryFunction transform) throws SQLException { - return getDatabase().transaction(transform); - } - - /** - * Returns a builder for the connector config object. - */ - public B configBuilder() { - return new ConfigBuilder(self()).self(); - } - - public B testConfigBuilder() { - return configBuilder() - .withHostAndPort() - .withCredentials() - .withDatabase(); - } - - public B integrationTestConfigBuilder() { - return configBuilder() - .withResolvedHostAndPort() - .withCredentials() - .withDatabase(); - } - - @Override - public void close() { - execSQL(this.cleanupSQL.stream()); - execInContainer(inContainerUndoBootstrapCmd()); - LOGGER.info("closing database databaseId=" + databaseId); - } - - static public class ConfigBuilder, B extends ConfigBuilder> { - - static public final Duration DEFAULT_CDC_REPLICATION_INITIAL_WAIT = Duration.ofSeconds(5); - - protected final ImmutableMap.Builder builder = ImmutableMap.builder(); - protected final T testDatabase; - - protected ConfigBuilder(T testDatabase) { - this.testDatabase = testDatabase; - } - - public JsonNode build() { - return Jsons.jsonNode(builder.build()); - } - - @SuppressWarnings("unchecked") - final protected B self() { - return (B) this; - } - - public B with(Object key, Object value) { - builder.put(key, value); - return self(); - } - - public B withDatabase() { - return this - .with(JdbcUtils.DATABASE_KEY, testDatabase.getDatabaseName()); - } - - public B withCredentials() { - return this - .with(JdbcUtils.USERNAME_KEY, testDatabase.getUserName()) - .with(JdbcUtils.PASSWORD_KEY, testDatabase.getPassword()); - } - - public B withResolvedHostAndPort() { - return this - .with(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(testDatabase.getContainer())) - .with(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(testDatabase.getContainer())); - } - - public B withHostAndPort() { - return this - .with(JdbcUtils.HOST_KEY, testDatabase.getContainer().getHost()) - .with(JdbcUtils.PORT_KEY, testDatabase.getContainer().getFirstMappedPort()); - } - - public B withoutSsl() { - return with(JdbcUtils.SSL_KEY, false); - } - - public B withSsl(Map sslMode) { - return with(JdbcUtils.SSL_KEY, true).with(JdbcUtils.SSL_MODE_KEY, sslMode); - } - - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt new file mode 100644 index 0000000000000..ef73027ad87fe --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt @@ -0,0 +1,335 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.extensions + +import java.lang.reflect.* +import java.time.Duration +import java.time.Instant +import java.time.format.DateTimeParseException +import java.util.* +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import java.util.regex.Pattern +import kotlin.concurrent.Volatile +import org.apache.commons.lang3.StringUtils +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.commons.lang3.time.DurationFormatUtils +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.extension.DynamicTestInvocationContext +import org.junit.jupiter.api.extension.ExtensionContext +import org.junit.jupiter.api.extension.InvocationInterceptor +import org.junit.jupiter.api.extension.ReflectiveInvocationContext +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * By default, junit only output logs to the console, and nothing makes it into log4j logs. This + * class fixes that by using the interceptor facility to print progress and timing information. This + * allows us to have junit loglines in our test logs. This is instanciated via + * [Java's + * ServiceLoader](https://docs.oracle.com/javase%2F9%2Fdocs%2Fapi%2F%2F/java/util/ServiceLoader.html) + * The declaration can be found in + * resources/META-INF/services/org.junit.jupiter.api.extension.Extension + */ +class LoggingInvocationInterceptor : InvocationInterceptor { + private class LoggingInvocationInterceptorHandler : InvocationHandler { + @Throws(Throwable::class) + override fun invoke(proxy: Any?, method: Method?, args: Array?): Any? { + if ( + LoggingInvocationInterceptor::class + .java + .getDeclaredMethod( + method!!.name, + InvocationInterceptor.Invocation::class.java, + ReflectiveInvocationContext::class.java, + ExtensionContext::class.java + ) == null + ) { + LOGGER!!.error( + "Junit LoggingInvocationInterceptor executing unknown interception point {}", + method.name + ) + return method.invoke(proxy, *(args!!)) + } + val invocation = args!![0] as InvocationInterceptor.Invocation<*>? + val invocationContext = args[1] as ReflectiveInvocationContext<*>? + val extensionContext = args[2] as ExtensionContext? + val methodName = method.name + val logLineSuffix: String? + val methodMatcher = methodPattern!!.matcher(methodName) + if (methodName == "interceptDynamicTest") { + logLineSuffix = + "execution of DynamicTest %s".formatted(extensionContext!!.displayName) + } else if (methodName == "interceptTestClassConstructor") { + logLineSuffix = + "instance creation for %s".formatted(invocationContext!!.targetClass) + } else if (methodMatcher.matches()) { + val interceptedEvent = methodMatcher.group(1) + logLineSuffix = + "execution of @%s method %s.%s".formatted( + interceptedEvent, + invocationContext!!.executable!!.declaringClass.simpleName, + invocationContext.executable!!.name + ) + } else { + logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName) + } + val currentThread = Thread.currentThread() + val timeoutTask = TimeoutInteruptor(currentThread) + val start = Instant.now() + try { + val timeout = getTimeout(invocationContext) + if (timeout != null) { + LOGGER!!.info( + "Junit starting {} with timeout of {}", + logLineSuffix, + DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true) + ) + Timer("TimeoutTimer-" + currentThread.name, true) + .schedule(timeoutTask, timeout.toMillis()) + } else { + LOGGER!!.warn("Junit starting {} with no timeout", logLineSuffix) + } + val retVal = invocation!!.proceed() + val elapsedMs = Duration.between(start, Instant.now()).toMillis() + LOGGER.info( + "Junit completed {} in {}", + logLineSuffix, + DurationFormatUtils.formatDurationWords(elapsedMs, true, true) + ) + return retVal + } catch (throwable: Throwable) { + timeoutTask.cancel() + val elapsedMs = Duration.between(start, Instant.now()).toMillis() + var t1: Throwable + if (timeoutTask.wasTriggered) { + t1 = + TimeoutException( + "Execution was cancelled after %s. If you think your test should be given more time to complete, you can use the @Timeout annotation. If all the test of a connector are slow, " + + " you can override the property 'JunitMethodExecutionTimeout' in your gradle.properties.".formatted( + DurationFormatUtils.formatDurationWords(elapsedMs, true, true) + ) + ) + t1.initCause(throwable) + } else { + t1 = throwable + } + var belowCurrentCall = false + val stackToDisplay: MutableList = LinkedList() + for (stackString in ExceptionUtils.getStackFrames(throwable)) { + if (stackString!!.startsWith("\tat ")) { + if ( + !belowCurrentCall && + stackString.contains( + LoggingInvocationInterceptor::class.java.canonicalName + ) + ) { + belowCurrentCall = true + } + } else { + belowCurrentCall = false + } + if (!belowCurrentCall) { + stackToDisplay.add(stackString) + } + } + val stackTrace = StringUtils.join(stackToDisplay, "\n ") + LOGGER!!.error( + "Junit exception throw during {} after {}:\n{}", + logLineSuffix, + DurationFormatUtils.formatDurationWords(elapsedMs, true, true), + stackTrace + ) + throw t1 + } finally { + timeoutTask.cancel() + } + } + + private class TimeoutInteruptor(private val parentThread: Thread?) : TimerTask() { + @Volatile var wasTriggered: Boolean = false + + override fun run() { + wasTriggered = true + parentThread!!.interrupt() + } + + override fun cancel(): Boolean { + return super.cancel() + } + } + + companion object { + private val methodPattern: Pattern? = Pattern.compile("intercept(.*)Method") + + private val PATTERN: Pattern? = + Pattern.compile( + "([1-9]\\d*) *((?:[nμm]?s)|m|h|d)?", + Pattern.CASE_INSENSITIVE or Pattern.UNICODE_CASE + ) + private val UNITS_BY_ABBREVIATION: MutableMap? + + init { + val unitsByAbbreviation: MutableMap = HashMap() + unitsByAbbreviation["ns"] = TimeUnit.NANOSECONDS + unitsByAbbreviation["μs"] = TimeUnit.MICROSECONDS + unitsByAbbreviation["ms"] = TimeUnit.MILLISECONDS + unitsByAbbreviation["s"] = TimeUnit.SECONDS + unitsByAbbreviation["m"] = TimeUnit.MINUTES + unitsByAbbreviation["h"] = TimeUnit.HOURS + unitsByAbbreviation["d"] = TimeUnit.DAYS + UNITS_BY_ABBREVIATION = Collections.unmodifiableMap(unitsByAbbreviation) + } + + @Throws(DateTimeParseException::class) + fun parseDuration(text: String?): Duration? { + val matcher = PATTERN!!.matcher(text!!.trim { it <= ' ' }) + if (matcher.matches()) { + val value = matcher.group(1).toLong() + val unitAbbreviation = matcher.group(2) + val unit = + if (unitAbbreviation == null) TimeUnit.SECONDS + else UNITS_BY_ABBREVIATION!![unitAbbreviation.lowercase()] + return Duration.ofSeconds(unit!!.toSeconds(value)) + } + throw DateTimeParseException( + "Timeout duration is not in the expected format ( [ns|μs|ms|s|m|h|d])", + text, + 0 + ) + } + + private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>?): Duration? { + var timeout: Duration? = null + var m = invocationContext!!.executable + if (m is Method) { + var timeoutAnnotation: Timeout? = m.getAnnotation(Timeout::class.java) + if (timeoutAnnotation == null) { + timeoutAnnotation = + invocationContext.targetClass.getAnnotation(Timeout::class.java) + } + if (timeoutAnnotation != null) { + timeout = + Duration.ofMillis( + timeoutAnnotation.unit.toMillis(timeoutAnnotation.value) + ) + } + } + if (timeout == null) { + timeout = + parseDuration( + System.getProperty(JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME) + ) + } + return timeout + } + } + } + + private val proxy: InvocationInterceptor? = + Proxy.newProxyInstance( + javaClass.classLoader, + arrayOf?>(InvocationInterceptor::class.java), + LoggingInvocationInterceptorHandler() + ) as InvocationInterceptor + + @Throws(Throwable::class) + override fun interceptAfterAllMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ) { + proxy!!.interceptAfterAllMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptAfterEachMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ) { + proxy!!.interceptAfterEachMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptBeforeAllMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ) { + proxy!!.interceptBeforeAllMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptBeforeEachMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ) { + proxy!!.interceptBeforeEachMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptDynamicTest( + invocation: InvocationInterceptor.Invocation?, + invocationContext: DynamicTestInvocationContext?, + extensionContext: ExtensionContext? + ) { + proxy!!.interceptDynamicTest(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptTestMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ) { + if (!Modifier.isPublic(invocationContext!!.executable!!.modifiers)) { + LOGGER!!.warn( + "Junit method {}.{} is not declared as public", + invocationContext.executable!!.declaringClass.canonicalName, + invocationContext.executable!!.name + ) + } + proxy!!.interceptTestMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptTestTemplateMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ) { + proxy!!.interceptTestTemplateMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptTestFactoryMethod( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, + extensionContext: ExtensionContext? + ): T? { + return proxy!!.interceptTestFactoryMethod(invocation, invocationContext, extensionContext) + } + + @Throws(Throwable::class) + override fun interceptTestClassConstructor( + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?>?, + extensionContext: ExtensionContext? + ): T? { + return proxy!!.interceptTestClassConstructor( + invocation, + invocationContext, + extensionContext + ) + } + + companion object { + private val LOGGER: Logger? = + LoggerFactory.getLogger(LoggingInvocationInterceptor::class.java) + private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String? = + "JunitMethodExecutionTimeout" + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt new file mode 100644 index 0000000000000..84c1ab31344dd --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.integrations.base.ssh + +import com.fasterxml.jackson.databind.JsonNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.integrations.util.HostPortResolver +import io.airbyte.cdk.testutils.ContainerFactory +import io.airbyte.commons.json.Jsons +import java.io.IOException +import java.util.* +import java.util.function.Consumer +import org.apache.commons.lang3.tuple.ImmutablePair +import org.testcontainers.containers.Container +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.JdbcDatabaseContainer +import org.testcontainers.containers.Network +import org.testcontainers.images.builder.ImageFromDockerfile +import org.testcontainers.utility.DockerImageName + +class SshBastionContainer : AutoCloseable { + class SshBastionContainerFactory : ContainerFactory>() { + override fun createNewContainer(imageName: DockerImageName?): GenericContainer<*>? { + val container: GenericContainer<*> = + GenericContainer( + ImageFromDockerfile("bastion-test") + .withFileFromClasspath("Dockerfile", "bastion/Dockerfile") + ) + .withExposedPorts(22) + return container + } + + fun exclusive(network: Network): GenericContainer<*>? { + val imageModifier = Consumer { c: GenericContainer<*> -> c!!.withNetwork(network) } + val container = + super.exclusive( + "bastion-test", + NamedContainerModifierImpl("withNetwork", imageModifier) + ) + return container + } + } + + var container: GenericContainer<*>? = null + private set + + fun initAndStartBastion(network: Network) { + container = factory!!.exclusive(network) + container!!.start() + } + + @Throws(IOException::class, InterruptedException::class) + fun getTunnelMethod(tunnelMethod: SshTunnel.TunnelMethod?, innerAddress: Boolean): JsonNode? { + val containerAddress = + if (innerAddress) getInnerContainerAddress(container) + else getOuterContainerAddress(container) + return Jsons.jsonNode( + ImmutableMap.builder() + .put("tunnel_host", Objects.requireNonNull(containerAddress!!.left)) + .put("tunnel_method", tunnelMethod) + .put("tunnel_port", containerAddress.right) + .put("tunnel_user", SSH_USER) + .put( + "tunnel_user_password", + if (tunnelMethod == SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH) SSH_PASSWORD + else "" + ) + .put( + "ssh_key", + if (tunnelMethod == SshTunnel.TunnelMethod.SSH_KEY_AUTH) + container!!.execInContainer("cat", "var/bastion/id_rsa").stdout + else "" + ) + .build() + ) + } + + @Throws(IOException::class, InterruptedException::class) + fun getTunnelConfig( + tunnelMethod: SshTunnel.TunnelMethod?, + builderWithSchema: ImmutableMap.Builder?, + innerAddress: Boolean + ): JsonNode? { + return Jsons.jsonNode( + builderWithSchema!! + .put("tunnel_method", getTunnelMethod(tunnelMethod, innerAddress)) + .build() + ) + } + + fun getBasicDbConfigBuider(db: JdbcDatabaseContainer<*>?): ImmutableMap.Builder? { + return getBasicDbConfigBuider(db, db!!.databaseName) + } + + fun getBasicDbConfigBuider( + db: JdbcDatabaseContainer<*>?, + schemas: MutableList? + ): ImmutableMap.Builder? { + return getBasicDbConfigBuider(db, db!!.databaseName)!!.put("schemas", schemas) + } + + fun getBasicDbConfigBuider( + db: JdbcDatabaseContainer<*>?, + schemaName: String? + ): ImmutableMap.Builder? { + return ImmutableMap.builder() + .put("host", Objects.requireNonNull(HostPortResolver.resolveHost(db))) + .put("username", db!!.username) + .put("password", db.password) + .put("port", HostPortResolver.resolvePort(db)) + .put("database", schemaName) + .put("ssl", false) + } + + fun stopAndCloseContainers(db: JdbcDatabaseContainer<*>?) { + container!!.stop() + container!!.close() + db!!.stop() + db.close() + } + + fun stopAndClose() { + container!!.close() + } + + override fun close() { + stopAndClose() + } + + companion object { + private val factory: SshBastionContainerFactory? = SshBastionContainerFactory() + + private val SSH_USER: String? = "sshuser" + private val SSH_PASSWORD: String? = "secret" + + /** + * Returns the inner docker network ip address and port of a container. This can be used to + * reach a container from another container running on the same network + * + * @param container container + * @return a pair of host and port + */ + fun getInnerContainerAddress(container: Container<*>?): ImmutablePair? { + return ImmutablePair.of( + container!! + .containerInfo + .networkSettings + .networks + .entries + .stream() + .findFirst() + .get() + .value + .ipAddress, + container.exposedPorts.stream().findFirst().get() + ) + } + + /** + * Returns the outer docker network ip address and port of a container. This can be used to + * reach a container from the host machine + * + * @param container container + * @return a pair of host and port + */ + fun getOuterContainerAddress(container: Container<*>?): ImmutablePair? { + return ImmutablePair.of(container!!.host, container.firstMappedPort) + } + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/util/HostPortResolver.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/util/HostPortResolver.kt new file mode 100644 index 0000000000000..62f41b8229837 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/util/HostPortResolver.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.integrations.util + +import java.util.* +import org.testcontainers.containers.GenericContainer + +object HostPortResolver { + @JvmStatic + fun resolveHost(container: GenericContainer<*>?): String? { + return getIpAddress(container) + } + + @JvmStatic + fun resolvePort(container: GenericContainer<*>?): Int { + return container!!.exposedPorts.stream().findFirst().get() + } + + fun resolveIpAddress(container: GenericContainer<*>?): String? { + return getIpAddress(container) + } + + private fun getIpAddress(container: GenericContainer<*>?): String? { + return Objects.requireNonNull( + container!! + .containerInfo + .networkSettings + .networks + .entries + .stream() + .findFirst() + .get() + .value + .ipAddress + ) + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt new file mode 100644 index 0000000000000..29c432119339d --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.testutils + +import com.google.common.collect.Lists +import io.airbyte.commons.logging.LoggingHelper +import io.airbyte.commons.logging.MdcScope +import java.lang.reflect.InvocationTargetException +import java.util.List +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.function.Consumer +import java.util.function.Supplier +import java.util.stream.Stream +import kotlin.concurrent.Volatile +import org.apache.commons.lang3.StringUtils +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.output.OutputFrame +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.utility.DockerImageName + +/** + * ContainerFactory is the companion to [TestDatabase] and provides it with suitable testcontainer + * instances. + */ +abstract class ContainerFactory> { + @JvmRecord + private data class ContainerKey>( + val clazz: Class?>?, + val imageName: DockerImageName?, + val methods: MutableList> + ) + + private class ContainerOrException( + private val containerSupplier: Supplier> + ) { + @Volatile private lateinit var _exception: RuntimeException + + @Volatile private lateinit var _container: GenericContainer<*> + + fun container(): GenericContainer<*> { + if (!::_exception.isInitialized && !::_container.isInitialized) { + synchronized(this) { + if (!::_exception.isInitialized && !::_container.isInitialized) { + try { + _container = containerSupplier!!.get() + checkNotNull(_container) { + "testcontainer instance was not constructed" + } + } catch (e: RuntimeException) { + _exception = e + } + } + } + } + if (::_exception.isInitialized) { + throw _exception + } + return _container + } + } + + private fun getTestContainerLogMdcBuilder( + imageName: DockerImageName?, + containerModifiers: MutableList> + ): MdcScope.Builder { + return MdcScope.Builder() + .setLogPrefix( + "testcontainer %s (%s[%s]):".formatted( + containerId!!.incrementAndGet(), + imageName, + StringUtils.join(containerModifiers, ",") + ) + ) + .setPrefixColor(LoggingHelper.Color.RED_BACKGROUND) + } + + /** + * Creates a new, unshared testcontainer instance. This usually wraps the default constructor + * for the testcontainer type. + */ + protected abstract fun createNewContainer(imageName: DockerImageName?): C? + + /** + * Returns a shared instance of the testcontainer. + * + * @Deprecated use shared(String, NamedContainerModifier) instead + */ + @Deprecated("") + fun shared(imageName: String, vararg methods: String): C { + return shared( + imageName, + Stream.of(*methods) + .map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } + .toList() + ) + } + + fun shared(imageName: String, vararg namedContainerModifiers: NamedContainerModifier): C { + return shared(imageName, List.of(*namedContainerModifiers)) + } + + @JvmOverloads + fun shared( + imageName: String, + namedContainerModifiers: MutableList> = ArrayList() + ): C { + val containerKey = + ContainerKey(javaClass, DockerImageName.parse(imageName), namedContainerModifiers) + // We deliberately avoid creating the container itself eagerly during the evaluation of the + // map + // value. + // Container creation can be exceedingly slow. + // Furthermore, we need to handle exceptions raised during container creation. + val containerOrError = + SHARED_CONTAINERS!!.computeIfAbsent(containerKey) { key: ContainerKey<*>? -> + ContainerOrException { + createAndStartContainer(key!!.imageName, (key as ContainerKey)!!.methods) + } + } + // Instead, the container creation (if applicable) is deferred to here. + return containerOrError!!.container() as C + } + + /** + * Returns an exclusive instance of the testcontainer. + * + * @Deprecated use exclusive(String, NamedContainerModifier) instead + */ + @Deprecated("") + fun exclusive(imageName: String, vararg methods: String): C { + return exclusive( + imageName, + Stream.of(*methods) + .map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } + .toList() + ) + } + + fun exclusive(imageName: String, vararg namedContainerModifiers: NamedContainerModifier): C { + return exclusive(imageName, List.of(*namedContainerModifiers)) + } + + @JvmOverloads + fun exclusive( + imageName: String, + namedContainerModifiers: MutableList> = ArrayList() + ): C { + return createAndStartContainer(DockerImageName.parse(imageName), namedContainerModifiers) + } + + interface NamedContainerModifier> { + fun name(): String + + fun modifier(): Consumer + } + + class NamedContainerModifierImpl>(name: String, method: Consumer) : + NamedContainerModifier { + override fun name(): String { + return name + } + + override fun modifier(): Consumer { + return method + } + + val name: String + val method: Consumer + + init { + this.name = name + this.method = method + } + } + + private fun resolveModifierByName(methodName: String?): Consumer { + val self: ContainerFactory = this + val resolvedMethod = Consumer { c: C -> + try { + val containerClass: Class> = c.javaClass + val method = self.javaClass.getMethod(methodName, containerClass) + method.invoke(self, c) + } catch (e: NoSuchMethodException) { + throw RuntimeException(e) + } catch (e: IllegalAccessException) { + throw RuntimeException(e) + } catch (e: InvocationTargetException) { + throw RuntimeException(e) + } + } + return resolvedMethod + } + + private fun createAndStartContainer( + imageName: DockerImageName?, + namedContainerModifiers: MutableList> + ): C { + LOGGER!!.info( + "Creating new container based on {} with {}.", + imageName, + Lists.transform(namedContainerModifiers) { c: NamedContainerModifier -> c!!.name() } + ) + val container = createNewContainer(imageName) + val logConsumer: Slf4jLogConsumer = + object : Slf4jLogConsumer(LOGGER) { + override fun accept(frame: OutputFrame?) { + if (frame!!.utf8StringWithoutLineEnding.trim { it <= ' ' }.length > 0) { + super.accept(frame) + } + } + } + getTestContainerLogMdcBuilder(imageName, namedContainerModifiers)!!.produceMappings { + key: String?, + value: String? -> + logConsumer.withMdc(key, value) + } + container!!.withLogConsumer(logConsumer) + for (resolvedNamedContainerModifier in namedContainerModifiers!!) { + LOGGER.info( + "Calling {} in {} on new container based on {}.", + resolvedNamedContainerModifier!!.name(), + javaClass.name, + imageName + ) + resolvedNamedContainerModifier.modifier()!!.accept(container) + } + container.start() + return container + } + + companion object { + private val LOGGER: Logger? = LoggerFactory.getLogger(ContainerFactory::class.java) + + private val SHARED_CONTAINERS: ConcurrentMap?, ContainerOrException?>? = + ConcurrentHashMap() + private val containerId: AtomicInteger? = AtomicInteger(0) + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/DatabaseConnectionHelper.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/DatabaseConnectionHelper.kt new file mode 100644 index 0000000000000..72be607a26e01 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/DatabaseConnectionHelper.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.testutils + +import io.airbyte.cdk.db.factory.DSLContextFactory +import io.airbyte.cdk.db.factory.DataSourceFactory +import javax.sql.DataSource +import org.jooq.DSLContext +import org.jooq.SQLDialect +import org.testcontainers.containers.JdbcDatabaseContainer + +/** + * Helper class that facilitates the creation of database connection objects for testing purposes. + */ +object DatabaseConnectionHelper { + /** + * Constructs a new [DataSource] using the provided configuration. + * + * @param container A JDBC Test Container instance. + * @return The configured [DataSource]. + */ + @JvmStatic + fun createDataSource(container: JdbcDatabaseContainer<*>?): DataSource? { + return DataSourceFactory.create( + container!!.username, + container.password, + container.driverClassName, + container.jdbcUrl + ) + } + + /** + * Constructs a configured [DSLContext] instance using the provided configuration. + * + * @param container A JDBC Test Container instance. + * @param dialect The SQL dialect to use with objects created from this context. + * @return The configured [DSLContext]. + */ + @JvmStatic + fun createDslContext(container: JdbcDatabaseContainer<*>?, dialect: SQLDialect?): DSLContext? { + return DSLContextFactory.create( + container!!.username, + container.password, + container.driverClassName, + container.jdbcUrl, + dialect + ) + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt new file mode 100644 index 0000000000000..2dffc3b41e47b --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.testutils + +import org.testcontainers.containers.JdbcDatabaseContainer + +/** + * This is used when a source (such as Snowflake) relies on an always-on resource and therefore + * doesn't need an actual container. compatible + */ +class NonContainer( + private val username: String?, + private val password: String?, + private val jdbcUrl: String?, + private val driverClassName: String?, + dockerImageName: String? +) : JdbcDatabaseContainer(dockerImageName!!) { + override fun getDriverClassName(): String? { + return driverClassName + } + + override fun getJdbcUrl(): String? { + return jdbcUrl + } + + override fun getUsername(): String? { + return username + } + + override fun getPassword(): String? { + return password + } + + override fun getTestQueryString(): String? { + return "SELECT 1" + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt new file mode 100644 index 0000000000000..f376bb411c387 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.cdk.testutils + +import com.fasterxml.jackson.databind.JsonNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.db.ContextQueryFunction +import io.airbyte.cdk.db.Database +import io.airbyte.cdk.db.factory.DSLContextFactory +import io.airbyte.cdk.db.factory.DataSourceFactory +import io.airbyte.cdk.db.factory.DatabaseDriver +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.JdbcConnector +import io.airbyte.cdk.integrations.util.HostPortResolver +import io.airbyte.commons.json.Jsons +import io.airbyte.commons.string.Strings +import java.io.IOException +import java.io.UncheckedIOException +import java.sql.SQLException +import java.text.DateFormat +import java.text.SimpleDateFormat +import java.time.Duration +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.stream.Stream +import javax.sql.DataSource +import kotlin.concurrent.Volatile +import org.jooq.DSLContext +import org.jooq.SQLDialect +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.testcontainers.containers.JdbcDatabaseContainer + +/** + * TestDatabase provides a convenient pattern for interacting with databases when testing SQL + * database sources. The basic idea is to share the same database testcontainer instance for all + * tests and to use SQL constructs such as DATABASE and USER to isolate each test case's state. + * + * @param the type of the backing testcontainer. + * @param itself + * @param the type of the object returned by [.configBuilder] + */ +abstract class TestDatabase< + C : JdbcDatabaseContainer<*>, T : TestDatabase, B : TestDatabase.ConfigBuilder> +protected constructor(@JvmField val container: C) : AutoCloseable { + private val suffix: String = Strings.addRandomSuffix("", "_", 10) + private val cleanupSQL: ArrayList = ArrayList() + private val connectionProperties: MutableMap = HashMap() + + @Volatile private var dataSource: DataSource? = null + + @Volatile private var dslContext: DSLContext? = null + + protected val databaseId: Int + protected val containerId: Int + private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") + + init { + this.databaseId = nextDatabaseId!!.getAndIncrement() + this.containerId = + containerUidToId!!.computeIfAbsent(container!!.containerId) { k: String? -> + nextContainerId!!.getAndIncrement() + }!! + LOGGER!!.info(formatLogLine("creating database " + databaseName)) + } + + protected fun formatLogLine(logLine: String?): String? { + val retVal = "TestDatabase databaseId=$databaseId, containerId=$containerId - $logLine" + return retVal + } + + protected fun self(): T { + return this as T + } + + /** Adds a key-value pair to the JDBC URL's query parameters. */ + fun withConnectionProperty(key: String, value: String): T { + if (this.isInitialized) { + throw RuntimeException("TestDatabase instance is already initialized") + } + connectionProperties!![key] = value + return self() + } + + /** Enqueues a SQL statement to be executed when this object is closed. */ + fun onClose(fmtSql: String, vararg fmtArgs: Any?): T { + cleanupSQL!!.add(String.format(fmtSql!!, *fmtArgs)) + return self() + } + + /** Executes a SQL statement after calling String.format on the arguments. */ + fun with(fmtSql: String, vararg fmtArgs: Any?): T { + execSQL(Stream.of(String.format(fmtSql!!, *fmtArgs))) + return self() + } + + /** + * Executes SQL statements as root to provide the necessary isolation for the lifetime of this + * object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes + * the [DataSource] and [DSLContext] owned by this object. + */ + fun initialized(): T? { + inContainerBootstrapCmd()!!.forEach { cmds: Stream? -> this.execInContainer(cmds) } + this.dataSource = + DataSourceFactory.create( + userName, + password, + databaseDriver!!.driverClassName, + jdbcUrl, + connectionProperties, + JdbcConnector.getConnectionTimeout( + connectionProperties, + databaseDriver!!.driverClassName + ) + ) + this.dslContext = DSLContextFactory.create(dataSource, sqlDialect) + return self() + } + + val isInitialized: Boolean + get() = dslContext != null + + protected abstract fun inContainerBootstrapCmd(): Stream?>? + + protected abstract fun inContainerUndoBootstrapCmd(): Stream? + + abstract val databaseDriver: DatabaseDriver? + + abstract val sqlDialect: SQLDialect? + + fun withNamespace(name: String?): String? { + return name + suffix + } + + val databaseName: String? + get() = withNamespace("db") + + val userName: String? + get() = withNamespace("user") + + val password: String? + get() = "password" + + fun getDataSource(): DataSource? { + if (!this.isInitialized) { + throw RuntimeException("TestDatabase instance is not yet initialized") + } + return dataSource + } + + fun getDslContext(): DSLContext? { + if (!this.isInitialized) { + throw RuntimeException("TestDatabase instance is not yet initialized") + } + return dslContext + } + + val jdbcUrl: String? + get() = + String.format( + databaseDriver!!.urlFormatString, + container!!.host, + container.firstMappedPort, + databaseName + ) + + val database: Database? + get() = Database(getDslContext()) + + protected fun execSQL(sql: Stream) { + try { + database!!.query { ctx: DSLContext? -> + sql!!.forEach { statement: String? -> + LOGGER!!.info("executing SQL statement {}", statement) + ctx!!.execute(statement) + } + null + } + } catch (e: SQLException) { + throw RuntimeException(e) + } + } + + protected fun execInContainer(cmds: Stream?) { + val cmd = cmds!!.toList() + if (cmd!!.isEmpty()) { + return + } + try { + LOGGER!!.info( + formatLogLine(String.format("executing command %s", Strings.join(cmd, " "))) + ) + val exec = container!!.execInContainer(*cmd.toTypedArray()) + if (exec!!.exitCode == 0) { + LOGGER.info( + formatLogLine( + String.format( + "execution success\nstdout:\n%s\nstderr:\n%s", + exec.stdout, + exec.stderr + ) + ) + ) + } else { + LOGGER.error( + formatLogLine( + String.format( + "execution failure, code %s\nstdout:\n%s\nstderr:\n%s", + exec.exitCode, + exec.stdout, + exec.stderr + ) + ) + ) + } + } catch (e: IOException) { + throw UncheckedIOException(e) + } catch (e: InterruptedException) { + throw RuntimeException(e) + } + } + + @Throws(SQLException::class) + fun query(transform: ContextQueryFunction?): X? { + return database!!.query(transform) + } + + @Throws(SQLException::class) + fun transaction(transform: ContextQueryFunction?): X? { + return database!!.transaction(transform) + } + + /** Returns a builder for the connector config object. */ + open fun configBuilder(): B { + return ConfigBuilder(self()).self() + } + + fun testConfigBuilder(): B { + return configBuilder().withHostAndPort().withCredentials().withDatabase() + } + + fun integrationTestConfigBuilder(): B? { + return configBuilder().withResolvedHostAndPort().withCredentials().withDatabase() + } + + override fun close() { + execSQL(cleanupSQL!!.stream()) + execInContainer(inContainerUndoBootstrapCmd()) + LOGGER!!.info("closing database databaseId=$databaseId") + } + + open class ConfigBuilder, B : ConfigBuilder>( + protected val testDatabase: T + ) { + protected val builder: ImmutableMap.Builder = ImmutableMap.builder() + + fun build(): JsonNode { + return Jsons.jsonNode(builder!!.build()) + } + + fun self(): B { + return this as B + } + + fun with(key: Any?, value: Any?): B { + builder!!.put(key, value) + return self() + } + + fun withDatabase(): B { + return this.with(JdbcUtils.DATABASE_KEY, testDatabase!!.databaseName) + } + + fun withCredentials(): B { + return this.with(JdbcUtils.USERNAME_KEY, testDatabase!!.userName) + .with(JdbcUtils.PASSWORD_KEY, testDatabase.password) + } + + fun withResolvedHostAndPort(): B { + return this.with( + JdbcUtils.HOST_KEY, + HostPortResolver.resolveHost(testDatabase!!.container) + ) + .with(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(testDatabase.container)) + } + + fun withHostAndPort(): B { + return this.with(JdbcUtils.HOST_KEY, testDatabase!!.container!!.host) + .with(JdbcUtils.PORT_KEY, testDatabase.container!!.firstMappedPort) + } + + fun withoutSsl(): B { + return with(JdbcUtils.SSL_KEY, false) + } + + fun withSsl(sslMode: MutableMap?): B { + return with(JdbcUtils.SSL_KEY, true)!!.with(JdbcUtils.SSL_MODE_KEY, sslMode) + } + + companion object { + val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration? = Duration.ofSeconds(5) + } + } + + companion object { + private val LOGGER: Logger? = LoggerFactory.getLogger(TestDatabase::class.java) + + private val nextDatabaseId: AtomicInteger? = AtomicInteger(0) + + private val nextContainerId: AtomicInteger? = AtomicInteger(0) + private val containerUidToId: MutableMap? = ConcurrentHashMap() + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java index 5d4dcb3e68d05..ea21487b38e85 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java @@ -148,8 +148,8 @@ protected Stream> inContainerBootstrapCmd() { String.format("ALTER USER %s WITH SUPERUSER", getUserName())); return Stream.of(Stream.concat( Stream.of("psql", - "-d", getContainer().getDatabaseName(), - "-U", getContainer().getUsername(), + "-d", container.getDatabaseName(), + "-U", container.getUsername(), "-v", "ON_ERROR_STOP=1", "-a"), sql.flatMap(stmt -> Stream.of("-c", stmt))));