Skip to content

Commit bd40638

Browse files
authored
Merge pull request #14 from knation/node-redis-v4
Improve internal handling of redis clients
2 parents c1f5c7d + f55ce33 commit bd40638

File tree

4 files changed

+146
-87
lines changed

4 files changed

+146
-87
lines changed

.github/workflows/node.js.yml

+1-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616

1717
strategy:
1818
matrix:
19-
node-version: [12.x, 14.x, 16.x]
19+
node-version: [18.x]
2020
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/
2121

2222
steps:
@@ -28,10 +28,3 @@ jobs:
2828
cache: 'npm'
2929
- run: npm ci
3030
- run: npm run build --if-present
31-
- run: npm run coverage
32-
33-
# Save coverage report in Coveralls
34-
- name: Coveralls
35-
uses: coverallsapp/github-action@master
36-
with:
37-
github-token: ${{ secrets.GITHUB_TOKEN }}

README.md

+13-7
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ try {
9595
this.add(5000, data);
9696
}
9797

98-
// Don't forget to clean up later: `dt.stop()`
98+
// Don't forget to clean up later: `dt.stop()` or `dt.close()`, as applicable
9999
```
100100

101101

@@ -113,21 +113,25 @@ The constructor takes a single object. Properties are as follows:
113113
| `callback` | The function to call when tasks are due. <br><br>When a task is due or past-due, your callback method is called asynchronously, passing the `data` you provided when adding, the generated `taskId`, and the time (in ms) that the task was due.<br><br>The context of `this` is the `DelayedTasks` object. | Yes | |
114114
| `options.pollIntervalMs` | How often to poll redis for tasks due (in milliseconds). The shorter the interval, the sooner after being due a task will be processed, but the more load in redis. | No | 1000 |
115115

116-
<sup>1</sup> This module uses [`node-redis`](https://github.com/redis/node-redis) version 4 under the hood in legacy mode.
116+
<sup>1</sup> This module uses [`node-redis`](https://github.com/redis/node-redis) version 4 under the hood in legacy mode. If you provide your own client, you're responsible for connecting, disconnecting, and creating an error handler.
117117

118118
### Connect
119119

120-
Before doing anything, call `await dt.connect()` to connect to redis.
120+
If you provided an object with options for connecting to redis, you must call `await dt.connect()` to connect to redis before doing anything else. If you've provided an existing, connected redis client, this is not necessary.
121121

122122
### start / stop polling
123123

124-
To begin polling for tasks, call `dt.start()`. Call `dt.stop()` to stop future polling.
124+
To begin polling for tasks, call `dt.start()`. This returns a boolean with the status of starting. If `false`, it's because the redis client hasn't been connected yet. If this was a self-supplied client, call `await client.connect()`. Otherwise, call `await dt.connect()` to create the connection.
125+
126+
Call `dt.stop()` to stop future polling.
125127

126128
### close()
127129

128-
Calling `dt.close()` will stop polling and close the redis client being used.
130+
Calling `await dt.close()` will stop polling. If a new redis client was created for the object instance (that is, it was passed an object of configuration details), that redis client will be closed, too (using `disconnect()` to abort pending requests). If you passed an existing redis client to the constructor, it will be left open and you'll have to close it yourself when you're finished with it.
129131

130-
**CAUTION:** If you passed your own redis client in the constructor, that client will be closed with this command. If you just want to stop polling, but leave the connection open, call `dt.stop()` instead.
132+
This returns a promise that resolves when the client connection is confirmed closed.
133+
134+
If you just want to stop polling, but leave the connection open, call `dt.stop()` instead.
131135

132136
### add(_delayMs_, _data_)
133137

@@ -151,7 +155,7 @@ To force a poll outside of the poll interval, call `dt.poll()`. This should be u
151155

152156
## Testing
153157

154-
Start a local redis server on port 6379. You can run `docker-compose up` to launch one from this repo. Once redis is running, run `npm test` or `npm coverage`.
158+
The test suite requires a local redis server on port 6379. You can run `docker-compose up` to launch one from this repo. Once redis is running, run `npm test` or `npm coverage`.
155159

156160
## Notes
157161

@@ -181,5 +185,7 @@ catch everything, including redis errors.
181185

182186
* Better handling of watch conflicts in `dt.poll`. Right now it just quits, but if this happens a lot, nothing would end up getting processed.
183187

188+
* Find a redis mock that works with `node-redis` v4 and later versions of redis server.
189+
184190
## License
185191
MIT License

index.js

+47-18
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,24 @@ class DelayedTasks {
1717
throw new TypeError('Invalid queue ID specified');
1818
}
1919

20+
// Will be set to true if the redis instance is self-contained
21+
this.selfContainedResis = false;
22+
2023
if (typeof settings.redis?.connect === 'function') {
2124
this.redisClient = settings.redis;
25+
2226
} else if (typeof settings.redis === 'object') {
2327
settings.redis.legacyMode = true;
2428
this.redisClient = redis.createClient(settings.redis);
29+
this.selfContainedResis = true;
30+
this.redisClient.on("error", function(error) {
31+
// todo: do something with this
32+
});
33+
2534
} else {
2635
throw new TypeError('Invalid redis connection options');
2736
}
2837

29-
this.redisClient.on("error", function(error) {
30-
// todo: do something with this
31-
});
3238

3339
// Callback function for all delayed tasks
3440
if (typeof settings.callback === 'function') {
@@ -54,14 +60,30 @@ class DelayedTasks {
5460
}
5561

5662
connect() {
57-
return this.redisClient.connect();
63+
if (!this.redisClient.isReady) {
64+
const p = new Promise(resolve => this.redisClient.on('ready', resolve));
65+
66+
this.redisClient.connect();
67+
68+
return p;
69+
70+
} else {
71+
// Already connected -- nothing to do and no failure
72+
return Promise.resolve();
73+
}
5874
}
5975

6076
/**
6177
* Start polling.
6278
*/
6379
start() {
64-
this.pollIntervalId = setInterval(this.poll.bind(this), this.pollIntervalMs);
80+
if (this.redisClient.isReady) {
81+
this.pollIntervalId = setInterval(this.poll.bind(this), this.pollIntervalMs);
82+
return true;
83+
84+
} else {
85+
return false;
86+
}
6587
}
6688

6789
/**
@@ -73,11 +95,16 @@ class DelayedTasks {
7395
}
7496

7597
/**
76-
* Closes up shop.
98+
* Closes up shop. If the redis instance is self contained (it was created,
99+
* just for this object instance), it will be deleted.
77100
*/
78-
close() {
101+
async close() {
79102
this.stop();
80-
this.redisClient.quit();
103+
104+
if (this.selfContainedResis && this.redisClient.isReady) {
105+
await this.redisClient.disconnect();
106+
this.redisClient = null;
107+
}
81108
}
82109

83110
/**
@@ -102,12 +129,19 @@ class DelayedTasks {
102129
.exec((execError, results) => {
103130
/* istanbul ignore next */
104131
if (execError) {
105-
resolve(0);
106-
return;
132+
if (execError instanceof redis.WatchError) {
133+
/**
134+
* If execError is a "WatchError", it means that a concurrent client
135+
* changed the key while we were processing it and thus
136+
* the execution of the MULTI command was not performed. We'll fail
137+
* silently as those jobs will be picked up on the next `poll()`
138+
*/
139+
return resolve(0);
140+
} else {
141+
return reject(execError)
142+
}
107143
}
108144

109-
// Success, either that the update was made, or the key changed
110-
111145
if (results && results[0] !== null) {
112146
// Process tasks
113147
tasks
@@ -116,16 +150,11 @@ class DelayedTasks {
116150
}
117151

118152
resolve((!results || results[0] === null) ? 0 : results[0]);
119-
120-
/**
121-
* If execError is a "WatchError", it means that a concurrent client
122-
* changed the key while we were processing it and thus
123-
* the execution of the MULTI command was not performed.
124-
*/
125153
});
126154

127155
} else {
128156
// No changes to make
157+
this.redisClient.unwatch();
129158
resolve(0);
130159
}
131160
});

0 commit comments

Comments
 (0)