Skip to content

Commit 5cc381c

Browse files
zeebe-bors-cloud[bot] and npepinpe authored
merge: #8832
8832: [Backport stable/1.3] Correctly truncate a job activation batch if it will not fit in the dispatcher r=npepinpe a=github-actions[bot] # Description Backport of #8799 to `stable/1.3`. relates to #8797 #5525 Co-authored-by: Nicolas Pepin-Perreault <[email protected]>
2 parents d4088b8 + 39f2ef7 commit 5cc381c

17 files changed

+649
-138
lines changed

engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java

-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public static TypedRecordProcessors createEngineProcessors(
6565

6666
final LogStream stream = processingContext.getLogStream();
6767
final int partitionId = stream.getPartitionId();
68-
final int maxFragmentSize = processingContext.getMaxFragmentSize();
6968

7069
final var variablesState = zeebeState.getVariableState();
7170
final var expressionProcessor =
@@ -128,7 +127,6 @@ public static TypedRecordProcessors createEngineProcessors(
128127
zeebeState,
129128
onJobsAvailableCallback,
130129
eventPublicationBehavior,
131-
maxFragmentSize,
132130
writers,
133131
jobMetrics,
134132
eventTriggerBehavior);

engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java

+10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
3939
writer.appendFollowUpEvent(key, intent, value);
4040
}
4141

42+
@Override
43+
public boolean canWriteEventOfLength(final int eventLength) {
44+
return writer.canWriteEventOfLength(eventLength);
45+
}
46+
47+
@Override
48+
public int getMaxEventLength() {
49+
return writer.getMaxEventLength();
50+
}
51+
4252
@Override
4353
public void appendNewCommand(final Intent intent, final RecordValue value) {
4454
writer.appendNewCommand(intent, value);

engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public long getRequestId() {
105105
}
106106

107107
@Override
108-
public long getLength() {
108+
public int getLength() {
109109
return 0;
110110
}
111111

engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java

+24-110
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;
1111

1212
import io.camunda.zeebe.engine.metrics.JobMetrics;
13+
import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob;
1314
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
1415
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
1516
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
@@ -18,13 +19,7 @@
1819
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
1920
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
2021
import io.camunda.zeebe.engine.state.KeyGenerator;
21-
import io.camunda.zeebe.engine.state.immutable.JobState;
22-
import io.camunda.zeebe.engine.state.immutable.VariableState;
2322
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
24-
import io.camunda.zeebe.msgpack.value.DocumentValue;
25-
import io.camunda.zeebe.msgpack.value.LongValue;
26-
import io.camunda.zeebe.msgpack.value.StringValue;
27-
import io.camunda.zeebe.msgpack.value.ValueArray;
2823
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
2924
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
3025
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
@@ -33,46 +28,32 @@
3328
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
3429
import io.camunda.zeebe.protocol.record.value.ErrorType;
3530
import io.camunda.zeebe.util.ByteValue;
36-
import java.util.Collection;
37-
import java.util.concurrent.atomic.AtomicInteger;
31+
import io.camunda.zeebe.util.Either;
3832
import org.agrona.DirectBuffer;
39-
import org.agrona.ExpandableArrayBuffer;
40-
import org.agrona.MutableDirectBuffer;
41-
import org.agrona.collections.ObjectHashSet;
42-
import org.agrona.concurrent.UnsafeBuffer;
4333

4434
public final class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {
4535

4636
private final StateWriter stateWriter;
47-
private final VariableState variableState;
4837
private final TypedRejectionWriter rejectionWriter;
4938
private final TypedResponseWriter responseWriter;
50-
51-
private final JobState jobState;
39+
private final JobBatchCollector jobBatchCollector;
5240
private final KeyGenerator keyGenerator;
53-
private final long maxRecordLength;
54-
private final long maxJobBatchLength;
55-
56-
private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();
5741
private final JobMetrics jobMetrics;
5842

5943
public JobBatchActivateProcessor(
6044
final Writers writers,
6145
final ZeebeState state,
6246
final KeyGenerator keyGenerator,
63-
final long maxRecordLength,
6447
final JobMetrics jobMetrics) {
6548

6649
stateWriter = writers.state();
6750
rejectionWriter = writers.rejection();
6851
responseWriter = writers.response();
52+
jobBatchCollector =
53+
new JobBatchCollector(
54+
state.getJobState(), state.getVariableState(), stateWriter::canWriteEventOfLength);
6955

70-
jobState = state.getJobState();
71-
variableState = state.getVariableState();
7256
this.keyGenerator = keyGenerator;
73-
74-
this.maxRecordLength = maxRecordLength;
75-
maxJobBatchLength = maxRecordLength - Long.BYTES;
7657
this.jobMetrics = jobMetrics;
7758
}
7859

@@ -97,95 +78,21 @@ private boolean isValid(final JobBatchRecord record) {
9778

9879
private void activateJobs(final TypedRecord<JobBatchRecord> record) {
9980
final JobBatchRecord value = record.getValue();
100-
10181
final long jobBatchKey = keyGenerator.nextKey();
10282

103-
final AtomicInteger amount = new AtomicInteger(value.getMaxJobsToActivate());
104-
collectJobsToActivate(record, amount);
105-
106-
stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
107-
responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);
108-
109-
final var activatedJobsCount = record.getValue().getJobKeys().size();
110-
jobMetrics.jobActivated(value.getType(), activatedJobsCount);
111-
}
112-
113-
private void collectJobsToActivate(
114-
final TypedRecord<JobBatchRecord> record, final AtomicInteger amount) {
115-
final JobBatchRecord value = record.getValue();
116-
final ValueArray<JobRecord> jobIterator = value.jobs();
117-
final ValueArray<LongValue> jobKeyIterator = value.jobKeys();
118-
119-
// collect jobs for activation
120-
variableNames.clear();
121-
final ValueArray<StringValue> jobBatchVariables = value.variables();
122-
123-
jobBatchVariables.forEach(
124-
v -> {
125-
final MutableDirectBuffer nameCopy = new UnsafeBuffer(new byte[v.getValue().capacity()]);
126-
nameCopy.putBytes(0, v.getValue(), 0, v.getValue().capacity());
127-
variableNames.add(nameCopy);
128-
});
129-
130-
jobState.forEachActivatableJobs(
131-
value.getTypeBuffer(),
132-
(key, jobRecord) -> {
133-
int remainingAmount = amount.get();
134-
final long deadline = record.getTimestamp() + value.getTimeout();
135-
jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer());
136-
137-
// fetch and set variables, required here to already have the full size of the job record
138-
final long elementInstanceKey = jobRecord.getElementInstanceKey();
139-
if (elementInstanceKey >= 0) {
140-
final DirectBuffer variables = collectVariables(variableNames, elementInstanceKey);
141-
jobRecord.setVariables(variables);
142-
} else {
143-
jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
144-
}
145-
146-
if (remainingAmount >= 0
147-
&& (record.getLength() + jobRecord.getLength()) <= maxJobBatchLength) {
148-
149-
remainingAmount = amount.decrementAndGet();
150-
jobKeyIterator.add().setValue(key);
151-
final JobRecord arrayValueJob = jobIterator.add();
152-
153-
// clone job record since buffer is reused during iteration
154-
final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer(jobRecord.getLength());
155-
jobRecord.write(buffer, 0);
156-
arrayValueJob.wrap(buffer);
157-
} else {
158-
value.setTruncated(true);
159-
160-
if (value.getJobs().isEmpty()) {
161-
raiseIncidentJobTooLargeForMessageSize(key, jobRecord);
162-
}
163-
164-
return false;
165-
}
166-
167-
return remainingAmount > 0;
168-
});
169-
}
83+
final Either<TooLargeJob, Integer> result = jobBatchCollector.collectJobs(record);
84+
final var activatedJobCount = result.getOrElse(0);
85+
result.ifLeft(
86+
largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record()));
17087

171-
private DirectBuffer collectVariables(
172-
final Collection<DirectBuffer> variableNames, final long elementInstanceKey) {
173-
final DirectBuffer variables;
174-
if (variableNames.isEmpty()) {
175-
variables = variableState.getVariablesAsDocument(elementInstanceKey);
176-
} else {
177-
variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames);
178-
}
179-
return variables;
88+
activateJobBatch(record, value, jobBatchKey, activatedJobCount);
18089
}
18190

