Skip to content

Commit dfcc246

Browse files
committed
[improve][broker] Make maxBatchDeletedIndexToPersist configurable and document other related configs (#24392)
(cherry picked from commit d1bca65)
1 parent b07a403 commit dfcc246

File tree

5 files changed

+72
-18
lines changed

5 files changed

+72
-18
lines changed

conf/broker.conf

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,6 +1276,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000
12761276
# Max time before triggering a rollover on a cursor ledger
12771277
managedLedgerCursorRolloverTimeInSeconds=14400
12781278

1279+
# Maximum amount of memory used hold data read from storage (or from the cache).
1280+
# This mechanism prevents the broker to have too many concurrent
1281+
# reads from storage and fall into Out of Memory errors in case
1282+
# of multiple concurrent reads to multiple concurrent consumers.
1283+
# Set 0 in order to disable the feature.
1284+
#
1285+
managedLedgerMaxReadsInFlightSizeInMB=0
1286+
12791287
# Max number of "acknowledgment holes" that are going to be persistently stored.
12801288
# When acknowledging out of order, a consumer will leave holes that are supposed
12811289
# to be quickly filled by acking all the messages. The information of which
@@ -1285,13 +1293,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400
12851293
# crashes.
12861294
managedLedgerMaxUnackedRangesToPersist=10000
12871295

1288-
# Maximum amount of memory used hold data read from storage (or from the cache).
1289-
# This mechanism prevents the broker to have too many concurrent
1290-
# reads from storage and fall into Out of Memory errors in case
1291-
# of multiple concurrent reads to multiple concurrent consumers.
1292-
# Set 0 in order to disable the feature.
1293-
#
1294-
managedLedgerMaxReadsInFlightSizeInMB=0
1296+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
1297+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
1298+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
1299+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
1300+
# deleted indexes will be cleared while redelivering the messages to consumers.
1301+
managedLedgerMaxBatchDeletedIndexToPersist=10000
1302+
1303+
# When storing acknowledgement state, choose a more compact serialization format that stores
1304+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
1305+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
1306+
managedLedgerPersistIndividualAckAsLongArray=false
1307+
1308+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
1309+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
1310+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
1311+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
12951312

12961313
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
12971314
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
@@ -1770,9 +1787,6 @@ narExtractionDirectory=
17701787
# Maximum prefetch rounds for ledger reading for offloading
17711788
managedLedgerOffloadPrefetchRounds=1
17721789

1773-
# Use Open Range-Set to cache unacked messages
1774-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
1775-
17761790
# For Amazon S3 ledger offload, AWS region
17771791
s3ManagedLedgerOffloadRegion=
17781792

conf/standalone.conf

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048
827827
# crashes.
828828
managedLedgerMaxUnackedRangesToPersist=10000
829829

830+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
831+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
832+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
833+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
834+
# deleted indexes will be cleared while redelivering the messages to consumers.
835+
managedLedgerMaxBatchDeletedIndexToPersist=10000
836+
837+
# When storing acknowledgement state, choose a more compact serialization format that stores
838+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
839+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
840+
managedLedgerPersistIndividualAckAsLongArray=false
841+
842+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
843+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
844+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
845+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
846+
830847
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
831848
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
832849
# MetadataStore.
@@ -861,9 +878,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000
861878
# Maximum backlog entry difference to prevent caching entries that can't be reused.
862879
managedLedgerMaxBacklogBetweenCursorsForCaching=1000
863880

864-
# Use Open Range-Set to cache unacked messages
865-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
866-
867881
# Managed ledger prometheus stats latency rollover seconds (default: 60s)
868882
managedLedgerPrometheusStatsLatencyRolloverSeconds=60
869883

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,16 @@ public int getMaxBatchDeletedIndexToPersist() {
502502
return maxBatchDeletedIndexToPersist;
503503
}
504504

505+
/**
506+
* Set max batch deleted index that will be persisted and recovered.
507+
*
508+
* @param maxBatchDeletedIndexToPersist
509+
* max batch deleted index that will be persisted and recovered.
510+
*/
511+
public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) {
512+
this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist;
513+
}
514+
505515
public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
506516
return persistentUnackedRangesWithMultipleEntriesEnabled;
507517
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2265,10 +2265,22 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
22652265
+ " will only be tracked in memory and messages will be redelivered in case of"
22662266
+ " crashes.")
22672267
private int managedLedgerMaxUnackedRangesToPersist = 10000;
2268-
@FieldContext(
2269-
category = CATEGORY_STORAGE_ML,
2270-
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
2268+
2269+
@FieldContext(category = CATEGORY_STORAGE_ML,
2270+
doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch "
2271+
+ "deleted indexes persisted. Batch deleted index state is handled when "
2272+
+ "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n"
2273+
+ "When this limit is exceeded, remaining batch message containing the batch deleted indexes will "
2274+
+ "only be tracked in memory. In case of broker restarts or load balancing events, the batch "
2275+
+ "deleted indexes will be cleared while redelivering the messages to consumers.")
2276+
private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
2277+
2278+
@FieldContext(category = CATEGORY_STORAGE_ML,
2279+
doc = "When storing acknowledgement state, choose a more compact serialization format that stores"
2280+
+ " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n"
2281+
+ "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.")
22712282
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
2283+
22722284
@FieldContext(
22732285
category = CATEGORY_STORAGE_ML,
22742286
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
@@ -2291,8 +2303,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
22912303
private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
22922304
@FieldContext(
22932305
category = CATEGORY_STORAGE_OFFLOADING,
2294-
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
2295-
)
2306+
doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark "
2307+
+ "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet "
2308+
+ "implementation for the acknowledged messages tracking. Unacknowledged ranges are the message "
2309+
+ "ranges excluding the acknowledged messages.")
22962310
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
22972311
@FieldContext(
22982312
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2044,6 +2044,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull To
20442044

20452045
managedLedgerConfig
20462046
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
2047+
managedLedgerConfig.setMaxBatchDeletedIndexToPersist(
2048+
serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist());
20472049
managedLedgerConfig
20482050
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
20492051
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(

0 commit comments

Comments
 (0)