Skip to content

[Source-postgres] : Source operations suport for meta column #36432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4317ffb
local_cdk = true
akashkulk Mar 19, 2024
7aee708
suppresswarnings
akashkulk Mar 19, 2024
d0ab13e
Cleanup
akashkulk Mar 19, 2024
9b971de
fix format
akashkulk Mar 19, 2024
dc910a1
Fix formatting
akashkulk Mar 19, 2024
fba2240
Initial POC
akashkulk Mar 21, 2024
f018101
Revert mysql
akashkulk Mar 24, 2024
7170142
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 24, 2024
6aecd75
Toggle cdk flag
akashkulk Mar 24, 2024
84f1466
Fix formatting
akashkulk Mar 24, 2024
6057bb8
Remove extra injector
akashkulk Mar 24, 2024
77a7baa
Fix test
akashkulk Mar 24, 2024
347af22
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 25, 2024
1a237db
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 25, 2024
b75233c
Null checks + keep Ctid source operations override
akashkulk Mar 25, 2024
d70507e
Change logs
akashkulk Mar 25, 2024
0b5964e
fix format
akashkulk Mar 25, 2024
15ec2c2
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 25, 2024
5cc43c6
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 25, 2024
233f565
Update log
akashkulk Mar 25, 2024
bcd6112
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 26, 2024
fb3870a
Bump versions
akashkulk Mar 26, 2024
3a050ea
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 26, 2024
2667bd4
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 26, 2024
a9734b8
Toggle CDK switch
akashkulk Mar 26, 2024
09aed9b
Bump cdk
akashkulk Mar 26, 2024
de49158
Merge branch 'master' into akash/pgrefac-source-ops
akashkulk Mar 26, 2024
e933574
toggle cdk versions
akashkulk Mar 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.27.6 | 2024-03-26 | [\#36432](https://github.com/airbytehq/airbyte/pull/36432) | Sources support for AirbyteRecordMessageMeta during reading source data types. |
| 0.27.5 | 2024-03-25 | [\#36461](https://github.com/airbytehq/airbyte/pull/36461) | Destinations: Handle case-sensitive columns in destination state handling. |
| 0.27.4 | 2024-03-25 | [\#36333](https://github.com/airbytehq/airbyte/pull/36333) | Sunset DebeziumSourceDecoratingIterator. |
| 0.27.1 | 2024-03-22 | [\#36296](https://github.com/airbytehq/airbyte/pull/36296) | Destinations: (async framework) Do not log invalid message data. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperations<ResultSet, SourceType> {

AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException;

/**
* Read from a result set, and copy the value of the column at colIndex to the Json object.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import io.airbyte.cdk.db.DataTypeUtils;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
Expand All @@ -28,19 +32,48 @@
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.slf4j.LoggerFactory;

/**
* Source operation skeleton for JDBC compatible databases.
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations.class);

/**
* A Date representing the earliest date in CE. Any date before this is in BCE.
*/
private static final Date ONE_CE = Date.valueOf("0001-01-01");

public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What throws an SQLException here?

// the first call communicates with the database. after that the result is cached.
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
final List<AirbyteRecordMessageMetaChange> metaChanges = new ArrayList<>();

for (int i = 1; i <= columnCount; i++) {
final String columnName = queryContext.getMetaData().getColumnName(i);
try {
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
} catch (Exception e) {
LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.getMessage());
metaChanges.add(
new AirbyteRecordMessageMetaChange()
.withField(columnName)
.withChange(Change.NULLED)
.withReason(Reason.SOURCE_SERIALIZATION_ERROR));
}
}

return new AirbyteRecordData(jsonNode, new AirbyteRecordMessageMeta().withChanges(metaChanges));
}

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;

public record AirbyteRecordData(JsonNode rawRowData, AirbyteRecordMessageMeta meta) {}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.27.5
version=0.27.6
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ public interface CdcMetadataInjector<T> {
*/
void addMetaData(ObjectNode event, JsonNode source);

// TODO : Remove this - it is deprecated.
default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record, final String transactionTimestamp, final T metadataToAdd) {
throw new RuntimeException("Not Supported");
}

default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record) {
throw new RuntimeException("Not Supported");
}

/**
* As part of Airbyte record we need to add the namespace (schema name)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -36,6 +35,7 @@
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.SqlDatabase;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
Expand Down Expand Up @@ -104,29 +104,43 @@ public AbstractJdbcSource(final String driverClass,
}

@Override
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField) {
protected AutoCloseableIterator<AirbyteRecordData> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField) {
LOGGER.info("Queueing query for table: {}", tableName);
// This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the records
// matters
// as intermediate state messages are emitted (if the connector emits intermediate state).
if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) {
final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString());
return queryTable(database, String.format("SELECT %s FROM %s ORDER BY %s ASC",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()), quotedCursorField),
tableName, schemaName);
} else {
// If we are in FULL_REFRESH mode, state messages are never emitted, so we don't care about ordering
// of the records.
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString())), tableName, schemaName);
}
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair airbyteStream =
AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
connection -> {
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

final String wrappedColumnNames = getWrappedColumnNames(database, connection, columnNames, schemaName, tableName);
final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s",
wrappedColumnNames,
fullTableName));
// if the connector emits intermediate states, the incremental query must be sorted by the cursor
// field
if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) {
final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString());
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField));
}

final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString());
LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
return preparedStatement;
},
sourceOperations::convertDatabaseRowToAirbyteRecordData);
return AutoCloseableIterators.fromStream(stream, airbyteStream);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}, airbyteStream);
}

/**
Expand Down Expand Up @@ -322,18 +336,18 @@ public boolean isCursorType(final Datatype type) {
}

@Override
public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final CursorInfo cursorInfo,
final Datatype cursorFieldType) {
public AutoCloseableIterator<AirbyteRecordData> queryTableIncremental(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final CursorInfo cursorInfo,
final Datatype cursorFieldType) {
LOGGER.info("Queueing query for table: {}", tableName);
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair airbyteStream =
AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<JsonNode> stream = database.unsafeQuery(
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
connection -> {
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());
Expand Down Expand Up @@ -370,7 +384,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor());
return preparedStatement;
},
sourceOperations::rowToJson);
sourceOperations::convertDatabaseRowToAirbyteRecordData);
return AutoCloseableIterators.fromStream(stream, airbyteStream);
} catch (final SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import datadog.trace.api.Trace;
import io.airbyte.cdk.db.AbstractDatabase;
import io.airbyte.cdk.db.IncrementalUtils;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.JdbcConnector;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
Expand Down Expand Up @@ -43,6 +44,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
Expand Down Expand Up @@ -466,7 +468,7 @@ private AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Databas
table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)),
String.format("Could not find cursor field %s in table %s", cursorField, table.getName()));

final AutoCloseableIterator<JsonNode> queryIterator = queryTableIncremental(
final AutoCloseableIterator<AirbyteRecordData> queryIterator = queryTableIncremental(
database,
selectedDatabaseFields,
table.getNameSpace(),
Expand Down Expand Up @@ -498,26 +500,31 @@ private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Databas
final Instant emittedAt,
final SyncMode syncMode,
final Optional<String> cursorField) {
final AutoCloseableIterator<JsonNode> queryStream =
final AutoCloseableIterator<AirbyteRecordData> queryStream =
queryTableFullRefresh(database, selectedDatabaseFields, table.getNameSpace(),
table.getName(), syncMode, cursorField);
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final AutoCloseableIterator<AirbyteRecordData> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
return AutoCloseableIterators.transform(recordIterator,
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(streamName, namespace),
r -> new AirbyteMessage()
airbyteRecordData -> new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)));
.withData(airbyteRecordData.rawRowData())
.withMeta(isMetaChangesEmptyOrNull(airbyteRecordData.meta()) ? null : airbyteRecordData.meta())));
}

private static boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) {
return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty();
}

/**
Expand Down Expand Up @@ -649,12 +656,12 @@ protected abstract Map<String, List<String>> discoverPrimaryKeys(Database databa
* @param syncMode The sync mode that this full refresh stream should be associated with.
* @return iterator with read data
*/
protected abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField);
protected abstract AutoCloseableIterator<AirbyteRecordData> queryTableFullRefresh(final Database database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField);

/**
* Read incremental data from a table. Incremental read should return only records where cursor
Expand All @@ -664,12 +671,12 @@ protected abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final D
*
* @return iterator with read data
*/
protected abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
List<String> columnNames,
String schemaName,
String tableName,
CursorInfo cursorInfo,
DataType cursorFieldType);
protected abstract AutoCloseableIterator<AirbyteRecordData> queryTableIncremental(Database database,
List<String> columnNames,
String schemaName,
String tableName,
CursorInfo cursorInfo,
DataType cursorFieldType);

/**
* When larger than 0, the incremental iterator will emit intermediate state for every N records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.27.4'
cdkVersionRequired = '0.27.6'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Loading