Skip to content

Commit 4da10af

Browse files
authored
Replace DurablePromise union type with single type (#128)
Implements the proposed change in #126
1 parent 7bf23ef commit 4da10af

File tree

7 files changed

+76
-172
lines changed

7 files changed

+76
-172
lines changed

lib/core/promises/promises.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { IEncoder } from "../encoder";
22
import { ErrorCodes, ResonateError } from "../errors";
33
import { IPromiseStore } from "../store";
4-
import { PendingPromise, ResolvedPromise, RejectedPromise, CanceledPromise, TimedoutPromise } from "./types";
4+
import { DurablePromiseRecord } from "./types";
55

66
/**
77
* Durable Promise create options.
@@ -69,7 +69,7 @@ export class DurablePromise<T> {
6969
constructor(
7070
private store: IPromiseStore,
7171
private encoder: IEncoder<unknown, string | undefined>,
72-
private promise: PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise,
72+
private promise: DurablePromiseRecord,
7373
) {
7474
this.completed = new Promise((resolve) => {
7575
this.complete = resolve;

lib/core/promises/types.ts

+17-98
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,5 @@
1-
export type DurablePromise = PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise;
2-
3-
export type PendingPromise = {
4-
state: "PENDING";
5-
id: string;
6-
timeout: number;
7-
param: {
8-
headers: Record<string, string> | undefined;
9-
data: string | undefined;
10-
};
11-
value: {
12-
headers: undefined;
13-
data: undefined;
14-
};
15-
createdOn: number;
16-
completedOn: undefined;
17-
idempotencyKeyForCreate: string | undefined;
18-
idempotencyKeyForComplete: undefined;
19-
tags: Record<string, string> | undefined;
20-
};
21-
22-
export type ResolvedPromise = {
23-
state: "RESOLVED";
24-
id: string;
25-
timeout: number;
26-
param: {
27-
headers: Record<string, string> | undefined;
28-
data: string | undefined;
29-
};
30-
value: {
31-
headers: Record<string, string> | undefined;
32-
data: string | undefined;
33-
};
34-
createdOn: number;
35-
completedOn: number;
36-
idempotencyKeyForCreate: string | undefined;
37-
idempotencyKeyForComplete: string | undefined;
38-
tags: Record<string, string> | undefined;
39-
};
40-
41-
export type RejectedPromise = {
42-
state: "REJECTED";
1+
export type DurablePromiseRecord = {
2+
state: "PENDING" | "RESOLVED" | "REJECTED" | "REJECTED_CANCELED" | "REJECTED_TIMEDOUT";
433
id: string;
444
timeout: number;
455
param: {
@@ -51,53 +11,14 @@ export type RejectedPromise = {
5111
data: string | undefined;
5212
};
5313
createdOn: number;
54-
completedOn: number;
14+
completedOn: number | undefined;
5515
idempotencyKeyForCreate: string | undefined;
5616
idempotencyKeyForComplete: string | undefined;
5717
tags: Record<string, string> | undefined;
5818
};
5919

60-
export type CanceledPromise = {
61-
state: "REJECTED_CANCELED";
62-
id: string;
63-
timeout: number;
64-
param: {
65-
headers: Record<string, string> | undefined;
66-
data: string | undefined;
67-
};
68-
value: {
69-
headers: Record<string, string> | undefined;
70-
data: string | undefined;
71-
};
72-
createdOn: number;
73-
completedOn: number;
74-
idempotencyKeyForCreate: string | undefined;
75-
idempotencyKeyForComplete: string | undefined;
76-
tags: Record<string, string> | undefined;
77-
};
78-
79-
export type TimedoutPromise = {
80-
state: "REJECTED_TIMEDOUT";
81-
id: string;
82-
timeout: number;
83-
param: {
84-
headers: Record<string, string> | undefined;
85-
data: string | undefined;
86-
};
87-
value: {
88-
headers: undefined;
89-
data: undefined;
90-
};
91-
createdOn: number;
92-
completedOn: number;
93-
idempotencyKeyForCreate: string | undefined;
94-
idempotencyKeyForComplete: undefined;
95-
tags: Record<string, string> | undefined;
96-
};
97-
98-
// Type guards
99-
100-
export function isDurablePromise(p: unknown): p is DurablePromise {
20+
// This is an unsound type guard, we should be more strict in what we call a DurablePromise
21+
export function isDurablePromiseRecord(p: unknown): p is DurablePromiseRecord {
10122
return (
10223
p !== null &&
10324
typeof p === "object" &&
@@ -107,28 +28,26 @@ export function isDurablePromise(p: unknown): p is DurablePromise {
10728
);
10829
}
10930

110-
export function isPendingPromise(p: unknown): p is PendingPromise {
111-
return isDurablePromise(p) && p.state === "PENDING";
31+
export function isPendingPromise(p: DurablePromiseRecord): boolean {
32+
return p.state === "PENDING";
11233
}
11334

114-
export function isResolvedPromise(p: unknown): p is ResolvedPromise {
115-
return isDurablePromise(p) && p.state === "RESOLVED";
35+
export function isResolvedPromise(p: DurablePromiseRecord): boolean {
36+
return p.state === "RESOLVED";
11637
}
11738

118-
export function isRejectedPromise(p: unknown): p is RejectedPromise {
119-
return isDurablePromise(p) && p.state === "REJECTED";
39+
export function isRejectedPromise(p: DurablePromiseRecord): boolean {
40+
return p.state === "REJECTED";
12041
}
12142

122-
export function isCanceledPromise(p: unknown): p is CanceledPromise {
123-
return isDurablePromise(p) && p.state === "REJECTED_CANCELED";
43+
export function isCanceledPromise(p: DurablePromiseRecord): boolean {
44+
return p.state === "REJECTED_CANCELED";
12445
}
12546

126-
export function isTimedoutPromise(p: unknown): p is TimedoutPromise {
127-
return isDurablePromise(p) && p.state === "REJECTED_TIMEDOUT";
47+
export function isTimedoutPromise(p: DurablePromiseRecord): boolean {
48+
return p.state === "REJECTED_TIMEDOUT";
12849
}
12950

130-
export function isCompletedPromise(
131-
p: unknown,
132-
): p is ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise {
133-
return isDurablePromise(p) && ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
51+
export function isCompletedPromise(p: DurablePromiseRecord): boolean {
52+
return ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
13453
}

lib/core/storages/memory.ts

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { IStorage } from "../storage";
33
export class MemoryStorage<T> implements IStorage<T> {
44
private items: Record<string, T> = {};
55

6+
// read-modify-write
67
async rmw<X extends T | undefined>(id: string, func: (item: T | undefined) => X): Promise<X> {
78
const item = func(this.items[id]);
89
if (item) {
@@ -12,6 +13,7 @@ export class MemoryStorage<T> implements IStorage<T> {
1213
return item;
1314
}
1415

16+
// read-modify-delete
1517
async rmd(id: string, func: (item: T) => boolean): Promise<boolean> {
1618
const item = this.items[id];
1719
let result = false;

lib/core/storages/withTimeout.ts

+12-20
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
1-
import { DurablePromise, ResolvedPromise, TimedoutPromise, isPendingPromise } from "../promises/types";
1+
import { DurablePromiseRecord, isPendingPromise } from "../promises/types";
22
import { IStorage } from "../storage";
33
import { MemoryStorage } from "./memory";
44

5-
export class WithTimeout implements IStorage<DurablePromise> {
6-
constructor(private storage: IStorage<DurablePromise> = new MemoryStorage<DurablePromise>()) {}
5+
export class WithTimeout implements IStorage<DurablePromiseRecord> {
6+
constructor(private storage: IStorage<DurablePromiseRecord> = new MemoryStorage<DurablePromiseRecord>()) {}
77

8-
rmw<T extends DurablePromise | undefined>(id: string, func: (item: DurablePromise | undefined) => T): Promise<T> {
8+
rmw<T extends DurablePromiseRecord | undefined>(
9+
id: string,
10+
func: (item: DurablePromiseRecord | undefined) => T,
11+
): Promise<T> {
912
return this.storage.rmw(id, (p) => func(p ? timeout(p) : undefined));
1013
}
1114

12-
rmd(id: string, func: (item: DurablePromise) => boolean): Promise<boolean> {
15+
rmd(id: string, func: (item: DurablePromiseRecord) => boolean): Promise<boolean> {
1316
return this.storage.rmd(id, (p) => func(timeout(p)));
1417
}
1518

16-
async *all(): AsyncGenerator<DurablePromise[], void> {
19+
async *all(): AsyncGenerator<DurablePromiseRecord[], void> {
1720
for await (const promises of this.storage.all()) {
1821
yield promises.map(timeout);
1922
}
2023
}
2124
}
2225

23-
function timeout<T extends DurablePromise>(promise: T): T | ResolvedPromise | TimedoutPromise {
26+
function timeout<T extends DurablePromiseRecord>(promise: T): DurablePromiseRecord {
2427
if (isPendingPromise(promise) && Date.now() >= promise.timeout) {
25-
const body = {
28+
return {
29+
state: promise.tags?.["resonate:timeout"] === "true" ? "RESOLVED" : "REJECTED_TIMEDOUT",
2630
id: promise.id,
2731
timeout: promise.timeout,
2832
param: promise.param,
@@ -36,18 +40,6 @@ function timeout<T extends DurablePromise>(promise: T): T | ResolvedPromise | Ti
3640
idempotencyKeyForComplete: undefined,
3741
tags: promise.tags,
3842
};
39-
40-
if (promise.tags?.["resonate:timeout"] === "true") {
41-
return {
42-
state: "RESOLVED",
43-
...body,
44-
};
45-
} else {
46-
return {
47-
state: "REJECTED_TIMEDOUT",
48-
...body,
49-
};
50-
}
5143
}
5244

5345
return promise;

lib/core/store.ts

+7-14
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,4 @@
1-
import {
2-
DurablePromise,
3-
PendingPromise,
4-
ResolvedPromise,
5-
RejectedPromise,
6-
CanceledPromise,
7-
TimedoutPromise,
8-
} from "./promises/types";
1+
import { DurablePromiseRecord } from "./promises/types";
92

103
import { Schedule } from "./schedules/types";
114

@@ -42,7 +35,7 @@ export interface IPromiseStore {
4235
data: string | undefined,
4336
timeout: number,
4437
tags: Record<string, string> | undefined,
45-
): Promise<PendingPromise | CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
38+
): Promise<DurablePromiseRecord>;
4639

4740
/**
4841
* Cancels a new promise.
@@ -60,7 +53,7 @@ export interface IPromiseStore {
6053
strict: boolean,
6154
headers: Record<string, string> | undefined,
6255
data: string | undefined,
63-
): Promise<CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
56+
): Promise<DurablePromiseRecord>;
6457

6558
/**
6659
* Resolves a promise.
@@ -78,7 +71,7 @@ export interface IPromiseStore {
7871
strict: boolean,
7972
headers: Record<string, string> | undefined,
8073
data: string | undefined,
81-
): Promise<CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
74+
): Promise<DurablePromiseRecord>;
8275

8376
/**
8477
* Rejects a promise
@@ -96,15 +89,15 @@ export interface IPromiseStore {
9689
strict: boolean,
9790
headers: Record<string, string> | undefined,
9891
data: string | undefined,
99-
): Promise<CanceledPromise | ResolvedPromise | RejectedPromise | TimedoutPromise>;
92+
): Promise<DurablePromiseRecord>;
10093

10194
/**
10295
* Retrieves a promise based on its id.
10396
*
10497
* @param id Unique identifier for the promise to be retrieved.
10598
* @returns A durable promise that is pending, canceled, resolved, or rejected.
10699
*/
107-
get(id: string): Promise<DurablePromise>;
100+
get(id: string): Promise<DurablePromiseRecord>;
108101

109102
/**
110103
* Search for promises.
@@ -120,7 +113,7 @@ export interface IPromiseStore {
120113
state: string | undefined,
121114
tags: Record<string, string> | undefined,
122115
limit?: number,
123-
): AsyncGenerator<DurablePromise[], void>;
116+
): AsyncGenerator<DurablePromiseRecord[], void>;
124117
}
125118

126119
/**

lib/core/stores/local.ts

+9-14
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@ import { ILogger } from "../logger";
44
import { Logger } from "../loggers/logger";
55
import { StoreOptions } from "../options";
66
import {
7-
DurablePromise,
8-
PendingPromise,
9-
ResolvedPromise,
10-
RejectedPromise,
11-
CanceledPromise,
12-
TimedoutPromise,
7+
DurablePromiseRecord,
138
isPendingPromise,
149
isResolvedPromise,
1510
isRejectedPromise,
@@ -34,7 +29,7 @@ export class LocalStore implements IStore {
3429

3530
constructor(
3631
opts: Partial<StoreOptions> = {},
37-
promiseStorage: IStorage<DurablePromise> = new WithTimeout(new MemoryStorage<DurablePromise>()),
32+
promiseStorage: IStorage<DurablePromiseRecord> = new WithTimeout(new MemoryStorage<DurablePromiseRecord>()),
3833
scheduleStorage: IStorage<Schedule> = new MemoryStorage<Schedule>(),
3934
lockStorage: IStorage<{ id: string; eid: string }> = new MemoryStorage<{ id: string; eid: string }>(),
4035
) {
@@ -122,7 +117,7 @@ export class LocalStore implements IStore {
122117
export class LocalPromiseStore implements IPromiseStore {
123118
constructor(
124119
private store: LocalStore,
125-
private storage: IStorage<DurablePromise>,
120+
private storage: IStorage<DurablePromiseRecord>,
126121
) {}
127122

128123
async create(
@@ -133,7 +128,7 @@ export class LocalPromiseStore implements IPromiseStore {
133128
data: string | undefined,
134129
timeout: number,
135130
tags: Record<string, string> | undefined,
136-
): Promise<PendingPromise | ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
131+
): Promise<DurablePromiseRecord> {
137132
return this.storage.rmw(id, (promise) => {
138133
if (!promise) {
139134
return {
@@ -174,7 +169,7 @@ export class LocalPromiseStore implements IPromiseStore {
174169
strict: boolean,
175170
headers: Record<string, string> | undefined,
176171
data: string | undefined,
177-
): Promise<ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
172+
): Promise<DurablePromiseRecord> {
178173
return this.storage.rmw(id, (promise) => {
179174
if (!promise) {
180175
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
@@ -219,7 +214,7 @@ export class LocalPromiseStore implements IPromiseStore {
219214
strict: boolean,
220215
headers: Record<string, string> | undefined,
221216
data: string | undefined,
222-
): Promise<ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
217+
): Promise<DurablePromiseRecord> {
223218
return this.storage.rmw(id, (promise) => {
224219
if (!promise) {
225220
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
@@ -264,7 +259,7 @@ export class LocalPromiseStore implements IPromiseStore {
264259
strict: boolean,
265260
headers: Record<string, string> | undefined,
266261
data: string | undefined,
267-
): Promise<ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise> {
262+
): Promise<DurablePromiseRecord> {
268263
return this.storage.rmw(id, (promise) => {
269264
if (!promise) {
270265
throw new ResonateError("Not found", ErrorCodes.STORE_NOT_FOUND);
@@ -303,7 +298,7 @@ export class LocalPromiseStore implements IPromiseStore {
303298
});
304299
}
305300

306-
async get(id: string): Promise<DurablePromise> {
301+
async get(id: string): Promise<DurablePromiseRecord> {
307302
const promise = await this.storage.rmw(id, (p) => p);
308303

309304
if (!promise) {
@@ -318,7 +313,7 @@ export class LocalPromiseStore implements IPromiseStore {
318313
state?: string,
319314
tags?: Record<string, string>,
320315
limit?: number,
321-
): AsyncGenerator<DurablePromise[], void> {
316+
): AsyncGenerator<DurablePromiseRecord[], void> {
322317
// filter the promises returned from all storage
323318
const regex = new RegExp(id.replaceAll("*", ".*"));
324319
const states = searchStates(state);

0 commit comments

Comments
 (0)