8
8
import static io .debezium .connector .postgresql .SourceInfo .LSN_KEY ;
9
9
10
10
import com .fasterxml .jackson .databind .JsonNode ;
11
+ import io .airbyte .db .jdbc .JdbcUtils ;
11
12
import io .airbyte .protocol .models .v0 .ConfiguredAirbyteCatalog ;
12
13
import io .debezium .config .Configuration ;
13
14
import io .debezium .connector .common .OffsetReader ;
16
17
import io .debezium .connector .postgresql .PostgresOffsetContext .Loader ;
17
18
import io .debezium .connector .postgresql .PostgresPartition ;
18
19
import io .debezium .connector .postgresql .connection .Lsn ;
20
+ import io .debezium .jdbc .JdbcConnection .ResultSetMapper ;
21
+ import io .debezium .jdbc .JdbcConnection .StatementFactory ;
19
22
import io .debezium .pipeline .spi .Offsets ;
20
23
import io .debezium .pipeline .spi .Partition ;
24
+ import java .sql .Connection ;
25
+ import java .sql .DriverManager ;
26
+ import java .sql .ResultSet ;
27
+ import java .sql .SQLException ;
28
+ import java .sql .Statement ;
21
29
import java .util .Collections ;
22
30
import java .util .Map ;
31
+ import java .util .Objects ;
23
32
import java .util .Optional ;
24
33
import java .util .OptionalLong ;
25
34
import java .util .Properties ;
29
38
import org .apache .kafka .connect .runtime .standalone .StandaloneConfig ;
30
39
import org .apache .kafka .connect .storage .FileOffsetBackingStore ;
31
40
import org .apache .kafka .connect .storage .OffsetStorageReaderImpl ;
41
+ import org .postgresql .core .BaseConnection ;
42
+ import org .postgresql .replication .LogSequenceNumber ;
43
+ import org .postgresql .replication .PGReplicationStream ;
44
+ import org .postgresql .replication .fluent .logical .ChainedLogicalStreamBuilder ;
32
45
import org .slf4j .Logger ;
33
46
import org .slf4j .LoggerFactory ;
34
47
@@ -40,33 +53,131 @@ public class PostgresDebeziumStateUtil {
40
53
41
54
// SLF4J logger shared by the methods of this class.
private static final Logger LOGGER = LoggerFactory .getLogger (PostgresDebeziumStateUtil .class );
42
55
43
- public boolean isSavedOffsetAfterReplicationSlotLSN (final Properties baseProperties ,
44
- final ConfiguredAirbyteCatalog catalog ,
45
- final JsonNode cdcState ,
46
- final JsonNode replicationSlot ,
47
- final JsonNode config ) {
56
+ public boolean isSavedOffsetAfterReplicationSlotLSN (final JsonNode replicationSlot ,
57
+ final OptionalLong savedOffset ) {
48
58
59
+ if (Objects .isNull (savedOffset ) || savedOffset .isEmpty ()) {
60
+ return true ;
61
+ }
62
+
63
+ if (replicationSlot .has ("confirmed_flush_lsn" )) {
64
+ final long confirmedFlushLsnOnServerSide = Lsn .valueOf (replicationSlot .get ("confirmed_flush_lsn" ).asText ()).asLong ();
65
+ LOGGER .info ("Replication slot confirmed_flush_lsn : " + confirmedFlushLsnOnServerSide + " Saved offset LSN : " + savedOffset .getAsLong ());
66
+ return savedOffset .getAsLong () >= confirmedFlushLsnOnServerSide ;
67
+ } else if (replicationSlot .has ("restart_lsn" )) {
68
+ final long restartLsn = Lsn .valueOf (replicationSlot .get ("restart_lsn" ).asText ()).asLong ();
69
+ LOGGER .info ("Replication slot restart_lsn : " + restartLsn + " Saved offset LSN : " + savedOffset .getAsLong ());
70
+ return savedOffset .getAsLong () >= restartLsn ;
71
+ }
72
+
73
+ // We return true when saved offset is not present cause using an empty offset would result in sync
74
+ // from scratch anyway
75
+ return true ;
76
+ }
77
+
78
+ public OptionalLong savedOffset (final Properties baseProperties ,
79
+ final ConfiguredAirbyteCatalog catalog ,
80
+ final JsonNode cdcState ,
81
+ final JsonNode config ) {
49
82
final DebeziumPropertiesManager debeziumPropertiesManager = new DebeziumPropertiesManager (baseProperties , config , catalog ,
50
83
AirbyteFileOffsetBackingStore .initializeState (cdcState ),
51
84
Optional .empty ());
52
85
final Properties debeziumProperties = debeziumPropertiesManager .getDebeziumProperties ();
53
- final OptionalLong savedOffset = parseSavedOffset (debeziumProperties );
54
-
55
- if (savedOffset .isPresent ()) {
56
- if (replicationSlot .has ("confirmed_flush_lsn" )) {
57
- final long confirmedFlushLsnOnServerSide = Lsn .valueOf (replicationSlot .get ("confirmed_flush_lsn" ).asText ()).asLong ();
58
- LOGGER .info ("Replication slot confirmed_flush_lsn : " + confirmedFlushLsnOnServerSide + " Saved offset LSN : " + savedOffset .getAsLong ());
59
- return savedOffset .getAsLong () >= confirmedFlushLsnOnServerSide ;
60
- } else if (replicationSlot .has ("restart_lsn" )) {
61
- final long restartLsn = Lsn .valueOf (replicationSlot .get ("restart_lsn" ).asText ()).asLong ();
62
- LOGGER .info ("Replication slot restart_lsn : " + restartLsn + " Saved offset LSN : " + savedOffset .getAsLong ());
63
- return savedOffset .getAsLong () >= restartLsn ;
86
+ return parseSavedOffset (debeziumProperties );
87
+ }
88
+
89
+ private Connection connection (final JsonNode jdbcConfig ) throws SQLException {
90
+ Properties properties = new Properties ();
91
+ properties .setProperty ("user" , jdbcConfig .has (JdbcUtils .USERNAME_KEY ) ? jdbcConfig .get (JdbcUtils .USERNAME_KEY ).asText ()
92
+ : null );
93
+ properties .setProperty ("password" , jdbcConfig .has (JdbcUtils .PASSWORD_KEY ) ? jdbcConfig .get (JdbcUtils .PASSWORD_KEY ).asText ()
94
+ : null );
95
+ properties .setProperty ("assumeMinServerVersion" , "9.4" );
96
+ properties .setProperty ("ApplicationName" , "Airbyte Debezium Streaming" );
97
+ properties .setProperty ("replication" , "database" );
98
+ properties .setProperty ("preferQueryMode" , "simple" ); // replication protocol only supports simple query mode
99
+
100
+ return DriverManager .getConnection (jdbcConfig .get (JdbcUtils .JDBC_URL_KEY ).asText (), properties );
101
+ }
102
+
103
+ public void commitLSNToPostgresDatabase (final JsonNode jdbcConfig ,
104
+ final OptionalLong savedOffset ,
105
+ final String slotName ,
106
+ final String publicationName ,
107
+ final String plugin ) {
108
+ if (Objects .isNull (savedOffset ) || savedOffset .isEmpty ()) {
109
+ return ;
110
+ }
111
+
112
+ final LogSequenceNumber logSequenceNumber = LogSequenceNumber .valueOf (savedOffset .getAsLong ());
113
+
114
+ try (final BaseConnection pgConnection = ((BaseConnection ) connection (jdbcConfig ))) {
115
+ validateReplicationConnection (pgConnection );
116
+
117
+ ChainedLogicalStreamBuilder streamBuilder = pgConnection
118
+ .getReplicationAPI ()
119
+ .replicationStream ()
120
+ .logical ()
121
+ .withSlotName ("\" " + slotName + "\" " )
122
+ .withStartPosition (logSequenceNumber );
123
+
124
+ streamBuilder = addSlotOption (publicationName , plugin , pgConnection , streamBuilder );
125
+
126
+ try (final PGReplicationStream stream = streamBuilder .start ()) {
127
+ stream .forceUpdateStatus ();
128
+
129
+ stream .setFlushedLSN (logSequenceNumber );
130
+ stream .setAppliedLSN (logSequenceNumber );
131
+
132
+ stream .forceUpdateStatus ();
64
133
}
134
+ } catch (SQLException e ) {
135
+ throw new RuntimeException (e );
65
136
}
137
+ }
66
138
67
- // We return true when saved offset is not present cause using an empty offset would result in sync
68
- // from scratch anyway
69
- return true ;
139
+ private ChainedLogicalStreamBuilder addSlotOption (final String publicationName ,
140
+ final String plugin ,
141
+ final BaseConnection pgConnection ,
142
+ ChainedLogicalStreamBuilder streamBuilder ) {
143
+ if (plugin .equalsIgnoreCase ("pgoutput" )) {
144
+ streamBuilder = streamBuilder .withSlotOption ("proto_version" , 1 )
145
+ .withSlotOption ("publication_names" , publicationName );
146
+
147
+ if (pgConnection .haveMinimumServerVersion (140000 )) {
148
+ streamBuilder = streamBuilder .withSlotOption ("messages" , true );
149
+ }
150
+ } else if (plugin .equalsIgnoreCase ("wal2json" )) {
151
+ streamBuilder = streamBuilder
152
+ .withSlotOption ("pretty-print" , 1 )
153
+ .withSlotOption ("write-in-chunks" , 1 )
154
+ .withSlotOption ("include-xids" , 1 )
155
+ .withSlotOption ("include-timestamp" , 1 )
156
+ .withSlotOption ("include-not-null" , "true" );
157
+ } else {
158
+ throw new RuntimeException ("Unknown plugin value : " + plugin );
159
+ }
160
+ return streamBuilder ;
161
+ }
162
+
163
+ private void validateReplicationConnection (final BaseConnection pgConnection ) throws SQLException {
164
+ final Lsn xlogStart = queryAndMap (pgConnection , "IDENTIFY_SYSTEM" , Connection ::createStatement , rs -> {
165
+ if (!rs .next ()) {
166
+ throw new IllegalStateException ("The DB connection is not a valid replication connection" );
167
+ }
168
+ String xlogpos = rs .getString ("xlogpos" );
169
+ return Lsn .valueOf (xlogpos );
170
+ });
171
+ }
172
+
173
+ private <T > T queryAndMap (final Connection conn , final String query , final StatementFactory statementFactory , final ResultSetMapper <T > mapper )
174
+ throws SQLException {
175
+ Objects .requireNonNull (mapper , "Mapper must be provided" );
176
+ try (Statement statement = statementFactory .createStatement (conn )) {
177
+ try (ResultSet resultSet = statement .executeQuery (query );) {
178
+ return mapper .apply (resultSet );
179
+ }
180
+ }
70
181
}
71
182
72
183
/**
0 commit comments