@@ -291,57 +291,7 @@ public Single<ValueT> execute(
291
291
Action onAlreadyRunning ,
292
292
Action onAlreadyFinished ,
293
293
boolean force ) {
294
- return Single .create (
295
- emitter -> {
296
- synchronized (lock ) {
297
- if (state != STATE_ACTIVE ) {
298
- emitter .onError (new CancellationException ("already shutdown" ));
299
- return ;
300
- }
301
-
302
- if (!force && finished .containsKey (key )) {
303
- onAlreadyFinished .run ();
304
- emitter .onSuccess (finished .get (key ));
305
- return ;
306
- }
307
-
308
- finished .remove (key );
309
-
310
- Execution execution = inProgress .get (key );
311
- if (execution != null ) {
312
- onAlreadyRunning .run ();
313
- } else {
314
- execution = new Execution (key , task );
315
- inProgress .put (key , execution );
316
- }
317
-
318
- // We must subscribe the execution within the scope of lock to avoid race condition
319
- // that:
320
- // 1. Two callers get the same execution instance
321
- // 2. One decides to dispose the execution, since no more observers, the execution
322
- // will change to the terminate state
323
- // 3. Another one try to subscribe, will get "terminated" error.
324
- execution .subscribe (
325
- new SingleObserver <ValueT >() {
326
- @ Override
327
- public void onSubscribe (@ NonNull Disposable d ) {
328
- emitter .setDisposable (d );
329
- }
330
-
331
- @ Override
332
- public void onSuccess (@ NonNull ValueT valueT ) {
333
- emitter .onSuccess (valueT );
334
- }
335
-
336
- @ Override
337
- public void onError (@ NonNull Throwable e ) {
338
- if (!emitter .isDisposed ()) {
339
- emitter .onError (e );
340
- }
341
- }
342
- });
343
- }
344
- });
294
+ return task
345
295
}
346
296
347
297
/**
0 commit comments