Skip to content

Commit 2d2cddd

Browse files
Rollout ctid cdc (#28708)
* source-postgres: enable ctid+cdc implementation
* 100% ctid rollout for cdc
* remove CtidFeatureFlags
* fix CdcPostgresSourceAcceptanceTest
* Bump versions and release notes
* Fix compilation error due to previous merge

---------

Co-authored-by: subodh <[email protected]>
1 parent d7f6bcb commit 2d2cddd

File tree

20 files changed: +340 lines added, -497 lines removed

airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,7 @@ public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoClose
205205
return new CompositeIterator<>(iterators, airbyteStreamStatusConsumer);
206206
}
207207

208+
public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoCloseableIterator<T>> iterators) {
209+
return concatWithEagerClose(iterators, null);
210+
}
208211
}

airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-alloydb-strict-encrypt
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.2
27+
LABEL io.airbyte.version=3.1.3
2828
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt

airbyte-integrations/connectors/source-alloydb-strict-encrypt/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: database
1212
connectorType: source
1313
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
14-
dockerImageTag: 3.1.2
14+
dockerImageTag: 3.1.3
1515
dockerRepository: airbyte/source-alloydb-strict-encrypt
1616
githubIssueLabel: source-alloydb
1717
icon: alloydb.svg

airbyte-integrations/connectors/source-alloydb/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-alloydb
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.2
27+
LABEL io.airbyte.version=3.1.3
2828
LABEL io.airbyte.name=airbyte/source-alloydb

airbyte-integrations/connectors/source-alloydb/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: source
88
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
9-
dockerImageTag: 3.1.2
9+
dockerImageTag: 3.1.3
1010
dockerRepository: airbyte/source-alloydb
1111
githubIssueLabel: source-alloydb
1212
icon: alloydb.svg

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.2
27+
LABEL io.airbyte.version=3.1.3
2828
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ data:
1212
connectorType: source
1313
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
1414
maxSecondsBetweenMessages: 7200
15-
dockerImageTag: 3.1.2
15+
dockerImageTag: 3.1.3
1616
dockerRepository: airbyte/source-postgres-strict-encrypt
1717
githubIssueLabel: source-postgres
1818
icon: postgresql.svg

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.2
27+
LABEL io.airbyte.version=3.1.3
2828
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: source
88
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
9-
dockerImageTag: 3.1.2
9+
dockerImageTag: 3.1.3
1010
maxSecondsBetweenMessages: 7200
1111
dockerRepository: airbyte/source-postgres
1212
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

Lines changed: 3 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcProperties;
7272
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcSavedInfoFetcher;
7373
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcStateHandler;
74-
import io.airbyte.integrations.source.postgres.ctid.CtidFeatureFlags;
7574
import io.airbyte.integrations.source.postgres.ctid.CtidPerStreamStateManager;
7675
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
7776
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
@@ -413,82 +412,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
413412
final StateManager stateManager,
414413
final Instant emittedAt) {
415414
final JsonNode sourceConfig = database.getSourceConfig();
416-
final CtidFeatureFlags ctidFeatureFlags = new CtidFeatureFlags(sourceConfig);
417415
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
418-
if (ctidFeatureFlags.isCdcSyncEnabled()) {
419-
LOGGER.info("Using ctid + CDC");
420-
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
421-
getReplicationSlot(database, sourceConfig).get(0));
422-
}
423-
final Duration firstRecordWaitTime = PostgresUtils.getFirstRecordWaitTime(sourceConfig);
424-
final OptionalInt queueSize = OptionalInt.of(PostgresUtils.getQueueSize(sourceConfig));
425-
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
426-
LOGGER.info("Queue size: {}", queueSize.getAsInt());
427-
428-
final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil();
429-
final JsonNode state =
430-
(stateManager.getCdcStateManager().getCdcState() == null ||
431-
stateManager.getCdcStateManager().getCdcState().getState() == null) ? null
432-
: Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState());
433-
434-
final OptionalLong savedOffset = postgresDebeziumStateUtil.savedOffset(
435-
Jsons.clone(PostgresCdcProperties.getDebeziumDefaultProperties(database)),
436-
catalog,
437-
state,
438-
sourceConfig);
439-
440-
// We should always be able to extract offset out of state if it's not null
441-
if (state != null && savedOffset.isEmpty()) {
442-
throw new RuntimeException(
443-
"Unable extract the offset out of state, State mutation might not be working. " + state.asText());
444-
}
445-
446-
final boolean savedOffsetAfterReplicationSlotLSN = postgresDebeziumStateUtil.isSavedOffsetAfterReplicationSlotLSN(
447-
// We can assume that there will be only 1 replication slot cause before the sync starts for
448-
// Postgres CDC,
449-
// we run all the check operations and one of the check validates that the replication slot exists
450-
// and has only 1 entry
451-
getReplicationSlot(database, sourceConfig).get(0),
452-
savedOffset);
453-
454-
if (!savedOffsetAfterReplicationSlotLSN) {
455-
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
456-
} else if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
457-
postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(),
458-
savedOffset,
459-
sourceConfig.get("replication_method").get("replication_slot").asText(),
460-
sourceConfig.get("replication_method").get("publication").asText(),
461-
PostgresUtils.getPluginValue(sourceConfig.get("replication_method")));
462-
}
463-
464-
final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
465-
PostgresCdcTargetPosition.targetPosition(database),
466-
false,
467-
firstRecordWaitTime,
468-
queueSize);
469-
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
470-
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
471-
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier =
472-
() -> handler.getIncrementalIterators(catalog,
473-
new PostgresCdcSavedInfoFetcher(
474-
savedOffsetAfterReplicationSlotLSN ? stateManager.getCdcStateManager().getCdcState()
475-
: null),
476-
postgresCdcStateHandler,
477-
new PostgresCdcConnectorMetadataInjector(),
478-
PostgresCdcProperties.getDebeziumDefaultProperties(database),
479-
emittedAt,
480-
false);
481-
if (!savedOffsetAfterReplicationSlotLSN || streamsToSnapshot.isEmpty()) {
482-
return Collections.singletonList(incrementalIteratorSupplier.get());
483-
}
484-
485-
final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
486-
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), new PostgresCdcConnectorMetadataInjector(),
487-
PostgresCdcProperties.getSnapshotProperties(database), postgresCdcStateHandler, emittedAt);
488-
return Collections.singletonList(
489-
AutoCloseableIterators.concatWithEagerClose(AirbyteTraceMessageUtility::emitStreamStatusTrace, snapshotIterator,
490-
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));
491-
416+
LOGGER.info("Using ctid + CDC");
417+
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
418+
getReplicationSlot(database, sourceConfig).get(0));
492419
}
493420

