Skip to content

Commit 699ae1b

Browse files
KAFKA-16729: Support isolation level for share consumer (#19261)
This PR adds the share group dynamic config `share.isolation.level`. Until now, share groups only supported `READ_UNCOMMITTED` isolation level type. With this PR, we aim to support `READ_COMMITTED` isolation type to share groups. Reviewers: Andrew Schofield <[email protected]>, Jun Rao <[email protected]>, Apoorv Mittal <[email protected]>
1 parent eeb1214 commit 699ae1b

File tree

12 files changed

+1232
-102
lines changed

12 files changed

+1232
-102
lines changed

checkstyle/suppressions.xml

+3
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@
168168
<suppress checks="NPathComplexity"
169169
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
170170

171+
<suppress checks="ClassFanOutComplexity"
172+
files="ShareConsumerTest.java"/>
173+
171174
<!-- connect tests-->
172175
<suppress checks="ClassDataAbstractionCoupling"
173176
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

+419-20
Large diffs are not rendered by default.

core/src/main/java/kafka/server/share/ShareFetchUtils.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
114114
shareFetch.batchSize(),
115115
shareFetch.maxFetchRecords() - acquiredRecordsCount,
116116
shareFetchPartitionData.fetchOffset(),
117-
fetchPartitionData
117+
fetchPartitionData,
118+
shareFetch.fetchParams().isolation
118119
);
119120
log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
120121
// Maybe, in the future, check if no records are acquired, and we want to retry

core/src/main/java/kafka/server/share/SharePartition.java

+224-16
Large diffs are not rendered by default.

core/src/main/scala/kafka/server/KafkaApis.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
5757
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
5858
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
5959
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
60-
import org.apache.kafka.coordinator.group.{Group, GroupConfigManager, GroupCoordinator}
60+
import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
6161
import org.apache.kafka.coordinator.share.ShareCoordinator
6262
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
6363
import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
@@ -3210,7 +3210,7 @@ class KafkaApis(val requestChannel: RequestChannel,
32103210
shareFetchRequest.maxWait,
32113211
fetchMinBytes,
32123212
fetchMaxBytes,
3213-
FetchIsolation.HIGH_WATERMARK,
3213+
FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
32143214
clientMetadata,
32153215
true
32163216
)

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
181181

182182
when(sp0.canAcquireRecords()).thenReturn(true);
183183
when(sp1.canAcquireRecords()).thenReturn(false);
184-
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
184+
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
185185
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
186186

187187
// We are testing the case when the share partition is getting fetched for the first time, so for the first time
@@ -253,7 +253,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
253253

254254
when(sp0.canAcquireRecords()).thenReturn(true);
255255
when(sp1.canAcquireRecords()).thenReturn(false);
256-
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
256+
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
257257
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
258258

259259
// We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
@@ -305,7 +305,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
305305

306306
when(sp0.canAcquireRecords()).thenReturn(true);
307307
when(sp1.canAcquireRecords()).thenReturn(false);
308-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
308+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
309309
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
310310
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
311311

@@ -418,7 +418,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
418418

419419
when(sp0.canAcquireRecords()).thenReturn(true);
420420
when(sp1.canAcquireRecords()).thenReturn(false);
421-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
421+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
422422
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
423423
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
424424

@@ -580,7 +580,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
580580
// sp1 can be acquired now
581581
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
582582
when(sp1.canAcquireRecords()).thenReturn(true);
583-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
583+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
584584
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
585585

586586
// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
@@ -676,7 +676,7 @@ public void testExceptionInMinBytesCalculation() {
676676
BROKER_TOPIC_STATS);
677677

678678
when(sp0.canAcquireRecords()).thenReturn(true);
679-
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
679+
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
680680
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
681681
doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
682682

@@ -919,15 +919,15 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab
919919
new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS,
920920
BROKER_TOPIC_STATS);
921921

922-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
922+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
923923
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
924-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
924+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
925925
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
926-
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
926+
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
927927
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
928-
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
928+
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
929929
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
930-
when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
930+
when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
931931
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
932932

933933
// All 5 partitions are acquirable.
@@ -1015,9 +1015,9 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab
10151015
new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE, MAX_FETCH_RECORDS,
10161016
BROKER_TOPIC_STATS);
10171017

