createWebSocketStream - objectMode #1640

Closed
tadeuszwojcik opened this issue Sep 29, 2019 · 13 comments · Fixed by #1647

Comments

@tadeuszwojcik
Contributor

Hi, forgive me if this is a silly question, but is there a reason why the duplex stream created via createWebSocketStream doesn't have objectMode set to true? ws.onmessage clearly produces separate messages rather than a continuous stream of bytes. Am I missing something?
I'm trying to run something like this:

const WebSocket = require('ws')
const ws = new WebSocket('wss://www.bitmex.com/realtime')
const messageStream = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })

ws.once('open', function open() {
  ws.send(JSON.stringify({ op: 'subscribe', args: ['orderBookL2'] }))
})

async function run() {
  for await (let message of messageStream) {
    console.log(message)
  }
}
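// Top-level await is not available in a CommonJS script, so handle the promise instead.
run().catch(console.error)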

Is such a pattern fine? It seems to work, and each message is a correct string that matches the JSON object sent by the server, but could it break because objectMode is not set?
I also tried to send a message via the stream, but couldn't sort it out quickly.

Would you perhaps accept a PR that changes https://github.com/websockets/ws/blob/master/lib/stream.js#L65
to:

const duplex = new Duplex({
  autoDestroy: false,
  emitClose: false,
  objectMode: false,
  readableObjectMode: false,
  writableObjectMode: false,
  ...options
});

so that I could provide the objectMode flag via options?

@lpinca
Member

lpinca commented Sep 29, 2019

Because it does not play well with backpressure handling. In object mode backpressure is handled by counting the number of objects and not the actual bytes of buffered messages.

Also it does not make much sense. As you noticed the WebSocket protocol is message oriented. What is pushed onto the readable queue of the wrapper stream is a full message. This means that every chunk you get from the 'data' listeners of the stream returned by createWebSocketStream() is the full message itself.

The pattern you are using is ok and will not break unless you use duplex.read() directly.
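
To illustrate the caveat about calling read() directly, here is a minimal sketch that uses a plain Node.js Readable instead of a real WebSocket stream. readOnce is a hypothetical helper, not part of ws; it buffers two "messages" while the stream is paused and then calls read() once.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: buffer two "messages" in a paused stream, then call
// read() once and return whatever comes back.
function readOnce(objectMode) {
  const readable = new Readable({ objectMode, read() {} });
  readable.push('{"a":1}');
  readable.push('{"b":2}');
  return readable.read();
}

// Byte mode: read() without a size returns everything that is buffered,
// so the two messages come back joined together.
console.log(String(readOnce(false))); // {"a":1}{"b":2}

// Object mode: read() returns a single buffered item per call.
console.log(readOnce(true)); // {"a":1}
```

In byte mode the joined result is no longer valid JSON, which is the kind of breakage a direct read() call can cause.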

@tadeuszwojcik
Contributor Author

All makes sense, thanks a lot!

@tadeuszwojcik
Contributor Author

I'm not 100% sure why, but I found a case where the stream returned by createWebSocketStream yields two messages at once when iterated with for await, which obviously breaks things. With the standard 'message' callback, messages arrive separately, as they should. Setting readableObjectMode to true, as suggested in this issue, also fixes the problem. So perhaps object mode should be on by default?
Here's a small snippet that reproduces it:

const WebSocket = require('ws')
const ws = new WebSocket('wss://api-pub.bitfinex.com/ws/2')
const messageStream = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })

ws.once('open', function open() {
  ws.send(JSON.stringify({ event: 'subscribe', channel: 'book', len: 100, prec: 'P0', freq: 'F0', symbol: `tBTCUSD` }))
})

async function run() {
  for await (let message of messageStream) {
    console.log('string msg', message)
    console.log('parsed msg', JSON.parse(message))
  }
}
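// Top-level await is not available in a CommonJS script, so handle the promise instead.
run().catch(console.error)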

and on RunKit: https://runkit.com/tadeuszwojcik/websocket-stream-issue
I think the second message gets 'merged' with the third, which breaks JSON.parse.

Please let me know if you need anything else from me. It could be something I'm doing incorrectly; I'm not sure.

@tadeuszwojcik tadeuszwojcik reopened this Oct 10, 2019
@lpinca
Member

lpinca commented Oct 10, 2019

It looks like an issue with the async iterator implementation https://github.com/nodejs/node/blob/v12.11.1/lib/internal/streams/async_iterator.js#L22 as it uses readable.read().

Can you reproduce without it using only a 'data' listener?
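
A minimal sketch of that comparison, assuming a plain Node.js Readable in place of a live WebSocket connection. makeSource, collectWithDataListener and collectWithForAwait are hypothetical helpers written for this illustration. How the for await loop groups the buffered chunks may differ between Node.js versions, so no output is claimed for it here.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: a byte-mode stream preloaded with two "messages".
function makeSource() {
  const source = new Readable({ encoding: 'utf8', read() {} });
  source.push('{"a":1}');
  source.push('{"b":2}');
  source.push(null); // signal end of stream
  return source;
}

// Consume the stream in flowing mode with a 'data' listener.
function collectWithDataListener(source) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    source.on('data', (chunk) => chunks.push(chunk));
    source.on('end', () => resolve(chunks));
    source.on('error', reject);
  });
}

// Consume the stream with the Readable async iterator.
async function collectWithForAwait(source) {
  const chunks = [];
  for await (const chunk of source) {
    chunks.push(chunk);
  }
  return chunks;
}

Promise.all([
  collectWithDataListener(makeSource()),
  collectWithForAwait(makeSource())
])
  .then(([fromData, fromForAwait]) => {
    console.log("'data' listener:", fromData);
    console.log('for await:', fromForAwait);
  })
  .catch(console.error);
```

The 'data' listener receives one chunk per pushed message. The for await loop may deliver the same bytes in fewer, larger chunks, in which case JSON.parse fails on the joined text.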

@lpinca
Member

lpinca commented Oct 10, 2019

Using object mode is not an option because it messes up backpressure handling.

@tadeuszwojcik
Contributor Author

Thanks, I get it now. Indeed, if I only use a 'data' listener it works fine. I was very happy with the async iterable pattern for consuming WS messages, which is much nicer than callbacks, but unfortunately it seems not to work in practice.
In theory, if I were able to set readableObjectMode to true, would backpressure work such that the stream reads up to 16 WebSocket messages and then pauses the underlying socket? Or would it break in some other way, or be completely unusable?

@lpinca
Member

lpinca commented Oct 10, 2019

Yes, but if each message is 64 MiB you'll end up buffering 1 GiB. Similarly if each message is 1 B the socket will be paused after only 16 B. Hope it makes sense.
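
The arithmetic above can be sketched at a smaller scale with a plain Node.js Readable. pushesUntilBackpressure is a hypothetical helper, and the highWaterMark values are set explicitly as assumptions (16 objects, 16 KiB) instead of relying on defaults. 1 MiB messages stand in for the 64 MiB ones.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: push the same chunk until push() returns false, which
// is the signal to stop reading from the underlying source.
function pushesUntilBackpressure(options, chunk) {
  const readable = new Readable({ ...options, read() {} });
  let pushes = 0;
  let keepGoing = true;
  while (keepGoing) {
    keepGoing = readable.push(chunk);
    pushes++; // counts the push that returned false as well
  }
  return pushes;
}

const objectMode = { objectMode: true, highWaterMark: 16 }; // counted in objects
const byteMode = { highWaterMark: 16 * 1024 }; // counted in bytes

const oneByte = Buffer.alloc(1);
const oneMiB = Buffer.alloc(1024 * 1024);

// Object mode: always 16 messages, whether that is 16 B or 16 MiB of data.
console.log(pushesUntilBackpressure(objectMode, oneByte)); // 16
console.log(pushesUntilBackpressure(objectMode, oneMiB)); // 16

// Byte mode: the limit follows the amount of buffered data.
console.log(pushesUntilBackpressure(byteMode, oneByte)); // 16384
console.log(pushesUntilBackpressure(byteMode, oneMiB)); // 1
```

Raising highWaterMark in object mode, for example to 100 or 500, only changes the message count at which push() starts returning false. The number of buffered bytes stays unbounded.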

@tadeuszwojcik
Contributor Author

tadeuszwojcik commented Oct 10, 2019

Thanks, that makes sense. In my case that would work just fine, as I don't expect large messages (and I'd argue most WS-based APIs return small messages). I could also increase highWaterMark to a larger value, something like 100-500. I understand your position on the defaults (readableObjectMode set to false), but would you be willing to accept a PR that changes https://github.com/websockets/ws/blob/master/lib/stream.js#L65
to:

const duplex = new Duplex({
  autoDestroy: false,
  emitClose: false,
  objectMode: false,
  readableObjectMode: false,
  writableObjectMode: false,
  ...options
});

That way one could override the defaults, which is not possible today. Anyone who does this should be aware of the consequences (different backpressure characteristics).
Thanks!
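
A minimal sketch of how the proposed ordering would behave, using a standalone Duplex and not the actual ws source. createDuplex is a hypothetical helper, and the no-op read() and write() implementations are there only so the stream can be constructed. Because ...options comes after the defaults, any key the caller passes wins.

```javascript
const { Duplex } = require('stream');

// Hypothetical helper mirroring the proposal: defaults first, then the
// caller's options, so any key in `options` overrides the default.
function createDuplex(options) {
  return new Duplex({
    autoDestroy: false,
    emitClose: false,
    objectMode: false,
    readableObjectMode: false,
    writableObjectMode: false,
    ...options,
    // No-op implementations, only so the stream can be constructed.
    read() {},
    write(chunk, encoding, callback) {
      callback();
    }
  });
}

const byDefault = createDuplex();
const overridden = createDuplex({
  readableObjectMode: true,
  readableHighWaterMark: 100 // counted in objects on the readable side
});

console.log(byDefault.readableObjectMode, byDefault.writableObjectMode); // false false
console.log(overridden.readableObjectMode, overridden.writableObjectMode); // true false
console.log(overridden.readableHighWaterMark); // 100
```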

@lpinca
Member

lpinca commented Oct 10, 2019

I have no objections but it's like giving users a loaded gun :)

@lpinca
Member

lpinca commented Oct 12, 2019

Here is a possible workaround that keeps createWebSocketStream() "safe" and should also handle backpressure correctly.

'use strict';

const WebSocket = require('ws');
const { PassThrough } = require('stream');

const webSocketStream = WebSocket.createWebSocketStream(
  new WebSocket('wss://api-pub.bitfinex.com/ws/2'),
  { decodeStrings: false }
);

webSocketStream.on('error', console.error);

webSocketStream.write(
  JSON.stringify({
    event: 'subscribe',
    channel: 'book',
    len: 100,
    prec: 'P0',
    freq: 'F0',
    symbol: 'tBTCUSD'
  })
);

const messageStream = new PassThrough({
  encoding: 'utf8',
  readableObjectMode: true
});

webSocketStream.pipe(messageStream);

async function run() {
  for await (const message of messageStream) {
    console.log('string msg', message);
    console.log('parsed msg', JSON.parse(message));
  }
}

run().catch(console.error);

@tadeuszwojcik
Contributor Author

Thanks, that would indeed work, but wouldn't it in practice create two 'backpressure queues': one for the duplex stream (counted in bytes) and one for this PassThrough (counted in objects)?
I only wanted the stream created by WebSocket.createWebSocketStream to work in object mode, since the WebSocket protocol is message/object based and I don't think anyone would expect to receive 'one and a half' messages, as is possible right now. This is why I'd like to have an option to override the defaults (e.g. set readableObjectMode to true).

@lpinca
Member

lpinca commented Oct 14, 2019

> one for duplex stream (counted in bytes) and one for this PassThrough (objects based)

It is technically the same because the final readable is in object mode when it should not be. The only difference is an additional buffer on the writable side of the PassThrough.

> I don't think anyone would expect to receive 'one and a half' message as it's possible right now.

I would add "when using the Readable async iterator implementation" :). It is not a minor detail, as I think this should be fixed in Node.js core. There is no 1:1 correspondence between 'data' events and iterations of a for await ... of loop, and I think this will be a problem not only for ws.

@tadeuszwojcik
Contributor Author

You're right, it could certainly be an issue with the Node.js implementation. I just really like that WebSocket messages can be consumed via the for await pattern with proper backpressure handling, hence this small issue: it was the only blocker to using it that way. Thanks a lot for considering it and merging it in!
