Skip to content

[improve][misc] Sync commits from apache into 3.1_ds #442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2349,7 +2349,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
PositionImpl newMarkDeletePosition = null;

lock.writeLock().lock();

boolean skipMarkDeleteBecauseAckedNothing = false;
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
Expand Down Expand Up @@ -2377,7 +2377,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
continue;
}
if (position.ackSet == null) {
if (position.ackSet == null || position.ackSet.length == 0) {
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
Expand Down Expand Up @@ -2418,6 +2418,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb

if (individualDeletedMessages.isEmpty()) {
// No changes to individually deleted messages, so nothing to do at this point
skipMarkDeleteBecauseAckedNothing = true;
return;
}

Expand All @@ -2435,6 +2436,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb

if (range == null) {
// The set was completely cleaned up now
skipMarkDeleteBecauseAckedNothing = true;
return;
}

Expand All @@ -2461,9 +2463,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
callback.deleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
boolean empty = individualDeletedMessages.isEmpty();
lock.writeLock().unlock();
if (empty) {
if (skipMarkDeleteBecauseAckedNothing) {
callback.deleteComplete(ctx);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2869,7 +2869,15 @@ public void operationComplete(Void result, Stat stat) {
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
invalidateReadHandle(ls.getLedgerId());
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()).thenAccept(__ -> {
log.info("[{}] Deleted and invalidated offloaded ledger {} from bookkeeper - size: {}",
name, ls.getLedgerId(), ls.getSize());
}).exceptionally(ex -> {
log.error("[{}] Failed to delete offloaded ledger {} from bookkeeper - size: {}",
name, ls.getLedgerId(), ls.getSize(), ex);
return null;
});
}
promise.complete(null);
}
Expand Down Expand Up @@ -3090,8 +3098,8 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
}
}

private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
private CompletableFuture<Void> asyncDeleteLedgerFromBookKeeper(long ledgerId) {
return asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
}

private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
Expand All @@ -3108,22 +3116,32 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
}
}

private void asyncDeleteLedger(long ledgerId, long retry) {
if (retry <= 0) {
log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
return;
}
private CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long retry) {
CompletableFuture<Void> future = new CompletableFuture<>();
asyncDeleteLedgerWithRetry(future, ledgerId, retry);
return future;
}

private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, long ledgerId, long retry) {
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
future.complete(null);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
if (retry <= 1) {
// The latest once of retry has failed
log.warn("[{}] Failed to delete ledger after retries {}, code: {}", name, ledgerId, rc);
future.completeExceptionally(BKException.create(rc));
return;
}
scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted ledger {}", name, ledgerId);
}
future.complete(null);
}
}, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5218,5 +5218,48 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
assertEquals(positionRef4.get(), position4);
}

@Test
public void testDeleteBatchedMessageWithEmptyAckSet() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(false);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testDeleteBatchedMessageWithEmptyAckSet",
managedLedgerConfig);
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
Position position = ml.addEntry(new byte[1]);
Position positionWithEmptyAckSet =
new PositionImpl(position.getLedgerId(), position.getEntryId(), new long[]{});
cursor.delete(positionWithEmptyAckSet);
assertEquals(cursor.markDeletePosition, position);
ml.delete();
}

@Test
public void testCallbackTimes() throws Exception {
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testCallbackTimes");
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
Position position1 = ml.addEntry(new byte[1]);
Position position2 = ml.addEntry(new byte[2]);
AtomicInteger executedCallbackTimes = new AtomicInteger();
cursor.asyncDelete(Arrays.asList(position1, position2), new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
executedCallbackTimes.incrementAndGet();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
executedCallbackTimes.incrementAndGet();
}
}, new Object());
// Verify that the executed count of callback is "1".
Awaitility.await().untilAsserted(() -> {
assertTrue(executedCallbackTimes.get() > 0);
});
Thread.sleep(2000);
assertEquals(executedCallbackTimes.get(), 1);
// cleanup.
ml.delete();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,33 @@
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;

import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -207,6 +211,125 @@ public void testLaggedDelete() throws Exception {
assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
}

@Test
public void testGetReadLedgerHandleAfterTrimOffloadedLedgers() throws Exception {
// Create managed ledger.
final long offloadThresholdSeconds = 5;
final long offloadDeletionLagInSeconds = 1;
OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInSeconds * 1000);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(offloadThresholdSeconds);
offloader.getOffloadPolicies().setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ml =
(ManagedLedgerImpl)factory.open("testGetReadLedgerHandleAfterTrimOffloadedLedgers", config);
ml.openCursor("c1");

// Write entries.
int i = 0;
for (; i < 35; i++) {
String content = "entry-" + i;
ml.addEntry(content.getBytes());
}
Assert.assertEquals(ml.getLedgersInfoAsList().size(), 4);
long ledger1 = ml.getLedgersInfoAsList().get(0).getLedgerId();
long ledger2 = ml.getLedgersInfoAsList().get(1).getLedgerId();
long ledger3 = ml.getLedgersInfoAsList().get(2).getLedgerId();
long ledger4 = ml.getLedgersInfoAsList().get(3).getLedgerId();

