Skip to content

Commit 042622c

Browse files
committed
Merge branch 'main' into chore/update-ghjk
2 parents 87e6479 + b8b8820 commit 042622c

File tree

9 files changed

+455
-26
lines changed

9 files changed

+455
-26
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Requests
2+
3+
## Idempotency
4+
5+
An idempotent request is a request that always returns the same result when retried with the same input.
6+
This is useful to prevent duplicate operations, such as charging a customer twice if a network timeout or retry occurs.
7+
8+
You can enforce idempotency on any request to a typegraph, you just have to set the `Idempotency-Key` header with a unique value (such as a UUID).
9+
10+
```curl
11+
curl https://localhost:7890/awesome_typegraph \
12+
-X POST \
13+
-H "Authorization: Bearer ...." \
14+
-H "Content-Type: application/json" \
15+
-H "Idempotency-Key: 123abc" \
16+
-d '{
17+
"query": "mutation CreatePost($title: String!) { createPost(title: $title) { id title } }",
18+
"variables": { "title": "My First Post" }
19+
}'
20+
```
21+
22+
:::info Notes
23+
24+
- Keys expire happens after 24 hours.
25+
- A key must match the original request exactly during its lifetime.
26+
- Reusing the same key for different requests will result in error status 422.
27+
- An empty string explicitly disables idempotency, it is a no-op.
28+
- The maximum key length is 255 characters.
29+
:::

src/typegate/src/services/graphql_service.ts

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,22 @@ import {
1313
findOperation,
1414
type FragmentDefs,
1515
} from "../transports/graphql/graphql.ts";
16-
import { forceAnyToOption } from "../utils.ts";
16+
import {
17+
type CachedResponse,
18+
computeRequestSignature,
19+
forceAnyToOption,
20+
toResponse,
21+
toSerializableResponse,
22+
} from "../utils.ts";
1723
import type { QueryEngine } from "../engine/query_engine.ts";
1824
import type * as ast from "graphql/ast";
1925
import { BadContext, ResolverError } from "../errors.ts";
2026
import { badRequest, jsonError, jsonOk } from "./responses.ts";
2127
import { BaseError, ErrorKind } from "../errors.ts";
28+
import type { Register } from "@metatype/typegate/typegate/register.ts";
2229

2330
const logger = getLogger(import.meta);
31+
const IDEMPOTENCY_HEADER = "Idempotency-Key";
2432

