Skip to content

Commit 073b940

Browse files
db-sources: disable counts for state messages for FULL_REFRESH streams (#38208)
Co-authored-by: Xiaohan Song <[email protected]>
1 parent e19e634 commit 073b940

File tree

15 files changed

+92
-69
lines changed

15 files changed

+92
-69
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.35.2
1+
version=0.35.4

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt

+19-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.google.common.collect.AbstractIterator
77
import io.airbyte.protocol.models.v0.AirbyteMessage
88
import io.airbyte.protocol.models.v0.AirbyteStateStats
99
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
10+
import io.airbyte.protocol.models.v0.SyncMode
1011
import java.time.Duration
1112
import java.time.Instant
1213
import java.time.OffsetDateTime
@@ -41,9 +42,11 @@ open class SourceStateIterator<T>(
4142
) {
4243
val stateMessage =
4344
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
44-
stateMessage!!.withSourceStats(
45-
AirbyteStateStats().withRecordCount(recordCount.toDouble())
46-
)
45+
if (shouldAttachCountWithState()) {
46+
stateMessage!!.withSourceStats(
47+
AirbyteStateStats().withRecordCount(recordCount.toDouble())
48+
)
49+
}
4750

4851
recordCount = 0L
4952
lastCheckpoint = Instant.now()
@@ -64,9 +67,11 @@ open class SourceStateIterator<T>(
6467
hasEmittedFinalState = true
6568
val finalStateMessageForStream =
6669
sourceStateMessageProducer.createFinalStateMessage(stream)
67-
finalStateMessageForStream!!.withSourceStats(
68-
AirbyteStateStats().withRecordCount(recordCount.toDouble())
69-
)
70+
if (shouldAttachCountWithState()) {
71+
finalStateMessageForStream!!.withSourceStats(
72+
AirbyteStateStats().withRecordCount(recordCount.toDouble())
73+
)
74+
}
7075
recordCount = 0L
7176
return AirbyteMessage()
7277
.withType(AirbyteMessage.Type.STATE)
@@ -76,6 +81,14 @@ open class SourceStateIterator<T>(
7681
}
7782
}
7883

84+
/**
85+
* Returns false only when the stream's sync mode is FULL_REFRESH; any other sync mode (or a null stream) yields true. Counts are disabled for FULL_REFRESH streams because there are issues with it. We should
86+
* re-enable it once we do the work for the project "Counts: Emit Counts in Full Refresh".
87+
*/
88+
private fun shouldAttachCountWithState(): Boolean {
89+
return stream?.syncMode != SyncMode.FULL_REFRESH
90+
}
91+
7992
// This method is used to check if we should emit a state message. If the record count is set to
8093
// 0,
8194
// we should not emit a state message.

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducerTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ internal class CursorStateMessageProducerTest {
446446
NAMESPACE,
447447
Field.of(UUID_FIELD_NAME, JsonSchemaType.STRING)
448448
)
449+
.withSyncMode(SyncMode.INCREMENTAL)
449450
.withCursorField(List.of(UUID_FIELD_NAME))
450451

451452
private val EMPTY_STATE_MESSAGE = createEmptyStateMessage(0.0)

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

-10
Original file line numberDiff line numberDiff line change
@@ -665,10 +665,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
665665
modelsSchema(),
666666
)
667667
} else {
668-
assertExpectedStateMessageCountMatches(
669-
stateMessages1,
670-
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
671-
)
672668
assertExpectedRecords(
673669
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
674670
.collect(Collectors.toSet()),
@@ -690,7 +686,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
690686
val recordMessages2 = extractRecordMessages(actualRecords2)
691687
val stateMessages2 = extractStateMessages(actualRecords2)
692688

693-
assertExpectedStateMessageCountMatches(stateMessages2, 7)
694689
assertExpectedRecords(
695690
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
696691
.collect(Collectors.toSet()),
@@ -1134,7 +1129,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
11341129
val recordsFromFirstBatch = extractRecordMessages(dataFromFirstBatch)
11351130
val stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch)
11361131
assertExpectedStateMessagesForFullRefresh(stateAfterFirstBatch)
1137-
assertExpectedStateMessageCountMatches(stateAfterFirstBatch, MODEL_RECORDS.size.toLong())
11381132

11391133
val stateMessageEmittedAfterFirstSyncCompletion =
11401134
stateAfterFirstBatch[stateAfterFirstBatch.size - 1]
@@ -1244,10 +1238,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
12441238

12451239
Assertions.assertEquals(12, recordsFromFirstBatch.size)
12461240

1247-
assertExpectedStateMessageCountMatches(
1248-
stateAfterFirstBatch,
1249-
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
1250-
)
12511241
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
12521242
}
12531243

airbyte-integrations/connectors/source-mssql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.33.2'
6+
cdkVersionRequired = '0.35.4'
77
features = ['db-sources']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.0.22
12+
dockerImageTag: 4.0.23
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.concurrent.ExecutorService;
5959
import java.util.concurrent.Executors;
6060
import java.util.concurrent.TimeUnit;
61+
import java.util.concurrent.atomic.AtomicLong;
6162
import java.util.stream.Collectors;
6263
import javax.sql.DataSource;
6364
import org.junit.jupiter.api.*;
@@ -110,6 +111,13 @@ protected JsonNode config() {
110111
.build();
111112
}
112113

114+
/**
 * Sums the source-stats record count of every given state message and asserts that the sum equals totalCount.
 * NOTE(review): getSourceStats() is dereferenced without a null check, so a state message that carries no
 * source stats would throw a NullPointerException here; confirm callers only pass messages that have stats.
 */
@Override
115+
protected void assertExpectedStateMessageCountMatches(final List<? extends AirbyteStateMessage> stateMessages, long totalCount) {
116+
AtomicLong count = new AtomicLong(0L);
117+
stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue()));
118+
assertEquals(totalCount, count.get());
119+
}
120+
113121
@Override
114122
@BeforeEach
115123
protected void setup() {

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.35.2'
9+
cdkVersionRequired = '0.35.4'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.4.3
12+
dockerImageTag: 3.4.4
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.util.Random;
7575
import java.util.Set;
7676
import java.util.concurrent.TimeUnit;
77+
import java.util.concurrent.atomic.AtomicLong;
7778
import java.util.stream.Collectors;
7879
import org.junit.jupiter.api.Order;
7980
import org.junit.jupiter.api.Test;
@@ -92,6 +93,13 @@ public class CdcMysqlSourceTest extends CdcSourceTest<MySqlSource, MySQLTestData
9293
private static final List<JsonNode> DATE_TIME_RECORDS = ImmutableList.of(
9394
Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_DATE_TIME, "'2023-00-00 20:37:47'")));
9495

96+
/**
 * Sums the source-stats record count of every given state message and asserts that the sum equals totalCount.
 * NOTE(review): getSourceStats() is dereferenced without a null check, so a state message that carries no
 * source stats would throw a NullPointerException here; confirm callers only pass messages that have stats.
 */
@Override
97+
protected void assertExpectedStateMessageCountMatches(final List<? extends AirbyteStateMessage> stateMessages, long totalCount) {
98+
AtomicLong count = new AtomicLong(0L);
99+
stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue()));
100+
assertEquals(totalCount, count.get());
101+
}
102+
95103
@Override
96104
protected MySQLTestDatabase createTestDatabase() {
97105
return MySQLTestDatabase.in(BaseImage.MYSQL_8, ContainerModifier.INVALID_TIMEZONE_CEST).withCdcPermissions();

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.35.2'
15+
cdkVersionRequired = '0.35.4'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.3
12+
dockerImageTag: 3.4.4
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

0 commit comments

Comments
 (0)