Skip to content

Commit 5ab0512

Browse files
authored
[fix] [broker] rename to changeMaxReadPositionCount (apache#22656)
1 parent ca44b9b commit 5ab0512

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
7676
*/
7777
private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();
7878

79-
// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it.
80-
private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
79+
// when change max read position, the count will +1. Take snapshot will reset the count.
80+
private final AtomicLong changeMaxReadPositionCount = new AtomicLong();
8181

8282
private final LongAdder txnCommittedCounter = new LongAdder();
8383

@@ -429,15 +429,15 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
429429
}
430430

431431
private void takeSnapshotByChangeTimes() {
432-
if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) {
433-
this.changeMaxReadPositionAndAddAbortTimes.set(0);
432+
if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) {
433+
this.changeMaxReadPositionCount.set(0);
434434
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
435435
}
436436
}
437437

438438
private void takeSnapshotByTimeout() {
439-
if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
440-
this.changeMaxReadPositionAndAddAbortTimes.set(0);
439+
if (changeMaxReadPositionCount.get() > 0) {
440+
this.changeMaxReadPositionCount.set(0);
441441
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
442442
}
443443
this.timer.newTimeout(TopicTransactionBuffer.this,
@@ -454,7 +454,7 @@ void updateMaxReadPosition(TxnID txnID) {
454454
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
455455
}
456456
if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
457-
this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
457+
this.changeMaxReadPositionCount.getAndIncrement();
458458
}
459459
}
460460

@@ -489,7 +489,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
489489
} else if (checkIfReady()) {
490490
if (ongoingTxns.isEmpty()) {
491491
maxReadPosition = position;
492-
changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
492+
changeMaxReadPositionCount.incrementAndGet();
493493
}
494494
}
495495
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,20 +1095,20 @@ public void testCancelTxnTimeout() throws Exception{
10951095
}
10961096

10971097
@Test
1098-
public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
1098+
public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception {
10991099
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
11001100
.getBrokerService()
1101-
.getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
1101+
.getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true)
11021102
.get().get();
11031103
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
11041104
Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
11051105
processorField.setAccessible(true);
11061106

11071107
AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(buffer);
11081108
Field changeTimeField = TopicTransactionBuffer
1109-
.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
1109+
.class.getDeclaredField("changeMaxReadPositionCount");
11101110
changeTimeField.setAccessible(true);
1111-
AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) changeTimeField.get(buffer);
1111+
AtomicLong changeMaxReadPositionCount = (AtomicLong) changeTimeField.get(buffer);
11121112

11131113
Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
11141114
field1.setAccessible(true);
@@ -1117,10 +1117,10 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot()
11171117
TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
11181118
Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
11191119
});
1120-
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
1120+
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
11211121

11221122
buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
1123-
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
1123+
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
11241124

11251125
}
11261126

0 commit comments

Comments
 (0)