Skip to content

Commit 14042fd

Browse files
authored
add exponential backoff (#687)
1 parent c2b383f commit 14042fd

File tree

10 files changed

+144
-40
lines changed

10 files changed

+144
-40
lines changed

opamp/src/main/java/co/elastic/opamp/client/CentralConfigurationManagerImpl.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,28 @@ public void onConnect(OpampClient client) {
122122
}
123123

124124
@Override
125-
public void onConnectFailed(OpampClient client, Throwable throwable) {
126-
logger.log(Level.INFO, "onConnect({0}, {1})", new Object[] {client, throwable});
125+
public void onConnectFailed(OpampClient client, Throwable throwable, Duration nextTry) {
126+
if (nextTry == null) {
127+
logger.log(Level.INFO, "onConnect({0}, {1})", new Object[] {client, throwable});
128+
} else {
129+
logger.log(
130+
Level.INFO,
131+
"onConnect({0}, {1}, next attempt to connect in {2})",
132+
new Object[] {client, throwable, nextTry});
133+
}
127134
}
128135

129136
@Override
130-
public void onErrorResponse(OpampClient client, Opamp.ServerErrorResponse errorResponse) {
131-
logger.log(Level.INFO, "onErrorResponse({0}, {1})", new Object[] {client, errorResponse});
137+
public void onErrorResponse(
138+
OpampClient client, Opamp.ServerErrorResponse errorResponse, Duration nextTry) {
139+
if (nextTry == null) {
140+
logger.log(Level.INFO, "onErrorResponse({0}, {1})", new Object[] {client, errorResponse});
141+
} else {
142+
logger.log(
143+
Level.INFO,
144+
"onErrorResponse({0}, {1}, next attempt to send in {2})",
145+
new Object[] {client, errorResponse, nextTry});
146+
}
132147
}
133148

134149
public static class Builder {
@@ -175,6 +190,7 @@ public CentralConfigurationManagerImpl build() {
175190
OpampClientBuilder builder = OpampClient.builder();
176191
OkHttpSender httpSender = OkHttpSender.create("http://localhost:4320/v1/opamp");
177192
PeriodicDelay pollingDelay = HttpRequestService.DEFAULT_DELAY_BETWEEN_REQUESTS;
193+
PeriodicDelay retryDelay = PeriodicDelay.ofVariableDuration(pollingDelay.getNextDelay());
178194
if (serviceName != null) {
179195
builder.setServiceName(serviceName);
180196
}
@@ -192,8 +208,9 @@ public CentralConfigurationManagerImpl build() {
192208
}
193209
if (pollingInterval != null) {
194210
pollingDelay = PeriodicDelay.ofFixedDuration(pollingInterval);
211+
retryDelay = PeriodicDelay.ofVariableDuration(pollingInterval);
195212
}
196-
builder.setRequestService(HttpRequestService.create(httpSender, pollingDelay, pollingDelay));
213+
builder.setRequestService(HttpRequestService.create(httpSender, pollingDelay, retryDelay));
197214
return new CentralConfigurationManagerImpl(builder.build());
198215
}
199216
}

opamp/src/main/java/co/elastic/opamp/client/OpampClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package co.elastic.opamp.client;
2020

2121
import co.elastic.opamp.client.response.MessageData;
22+
import java.time.Duration;
2223
import opamp.proto.Opamp;
2324

2425
public interface OpampClient {
@@ -31,7 +32,7 @@ static OpampClientBuilder builder() {
3132
* Starts the client and begin attempts to connect to the Server. Once connection is established
3233
* the client will attempt to maintain it by reconnecting if the connection is lost. All failed
3334
* connection attempts will be reported via {@link Callback#onConnectFailed(OpampClient,
34-
* Throwable)} callback.
35+
* Throwable, Duration)} callback.
3536
*
3637
* <p>This method does not wait until the connection to the Server is established and will likely
3738
* return before the connection attempts are even made.
@@ -78,7 +79,7 @@ interface Callback {
7879
* @param client The relevant {@link co.elastic.opamp.client.OpampClient} instance.
7980
* @param throwable The exception.
8081
*/
81-
void onConnectFailed(OpampClient client, Throwable throwable);
82+
void onConnectFailed(OpampClient client, Throwable throwable, Duration nextTry);
8283

8384
/**
8485
* Called when the Server reports an error in response to some previously sent request. Useful
@@ -89,7 +90,8 @@ interface Callback {
8990
* @param client The relevant {@link co.elastic.opamp.client.OpampClient} instance.
9091
* @param errorResponse The error returned by the Server.
9192
*/
92-
void onErrorResponse(OpampClient client, Opamp.ServerErrorResponse errorResponse);
93+
void onErrorResponse(
94+
OpampClient client, Opamp.ServerErrorResponse errorResponse, Duration nextTry);
9395

9496
/**
9597
* Called when the Agent receives a message that needs processing. See {@link

opamp/src/main/java/co/elastic/opamp/client/internal/OpampClientImpl.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import co.elastic.opamp.client.response.Response;
4141
import co.elastic.opamp.client.state.observer.Observable;
4242
import com.google.protobuf.ByteString;
43+
import java.time.Duration;
4344
import java.util.Arrays;
4445
import java.util.List;
4546
import java.util.concurrent.locks.Lock;
@@ -146,8 +147,8 @@ public void onConnectionSuccess() {
146147
}
147148

148149
@Override
149-
public void onConnectionFailed(Throwable throwable) {
150-
callback.onConnectFailed(this, throwable);
150+
public void onConnectionFailed(Throwable throwable, Duration nextTry) {
151+
callback.onConnectFailed(this, throwable, nextTry);
151152
preserveFailedRequestRecipe();
152153
}
153154

@@ -160,7 +161,7 @@ public void onRequestSuccess(Response response) {
160161
}
161162

162163
@Override
163-
public void onRequestFailed(Throwable throwable) {
164+
public void onRequestFailed(Throwable throwable, Duration nextTry) {
164165
final Opamp.ServerErrorResponse error;
165166
if (throwable == null) {
166167
error =
@@ -175,14 +176,14 @@ public void onRequestFailed(Throwable throwable) {
175176
throwable.getClass().getName() + ": " + throwable.getMessage()))
176177
.build();
177178
}
178-
callback.onErrorResponse(this, error);
179+
callback.onErrorResponse(this, error, nextTry);
179180
preserveFailedRequestRecipe();
180181
}
181182

182183
private void handleResponsePayload(Opamp.ServerToAgent response) {
183184
if (response.hasErrorResponse()) {
184185
Opamp.ServerErrorResponse errorResponse = response.getErrorResponse();
185-
callback.onErrorResponse(this, errorResponse);
186+
callback.onErrorResponse(this, errorResponse, null);
186187
}
187188
long reportFullState = Opamp.ServerToAgentFlags.ServerToAgentFlags_ReportFullState_VALUE;
188189
if ((response.getFlags() & reportFullState) == reportFullState) {

opamp/src/main/java/co/elastic/opamp/client/request/delay/PeriodicDelay.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ static PeriodicDelay ofFixedDuration(Duration duration) {
2525
return new FixedPeriodicDelay(duration);
2626
}
2727

28+
static PeriodicDelay ofVariableDuration(Duration duration) {
29+
return new VariablePeriodicDelay(duration);
30+
}
31+
2832
Duration getNextDelay();
2933

3034
void reset();
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package co.elastic.opamp.client.request.delay;
20+
21+
import java.time.Duration;
22+
23+
final class VariablePeriodicDelay implements PeriodicDelay, AcceptsDelaySuggestion {
24+
private volatile Duration duration;
25+
26+
public VariablePeriodicDelay(Duration duration) {
27+
this.duration = duration;
28+
}
29+
30+
@Override
31+
public Duration getNextDelay() {
32+
return duration;
33+
}
34+
35+
@Override
36+
public void reset() {}
37+
38+
@Override
39+
public void suggestDelay(Duration delay) {
40+
duration = delay;
41+
}
42+
}

opamp/src/main/java/co/elastic/opamp/client/request/service/HttpRequestService.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public final class HttpRequestService implements RequestService, Runnable {
4545
private Supplier<Request> requestSupplier;
4646
private boolean isRunning = false;
4747
private boolean isStopped = false;
48+
private int exponentialBackoffSkips;
4849
public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_REQUESTS =
4950
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
5051

@@ -114,15 +115,26 @@ public void stop() {
114115
}
115116
}
116117

117-
private void enableRetryMode(Duration suggestedDelay) {
118-
if (retryModeEnabled.compareAndSet(false, true)) {
118+
private void setRetryMode(Duration suggestedDelay) {
119+
retryModeEnabled.set(true);
120+
setRetryModeIfEnabled(suggestedDelay);
121+
}
122+
123+
private void setRetryModeIfEnabled(Duration suggestedDelay) {
124+
if (retryModeEnabled.get()) {
119125
if (suggestedDelay != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
120126
((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(suggestedDelay);
121127
}
122128
executor.setPeriodicDelay(periodicRetryDelay);
123129
}
124130
}
125131

132+
private void enableRetryMode(Duration suggestedDelay) {
133+
if (retryModeEnabled.compareAndSet(false, true)) {
134+
setRetryModeIfEnabled(suggestedDelay);
135+
}
136+
}
137+
126138
private void disableRetryMode() {
127139
if (retryModeEnabled.compareAndSet(true, false)) {
128140
executor.setPeriodicDelay(periodicRequestDelay);
@@ -152,25 +164,27 @@ private void doSendRequest() {
152164
agentToServer.getSerializedSize())
153165
.get()) {
154166
if (isSuccessful(response)) {
167+
resetExponentialBackoffSkips();
155168
handleResponse(
156169
Response.create(Opamp.ServerToAgent.parseFrom(response.bodyInputStream())));
157170
} else {
158171
handleHttpError(response);
159172
}
160-
} catch (IOException e) {
161-
callback.onRequestFailed(e);
162173
}
163-
164-
} catch (InterruptedException e) {
165-
callback.onRequestFailed(e);
174+
} catch (IOException | InterruptedException e) {
175+
incrementExponentialBackoff();
176+
callback.onConnectionFailed(e, periodicRetryDelay.getNextDelay());
166177
} catch (ExecutionException e) {
167-
callback.onRequestFailed(e.getCause());
178+
incrementExponentialBackoff();
179+
callback.onConnectionFailed(e.getCause(), periodicRetryDelay.getNextDelay());
168180
}
169181
}
170182

171183
private void handleHttpError(HttpSender.Response response) {
172184
int errorCode = response.statusCode();
173-
callback.onRequestFailed(new HttpErrorException(errorCode, response.statusMessage()));
185+
callback.onRequestFailed(
186+
new HttpErrorException(errorCode, response.statusMessage()),
187+
periodicRetryDelay.getNextDelay());
174188

175189
if (errorCode == 503 || errorCode == 429) {
176190
String retryAfterHeader = response.getHeader("Retry-After");
@@ -208,9 +222,30 @@ private void handleErrorResponse(Opamp.ServerErrorResponse errorResponse) {
208222
retryAfter = Duration.ofNanos(errorResponse.getRetryInfo().getRetryAfterNanoseconds());
209223
}
210224
enableRetryMode(retryAfter);
225+
} else {
226+
incrementExponentialBackoff();
227+
Duration retryAfter = Duration.ofSeconds(30 * exponentialBackoffSkips);
228+
enableRetryMode(retryAfter);
211229
}
212230
}
213231

232+
private void incrementExponentialBackoff() {
233+
if (exponentialBackoffSkips == 0) {
234+
exponentialBackoffSkips = 1;
235+
} else {
236+
exponentialBackoffSkips *= 2;
237+
}
238+
if (exponentialBackoffSkips >= 32) {
239+
exponentialBackoffSkips = 32;
240+
}
241+
setRetryMode(Duration.ofSeconds(30 * exponentialBackoffSkips));
242+
}
243+
244+
private void resetExponentialBackoffSkips() {
245+
exponentialBackoffSkips = 0;
246+
disableRetryMode();
247+
}
248+
214249
private static class ByteArrayWriter implements Consumer<OutputStream> {
215250
private final byte[] data;
216251

opamp/src/main/java/co/elastic/opamp/client/request/service/RequestService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import co.elastic.opamp.client.OpampClient;
2222
import co.elastic.opamp.client.request.Request;
2323
import co.elastic.opamp.client.response.Response;
24+
import java.time.Duration;
2425
import java.util.function.Supplier;
2526

2627
/**
@@ -66,7 +67,7 @@ interface Callback {
6667
*
6768
* @param throwable The detailed error.
6869
*/
69-
void onConnectionFailed(Throwable throwable);
70+
void onConnectionFailed(Throwable throwable, Duration nextTry);
7071

7172
/**
7273
* For WebSocket implementations, this is called every time there's a new message from the
@@ -83,6 +84,6 @@ interface Callback {
8384
*
8485
* @param throwable The detailed error.
8586
*/
86-
void onRequestFailed(Throwable throwable);
87+
void onRequestFailed(Throwable throwable, Duration nextTry);
8788
}
8889
}

opamp/src/main/java/co/elastic/opamp/client/request/service/WebSocketRequestService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private void doSendRequest() {
9797
codedOutput.flush();
9898
webSocket.send(outputStream.toByteArray());
9999
} catch (IOException e) {
100-
callback.onRequestFailed(e);
100+
callback.onRequestFailed(e, null);
101101
}
102102
}
103103

@@ -128,7 +128,7 @@ public void onMessage(WebSocket webSocket, byte[] data) {
128128

129129
callback.onRequestSuccess(Response.create(serverToAgent));
130130
} catch (IOException e) {
131-
callback.onRequestFailed(e);
131+
callback.onRequestFailed(e, null);
132132
}
133133
}
134134

@@ -185,7 +185,7 @@ public void onClosed(WebSocket webSocket) {
185185

186186
@Override
187187
public void onFailure(WebSocket webSocket, Throwable t) {
188-
callback.onConnectionFailed(t);
188+
callback.onConnectionFailed(t, null);
189189
enableRetryMode(null);
190190
}
191191

0 commit comments

Comments
 (0)