Skip to content

Commit c739c4c

Browse files
rodireichalafanechereoctavia-squidington-iii
authored
Implement Debezium heartbeats for source-postgres (#19004)
* Initial working commit * Code sanity. Provide no-op implementation to mysql, MSSql to allow compilation. * Update test * sanity * sanity * sanity * sanity * sanity * changes per review comments * Make heartbeat change waittime configurable. * Trying to bypass test strictness test * Trying to bypass test strictness test * Trying to bypass test strictness test * fix acceptance test config format * add missing SAT test in config * revert back changes in acceptance-test-config.yml * Version and notes * auto-bump connector version Co-authored-by: alafanechere <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 26c866b commit c739c4c

File tree

12 files changed

+170
-10
lines changed

12 files changed

+170
-10
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,7 @@
11851185
- name: Postgres
11861186
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
11871187
dockerRepository: airbyte/source-postgres
1188-
dockerImageTag: 1.0.24
1188+
dockerImageTag: 1.0.25
11891189
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
11901190
icon: postgresql.svg
11911191
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10956,7 +10956,7 @@
1095610956
supportsNormalization: false
1095710957
supportsDBT: false
1095810958
supported_destination_sync_modes: []
10959-
- dockerImage: "airbyte/source-postgres:1.0.24"
10959+
- dockerImage: "airbyte/source-postgres:1.0.25"
1096010960
spec:
1096110961
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
1096210962
connectionSpecification:

airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.integrations.debezium;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.debezium.engine.ChangeEvent;
89

910
/**
1011
* This interface is used to define the target position at the beginning of the sync so that once we
@@ -15,6 +16,41 @@
1516
*/
1617
public interface CdcTargetPosition {
1718

19+
/**
20+
* Reads a position value (lsn) from a change event and compares it to target lsn
21+
*
22+
* @param valueAsJson json representation of a change event
23+
* @return true if event lsn is equal or greater than target lsn, or if last snapshot event
24+
*/
1825
boolean reachedTargetPosition(JsonNode valueAsJson);
1926

27+
/**
28+
* Returns a position value (lsn) from a heartbeat event.
29+
*
30+
* @param heartbeatEvent a heartbeat change event
31+
* @return the lsn value in a heartbeat change event or null
32+
*/
33+
default Long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {
34+
throw new UnsupportedOperationException();
35+
}
36+
37+
/**
38+
* Checks if a specified lsn has reached the target lsn.
39+
*
40+
* @param lsn an lsn value
41+
* @return true if lsn is equal or greater than target lsn
42+
*/
43+
default boolean reachedTargetPosition(final Long lsn) {
44+
throw new UnsupportedOperationException();
45+
}
46+
47+
/**
48+
* Indicates whether the implementation supports heartbeat position.
49+
*
50+
* @return true if heartbeats are supported
51+
*/
52+
default boolean isHeartbeatSupported() {
53+
return false;
54+
}
55+
2056
}

airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.airbyte.integrations.debezium.CdcTargetPosition;
1414
import io.debezium.engine.ChangeEvent;
1515
import java.time.Duration;
16+
import java.time.LocalDateTime;
1617
import java.util.concurrent.LinkedBlockingQueue;
1718
import java.util.concurrent.TimeUnit;
1819
import java.util.function.Supplier;
@@ -46,6 +47,8 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
4647
private boolean receivedFirstRecord;
4748
private boolean hasSnapshotFinished;
4849
private boolean signalledClose;
50+
private LocalDateTime tsLastHeartbeat;
51+
private Long lastHeartbeatPosition;
4952

5053
public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, String>> queue,
5154
final CdcTargetPosition targetPosition,
@@ -61,8 +64,17 @@ public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, Stri
6164
this.receivedFirstRecord = false;
6265
this.hasSnapshotFinished = true;
6366
this.signalledClose = false;
67+
tsLastHeartbeat = null;
68+
lastHeartbeatPosition = null;
6469
}
6570

