Skip to content

Commit db37df0

Browse files
committed
websockets support
1 parent 476725e commit db37df0

File tree

6 files changed

+218
-31
lines changed

6 files changed

+218
-31
lines changed

README.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => {
242242
The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging
243243
documentation][doc-logging] for more information.
244244

245+
## WebSockets
246+
247+
You can use websocket as transport. But Cassandra doesn't support this protocol
248+
so some proxy should be deployed in front of Cassandra, which can handle this transport protocol.
249+
250+
```javascript
251+
const client = new cassandra.Client({
252+
transport: 'WebSocket',
253+
contactPoints: [
254+
// some proxies that support websocket transport
255+
'127.0.0.1:9043',
256+
'localhost:9044'
257+
],
258+
webSocketOptions: {
259+
// some client websocket options
260+
protocolVersion: 13,
261+
...
262+
}
263+
});
264+
```
265+
266+
You can configure your websocket client with `webSocketOptions`.
267+
To properly configure it follow [websocket/ws doc][ws-doc].
268+
269+
You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`.
270+
245271
## Compatibility
246272

247273
The driver supports all versions of Node.js, Cassandra, and DSE that are not EOL at the time of release. Only LTS eligible branches (i.e. even numbered releases) are supported for Node.js. See the [project documentation] for more information about the Node.js release cycle.
@@ -296,4 +322,5 @@ Unless required by applicable law or agreed to in writing, software distributed
296322
[streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable
297323
[cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts
298324
[dse]: https://www.datastax.com/products/datastax-enterprise
299-
[astra]: https://www.datastax.com/products/datastax-astra
325+
[astra]: https://www.datastax.com/products/datastax-astra
326+
[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options

index.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { metrics } from './lib/metrics';
2424
import { tracker } from './lib/tracker';
2525
import { metadata } from './lib/metadata';
2626
import { datastax } from './lib/datastax/';
27+
import { ClientRequestArgs } from 'http';
2728
import Long = types.Long;
2829
import Uuid = types.Uuid;
2930
import graph = datastax.graph;
@@ -191,7 +192,11 @@ export interface ExecutionOptions {
191192
setHints(hints: string[]): void;
192193
}
193194

195+
export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs)
196+
& {protocols?: string | string[] | undefined};
197+
194198
export interface ClientOptions {
199+
transport?: 'SecureWebSocket' | 'WebSocket' | undefined
195200
contactPoints?: string[];
196201
localDataCenter?: string;
197202
keyspace?: string;
@@ -253,6 +258,7 @@ export interface ClientOptions {
253258
tcpNoDelay?: boolean;
254259
};
255260
sslOptions?: tls.ConnectionOptions;
261+
webSocketOptions?: WebSocketClientOptions;
256262
}
257263

258264
export interface QueryOptions {

lib/connection.js

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack');
3232
const OperationState = require('./operation-state');
3333
const promiseUtils = require('./promise-utils');
3434
const { ExecutionOptions } = require('./execution-options');
35+
const { WebSocketWrapper } = require('./websocket');
3536

3637
/**
3738
* Represents a connection to a Cassandra node
@@ -171,30 +172,70 @@ class Connection extends events.EventEmitter {
171172
const self = this;
172173
this.log('info', `Connecting to ${this.endpointFriendlyName}`);
173174

174-
if (!this.options.sslOptions) {
175-
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
176-
this.netClient.connect(this.port, this.address, function connectCallback() {
177-
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
178-
self.bindSocketListeners();
179-
self.startup(callback);
180-
});
181-
}
182-
else {
183-
// Use TLS
184-
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
175+
if (this.options.transport) {
176+
if (this.options.transport.toLowerCase() === 'securewebsocket') {
177+
// Use secure WebSocket
178+
const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport },
179+
this.options.webSocketOptions);
180+
181+
if (!options.protocols) {
182+
options.protocols = ['cql'];
183+
}
184+
185+
this.netClient = new WebSocketWrapper(options);
186+
187+
this.netClient.connect(this.port, this.address, function connectCallback() {
188+
self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`);
189+
self.bindSocketListeners();
190+
self.startup(callback);
191+
});
192+
} else {
193+
// Use WebSocket
194+
const options = utils.extend({
195+
transport: this.options.transport,
196+
highWaterMark: this.options.socketOptions.coalescingThreshold,
197+
handshakeTimeout: this.options.socketOptions.connectTimeout,
198+
}, this.options.webSocketOptions);
199+
200+
if (!options.protocols) {
201+
options.protocols = ['cql'];
202+
}
185203

186-
if (this.options.sni) {
187-
sslOptions.servername = this._serverName;
204+
this.netClient = new WebSocketWrapper(options);
205+
206+
this.netClient.connect(this.port, this.address, function connectCallback() {
207+
self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`);
208+
self.bindSocketListeners();
209+
self.startup(callback);
210+
});
188211
}
212+
} else {
213+
// Use Socket
214+
if (!this.options.sslOptions) {
215+
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
216+
217+
this.netClient.connect(this.port, this.address, function connectCallback() {
218+
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
219+
self.bindSocketListeners();
220+
self.startup(callback);
221+
});
222+
} else {
223+
// Use Socket with TLS
224+
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
189225

190-
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
191-
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
192-
self.bindSocketListeners();
193-
self.startup(callback);
194-
});
226+
if (this.options.sni) {
227+
sslOptions.servername = this._serverName;
228+
}
195229

196-
// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
197-
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
230+
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
231+
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
232+
self.bindSocketListeners();
233+
self.startup(callback);
234+
});
235+
236+
// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
237+
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
238+
}
198239
}
199240

200241
this.netClient.once('error', function socketError(err) {

lib/websocket.js

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
'use strict';
18+
19+
const { EventEmitter } = require('events');
20+
const { WebSocket } = require('ws');
21+
22+
/**
23+
* WebSocketWrapper is a wrapper on the `ws.Websocket` which implements
24+
* `net.Socket` interface to be used by the `cassandra.Connection`
25+
*/
26+
class WebSocketWrapper extends EventEmitter {
27+
/**
28+
* Creates a websocket wrapper instance. To connect use `connect` method
29+
* @param {object} options client options for a websocket
30+
*/
31+
constructor(options) {
32+
super();
33+
this.options = options;
34+
}
35+
36+
/**
37+
* Creates an instance of a websocket and connects
38+
* @param {String} port
39+
* @param {String} address
40+
* @param {() => void} connectionCallback is called when connection is successfully established
41+
* @returns {WebSocketWrapper} wrapper itself
42+
*/
43+
connect(port, address, connectionCallback) {
44+
const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws';
45+
46+
this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options);
47+
48+
if (connectionCallback) {
49+
this.ws.on('open', connectionCallback);
50+
}
51+
52+
const stream = WebSocket.createWebSocketStream(this.ws, this.options);
53+
54+
stream.on('error', err => {
55+
this.emit('error', err);
56+
});
57+
stream.on('drain', () => {
58+
this.emit('drain');
59+
});
60+
stream.on('close', () => {
61+
this.emit('close');
62+
});
63+
stream.on('end', () => {
64+
this.emit('end');
65+
});
66+
67+
this.write = stream.write.bind(stream);
68+
this.pipe = stream.pipe.bind(stream);
69+
this.end = stream.end.bind(stream);
70+
this.destroy = stream.destroy.bind(stream);
71+
72+
return this;
73+
}
74+
75+
/**
76+
* It is not implemented because `ws` lib doesn't provide API to work with
77+
*/
78+
setTimeout() {}
79+
80+
/**
81+
* It is not implemented because `ws` lib doesn't provide API to work with
82+
*/
83+
setKeepAlive() {}
84+
85+
/**
86+
* It is not implemented because `ws` lib doesn't provide API to work with
87+
*/
88+
setNoDelay() {}
89+
}
90+
91+
module.exports.WebSocketWrapper = WebSocketWrapper;

package-lock.json

Lines changed: 30 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
"@types/long": "~5.0.0",
2424
"@types/node": ">=8",
2525
"adm-zip": "~0.5.10",
26-
"long": "~5.2.3"
26+
"long": "~5.2.3",
27+
"ws": "^8.16.0"
2728
},
2829
"devDependencies": {
2930
"chai": "~4.3.8",

0 commit comments

Comments
 (0)