Skip to content

Schedules for stores #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
16e3f96
1. adding schedule type
vaibhawvipul Jan 10, 2024
e318ab5
removing unnecessary try/catch block
vaibhawvipul Jan 10, 2024
3cc7859
1. added schedules methods to the local promise store
vaibhawvipul Jan 10, 2024
a258a5d
fix lint errors
vaibhawvipul Jan 10, 2024
da9bbb7
fix lint
vaibhawvipul Jan 10, 2024
0d68770
adding TODO comments
vaibhawvipul Jan 10, 2024
44ea99f
embedding promiseValue type
vaibhawvipul Jan 11, 2024
a032bb7
fix lint errors
vaibhawvipul Jan 11, 2024
58db94a
using cron lib to parse cron expression for nextruntime
vaibhawvipul Jan 11, 2024
c03f2c5
deleted schedule returns a 204 so removing unnecessary try/catch
vaibhawvipul Jan 11, 2024
8d0c663
moving isSchedule fn to schedule.ts
vaibhawvipul Jan 11, 2024
97be805
fix test error
vaibhawvipul Jan 11, 2024
27691d5
moving isSearchSchedulesResp to store.ts
vaibhawvipul Jan 11, 2024
fc28ccf
any response from delete is true
vaibhawvipul Jan 11, 2024
f7f4e83
split up parse and validate
vaibhawvipul Jan 12, 2024
582ebef
adding tests for scheduler
vaibhawvipul Jan 16, 2024
7a13747
fix lint error
vaibhawvipul Jan 16, 2024
a1c3d47
fix tests
vaibhawvipul Jan 16, 2024
ce21799
adding rmw for scheduler and test modifications
vaibhawvipul Jan 17, 2024
23b3225
fixing naming conventions
vaibhawvipul Jan 17, 2024
ee15909
fix lint errors
vaibhawvipul Jan 17, 2024
adc47de
lint errors
vaibhawvipul Jan 17, 2024
26d6de8
implementing delete schedules
vaibhawvipul Jan 17, 2024
b9e6038
separating scheduels and promises in the local store
vaibhawvipul Jan 18, 2024
d01d51f
all 3 clases implemented
vaibhawvipul Jan 18, 2024
e2abc08
splitting the storage for schedules and promises
vaibhawvipul Jan 18, 2024
a7c9684
resolving merge conflicts
vaibhawvipul Jan 18, 2024
eb0f129
resolving lint errors
vaibhawvipul Jan 18, 2024
a11de53
remove unnecessary print statements
vaibhawvipul Jan 18, 2024
e8ebc73
1. renaming deleteSchedules to delete.
vaibhawvipul Jan 19, 2024
eb0b236
removing the if hasstate check in rmw promise
vaibhawvipul Jan 19, 2024
9353bc6
1. removing unnecessary checks
vaibhawvipul Jan 19, 2024
f26f9b0
removing redundant checks and moved retries arg in api call to the end
vaibhawvipul Jan 19, 2024
a1fe600
fix lint errors
vaibhawvipul Jan 19, 2024
918e649
moving control loop to localstore
vaibhawvipul Jan 19, 2024
9acd43f
moving all the control flow logic to local store
vaibhawvipul Jan 19, 2024
e01ada8
local store take a generic IPromiseStore and IScheduleStore
vaibhawvipul Jan 19, 2024
fdff006
fix lint errors
vaibhawvipul Jan 19, 2024
a1b5194
changing localstore and remotestore function to take ipromisestore an…
vaibhawvipul Jan 19, 2024
ff51d4c
fix lint errors
vaibhawvipul Jan 19, 2024
64edf74
Search promises and schedules (#27)
dfarr Jan 19, 2024
0929b45
fix lint errors
vaibhawvipul Jan 19, 2024
2bef90f
removing clocks.ts
vaibhawvipul Jan 19, 2024
a7615c7
taking ipromisestore and ischedulestore to construct remotestore and …
vaibhawvipul Jan 19, 2024
d71b9b6
expandPromiseIdTemplate
vaibhawvipul Jan 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/core/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,15 @@ export function isCompletedPromise(
): p is ResolvedPromise | RejectedPromise | CanceledPromise | TimedoutPromise {
return isDurablePromise(p) && ["RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"].includes(p.state);
}

export function searchStates(state: string | undefined): string[] {
if (state?.toLowerCase() == "pending") {
return ["PENDING"];
} else if (state?.toLowerCase() == "resolved") {
return ["RESOLVED"];
} else if (state?.toLowerCase() == "rejected") {
return ["REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
} else {
return ["PENDING", "RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
}
}
29 changes: 29 additions & 0 deletions lib/core/schedule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
export type Schedule = {
id: string;
description?: string;
cron: string;
tags?: Record<string, string>;
promiseId: string;
promiseTimeout: number;
promiseParam?: {
data?: string;
headers: Record<string, string>;
};
promiseTags?: Record<string, string>;
lastRunTime?: number;
nextRunTime?: number;
idempotencyKey?: string;
createdOn?: number;
};

// Function to check if the response matches the Schedule type
export function isSchedule(obj: any): obj is Schedule {
// You may need to adjust this based on the actual structure of your Schedule type
return (
obj !== undefined &&
typeof obj.id === "string" &&
typeof obj.cron === "string" &&
typeof obj.promiseId === "string" &&
typeof obj.promiseTimeout === "number"
);
}
63 changes: 8 additions & 55 deletions lib/core/storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DurablePromise, TimedoutPromise, isPendingPromise } from "./promise";
import { DurablePromise } from "./promise";
import { Schedule } from "./schedule";

export interface IStorage {
export interface IPromiseStorage {
rmw<P extends DurablePromise | undefined>(id: string, f: (promise: DurablePromise | undefined) => P): Promise<P>;
search(
id: string,
Expand All @@ -10,61 +11,13 @@ export interface IStorage {
): AsyncGenerator<DurablePromise[], void>;
}

export class WithTimeout implements IStorage {
constructor(private storage: IStorage) {}

rmw<P extends DurablePromise | undefined>(id: string, f: (promise: DurablePromise | undefined) => P): Promise<P> {
return this.storage.rmw(id, (promise) => f(timeout(promise)));
}

async *search(
export interface IScheduleStorage {
rmw<S extends Schedule | undefined>(id: string, f: (schedule: Schedule | undefined) => S): Promise<S>;
search(
id: string,
state: string | undefined,
tags: Record<string, string> | undefined,
limit: number | undefined,
): AsyncGenerator<DurablePromise[], void, unknown> {
const regex = new RegExp(id.replaceAll("*", ".*"));
const tagEntries = Object.entries(tags ?? {});

let states: string[] = [];
if (state?.toLowerCase() == "pending") {
states = ["PENDING"];
} else if (state?.toLowerCase() == "resolved") {
states = ["RESOLVED"];
} else if (state?.toLowerCase() == "rejected") {
states = ["REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
} else {
states = ["PENDING", "RESOLVED", "REJECTED", "REJECTED_CANCELED", "REJECTED_TIMEDOUT"];
}

for await (const res of this.storage.search("*", undefined, undefined, limit)) {
yield res
.map(timeout)
.filter((promise) => regex.test(promise.id))
.filter((promise) => states.includes(promise.state))
.filter((promise) => tagEntries.every(([k, v]) => promise.tags?.[k] == v));
}
}
}

function timeout<P extends DurablePromise | undefined>(promise: P): P | TimedoutPromise {
if (isPendingPromise(promise) && Date.now() >= promise.timeout) {
return {
state: "REJECTED_TIMEDOUT",
id: promise.id,
timeout: promise.timeout,
param: promise.param,
value: {
headers: undefined,
data: undefined,
},
createdOn: promise.createdOn,
completedOn: promise.timeout,
idempotencyKeyForCreate: promise.idempotencyKeyForCreate,
idempotencyKeyForComplete: undefined,
tags: promise.tags,
};
}
): AsyncGenerator<Schedule[], void>;

return promise;
delete(id: string): Promise<boolean>;
}
14 changes: 10 additions & 4 deletions lib/core/storages/indexdb.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { IStorage } from "../storage";
import { IPromiseStorage } from "../storage";
import { DurablePromise, isDurablePromise } from "../promise";
import { ResonateError, ErrorCodes } from "../error";
import { Schedule } from "../schedule";

export class IndexedDbStorage implements IStorage {
export class IndexedDbStorage implements IPromiseStorage {
private dbName = "resonateDB";
private readonly storeName = "promises";
private db: Promise<IDBDatabase>;
Expand Down Expand Up @@ -32,7 +33,7 @@ export class IndexedDbStorage implements IStorage {
}
}

async rmw<P extends DurablePromise | undefined>(
async rmw<P extends DurablePromise | Schedule | undefined>(
id: string,
f: (promise: DurablePromise | undefined) => P,
): Promise<P> {
Expand All @@ -43,7 +44,7 @@ export class IndexedDbStorage implements IStorage {
const storedPromise: DurablePromise | undefined = await this.getPromiseById(objectStore, id);
const resultPromise = f(storedPromise);

if (resultPromise) {
if (isDurablePromise(resultPromise)) {
await this.savePromise(objectStore, resultPromise);
}

Expand All @@ -65,6 +66,11 @@ export class IndexedDbStorage implements IStorage {
yield this.getAllPromises(objectStore);
}

async deleteSchedule(id: string): Promise<boolean> {
// TODO: To be implemented
return true;
}

private async getDb(): Promise<IDBDatabase> {
return new Promise<IDBDatabase>((resolve, reject) => {
const request = indexedDB.open(this.dbName, 1);
Expand Down
72 changes: 57 additions & 15 deletions lib/core/storages/memory.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import { IStorage } from "../storage";
import { DurablePromise } from "../promise";
import { IPromiseStorage, IScheduleStorage } from "../storage";
import { DurablePromise, searchStates } from "../promise";
import { Schedule } from "../schedule";

export class MemoryStorage implements IStorage {
export class MemoryPromiseStorage implements IPromiseStorage {
private promises: Record<string, DurablePromise> = {};

constructor() {}

async rmw<P extends DurablePromise | undefined>(
id: string,
f: (promise: DurablePromise | undefined) => P,
): Promise<P> {
const promise = f(this.promises[id]);
if (promise) {
this.promises[id] = promise;
async rmw<P extends DurablePromise | undefined>(id: string, f: (item: DurablePromise | undefined) => P): Promise<P> {
const item = f(this.promises[id]);
if (item) {
this.promises[id] = item;
}

return promise;
return item;
}

async *search(
Expand All @@ -24,8 +21,53 @@ export class MemoryStorage implements IStorage {
tags: Record<string, string> | undefined,
limit: number | undefined,
): AsyncGenerator<DurablePromise[], void> {
// for now WithTimeout will implement
// search logic
yield Object.values(this.promises);
// for the memory storage, we will ignore limit and return all
// promises that match the search criteria
const regex = new RegExp(id.replaceAll("*", ".*"));
const states = searchStates(state);
const tagEntries = Object.entries(tags ?? {});

yield Object.values(this.promises)
.filter((promise) => states.includes(promise.state))
.filter((promise) => regex.test(promise.id))
.filter((promise) => tagEntries.every(([k, v]) => promise.tags?.[k] == v));
}
}

export class MemoryScheduleStorage implements IScheduleStorage {
private schedules: Record<string, Schedule> = {};

constructor() {}

async rmw<S extends Schedule | undefined>(id: string, f: (item: Schedule | undefined) => S): Promise<S> {
const item = f(this.schedules[id]);
if (item) {
this.schedules[id] = item;
}

return item;
}

async *search(
id: string,
tags: Record<string, string> | undefined,
limit: number | undefined,
): AsyncGenerator<Schedule[], void> {
// for the memory storage, we will ignore limit and return all
// schedules that match the search criteria
const regex = new RegExp(id.replaceAll("*", ".*"));
const tagEntries = Object.entries(tags ?? {});

yield Object.values(this.schedules)
.filter((schedule) => regex.test(schedule.id))
.filter((schedule) => tagEntries.every(([k, v]) => schedule.tags?.[k] == v));
}

async delete(id: string): Promise<boolean> {
if (this.schedules[id]) {
delete this.schedules[id];
return true;
}
return false;
}
}
49 changes: 49 additions & 0 deletions lib/core/storages/withTimeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { DurablePromise, TimedoutPromise, isPendingPromise, searchStates } from "../promise";
import { IPromiseStorage } from "../storage";

export class WithTimeout implements IPromiseStorage {
constructor(private storage: IPromiseStorage) {}

rmw<P extends DurablePromise | undefined>(id: string, f: (promise: DurablePromise | undefined) => P): Promise<P> {
return this.storage.rmw(id, (promise) => f(promise ? timeout(promise) : undefined));
}

async *search(
id: string,
state: string | undefined,
tags: Record<string, string> | undefined,
limit: number | undefined,
): AsyncGenerator<DurablePromise[], void, unknown> {
// search promises against all provided criteria except state,
// then timeout any pending promises that have exceeded the
// timout, and finally apply the state filter

const states = searchStates(state);

for await (const promises of this.storage.search(id, undefined, tags, limit)) {
yield promises.map(timeout).filter((promise) => states.includes(promise.state));
}
}
}

function timeout<P extends DurablePromise>(promise: P): P | TimedoutPromise {
if (isPendingPromise(promise) && Date.now() >= promise.timeout) {
return {
state: "REJECTED_TIMEDOUT",
id: promise.id,
timeout: promise.timeout,
param: promise.param,
value: {
headers: undefined,
data: undefined,
},
createdOn: promise.createdOn,
completedOn: promise.timeout,
idempotencyKeyForCreate: promise.idempotencyKeyForCreate,
idempotencyKeyForComplete: undefined,
tags: promise.tags,
};
}

return promise;
}
67 changes: 67 additions & 0 deletions lib/core/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
isDurablePromise,
} from "./promise";

import { Schedule, isSchedule } from "./schedule";

export interface SearchPromiseParams {
id: string;
state?: string;
Expand All @@ -31,6 +33,17 @@ export function isSearchPromiseResult(obj: any): obj is SearchPromiseResult {
);
}

export function isSearchSchedulesResult(obj: any): obj is { cursor: string; schedules: Schedule[] } {
return (
obj !== undefined &&
obj.cursor !== undefined &&
(obj.cursor === null || typeof obj.cursor === "string") &&
obj.schedules !== undefined &&
Array.isArray(obj.schedules) &&
obj.schedules.every(isSchedule)
);
}

/**
* Promise Store API
*/
Expand Down Expand Up @@ -133,3 +146,57 @@ export interface IPromiseStore {
limit?: number,
): AsyncGenerator<DurablePromise[], void>;
}

export interface IScheduleStore {
/**
* Creates a new schedule.
*
* @param id Unique identifier for the schedule.
* @param ikey Idempotency key associated with the create operation.
* @param description Description of the schedule.
* @param cron CRON expression defining the schedule's execution time.
* @param tags Key-value pairs associated with the schedule.
* @param promiseId Unique identifier for the associated promise.
* @param promiseTimeout Timeout for the associated promise in milliseconds.
* @param promiseHeaders Headers associated with the promise data.
* @param promiseData Encoded data for the promise of type string.
* @param promiseTags Key-value pairs associated with the promise.
* @returns A Promise resolving to the created schedule.
*/
create(
id: string,
ikey: string | undefined,
description: string | undefined,
cron: string,
tags: Record<string, string> | undefined,
promiseId: string,
promiseTimeout: number,
promiseHeaders: Record<string, string> | undefined,
promiseData: string | undefined,
promiseTags: Record<string, string> | undefined,
): Promise<Schedule>;

/**
* Retrieves a schedule based on its id.
*
* @param id Unique identifier for the promise to be retrieved.
* @returns A promise schedule that is pending, canceled, resolved, or rejected.
*/
get(id: string): Promise<Schedule>;

/**
* Deletes a schedule based on its id.
* @param id Unique identifier for the promise to be deleted.
* @returns A promise schedule that is pending, canceled, resolved, or rejected.
*/
delete(id: string): Promise<boolean>;

/**
* Search for schedules.
*
* @param id Ids to match, can include wildcards.
* @param tags Tags to match.
* @returns A list of promise schedules.
*/
search(id: string, tags?: Record<string, string>, limit?: number): AsyncGenerator<Schedule[], void>;
}
Loading