feat(new source): Initial mqtt source #19931

Closed
wants to merge 13 commits into from

mladedav
Contributor

Closes #584

I kept this initial implementation fairly simple. One important point is that I have not implemented acknowledgements.

@mladedav mladedav requested review from a team as code owners February 22, 2024 17:52
@mladedav mladedav requested a review from a team February 22, 2024 17:52
@github-actions github-actions bot added domain: sources Anything related to the Vector's sources domain: ci Anything related to Vector's CI environment domain: external docs Anything related to Vector's external, public documentation labels Feb 22, 2024
@sghall sghall requested review from sghall and removed request for sghall February 26, 2024 22:17
@neuronull neuronull self-requested a review February 26, 2024 23:35
Contributor

@StephenWakely StephenWakely left a comment

Nice. This is looking good and will be great to have to complement the mqtt sink.

There are just a few fairly minor changes that I can see.

Comment on lines 7 to 10
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::owned_value_path;
use vector_lib::tls::{MaybeTlsSettings, TlsEnableableConfig};
Contributor

Can we group these imports?

Contributor Author

Done.

For the record, nightly rustfmt can do this automatically, and I think it's worth considering. We've been using it for some time now and it seems stable, though we don't really use the latest features.

Contributor

Nice. We'd need to wait for that to move into stable since we have so many different contributors and we shouldn't dictate the nightly versions. But I'm definitely looking forward to that.

})
}

pub async fn run(self, mut out: SourceSender, _shutdown: ShutdownSignal) -> Result<(), ()> {
Contributor

We shouldn't ignore the ShutdownSignal; otherwise Vector gets stuck when it tries to shut down and has to be force-killed.

Contributor Author

OK, fixed based on the suggestion you posted later. I started from a copy of another source and deleted what didn't seem necessary here, and the shutdown handling was removed along with it.

Comment on lines 77 to 87
match connection.poll().await {
    Ok(MqttEvent::Incoming(Incoming::Publish(publish))) => {
        self.process_message(publish, &mut out).await;
    }
    Ok(MqttEvent::Incoming(
        Incoming::PubAck(_) | Incoming::PubRec(_) | Incoming::PubComp(_),
    )) => {
        // TODO Handle acknowledgement
    }
    _ => {}
}
Contributor

In pseudocode, something like:

tokio::select! {
  _ = &mut shutdown => break,
  poll = connection.poll() => {
     match poll {
        ....
     }
  }
}
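
For readers without the Vector codebase at hand, the loop shape suggested above can be sketched with the standard library alone. This is an illustrative analog under stated assumptions, not the PR's code: `Event`, `run`, and both channels are hypothetical, and checking a shutdown channel between bounded waits stands in for `tokio::select!` over `ShutdownSignal` and `connection.poll()`.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, TryRecvError};
use std::time::Duration;

/// Hypothetical stand-in for the packets returned by `connection.poll()`.
#[derive(Debug, PartialEq)]
enum Event {
    Publish(String),
    Other,
}

/// Drains `events` until a shutdown message arrives or the event sender
/// hangs up. Returns the payload of every publish seen before stopping.
fn run(events: Receiver<Event>, shutdown: Receiver<()>) -> Vec<String> {
    let mut received = Vec::new();
    loop {
        // Check shutdown on every iteration so the loop never ignores it.
        match shutdown.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {}
        }
        // Bounded wait so a quiet event stream cannot block shutdown forever.
        match events.recv_timeout(Duration::from_millis(10)) {
            Ok(Event::Publish(payload)) => received.push(payload),
            Ok(Event::Other) => {}
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    received
}
```

If shutdown has already been signalled when `run` starts, it returns without touching queued events. An async source would keep the `select!` form so that shutdown is observed without a polling interval.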

@StephenWakely
Contributor

/ci-run-component-features

Contributor

@neuronull neuronull left a comment

🎉

@@ -0,0 +1,6 @@
{
Contributor

🔨 warning: This config shouldn't be committed. It can instead be added to your global gitignore settings.

Contributor Author

Sorry, my mistake. I fixed my gitignore, thanks for the suggestion.

src/lib.rs Outdated
@@ -4,7 +4,6 @@
#![deny(unused_allocation)]
#![deny(unused_assignments)]
#![deny(unused_comparisons)]
#![deny(warnings)]
Contributor

❓ question: Is there a good reason to delete this? We want to keep a clean code base.

Contributor Author

No, sorry. I didn't mean to commit this.

It was just more convenient when warnings and pedantic lints, such as unused imports, did not abort compilation while I only wanted to check the tests after changes I hadn't properly cleaned up yet. I put it back.

InvalidClientId,

#[snafu(display("Username and password must be either both or neither provided."))]
BadCredentials,
Contributor

🥜 nitpick: IncompleteCredentials is a bit more specific

Contributor Author

👍
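
The both-or-neither rule behind this variant can be sketched as a small check. `validate_credentials` and the shape of `ConfigurationError` are hypothetical; only the variant name and the display message come from the diff under review.

```rust
use std::fmt;

/// Hypothetical error type; only the variant name mirrors the review.
#[derive(Debug, PartialEq)]
enum ConfigurationError {
    IncompleteCredentials,
}

impl fmt::Display for ConfigurationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::IncompleteCredentials => write!(
                f,
                "Username and password must be either both or neither provided."
            ),
        }
    }
}

/// Returns `Some((user, password))` when both are set, `None` when neither
/// is, and an error when only one of the two is present.
fn validate_credentials(
    user: Option<&str>,
    password: Option<&str>,
) -> Result<Option<(String, String)>, ConfigurationError> {
    match (user, password) {
        (Some(u), Some(p)) => Ok(Some((u.to_owned(), p.to_owned()))),
        (None, None) => Ok(None),
        _ => Err(ConfigurationError::IncompleteCredentials),
    }
}
```

Matching on the pair makes the two invalid combinations fall into a single arm, which is why the more specific name fits.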

Comment on lines 40 to 75
/// MQTT server address (The broker’s domain name or IP address).
#[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
pub host: String,

/// TCP port of the MQTT server to connect to.
#[configurable(derived)]
#[serde(default = "default_port")]
#[derivative(Default(value = "default_port()"))]
pub port: u16,

/// MQTT username.
#[configurable(derived)]
#[serde(default)]
pub user: Option<String>,

/// MQTT password.
#[configurable(derived)]
#[serde(default)]
pub password: Option<String>,

/// MQTT client ID. If there are multiple
#[configurable(derived)]
#[serde(default)]
pub client_id: Option<String>,

/// Connection keep-alive interval.
#[configurable(derived)]
#[serde(default = "default_keep_alive")]
#[derivative(Default(value = "default_keep_alive()"))]
pub keep_alive: u16,

/// MQTT topic from which messages are to be read.
#[configurable(derived)]
#[serde(default = "default_topic")]
#[derivative(Default(value = "default_topic()"))]
pub topic: String,
Contributor

💭 thought: These appear to be exact replicas of the same settings for the sink. We could consider a common struct that both use via #[serde(flatten)]. It could live in a shared location such as src/common/mqtt.rs.

Contributor Author

Yes, there's a lot of duplication here.

I'm also in favor of creating a shared module, I just haven't gotten around to doing it yet.
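
One possible shape for that shared module, as a sketch only. `MqttCommonConfig` and its constructor are hypothetical names, port 1883 is the conventional unencrypted MQTT port, the keep-alive value is a placeholder, and the serde and `configurable` attributes are left out so the snippet compiles with the standard library alone. In a real config the `common` fields would be marked `#[serde(flatten)]` so users keep writing `host`, `port`, and so on at the top level.

```rust
/// Connection settings shared by the source and the sink (hypothetical name).
#[derive(Debug, Clone, PartialEq)]
struct MqttCommonConfig {
    host: String,
    port: u16,
    user: Option<String>,
    password: Option<String>,
    client_id: Option<String>,
    keep_alive: u16,
}

impl MqttCommonConfig {
    fn new(host: &str) -> Self {
        Self {
            host: host.to_owned(),
            port: 1883, // conventional unencrypted MQTT port
            user: None,
            password: None,
            client_id: None,
            keep_alive: 60, // assumption: placeholder value, in seconds
        }
    }
}

/// Source configuration: the shared block plus source-only fields.
#[derive(Debug, Clone, PartialEq)]
struct MqttSourceConfig {
    // Would carry `#[serde(flatten)]` in a serde-based config.
    common: MqttCommonConfig,
}

/// Sink configuration: the same shared block plus sink-only fields.
#[derive(Debug, Clone, PartialEq)]
struct MqttSinkConfig {
    // Would carry `#[serde(flatten)]` in a serde-based config.
    common: MqttCommonConfig,
}
```

With this layout, a change to a connection setting is made in one place and both components pick it up.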

@@ -0,0 +1,111 @@
#![cfg(feature = "mqtt-integration-tests")]
Contributor

👏 praise: Thanks for adding integration tests 🎉

Comment on lines +22 to +47
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum MqttError {
    #[snafu(display("TLS error: {}", source))]
    Tls { source: TlsError },
    #[snafu(display("MQTT configuration error: {}", source))]
    Configuration { source: ConfigurationError },
}

#[derive(Clone)]
pub struct MqttConnector {
    options: MqttOptions,
    topic: String,
}

impl MqttConnector {
    pub const fn new(options: MqttOptions, topic: String) -> Result<Self, MqttError> {
        Ok(Self { options, topic })
    }

    async fn connect(&self) -> Result<(AsyncClient, EventLoop), ClientError> {
        let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
        client.subscribe(&self.topic, QoS::AtLeastOnce).await?;
        Ok((client, eventloop))
    }
}
Contributor

💬 suggestion: This also overlaps heavily with the same objects in the sink. Could it live in the common mqtt module?

)) => {
// TODO Handle acknowledgement
}
_ => {}
Contributor

💭 thought: Am I reading this correctly that we are going to silently drop on errors?

Contributor Author

If by drop you mean dropping incoming messages, then no, that should not be the error case here.

The poll method returns all incoming packets of which we only care about publishes (and hopefully later also about acks and such), but also completely irrelevant stuff like pong. It also returns all our outgoing packets like ping, acks, etc. This all could be handled by Ok(_).

The Err variant here is a protocol error. In real life it is most likely going to be a network timeout, a pong not arriving in a timely manner, or possibly a misbehaving broker. But continuing to call poll should make the underlying library reconnect, and everything should continue.

Now that I describe it, this should probably be logged as a warning/error as disconnection can eventually lead to lost messages.

Contributor

> Now that I describe it, this should probably be logged as a warning/error as disconnection can eventually lead to lost messages.

Sounds good to me. I think adding some in-code comments covering what you just described would also be helpful.
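
The behaviour described in this exchange can be sketched as a pure classification of each poll result. All types here are hypothetical, simplified stand-ins rather than the MQTT client's real ones, and `Action::Warn` merely marks where a warning would be logged before polling continues.

```rust
/// Hypothetical, simplified stand-in for the client's event type.
#[derive(Debug)]
enum MqttEvent {
    IncomingPublish(Vec<u8>),
    IncomingAck,
    IncomingPong,
    Outgoing,
}

/// Hypothetical stand-in for the client's connection error.
#[derive(Debug)]
struct ConnectionError(String);

/// What the polling loop should do with one poll result.
#[derive(Debug, PartialEq)]
enum Action {
    /// Forward the payload downstream.
    Emit(Vec<u8>),
    /// Nothing to do; keep polling.
    Ignore,
    /// Log at warning level, then keep polling so the client can reconnect.
    Warn(String),
}

fn classify(poll: Result<MqttEvent, ConnectionError>) -> Action {
    match poll {
        Ok(MqttEvent::IncomingPublish(payload)) => Action::Emit(payload),
        // Acknowledgements are not handled yet (the TODO in the diff).
        Ok(MqttEvent::IncomingAck) => Action::Ignore,
        // Pongs and our own outgoing packets carry no event data.
        Ok(MqttEvent::IncomingPong | MqttEvent::Outgoing) => Action::Ignore,
        // Protocol or network failure: surface it instead of dropping it silently.
        Err(ConnectionError(reason)) => {
            Action::Warn(format!("MQTT connection error: {reason}"))
        }
    }
}
```

Spelling out the `Err` arm, rather than folding it into `_ => {}`, is what keeps a disconnection from passing unnoticed.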

protocol: "mqtt",
endpoint: &self.connector.options.broker_address().0,
});
let events_received = register!(EventsReceived);
Contributor

🔨 warning: This should be saved as state in the struct and registered only once, on initialization.

Contributor Author

Ok, fixed. I've taken most inspiration from the sqs source (for no specific reason) and there they seem to clone the handle even though it's used only as a shared reference. I don't clone it as much but maybe there's a reason to do so?

Contributor

Just glancing at the aws_sqs source, it looks to me like they're only cloning it once per the client_concurrency setting (which defaults to the number of threads on the system), at initialization of the source.

Since the mqtt source isn't managing concurrency directly there shouldn't be a need to clone the EventsReceived registry.
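
A minimal sketch of registering once and keeping the handle as state. `Registry`, `EventsReceivedHandle`, and `MqttSource` are hypothetical stand-ins; Vector's `register!` macro and its handle types are not reproduced here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a registered events-received handle.
#[derive(Clone, Default)]
struct EventsReceivedHandle {
    count: Arc<AtomicU64>,
}

impl EventsReceivedHandle {
    fn emit(&self, events: u64) {
        self.count.fetch_add(events, Ordering::Relaxed);
    }

    fn total(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}

/// Hypothetical registry that counts how often registration happens.
#[derive(Default)]
struct Registry {
    registrations: u32,
}

impl Registry {
    fn register(&mut self) -> EventsReceivedHandle {
        self.registrations += 1;
        EventsReceivedHandle::default()
    }
}

struct MqttSource {
    // Registered once in `new` and reused for every message.
    events_received: EventsReceivedHandle,
}

impl MqttSource {
    fn new(registry: &mut Registry) -> Self {
        Self {
            events_received: registry.register(),
        }
    }

    fn process_message(&self, _payload: &[u8]) {
        // A shared reference is enough here, so nothing is cloned per message.
        self.events_received.emit(1);
    }
}
```

A clone of the handle would only be needed if several tasks each owned a copy, which matches the per-concurrency cloning described for the aws_sqs source.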

@@ -0,0 +1,14 @@
---
title: MQTT
description: Receive observability event data from an [MQTT](https://mqtt.org) broker
Contributor

💬 suggestion: Since we're being specific to log data, I think we should keep that consistent here.

Contributor Author

👍

@dmvinson

Hi, is this PR still actively being worked on? Would love to see an MQTT sink shipped, happy to contribute to that effort

@mladedav
Contributor Author

Sorry, I forgot about it. I believe the only major thing I left unresolved is deduplication between the sink and source but don't quote me on that.

Feel free to push it over the finish line. I can't promise I'd have time to get back to this in the coming weeks and definitely not in the coming days.

@yonas

yonas commented Sep 28, 2024

Looking forward to having this feature. 🥳

Is this ready for merge?

@jszwedko jszwedko requested a review from a team as a code owner October 3, 2024 18:54
@aliciascott
Contributor

Hi @mladedav, I see this is quite stale. Is this PR still being worked on, or should it be closed?

@mladedav
Contributor Author

I'm obviously biased but I think it would be a shame to close this.

IIRC the code works (unless there are conflicts now with the main branch) and the only thing I didn't get around to was code deduplication between the mqtt source and sink.

@pront
Member

pront commented Nov 11, 2024

> I'm obviously biased but I think it would be a shame to close this.
>
> IIRC the code works (unless there are conflicts now with the main branch) and the only thing I didn't get around to was code deduplication between the mqtt source and sink.

Thanks @mladedav, are you motivated to take this over the finish line? I can help with fast reviews if so.

@ahsandar

ahsandar commented Dec 5, 2024

would be great to have this in vector in the next release

@pront
Member

pront commented Dec 5, 2024

I will (1) review and (2) address nits if it's not too much effort. If it turns out this needs a lot of dev work, I will leave it up to the community to take it over the finish line.

@pront pront requested a review from neuronull December 5, 2024 16:52
@pront pront dismissed stale reviews from StephenWakely and neuronull December 5, 2024 16:53

outdated

@pront pront requested a review from StephenWakely December 5, 2024 16:53
@github-actions github-actions bot added the domain: sinks Anything related to the Vector's sinks label Dec 5, 2024
@ahsandar

@pront can this be expected to be merged anytime soon?

@pront
Member

pront commented Dec 11, 2024

> @pront can this be expected to be merged anytime soon?

The PR still needs some work. But if I recall correctly it's almost there.

@ahsandar

ahsandar commented Mar 5, 2025

Are we there yet?

@pront
Member

pront commented Mar 5, 2025

> Are we there yet?

There hasn't been any activity on this PR for a while. If some community member wants to fork it or work on this directly, I am happy to review.

@pront
Member

pront commented Mar 31, 2025

Closed in favor of #22752

@pront pront closed this Mar 31, 2025
github-merge-queue bot pushed a commit that referenced this pull request May 14, 2025
* wip

* add mqtt source cue reference

* add changelog

* Update src/sources/mqtt/config.rs

Co-authored-by: Stephen Wakely <[email protected]>

* rustfmt

* honor shutdown signa

* git rm .vscode/settings.json

* add deny warnings back

* use new_maybe_logs

* fix type, changelog, rename bad creds

* add fowllow up github issue

* common config WIP

* Consolidated MQTT error types

* More deduplication in MQTT error types

* Deduplicated MQTT connector type

* Formatting fixes for tests

* Fixed docs tests

* Fixed metrics and integration tests for MQTT

* Added to changelog authorship

* Fixed cue docs formating

* Formatting fix on errors

* Added topic metadata

---------

Co-authored-by: David Mládek <[email protected]>
Co-authored-by: Stephen Wakely <[email protected]>
Co-authored-by: Pavlos Rontidis <[email protected]>
Labels
domain: ci Anything related to Vector's CI environment domain: external docs Anything related to Vector's external, public documentation domain: sinks Anything related to the Vector's sinks domain: sources Anything related to the Vector's sources source: mqtt
Development

Successfully merging this pull request may close these issues.

New mqtt source
8 participants