Skip to content

Commit 965743c

Browse files
KAFKA-19131: Adjust remote storage reader thread maximum pool size to avoid illegal argument (#19532)
The remote storage reader thread pool use same count for both maximum and core size. If users adjust the pool size larger than original value, it throws `IllegalArgumentException`. Updated both value to fix the issue. --------- Signed-off-by: PoAn Yang <[email protected]> Reviewers: Kamal Chandraprakash <[email protected]>
1 parent b3161ba commit 965743c

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,15 @@ public void resizeExpirationThreadPool(int newSize) {
292292

293293
public void resizeReaderThreadPool(int newSize) {
294294
int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
295+
int currentMaximumSize = remoteStorageReaderThreadPool.getMaximumPoolSize();
295296
LOGGER.info("Updating remote reader thread pool size from {} to {}", currentSize, newSize);
296-
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
297+
if (newSize > currentMaximumSize) {
298+
remoteStorageReaderThreadPool.setMaximumPoolSize(newSize);
299+
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
300+
} else {
301+
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
302+
remoteStorageReaderThreadPool.setMaximumPoolSize(newSize);
303+
}
297304
}
298305

299306
private void removeMetrics() {
@@ -302,6 +309,11 @@ private void removeMetrics() {
302309
remoteStorageReaderThreadPool.removeMetrics();
303310
}
304311

312+
// Visible for testing
313+
int readerThreadPoolSize() {
314+
return remoteStorageReaderThreadPool.getCorePoolSize();
315+
}
316+
305317
/**
306318
* Returns the timeout for the RLM Tasks to wait for the quota to be available
307319
*/

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -3753,6 +3753,15 @@ public void testMonitorableRemoteLogStorageManager() throws IOException {
37533753
}
37543754
}
37553755

3756+
@Test
3757+
void testUpdateRemoteStorageReaderThreads() {
3758+
assertEquals(10, remoteLogManager.readerThreadPoolSize());
3759+
remoteLogManager.resizeReaderThreadPool(6);
3760+
assertEquals(6, remoteLogManager.readerThreadPoolSize());
3761+
remoteLogManager.resizeReaderThreadPool(12);
3762+
assertEquals(12, remoteLogManager.readerThreadPoolSize());
3763+
}
3764+
37563765
private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
37573766
byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
37583767
Compression compression = Compression.NONE;

0 commit comments

Comments
 (0)