createWebSocketStream - objectMode #1640
Comments
Because it does not play well with backpressure handling. In object mode, backpressure is handled by counting the number of objects and not the actual bytes of buffered messages. Also, it does not make much sense. As you noticed, the WebSocket protocol is message oriented. What is pushed onto the readable queue of the wrapper stream is a full message. This means that every chunk you get from the
The pattern you are using is ok and will not break unless you use
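To make the difference in backpressure accounting concrete, here is a minimal sketch using only Node's built-in `stream` module, not `ws` itself. The helper `pushesUntilFull` and the chosen `highWaterMark` values are made up for illustration. It counts how many `push()` calls a paused `Readable` takes before `push()` returns `false`.

```javascript
'use strict';
const { Readable } = require('stream');

// Hypothetical helper (not part of ws or Node): push equally sized chunks
// into a paused Readable and count the push() calls made until push()
// returns false, i.e. until the stream asks the producer to stop.
function pushesUntilFull(streamOptions, chunkSize) {
  const readable = new Readable({ ...streamOptions, read() {} });
  let pushes = 0;
  let accepted = true;
  while (accepted && pushes < 1000) {
    accepted = readable.push(Buffer.alloc(chunkSize));
    pushes++;
  }
  return { pushes, bufferedBytes: pushes * chunkSize };
}

const KiB = 1024;
const MiB = 1024 * KiB;

// Byte mode: the limit is expressed in bytes, so large chunks hit it sooner.
console.log('bytes, 1 KiB chunks ', pushesUntilFull({ highWaterMark: 4 * KiB }, KiB));
console.log('bytes, 1 MiB chunks ', pushesUntilFull({ highWaterMark: 4 * KiB }, MiB));

// Object mode: the limit is a number of objects, whatever their size.
console.log('object, 1 KiB chunks', pushesUntilFull({ objectMode: true, highWaterMark: 4 }, KiB));
console.log('object, 1 MiB chunks', pushesUntilFull({ objectMode: true, highWaterMark: 4 }, MiB));
```

In object mode the number of accepted chunks stays the same while the buffered bytes grow with the chunk size, which is the concern raised above.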
All makes sense, thanks a lot!
I'm not 100% sure why, but I found the issue where stream returned by
const WebSocket = require('ws')
const ws = new WebSocket('wss://api-pub.bitfinex.com/ws/2')
const messageStream = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })
ws.once('open', function open() {
  ws.send(JSON.stringify({ event: 'subscribe', channel: 'book', len: 100, prec: 'P0', freq: 'F0', symbol: `tBTCUSD` }))
})
async function run() {
  for await (let message of messageStream) {
    console.log('string msg', message)
    console.log('parsed msg', JSON.parse(message))
  }
}
// Top-level await is not available in a CommonJS script, so handle the promise explicitly.
run().catch(console.error)
and on runkit: https://runkit.com/tadeuszwojcik/websocket-stream-issue
Please let me know if you need anything else from me. It could be something I'm doing incorrectly, I'm not sure...
It looks like an issue with the async iterator implementation https://github.com/nodejs/node/blob/v12.11.1/lib/internal/streams/async_iterator.js#L22 as it uses
Can you reproduce without it using only a
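A minimal sketch of how separate messages can end up merged, assuming the consumer calls `read()` with no size on a paused, non-object-mode stream that already holds several buffered chunks. The Node.js documentation says that `read()` without a size returns all the data in the internal buffer. The helper `readBuffered` and the sample payloads are made up for illustration and do not involve `ws`.

```javascript
'use strict';
const { Readable } = require('stream');

// Hypothetical helper: buffer two JSON "messages" in a paused Readable,
// then return whatever a single read() call hands back.
function readBuffered(streamOptions) {
  const readable = new Readable({ ...streamOptions, read() {} });
  readable.push('{"a":1}');
  readable.push('{"b":2}');
  return readable.read();
}

// Non-object mode with an encoding: buffered chunks are joined into one string.
const merged = readBuffered({ encoding: 'utf8' });
console.log('string mode:', merged);

// Object mode: read() returns exactly one buffered item per call.
const single = readBuffered({ objectMode: true });
console.log('object mode:', single);

try {
  JSON.parse(merged);
} catch (err) {
  console.log('parsing the merged chunk fails:', err.name);
}
```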
Using object mode is not an option because it messes up backpressure handling.
Thanks, I get it now, indeed if I'd only use
Yes, but if each message is 64 MiB you'll end up buffering 1 GiB. Similarly if each message is 1 B the socket will be paused after only 16 B. Hope it makes sense.
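The numbers in the comment above follow from the default object-mode `highWaterMark` of 16 objects. A small sketch of the arithmetic, using only Node's `stream` module (`worstCaseBufferedBytes` is a hypothetical helper, and it ignores any buffering outside the readable queue):

```javascript
'use strict';
const { Readable } = require('stream');

// Ask Node for the default object-mode limit instead of hard-coding it.
const objectModeLimit = new Readable({ objectMode: true, read() {} })
  .readableHighWaterMark;

const MiB = 1024 * 1024;
const GiB = 1024 * MiB;

// Hypothetical helper: bytes sitting in the readable queue when it holds
// `objectModeLimit` messages of `messageSize` bytes each.
function worstCaseBufferedBytes(messageSize) {
  return objectModeLimit * messageSize;
}

console.log('object-mode highWaterMark:', objectModeLimit, 'objects');
console.log('64 MiB messages:', worstCaseBufferedBytes(64 * MiB) / GiB, 'GiB buffered');
console.log('1 B messages:', worstCaseBufferedBytes(1), 'B buffered');
```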
Thanks, that makes sense, and in my case that would work just fine as I don't expect larger messages (and one could argue that most WS-based APIs return small messages). Also I could increase
const duplex = new Duplex({
  autoDestroy: false,
  emitClose: false,
  objectMode: false,
  readableObjectMode: false,
  writableObjectMode: false,
  ...options
});
So one would be able to override the defaults, which is not possible today. Anyone who wants to do this should, I'd assume, be aware of the consequences (different back-pressure characteristics).
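The proposal above hinges on where `...options` sits in the object literal: with object spread, later properties win. A minimal sketch of the two orderings follows. `forcedOptions` and `overridableOptions` are hypothetical helpers, and the "forced" ordering is only an assumption about why overriding is not possible today, not a quote of the library code.

```javascript
'use strict';

// Hypothetical: user options are spread first, so the fixed values win.
function forcedOptions(options) {
  return { ...options, autoDestroy: false, emitClose: false, objectMode: false };
}

// Hypothetical: defaults come first, so user options can override them.
function overridableOptions(options) {
  return { autoDestroy: false, emitClose: false, objectMode: false, ...options };
}

const userOptions = { objectMode: true, encoding: 'utf8' };

console.log('forced:     ', forcedOptions(userOptions));
console.log('overridable:', overridableOptions(userOptions));
```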
I have no objections but it's like giving users a loaded gun :)
Here is a possible workaround that keeps
'use strict';
const WebSocket = require('ws');
const { PassThrough } = require('stream');
const webSocketStream = WebSocket.createWebSocketStream(
  new WebSocket('wss://api-pub.bitfinex.com/ws/2'),
  { decodeStrings: false }
);
webSocketStream.on('error', console.error);
webSocketStream.write(
  JSON.stringify({
    event: 'subscribe',
    channel: 'book',
    len: 100,
    prec: 'P0',
    freq: 'F0',
    symbol: 'tBTCUSD'
  })
);
const messageStream = new PassThrough({
  encoding: 'utf8',
  readableObjectMode: true
});
webSocketStream.pipe(messageStream);
async function run() {
  for await (const message of messageStream) {
    console.log('string msg', message);
    console.log('parsed msg', JSON.parse(message));
  }
}
run().catch(console.error);
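For anyone who wants to try the `PassThrough` approach without a network connection, here is an offline sketch. It swaps the WebSocket stream for a plain byte-mode `Readable` as a stand-in (an assumption, since the real duplex is created by `ws`), and `collectMessages` is a hypothetical helper.

```javascript
'use strict';
const { PassThrough, Readable } = require('stream');

// Hypothetical helper: feed chunks through a byte-mode source into a
// PassThrough whose readable side is in object mode, then collect what the
// async iterator yields.
async function collectMessages(chunks) {
  // Stand-in for the stream returned by createWebSocketStream (byte mode).
  const source = new Readable({ read() {} });
  const messageStream = new PassThrough({
    encoding: 'utf8',
    readableObjectMode: true
  });
  source.pipe(messageStream);

  for (const chunk of chunks) source.push(chunk);
  source.push(null); // signal end of input

  const messages = [];
  for await (const message of messageStream) messages.push(message);
  return messages;
}

collectMessages(['{"a":1}', '{"b":2}'])
  .then((messages) => console.log(messages.map((m) => JSON.parse(m))))
  .catch(console.error);
```

Each chunk written to the `PassThrough` comes out as its own item because the readable side counts and returns objects, so the per-message boundaries survive the async iterator.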
Thanks, indeed that would work, but wouldn't it in practice provide two 'backpressure queues': one for the duplex stream (counted in bytes) and one for this PassThrough (object based)?
It is technically the same because the final readable is in object mode when it should not be. The only difference is an additional buffer on the writable side of the
I would add "when using the
You're right, it could certainly be an issue with the Node implementation. I just really like that WebSocket messages can be consumed via
Hi, forgive me please if it's a silly question, but is there a reason why the duplex stream created via createWebSocketStream doesn't have objectMode set to true? ws.onmessage clearly produces/pushes separate messages, not a continuous stream of bytes. Am I missing something?
I'm trying to run something like this:
Is such a pattern fine? It seems to be working, and each message is a correct string that matches the JSON object provided by the server, but could that break as objectMode is not set?
I also tried to send a message via the stream, but couldn't sort it out quickly.
Would you perhaps accept a PR that would change https://github.com/websockets/ws/blob/master/lib/stream.js#L65:
to:
so I could provide the objectMode flag via options?