Rollout ctid cdc #28708

Merged: 16 commits, Aug 3, 2023
@@ -205,4 +205,7 @@ public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoClose
return new CompositeIterator<>(iterators, airbyteStreamStatusConsumer);
}

public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoCloseableIterator<T>> iterators) {
return concatWithEagerClose(iterators, null);
}
}
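The new single-argument overload simply forwards a null stream-status consumer. A minimal usage sketch, assuming these utilities keep their existing io.airbyte.commons.util locations (the example class and data are illustrative, not part of this PR):

```java
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.CompositeIterator;
import java.util.List;

public class ConcatWithEagerCloseExample {

  public static void main(final String[] args) throws Exception {
    // Two in-memory iterators stand in for per-stream record iterators.
    final List<AutoCloseableIterator<String>> iterators = List.of(
        AutoCloseableIterators.fromIterator(List.of("a", "b").iterator()),
        AutoCloseableIterators.fromIterator(List.of("c", "d").iterator()));

    // No AirbyteStreamStatusConsumer is supplied; the new overload passes null through.
    try (final CompositeIterator<String> combined = AutoCloseableIterators.concatWithEagerClose(iterators)) {
      combined.forEachRemaining(System.out::println);
    }
  }
}
```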
@@ -24,5 +24,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=3.1.2
LABEL io.airbyte.version=3.1.3
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
@@ -11,7 +11,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerImageTag: 3.1.2
dockerImageTag: 3.1.3
dockerRepository: airbyte/source-alloydb-strict-encrypt
githubIssueLabel: source-alloydb
icon: alloydb.svg
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
@@ -24,5 +24,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=3.1.2
LABEL io.airbyte.version=3.1.3
LABEL io.airbyte.name=airbyte/source-alloydb
@@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerImageTag: 3.1.2
dockerImageTag: 3.1.3
dockerRepository: airbyte/source-alloydb
githubIssueLabel: source-alloydb
icon: alloydb.svg
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=3.1.2
LABEL io.airbyte.version=3.1.3
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
@@ -12,7 +12,7 @@ data:
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
maxSecondsBetweenMessages: 7200
dockerImageTag: 3.1.2
dockerImageTag: 3.1.3
dockerRepository: airbyte/source-postgres-strict-encrypt
githubIssueLabel: source-postgres
icon: postgresql.svg
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=3.1.2
LABEL io.airbyte.version=3.1.3
LABEL io.airbyte.name=airbyte/source-postgres
@@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.1.2
dockerImageTag: 3.1.3
maxSecondsBetweenMessages: 7200
dockerRepository: airbyte/source-postgres
githubIssueLabel: source-postgres
@@ -71,7 +71,6 @@
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcProperties;
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcSavedInfoFetcher;
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcStateHandler;
import io.airbyte.integrations.source.postgres.ctid.CtidFeatureFlags;
import io.airbyte.integrations.source.postgres.ctid.CtidPerStreamStateManager;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
@@ -413,82 +412,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
final CtidFeatureFlags ctidFeatureFlags = new CtidFeatureFlags(sourceConfig);
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
if (ctidFeatureFlags.isCdcSyncEnabled()) {
LOGGER.info("Using ctid + CDC");
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
getReplicationSlot(database, sourceConfig).get(0));
}
final Duration firstRecordWaitTime = PostgresUtils.getFirstRecordWaitTime(sourceConfig);
final OptionalInt queueSize = OptionalInt.of(PostgresUtils.getQueueSize(sourceConfig));
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
LOGGER.info("Queue size: {}", queueSize.getAsInt());

final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil();
final JsonNode state =
(stateManager.getCdcStateManager().getCdcState() == null ||
stateManager.getCdcStateManager().getCdcState().getState() == null) ? null
: Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState());

final OptionalLong savedOffset = postgresDebeziumStateUtil.savedOffset(
Jsons.clone(PostgresCdcProperties.getDebeziumDefaultProperties(database)),
catalog,
state,
sourceConfig);

// We should always be able to extract offset out of state if it's not null
if (state != null && savedOffset.isEmpty()) {
throw new RuntimeException(
"Unable extract the offset out of state, State mutation might not be working. " + state.asText());
}

final boolean savedOffsetAfterReplicationSlotLSN = postgresDebeziumStateUtil.isSavedOffsetAfterReplicationSlotLSN(
// We can assume that there will be only 1 replication slot cause before the sync starts for
// Postgres CDC,
// we run all the check operations and one of the check validates that the replication slot exists
// and has only 1 entry
getReplicationSlot(database, sourceConfig).get(0),
savedOffset);

if (!savedOffsetAfterReplicationSlotLSN) {
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(),
savedOffset,
sourceConfig.get("replication_method").get("replication_slot").asText(),
sourceConfig.get("replication_method").get("publication").asText(),
PostgresUtils.getPluginValue(sourceConfig.get("replication_method")));
}

final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
PostgresCdcTargetPosition.targetPosition(database),
false,
firstRecordWaitTime,
queueSize);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier =
() -> handler.getIncrementalIterators(catalog,
new PostgresCdcSavedInfoFetcher(
savedOffsetAfterReplicationSlotLSN ? stateManager.getCdcStateManager().getCdcState()
: null),
postgresCdcStateHandler,
new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getDebeziumDefaultProperties(database),
emittedAt,
false);
if (!savedOffsetAfterReplicationSlotLSN || streamsToSnapshot.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
}

final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getSnapshotProperties(database), postgresCdcStateHandler, emittedAt);
return Collections.singletonList(
AutoCloseableIterators.concatWithEagerClose(AirbyteTraceMessageUtility::emitStreamStatusTrace, snapshotIterator,
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));

LOGGER.info("Using ctid + CDC");
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
getReplicationSlot(database, sourceConfig).get(0));
}

if (isAnyStreamIncrementalSyncMode(catalog) && PostgresUtils.isXmin(sourceConfig)) {

This file was deleted.
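The hunk above, together with the deleted file (presumably the CtidFeatureFlags class whose import is dropped), removes the feature-flag gate and the legacy Debezium-only path. A condensed, non-verbatim sketch of how the CDC branch of getIncrementalIterators reads after this change:

```java
// Condensed sketch of the post-change CDC branch (abbreviated from the diff above):
// the CtidFeatureFlags check is gone, so CDC syncs always go through ctid + CDC.
final JsonNode sourceConfig = database.getSourceConfig();
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
  LOGGER.info("Using ctid + CDC");
  return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt,
      getQuoteString(), getReplicationSlot(database, sourceConfig).get(0));
}
```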

@@ -588,6 +588,13 @@ protected void initTests() {
.addExpectedValues("(\"fuzzy dice\",42,1.99)", null)
.build());

addHstoreTest();
addTimeWithTimeZoneTest();
addArraysTestData();
addMoneyTest();
}

protected void addHstoreTest() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("hstore")
@@ -602,10 +609,6 @@ protected void initTests() {
{"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""",
null)
.build());

addTimeWithTimeZoneTest();
addArraysTestData();
addMoneyTest();
}

protected void addMoneyTest() {
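Pulling the hstore case out into a protected addHstoreTest() hook lets subclasses override just the expected serialization: the base test keeps the JSON-style rendering, while the CDC initial-snapshot test below overrides it, presumably because the ctid + CDC path emits hstore in its native key=>value form (see CdcInitialSnapshotPostgresSourceDatatypeTest).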
@@ -6,28 +6,39 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class CdcInitialSnapshotPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {

private static final String SCHEMA_NAME = "test";
private static final String SLOT_NAME_BASE = "debezium_slot";
private static final String PUBLICATION = "publication";
private static final int INITIAL_WAITING_SECONDS = 30;

@SystemStub
private EnvironmentVariables environmentVariables;

@Override
protected Database setupDatabase() throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>("postgres:14-alpine")
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"),
"/etc/postgresql/postgresql.conf")
@@ -55,7 +66,6 @@ protected Database setupDatabase() throws Exception {
.put("replication_method", replicationMethod)
.put("is_test", true)
.put(JdbcUtils.SSL_KEY, false)
.put("snapshot_mode", "initial_only")
.build());

dslContext = DSLContextFactory.create(
@@ -99,4 +109,21 @@ public boolean testCatalog() {
return true;
}

@Override
protected void addHstoreTest() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("hstore")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("""
'"paperback" => "243","publisher" => "postgresqltutorial.com",
"language" => "English","ISBN-13" => "978-1449370000",
"weight" => "11.2 ounces"'
""", null)
.addExpectedValues(
//
"\"weight\"=>\"11.2 ounces\", \"ISBN-13\"=>\"978-1449370000\", \"language\"=>\"English\", \"paperback\"=>\"243\", \"publisher\"=>\"postgresqltutorial.com\"",
null)
.build());
}
}
@@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -31,13 +32,19 @@
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

// todo (cgardens) - Sanity check that when configured for CDC that postgres performs like any other
// incremental source. As we have more sources support CDC we will find a more reusable way of doing
// this, but for now this is a solid sanity check.
@ExtendWith(SystemStubsExtension.class)
public class CdcPostgresSourceAcceptanceTest extends AbstractPostgresSourceAcceptanceTest {

protected static final String SLOT_NAME_BASE = "debezium_slot";
@@ -50,6 +57,13 @@ public class CdcPostgresSourceAcceptanceTest extends AbstractPostgresSourceAcceptanceTest {
protected PostgreSQLContainer<?> container;
protected JsonNode config;

@SystemStub
private EnvironmentVariables environmentVariables;

@BeforeEach
void setup() {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
}
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new PostgreSQLContainer<>("postgres:13-alpine")
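These acceptance and datatype tests now set USE_STREAM_CAPABLE_STATE through SystemStubs instead of relying on the ambient environment. A minimal, self-contained sketch of the pattern (the test class and assertion are illustrative, not from this PR):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
class StreamCapableStateFlagExampleTest {

  // Injected by SystemStubs; the original environment is restored after each test.
  @SystemStub
  private EnvironmentVariables environmentVariables;

  @BeforeEach
  void setup() {
    // Same flag the CDC tests above enable before every run.
    environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
  }

  @Test
  void flagIsVisibleToCodeUnderTest() {
    assertEquals("true", System.getenv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE));
  }
}
```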
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -19,12 +20,18 @@
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {

private static final String SCHEMA_NAME = "test";
@@ -33,6 +40,9 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {
private static final int INITIAL_WAITING_SECONDS = 30;
private JsonNode stateAfterFirstSync;

@SystemStub
private EnvironmentVariables environmentVariables;

@Override
protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configuredCatalog) throws Exception {
if (stateAfterFirstSync == null) {
@@ -57,14 +67,11 @@ protected void postSetup() throws Exception {
catalog.getStreams().add(dummyTableWithData);

final List<AirbyteMessage> allMessages = super.runRead(catalog);
if (allMessages.size() != 2) {
throw new RuntimeException("First sync should only generate 2 records");
}
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(allMessages);
if (stateAfterFirstBatch == null || stateAfterFirstBatch.isEmpty()) {
throw new RuntimeException("stateAfterFirstBatch should not be null or empty");
}
stateAfterFirstSync = Jsons.jsonNode(stateAfterFirstBatch);
stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
if (stateAfterFirstSync == null) {
throw new RuntimeException("stateAfterFirstSync should not be null");
}
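Presumably a consequence of the new ctid + CDC flow: the first read can emit more records and state messages than before, so the test drops the fixed two-record check and seeds the follow-up sync with only the last state message instead of the whole batch.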
@@ -78,7 +85,7 @@

@Override
protected Database setupDatabase() throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>("postgres:14-alpine")
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"),
"/etc/postgresql/postgresql.conf")