Skip to content

Commit f384eb1

Browse files
committed
checkpoint
Signed-off-by: Jean Mertz <[email protected]>
1 parent 41f764c commit f384eb1

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

src/sinks/pulsar.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -280,19 +280,19 @@ impl PulsarSink {
280280
if let PulsarSinkState::Sending(fut) = &mut self.state {
281281
let (producer, result, metadata, finalizers, source_id) = ready!(fut.as_mut().poll(cx));
282282

283+
let source = source_id.and_then(|id| {
284+
self.sources_details
285+
.get(id)
286+
.map(|details| details.key.id().to_owned())
287+
});
288+
283289
self.state = PulsarSinkState::Ready(producer);
284290
self.in_flight.push(Box::pin(async move {
285291
let result = match result {
286292
Ok(fut) => fut.await,
287293
Err(error) => Err(error),
288294
};
289295

290-
let source = source_id.and_then(|id| {
291-
self.sources_details
292-
.get(id)
293-
.map(|details| details.key.id().to_owned())
294-
});
295-
296296
(result, metadata, finalizers, source)
297297
}));
298298
}

0 commit comments

Comments
 (0)