Skip to content

feat: force cleanup at boot with SYNC_FORCE_REMOVE=true #956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Synchronization variable names start with `SYNC_`.
| SYNC\__S3_SECRET_KEY (\_Required_) | Access key secret for the S3 store credentials; |
| SYNC\__S3_PATH_STYLE (\_Optional_) | `true` or `false`, force path style if `true`. |
| SYNC\__S3_BUCKET (\_Required_) | The bucket to be used for the system (dedicated). |
| SYNC\__FORCE_REMOVE (\_Optional_) | `true` or `false`, Undeploy cached typegraphs at boot |

## Synchronized mode features

Expand Down
1 change: 1 addition & 0 deletions src/typegate/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function transformSyncConfig(raw: SyncConfig): SyncConfigX {
redis,
s3,
s3Bucket: raw.s3_bucket,
forceRemove: raw.force_remove
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/typegate/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export const syncConfigSchema = z.object({
s3_access_key: refineEnvVar("SYNC_S3_ACCESS_KEY"),
s3_secret_key: refineEnvVar("SYNC_S3_SECRET_KEY"),
s3_path_style: zBooleanString.default(false),
force_remove: zBooleanString.default(false),
});
export type SyncConfig = z.infer<typeof syncConfigSchema>;
export type SyncConfigX = {
redis: RedisConnectOptions;
s3: S3ClientConfig;
s3Bucket: string;
forceRemove: boolean
};

export type TypegateConfig = {
Expand Down
1 change: 0 additions & 1 deletion src/typegate/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ try {
base: defaultTypegateConfigBase,
});
const typegate = await Typegate.init(config);

await SystemTypegraph.loadAll(typegate, !globalConfig.packaged);

const server = Deno.serve(
Expand Down
28 changes: 22 additions & 6 deletions src/typegate/src/sync/replicated_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,43 @@ export class RedisReplicatedMap<T> implements AsyncDisposable {
this.redisObs.close();
}

async getAllHistory() {
const { key, redis } = this;
const all = await redis.hgetall(key);
const history = [];
for (let i = 0; i < all.length; i += 2) {
history.push({
name: all[i],
payload: all[i+1]
});
}

return history;
}

async historySync(): Promise<XIdInput> {
const { key, redis, deserializer } = this;
const { redis, deserializer } = this;

// get last received message before loading history
const [lastMessage] = await redis.xrevrange(this.ekey, "+", "-", 1);
const lastId = lastMessage ? lastMessage.xid : 0;
logger.debug("last message loaded: {}", lastId);

const all = await redis.hgetall(key);
const all = await this.getAllHistory();
logger.debug("history load start: {} elements", all.length);
for (let i = 0; i < all.length; i += 2) {
const name = all[i];
const payload = all[i + 1];

for (const { name, payload } of all) {
logger.info(`reloaded addition: ${name}`);
ensure(
!this.memory.has(name),
() => `typegraph ${name} should not exists in memory at first sync`,
);
this.memory.set(name, await deserializer(payload, true));

const engine = await deserializer(payload, true);
this.memory.set(name, engine);
}
logger.debug("history load end");

return lastId;
}

Expand Down
30 changes: 28 additions & 2 deletions src/typegate/src/typegate/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import type { ArtifactStore } from "./artifacts/mod.ts";
// TODO move from tests (MET-497)
import { MemoryRegister } from "./memory_register.ts";
import { NoLimiter } from "./no_limiter.ts";
import { TypegraphStore } from "../sync/typegraph.ts";
import { typegraphIdSchema, TypegraphStore } from "../sync/typegraph.ts";
import { createLocalArtifactStore } from "./artifacts/local.ts";
import { createSharedArtifactStore } from "./artifacts/shared.ts";
import { AsyncDisposableStack } from "dispose";
Expand Down Expand Up @@ -141,15 +141,25 @@ export class Typegate implements AsyncDisposable {
stack.move(),
);

const typegraphStore = TypegraphStore.init(syncConfig, cryptoKeys);
const register = await ReplicatedRegister.init(
typegate,
syncConfig.redis,
TypegraphStore.init(syncConfig, cryptoKeys),
typegraphStore
);
typegate.disposables.use(register);

(typegate as { register: Register }).register = register;


if (config.sync?.forceRemove) {
logger.warn("Force removal at boot enabled");
const history = await register.replicatedMap.getAllHistory();
for (const { name, payload } of history) {
await typegate.forceRemove(name, payload, typegraphStore);
}
}

const lastSync = await register.historySync().catch((err) => {
logger.error(err);
throw new Error(
Expand Down Expand Up @@ -397,6 +407,22 @@ export class Typegate implements AsyncDisposable {
await this.artifactStore.runArtifactGC();
}

async forceRemove(name: string, payload: string, typegraphStore: TypegraphStore) {
logger.warn(`Dropping "${name}": started`);
const typegraphId = typegraphIdSchema.parse(JSON.parse(payload));
const [tg] = await typegraphStore.download(
typegraphId,
);
const artifacts = new Set(
Object.values(tg.meta.artifacts).map((m) => m.hash),
);

await this.register.remove(name);
await this.artifactStore.updateRefCounts(new Set(), artifacts);
await this.artifactStore.runArtifactGC();
logger.warn(`Dropping "${name}": done`);
}

async initQueryEngine(
tgDS: TypeGraphDS,
secretManager: SecretManager,
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class ReplicatedRegister extends Register {
return new ReplicatedRegister(replicatedMap);
}

constructor(private replicatedMap: RedisReplicatedMap<QueryEngine>) {
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
super();
}

Expand Down
Loading