Skip to content

Commit 80920d1

Browse files
authored
Postgres on Resumable full refresh (#37112)
1 parent 715bdae commit 80920d1

File tree

25 files changed

+843
-331
lines changed

25 files changed

+843
-331
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.34.1
1+
version=0.34.2

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
715715
// sync, the
716716
// data is replicated as expected.
717717
@Throws(Exception::class)
718-
fun testCdcAndNonResumableFullRefreshInSameSync() {
718+
protected open fun testCdcAndNonResumableFullRefreshInSameSync() {
719719
val configuredCatalog = Jsons.clone(configuredCatalog)
720720

721721
val MODEL_RECORDS_2: List<JsonNode> =
@@ -734,7 +734,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
734734
createTableSqlFmt(),
735735
modelsSchema(),
736736
MODELS_STREAM_NAME_2,
737-
columnClause(columns, Optional.of(COL_ID)),
737+
columnClause(columns, Optional.empty()),
738738
)
739739

740740
for (recordJson in MODEL_RECORDS_2) {

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -420,9 +420,10 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
420420
setEmittedAtToNull(actualMessages)
421421

422422
val expectedMessages = airbyteMessagesReadOneColumn
423-
Assertions.assertEquals(expectedMessages.size, actualMessages.size)
424-
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
425-
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
423+
val actualRecordMessages = filterRecords(actualMessages)
424+
Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size)
425+
Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages))
426+
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
426427
}
427428

428429
protected open val airbyteMessagesReadOneColumn: List<AirbyteMessage>
@@ -507,8 +508,6 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
507508

508509
expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2))
509510

510-
System.out.println("catalog: " + catalog)
511-
512511
val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null))
513512
val actualRecordMessages = filterRecords(actualMessages)
514513

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.34.1'
15+
cdkVersionRequired = '0.34.2'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.3.33
12+
dockerImageTag: 3.4.0
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+205-95
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcConnectorMetadataInjector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector
2323
this.lsn = null;
2424
}
2525

26-
PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) {
26+
public PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) {
2727
this.transactionTimestamp = transactionTimestamp;
2828
this.lsn = lsn;
2929
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java

+187-138
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidUtils.java

-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
1717
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
1818
import io.airbyte.protocol.models.v0.StreamDescriptor;
19-
import io.airbyte.protocol.models.v0.SyncMode;
2019
import java.util.ArrayList;
2120
import java.util.HashMap;
2221
import java.util.HashSet;
@@ -34,7 +33,6 @@ public static CtidStreams streamsToSyncViaCtid(final CdcStateManager stateManage
3433
return new CtidStreams(
3534
fullCatalog.getStreams()
3635
.stream()
37-
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
3836
.collect(Collectors.toList()),
3937
new HashMap<>());
4038
}
@@ -78,7 +76,6 @@ private static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Con
7876
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
7977
final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams));
8078
return catalog.getStreams().stream()
81-
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
8279
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))).map(Jsons::clone)
8380
.collect(Collectors.toList());
8481
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java

