-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(new source): Initial mqtt
source
#19931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. This is looking good and will be great to have to complement the mqtt
sink.
There's just a few fairly minor changes that I can see.
src/sources/mqtt/config.rs
Outdated
use vector_lib::config::{LegacyKey, LogNamespace}; | ||
use vector_lib::configurable::configurable_component; | ||
use vector_lib::lookup::owned_value_path; | ||
use vector_lib::tls::{MaybeTlsSettings, TlsEnableableConfig}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we group these imports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
For the record nightly rustfmt can do this automatically, I think it's worth it to consider it. We've been using it for some time now and it seems stable though we don't really use latest features.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. We'd need to wait for that to move into stable since we have so many different contributors and we shouldn't dictate the nightly versions. But I'm definitely looking forward to that.
src/sources/mqtt/source.rs
Outdated
}) | ||
} | ||
|
||
pub async fn run(self, mut out: SourceSender, _shutdown: ShutdownSignal) -> Result<(), ()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't ignore the ShutdownSignal
otherwise Vector just gets stuck when it tries to shutdown and needs to be force killed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, fixed based on the suggestion you posted later. I tried to copy some other source and delete stuff that didn't seem needed for this and this was caught in that.
src/sources/mqtt/source.rs
Outdated
match connection.poll().await { | ||
Ok(MqttEvent::Incoming(Incoming::Publish(publish))) => { | ||
self.process_message(publish, &mut out).await; | ||
} | ||
Ok(MqttEvent::Incoming( | ||
Incoming::PubAck(_) | Incoming::PubRec(_) | Incoming::PubComp(_), | ||
)) => { | ||
// TODO Handle acknowledgement | ||
} | ||
_ => {} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In pseudo code something like:
tokio::select! {
_ = shudown => break;
poll = connection.poll() => {
match poll {
....
}
}
}
/ci-run-component-features |
Co-authored-by: Stephen Wakely <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
.vscode/settings.json
Outdated
@@ -0,0 +1,6 @@ | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔨 warning: This config shouldn't be committed, it can instead be added to your global gitignore setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, my mistake. I fixed my gitignore, thanks for the suggestion.
src/lib.rs
Outdated
@@ -4,7 +4,6 @@ | |||
#![deny(unused_allocation)] | |||
#![deny(unused_assignments)] | |||
#![deny(unused_comparisons)] | |||
#![deny(warnings)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ question: is there a good reason to delete this? We want to keep a clean code base.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, sorry. I didn't mean to commit this.
It was just more convenient for me when warnings and clippy pedantic such as unused import did not abort compilation when I just wanted to check tests after some changes that I didn't properly clean up yet. I put it back.
src/sources/mqtt/config.rs
Outdated
InvalidClientId, | ||
|
||
#[snafu(display("Username and password must be either both or neither provided."))] | ||
BadCredentials, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥜 nitpick: IncompleteCredentials
is a bit more specific
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
src/sources/mqtt/config.rs
Outdated
/// MQTT server address (The broker’s domain name or IP address). | ||
#[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))] | ||
pub host: String, | ||
|
||
/// TCP port of the MQTT server to connect to. | ||
#[configurable(derived)] | ||
#[serde(default = "default_port")] | ||
#[derivative(Default(value = "default_port()"))] | ||
pub port: u16, | ||
|
||
/// MQTT username. | ||
#[configurable(derived)] | ||
#[serde(default)] | ||
pub user: Option<String>, | ||
|
||
/// MQTT password. | ||
#[configurable(derived)] | ||
#[serde(default)] | ||
pub password: Option<String>, | ||
|
||
/// MQTT client ID. If there are multiple | ||
#[configurable(derived)] | ||
#[serde(default)] | ||
pub client_id: Option<String>, | ||
|
||
/// Connection keep-alive interval. | ||
#[configurable(derived)] | ||
#[serde(default = "default_keep_alive")] | ||
#[derivative(Default(value = "default_keep_alive()"))] | ||
pub keep_alive: u16, | ||
|
||
/// MQTT topic from which messages are to be read. | ||
#[configurable(derived)] | ||
#[serde(default = "default_topic")] | ||
#[derivative(Default(value = "default_topic()"))] | ||
pub topic: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💭 thought: These appear to be exact replicas of the same settings for the sink. We could consider a common struct they both use with serde_flatten. This could be located in a shared space like src/common/mqtt.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, there's a lot of duplication here.
I'm also in favor of creating a shared module, I just didn't get around to do it yet.
@@ -0,0 +1,111 @@ | |||
#![cfg(feature = "mqtt-integration-tests")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏 praise: thanks for adding integration tests 🎉
#[derive(Debug, Snafu)] | ||
#[snafu(visibility(pub))] | ||
pub enum MqttError { | ||
#[snafu(display("TLS error: {}", source))] | ||
Tls { source: TlsError }, | ||
#[snafu(display("MQTT configuration error: {}", source))] | ||
Configuration { source: ConfigurationError }, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub struct MqttConnector { | ||
options: MqttOptions, | ||
topic: String, | ||
} | ||
|
||
impl MqttConnector { | ||
pub const fn new(options: MqttOptions, topic: String) -> Result<Self, MqttError> { | ||
Ok(Self { options, topic }) | ||
} | ||
|
||
async fn connect(&self) -> Result<(AsyncClient, EventLoop), ClientError> { | ||
let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024); | ||
client.subscribe(&self.topic, QoS::AtLeastOnce).await?; | ||
Ok((client, eventloop)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💬 suggestion: this also looks like high overlap with the same objects in the sink, could it be in the common mqtt module?
)) => { | ||
// TODO Handle acknowledgement | ||
} | ||
_ => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💭 thought: Am I reading this correctly that we are going to silently drop on errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If by drop you mean drop incoming messages, than no, that's should not be the error case here.
The poll
method returns all incoming packets of which we only care about publishes (and hopefully later also about acks and such), but also completely irrelevant stuff like pong. It also returns all our outgoing packets like ping, acks, etc. This all could be handled by Ok(_)
.
The Err
variant here is error of the protocol. In real life it's going to be most likely network timeout, pong not arriving in a timely manner and possibly misbehaving broker. But continuing to call poll
should make the underlying library reconnect and everything should continue.
Now that I describe it, this should probably be logged as a warning/error as disconnection can eventually lead to lost messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that I describe it, this should probably be logged as a warning/error as disconnection can eventually lead to lost messages.
Sounds good to me. I think adding some more in code comments to what you just mentioned also would be helpful.
protocol: "mqtt", | ||
endpoint: &self.connector.options.broker_address().0, | ||
}); | ||
let events_received = register!(EventsReceived); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔨 warning: this should be saved as state in the struct, only registering it once on initialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, fixed. I've taken most inspiration from the sqs source (for no specific reason) and there they seem to clone the handle even though it's used only as a shared reference. I don't clone it as much but maybe there's a reason to do so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just glancing at the aws_sqs
source it looks to me like they're only cloning it once per the client_concurrency setting (which defaults to the number of threads on the system) , at initialization of the source.
Since the mqtt
source isn't managing concurrency directly there shouldn't be a need to clone the EventsReceived registry.
@@ -0,0 +1,14 @@ | |||
--- | |||
title: MQTT | |||
description: Receive observability event data from an [MQTT](https://mqtt.org) broker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💬 suggestion: Since we're being specific to log data, I think we should keep that consistent here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Hi, is this PR still actively being worked on? Would love to see an MQTT sink shipped, happy to contribute to that effort |
Sorry, I forgot about it. I believe the only major thing I left unresolved is deduplication between the sink and source but don't quote me on that. Feel free to push it over the finish line. I can't promise I'd have time to get back to this in the coming weeks and definitely not in the coming days. |
Looking forward to having this feature. 🥳 Is this ready for merge? |
Hi @mladedav I see this is quite stale, is this PR still being worked on or should it be closed ? |
I'm obviously biased but I think it would be a shame to close this. IIRC the code works (unless there are conflicts now with the main branch) and the only thing I didn't get around to was code deduplication between the mqtt source and sink. |
Thanks @mladedav, are you motivated to take this over the finish line? I can help with fast reviews if so. |
would be great to have this in vector in the next release |
I will (1) review, (2) if it's not too much effort address nits. If it turns out this need a lot of dev work, I will leave it up to community to take it over the finish line. |
@pront can this be expected to be merged anytime soon ? |
The PR still needs some work. But if I recall correctly it's almost there. |
Are we there yet ? |
There hasn't been any activity on this PR for a while. If some community member wants to fork it or work on this directly, I am happy to review. |
Closed in favor of #22752 |
* wip * add mqtt source cue reference * add changelog * Update src/sources/mqtt/config.rs Co-authored-by: Stephen Wakely <[email protected]> * rustfmt * honor shutdown signa * git rm .vscode/settings.json * add deny warnings back * use new_maybe_logs * fix type, changelog, rename bad creds * add fowllow up github issue * common config WIP * Consolidated MQTT error types * More deduplication in MQTT error types * Deduplicated MQTT connector type * Formatting fixes for tests * Fixed docs tests * Fixed metrics and integration tests for MQTT * Added to changelog authorship * Fixed cue docs formating * Formatting fix on errors * Added topic metadata --------- Co-authored-by: David Mládek <[email protected]> Co-authored-by: Stephen Wakely <[email protected]> Co-authored-by: Pavlos Rontidis <[email protected]>
Closes #584
I made this initial implementation farily simple, one important point is that I have not implemented acknowledgements.