Skip to content

Commit 2c213a5

Browse files
authored
Merge branch 'master' into fix/selected_as_input_ttl
2 parents 99cff4b + 30a5b18 commit 2c213a5

File tree

6 files changed

+113
-3
lines changed

6 files changed

+113
-3
lines changed

__tests__/stream.test.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ const SERVER_MOCK_TYPE = {
5757
unknownId: 'unknown_id',
5858
};
5959

60-
function makeServerMock(mockServer, mockType) {
60+
function makeServerMock(mockServer, mockType, sendCapabilities = true) {
6161
function streamHistoryForSocket(streamId, socket) {
6262
// Begin event marks the start of a stream
6363
socket.send(
@@ -100,6 +100,9 @@ function makeServerMock(mockServer, mockType) {
100100
}
101101

102102
mockServer.on('connection', socket => {
103+
if (sendCapabilities) {
104+
socket.send(JSON.stringify({ type: 'capabilities', capabilities: ['history-streaming'] }));
105+
}
103106
socket.on('message', data => {
104107
const jsonData = JSON.parse(data);
105108
if (jsonData.type === 'subscribe_address') {
@@ -277,4 +280,38 @@ describe('Websocket stream history sync', () => {
277280
mockServer.stop();
278281
}
279282
}, 10000);
283+
284+
it('should default to POLLING_HTTP_API without capabilities', async () => {
285+
const mockServer = new Server('ws://localhost:8080/v1a/ws/');
286+
makeServerMock(mockServer, SERVER_MOCK_TYPE.simple, false);
287+
const wallet = await startWalletFor(HistorySyncMode.MANUAL_STREAM_WS);
288+
wallet.conn.on('stream', data => {
289+
// Any stream event should fail the test
290+
throw new Error(`Received a stream event: ${JSON.stringify(data)}`);
291+
});
292+
wallet.on('state', state => {
293+
// If the sync fails, fail the test
294+
if (state === HathorWallet.ERROR) {
295+
throw new Error('Wallet reached an error state');
296+
}
297+
});
298+
try {
299+
while (true) {
300+
if (wallet.isReady()) {
301+
break;
302+
}
303+
await new Promise(resolve => {
304+
setTimeout(resolve, 100);
305+
});
306+
}
307+
} finally {
308+
// Stop wallet
309+
await wallet.stop({ cleanStorage: true, cleanAddresses: true });
310+
mockServer.stop();
311+
}
312+
313+
await expect(wallet.getAddressAtIndex(0)).resolves.toEqual(
314+
'WewDeXWyvHP7jJTs7tjLoQfoB72LLxJQqN'
315+
);
316+
}, 10000);
280317
});

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"level": "8.0.1",
2525
"lodash": "4.17.21",
2626
"long": "5.2.3",
27+
"queue-microtask": "1.2.3",
2728
"ws": "8.17.1"
2829
},
2930
"scripts": {

src/new/connection.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { handleSubscribeAddress, handleWsDashboard } from '../utils/connection';
1414
import { IStorage, ILogger, getDefaultLogger } from '../types';
1515

1616
const STREAM_ABORT_TIMEOUT = 10000; // 10s
17+
const CAPABILITIES_WAIT_TIMEOUT = 2000; // 2s
1718

1819
/**
1920
* Event names for requesting stream from fullnode
@@ -25,6 +26,8 @@ enum StreamRequestEvent {
2526

2627
const STREAM_HISTORY_ACK_EVENT = 'request:history:ack';
2728

29+
type FullnodeCapability = 'history-streaming';
30+
2831
/**
2932
* Stream abort controller that carries the streamId it is managing.
3033
*/
@@ -60,6 +63,8 @@ class WalletConnection extends BaseConnection {
6063

6164
streamWindowSize: number | undefined;
6265

66+
capabilities?: FullnodeCapability[];
67+
6368
constructor(options: ConnectionParams & { streamWindowSize?: number }) {
6469
super(options);
6570

@@ -96,6 +101,7 @@ class WalletConnection extends BaseConnection {
96101
this.websocket.on('is_online', this.onConnectionChange);
97102
this.websocket.on('wallet', this.handleWalletMessage.bind(this));
98103
this.websocket.on('stream', this.handleStreamMessage.bind(this));
104+
this.websocket.on('capabilities', this.handleCapabilities.bind(this));
99105

100106
this.websocket.on('height_updated', height => {
101107
this.emit('best-block-update', height);
@@ -109,6 +115,42 @@ class WalletConnection extends BaseConnection {
109115
this.websocket.setup();
110116
}
111117

118+
/**
119+
* Handle the capabilities event from the websocket.
120+
*/
121+
handleCapabilities(data: { type: string; capabilities: FullnodeCapability[] }) {
122+
this.logger.debug(`Fullnode has capabilities: ${JSON.stringify(data.capabilities)}`);
123+
const { capabilities } = data;
124+
if (!capabilities) {
125+
return;
126+
}
127+
this.capabilities = capabilities;
128+
}
129+
130+
/**
131+
* If the fullnode has not sent the capabilities yet wait a while.
132+
*/
133+
async waitCapabilities() {
134+
if (this.capabilities === undefined) {
135+
// Wait 2s so the fullnode has some time to send the capabilities envent
136+
await new Promise<void>(resolve => {
137+
setTimeout(resolve, CAPABILITIES_WAIT_TIMEOUT);
138+
});
139+
}
140+
}
141+
142+
/**
143+
* Check if the connected fullnode has the desired capability.
144+
* Will return false if the fullnode has not yet sent the capability list.
145+
*/
146+
async hasCapability(flag: FullnodeCapability) {
147+
await this.waitCapabilities();
148+
if (!this.capabilities) {
149+
return false;
150+
}
151+
return this.capabilities?.includes(flag) || false;
152+
}
153+
112154
startControlHandlers(storage: IStorage) {
113155
this.removeMetricsHandlers();
114156
this.addMetricsHandlers(storage);

src/new/wallet.js

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ class HathorWallet extends EventEmitter {
172172
this.storage = new Storage(store);
173173
}
174174
this.storage.setLogger(this.logger);
175+
/**
176+
* @type {import('./connection').default}
177+
*/
175178
this.conn = connection;
176179
this.conn.startControlHandlers(this.storage);
177180

@@ -1452,7 +1455,7 @@ class HathorWallet extends EventEmitter {
14521455
if (info.network.indexOf(this.conn.network) >= 0) {
14531456
this.storage.setApiVersion(info);
14541457
await this.storage.saveNativeToken();
1455-
this.conn.start(); // XXX: maybe await?
1458+
this.conn.start();
14561459
} else {
14571460
this.setState(HathorWallet.CLOSED);
14581461
throw new Error(`Wrong network. server=${info.network} expected=${this.conn.network}`);
@@ -2858,7 +2861,22 @@ class HathorWallet extends EventEmitter {
28582861
if (!(await getSupportedSyncMode(this.storage)).includes(this.historySyncMode)) {
28592862
throw new Error('Trying to use an unsupported sync method for this wallet.');
28602863
}
2861-
const syncMethod = getHistorySyncMethod(this.historySyncMode);
2864+
let syncMode = this.historySyncMode;
2865+
if (
2866+
[HistorySyncMode.MANUAL_STREAM_WS, HistorySyncMode.XPUB_STREAM_WS].includes(
2867+
this.historySyncMode
2868+
) &&
2869+
!(await this.conn.hasCapability('history-streaming'))
2870+
) {
2871+
// History sync mode is streaming but fullnode is not streaming capable.
2872+
// We revert to the http polling default.
2873+
this.logger.debug(
2874+
'Either fullnode does not support history-streaming or has not sent a capabilities event'
2875+
);
2876+
this.logger.debug('Falling back to http polling API');
2877+
syncMode = HistorySyncMode.POLLING_HTTP_API;
2878+
}
2879+
const syncMethod = getHistorySyncMethod(syncMode);
28622880
// This will add the task to the GLL queue and return a promise that
28632881
// resolves when the task finishes executing
28642882
await GLL.add(async () => {

src/sync/stream.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Address as BitcoreAddress, HDPublicKey } from 'bitcore-lib';
2+
import queueMicrotask from 'queue-microtask';
23
import FullNodeConnection from '../new/connection';
34
import {
45
IStorage,
@@ -556,6 +557,16 @@ export class StreamManager extends AbortController {
556557
await this.storage.addTx(item.vertex);
557558
}
558559
this.stats.proc();
560+
561+
/**
562+
* This promise will resolve after the JS task queue current run.
563+
* This means that we will free IO tasks like receiving events from the WS
564+
* and other async code to run before we continue processing our event queue.
565+
* @see https://www.npmjs.com/package/queue-microtask
566+
*/
567+
await new Promise<void>(resolve => {
568+
queueMicrotask(resolve);
569+
});
559570
}
560571

561572
this.isProcessingQueue = false;

0 commit comments

Comments
 (0)