Skip to content

Commit e6c2aa1

Browse files
authored
fix(datastore): Prevent stop from interrupting subsequent start call (#2569)
1 parent be07335 commit e6c2aa1

File tree

1 file changed

+37
-25
lines changed

1 file changed

+37
-25
lines changed

aws-api/src/main/java/com/amplifyframework/api/aws/SubscriptionEndpoint.java

+37-25
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ final class SubscriptionEndpoint {
7373
private final TimeoutWatchdog timeoutWatchdog;
7474
private final Set<String> pendingSubscriptionIds;
7575
private final OkHttpClient okHttpClient;
76+
private final Object webSocketLock = new Object();
7677
private WebSocket webSocket;
7778
private AmplifyWebSocketListener webSocketListener;
7879

@@ -126,26 +127,35 @@ synchronized <T> void requestSubscription(
126127
Objects.requireNonNull(onSubscriptionError);
127128
Objects.requireNonNull(onSubscriptionComplete);
128129

129-
// The first call to subscribe OR a disconnected websocket listener will
130-
// force a new connection to be created.
131-
if (webSocketListener == null || webSocketListener.isDisconnectedState()) {
132-
webSocketListener = new AmplifyWebSocketListener();
133-
try {
134-
webSocket = okHttpClient.newWebSocket(new Request.Builder()
135-
.url(buildConnectionRequestUrl(authType))
136-
.addHeader("Sec-WebSocket-Protocol", "graphql-ws")
137-
.header("User-Agent", UserAgent.string())
138-
.build(), webSocketListener);
139-
} catch (ApiException apiException) {
140-
onSubscriptionError.accept(apiException);
141-
return;
130+
final String subscriptionId = UUID.randomUUID().toString();
131+
final AmplifyWebSocketListener socketListener;
132+
final WebSocket socket;
133+
134+
synchronized (webSocketLock) {
135+
// The first call to subscribe OR a disconnected websocket listener will
136+
// force a new connection to be created.
137+
if (webSocketListener == null || webSocketListener.isDisconnectedState()) {
138+
webSocketListener = new AmplifyWebSocketListener();
139+
try {
140+
webSocket = okHttpClient.newWebSocket(new Request.Builder()
141+
.url(buildConnectionRequestUrl(authType))
142+
.addHeader("Sec-WebSocket-Protocol", "graphql-ws")
143+
.header("User-Agent", UserAgent.string())
144+
.build(), webSocketListener);
145+
} catch (ApiException apiException) {
146+
onSubscriptionError.accept(apiException);
147+
return;
148+
}
149+
142150
}
143151

152+
pendingSubscriptionIds.add(subscriptionId);
153+
socketListener = webSocketListener;
154+
socket = webSocket;
144155
}
145-
final String subscriptionId = UUID.randomUUID().toString();
146-
pendingSubscriptionIds.add(subscriptionId);
156+
147157
// Every request waits here for the connection to be ready.
148-
Connection connection = webSocketListener.waitForConnectionReady();
158+
Connection connection = socketListener.waitForConnectionReady();
149159
if (connection.hasFailure()) {
150160
// If the latch didn't count all the way down
151161
if (pendingSubscriptionIds.remove(subscriptionId)) {
@@ -166,7 +176,7 @@ synchronized <T> void requestSubscription(
166176
.put("authorization", authorizer.createHeadersForSubscription(request, authType))))
167177
.toString();
168178

169-
webSocket.send(jsonMessage);
179+
socket.send(jsonMessage);
170180
} catch (JSONException | ApiException exception) {
171181
// If the subscriptionId was still pending, then we can call the onSubscriptionError
172182
if (pendingSubscriptionIds.remove(subscriptionId)) {
@@ -273,8 +283,8 @@ synchronized void releaseSubscription(String subscriptionId) throws ApiException
273283

274284
// Only do this if the subscription was NOT pending.
275285
// Otherwise it would probably fail since it was never established in the first place.
276-
277-
if (!wasSubscriptionPending && !webSocketListener.isDisconnectedState()) {
286+
final AmplifyWebSocketListener socketListener = webSocketListener;
287+
if (!wasSubscriptionPending && socketListener != null && !socketListener.isDisconnectedState()) {
278288
try {
279289
String jsonMessage = new JSONObject()
280290
.put("type", "stop")
@@ -292,13 +302,15 @@ synchronized void releaseSubscription(String subscriptionId) throws ApiException
292302
subscription.awaitSubscriptionCompleted();
293303
}
294304

295-
subscriptions.remove(subscriptionId);
296-
297305
// If we have zero subscriptions, close the WebSocket
298-
if (subscriptions.size() == 0) {
299-
LOG.info("No more active subscriptions. Closing web socket.");
300-
timeoutWatchdog.stop();
301-
webSocket.close(NORMAL_CLOSURE_STATUS, "No active subscriptions");
306+
synchronized (webSocketLock) {
307+
subscriptions.remove(subscriptionId);
308+
if (subscriptions.isEmpty() && pendingSubscriptionIds.isEmpty()) {
309+
LOG.info("No more active subscriptions. Closing web socket.");
310+
timeoutWatchdog.stop();
311+
webSocket.close(NORMAL_CLOSURE_STATUS, "No active subscriptions");
312+
webSocketListener = null;
313+
}
302314
}
303315
}
304316

0 commit comments

Comments
 (0)