@@ -21,12 +21,81 @@ const { Endpoint } = require("../util/expressutil");
21
21
const { UserActorType } = require ( "./auth/Actor" ) ;
22
22
const BaseService = require ( "./BaseService" ) ;
23
23
24
+ class KeyPairHelper extends AdvancedBase {
25
+ static MODULES = {
26
+ tweetnacl : require ( 'tweetnacl' ) ,
27
+ } ;
28
+
29
+ constructor ( {
30
+ kpublic,
31
+ ksecret,
32
+ } ) {
33
+ super ( ) ;
34
+ this . kpublic = kpublic ;
35
+ this . ksecret = ksecret ;
36
+ this . nonce_ = 0 ;
37
+ }
38
+
39
+ to_nacl_key_ ( key ) {
40
+ console . log ( 'WUT' , key ) ;
41
+ const full_buffer = Buffer . from ( key , 'base64' ) ;
42
+
43
+ // Remove version byte (assumed to be 0x31 and ignored for now)
44
+ const buffer = full_buffer . slice ( 1 ) ;
45
+
46
+ return new Uint8Array ( buffer ) ;
47
+ }
48
+
49
+ get naclSecret ( ) {
50
+ return this . naclSecret_ ?? (
51
+ this . naclSecret_ = this . to_nacl_key_ ( this . ksecret ) ) ;
52
+ }
53
+ get naclPublic ( ) {
54
+ return this . naclPublic_ ?? (
55
+ this . naclPublic_ = this . to_nacl_key_ ( this . kpublic ) ) ;
56
+ }
57
+
58
+ write ( text ) {
59
+ const require = this . require ;
60
+ const nacl = require ( 'tweetnacl' ) ;
61
+
62
+ const nonce = nacl . randomBytes ( nacl . box . nonceLength ) ;
63
+ const message = { } ;
64
+
65
+ const textUint8 = new Uint8Array ( Buffer . from ( text , 'utf-8' ) ) ;
66
+ const encryptedText = nacl . box (
67
+ textUint8 , nonce ,
68
+ this . naclPublic , this . naclSecret
69
+ ) ;
70
+ message . text = Buffer . from ( encryptedText ) ;
71
+ message . nonce = Buffer . from ( nonce ) ;
72
+
73
+ return message ;
74
+ }
75
+
76
+ read ( message ) {
77
+ const require = this . require ;
78
+ const nacl = require ( 'tweetnacl' ) ;
79
+
80
+ const arr = nacl . box . open (
81
+ new Uint8Array ( message . text ) ,
82
+ new Uint8Array ( message . nonce ) ,
83
+ this . naclPublic ,
84
+ this . naclSecret ,
85
+ ) ;
86
+
87
+ return Buffer . from ( arr ) . toString ( 'utf-8' ) ;
88
+ }
89
+ }
90
+
24
91
class Peer extends AdvancedBase {
92
+ static AUTHENTICATING = Symbol ( 'AUTHENTICATING' ) ;
25
93
static ONLINE = Symbol ( 'ONLINE' ) ;
26
94
static OFFLINE = Symbol ( 'OFFLINE' ) ;
27
95
28
96
static MODULES = {
29
97
sioclient : require ( 'socket.io-client' ) ,
98
+ crypto : require ( 'crypto' ) ,
30
99
} ;
31
100
32
101
constructor ( svc_broadcast , config ) {
@@ -38,7 +107,23 @@ class Peer extends AdvancedBase {
38
107
39
108
send ( data ) {
40
109
if ( ! this . socket ) return ;
41
- this . socket . send ( data )
110
+ const require = this . require ;
111
+ const crypto = require ( 'crypto' ) ;
112
+ const iv = crypto . randomBytes ( 16 ) ;
113
+ const cipher = crypto . createCipheriv (
114
+ 'aes-256-cbc' ,
115
+ this . aesKey ,
116
+ iv ,
117
+ ) ;
118
+ const jsonified = JSON . stringify ( data ) ;
119
+ let buffers = [ ] ;
120
+ buffers . push ( cipher . update ( Buffer . from ( jsonified , 'utf-8' ) ) ) ;
121
+ buffers . push ( cipher . final ( ) ) ;
122
+ const buffer = Buffer . concat ( buffers ) ;
123
+ this . socket . send ( {
124
+ iv,
125
+ message : buffer ,
126
+ } ) ;
42
127
}
43
128
44
129
get state ( ) {
@@ -66,6 +151,22 @@ class Peer extends AdvancedBase {
66
151
this . log . info ( `connected` , {
67
152
address : this . config . address
68
153
} ) ;
154
+
155
+ const require = this . require ;
156
+ const crypto = require ( 'crypto' ) ;
157
+ this . aesKey = crypto . randomBytes ( 32 ) ;
158
+
159
+ const kp_helper = new KeyPairHelper ( {
160
+ kpublic : this . config . key ,
161
+ ksecret : this . svc_broadcast . config . keys . secret ,
162
+ } ) ;
163
+ socket . send ( {
164
+ $ : 'take-my-key' ,
165
+ key : this . svc_broadcast . config . keys . public ,
166
+ message : kp_helper . write (
167
+ this . aesKey . toString ( 'base64' )
168
+ ) ,
169
+ } ) ;
69
170
} ) ;
70
171
socket . on ( 'disconnect' , ( ) => {
71
172
this . log . info ( `disconnected` , {
@@ -88,6 +189,89 @@ class Peer extends AdvancedBase {
88
189
}
89
190
}
90
191
192
+ class Connection extends AdvancedBase {
193
+ static MODULES = {
194
+ crypto : require ( 'crypto' ) ,
195
+ }
196
+
197
+ static AUTHENTICATING = {
198
+ on_message ( data ) {
199
+ if ( data . $ !== 'take-my-key' ) {
200
+ this . disconnect ( ) ;
201
+ return ;
202
+ }
203
+
204
+ const hasKey = this . svc_broadcast . trustedPublicKeys_ [ data . key ] ;
205
+ if ( ! hasKey ) {
206
+ this . disconnect ( ) ;
207
+ return ;
208
+ }
209
+
210
+ const is_trusted =
211
+ this . svc_broadcast . trustedPublicKeys_
212
+ . hasOwnProperty ( data . key )
213
+ if ( ! is_trusted ) {
214
+ this . disconnect ( ) ;
215
+ return ;
216
+ }
217
+
218
+ const kp_helper = new KeyPairHelper ( {
219
+ kpublic : data . key ,
220
+ ksecret : this . svc_broadcast . config . keys . secret ,
221
+ } ) ;
222
+
223
+ const message = kp_helper . read ( data . message ) ;
224
+ this . aesKey = Buffer . from ( message , 'base64' ) ;
225
+
226
+ this . state = this . constructor . ONLINE ;
227
+ }
228
+ }
229
+ static ONLINE = {
230
+ on_message ( data ) {
231
+ if ( ! this . on_message ) return ;
232
+
233
+ const require = this . require ;
234
+ const crypto = require ( 'crypto' ) ;
235
+ const decipher = crypto . createDecipheriv (
236
+ 'aes-256-cbc' ,
237
+ this . aesKey ,
238
+ data . iv ,
239
+ )
240
+ const buffers = [ ] ;
241
+ buffers . push ( decipher . update ( data . message ) ) ;
242
+ buffers . push ( decipher . final ( ) ) ;
243
+
244
+ const rawjson = Buffer . concat ( buffers ) . toString ( 'utf-8' ) ;
245
+
246
+ const output = JSON . parse ( rawjson ) ;
247
+
248
+ this . on_message ( output ) ;
249
+ }
250
+ }
251
+ static OFFLINE = {
252
+ on_message ( ) {
253
+ throw new Error ( 'unexpected message' ) ;
254
+ }
255
+ }
256
+
257
+ constructor ( svc_broadcast , socket ) {
258
+ super ( ) ;
259
+ this . state = this . constructor . AUTHENTICATING ;
260
+ this . svc_broadcast = svc_broadcast ;
261
+ this . log = this . svc_broadcast . log ;
262
+ this . socket = socket ;
263
+
264
+ socket . on ( 'message' , data => {
265
+ this . state . on_message . call ( this , data ) ;
266
+ } ) ;
267
+ }
268
+
269
+ disconnect ( ) {
270
+ this . socket . disconnect ( true ) ;
271
+ this . state = this . constructor . OFFLINE ;
272
+ }
273
+ }
274
+
91
275
class BroadcastService extends BaseService {
92
276
static MODULES = {
93
277
express : require ( 'express' ) ,
@@ -96,16 +280,21 @@ class BroadcastService extends BaseService {
96
280
97
281
_construct ( ) {
98
282
this . peers_ = [ ] ;
283
+ this . connections_ = [ ] ;
284
+ this . trustedPublicKeys_ = { } ;
99
285
}
100
286
101
287
async _init ( ) {
102
288
const peers = this . config . peers ?? [ ] ;
103
289
for ( const peer_config of peers ) {
290
+ this . trustedPublicKeys_ [ peer_config . key ] = true ;
104
291
const peer = new Peer ( this , peer_config ) ;
105
292
this . peers_ . push ( peer ) ;
106
293
peer . connect ( ) ;
107
294
}
108
295
296
+ this . _register_commands ( this . services . get ( 'commands' ) ) ;
297
+
109
298
const svc_event = this . services . get ( 'event' ) ;
110
299
svc_event . on ( 'outer.*' , this . on_event . bind ( this ) ) ;
111
300
}
@@ -131,22 +320,45 @@ class BroadcastService extends BaseService {
131
320
} ) ;
132
321
133
322
io . on ( 'connection' , async socket => {
134
- socket . on ( 'message' , ( { key, data, meta } ) => {
323
+ const conn = new Connection ( this , socket ) ;
324
+ this . connections_ . push ( conn ) ;
325
+
326
+ conn . on_message = ( { key, data, meta } ) => {
135
327
if ( meta . from_outside ) {
136
328
this . log . noticeme ( 'possible over-sending' ) ;
137
329
return ;
138
330
}
139
331
332
+ if ( key === 'test' ) {
333
+ this . log . noticeme ( `test message: ` +
334
+ JSON . stringify ( data )
335
+ ) ;
336
+ }
337
+
140
338
meta . from_outside = true ;
141
339
svc_event . emit ( key , data , meta ) ;
142
- } ) ;
340
+ } ;
143
341
} ) ;
144
342
145
343
146
344
this . log . noticeme (
147
345
require ( 'node:util' ) . inspect ( this . config )
148
346
) ;
149
347
}
348
+
349
+ _register_commands ( commands ) {
350
+ commands . registerCommands ( 'broadcast' , [
351
+ {
352
+ id : 'test' ,
353
+ description : 'send a test message' ,
354
+ handler : async ( args , ctx ) => {
355
+ this . on_event ( 'test' , {
356
+ contents : 'I am a test message' ,
357
+ } , { } )
358
+ }
359
+ }
360
+ ] )
361
+ }
150
362
}
151
363
152
364
module . exports = { BroadcastService } ;
0 commit comments