15
15
16
16
import static com .google .common .base .Preconditions .checkState ;
17
17
18
+ import com .google .common .collect .ImmutableList ;
18
19
import com .google .common .collect .ImmutableSet ;
19
20
import io .reactivex .rxjava3 .annotations .NonNull ;
20
21
import io .reactivex .rxjava3 .core .Completable ;
21
22
import io .reactivex .rxjava3 .core .Single ;
22
23
import io .reactivex .rxjava3 .core .SingleObserver ;
23
24
import io .reactivex .rxjava3 .disposables .Disposable ;
24
- import io . reactivex . rxjava3 . subjects . AsyncSubject ;
25
+ import java . util . ArrayList ;
25
26
import java .util .HashMap ;
27
+ import java .util .List ;
26
28
import java .util .Map ;
27
29
import java .util .Optional ;
28
- import java .util .concurrent .CancellationException ;
29
30
import java .util .concurrent .atomic .AtomicBoolean ;
30
- import java .util .concurrent .atomic .AtomicInteger ;
31
- import java .util .concurrent .atomic .AtomicReference ;
32
31
import javax .annotation .concurrent .GuardedBy ;
33
32
import javax .annotation .concurrent .ThreadSafe ;
34
33
@@ -55,7 +54,7 @@ public final class AsyncTaskCache<KeyT, ValueT> {
55
54
private final Map <KeyT , ValueT > finished ;
56
55
57
56
@ GuardedBy ("lock" )
58
- private final Map <KeyT , Execution < ValueT > > inProgress ;
57
+ private final Map <KeyT , Execution > inProgress ;
59
58
60
59
public static <KeyT , ValueT > AsyncTaskCache <KeyT , ValueT > create () {
61
60
return new AsyncTaskCache <>();
@@ -91,79 +90,136 @@ public Single<ValueT> executeIfNot(KeyT key, Single<ValueT> task) {
91
90
return execute (key , task , false );
92
91
}
93
92
94
- private static class Execution <ValueT > {
95
- private final AtomicBoolean isTaskDisposed = new AtomicBoolean (false );
96
- private final Single <ValueT > task ;
97
- private final AsyncSubject <ValueT > asyncSubject = AsyncSubject .create ();
98
- private final AtomicInteger referenceCount = new AtomicInteger (0 );
99
- private final AtomicReference <Disposable > taskDisposable = new AtomicReference <>(null );
93
+ /** Returns count of subscribers for a task. */
94
+ public int getSubscriberCount (KeyT key ) {
95
+ synchronized (lock ) {
96
+ Execution task = inProgress .get (key );
97
+ if (task != null ) {
98
+ return task .getSubscriberCount ();
99
+ }
100
+ }
101
+
102
+ return 0 ;
103
+ }
104
+
105
+ class Execution extends Single <ValueT > implements SingleObserver <ValueT > {
106
+ private final KeyT key ;
107
+ private final Single <ValueT > upstream ;
108
+
109
+ @ GuardedBy ("lock" )
110
+ private boolean terminated = false ;
111
+
112
+ @ GuardedBy ("lock" )
113
+ private Disposable upstreamDisposable ;
100
114
101
- Execution (Single <ValueT > task ) {
102
- this .task = task ;
115
+ @ GuardedBy ("lock" )
116
+ private final List <SingleObserver <? super ValueT >> observers = new ArrayList <>();
117
+
118
+ Execution (KeyT key , Single <ValueT > upstream ) {
119
+ this .key = key ;
120
+ this .upstream = upstream ;
103
121
}
104
122
105
- Single <ValueT > executeIfNot () {
106
- checkState (!isTaskDisposed (), "disposed" );
107
-
108
- int subscribed = referenceCount .getAndIncrement ();
109
- if (taskDisposable .get () == null && subscribed == 0 ) {
110
- task .subscribe (
111
- new SingleObserver <ValueT >() {
112
- @ Override
113
- public void onSubscribe (@ NonNull Disposable d ) {
114
- taskDisposable .compareAndSet (null , d );
115
- }
116
-
117
- @ Override
118
- public void onSuccess (@ NonNull ValueT value ) {
119
- asyncSubject .onNext (value );
120
- asyncSubject .onComplete ();
121
- }
122
-
123
- @ Override
124
- public void onError (@ NonNull Throwable e ) {
125
- asyncSubject .onError (e );
126
- }
127
- });
123
+ int getSubscriberCount () {
124
+ synchronized (lock ) {
125
+ return observers .size ();
128
126
}
127
+ }
128
+
129
+ @ Override
130
+ protected void subscribeActual (@ NonNull SingleObserver <? super ValueT > observer ) {
131
+ synchronized (lock ) {
132
+ checkState (!terminated , "terminated" );
133
+
134
+ boolean shouldSubscribe = observers .isEmpty ();
135
+
136
+ observers .add (observer );
137
+
138
+ observer .onSubscribe (new ExecutionDisposable (this , observer ));
129
139
130
- return Single .fromObservable (asyncSubject );
140
+ if (shouldSubscribe ) {
141
+ upstream .subscribe (this );
142
+ }
143
+ }
131
144
}
132
145
133
- boolean isTaskTerminated () {
134
- return asyncSubject .hasComplete () || asyncSubject .hasThrowable ();
146
+ @ Override
147
+ public void onSubscribe (@ NonNull Disposable d ) {
148
+ synchronized (lock ) {
149
+ upstreamDisposable = d ;
150
+
151
+ if (terminated ) {
152
+ d .dispose ();
153
+ }
154
+ }
135
155
}
136
156
137
- boolean isTaskDisposed () {
138
- return isTaskDisposed .get ();
157
+ @ Override
158
+ public void onSuccess (@ NonNull ValueT value ) {
159
+ synchronized (lock ) {
160
+ if (!terminated ) {
161
+ inProgress .remove (key );
162
+ finished .put (key , value );
163
+ terminated = true ;
164
+
165
+ for (SingleObserver <? super ValueT > observer : ImmutableList .copyOf (observers )) {
166
+ observer .onSuccess (value );
167
+ }
168
+ }
169
+ }
139
170
}
140
171
141
- void tryDisposeTask () {
142
- checkState (!isTaskDisposed (), "disposed" );
143
- checkState (!isTaskTerminated (), "terminated" );
172
+ @ Override
173
+ public void onError (@ NonNull Throwable error ) {
174
+ synchronized (lock ) {
175
+ if (!terminated ) {
176
+ inProgress .remove (key );
177
+ terminated = true ;
144
178
145
- if (referenceCount .decrementAndGet () == 0 ) {
146
- isTaskDisposed .set (true );
147
- asyncSubject .onError (new CancellationException ("disposed" ));
179
+ for (SingleObserver <? super ValueT > observer : ImmutableList .copyOf (observers )) {
180
+ observer .onError (error );
181
+ }
182
+ }
183
+ }
184
+ }
148
185
149
- Disposable d = taskDisposable .get ();
150
- if (d != null ) {
151
- d .dispose ();
186
+ void remove (SingleObserver <? super ValueT > observer ) {
187
+ synchronized (lock ) {
188
+ observers .remove (observer );
189
+
190
+ if (observers .isEmpty () && !terminated ) {
191
+ inProgress .remove (key );
192
+ terminated = true ;
193
+
194
+ if (upstreamDisposable != null ) {
195
+ upstreamDisposable .dispose ();
196
+ }
152
197
}
153
198
}
154
199
}
155
200
}
156
201
157
- /** Returns count of subscribers for a task. */
158
- public int getSubscriberCount (KeyT key ) {
159
- synchronized (lock ) {
160
- Execution <ValueT > execution = inProgress .get (key );
161
- if (execution != null ) {
162
- return execution .referenceCount .get ();
202
+ class ExecutionDisposable implements Disposable {
203
+ final Execution execution ;
204
+ final SingleObserver <? super ValueT > observer ;
205
+ AtomicBoolean isDisposed = new AtomicBoolean (false );
206
+
207
+ ExecutionDisposable (Execution execution , SingleObserver <? super ValueT > observer ) {
208
+ this .execution = execution ;
209
+ this .observer = observer ;
210
+ }
211
+
212
+ @ Override
213
+ public void dispose () {
214
+ if (isDisposed .compareAndSet (false , true )) {
215
+ execution .remove (observer );
163
216
}
164
217
}
165
218
166
- return 0 ;
219
+ @ Override
220
+ public boolean isDisposed () {
221
+ return isDisposed .get ();
222
+ }
167
223
}
168
224
169
225
/**
@@ -185,62 +241,34 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
185
241
186
242
finished .remove (key );
187
243
188
- Execution <ValueT > execution =
189
- inProgress .computeIfAbsent (
190
- key ,
191
- ignoredKey -> {
192
- AtomicInteger subscribeTimes = new AtomicInteger (0 );
193
- return new Execution <>(
194
- Single .defer (
195
- () -> {
196
- int times = subscribeTimes .incrementAndGet ();
197
- checkState (times == 1 , "Subscribed more than once to the task" );
198
- return task ;
199
- }));
200
- });
201
-
202
- execution
203
- .executeIfNot ()
204
- .subscribe (
205
- new SingleObserver <ValueT >() {
206
- @ Override
207
- public void onSubscribe (@ NonNull Disposable d ) {
208
- emitter .setCancellable (
209
- () -> {
210
- d .dispose ();
211
-
212
- if (!execution .isTaskTerminated ()) {
213
- synchronized (lock ) {
214
- execution .tryDisposeTask ();
215
- if (execution .isTaskDisposed ()) {
216
- inProgress .remove (key );
217
- }
218
- }
219
- }
220
- });
221
- }
222
-
223
- @ Override
224
- public void onSuccess (@ NonNull ValueT value ) {
225
- synchronized (lock ) {
226
- finished .put (key , value );
227
- inProgress .remove (key );
228
- }
229
-
230
- emitter .onSuccess (value );
231
- }
232
-
233
- @ Override
234
- public void onError (@ NonNull Throwable e ) {
235
- synchronized (lock ) {
236
- inProgress .remove (key );
237
- }
238
-
239
- if (!emitter .isDisposed ()) {
240
- emitter .onError (e );
241
- }
242
- }
243
- });
244
+ Execution execution =
245
+ inProgress .computeIfAbsent (key , ignoredKey -> new Execution (key , task ));
246
+
247
+ // We must subscribe the execution within the scope of lock to avoid race condition
248
+ // that:
249
+ // 1. Two callers get the same execution instance
250
+ // 2. One decides to dispose the execution, since no more observers, the execution
251
+ // will change to the terminate state
252
+ // 3. Another one try to subscribe, will get "terminated" error.
253
+ execution .subscribe (
254
+ new SingleObserver <ValueT >() {
255
+ @ Override
256
+ public void onSubscribe (@ NonNull Disposable d ) {
257
+ emitter .setDisposable (d );
258
+ }
259
+
260
+ @ Override
261
+ public void onSuccess (@ NonNull ValueT valueT ) {
262
+ emitter .onSuccess (valueT );
263
+ }
264
+
265
+ @ Override
266
+ public void onError (@ NonNull Throwable e ) {
267
+ if (!emitter .isDisposed ()) {
268
+ emitter .onError (e );
269
+ }
270
+ }
271
+ });
244
272
}
245
273
});
246
274
}
0 commit comments