Skip to content

Commit a7ebd6d

Browse files
subodh1810xiaohansong
authored andcommitted
destination-async-framework: make emission of state from FlushWorkers synchronized (#35144)
1 parent f018994 commit a7ebd6d

File tree

11 files changed

+20
-13
lines changed

11 files changed

+20
-13
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. |
169170
| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. |
170171
| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. |
171172
| 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. |

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class FlushWorkers implements AutoCloseable {
6767
private final AtomicBoolean isClosing;
6868
private final GlobalAsyncStateManager stateManager;
6969

70+
private final Object LOCK = new Object();
71+
7072
public FlushWorkers(final BufferDequeue bufferDequeue,
7173
final DestinationFlushFunction flushFunction,
7274
final Consumer<AirbyteMessage> outputRecordCollector,
@@ -238,10 +240,13 @@ public void close() throws Exception {
238240
}
239241

240242
private void emitStateMessages(final List<PartialStateWithDestinationStats> partials) {
241-
for (final PartialStateWithDestinationStats partial : partials) {
242-
final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class);
243-
message.getState().setDestinationStats(partial.stats());
244-
outputRecordCollector.accept(message);
243+
synchronized (LOCK) {
244+
for (final PartialStateWithDestinationStats partial : partials) {
245+
final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class);
246+
message.getState().setDestinationStats(partial.stats());
247+
log.info("State with arrival number {} emitted from thread {}", partial.stateArrivalNumber(), Thread.currentThread().getName());
248+
outputRecordCollector.accept(message);
249+
}
245250
}
246251
}
247252

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,8 @@ public List<PartialStateWithDestinationStats> flushStates() {
195195
if (allRecordsCommitted) {
196196
final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft();
197197
final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue();
198-
LOGGER.info("State with arrival number {} emitted", stateMessage.arrivalNumber);
199198
output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(),
200-
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState)));
199+
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState), stateMessage.arrivalNumber()));
201200
bytesFlushed += oldestState.getRight();
202201

203202
// cleanup

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/PartialStateWithDestinationStats.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
88
import io.airbyte.protocol.models.v0.AirbyteStateStats;
99

10-
public record PartialStateWithDestinationStats(PartialAirbyteMessage stateMessage, AirbyteStateStats stats) {}
10+
public record PartialStateWithDestinationStats(PartialAirbyteMessage stateMessage, AirbyteStateStats stats, long stateArrivalNumber) {}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.20.1
1+
version=0.20.2

airbyte-integrations/connectors/destination-bigquery/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.20.1'
6+
cdkVersionRequired = '0.20.2'
77
features = [
88
'datastore-bigquery',
99
'db-destinations',

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
8-
dockerImageTag: 2.4.7
8+
dockerImageTag: 2.4.8
99
dockerRepository: airbyte/destination-bigquery
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
1111
githubIssueLabel: destination-bigquery

airbyte-integrations/connectors/destination-snowflake/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.20.1'
6+
cdkVersionRequired = '0.20.2'
77
features = ['db-destinations', 's3-destinations', 'typing-deduping']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
8-
dockerImageTag: 3.5.9
8+
dockerImageTag: 3.5.10
99
dockerRepository: airbyte/destination-snowflake
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
1111
githubIssueLabel: destination-snowflake

docs/integrations/destinations/bigquery.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ tutorials:
210210

211211
| Version | Date | Pull Request | Subject |
212212
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
213-
| 2.4.7 | 2024-02-23 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |
213+
| 2.4.8 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 |
214+
| 2.4.7 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |
214215
| 2.4.6 | 2024-02-09 | [34575](https://github.com/airbytehq/airbyte/pull/34575) | Adopt CDK 0.20.0 |
215216
| 2.4.5 | 2024-02-08 | [34745](https://github.com/airbytehq/airbyte/pull/34745) | Adopt CDK 0.19.0 |
216217
| 2.4.4 | 2024-02-08 | [35027](https://github.com/airbytehq/airbyte/pull/35027) | Upgrade CDK to 0.17.1 |

docs/integrations/destinations/snowflake.md

+1
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n
246246

247247
| Version | Date | Pull Request | Subject |
248248
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
249+
| 3.5.10 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 |
249250
| 3.5.9 | 2024-02-12 | [35111](https://github.com/airbytehq/airbyte/pull/35111) | Adopt CDK 0.20.1 |
250251
| 3.5.8 | 2024-02-09 | [34574](https://github.com/airbytehq/airbyte/pull/34574) | Adopt CDK 0.20.0 |
251252
| 3.5.7 | 2024-02-08 | [34747](https://github.com/airbytehq/airbyte/pull/34747) | Adopt CDK 0.19.0 |

0 commit comments

Comments
 (0)