18291
private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
18392
final RejectionType rejectionType;
18493
final String rejectionReason;
185-
18694
final JobBatchRecord value = record.getValue();
187-
188-
final String format = "Expected to activate job batch with %s to be %s, but it was %s";
95+
final var format = "Expected to activate job batch with %s to be %s, but it was %s";
18996

19097
if (value.getMaxJobsToActivate() < 1) {
19198
rejectionType = RejectionType.INVALID_ARGUMENT;
@@ -212,18 +119,25 @@ private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
212119
responseWriter.writeRejectionOnCommand(record, rejectionType, rejectionReason);
213120
}
214121

215-
private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
216-
217-
final String messageSize = ByteValue.prettyPrint(maxRecordLength);
122+
private void activateJobBatch(
123+
final TypedRecord<JobBatchRecord> record,
124+
final JobBatchRecord value,
125+
final long jobBatchKey,
126+
final Integer activatedCount) {
127+
stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
128+
responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);
129+
jobMetrics.jobActivated(value.getType(), activatedCount);
130+
}
218131

132+
private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
133+
final String messageSize = ByteValue.prettyPrint(stateWriter.getMaxEventLength());
219134
final DirectBuffer incidentMessage =
220135
wrapString(
221136
String.format(
222137
"The job with key '%s' can not be activated because it is larger than the configured message size (%s). "
223138
+ "Try to reduce the size by reducing the number of fetched variables or modifying the variable values.",
224139
jobKey, messageSize));
225-
226-
final IncidentRecord incidentEvent =
140+
final var incidentEvent =
227141
new IncidentRecord()
228142
.setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED)
229143
.setErrorMessage(incidentMessage)

0 commit comments

Comments (0)