Skip to content

Commit 420d258

Browse files
authored
Merge pull request #769 from HathorNetwork/chore/sync-release-v1.12.1
chore: sync release v1.12.1
2 parents 060bd43 + 28e0e97 commit 420d258

File tree

4 files changed

+33
-14
lines changed

4 files changed

+33
-14
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@hathor/wallet-lib",
3-
"version": "1.12.0",
3+
"version": "1.12.1",
44
"description": "Library used by Hathor Wallet",
55
"main": "lib/index.js",
66
"engines": {

src/sync/gll.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,30 @@
1+
import { EventEmitter } from 'events';
12
import PQueue from 'queue-promise';
23
import { ILogger } from '../types';
34
import { GlobalLoadLockTaskError } from '../errors';
45

5-
const MAX_CONCURRENT_LOAD_TASKS = 5;
6+
const MAX_CONCURRENT_LOAD_TASKS = 3;
67

78
const GLL = new PQueue({ concurrent: MAX_CONCURRENT_LOAD_TASKS });
9+
const GLLEvents = new EventEmitter();
10+
11+
/**
12+
* PQueue will emit a `resolve` event when a task is resolved.
13+
* The data will be the taskId since our tasks always resolve with the taskId.
14+
* We then emit another event on GLLEvents specially for this taskId.
15+
*/
16+
GLL.on('resolve', data => {
17+
GLLEvents.emit(data, true);
18+
});
19+
20+
/**
21+
* PQueue will emit a `reject` event when a task is rejected.
22+
* The data will be the taskId and the error since our tasks always reject with them.
23+
* We then emit another event on GLLEvents specially for this taskId.
24+
*/
25+
GLL.on('reject', data => {
26+
GLLEvents.emit(data.taskId, false, data.innerError);
27+
});
828

929
/**
1030
* Add task to the GLL and return a promise that will resolve or reject when the task resolves or rejects.
@@ -16,15 +36,11 @@ export function addTask(task: () => Promise<void>, logger: ILogger) {
1636
const startTime = Date.now();
1737
// This promise will resolve or reject when the task does, so the caller can know his task has ended.
1838
const promise = new Promise<void>((resolve, reject) => {
19-
GLL.on('resolve', data => {
20-
if (data === taskId) {
39+
GLLEvents.once(taskId, (success, error) => {
40+
if (success) {
2141
resolve();
22-
}
23-
});
24-
25-
GLL.on('reject', data => {
26-
if (data.taskId === taskId) {
27-
reject(data.innerError);
42+
} else {
43+
reject(error);
2844
}
2945
});
3046
});

src/sync/stream.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const QUEUE_GRACEFUL_SHUTDOWN_LIMIT = 10000;
1717
interface IStreamSyncHistoryBegin {
1818
type: 'stream:history:begin';
1919
id: string;
20+
seq: number;
2021
}
2122

2223
interface IStreamSyncHistoryVertex {
@@ -37,6 +38,7 @@ interface IStreamSyncHistoryAddress {
3738
interface IStreamSyncHistoryEnd {
3839
type: 'stream:history:end';
3940
id: string;
41+
seq: number;
4042
}
4143

4244
interface IStreamSyncHistoryError {
@@ -645,9 +647,10 @@ export class StreamManager extends AbortController {
645647
}
646648
}
647649

648-
endStream() {
650+
endStream(seq: number) {
649651
this.logger.debug('Received end-of-stream event.');
650652
this.hasReceivedEndStream = true;
653+
this.connection.sendStreamHistoryAck(this.streamId, seq);
651654
}
652655

653656
async shutdown() {
@@ -713,7 +716,7 @@ function buildListener(manager: StreamManager, resolve: () => void) {
713716
manager.generateNextBatch();
714717
} else if (isStreamSyncHistoryEnd(wsData)) {
715718
// cleanup and stop the method.
716-
manager.endStream();
719+
manager.endStream(wsData.seq);
717720
resolve();
718721
} else if (isStreamSyncHistoryError(wsData)) {
719722
// An error happened on the fullnode, we should stop the stream

0 commit comments

Comments
 (0)