Skip to content

Commit 0f699ee

Browse files
feat: force cleanup at boot with SYNC_FORCE_REMOVE=true (#956)
#### Migration notes None - [ ] The change comes with new or modified tests - [ ] Hard-to-understand functions have explanatory comments - [ ] End-user documentation is updated to reflect the change <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added optional `SYNC_FORCE_REMOVE` configuration variable for typegate synchronization. - Introduced ability to forcefully remove cached typegraphs at boot. - Added a new method to retrieve all history entries from the Redis replicated map. - Introduced a new function to return a greeting based on a provided name. - Added a synchronization feature test suite for validating cleanup logic. - **Documentation** - Updated documentation to reflect new synchronization configuration option. - **Improvements** - Enhanced the `Typegate` class with a method to facilitate bulk removal of typegraphs during initialization. - Made the `replicatedMap` parameter publicly accessible in the `ReplicatedRegister` class constructor. - Updated configuration retrieval to include the new `forceRemove` property. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Yohe-Am <[email protected]>
1 parent 0f2c8fa commit 0f699ee

File tree

12 files changed

+203
-10
lines changed

12 files changed

+203
-10
lines changed

docs/metatype.dev/docs/reference/typegate/synchronization/index.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ Synchronization variable names start with `SYNC_`.
6969
| SYNC\__S3_SECRET_KEY (\_Required_) | Access key secret for the S3 store credentials; |
7070
| SYNC\__S3_PATH_STYLE (\_Optional_) | `true` or `false`, force path style if `true`. |
7171
| SYNC\__S3_BUCKET (\_Required_) | The bucket to be used for the system (dedicated). |
72+
| SYNC\__FORCE_REMOVE (\_Optional_) | `true` or `false`, Undeploy cached typegraphs at boot |
7273

7374
## Synchronized mode features
7475

src/typegate/src/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ export function transformSyncConfig(raw: SyncConfig): SyncConfigX {
117117
redis,
118118
s3,
119119
s3Bucket: raw.s3_bucket,
120+
forceRemove: raw.force_remove
120121
};
121122
}
122123

src/typegate/src/config/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ export const syncConfigSchema = z.object({
115115
s3_access_key: refineEnvVar("SYNC_S3_ACCESS_KEY"),
116116
s3_secret_key: refineEnvVar("SYNC_S3_SECRET_KEY"),
117117
s3_path_style: zBooleanString.default(false),
118+
force_remove: zBooleanString.default(false),
118119
});
119120
export type SyncConfig = z.infer<typeof syncConfigSchema>;
120121
export type SyncConfigX = {
121122
redis: RedisConnectOptions;
122123
s3: S3ClientConfig;
123124
s3Bucket: string;
125+
forceRemove?: boolean
124126
};
125127

126128
export type TypegateConfig = {

src/typegate/src/main.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ try {
5353
base: defaultTypegateConfigBase,
5454
});
5555
const typegate = await Typegate.init(config);
56-
5756
await SystemTypegraph.loadAll(typegate, !globalConfig.packaged);
5857

