Skip to content

Commit b6b66a1

Browse files
authored
fix: fully encapsulating rejected execution handling
closes: #6863 Signed-off-by: Steve Hawkins <[email protected]>
1 parent 108f35d commit b6b66a1

File tree

6 files changed

+59
-60
lines changed

6 files changed

+59
-60
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
#### Improvements
88

9+
* Fix #6863: ensuring SerialExecutor does not throw RejectedExecutionException to prevent unnecessary error logs
10+
911
#### Dependency Upgrade
1012

1113
#### New Features

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java

+12-17
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import java.util.concurrent.Executor;
4848
import java.util.concurrent.ExecutorService;
4949
import java.util.concurrent.Executors;
50-
import java.util.concurrent.RejectedExecutionException;
5150
import java.util.concurrent.atomic.AtomicBoolean;
5251
import java.util.concurrent.atomic.AtomicReference;
5352

@@ -371,23 +370,19 @@ public void onClose(WebSocket webSocket, int code, String reason) {
371370
}
372371
closeWebSocketOnce(code, reason);
373372
LOGGER.debug("Exec Web Socket: On Close with code:[{}], due to: [{}]", code, reason);
374-
try {
375-
serialExecutor.execute(() -> {
376-
try {
377-
if (exitCode.complete(null)) {
378-
// this is expected for processes that don't terminate - uploads for example
379-
LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
380-
}
381-
cleanUpOnce();
382-
} finally {
383-
if (listener != null) {
384-
listener.onClose(code, reason);
385-
}
373+
serialExecutor.execute(() -> {
374+
try {
375+
if (exitCode.complete(null)) {
376+
// this is expected for processes that don't terminate - uploads for example
377+
LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
386378
}
387-
});
388-
} catch (RejectedExecutionException e) {
389-
LOGGER.debug("Client already shutdown, aborting normal closure", e);
390-
}
379+
cleanUpOnce();
380+
} finally {
381+
if (listener != null) {
382+
listener.onClose(code, reason);
383+
}
384+
}
385+
});
391386
}
392387

393388
@Override

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java

+15-20
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.Executor;
3333
import java.util.concurrent.ExecutorService;
3434
import java.util.concurrent.Executors;
35-
import java.util.concurrent.RejectedExecutionException;
3635
import java.util.concurrent.atomic.AtomicBoolean;
3736
import java.util.function.BooleanSupplier;
3837

@@ -130,27 +129,23 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) {
130129
} else {
131130
// Data
132131
if (out != null) {
133-
try {
134-
serialExecutor.execute(() -> {
135-
try {
136-
while (buffer.hasRemaining()) {
137-
int written = out.write(buffer); // channel byte already skipped
138-
if (written == 0) {
139-
// out is non-blocking, prevent a busy loop
140-
Thread.sleep(50);
141-
}
142-
}
143-
webSocket.request();
144-
} catch (IOException | InterruptedException e) {
145-
if (e instanceof InterruptedException) {
146-
Thread.currentThread().interrupt();
132+
serialExecutor.execute(() -> {
133+
try {
134+
while (buffer.hasRemaining()) {
135+
int written = out.write(buffer); // channel byte already skipped
136+
if (written == 0) {
137+
// out is non-blocking, prevent a busy loop
138+
Thread.sleep(50);
147139
}
148-
clientError(webSocket, "forwarding data to the client", e);
149140
}
150-
});
151-
} catch (RejectedExecutionException e) {
152-
// just ignore
153-
}
141+
webSocket.request();
142+
} catch (IOException | InterruptedException e) {
143+
if (e instanceof InterruptedException) {
144+
Thread.currentThread().interrupt();
145+
}
146+
clientError(webSocket, "forwarding data to the client", e);
147+
}
148+
});
154149
}
155150
}
156151
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void retainAll(Set<String> nextKeys, Consumer<Executor> cacheStateComplet
113113
}
114114
});
115115
if (cacheStateComplete != null) {
116-
cacheStateComplete.accept(this.processor::executeIfPossible);
116+
cacheStateComplete.accept(this.processor.getSerialExecutor()::execute);
117117
}
118118
}
119119

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java

+12-20
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.List;
2727
import java.util.Optional;
2828
import java.util.concurrent.Executor;
29-
import java.util.concurrent.RejectedExecutionException;
3029
import java.util.concurrent.locks.ReadWriteLock;
3130
import java.util.concurrent.locks.ReentrantReadWriteLock;
3231
import java.util.function.Consumer;
@@ -108,20 +107,16 @@ public void distribute(Consumer<ProcessorListener<T>> operation, boolean isSync)
108107
} finally {
109108
lock.readLock().unlock();
110109
}
111-
try {
112-
executor.execute(() -> {
113-
for (ProcessorListener<T> listener : toCall) {
114-
try {
115-
operation.accept(listener);
116-
} catch (Exception ex) {
117-
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
118-
ex);
119-
}
110+
executor.execute(() -> {
111+
for (ProcessorListener<T> listener : toCall) {
112+
try {
113+
operation.accept(listener);
114+
} catch (Exception ex) {
115+
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
116+
ex);
120117
}
121-
});
122-
} catch (RejectedExecutionException e) {
123-
// do nothing
124-
}
118+
}
119+
});
125120
}
126121

127122
public boolean shouldResync() {
@@ -202,11 +197,8 @@ public Optional<Long> getMinimalNonZeroResyncPeriod() {
202197
}
203198
}
204199

205-
public void executeIfPossible(Runnable runnable) {
206-
try {
207-
this.executor.execute(runnable);
208-
} catch (RejectedExecutionException e) {
209-
// already shutdown
210-
}
200+
public SerialExecutor getSerialExecutor() {
201+
return executor;
211202
}
203+
212204
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/SerialExecutor.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package io.fabric8.kubernetes.client.utils.internal;
1717

18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
1821
import java.util.Queue;
1922
import java.util.concurrent.Executor;
2023
import java.util.concurrent.LinkedBlockingDeque;
@@ -30,6 +33,9 @@
3033
* Added shutdown support
3134
*/
3235
public class SerialExecutor implements Executor {
36+
37+
private static final Logger log = LoggerFactory.getLogger(SerialExecutor.class);
38+
3339
final Queue<Runnable> tasks = new LinkedBlockingDeque<>();
3440
final Executor executor;
3541
Runnable active;
@@ -41,10 +47,14 @@ public SerialExecutor(Executor executor) {
4147
this.executor = executor;
4248
}
4349

50+
/**
51+
* Executes the given command at some time in the future. Unlike a normal {@link Executor}, it will
52+
* not throw a {@link RejectedExecutionException}
53+
*/
4454
@Override
4555
public synchronized void execute(final Runnable r) {
4656
if (shutdown) {
47-
throw new RejectedExecutionException();
57+
log.debug("Task submitted after the executor was shutdown");
4858
}
4959
tasks.offer(() -> {
5060
try {
@@ -72,7 +82,11 @@ public synchronized void execute(final Runnable r) {
7282

7383
protected synchronized void scheduleNext() {
7484
if ((active = tasks.poll()) != null) {
75-
executor.execute(active);
85+
try {
86+
executor.execute(active);
87+
} catch (RejectedExecutionException e) {
88+
log.debug("Underlying executor rejected execution", e);
89+
}
7690
}
7791
}
7892

@@ -94,4 +108,5 @@ public void shutdownNow() {
94108
public boolean isShutdown() {
95109
return shutdown;
96110
}
111+
97112
}

0 commit comments

Comments
 (0)