Skip to content

Commit 62fe528

Browse files
authored
KAFKA-19082: [1/4] Add client config for enable2PC and overloaded initProducerId (KIP-939) (#19429)
This is part of the client side changes required to enable 2PC for KIP-939 **Producer Config:** transaction.two.phase.commit.enable The default would be ‘false’. If set to ‘true’, the broker is informed that the client is participating in two phase commit protocol and transactions that this client starts never expire. **Overloaded InitProducerId method** If the value is 'true' then the corresponding field is set in the InitProducerIdRequest Reviewers: Justine Olshan <[email protected]>, Artem Livshits <[email protected]>
1 parent 3c05dfd commit 62fe528

File tree

10 files changed

+287
-73
lines changed

10 files changed

+287
-73
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
9999

100100
<suppress checks="JavaNCSS"
101-
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
101+
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaProducerTest).java"/>
102102

103103
<suppress checks="NPathComplexity"
104104
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -595,14 +595,17 @@ private TransactionManager configureTransactionState(ProducerConfig config,
595595

596596
if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
597597
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
598+
final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
598599
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
599600
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
601+
600602
transactionManager = new TransactionManager(
601603
logContext,
602604
transactionalId,
603605
transactionTimeoutMs,
604606
retryBackoffMs,
605-
apiVersions
607+
apiVersions,
608+
enable2PC
606609
);
607610

608611
if (transactionManager.isTransactional())
@@ -617,8 +620,13 @@ private TransactionManager configureTransactionState(ProducerConfig config,
617620
}
618621

619622
/**
623+
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
624+
* with additional capabilities to keep a previously prepared transaction.
625+
*
620626
* Needs to be called before any other methods when the {@code transactional.id} is set in the configuration.
621-
* This method does the following:
627+
*
628+
* When {@code keepPreparedTxn} is {@code false}, this behaves like the standard transactional
629+
* initialization where the method does the following:
622630
* <ol>
623631
* <li>Ensures any transactions initiated by previous instances of the producer with the same
624632
* {@code transactional.id} are completed. If the previous instance had failed with a transaction in
@@ -627,26 +635,38 @@ private TransactionManager configureTransactionState(ProducerConfig config,
627635
* <li>Gets the internal producer id and epoch, used in all future transactional
628636
* messages issued by the producer.</li>
629637
* </ol>
638+
*
639+
* <p>
640+
* When {@code keepPreparedTxn} is set to {@code true}, the producer does <em>not</em> automatically abort existing
641+
* transactions. Instead, it enters a recovery mode allowing only finalization of those previously
642+
* prepared transactions.
643+
* This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
644+
* until the external transaction manager decides whether to commit or abort.
645+
* <p>
646+
*
647+
* @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
648+
* recovery), false to abort existing transactions and behave like
649+
* the standard initTransactions.
650+
*
630651
* Note that this method will raise {@link TimeoutException} if the transactional state cannot
631652
* be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
632653
* if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
633654
* initialized, this method should no longer be used.
634655
*
635-
* @throws IllegalStateException if no {@code transactional.id} has been configured
636-
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
637-
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
638-
* @throws org.apache.kafka.common.errors.AuthorizationException error indicating that the configured
639-
* transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for
640-
* more details. User may retry this function call after fixing the permission.
641-
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
656+
* @throws IllegalStateException if no {@code transactional.id} is configured
657+
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
658+
* support transactions (i.e. if its version is lower than 0.11.0.0)
659+
* @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
660+
* {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
661+
* @throws KafkaException if the producer encounters a fatal error or any other unexpected error
642662
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
643663
* @throws InterruptException if the thread is interrupted while blocked
644664
*/
645-
public void initTransactions() {
665+
public void initTransactions(boolean keepPreparedTxn) {
646666
throwIfNoTransactionManager();
647667
throwIfProducerClosed();
648668
long now = time.nanoseconds();
649-
TransactionalRequestResult result = transactionManager.initializeTransactions();
669+
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
650670
sender.wakeup();
651671
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
652672
producerMetrics.recordInit(time.nanoseconds() - now);

clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public MockProducer() {
142142
}
143143

144144
@Override
145-
public void initTransactions() {
145+
public void initTransactions(boolean keepPreparedTxn) {
146146
verifyNotClosed();
147147
verifyNotFenced();
148148
if (this.transactionInitialized) {

clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,14 @@ public interface Producer<K, V> extends Closeable {
4242
/**
4343
* See {@link KafkaProducer#initTransactions()}
4444
*/
45-
void initTransactions();
45+
default void initTransactions() {
46+
initTransactions(false);
47+
}
48+
49+
/**
50+
* See {@link KafkaProducer#initTransactions(boolean)}
51+
*/
52+
void initTransactions(boolean keepPreparedTxn);
4653

4754
/**
4855
* See {@link KafkaProducer#beginTransaction()}

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
355355
"By default the TransactionId is not configured, which means transactions cannot be used. " +
356356
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code>.";
357357

358+
/** <code> transaction.two.phase.commit.enable </code> */
359+
public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable";
360+
private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
361+
"two phase commit protocol and transactions that this client starts never expire.";
362+
358363
/**
359364
* <code>security.providers</code>
360365
*/
@@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
526531
new ConfigDef.NonEmptyString(),
527532
Importance.LOW,
528533
TRANSACTIONAL_ID_DOC)
534+
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
535+
Type.BOOLEAN,
536+
false,
537+
Importance.LOW,
538+
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
529539
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
530540
Type.STRING,
531541
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
@@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
609619
if (!idempotenceEnabled && userConfiguredTransactions) {
610620
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
611621
}
622+
623+
// Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
624+
// In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
625+
// transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
626+
// decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
627+
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
628+
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
629+
if (enable2PC && userConfiguredTransactionTimeout) {
630+
throw new ConfigException(
631+
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
632+
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
633+
" is set to true. Transactions will not expire with two-phase commit enabled."
634+
);
635+
}
612636
}
613637

614638
private static String parseAcks(String acksString) {

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public class TransactionManager {
143143
private volatile boolean clientSideEpochBumpRequired = false;
144144
private volatile long latestFinalizedFeaturesEpoch = -1;
145145
private volatile boolean isTransactionV2Enabled = false;
146+
private final boolean enable2PC;
146147

147148
private enum State {
148149
UNINITIALIZED,
@@ -203,7 +204,8 @@ public TransactionManager(final LogContext logContext,
203204
final String transactionalId,
204205
final int transactionTimeoutMs,
205206
final long retryBackoffMs,
206-
final ApiVersions apiVersions) {
207+
final ApiVersions apiVersions,
208+
final boolean enable2PC) {
207209
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
208210
this.transactionalId = transactionalId;
209211
this.log = logContext.logger(TransactionManager.class);
@@ -220,6 +222,7 @@ public TransactionManager(final LogContext logContext,
220222
this.retryBackoffMs = retryBackoffMs;
221223
this.txnPartitionMap = new TxnPartitionMap(logContext);
222224
this.apiVersions = apiVersions;
225+
this.enable2PC = enable2PC;
223226
}
224227

225228
/**
@@ -279,11 +282,18 @@ protected boolean shouldPoisonStateOnInvalidTransition() {
279282
return Thread.currentThread() instanceof Sender.SenderThread;
280283
}
281284

282-
public synchronized TransactionalRequestResult initializeTransactions() {
283-
return initializeTransactions(ProducerIdAndEpoch.NONE);
285+
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
286+
return initializeTransactions(producerIdAndEpoch, false);
284287
}
285288

286-
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
289+
public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
290+
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
291+
}
292+
293+
synchronized TransactionalRequestResult initializeTransactions(
294+
ProducerIdAndEpoch producerIdAndEpoch,
295+
boolean keepPreparedTxn
296+
) {
287297
maybeFailWithError();
288298

289299
boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
@@ -292,14 +302,20 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
292302
if (!isEpochBump) {
293303
transitionTo(State.INITIALIZING);
294304
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
305+
if (keepPreparedTxn) {
306+
log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
307+
}
295308
} else {
296309
log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
297310
}
298311
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
299312
.setTransactionalId(transactionalId)
300313
.setTransactionTimeoutMs(transactionTimeoutMs)
301314
.setProducerId(producerIdAndEpoch.producerId)
302-
.setProducerEpoch(producerIdAndEpoch.epoch);
315+
.setProducerEpoch(producerIdAndEpoch.epoch)
316+
.setEnable2Pc(enable2PC)
317+
.setKeepPreparedTxn(keepPreparedTxn);
318+
303319
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
304320
isEpochBump);
305321
enqueueRequest(handler);

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.kafka.common.requests.EndTxnResponse;
7676
import org.apache.kafka.common.requests.FindCoordinatorRequest;
7777
import org.apache.kafka.common.requests.FindCoordinatorResponse;
78+
import org.apache.kafka.common.requests.InitProducerIdRequest;
7879
import org.apache.kafka.common.requests.InitProducerIdResponse;
7980
import org.apache.kafka.common.requests.JoinGroupRequest;
8081
import org.apache.kafka.common.requests.MetadataResponse;
@@ -103,6 +104,7 @@
103104
import org.junit.jupiter.api.Test;
104105
import org.junit.jupiter.api.TestInfo;
105106
import org.junit.jupiter.params.ParameterizedTest;
107+
import org.junit.jupiter.params.provider.CsvSource;
106108
import org.junit.jupiter.params.provider.ValueSource;
107109
import org.mockito.MockedStatic;
108110
import org.mockito.Mockito;
@@ -1315,7 +1317,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
13151317
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
13161318
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
13171319

1318-
Future<?> future = executor.submit(producer::initTransactions);
1320+
Future<?> future = executor.submit(() -> producer.initTransactions());
13191321
TestUtils.waitForCondition(client::hasInFlightRequests,
13201322
"Timed out while waiting for expected `InitProducerId` request to be sent");
13211323

@@ -1390,6 +1392,59 @@ public void testInitTransactionWhileThrottled() {
13901392
}
13911393
}
13921394

1395+
@ParameterizedTest
1396+
@CsvSource({
1397+
"true, false",
1398+
"true, true",
1399+
"false, true"
1400+
})
1401+
public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
1402+
Map<String, Object> configs = new HashMap<>();
1403+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
1404+
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
1405+
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
1406+
if (enable2PC) {
1407+
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
1408+
}
1409+
1410+
Time time = new MockTime(1);
1411+
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
1412+
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
1413+
MockClient client = new MockClient(time, metadata);
1414+
client.updateMetadata(initialUpdateResponse);
1415+
1416+
// Capture flags from the InitProducerIdRequest
1417+
boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]
1418+
1419+
client.prepareResponse(
1420+
request -> request instanceof FindCoordinatorRequest &&
1421+
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
1422+
FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));
1423+
1424+
client.prepareResponse(
1425+
request -> {
1426+
if (request instanceof InitProducerIdRequest) {
1427+
InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
1428+
requestFlags[0] = initRequest.data().keepPreparedTxn();
1429+
requestFlags[1] = initRequest.data().enable2Pc();
1430+
return true;
1431+
}
1432+
return false;
1433+
},
1434+
initProducerIdResponse(1L, (short) 5, Errors.NONE));
1435+
1436+
try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
1437+
new StringSerializer(), metadata, client, null, time)) {
1438+
producer.initTransactions(keepPreparedTxn);
1439+
1440+
// Verify request flags match expected values
1441+
assertEquals(keepPreparedTxn, requestFlags[0],
1442+
"keepPreparedTxn flag should match input parameter");
1443+
assertEquals(enable2PC, requestFlags[1],
1444+
"enable2Pc flag should match producer configuration");
1445+
}
1446+
}
1447+
13931448
@Test
13941449
public void testClusterAuthorizationFailure() throws Exception {
13951450
int maxBlockMs = 500;

clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,27 @@ void testUpperboundCheckOfEnableIdempotence() {
145145
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
146146
assertDoesNotThrow(() -> new ProducerConfig(configs));
147147
}
148+
149+
@Test
150+
void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
151+
Map<String, Object> configs = new HashMap<>();
152+
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
153+
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
154+
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
155+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
156+
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
157+
configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
158+
159+
ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
160+
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
161+
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));
162+
163+
// Verify that setting one but not the other is valid
164+
configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
165+
assertDoesNotThrow(() -> new ProducerConfig(configs));
166+
167+
configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
168+
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
169+
assertDoesNotThrow(() -> new ProducerConfig(configs));
170+
}
148171
}

0 commit comments

Comments
 (0)