Skip to content

Fix TestIndexWriterWithThreads#testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce #14423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
guojialiang92 opened this issue Apr 1, 2025 · 1 comment
Labels

Comments

@guojialiang92
Copy link
Contributor

guojialiang92 commented Apr 1, 2025

Description

Description

I found that Test TestIndexWriterWithThreads#testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce may fail in rare cases. Exception information is as follows:

MockDirectoryWrapper: cannot close: there are still 9 open files: {_7.cfs=1, _0.cfs=1, _5.cfs=1, _2.cfs=1, _1.cfs=1, _6.cfs=1, _3.cfs=1, _4.cfs=1, _8.cfs=1}
	at __randomizedtesting.SeedInfo.seed([4D88B1144F0B21A3:1E330D289AB0246]:0)
	at org.apache.lucene.tests.store.MockDirectoryWrapper.close(MockDirectoryWrapper.java:876)
	at org.apache.lucene.index.TestIndexWriterWithThreads._testMultipleThreadsFailure(TestIndexWriterWithThreads.java:349)
	at org.apache.lucene.index.TestIndexWriterWithThreads.testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce(TestIndexWriterWithThreads.java:502)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1758)
	at com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:946)
	at com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:982)
	at com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:996)
	at org.apache.lucene.tests.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:48)
	at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
	at org.apache.lucene.tests.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:45)
	at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
	at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:390)
	at com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:843)
	at com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:490)
	at com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:955)
	at com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:840)
	at com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:891)
	at com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:902)
	at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
	at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at org.apache.lucene.tests.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:38)
	at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
	at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
	at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at org.apache.lucene.tests.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
	at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
	at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
	at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
	at org.apache.lucene.tests.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:47)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:390)
	at com.carrotsearch.randomizedtesting.ThreadLeakControl.lambda$forkTimeoutingTask$0(ThreadLeakControl.java:850)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: unclosed IndexInput: _8.cfs
	at org.apache.lucene.tests.store.MockDirectoryWrapper.addFileHandle(MockDirectoryWrapper.java:783)
	at org.apache.lucene.tests.store.MockDirectoryWrapper.openInput(MockDirectoryWrapper.java:835)
	at org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsReader.<init>(Lucene90BlockTreeTermsReader.java:136)
	at org.apache.lucene.codecs.lucene99.Lucene99PostingsFormat.fieldsProducer(Lucene99PostingsFormat.java:428)
	at org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsReader.<init>(PerFieldPostingsFormat.java:328)
	at org.apache.lucene.codecs.perfield.PerFieldPostingsFormat.fieldsProducer(PerFieldPostingsFormat.java:390)
	at org.apache.lucene.index.SegmentCoreReaders.<init>(SegmentCoreReaders.java:98)
	at org.apache.lucene.index.SegmentReader.<init>(SegmentReader.java:95)
	at org.apache.lucene.index.ReadersAndUpdates.getReader(ReadersAndUpdates.java:179)
	at org.apache.lucene.index.BufferedUpdatesStream$SegmentState.<init>(BufferedUpdatesStream.java:286)
	at org.apache.lucene.index.IndexWriter.openSegmentStates(IndexWriter.java:6481)
	at org.apache.lucene.index.IndexWriter.forceApply(IndexWriter.java:6255)
	at org.apache.lucene.index.IndexWriter.tryApply(IndexWriter.java:6189)
	at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$12(IndexWriter.java:2814)
	at org.apache.lucene.index.IndexWriter$EventQueue.processEventsInternal(IndexWriter.java:336)
	at org.apache.lucene.index.IndexWriter$EventQueue.processEvents(IndexWriter.java:325)
	at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:6032)
	at org.apache.lucene.index.IndexWriter.maybeProcessEvents(IndexWriter.java:6025)
	at org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1552)
	at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1843)
	at org.apache.lucene.index.TestIndexWriterWithThreads$IndexerThread.run(TestIndexWriterWithThreads.java:98)

I looked up some other related issues and finally located the problem. At the same time, I also added a test that can stably reproduce the problem.

