Skip to content

Commit 36a80ae

Browse files
lhotarisrinath-ctds
authored andcommitted
[improve][broker] Make maxBatchDeletedIndexToPersist configurable and document other related configs (apache#24392)
(cherry picked from commit d1bca65) (cherry picked from commit 17f04ef)
1 parent ce43873 commit 36a80ae

File tree

5 files changed

+73
-18
lines changed

5 files changed

+73
-18
lines changed

conf/broker.conf

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000
12331233
# Max time before triggering a rollover on a cursor ledger
12341234
managedLedgerCursorRolloverTimeInSeconds=14400
12351235

1236+
# Maximum amount of memory used hold data read from storage (or from the cache).
1237+
# This mechanism prevents the broker to have too many concurrent
1238+
# reads from storage and fall into Out of Memory errors in case
1239+
# of multiple concurrent reads to multiple concurrent consumers.
1240+
# Set 0 in order to disable the feature.
1241+
#
1242+
managedLedgerMaxReadsInFlightSizeInMB=0
1243+
12361244
# Max number of "acknowledgment holes" that are going to be persistently stored.
12371245
# When acknowledging out of order, a consumer will leave holes that are supposed
12381246
# to be quickly filled by acking all the messages. The information of which
@@ -1242,13 +1250,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400
12421250
# crashes.
12431251
managedLedgerMaxUnackedRangesToPersist=10000
12441252

1245-
# Maximum amount of memory used hold data read from storage (or from the cache).
1246-
# This mechanism prevents the broker to have too many concurrent
1247-
# reads from storage and fall into Out of Memory errors in case
1248-
# of multiple concurrent reads to multiple concurrent consumers.
1249-
# Set 0 in order to disable the feature.
1250-
#
1251-
managedLedgerMaxReadsInFlightSizeInMB=0
1253+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
1254+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
1255+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
1256+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
1257+
# deleted indexes will be cleared while redelivering the messages to consumers.
1258+
managedLedgerMaxBatchDeletedIndexToPersist=10000
1259+
1260+
# When storing acknowledgement state, choose a more compact serialization format that stores
1261+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
1262+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
1263+
managedLedgerPersistIndividualAckAsLongArray=false
1264+
1265+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
1266+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
1267+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
1268+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
12521269

12531270
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
12541271
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
@@ -1709,9 +1726,6 @@ narExtractionDirectory=
17091726
# Maximum prefetch rounds for ledger reading for offloading
17101727
managedLedgerOffloadPrefetchRounds=1
17111728

1712-
# Use Open Range-Set to cache unacked messages
1713-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
1714-
17151729
# For Amazon S3 ledger offload, AWS region
17161730
s3ManagedLedgerOffloadRegion=
17171731

conf/standalone.conf

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048
798798
# crashes.
799799
managedLedgerMaxUnackedRangesToPersist=10000
800800

801+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
802+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
803+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
804+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
805+
# deleted indexes will be cleared while redelivering the messages to consumers.
806+
managedLedgerMaxBatchDeletedIndexToPersist=10000
807+
808+
# When storing acknowledgement state, choose a more compact serialization format that stores
809+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
810+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
811+
managedLedgerPersistIndividualAckAsLongArray=false
812+
813+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
814+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
815+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
816+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
817+
801818
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
802819
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
803820
# MetadataStore.
@@ -832,9 +849,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000
832849
# Maximum backlog entry difference to prevent caching entries that can't be reused.
833850
managedLedgerMaxBacklogBetweenCursorsForCaching=1000
834851

835-
# Use Open Range-Set to cache unacked messages
836-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
837-
838852
# Managed ledger prometheus stats latency rollover seconds (default: 60s)
839853
managedLedgerPrometheusStatsLatencyRolloverSeconds=60
840854

@@ -1314,3 +1328,4 @@ disableBrokerInterceptors=true
13141328

13151329
# Whether retain null-key message during topic compaction
13161330
topicCompactionRetainNullKey=true
1331+

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,16 @@ public int getMaxBatchDeletedIndexToPersist() {
487487
return maxBatchDeletedIndexToPersist;
488488
}
489489

490+
/**
491+
* Set max batch deleted index that will be persisted and recovered.
492+
*
493+
* @param maxBatchDeletedIndexToPersist
494+
* max batch deleted index that will be persisted and recovered.
495+
*/
496+
public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) {
497+
this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist;
498+
}
499+
490500
public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
491501
return persistentUnackedRangesWithMultipleEntriesEnabled;
492502
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2177,10 +2177,22 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
21772177
+ " will only be tracked in memory and messages will be redelivered in case of"
21782178
+ " crashes.")
21792179
private int managedLedgerMaxUnackedRangesToPersist = 10000;
2180-
@FieldContext(
2181-
category = CATEGORY_STORAGE_ML,
2182-
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
2180+
2181+
@FieldContext(category = CATEGORY_STORAGE_ML,
2182+
doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch "
2183+
+ "deleted indexes persisted. Batch deleted index state is handled when "
2184+
+ "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n"
2185+
+ "When this limit is exceeded, remaining batch message containing the batch deleted indexes will "
2186+
+ "only be tracked in memory. In case of broker restarts or load balancing events, the batch "
2187+
+ "deleted indexes will be cleared while redelivering the messages to consumers.")
2188+
private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
2189+
2190+
@FieldContext(category = CATEGORY_STORAGE_ML,
2191+
doc = "When storing acknowledgement state, choose a more compact serialization format that stores"
2192+
+ " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n"
2193+
+ "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.")
21832194
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
2195+
21842196
@FieldContext(
21852197
category = CATEGORY_STORAGE_ML,
21862198
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
@@ -2203,8 +2215,10 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
22032215
private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
22042216
@FieldContext(
22052217
category = CATEGORY_STORAGE_OFFLOADING,
2206-
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
2207-
)
2218+
doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark "
2219+
+ "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet "
2220+
+ "implementation for the acknowledged messages tracking. Unacknowledged ranges are the message "
2221+
+ "ranges excluding the acknowledged messages.")
22082222
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
22092223
@FieldContext(
22102224
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,6 +1970,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
19701970

19711971
managedLedgerConfig
19721972
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
1973+
managedLedgerConfig.setMaxBatchDeletedIndexToPersist(
1974+
serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist());
19731975
managedLedgerConfig
19741976
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
19751977
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(

0 commit comments

Comments
 (0)