Skip to content

Commit a53dd7e

Browse files
authored
remove sleep logic when the queue is full from CDC (#5600)
* dont sleep when queue is full * bump version
1 parent 314a747 commit a53dd7e

File tree

6 files changed

+11
-16
lines changed

6 files changed

+11
-16
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
4646
name: Microsoft SQL Server (MSSQL)
4747
dockerRepository: airbyte/source-mssql
48-
dockerImageTag: 0.3.4
48+
dockerImageTag: 0.3.5
4949
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
5050
icon: mssql.svg
5151
- sourceDefinitionId: d8286229-c680-4063-8c59-23b9b391c700
@@ -56,7 +56,7 @@
5656
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
5757
name: Postgres
5858
dockerRepository: airbyte/source-postgres
59-
dockerImageTag: 0.3.9
59+
dockerImageTag: 0.3.10
6060
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
6161
icon: postgresql.svg
6262
- sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
@@ -101,7 +101,7 @@
101101
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
102102
name: MySQL
103103
dockerRepository: airbyte/source-mysql
104-
dockerImageTag: 0.4.3
104+
dockerImageTag: 0.4.4
105105
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
106106
icon: mysql.svg
107107
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
6363
private final VoidCallable requestClose;
6464
private boolean receivedFirstRecord;
6565
private boolean hasSnapshotFinished;
66+
private boolean signalledClose;
6667

6768
public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
6869
CdcTargetPosition targetPosition,
@@ -74,6 +75,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
7475
this.requestClose = requestClose;
7576
this.receivedFirstRecord = false;
7677
this.hasSnapshotFinished = true;
78+
this.signalledClose = false;
7779
}
7880

7981
@Override
@@ -103,7 +105,7 @@ protected ChangeEvent<String, String> computeNext() {
103105
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);
104106

105107
// if the last record matches the target file position, it is time to tell the producer to shutdown.
106-
if (shouldSignalClose(eventAsJson)) {
108+
if (!signalledClose && shouldSignalClose(eventAsJson)) {
107109
requestClose();
108110
}
109111
receivedFirstRecord = true;
@@ -135,8 +137,7 @@ private boolean hasSnapshotFinished(JsonNode eventAsJson) {
135137
*/
136138
@Override
137139
public void close() throws Exception {
138-
requestClose.call();
139-
throwExceptionIfSnapshotNotFinished();
140+
requestClose();
140141
}
141142

142143
private boolean shouldSignalClose(JsonNode eventAsJson) {
@@ -146,6 +147,7 @@ private boolean shouldSignalClose(JsonNode eventAsJson) {
146147
private void requestClose() {
147148
try {
148149
requestClose.call();
150+
signalledClose = true;
149151
} catch (Exception e) {
150152
throw new RuntimeException(e);
151153
}

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java

-7
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,6 @@ public void start(Queue<ChangeEvent<String, String>> queue) {
9898
boolean inserted = false;
9999
while (!inserted) {
100100
inserted = queue.offer(e);
101-
if (!inserted) {
102-
try {
103-
Thread.sleep(10);
104-
} catch (InterruptedException interruptedException) {
105-
throw new RuntimeException(interruptedException);
106-
}
107-
}
108101
}
109102
}
110103
})

airbyte-integrations/connectors/source-mssql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.4
11+
LABEL io.airbyte.version=0.3.5
1212
LABEL io.airbyte.name=airbyte/source-mssql

airbyte-integrations/connectors/source-mysql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.4.3
11+
LABEL io.airbyte.version=0.4.4
1212

1313
LABEL io.airbyte.name=airbyte/source-mysql

airbyte-integrations/connectors/source-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.3.9
11+
LABEL io.airbyte.version=0.3.10
1212
LABEL io.airbyte.name=airbyte/source-postgres

0 commit comments

Comments
 (0)