[QUESTION] aiokafka stucks after group rejoin #847

Open
sam-dee opened this issue Jul 19, 2022 · 7 comments
sam-dee commented Jul 19, 2022

Description

We have a long-running service that processes messages through aiokafka inside a coroutine. Once every few days, "Failed fetch messages from 1: [Error 7] RequestTimedOutError" occurs for some reason, and after that no messages are consumed. The consumer seems to rejoin the group properly after the RequestTimedOutError, but then nothing happens.

Environment

The service is deployed in a python:3.10-slim Docker container.

  • aiokafka version: 0.7.2
  • kafka-python version: 2.0.2

Coroutine code used

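# Imports are assumed here; tasks_topic, kafka_addr, dispatch and log
# are defined elsewhere in the service.
import asyncio
from json import loads

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError


async def listen_kafka():
    consumer = AIOKafkaConsumer(
        tasks_topic,
        group_id=f"{tasks_topic}_group",
        bootstrap_servers=kafka_addr,
        session_timeout_ms=45000,
        enable_auto_commit=False,
        value_deserializer=lambda x: loads(x.decode('utf-8')))
    try:
        # Retry until the first connection to Kafka succeeds.
        while True:
            try:
                await consumer.start()
            except KafkaConnectionError as e:
                log.error(e)
                await asyncio.sleep(5)
            else:
                log.info("Connection to Kafka established")
                break
        try:
            async for msg in consumer:
                # Defensive check; the iterator is expected to yield
                # ConsumerRecord objects, not exceptions.
                if isinstance(msg, Exception):
                    log.error(msg)
                    continue
                await dispatch(msg.value)
                # Commit manually, only after the message was handled.
                await consumer.commit()

        except Exception as e:
            log.error(e)
        finally:
            await consumer.stop()

    except asyncio.CancelledError:
        # Also covers cancellation while still waiting for the connection.
        log.info("Stopping consumer...")
        await consumer.stop()
        log.info("Listen Kafka task stopped")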

Unfortunately, no usable text logs are left, but we have a screenshot that shows the situation well enough.
[Screenshot: 2022-07-19 10 47 02]
At the time the screenshot was taken, plenty of tasks were waiting in Kafka. They were processed successfully after the service was restarted.

Do you have any ideas how we can overcome or at least debug the problem?

sam-dee commented Aug 3, 2022

I managed to catch the problem with the DEBUG logging level enabled. Here is what I got:

2022-07-27 14:31:53.159 DEBUG group_coordinator - _do_heartbeat: Heartbeat: ozon_topic_group[25] aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b
2022-07-27 14:31:53.159 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 183: HeartbeatRequest_v1(group='ozon_topic_group', generation_id=25, member_id='aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b')
2022-07-27 14:31:53.162 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 183: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
2022-07-27 14:31:53.162 DEBUG group_coordinator - _do_heartbeat: Received successful heartbeat response for group ozon_topic_group
2022-07-27 14:31:53.510 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 668: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:53.511 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:53.511 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 669: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:31:54.016 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 669: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:54.017 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:54.017 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 670: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:31:54.522 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 670: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:54.522 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:54.522 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 671: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:31:55.027 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 671: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:55.028 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:55.028 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 672: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:31:55.534 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 672: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:55.535 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:55.535 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 673: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:31:56.040 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 673: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:56.041 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:56.041 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 674: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:31:56.161 DEBUG group_coordinator - _do_heartbeat: Heartbeat: ozon_topic_group[25] aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b
2022-07-27 14:31:56.161 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 184: HeartbeatRequest_v1(group='ozon_topic_group', generation_id=25, member_id='aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b')
2022-07-27 14:31:56.166 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 184: HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
2022-07-27 14:31:56.166 DEBUG group_coordinator - _do_heartbeat: Received successful heartbeat response for group ozon_topic_group
2022-07-27 14:31:56.547 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 674: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='ozon_topic', partitions=[(partition=0, error_code=0, highwater_offset=172, last_stable_offset=172, aborted_transactions=NULL, message_set=b'')])])
2022-07-27 14:31:56.547 DEBUG fetcher - _get_actions_per_node: Adding fetch request for partition TopicPartition(topic='ozon_topic', partition=0) at offset 172
2022-07-27 14:31:56.548 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 675: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, max_bytes=1048576)])])
2022-07-27 14:32:53.380 DEBUG group_coordinator - _do_commit_offsets: Sending offset-commit request with {TopicPartition(topic='ozon_topic', partition=0): OffsetAndMetadata(offset=172, metadata='')} for group ozon_topic_group to 1
2022-07-27 14:32:53.387 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 185: OffsetCommitRequest_v2(consumer_group='ozon_topic_group', consumer_group_generation_id=25, consumer_id='aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b', retention_time=-1, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, metadata='')])])
2022-07-27 14:32:53.392 DEBUG group_coordinator - _do_heartbeat: Heartbeat: ozon_topic_group[25] aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b
2022-07-27 14:32:53.396 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 186: HeartbeatRequest_v1(group='ozon_topic_group', generation_id=25, member_id='aiokafka-0.7.2-21483c38-31ff-4743-8280-c7f67e7e654b')
2022-07-27 14:32:53.397 DEBUG conn - close: Closing connection at 192.168.116.1:9092
2022-07-27 14:32:53.399 ERROR fetcher - _proc_fetch_request: Failed fetch messages from 1: [Error 7] RequestTimedOutError
2022-07-27 14:32:53.399 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 185: OffsetCommitResponse_v2(topics=[(topic='ozon_topic', partitions=[(partition=0, error_code=25)])])
2022-07-27 14:32:53.399 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 186: HeartbeatResponse_v1(throttle_time_ms=0, error_code=25)
2022-07-27 14:32:53.400 DEBUG client - _get_conn: Initiating connection to node 1 at 192.168.116.1:9092
2022-07-27 14:32:53.405 ERROR group_coordinator - _do_commit_offsets: OffsetCommit failed for group ozon_topic_group due to group error ([Error 25] UnknownMemberIdError: ozon_topic_group), will rejoin
2022-07-27 14:32:53.406 ERROR group_coordinator - _do_commit_offsets: OffsetCommit failed for group ozon_topic_group due to group error ([Error 25] UnknownMemberIdError: ozon_topic_group), will rejoin
2022-07-27 14:32:53.406 WARNING group_coordinator - _maybe_do_autocommit: Auto offset commit failed: [Error 25] UnknownMemberIdError: ozon_topic_group
2022-07-27 14:32:53.407 WARNING group_coordinator - _do_heartbeat: Heartbeat failed: local member_id was not recognized; resetting and re-joining group
2022-07-27 14:32:53.407 ERROR group_coordinator - _heartbeat_routine: Heartbeat session expired - marking coordinator dead
2022-07-27 14:32:53.407 WARNING group_coordinator - coordinator_dead: Marking the coordinator dead (node 1)for group ozon_topic_group.
2022-07-27 14:32:53.407 DEBUG group_coordinator - _heartbeat_routine: Stopping heartbeat task
2022-07-27 14:32:53.415 DEBUG client - coordinator_lookup: Sending FindCoordinator request for key ozon_topic_group to broker 1
2022-07-27 14:32:53.415 DEBUG client - _get_conn: Initiating connection to node 1 at 192.168.116.1:9092
2022-07-27 14:32:53.421 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-27 14:32:53.434 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, 
max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-27 14:32:53.435 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 14:32:53.436 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 2: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 14:32:53.436 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 3: FindCoordinatorRequest_v1(coordinator_key='ozon_topic_group', coordinator_type=0)
2022-07-27 14:32:53.439 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 2: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 14:32:53.440 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 3: FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=0, error_message='NONE', coordinator_id=1, host='192.168.116.1', port=9092)
2022-07-27 14:32:53.440 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 14:32:53.440 DEBUG client - coordinator_lookup: Received group coordinator response FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=0, error_message='NONE', coordinator_id=1, host='192.168.116.1', port=9092)
2022-07-27 14:32:53.440 INFO group_coordinator - ensure_coordinator_known: Discovered coordinator 1 for group ozon_topic_group
2022-07-27 14:32:53.440 DEBUG group_coordinator - _do_commit_offsets: Sending offset-commit request with {TopicPartition(topic='ozon_topic', partition=0): OffsetAndMetadata(offset=172, metadata='')} for group ozon_topic_group to 1
2022-07-27 14:32:53.441 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 187: OffsetCommitRequest_v2(consumer_group='ozon_topic_group', consumer_group_generation_id=-1, consumer_id='', retention_time=-1, topics=[(topic='ozon_topic', partitions=[(partition=0, offset=172, metadata='')])])
2022-07-27 14:32:53.444 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 187: OffsetCommitResponse_v2(topics=[(topic='ozon_topic', partitions=[(partition=0, error_code=0)])])
2022-07-27 14:32:53.444 DEBUG group_coordinator - _do_commit_offsets: Committed offset OffsetAndMetadata(offset=172, metadata='') for partition TopicPartition(topic='ozon_topic', partition=0)
2022-07-27 14:32:53.444 INFO group_coordinator - _on_join_prepare: Revoking previously assigned partitions frozenset({TopicPartition(topic='ozon_topic', partition=0)}) for group ozon_topic_group
2022-07-27 14:32:53.444 INFO kafka - on_partitions_revoked: Revoked frozenset({TopicPartition(topic='ozon_topic', partition=0)})
2022-07-27 14:32:53.445 INFO kafka - on_partitions_revoked: frozenset({TopicPartition(topic='ozon_topic', partition=0)}) offset is 172
2022-07-27 14:32:53.445 INFO group_coordinator - perform_group_join: (Re-)joining group ozon_topic_group
2022-07-27 14:32:53.445 DEBUG group_coordinator - perform_group_join: Sending JoinGroup (JoinGroupRequest_v2(group='ozon_topic_group', session_timeout=45000, rebalance_timeout=45000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x00')])) to coordinator 1
2022-07-27 14:32:53.445 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 188: JoinGroupRequest_v2(group='ozon_topic_group', session_timeout=45000, rebalance_timeout=45000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x00')])
2022-07-27 14:32:53.461 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 188: JoinGroupResponse_v2(throttle_time_ms=0, error_code=0, generation_id=27, group_protocol='roundrobin', leader_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', members=[(member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', member_metadata=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x00')])
2022-07-27 14:32:53.461 DEBUG group_coordinator - perform_group_join: Join group response JoinGroupResponse_v2(throttle_time_ms=0, error_code=0, generation_id=27, group_protocol='roundrobin', leader_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', members=[(member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', member_metadata=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x00')])
2022-07-27 14:32:53.461 INFO group_coordinator - perform_group_join: Joined group 'ozon_topic_group' (generation 27) with member_id aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931
2022-07-27 14:32:53.461 INFO group_coordinator - perform_group_join: Elected group leader -- performing partition assignments using roundrobin
2022-07-27 14:32:53.461 DEBUG group_coordinator - _perform_assignment: Performing assignment for group ozon_topic_group using strategy roundrobin with subscriptions {'aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931': ConsumerProtocolMemberMetadata(version=0, subscription=['ozon_topic'], user_data=b'')}
2022-07-27 14:32:53.462 DEBUG group_coordinator - _perform_assignment: Finished assignment for group ozon_topic_group: {'aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931': ConsumerProtocolMemberAssignment(version=0, assignment=[(topic='ozon_topic', partitions=[0])], user_data=b'')}
2022-07-27 14:32:53.462 DEBUG group_coordinator - _on_join_leader: Sending leader SyncGroup for group ozon_topic_group to coordinator 1: SyncGroupRequest_v1(group='ozon_topic_group', generation_id=27, member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', group_assignment=[(member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', member_metadata=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])
2022-07-27 14:32:53.462 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 189: SyncGroupRequest_v1(group='ozon_topic_group', generation_id=27, member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', group_assignment=[(member_id='aiokafka-0.7.2-b98ec328-0641-4694-bc7d-2a7dac35b931', member_metadata=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])
2022-07-27 14:32:53.473 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 189: SyncGroupResponse_v1(throttle_time_ms=0, error_code=0, member_assignment=b'\x00\x00\x00\x00\x00\x01\x00\nozon_topic\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')
2022-07-27 14:32:53.474 INFO group_coordinator - _send_sync_group_request: Successfully synced group ozon_topic_group with generation 27
2022-07-27 14:32:53.474 INFO group_coordinator - _on_join_complete: Setting newly assigned partitions {TopicPartition(topic='ozon_topic', partition=0)} for group ozon_topic_group
2022-07-27 14:32:53.474 INFO kafka - on_partitions_assigned: Assigned {TopicPartition(topic='ozon_topic', partition=0)}
2022-07-27 14:37:53.461 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 14:37:53.463 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 4: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 14:37:53.478 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 4: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 14:37:53.479 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 14:41:53.474 DEBUG conn - close: Closing connection at 192.168.116.1:9092
2022-07-27 14:42:53.516 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 14:42:53.517 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 5: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 14:42:53.527 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 5: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 14:42:53.528 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 14:47:53.545 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 14:47:53.546 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 6: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 14:47:53.552 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 6: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 14:47:53.553 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 14:52:53.577 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 14:52:53.578 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 7: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 14:52:53.583 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 7: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 14:52:53.584 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 14:57:53.601 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 14:57:53.602 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 8: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 14:57:53.606 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 8: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 14:57:53.607 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:02:53.646 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:02:53.647 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 9: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:02:53.651 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 9: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:02:53.652 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:07:53.665 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:07:53.666 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 10: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:07:53.671 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 10: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:07:53.671 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:12:53.711 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:12:53.711 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 11: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:12:53.715 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 11: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:12:53.716 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:17:53.736 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:17:53.738 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 12: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:17:53.741 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 12: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:17:53.742 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:22:53.765 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:22:53.767 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 13: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:22:53.771 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 13: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:22:53.772 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:27:53.776 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:27:53.777 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 14: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:27:53.782 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 14: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:27:53.783 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:32:53.819 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:32:53.820 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 15: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:32:53.823 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 15: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])
2022-07-27 15:32:53.823 DEBUG cluster - update_metadata: Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-27 15:37:53.841 DEBUG client - _metadata_update: Sending metadata request MetadataRequest_v1(topics=['ozon_topic']) to node 1
2022-07-27 15:37:53.842 DEBUG conn - send: <AIOKafkaConnection host=192.168.116.1 port=9092> Request 16: MetadataRequest_v1(topics=['ozon_topic'])
2022-07-27 15:37:53.845 DEBUG conn - _handle_frame: <AIOKafkaConnection host=192.168.116.1 port=9092> Response 16: MetadataResponse_v1(brokers=[(node_id=1, host='192.168.116.1', port=9092, rack=None)], controller_id=1, topics=[(error_code=0, topic='ozon_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1, replicas=[1], isr=[1])])])

We have a custom rebalance listener that logs partition info. As you can see, after a successful on_partitions_assigned: Assigned {TopicPartition(topic='ozon_topic', partition=0)}, no subscription to the topic occurs. Here's the code we use.

class RebalanceListener(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer: AIOKafkaConsumer = consumer

    async def on_partitions_revoked(self, revoked):
        log.info(f"Revoked {revoked}")
        for tp in revoked:
            # Log each partition individually rather than the whole set
            log.info(f"{tp} offset is {await self.consumer.position(tp)}")

    async def on_partitions_assigned(self, assigned):
        log.info(f"Assigned {assigned}")
        for tp in assigned:
            # Log each partition individually rather than the whole set
            log.info(f"{tp} offset is {await self.consumer.position(tp)}")

# later on ...

listener = RebalanceListener(consumer)
consumer.subscribe(topics=[tasks_topic], listener=listener)

@sam-dee
Author

sam-dee commented Dec 4, 2022

We had to move to another library :(
I hope this issue will be resolved one day.

@sam-dee sam-dee closed this as completed Dec 4, 2022
@c0nst-float

@sam-dee, we have a similar issue. Which library did you move to? kafka-python?

@sam-dee
Author

sam-dee commented Feb 11, 2023

@c0nst-float, we use confluent-kafka with poll wrapped in loop.run_in_executor, which works pretty much the same. Here's the source:

import asyncio
import functools
from json import loads  # assumption: loads is json.loads

from confluent_kafka import Consumer

async def listen_kafka():
    log.info("Starting listen Kafka")
    consumer = Consumer({
        'bootstrap.servers': kafka_settings.KAFKA_ADDR,
        'group.id': f"{kafka_settings.TASKS_TOPIC}_group",
        'auto.offset.reset': 'latest',
        'enable.auto.commit': True
    })
    semaphore = asyncio.Semaphore(kafka_settings.NOF_SEM_TASKS)
    consumer.subscribe([kafka_settings.TASKS_TOPIC])
    loop = asyncio.get_running_loop()
    poll = functools.partial(consumer.poll, 1)
    try:
        while True:
            message = await loop.run_in_executor(None, poll)

            if message is None:
                continue

            if message.error():
                log.error(f"Error while getting message {message.error()}")
                continue

            async with semaphore:
                try:
                    msg = loads(message.value().decode('utf-8'))
                    asyncio.create_task(dispatcher.dispatch(msg))
                except DispatchError as e:
                    log.error(str(e))

    except asyncio.CancelledError:
        log.info("Stopping consumer...")
        consumer.close()
        log.info("Listen Kafka task stopped")
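One detail worth noting about the snippet above: `async with semaphore` only wraps the call to `asyncio.create_task`, so the semaphore is released as soon as the task is scheduled, and an exception raised inside the task is not caught by the surrounding `try`. If the intent is to cap the number of in-flight dispatches, a minimal sketch could acquire the semaphore in the polling loop and release it when the task finishes. The names `dispatch`, `guarded_dispatch`, and `run_demo` below are hypothetical stand-ins and not part of the original service.

```python
import asyncio


async def dispatch(msg, done):
    # Hypothetical stand-in for dispatcher.dispatch(msg).
    await asyncio.sleep(0.01)
    done.append(msg)


async def guarded_dispatch(semaphore, msg, state):
    # Runs one dispatch and releases the permit taken by the polling loop.
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    try:
        await dispatch(msg, state["done"])
    except Exception as exc:
        # Errors surface here, inside the task, not in the polling loop.
        state["errors"].append(exc)
    finally:
        state["running"] -= 1
        semaphore.release()


async def run_demo(n_messages=10, limit=3):
    semaphore = asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0, "done": [], "errors": []}
    tasks = set()
    for msg in range(n_messages):
        # Waits here once `limit` dispatches are in flight (backpressure).
        await semaphore.acquire()
        task = asyncio.create_task(guarded_dispatch(semaphore, msg, state))
        tasks.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(tasks.discard)
    await asyncio.gather(*tasks)
    return state
```

With this shape, the loop stops pulling new messages while `limit` dispatches are running, instead of scheduling an unbounded number of tasks.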

@ods ods reopened this Feb 13, 2023
@strongbugman

How long does "await dispatch(msg.value)" take to execute? It looks like this line is blocking the event loop.
By the way, handling long-running message-processing tasks needs some extra care.

@sam-dee
Author

sam-dee commented May 14, 2023

In our case it actually runs pretty fast, no longer than 2-3 seconds. If it gets stuck in your case, it might be a CPU-bound task, so consider moving it into a separate thread, e.g.

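# dispatcher.dispatch must be a regular (non-async) function here;
# pass the callable and its argument separately instead of calling it.
loop.run_in_executor(None, dispatcher.dispatch, msg)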
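A minimal, self-contained sketch of that idea, assuming the handler is a plain synchronous function: `loop.run_in_executor` takes the callable and its arguments separately and returns a future that can be awaited directly. `cpu_bound_dispatch`, `ticker`, and `run_demo` are hypothetical names used only for illustration; the ticker is there to show that the event loop keeps running while the handler blocks a worker thread.

```python
import asyncio
import time


def cpu_bound_dispatch(msg):
    # Hypothetical synchronous handler standing in for dispatcher.dispatch(msg).
    time.sleep(0.2)  # simulates blocking work
    return f"handled {msg}"


async def ticker(ticks, stop):
    # Records a timestamp on every iteration until asked to stop.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def run_demo():
    loop = asyncio.get_running_loop()
    ticks, stop = [], asyncio.Event()
    ticker_task = asyncio.create_task(ticker(ticks, stop))
    # The default executor (None) is a thread pool; the call is not made here,
    # it is made by a worker thread.
    result = await loop.run_in_executor(None, cpu_bound_dispatch, "task-1")
    stop.set()
    await ticker_task
    return result, len(ticks)
```

Note that a thread pool helps with blocking calls that release the GIL (I/O, `time.sleep`, many C extensions). For pure-Python CPU-heavy work, a `concurrent.futures.ProcessPoolExecutor` passed as the first argument may be a better fit.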

@apmorton
Contributor

apmorton commented May 9, 2024

@ods I have bisected this issue to #802

Consider the following script:

import asyncio
import logging
import time

import aiokafka


async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )


async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()

        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))

        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())

Prior to #802 this will print:

sleeping
inducing lag
sending message
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping
ConsumerRecord(topic='some_topic', partition=0, offset=10, timestamp=1715232041461, timestamp_type=0, key=None, value=b'a message', checksum=None, serialized_key_size=-1, serialized_value_size=9, headers=())

After #802 this will print:

sleeping
inducing lag
sending message
Failed fetch messages from 0: [Error 7] RequestTimedOutError
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group some_group.
sleeping

Notice the ConsumerRecord is not printed.

If you turn on debug logging you will see that after the induced lag we stop getting:

DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition

No new messages will ever be received by consume_task after this point.

Any ideas?
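For readers following the reproduction: `time.sleep(40)` in `lag_task` blocks the whole event loop, so no other coroutine on that loop can run for those 40 seconds. Assuming the consumer's heartbeat and fetch routines run as tasks on the same loop, that would account for the expired session in both outputs. The sketch below shows the starvation effect with plain asyncio only; `heartbeat` and `run_demo` are hypothetical stand-ins and do not use aiokafka.

```python
import asyncio
import time


async def heartbeat(gaps, interval, stop):
    # Stand-in for a periodic background task such as a group heartbeat.
    last = time.monotonic()
    while not stop.is_set():
        await asyncio.sleep(interval)
        now = time.monotonic()
        gaps.append(now - last)  # time elapsed since the previous beat
        last = now


async def run_demo(block_for=0.3, interval=0.02):
    gaps, stop = [], asyncio.Event()
    hb = asyncio.create_task(heartbeat(gaps, interval, stop))
    await asyncio.sleep(interval * 3)  # let a few beats through
    time.sleep(block_for)              # blocks the event loop, like time.sleep(40) above
    await asyncio.sleep(interval * 3)
    stop.set()
    await hb
    return max(gaps)
```

The largest gap between two beats is at least as long as the blocking call, even though the task asked to wake up every `interval` seconds.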
