Skip to content

Commit 61b7970

Browse files
committed
Made the necessary changes according to the reviews
1 parent 2d96156 commit 61b7970

File tree

2 files changed

+49
-27
lines changed

2 files changed

+49
-27
lines changed

lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,48 +10,50 @@
1010
*/
1111
public class MultiTenantMergeScheduler extends MergeScheduler {
1212

13-
// Shared global thread pool with lazy initialization
14-
private static class LazyHolder {
15-
static final ExecutorService MERGE_THREAD_POOL =
16-
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
17-
}
13+
// Shared global thread pool
14+
private static final ExecutorService MERGE_THREAD_POOL = Executors.newFixedThreadPool(
15+
Runtime.getRuntime().availableProcessors() / 2
16+
);
1817

19-
// Use getMergeThreadPool() instead of direct access
18+
// Track active merges per writer
19+
private final List<Future<?>> activeMerges = Collections.synchronizedList(new ArrayList<>());
2020

2121
@Override
2222
public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
23-
while (mergeSource.hasPendingMerges()) { // Use hasPendingMerges() instead of relying on null check
23+
while (true) {
2424
MergePolicy.OneMerge merge = mergeSource.getNextMerge();
25-
if (merge == null) {
26-
break; // Explicitly exit if no merge is available
27-
}
25+
if (merge == null) break; // No more merges
2826

29-
// Submit merge task to the shared thread pool
30-
MERGE_THREAD_POOL.submit(() -> {
27+
// Submit merge task and track future
28+
Future<?> future = MERGE_THREAD_POOL.submit(() -> {
3129
try {
3230
mergeSource.merge(merge);
3331
} catch (IOException e) {
3432
throw new RuntimeException("Merge operation failed", e);
3533
}
3634
});
3735

36+
activeMerges.add(future);
37+
3838
// Cleanup completed merges
3939
activeMerges.removeIf(Future::isDone);
4040
}
4141
}
4242

43+
private final ConcurrentHashMap<IndexWriter, List<Merge>> activeMerges = new ConcurrentHashMap<>();
44+
4345
@Override
4446
public void close() throws IOException {
45-
// Wait for all running merges to complete
46-
for (Future<?> future : activeMerges) {
47-
try {
48-
future.get(); // Wait for completion
49-
} catch (Exception e) {
50-
throw new IOException("Error while waiting for merges to finish", e);
51-
}
47+
IndexWriter currentWriter = getCurrentIndexWriter(); // Method to get the calling writer
48+
List<Merge> merges = activeMerges.getOrDefault(currentWriter, Collections.emptyList());
49+
50+
for (Merge merge : merges) {
51+
merge.waitForCompletion(); // Only wait for merges related to this writer
5252
}
53-
activeMerges.clear();
53+
54+
activeMerges.remove(currentWriter); // Cleanup after closing
5455
}
56+
5557

5658
// Providing a method to shut down the global thread pool gracefully
5759
public static void shutdownThreadPool() {

lucene/core/src/test/org/apache/lucene/index/TestMultiTenantMergeScheduler.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
import org.apache.lucene.store.Directory;
44
import org.apache.lucene.store.RAMDirectory;
55
import org.apache.lucene.analysis.standard.StandardAnalyzer;
6+
import org.apache.lucene.analysis.MockAnalyzer;
67
import org.apache.lucene.document.Document;
7-
import org.apache.lucene.document.TextField;
8+
import org.apache.lucene.index.IndexWriterConfig;
89
import org.apache.lucene.util.LuceneTestCase;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.ExecutorService;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.Future;
914

1015
public class TestMultiTenantMergeScheduler extends LuceneTestCase {
1116

@@ -57,17 +62,32 @@ public void testConcurrentMerging() throws Exception {
5762
}
5863
writer.commit();
5964

60-
long startTime = System.currentTimeMillis();
61-
writer.forceMerge(1);
62-
long endTime = System.currentTimeMillis();
65+
CountDownLatch latch = new CountDownLatch(2);
66+
ExecutorService executor = Executors.newFixedThreadPool(2);
67+
68+
Future<?> mergeTask1 = executor.submit(() -> {
69+
latch.countDown();
70+
try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
71+
writer.forceMerge(1);
72+
});
73+
74+
Future<?> mergeTask2 = executor.submit(() -> {
75+
latch.countDown();
76+
try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
77+
writer.forceMerge(1);
78+
});
79+
80+
mergeTask1.get();
81+
mergeTask2.get();
82+
executor.shutdown();
6383

6484
writer.close();
6585
scheduler.close();
6686
MultiTenantMergeScheduler.shutdownThreadPool();
6787

68-
// Check if merging took less time than sequential execution would
69-
assertTrue("Merges did not happen concurrently!", (endTime - startTime) < 5000);
88+
// Assert that at least two merges ran concurrently
89+
assertTrue("Expected concurrent merges", scheduler.getActiveMergesCount() > 1);
7090

7191
dir.close();
7292
}
73-
}
93+
}

0 commit comments

Comments
 (0)