Skip to content

Commit c7e3e1b

Browse files
authored
Merge pull request #2029 from atsign-foundation/perf-tweaking
2 parents bcbcfeb + caf0b44 commit c7e3e1b

File tree

18 files changed

+188
-72
lines changed

18 files changed

+188
-72
lines changed

packages/dart/noports_core/lib/src/npt/npt.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ class _NptImpl extends NptBase
192192
/// Start the sshnpd payload handler
193193
await sshnpdChannel.callInitialization();
194194

195+
if (sshnpdChannel.cachedPingResponse != null) {
196+
_srvdChannel.cachedDaemonPublicSigningKeyUri =
197+
sshnpdChannel.cachedPingResponse!['publicSigningKeyUri'];
198+
}
199+
195200
List<DaemonFeature> requiredFeatures = [
196201
DaemonFeature.srAuth,
197202
DaemonFeature.srE2ee,

packages/dart/noports_core/lib/src/srvd/isolates/port_pair_isolate.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ class PortPairWorker extends RelayWorker {
104104
if (lookups.containsKey(atKey)) {
105105
return lookups[atKey];
106106
} else {
107-
final resp = await rpcToMain(IIRequest.create('lookup', atKey));
107+
final resp = await rpcToMain(
108+
IIRequest.create('lookup', {'key': atKey, 'sessionId': sessionId}));
108109
lookups[atKey] = resp.payload;
109110
return resp.payload;
110111
}

packages/dart/noports_core/lib/src/srvd/isolates/relay_worker.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ abstract class RelayWorker implements RelayAuthVerifyHelper {
5656
} else {
5757
rpcCompleters[msg.id]!.complete(msg);
5858
}
59+
} else {
60+
logger.shout('Got an unexpected IIResponse (${msg.toString()})');
5961
}
6062
return;
6163
} else {

packages/dart/noports_core/lib/src/srvd/isolates/shared_single_port_isolate.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ class SinglePortWorker extends RelayWorker {
7171
if (si.lookups[atKey] != null) {
7272
return si.lookups[atKey]!;
7373
} else {
74-
final resp = await rpcToMain(IIRequest.create('lookup', atKey));
74+
final resp = await rpcToMain(
75+
IIRequest.create('lookup', {'key': atKey, 'sessionId': sessionId}));
7576
si.lookups[atKey] = resp.payload;
7677
return resp.payload;
7778
}

packages/dart/noports_core/lib/src/srvd/srvd_impl.dart

Lines changed: 72 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import 'srvd_session_params.dart';
1818
@protected
1919
class SrvdImpl implements Srvd {
2020
@override
21-
final AtSignLogger logger = AtSignLogger(' srvd ');
21+
final AtSignLogger logger = AtSignLogger(' srvd main ');
2222
@override
2323
AtClient atClient;
2424
@override
@@ -90,6 +90,9 @@ class SrvdImpl implements Srvd {
9090
if (p.verbose) {
9191
AtSignLogger.root_level = 'INFO';
9292
}
93+
if (p.debug) {
94+
AtSignLogger.root_level = 'FINEST';
95+
}
9396

9497
if (atClient == null && atClientGenerator == null) {
9598
throw StateError('atClient and atClientGenerator are both null');
@@ -217,41 +220,49 @@ class SrvdImpl implements Srvd {
217220
return;
218221
}
219222

220-
var mutexKey = AtKey.fromString('${sessionParams.sessionId}'
221-
'.session_mutexes.${Srvd.namespace}'
222-
'${atClient.getCurrentAtSign()!}')
223-
..metadata = (Metadata()
224-
..immutable = true // only one srvd will succeed in doing this
225-
..ttl = 30000); // expire after 30 seconds to keep datastore clean
226-
PutRequestOptions pro = PutRequestOptions()
227-
..shouldEncrypt = false
228-
..useRemoteAtServer = true;
229-
230-
try {
231-
await atClient.put(
232-
mutexKey,
233-
'lock',
234-
putRequestOptions: pro,
235-
);
223+
if (sessionParams.multipleAcksOk) {
224+
// client can handle multiple acks, no need to lock a mutex
236225
logger.shout('😎 Will handle request from ${notification.from}'
237-
'; acquired mutex $mutexKey');
238-
} catch (err) {
239-
if (err.toString().toLowerCase().contains('immutable')) {
240-
logger.shout('🤷‍♂️ Will not handle request from ${notification.from}'
241-
'; did not acquire mutex $mutexKey');
242-
ppiSendToSpawned?.send(IIRequest.create('stop', null));
243-
} else {
244-
logger.shout('Will not handle; did not acquire mutex $mutexKey : $err');
226+
' which can handle multiple acks (no mutex required)');
227+
} else {
228+
// client cannot handle multiple acks, so we need to lock a mutex
229+
var mutexKey = AtKey.fromString('${sessionParams.sessionId}'
230+
'.session_mutexes.${Srvd.namespace}'
231+
'${atClient.getCurrentAtSign()!}')
232+
..metadata = (Metadata()
233+
..immutable = true // only one srvd will succeed in doing this
234+
..ttl = 30000); // expire after 30 seconds to keep datastore clean
235+
PutRequestOptions pro = PutRequestOptions()
236+
..shouldEncrypt = false
237+
..useRemoteAtServer = true;
238+
239+
try {
240+
await atClient.put(
241+
mutexKey,
242+
'lock',
243+
putRequestOptions: pro,
244+
);
245+
logger.shout('😎 Will handle request from ${notification.from}'
246+
'; acquired mutex $mutexKey');
247+
} catch (err) {
248+
if (err.toString().toLowerCase().contains('immutable')) {
249+
logger.shout('🤷‍♂️ Will not handle request from ${notification.from}'
250+
'; did not acquire mutex $mutexKey');
251+
ppiSendToSpawned?.send(IIRequest.create('stop', null));
252+
} else {
253+
logger
254+
.shout('Will not handle; did not acquire mutex $mutexKey : $err');
255+
}
256+
return;
245257
}
246-
return;
247258
}
248259

249260
if (sessionParams.only443) {
250261
if (!bind443) {
251262
var message = 'Client requested port 443'
252263
' but this relay is not bound to port 443';
253264
logger.shout(message);
254-
if (sessionParams.sendNacks) {
265+
if (sessionParams.multipleAcksOk) {
255266
try {
256267
await sendNack(
257268
sessionId: sessionParams.sessionId,
@@ -299,20 +310,47 @@ class SrvdImpl implements Srvd {
299310
waitForFinalDeliveryStatus: false,
300311
checkForFinalDeliveryStatus: false);
301312
} catch (e) {
302-
stderr.writeln("Error writing session ${notification.value} atKey");
313+
logger.shout("Error sending response to client");
303314
}
315+
316+
preFetched[sessionParams.sessionId] = {};
317+
for (final s in sessionParams.preFetch) {
318+
try {
319+
final AtValue value = await _lookup(AtKey.fromString(s));
320+
preFetched[sessionParams.sessionId]![s] = value.value;
321+
} catch (e) {
322+
logger.shout('$e while preFetching $s');
323+
}
324+
}
325+
unawaited(Future.delayed(Duration(seconds: 30))
326+
.whenComplete(() => preFetched.remove(sessionParams.sessionId)));
327+
}
328+
329+
Map<String, Map<String, dynamic>> preFetched = {};
330+
331+
Future<AtValue> _lookup(AtKey atKey) async {
332+
logger.info('Looking up $atKey on atServer');
333+
return await atClient.get(
334+
atKey,
335+
getRequestOptions: GetRequestOptions()..useRemoteAtServer = true,
336+
);
304337
}
305338

306339
@override
307340
Future<void> lookup(IIRequest msg, SendPort toSpawned) async {
308-
final AtValue value;
309341
try {
310342
logger.info('request: "lookup" : ${msg.payload}');
311-
value = await atClient.get(
312-
AtKey.fromString(msg.payload),
313-
getRequestOptions: GetRequestOptions()..useRemoteAtServer = true,
314-
);
315-
logger.info('request: "lookup" : success ${value.value}');
343+
String sessionId = msg.payload['sessionId'];
344+
String key = msg.payload['key'];
345+
AtValue value;
346+
String fromPreFetch = '';
347+
if (preFetched[sessionId]?[key] != null) {
348+
value = AtValue()..value = preFetched[sessionId]?[key];
349+
fromPreFetch = ' (pre-fetched)';
350+
} else {
351+
value = await _lookup(AtKey.fromString(key));
352+
}
353+
logger.info('request: "lookup" : success$fromPreFetch: ${value.value}');
316354
toSpawned.send(IIResponse(
317355
id: msg.id,
318356
isError: false,

packages/dart/noports_core/lib/src/srvd/srvd_params.dart

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class SrvdParams {
1414
final bool logTraffic;
1515
final String rootDomain;
1616
final bool perSessionStorage;
17+
final bool debug;
1718

1819
/// Whether to start an isolate where all connections are to the same port
1920
final bool bind443;
@@ -37,6 +38,7 @@ class SrvdParams {
3738
required this.perSessionStorage,
3839
required this.bind443,
3940
required this.localBindPort443,
41+
required this.debug,
4042
});
4143

4244
static Future<SrvdParams> fromArgs(List<String> args) async {
@@ -65,6 +67,7 @@ class SrvdParams {
6567
bind443: r['443'],
6668
localBindPort443:
6769
r['443-bind-port'] == null ? 443 : int.parse(r['443-bind-port']),
70+
debug: r['debug'],
6871
);
6972
}
7073

@@ -105,7 +108,12 @@ class SrvdParams {
105108
parser.addFlag(
106109
'verbose',
107110
abbr: 'v',
108-
help: 'More logging',
111+
help: 'Show more logs (INFO and above)',
112+
);
113+
parser.addFlag(
114+
'debug',
115+
defaultsTo: false,
116+
help: 'Show all logs (FINEST and above)',
109117
);
110118
if (BuildEnv.enableSnoop) {
111119
parser.addFlag(

packages/dart/noports_core/lib/src/srvd/srvd_session_params.dart

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22
import 'dart:convert';
33

44
import 'package:at_client/at_client.dart';
5+
import 'package:at_utils/at_logger.dart';
56
import 'package:noports_core/sshnp_foundation.dart';
67
import 'package:noports_core/srvd.dart';
78

@@ -18,7 +19,8 @@ class SrvdSessionParams {
1819
final RelayAuthMode relayAuthMode;
1920
final String? relayAuthAesKey;
2021
final bool only443;
21-
final bool sendNacks;
22+
final bool multipleAcksOk;
23+
final List<String> preFetch;
2224

2325
SrvdSessionParams({
2426
required this.sessionId,
@@ -33,7 +35,8 @@ class SrvdSessionParams {
3335
this.relayAuthMode = RelayAuthMode.payload,
3436
this.relayAuthAesKey,
3537
required this.only443,
36-
required this.sendNacks,
38+
required this.multipleAcksOk,
39+
required this.preFetch,
3740
});
3841

3942
@override
@@ -52,11 +55,13 @@ class SrvdSessionParams {
5255
'relayAuthMode': relayAuthMode.name,
5356
'relayAuthAesKey': relayAuthAesKey,
5457
'only443': only443,
55-
'sendNacks': sendNacks,
58+
'multipleAcksOk': multipleAcksOk,
59+
'preFetch': preFetch,
5660
};
5761
}
5862

5963
class SrvdUtil {
64+
static AtSignLogger logger = AtSignLogger(' SrvdUtil ');
6065
final AtClient atClient;
6166

6267
SrvdUtil(this.atClient);
@@ -79,7 +84,8 @@ class SrvdUtil {
7984
sessionId: notification.value!,
8085
atSignA: notification.from,
8186
only443: false,
82-
sendNacks: false,
87+
multipleAcksOk: false,
88+
preFetch: [],
8389
);
8490
}
8591

@@ -93,6 +99,7 @@ class SrvdUtil {
9399
Future<SrvdSessionParams> _sessionParamsForJsonRequest(
94100
AtNotification notification) async {
95101
dynamic json = jsonDecode(notification.value ?? '');
102+
logger.info('Received session request JSON: $json');
96103

97104
assertValidMapValue(json, 'sessionId', String);
98105
assertValidMapValue(json, 'atSignA', String);
@@ -135,7 +142,8 @@ class SrvdUtil {
135142
relayAuthMode: relayAuthMode,
136143
relayAuthAesKey: json['relayAuthAesKey'],
137144
only443: json['only443'] ?? false,
138-
sendNacks: json['sendNacks'] ?? false,
145+
multipleAcksOk: json['multipleAcksOk'] ?? false,
146+
preFetch: List<String>.from(json['preFetch'] ?? []),
139147
);
140148
}
141149

packages/dart/noports_core/lib/src/sshnp/sshnp_core.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ abstract class SshnpCore
161161
}
162162
await sshnpdChannel.sharePublicKeyIfRequired(identityKeyPair);
163163

164+
if (sshnpdChannel.cachedPingResponse != null) {
165+
srvdChannel.cachedDaemonPublicSigningKeyUri =
166+
sshnpdChannel.cachedPingResponse!['publicSigningKeyUri'];
167+
}
168+
164169
/// Retrieve the srvd host and port pair
165170
sendProgress('Fetching host and port from srvd');
166171
await srvdChannel.callInitialization();

packages/dart/noports_core/lib/src/sshnp/util/srvd_channel/notification_request_message.dart

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ class SocketRendezvousRequestMessage {
1212
final RelayAuthMode relayAuthMode;
1313
final String? relayAuthAesKey;
1414
final bool only443;
15-
final bool sendNacks;
15+
final bool multipleAcksOk;
16+
final List<String> preFetch;
1617

1718
SocketRendezvousRequestMessage({
1819
required this.sessionId,
@@ -24,7 +25,8 @@ class SocketRendezvousRequestMessage {
2425
required this.relayAuthMode,
2526
required this.relayAuthAesKey,
2627
required this.only443,
27-
required this.sendNacks,
28+
required this.multipleAcksOk,
29+
required this.preFetch,
2830
});
2931

3032
@override
@@ -39,7 +41,8 @@ class SocketRendezvousRequestMessage {
3941
m['relayAuthMode'] = relayAuthMode.name;
4042
m['relayAuthAesKey'] = relayAuthAesKey;
4143
m['only443'] = only443;
42-
m['sendNacks'] = sendNacks;
44+
m['multipleAcksOk'] = multipleAcksOk;
45+
m['preFetch'] = preFetch;
4346
return jsonEncode(m);
4447
}
4548
}

packages/dart/noports_core/lib/src/sshnp/util/srvd_channel/srvd_channel.dart

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ abstract class SrvdChannel<T>
3838
final String sessionId;
3939
final String clientNonce = DateTime.now().toIso8601String();
4040

41+
String? cachedDaemonPublicSigningKeyUri;
42+
4143
Completer acked = Completer();
4244

4345
bool fetched = false;
@@ -110,10 +112,12 @@ abstract class SrvdChannel<T>
110112

111113
@override
112114
Future<void> initialize() async {
113-
await publishPublicSigningKey();
115+
Future publishPSKFuture = publishPublicSigningKey();
114116

115117
await getHostAndPortFromSrvd();
116118

119+
await publishPSKFuture;
120+
117121
completeInitialization();
118122
}
119123

@@ -233,6 +237,16 @@ abstract class SrvdChannel<T>
233237
..namespaceAware = false
234238
..ttl = 10000);
235239

240+
List<String> preFetch = [];
241+
242+
// Currently prefetch is only needed if auth mode is ESCR
243+
if (params.relayAuthMode == RelayAuthMode.escr) {
244+
preFetch.add(publicSigningKeyUri);
245+
if (cachedDaemonPublicSigningKeyUri != null) {
246+
preFetch.add(cachedDaemonPublicSigningKeyUri!);
247+
}
248+
}
249+
236250
var message = SocketRendezvousRequestMessage(
237251
sessionId: sessionId,
238252
atSignA: params.clientAtSign,
@@ -243,7 +257,8 @@ abstract class SrvdChannel<T>
243257
relayAuthMode: params.relayAuthMode,
244258
relayAuthAesKey: relayAuthAesKey,
245259
only443: params.only443,
246-
sendNacks: true,
260+
multipleAcksOk: true,
261+
preFetch: preFetch,
247262
);
248263

249264
rvdRequestValue = message.toString();

0 commit comments

Comments
 (0)