73
73
@ Order (1 )
74
74
public class CdcPostgresSourceTest extends CdcSourceTest <PostgresSource , PostgresTestDatabase > {
75
75
76
+ protected BaseImage postgresImage ;
77
+
78
+ protected void setBaseImage () {
79
+ this .postgresImage = getServerImage ();
80
+ }
81
+
76
82
@ Override
77
83
protected PostgresTestDatabase createTestDatabase () {
78
- return PostgresTestDatabase .in (getServerImage (), ContainerModifier .CONF ).withReplicationSlot ();
84
+ setBaseImage ();
85
+ return PostgresTestDatabase .in (this .postgresImage , ContainerModifier .CONF ).withReplicationSlot ();
79
86
}
80
87
81
88
@ Override
@@ -101,6 +108,15 @@ protected void setup() {
101
108
testdb .withPublicationForAllTables ();
102
109
}
103
110
111
+ // For legacy Postgres we will call advanceLsn() after we retrieved target LSN, so that debezium
112
+ // would not drop any record.
113
+ // However, that might cause unexpected state and cause failure in the test. Thus we need to bypass
114
+ // some check if they are on legacy postgres
115
+ // versions.
116
+ private boolean isOnLegacyPostgres () {
117
+ return postgresImage .majorVersion < 15 ;
118
+ }
119
+
104
120
@ Test
105
121
void testDebugMode () {
106
122
final JsonNode invalidDebugConfig = testdb .testConfigBuilder ()
@@ -196,7 +212,12 @@ private void assertStateTypes(final List<AirbyteStateMessage> stateMessages, fin
196
212
if (Objects .isNull (sharedState )) {
197
213
sharedState = global .getSharedState ();
198
214
} else {
199
- assertEquals (sharedState , global .getSharedState ());
215
+ // This validation is only true for versions on or after postgres 15. We execute
216
+ // EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of
217
+ // Postgres. See https://github.com/airbytehq/airbyte/pull/33605 for details.
218
+ if (!isOnLegacyPostgres ()) {
219
+ assertEquals (sharedState , global .getSharedState ());
220
+ }
200
221
}
201
222
assertEquals (1 , global .getStreamStates ().size ());
202
223
final AirbyteStreamState streamState = global .getStreamStates ().get (0 );
@@ -324,7 +345,11 @@ public void testTwoStreamSync() throws Exception {
324
345
if (Objects .isNull (sharedState )) {
325
346
sharedState = global .getSharedState ();
326
347
} else {
327
- assertEquals (sharedState , global .getSharedState ());
348
+ // LSN will be advanced for postgres version before 15. See
349
+ // https://github.com/airbytehq/airbyte/pull/33605
350
+ if (!isOnLegacyPostgres ()) {
351
+ assertEquals (sharedState , global .getSharedState ());
352
+ }
328
353
}
329
354
330
355
if (Objects .isNull (firstStreamInState )) {
@@ -755,7 +780,11 @@ protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition
755
780
if (syncNumber == 1 ) {
756
781
assertEquals (1 , lsnPosition2 .compareTo (lsnPosition1 ));
757
782
} else if (syncNumber == 2 ) {
758
- assertEquals (0 , lsnPosition2 .compareTo (lsnPosition1 ));
783
+ // Earlier Postgres version will advance lsn even if there is no sync records. See
784
+ // https://github.com/airbytehq/airbyte/pull/33605.
785
+ if (!isOnLegacyPostgres ()) {
786
+ assertEquals (0 , lsnPosition2 .compareTo (lsnPosition1 ));
787
+ }
759
788
} else {
760
789
throw new RuntimeException ("Unknown sync number " + syncNumber );
761
790
}
@@ -791,7 +820,9 @@ protected void verifyCheckpointStatesByRecords() throws Exception {
791
820
.toListAndClose (secondBatchIterator );
792
821
assertEquals (recordsToCreate , extractRecordMessages (dataFromSecondBatch ).size ());
793
822
final List <AirbyteStateMessage > stateMessagesCDC = extractStateMessages (dataFromSecondBatch );
794
- assertTrue (stateMessagesCDC .size () > 1 , "Generated only the final state." );
823
+ if (!isOnLegacyPostgres ()) {
824
+ assertTrue (stateMessagesCDC .size () > 1 , "Generated only the final state." );
825
+ }
795
826
assertEquals (stateMessagesCDC .size (), stateMessagesCDC .stream ().distinct ().count (), "There are duplicated states." );
796
827
}
797
828
@@ -830,7 +861,9 @@ protected void verifyCheckpointStatesBySeconds() throws Exception {
830
861
831
862
assertEquals (recordsToCreate , extractRecordMessages (dataFromSecondBatch ).size ());
832
863
final List <AirbyteStateMessage > stateMessagesCDC = extractStateMessages (dataFromSecondBatch );
833
- assertTrue (stateMessagesCDC .size () > 1 , "Generated only the final state." );
864
+ if (!isOnLegacyPostgres ()) {
865
+ assertTrue (stateMessagesCDC .size () > 1 , "Generated only the final state." );
866
+ }
834
867
assertEquals (stateMessagesCDC .size (), stateMessagesCDC .stream ().distinct ().count (), "There are duplicated states." );
835
868
}
836
869
0 commit comments