Skip to content

Commit 0e3404b

Browse files
committed
fix: cleanup before history sync
1 parent 74fbd47 commit 0e3404b

File tree

4 files changed

+49
-18
lines changed

4 files changed

+49
-18
lines changed

src/typegate/src/main.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,6 @@ try {
5353
base: defaultTypegateConfigBase,
5454
});
5555
const typegate = await Typegate.init(config);
56-
if (config.sync?.forceRemove) {
57-
logger.warn("Force removal of typegraphs at boot");
58-
await typegate.forceRemoveAllTypegraphs();
59-
}
60-
6156
await SystemTypegraph.loadAll(typegate, !globalConfig.packaged);
6257

6358
const server = Deno.serve(

src/typegate/src/sync/replicated_map.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,27 +105,43 @@ export class RedisReplicatedMap<T> implements AsyncDisposable {
105105
this.redisObs.close();
106106
}
107107

108+
async getAllHistory() {
109+
const { key, redis } = this;
110+
const all = await redis.hgetall(key);
111+
const history = [];
112+
for (let i = 0; i < all.length; i += 2) {
113+
history.push({
114+
name: all[i],
115+
payload: all[i+1]
116+
});
117+
}
118+
119+
return history;
120+
}
121+
108122
async historySync(): Promise<XIdInput> {
109-
const { key, redis, deserializer } = this;
123+
const { redis, deserializer } = this;
110124

111125
// get last received message before loading history
112126
const [lastMessage] = await redis.xrevrange(this.ekey, "+", "-", 1);
113127
const lastId = lastMessage ? lastMessage.xid : 0;
114128
logger.debug("last message loaded: {}", lastId);
115129

116-
const all = await redis.hgetall(key);
130+
const all = await this.getAllHistory();
117131
logger.debug("history load start: {} elements", all.length);
118-
for (let i = 0; i < all.length; i += 2) {
119-
const name = all[i];
120-
const payload = all[i + 1];
132+
133+
for (const { name, payload } of all) {
121134
logger.info(`reloaded addition: ${name}`);
122135
ensure(
123136
!this.memory.has(name),
124137
() => `typegraph ${name} should not exists in memory at first sync`,
125138
);
126-
this.memory.set(name, await deserializer(payload, true));
139+
140+
const engine = await deserializer(payload, true);
141+
this.memory.set(name, engine);
127142
}
128143
logger.debug("history load end");
144+
129145
return lastId;
130146
}
131147

src/typegate/src/typegate/mod.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import type { ArtifactStore } from "./artifacts/mod.ts";
4343
// TODO move from tests (MET-497)
4444
import { MemoryRegister } from "./memory_register.ts";
4545
import { NoLimiter } from "./no_limiter.ts";
46-
import { TypegraphStore } from "../sync/typegraph.ts";
46+
import { typegraphIdSchema, TypegraphStore } from "../sync/typegraph.ts";
4747
import { createLocalArtifactStore } from "./artifacts/local.ts";
4848
import { createSharedArtifactStore } from "./artifacts/shared.ts";
4949
import { AsyncDisposableStack } from "dispose";
@@ -141,15 +141,25 @@ export class Typegate implements AsyncDisposable {
141141
stack.move(),
142142
);
143143

144+
const typegraphStore = TypegraphStore.init(syncConfig, cryptoKeys);
144145
const register = await ReplicatedRegister.init(
145146
typegate,
146147
syncConfig.redis,
147-
TypegraphStore.init(syncConfig, cryptoKeys),
148+
typegraphStore
148149
);
149150
typegate.disposables.use(register);
150151

151152
(typegate as { register: Register }).register = register;
152153

154+
155+
if (config.sync?.forceRemove) {
156+
logger.warn("Force removal at boot enabled");
157+
const history = await register.replicatedMap.getAllHistory();
158+
for (const { name, payload } of history) {
159+
await typegate.forceRemove(name, payload, typegraphStore);
160+
}
161+
}
162+
153163
const lastSync = await register.historySync().catch((err) => {
154164
logger.error(err);
155165
throw new Error(
@@ -397,10 +407,20 @@ export class Typegate implements AsyncDisposable {
397407
await this.artifactStore.runArtifactGC();
398408
}
399409

400-
async forceRemoveAllTypegraphs() {
401-
for (const engine of this.register.list()) {
402-
await this.removeTypegraph(engine.name);
403-
}
410+
async forceRemove(name: string, payload: string, typegraphStore: TypegraphStore) {
411+
logger.warn(`Dropping "${name}": started`);
412+
const typegraphId = typegraphIdSchema.parse(JSON.parse(payload));
413+
const [tg] = await typegraphStore.download(
414+
typegraphId,
415+
);
416+
const artifacts = new Set(
417+
Object.values(tg.meta.artifacts).map((m) => m.hash),
418+
);
419+
420+
await this.register.remove(name);
421+
await this.artifactStore.updateRefCounts(new Set(), artifacts);
422+
await this.artifactStore.runArtifactGC();
423+
logger.warn(`Dropping "${name}": done`);
404424
}
405425

406426
async initQueryEngine(

src/typegate/src/typegate/register.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export class ReplicatedRegister extends Register {
8686
return new ReplicatedRegister(replicatedMap);
8787
}
8888

89-
constructor(private replicatedMap: RedisReplicatedMap<QueryEngine>) {
89+
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
9090
super();
9191
}
9292

0 commit comments

Comments
 (0)