+65-24
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
99
import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState;
10+
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
1011
import io.airbyte.commons.json.Jsons;
1112
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidUtils.CtidStreams;
1213
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
@@ -33,29 +34,41 @@ public class CtidGlobalStateManager extends CtidStateManager {
3334

3435
private static final Logger LOGGER = LoggerFactory.getLogger(CtidGlobalStateManager.class);
3536

36-
private final CdcState cdcState;
37-
private final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
37+
private final StateManager stateManager;
38+
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
39+
private Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
40+
private final boolean savedOffsetAfterReplicationSlotLSN;
41+
private final CdcState defaultCdcState;
3842

3943
public CtidGlobalStateManager(final CtidStreams ctidStreams,
4044
final FileNodeHandler fileNodeHandler,
41-
final CdcState cdcState,
42-
final ConfiguredAirbyteCatalog catalog) {
45+
final StateManager stateManager,
46+
final ConfiguredAirbyteCatalog catalog,
47+
final boolean savedOffsetAfterReplicationSlotLSN,
48+
final CdcState defaultCdcState) {
4349
super(filterOutExpiredFileNodes(ctidStreams.pairToCtidStatus(), fileNodeHandler));
44-
this.cdcState = cdcState;
45-
this.streamsThatHaveCompletedSnapshot = initStreamsCompletedSnapshot(ctidStreams, catalog);
50+
this.stateManager = stateManager;
51+
this.savedOffsetAfterReplicationSlotLSN = savedOffsetAfterReplicationSlotLSN;
52+
this.defaultCdcState = defaultCdcState;
53+
initStream(ctidStreams, catalog);
54+
this.fileNodeHandler = fileNodeHandler;
4655
}
4756

48-
private static Set<AirbyteStreamNameNamespacePair> initStreamsCompletedSnapshot(final CtidStreams ctidStreams,
49-
final ConfiguredAirbyteCatalog catalog) {
50-
final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot = new HashSet<>();
57+
private void initStream(final CtidStreams ctidStreams,
58+
final ConfiguredAirbyteCatalog catalog) {
59+
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
60+
this.resumableFullRefreshStreams = new HashSet<>();
5161
catalog.getStreams().forEach(configuredAirbyteStream -> {
52-
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) || configuredAirbyteStream.getSyncMode() != SyncMode.INCREMENTAL) {
53-
return;
62+
if (!ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) && configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
63+
streamsThatHaveCompletedSnapshot.add(
64+
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
65+
}
66+
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream)
67+
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
68+
this.resumableFullRefreshStreams.add(
69+
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
5470
}
55-
streamsThatHaveCompletedSnapshot.add(
56-
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
5771
});
58-
return streamsThatHaveCompletedSnapshot;
5972
}
6073

6174
private static Map<AirbyteStreamNameNamespacePair, CtidStatus> filterOutExpiredFileNodes(
@@ -79,37 +92,65 @@ private static Map<AirbyteStreamNameNamespacePair, CtidStatus> filterOutExpiredF
7992
public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespacePair pair, final CtidStatus ctidStatus) {
8093
pairToCtidStatus.put(pair, ctidStatus);
8194
final List<AirbyteStreamState> streamStates = new ArrayList<>();
95+
8296
streamsThatHaveCompletedSnapshot.forEach(stream -> {
8397
final DbStreamState state = getFinalState(stream);
8498
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));
8599

86100
});
87-
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
88-
final AirbyteGlobalState globalState = new AirbyteGlobalState();
89-
globalState.setSharedState(Jsons.jsonNode(cdcState));
90-
globalState.setStreamStates(streamStates);
101+
102+
resumableFullRefreshStreams.forEach(stream -> {
103+
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
104+
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream))));
105+
});
106+
107+
if (!resumableFullRefreshStreams.contains(pair)) {
108+
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
109+
}
91110

92111
return new AirbyteStateMessage()
93112
.withType(AirbyteStateType.GLOBAL)
94-
.withGlobal(globalState);
113+
.withGlobal(generateGlobalState(streamStates));
114+
}
115+
116+
public AirbyteGlobalState generateGlobalState(final List<AirbyteStreamState> streamStates) {
117+
final CdcState stateToBeUsed = getCdcState();
118+
final AirbyteGlobalState globalState = new AirbyteGlobalState();
119+
globalState.setSharedState(Jsons.jsonNode(stateToBeUsed));
120+
globalState.setStreamStates(streamStates);
121+
return globalState;
122+
123+
}
124+
125+
public CdcState getCdcState() {
126+
final CdcState stateManagerCdcState = stateManager.getCdcStateManager().getCdcState();
127+
128+
return !savedOffsetAfterReplicationSlotLSN || stateManagerCdcState == null
129+
|| stateManagerCdcState.getState() == null ? defaultCdcState
130+
: stateManagerCdcState;
131+
95132
}
96133

97134
@Override
98135
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
99-
streamsThatHaveCompletedSnapshot.add(pair);
136+
// Only incremental streams can be transformed into the next phase.
137+
if (!resumableFullRefreshStreams.contains(pair)) {
138+
streamsThatHaveCompletedSnapshot.add(pair);
139+
}
100140
final List<AirbyteStreamState> streamStates = new ArrayList<>();
101141
streamsThatHaveCompletedSnapshot.forEach(stream -> {
102142
final DbStreamState state = getFinalState(stream);
103143
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));
104144
});
105145

