Skip to content

Commit 6430fb5

Browse files
authored
MINOR: Add note that streams groups are in early access (#19434)
Add a note to the group protocol configuration that streams groups are in early access and should not be used in production. Also update an outdated comment related to disabling the protocol. Reviewers: Bruno Cadonna <[email protected]>
1 parent 699ae1b commit 6430fb5

File tree

2 files changed

+6
-8
lines changed

2 files changed

+6
-8
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

+5-7
Original file line numberDiff line numberDiff line change
@@ -2629,16 +2629,15 @@ class KafkaApis(val requestChannel: RequestChannel,
26292629

26302630
}
26312631

2632-
private def isStreamsGroupProtocolEnabled(): Boolean = {
2632+
private def isStreamsGroupProtocolEnabled: Boolean = {
26332633
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
26342634
}
26352635

26362636
def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
26372637
val streamsGroupHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest]
26382638

2639-
if (!isStreamsGroupProtocolEnabled()) {
2640-
// The API is not supported by the "old" group coordinator (the default). If the
2641-
// new one is not enabled, we fail directly here.
2639+
if (!isStreamsGroupProtocolEnabled) {
2640+
// The API is not enabled by default. If it is not enabled, we fail directly here.
26422641
requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
26432642
CompletableFuture.completedFuture[Unit](())
26442643
} else if (!authHelper.authorize(request.context, READ, GROUP, streamsGroupHeartbeatRequest.data.groupId)) {
@@ -2729,9 +2728,8 @@ class KafkaApis(val requestChannel: RequestChannel,
27292728
val streamsGroupDescribeRequest = request.body[StreamsGroupDescribeRequest]
27302729
val includeAuthorizedOperations = streamsGroupDescribeRequest.data.includeAuthorizedOperations
27312730

2732-
if (!isStreamsGroupProtocolEnabled()) {
2733-
// The API is not supported by the "old" group coordinator (the default). If the
2734-
// new one is not enabled, we fail directly here.
2731+
if (!isStreamsGroupProtocolEnabled) {
2732+
// The API is not enabled by default. If it is not enabled, we fail directly here.
27352733
requestHelper.sendMaybeThrottle(request, request.body[StreamsGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
27362734
CompletableFuture.completedFuture[Unit](())
27372735
} else {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class GroupCoordinatorConfig {
6060
///
6161
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols";
6262
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols." +
63-
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
63+
"The " + Group.GroupType.SHARE + " and " + Group.GroupType.STREAMS + " rebalance protocols are in early access and therefore must not be used in production.";
6464
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
6565
Group.GroupType.CLASSIC.toString(),
6666
Group.GroupType.CONSUMER.toString());

0 commit comments

Comments
 (0)