Skip to content

Commit 13b7217

Browse files
authored
Schedules for stores (#21)
Add IScheduleStore and local and remote implementations
1 parent e88f1f3 commit 13b7217

13 files changed

+825
-224
lines changed

lib/core/promise.ts

+12
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,15 @@ export function isCompletedPromise(
134134
): p is ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise {
135135
return isDurablePromise(p) && ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
136136
}
137+
138+
export function searchStates(state: string | undefined): string[] {
139+
if (state?.toLowerCase() == "pending") {
140+
return ["PENDING"];
141+
} else if (state?.toLowerCase() == "resolved") {
142+
return ["RESOLVED"];
143+
} else if (state?.toLowerCase() == "rejected") {
144+
return ["REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
145+
} else {
146+
return ["PENDING", "RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
147+
}
148+
}

lib/core/schedule.ts

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
export type Schedule = {
2+
id: string;
3+
description?: string;
4+
cron: string;
5+
tags?: Record<string, string>;
6+
promiseId: string;
7+
promiseTimeout: number;
8+
promiseParam?: {
9+
data?: string;
10+
headers: Record<string, string>;
11+
};
12+
promiseTags?: Record<string, string>;
13+
lastRunTime?: number;
14+
nextRunTime?: number;
15+
idempotencyKey?: string;
16+
createdOn?: number;
17+
};
18+
19+
// Function to check if the response matches the Schedule type
20+
export function isSchedule(obj: any): obj is Schedule {
21+
// You may need to adjust this based on the actual structure of your Schedule type
22+
return (
23+
obj !== undefined &&
24+
typeof obj.id === "string" &&
25+
typeof obj.cron === "string" &&
26+
typeof obj.promiseId === "string" &&
27+
typeof obj.promiseTimeout === "number"
28+
);
29+
}

lib/core/storage.ts

+8-55
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import { DurablePromise, TimedoutPromise, isPendingPromise } from "./promise";
1+
import { DurablePromise } from "./promise";
2+
import { Schedule } from "./schedule";
23

3-
export interface IStorage {
4+
export interface IPromiseStorage {
45
rmw<P extends DurablePromise | undefined>(id: string, f: (promise: DurablePromise | undefined) => P): Promise<P>;
56
search(
67
id: string,
@@ -10,61 +11,13 @@ export interface IStorage {
1011
): AsyncGenerator<DurablePromise[], void>;
1112
}
1213

13-
export class WithTimeout implements IStorage {
14-
constructor(private storage: IStorage) {}
15-
16-
rmw<P extends DurablePromise | undefined>(id: string, f: (promise: DurablePromise | undefined) => P): Promise<P> {
17-
return this.storage.rmw(id, (promise) => f(timeout(promise)));
18-
}
19-
20-
async *search(
14+
export interface IScheduleStorage {
15+
rmw<S extends Schedule | undefined>(id: string, f: (schedule: Schedule | undefined) => S): Promise<S>;
16+
search(
2117
id: string,
22-
state: string | undefined,
2318
tags: Record<string, string> | undefined,
2419
limit: number | undefined,
25-
): AsyncGenerator<DurablePromise[], void, unknown> {
26-
const regex = new RegExp(id.replaceAll("*", ".*"));
27-
const tagEntries = Object.entries(tags ?? {});
28-
29-
let states: string[] = [];
30-
if (state?.toLowerCase() == "pending") {
31-
states = ["PENDING"];
32-
} else if (state?.toLowerCase() == "resolved") {
33-
states = ["RESOLVED"];
34-
} else if (state?.toLowerCase() == "rejected") {
35-
states = ["REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
36-
} else {
37-
states = ["PENDING", "RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
38-
}
39-
40-
for await (const res of this.storage.search("*", undefined, undefined, limit)) {
41-
yield res
42-
.map(timeout)
43-
.filter((promise) => regex.test(promise.id))
44-
.filter((promise) => states.includes(promise.state))
45-
.filter((promise) => tagEntries.every(([k, v]) => promise.tags?.[k] == v));
46-
}
47-
}
48-
}
49-
50-
function timeout<P extends DurablePromise | undefined>(promise: P): P | TimedoutPromise {
51-
if (isPendingPromise(promise) && Date.now() >= promise.timeout) {
52-
return {
53-
state: "REJECTED_TIMEDOUT",
54-
id: promise.id,
55-
timeout: promise.timeout,
56-
param: promise.param,
57-
value: {
58-
headers: undefined,
59-
data: undefined,
60-
},
61-
createdOn: promise.createdOn,
62-
completedOn: promise.timeout,
63-
idempotencyKeyForCreate: promise.idempotencyKeyForCreate,
64-
idempotencyKeyForComplete: undefined,
65-
tags: promise.tags,
66-
};
67-
}
20+
): AsyncGenerator<Schedule[], void>;
6821

69-
return promise;
22+
delete(id: string): Promise<boolean>;
7023
}

lib/core/storages/indexdb.ts

+10-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import { IStorage } from "../storage";
1+
import { IPromiseStorage } from "../storage";
22
import { DurablePromise, isDurablePromise } from "../promise";
33
import { ResonateError, ErrorCodes } from "../error";
4+
import { Schedule } from "../schedule";
45

5-
export class IndexedDbStorage implements IStorage {
6+
export class IndexedDbStorage implements IPromiseStorage {
67
private dbName = "resonateDB";
78
private readonly storeName = "promises";
89
private db: Promise<IDBDatabase>;
@@ -32,7 +33,7 @@ export class IndexedDbStorage implements IStorage {
3233
}
3334
}
3435

35-
async rmw<P extends DurablePromise | undefined>(
36+
async rmw<P extends DurablePromise | Schedule | undefined>(
3637
id: string,
3738
f: (promise: DurablePromise | undefined) => P,
3839
): Promise<P> {
@@ -43,7 +44,7 @@ export class IndexedDbStorage implements IStorage {
4344
const storedPromise: DurablePromise | undefined = await this.getPromiseById(objectStore, id);
4445
const resultPromise = f(storedPromise);
4546

46-
if (resultPromise) {
47+
if (isDurablePromise(resultPromise)) {
4748
await this.savePromise(objectStore, resultPromise);
4849
}
4950

@@ -65,6 +66,11 @@ export class IndexedDbStorage implements IStorage {
6566
yield this.getAllPromises(objectStore);
6667
}
6768

69+
async deleteSchedule(id: string): Promise<boolean> {
70+
// TODO: To be implemented
71+
return true;
72+
}
73+
6874
private async getDb(): Promise<IDBDatabase> {
6975
return new Promise<IDBDatabase>((resolve, reject) => {
7076
const request = indexedDB.open(this.dbName, 1);

lib/core/storages/memory.ts

+57-15
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
1-
import { IStorage } from "../storage";
2-
import { DurablePromise } from "../promise";
1+
import { IPromiseStorage, IScheduleStorage } from "../storage";
2+
import { DurablePromise, searchStates } from "../promise";
3+
import { Schedule } from "../schedule";
34

4-
export class MemoryStorage implements IStorage {
5+
export class MemoryPromiseStorage implements IPromiseStorage {
56
private promises: Record<string, DurablePromise> = {};
6-
77
constructor() {}
88

9-
async rmw<P extends DurablePromise | undefined>(
10-
id: string,
11-
f: (promise: DurablePromise | undefined) => P,
12-
): Promise<P> {
13-
const promise = f(this.promises[id]);
14-
if (promise) {
15-
this.promises[id] = promise;
9+
async rmw<P extends DurablePromise | undefined>(id: string, f: (item: DurablePromise | undefined) => P): Promise<P> {
10+
const item = f(this.promises[id]);
11+
if (item) {
12+
this.promises[id] = item;
1613
}
1714

18-
return promise;
15+
return item;
1916
}
2017

2118
async *search(
@@ -24,8 +21,53 @@ export class MemoryStorage implements IStorage {
2421
tags: Record<string, string> | undefined,
2522
limit: number | undefined,
2623
): AsyncGenerator<DurablePromise[], void> {
27-
// for now WithTimeout will implement
28-
// search logic
29-
yield Object.values(this.promises);
24+
// for the memory storage, we will ignore limit and return all
25+
// promises that match the search criteria
26+
const regex = new RegExp(id.replaceAll("*", ".*"));
27+
const states = searchStates(state);
28+
const tagEntries = Object.entries(tags ?? {});
29+
30+
yield Object.values(this.promises)
31+
.filter((promise) => states.includes(promise.state))
32+
.filter((promise) => regex.test(promise.id))
33+
.filter((promise) => tagEntries.every(([k, v]) => promise.tags?.[k] == v));
34+
}
35+
}
36+
37+
export class MemoryScheduleStorage implements IScheduleStorage {
38+
private schedules: Record<string, Schedule> = {};
39+
40+
constructor() {}
41+
42+
async rmw<S extends Schedule | undefined>(id: string, f: (item: Schedule | undefined) => S): Promise<S> {
43+
const item = f(this.schedules[id]);
44+
if (item) {
45+
this.schedules[id] = item;
46+
}
47+
48+
return item;
49+
}
50+
51+
async *search(
52+
id: string,
53+
tags: Record<string, string> | undefined,
54+
limit: number | undefined,
55+
): AsyncGenerator<Schedule[], void> {
56+
// for the memory storage, we will ignore limit and return all
57+
// schedules that match the search criteria
58+
const regex = new RegExp(id.replaceAll("*", ".*"));
59+
const tagEntries = Object.entries(tags ?? {});
60+
61+
yield Object.values(this.schedules)
62+
.filter((schedule) => regex.test(schedule.id))
63+
.filter((schedule) => tagEntries.every(([k, v]) => schedule.tags?.[k] == v));
64+
}
65+
66+
async delete(id: string): Promise<boolean> {
67+
if (this.schedules[id]) {
68+
delete this.schedules[id];
69+
return true;
70+
}
71+
return false;
3072
}
3173
}

lib/core/storages/withTimeout.ts

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { DurablePromise, TimedoutPromise, isPendingPromise, searchStates } from "../promise";
2+
import { IPromiseStorage } from "../storage";
3+
4+
export class WithTimeout implements IPromiseStorage {
5+
constructor(private storage: IPromiseStorage) {}
6+
7+
rmw<P extends DurablePromise | undefined>(id: string, f: (promise: DurablePromise | undefined) => P): Promise<P> {
8+
return this.storage.rmw(id, (promise) => f(promise ? timeout(promise) : undefined));
9+
}
10+
11+
async *search(
12+
id: string,
13+
state: string | undefined,
14+
tags: Record<string, string> | undefined,
15+
limit: number | undefined,
16+
): AsyncGenerator<DurablePromise[], void, unknown> {
17+
// search promises against all provided criteria except state,
18+
// then timeout any pending promises that have exceeded the
19+
// timout, and finally apply the state filter
20+
21+
const states = searchStates(state);
22+
23+
for await (const promises of this.storage.search(id, undefined, tags, limit)) {
24+
yield promises.map(timeout).filter((promise) => states.includes(promise.state));
25+
}
26+
}
27+
}
28+
29+
function timeout<P extends DurablePromise>(promise: P): P | TimedoutPromise {
30+
if (isPendingPromise(promise) && Date.now() >= promise.timeout) {
31+
return {
32+
state: "REJECTED_TIMEDOUT",
33+
id: promise.id,
34+
timeout: promise.timeout,
35+
param: promise.param,
36+
value: {
37+
headers: undefined,
38+
data: undefined,
39+
},
40+
createdOn: promise.createdOn,
41+
completedOn: promise.timeout,
42+
idempotencyKeyForCreate: promise.idempotencyKeyForCreate,
43+
idempotencyKeyForComplete: undefined,
44+
tags: promise.tags,
45+
};
46+
}
47+
48+
return promise;
49+
}

lib/core/store.ts

+67
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import {
88
isDurablePromise,
99
} from "./promise";
1010

11+
import { Schedule, isSchedule } from "./schedule";
12+
1113
export interface SearchPromiseParams {
1214
id: string;
1315
state?: string;
@@ -31,6 +33,17 @@ export function isSearchPromiseResult(obj: any): obj is SearchPromiseResult {
3133
);
3234
}
3335

36+
export function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules: Schedule[] } {
37+
return (
38+
obj !== undefined &&
39+
obj.cursor !== undefined &&
40+
(obj.cursor === null || typeof obj.cursor === "string") &&
41+
obj.schedules !== undefined &&
42+
Array.isArray(obj.schedules) &&
43+
obj.schedules.every(isSchedule)
44+
);
45+
}
46+
3447
/**
3548
* Promise Store API
3649
*/
@@ -133,3 +146,57 @@ export interface IPromiseStore {
133146
limit?: number,
134147
): AsyncGenerator<DurablePromise[], void>;
135148
}
149+
150+
export interface IScheduleStore {
151+
/**
152+
* Creates a new schedule.
153+
*
154+
* @param id Unique identifier for the schedule.
155+
* @param ikey Idempotency key associated with the create operation.
156+
* @param description Description of the schedule.
157+
* @param cron CRON expression defining the schedule's execution time.
158+
* @param tags Key-value pairs associated with the schedule.
159+
* @param promiseId Unique identifier for the associated promise.
160+
* @param promiseTimeout Timeout for the associated promise in milliseconds.
161+
* @param promiseHeaders Headers associated with the promise data.
162+
* @param promiseData Encoded data for the promise of type string.
163+
* @param promiseTags Key-value pairs associated with the promise.
164+
* @returns A Promise resolving to the created schedule.
165+
*/
166+
create(
167+
id: string,
168+
ikey: string | undefined,
169+
description: string | undefined,
170+
cron: string,
171+
tags: Record<string, string> | undefined,
172+
promiseId: string,
173+
promiseTimeout: number,
174+
promiseHeaders: Record<string, string> | undefined,
175+
promiseData: string | undefined,
176+
promiseTags: Record<string, string> | undefined,
177+
): Promise<Schedule>;
178+
179+
/**
180+
* Retrieves a schedule based on its id.
181+
*
182+
* @param id Unique identifier for the promise to be retrieved.
183+
* @returns A promise schedule that is pending, canceled, resolved, or rejected.
184+
*/
185+
get(id: string): Promise<Schedule>;
186+
187+
/**
188+
* Deletes a schedule based on its id.
189+
* @param id Unique identifier for the promise to be deleted.
190+
* @returns A promise schedule that is pending, canceled, resolved, or rejected.
191+
*/
192+
delete(id: string): Promise<boolean>;
193+
194+
/**
195+
* Search for schedules.
196+
*
197+
* @param id Ids to match, can include wildcards.
198+
* @param tags Tags to match.
199+
* @returns A list of promise schedules.
200+
*/
201+
search(id: string, tags?: Record<string, string>, limit?: number): AsyncGenerator<Schedule[], void>;
202+
}

0 commit comments

Comments
 (0)