106-
final AirbyteGlobalState globalState = new AirbyteGlobalState();
107-
globalState.setSharedState(Jsons.jsonNode(cdcState));
108-
globalState.setStreamStates(streamStates);
146+
resumableFullRefreshStreams.forEach(stream -> {
147+
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair);
148+
streamStates.add(getAirbyteStreamState(pair, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
149+
});
109150

110151
return new AirbyteStateMessage()
111152
.withType(AirbyteStateType.GLOBAL)
112-
.withGlobal(globalState);
153+
.withGlobal(generateGlobalState(streamStates));
113154
}
114155

115156
private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespacePair pair, final JsonNode stateData) {

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java

+5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa
8181

8282
@Override
8383
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
84+
if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) {
85+
// Resumable full refresh for a cursor-based stream.
86+
var ctidStatus = generateCtidStatusForState(pair);
87+
return createCtidStateMessage(pair, ctidStatus);
88+
}
8489
return XminStateManager.getAirbyteStateMessage(pair, Jsons.object(streamStateForIncrementalRun, XminStatus.class));
8590
}
8691

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java

+25-9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
9+
import io.airbyte.commons.json.Jsons;
910
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
1011
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
1112
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
@@ -27,13 +28,14 @@ public abstract class CtidStateManager implements SourceStateMessageProducer<Air
2728
public static final String STATE_TYPE_KEY = "state_type";
2829

2930
protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
30-
private Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
31+
protected Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
3132

32-
private String lastCtid;
33-
private FileNodeHandler fileNodeHandler;
33+
protected String lastCtid;
34+
protected FileNodeHandler fileNodeHandler;
3435

3536
protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
3637
this.pairToCtidStatus = pairToCtidStatus;
38+
this.streamStateForIncrementalRunSupplier = namespacePair -> Jsons.emptyObject();
3739
}
3840

3941
public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) {
@@ -55,26 +57,39 @@ public static boolean validateRelationFileNode(final CtidStatus ctidstatus,
5557

5658
public abstract AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);
5759

58-
public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
59-
FileNodeHandler fileNodeHandler) {
60+
public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier) {
6061
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
62+
}
63+
64+
public void setFileNodeHandler(final FileNodeHandler fileNodeHandler) {
6165
this.fileNodeHandler = fileNodeHandler;
6266
}
6367

68+
public FileNodeHandler getFileNodeHandler() {
69+
return fileNodeHandler;
70+
}
71+
6472
@Override
6573
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
6674
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
6775
stream.getStream().getNamespace());
76+
final CtidStatus ctidStatus = generateCtidStatusForState(pair);
77+
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
78+
return createCtidStateMessage(pair, ctidStatus);
79+
}
80+
81+
protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) {
6882
final Long fileNode = fileNodeHandler.getFileNode(pair);
6983
assert fileNode != null;
70-
final CtidStatus ctidStatus = new CtidStatus()
84+
// If the table is empty, lastCtid will be set to zero for the final state message.
85+
final String lastCtidInState = (Objects.nonNull(lastCtid)
86+
&& StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString();
87+
return new CtidStatus()
7188
.withVersion(CTID_STATUS_VERSION)
7289
.withStateType(StateType.CTID)
73-
.withCtid(lastCtid)
90+
.withCtid(lastCtidInState)
7491
.withIncrementalState(getStreamState(pair))
7592
.withRelationFilenode(fileNode);
76-
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
77-
return createCtidStateMessage(pair, ctidStatus);
7893
}
7994

8095
/**
@@ -112,6 +127,7 @@ public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {
112127

113128
private JsonNode getStreamState(final AirbyteStreamNameNamespacePair pair) {
114129
final CtidStatus currentCtidStatus = getCtidStatus(pair);
130+
115131
return (currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
116132
: currentCtidStatus.getIncrementalState();
117133
}

0 commit comments

Comments
 (0)