Skip to content

Commit d386096

Browse files
committed
feat: hash-based distributed cache inval
1 parent eb7494c commit d386096

File tree

4 files changed

+73
-5
lines changed

4 files changed

+73
-5
lines changed

src/backend/src/filesystem/ll_operations/ll_write.js

+22-1
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ const { NodeUIDSelector } = require("../node/selectors");
2323
const { UploadProgressTracker } = require("../storage/UploadProgressTracker");
2424
const FSNodeContext = require("../FSNodeContext");
2525
const APIError = require("../../api/APIError");
26-
const { progress_stream, stuck_detector_stream } = require("../../util/streamutil");
26+
const { progress_stream, stuck_detector_stream, hashing_stream } = require("../../util/streamutil");
2727
const { OperationFrame } = require("../../services/OperationTraceService");
2828
const { Actor } = require("../../services/auth/Actor");
2929
const { DB_WRITE } = require("../../services/database/consts");
3030

31+
const crypto = require('crypto');
32+
3133
const STUCK_STATUS_TIMEOUT = 10 * 1000;
3234
const STUCK_ALARM_TIMEOUT = 20 * 1000;
3335

@@ -98,6 +100,25 @@ class LLWriteBase extends LLFilesystemOperation {
98100
file = { ...file, stream, };
99101
}
100102

103+
let hashPromise;
104+
if ( file.buffer ) {
105+
const hash = crypto.createHash('sha256');
106+
hash.update(file.buffer);
107+
hashPromise = Promise.resolve(hash.digest('hex'));
108+
} else {
109+
const hs = hashing_stream(file.stream);
110+
file.stream = hs.stream;
111+
hashPromise = hs.hashPromise;
112+
}
113+
114+
hashPromise.then(hash => {
115+
const svc_event = Context.get('services').get('event');
116+
console.log('\x1B[36;1m[fs.write]', uuid, hash);
117+
svc_event.emit('outer.fs.write-hash', {
118+
hash, uuid,
119+
});
120+
});
121+
101122
const state_upload = storage.create_upload();
102123

103124
try {

src/backend/src/modules/broadcast/BroadcastService.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class BroadcastService extends BaseService {
9595
JSON.stringify(data)
9696
);
9797
}
98-
98+
9999
meta.from_outside = true;
100100
const context = Context.get(undefined, { allow_fallback: true });
101101
context.arun(async () => {

src/backend/src/services/file-cache/FileCacheService.js

+23-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const { pausing_tee } = require("../../util/streamutil");
2323
const putility = require("@heyputer/putility");
2424
const { EWMA } = require("../../util/opmath");
2525

26+
const crypto = require('crypto');
27+
2628
/**
2729
* FileCacheService
2830
*
@@ -62,13 +64,14 @@ class FileCacheService extends AdvancedBase {
6264

6365
this.log = services.get('log-service').create(this.constructor.name);
6466
this.errors = services.get('error-service').create(this.log);
67+
this.services = services;
6568

6669
this.disk_limit = my_config.disk_limit;
6770
this.disk_max_size = my_config.disk_max_size;
6871
this.precache_size = my_config.precache_size;
6972
this.path = my_config.path;
7073

71-
this.ttl = my_config.ttl || (5 * 1000);
74+
this.ttl = my_config.ttl || (60 * 1000);
7275

7376
this.precache = new Map();
7477
this.uid_to_tracker = new Map();
@@ -132,6 +135,17 @@ class FileCacheService extends AdvancedBase {
132135
const { fs } = this.modules;
133136
// Ensure storage path exists
134137
await fs.promises.mkdir(this.path, { recursive: true });
138+
139+
// Distributed cache invalidation
140+
const svc_event = this.services.get('event');
141+
svc_event.on('outer.fs.write-hash', async (_, { uuid, hash }) => {
142+
const tracker = this.uid_to_tracker.get(uuid);
143+
if ( ! tracker ) return;
144+
145+
if ( tracker.hash !== hash ) {
146+
await this.invalidate(uuid);
147+
}
148+
});
135149
}
136150

137151
_get_path (uid) {
@@ -262,13 +276,16 @@ class FileCacheService extends AdvancedBase {
262276

263277
(async () => {
264278
let offset = 0;
279+
const hash = crypto.createHash('sha256');
265280
for await (const chunk of store_stream) {
266281
chunk.copy(data, offset);
282+
hash.update(chunk);
267283
offset += chunk.length;
268284
}
269285

270286
await this._precache_make_room(size);
271287
this.precache.set(key, data);
288+
tracker.hash = hash.digest('hex');
272289
tracker.phase = FileTracker.PHASE_PRECACHE;
273290
tracker.p_ready.resolve();
274291
})()
@@ -288,8 +305,11 @@ class FileCacheService extends AdvancedBase {
288305
* the precache and disk storage, ensuring that any references to this file are cleaned up.
289306
* If the file is not found in the cache, the method does nothing.
290307
*/
291-
async invalidate (fsNode) {
292-
const key = await fsNode.get('uid');
308+
async invalidate (fsNode_or_uid) {
309+
const key = (typeof fsNode_or_uid === 'string')
310+
? fsNode_or_uid
311+
: await fsNode_or_uid.get('uid');
312+
293313
if ( ! this.uid_to_tracker.has(key) ) return;
294314
const tracker = this.uid_to_tracker.get(key);
295315
if ( tracker.phase === FileTracker.PHASE_PRECACHE ) {

src/backend/src/util/streamutil.js

+27
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
const { PassThrough, Readable, Transform } = require('stream');
2020
const { TeePromise } = require('@heyputer/putility').libs.promise;
21+
const crypto = require('crypto');
2122

2223
class StreamBuffer extends TeePromise {
2324
constructor () {
@@ -475,6 +476,31 @@ const buffer_to_stream = (buffer) => {
475476
return stream;
476477
};
477478

479+
const hashing_stream = (source) => {
480+
const hash = crypto.createHash('sha256');
481+
const stream = new Transform({
482+
transform(chunk, encoding, callback) {
483+
hash.update(chunk);
484+
this.push(chunk);
485+
callback();
486+
}
487+
});
488+
489+
source.pipe(stream);
490+
491+
const hashPromise = new Promise((resolve, reject) => {
492+
source.on('end', () => {
493+
resolve(hash.digest('hex'));
494+
});
495+
source.on('error', reject);
496+
});
497+
498+
return {
499+
stream,
500+
hashPromise,
501+
};
502+
};
503+
478504
module.exports = {
479505
StreamBuffer,
480506
stream_to_the_void,
@@ -488,4 +514,5 @@ module.exports = {
488514
chunk_stream,
489515
stream_to_buffer,
490516
buffer_to_stream,
517+
hashing_stream,
491518
};

0 commit comments

Comments
 (0)