Skip to content

Commit 4a9e051

Browse files
zeebe-bors-camunda[bot] and romansmirnov authored
merge: #8959
8959: [Backport stable/1.3] #3631: Re-activate jobs r=romansmirnov a=github-actions[bot]

# Description

Backport of #8879 to `stable/1.3`.

relates to #3631

Co-authored-by: Roman <[email protected]>
2 parents 86564c7 + 78ac46a commit 4a9e051

File tree

13 files changed

+864
-264
lines changed

13 files changed

+864
-264
lines changed

gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java

+25-9
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.camunda.zeebe.gateway.interceptors.impl.DecoratedInterceptor;
2626
import io.camunda.zeebe.gateway.interceptors.impl.InterceptorRepository;
2727
import io.camunda.zeebe.gateway.query.impl.QueryApiImpl;
28+
import io.camunda.zeebe.util.sched.Actor;
29+
import io.camunda.zeebe.util.sched.ActorControl;
2830
import io.camunda.zeebe.util.sched.ActorSchedulingService;
2931
import io.grpc.BindableService;
3032
import io.grpc.Server;
@@ -38,7 +40,9 @@
3840
import java.time.Duration;
3941
import java.util.Collections;
4042
import java.util.List;
43+
import java.util.concurrent.CompletableFuture;
4144
import java.util.concurrent.TimeUnit;
45+
import java.util.function.Consumer;
4246
import java.util.function.Function;
4347
import java.util.stream.Collectors;
4448
import me.dinowernli.grpc.prometheus.Configuration;
@@ -110,15 +114,8 @@ public void start() throws IOException {
110114
healthManager.setStatus(Status.STARTING);
111115
brokerClient = buildBrokerClient();
112116

113-
final ActivateJobsHandler activateJobsHandler;
114-
if (gatewayCfg.getLongPolling().isEnabled()) {
115-
final LongPollingActivateJobsHandler longPollingHandler =
116-
buildLongPollingHandler(brokerClient);
117-
actorSchedulingService.submitActor(longPollingHandler);
118-
activateJobsHandler = longPollingHandler;
119-
} else {
120-
activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
121-
}
117+
final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
118+
submitActorToActivateJobs((Consumer<ActorControl>) activateJobsHandler);
122119

123120
final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
124121
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
@@ -189,6 +186,25 @@ private BrokerClient buildBrokerClient() {
189186
return brokerClientFactory.apply(gatewayCfg);
190187
}
191188

189+
private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
190+
final var actorStartedFuture = new CompletableFuture<ActorControl>();
191+
final var actor =
192+
Actor.newActor()
193+
.name("ActivateJobsHandler")
194+
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
195+
.build();
196+
actorSchedulingService.submitActor(actor);
197+
actorStartedFuture.join();
198+
}
199+
200+
private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
201+
if (gatewayCfg.getLongPolling().isEnabled()) {
202+
return buildLongPollingHandler(brokerClient);
203+
} else {
204+
return new RoundRobinActivateJobsHandler(brokerClient);
205+
}
206+
}
207+
192208
private LongPollingActivateJobsHandler buildLongPollingHandler(final BrokerClient brokerClient) {
193209
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
194210
}

gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/ActivateJobsHandler.java

+10
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
1111
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
1212
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
13+
import java.util.concurrent.atomic.AtomicLong;
1314

1415
/** Can handle an 'activate jobs' request from a client. */
1516
public interface ActivateJobsHandler {
1617

18+
static final AtomicLong ACTIVATE_JOBS_REQUEST_ID_GENERATOR = new AtomicLong(1);
19+
1720
/**
1821
* Handle activate jobs request from a client
1922
*
@@ -22,4 +25,11 @@ public interface ActivateJobsHandler {
2225
*/
2326
void activateJobs(
2427
ActivateJobsRequest request, ServerStreamObserver<ActivateJobsResponse> responseObserver);
28+
29+
public static InflightActivateJobsRequest toInflightActivateJobsRequest(
30+
final ActivateJobsRequest request,
31+
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
32+
return new InflightActivateJobsRequest(
33+
ACTIVATE_JOBS_REQUEST_ID_GENERATOR.getAndIncrement(), request, responseObserver);
34+
}
2535
}

gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InFlightLongPollingActivateJobsRequestsState.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ public final class InFlightLongPollingActivateJobsRequestsState {
1818

1919
private final String jobType;
2020
private final LongPollingMetrics metrics;
21-
private final Queue<LongPollingActivateJobsRequest> activeRequests = new LinkedList<>();
22-
private final Queue<LongPollingActivateJobsRequest> pendingRequests = new LinkedList<>();
23-
private final Set<LongPollingActivateJobsRequest> activeRequestsToBeRepeated = new HashSet<>();
21+
private final Queue<InflightActivateJobsRequest> activeRequests = new LinkedList<>();
22+
private final Queue<InflightActivateJobsRequest> pendingRequests = new LinkedList<>();
23+
private final Set<InflightActivateJobsRequest> activeRequestsToBeRepeated = new HashSet<>();
2424
private int failedAttempts;
2525
private long lastUpdatedTime;
2626

@@ -60,14 +60,14 @@ public long getLastUpdatedTime() {
6060
return lastUpdatedTime;
6161
}
6262

63-
public void enqueueRequest(final LongPollingActivateJobsRequest request) {
63+
public void enqueueRequest(final InflightActivateJobsRequest request) {
6464
if (!pendingRequests.contains(request)) {
6565
pendingRequests.offer(request);
6666
}
6767
removeObsoleteRequestsAndUpdateMetrics();
6868
}
6969

70-
public Queue<LongPollingActivateJobsRequest> getPendingRequests() {
70+
public Queue<InflightActivateJobsRequest> getPendingRequests() {
7171
removeObsoleteRequestsAndUpdateMetrics();
7272
return pendingRequests;
7373
}
@@ -79,29 +79,32 @@ private void removeObsoleteRequestsAndUpdateMetrics() {
7979
metrics.setBlockedRequestsCount(jobType, pendingRequests.size());
8080
}
8181

82-
private boolean isObsolete(final LongPollingActivateJobsRequest request) {
83-
return request.isTimedOut() || request.isCanceled() || request.isCompleted();
82+
private boolean isObsolete(final InflightActivateJobsRequest request) {
83+
return request.isTimedOut()
84+
|| request.isCanceled()
85+
|| request.isCompleted()
86+
|| request.isAborted();
8487
}
8588

86-
public void removeRequest(final LongPollingActivateJobsRequest request) {
89+
public void removeRequest(final InflightActivateJobsRequest request) {
8790
pendingRequests.remove(request);
8891
removeObsoleteRequestsAndUpdateMetrics();
8992
}
9093

91-
public LongPollingActivateJobsRequest getNextPendingRequest() {
94+
public InflightActivateJobsRequest getNextPendingRequest() {
9295
removeObsoleteRequestsAndUpdateMetrics();
93-
final LongPollingActivateJobsRequest request = pendingRequests.poll();
96+
final InflightActivateJobsRequest request = pendingRequests.poll();
9497
metrics.setBlockedRequestsCount(jobType, pendingRequests.size());
9598
return request;
9699
}
97100

98-
public void addActiveRequest(final LongPollingActivateJobsRequest request) {
101+
public void addActiveRequest(final InflightActivateJobsRequest request) {
99102
activeRequests.offer(request);
100103
pendingRequests.remove(request);
101104
activeRequestsToBeRepeated.remove(request);
102105
}
103106

104-
public void removeActiveRequest(final LongPollingActivateJobsRequest request) {
107+
public void removeActiveRequest(final InflightActivateJobsRequest request) {
105108
activeRequests.remove(request);
106109
activeRequestsToBeRepeated.remove(request);
107110
}
@@ -116,7 +119,7 @@ public boolean hasActiveRequests() {
116119
* attempts were reset to 0 (because new jobs became available) whilst the request was running,
117120
* and if the request's long polling is enabled.
118121
*/
119-
public boolean shouldBeRepeated(final LongPollingActivateJobsRequest request) {
122+
public boolean shouldBeRepeated(final InflightActivateJobsRequest request) {
120123
return activeRequestsToBeRepeated.contains(request) && !request.isLongPollingDisabled();
121124
}
122125

gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsRequest.java renamed to gateway/src/main/java/io/camunda/zeebe/gateway/impl/job/InflightActivateJobsRequest.java

+38-10
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
1414
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
1515
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
16+
import io.camunda.zeebe.util.Either;
1617
import io.camunda.zeebe.util.sched.ScheduledTimer;
1718
import java.time.Duration;
1819
import java.util.Objects;
1920
import org.slf4j.Logger;
2021

21-
public final class LongPollingActivateJobsRequest {
22+
public class InflightActivateJobsRequest {
2223

2324
private static final Logger LOG = Loggers.GATEWAY_LOGGER;
2425
private final long requestId;
@@ -32,8 +33,9 @@ public final class LongPollingActivateJobsRequest {
3233
private ScheduledTimer scheduledTimer;
3334
private boolean isTimedOut;
3435
private boolean isCompleted;
36+
private boolean isAborted;
3537

36-
public LongPollingActivateJobsRequest(
38+
public InflightActivateJobsRequest(
3739
final long requestId,
3840
final ActivateJobsRequest request,
3941
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
@@ -47,7 +49,7 @@ public LongPollingActivateJobsRequest(
4749
request.getRequestTimeout());
4850
}
4951

50-
private LongPollingActivateJobsRequest(
52+
private InflightActivateJobsRequest(
5153
final long requestId,
5254
final BrokerActivateJobsRequest request,
5355
final ServerStreamObserver<ActivateJobsResponse> responseObserver,
@@ -66,7 +68,7 @@ private LongPollingActivateJobsRequest(
6668
}
6769

6870
public void complete() {
69-
if (isCompleted() || isCanceled()) {
71+
if (!isOpen()) {
7072
return;
7173
}
7274
cancelTimerIfScheduled();
@@ -82,26 +84,44 @@ public boolean isCompleted() {
8284
return isCompleted;
8385
}
8486

85-
public void onResponse(final ActivateJobsResponse grpcResponse) {
86-
if (!(isCompleted() || isCanceled())) {
87+
/**
88+
* Sends activated jobs to the respective client.
89+
*
90+
* @param activatedJobs to send back to the client
91+
* @return an instance of {@link Either} indicating the following:
92+
* <ul>
93+
* <li>{@code Either#get() == true}: if the activated jobs have been sent back to the client
94+
* <li>{@code Either#get() == false}: if the activated jobs couldn't be sent back to the
95+
* client
96+
* <li>{@code Either#getLeft() != null}: if sending back the activated jobs failed with an
97+
* exception (note: in this case {@code Either#isRight() == false})
98+
* </ul>
99+
*/
100+
public Either<Exception, Boolean> tryToSendActivatedJobs(
101+
final ActivateJobsResponse activatedJobs) {
102+
if (isOpen()) {
87103
try {
88-
responseObserver.onNext(grpcResponse);
104+
responseObserver.onNext(activatedJobs);
105+
return Either.right(true);
89106
} catch (final Exception e) {
90107
LOG.warn("Failed to send response to client.", e);
108+
return Either.left(e);
91109
}
92110
}
111+
return Either.right(false);
93112
}
94113

95114
public void onError(final Throwable error) {
96-
if (isCompleted() || isCanceled()) {
115+
if (!isOpen()) {
97116
return;
98117
}
99118
cancelTimerIfScheduled();
100119
try {
101120
responseObserver.onError(error);
102121
} catch (final Exception e) {
103-
LOG.warn("Failed to send response to client.", e);
122+
LOG.warn("Failed to send terminating error to client.", e);
104123
}
124+
isAborted = true;
105125
}
106126

107127
public void timeout() {
@@ -163,6 +183,14 @@ private void cancelTimerIfScheduled() {
163183
}
164184
}
165185

186+
public boolean isAborted() {
187+
return isAborted;
188+
}
189+
190+
public boolean isOpen() {
191+
return !(isCompleted() || isCanceled() || isAborted());
192+
}
193+
166194
@Override
167195
public int hashCode() {
168196
return Objects.hash(jobType, maxJobsToActivate, requestId, worker);
@@ -179,7 +207,7 @@ public boolean equals(Object obj) {
179207
if (getClass() != obj.getClass()) {
180208
return false;
181209
}
182-
final var other = (LongPollingActivateJobsRequest) obj;
210+
final var other = (InflightActivateJobsRequest) obj;
183211
return Objects.equals(jobType, other.jobType)
184212
&& maxJobsToActivate == other.maxJobsToActivate
185213
&& requestId == other.requestId
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
3+
* one or more contributor license agreements. See the NOTICE file distributed
4+
* with this work for additional information regarding copyright ownership.
5+
* Licensed under the Zeebe Community License 1.1. You may not use this file
6+
* except in compliance with the Zeebe Community License 1.1.
7+
*/
8+
package io.camunda.zeebe.gateway.impl.job;
9+
10+
import io.camunda.zeebe.gateway.impl.broker.PartitionIdIterator;
11+
12+
public class InflightActivateJobsRequestState {
13+
14+
private final PartitionIdIterator iterator;
15+
private int remainingAmount;
16+
private boolean pollPrevPartition;
17+
private boolean resourceExhaustedWasPresent;
18+
19+
public InflightActivateJobsRequestState(
20+
final PartitionIdIterator iterator, final int remainingAmount) {
21+
this.iterator = iterator;
22+
this.remainingAmount = remainingAmount;
23+
}
24+
25+
private boolean hasNextPartition() {
26+
return iterator.hasNext();
27+
}
28+
29+
public int getCurrentPartition() {
30+
return iterator.getCurrentPartitionId();
31+
}
32+
33+
public int getNextPartition() {
34+
return pollPrevPartition ? iterator.getCurrentPartitionId() : iterator.next();
35+
}
36+
37+
public int getRemainingAmount() {
38+
return remainingAmount;
39+
}
40+
41+
public void setRemainingAmount(int remainingAmount) {
42+
this.remainingAmount = remainingAmount;
43+
}
44+
45+
public boolean wasResourceExhaustedPresent() {
46+
return resourceExhaustedWasPresent;
47+
}
48+
49+
public void setResourceExhaustedWasPresent(final boolean resourceExhaustedWasPresent) {
50+
this.resourceExhaustedWasPresent = resourceExhaustedWasPresent;
51+
}
52+
53+
public void setPollPrevPartition(boolean pollPrevPartition) {
54+
this.pollPrevPartition = pollPrevPartition;
55+
}
56+
57+
public boolean shouldActivateJobs() {
58+
return remainingAmount > 0 && (pollPrevPartition || hasNextPartition());
59+
}
60+
}

0 commit comments

Comments
 (0)