2533
class InvalidQuery extends BaseError {
2634
constructor(message: string) {
@@ -41,23 +49,14 @@ export function isIntrospectionQuery(
4149
return operation.name?.value === "IntrospectionQuery";
4250
}
4351

44-
export async function handleGraphQL(
45-
request: Request,
52+
export async function handleGraphQLHelper(
53+
content: Operations,
4654
engine: QueryEngine,
4755
context: Context,
4856
info: Info,
4957
limit: RateLimit | null,
5058
headers: Headers,
5159
): Promise<Response> {
52-
let content: Operations | null = null;
53-
try {
54-
content = await parseRequest(request);
55-
} catch (err: any) {
56-
if (err instanceof BaseError) {
57-
return err.toResponse(headers);
58-
}
59-
return badRequest(err.message);
60-
}
6160
const { query, operationName: operationNameRaw, variables } = content;
6261
const operationName = forceAnyToOption(operationNameRaw);
6362

@@ -145,3 +144,93 @@ export async function handleGraphQL(
145144
}
146145
}
147146
}
147+
148+
export async function handleGraphQL(
149+
register: Register,
150+
request: Request,
151+
engine: QueryEngine,
152+
context: Context,
153+
info: Info,
154+
limit: RateLimit | null,
155+
headers: Headers,
156+
): Promise<Response> {
157+
const key = request.headers.get(IDEMPOTENCY_HEADER);
158+
let content: Operations | null = null;
159+
try {
160+
content = await parseRequest(key ? request.clone() : request);
161+
} catch (err: any) {
162+
if (err instanceof BaseError) {
163+
return err.toResponse(headers);
164+
}
165+
return badRequest(err.message);
166+
}
167+
168+
if (key) {
169+
if (key.length > 255) {
170+
return jsonError({
171+
status: 422,
172+
message:
173+
`'${IDEMPOTENCY_HEADER}' value should not exceed 255 characters`,
174+
headers,
175+
});
176+
}
177+
178+
const userRequestHash = await computeRequestSignature(request, [
179+
IDEMPOTENCY_HEADER,
180+
]);
181+
const now = Date.now();
182+
const memoized = await register.getResponse(key);
183+
184+
if (memoized) {
185+
const { response, expiryMillis, requestHash: savedHash } = memoized;
186+
187+
if (now < expiryMillis) {
188+
if (userRequestHash != savedHash) {
189+
return jsonError({
190+
status: 422,
191+
message:
192+
`The request associated with key "${key}" has changed. Please use a new key or ensure the request matches the original.`,
193+
headers,
194+
});
195+
}
196+
197+
logger.debug(`Idempotent request key "${key}" replayed`);
198+
return toResponse(response);
199+
} else {
200+
await register.deleteResponse(key);
201+
}
202+
}
203+
204+
const response = await handleGraphQLHelper(
205+
content,
206+
engine,
207+
context,
208+
info,
209+
limit,
210+
headers,
211+
);
212+
213+
const oneDay = 24 * 3600 * 1000;
214+
const expiryMillis = now + oneDay;
215+
await register.addResponse(
216+
key,
217+
{
218+
response: await toSerializableResponse(response.clone()),
219+
expiryMillis,
220+
requestHash: userRequestHash,
221+
} satisfies CachedResponse,
222+
);
223+
224+
logger.warn(`Idempotent request key "${key}" renewed`);
225+
return response;
226+
}
227+
228+
return await handleGraphQLHelper(
229+
content,
230+
engine,
231+
context,
232+
info,
233+
limit,
234+
headers,
235+
);
236+
}

src/typegate/src/typegate/memory_register.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33

44
import type { QueryEngine } from "../engine/query_engine.ts";
55
import { Register } from "../typegate/register.ts";
6+
import type { CachedResponse } from "@metatype/typegate/utils.ts";
67

78
export class MemoryRegister extends Register {
89
private map = new Map<string, QueryEngine>();
10+
private responseMap = new Map<string, CachedResponse>();
911

1012
constructor() {
1113
super();
@@ -18,6 +20,7 @@ export class MemoryRegister extends Register {
1820
),
1921
);
2022
this.map.clear();
23+
this.responseMap.clear();
2124
}
2225

2326
async add(engine: QueryEngine): Promise<void> {
@@ -47,4 +50,18 @@ export class MemoryRegister extends Register {
4750
has(name: string): boolean {
4851
return this.map.has(name);
4952
}
53+
54+
addResponse(key: string, response: CachedResponse): Promise<void> {
55+
this.responseMap.set(key, response);
56+
return Promise.resolve();
57+
}
58+
59+
deleteResponse(key: string): Promise<void> {
60+
this.responseMap.delete(key);
61+
return Promise.resolve();
62+
}
63+
64+
getResponse(key: string): CachedResponse | undefined {
65+
return this.responseMap.get(key);
66+
}
5067
}

src/typegate/src/typegate/mod.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,16 @@ export class Typegate implements AsyncDisposable {
174174
});
175175
register.startSync(lastSync);
176176

177+
const lastSyncResponses = await register.historySyncResponses().catch(
178+
(err) => {
179+
logger.error(err);
180+
throw new Error(
181+
`failed to load response history at boot, aborting: ${err.message}`,
182+
);
183+
},
184+
);
185+
register.startSyncResponses(lastSyncResponses);
186+
177187
return typegate;
178188
}
179189
}
@@ -326,7 +336,15 @@ export class Typegate implements AsyncDisposable {
326336
return methodNotAllowed();
327337
}
328338

