Skip to content

KAFKA-19316: added share_group_command_test.py system tests #19774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/kafkatest/services/console_share_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadSer
"collect_default": False}
}

def __init__(self, context, num_nodes, kafka, topic, group_id="test-share-consumer-group",
def __init__(self, context, num_nodes, kafka, topic, group_id="test-share-group",
message_validator=None, share_consumer_timeout_ms=None, version=DEV_BRANCH,
client_id="console-share-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, print_partition=False,
Expand Down
77 changes: 76 additions & 1 deletion tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1749,6 +1749,27 @@ def list_consumer_groups(self, node=None, command_config=None, state=None, type=
if type is not None:
cmd += " --type %s" % type
return self.run_cli_tool(node, cmd)

def list_share_groups(self, node=None, command_config=None, state=None):
""" Get list of share groups.
"""
if node is None:
node = self.nodes[0]
share_group_script = self.path.script("kafka-share-groups.sh", node)

if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --list" % \
(share_group_script,
self.bootstrap_servers(self.security_protocol),
command_config)
if state is not None:
cmd += " --state %s" % state
return self.run_cli_tool(node, cmd)

def describe_consumer_group(self, group, node=None, command_config=None):
""" Describe a consumer group.
Expand All @@ -1771,10 +1792,64 @@ def describe_consumer_group(self, group, node=None, command_config=None):
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not (line.startswith("SLF4J") or line.startswith("TOPIC") or line.startswith("Could not fetch offset")):
if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.startswith("Could not fetch offset")):
output += line
self.logger.debug(output)
return output

def describe_share_group(self, group, node=None, command_config=None):
""" Describe a share group.
"""
if node is None:
node = self.nodes[0]
share_group_script = self.path.script("kafka-share-groups.sh", node)

if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
(share_group_script,
self.bootstrap_servers(self.security_protocol),
command_config, group)

output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.startswith("Could not fetch offset")):
output += line
self.logger.debug(output)
return output

def describe_share_group_members(self, group, node=None, command_config=None):
""" Describe members of a share group.
"""
if node is None:
node = self.nodes[0]
share_group_script = self.path.script("kafka-share-groups.sh", node)

if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
(share_group_script,
self.bootstrap_servers(self.security_protocol),
command_config, group)

cmd += " --members"

output_lines = []
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.strip() == ""):
output_lines.append(line.strip())
self.logger.debug(output_lines)
return output_lines

def describe_quorum(self, node=None):
"""Run the describe quorum command.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

group.id={{ group_id|default('test-share-consumer-group') }}
group.id={{ group_id|default('test-share-group') }}

{% if client_id is defined and client_id is not none %}
client.id={{ client_id }}
Expand Down
128 changes: 128 additions & 0 deletions tests/kafkatest/tests/core/share_group_command_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark import matrix
from ducktape.mark.resource import cluster

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.console_share_consumer import ConsoleShareConsumer
from kafkatest.services.security.security_config import SecurityConfig

import os
import re

TOPIC = "topic-share-group-command"


class ShareGroupCommandTest(Test):
"""
Tests ShareGroupCommand
"""
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/share_group_command"
COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")

def __init__(self, test_context):
super(ShareGroupCommandTest, self).__init__(test_context)
self.num_brokers = 1
self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 1}
}

def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
None, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
controller_num_nodes_override=self.num_brokers)
self.kafka.start()

def start_share_consumer(self):
self.share_consumer = ConsoleShareConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
share_consumer_timeout_ms=None)
self.share_consumer.start()

def setup_and_verify(self, security_protocol, group=None, describe_members=False):
self.start_kafka(security_protocol, security_protocol)
self.start_share_consumer()
share_consumer_node = self.share_consumer.nodes[0]
wait_until(lambda: self.share_consumer.alive(share_consumer_node),
timeout_sec=20, backoff_sec=.2, err_msg="Share consumer was too slow to start")
kafka_node = self.kafka.nodes[0]
if security_protocol is not SecurityConfig.PLAINTEXT:
prop_file = str(self.kafka.security_config.client_config())
self.logger.debug(prop_file)
kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)

# Verify ShareConsumerGroupCommand lists expected consumer groups
command_config_file = self.COMMAND_CONFIG_FILE

if group:
if describe_members:
def has_expected_share_group_member():
output = self.kafka.describe_share_group_members(group=group, node=kafka_node, command_config=command_config_file)
return len(output) == 1 and all("test-share-group" in line for line in output)
wait_until(has_expected_share_group_member, timeout_sec=10, err_msg="Timed out waiting to describe members of the share group.")
else:
wait_until(lambda: re.search("topic-share-group-command",self.kafka.describe_share_group(group=group, node=kafka_node, command_config=command_config_file)), timeout_sec=10,
err_msg="Timed out waiting to describe expected share groups.")
else:
wait_until(lambda: "test-share-group" in self.kafka.list_share_groups(node=kafka_node, command_config=command_config_file), timeout_sec=10,
err_msg="Timed out waiting to list expected share groups.")

self.share_consumer.stop()

@cluster(num_nodes=3)
@matrix(
security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=[quorum.isolated_kraft],
use_share_groups=[True]
)
def test_list_share_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
"""
Tests if ShareGroupCommand is listing correct share groups
:return: None
"""
self.setup_and_verify(security_protocol)

@cluster(num_nodes=3)
@matrix(
security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=[quorum.isolated_kraft],
use_share_groups=[True],
)
def test_describe_share_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
"""
Tests if ShareGroupCommand is describing a share group correctly
:return: None
"""
self.setup_and_verify(security_protocol, group="test-share-group")

@cluster(num_nodes=3)
@matrix(
security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=[quorum.isolated_kraft],
use_share_groups=[True],
)
def test_describe_share_group_members(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
"""
Tests if ShareGroupCommand is describing the members of a share group correctly
:return: None
"""
self.setup_and_verify(security_protocol, group="test-share-group", describe_members=True)