Skip to content

Commit 1e97a2f

Browse files
addisonjspencergilbertfuchsnjneuronull
authored
enhancement(pulsar sink): Refactor to use StreamSink (#14345)
This commit heavily refactors the Pulsar Sink to use the StreamSink interface and is modeled after the Kafka Sink. It also adds additional features that bring it in line with Kafka Sink feature set. This includes: * Refactoring to use StreamSink instead of Sink interface. * Supports dynamic topics using a topic template * Refactor configurations in advance of adding Pulsar source * Rework message parsing to support logs and metrics, with support for dynamic keys and properties Co-authored-by: Spencer Gilbert <[email protected]> Co-authored-by: Nathan Fox <[email protected]> Co-authored-by: neuronull <[email protected]>
1 parent 2d72f82 commit 1e97a2f

File tree

16 files changed

+1000
-601
lines changed

16 files changed

+1000
-601
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ sinks-new_relic_logs = ["sinks-http"]
688688
sinks-new_relic = []
689689
sinks-papertrail = ["dep:syslog"]
690690
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
691-
sinks-pulsar = ["dep:apache-avro", "dep:pulsar"]
691+
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
692692
sinks-redis = ["dep:redis"]
693693
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
694694
sinks-socket = ["sinks-utils-udp"]

scripts/integration/pulsar/test.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
features:
22
- pulsar-integration-tests
33

4-
test_filter: '::pulsar::'
4+
test_filter: '::pulsar::integration_tests::'
55

66
env:
77
PULSAR_ADDRESS: pulsar://pulsar:6650

src/internal_events/pulsar.rs

+23
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,26 @@ impl InternalEvent for PulsarSendingError {
3333
});
3434
}
3535
}
36+
37+
pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
38+
pub property_field: F,
39+
}
40+
41+
impl<F: std::fmt::Display> InternalEvent for PulsarPropertyExtractionError<F> {
42+
fn emit(self) {
43+
error!(
44+
message = "Failed to extract properties. Value should be a map of String -> Bytes.",
45+
error_code = "extracting_property",
46+
error_type = error_type::PARSER_FAILED,
47+
stage = error_stage::PROCESSING,
48+
property_field = %self.property_field,
49+
internal_log_rate_limit = true,
50+
);
51+
counter!(
52+
"component_errors_total", 1,
53+
"error_code" => "extracting_property",
54+
"error_type" => error_type::PARSER_FAILED,
55+
"stage" => error_stage::PROCESSING,
56+
);
57+
}
58+
}

src/sinks/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ pub enum Sinks {
357357
/// Publish observability events to Apache Pulsar topics.
358358
#[cfg(feature = "sinks-pulsar")]
359359
#[configurable(metadata(docs::label = "Pulsar"))]
360-
Pulsar(pulsar::PulsarSinkConfig),
360+
Pulsar(pulsar::config::PulsarSinkConfig),
361361

362362
/// Publish observability data to Redis.
363363
#[cfg(feature = "sinks-redis")]

0 commit comments

Comments
 (0)