Commit f4b4863

Authored by subodh1810, tuliren, and octavia-squidington-iii
🎉 Postgres source: sync data from beginning if lsn is no longer valid in cdc (#15077)
* work in progress
* cleanup
* add test
* introduce tests for state parsing util class
* enable test via feature flag
* review comments
* Bump versions
* auto-bump connector version [ci skip]

Co-authored-by: Liren Tu <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
1 parent d57a6a6 commit f4b4863
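
For context on the headline change, here is a minimal hedged sketch of the recovery behavior the commit title describes, assuming the connector parses the saved Debezium offset before starting a sync. All names here (LsnValiditySketch, mustSyncFromBeginning, the "lsn_commit" offset key) are illustrative assumptions, not the actual implementation:

```java
import java.util.Map;
import java.util.Optional;

public final class LsnValiditySketch {

  // Pull the commit LSN out of a raw Debezium offset map, if one was saved.
  static Optional<Long> savedLsn(final Map<String, ?> rawOffset) {
    final Object lsn = rawOffset.get("lsn_commit"); // assumed offset key
    return lsn instanceof Number ? Optional.of(((Number) lsn).longValue()) : Optional.empty();
  }

  // If the saved LSN predates what the server's replication slot can still
  // serve, resuming CDC is impossible, so the sync restarts from the beginning
  // instead of failing.
  static boolean mustSyncFromBeginning(final Map<String, ?> rawOffset, final long oldestServableLsn) {
    return savedLsn(rawOffset)
        .map(lsn -> lsn < oldestServableLsn) // saved position already recycled
        .orElse(true);                       // no saved state: full sync anyway
  }
}
```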

File tree

13 files changed: +499 −111 lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion

@@ -762,7 +762,7 @@
 - name: Postgres
   sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
   dockerRepository: airbyte/source-postgres
-  dockerImageTag: 0.4.40
+  dockerImageTag: 0.4.41
   documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
   icon: postgresql.svg
   sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion

@@ -7140,7 +7140,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-postgres:0.4.40"
+- dockerImage: "airbyte/source-postgres:0.4.41"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
    connectionSpecification:
airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java

Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.db.jdbc.JdbcUtils;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.SyncMode;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.codehaus.plexus.util.StringUtils;
+
+public class DebeziumPropertiesManager {
+
+  private final JsonNode config;
+  private final AirbyteFileOffsetBackingStore offsetManager;
+  private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;
+
+  private final Properties properties;
+  private final ConfiguredAirbyteCatalog catalog;
+
+  public DebeziumPropertiesManager(final Properties properties,
+                                   final JsonNode config,
+                                   final ConfiguredAirbyteCatalog catalog,
+                                   final AirbyteFileOffsetBackingStore offsetManager,
+                                   final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
+    this.properties = properties;
+    this.config = config;
+    this.catalog = catalog;
+    this.offsetManager = offsetManager;
+    this.schemaHistoryManager = schemaHistoryManager;
+  }
+
+  protected Properties getDebeziumProperties() {
+    final Properties props = new Properties();
+    props.putAll(properties);
+
+    // debezium engine configuration
+    props.setProperty("name", "engine");
+    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
+    props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
+    props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
+    // default values from debezium CommonConnectorConfig
+    props.setProperty("max.batch.size", "2048");
+    props.setProperty("max.queue.size", "8192");
+
+    if (schemaHistoryManager.isPresent()) {
+      // https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
+      // https://debezium.io/documentation/reference/development/engine.html#_in_the_code
+      // As mentioned in the documents above, the debezium connector for MySQL needs to track schema
+      // changes. If we don't do this, we can't fetch records for the table.
+      props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
+      props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
+    }
+
+    // https://debezium.io/documentation/reference/configuration/avro.html
+    props.setProperty("key.converter.schemas.enable", "false");
+    props.setProperty("value.converter.schemas.enable", "false");
+
+    // debezium names
+    props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
+    props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());
+
+    // db connection configuration
+    props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
+    props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
+    props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
+    props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText());
+
+    if (config.has(JdbcUtils.PASSWORD_KEY)) {
+      props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
+    }
+
+    // By default "decimal.handling.mode=precise", which causes these values to be returned as binary.
+    // The "double" type may cause a loss of precision, so set Debezium's config to store the value as a
+    // String explicitly in its Kafka messages. For more details see:
+    // https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
+    // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
+    props.setProperty("decimal.handling.mode", "string");
+
+    // table selection
+    final String tableWhitelist = getTableWhitelist(catalog);
+    props.setProperty("table.include.list", tableWhitelist);
+
+    return props;
+  }
+
+  public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
+    return catalog.getStreams().stream()
+        .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
+        .map(ConfiguredAirbyteStream::getStream)
+        .map(stream -> stream.getNamespace() + "." + stream.getName())
+        // debezium needs commas escaped to split properly
+        .map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
+        .collect(Collectors.joining(","));
+  }
+
+}
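
As a hedged orientation aid (not part of the diff), this is roughly how the new class is meant to be consumed; the only confirmed caller in this commit is DebeziumRecordPublisher below, and the wrapper name here is made up. Optional.empty() reflects the comment above that only the MySQL connector needs a schema history manager:

```java
package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Optional;
import java.util.Properties;

public final class PropertiesManagerUsageSketch {

  // Builds the complete Debezium engine configuration the way the refactored
  // DebeziumRecordPublisher does. Same package, so the protected
  // getDebeziumProperties() is accessible here.
  static Properties buildEngineProperties(final Properties connectorProperties,
                                          final JsonNode sourceConfig,
                                          final ConfiguredAirbyteCatalog catalog,
                                          final AirbyteFileOffsetBackingStore offsetManager) {
    return new DebeziumPropertiesManager(connectorProperties, sourceConfig, catalog, offsetManager,
        Optional.empty()) // no schema history manager: only MySQL needs one
            .getDebeziumProperties();
  }
}
```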

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java

Lines changed: 6 additions & 86 deletions
@@ -5,11 +5,7 @@
 package io.airbyte.integrations.debezium.internals;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import io.airbyte.db.jdbc.JdbcUtils;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import io.airbyte.protocol.models.ConfiguredAirbyteStream;
-import io.airbyte.protocol.models.SyncMode;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.format.Json;
@@ -23,42 +19,31 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import org.codehaus.plexus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The purpose of this class is to intiliaze and spawn the debezium engine with the right properties
- * to fetch records
+ * The purpose of this class is to initialize and spawn the debezium engine with the right
+ * properties to fetch records
  */
 public class DebeziumRecordPublisher implements AutoCloseable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class);
   private final ExecutorService executor;
   private DebeziumEngine<ChangeEvent<String, String>> engine;
-
-  private final JsonNode config;
-  private final AirbyteFileOffsetBackingStore offsetManager;
-  private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;
-
   private final AtomicBoolean hasClosed;
   private final AtomicBoolean isClosing;
   private final AtomicReference<Throwable> thrownError;
   private final CountDownLatch engineLatch;
-  private final Properties properties;
-  private final ConfiguredAirbyteCatalog catalog;
+  private final DebeziumPropertiesManager debeziumPropertiesManager;
 
   public DebeziumRecordPublisher(final Properties properties,
                                  final JsonNode config,
                                  final ConfiguredAirbyteCatalog catalog,
                                  final AirbyteFileOffsetBackingStore offsetManager,
                                  final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
-    this.properties = properties;
-    this.config = config;
-    this.catalog = catalog;
-    this.offsetManager = offsetManager;
-    this.schemaHistoryManager = schemaHistoryManager;
+    this.debeziumPropertiesManager = new DebeziumPropertiesManager(properties, config, catalog, offsetManager,
+        schemaHistoryManager);
     this.hasClosed = new AtomicBoolean(false);
     this.isClosing = new AtomicBoolean(false);
    this.thrownError = new AtomicReference<>();
@@ -68,7 +53,7 @@ public DebeziumRecordPublisher(final Properties properties,
 
   public void start(final Queue<ChangeEvent<String, String>> queue) {
     engine = DebeziumEngine.create(Json.class)
-        .using(getDebeziumProperties())
+        .using(debeziumPropertiesManager.getDebeziumProperties())
         .using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
         .notifying(e -> {
           // debezium outputs a tombstone event that has a value of null. this is an artifact of how it
@@ -120,69 +105,4 @@ public void close() throws Exception {
     }
   }
 
-  protected Properties getDebeziumProperties() {
-    final Properties props = new Properties();
-    props.putAll(properties);
-
-    // debezium engine configuration
-    props.setProperty("name", "engine");
-    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
-    props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
-    props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
-    // default values from debezium CommonConnectorConfig
-    props.setProperty("max.batch.size", "2048");
-    props.setProperty("max.queue.size", "8192");
-
-    if (schemaHistoryManager.isPresent()) {
-      // https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
-      // https://debezium.io/documentation/reference/development/engine.html#_in_the_code
-      // As mentioned in the documents above, debezium connector for MySQL needs to track the schema
-      // changes. If we don't do this, we can't fetch records for the table.
-      props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
-      props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
-    }
-
-    // https://debezium.io/documentation/reference/configuration/avro.html
-    props.setProperty("key.converter.schemas.enable", "false");
-    props.setProperty("value.converter.schemas.enable", "false");
-
-    // debezium names
-    props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
-    props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());
-
-    // db connection configuration
-    props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
-    props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
-    props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
-    props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText());
-
-    if (config.has(JdbcUtils.PASSWORD_KEY)) {
-      props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
-    }
-
-    // By default "decimal.handing.mode=precise" which's caused returning this value as a binary.
-    // The "double" type may cause a loss of precision, so set Debezium's config to store it as a String
-    // explicitly in its Kafka messages for more details see:
-    // https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
-    // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
-    props.setProperty("decimal.handling.mode", "string");
-
-    // table selection
-    final String tableWhitelist = getTableWhitelist(catalog);
-    props.setProperty("table.include.list", tableWhitelist);
-
-    return props;
-  }
-
-  @VisibleForTesting
-  public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
-    return catalog.getStreams().stream()
-        .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
-        .map(ConfiguredAirbyteStream::getStream)
-        .map(stream -> stream.getNamespace() + "." + stream.getName())
-        // debezium needs commas escaped to split properly
-        .map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
-        .collect(Collectors.joining(","));
-  }
-
 }
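
The refactor leaves the publisher's external contract untouched. As a hedged sketch (the wrapper class and method names are assumed, not from the diff), usage still looks like this:

```java
package io.airbyte.integrations.debezium.internals;

import io.debezium.engine.ChangeEvent;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public final class PublisherUsageSketch {

  // Start the engine against a queue, then close it; records land in the queue
  // while the engine runs on the publisher's internal executor.
  static Queue<ChangeEvent<String, String>> collect(final DebeziumRecordPublisher publisher) throws Exception {
    final Queue<ChangeEvent<String, String>> queue = new ConcurrentLinkedQueue<>();
    try (publisher) {          // AutoCloseable: stops the engine and executor
      publisher.start(queue);
      // ... consume events from `queue` here while the engine runs
    }
    return queue;
  }
}
```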
airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresCustomLoader.java

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import io.airbyte.commons.json.Jsons;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
+import io.debezium.connector.postgresql.PostgresOffsetContext.Loader;
+import java.util.Collections;
+import java.util.Map;
+
+public class PostgresCustomLoader extends Loader {
+
+  private Map<String, ?> offset;
+
+  public PostgresCustomLoader(PostgresConnectorConfig connectorConfig) {
+    super(connectorConfig);
+  }
+
+  @Override
+  public PostgresOffsetContext load(Map<String, ?> offset) {
+    this.offset = Jsons.clone(offset);
+    return super.load(offset);
+  }
+
+  public Map<String, ?> getRawOffset() {
+    return Collections.unmodifiableMap(offset);
+  }
+
+}
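
Why cache the raw offset? Debezium's Loader normally consumes the saved offset map without exposing it; this subclass keeps a copy so the connector can inspect the stored LSN and decide whether it is still valid. A hedged sketch of that inspection (the helper name and the "lsn_commit" key are assumptions about the Postgres connector's offset layout, not confirmed by this diff):

```java
package io.airbyte.integrations.debezium.internals;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import java.util.Map;

public final class RawOffsetInspectionSketch {

  // Feed the saved state through the custom loader, then read back the raw
  // offset map that Debezium would otherwise keep to itself.
  static Object savedCommitLsn(final PostgresConnectorConfig connectorConfig,
                               final Map<String, ?> savedOffset) {
    final PostgresCustomLoader loader = new PostgresCustomLoader(connectorConfig);
    loader.load(savedOffset);                       // caches a clone via Jsons.clone
    return loader.getRawOffset().get("lsn_commit"); // assumed key name
  }
}
```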
