|
| 1 | +# Background knowledge |
| 2 | + |
| 3 | +Key_Shared is one of the subscription types which allows multiple consumer connections. |
| 4 | +Messages are distributed across consumers, and messages with the same key or same ordering key are delivered to only one consumer. |
| 5 | +No matter how many times the message is re-delivered, it is delivered to the same consumer. |
| 6 | + |
| 7 | +When disabling `allowOutOfOrderDelivery`, Key_Shared subscription guarantees a key will be processed in order by a single consumer, even if a new consumer is connected. |
| 8 | + |
| 9 | +# Motivation |
| 10 | + |
| 11 | +Key_Shared has a mechanism called the "recently joined consumers" to keep message ordering. |
| 12 | +However, currently, it doesn't care about some corner cases. |
| 13 | +More specifically, we found two out-of-order issues cased by: |
| 14 | + |
| 15 | +1. [issue-1] The race condition in the "recently joined consumers", where consumers can be added before finishing reading and dispatching messages from ledgers. |
| 16 | +2. [issue-2] Messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement. |
| 17 | + |
| 18 | +We should care about these cases in Key_Shared subscription. |
| 19 | + |
| 20 | +## [issue-1] |
| 21 | + |
| 22 | +Key_Shared subscription has out-of-order cases because of the race condition of [the "recently joined consumers"](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L378-L386). |
| 23 | +Consider the following flow. |
| 24 | + |
| 25 | +1. Assume that the current read position is `1:6` and the recently joined consumers is empty. |
| 26 | +2. Called [OpReadEntry#internalReadEntriesComplete](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L92-L95) from thread-1. |
| 27 | + Then, the current read position is updated to `1:12` (Messages from `1:6` to `1:11` have yet to be dispatched to consumers). |
| 28 | +3. Called [PersistentStickyKeyDispatcherMultipleConsumers#addConsumer](https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L130-L139) from thread-2. |
| 29 | + Then, the new consumer is stored to the recently joined consumers with read position `1:12`. |
| 30 | +4. Called [PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) from thread-5. |
| 31 | + Then, messages from `1:6` to `1:11` can be dispatched to the new consumer since the "recently joined consumers" allow brokers to send messages before the joined position (i.e., `1:12` here). **However, it is not expected.** |
| 32 | + For example, if existing consumers have some unacked messages, disconnecting, and redelivering them can cause out-of-order. |
| 33 | + |
| 34 | +An example scenario is shown below. |
| 35 | + |
| 36 | +1. Assume that the [entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) has the following messages, and the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messageForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`. |
| 37 | + - `1:6` key: `key-a` |
| 38 | + - `1:7` key: `key-a` |
| 39 | + - `1:8` key: `key-a` |
| 40 | + - `1:9` key: `key-b` |
| 41 | + - `1:10` key: `key-b` |
| 42 | + - `1:11` key: `key-b` |
| 43 | +2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`. |
| 44 | + - So, the current read position is `1:12`. |
| 45 | + - `c1` never acknowledge `1:6`. |
| 46 | +3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the `recentlyJoinedConsumers` is `{c3=1:12}` |
| 47 | +4. Send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are less than the recently joined consumers position, `1:12`. |
| 48 | +5. Disconnect `c1`. |
| 49 | +6. Send `1:6` to `c3`. |
| 50 | + As a result `c3` receives messages with the following order: `1:7`, `1:8`, `1:6` // out-of-order |
| 51 | + |
| 52 | +## [issue-2] |
| 53 | +Key_Shared subscription has out-of-order cases because messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement. |
| 54 | +Consider the following flow. |
| 55 | + |
| 56 | +1. Assume that, |
| 57 | + readPosition: `2:1` |
| 58 | + messagesToRedeliver: [] |
| 59 | + recentlyJoinedConsumers: [] |
| 60 | + c1: messagesForC: 1, pending: [] |
| 61 | + c2: messagesForC: 1000, pending: [] // Necessary to ensure that the dispatcher reads entries even if c1 has no more permits. |
| 62 | + selector: key-a: c1 |
| 63 | +2. Dispatch `2:1` (key: `key-a`, type: Normal) |
| 64 | + readPosition: `2:2` |
| 65 | + messagesToRedeliver: [] |
| 66 | + recentlyJoinedConsumers: [] |
| 67 | + c1: messagesForC: 0, pending: [`2:1`] |
| 68 | + c2: messagesForC: 1000, pending: [] |
| 69 | + selector: key-a: c1 |
| 70 | +3. Try to dispatch `2:2` (key: `key-a`, type: Normal), but it can't be sent to c1 because c1 has no more permits. Then, it is added to messagesToRedeliver. |
| 71 | + readPosition: `2:3` |
| 72 | + messagesToRedeliver: [`2:2`] |
| 73 | + recentlyJoinedConsumers: [] |
| 74 | + c1: messagesForC: 0, pending: [`2:1`] |
| 75 | + c2: messagesForC: 1000, pending: [] |
| 76 | + selector: key-a: c1 |
| 77 | +4. Add consumer c3 |
| 78 | + readPosition: `2:3` |
| 79 | + messagesToRedeliver: [`2:2`] |
| 80 | + recentlyJoinedConsumers: [c3: `2:3`] |
| 81 | + c1: messagesForC: 0, pending: [`2:1`] |
| 82 | + c2: messagesForC: 1000, pending: [] |
| 83 | + c3: messagesForC: 1000, pending: [] |
| 84 | + selector: key-a: c3 // modified |
| 85 | +5. Dispatch `2:2` (key: `key-a`, type: Replay) from messagesToRedeliver. |
| 86 | + readPosition: `2:3` |
| 87 | + messagesToRedeliver: [] |
| 88 | + recentlyJoinedConsumers: [c3: `2:3`] |
| 89 | + c1: messagesForC: 0, pending: [`2:1`] |
| 90 | + c2: messagesForC: 1000, pending: [] |
| 91 | + c3: messagesForC: 999, pending: [`2:2`] |
| 92 | + selector: key-a: c3 |
| 93 | +6. Disconnect c1 and redelivery `2:1` |
| 94 | + readPosition: `2:3` |
| 95 | + messagesToRedeliver: [] |
| 96 | + recentlyJoinedConsumers: [c3: `2:3`] |
| 97 | + c2: messagesForC: 1000, pending: [] |
| 98 | + c3: messagesForC: 998, pending: [`2:2`, `2:1`] // out-of-order |
| 99 | + selector: key-a: c3 |
| 100 | + |
| 101 | +# Goals |
| 102 | + |
| 103 | +## In Scope |
| 104 | + |
| 105 | +Fix out-of-order issues above. |
| 106 | + |
| 107 | +## Out of Scope |
| 108 | + |
| 109 | +Simplify or improve the specification of Key_Shared. |
| 110 | + |
| 111 | +# High Level Design |
| 112 | + |
| 113 | +The root cause of the issues described above is that `recentlyJoinedConsumers` uses "read position" as joined positions for consumers, because this does not guarantee that messages less than or equal to it have already been scheduled to be sent. |
| 114 | +Instead, we propose to use "last sent position" as joined positions for consumers. |
| 115 | + |
| 116 | +Also, change (or add) some stats to know Key_Shared subscription status easily. |
| 117 | + |
| 118 | +# Detailed Design |
| 119 | + |
| 120 | +## Design & Implementation Details |
| 121 | + |
| 122 | +First, introduce the new position, like the mark delete position and the individually deleted messages. In other words, |
| 123 | + |
| 124 | +- All positions less than or equal to it are already scheduled to be sent. |
| 125 | +- Manage individually sent positions to update the position as expected. |
| 126 | + |
| 127 | +An example of updating the individually sent messages and the last sent position will be as follows. |
| 128 | + |
| 129 | +Initially, the last sent position is `3:0`, and the individually sent positions is `[]`. |
| 130 | +1. Read `3:1` - `3:10` positions |
| 131 | +2. Send `3:1` - `3:3`, `3:5`, and `3:8` - `3:10` positions |
| 132 | + - last sent position: `3:3` |
| 133 | + - individually sent positions: `[(3:4, 3:5], (3:7, 3:10]]` |
| 134 | +3. Send `3:7` position |
| 135 | + - last sent position: `3:3` |
| 136 | + - individually sent positions: `[(3:4, 3:5], (3:6, 3:10]]` |
| 137 | +4. Send `3:6` position |
| 138 | + - last sent position: `3:3` |
| 139 | + - individually sent positions: `[(3:4, 3:10]]` |
| 140 | +5. Send `3:4` position |
| 141 | + - last sent position: `3:10` |
| 142 | + - individually sent positions: `[]` |
| 143 | + |
| 144 | +More specifically, the recently joined consumers related fields will be as follows. |
| 145 | +```diff |
| 146 | +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java |
| 147 | +index 8f05530f58b..2b17c580832 100644 |
| 148 | +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java |
| 149 | ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java |
| 150 | +@@ -69,8 +69,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi |
| 151 | + * This means that, in order to preserve ordering, new consumers can only receive old |
| 152 | + * messages, until the mark-delete position will move past this point. |
| 153 | + */ |
| 154 | ++ // Map(key: recently joined consumer, value: last sent position when joining) |
| 155 | + private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers; |
| 156 | + |
| 157 | ++ private PositionImpl lastSentPosition; |
| 158 | ++ private final RangeSetWrapper<PositionImpl> individuallySentPositions; |
| 159 | ++ |
| 160 | + PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, |
| 161 | + Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { |
| 162 | + super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); |
| 163 | +``` |
| 164 | + |
| 165 | +Next, rename the consumer stats as follows. |
| 166 | +```diff |
| 167 | +--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java |
| 168 | ++++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java |
| 169 | +@@ -74,8 +74,8 @@ public class ConsumerStatsImpl implements ConsumerStats { |
| 170 | + /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */ |
| 171 | + public boolean blockedConsumerOnUnackedMsgs; |
| 172 | + |
| 173 | +- /** The read position of the cursor when the consumer joining. */ |
| 174 | +- public String readPositionWhenJoining; |
| 175 | ++ /** The last sent position of the cursor when the consumer joining. */ |
| 176 | ++ public String lastSentPositionWhenJoining; |
| 177 | + |
| 178 | + /** Address of this consumer. */ |
| 179 | + private String address; |
| 180 | +``` |
| 181 | + |
| 182 | +Note that I just renamed the stats from `readPositionWhenJoining` to `lastSentPositionWhenJoining` without keeping the backward-compatibility because readPositionWhenJoining is no longer meaningful and redundant. |
| 183 | + |
| 184 | +And finally, modify the subscription stats of the definition as follows. |
| 185 | +```diff |
| 186 | +diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java |
| 187 | +index dc666f3a18e..7591369277f 100644 |
| 188 | +--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java |
| 189 | ++++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java |
| 190 | +@@ -1177,7 +1177,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs |
| 191 | + .getRecentlyJoinedConsumers(); |
| 192 | + if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) { |
| 193 | + recentlyJoinedConsumers.forEach((k, v) -> { |
| 194 | +- subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString()); |
| 195 | ++ // The dispatcher allows same name consumers |
| 196 | ++ final StringBuilder stringBuilder = new StringBuilder(); |
| 197 | ++ stringBuilder.append("consumerName=").append(k.consumerName()) |
| 198 | ++ .append(", consumerId=").append(k.consumerId()); |
| 199 | ++ if (k.cnx() != null) { |
| 200 | ++ stringBuilder.append(", address=").append(k.cnx().clientAddress()); |
| 201 | ++ } |
| 202 | ++ subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString()); |
| 203 | + }); |
| 204 | + } |
| 205 | + } |
| 206 | +``` |
| 207 | + |
| 208 | +## How The Proposal Resolves The Issue |
| 209 | + |
| 210 | +**[issue-1]** |
| 211 | +Consider the following flow. |
| 212 | + |
| 213 | +1. Assume that the [entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) has the following messages, and the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messageForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`. |
| 214 | + - `1:6` key: `key-a` |
| 215 | + - `1:7` key: `key-a` |
| 216 | + - `1:8` key: `key-a` |
| 217 | + - `1:9` key: `key-b` |
| 218 | + - `1:10` key: `key-b` |
| 219 | + - `1:11` key: `key-b` |
| 220 | +2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`. |
| 221 | + - So, the current last sent position is `1:6` and the individually sent positions is `[(1:8, 1:11]]`. |
| 222 | + - `c1` never acknowledge `1:6`. |
| 223 | +3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the `recentlyJoinedConsumers` is `{c3=1:6}`. |
| 224 | +4. Can't send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are greater than the recently joined consumers position, `1:6`. |
| 225 | +5. Disconnect `c1`. |
| 226 | +6. Send `1:6` - `1:8` to `c3`. |
| 227 | + Now, `c3` receives messages with expected order regarding `key-a`. |
| 228 | + |
| 229 | +**[issue-2]** |
| 230 | +This mechanism guarantees all messages less than or equal to the last sent position are already scheduled to be sent. Therefore, skipped messages (e.g. `2:2`) are greater than the last sent position. |
| 231 | + |
| 232 | +1. The last sent position is `2:1`. |
| 233 | +2. When add new consumer `c3`, `recentlyJoinedConsumers` is `[{c3: 2:1}]`. |
| 234 | + The dispatcher can't send `2:2` to `c3` because `2:2` is greater than the joined position `2:1`. |
| 235 | +3. When `c3` receives `2:1` and acknowledges it, then the mark delete position is advanced to `2:1`. |
| 236 | + When all messages up to the joined position (i.e., `2:1` ) have been acknowledged, then the consumer (i.e., `c3` ) is removed from `recentlyJoinedConsumers`. |
| 237 | + Therefore, `c3` will be able to receive `2:2`. |
| 238 | + |
| 239 | +**[stats]** |
| 240 | +`readPositionWhenJoining` is replaced with `lastSentPositionWhenJoining` in each consumer stats instead. |
| 241 | + |
| 242 | +## Public-facing Changes |
| 243 | + |
| 244 | +### Public API |
| 245 | + |
| 246 | +### Binary protocol |
| 247 | + |
| 248 | +### Configuration |
| 249 | + |
| 250 | +### CLI |
| 251 | + |
| 252 | +### Metrics |
| 253 | +* The consumer stats `readPositionWhenJoining` is renamed to `lastSentPositionWhenJoining`. |
| 254 | +* The subscription stats `consumersAfterMarkDeletePosition` of the definition is modified as described. |
| 255 | + |
| 256 | +# Monitoring |
| 257 | + |
| 258 | +# Security Considerations |
| 259 | + |
| 260 | +# Backward & Forward Compatability |
| 261 | + |
| 262 | +## Revert |
| 263 | + |
| 264 | +## Upgrade |
| 265 | + |
| 266 | +# Alternatives |
| 267 | + |
| 268 | +### Alternative-1 |
| 269 | +See https://github.com/apache/pulsar/pull/20179 in detail. It isn't merged when publishing this proposal. |
| 270 | +The only difference is the message key, i.e., this approach leverages per-key information in addition to the proposal described in this PIP. |
| 271 | +For example, the `recentlyJoinedConsumers` will be: |
| 272 | + |
| 273 | +``` |
| 274 | +// Map(key: recently joined consumer, value: Map(key: message key, value: last sent position in the key when joining)) |
| 275 | +private final LinkedHashMap<Consumer, Map<ByteBuffer, PositionImpl>> recentlyJoinedConsumers; |
| 276 | +``` |
| 277 | + |
| 278 | +With this change, message delivery stuck on one key will no longer prevent other keys from being dispatched. |
| 279 | +However, the codes will be vulnerable to an increase in keys, causing OOM in the worst case. |
| 280 | + |
| 281 | +### Alternative-2 |
| 282 | +Make updating the read position, dispatching messages, and adding new consumers exclusive to ensure that messages less than the read position have already been sent. |
| 283 | +However, introducing such an exclusion mechanism disrupts the throughput of the dispatcher. |
| 284 | + |
| 285 | +# General Notes |
| 286 | + |
| 287 | +# Links |
| 288 | + |
| 289 | +<!-- |
| 290 | +Updated afterwards |
| 291 | +--> |
| 292 | +* Mailing List discussion thread: https://lists.apache.org/thread/69fpb0d30y7pc02k3zvg2lpb2lj0smdg |
| 293 | +* Mailing List voting thread: https://lists.apache.org/thread/45x056t8njjnzflbkhkofh00gcy4z5g6 |
0 commit comments