diff --git a/pip/pip-282.md b/pip/pip-282.md new file mode 100644 index 0000000000000..d07bd15964fb7 --- /dev/null +++ b/pip/pip-282.md @@ -0,0 +1,293 @@ +# Background knowledge + +Key_Shared is one of the subscription types which allows multiple consumer connections. +Messages are distributed across consumers, and messages with the same key or same ordering key are delivered to only one consumer. +No matter how many times the message is re-delivered, it is delivered to the same consumer. + +When disabling `allowOutOfOrderDelivery`, Key_Shared subscription guarantees a key will be processed in order by a single consumer, even if a new consumer is connected. + +# Motivation + +Key_Shared has a mechanism called the "recently joined consumers" to keep message ordering. +However, currently, it doesn't care about some corner cases. +More specifically, we found two out-of-order issues cased by: + +1. [issue-1] The race condition in the "recently joined consumers", where consumers can be added before finishing reading and dispatching messages from ledgers. +2. [issue-2] Messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement. + +We should care about these cases in Key_Shared subscription. + +## [issue-1] + +Key_Shared subscription has out-of-order cases because of the race condition of [the "recently joined consumers"](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L378-L386). +Consider the following flow. + +1. Assume that the current read position is `1:6` and the recently joined consumers is empty. +2. Called [OpReadEntry#internalReadEntriesComplete](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L92-L95) from thread-1. + Then, the current read position is updated to `1:12` (Messages from `1:6` to `1:11` have yet to be dispatched to consumers). +3. Called [PersistentStickyKeyDispatcherMultipleConsumers#addConsumer](https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L130-L139) from thread-2. + Then, the new consumer is stored to the recently joined consumers with read position `1:12`. +4. Called [PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) from thread-5. + Then, messages from `1:6` to `1:11` can be dispatched to the new consumer since the "recently joined consumers" allow brokers to send messages before the joined position (i.e., `1:12` here). **However, it is not expected.** + For example, if existing consumers have some unacked messages, disconnecting, and redelivering them can cause out-of-order. + +An example scenario is shown below. + +1. Assume that the [entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) has the following messages, and the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messageForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`. + - `1:6` key: `key-a` + - `1:7` key: `key-a` + - `1:8` key: `key-a` + - `1:9` key: `key-b` + - `1:10` key: `key-b` + - `1:11` key: `key-b` +2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`. + - So, the current read position is `1:12`. + - `c1` never acknowledge `1:6`. +3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the `recentlyJoinedConsumers` is `{c3=1:12}` +4. Send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are less than the recently joined consumers position, `1:12`. +5. Disconnect `c1`. +6. Send `1:6` to `c3`. + As a result `c3` receives messages with the following order: `1:7`, `1:8`, `1:6` // out-of-order + +## [issue-2] +Key_Shared subscription has out-of-order cases because messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement. +Consider the following flow. + +1. Assume that, + readPosition: `2:1` + messagesToRedeliver: [] + recentlyJoinedConsumers: [] + c1: messagesForC: 1, pending: [] + c2: messagesForC: 1000, pending: [] // Necessary to ensure that the dispatcher reads entries even if c1 has no more permits. + selector: key-a: c1 +2. Dispatch `2:1` (key: `key-a`, type: Normal) + readPosition: `2:2` + messagesToRedeliver: [] + recentlyJoinedConsumers: [] + c1: messagesForC: 0, pending: [`2:1`] + c2: messagesForC: 1000, pending: [] + selector: key-a: c1 +3. Try to dispatch `2:2` (key: `key-a`, type: Normal), but it can't be sent to c1 because c1 has no more permits. Then, it is added to messagesToRedeliver. + readPosition: `2:3` + messagesToRedeliver: [`2:2`] + recentlyJoinedConsumers: [] + c1: messagesForC: 0, pending: [`2:1`] + c2: messagesForC: 1000, pending: [] + selector: key-a: c1 +4. Add consumer c3 + readPosition: `2:3` + messagesToRedeliver: [`2:2`] + recentlyJoinedConsumers: [c3: `2:3`] + c1: messagesForC: 0, pending: [`2:1`] + c2: messagesForC: 1000, pending: [] + c3: messagesForC: 1000, pending: [] + selector: key-a: c3 // modified +5. Dispatch `2:2` (key: `key-a`, type: Replay) from messagesToRedeliver. + readPosition: `2:3` + messagesToRedeliver: [] + recentlyJoinedConsumers: [c3: `2:3`] + c1: messagesForC: 0, pending: [`2:1`] + c2: messagesForC: 1000, pending: [] + c3: messagesForC: 999, pending: [`2:2`] + selector: key-a: c3 +6. Disconnect c1 and redelivery `2:1` + readPosition: `2:3` + messagesToRedeliver: [] + recentlyJoinedConsumers: [c3: `2:3`] + c2: messagesForC: 1000, pending: [] + c3: messagesForC: 998, pending: [`2:2`, `2:1`] // out-of-order + selector: key-a: c3 + +# Goals + +## In Scope + +Fix out-of-order issues above. + +## Out of Scope + +Simplify or improve the specification of Key_Shared. + +# High Level Design + +The root cause of the issues described above is that `recentlyJoinedConsumers` uses "read position" as joined positions for consumers, because this does not guarantee that messages less than or equal to it have already been scheduled to be sent. +Instead, we propose to use "last sent position" as joined positions for consumers. + +Also, change (or add) some stats to know Key_Shared subscription status easily. + +# Detailed Design + +## Design & Implementation Details + +First, introduce the new position, like the mark delete position and the individually deleted messages. In other words, + +- All positions less than or equal to it are already scheduled to be sent. +- Manage individually sent positions to update the position as expected. + +An example of updating the individually sent messages and the last sent position will be as follows. + +Initially, the last sent position is `3:0`, and the individually sent positions is `[]`. +1. Read `3:1` - `3:10` positions +2. Send `3:1` - `3:3`, `3:5`, and `3:8` - `3:10` positions + - last sent position: `3:3` + - individually sent positions: `[(3:4, 3:5], (3:7, 3:10]]` +3. Send `3:7` position + - last sent position: `3:3` + - individually sent positions: `[(3:4, 3:5], (3:6, 3:10]]` +4. Send `3:6` position + - last sent position: `3:3` + - individually sent positions: `[(3:4, 3:10]]` +5. Send `3:4` position + - last sent position: `3:10` + - individually sent positions: `[]` + +More specifically, the recently joined consumers related fields will be as follows. +```diff +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +index 8f05530f58b..2b17c580832 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +@@ -69,8 +69,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi + * This means that, in order to preserve ordering, new consumers can only receive old + * messages, until the mark-delete position will move past this point. + */ ++ // Map(key: recently joined consumer, value: last sent position when joining) + private final LinkedHashMap recentlyJoinedConsumers; + ++ private PositionImpl lastSentPosition; ++ private final RangeSetWrapper individuallySentPositions; ++ + PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, + Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { + super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); +``` + +Next, rename the consumer stats as follows. +```diff +--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java ++++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +@@ -74,8 +74,8 @@ public class ConsumerStatsImpl implements ConsumerStats { + /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */ + public boolean blockedConsumerOnUnackedMsgs; + +- /** The read position of the cursor when the consumer joining. */ +- public String readPositionWhenJoining; ++ /** The last sent position of the cursor when the consumer joining. */ ++ public String lastSentPositionWhenJoining; + + /** Address of this consumer. */ + private String address; +``` + +Note that I just renamed the stats from `readPositionWhenJoining` to `lastSentPositionWhenJoining` without keeping the backward-compatibility because readPositionWhenJoining is no longer meaningful and redundant. + +And finally, modify the subscription stats of the definition as follows. +```diff +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +index dc666f3a18e..7591369277f 100644 +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +@@ -1177,7 +1177,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs + .getRecentlyJoinedConsumers(); + if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) { + recentlyJoinedConsumers.forEach((k, v) -> { +- subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString()); ++ // The dispatcher allows same name consumers ++ final StringBuilder stringBuilder = new StringBuilder(); ++ stringBuilder.append("consumerName=").append(k.consumerName()) ++ .append(", consumerId=").append(k.consumerId()); ++ if (k.cnx() != null) { ++ stringBuilder.append(", address=").append(k.cnx().clientAddress()); ++ } ++ subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString()); + }); + } + } +``` + +## How The Proposal Resolves The Issue + +**[issue-1]** +Consider the following flow. + +1. Assume that the [entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) has the following messages, and the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messageForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`. + - `1:6` key: `key-a` + - `1:7` key: `key-a` + - `1:8` key: `key-a` + - `1:9` key: `key-b` + - `1:10` key: `key-b` + - `1:11` key: `key-b` +2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`. + - So, the current last sent position is `1:6` and the individually sent positions is `[(1:8, 1:11]]`. + - `c1` never acknowledge `1:6`. +3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the `recentlyJoinedConsumers` is `{c3=1:6}`. +4. Can't send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are greater than the recently joined consumers position, `1:6`. +5. Disconnect `c1`. +6. Send `1:6` - `1:8` to `c3`. + Now, `c3` receives messages with expected order regarding `key-a`. + +**[issue-2]** +This mechanism guarantees all messages less than or equal to the last sent position are already scheduled to be sent. Therefore, skipped messages (e.g. `2:2`) are greater than the last sent position. + +1. The last sent position is `2:1`. +2. When add new consumer `c3`, `recentlyJoinedConsumers` is `[{c3: 2:1}]`. + The dispatcher can't send `2:2` to `c3` because `2:2` is greater than the joined position `2:1`. +3. When `c3` receives `2:1` and acknowledges it, then the mark delete position is advanced to `2:1`. + When all messages up to the joined position (i.e., `2:1` ) have been acknowledged, then the consumer (i.e., `c3` ) is removed from `recentlyJoinedConsumers`. + Therefore, `c3` will be able to receive `2:2`. + +**[stats]** +`readPositionWhenJoining` is replaced with `lastSentPositionWhenJoining` in each consumer stats instead. + +## Public-facing Changes + +### Public API + +### Binary protocol + +### Configuration + +### CLI + +### Metrics +* The consumer stats `readPositionWhenJoining` is renamed to `lastSentPositionWhenJoining`. +* The subscription stats `consumersAfterMarkDeletePosition` of the definition is modified as described. + +# Monitoring + +# Security Considerations + +# Backward & Forward Compatability + +## Revert + +## Upgrade + +# Alternatives + +### Alternative-1 +See https://github.com/apache/pulsar/pull/20179 in detail. It isn't merged when publishing this proposal. +The only difference is the message key, i.e., this approach leverages per-key information in addition to the proposal described in this PIP. +For example, the `recentlyJoinedConsumers` will be: + +``` +// Map(key: recently joined consumer, value: Map(key: message key, value: last sent position in the key when joining)) +private final LinkedHashMap> recentlyJoinedConsumers; +``` + +With this change, message delivery stuck on one key will no longer prevent other keys from being dispatched. +However, the codes will be vulnerable to an increase in keys, causing OOM in the worst case. + +### Alternative-2 +Make updating the read position, dispatching messages, and adding new consumers exclusive to ensure that messages less than the read position have already been sent. +However, introducing such an exclusion mechanism disrupts the throughput of the dispatcher. + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/69fpb0d30y7pc02k3zvg2lpb2lj0smdg +* Mailing List voting thread: https://lists.apache.org/thread/45x056t8njjnzflbkhkofh00gcy4z5g6