Skip to content

[improve][admin] PIP-416: Add a new topic method to implement trigger offload by size threshold #24420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

JunFu0814
Copy link
Contributor

@JunFu0814 JunFu0814 commented Jun 17, 2025

Main Issue: #24276

Motivation

For pip #24276 , add a new admin api for trigger offload with size threshold.

Modifications

  1. Add new admin apis for trigger offload with size threshold.
  2. Optimize formatting.
  3. Enhance testing.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Copy link

@JunFu0814 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@JunFu0814 JunFu0814 changed the title [feat][client] clinet apitrigger offload with size threshold [feat][admin] new admin api for trigger offload with size threshold Jun 17, 2025
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Jun 17, 2025
@codelipenghui codelipenghui added this to the 4.1.0 milestone Jun 17, 2025
@codelipenghui codelipenghui added type/feature The PR added a new feature or issue requested a new feature area/broker area/tieredstorage labels Jun 17, 2025
@JunFu0814 JunFu0814 removed their assignment Jun 18, 2025
public CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
PersistentTopicInternalStats stats = getInternalStats(topic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use getInternalStatsAsync instead of getInternalStats to avoid thread blocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice suggestion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nodece please review again bc9d735

@nodece nodece changed the title [feat][admin] new admin api for trigger offload with size threshold [improve][admin] PIP-416: Add a new topic method to implement trigger offload by size threshold Jun 18, 2025
Comment on lines 1261 to 1275
private MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers,
long sizeThreshold) {
long suffixSize = 0L;

ledgers = Lists.reverse(ledgers);
long previousLedger = ledgers.get(0).ledgerId;
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
suffixSize += l.size;
if (suffixSize > sizeThreshold) {
return new MessageIdImpl(previousLedger, 0L, -1);
}
previousLedger = l.ledgerId;
}
return null;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move to the broker side which can provide consistent behavior from the admin CLI and the admin REST API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @codelipenghui , It is a better way to move to the broker side, which can ensure consistent behavior of cli and clients of any language type, but of course this will also bring more workload. In the current pip, I will first ensure that cli and java client use the same findFirstLedgerWithinThreshold logic. Please help review on this ff002be.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but of course this will also bring more workload

While both approaches require iterating over ledgers, the scope of that iteration differs:

Client-side: You'd have to iterate over all ledgers to determine what needs offloading.

Broker-side: The broker only needs to iterate over the specific ledgers that are being offloaded, directly accessing data already in memory.

Even if the client pre-calculates message IDs and sends them to the broker, the broker still needs to iterate the ledger map and decide ledgers should be offloaded. The key is that the broker can do this much more efficiently, leveraging its existing in-memory structures without the constant creation and disposal of new objects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, the broker side already has most of the implementation

private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
CompletableFuture<Position> finalPromise) {
if (getOffloadPoliciesIfAppendable().isEmpty()) {
String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
finalPromise.completeExceptionally(new IllegalArgumentException(msg));
return;
}
if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+ "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
finalPromise.completeExceptionally(new IllegalArgumentException(msg));
return;
}
if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise),
100, TimeUnit.MILLISECONDS);
return;
}
CompletableFuture<Position> unlockingPromise = new CompletableFuture<>();
unlockingPromise.whenComplete((res, ex) -> {
offloadMutex.unlock();
if (ex != null) {
finalPromise.completeExceptionally(ex);
} else {
finalPromise.complete(res);
}
});
long sizeSummed = 0;
long toOffloadSize = 0;
long alreadyOffloadedSize = 0;
ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
final LedgerInfo info = e.getValue();
// Skip current active ledger, an active ledger can't be offloaded.
// Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
if (info.getTimestamp() == 0L) {
continue;
}
final long size = info.getSize();
final long timestamp = info.getTimestamp();
final long now = System.currentTimeMillis();
sizeSummed += size;
final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
if (alreadyOffloaded) {
alreadyOffloadedSize += size;
} else {
if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
|| (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {
toOffloadSize += size;
toOffload.addFirst(info);
}
}
}
if (toOffload.size() > 0) {
log.info("[{}] Going to automatically offload ledgers {}"
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
offloadLoop(unlockingPromise, toOffload, PositionFactory.LATEST, Optional.empty());
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, "
+ "threshold = [managedLedgerOffloadThresholdInBytes:{}, "
+ "managedLedgerOffloadThresholdInSeconds:{}]",
name, sizeSummed, alreadyOffloadedSize, offloadThresholdInBytes,
TimeUnit.MILLISECONDS.toSeconds(offloadTimeThresholdMillis));
unlockingPromise.complete(PositionFactory.LATEST);
}
}

There is no need to have duplicated codes for like findFirstLedgerWithinThreshold.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui Thank you for your suggestion, I will understand the logic here in depth frist

@JunFu0814 JunFu0814 requested a review from nodece June 20, 2025 06:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker area/tieredstorage doc-not-needed Your PR changes do not impact docs type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants