Skip to content

Commit 17ac9cf

Browse files
feat: Add throttle option for devices (#24122)
* basic spam control * used npm run pretty:write * add test for SPAMMER description to comply with 100% coverage test * define friendly name to spammer test devices * Update README.md Co-authored-by: Koen Kanters <[email protected]> * trying now with throttleit library * lint corrections * last lint request * correct await / async definiction * remove description support * change first command to be executed --------- Co-authored-by: Koen Kanters <[email protected]>
1 parent 063d462 commit 17ac9cf

File tree

7 files changed

+112
-2
lines changed

7 files changed

+112
-2
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ Zigbee2MQTT is made up of three modules, each developed in its own Github projec
105105
### Developing
106106

107107
Zigbee2MQTT uses TypeScript (partially for now). Therefore after making changes to files in the `lib/` directory you need to recompile Zigbee2MQTT. This can be done by executing `npm run build`. For faster development instead of running `npm run build` you can run `npm run build-watch` in another terminal session, this will recompile as you change files.
108+
In first time before building you need to run `npm install --include=dev`
109+
Before submitting changes run `npm run test-with-coverage`, `npm run pretty:check` and `npm run eslint`
108110

109111
## Supported devices
110112

lib/extension/receive.ts

+19-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import assert from 'assert';
33
import bind from 'bind-decorator';
44
import debounce from 'debounce';
55
import stringify from 'json-stable-stringify-without-jsonify';
6+
import throttle from 'throttleit';
67

78
import * as zhc from 'zigbee-herdsman-converters';
89

@@ -16,6 +17,7 @@ type DebounceFunction = (() => void) & {clear(): void} & {flush(): void};
1617
export default class Receive extends Extension {
1718
private elapsed: {[s: string]: number} = {};
1819
private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {};
20+
private throttlers: {[s: string]: {publish: PublishEntityState}} = {};
1921

2022
async start(): Promise<void> {
2123
this.eventBus.onPublishEntityState(this, this.onPublishEntityState);
@@ -68,6 +70,20 @@ export default class Receive extends Extension {
6870
this.debouncers[device.ieeeAddr].publish();
6971
}
7072

73+
async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
74+
if (!this.throttlers[device.ieeeAddr]) {
75+
this.throttlers[device.ieeeAddr] = {
76+
publish: throttle(this.publishEntityState, time * 1000),
77+
};
78+
}
79+
80+
// Update state cache right away. This makes sure that during throttling cached state is always up to date.
81+
// By updating cache we make sure that state cache is always up-to-date.
82+
this.state.set(device, payload);
83+
84+
await this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle');
85+
}
86+
7187
// if debounce_ignore are specified (Array of strings)
7288
// then all newPayload values with key present in debounce_ignore
7389
// should equal or be undefined in oldPayload
@@ -130,9 +146,11 @@ export default class Receive extends Extension {
130146
this.elapsed[data.device.ieeeAddr] = now;
131147
}
132148

133-
// Check if we have to debounce
149+
// Check if we have to debounce or throttle
134150
if (data.device.options.debounce) {
135151
this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore);
152+
} else if (data.device.options.throttle) {
153+
await this.publishThrottle(data.device, payload, data.device.options.throttle);
136154
} else {
137155
await this.publishEntityState(data.device, payload);
138156
}

lib/types/types.d.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ declare global {
4646
properties?: {messageExpiryInterval: number};
4747
}
4848
type Scene = {id: number; name: string};
49-
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached';
49+
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached' | 'publishThrottle';
5050
type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise<void>;
5151
type RecursivePartial<T> = {[P in keyof T]?: RecursivePartial<T[P]>};
5252
interface KeyValue {
@@ -232,6 +232,7 @@ declare global {
232232
retrieve_state?: boolean;
233233
debounce?: number;
234234
debounce_ignore?: string[];
235+
throttle?: number;
235236
filtered_attributes?: string[];
236237
filtered_cache?: string[];
237238
filtered_optimistic?: string[];

package-lock.json

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"rimraf": "^6.0.1",
5656
"semver": "^7.6.3",
5757
"source-map-support": "^0.5.21",
58+
"throttleit": "^2.1.0",
5859
"uri-js": "^4.4.1",
5960
"winston": "^3.14.2",
6061
"winston-syslog": "^2.7.1",

test/receive.test.js

+73
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,79 @@ describe('Receive', () => {
355355
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.09, humidity: 0.01, pressure: 2});
356356
});
357357

358+
it('Should throttle multiple messages from spamming devices', async () => {
359+
const device = zigbeeHerdsman.devices.SPAMMER;
360+
const throttle_for_testing = 1;
361+
settings.set(['device_options', 'throttle'], throttle_for_testing);
362+
settings.set(['device_options', 'retain'], true);
363+
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1');
364+
const data1 = {measuredValue: 1};
365+
const payload1 = {
366+
data: data1,
367+
cluster: 'msTemperatureMeasurement',
368+
device,
369+
endpoint: device.getEndpoint(1),
370+
type: 'attributeReport',
371+
linkquality: 10,
372+
};
373+
await zigbeeHerdsman.events.message(payload1);
374+
const data2 = {measuredValue: 2};
375+
const payload2 = {
376+
data: data2,
377+
cluster: 'msTemperatureMeasurement',
378+
device,
379+
endpoint: device.getEndpoint(1),
380+
type: 'attributeReport',
381+
linkquality: 10,
382+
};
383+
await zigbeeHerdsman.events.message(payload2);
384+
const data3 = {measuredValue: 3};
385+
const payload3 = {
386+
data: data3,
387+
cluster: 'msTemperatureMeasurement',
388+
device,
389+
endpoint: device.getEndpoint(1),
390+
type: 'attributeReport',
391+
linkquality: 10,
392+
};
393+
await zigbeeHerdsman.events.message(payload3);
394+
await flushPromises();
395+
396+
expect(MQTT.publish).toHaveBeenCalledTimes(1);
397+
await flushPromises();
398+
expect(MQTT.publish).toHaveBeenCalledTimes(1);
399+
expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer1');
400+
expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
401+
expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});
402+
403+
// Now we try after elapsed time to see if it publishes next message
404+
const timeshift = throttle_for_testing * 2000;
405+
jest.advanceTimersByTime(timeshift);
406+
expect(MQTT.publish).toHaveBeenCalledTimes(2);
407+
await flushPromises();
408+
409+
expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1');
410+
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
411+
expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});
412+
413+
const data4 = {measuredValue: 4};
414+
const payload4 = {
415+
data: data4,
416+
cluster: 'msTemperatureMeasurement',
417+
device,
418+
endpoint: device.getEndpoint(1),
419+
type: 'attributeReport',
420+
linkquality: 10,
421+
};
422+
await zigbeeHerdsman.events.message(payload4);
423+
await flushPromises();
424+
425+
expect(MQTT.publish).toHaveBeenCalledTimes(3);
426+
expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1');
427+
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
428+
expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});
429+
});
430+
358431
it('Shouldnt republish old state', async () => {
359432
// https://github.com/Koenkk/zigbee2mqtt/issues/3572
360433
const device = zigbeeHerdsman.devices.bulb;

test/stub/zigbeeHerdsman.js

+2
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,8 @@ const devices = {
503503
'lumi.sensor_86sw2.es1',
504504
),
505505
WSDCGQ11LM: new Device('EndDevice', '0x0017880104e45522', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
506+
// This are not a real spammer device, just copy of previous to test the throttle filter
507+
SPAMMER: new Device('EndDevice', '0x0017880104e455fe', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
506508
RTCGQ11LM: new Device('EndDevice', '0x0017880104e45523', 6540, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.sensor_motion.aq2'),
507509
ZNCZ02LM: ZNCZ02LM,
508510
E1743: new Device('Router', '0x0017880104e45540', 6540, 4476, [new Endpoint(1, [0], [])], true, 'Mains (single phase)', 'TRADFRI on/off switch'),

0 commit comments

Comments
 (0)