Skip to content

Commit 2ab1a68

Browse files
committed
Improvements to MqttMessageReceiver
1 parent b596a6a commit 2ab1a68

File tree

2 files changed

+11
-10
lines changed

2 files changed

+11
-10
lines changed

src/main/java/org/thingsboard/gateway/service/MqttMessageReceiver.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
@Slf4j
2929
public class MqttMessageReceiver implements Runnable {
3030

31-
private static final int INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD = 10000;
31+
private static final int INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD = 1000;
3232

3333
private PersistentFileService persistentFileService;
3434
private BlockingQueue<MessageFuturePair> incomingQueue;
@@ -65,17 +65,17 @@ public void run() {
6565
log.warn("Failed to send message [{}] due to [{}]", message, future.cause());
6666
}
6767
} catch (InterruptedException e) {
68-
log.error(e.getMessage(), e);
69-
Thread.currentThread().interrupt();
70-
} catch (IOException e) {
68+
log.warn(e.getMessage(), e);
69+
break;
70+
} catch (Exception e) {
7171
log.error(e.getMessage(), e);
7272
}
7373
}
7474
}
7575

7676
private void checkIncomingQueueSize() {
77-
if (incomingQueue.size() > INCOMING_QUEUE_DEFAULT_WARNING_THRESHOLD) {
78-
log.warn("Incoming queue has [{}] messages which is more than thee specified threshold of [{}]", incomingQueue.size());
77+
if (incomingQueue.size() > incomingQueueWarningThreshold) {
78+
log.warn("Incoming queue has [{}] messages which is more than thee specified threshold of [{}]", incomingQueue.size(), incomingQueueWarningThreshold);
7979
}
8080
}
8181
}

src/main/java/org/thingsboard/gateway/service/MqttMessageSender.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Iterator;
2929
import java.util.List;
3030
import java.util.Queue;
31+
import java.util.concurrent.BlockingQueue;
3132
import java.util.concurrent.ConcurrentLinkedQueue;
3233
import java.util.concurrent.ExecutionException;
3334
import java.util.function.Consumer;
@@ -46,14 +47,14 @@ public class MqttMessageSender implements Runnable {
4647

4748
private final TbConnectionConfiguration connection;
4849

49-
private Queue<MessageFuturePair> incomingQueue;
50+
private BlockingQueue<MessageFuturePair> incomingQueue;
5051
private Queue<Future<Void>> outgoingQueue;
5152

5253
public MqttMessageSender(TbPersistenceConfiguration persistence,
5354
TbConnectionConfiguration connection,
5455
MqttClient tbClient,
5556
PersistentFileService persistentFileService,
56-
Queue<MessageFuturePair> incomingQueue) {
57+
BlockingQueue<MessageFuturePair> incomingQueue) {
5758
this.persistence = persistence;
5859
this.connection = connection;
5960
this.tbClient = tbClient;
@@ -102,7 +103,7 @@ public void run() {
102103

103104
private Future<Void> publishMqttMessage(MqttPersistentMessage message) {
104105
return tbClient.publish(message.getTopic(), Unpooled.wrappedBuffer(message.getPayload()), MqttQoS.AT_LEAST_ONCE).addListener(
105-
future -> incomingQueue.add(new MessageFuturePair(future, message))
106+
future -> incomingQueue.put(new MessageFuturePair(future, message))
106107
);
107108
}
108109

@@ -128,7 +129,7 @@ private boolean checkOutgoingQueueIsEmpty() {
128129

129130
private boolean checkClientConnected() throws InterruptedException {
130131
if (!tbClient.isConnected()) {
131-
outgoingQueue.stream().forEach(future -> future.cancel(true));
132+
outgoingQueue.forEach(future -> future.cancel(true));
132133
outgoingQueue.clear();
133134
log.info("ThingsBoard MQTT connection failed. Reconnecting in [{}] milliseconds", connection.getRetryInterval());
134135
Thread.sleep(connection.getRetryInterval());

0 commit comments

Comments
 (0)