5958
const server = Deno.serve(

src/typegate/src/sync/replicated_map.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,27 +105,43 @@ export class RedisReplicatedMap<T> implements AsyncDisposable {
105105
this.redisObs.close();
106106
}
107107

108+
async getAllHistory() {
109+
const { key, redis } = this;
110+
const all = await redis.hgetall(key);
111+
const history = [];
112+
for (let i = 0; i < all.length; i += 2) {
113+
history.push({
114+
name: all[i],
115+
payload: all[i+1]
116+
});
117+
}
118+
119+
return history;
120+
}
121+
108122
async historySync(): Promise<XIdInput> {
109-
const { key, redis, deserializer } = this;
123+
const { redis, deserializer } = this;
110124

111125
// get last received message before loading history
112126
const [lastMessage] = await redis.xrevrange(this.ekey, "+", "-", 1);
113127
const lastId = lastMessage ? lastMessage.xid : 0;
114128
logger.debug("last message loaded: {}", lastId);
115129

116-
const all = await redis.hgetall(key);
130+
const all = await this.getAllHistory();
117131
logger.debug("history load start: {} elements", all.length);
118-
for (let i = 0; i < all.length; i += 2) {
119-
const name = all[i];
120-
const payload = all[i + 1];
132+
133+
for (const { name, payload } of all) {
121134
logger.info(`reloaded addition: ${name}`);
122135
ensure(
123136
!this.memory.has(name),
124137
() => `typegraph ${name} should not exists in memory at first sync`,
125138
);
126-
this.memory.set(name, await deserializer(payload, true));
139+
140+
const engine = await deserializer(payload, true);
141+
this.memory.set(name, engine);
127142
}
128143
logger.debug("history load end");
144+
129145
return lastId;
130146
}
131147

src/typegate/src/typegate/mod.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import type { ArtifactStore } from "./artifacts/mod.ts";
4343
// TODO move from tests (MET-497)
4444
import { MemoryRegister } from "./memory_register.ts";
4545
import { NoLimiter } from "./no_limiter.ts";
46-
import { TypegraphStore } from "../sync/typegraph.ts";
46+
import { typegraphIdSchema, TypegraphStore } from "../sync/typegraph.ts";
4747
import { createLocalArtifactStore } from "./artifacts/local.ts";
4848
import { createSharedArtifactStore } from "./artifacts/shared.ts";
4949
import { AsyncDisposableStack } from "dispose";
@@ -141,15 +141,30 @@ export class Typegate implements AsyncDisposable {
141141
stack.move(),
142142
);
143143

144+
const typegraphStore = TypegraphStore.init(syncConfig, cryptoKeys);
144145
const register = await ReplicatedRegister.init(
145146
typegate,
146147
syncConfig.redis,
147-
TypegraphStore.init(syncConfig, cryptoKeys),
148+
typegraphStore
148149
);
149150
typegate.disposables.use(register);
150151

151152
(typegate as { register: Register }).register = register;
152153

154+
155+
if (config.sync?.forceRemove) {
156+
logger.warn("Force removal at boot enabled");
157+
const history = await register.replicatedMap.getAllHistory();
158+
for (const { name, payload } of history) {
159+
try {
160+
await typegate.forceRemove(name, payload, typegraphStore);
161+
} catch (e) {
162+
logger.error(`Failed to force remove typegraph "${name}": ${e}`);
163+
Sentry.captureException(e);
164+
}
165+
}
166+
}
167+
153168
const lastSync = await register.historySync().catch((err) => {
154169
logger.error(err);
155170
throw new Error(
@@ -397,6 +412,22 @@ export class Typegate implements AsyncDisposable {
397412
await this.artifactStore.runArtifactGC();
398413
}
399414

415+
async forceRemove(name: string, payload: string, typegraphStore: TypegraphStore) {
416+
logger.warn(`Dropping "${name}": started`);
417+
const typegraphId = typegraphIdSchema.parse(JSON.parse(payload));
418+
const [tg] = await typegraphStore.download(
419+
typegraphId,
420+
);
421+
const artifacts = new Set(
422+
Object.values(tg.meta.artifacts).map((m) => m.hash),
423+
);
424+
425+
await this.register.remove(name);
426+
await this.artifactStore.updateRefCounts(new Set(), artifacts);
427+
await this.artifactStore.runArtifactGC();
428+
logger.warn(`Dropping "${name}": done`);
429+
}
430+
400431
async initQueryEngine(
401432
tgDS: TypeGraphDS,
402433
secretManager: SecretManager,

src/typegate/src/typegate/register.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export class ReplicatedRegister extends Register {
8686
return new ReplicatedRegister(replicatedMap);
8787
}
8888

89-
constructor(private replicatedMap: RedisReplicatedMap<QueryEngine>) {
89+
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
9090
super();
9191
}
9292

tests/e2e/published/published_test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const syncConfig = transformSyncConfig({
4444
s3_secret_key: syncEnvs.SYNC_S3_SECRET_KEY,
4545
s3_bucket: syncEnvs.SYNC_S3_BUCKET,
4646
s3_path_style: true,
47+
force_remove: false
4748
});
4849

4950
// put here typegraphs that are to be excluded

tests/sync/scripts/hello.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
export function hello({ name }: { name: string }) {
5+
return `Hello ${name}`;
6+
}

tests/sync/sync.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
2+
# SPDX-License-Identifier: MPL-2.0
3+
4+
from typegraph import t, typegraph, Policy, Graph
5+
from typegraph.runtimes.deno import DenoRuntime
6+
7+
8+
@typegraph()
9+
def sync(g: Graph):
10+
deno = DenoRuntime()
11+
public = Policy.public()
12+
13+
g.expose(
14+
hello=deno.import_(
15+
t.struct({"name": t.string()}),
16+
t.string(),
17+
name="hello",
18+
module="scripts/hello.ts",
19+
secrets=["ULTRA_SECRET"],
20+
).with_policy(public)
21+
)

tests/sync/sync_config_test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ Deno.test("test sync config", async (t) => {
4242
Deno.env.set("SYNC_S3_BUCKET", "bucket");
4343

4444
assertEquals(getSyncConfig(), {
45+
forceRemove: false,
4546
redis: {
4647
hostname: "localhost",
4748
port: "6379",

tests/sync/sync_force_remove_test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
import { gql, Meta } from "test-utils/mod.ts";
5+
import { connect } from "redis";
6+
import { S3Client } from "aws-sdk/client-s3";
7+
import { createBucket, listObjects, tryDeleteBucket } from "test-utils/s3.ts";
8+
import { assertEquals } from "@std/assert";
9+
import { clearSyncData, setupSync } from "test-utils/hooks.ts";
10+
import { Typegate } from "@metatype/typegate/typegate/mod.ts";
11+
import {
12+
defaultTypegateConfigBase,
13+
getTypegateConfig,
14+
SyncConfig,
15+
} from "@metatype/typegate/config.ts";
16+
17+
const redisKey = "typegraph";
18+
const redisEventKey = "typegraph_event";
19+
20+
async function cleanUp(config: typeof syncConfig) {
21+
using redis = await connect(config.redis);
22+
await redis.del(redisKey);
23+
await redis.del(redisEventKey);
24+
25+
const s3 = new S3Client(config.s3);
26+
await tryDeleteBucket(s3, config.s3Bucket);
27+
await createBucket(s3, config.s3Bucket);
28+
s3.destroy();
29+
await redis.quit();
30+
}
31+
32+
const syncConfig = {
33+
redis: {
34+
hostname: "localhost",
35+
port: 6379,
36+
password: "password",
37+
db: 1,
38+
},
39+
s3: {
40+
endpoint: "http://localhost:9000",
41+
region: "local",
42+
credentials: {
43+
accessKeyId: "minio",
44+
secretAccessKey: "password",
45+
},
46+
forcePathStyle: true,
47+
},
48+
s3Bucket: "metatype-deno-runtime-sync-test",
49+
};
50+
51+
async function spawnGate(syncConfig: SyncConfig) {
52+
const config = getTypegateConfig({
53+
base: {
54+
...defaultTypegateConfigBase,
55+
},
56+
});
57+
58+
return await Typegate.init({
59+
...config,
60+
sync: syncConfig,
61+
});
62+
}
63+
64+
Meta.test(
65+
{
66+
name: "Force cleanup at boot on sync mode",
67+
syncConfig,
68+
async setup() {
69+
await clearSyncData(syncConfig);
70+
await setupSync(syncConfig);
71+
},
72+
async teardown() {
73+
await cleanUp(syncConfig);
74+
},
75+
},
76+
async (t) => {
77+
await t.should(
78+
"cleanup if forceRemove is true",
79+
async () => {
80+
const _engine = await t.engine("sync/sync.py", {
81+
secrets: {
82+
ULTRA_SECRET:
83+
"if_you_can_read_me_on_an_ERROR_there_is_a_bug",
84+
},
85+
});
86+
87+
const s3 = new S3Client(syncConfig.s3);
88+
const initialObjects = await listObjects(s3, syncConfig.s3Bucket);
89+
assertEquals(initialObjects?.length, 3);
90+
91+
const gateNoRemove = await spawnGate(syncConfig);
92+
const namesNoRemove = gateNoRemove.register.list().map(({ name }) =>
93+
name
94+
);
95+
96+
const gateAfterRemove = await spawnGate({
97+
...syncConfig,
98+
forceRemove: true,
99+
});
100+
const namesAfterRemove = gateAfterRemove.register.list().map((
101+
{ name },
102+
) => name);
103+
104+
t.addCleanup(async () => {
105+
await gateNoRemove[Symbol.asyncDispose]();
106+
await gateAfterRemove[Symbol.asyncDispose]();
107+
});
108+
109+
assertEquals(namesNoRemove, ["sync"]);
110+
assertEquals(namesAfterRemove, []); // !
111+
},
112+
);
113+
},
114+
);

0 commit comments

Comments
 (0)