Skip to content

Commit 4ef7915

Browse files
reduce interrupt and shutdown delays to 1 minutes and 2 minutes when stopping a connector (initially set at 60minutes and 70minutes)
1 parent e2742bd commit 4ef7915

File tree

6 files changed

+30
-22
lines changed

6 files changed

+30
-22
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.23.2 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdow timeouts |
169170
| 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state |
170171
| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB |
171172
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
@@ -259,4 +260,4 @@ MavenLocal debugging steps:
259260
| 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) |
260261
| 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. |
261262
| 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). |
262-
| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. |
263+
| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. |

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public class IntegrationRunner {
6767
static final Predicate<Thread> ORPHANED_THREAD_FILTER = runningThread -> !runningThread.getName().equals(Thread.currentThread().getName())
6868
&& !runningThread.isDaemon() && !TYPE_AND_DEDUPE_THREAD_NAME.equals(runningThread.getName());
6969

70-
public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60;
71-
public static final int EXIT_THREAD_DELAY_MINUTES = 70;
70+
public static final int INTERRUPT_THREAD_DELAY_MINUTES = 1;
71+
public static final int EXIT_THREAD_DELAY_MINUTES = 2;
7272

7373
public static final int FORCED_EXIT_CODE = 2;
7474

@@ -189,11 +189,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
189189
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
190190
consumeWriteStream(consumer);
191191
} finally {
192-
stopOrphanedThreads(EXIT_HOOK,
193-
INTERRUPT_THREAD_DELAY_MINUTES,
194-
TimeUnit.MINUTES,
195-
EXIT_THREAD_DELAY_MINUTES,
196-
TimeUnit.MINUTES);
192+
stopOrphanedThreads();
197193
}
198194
}
199195
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
@@ -263,23 +259,15 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo
263259
LOGGER.error("Unable to perform concurrent read.", e);
264260
throw e;
265261
} finally {
266-
stopOrphanedThreads(EXIT_HOOK,
267-
INTERRUPT_THREAD_DELAY_MINUTES,
268-
TimeUnit.MINUTES,
269-
EXIT_THREAD_DELAY_MINUTES,
270-
TimeUnit.MINUTES);
262+
stopOrphanedThreads();
271263
}
272264
}
273265

274266
private void readSerial(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional) throws Exception {
275267
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
276268
produceMessages(messageIterator, outputRecordCollector);
277269
} finally {
278-
stopOrphanedThreads(EXIT_HOOK,
279-
INTERRUPT_THREAD_DELAY_MINUTES,
280-
TimeUnit.MINUTES,
281-
EXIT_THREAD_DELAY_MINUTES,
282-
TimeUnit.MINUTES);
270+
stopOrphanedThreads();
283271
}
284272
}
285273

@@ -335,6 +323,23 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer,
335323
}
336324
}
337325

326+
/**
327+
* Stops any non-daemon threads that could block the JVM from exiting when the main thread is done.
328+
*
329+
* If any active non-daemon threads would be left as orphans, this method will schedule some
330+
* interrupt/exit hooks after giving it some time delay to close up properly. It is generally
331+
* preferred to have a proper closing sequence from children threads instead of interrupting or
332+
* force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs
333+
* for maintainers to fix the code behavior instead.
334+
*/
335+
static void stopOrphanedThreads() {
336+
stopOrphanedThreads(EXIT_HOOK,
337+
INTERRUPT_THREAD_DELAY_MINUTES,
338+
TimeUnit.MINUTES,
339+
EXIT_THREAD_DELAY_MINUTES,
340+
TimeUnit.MINUTES);
341+
}
342+
338343
/**
339344
* Stops any non-daemon threads that could block the JVM from exiting when the main thread is done.
340345
* <p>
@@ -343,6 +348,7 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer,
343348
* preferred to have a proper closing sequence from children threads instead of interrupting or
344349
* force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs
345350
* for maintainers to fix the code behavior instead.
351+
* </p>
346352
*
347353
* @param exitHook The {@link Runnable} exit hook to execute for any orphaned threads.
348354
* @param interruptTimeDelay The time to delay execution of the orphaned thread interrupt attempt.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.23.0
1+
version=0.23.1

airbyte-integrations/connectors/source-mysql/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.21.4'
9+
cdkVersionRequired = '0.23.1'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.3.10
12+
dockerImageTag: 3.3.11
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

docs/integrations/sources/mysql.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported
223223

224224
| Version | Date | Pull Request | Subject |
225225
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
226-
| 3.3.10 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
226+
| 3.3.11 | 2024-02-23 | [35527](https://github.com/airbytehq/airbyte/pull/35527) | Adopt 0.23.1 and shutdown timeouts. |
227+
| 3.3.10 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
227228
| 3.3.9 | 2024-02-21 | [35525](https://github.com/airbytehq/airbyte/pull/35338) | Adopt 0.21.4 and reduce cdc state compression threshold to 1MB. |
228229
| 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. |
229230
| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |

0 commit comments

Comments
 (0)