Skip to content

Commit 6acb51d

Browse files
Fix shutdown sequence
Fix a few issues in shutdown sequence and make sure scheduler shutdown without invoking run works Inorder for the main thread to shutdown, the scheduler's final shutdown still needs to be invoked, which only happens via the run method.
1 parent 037dd7f commit 6acb51d

File tree

4 files changed

+13
-2
lines changed

4 files changed

+13
-2
lines changed

amazon-kinesis-client/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@
120120
<artifactId>http-client-spi</artifactId>
121121
<version>${awssdk.version}</version>
122122
</dependency>
123+
<dependency>
124+
<groupId>software.amazon.awssdk</groupId>
125+
<artifactId>dynamodb-enhanced</artifactId>
126+
<version>${awssdk.version}</version>
127+
</dependency>
123128

124129
<dependency>
125130
<groupId>software.amazon.glue</groupId>

amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java

+6
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ void shutdown() {
223223
workerMetricsThreadPool.shutdown();
224224
try {
225225
if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
226+
log.info(
227+
"LamThreadPool did not shutdown in {}s, forcefully shutting down",
228+
SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
226229
lamThreadPool.shutdownNow();
227230
}
228231
} catch (final InterruptedException e) {
@@ -232,6 +235,9 @@ void shutdown() {
232235

233236
try {
234237
if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
238+
log.info(
239+
"WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down",
240+
SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
235241
workerMetricsThreadPool.shutdownNow();
236242
}
237243
} catch (final InterruptedException e) {

amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public synchronized void enter(final ClientVersion fromClientVersion) throws Dep
8787
}
8888

8989
@Override
90-
public void leave() {
90+
public synchronized void leave() {
9191
if (entered && !left) {
9292
log.info("Leaving {}", this);
9393
cancelRollbackMonitor();

amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void shutdown() {
126126
if (!stateMachineThreadPool.isShutdown()) {
127127
stateMachineThreadPool.shutdown();
128128
try {
129-
if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
129+
if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
130130
log.info(
131131
"StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down",
132132
THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS);

0 commit comments

Comments
 (0)