Skip to content

Commit 5c63b45

Browse files
authored
KAFKA-19202: Enable KIP-1071 in streams_smoke_test.py (#19560)
Enables KIP-1071 (`group.protocol=streams`) in the first streams system test `streams_smoke_test.py`. All tests using KIP-1071 cannot use `KafkaTest` anymore, since we need to customize the broker configuration. The corresponding functionality is added to `BaseStreamsTest`, which all streams tests will have to extend from now on. There are some left-overs from ZK in the tests that I copied from 'KafkaTest'. They need to be cleaned up, but this should be done in a separate PR.
1 parent 019459e commit 5c63b45

File tree

7 files changed

+53
-19
lines changed

7 files changed

+53
-19
lines changed

tests/kafkatest/services/kafka/config_property.py

+2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@
8181
SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR = "share.coordinator.state.topic.min.isr"
8282
SHARE_GROUP_ENABLE = "group.share.enable"
8383

84+
UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable"
85+
8486
"""
8587
From KafkaConfig.scala
8688

tests/kafkatest/services/kafka/kafka.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
206206
consumer_group_migration_policy=None,
207207
dynamicRaftQuorum=False,
208208
use_transactions_v2=False,
209-
use_share_groups=None
209+
use_share_groups=None,
210+
use_streams_groups=False
210211
):
211212
"""
212213
:param context: test context
@@ -271,6 +272,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
271272
:param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
272273
:param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890
273274
:param use_share_groups: When true, enables the use of share groups introduced in KIP-932
275+
:param use_streams_groups: When true, enables the use of streams groups introduced in KIP-1071
274276
"""
275277

276278
self.zk = zk
@@ -296,6 +298,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
296298
# Assign the determined value.
297299
self.use_transactions_v2 = use_transactions_v2
298300
self.use_share_groups = use_share_groups
301+
self.use_streams_groups = use_streams_groups
299302

300303
# Set consumer_group_migration_policy based on context and arguments.
301304
if consumer_group_migration_policy is None:
@@ -776,9 +779,17 @@ def prop_file(self, node):
776779
for prop in self.per_node_server_prop_overrides.get(self.idx(node), []):
777780
override_configs[prop[0]] = prop[1]
778781

782+
enabledProtocols = 'classic,consumer'
783+
779784
if self.use_share_groups is not None and self.use_share_groups is True:
780785
override_configs[config_property.SHARE_GROUP_ENABLE] = str(self.use_share_groups)
781-
override_configs[config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS] = 'classic,consumer,share'
786+
enabledProtocols += ',share'
787+
788+
if self.use_streams_groups is True:
789+
override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] = str(True)
790+
enabledProtocols += ',streams'
791+
792+
override_configs[config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS] = enabledProtocols
782793

783794
#update template configs with test override configs
784795
configs.update(override_configs)

tests/kafkatest/services/streams.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -320,13 +320,14 @@ def start_node(self, node):
320320
class StreamsSmokeTestBaseService(StreamsTestBaseService):
321321
"""Base class for Streams Smoke Test services providing some common settings and functionality"""
322322

323-
def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
323+
def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', group_protocol = 'classic', num_threads = 3, replication_factor = 3):
324324
super(StreamsSmokeTestBaseService, self).__init__(test_context,
325325
kafka,
326326
"org.apache.kafka.streams.tests.StreamsSmokeTest",
327327
command)
328328
self.NUM_THREADS = num_threads
329329
self.PROCESSING_GUARANTEE = processing_guarantee
330+
self.GROUP_PROTOCOL = group_protocol
330331
self.KAFKA_STREAMS_VERSION = ""
331332
self.UPGRADE_FROM = None
332333
self.REPLICATION_FACTOR = replication_factor
@@ -341,6 +342,7 @@ def prop_file(self):
341342
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
342343
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
343344
streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
345+
streams_property.GROUP_PROTOCOL: self.GROUP_PROTOCOL,
344346
streams_property.NUM_THREADS: self.NUM_THREADS,
345347
"replication.factor": self.REPLICATION_FACTOR,
346348
"num.standby.replicas": 2,
@@ -436,8 +438,8 @@ def start_cmd(self, node):
436438
return cmd
437439

438440
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
439-
def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3, replication_factor = 3):
440-
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, num_threads, replication_factor)
441+
def __init__(self, test_context, kafka, processing_guarantee, group_protocol = 'classic', num_threads = 3, replication_factor = 3):
442+
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, group_protocol, num_threads, replication_factor)
441443

442444
class StreamsEosTestDriverService(StreamsEosTestBaseService):
443445
def __init__(self, test_context, kafka):

tests/kafkatest/services/streams_property.py

+1
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@
2121
KAFKA_SERVERS = "bootstrap.servers"
2222
NUM_THREADS = "num.stream.threads"
2323
PROCESSING_GUARANTEE = "processing.guarantee"
24+
GROUP_PROTOCOL = "group.protocol"

tests/kafkatest/tests/streams/base_streams_test.py

+21-4
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,39 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from ducktape.tests.test import Test
1617
from ducktape.utils.util import wait_until
1718
from kafkatest.services.verifiable_consumer import VerifiableConsumer
1819
from kafkatest.services.verifiable_producer import VerifiableProducer
19-
from kafkatest.tests.kafka_test import KafkaTest
20+
from kafkatest.services.kafka import KafkaService
2021

2122

22-
class BaseStreamsTest(KafkaTest):
23+
class BaseStreamsTest(Test):
2324
"""
2425
Helper class that contains methods for producing and consuming
2526
messages and verification of results from log files
2627
2728
Extends KafkaTest which manages setting up Kafka Cluster and Zookeeper
2829
see tests/kafkatest/tests/kafka_test.py for more info
2930
"""
30-
def __init__(self, test_context, topics, num_controllers=1, num_brokers=3):
31-
super(BaseStreamsTest, self).__init__(test_context, num_controllers, num_brokers, topics)
31+
def __init__(self, test_context, topics, num_controllers=1, num_brokers=3):
32+
self.num_controllers = num_controllers
33+
self.num_brokers = num_brokers
34+
self.topics = topics
35+
36+
self.kafka = KafkaService(
37+
test_context, self.num_brokers,
38+
None, topics=self.topics,
39+
controller_num_nodes_override=self.num_controllers,
40+
use_streams_groups=True,
41+
server_prop_overrides=[
42+
[ "group.streams.min.session.timeout.ms", "10000" ], # Need to up the lower bound
43+
[ "group.streams.session.timeout.ms", "10000" ] # As in classic groups, set this to 10s
44+
]
45+
)
46+
47+
def setUp(self):
48+
self.kafka.start()
3249

3350
def get_consumer(self, client_id, topic, num_messages):
3451
return VerifiableConsumer(self.test_context,

tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ def __init__(self, test_context):
4141
num_brokers=1)
4242

4343
def setUp(self):
44-
if self.zk:
45-
self.zk.start()
44+
# do not start kafka
45+
pass
4646

4747
@cluster(num_nodes=7)
4848
@matrix(metadata_quorum=[quorum.combined_kraft])

tests/kafkatest/tests/streams/streams_smoke_test.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717
from ducktape.mark.resource import cluster
1818

1919
from kafkatest.services.kafka import quorum
20-
from kafkatest.tests.kafka_test import KafkaTest
2120
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
21+
from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
2222

23-
class StreamsSmokeTest(KafkaTest):
23+
class StreamsSmokeTest(BaseStreamsTest):
2424
"""
2525
Simple test of Kafka Streams.
2626
"""
2727

2828
def __init__(self, test_context):
29-
super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
29+
super(StreamsSmokeTest, self).__init__(test_context, topics={
3030
'echo' : { 'partitions': 5, 'replication-factor': 1 },
3131
'data' : { 'partitions': 5, 'replication-factor': 1 },
3232
'min' : { 'partitions': 5, 'replication-factor': 1 },
@@ -49,11 +49,12 @@ def __init__(self, test_context):
4949
@cluster(num_nodes=8)
5050
@matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'],
5151
crash=[True, False],
52-
metadata_quorum=[quorum.combined_kraft])
53-
def test_streams(self, processing_guarantee, crash, metadata_quorum):
54-
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
55-
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
56-
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
52+
metadata_quorum=[quorum.combined_kraft],
53+
group_protocol=["classic", "streams"])
54+
def test_streams(self, processing_guarantee, crash, metadata_quorum, group_protocol):
55+
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee, group_protocol)
56+
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee, group_protocol)
57+
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee, group_protocol)
5758

5859
with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1:
5960
processor1.start()

0 commit comments

Comments
 (0)