Skip to content

Commit 9ffd44f

Browse files
committed
chore: replace uploadUrls expiration queue with redis expiration set
1 parent 6d9d918 commit 9ffd44f

File tree

2 files changed

+62
-44
lines changed

2 files changed

+62
-44
lines changed

typegate/src/typegate/artifacts/shared.ts

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,49 @@
11
// Copyright Metatype OÜ, licensed under the Elastic License 2.0.
22
// SPDX-License-Identifier: Elastic-2.0
33

4-
import { connect, RedisConnectOptions } from "redis";
4+
import { connect, Redis, RedisConnectOptions } from "redis";
55
import { createHash } from "node:crypto";
66
import { S3 } from "aws-sdk/client-s3";
77
import { getLocalPath, STORE_DIR, STORE_TEMP_DIR } from "./mod.ts";
88
import { ArtifactMeta, ArtifactStore } from "./mod.ts";
99
import { HashTransformStream } from "../../utils/hash.ts";
1010
import { resolve } from "std/path/resolve.ts";
1111
import { SyncConfig } from "../../sync/config.ts";
12-
import { Redis } from "redis";
13-
import * as jwt from "jwt";
1412
import { readAll } from "https://deno.land/[email protected]/streams/conversion.ts";
1513

1614
export interface RemoteUploadUrlStore {
1715
redisClient: Redis;
18-
expirationQueueKey: string;
19-
expirationTimerId: number;
2016
}
2117

22-
interface ExpirationQueueValue {
23-
url: string;
24-
expirationTime: number;
18+
const setCmd = `
19+
local key = KEYS[1]
20+
local value = ARGV[1]
21+
local expirationTime = ARGV[2]
22+
23+
redis.call('HSET', key, 'url', value)
24+
redis.call('EXPIRE', key, expirationTime)
25+
`.trim();
26+
const existsCmd = `
27+
local key = KEYS[1]
28+
29+
local exists = redis.call('EXISTS', key)
30+
return exists
31+
`.trim();
32+
33+
function resolveRedisUrlKey(url: string) {
34+
return `articact-upload-urls:${url}`;
2535
}
2636

2737
async function initRemoteUploadUrlStore(
2838
redisConfig: RedisConnectOptions,
2939
): Promise<RemoteUploadUrlStore> {
3040
const redisClient = await connect(redisConfig);
31-
const expirationQueueKey = "queue:expiration";
32-
const expirationTimerId = setInterval(async () => {
33-
const now = jwt.getNumericDate(new Date());
34-
while (await redisClient.llen(expirationQueueKey) > 0) {
35-
const firstInQueue = await redisClient.lindex("expirationQueueKey", 0);
36-
if (!firstInQueue) {
37-
throw new Error("Expiration queue is empty but should not be.");
38-
}
39-
const { url, expirationTime } = deserializeToCustom<ExpirationQueueValue>(
40-
firstInQueue,
41-
);
42-
if (expirationTime > now) {
43-
break;
44-
}
45-
redisClient.lpop(expirationQueueKey);
46-
redisClient.del(url);
47-
}
48-
}, 5000);
4941

50-
return { redisClient, expirationQueueKey, expirationTimerId };
42+
return { redisClient };
5143
}
5244

53-
function deinitRemoteUploadUrlStore(uploadUrls: RemoteUploadUrlStore) {
54-
uploadUrls.redisClient.quit();
55-
clearInterval(uploadUrls.expirationTimerId);
45+
async function deinitRemoteUploadUrlStore(uploadUrls: RemoteUploadUrlStore) {
46+
await uploadUrls.redisClient.quit();
5647
}
5748

5849
function serializeToRedisValue<T>(value: T): string {
@@ -200,15 +191,7 @@ export class SharedArtifactStore extends ArtifactStore {
200191
origin,
201192
meta.typegraphName,
202193
);
203-
this.#uploadUrls.redisClient.set(
204-
url,
205-
serializeToRedisValue<ArtifactMeta>(meta),
206-
);
207-
const queueValue: ExpirationQueueValue = { url, expirationTime };
208-
await this.#uploadUrls.redisClient.rpush(
209-
this.#uploadUrls.expirationQueueKey,
210-
serializeToRedisValue(queueValue),
211-
);
194+
await this.#addUrlToRedis(url, serializeToRedisValue(meta), expirationTime);
212195

213196
return url;
214197
}
@@ -220,17 +203,51 @@ export class SharedArtifactStore extends ArtifactStore {
220203
> {
221204
ArtifactStore.validateUploadUrl(url);
222205

223-
const meta = await this.#uploadUrls.redisClient.get(url.toString());
206+
const meta = await this.#getUrlFromRedis(url.toString());
224207
if (!meta) {
225208
throw new Error("Invalid upload URL");
226209
}
227210

228-
this.#uploadUrls.redisClient.del(url.toString());
229-
return Promise.resolve(deserializeToCustom<ArtifactMeta>(meta));
211+
await this.#removeFromRedis(url.toString());
212+
return Promise.resolve(deserializeToCustom<ArtifactMeta>(meta as string));
213+
}
214+
215+
async #addUrlToRedis(url: string, value: string, expirationDuration: number) {
216+
await this.#uploadUrls.redisClient.eval(
217+
setCmd,
218+
[resolveRedisUrlKey(url)],
219+
[value, expirationDuration],
220+
);
221+
}
222+
223+
async #getUrlFromRedis(url: string) {
224+
return await this.#uploadUrls.redisClient.eval(
225+
"return redis.call('HGET', KEYS[1], ARGV[1])",
226+
[resolveRedisUrlKey(url)],
227+
["url"],
228+
);
229+
}
230+
231+
async #existsInRedis(url: string) {
232+
return Boolean(
233+
await this.#uploadUrls.redisClient.eval(
234+
existsCmd,
235+
[resolveRedisUrlKey(url)],
236+
[],
237+
),
238+
);
239+
}
240+
241+
async #removeFromRedis(url: string) {
242+
await this.#uploadUrls.redisClient.eval(
243+
"redis.call('DEL', KEYS[1])",
244+
[resolveRedisUrlKey(url)],
245+
[],
246+
);
230247
}
231248

232-
override close(): Promise<void> {
233-
deinitRemoteUploadUrlStore(this.#uploadUrls);
249+
override async close(): Promise<void> {
250+
await deinitRemoteUploadUrlStore(this.#uploadUrls);
234251
return Promise.resolve(void null);
235252
}
236253
}

typegate/tests/utils/test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ export const test = ((name, fn, opts = {}): void => {
289289
if (opts.setup != null) {
290290
await opts.setup();
291291
}
292+
// TODO: load balancing for multiple typegate instances
292293
const typegate = await Typegate.init(opts.syncConfig);
293294
const {
294295
systemTypegraphs = false,

0 commit comments

Comments
 (0)