Skip to content

Commit a9cb322

Browse files
committed
examples: Add dup filtering to mqtt_relay
Keep information about the previous value sent. If it's been 5 seconds, or new value is different (ignoring keys like snr and frequency), then send it. Otherwise, just don't. This causes bursts of e.g. 4 transmissions to result in one MQTT message, on the theory that the 4 transmissions are not actually 4 messags, but a strategy to transmit one message more reliably. Define a new configuration option to enable duplicate filtering, and default it to True. Steal logging config from mqtt_filter.py, and add a configuration option DEBUG that if True results in debug logging instead of info.
1 parent 3821498 commit a9cb322

File tree

1 file changed

+96
-1
lines changed

1 file changed

+96
-1
lines changed

examples/rtl_433_mqtt_relay.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
from __future__ import print_function
2121
from __future__ import with_statement
2222

23-
import socket
2423
import json
24+
import socket
25+
import time
26+
2527
import paho.mqtt.client as mqtt
2628

29+
# \todo Make a config variable?
30+
debug = True
31+
32+
2733
# The config class represents a config object. The constructor takes
2834
# an optional pathname, and will switch on the suffix (.yaml for now)
2935
# and read a dictionary.
@@ -42,6 +48,7 @@ class rtlconfig(object):
4248
'MQTT_PASSWORD': None,
4349
'MQTT_TLS': False,
4450
'MQTT_PREFIX': "sensor/rtl_433",
51+
'MQTT_DEDUP': True,
4552
'MQTT_INDIVIDUAL_TOPICS': True,
4653
'MQTT_JSON_TOPIC': True,
4754
}
@@ -68,9 +75,87 @@ def __init__(self, f=None):
6875
def __getitem__(self, k):
6976
return self.c[k]
7077

78+
# A dedup class object supports deduping a stream of reports by
79+
# answering if a report is interesting relative to the history.
80+
# While more complicated deduping is allowed by the interface, for now
81+
# it is very simple, keeping track of only the previous interesting object.
82+
# For now, we more or less require that all reports have the same keys.
83+
# \todo Consider a cache with several entries.
84+
class dedup(object):
85+
86+
def __init__(self, debug = False):
87+
self.debug = debug
88+
89+
# Make this long enough to skip repeats, but allow messages
90+
# every 10s to come through.
91+
self.duration = 5
92+
# Exclude reception metadata (time and RF).
93+
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
94+
# Initialize storage for what was last sent.
95+
(self.last_report, self.last_now) = (None, None)
96+
97+
def send_store(self, report, n):
98+
(self.last_report, self.last_now) = (report, n)
99+
return True
100+
101+
# Return True if j1 and j2 are the same, except for boring_keys.
102+
def equiv(self, j1, j2):
103+
for (k, v) in j1.items():
104+
# If in boring, we don't care.
105+
if k not in self.boring_keys:
106+
# If in j1 and not j2, they are different.
107+
if k not in j2:
108+
if self.debug:
109+
print("%s in j1 and not j2" % (k))
110+
return False
111+
if j1[k] != j2[k]:
112+
if self.debug:
113+
print("%s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
114+
return False
115+
# If the lengths are different, they must be different.
116+
if len(j1) != len(j2):
117+
if self.debug:
118+
print("len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
119+
return False
120+
121+
# If we get here, then the lengths are the same, and all
122+
# non-boring keys in j1 exist in j2, and have the same value.
123+
# It could be that j2 is missing a boring key and also has a
124+
# new non-boring key, but boring keys in particular should not
125+
# be variable.
126+
return True
127+
128+
# report is a python dictionary
129+
def is_interesting(self, report):
130+
n = time.time()
131+
132+
# If previous interesting is empty (or troubled), accept this
133+
# one.
134+
if self.last_report is None or self.last_now is None:
135+
if self.debug:
136+
print("interesting: no previous")
137+
return self.send_store(report, n)
138+
139+
# If previous one was too long ago, accept this one.
140+
if n - self.last_now > self.duration:
141+
if self.debug:
142+
print("interesting: time")
143+
return self.send_store(report, n)
144+
145+
if not self.equiv(self.last_report, report):
146+
if self.debug:
147+
print("interesting: different")
148+
return self.send_store(report, n)
149+
150+
return False
151+
71152
# Create a config object, defaults modified by the config file if present.
72153
c = rtlconfig("rtl_433_mqtt_relay.yaml")
73154

155+
# Create a dedup object for later use, even if it's configure off.
156+
d = dedup(debug)
157+
158+
74159
def mqtt_connect(client, userdata, flags, rc):
75160
"""Handle MQTT connection callback."""
76161
print("MQTT connected: " + mqtt.connack_string(rc))
@@ -100,6 +185,16 @@ def sanitize(text):
100185
def publish_sensor_to_mqtt(mqttc, data, line):
101186
"""Publish rtl_433 sensor data to MQTT."""
102187

188+
if c['MQTT_DEDUP']:
189+
# If this data is not novel relative to recent data, just skip it.
190+
# Otherwise, send it via MQTT.
191+
if not d.is_interesting(data):
192+
if debug:
193+
print("not interesting: %s" % (line))
194+
return
195+
if debug:
196+
print("INTERESTING: %s" % (line))
197+
103198
# Construct a topic from the information that identifies which
104199
# device this frame is from.
105200
# NB: id is only used if channel is not present.

0 commit comments

Comments
 (0)