Skip to content

Commit e65e527

Browse files
refactor(engine): append side-effects to writer
This removes difficult to understand abstractions around side effects and ensures that the engine does not re-use side-effect queues which complicate batch processing and processing of uncommitted. Adds a `SideEffectWriter` backed by the `ProcessingResultBuilder` that is used throughout like other writers. (cherry picked from commit 9eac8c3)
1 parent 433061a commit e65e527

39 files changed

+176
-379
lines changed

engine/src/main/java/io/camunda/zeebe/engine/Engine.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public void replay(final TypedRecord event) {
104104
@Override
105105
public ProcessingResult process(
106106
final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) {
107+
107108
try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) {
108109
TypedRecordProcessor<?> currentProcessor = null;
109110

@@ -124,12 +125,7 @@ public ProcessingResult process(
124125

125126
final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
126127
if (isNotOnBlacklist) {
127-
currentProcessor.processRecord(
128-
record,
129-
(sep) -> {
130-
processingResultBuilder.resetPostCommitTasks();
131-
processingResultBuilder.appendPostCommitTask(sep::flush);
132-
});
128+
currentProcessor.processRecord(record);
133129
}
134130
}
135131
return processingResultBuilder.build();

engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java

+5-13
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
2525
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
2626
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
27-
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
2827
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
2928
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
3029
import io.camunda.zeebe.engine.state.KeyGenerator;
@@ -67,7 +66,6 @@ public static TypedRecordProcessors createEngineProcessors(
6766

6867
final var jobMetrics = new JobMetrics(partitionId);
6968
final var processEngineMetrics = new ProcessEngineMetrics(zeebeState.getPartitionId());
70-
final var sideEffectQueue = new SideEffectQueue();
7169

7270
final BpmnBehaviorsImpl bpmnBehaviors =
7371
createBehaviors(
@@ -77,8 +75,7 @@ public static TypedRecordProcessors createEngineProcessors(
7775
partitionsCount,
7876
timerChecker,
7977
jobMetrics,
80-
processEngineMetrics,
81-
sideEffectQueue);
78+
processEngineMetrics);
8279

8380
addDeploymentRelatedProcessorAndServices(
8481
bpmnBehaviors,
@@ -98,8 +95,7 @@ public static TypedRecordProcessors createEngineProcessors(
9895
typedRecordProcessors,
9996
subscriptionCommandSender,
10097
writers,
101-
timerChecker,
102-
sideEffectQueue);
98+
timerChecker);
10399

104100
JobEventProcessors.addJobProcessors(
105101
typedRecordProcessors,
@@ -121,10 +117,8 @@ private static BpmnBehaviorsImpl createBehaviors(
121117
final int partitionsCount,
122118
final DueDateTimerChecker timerChecker,
123119
final JobMetrics jobMetrics,
124-
final ProcessEngineMetrics processEngineMetrics,
125-
final SideEffectQueue sideEffectQueue) {
120+
final ProcessEngineMetrics processEngineMetrics) {
126121
return new BpmnBehaviorsImpl(
127-
sideEffectQueue,
128122
zeebeState,
129123
writers,
130124
jobMetrics,
@@ -140,16 +134,14 @@ private static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
140134
final TypedRecordProcessors typedRecordProcessors,
141135
final SubscriptionCommandSender subscriptionCommandSender,
142136
final Writers writers,
143-
final DueDateTimerChecker timerChecker,
144-
final SideEffectQueue sideEffectQueue) {
137+
final DueDateTimerChecker timerChecker) {
145138
return ProcessEventProcessors.addProcessProcessors(
146139
zeebeState,
147140
bpmnBehaviors,
148141
typedRecordProcessors,
149142
subscriptionCommandSender,
150143
timerChecker,
151-
writers,
152-
sideEffectQueue);
144+
writers);
153145
}
154146

155147
private static void addDeploymentRelatedProcessorAndServices(

engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceModificationProcessor;
2222
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
2323
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
24-
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
2524
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
2625
import io.camunda.zeebe.engine.processing.timer.CancelTimerProcessor;
2726
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
@@ -51,8 +50,7 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
5150
final TypedRecordProcessors typedRecordProcessors,
5251
final SubscriptionCommandSender subscriptionCommandSender,
5352
final DueDateTimerChecker timerChecker,
54-
final Writers writers,
55-
final SideEffectQueue sideEffectQueue) {
53+
final Writers writers) {
5654
final MutableProcessMessageSubscriptionState subscriptionState =
5755
zeebeState.getProcessMessageSubscriptionState();
5856
final var keyGenerator = zeebeState.getKeyGenerator();
@@ -63,8 +61,7 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
6361
writers, typedRecordProcessors, zeebeState.getElementInstanceState());
6462

6563
final var bpmnStreamProcessor =
66-
new BpmnStreamProcessor(
67-
bpmnBehaviors, zeebeState, writers, sideEffectQueue, processEngineMetrics);
64+
new BpmnStreamProcessor(bpmnBehaviors, zeebeState, writers, processEngineMetrics);
6865
addBpmnStepProcessor(typedRecordProcessors, bpmnStreamProcessor);
6966

7067
addMessageStreamProcessors(

engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/BpmnStreamProcessor.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
1616
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
1717
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
18-
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
19-
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
2018
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
2119
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
2220
import io.camunda.zeebe.engine.state.immutable.ProcessState;
@@ -25,7 +23,6 @@
2523
import io.camunda.zeebe.protocol.record.RejectionType;
2624
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
2725
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
28-
import java.util.function.Consumer;
2926
import org.slf4j.Logger;
3027

3128
public final class BpmnStreamProcessor implements TypedRecordProcessor<ProcessInstanceRecord> {
@@ -34,7 +31,6 @@ public final class BpmnStreamProcessor implements TypedRecordProcessor<ProcessIn
3431

3532
private final BpmnElementContextImpl context = new BpmnElementContextImpl();
3633

37-
private final SideEffectQueue sideEffectQueue;
3834
private final ProcessState processState;
3935
private final BpmnElementProcessors processors;
4036
private final ProcessInstanceStateTransitionGuard stateTransitionGuard;
@@ -46,7 +42,6 @@ public BpmnStreamProcessor(
4642
final BpmnBehaviors bpmnBehaviors,
4743
final MutableZeebeState zeebeState,
4844
final Writers writers,
49-
final SideEffectQueue sideEffectQueue,
5045
final ProcessEngineMetrics processEngineMetrics) {
5146
processState = zeebeState.getProcessState();
5247

@@ -61,7 +56,6 @@ public BpmnStreamProcessor(
6156
this::getContainerProcessor,
6257
writers);
6358
processors = new BpmnElementProcessors(bpmnBehaviors, stateTransitionBehavior);
64-
this.sideEffectQueue = sideEffectQueue;
6559
}
6660

6761
private BpmnElementContainerProcessor<ExecutableFlowElement> getContainerProcessor(
@@ -70,14 +64,9 @@ private BpmnElementContainerProcessor<ExecutableFlowElement> getContainerProcess
7064
}
7165

7266
@Override
73-
public void processRecord(
74-
final TypedRecord<ProcessInstanceRecord> record,
75-
final Consumer<SideEffectProducer> sideEffect) {
67+
public void processRecord(final TypedRecord<ProcessInstanceRecord> record) {
7668

7769
// initialize
78-
sideEffectQueue.clear();
79-
sideEffect.accept(sideEffectQueue);
80-
8170
final var intent = (ProcessInstanceIntent) record.getIntent();
8271
final var recordValue = record.getValue();
8372

engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
1818
import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
1919
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
20-
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
2120
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
2221
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
2322
import io.camunda.zeebe.engine.processing.variable.VariableBehavior;
@@ -45,7 +44,6 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors {
4544
private final ElementActivationBehavior elementActivationBehavior;
4645

4746
public BpmnBehaviorsImpl(
48-
final SideEffects sideEffects,
4947
final MutableZeebeState zeebeState,
5048
final Writers writers,
5149
final JobMetrics jobMetrics,
@@ -69,6 +67,7 @@ public BpmnBehaviorsImpl(
6967
expressionBehavior,
7068
subscriptionCommandSender,
7169
writers.state(),
70+
writers.sideEffect(),
7271
timerChecker,
7372
partitionsCount);
7473

@@ -94,8 +93,7 @@ public BpmnBehaviorsImpl(
9493
new BpmnVariableMappingBehavior(expressionBehavior, zeebeState, variableBehavior);
9594

9695
eventSubscriptionBehavior =
97-
new BpmnEventSubscriptionBehavior(
98-
catchEventBehavior, eventTriggerBehavior, sideEffects, zeebeState);
96+
new BpmnEventSubscriptionBehavior(catchEventBehavior, eventTriggerBehavior, zeebeState);
9997

10098
incidentBehavior =
10199
new BpmnIncidentBehavior(zeebeState, zeebeState.getKeyGenerator(), writers.state());

engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnEventSubscriptionBehavior.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.camunda.zeebe.engine.processing.common.Failure;
1414
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventSupplier;
1515
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
16-
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
1716
import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState;
1817
import io.camunda.zeebe.engine.state.immutable.ProcessState;
1918
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
@@ -27,19 +26,15 @@ public final class BpmnEventSubscriptionBehavior {
2726
private final EventScopeInstanceState eventScopeInstanceState;
2827
private final CatchEventBehavior catchEventBehavior;
2928

30-
private final SideEffects sideEffects;
31-
3229
private final ProcessState processState;
3330
private final EventTriggerBehavior eventTriggerBehavior;
3431

3532
public BpmnEventSubscriptionBehavior(
3633
final CatchEventBehavior catchEventBehavior,
3734
final EventTriggerBehavior eventTriggerBehavior,
38-
final SideEffects sideEffects,
3935
final ZeebeState zeebeState) {
4036
this.catchEventBehavior = catchEventBehavior;
4137
this.eventTriggerBehavior = eventTriggerBehavior;
42-
this.sideEffects = sideEffects;
4338

4439
processState = zeebeState.getProcessState();
4540
eventScopeInstanceState = zeebeState.getEventScopeInstanceState();
@@ -50,11 +45,11 @@ public BpmnEventSubscriptionBehavior(
5045
*/
5146
public <T extends ExecutableCatchEventSupplier> Either<Failure, Void> subscribeToEvents(
5247
final T element, final BpmnElementContext context) {
53-
return catchEventBehavior.subscribeToEvents(context, element, sideEffects);
48+
return catchEventBehavior.subscribeToEvents(context, element);
5449
}
5550

5651
public void unsubscribeFromEvents(final BpmnElementContext context) {
57-
catchEventBehavior.unsubscribeFromEvents(context.getElementInstanceKey(), sideEffects);
52+
catchEventBehavior.unsubscribeFromEvents(context.getElementInstanceKey());
5853
}
5954

6055
/**

0 commit comments

Comments
 (0)