71+
// The following logic incorporates heartbeat (CDC postgres only for now):
72+
// 1. Wait on queue either the configured time first or 1 min after a record received
73+
// 2. If nothing came out of queue finish sync
74+
// 3. If received heartbeat: check if heartbeat_lsn reached target or hasn't changed in a while
75+
// finish sync
76+
// 4. If change event lsn reached target finish sync
77+
// 5. Otherwise check message queue again
6678
@Override
6779
protected ChangeEvent<String, String> computeNext() {
6880
// keep trying until the publisher is closed or until the queue is empty. the latter case is
@@ -71,34 +83,72 @@ protected ChangeEvent<String, String> computeNext() {
7183
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
7284
final ChangeEvent<String, String> next;
7385
try {
74-
final Duration waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME : firstRecordWaitTime;
86+
// #18987: waitTime is still required with heartbeats for backward
87+
// compatibility with connectors not implementing heartbeat
88+
// yet (MySql, MSSql), and also due to postgres taking a long time
89+
// initially staying on "searching for WAL resume position"
90+
final Duration waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME : this.firstRecordWaitTime;
7591
next = queue.poll(waitTime.getSeconds(), TimeUnit.SECONDS);
7692
} catch (final InterruptedException e) {
7793
throw new RuntimeException(e);
7894
}
7995

8096
// if within the timeout, the consumer could not get a record, it is time to tell the producer to
8197
// shutdown.
98+
// #18987: Noticed in testing that it's possible for DBZ to be stuck "Searching for WAL resume
99+
// position"
100+
// when no changes exist. In that case queue will pop after timeout with null value for next
82101
if (next == null) {
83-
LOGGER.info("Closing cause next is returned as null");
102+
LOGGER.info("Closing: queue returned null event");
84103
requestClose();
85104
LOGGER.info("no record found. polling again.");
86105
continue;
87106
}
88107

108+
if (targetPosition.isHeartbeatSupported()) {
109+
// check if heartbeat and read heartbeat position
110+
LOGGER.debug("checking heartbeat lsn for: {}", next);
111+
final Long heartbeatPos = targetPosition.getHeartbeatPosition(next);
112+
if (heartbeatPos != null) {
113+
// wrap up sync if heartbeat position crossed the target OR heartbeat position hasn't changed for
114+
// too long
115+
if (targetPosition.reachedTargetPosition(heartbeatPos)
116+
|| (heartbeatPos.equals(this.lastHeartbeatPosition) && heartbeatPosNotChanging())) {
117+
LOGGER.info("Closing: Heartbeat indicates sync is done");
118+
requestClose();
119+
}
120+
if (!heartbeatPos.equals(this.lastHeartbeatPosition)) {
121+
this.tsLastHeartbeat = LocalDateTime.now();
122+
this.lastHeartbeatPosition = heartbeatPos;
123+
}
124+
continue;
125+
}
126+
}
127+
89128
final JsonNode eventAsJson = Jsons.deserialize(next.value());
90129
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);
91130

92131
// if the last record matches the target file position, it is time to tell the producer to shutdown.
132+
93133
if (!signalledClose && shouldSignalClose(eventAsJson)) {
134+
LOGGER.info("Closing: Change event reached target position");
94135
requestClose();
95136
}
137+
this.tsLastHeartbeat = null;
138+
this.lastHeartbeatPosition = null;
96139
receivedFirstRecord = true;
97140
return next;
98141
}
99142
return endOfData();
100143
}
101144

