@@ -27,7 +27,15 @@ export abstract class Execution<T> {
27
27
return this . promise ;
28
28
}
29
29
30
+ // each execution has a fork phase and a join phase
31
+
32
+ // fork phase is responsible for spawning the invocation,
33
+ // which in general means creating a durable promise (if necessary)
30
34
const forkPromise = this . fork ( ) ;
35
+
36
+ // join phase is responsible for awaiting the invocation,
37
+ // if the invocation is backed by a durable promise the durable
38
+ // promise is completed before the invocation is completed
31
39
const joinPromise = forkPromise . then ( ( f ) => this . join ( f ) ) ;
32
40
33
41
this . promise = new ResonatePromise ( this . invocation . id , forkPromise , joinPromise ) ;
@@ -82,15 +90,10 @@ export class OrdinaryExecution<T> extends Execution<T> {
82
90
}
83
91
84
92
protected async fork ( ) {
85
- try {
86
- // acquire lock if necessary
87
- while ( this . invocation . opts . lock && ! ( await this . acquireLock ( ) ) ) {
88
- await new Promise ( ( resolve ) => setTimeout ( resolve , this . invocation . opts . poll ) ) ;
89
- }
90
-
91
- if ( this . invocation . opts . durable ) {
92
- // if durable, create a durable promise
93
- const promise =
93
+ if ( this . invocation . opts . durable ) {
94
+ // if durable, create a durable promise
95
+ try {
96
+ this . durablePromise =
94
97
this . durablePromise ??
95
98
( await DurablePromise . create < T > (
96
99
this . resonate . store . promises ,
@@ -104,24 +107,59 @@ export class OrdinaryExecution<T> extends Execution<T> {
104
107
tags : this . invocation . opts . tags ,
105
108
} ,
106
109
) ) ;
110
+ } catch ( e ) {
111
+ // if an error occurs, kill the execution
112
+ this . kill ( e ) ;
113
+ }
114
+ }
107
115
108
- // override the invocation timeout
109
- this . invocation . timeout = promise . timeout ;
116
+ if ( this . durablePromise ) {
117
+ // override the invocation creation time
118
+ this . invocation . createdOn = this . durablePromise . createdOn ;
119
+
120
+ // override the invocation timeout
121
+ this . invocation . timeout = this . durablePromise . timeout ;
122
+ }
123
+
124
+ return this . invocation . future ;
125
+ }
126
+
127
+ protected async join ( future : Future < T > ) {
128
+ try {
129
+ // acquire lock if necessary
130
+ while ( this . invocation . opts . lock && ! ( await this . acquireLock ( ) ) ) {
131
+ await new Promise ( ( resolve ) => setTimeout ( resolve , this . invocation . opts . poll ) ) ;
132
+ }
133
+
134
+ if ( this . durablePromise ) {
135
+ if ( this . durablePromise . pending ) {
136
+ // if durable and pending, invoke the function and resolve/reject the durable promise
137
+ let value ! : T ;
138
+ let error : any ;
139
+
140
+ // we need to hold on to a boolean to determine if the function was successful,
141
+ // we cannot rely on the value or error as these values could be undefined
142
+ let success = true ;
110
143
111
- if ( promise . pending ) {
112
- // if pending, invoke the function and resolve/reject the durable promise
113
144
try {
114
- await promise . resolve ( await this . run ( ) , { idempotencyKey : this . invocation . idempotencyKey } ) ;
145
+ value = await this . run ( ) ;
115
146
} catch ( e ) {
116
- await promise . reject ( e , { idempotencyKey : this . invocation . idempotencyKey } ) ;
147
+ error = e ;
148
+ success = false ;
149
+ }
150
+
151
+ if ( success ) {
152
+ await this . durablePromise . resolve ( value , { idempotencyKey : this . invocation . idempotencyKey } ) ;
153
+ } else {
154
+ await this . durablePromise . reject ( error , { idempotencyKey : this . invocation . idempotencyKey } ) ;
117
155
}
118
156
}
119
157
120
- // resolve/reject the invocation
121
- if ( promise . resolved ) {
122
- this . invocation . resolve ( promise . value ( ) ) ;
123
- } else if ( promise . rejected || promise . canceled || promise . timedout ) {
124
- this . invocation . reject ( promise . error ( ) ) ;
158
+ // if durable resolve/reject the invocation
159
+ if ( this . durablePromise . resolved ) {
160
+ this . invocation . resolve ( this . durablePromise . value ( ) ) ;
161
+ } else if ( this . durablePromise . rejected || this . durablePromise . canceled || this . durablePromise . timedout ) {
162
+ this . invocation . reject ( this . durablePromise . error ( ) ) ;
125
163
}
126
164
} else {
127
165
// if not durable, invoke the function and resolve/reject the invocation
@@ -141,10 +179,6 @@ export class OrdinaryExecution<T> extends Execution<T> {
141
179
this . kill ( e ) ;
142
180
}
143
181
144
- return this . invocation . future ;
145
- }
146
-
147
- protected async join ( future : Future < T > ) {
148
182
return await future . promise ;
149
183
}
150
184
@@ -171,10 +205,12 @@ export class OrdinaryExecution<T> extends Execution<T> {
171
205
}
172
206
173
207
export class DeferredExecution < T > extends Execution < T > {
208
+ private durablePromise : DurablePromise < T > | null = null ;
209
+
174
210
protected async fork ( ) {
175
211
try {
176
212
// create a durable promise
177
- const promise = await DurablePromise . create < T > (
213
+ this . durablePromise = await DurablePromise . create < T > (
178
214
this . resonate . store . promises ,
179
215
this . invocation . opts . encoder ,
180
216
this . invocation . id ,
@@ -188,13 +224,11 @@ export class DeferredExecution<T> extends Execution<T> {
188
224
} ,
189
225
) ;
190
226
191
- // override the invocation timeout
192
- this . invocation . timeout = promise . timeout ;
227
+ // override the invocation creation time
228
+ this . invocation . createdOn = this . durablePromise . createdOn ;
193
229
194
- // poll the completion of the durable promise
195
- promise . completed . then ( ( p ) =>
196
- p . resolved ? this . invocation . resolve ( p . value ( ) ) : this . invocation . reject ( p . error ( ) ) ,
197
- ) ;
230
+ // override the invocation timeout
231
+ this . invocation . timeout = this . durablePromise . timeout ;
198
232
} catch ( e ) {
199
233
// if an error occurs, kill the execution
200
234
this . kill ( e ) ;
@@ -204,6 +238,17 @@ export class DeferredExecution<T> extends Execution<T> {
204
238
}
205
239
206
240
protected async join ( future : Future < T > ) {
241
+ if ( this . durablePromise ) {
242
+ // poll the completion of the durable promise
243
+ await this . durablePromise . completed ;
244
+
245
+ if ( this . durablePromise . resolved ) {
246
+ this . invocation . resolve ( this . durablePromise . value ( ) ) ;
247
+ } else {
248
+ this . invocation . reject ( this . durablePromise . error ( ) ) ;
249
+ }
250
+ }
251
+
207
252
return await future . promise ;
208
253
}
209
254
}
@@ -237,16 +282,21 @@ export class GeneratorExecution<T> extends Execution<T> {
237
282
} ,
238
283
) ) ;
239
284
240
- // override the invocation timeout
241
- this . invocation . timeout = this . durablePromise . timeout ;
242
-
243
285
// resolve/reject the invocation if already completed
244
286
if ( this . durablePromise . resolved ) {
245
287
this . invocation . resolve ( this . durablePromise . value ( ) ) ;
246
288
} else if ( this . durablePromise . rejected || this . durablePromise . canceled || this . durablePromise . timedout ) {
247
289
this . invocation . reject ( this . durablePromise . error ( ) ) ;
248
290
}
249
291
}
292
+
293
+ if ( this . durablePromise ) {
294
+ // override the invocation creation time
295
+ this . invocation . createdOn = this . durablePromise . createdOn ;
296
+
297
+ // override the invocation timeout
298
+ this . invocation . timeout = this . durablePromise . timeout ;
299
+ }
250
300
} catch ( e ) {
251
301
// if an error occurs, kill the execution
252
302
this . kill ( e ) ;
0 commit comments