1018-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1018+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
10191019
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
1020-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1020+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
10211021
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
10221022

10231023
// Only 2 out of 5 partitions are acquirable.

core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,10 @@ public void testProcessFetchResponse() {
9898
when(sp0.nextFetchOffset()).thenReturn((long) 3);
9999
when(sp1.nextFetchOffset()).thenReturn((long) 3);
100100

101-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
101+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
102102
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
103103
.setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
104-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
104+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
105105
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
106106
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
107107

@@ -163,8 +163,8 @@ public void testProcessFetchResponseWithEmptyRecords() {
163163
when(sp0.nextFetchOffset()).thenReturn((long) 3);
164164
when(sp1.nextFetchOffset()).thenReturn((long) 3);
165165

166-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
167-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
166+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
167+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
168168

169169
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
170170
sharePartitions.put(tp0, sp0);
@@ -221,11 +221,11 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() {
221221
when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
222222
when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);
223223

224-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
224+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
225225
ShareAcquiredRecords.empty(),
226226
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
227227
.setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
228-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
228+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
229229
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
230230
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)),
231231
ShareAcquiredRecords.empty());
@@ -309,7 +309,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() {
309309
// Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition.
310310
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty());
311311
doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
312-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
312+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
313313

314314
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
315315
new SimpleRecord("0".getBytes(), "v".getBytes()),
@@ -390,10 +390,10 @@ public void testProcessFetchResponseWithMaxFetchRecords() throws IOException {
390390
records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
391391
OptionalInt.empty(), false);
392392

393-
when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, fetchPartitionData1)).thenReturn(
393+
when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, fetchPartitionData1, FetchIsolation.HIGH_WATERMARK)).thenReturn(
394394
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
395395
.setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1)));
396-
when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, fetchPartitionData2)).thenReturn(
396+
when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, fetchPartitionData2, FetchIsolation.HIGH_WATERMARK)).thenReturn(
397397
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
398398
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
399399

@@ -444,7 +444,7 @@ public void testProcessFetchResponseWithOffsetFetchException() {
444444
// Mock the replicaManager.fetchOffsetForTimestamp method to throw exception.
445445
Throwable exception = new FencedLeaderEpochException("Fenced exception");
446446
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
447-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
447+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
448448

449449
// When no records are acquired from share partition.
450450
List<ShareFetchPartitionData> responseData = List.of(

core/src/test/java/kafka/server/share/SharePartitionManagerTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -1119,13 +1119,13 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException {
11191119
when(sp1.canAcquireRecords()).thenReturn(true);
11201120
when(sp2.canAcquireRecords()).thenReturn(true);
11211121
when(sp3.canAcquireRecords()).thenReturn(true);
1122-
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1122+
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
11231123
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
1124-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1124+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
11251125
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
1126-
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1126+
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
11271127
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
1128-
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1128+
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
11291129
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
11301130
// Mocks to have fetch offset metadata match for share partitions to avoid any extra calls to replicaManager.readFromLog.
11311131
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
@@ -1788,8 +1788,8 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() {
17881788
when(sp1.canAcquireRecords()).thenReturn(false);
17891789
when(sp2.maybeAcquireFetchLock()).thenReturn(true);
17901790
when(sp2.canAcquireRecords()).thenReturn(false);
1791-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any())).thenReturn(ShareAcquiredRecords.empty());
1792-
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any())).thenReturn(ShareAcquiredRecords.empty());
1791+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(ShareAcquiredRecords.empty());
1792+
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(ShareAcquiredRecords.empty());
17931793

17941794
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
17951795
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
@@ -2034,7 +2034,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() {
20342034
when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))).thenReturn(List.of(tp1, tp3));
20352035

20362036
doAnswer(invocation -> buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
2037-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
2037+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
20382038
// Release acquired records on session close request for tp1 and tp3.
20392039
sharePartitionManager.releaseSession(groupId, memberId);
20402040

@@ -2604,7 +2604,7 @@ public void testSharePartitionPartialInitializationFailure() throws Exception {
26042604
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
26052605
when(sp1.canAcquireRecords()).thenReturn(true);
26062606
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
2607-
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
2607+
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
26082608

26092609
// Fail initialization for tp2.
26102610
SharePartition sp2 = mock(SharePartition.class);

0 commit comments

Comments
 (0)