494421
if (isAnyStreamIncrementalSyncMode(catalog) && PostgresUtils.isXmin(sourceConfig)) {

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidFeatureFlags.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,13 @@ protected void initTests() {
588588
.addExpectedValues("(\"fuzzy dice\",42,1.99)", null)
589589
.build());
590590

591+
addHstoreTest();
592+
addTimeWithTimeZoneTest();
593+
addArraysTestData();
594+
addMoneyTest();
595+
}
596+
597+
protected void addHstoreTest() {
591598
addDataTypeTestData(
592599
TestDataHolder.builder()
593600
.sourceType("hstore")
@@ -602,10 +609,6 @@ protected void initTests() {
602609
{"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""",
603610
null)
604611
.build());
605-
606-
addTimeWithTimeZoneTest();
607-
addArraysTestData();
608-
addMoneyTest();
609612
}
610613

611614
protected void addMoneyTest() {

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotPostgresSourceDatatypeTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,39 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.collect.ImmutableMap;
9+
import io.airbyte.commons.features.EnvVariableFeatureFlags;
910
import io.airbyte.commons.json.Jsons;
1011
import io.airbyte.db.Database;
1112
import io.airbyte.db.factory.DSLContextFactory;
1213
import io.airbyte.db.factory.DatabaseDriver;
1314
import io.airbyte.db.jdbc.JdbcUtils;
15+
import io.airbyte.integrations.standardtest.source.TestDataHolder;
1416
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
1517
import io.airbyte.integrations.util.HostPortResolver;
18+
import io.airbyte.protocol.models.JsonSchemaType;
1619
import java.util.List;
1720
import org.jooq.SQLDialect;
21+
import org.junit.jupiter.api.extension.ExtendWith;
1822
import org.testcontainers.containers.PostgreSQLContainer;
1923
import org.testcontainers.utility.MountableFile;
24+
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
25+
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
26+
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
2027

28+
@ExtendWith(SystemStubsExtension.class)
2129
public class CdcInitialSnapshotPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {
2230

2331
private static final String SCHEMA_NAME = "test";
2432
private static final String SLOT_NAME_BASE = "debezium_slot";
2533
private static final String PUBLICATION = "publication";
2634
private static final int INITIAL_WAITING_SECONDS = 30;
2735

36+
@SystemStub
37+
private EnvironmentVariables environmentVariables;
38+
2839
@Override
2940
protected Database setupDatabase() throws Exception {
30-
41+
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
3142
container = new PostgreSQLContainer<>("postgres:14-alpine")
3243
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"),
3344
"/etc/postgresql/postgresql.conf")
@@ -55,7 +66,6 @@ protected Database setupDatabase() throws Exception {
5566
.put("replication_method", replicationMethod)
5667
.put("is_test", true)
5768
.put(JdbcUtils.SSL_KEY, false)
58-
.put("snapshot_mode", "initial_only")
5969
.build());
6070

6171
dslContext = DSLContextFactory.create(
@@ -99,4 +109,21 @@ public boolean testCatalog() {
99109
return true;
100110
}
101111

112+
@Override
113+
protected void addHstoreTest() {
114+
addDataTypeTestData(
115+
TestDataHolder.builder()
116+
.sourceType("hstore")
117+
.airbyteType(JsonSchemaType.STRING)
118+
.addInsertValues("""
119+
'"paperback" => "243","publisher" => "postgresqltutorial.com",
120+
"language" => "English","ISBN-13" => "978-1449370000",
121+
"weight" => "11.2 ounces"'
122+
""", null)
123+
.addExpectedValues(
124+
//
125+
"\"weight\"=>\"11.2 ounces\", \"ISBN-13\"=>\"978-1449370000\", \"language\"=>\"English\", \"paperback\"=>\"243\", \"publisher\"=>\"postgresqltutorial.com\"",
126+
null)
127+
.build());
128+
}
102129
}

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.fasterxml.jackson.databind.JsonNode;
1111
import com.google.common.collect.ImmutableMap;
1212
import com.google.common.collect.Lists;
13+
import io.airbyte.commons.features.EnvVariableFeatureFlags;
1314
import io.airbyte.commons.json.Jsons;
1415
import io.airbyte.db.Database;
1516
import io.airbyte.db.factory.DSLContextFactory;
@@ -31,13 +32,19 @@
3132
import java.util.stream.Collectors;
3233
import org.jooq.DSLContext;
3334
import org.jooq.SQLDialect;
35+
import org.junit.jupiter.api.BeforeEach;
3436
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.api.extension.ExtendWith;
3538
import org.testcontainers.containers.PostgreSQLContainer;
3639
import org.testcontainers.utility.MountableFile;
40+
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
41+
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
42+
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
3743

3844
// todo (cgardens) - Sanity check that when configured for CDC that postgres performs like any other
3945
// incremental source. As we have more sources support CDC we will find a more reusable way of doing
4046
// this, but for now this is a solid sanity check.
47+
@ExtendWith(SystemStubsExtension.class)
4148
public class CdcPostgresSourceAcceptanceTest extends AbstractPostgresSourceAcceptanceTest {
4249

4350
protected static final String SLOT_NAME_BASE = "debezium_slot";
@@ -50,6 +57,13 @@ public class CdcPostgresSourceAcceptanceTest extends AbstractPostgresSourceAccep
5057
protected PostgreSQLContainer<?> container;
5158
protected JsonNode config;
5259

60+
@SystemStub
61+
private EnvironmentVariables environmentVariables;
62+
63+
@BeforeEach
64+
void setup() {
65+
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
66+
}
5367
@Override
5468
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
5569
container = new PostgreSQLContainer<>("postgres:13-alpine")

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.collect.ImmutableMap;
9+
import io.airbyte.commons.features.EnvVariableFeatureFlags;
910
import io.airbyte.commons.json.Jsons;
1011
import io.airbyte.db.Database;
1112
import io.airbyte.db.factory.DSLContextFactory;
@@ -19,12 +20,18 @@
1920
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
2021
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
2122
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
23+
import java.util.Collections;
2224
import java.util.List;
2325
import java.util.Set;
2426
import org.jooq.SQLDialect;
27+
import org.junit.jupiter.api.extension.ExtendWith;
2528
import org.testcontainers.containers.PostgreSQLContainer;
2629
import org.testcontainers.utility.MountableFile;
30+
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
31+
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
32+
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
2733

34+
@ExtendWith(SystemStubsExtension.class)
2835
public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest {
2936

3037
private static final String SCHEMA_NAME = "test";
@@ -33,6 +40,9 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSource
3340
private static final int INITIAL_WAITING_SECONDS = 30;
3441
private JsonNode stateAfterFirstSync;
3542

43+
@SystemStub
44+
private EnvironmentVariables environmentVariables;
45+
3646
@Override
3747
protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configuredCatalog) throws Exception {
3848
if (stateAfterFirstSync == null) {
@@ -57,14 +67,11 @@ protected void postSetup() throws Exception {
5767
catalog.getStreams().add(dummyTableWithData);
5868

5969
final List<AirbyteMessage> allMessages = super.runRead(catalog);
60-
if (allMessages.size() != 2) {
61-
throw new RuntimeException("First sync should only generate 2 records");
62-
}
6370
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(allMessages);
6471
if (stateAfterFirstBatch == null || stateAfterFirstBatch.isEmpty()) {
6572
throw new RuntimeException("stateAfterFirstBatch should not be null or empty");
6673
}
67-
stateAfterFirstSync = Jsons.jsonNode(stateAfterFirstBatch);
74+
stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
6875
if (stateAfterFirstSync == null) {
6976
throw new RuntimeException("stateAfterFirstSync should not be null");
7077
}
@@ -78,7 +85,7 @@ protected void postSetup() throws Exception {
7885

7986
@Override
8087
protected Database setupDatabase() throws Exception {
81-
88+
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
8289
container = new PostgreSQLContainer<>("postgres:14-alpine")
8390
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"),
8491
"/etc/postgresql/postgresql.conf")

0 commit comments

Comments
 (0)