Skip to content

Commit d1bca65

Browse files
authored
[improve][broker] Make maxBatchDeletedIndexToPersist configurable and document other related configs (#24392)
1 parent 9c95454 commit d1bca65

File tree

5 files changed

+73
-19
lines changed

5 files changed

+73
-19
lines changed

conf/broker.conf

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,6 +1280,14 @@ managedLedgerCursorMaxEntriesPerLedger=50000
12801280
# Max time before triggering a rollover on a cursor ledger
12811281
managedLedgerCursorRolloverTimeInSeconds=14400
12821282

1283+
# Maximum amount of memory used hold data read from storage (or from the cache).
1284+
# This mechanism prevents the broker to have too many concurrent
1285+
# reads from storage and fall into Out of Memory errors in case
1286+
# of multiple concurrent reads to multiple concurrent consumers.
1287+
# Set 0 in order to disable the feature.
1288+
#
1289+
managedLedgerMaxReadsInFlightSizeInMB=0
1290+
12831291
# Max number of "acknowledgment holes" that are going to be persistently stored.
12841292
# When acknowledging out of order, a consumer will leave holes that are supposed
12851293
# to be quickly filled by acking all the messages. The information of which
@@ -1289,13 +1297,22 @@ managedLedgerCursorRolloverTimeInSeconds=14400
12891297
# crashes.
12901298
managedLedgerMaxUnackedRangesToPersist=10000
12911299

1292-
# Maximum amount of memory used hold data read from storage (or from the cache).
1293-
# This mechanism prevents the broker to have too many concurrent
1294-
# reads from storage and fall into Out of Memory errors in case
1295-
# of multiple concurrent reads to multiple concurrent consumers.
1296-
# Set 0 in order to disable the feature.
1297-
#
1298-
managedLedgerMaxReadsInFlightSizeInMB=0
1300+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
1301+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
1302+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
1303+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
1304+
# deleted indexes will be cleared while redelivering the messages to consumers.
1305+
managedLedgerMaxBatchDeletedIndexToPersist=10000
1306+
1307+
# When storing acknowledgement state, choose a more compact serialization format that stores
1308+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
1309+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
1310+
managedLedgerPersistIndividualAckAsLongArray=true
1311+
1312+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
1313+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
1314+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
1315+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
12991316

13001317
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
13011318
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
@@ -1777,9 +1794,6 @@ narExtractionDirectory=
17771794
# Maximum prefetch rounds for ledger reading for offloading
17781795
managedLedgerOffloadPrefetchRounds=1
17791796

1780-
# Use Open Range-Set to cache unacked messages
1781-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
1782-
17831797
# For Amazon S3 ledger offload, AWS region
17841798
s3ManagedLedgerOffloadRegion=
17851799

conf/standalone.conf

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,23 @@ managedLedgerMaxSizePerLedgerMbytes=2048
827827
# crashes.
828828
managedLedgerMaxUnackedRangesToPersist=10000
829829

830+
# Maximum number of partially acknowledged batch messages per subscription that will have their batch
831+
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
832+
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
833+
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
834+
# deleted indexes will be cleared while redelivering the messages to consumers.
835+
managedLedgerMaxBatchDeletedIndexToPersist=10000
836+
837+
# When storing acknowledgement state, choose a more compact serialization format that stores
838+
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
839+
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
840+
managedLedgerPersistIndividualAckAsLongArray=true
841+
842+
# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
843+
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
844+
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
845+
managedLedgerUnackedRangesOpenCacheSetEnabled=true
846+
830847
# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
831848
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
832849
# MetadataStore.
@@ -861,9 +878,6 @@ managedLedgerMinimumBacklogEntriesForCaching=1000
861878
# Maximum backlog entry difference to prevent caching entries that can't be reused.
862879
managedLedgerMaxBacklogBetweenCursorsForCaching=1000
863880

864-
# Use Open Range-Set to cache unacked messages
865-
managedLedgerUnackedRangesOpenCacheSetEnabled=true
866-
867881
# Managed ledger prometheus stats latency rollover seconds (default: 60s)
868882
managedLedgerPrometheusStatsLatencyRolloverSeconds=60
869883

@@ -1362,4 +1376,4 @@ topicCompactionRetainNullKey=false
13621376
# If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
13631377
# will create topic compaction service based on message eventTime.
13641378
# By default compaction service is based on message publishing order.
1365-
compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
1379+
compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,16 @@ public int getMaxBatchDeletedIndexToPersist() {
502502
return maxBatchDeletedIndexToPersist;
503503
}
504504

505+
/**
506+
* Set max batch deleted index that will be persisted and recovered.
507+
*
508+
* @param maxBatchDeletedIndexToPersist
509+
* max batch deleted index that will be persisted and recovered.
510+
*/
511+
public void setMaxBatchDeletedIndexToPersist(int maxBatchDeletedIndexToPersist) {
512+
this.maxBatchDeletedIndexToPersist = maxBatchDeletedIndexToPersist;
513+
}
514+
505515
public boolean isPersistentUnackedRangesWithMultipleEntriesEnabled() {
506516
return persistentUnackedRangesWithMultipleEntriesEnabled;
507517
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2270,10 +2270,22 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
22702270
+ " will only be tracked in memory and messages will be redelivered in case of"
22712271
+ " crashes.")
22722272
private int managedLedgerMaxUnackedRangesToPersist = 10000;
2273-
@FieldContext(
2274-
category = CATEGORY_STORAGE_ML,
2275-
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
2273+
2274+
@FieldContext(category = CATEGORY_STORAGE_ML,
2275+
doc = "Maximum number of partially acknowledged batch messages per subscription that will have their batch "
2276+
+ "deleted indexes persisted. Batch deleted index state is handled when "
2277+
+ "acknowledgmentAtBatchIndexLevelEnabled=true.\n\n"
2278+
+ "When this limit is exceeded, remaining batch message containing the batch deleted indexes will "
2279+
+ "only be tracked in memory. In case of broker restarts or load balancing events, the batch "
2280+
+ "deleted indexes will be cleared while redelivering the messages to consumers.")
2281+
private int managedLedgerMaxBatchDeletedIndexToPersist = 10000;
2282+
2283+
@FieldContext(category = CATEGORY_STORAGE_ML,
2284+
doc = "When storing acknowledgement state, choose a more compact serialization format that stores"
2285+
+ " individual acknowledgements as a bitmap which is serialized to an array of long values.\n\n"
2286+
+ "NOTE: This setting requires managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.")
22762287
private boolean managedLedgerPersistIndividualAckAsLongArray = true;
2288+
22772289
@FieldContext(
22782290
category = CATEGORY_STORAGE_ML,
22792291
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
@@ -2296,8 +2308,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
22962308
private int managedLedgerMaxUnackedRangesToPersistInMetadataStore = 1000;
22972309
@FieldContext(
22982310
category = CATEGORY_STORAGE_OFFLOADING,
2299-
doc = "Use Open Range-Set to cache unacked messages (it is memory efficient but it can take more cpu)"
2300-
)
2311+
doc = "When set to true, a BitSet will be used to track acknowledged messages that come after the \"mark "
2312+
+ "delete position\" for each subscription.\n\nRoaringBitmap is used as a memory efficient BitSet "
2313+
+ "implementation for the acknowledged messages tracking. Unacknowledged ranges are the message "
2314+
+ "ranges excluding the acknowledged messages.")
23012315
private boolean managedLedgerUnackedRangesOpenCacheSetEnabled = true;
23022316
@FieldContext(
23032317
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,6 +2056,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull To
20562056

20572057
managedLedgerConfig
20582058
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
2059+
managedLedgerConfig.setMaxBatchDeletedIndexToPersist(
2060+
serviceConfig.getManagedLedgerMaxBatchDeletedIndexToPersist());
20592061
managedLedgerConfig
20602062
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
20612063
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(

0 commit comments

Comments
 (0)