// Offload ledgers.
Thread.sleep(offloadThresholdSeconds * 2 * 1000);
CompletableFuture<PositionImpl> offloadFuture = new CompletableFuture<PositionImpl>();
ml.maybeOffloadInBackground(offloadFuture);
offloadFuture.join();

// Cache ledger handle.
CountDownLatch readCountDownLatch = new CountDownLatch(4);
AsyncCallbacks.ReadEntryCallback readCb = new AsyncCallbacks.ReadEntryCallback(){

@Override
public void readEntryComplete(Entry entry, Object ctx) {
readCountDownLatch.countDown();
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
readCountDownLatch.countDown();
}
};
ml.asyncReadEntry(PositionImpl.get(ledger1, 0), readCb, null);
ml.asyncReadEntry(PositionImpl.get(ledger2, 0), readCb, null);
ml.asyncReadEntry(PositionImpl.get(ledger3, 0), readCb, null);
ml.asyncReadEntry(PositionImpl.get(ledger4, 0), readCb, null);
readCountDownLatch.await();
ReadHandle originalReadHandle4 = ml.getLedgerHandle(ledger4).join();

// Trim offloaded BK ledger handles.
Thread.sleep(offloadDeletionLagInSeconds * 2 * 1000);
CompletableFuture<Position> trimLedgerFuture = new CompletableFuture<Position>();
ml.internalTrimLedgers(false, trimLedgerFuture);
trimLedgerFuture.join();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgerInfo(ledger1).get();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgerInfo(ledger2).get();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo3 = ml.getLedgerInfo(ledger3).get();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo4 = ml.getLedgerInfo(ledger4).get();
Assert.assertTrue(ledgerInfo1.hasOffloadContext() && ledgerInfo1.getOffloadContext().getBookkeeperDeleted());
Assert.assertTrue(ledgerInfo2.hasOffloadContext() && ledgerInfo2.getOffloadContext().getBookkeeperDeleted());
Assert.assertTrue(ledgerInfo3.hasOffloadContext() && ledgerInfo3.getOffloadContext().getBookkeeperDeleted());
Assert.assertFalse(ledgerInfo4.hasOffloadContext() || ledgerInfo4.getOffloadContext().getBookkeeperDeleted());

Awaitility.await().untilAsserted(() -> {
try {
factory.getBookKeeper().openLedger(ledger3, ml.digestType, ml.config.getPassword());
Assert.fail("Should fail: the ledger has been deleted");
} catch (BKException.BKNoSuchLedgerExistsException ex) {
// Expected.
}
try {
factory.getBookKeeper().openLedger(ledger2, ml.digestType, ml.config.getPassword());
Assert.fail("Should fail: the ledger has been deleted");
} catch (BKException.BKNoSuchLedgerExistsException ex) {
// Expected.
}
try {
factory.getBookKeeper().openLedger(ledger1, ml.digestType, ml.config.getPassword());
Assert.fail("Should fail: the ledger has been deleted");
} catch (BKException.BKNoSuchLedgerExistsException ex) {
// Expected.
}
});

// Verify: "ml.getLedgerHandle" returns a correct ledger handle.
ReadHandle currentReadHandle4 = ml.getLedgerHandle(ledger4).join();
Assert.assertEquals(currentReadHandle4, originalReadHandle4);
try {
ml.getLedgerHandle(ledger3).join();
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
} catch (Exception ex) {
Assert.assertTrue(ex.getCause().getCause().getMessage()
.contains("MockLedgerOffloader does not support read"));
}
try {
ml.getLedgerHandle(ledger2).join();
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
} catch (Exception ex) {
Assert.assertTrue(ex.getCause().getCause().getMessage()
.contains("MockLedgerOffloader does not support read"));
}
try {
ml.getLedgerHandle(ledger1).join();
Assert.fail("should get a failure: MockLedgerOffloader does not support read");
} catch (Exception ex) {
Assert.assertTrue(ex.getCause().getCause().getMessage()
.contains("MockLedgerOffloader does not support read"));
}
}

@Test(timeOut = 5000)
public void testFileSystemOffloadDeletePath() throws Exception {
MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
Map<String, String> offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
promise.completeExceptionally(new UnsupportedOperationException("MockLedgerOffloader does not support read"));
return promise;
}

Expand Down
25 changes: 25 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,31 @@ flexible messaging model and an intuitive client API.</description>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>set-system-properties</goal>
</goals>
<configuration>
<properties combine.children="append">
<!-- for lightproto (protostuff) -->
<property>
<name>proto_path</name>
<value>${pulsar.basedir}</value>
</property>
<property>
<name>proto_search_strategy</name>
<value>2</value>
</property>
</properties>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
Expand Down
24 changes: 0 additions & 24 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -625,30 +625,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>set-system-properties</goal>
</goals>
<configuration>
<properties>
<property>
<name>proto_path</name>
<value>${project.parent.basedir}</value>
</property>
<property>
<name>proto_search_strategy</name>
<value>2</value>
</property>
</properties>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.splunk.lightproto</groupId>
<artifactId>lightproto-maven-plugin</artifactId>
Expand Down
Loading