
Commit 52a8036

fix(component validation): make tests deterministic through absolute comparisons instead of bounds checks (vectordotdev#17956)
### Changes

- Fixes a bug that was occasionally circumvented by comparing against a lower bound for some of the `_bytes` metrics. The reasoning offered for the lower bound (an inability to synchronize with the component's internal logic) appears to have been inaccurate.
- The bug itself was that we were converting the `EventData` read from the yaml file into a proper `Event` (and thus generating metadata for it) in two places: first when sending the event as input in the input runner, and again in the validator to construct the expected result. Because event construction adds a timestamp, and because our determination of timestamp precision is based on whether there are trailing zeros that can be trimmed, the actual event input to the component under test and the event used to calculate the expected metrics could have timestamps that differed in their number of bytes. This did not happen every time, so the tests sometimes passed and sometimes failed.
- The fix is to create the proper `Event` immediately after parsing the `EventData` from the yaml file, so that it is created only once.
- Moves the calculation of expected metrics out of the Validators and up to the input/output runners. This removes the need for a lot of logic that was previously done in the Validators.
- Increases the static sleep used when initializing the runner topology from 1s to 2s, and increases the number of internal metric ticks to wait for before shutting down the telemetry from 2 ticks to 3. I found through experimentation that occasionally we would receive no events. The two settings were increased independently, and it was only by increasing both that the pass rate became 100%.
- Extracts duplicated code in the sources validator functions into helper functions.

### Testing Done

The tests were run in a loop, stopping on the first failure. This method was used to calibrate the waits we have on the topology as mentioned above. Before the current settings, the maximum number of sequential passes was about 100. With the current settings, the loop was manually stopped at about 1.2k iterations.
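To illustrate the flakiness described in the commit message, here is a minimal sketch of how trimming trailing zeros from a timestamp's fractional part changes its encoded length. `format_timestamp` and `timestamp_len` are hypothetical helpers written for this example; they are not Vector's serializer and only mimic the trimming behavior the message describes.

```rust
/// Hypothetical helper (not part of Vector): renders seconds plus a
/// nanosecond fraction, trimming trailing zeros from the fraction.
pub fn format_timestamp(secs: u64, nanos: u32) -> String {
    // Always start from nine fractional digits, then drop trailing zeros.
    let frac = format!("{:09}", nanos);
    let trimmed = frac.trim_end_matches('0');
    if trimmed.is_empty() {
        format!("{secs}Z")
    } else {
        format!("{secs}.{trimmed}Z")
    }
}

/// Encoded size in bytes of the rendered timestamp.
pub fn timestamp_len(secs: u64, nanos: u32) -> usize {
    format_timestamp(secs, nanos).len()
}
```

Under these assumptions, two events built a few microseconds apart can render timestamps of different lengths, which is enough to break an exact comparison of byte-count metrics when the expected and actual values come from separately constructed events.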
1 parent 7d0db6b commit 52a8036


10 files changed, +426 -444 lines changed


src/components/validation/mod.rs

+13
@@ -170,6 +170,19 @@ macro_rules! register_validatable_component {
     };
 }
 
+/// Input and Output runners populate this structure as they send and receive events.
+/// The structure is passed into the validator to use as the expected values for the
+/// metrics that the components under test actually output.
+#[derive(Default)]
+pub struct RunnerMetrics {
+    pub received_events_total: u64,
+    pub received_event_bytes_total: u64,
+    pub received_bytes_total: u64,
+    pub sent_bytes_total: u64, // a reciprocal for received_bytes_total
+    pub sent_event_bytes_total: u64,
+    pub sent_events_total: u64,
+}
+
 #[cfg(all(test, feature = "component-validation-tests"))]
 mod tests {
     use std::{
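As a rough sketch of how a runner might fill in `RunnerMetrics` and how a validator might then compare exactly rather than against a lower bound: the struct fields below are copied from the diff, while `record_sent`, `record_received`, and `check_exact` are hypothetical names invented for this example and do not appear in the commit.

```rust
/// Mirrors the struct added in this commit so the sketch is self-contained.
/// (`Debug` is derived here only for convenience in the example.)
#[derive(Debug, Default)]
pub struct RunnerMetrics {
    pub received_events_total: u64,
    pub received_event_bytes_total: u64,
    pub received_bytes_total: u64,
    pub sent_bytes_total: u64,
    pub sent_event_bytes_total: u64,
    pub sent_events_total: u64,
}

impl RunnerMetrics {
    /// Hypothetical helper: an input runner calls this after sending one event.
    /// `event_bytes` is the event's estimated size, `wire_bytes` its encoded size.
    pub fn record_sent(&mut self, event_bytes: u64, wire_bytes: u64) {
        self.sent_events_total += 1;
        self.sent_event_bytes_total += event_bytes;
        self.sent_bytes_total += wire_bytes;
    }

    /// Hypothetical helper: an output runner calls this per received event.
    pub fn record_received(&mut self, event_bytes: u64, wire_bytes: u64) {
        self.received_events_total += 1;
        self.received_event_bytes_total += event_bytes;
        self.received_bytes_total += wire_bytes;
    }
}

/// Hypothetical validator check: exact equality instead of a lower bound.
pub fn check_exact(name: &str, expected: u64, actual: u64) -> Result<(), String> {
    if expected == actual {
        Ok(())
    } else {
        Err(format!("{name}: expected {expected}, got {actual}"))
    }
}
```

Because the runners see the very events that are sent and received, the totals they accumulate can serve directly as the expected values, which is what makes an exact comparison feasible.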

src/components/validation/resources/event.rs

+100 -11
@@ -1,10 +1,20 @@
+use bytes::BytesMut;
 use serde::Deserialize;
+use snafu::Snafu;
+use tokio_util::codec::Encoder as _;
+
+use crate::codecs::Encoder;
+use codecs::{
+    encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
+    NewlineDelimitedEncoder,
+};
 use vector_core::event::{Event, LogEvent};
 
-/// An event used in a test case.
+/// A test case event for deserialization from yaml file.
+/// This is an intermediary step to TestEvent.
 #[derive(Clone, Debug, Deserialize)]
 #[serde(untagged)]
-pub enum TestEvent {
+pub enum RawTestEvent {
     /// The event is used, as-is, without modification.
     Passthrough(EventData),
 
@@ -20,15 +30,6 @@ pub enum TestEvent {
     Modified { modified: bool, event: EventData },
 }
 
-impl TestEvent {
-    pub fn into_event(self) -> Event {
-        match self {
-            Self::Passthrough(event) => event.into_event(),
-            Self::Modified { event, .. } => event.into_event(),
-        }
-    }
-}
-
 #[derive(Clone, Debug, Deserialize)]
 #[serde(untagged)]
 pub enum EventData {
@@ -44,3 +45,91 @@ impl EventData {
         }
     }
 }
+
+/// An event used in a test case.
+/// It is important to have created the event with all fields, immediately after deserializing from the
+/// test case definition yaml file. This ensures that the event data we are using in the expected/actual
+/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
+/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in
+/// the event which may or may not have the same millisecond precision as it's counterpart.
+#[derive(Clone, Debug, Deserialize)]
+#[serde(from = "RawTestEvent")]
+#[serde(untagged)]
+pub enum TestEvent {
+    /// The event is used, as-is, without modification.
+    Passthrough(Event),
+
+    /// The event is potentially modified by the external resource.
+    ///
+    /// The modification made is dependent on the external resource, but this mode is made available
+    /// for when a test case wants to exercise the failure path, but cannot cause a failure simply
+    /// by constructing the event in a certain way i.e. adding an invalid field, or removing a
+    /// required field, or using an invalid field value, and so on.
+    ///
+    /// For transforms and sinks, generally, the only way to cause an error is if the event itself
+    /// is malformed in some way, which can be achieved without this test event variant.
+    Modified { modified: bool, event: Event },
+}
+
+impl TestEvent {
+    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
+    pub fn into_event(self) -> Event {
+        match self {
+            Self::Passthrough(event) => event,
+            Self::Modified { event, .. } => event,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
+pub enum RawTestEventParseError {}
+
+impl From<RawTestEvent> for TestEvent {
+    fn from(other: RawTestEvent) -> Self {
+        match other {
+            RawTestEvent::Passthrough(event_data) => {
+                TestEvent::Passthrough(event_data.into_event())
+            }
+            RawTestEvent::Modified { modified, event } => TestEvent::Modified {
+                modified,
+                event: event.into_event(),
+            },
+        }
+    }
+}
+
+pub fn encode_test_event(
+    encoder: &mut Encoder<encoding::Framer>,
+    buf: &mut BytesMut,
+    event: TestEvent,
+) {
+    match event {
+        TestEvent::Passthrough(event) => {
+            // Encode the event normally.
+            encoder
+                .encode(event, buf)
+                .expect("should not fail to encode input event");
+        }
+        TestEvent::Modified { event, .. } => {
+            // This is a little fragile, but we check what serializer this encoder uses, and based
+            // on `Serializer::supports_json`, we choose an opposing codec. For example, if the
+            // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
+            // versa.
+            let mut alt_encoder = if encoder.serializer().supports_json() {
+                Encoder::<encoding::Framer>::new(
+                    LengthDelimitedEncoder::new().into(),
+                    LogfmtSerializer::new().into(),
+                )
+            } else {
+                Encoder::<encoding::Framer>::new(
+                    NewlineDelimitedEncoder::new().into(),
+                    JsonSerializer::new(MetricTagValues::default()).into(),
+                )
+            };
+
+            alt_encoder
+                .encode(event, buf)
+                .expect("should not fail to encode input event");
+        }
+    }
+}
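The `RawTestEvent` to `TestEvent` conversion above can be sketched in isolation. In the real code, serde's `from` container attribute runs the `From` impl during deserialization, so the `Event` (and its timestamp) is built exactly once. The types below (`RawEvent`, `Event`, `FAKE_CLOCK`, `convert_once`, `convert_twice`) are hypothetical stand-ins using only the standard library; a counter plays the role of the clock so the effect is deterministic.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Stand-in for a clock: every read yields a different value, as `now()` would.
static FAKE_CLOCK: AtomicU64 = AtomicU64::new(0);

/// Hypothetical raw form, as parsed from a test case file.
#[derive(Clone, Debug)]
pub struct RawEvent {
    pub message: String,
}

/// Hypothetical full event: conversion stamps it with metadata.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub message: String,
    pub timestamp: u64,
}

impl From<RawEvent> for Event {
    fn from(raw: RawEvent) -> Self {
        Event {
            message: raw.message,
            timestamp: FAKE_CLOCK.fetch_add(1, Ordering::SeqCst),
        }
    }
}

/// Converts once and hands identical copies to both consumers
/// (the component input and the expected-metrics calculation).
pub fn convert_once(raw: RawEvent) -> (Event, Event) {
    let event = Event::from(raw);
    (event.clone(), event)
}

/// Converts twice: the pattern this commit removes. The two events
/// carry different timestamps even though the raw data is the same.
pub fn convert_twice(raw: RawEvent) -> (Event, Event) {
    (Event::from(raw.clone()), Event::from(raw))
}
```

With a single conversion, whatever size the timestamp happens to serialize to is shared by both sides of the comparison, so the expected and actual byte counts cannot drift apart for that reason.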

src/components/validation/resources/http.rs

+12 -57
@@ -1,5 +1,6 @@
 use std::{
     collections::VecDeque,
+    future::Future,
     net::{IpAddr, SocketAddr},
     str::FromStr,
     sync::Arc,
@@ -11,26 +12,18 @@ use axum::{
     Router,
 };
 use bytes::BytesMut;
-use codecs::{
-    encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
-    NewlineDelimitedEncoder,
-};
 use http::{Method, Request, StatusCode, Uri};
 use hyper::{Body, Client, Server};
-use std::future::Future;
 use tokio::{
     select,
     sync::{mpsc, oneshot, Mutex, Notify},
 };
-use tokio_util::codec::{Decoder, Encoder as _};
-use vector_core::event::Event;
+use tokio_util::codec::Decoder;
 
-use crate::{
-    codecs::Encoder,
-    components::validation::sync::{Configuring, TaskCoordinator},
-};
+use crate::components::validation::sync::{Configuring, TaskCoordinator};
+use vector_core::event::Event;
 
-use super::{ResourceCodec, ResourceDirection, TestEvent};
+use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent};
 
 /// An HTTP resource.
 #[derive(Clone)]
@@ -67,7 +60,7 @@ impl HttpResourceConfig {
         self,
         direction: ResourceDirection,
         codec: ResourceCodec,
-        output_tx: mpsc::Sender<Event>,
+        output_tx: mpsc::Sender<Vec<Event>>,
         task_coordinator: &TaskCoordinator<Configuring>,
     ) {
         match direction {
@@ -230,7 +223,7 @@ fn spawn_input_http_client(
 fn spawn_output_http_server(
     config: HttpResourceConfig,
     codec: ResourceCodec,
-    output_tx: mpsc::Sender<Event>,
+    output_tx: mpsc::Sender<Vec<Event>>,
     task_coordinator: &TaskCoordinator<Configuring>,
 ) {
     // This HTTP server will wait for events to be sent by a sink, and collect them and send them on
@@ -252,12 +245,10 @@ fn spawn_output_http_server(
                         loop {
                             match decoder.decode_eof(&mut body) {
                                 Ok(Some((events, _byte_size))) => {
-                                    for event in events {
-                                        output_tx
-                                            .send(event)
-                                            .await
-                                            .expect("should not fail to send output event");
-                                    }
+                                    output_tx
+                                        .send(events.to_vec())
+                                        .await
+                                        .expect("should not fail to send output event");
                                 }
                                 Ok(None) => return StatusCode::OK.into_response(),
                                 Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
@@ -290,7 +281,7 @@ fn spawn_output_http_server(
 fn spawn_output_http_client(
     _config: HttpResourceConfig,
     _codec: ResourceCodec,
-    _output_tx: mpsc::Sender<Event>,
+    _output_tx: mpsc::Sender<Vec<Event>>,
     _task_coordinator: &TaskCoordinator<Configuring>,
 ) {
     // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
@@ -400,39 +391,3 @@ fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
 
     SocketAddr::from((uri_host, uri_port))
 }
-
-pub fn encode_test_event(
-    encoder: &mut Encoder<encoding::Framer>,
-    buf: &mut BytesMut,
-    event: TestEvent,
-) {
-    match event {
-        TestEvent::Passthrough(event) => {
-            // Encode the event normally.
-            encoder
-                .encode(event.into_event(), buf)
-                .expect("should not fail to encode input event");
-        }
-        TestEvent::Modified { event, .. } => {
-            // This is a little fragile, but we check what serializer this encoder uses, and based
-            // on `Serializer::supports_json`, we choose an opposing codec. For example, if the
-            // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
-            // versa.
-            let mut alt_encoder = if encoder.serializer().supports_json() {
-                Encoder::<encoding::Framer>::new(
-                    LengthDelimitedEncoder::new().into(),
-                    LogfmtSerializer::new().into(),
-                )
-            } else {
-                Encoder::<encoding::Framer>::new(
-                    NewlineDelimitedEncoder::new().into(),
-                    JsonSerializer::new(MetricTagValues::default()).into(),
-                )
-            };
-
-            alt_encoder
-                .encode(event.into_event(), buf)
-                .expect("should not fail to encode input event");
-        }
-    }
-}
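The `Modified` branch of `encode_test_event`, which this commit moves from `http.rs` to `event.rs`, deliberately picks a codec the component should fail to decode. A minimal sketch of that selection logic follows; `SerializerKind`, `FramerKind`, and `opposing_codec` are hypothetical stand-ins for the real `codecs` types and are not part of Vector.

```rust
/// Hypothetical stand-in for the configured serializer.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SerializerKind {
    Json,
    Logfmt,
}

impl SerializerKind {
    /// Mirrors the idea of `Serializer::supports_json` in the diff.
    pub fn supports_json(self) -> bool {
        matches!(self, SerializerKind::Json)
    }
}

/// Hypothetical stand-in for the framing method.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FramerKind {
    NewlineDelimited,
    LengthDelimited,
}

/// Picks a framer/serializer pair that opposes the configured serializer:
/// JSON-capable encoders get length-delimited logfmt, everything else
/// gets newline-delimited JSON.
pub fn opposing_codec(configured: SerializerKind) -> (FramerKind, SerializerKind) {
    if configured.supports_json() {
        (FramerKind::LengthDelimited, SerializerKind::Logfmt)
    } else {
        (FramerKind::NewlineDelimited, SerializerKind::Json)
    }
}
```

As the original comment notes, this approach is somewhat fragile: it assumes the two codec families never overlap in what they can decode.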

src/components/validation/resources/mod.rs

+3 -3
@@ -13,8 +13,8 @@ use vector_core::{config::DataType, event::Event};
 
 use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
 
-pub use self::event::{EventData, TestEvent};
-pub use self::http::{encode_test_event, HttpResourceConfig};
+pub use self::event::{encode_test_event, TestEvent};
+pub use self::http::HttpResourceConfig;
 
 use super::sync::{Configuring, TaskCoordinator};
 
@@ -308,7 +308,7 @@ impl ExternalResource {
     /// Spawns this resource for use as an output for a sink.
     pub fn spawn_as_output(
         self,
-        output_tx: mpsc::Sender<Event>,
+        output_tx: mpsc::Sender<Vec<Event>>,
         task_coordinator: &TaskCoordinator<Configuring>,
     ) {
         match self.definition {

src/components/validation/runner/io.rs

+17 -14
@@ -27,11 +27,11 @@ use crate::{
 
 #[derive(Clone)]
 pub struct EventForwardService {
-    tx: mpsc::Sender<Event>,
+    tx: mpsc::Sender<Vec<Event>>,
 }
 
-impl From<mpsc::Sender<Event>> for EventForwardService {
-    fn from(tx: mpsc::Sender<Event>) -> Self {
+impl From<mpsc::Sender<Vec<Event>>> for EventForwardService {
+    fn from(tx: mpsc::Sender<Vec<Event>>) -> Self {
         Self { tx }
     }
 }
@@ -42,14 +42,17 @@ impl VectorService for EventForwardService {
         &self,
         request: tonic::Request<PushEventsRequest>,
     ) -> Result<tonic::Response<PushEventsResponse>, Status> {
-        let events = request.into_inner().events.into_iter().map(Event::from);
-
-        for event in events {
-            self.tx
-                .send(event)
-                .await
-                .expect("event forward rx should not close first");
-        }
+        let events = request
+            .into_inner()
+            .events
+            .into_iter()
+            .map(Event::from)
+            .collect();
+
+        self.tx
+            .send(events)
+            .await
+            .expect("event forward rx should not close first");
 
         Ok(tonic::Response::new(PushEventsResponse {}))
     }
@@ -74,7 +77,7 @@ pub struct InputEdge {
 pub struct OutputEdge {
     listen_addr: GrpcAddress,
     service: VectorServer<EventForwardService>,
-    rx: mpsc::Receiver<Event>,
+    rx: mpsc::Receiver<Vec<Event>>,
 }
 
 impl InputEdge {
@@ -129,7 +132,7 @@ impl OutputEdge {
     pub fn spawn_output_server(
         self,
         task_coordinator: &TaskCoordinator<Configuring>,
-    ) -> mpsc::Receiver<Event> {
+    ) -> mpsc::Receiver<Vec<Event>> {
         spawn_grpc_server(self.listen_addr, self.service, task_coordinator);
         self.rx
     }
@@ -184,5 +187,5 @@ pub fn spawn_grpc_server<S>(
 
 pub struct ControlledEdges {
     pub input: Option<mpsc::Sender<TestEvent>>,
-    pub output: Option<mpsc::Receiver<Event>>,
+    pub output: Option<mpsc::Receiver<Vec<Event>>>,
 }
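The channel type change from `Event` to `Vec<Event>` means each decoded request or payload now travels as one message. A small sketch of that shape follows, using `std::sync::mpsc` as a stand-in for `tokio::sync::mpsc` and `String` as a stand-in for `Event`; `forward_batches` and `drain_counts` are hypothetical names. The commit does not state why batches are preserved, so treating it as a way for the receiver to account for a whole batch at once is an assumption.

```rust
use std::sync::mpsc;

/// Sends each decoded batch as a single message, preserving batch boundaries.
/// (Stand-in for the async `tx.send(events).await` calls in the diff.)
pub fn forward_batches(batches: Vec<Vec<String>>, tx: &mpsc::Sender<Vec<String>>) {
    for batch in batches {
        tx.send(batch).expect("receiver should not close first");
    }
}

/// Drains whatever is currently queued and returns
/// (number of batches, total number of events).
pub fn drain_counts(rx: &mpsc::Receiver<Vec<String>>) -> (usize, usize) {
    let mut batches = 0;
    let mut events = 0;
    for batch in rx.try_iter() {
        batches += 1;
        events += batch.len();
    }
    (batches, events)
}
```

Compared with sending events one at a time, the receiver here sees two messages for two payloads regardless of how many events each contains, so per-payload totals can be computed without regrouping.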
