Skip to content

Commit bb6a914

Browse files
authored
chore(sources): Refactor struct SourceSender a bit (#23089)
* Clean up `inner` handling, as it should never be `None` * Rename some fields for clarity * Handle the unsent event count decrementing in one place
1 parent 34db5b0 commit bb6a914

File tree

1 file changed

+58
-72
lines changed

1 file changed

+58
-72
lines changed

src/source_sender/mod.rs

Lines changed: 58 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,27 @@ impl From<SourceSenderItem> for EventArray {
9494

9595
pub struct Builder {
9696
buf_size: usize,
97-
inner: Option<Inner>,
98-
named_inners: HashMap<String, Inner>,
97+
default_output: Option<Output>,
98+
named_outputs: HashMap<String, Output>,
9999
lag_time: Option<Histogram>,
100100
}
101101

102-
impl Builder {
103-
// https://github.com/rust-lang/rust/issues/73255
104-
#[allow(clippy::missing_const_for_fn)]
105-
pub fn with_buffer(self, n: usize) -> Self {
102+
impl Default for Builder {
103+
fn default() -> Self {
106104
Self {
107-
buf_size: n,
108-
inner: self.inner,
109-
named_inners: self.named_inners,
110-
lag_time: self.lag_time,
105+
buf_size: CHUNK_SIZE,
106+
default_output: None,
107+
named_outputs: Default::default(),
108+
lag_time: Some(histogram!(LAG_TIME_NAME)),
111109
}
112110
}
111+
}
112+
113+
impl Builder {
114+
pub const fn with_buffer(mut self, n: usize) -> Self {
115+
self.buf_size = n;
116+
self
117+
}
113118

114119
pub fn add_source_output(
115120
&mut self,
@@ -124,54 +129,47 @@ impl Builder {
124129
};
125130
match output.port {
126131
None => {
127-
let (inner, rx) = Inner::new_with_buffer(
132+
let (output, rx) = Output::new_with_buffer(
128133
self.buf_size,
129134
DEFAULT_OUTPUT.to_owned(),
130135
lag_time,
131136
log_definition,
132137
output_id,
133138
);
134-
self.inner = Some(inner);
139+
self.default_output = Some(output);
135140
rx
136141
}
137142
Some(name) => {
138-
let (inner, rx) = Inner::new_with_buffer(
143+
let (output, rx) = Output::new_with_buffer(
139144
self.buf_size,
140145
name.clone(),
141146
lag_time,
142147
log_definition,
143148
output_id,
144149
);
145-
self.named_inners.insert(name, inner);
150+
self.named_outputs.insert(name, output);
146151
rx
147152
}
148153
}
149154
}
150155

151-
// https://github.com/rust-lang/rust/issues/73255
152-
#[allow(clippy::missing_const_for_fn)]
153156
pub fn build(self) -> SourceSender {
154157
SourceSender {
155-
inner: self.inner,
156-
named_inners: self.named_inners,
158+
default_output: self.default_output.expect("no default output"),
159+
named_outputs: self.named_outputs,
157160
}
158161
}
159162
}
160163

161164
#[derive(Debug, Clone)]
162165
pub struct SourceSender {
163-
inner: Option<Inner>,
164-
named_inners: HashMap<String, Inner>,
166+
default_output: Output,
167+
named_outputs: HashMap<String, Output>,
165168
}
166169

167170
impl SourceSender {
168171
pub fn builder() -> Builder {
169-
Builder {
170-
buf_size: CHUNK_SIZE,
171-
inner: None,
172-
named_inners: Default::default(),
173-
lag_time: Some(histogram!(LAG_TIME_NAME)),
174-
}
172+
Builder::default()
175173
}
176174

177175
#[cfg(any(test, feature = "test-utils"))]
@@ -181,12 +179,12 @@ impl SourceSender {
181179
component: "test".to_string().into(),
182180
port: None,
183181
};
184-
let (inner, rx) =
185-
Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id);
182+
let (default_output, rx) =
183+
Output::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id);
186184
(
187185
Self {
188-
inner: Some(inner),
189-
named_inners: Default::default(),
186+
default_output,
187+
named_outputs: Default::default(),
190188
},
191189
rx,
192190
)
@@ -254,7 +252,7 @@ impl SourceSender {
254252
component: "test".to_string().into(),
255253
port: Some(name.clone()),
256254
};
257-
let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, None, output_id);
255+
let (output, recv) = Output::new_with_buffer(100, name.clone(), None, None, output_id);
258256
let recv = recv.into_stream().map(move |mut item| {
259257
item.events.iter_events_mut().for_each(|mut event| {
260258
let metadata = event.metadata_mut();
@@ -263,19 +261,15 @@ impl SourceSender {
263261
});
264262
item
265263
});
266-
self.named_inners.insert(name, inner);
264+
self.named_outputs.insert(name, output);
267265
recv
268266
}
269267

270268
/// Send an event to the default output.
271269
///
272270
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
273271
pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
274-
self.inner
275-
.as_mut()
276-
.expect("no default output")
277-
.send_event(event)
278-
.await
272+
self.default_output.send_event(event).await
279273
}
280274

281275
/// Send a stream of events to the default output.
@@ -286,11 +280,7 @@ impl SourceSender {
286280
S: Stream<Item = E> + Unpin,
287281
E: Into<Event> + ByteSizeOf,
288282
{
289-
self.inner
290-
.as_mut()
291-
.expect("no default output")
292-
.send_event_stream(events)
293-
.await
283+
self.default_output.send_event_stream(events).await
294284
}
295285

296286
/// Send a batch of events to the default output.
@@ -302,11 +292,7 @@ impl SourceSender {
302292
I: IntoIterator<Item = E>,
303293
<I as IntoIterator>::IntoIter: ExactSizeIterator,
304294
{
305-
self.inner
306-
.as_mut()
307-
.expect("no default output")
308-
.send_batch(events)
309-
.await
295+
self.default_output.send_batch(events).await
310296
}
311297

312298
/// Send a batch of events event to a named output.
@@ -318,7 +304,7 @@ impl SourceSender {
318304
I: IntoIterator<Item = E>,
319305
<I as IntoIterator>::IntoIter: ExactSizeIterator,
320306
{
321-
self.named_inners
307+
self.named_outputs
322308
.get_mut(name)
323309
.expect("unknown output")
324310
.send_batch(events)
@@ -368,9 +354,8 @@ impl Drop for UnsentEventCount {
368354
}
369355

370356
#[derive(Clone)]
371-
struct Inner {
372-
inner: LimitedSender<SourceSenderItem>,
373-
output: String,
357+
struct Output {
358+
sender: LimitedSender<SourceSenderItem>,
374359
lag_time: Option<Histogram>,
375360
events_sent: Registered<EventsSent>,
376361
/// The schema definition that will be attached to Log events sent through here
@@ -380,17 +365,17 @@ struct Inner {
380365
output_id: Arc<OutputId>,
381366
}
382367

383-
impl fmt::Debug for Inner {
368+
impl fmt::Debug for Output {
384369
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
385-
fmt.debug_struct("Inner")
386-
.field("inner", &self.inner)
387-
.field("output", &self.output)
370+
fmt.debug_struct("Output")
371+
.field("sender", &self.sender)
372+
.field("output_id", &self.output_id)
388373
// `metrics::Histogram` is missing `impl Debug`
389374
.finish()
390375
}
391376
}
392377

393-
impl Inner {
378+
impl Output {
394379
fn new_with_buffer(
395380
n: usize,
396381
output: String,
@@ -401,8 +386,7 @@ impl Inner {
401386
let (tx, rx) = channel::limited(n);
402387
(
403388
Self {
404-
inner: tx,
405-
output: output.clone(),
389+
sender: tx,
406390
lag_time,
407391
events_sent: register!(EventsSent::from(internal_event::Output(Some(
408392
output.into()
@@ -414,7 +398,11 @@ impl Inner {
414398
)
415399
}
416400

417-
async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> {
401+
async fn send(
402+
&mut self,
403+
mut events: EventArray,
404+
unsent_event_count: &mut UnsentEventCount,
405+
) -> Result<(), ClosedError> {
418406
let send_reference = Instant::now();
419407
let reference = Utc::now().timestamp_millis();
420408
events
@@ -433,14 +421,15 @@ impl Inner {
433421

434422
let byte_size = events.estimated_json_encoded_size_of();
435423
let count = events.len();
436-
self.inner
424+
self.sender
437425
.send(SourceSenderItem {
438426
events,
439427
send_reference,
440428
})
441429
.await
442430
.map_err(|_| ClosedError)?;
443431
self.events_sent.emit(CountByteSize(count, byte_size));
432+
unsent_event_count.decr(count);
444433
Ok(())
445434
}
446435

@@ -449,11 +438,8 @@ impl Inner {
449438
// It's possible that the caller stops polling this future while it is blocked waiting
450439
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
451440
// `ComponentEventsDropped` events.
452-
let count = event.len();
453-
let mut unsent_event_count = UnsentEventCount::new(count);
454-
let res = self.send(event).await;
455-
unsent_event_count.discard();
456-
res
441+
let mut unsent_event_count = UnsentEventCount::new(event.len());
442+
self.send(event, &mut unsent_event_count).await
457443
}
458444

459445
async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
@@ -480,13 +466,13 @@ impl Inner {
480466
let events = events.into_iter().map(Into::into);
481467
let mut unsent_event_count = UnsentEventCount::new(events.len());
482468
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
483-
let count = events.len();
484-
self.send(events).await.inspect_err(|_| {
485-
// The unsent event count is discarded here because the caller emits the
486-
// `StreamClosedError`.
487-
unsent_event_count.discard();
488-
})?;
489-
unsent_event_count.decr(count);
469+
self.send(events, &mut unsent_event_count)
470+
.await
471+
.inspect_err(|_| {
472+
// The unsent event count is discarded here because the callee emits the
473+
// `StreamClosedError`.
474+
unsent_event_count.discard();
475+
})?;
490476
}
491477
Ok(())
492478
}

0 commit comments

Comments
 (0)