Skip to content

Commit 6aec3f7

Browse files
committed
Send all outgoingMessages$ with the latest available open connection
1 parent a0b1ba8 commit 6aec3f7

File tree

1 file changed

+41
-35
lines changed

1 file changed

+41
-35
lines changed

src/GremlinClient.js

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,22 @@ const hasCode = (filterCode) => ({ status: { code } }) => code === filterCode;
1717

1818
const isErrorMessage = ({ status: { code }}) => [200, 204, 206].indexOf(code) === -1;
1919

20+
const serializeToBinary = (message, accept) => {
21+
let serializedMessage = accept + JSON.stringify(message);
22+
serializedMessage = unescape(encodeURIComponent(serializedMessage));
23+
24+
// Let's start packing the message into binary
25+
// mimeLength(1) + mimeType Length + serializedMessage Length
26+
let binaryMessage = new Uint8Array(1 + serializedMessage.length);
27+
binaryMessage[0] = accept.length;
28+
29+
for (let i = 0; i < serializedMessage.length; i++) {
30+
binaryMessage[i + 1] = serializedMessage.charCodeAt(i);
31+
}
32+
33+
return binaryMessage;
34+
}
35+
2036
class GremlinClient extends EventEmitter {
2137
constructor(port = 8182, host = 'localhost', options = {}) {
2238
super();
@@ -47,38 +63,37 @@ class GremlinClient extends EventEmitter {
4763

4864
this.commands = {};
4965

50-
const connection = this.createConnection({
51-
port,
52-
host,
53-
path: this.options.path
54-
});
5566

5667
this.commands$ = new Rx.Subject();
5768
this.commands$.subscribe((command) => {
5869
const { message: { requestId } } = command;
5970
this.commands[requestId] = command
6071
});
6172

62-
this.registerConnection(connection);
63-
}
73+
const connection = this.createConnection({
74+
port,
75+
host,
76+
path: this.options.path
77+
});
6478

65-
createConnection({ port, host, path }) {
66-
return new WebSocketGremlinConnection({ port, host, path });
67-
}
79+
const connections$ = Rx.Observable.create((observer) => observer.next(connection));
6880

69-
registerConnection(connection) {
70-
this.connection = connection;
81+
const open$ = connections$
82+
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'open'));
7183

72-
const open$ = Rx.Observable.fromEvent(connection, 'open');
73-
const error$ = Rx.Observable.fromEvent(connection, 'error');
74-
const incomingMessages$ = Rx.Observable.fromEvent(connection, 'message')
84+
const error$ = connections$
85+
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'error'));
86+
87+
const incomingMessages$ = connections$
88+
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'message'))
7589
.map(({ data }) => {
7690
const buffer = new Buffer(data, 'binary');
7791
const rawMessage = JSON.parse(buffer.toString('utf-8'));
7892

7993
return rawMessage;
8094
});
81-
const close$ = Rx.Observable.fromEvent(connection, 'close');
95+
const close$ = connections$
96+
.flatMap((connection) => Rx.Observable.fromEvent(connection, 'close'));
8297

8398
const canSend$ = Rx.Observable.merge(
8499
open$.map(true),
@@ -95,11 +110,18 @@ class GremlinClient extends EventEmitter {
95110
close$.subscribe((event) => this.handleDisconnection(event));
96111

97112
const outgoingMessages$ = this.commands$
98-
.map(({ message }) => message)
99-
.pausableBuffered(canSend$);
113+
.map(({ message }) => serializeToBinary(message, this.options.accept))
114+
.pausableBuffered(canSend$)
115+
.combineLatest(connections$);
100116

101117
outgoingMessages$
102-
.subscribe((message) => this.sendMessage(message));
118+
.subscribe(([binaryMessage, connection]) =>
119+
connection.sendMessage(binaryMessage)
120+
);
121+
}
122+
123+
createConnection({ port, host, path }) {
124+
return new WebSocketGremlinConnection({ port, host, path });
103125
}
104126

105127
handleError(err) {
@@ -177,22 +199,6 @@ class GremlinClient extends EventEmitter {
177199
return message;
178200
};
179201

180-
sendMessage(message) {
181-
let serializedMessage = this.options.accept + JSON.stringify(message);
182-
serializedMessage = unescape(encodeURIComponent(serializedMessage));
183-
184-
// Let's start packing the message into binary
185-
// mimeLength(1) + mimeType Length + serializedMessage Length
186-
let binaryMessage = new Uint8Array(1 + serializedMessage.length);
187-
binaryMessage[0] = this.options.accept.length;
188-
189-
for (let i = 0; i < serializedMessage.length; i++) {
190-
binaryMessage[i + 1] = serializedMessage.charCodeAt(i);
191-
}
192-
193-
this.connection.sendMessage(binaryMessage);
194-
};
195-
196202
/**
197203
* Asynchronously send a script to Gremlin Server for execution and fire
198204
* the provided callback when all results have been fetched.

0 commit comments

Comments
 (0)