Analysis

The root cause is that IndexWriter#ensureOpen() throws AlreadyClosedException when IndexWriter#closed or IndexWriter#closing is true. In the test code, after throwing AlreadyClosedException when executing IndexWriter#commit, the IndexWriter#close is not called again to ensure that IndexWriter closes normally.

try {
  writer.commit();
  writer.close();
  success = true;
} catch (
    @SuppressWarnings("unused")
    AlreadyClosedException ace) {
  // OK: abort closes the writer
  assertTrue(writer.isDeleterClosed());
} catch (
    @SuppressWarnings("unused")
    IOException ioe) {
  writer.rollback();
  failure.clearDoFail();
}

To Reproduce

In order to stabilize the reproduce problem, I added a test TestIndexWriterWithThreads#testIOExceptionWithMergeNotEndLongTime.
At the same time, in order to perform synchronous control in testing, IndexWriter#maybeCloseOnTragicEvent needs to be changed to default access permissions.

public void testIOExceptionWithMergeNotEndLongTime() throws Exception {
  _testMultipleThreadsFailureWithMergeNotEndLongTime(new FailOnlyOnAbortOrFlush(true));
}

public void _testMultipleThreadsFailureWithMergeNotEndLongTime(
    MockDirectoryWrapper.Failure failure) throws Exception {
  CountDownLatch mergeCloseCountDownLatch = new CountDownLatch(1);
  CountDownLatch updateDocumentFailCountDownLatch = new CountDownLatch(1);
  CountDownLatch mergeCountDownLatch = new CountDownLatch(1);
  CountDownLatch indexWriterCloseCountDownLatch = new CountDownLatch(1);
  ConcurrentMergeScheduler mergeScheduler =
      new ConcurrentMergeScheduler() {
        @Override
        public void close() throws IOException {
          try {
            mergeCloseCountDownLatch.countDown();
            // Let merge close not end for a long time, causing AlreadyClosedException exception
            // thrown when writer.commit()
            indexWriterCloseCountDownLatch.await();
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
          super.close();
        }
      };
  // Prevent waiting in ConcurrentMergeScheduler#maybeStall from causing test deadlock
  mergeScheduler.setMaxMergesAndThreads(Integer.MAX_VALUE, Integer.MAX_VALUE);

  int NUM_THREADS = 1;

  for (int iter = 0; iter < 1; iter++) {
    if (VERBOSE) {
      System.out.println("TEST: iter=" + iter);
    }
    MockDirectoryWrapper dir = newMockDirectory();

    IndexWriterConfig indexWriterConfig =
        newIndexWriterConfig(new MockAnalyzer(random()))
            .setMaxBufferedDocs(2)
            .setMergeScheduler(mergeScheduler)
            .setMergePolicy(newLogMergePolicy(4))
            .setCommitOnClose(false);
    indexWriterConfig.setReaderPooling(true);
    IndexWriter writer =
        new IndexWriter(dir, indexWriterConfig) {

          @Override
          void maybeCloseOnTragicEvent() throws IOException {
            boolean isMerge = false;
            for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
              final String className = element.getClassName();
              final String methodName = element.getMethodName();
              if (className.equals(IndexWriter.class.getName()) && methodName.equals("merge")) {
                isMerge = true;
                break;
              }
            }
            if (isMerge == false) {
              try {
                updateDocumentFailCountDownLatch.countDown();
                mergeCloseCountDownLatch.await();
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
            super.maybeCloseOnTragicEvent();
          }

          @Override
          protected void merge(MergePolicy.OneMerge merge) throws IOException {
            try {
              mergeCountDownLatch.countDown();
              updateDocumentFailCountDownLatch.await();
            } catch (InterruptedException e) {
              throw new RuntimeException(e);
            }
            super.merge(merge);
          }

          @Override
          public void close() throws IOException {
            indexWriterCloseCountDownLatch.countDown();
            super.close();
          }
        };
    ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();

    CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
    IndexerThread[] threads = new IndexerThread[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++) {
      threads[i] = new IndexerThread(writer, true, syncStart);
      threads[i].start();
    }
    syncStart.await();
    mergeCountDownLatch.await();
    dir.failOn(failure);
    failure.setDoFail();

    for (int i = 0; i < NUM_THREADS; i++) {
      threads[i].join();
      assertTrue("hit unexpected Throwable", threads[i].error == null);
    }

    boolean success = false;
    try {
      writer.commit();
      writer.close();
      success = true;
    } catch (
        @SuppressWarnings("unused")
        AlreadyClosedException ace) {
      // OK: abort closes the writer
      assertTrue(writer.isDeleterClosed());
    } catch (
        @SuppressWarnings("unused")
        IOException ioe) {
      writer.rollback();
      failure.clearDoFail();
    }
    if (VERBOSE) {
      System.out.println("TEST: success=" + success);
    }

    if (success) {
      IndexReader reader = DirectoryReader.open(dir);
      final Bits delDocs = MultiBits.getLiveDocs(reader);
      StoredFields storedFields = reader.storedFields();
      TermVectors termVectors = reader.termVectors();
      for (int j = 0; j < reader.maxDoc(); j++) {
        if (delDocs == null || !delDocs.get(j)) {
          storedFields.document(j);
          termVectors.get(j);
        }
      }
      reader.close();
    }

    try {
      dir.close();
    } finally {
      if (indexWriterCloseCountDownLatch.getCount() == 1) {
        // Avoid merge thread leakage when reproducing problems (without calling writer.close) in
        // finally
        indexWriterCloseCountDownLatch.countDown();
      }
    }
  }
}

The code will be executed in the following order:

  1. Set LiveIndexWriterConfig#readerPooling to true to ensure that ReaderPool#release does not release ReadersAndUpdates.
  2. Start an IndexerThread and start writing data.
  3. Wait for a merge thread to start working (using mergeCountDownLatch), then simulate write failure via MockDirectoryWrapper#failOn.
  4. Block the execution of the merge thread until IndexWriter#updateDocument throws an exception due to write failure (using updateDocumentFailCountDownLatch).
  5. Merge thread and IW thread will both call IndexWriter#maybeCloseOnTragicEvent, and I control to let Merge thread execute IndexWriter#rollbackInternalNoCommit first, and the IW thread will skip it. (using mergeCloseCountDownLatch)
  6. Because the execution of the merge thread is asynchronous, the test will continue until IndexWriter#commit is called and a AlreadyClosedException is throw.
  7. The closure of IndexInput needs to wait until the execution of ReaderPool#close in IndexWriter#rollbackInternalNoCommit. In order to prevent IndexInput from being closed before calling MockDirectoryWrapper#close, I let the ConcurrentMergeScheduler#close to wait until IndexWriter#close executes. (using indexWriterCloseCountDownLatch)
  8. Because IndexWriter#commit threw AlreadyClosedException, the merge thread did not end when executing MockDirectoryWrapper#close, resulting in the existence of an open file.

Solution

The closure of IndexInput needs to wait until the execution of ReaderPool#close in IndexWriter#rollbackInternalNoCommit. After calling IndexWriter#commit to throw AlreadyClosedException, It is necessary to call IndexWriter#close in finally.

try {
  writer.commit();
  writer.close();
  success = true;
} catch (
        @SuppressWarnings("unused")
        AlreadyClosedException ace) {
  // OK: abort closes the writer
  assertTrue(writer.isDeleterClosed());
} catch (
        @SuppressWarnings("unused")
        IOException ioe) {
  writer.rollback();
  failure.clearDoFail();
} finally {
  writer.close();
}

Related issues

10930, 13552

Version and environment details

No response

@guojialiang92 guojialiang92 changed the title fix TestIndexWriterWithThreads#testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce Fix TestIndexWriterWithThreads#testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce Apr 8, 2025
@guojialiang92
Copy link
Contributor Author

Hi @benwtrent
Can I invite you to review this PR?
(I saw that you have commented on issue [13552] before.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant