Skip to content

Commit acce295

Browse files
committed
feat: treat streaming edge cases
1 parent 060bd43 commit acce295

File tree

3 files changed

+46
-1
lines changed

3 files changed

+46
-1
lines changed

src/new/connection.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ enum StreamRequestEvent {
2525

2626
const STREAM_HISTORY_ACK_EVENT = 'request:history:ack';
2727

28+
type FullnodeCapability = 'history-streaming';
29+
2830
/**
2931
* Stream abort controller that carries the streamId it is managing.
3032
*/
@@ -60,6 +62,8 @@ class WalletConnection extends BaseConnection {
6062

6163
streamWindowSize: number | undefined;
6264

65+
capabilities?: FullnodeCapability[];
66+
6367
constructor(options: ConnectionParams & { streamWindowSize?: number }) {
6468
super(options);
6569

@@ -96,6 +100,7 @@ class WalletConnection extends BaseConnection {
96100
this.websocket.on('is_online', this.onConnectionChange);
97101
this.websocket.on('wallet', this.handleWalletMessage.bind(this));
98102
this.websocket.on('stream', this.handleStreamMessage.bind(this));
103+
this.websocket.on('capabilities', this.handleCapabilities.bind(this));
99104

100105
this.websocket.on('height_updated', height => {
101106
this.emit('best-block-update', height);
@@ -109,6 +114,29 @@ class WalletConnection extends BaseConnection {
109114
this.websocket.setup();
110115
}
111116

117+
/**
118+
* Handle the capabilities event from the websocket.
119+
*/
120+
handleCapabilities(data: { type: string, capabilities: FullnodeCapability[]}) {
121+
this.logger.debug(`Fullnode has capabilities: ${JSON.stringify(data.capabilities)}`);
122+
const { capabilities } = data;
123+
if (!capabilities) {
124+
return;
125+
}
126+
this.capabilities = capabilities;
127+
}
128+
129+
/**
130+
* Check if the connected fullnode has the desired capability.
131+
* Will return false if the fullnode has not yet sent the capability list.
132+
*/
133+
hasCapability(flag: FullnodeCapability) {
134+
if (!this.capabilities) {
135+
return false;
136+
}
137+
return this.capabilities?.includes(flag) || false;
138+
}
139+
112140
startControlHandlers(storage: IStorage) {
113141
this.removeMetricsHandlers();
114142
this.addMetricsHandlers(storage);

src/new/wallet.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1452,7 +1452,7 @@ class HathorWallet extends EventEmitter {
14521452
if (info.network.indexOf(this.conn.network) >= 0) {
14531453
this.storage.setApiVersion(info);
14541454
await this.storage.saveNativeToken();
1455-
this.conn.start(); // XXX: maybe await?
1455+
this.conn.start();
14561456
} else {
14571457
this.setState(HathorWallet.CLOSED);
14581458
throw new Error(`Wrong network. server=${info.network} expected=${this.conn.network}`);

src/sync/stream.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Address as BitcoreAddress, HDPublicKey } from 'bitcore-lib';
2+
import queueMicrotask from 'queue-microtask';
23
import FullNodeConnection from '../new/connection';
34
import {
45
IStorage,
@@ -424,6 +425,12 @@ export class StreamManager extends AbortController {
424425
throw new Error('No abort controller on connection');
425426
}
426427

428+
// The connection with the fullnode may have been lost during the time waiting to sync
429+
if(signal.aborted) {
430+
this.abort();
431+
throw new Error('The connection already aborted this stream');
432+
}
433+
427434
signal.addEventListener(
428435
'abort',
429436
() => {
@@ -554,6 +561,16 @@ export class StreamManager extends AbortController {
554561
await this.storage.addTx(item.vertex);
555562
}
556563
this.stats.proc();
564+
565+
/**
566+
* This promise will resolve after the JS task queue current run.
567+
* This means that we will free IO tasks like receiving events from the WS
568+
* and other async code to run before we continue processing our event queue.
569+
* @see https://www.npmjs.com/package/queue-microtask
570+
*/
571+
new Promise<void>(resolve => {
572+
queueMicrotask(resolve);
573+
});
557574
}
558575

559576
this.isProcessingQueue = false;

0 commit comments

Comments
 (0)