Skip to content

Commit 8022464

Browse files
authored
fix: propagate config build error instead of panicking (#18124)
* fix: propagate config build error instead of panicking * update usages in integration tests * propagate the actual error type instead of string
1 parent 8068f1d commit 8022464

File tree

23 files changed

+82
-60
lines changed

23 files changed

+82
-60
lines changed

lib/codecs/src/decoding/format/protobuf.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,8 @@ pub struct ProtobufDeserializerConfig {
3333

3434
impl ProtobufDeserializerConfig {
3535
/// Build the `ProtobufDeserializer` from this configuration.
36-
pub fn build(&self) -> ProtobufDeserializer {
37-
// TODO return a Result instead.
38-
ProtobufDeserializer::try_from(self).unwrap()
36+
pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
37+
ProtobufDeserializer::try_from(self)
3938
}
4039

4140
/// Return the type of event build by this deserializer.

lib/codecs/src/decoding/mod.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -277,16 +277,18 @@ impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
277277

278278
impl DeserializerConfig {
279279
/// Build the `Deserializer` from this configuration.
280-
pub fn build(&self) -> Deserializer {
280+
pub fn build(&self) -> vector_common::Result<Deserializer> {
281281
match self {
282-
DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
283-
DeserializerConfig::Json(config) => Deserializer::Json(config.build()),
284-
DeserializerConfig::Protobuf(config) => Deserializer::Protobuf(config.build()),
282+
DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
283+
DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
284+
DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
285285
#[cfg(feature = "syslog")]
286-
DeserializerConfig::Syslog(config) => Deserializer::Syslog(config.build()),
287-
DeserializerConfig::Native => Deserializer::Native(NativeDeserializerConfig.build()),
288-
DeserializerConfig::NativeJson(config) => Deserializer::NativeJson(config.build()),
289-
DeserializerConfig::Gelf(config) => Deserializer::Gelf(config.build()),
286+
DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
287+
DeserializerConfig::Native => {
288+
Ok(Deserializer::Native(NativeDeserializerConfig.build()))
289+
}
290+
DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
291+
DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
290292
}
291293
}
292294

src/codecs/decoding/config.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ impl DecodingConfig {
4141
}
4242

4343
/// Builds a `Decoder` from the provided configuration.
44-
pub fn build(&self) -> Decoder {
44+
pub fn build(&self) -> vector_common::Result<Decoder> {
4545
// Build the framer.
4646
let framer = self.framing.build();
4747

4848
// Build the deserializer.
49-
let deserializer = self.decoding.build();
49+
let deserializer = self.decoding.build()?;
5050

51-
Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace)
51+
Ok(Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace))
5252
}
5353
}

src/components/validation/resources/http.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,15 @@ impl HttpResourceConfig {
6262
codec: ResourceCodec,
6363
output_tx: mpsc::Sender<Vec<Event>>,
6464
task_coordinator: &TaskCoordinator<Configuring>,
65-
) {
65+
) -> vector_common::Result<()> {
6666
match direction {
6767
// We'll pull data from the sink.
68-
ResourceDirection::Pull => {
69-
spawn_output_http_client(self, codec, output_tx, task_coordinator)
70-
}
68+
ResourceDirection::Pull => Ok(spawn_output_http_client(
69+
self,
70+
codec,
71+
output_tx,
72+
task_coordinator,
73+
)),
7174
// The sink will push data to us.
7275
ResourceDirection::Push => {
7376
spawn_output_http_server(self, codec, output_tx, task_coordinator)
@@ -227,12 +230,12 @@ fn spawn_output_http_server(
227230
codec: ResourceCodec,
228231
output_tx: mpsc::Sender<Vec<Event>>,
229232
task_coordinator: &TaskCoordinator<Configuring>,
230-
) {
233+
) -> vector_common::Result<()> {
231234
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
232235
// via an output sender. We accept/collect events until we're told to shutdown.
233236

234237
// First, we'll build and spawn our HTTP server.
235-
let decoder = codec.into_decoder();
238+
let decoder = codec.into_decoder()?;
236239

237240
let (_, http_server_shutdown_tx) =
238241
spawn_http_server(task_coordinator, &config, move |request| {
@@ -276,6 +279,7 @@ fn spawn_output_http_server(
276279

277280
debug!("HTTP server external output resource completed.");
278281
});
282+
Ok(())
279283
}
280284

281285
/// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.

src/components/validation/resources/mod.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -96,24 +96,24 @@ impl ResourceCodec {
9696
///
9797
/// The decoder is generated as an inverse to the input codec: if an encoding configuration was
9898
/// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
99-
pub fn into_decoder(&self) -> Decoder {
99+
pub fn into_decoder(&self) -> vector_common::Result<Decoder> {
100100
let (framer, deserializer) = match self {
101101
Self::Decoding(config) => return config.build(),
102102
Self::Encoding(config) => (
103103
encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
104-
serializer_config_to_deserializer(config.config()),
104+
serializer_config_to_deserializer(config.config())?,
105105
),
106106
Self::EncodingWithFraming(config) => {
107107
let (maybe_framing, serializer) = config.config();
108108
let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
109109
(
110110
encoder_framing_to_decoding_framer(framing),
111-
serializer_config_to_deserializer(serializer),
111+
serializer_config_to_deserializer(serializer)?,
112112
)
113113
}
114114
};
115115

116-
Decoder::new(framer, deserializer)
116+
Ok(Decoder::new(framer, deserializer))
117117
}
118118
}
119119

@@ -178,7 +178,9 @@ fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> enco
178178
framing_config.build()
179179
}
180180

181-
fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Deserializer {
181+
fn serializer_config_to_deserializer(
182+
config: &SerializerConfig,
183+
) -> vector_common::Result<decoding::Deserializer> {
182184
let deserializer_config = match config {
183185
SerializerConfig::Avro { .. } => todo!(),
184186
SerializerConfig::Csv { .. } => todo!(),
@@ -311,7 +313,7 @@ impl ExternalResource {
311313
self,
312314
output_tx: mpsc::Sender<Vec<Event>>,
313315
task_coordinator: &TaskCoordinator<Configuring>,
314-
) {
316+
) -> vector_common::Result<()> {
315317
match self.definition {
316318
ResourceDefinition::Http(http_config) => {
317319
http_config.spawn_as_output(self.direction, self.codec, output_tx, task_coordinator)

src/components/validation/runner/mod.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl Runner {
191191
}
192192
}
193193

194-
pub async fn run_validation(self) -> Result<Vec<RunnerResults>, String> {
194+
pub async fn run_validation(self) -> Result<Vec<RunnerResults>, vector_common::Error> {
195195
// Initialize our test environment.
196196
initialize_test_environment();
197197

@@ -251,7 +251,7 @@ impl Runner {
251251
&self.configuration,
252252
&input_task_coordinator,
253253
&output_task_coordinator,
254-
);
254+
)?;
255255
let input_tx = runner_input.into_sender(controlled_edges.input);
256256
let output_rx = runner_output.into_receiver(controlled_edges.output);
257257
debug!("External resource (if any) and controlled edges built and spawned.");
@@ -413,7 +413,7 @@ fn build_external_resource(
413413
configuration: &ValidationConfiguration,
414414
input_task_coordinator: &TaskCoordinator<Configuring>,
415415
output_task_coordinator: &TaskCoordinator<Configuring>,
416-
) -> (RunnerInput, RunnerOutput, Option<Encoder<encoding::Framer>>) {
416+
) -> Result<(RunnerInput, RunnerOutput, Option<Encoder<encoding::Framer>>), vector_common::Error> {
417417
let component_type = configuration.component_type();
418418
let maybe_external_resource = configuration.external_resource();
419419
let maybe_encoder = maybe_external_resource
@@ -430,15 +430,15 @@ fn build_external_resource(
430430
maybe_external_resource.expect("a source must always have an external resource");
431431
resource.spawn_as_input(rx, input_task_coordinator);
432432

433-
(
433+
Ok((
434434
RunnerInput::External(tx),
435435
RunnerOutput::Controlled,
436436
maybe_encoder,
437-
)
437+
))
438438
}
439439
ComponentType::Transform => {
440440
// Transforms have no external resources.
441-
(RunnerInput::Controlled, RunnerOutput::Controlled, None)
441+
Ok((RunnerInput::Controlled, RunnerOutput::Controlled, None))
442442
}
443443
ComponentType::Sink => {
444444
// As an external resource for a sink, we create a channel that the validation runner
@@ -448,13 +448,13 @@ fn build_external_resource(
448448
let (tx, rx) = mpsc::channel(1024);
449449
let resource =
450450
maybe_external_resource.expect("a sink must always have an external resource");
451-
resource.spawn_as_output(tx, output_task_coordinator);
451+
resource.spawn_as_output(tx, output_task_coordinator)?;
452452

453-
(
453+
Ok((
454454
RunnerInput::Controlled,
455455
RunnerOutput::External(rx),
456456
maybe_encoder,
457-
)
457+
))
458458
}
459459
}
460460
}

src/sources/amqp.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ fn default_offset_key() -> OptionalValuePath {
127127
impl_generate_config_from_default!(AmqpSourceConfig);
128128

129129
impl AmqpSourceConfig {
130-
fn decoder(&self, log_namespace: LogNamespace) -> Decoder {
130+
fn decoder(&self, log_namespace: LogNamespace) -> vector_common::Result<Decoder> {
131131
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
132132
}
133133
}
@@ -317,7 +317,8 @@ async fn receive_event(
317317
msg: Delivery,
318318
) -> Result<(), ()> {
319319
let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
320-
let mut stream = FramedRead::new(payload, config.decoder(log_namespace));
320+
let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
321+
let mut stream = FramedRead::new(payload, decoder);
321322

322323
// Extract timestamp from AMQP message
323324
let timestamp = msg

src/sources/aws_kinesis_firehose/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
143143
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
144144
let log_namespace = cx.log_namespace(self.log_namespace);
145145
let decoder =
146-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
146+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
147+
.build()?;
147148

148149
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
149150

src/sources/aws_s3/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ impl AwsS3Config {
241241
.await?;
242242

243243
let decoder =
244-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
244+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
245+
.build()?;
245246

246247
match self.sqs {
247248
Some(ref sqs) => {

src/sources/aws_sqs/config.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ impl SourceConfig for AwsSqsConfig {
111111

112112
let client = self.build_client(&cx).await?;
113113
let decoder =
114-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
114+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
115+
.build()?;
115116
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
116117

117118
Ok(Box::pin(

src/sources/aws_sqs/source.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ mod tests {
245245
config.decoding,
246246
LogNamespace::Vector,
247247
)
248-
.build(),
248+
.build()
249+
.unwrap(),
249250
"aws_sqs",
250251
b"test",
251252
Some(now),
@@ -297,7 +298,8 @@ mod tests {
297298
config.decoding,
298299
LogNamespace::Legacy,
299300
)
300-
.build(),
301+
.build()
302+
.unwrap(),
301303
"aws_sqs",
302304
b"test",
303305
Some(now),

src/sources/datadog_agent/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ impl SourceConfig for DatadogAgentConfig {
156156
.clone();
157157

158158
let decoder =
159-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
159+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
160+
.build()?;
160161

161162
let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
162163
let source = DatadogAgentSource::new(

src/sources/demo_logs.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ impl SourceConfig for DemoLogsConfig {
292292

293293
self.format.validate()?;
294294
let decoder =
295-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
295+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
296+
.build()?;
296297
Ok(Box::pin(demo_logs_source(
297298
self.interval,
298299
self.count,
@@ -361,7 +362,8 @@ mod tests {
361362
default_decoding(),
362363
LogNamespace::Legacy,
363364
)
364-
.build();
365+
.build()
366+
.unwrap();
365367
demo_logs_source(
366368
config.interval,
367369
config.count,

src/sources/exec/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ impl SourceConfig for ExecConfig {
234234
.clone()
235235
.unwrap_or_else(|| self.decoding.default_stream_framing());
236236
let decoder =
237-
DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build();
237+
DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build()?;
238238

239239
match &self.mode {
240240
Mode::Scheduled => {

src/sources/file_descriptors/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub trait FileDescriptorConfig: NamedComponent {
6262
let framing = self
6363
.framing()
6464
.unwrap_or_else(|| decoding.default_stream_framing());
65-
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();
65+
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
6666

6767
let (sender, receiver) = mpsc::channel(1024);
6868

src/sources/gcp_pubsub.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ impl SourceConfig for PubsubConfig {
314314
self.decoding.clone(),
315315
log_namespace,
316316
)
317-
.build(),
317+
.build()?,
318318
acknowledgements: cx.do_acknowledgements(self.acknowledgements),
319319
shutdown: cx.shutdown,
320320
out: cx.out,

src/sources/heroku_logs.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ impl SourceConfig for LogplexConfig {
163163
let log_namespace = cx.log_namespace(self.log_namespace);
164164

165165
let decoder =
166-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
166+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
167+
.build()?;
167168

168169
let source = LogplexSource {
169170
query_parameters: self.query_parameters.clone(),

src/sources/http_client/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl SourceConfig for HttpClientConfig {
194194
let log_namespace = cx.log_namespace(self.log_namespace);
195195

196196
// build the decoder
197-
let decoder = self.get_decoding_config(Some(log_namespace)).build();
197+
let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
198198

199199
let content_type = self.decoding.content_type(&self.framing).to_string();
200200

src/sources/http_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
314314
#[typetag::serde(name = "http_server")]
315315
impl SourceConfig for SimpleHttpConfig {
316316
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
317-
let decoder = self.get_decoding_config()?.build();
317+
let decoder = self.get_decoding_config()?.build()?;
318318
let log_namespace = cx.log_namespace(self.log_namespace);
319319

320320
let source = SimpleHttpSource {

src/sources/kafka.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,8 @@ impl SourceConfig for KafkaSourceConfig {
296296

297297
let consumer = create_consumer(self)?;
298298
let decoder =
299-
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
299+
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
300+
.build()?;
300301
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
301302

302303
Ok(Box::pin(kafka_source(
@@ -1158,7 +1159,8 @@ mod integration_test {
11581159
config.decoding.clone(),
11591160
log_namespace,
11601161
)
1161-
.build();
1162+
.build()
1163+
.unwrap();
11621164

11631165
tokio::spawn(kafka_source(
11641166
config,

0 commit comments

Comments
 (0)