21
21
import io .camunda .zeebe .test .util .bpmn .random .TestDataGenerator .TestDataRecord ;
22
22
import io .camunda .zeebe .test .util .record .RecordingExporter ;
23
23
import java .util .Collection ;
24
+ import java .util .Map ;
24
25
import org .assertj .core .api .SoftAssertions ;
25
26
import org .awaitility .Awaitility ;
26
27
import org .junit .Before ;
@@ -38,6 +39,9 @@ public class ReplayStateRandomizedPropertyTest {
38
39
private static final String PROCESS_COUNT = System .getProperty ("processCount" , "3" );
39
40
private static final String EXECUTION_PATH_COUNT =
40
41
System .getProperty ("replayExecutionCount" , "1" );
42
+ /* Grace period to wait if new records come in after processing has reached end */
43
+ private static final long GRACE_PERIOD = 50 ; // ms
44
+
41
45
@ Parameter public TestDataRecord record ;
42
46
43
47
@ Rule
@@ -101,7 +105,12 @@ public void shouldRestoreStateAtEachStepInExecution() {
101
105
102
106
private void stopAndRestartEngineAndCompareStates () {
103
107
// given
104
- waitForProcessingToStop ();
108
+ Awaitility .await (
109
+ "await the last written record to be processed, then wait a GRACE_PERIOD to make sure no new events are added" )
110
+ .untilAsserted (
111
+ () -> {
112
+ processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod ();
113
+ });
105
114
106
115
engineRule .pauseProcessing (1 );
107
116
@@ -122,51 +131,58 @@ private void stopAndRestartEngineAndCompareStates() {
122
131
.untilAsserted (
123
132
() -> {
124
133
final var replayState = engineRule .collectState ();
134
+ assertIdenticalStates (processingState , replayState );
135
+ });
136
+ }
125
137
126
- final var softly = new SoftAssertions ();
127
-
128
- processingState .entrySet ().stream ()
129
- .filter (entry -> entry .getKey () != ZbColumnFamilies .DEFAULT )
130
- .forEach (
131
- entry -> {
132
- final var column = entry .getKey ();
133
- final var processingEntries = entry .getValue ();
134
- final var replayEntries = replayState .get (column );
135
-
136
- if (processingEntries .isEmpty ()) {
137
- softly
138
- .assertThat (replayEntries )
139
- .describedAs (
140
- "The state column '%s' should be empty after replay" , column )
141
- .isEmpty ();
142
- } else {
143
- softly
144
- .assertThat (replayEntries )
145
- .describedAs (
146
- "The state column '%s' has different entries after replay" ,
147
- column )
148
- .containsExactlyInAnyOrderEntriesOf (processingEntries );
149
- }
150
- });
151
-
152
- softly .assertAll ();
138
+ private void assertIdenticalStates (
139
+ final Map <ZbColumnFamilies , Map <Object , Object >> expectedState ,
140
+ final Map <ZbColumnFamilies , Map <Object , Object >> actualState ) {
141
+ final var softly = new SoftAssertions ();
142
+ expectedState .entrySet ().stream ()
143
+ .filter (entry -> entry .getKey () != ZbColumnFamilies .DEFAULT )
144
+ .forEach (
145
+ entry -> {
146
+ final var column = entry .getKey ();
147
+ final var expectedEntries = entry .getValue ();
148
+ final var actualEntries = actualState .get (column );
149
+
150
+ if (expectedEntries .isEmpty ()) {
151
+ softly
152
+ .assertThat (actualEntries )
153
+ .describedAs ("The state column '%s' should be empty" , column )
154
+ .isEmpty ();
155
+ } else {
156
+ softly
157
+ .assertThat (actualEntries )
158
+ .describedAs ("The state column '%s' has different entries" , column )
159
+ .containsExactlyInAnyOrderEntriesOf (expectedEntries );
160
+ }
153
161
});
162
+
163
+ softly .assertAll ();
154
164
}
155
165
156
- private void waitForProcessingToStop () {
157
- Awaitility .await ("await the last written record to be processed" )
158
- .untilAsserted (
159
- () ->
160
- assertThat (engineRule .hasReachedEnd ())
161
- .describedAs ("Processing has reached end of the log." )
162
- .isTrue ());
166
+ private void processingHasStoppedAndNoNewRecordsAreAddedDuringGracePeriod ()
167
+ throws InterruptedException {
168
+ assertThat (engineRule .hasReachedEnd ())
169
+ .describedAs ("Processing has reached end of the log." )
170
+ .isTrue ();
171
+ final var stateBeforeGracePeriod = engineRule .collectState ();
172
+ Thread .sleep (GRACE_PERIOD );
173
+ assertThat (engineRule .hasReachedEnd ())
174
+ .describedAs ("Processing has reached end of the log." )
175
+ .isTrue ();
176
+ final var stateAfterGracePeriod = engineRule .collectState ();
177
+
178
+ assertIdenticalStates (stateBeforeGracePeriod , stateAfterGracePeriod );
163
179
}
164
180
165
181
@ Parameters (name = "{0}" )
166
182
public static Collection <TestDataRecord > getTestRecords () {
167
183
// use the following code to rerun a specific test case:
168
- // final var processSeed = 3499044774323385558L ;
169
- // final var executionPathSeed = 3627169465144620203L ;
184
+ // final var processSeed = 6163452194952018956L ;
185
+ // final var executionPathSeed = 6499103602285813109L ;
170
186
// return List.of(TestDataGenerator.regenerateTestRecord(processSeed, executionPathSeed));
171
187
return TestDataGenerator .generateTestRecords (
172
188
Integer .parseInt (PROCESS_COUNT ), Integer .parseInt (EXECUTION_PATH_COUNT ));
0 commit comments