329-
return handleGraphQL(request, engine, context, info, limit, headers);
339+
return handleGraphQL(
340+
this.register,
341+
request,
342+
engine,
343+
context,
344+
info,
345+
limit,
346+
headers,
347+
);
330348
} catch (e) {
331349
Sentry.captureException(e);
332350
console.error(e);

src/typegate/src/typegate/register.ts

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import {
1111
} from "../typegraph/versions.ts";
1212
import { typegraphIdSchema, type TypegraphStore } from "../sync/typegraph.ts";
1313
import { RedisReplicatedMap } from "../sync/replicated_map.ts";
14+
import {
15+
type CachedResponse,
16+
CachedResponseSchema,
17+
} from "@metatype/typegate/utils.ts";
1418

1519
export interface MessageEntry {
1620
type: "info" | "warning" | "error";
@@ -33,6 +37,12 @@ export abstract class Register implements AsyncDisposable {
3337

3438
abstract has(name: string): boolean;
3539

40+
abstract addResponse(key: string, response: CachedResponse): Promise<void>;
41+
42+
abstract deleteResponse(key: string): Promise<void>;
43+
44+
abstract getResponse(key: string): CachedResponse | undefined;
45+
3646
abstract [Symbol.asyncDispose](): Promise<void>;
3747
}
3848

@@ -83,15 +93,35 @@ export class ReplicatedRegister extends Register {
8393
},
8494
);
8595

86-
return new ReplicatedRegister(replicatedMap);
96+
const replicatedResponseMap = await RedisReplicatedMap.init<CachedResponse>(
97+
"typegraph_responses",
98+
redisConfig,
99+
{
100+
serialize(data: CachedResponse) {
101+
return JSON.stringify(data);
102+
},
103+
deserialize(json: string, _: boolean) {
104+
const raw = JSON.parse(json);
105+
const cachedResponse = CachedResponseSchema.parse(raw);
106+
return cachedResponse;
107+
},
108+
async terminate(_: CachedResponse) {},
109+
},
110+
);
111+
112+
return new ReplicatedRegister(replicatedMap, replicatedResponseMap);
87113
}
88114

89-
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
115+
constructor(
116+
public replicatedMap: RedisReplicatedMap<QueryEngine>,
117+
public replicatedResponseMap: RedisReplicatedMap<CachedResponse>,
118+
) {
90119
super();
91120
}
92121

93122
async [Symbol.asyncDispose](): Promise<void> {
94123
await this.replicatedMap[Symbol.asyncDispose]();
124+
await this.replicatedResponseMap[Symbol.asyncDispose]();
95125
await Promise.all(this.list().map((e) => e[Symbol.asyncDispose]()));
96126
}
97127

@@ -133,7 +163,27 @@ export class ReplicatedRegister extends Register {
133163
return this.replicatedMap.historySync();
134164
}
135165

166+
historySyncResponses(): Promise<XIdInput> {
167+
return this.replicatedResponseMap.historySync();
168+
}
169+
136170
startSync(xid: XIdInput): void {
137171
void this.replicatedMap.startSync(xid);
138172
}
173+
174+
startSyncResponses(xid: XIdInput): void {
175+
void this.replicatedResponseMap.startSync(xid);
176+
}
177+
178+
addResponse(key: string, response: CachedResponse): Promise<void> {
179+
return this.replicatedResponseMap.set(key, response);
180+
}
181+
182+
deleteResponse(key: string): Promise<void> {
183+
return this.replicatedResponseMap.delete(key);
184+
}
185+
186+
getResponse(key: string): CachedResponse | undefined {
187+
return this.replicatedResponseMap.get(key);
188+
}
139189
}

0 commit comments

Comments
 (0)