Skip to content

Commit 17f04ef

Browse files
committed
[improve][broker] Make maxBatchDeletedIndexToPersist configurable and document other related configs (#24392)
(cherry picked from commit d1bca65)
1 parent 31bae69 commit 17f04ef

File tree

5 files changed

+73
-18
lines changed

5 files changed

+73
-18
lines changed

conf/broker.conf

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,6 +1228,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000
12281228
# Max time before triggering a rollover on a cursor ledger
12291229
managedLedgerCursorRolloverTimeInSeconds=14400
12301230

1231+
# Maximum amount of memory used hold data read from storage (or from the cache).
1232+
# This mechanism prevents the broker to have too many concurrent
1233+
# reads from storage and fall into Out of Memory errors in case
1234+
# of multiple concurrent reads to multiple concurrent consumers.
1235+
# Set 0 in order to disable the feature.
1236+
#
1237+
managedLedgerMaxReadsInFlightSizeInMB=0
1238+
12311239
# Max number of "acknowledgment holes" that are going to be persistently stored.
12321240
# When acknowledging out of order, a consumer will leave holes that are supposed
12331241
# to be quickly filled by acking all the messages. The information of which
@@ -1237,13 +1245,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400
12371245
# crashes.
12381246
managedLedgerMaxUnackedRangesToPersist=10000
12391247

1240-
# Maximum amount of memory used hold data read from storage (or from the cache).
1241-
# This mechanism prevents the broker to have too many concurrent
1242-
# reads from storage and fall into Out of Memory errors in case
1243-
# of multiple concurrent reads to multiple concurrent consumers.
1244-
# Set 0 in order to disable the feature.
1245-
#
1246-
managedLedgerMaxReadsInFlightSizeInMB=0
1248+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
1249+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
1250+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
1251+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
1252+
# deleted indexes will be cleared while redelivering the messages to consumers.
1253+
managedLedgerMaxBatchDeletedIndexToPersist=10000
1254+
1255+
# When storing acknowledgement state, choose a more compact serialization format that stores
1256+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
1257+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
1258+
managedLedgerPersistIndividualAckAsLongArray=false
1259+
1260+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
1261+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
1262+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
1263+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
12471264

12481265
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
12491266
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
@@ -1696,9 +1713,6 @@ narExtractionDirectory=
16961713
# Maximum prefetch rounds for ledger reading for offloading
16971714
managedLedgerOffloadPrefetchRounds=1
16981715

1699-
# Use Open Range-Set to cache unacked messages
1700-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
1701-
17021716
# For Amazon S3 ledger offload, AWS region
17031717
s3ManagedLedgerOffloadRegion=
17041718

conf/standalone.conf

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048
791791
# crashes.
792792
managedLedgerMaxUnackedRangesToPersist=10000
793793

794+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
795+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
796+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
797+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
798+
# deleted indexes will be cleared while redelivering the messages to consumers.
799+
managedLedgerMaxBatchDeletedIndexToPersist=10000
800+
801+
# When storing acknowledgement state, choose a more compact serialization format that stores
802+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
803+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
804+
managedLedgerPersistIndividualAckAsLongArray=false
805+
806+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
807+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
808+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
809+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
810+
794811
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
795812
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
796813
# MetadataStore.
@@ -825,9 +842,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000
825842
# Maximum backlog entry difference to prevent caching entries that can't be reused.
826843
managedLedgerMaxBacklogBetweenCursorsForCaching=1000
827844

828-
# Use Open Range-Set to cache unacked messages
829-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
830-
831845
# Managed ledger prometheus stats latency rollover seconds (default: 60s)
832846
managedLedgerPrometheusStatsLatencyRolloverSeconds=60
833847

@@ -1296,3 +1310,4 @@ delayedDeliveryMaxNumBuckets=-1
12961310

12971311
# Whether retain null-key message during topic compaction
12981312
topicCompactionRetainNullKey=true
1313+

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,16 @@ public int getMaxBatchDeletedIndexToPersist() {
487487
return maxBatchDeletedIndexToPersist;
488488
}
489489

490+
/**
491+
* Set max batch deleted index that will be persisted and recovered.
492+
*
493+
* @param maxBatchDeletedIndexToPersist
494+
* max batch deleted index that will be persisted and recovered.
495+
*/
496+
public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) {
497+
this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist;
498+
}
499+
490500
public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
491501
return persistentUnackedRangesWithMultipleEntriesEnabled;
492502
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2152,10 +2152,22 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
21522152
+ " will only be tracked in memory and messages will be redelivered in case of"
21532153
+ " crashes.")
21542154
private int managedLedgerMaxUnackedRangesToPersist = 10000;
2155-
@FieldContext(
2156-
category = CATEGORY_STORAGE_ML,
2157-
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
2155+
2156+
@FieldContext(category = CATEGORY_STORAGE_ML,
2157+
doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch "
2158+
+ "deleted indexes persisted. Batch deleted index state is handled when "
2159+
+ "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n"
2160+
+ "When this limit is exceeded, remaining batch message containing the batch deleted indexes will "
2161+
+ "only be tracked in memory. In case of broker restarts or load balancing events, the batch "
2162+
+ "deleted indexes will be cleared while redelivering the messages to consumers.")
2163+
private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
2164+
2165+
@FieldContext(category = CATEGORY_STORAGE_ML,
2166+
doc = "When storing acknowledgement state, choose a more compact serialization format that stores"
2167+
+ " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n"
2168+
+ "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.")
21582169
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
2170+
21592171
@FieldContext(
21602172
category = CATEGORY_STORAGE_ML,
21612173
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
@@ -2178,8 +2190,10 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
21782190
private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
21792191
@FieldContext(
21802192
category = CATEGORY_STORAGE_OFFLOADING,
2181-
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
2182-
)
2193+
doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark "
2194+
+ "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet "
2195+
+ "implementation for the acknowledged messages tracking. Unacknowledged ranges are the message "
2196+
+ "ranges excluding the acknowledged messages.")
21832197
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
21842198
@FieldContext(
21852199
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,6 +1989,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
19891989

19901990
managedLedgerConfig
19911991
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
1992+
managedLedgerConfig.setMaxBatchDeletedIndexToPersist(
1993+
serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist());
19921994
managedLedgerConfig
19931995
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
19941996
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(

0 commit comments

Comments
 (0)