145+
private boolean heartbeatPosNotChanging() {
146+
final Duration tbt = Duration.between(this.tsLastHeartbeat, LocalDateTime.now());
147+
LOGGER.debug("Time since last hb_pos change {}s", tbt.toSeconds());
148+
// wait time for no change in heartbeat position is half of initial waitTime
149+
return tbt.compareTo(this.firstRecordWaitTime.dividedBy(2)) > 0;
150+
}
151+
102152
private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
103153
final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase());
104154
return SnapshotMetadata.TRUE != snapshot;
@@ -122,6 +172,7 @@ private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
122172
*/
123173
@Override
124174
public void close() throws Exception {
175+
LOGGER.info("Closing: Iterator closing");
125176
requestClose();
126177
}
127178

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,4 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) {
8383
+ fileName + " , target position : " + position);
8484
return true;
8585
}
86-
8786
}

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.24
19+
LABEL io.airbyte.version=1.0.25
2020
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=1.0.24
19+
LABEL io.airbyte.version=1.0.25
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ application {
1313

1414
dependencies {
1515
implementation project(':airbyte-db:db-lib')
16+
implementation 'io.debezium:debezium-api:1.9.6.Final'
17+
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
1618
implementation project(':airbyte-integrations:bases:base-java')
1719
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
1820
implementation project(':airbyte-protocol:protocol-models')

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource.SslMode;
1717
import java.net.URI;
1818
import java.nio.file.Path;
19+
import java.time.Duration;
1920
import java.util.Properties;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

2324
public class PostgresCdcProperties {
2425

26+
private static final int HEARTBEAT_FREQUENCY_SEC = 10;
2527
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcProperties.class);
2628

2729
static Properties getDebeziumDefaultProperties(final JdbcDatabase database) {
@@ -54,6 +56,7 @@ private static Properties commonProperties(final JdbcDatabase database) {
5456
props.setProperty("converters", "datetime");
5557
props.setProperty("datetime.type", PostgresConverter.class.getName());
5658
props.setProperty("include.unknown.datatypes", "true");
59+
props.setProperty("heartbeat.interval.ms", Long.toString(Duration.ofSeconds(HEARTBEAT_FREQUENCY_SEC).toMillis()));
5760

5861
// Check params for SSL connection in config and add properties for CDC SSL connection
5962
// https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-database-sslmode

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcTargetPosition.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,26 @@
55
package io.airbyte.integrations.source.postgres;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.google.common.annotations.VisibleForTesting;
89
import io.airbyte.db.PgLsn;
910
import io.airbyte.db.PostgresUtils;
1011
import io.airbyte.db.jdbc.JdbcDatabase;
1112
import io.airbyte.integrations.debezium.CdcTargetPosition;
1213
import io.airbyte.integrations.debezium.internals.SnapshotMetadata;
14+
import io.debezium.engine.ChangeEvent;
15+
import java.lang.reflect.Field;
1316
import java.sql.SQLException;
1417
import java.util.Objects;
1518
import java.util.Optional;
19+
import org.apache.kafka.connect.source.SourceRecord;
1620
import org.slf4j.Logger;
1721
import org.slf4j.LoggerFactory;
1822

1923
public class PostgresCdcTargetPosition implements CdcTargetPosition {
2024

2125
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcTargetPosition.class);
22-
private final PgLsn targetLsn;
26+
@VisibleForTesting
27+
final PgLsn targetLsn;
2328

2429
public PostgresCdcTargetPosition(final PgLsn targetLsn) {
2530
this.targetLsn = targetLsn;
@@ -62,6 +67,32 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) {
6267
}
6368
}
6469

70+
private boolean isHeartbeatEvent(final ChangeEvent<String, String> event) {
71+
return Objects.nonNull(event) && !event.value().contains("source");
72+
}
73+
74+
@Override
75+
public Long getHeartbeatPosition(final ChangeEvent<String, String> heartbeatEvent) {
76+
if (isHeartbeatEvent(heartbeatEvent)) {
77+
try {
78+
final Field f = heartbeatEvent.getClass().getDeclaredField("sourceRecord");
79+
f.setAccessible(true);
80+
final SourceRecord sr = (SourceRecord) f.get(heartbeatEvent);
81+
final Long hbLsn = (Long) sr.sourceOffset().get("lsn");
82+
LOGGER.debug("Found heartbeat lsn: {}", hbLsn);
83+
return hbLsn;
84+
} catch (final NoSuchFieldException | IllegalAccessException e) {
85+
LOGGER.info("failed to get heartbeat lsn");
86+
}
87+
}
88+
return null;
89+
}
90+
91+
@Override
92+
public boolean reachedTargetPosition(final Long lsn) {
93+
return (lsn == null) ? false : lsn.compareTo(targetLsn.asLong()) >= 0;
94+
}
95+
6596
private PgLsn extractLsn(final JsonNode valueAsJson) {
6697
return Optional.ofNullable(valueAsJson.get("source"))
6798
.flatMap(source -> Optional.ofNullable(source.get("lsn").asText()))
@@ -70,4 +101,8 @@ private PgLsn extractLsn(final JsonNode valueAsJson) {
70101
.orElseThrow(() -> new IllegalStateException("Could not find LSN"));
71102
}
72103

104+
@Override
105+
public boolean isHeartbeatSupported() {
106+
return true;
107+
}
73108
}

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,13 @@
4949
import io.airbyte.protocol.models.JsonSchemaType;
5050
import io.airbyte.protocol.models.SyncMode;
5151
import io.airbyte.test.utils.PostgreSQLContainerHelper;
52+
import io.debezium.engine.ChangeEvent;
5253
import java.sql.SQLException;
54+
import java.util.Collections;
5355
import java.util.List;
5456
import java.util.Optional;
5557
import java.util.Set;
58+
import org.apache.kafka.connect.source.SourceRecord;
5659
import org.jooq.DSLContext;
5760
import org.jooq.SQLDialect;
5861
import org.junit.jupiter.api.AfterEach;
@@ -383,4 +386,34 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
383386
assertEquals(MODEL_RECORDS.size() + recordsToCreate + 1, recordsFromThirdBatch.size());
384387
}
385388

389+
@Test
390+
void testReachedTargetPosition() {
391+
final CdcTargetPosition ctp = cdcLatestTargetPosition();
392+
final PostgresCdcTargetPosition pctp = (PostgresCdcTargetPosition) ctp;
393+
final PgLsn target = pctp.targetLsn;
394+
assertTrue(ctp.reachedTargetPosition(target.asLong() + 1));
395+
assertTrue(ctp.reachedTargetPosition(target.asLong()));
396+
assertFalse(ctp.reachedTargetPosition(target.asLong() - 1));
397+
assertFalse(ctp.reachedTargetPosition((Long) null));
398+
}
399+
400+
@Test
401+
void testGetHeartbeatPosition() {
402+
final CdcTargetPosition ctp = cdcLatestTargetPosition();
403+
final PostgresCdcTargetPosition pctp = (PostgresCdcTargetPosition) ctp;
404+
final Long lsn = pctp.getHeartbeatPosition(new ChangeEvent<String, String>() {
405+
private final SourceRecord sourceRecord = new SourceRecord(null, Collections.singletonMap("lsn", 358824993496L), null, null, null );
406+
@Override
407+
public String key() { return null; }
408+
@Override
409+
public String value() { return "{\"ts_ms\":1667616934701}"; }
410+
@Override
411+
public String destination() { return null; }
412+
public SourceRecord sourceRecord() { return sourceRecord; }
413+
});
414+
415+
assertEquals(lsn, 358824993496L);
416+
417+
assertNull(pctp.getHeartbeatPosition(null));
418+
}
386419
}

docs/integrations/sources/postgres.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,9 @@ The root causes is that the WALs needed for the incremental sync has been remove
400400

401401
| Version | Date | Pull Request | Subject |
402402
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
403-
| 1.0.24 | 2022-11-07 | [19291](https://github.com/airbytehq/airbyte/pull/19291) | Default timeout is reduced from 1 min to 10sec |
404-
| 1.0.23 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |
403+
| 1.0.25 | 2022-11-16 | [19004](https://github.com/airbytehq/airbyte/pull/19004) | Use Debezium heartbeats to improve CDC replication of large databases |
404+
| 1.0.24 | 2022-11-07 | [19291](https://github.com/airbytehq/airbyte/pull/19291) | Default timeout is reduced from 1 min to 10sec |
405+
| 1.0.23 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |
405406
| 1.0.22 | 2022-10-31 | [18538](https://github.com/airbytehq/airbyte/pull/18538) | Encode database name |
406407
| 1.0.21 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
407408
| 1.0.20 | 2022-10-25 | [18383](https://github.com/airbytehq/airbyte/pull/18383) | Better SSH error handling + messages |

0 commit comments

Comments
 (0)