|
12 | 12 | // See the License for the specific language governing permissions and
|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
| 15 | +use std::pin::pin; |
| 16 | + |
15 | 17 | use futures::channel::{mpsc, oneshot};
|
16 | 18 | use futures::stream::select_with_strategy;
|
17 | 19 | use futures::{stream, StreamExt};
|
@@ -105,7 +107,7 @@ impl RearrangedChainExecutor {
|
105 | 107 |
|
106 | 108 | #[try_stream(ok = Message, error = StreamExecutorError)]
|
107 | 109 | async fn execute_inner(mut self) {
|
108 |
| - let mut upstream = Box::pin(self.upstream.execute()); |
| 110 | + let mut upstream = pin!(self.upstream.execute()); |
109 | 111 |
|
110 | 112 | // 1. Poll the upstream to get the first barrier.
|
111 | 113 | let first_barrier = expect_first_barrier(&mut upstream).await?;
|
@@ -133,106 +135,107 @@ impl RearrangedChainExecutor {
|
133 | 135 | .unbounded_send(RearrangedMessage::PhantomBarrier(first_barrier))
|
134 | 136 | .unwrap();
|
135 | 137 |
|
136 |
| - // 3. Rearrange stream, will yield the barriers polled from upstream to rearrange. |
137 |
| - let rearranged_barrier = Box::pin( |
138 |
| - Self::rearrange_barrier(&mut upstream, upstream_tx, stop_rearrange_rx) |
139 |
| - .map(|result| result.map(RearrangedMessage::RearrangedBarrier)), |
140 |
| - ); |
141 |
| - |
142 |
| - // 4. Init the snapshot with reading epoch. |
143 |
| - let snapshot = self.snapshot.execute_with_epoch(create_epoch.prev); |
144 |
| - |
145 |
| - // Chain the `snapshot` and `upstream_rx` to get a unified `rearranged_chunks` stream. |
146 |
| - let rearranged_chunks = snapshot |
147 |
| - .map(|result| result.map(RearrangedMessage::rearranged_from)) |
148 |
| - .chain(upstream_rx.map(Ok)); |
149 |
| - |
150 |
| - // 5. Merge the rearranged barriers with chunks, with the priority of barrier. |
151 |
| - let mut rearranged = |
152 |
| - select_with_strategy(rearranged_barrier, rearranged_chunks, |_: &mut ()| { |
153 |
| - stream::PollNext::Left |
154 |
| - }); |
155 |
| - |
156 |
| - // Record the epoch of the last rearranged barrier we received. |
157 |
| - let mut last_rearranged_epoch = create_epoch; |
158 |
| - let mut stop_rearrange_tx = Some(stop_rearrange_tx); |
159 |
| - |
160 |
| - let mut processed_rows: u64 = 0; |
161 |
| - |
162 |
| - // 6. Consume the merged `rearranged` stream. |
163 |
| - #[for_await] |
164 |
| - for rearranged_msg in &mut rearranged { |
165 |
| - match rearranged_msg? { |
166 |
| - // If we received a phantom barrier, update the progress and check whether we |
167 |
| - // catches up with the progress of upstream MV. |
168 |
| - // |
169 |
| - // Note that there's no phantom barrier in the snapshot. So we must have already |
170 |
| - // consumed the whole snapshot and be on the upstream now. |
171 |
| - RearrangedMessage::PhantomBarrier(barrier) => { |
172 |
| - // Update the progress since we've consumed all chunks before this phantom. |
173 |
| - self.progress.update( |
174 |
| - last_rearranged_epoch.curr, |
175 |
| - barrier.epoch.curr, |
176 |
| - processed_rows, |
177 |
| - ); |
178 |
| - |
179 |
| - if barrier.epoch.curr >= last_rearranged_epoch.curr { |
180 |
| - // Stop the background rearrangement task. |
181 |
| - stop_rearrange_tx.take().unwrap().send(()).map_err(|_| { |
182 |
| - StreamExecutorError::channel_closed("stop rearrange") |
183 |
| - })?; |
184 |
| - break; |
| 138 | + { |
| 139 | + // 3. Rearrange stream, will yield the barriers polled from upstream to rearrange. |
| 140 | + let rearranged_barrier = |
| 141 | + pin!( |
| 142 | + Self::rearrange_barrier(&mut upstream, upstream_tx, stop_rearrange_rx) |
| 143 | + .map(|result| result.map(RearrangedMessage::RearrangedBarrier)), |
| 144 | + ); |
| 145 | + |
| 146 | + // 4. Init the snapshot with reading epoch. |
| 147 | + let snapshot = self.snapshot.execute_with_epoch(create_epoch.prev); |
| 148 | + |
| 149 | + // Chain the `snapshot` and `upstream_rx` to get a unified `rearranged_chunks` |
| 150 | + // stream. |
| 151 | + let rearranged_chunks = snapshot |
| 152 | + .map(|result| result.map(RearrangedMessage::rearranged_from)) |
| 153 | + .chain(upstream_rx.map(Ok)); |
| 154 | + |
| 155 | + // 5. Merge the rearranged barriers with chunks, with the priority of barrier. |
| 156 | + let mut rearranged = |
| 157 | + select_with_strategy(rearranged_barrier, rearranged_chunks, |_: &mut ()| { |
| 158 | + stream::PollNext::Left |
| 159 | + }); |
| 160 | + |
| 161 | + // Record the epoch of the last rearranged barrier we received. |
| 162 | + let mut last_rearranged_epoch = create_epoch; |
| 163 | + let mut stop_rearrange_tx = Some(stop_rearrange_tx); |
| 164 | + |
| 165 | + let mut processed_rows: u64 = 0; |
| 166 | + |
| 167 | + #[for_await] |
| 168 | + for rearranged_msg in &mut rearranged { |
| 169 | + match rearranged_msg? { |
| 170 | + // If we received a phantom barrier, update the progress and check whether |
| 171 | + // we catches up with the progress of upstream MV. |
| 172 | + // |
| 173 | + // Note that there's no phantom barrier in the snapshot. So we must have |
| 174 | + // already consumed the whole snapshot and be on the |
| 175 | + // upstream now. |
| 176 | + RearrangedMessage::PhantomBarrier(barrier) => { |
| 177 | + // Update the progress since we've consumed all chunks before this |
| 178 | + // phantom. |
| 179 | + self.progress.update( |
| 180 | + last_rearranged_epoch.curr, |
| 181 | + barrier.epoch.curr, |
| 182 | + processed_rows, |
| 183 | + ); |
| 184 | + |
| 185 | + if barrier.epoch.curr >= last_rearranged_epoch.curr { |
| 186 | + // Stop the background rearrangement task. |
| 187 | + stop_rearrange_tx.take().unwrap().send(()).map_err(|_| { |
| 188 | + StreamExecutorError::channel_closed("stop rearrange") |
| 189 | + })?; |
| 190 | + break; |
| 191 | + } |
185 | 192 | }
|
186 |
| - } |
187 | 193 |
|
188 |
| - // If we received a message, yield it. |
189 |
| - RearrangedMessage::RearrangedBarrier(barrier) => { |
190 |
| - last_rearranged_epoch = barrier.epoch; |
191 |
| - yield Message::Barrier(barrier); |
192 |
| - } |
193 |
| - RearrangedMessage::Chunk(chunk) => { |
194 |
| - processed_rows += chunk.cardinality() as u64; |
195 |
| - yield Message::Chunk(chunk) |
196 |
| - } |
197 |
| - RearrangedMessage::Watermark => { |
198 |
| - // Ignore watermark during snapshot consumption. |
| 194 | + // If we received a message, yield it. |
| 195 | + RearrangedMessage::RearrangedBarrier(barrier) => { |
| 196 | + last_rearranged_epoch = barrier.epoch; |
| 197 | + yield Message::Barrier(barrier); |
| 198 | + } |
| 199 | + RearrangedMessage::Chunk(chunk) => { |
| 200 | + processed_rows += chunk.cardinality() as u64; |
| 201 | + yield Message::Chunk(chunk) |
| 202 | + } |
| 203 | + RearrangedMessage::Watermark => { |
| 204 | + // Ignore watermark during snapshot consumption. |
| 205 | + } |
199 | 206 | }
|
200 | 207 | }
|
201 |
| - } |
202 |
| - |
203 |
| - // 7. Rearranged task finished. |
204 |
| - // The reason for finish must be that we told it to stop. |
205 |
| - tracing::trace!(actor = self.actor_id, "rearranged task finished"); |
206 |
| - if stop_rearrange_tx.is_some() { |
207 |
| - tracing::error!(actor = self.actor_id, "rearrangement finished passively"); |
208 |
| - } |
209 | 208 |
|
210 |
| - // 8. Consume remainings. |
211 |
| - let mut finish_on_barrier = |msg: &Message| { |
212 |
| - if let Some(barrier) = msg.as_barrier() { |
213 |
| - self.progress.finish(barrier.epoch.curr); |
| 209 | + // 7. Rearranged task finished. |
| 210 | + // The reason for finish must be that we told it to stop. |
| 211 | + tracing::trace!(actor = self.actor_id, "rearranged task finished"); |
| 212 | + if stop_rearrange_tx.is_some() { |
| 213 | + tracing::error!(actor = self.actor_id, "rearrangement finished passively"); |
214 | 214 | }
|
215 |
| - }; |
216 | 215 |
|
217 |
| - // Note that there may still be some messages in `rearranged`. However the rearranged |
218 |
| - // barriers must be ignored, we should take the phantoms. |
219 |
| - #[for_await] |
220 |
| - for msg in rearranged { |
221 |
| - let msg: RearrangedMessage = msg?; |
222 |
| - let Some(msg) = msg.phantom_into() else { continue }; |
223 |
| - finish_on_barrier(&msg); |
224 |
| - yield msg; |
| 216 | + // 8. Consume remainings. |
| 217 | + // Note that there may still be some messages in `rearranged`. However the |
| 218 | + // rearranged barriers must be ignored, we should take the phantoms. |
| 219 | + #[for_await] |
| 220 | + for msg in rearranged { |
| 221 | + let msg: RearrangedMessage = msg?; |
| 222 | + let Some(msg) = msg.phantom_into() else { continue }; |
| 223 | + if let Some(barrier) = msg.as_barrier() { |
| 224 | + self.progress.finish(barrier.epoch.curr); |
| 225 | + } |
| 226 | + yield msg; |
| 227 | + } |
225 | 228 | }
|
226 | 229 |
|
227 |
| - let mut remaining_upstream = upstream; |
228 |
| - |
229 | 230 | // Consume remaining upstream.
|
230 | 231 | tracing::trace!(actor = self.actor_id, "begin to consume remaining upstream");
|
231 | 232 |
|
232 | 233 | #[for_await]
|
233 |
| - for msg in &mut remaining_upstream { |
| 234 | + for msg in upstream { |
234 | 235 | let msg: Message = msg?;
|
235 |
| - finish_on_barrier(&msg); |
| 236 | + if let Some(barrier) = msg.as_barrier() { |
| 237 | + self.progress.finish(barrier.epoch.curr); |
| 238 | + } |
236 | 239 | yield msg;
|
237 | 240 | }
|
238 | 241 | } else {
|
|
0 commit comments