-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[improve][admin] PIP-416: Add a new topic method to implement trigger offload by size threshold #24420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[improve][admin] PIP-416: Add a new topic method to implement trigger offload by size threshold #24420
Conversation
@JunFu0814 Please add the following content to your PR description and select a checkbox:
|
public CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold) { | ||
CompletableFuture<Void> future = new CompletableFuture<>(); | ||
try { | ||
PersistentTopicInternalStats stats = getInternalStats(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use getInternalStatsAsync
instead of getInternalStats
to avoid thread blocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
Outdated
Show resolved
Hide resolved
private MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers, | ||
long sizeThreshold) { | ||
long suffixSize = 0L; | ||
|
||
ledgers = Lists.reverse(ledgers); | ||
long previousLedger = ledgers.get(0).ledgerId; | ||
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) { | ||
suffixSize += l.size; | ||
if (suffixSize > sizeThreshold) { | ||
return new MessageIdImpl(previousLedger, 0L, -1); | ||
} | ||
previousLedger = l.ledgerId; | ||
} | ||
return null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to move to the broker side which can provide consistent behavior from the admin CLI and the admin REST API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi @codelipenghui , It is a better way to move to the broker side, which can ensure consistent behavior of cli and clients of any language type, but of course this will also bring more workload. In the current pip, I will first ensure that cli and java client use the same findFirstLedgerWithinThreshold
logic. Please help review on this ff002be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but of course this will also bring more workload
While both approaches require iterating over ledgers, the scope of that iteration differs:
Client-side: You'd have to iterate over all ledgers to determine what needs offloading.
Broker-side: The broker only needs to iterate over the specific ledgers that are being offloaded, directly accessing data already in memory.
Even if the client pre-calculates message IDs and sends them to the broker, the broker still needs to iterate the ledger map and decide ledgers should be offloaded. The key is that the broker can do this much more efficiently, leveraging its existing in-memory structures without the constant creation and disposal of new objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, the broker side already has most of the implementation
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
Lines 2619 to 2696 in 73a4ae4
private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds, | |
CompletableFuture<Position> finalPromise) { | |
if (getOffloadPoliciesIfAppendable().isEmpty()) { | |
String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name); | |
finalPromise.completeExceptionally(new IllegalArgumentException(msg)); | |
return; | |
} | |
if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) { | |
String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and " | |
+ "[managedLedgerOffloadThresholdInSeconds] less than 0.", name); | |
finalPromise.completeExceptionally(new IllegalArgumentException(msg)); | |
return; | |
} | |
if (!offloadMutex.tryLock()) { | |
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise), | |
100, TimeUnit.MILLISECONDS); | |
return; | |
} | |
CompletableFuture<Position> unlockingPromise = new CompletableFuture<>(); | |
unlockingPromise.whenComplete((res, ex) -> { | |
offloadMutex.unlock(); | |
if (ex != null) { | |
finalPromise.completeExceptionally(ex); | |
} else { | |
finalPromise.complete(res); | |
} | |
}); | |
long sizeSummed = 0; | |
long toOffloadSize = 0; | |
long alreadyOffloadedSize = 0; | |
ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>(); | |
final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds); | |
for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) { | |
final LedgerInfo info = e.getValue(); | |
// Skip current active ledger, an active ledger can't be offloaded. | |
// Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger. | |
if (info.getTimestamp() == 0L) { | |
continue; | |
} | |
final long size = info.getSize(); | |
final long timestamp = info.getTimestamp(); | |
final long now = System.currentTimeMillis(); | |
sizeSummed += size; | |
final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete(); | |
if (alreadyOffloaded) { | |
alreadyOffloadedSize += size; | |
} else { | |
if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes) | |
|| (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) { | |
toOffloadSize += size; | |
toOffload.addFirst(info); | |
} | |
} | |
} | |
if (toOffload.size() > 0) { | |
log.info("[{}] Going to automatically offload ledgers {}" | |
+ ", total size = {}, already offloaded = {}, to offload = {}", | |
name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()), | |
sizeSummed, alreadyOffloadedSize, toOffloadSize); | |
offloadLoop(unlockingPromise, toOffload, PositionFactory.LATEST, Optional.empty()); | |
} else { | |
// offloadLoop will complete immediately with an empty list to offload | |
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, " | |
+ "threshold = [managedLedgerOffloadThresholdInBytes:{}, " | |
+ "managedLedgerOffloadThresholdInSeconds:{}]", | |
name, sizeSummed, alreadyOffloadedSize, offloadThresholdInBytes, | |
TimeUnit.MILLISECONDS.toSeconds(offloadTimeThresholdMillis)); | |
unlockingPromise.complete(PositionFactory.LATEST); | |
} | |
} |
There is no need to have duplicated codes for like findFirstLedgerWithinThreshold
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui Thank you for your suggestion, I will understand the logic here in depth frist
Main Issue: #24276
Motivation
For pip #24276 , add a new admin api for trigger offload with size threshold.
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: