@@ -2416,15 +2416,13 @@ public static void rebuildProducerState(ProducerStateManager producerStateManage
2416
2416
Time time ,
2417
2417
boolean reloadFromCleanShutdown ,
2418
2418
String logPrefix ) throws IOException {
2419
- List <Optional <Long >> offsetsToSnapshot = new ArrayList <>();
2420
- if (segments .nonEmpty ()) {
2421
- long lastSegmentBaseOffset = segments .lastSegment ().get ().baseOffset ();
2422
- Optional <LogSegment > lowerSegment = segments .lowerSegment (lastSegmentBaseOffset );
2423
- Optional <Long > nextLatestSegmentBaseOffset = lowerSegment .map (LogSegment ::baseOffset );
2424
- offsetsToSnapshot .add (nextLatestSegmentBaseOffset );
2425
- offsetsToSnapshot .add (Optional .of (lastSegmentBaseOffset ));
2426
- }
2427
- offsetsToSnapshot .add (Optional .of (lastOffset ));
2419
+ List <Long > offsetsToSnapshot = new ArrayList <>();
2420
+ segments .lastSegment ().ifPresent (lastSegment -> {
2421
+ long lastSegmentBaseOffset = lastSegment .baseOffset ();
2422
+ segments .lowerSegment (lastSegmentBaseOffset ).ifPresent (s -> offsetsToSnapshot .add (s .baseOffset ()));
2423
+ offsetsToSnapshot .add (lastSegmentBaseOffset );
2424
+ });
2425
+ offsetsToSnapshot .add (lastOffset );
2428
2426
2429
2427
LOG .info ("{}Loading producer state till offset {}" , logPrefix , lastOffset );
2430
2428
@@ -2443,11 +2441,9 @@ public static void rebuildProducerState(ProducerStateManager producerStateManage
2443
2441
// To avoid an expensive scan through all the segments, we take empty snapshots from the start of the
2444
2442
// last two segments and the last offset. This should avoid the full scan in the case that the log needs
2445
2443
// truncation.
2446
- for (Optional <Long > offset : offsetsToSnapshot ) {
2447
- if (offset .isPresent ()) {
2448
- producerStateManager .updateMapEndOffset (offset .get ());
2449
- producerStateManager .takeSnapshot ();
2450
- }
2444
+ for (long offset : offsetsToSnapshot ) {
2445
+ producerStateManager .updateMapEndOffset (offset );
2446
+ producerStateManager .takeSnapshot ();
2451
2447
}
2452
2448
} else {
2453
2449
LOG .info ("{}Reloading from producer snapshot and rebuilding producer state from offset {}" , logPrefix , lastOffset );
@@ -2469,7 +2465,7 @@ public static void rebuildProducerState(ProducerStateManager producerStateManage
2469
2465
long startOffset = Utils .max (segment .baseOffset (), producerStateManager .mapEndOffset (), logStartOffset );
2470
2466
producerStateManager .updateMapEndOffset (startOffset );
2471
2467
2472
- if (offsetsToSnapshot .contains (Optional . of ( segment .baseOffset () ))) {
2468
+ if (offsetsToSnapshot .contains (segment .baseOffset ())) {
2473
2469
producerStateManager .takeSnapshot ();
2474
2470
}
2475
2471
int maxPosition = segment .size ();
0 commit comments