source-postgres: enable ctid+cdc implementation #28044

Merged (5 commits) on Jul 25, 2023
@@ -416,80 +416,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final JsonNode sourceConfig = database.getSourceConfig();
final CtidFeatureFlags ctidFeatureFlags = new CtidFeatureFlags(sourceConfig);
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
if (sourceConfig.has("cdc_via_ctid") && sourceConfig.get("cdc_via_ctid").asBoolean()) {
LOGGER.info("Using ctid + CDC");
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
getReplicationSlot(database, sourceConfig).get(0));
}
final Duration firstRecordWaitTime = PostgresUtils.getFirstRecordWaitTime(sourceConfig);
final OptionalInt queueSize = OptionalInt.of(PostgresUtils.getQueueSize(sourceConfig));
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
LOGGER.info("Queue size: {}", queueSize.getAsInt());

final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil();
final JsonNode state =
(stateManager.getCdcStateManager().getCdcState() == null ||
stateManager.getCdcStateManager().getCdcState().getState() == null) ? null
: Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState());

final OptionalLong savedOffset = postgresDebeziumStateUtil.savedOffset(
Jsons.clone(PostgresCdcProperties.getDebeziumDefaultProperties(database)),
catalog,
state,
sourceConfig);

// We should always be able to extract offset out of state if it's not null
if (state != null && savedOffset.isEmpty()) {
throw new RuntimeException(
"Unable extract the offset out of state, State mutation might not be working. " + state.asText());
}

final boolean savedOffsetAfterReplicationSlotLSN = postgresDebeziumStateUtil.isSavedOffsetAfterReplicationSlotLSN(
// We can assume that there will be only 1 replication slot cause before the sync starts for
// Postgres CDC,
// we run all the check operations and one of the check validates that the replication slot exists
// and has only 1 entry
getReplicationSlot(database, sourceConfig).get(0),
savedOffset);

if (!savedOffsetAfterReplicationSlotLSN) {
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(),
savedOffset,
sourceConfig.get("replication_method").get("replication_slot").asText(),
sourceConfig.get("replication_method").get("publication").asText(),
PostgresUtils.getPluginValue(sourceConfig.get("replication_method")));
}

final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
PostgresCdcTargetPosition.targetPosition(database),
false,
firstRecordWaitTime,
queueSize);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier =
() -> handler.getIncrementalIterators(catalog,
new PostgresCdcSavedInfoFetcher(
savedOffsetAfterReplicationSlotLSN ? stateManager.getCdcStateManager().getCdcState()
: null),
postgresCdcStateHandler,
new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getDebeziumDefaultProperties(database),
emittedAt,
false);
if (!savedOffsetAfterReplicationSlotLSN || streamsToSnapshot.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
}

final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getSnapshotProperties(database), postgresCdcStateHandler, emittedAt);
return Collections.singletonList(
AutoCloseableIterators.concatWithEagerClose(AirbyteTraceMessageUtility::emitStreamStatusTrace, snapshotIterator,
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));

LOGGER.info("Using ctid + CDC");
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
getReplicationSlot(database, sourceConfig).get(0));
}

if (isIncrementalSyncMode(catalog) && PostgresUtils.isXmin(sourceConfig)) {
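Note on the hunk above: the long block starting at the cdc_via_ctid check and ending at the concatWithEagerClose call is the removed, Debezium-only CDC path (saved-offset validation against the replication slot, optional LSN flush, and a snapshot iterator followed by the incremental iterator); that logic is presumably now handled inside cdcCtidIteratorsCombined. Assembled from the surviving lines, the CDC branch after this change reduces to the following sketch (a paraphrase of the diff, not additional code):

// CDC branch after this PR: no cdc_via_ctid feature-flag check, always the combined ctid + WAL path.
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
  LOGGER.info("Using ctid + CDC");
  return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt,
      getQuoteString(), getReplicationSlot(database, sourceConfig).get(0));
}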
@@ -361,7 +361,7 @@ private void putDoubleArray(final ObjectNode node, final String columnName, fina
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getDouble(colIndex), Double::isFinite));
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
}
node.set(columnName, arrayNode);
}
@@ -370,7 +370,7 @@ private void putMoneyArray(final ObjectNode node, final String columnName, final
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final String moneyValue = parseMoneyValue(arrayResultSet.getString(colIndex));
final String moneyValue = parseMoneyValue(arrayResultSet.getString(2));
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
}
node.set(columnName, arrayNode);
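The two one-line changes above (getDouble(colIndex) -> getDouble(2) and getString(colIndex) -> getString(2)) correct how array elements are read: per the JDBC contract, java.sql.Array.getResultSet() returns one row per element, with column 1 holding the element's index and column 2 holding the element's value, so reusing the outer colIndex inside the loop reads the wrong column. A minimal standalone sketch of the corrected pattern (class and method names here are illustrative, not from the PR):

import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class ArrayColumnReader {

  // Reads a Postgres double[] column from the current row. Array.getResultSet() yields
  // (index, value) rows, so the element is fetched with getDouble(2), never getDouble(colIndex).
  static List<Double> readDoubleArray(final ResultSet row, final int colIndex) throws SQLException {
    final Array array = row.getArray(colIndex);
    final List<Double> values = new ArrayList<>();
    try (ResultSet elements = array.getResultSet()) {
      while (elements.next()) {
        values.add(elements.getDouble(2));
      }
    }
    return values;
  }
}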
@@ -10,18 +10,13 @@
// One for each type: CDC and standard cursor based
public class CtidFeatureFlags {

public static final String CDC_VIA_CTID = "cdc_via_ctid";
public static final String CURSOR_VIA_CTID = "cursor_via_ctid";
private final JsonNode sourceConfig;

public CtidFeatureFlags(final JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}

public boolean isCdcSyncEnabled() {
return getFlagValue(CDC_VIA_CTID);
}

public boolean isCursorSyncEnabled() {
return getFlagValue(CURSOR_VIA_CTID);
}
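With CDC_VIA_CTID and isCdcSyncEnabled() removed above, CtidFeatureFlags now only gates the cursor-based ctid path. The diff does not show getFlagValue, but judging from the inline check deleted from the CDC branch (sourceConfig.has(...) && sourceConfig.get(...).asBoolean()), it plausibly reads the flag straight off the connector config, roughly like this (an assumption, not the actual implementation):

// Hypothetical sketch of the flag lookup, mirroring the inline check removed from the CDC branch;
// the real getFlagValue implementation is not part of this diff.
private boolean getFlagValue(final String flag) {
  return sourceConfig.has(flag) && sourceConfig.get(flag).asBoolean();
}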
@@ -588,6 +588,13 @@ protected void initTests() {
.addExpectedValues("(\"fuzzy dice\",42,1.99)", null)
.build());

addHstoreTest();
addTimeWithTimeZoneTest();
addArraysTestData();
addMoneyTest();
}

protected void addHstoreTest() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("hstore")
@@ -602,10 +609,6 @@
{"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""",
null)
.build());

addTimeWithTimeZoneTest();
addArraysTestData();
addMoneyTest();
}

protected void addMoneyTest() {
@@ -6,28 +6,39 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class CdcInitialSnapshotPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {

private static final String SCHEMA_NAME = "test";
private static final String SLOT_NAME_BASE = "debezium_slot";
private static final String PUBLICATION = "publication";
private static final int INITIAL_WAITING_SECONDS = 30;

@SystemStub
private EnvironmentVariables environmentVariables;

@Override
protected Database setupDatabase() throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>("postgres:14-alpine")
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"),
"/etc/postgresql/postgresql.conf")
@@ -55,7 +66,6 @@ protected Database setupDatabase() throws Exception {
.put("replication_method", replicationMethod)
.put("is_test", true)
.put(JdbcUtils.SSL_KEY, false)
.put("snapshot_mode", "initial_only")
.build());

dslContext = DSLContextFactory.create(
@@ -99,4 +109,21 @@ public boolean testCatalog() {
return true;
}

@Override
protected void addHstoreTest() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("hstore")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("""
'"paperback" => "243","publisher" => "postgresqltutorial.com",
"language" => "English","ISBN-13" => "978-1449370000",
"weight" => "11.2 ounces"'
""", null)
.addExpectedValues(
//
"\"weight\"=>\"11.2 ounces\", \"ISBN-13\"=>\"978-1449370000\", \"language\"=>\"English\", \"paperback\"=>\"243\", \"publisher\"=>\"postgresqltutorial.com\"",
null)
.build());
}
}
@@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -19,12 +20,18 @@
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

@ExtendWith(SystemStubsExtension.class)
public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {

private static final String SCHEMA_NAME = "test";
@@ -33,6 +40,9 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSource
private static final int INITIAL_WAITING_SECONDS = 30;
private JsonNode stateAfterFirstSync;

@SystemStub
private EnvironmentVariables environmentVariables;

@Override
protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configuredCatalog) throws Exception {
if (stateAfterFirstSync == null) {
@@ -57,14 +67,11 @@ protected void postSetup() throws Exception {
catalog.getStreams().add(dummyTableWithData);

final List<AirbyteMessage> allMessages = super.runRead(catalog);
if (allMessages.size() != 2) {
throw new RuntimeException("First sync should only generate 2 records");
}
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(allMessages);
if (stateAfterFirstBatch == null || stateAfterFirstBatch.isEmpty()) {
throw new RuntimeException("stateAfterFirstBatch should not be null or empty");
}
stateAfterFirstSync = Jsons.jsonNode(stateAfterFirstBatch);
stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
if (stateAfterFirstSync == null) {
throw new RuntimeException("stateAfterFirstSync should not be null");
}
@@ -78,7 +85,7 @@

@Override
protected Database setupDatabase() throws Exception {

environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>("postgres:14-alpine")
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"),
"/etc/postgresql/postgresql.conf")
@@ -81,22 +81,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
}
}

private JsonNode getConfig(final String username, final String password, final List<String> schemas) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container))
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.SCHEMAS_KEY, Jsons.jsonNode(schemas))
.put(JdbcUtils.USERNAME_KEY, username)
.put(JdbcUtils.PASSWORD_KEY, password)
.put(JdbcUtils.SSL_KEY, false)
.put("replication_method", replicationMethod)
.build());
}

private JsonNode getXminConfig(final String username, final String password, final List<String> schemas) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Xmin")
@@ -133,40 +117,6 @@ protected boolean supportsPerStream() {
return true;
}

private ConfiguredAirbyteCatalog getCommonConfigCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

private ConfiguredAirbyteCatalog getXminCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()