Skip to content

Commit b43a520

Browse files
committed
Clean up project
1 parent f742f50 commit b43a520

22 files changed

+193
-213
lines changed

build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ java {
2727
}
2828

2929
dependencies {
30+
compileOnly(libs.jetbrains.annotations)
3031
implementation(libs.protobuf)
3132
implementation(libs.dropwizard.metrics.influxdb)
3233
implementation(libs.commonsLang)
@@ -54,6 +55,7 @@ testing {
5455
}
5556
"test"(JvmTestSuite::class) {
5657
dependencies {
58+
compileOnly(libs.jetbrains.annotations)
5759
implementation(libs.assertj)
5860
implementation(libs.mockito)
5961
implementation(libs.wiremock)
@@ -62,6 +64,7 @@ testing {
6264
}
6365
"integrationTest"(JvmTestSuite::class) {
6466
dependencies {
67+
compileOnly(libs.jetbrains.annotations)
6568
implementation(libs.testcontainers.junitJupiter)
6669
implementation(libs.testcontainers.hivemq)
6770
implementation(libs.gradleOci.junitJupiter)

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ dropwizard-metrics-influxdb = "1.3.4"
55
gradleOci-junitJupiter = "0.7.0"
66
hivemq-extensionSdk = "4.40.0"
77
hivemq-mqttClient = "1.3.7"
8+
jetbrains-annotations = "26.0.2"
89
junit-jupiter = "5.10.0"
910
logback = "1.5.18"
1011
mockito = "5.18.0"
@@ -17,6 +18,7 @@ assertj = { module = "org.assertj:assertj-core", version.ref = "assertj" }
1718
commonsLang = { module = "org.apache.commons:commons-lang3", version.ref = "commonsLang" }
1819
dropwizard-metrics-influxdb = { module = "com.izettle:dropwizard-metrics-influxdb", version.ref = "dropwizard-metrics-influxdb" }
1920
gradleOci-junitJupiter = { module = "io.github.sgtsilvio:gradle-oci-junit-jupiter", version.ref = "gradleOci-junitJupiter" }
21+
jetbrains-annotations = { module = "org.jetbrains:annotations", version.ref = "jetbrains-annotations" }
2022
hivemq-mqttClient = { module = "com.hivemq:hivemq-mqtt-client", version.ref = "hivemq-mqttClient" }
2123
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
2224
mockito = { module = "org.mockito:mockito-core", version.ref = "mockito" }

renovate.json5

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
"local>hivemq/renovate-config:default.json5",
55
],
66
addLabels: [
7-
"tooling-and-extensions-coordination",
7+
"integrations-team-coordination",
88
]
99
}

src/integrationTest/java/com/hivemq/extensions/sparkplug/influxdb/SparkplugBInterceptorIT.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.hivemq.extensions.sparkplug.influxdb;
1717

18-
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
1918
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
2019
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
2120
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
@@ -47,44 +46,43 @@ class SparkplugBInterceptorIT {
4746
@Test
4847
@Timeout(value = 3, unit = TimeUnit.MINUTES)
4948
void test_DBIRTH() throws Exception {
50-
final String DEATH_TOPIC = "spBv1.0/group1/NDEATH/eon1";
51-
final String BIRTH_TOPIC = "spBv1.0/group1/NBIRTH/eon1";
49+
final var deathTopic = "spBv1.0/group1/NDEATH/eon1";
50+
final var birthTopic = "spBv1.0/group1/NBIRTH/eon1";
5251

53-
final Mqtt5BlockingClient client = Mqtt5Client.builder()
52+
final var client = Mqtt5Client.builder()
5453
.serverHost("localhost")
5554
.serverPort(container.getMqttPort())
5655
.identifier("EON1")
5756
.buildBlocking();
5857

59-
final Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
58+
final var subscriber = Mqtt5Client.builder()
6059
.serverHost("localhost")
6160
.serverPort(container.getMqttPort())
6261
.identifier("SCADA")
6362
.buildBlocking();
6463

6564
subscriber.connect();
6665

67-
final CompletableFuture<Mqtt5Publish> publishBIRTH = new CompletableFuture<>();
68-
final CompletableFuture<Mqtt5Publish> publishDEATH = new CompletableFuture<>();
69-
subscriber.toAsync().subscribeWith().topicFilter(BIRTH_TOPIC).callback(publishBIRTH::complete).send().get();
70-
subscriber.toAsync().subscribeWith().topicFilter(DEATH_TOPIC).callback(publishDEATH::complete).send().get();
66+
final var publishBIRTH = new CompletableFuture<Mqtt5Publish>();
67+
final var publishDEATH = new CompletableFuture<Mqtt5Publish>();
68+
subscriber.toAsync().subscribeWith().topicFilter(birthTopic).callback(publishBIRTH::complete).send().get();
69+
subscriber.toAsync().subscribeWith().topicFilter(deathTopic).callback(publishDEATH::complete).send().get();
7170

72-
final Mqtt5Publish will =
73-
Mqtt5Publish.builder().topic(DEATH_TOPIC).payload("".getBytes(StandardCharsets.UTF_8)).build();
71+
final var will = Mqtt5Publish.builder().topic(deathTopic).payload("".getBytes(StandardCharsets.UTF_8)).build();
7472
client.connectWith().willPublish(will).send();
7573

76-
final Mqtt5Publish birthPublish =
77-
Mqtt5Publish.builder().topic(BIRTH_TOPIC).payload("".getBytes(StandardCharsets.UTF_8)).build();
74+
final var birthPublish =
75+
Mqtt5Publish.builder().topic(birthTopic).payload("".getBytes(StandardCharsets.UTF_8)).build();
7876
client.publish(birthPublish);
7977

80-
final Mqtt5Publish birth = publishBIRTH.get();
81-
assertEquals(BIRTH_TOPIC, birth.getTopic().toString());
78+
final var birth = publishBIRTH.get();
79+
assertEquals(birthTopic, birth.getTopic().toString());
8280

8381
// disconnect triggers a death certificate
8482
client.disconnectWith().reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE).send();
8583

86-
final Mqtt5Publish death = publishDEATH.get();
87-
assertEquals(DEATH_TOPIC, death.getTopic().toString());
84+
final var death = publishDEATH.get();
85+
assertEquals(deathTopic, death.getTopic().toString());
8886

8987
subscriber.disconnect();
9088
}

src/integrationTest/resources/logback-test.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
1717
-->
1818
<configuration>
19-
2019
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
2120
<encoder>
2221
<pattern>%-30(%d %level)- %msg%n%ex</pattern>
@@ -33,5 +32,4 @@
3332

3433
<!-- silence netty -->
3534
<logger name="io.netty" level="WARN"/>
36-
3735
</configuration>

src/main/java/com/hivemq/extensions/sparkplug/influxdb/InfluxDbCloudSender.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.izettle.metrics.influxdb.utils.TimeUtils;
2222

2323
import java.io.IOException;
24-
import java.io.OutputStream;
2524
import java.net.HttpURLConnection;
2625
import java.net.URL;
2726
import java.net.URLEncoder;
@@ -57,33 +56,29 @@ public InfluxDbCloudSender(
5756
this.connectTimeout = connectTimeout;
5857
this.readTimeout = readTimeout;
5958

60-
final String endpoint = new URL(protocol, host, port, "/api/v2/write").toString();
61-
final String queryPrecision = String.format("precision=%s", TimeUtils.toTimePrecision(timePrecision));
62-
final String orgParameter = String.format("org=%s", URLEncoder.encode(organization, StandardCharsets.UTF_8));
63-
final String bucketParameter = String.format("bucket=%s", URLEncoder.encode(bucket, StandardCharsets.UTF_8));
59+
final var endpoint = new URL(protocol, host, port, "/api/v2/write").toString();
60+
final var queryPrecision = String.format("precision=%s", TimeUtils.toTimePrecision(timePrecision));
61+
final var orgParameter = String.format("org=%s", URLEncoder.encode(organization, StandardCharsets.UTF_8));
62+
final var bucketParameter = String.format("bucket=%s", URLEncoder.encode(bucket, StandardCharsets.UTF_8));
6463
this.url = new URL(endpoint + "?" + queryPrecision + "&" + orgParameter + "&" + bucketParameter);
6564
}
6665

6766
@Override
6867
protected int writeData(final byte @NotNull [] line) throws Exception {
69-
final HttpURLConnection con = (HttpURLConnection) url.openConnection();
68+
final var con = (HttpURLConnection) url.openConnection();
7069
con.setRequestMethod("POST");
7170
con.setRequestProperty("Authorization", "Token " + authToken);
7271
con.setDoOutput(true);
7372
con.setConnectTimeout(connectTimeout);
7473
con.setReadTimeout(readTimeout);
7574
con.setRequestProperty("Content-Encoding", "gzip");
76-
77-
try (final OutputStream out = con.getOutputStream();
78-
final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
75+
try (final var out = con.getOutputStream(); final var gzipOutputStream = new GZIPOutputStream(out)) {
7976
gzipOutputStream.write(line);
8077
gzipOutputStream.flush();
8178
out.flush();
8279
}
83-
84-
final int responseCode = con.getResponseCode();
85-
8680
// check if non 2XX response code
81+
final var responseCode = con.getResponseCode();
8782
if (responseCode / 100 != 2) {
8883
throw new IOException("Server returned HTTP response code: " +
8984
responseCode +

src/main/java/com/hivemq/extensions/sparkplug/influxdb/SparkplugBInterceptor.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,20 @@
1515
*/
1616
package com.hivemq.extensions.sparkplug.influxdb;
1717

18-
import com.hivemq.extension.sdk.api.annotations.NotNull;
1918
import com.hivemq.extension.sdk.api.interceptor.publish.PublishInboundInterceptor;
2019
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundInput;
2120
import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundOutput;
22-
import com.hivemq.extension.sdk.api.packets.publish.PublishPacket;
2321
import com.hivemq.extensions.sparkplug.influxdb.configuration.SparkplugConfiguration;
2422
import com.hivemq.extensions.sparkplug.influxdb.metrics.MetricsHolder;
2523
import com.hivemq.extensions.sparkplug.influxdb.topics.TopicStructure;
2624
import org.eclipse.tahu.protobuf.SparkplugBProto;
25+
import org.jetbrains.annotations.NotNull;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

30-
import java.nio.ByteBuffer;
3129
import java.util.HashMap;
3230
import java.util.List;
3331
import java.util.Map;
34-
import java.util.Optional;
3532

3633
import static com.hivemq.extensions.sparkplug.influxdb.topics.MessageType.STATE;
3734

@@ -53,7 +50,6 @@ public class SparkplugBInterceptor implements PublishInboundInterceptor {
5350
public SparkplugBInterceptor(
5451
final @NotNull MetricsHolder metricsHolder,
5552
final @NotNull SparkplugConfiguration configuration) {
56-
5753
this.metricsHolder = metricsHolder;
5854
this.sparkplugVersion = configuration.getSparkplugVersion();
5955
}
@@ -65,24 +61,23 @@ public void onInboundPublish(
6561
if (log.isTraceEnabled()) {
6662
log.trace("Incoming publish from {}", publishInboundInput.getPublishPacket().getTopic());
6763
}
68-
final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
69-
final String topic = publishPacket.getTopic();
70-
final Optional<ByteBuffer> payload = publishPacket.getPayload();
71-
final TopicStructure topicStructure = new TopicStructure(topic);
64+
final var publishPacket = publishInboundInput.getPublishPacket();
65+
final var topic = publishPacket.getTopic();
66+
final var payload = publishPacket.getPayload();
67+
final var topicStructure = new TopicStructure(topic);
7268
if (payload.isPresent() && topicStructure.isValid(sparkplugVersion)) {
7369
// it's a Sparkplug publish
74-
final ByteBuffer byteBuffer = payload.get();
70+
final var byteBuffer = payload.get();
7571
try {
76-
final SparkplugBProto.Payload spPayload = SparkplugBProto.Payload.parseFrom(byteBuffer);
77-
final List<SparkplugBProto.Payload.Metric> metricsList = spPayload.getMetricsList();
78-
for (final SparkplugBProto.Payload.Metric metric : metricsList) {
72+
final var spPayload = SparkplugBProto.Payload.parseFrom(byteBuffer);
73+
final var metricsList = spPayload.getMetricsList();
74+
for (final var metric : metricsList) {
7975
aliasToMetric.put(metric.getAlias(), metric.getName());
8076
if (log.isTraceEnabled()) {
8177
log.trace("Add Metric Mapping (Alias={}, MetricName={})", metric.getAlias(), metric.getName());
8278
}
8379
}
8480
generateMetricsFromMessage(topicStructure, metricsList);
85-
8681
} catch (final Exception e) {
8782
log.error("Could not parse MQTT payload to protobuf", e);
8883
}
@@ -136,9 +131,9 @@ private void generateMetricForEdgesAndDevices(
136131
}
137132
case DDATA:
138133
case NDATA: {
139-
for (final SparkplugBProto.Payload.Metric metric : metricsList) {
140-
final long alias = metric.getAlias();
141-
final String metricName = aliasToMetric.get(alias);
134+
for (final var metric : metricsList) {
135+
final var alias = metric.getAlias();
136+
final var metricName = aliasToMetric.get(alias);
142137
if (metric.hasIntValue()) {
143138
metricsHolder.getDeviceInformationMetricsInt(topicStructure.getEonId(),
144139
topicStructure.getDeviceId(),

0 commit comments

Comments
 (0)