Skip to content

Consolidate errors #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Execution, OrdinaryExecution, DeferredExecution } from "./core/executio
import { ResonatePromise } from "./core/future";
import { Invocation } from "./core/invocation";
import { ResonateOptions, Options, PartialOptions } from "./core/options";
import { Retry } from "./core/retries/retry";
import * as utils from "./core/utils";
import { ResonateBase } from "./resonate";

Expand Down Expand Up @@ -200,10 +201,19 @@ export class Context {
*/
run<T>(func: string, args?: any, opts?: PartialOptions): ResonatePromise<T>;
run(func: string | ((...args: any[]) => any), ...argsWithOpts: any[]): ResonatePromise<any> {
// the parent is the current invocation
const parent = this.invocation;

// the id is either:
// 1. a provided string in the case of a deferred execution
// 2. a generated string in the case of an ordinary execution
const id = typeof func === "string" ? func : `${this.invocation.id}.${this.invocation.counter}`;
const id = typeof func === "string" ? func : `${parent.id}.${parent.counter}`;

// human readable name of the function
const name = typeof func === "string" ? func : func.name;

// version is inherited from the parent
const version = parent.version;

// the idempotency key is a hash of the id
const idempotencyKey = utils.hash(id);
Expand All @@ -212,23 +222,22 @@ export class Context {
const { args, opts } = this.invocation.split(argsWithOpts);

// create a new invocation
const invocation = new Invocation(id, idempotencyKey, undefined, opts, this.invocation.version, this.invocation);
const invocation = new Invocation(name, version, id, idempotencyKey, undefined, undefined, opts, parent);

let execution: Execution<any>;
if (typeof func === "string") {
// create a deferred execution
// this execution will be fulfilled out-of-process
execution = new DeferredExecution(invocation);
} else {
const ctx = new Context(invocation);

// create an ordinary execution
// create an ordinary execution// human readable name of the function
// this execution wraps a user-provided function
execution = new OrdinaryExecution(invocation, () => func(ctx, ...args));
const ctx = new Context(invocation);
execution = new OrdinaryExecution(invocation, () => func(ctx, ...args), Retry.never());
}

// bump the invocation counter
this.invocation.counter++;
// bump the counter
parent.counter++;

// return a resonate promise
return execution.execute();
Expand All @@ -244,20 +253,25 @@ export class Context {
*/
io<F extends IFunc>(func: F, ...args: [...Params<F>, PartialOptions?]): ResonatePromise<Return<F>>;
io(func: (...args: any[]) => any, ...argsWithOpts: any[]): ResonatePromise<any> {
const id = `${this.invocation.id}.${this.invocation.counter}`;
const parent = this.invocation;

const id = `${parent.id}.${parent.counter}`;
const idempotencyKey = utils.hash(id);
const { args, opts } = this.invocation.split(argsWithOpts);

const invocation = new Invocation(id, idempotencyKey, undefined, opts, this.invocation.version, this.invocation);
const info = new Info(invocation);
const name = func.name;
const version = parent.version;

const invocation = new Invocation(name, version, id, idempotencyKey, undefined, undefined, opts, parent);

// create an ordinary execution
// this execution wraps a user-provided io function
// unlike run, an io function can not create child invocations
const info = new Info(invocation);
const execution = new OrdinaryExecution(invocation, () => func(info, ...args));

// bump the invocation counter
this.invocation.counter++;
// bump the counter
parent.counter++;

// return a resonate promise
return execution.execute();
Expand All @@ -279,7 +293,7 @@ export class Context {
/////////////////////////////////////////////////////////////////////

class Scheduler {
private executions: Record<string, { execution: Execution<any>; promise: ResonatePromise<any> }> = {};
private cache: Record<string, Execution<any>> = {};

add<F extends AFunc>(
name: string,
Expand All @@ -291,8 +305,9 @@ class Scheduler {
): ResonatePromise<Return<F>> {
// if the execution is already running, and not killed,
// return the promise
if (this.executions[id] && !this.executions[id].execution.invocation.killed) {
return this.executions[id].promise;
if (this.cache[id] && !this.cache[id].killed) {
// execute is idempotent
return this.cache[id].execute();
}

// the idempotency key is a hash of the id
Expand All @@ -305,21 +320,17 @@ class Scheduler {
args,
};

// add an invocation tag
opts.tags["resonate:invocation"] = "true";

// create a new invocation
const invocation = new Invocation<Return<F>>(id, idempotencyKey, param, opts, version);
const invocation = new Invocation<Return<F>>(name, version, id, idempotencyKey, undefined, param, opts);

// create a new execution
const ctx = new Context(invocation);
const execution = new OrdinaryExecution(invocation, () => func(ctx, ...args));
const promise = execution.execute();

// store the execution and promise,
// store the execution,
// will be used if run is called again with the same id
this.executions[id] = { execution, promise };
this.cache[id] = execution;

return promise;
return execution.execute();
}
}
17 changes: 17 additions & 0 deletions lib/core/encoders/json.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { IEncoder } from "../encoder";
import { ResonateError } from "../errors";

export class JSONEncoder implements IEncoder<unknown, string | undefined> {
encode(data: unknown): string | undefined {
Expand All @@ -12,10 +13,22 @@ export class JSONEncoder implements IEncoder<unknown, string | undefined> {
if (value instanceof AggregateError) {
return {
__type: "aggregate_error",
message: value.message,
stack: value.stack,
name: value.name,
errors: value.errors,
};
}

if (value instanceof ResonateError) {
return {
__type: "resonate_error",
message: value.message,
stack: value.stack,
name: value.name,
code: value.code,
cause: value.cause,
retriable: value.retriable,
};
}

Expand Down Expand Up @@ -44,6 +57,10 @@ export class JSONEncoder implements IEncoder<unknown, string | undefined> {
return Object.assign(new AggregateError(value.errors, value.message), value);
}

if (value?.__type === "resonate_error") {
return Object.assign(new ResonateError(value.message, value.code, value.cause, value.retriable), value);
}

if (value?.__type === "error") {
return Object.assign(new Error(value.message), value);
}
Expand Down
52 changes: 0 additions & 52 deletions lib/core/error.ts

This file was deleted.

37 changes: 37 additions & 0 deletions lib/core/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
export enum ErrorCodes {
// unknown
UNKNOWN = 0,

// canceled
CANCELED = 10,

// timedout
TIMEDOUT = 20,

// killed
KILLED = 30,

// store
STORE = 40,
STORE_PAYLOAD = 41,
STORE_FORBIDDEN = 42,
STORE_NOT_FOUND = 43,
STORE_ALREADY_EXISTS = 44,
STORE_INVALID_STATE = 45,
STORE_ENCODER = 46,
}

export class ResonateError extends Error {
constructor(
message: string,
public readonly code: ErrorCodes,
public readonly cause?: any,
public readonly retriable: boolean = false,
) {
super(message);
}

public static fromError(e: unknown): ResonateError {
return e instanceof ResonateError ? e : new ResonateError("Unknown error", ErrorCodes.UNKNOWN, e, true);
}
}
Loading
Loading