Skip to content

Commit 3582cfe

Browse files
committed
[improve][broker] Make maxBatchDeletedIndexToPersist configurable and document other related configs (#24392)
(cherry picked from commit d1bca65)
1 parent c472f17 commit 3582cfe

File tree

5 files changed

+73
-18
lines changed

5 files changed

+73
-18
lines changed

conf/broker.conf

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000
12441244
# Max time before triggering a rollover on a cursor ledger
12451245
managedLedgerCursorRolloverTimeInSeconds=14400
12461246

1247+
# Maximum amount of memory used hold data read from storage (or from the cache).
1248+
# This mechanism prevents the broker to have too many concurrent
1249+
# reads from storage and fall into Out of Memory errors in case
1250+
# of multiple concurrent reads to multiple concurrent consumers.
1251+
# Set 0 in order to disable the feature.
1252+
#
1253+
managedLedgerMaxReadsInFlightSizeInMB=0
1254+
12471255
# Max number of "acknowledgment holes" that are going to be persistently stored.
12481256
# When acknowledging out of order, a consumer will leave holes that are supposed
12491257
# to be quickly filled by acking all the messages. The information of which
@@ -1253,13 +1261,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400
12531261
# crashes.
12541262
managedLedgerMaxUnackedRangesToPersist=10000
12551263

1256-
# Maximum amount of memory used hold data read from storage (or from the cache).
1257-
# This mechanism prevents the broker to have too many concurrent
1258-
# reads from storage and fall into Out of Memory errors in case
1259-
# of multiple concurrent reads to multiple concurrent consumers.
1260-
# Set 0 in order to disable the feature.
1261-
#
1262-
managedLedgerMaxReadsInFlightSizeInMB=0
1264+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
1265+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
1266+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
1267+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
1268+
# deleted indexes will be cleared while redelivering the messages to consumers.
1269+
managedLedgerMaxBatchDeletedIndexToPersist=10000
1270+
1271+
# When storing acknowledgement state, choose a more compact serialization format that stores
1272+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
1273+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
1274+
managedLedgerPersistIndividualAckAsLongArray=false
1275+
1276+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
1277+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
1278+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
1279+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
12631280

12641281
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
12651282
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
@@ -1721,9 +1738,6 @@ narExtractionDirectory=
17211738
# Maximum prefetch rounds for ledger reading for offloading
17221739
managedLedgerOffloadPrefetchRounds=1
17231740

1724-
# Use Open Range-Set to cache unacked messages
1725-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
1726-
17271741
# For Amazon S3 ledger offload, AWS region
17281742
s3ManagedLedgerOffloadRegion=
17291743

conf/standalone.conf

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048
801801
# crashes.
802802
managedLedgerMaxUnackedRangesToPersist=10000
803803

804+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
805+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
806+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
807+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
808+
# deleted indexes will be cleared while redelivering the messages to consumers.
809+
managedLedgerMaxBatchDeletedIndexToPersist=10000
810+
811+
# When storing acknowledgement state, choose a more compact serialization format that stores
812+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
813+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
814+
managedLedgerPersistIndividualAckAsLongArray=false
815+
816+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
817+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
818+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
819+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
820+
804821
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
805822
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
806823
# MetadataStore.
@@ -835,9 +852,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000
835852
# Maximum backlog entry difference to prevent caching entries that can't be reused.
836853
managedLedgerMaxBacklogBetweenCursorsForCaching=1000
837854

838-
# Use Open Range-Set to cache unacked messages
839-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
840-
841855
# Managed ledger prometheus stats latency rollover seconds (default: 60s)
842856
managedLedgerPrometheusStatsLatencyRolloverSeconds=60
843857

@@ -1322,3 +1336,4 @@ disableBrokerInterceptors=true
13221336

13231337
# Whether retain null-key message during topic compaction
13241338
topicCompactionRetainNullKey=false
1339+

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,16 @@ public int getMaxBatchDeletedIndexToPersist() {
488488
return maxBatchDeletedIndexToPersist;
489489
}
490490

491+
/**
492+
* Set max batch deleted index that will be persisted and recovered.
493+
*
494+
* @param maxBatchDeletedIndexToPersist
495+
* max batch deleted index that will be persisted and recovered.
496+
*/
497+
public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) {
498+
this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist;
499+
}
500+
491501
public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
492502
return persistentUnackedRangesWithMultipleEntriesEnabled;
493503
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2195,10 +2195,22 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
21952195
+ " will only be tracked in memory and messages will be redelivered in case of"
21962196
+ " crashes.")
21972197
private int managedLedgerMaxUnackedRangesToPersist = 10000;
2198-
@FieldContext(
2199-
category = CATEGORY_STORAGE_ML,
2200-
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
2198+
2199+
@FieldContext(category = CATEGORY_STORAGE_ML,
2200+
doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch "
2201+
+ "deleted indexes persisted. Batch deleted index state is handled when "
2202+
+ "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n"
2203+
+ "When this limit is exceeded, remaining batch message containing the batch deleted indexes will "
2204+
+ "only be tracked in memory. In case of broker restarts or load balancing events, the batch "
2205+
+ "deleted indexes will be cleared while redelivering the messages to consumers.")
2206+
private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
2207+
2208+
@FieldContext(category = CATEGORY_STORAGE_ML,
2209+
doc = "When storing acknowledgement state, choose a more compact serialization format that stores"
2210+
+ " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n"
2211+
+ "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.")
22012212
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
2213+
22022214
@FieldContext(
22032215
category = CATEGORY_STORAGE_ML,
22042216
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
@@ -2221,8 +2233,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
22212233
private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
22222234
@FieldContext(
22232235
category = CATEGORY_STORAGE_OFFLOADING,
2224-
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
2225-
)
2236+
doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark "
2237+
+ "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet "
2238+
+ "implementation for the acknowledged messages tracking. Unacknowledged ranges are the message "
2239+
+ "ranges excluding the acknowledged messages.")
22262240
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
22272241
@FieldContext(
22282242
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,6 +1983,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull To
19831983

19841984
managedLedgerConfig
19851985
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
1986+
managedLedgerConfig.setMaxBatchDeletedIndexToPersist(
1987+
serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist());
19861988
managedLedgerConfig
19871989
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
19881990
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(

0 commit comments

Comments
 (0)