@@ -89,7 +89,9 @@ public void shouldCallStreamProcessorLifecycle() throws Exception {
89
89
public void shouldCallStreamProcessorLifecycleOnFail () {
90
90
// given
91
91
final var mockProcessorLifecycleAware = streamPlatform .getMockProcessorLifecycleAware ();
92
- doThrow (new RuntimeException ("force fail" )).when (mockProcessorLifecycleAware ).onRecovered (any ());
92
+ doThrow (new RuntimeException ("force fail" ))
93
+ .when (mockProcessorLifecycleAware )
94
+ .onRecovered (any ());
93
95
94
96
// when
95
97
streamPlatform .startStreamProcessor ();
@@ -140,7 +142,9 @@ public void shouldCallOnErrorWhenProcessingFails() {
140
142
inOrder .verify (defaultRecordProcessor , TIMEOUT ).init (any ());
141
143
inOrder .verify (defaultRecordProcessor , TIMEOUT ).accepts (ValueType .PROCESS_INSTANCE );
142
144
inOrder .verify (defaultRecordProcessor , TIMEOUT ).process (any (), any ());
143
- inOrder .verify (defaultRecordProcessor , TIMEOUT ).onProcessingError (eq (processingError ), any (), any ());
145
+ inOrder
146
+ .verify (defaultRecordProcessor , TIMEOUT )
147
+ .onProcessingError (eq (processingError ), any (), any ());
144
148
inOrder .verifyNoMoreInteractions ();
145
149
}
146
150
@@ -163,7 +167,9 @@ public void shouldLoopWhenOnErrorFails() {
163
167
inOrder .verify (defaultRecordProcessor , TIMEOUT ).init (any ());
164
168
inOrder .verify (defaultRecordProcessor , TIMEOUT ).accepts (ValueType .PROCESS_INSTANCE );
165
169
inOrder .verify (defaultRecordProcessor , TIMEOUT ).process (any (), any ());
166
- inOrder .verify (defaultRecordProcessor , TIMEOUT .atLeast (5 )).onProcessingError (eq (processingError ), any (), any ());
170
+ inOrder
171
+ .verify (defaultRecordProcessor , TIMEOUT .atLeast (5 ))
172
+ .onProcessingError (eq (processingError ), any (), any ());
167
173
}
168
174
169
175
@ Test
@@ -221,14 +227,14 @@ public void shouldWriteFollowUpEventsAndCommands() {
221
227
// given
222
228
final var defaultRecordProcessor = streamPlatform .getDefaultMockedRecordProcessor ();
223
229
final var firstResultBuilder = new BufferedProcessingResultBuilder ((c , v ) -> true );
224
- firstResultBuilder .appendRecordReturnEither (1 , RecordType . EVENT ,
225
- ACTIVATE_ELEMENT , RejectionType .NULL_VAL , "" , RECORD );
226
- firstResultBuilder .appendRecordReturnEither (2 , RecordType . COMMAND ,
227
- ACTIVATE_ELEMENT , RejectionType .NULL_VAL , "" , RECORD );
230
+ firstResultBuilder .appendRecordReturnEither (
231
+ 1 , RecordType . EVENT , ACTIVATE_ELEMENT , RejectionType .NULL_VAL , "" , RECORD );
232
+ firstResultBuilder .appendRecordReturnEither (
233
+ 2 , RecordType . COMMAND , ACTIVATE_ELEMENT , RejectionType .NULL_VAL , "" , RECORD );
228
234
229
235
final var secondResultBuilder = new BufferedProcessingResultBuilder ((c , v ) -> true );
230
- secondResultBuilder .appendRecordReturnEither (3 , RecordType . EVENT ,
231
- ACTIVATE_ELEMENT , RejectionType .NULL_VAL , "" , RECORD );
236
+ secondResultBuilder .appendRecordReturnEither (
237
+ 3 , RecordType . EVENT , ACTIVATE_ELEMENT , RejectionType .NULL_VAL , "" , RECORD );
232
238
233
239
when (defaultRecordProcessor .process (any (), any ()))
234
240
.thenReturn (firstResultBuilder .build ())
@@ -238,17 +244,21 @@ public void shouldWriteFollowUpEventsAndCommands() {
238
244
streamPlatform .startStreamProcessor ();
239
245
240
246
// when
241
- streamPlatform .writeBatch (
242
- command ().processInstance (ACTIVATE_ELEMENT , RECORD ));
247
+ streamPlatform .writeBatch (command ().processInstance (ACTIVATE_ELEMENT , RECORD ));
243
248
244
249
// then
245
250
verify (defaultRecordProcessor , TIMEOUT .times (2 )).process (any (), any ());
246
- await ("Last written position should be updated" ).untilAsserted (() ->
247
- assertThat (streamPlatform .getLogStream ().getLastWrittenPosition ()).isEqualTo (4 )
248
- );
249
- await ("Last processed position should be updated" ).untilAsserted (() ->
250
- assertThat (streamPlatform .getStreamProcessor ().getLastProcessedPositionAsync ().join ()).isEqualTo (3 ));
251
+ await ("Last written position should be updated" )
252
+ .untilAsserted (
253
+ () -> assertThat (streamPlatform .getLogStream ().getLastWrittenPosition ()).isEqualTo (4 ));
254
+ await ("Last processed position should be updated" )
255
+ .untilAsserted (
256
+ () ->
257
+ assertThat (
258
+ streamPlatform .getStreamProcessor ().getLastProcessedPositionAsync ().join ())
259
+ .isEqualTo (3 ));
251
260
}
261
+
252
262
@ Test
253
263
public void shouldExecutePostCommitTask () {
254
264
// given
@@ -261,8 +271,7 @@ public void shouldExecutePostCommitTask() {
261
271
streamPlatform .startStreamProcessor ();
262
272
263
273
// when
264
- streamPlatform .writeBatch (
265
- command ().processInstance (ACTIVATE_ELEMENT , RECORD ));
274
+ streamPlatform .writeBatch (command ().processInstance (ACTIVATE_ELEMENT , RECORD ));
266
275
267
276
// then
268
277
verify (mockPostCommitTask , TIMEOUT ).flush ();
@@ -282,8 +291,7 @@ public void shouldRepeatExecutePostCommitTask() {
282
291
streamPlatform .startStreamProcessor ();
283
292
284
293
// when
285
- streamPlatform .writeBatch (
286
- command ().processInstance (ACTIVATE_ELEMENT , RECORD ));
294
+ streamPlatform .writeBatch (command ().processInstance (ACTIVATE_ELEMENT , RECORD ));
287
295
288
296
// then
289
297
verify (mockPostCommitTask , TIMEOUT .atLeast (5 )).flush ();
@@ -318,18 +326,20 @@ public void shouldNotRepeatPostCommitOnException() throws Exception {
318
326
public void shouldUpdateStateOnSuccessfulProcessing () {
319
327
// given
320
328
final var testProcessor = spy (new TestProcessor ());
321
- testProcessor .processingAction = (ctx ) -> {
322
- final var zeebeDb = ctx .getZeebeDb ();
323
- final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
324
- keyGenerator .nextKey ();
325
- keyGenerator .nextKey ();
326
- keyGenerator .nextKey ();
327
- };
329
+ testProcessor .processingAction =
330
+ (ctx ) -> {
331
+ final var zeebeDb = ctx .getZeebeDb ();
332
+ final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
333
+ keyGenerator .nextKey ();
334
+ keyGenerator .nextKey ();
335
+ keyGenerator .nextKey ();
336
+ };
328
337
// in order to not mark the processing as skipped we need to return a result
329
338
testProcessor .processingResult = new BufferedProcessingResultBuilder ((c , s ) -> true ).build ();
330
339
doCallRealMethod ()
331
340
.doReturn (EmptyProcessingResult .INSTANCE )
332
- .when (testProcessor ).process (any (), any ());
341
+ .when (testProcessor )
342
+ .process (any (), any ());
333
343
streamPlatform .withRecordProcessors (List .of (testProcessor )).startStreamProcessor ();
334
344
335
345
final var zeebeDb = testProcessor .recordProcessorContext .getZeebeDb ();
@@ -352,20 +362,22 @@ public void shouldUpdateStateOnSuccessfulProcessing() {
352
362
public void shouldNotUpdateStateOnExceptionInProcessing () {
353
363
// given
354
364
final var testProcessor = spy (new TestProcessor ());
355
- testProcessor .processingAction = (ctx ) -> {
356
- final var zeebeDb = ctx .getZeebeDb ();
357
- final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
358
- keyGenerator .nextKey ();
359
- keyGenerator .nextKey ();
360
- keyGenerator .nextKey ();
361
-
362
- throw new RuntimeException ("expected" );
363
- };
365
+ testProcessor .processingAction =
366
+ (ctx ) -> {
367
+ final var zeebeDb = ctx .getZeebeDb ();
368
+ final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
369
+ keyGenerator .nextKey ();
370
+ keyGenerator .nextKey ();
371
+ keyGenerator .nextKey ();
372
+
373
+ throw new RuntimeException ("expected" );
374
+ };
364
375
// in order to not mark the processing as skipped we need to return a result
365
376
testProcessor .processingResult = new BufferedProcessingResultBuilder ((c , s ) -> true ).build ();
366
377
doCallRealMethod ()
367
378
.doReturn (EmptyProcessingResult .INSTANCE )
368
- .when (testProcessor ).process (any (), any ());
379
+ .when (testProcessor )
380
+ .process (any (), any ());
369
381
streamPlatform .withRecordProcessors (List .of (testProcessor )).startStreamProcessor ();
370
382
371
383
final var zeebeDb = testProcessor .recordProcessorContext .getZeebeDb ();
@@ -388,19 +400,22 @@ public void shouldNotUpdateStateOnExceptionInProcessing() {
388
400
public void shouldUpdateStateOnProcessingErrorCall () {
389
401
// given
390
402
final var testProcessor = spy (new TestProcessor ());
391
- testProcessor .processingAction = (ctx ) -> {
392
- throw new RuntimeException ("expected" );
393
- };
394
- testProcessor .onProcessingErrorAction = (ctx ) -> {
395
- final var zeebeDb = ctx .getZeebeDb ();
396
- final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
397
- keyGenerator .nextKey ();
398
- keyGenerator .nextKey ();
399
- keyGenerator .nextKey ();
400
- };
403
+ testProcessor .processingAction =
404
+ (ctx ) -> {
405
+ throw new RuntimeException ("expected" );
406
+ };
407
+ testProcessor .onProcessingErrorAction =
408
+ (ctx ) -> {
409
+ final var zeebeDb = ctx .getZeebeDb ();
410
+ final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
411
+ keyGenerator .nextKey ();
412
+ keyGenerator .nextKey ();
413
+ keyGenerator .nextKey ();
414
+ };
401
415
doCallRealMethod ()
402
416
.doReturn (EmptyProcessingResult .INSTANCE )
403
- .when (testProcessor ).process (any (), any ());
417
+ .when (testProcessor )
418
+ .process (any (), any ());
404
419
streamPlatform .withRecordProcessors (List .of (testProcessor )).startStreamProcessor ();
405
420
406
421
final var zeebeDb = testProcessor .recordProcessorContext .getZeebeDb ();
@@ -423,24 +438,28 @@ public void shouldUpdateStateOnProcessingErrorCall() {
423
438
public void shouldNotUpdateStateOnExceptionOnProcessingErrorCall () {
424
439
// given
425
440
final var testProcessor = spy (new TestProcessor ());
426
- testProcessor .processingAction = (ctx ) -> {
427
- throw new RuntimeException ("expected" );
428
- };
429
- testProcessor .onProcessingErrorAction = (ctx ) -> {
430
- final var zeebeDb = ctx .getZeebeDb ();
431
- final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
432
- keyGenerator .nextKey ();
433
- keyGenerator .nextKey ();
434
- keyGenerator .nextKey ();
435
-
436
- throw new RuntimeException ("expected" );
437
- };
441
+ testProcessor .processingAction =
442
+ (ctx ) -> {
443
+ throw new RuntimeException ("expected" );
444
+ };
445
+ testProcessor .onProcessingErrorAction =
446
+ (ctx ) -> {
447
+ final var zeebeDb = ctx .getZeebeDb ();
448
+ final var keyGenerator = new DbKeyGenerator (1 , zeebeDb , ctx .getTransactionContext ());
449
+ keyGenerator .nextKey ();
450
+ keyGenerator .nextKey ();
451
+ keyGenerator .nextKey ();
452
+
453
+ throw new RuntimeException ("expected" );
454
+ };
438
455
doCallRealMethod ()
439
456
.doReturn (EmptyProcessingResult .INSTANCE )
440
- .when (testProcessor ).process (any (), any ());
457
+ .when (testProcessor )
458
+ .process (any (), any ());
441
459
doCallRealMethod ()
442
460
.doReturn (EmptyProcessingResult .INSTANCE )
443
- .when (testProcessor ).onProcessingError (any (), any (), any ());
461
+ .when (testProcessor )
462
+ .onProcessingError (any (), any (), any ());
444
463
streamPlatform .withRecordProcessors (List .of (testProcessor )).startStreamProcessor ();
445
464
446
465
final var zeebeDb = testProcessor .recordProcessorContext .getZeebeDb ();
@@ -465,7 +484,16 @@ public void shouldWriteResponse() {
465
484
final var defaultMockedRecordProcessor = streamPlatform .getDefaultMockedRecordProcessor ();
466
485
467
486
final var resultBuilder = new BufferedProcessingResultBuilder ((c , s ) -> true );
468
- resultBuilder .withResponse (RecordType .EVENT , 3 , ELEMENT_ACTIVATING , RECORD , ValueType .PROCESS_INSTANCE , RejectionType .NULL_VAL , "" , 1 , 12 );
487
+ resultBuilder .withResponse (
488
+ RecordType .EVENT ,
489
+ 3 ,
490
+ ELEMENT_ACTIVATING ,
491
+ RECORD ,
492
+ ValueType .PROCESS_INSTANCE ,
493
+ RejectionType .NULL_VAL ,
494
+ "" ,
495
+ 1 ,
496
+ 12 );
469
497
when (defaultMockedRecordProcessor .process (any (), any ()))
470
498
.thenReturn (resultBuilder .build ())
471
499
.thenReturn (EmptyProcessingResult .INSTANCE );
@@ -480,8 +508,7 @@ public void shouldWriteResponse() {
480
508
// then
481
509
verify (defaultMockedRecordProcessor , TIMEOUT .times (2 )).process (any (), any ());
482
510
483
- final var commandResponseWriter =
484
- streamPlatform .getMockCommandResponseWriter ();
511
+ final var commandResponseWriter = streamPlatform .getMockCommandResponseWriter ();
485
512
486
513
verify (commandResponseWriter , TIMEOUT .times (1 )).key (3 );
487
514
verify (commandResponseWriter , TIMEOUT .times (1 ))
@@ -500,7 +527,16 @@ public void shouldWriteResponseOnFailedEventProcessing() {
500
527
.thenReturn (EmptyProcessingResult .INSTANCE );
501
528
502
529
final var resultBuilder = new BufferedProcessingResultBuilder ((c , s ) -> true );
503
- resultBuilder .withResponse (RecordType .EVENT , 3 , ELEMENT_ACTIVATING , RECORD , ValueType .PROCESS_INSTANCE , RejectionType .NULL_VAL , "" , 1 , 12 );
530
+ resultBuilder .withResponse (
531
+ RecordType .EVENT ,
532
+ 3 ,
533
+ ELEMENT_ACTIVATING ,
534
+ RECORD ,
535
+ ValueType .PROCESS_INSTANCE ,
536
+ RejectionType .NULL_VAL ,
537
+ "" ,
538
+ 1 ,
539
+ 12 );
504
540
when (defaultMockedRecordProcessor .onProcessingError (any (), any (), any ()))
505
541
.thenReturn (resultBuilder .build ())
506
542
.thenReturn (EmptyProcessingResult .INSTANCE );
@@ -516,8 +552,7 @@ public void shouldWriteResponseOnFailedEventProcessing() {
516
552
verify (defaultMockedRecordProcessor , TIMEOUT .times (2 )).process (any (), any ());
517
553
verify (defaultMockedRecordProcessor , TIMEOUT .times (1 )).onProcessingError (any (), any (), any ());
518
554
519
- final var commandResponseWriter =
520
- streamPlatform .getMockCommandResponseWriter ();
555
+ final var commandResponseWriter = streamPlatform .getMockCommandResponseWriter ();
521
556
522
557
verify (commandResponseWriter , TIMEOUT .times (1 )).key (3 );
523
558
verify (commandResponseWriter , TIMEOUT .times (1 ))
@@ -534,8 +569,7 @@ public void shouldInvokeOnProcessedListenerWhenReturnResult() {
534
569
final var defaultMockedRecordProcessor = streamPlatform .getDefaultMockedRecordProcessor ();
535
570
536
571
final var resultBuilder = new BufferedProcessingResultBuilder ((c , s ) -> true );
537
- when (defaultMockedRecordProcessor .process (any (), any ()))
538
- .thenReturn (resultBuilder .build ());
572
+ when (defaultMockedRecordProcessor .process (any (), any ())).thenReturn (resultBuilder .build ());
539
573
streamPlatform .startStreamProcessor ();
540
574
541
575
// when
@@ -623,12 +657,14 @@ public void shouldNotUpdateLastWrittenPositionWhenSkipped() {
623
657
624
658
// then
625
659
verify (defaultRecordProcessor , TIMEOUT .times (2 )).process (any (), any ());
626
- await ("Last written position should be updated" ).untilAsserted (() ->
627
- assertThat (streamPlatform .getStreamProcessor ().getLastWrittenPositionAsync ().join ()).isEqualTo (-1 )
628
- );
660
+ await ("Last written position should be updated" )
661
+ .untilAsserted (
662
+ () ->
663
+ assertThat (streamPlatform .getStreamProcessor ().getLastWrittenPositionAsync ().join ())
664
+ .isEqualTo (-1 ));
629
665
}
630
666
631
- private final static class TestProcessor implements RecordProcessor {
667
+ private static final class TestProcessor implements RecordProcessor {
632
668
633
669
ProcessingResult processingResult = EmptyProcessingResult .INSTANCE ;
634
670
ProcessingResult processingResultOnError = EmptyProcessingResult .INSTANCE ;
@@ -647,18 +683,18 @@ public boolean accepts(final ValueType valueType) {
647
683
}
648
684
649
685
@ Override
650
- public void replay (final TypedRecord record ) {
651
- }
686
+ public void replay (final TypedRecord record ) {}
652
687
653
688
@ Override
654
- public ProcessingResult process (final TypedRecord record ,
655
- final ProcessingResultBuilder processingResultBuilder ) {
689
+ public ProcessingResult process (
690
+ final TypedRecord record , final ProcessingResultBuilder processingResultBuilder ) {
656
691
processingAction .accept (recordProcessorContext );
657
692
return processingResult ;
658
693
}
659
694
660
695
@ Override
661
- public ProcessingResult onProcessingError (final Throwable processingException ,
696
+ public ProcessingResult onProcessingError (
697
+ final Throwable processingException ,
662
698
final TypedRecord record ,
663
699
final ProcessingResultBuilder processingResultBuilder ) {
664
700
onProcessingErrorAction .accept (recordProcessorContext );
0 commit comments