Skip to content

Commit 59fbd73

Browse files
committed
Merge structs
1 parent 81e5bf2 commit 59fbd73

14 files changed

+82
-190
lines changed

aiokafka/admin/client.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
AlterConfigsRequest,
1717
ListGroupsRequest,
1818
ApiVersionRequest_v0)
19-
from kafka.structs import TopicPartition, OffsetAndMetadata
2019

2120
from aiokafka import __version__
22-
from aiokafka.errors import IncompatibleBrokerVersion, for_code
2321
from aiokafka.client import AIOKafkaClient
22+
from aiokafka.errors import IncompatibleBrokerVersion, for_code
23+
from aiokafka.structs import TopicPartition, OffsetAndMetadata
2424

2525
from .config_resource import ConfigResourceType, ConfigResource
2626
from .new_topic import NewTopic

aiokafka/coordinator/assignors/roundrobin.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
import itertools
33
import logging
44

5-
from kafka.structs import TopicPartition
6-
75
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
86
from aiokafka.coordinator.protocol import (
97
ConsumerProtocolMemberMetadata,
108
ConsumerProtocolMemberAssignment,
119
)
10+
from aiokafka.structs import TopicPartition
1211

1312
log = logging.getLogger(__name__)
1413

aiokafka/coordinator/assignors/sticky/sticky_assignor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
from kafka.protocol.struct import Struct
66
from kafka.protocol.types import String, Array, Int32
7-
from kafka.structs import TopicPartition
87

98
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
109
from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
@@ -14,6 +13,7 @@
1413
ConsumerProtocolMemberAssignment,
1514
)
1615
from aiokafka.coordinator.protocol import Schema
16+
from aiokafka.structs import TopicPartition
1717

1818
log = logging.getLogger(__name__)
1919

aiokafka/coordinator/consumer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
from kafka.metrics import AnonMeasurable
99
from kafka.metrics.stats import Avg, Count, Max, Rate
1010
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
11-
from kafka.structs import OffsetAndMetadata, TopicPartition
1211
from kafka.util import WeakMethod
1312

1413
import aiokafka.errors as Errors
14+
from aiokafka.structs import OffsetAndMetadata, TopicPartition
1515

1616
from .base import BaseCoordinator, Generation
1717
from .assignors.range import RangePartitionAssignor

aiokafka/coordinator/protocol.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from kafka.protocol.struct import Struct
22
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
3-
from kafka.structs import TopicPartition
3+
4+
from aiokafka.structs import TopicPartition
45

56

67
class ConsumerProtocolMemberMetadata(Struct):

aiokafka/structs.py

+70-7
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
from dataclasses import dataclass
2-
from typing import Generic, NamedTuple, Optional, Sequence, Tuple, TypeVar
2+
from typing import Generic, List, NamedTuple, Optional, Sequence, Tuple, TypeVar
33

4-
from kafka.structs import (
5-
BrokerMetadata,
6-
OffsetAndMetadata,
7-
PartitionMetadata,
8-
TopicPartition,
9-
)
4+
from aiokafka.errors import KafkaError
105

116

127
__all__ = [
@@ -19,6 +14,74 @@
1914
]
2015

2116

17+
class TopicPartition(NamedTuple):
18+
"""A topic and partition tuple"""
19+
20+
topic: str
21+
"A topic name"
22+
23+
partition: int
24+
"A partition id"
25+
26+
27+
class BrokerMetadata(NamedTuple):
28+
"""A Kafka broker metadata used by admin tools"""
29+
30+
nodeId: int
31+
"The Kafka broker id"
32+
33+
host: str
34+
"The Kafka broker hostname"
35+
36+
port: int
37+
"The Kafka broker port"
38+
39+
rack: Optional[str]
40+
"""The rack of the broker, which is used to in rack aware partition
41+
assignment for fault tolerance.
42+
Examples: `RACK1`, `us-east-1d`. Default: None
43+
"""
44+
45+
46+
class PartitionMetadata(NamedTuple):
47+
"""A topic partition metadata describing the state in the MetadataResponse"""
48+
49+
topic: str
50+
"The topic name of the partition this metadata relates to"
51+
52+
partition: int
53+
"The id of the partition this metadata relates to"
54+
55+
leader: int
56+
"The id of the broker that is the leader for the partition"
57+
58+
replicas: List[int]
59+
"The ids of all brokers that contain replicas of the partition"
60+
isr: List[int]
61+
"The ids of all brokers that contain in-sync replicas of the partition"
62+
63+
error: Optional[KafkaError]
64+
"A KafkaError object associated with the request for this partition metadata"
65+
66+
67+
class OffsetAndMetadata(NamedTuple):
68+
"""The Kafka offset commit API
69+
70+
The Kafka offset commit API allows users to provide additional metadata
71+
(in the form of a string) when an offset is committed. This can be useful
72+
(for example) to store information about which node made the commit,
73+
what time the commit was made, etc.
74+
"""
75+
76+
offset: int
77+
"The offset to be committed"
78+
79+
metadata: str
80+
"Non-null metadata"
81+
82+
# TODO add leaderEpoch:
83+
84+
2285
class RecordMetadata(NamedTuple):
2386
"""Returned when a :class:`~.AIOKafkaProducer` sends a message"""
2487

docs/api.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ Structs
149149

150150
.. automodule:: aiokafka.structs
151151

152-
.. autoclass:: kafka.structs.TopicPartition
152+
.. autoclass:: aiokafka.structs.TopicPartition
153153
:members:
154154

155155
.. autoclass:: aiokafka.structs.RecordMetadata

examples/ssl_consume_produce.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
33
from aiokafka.helpers import create_ssl_context
4-
from kafka.structs import TopicPartition
4+
from aiokafka.structs import TopicPartition
55

66
context = create_ssl_context(
77
cafile="./ca-cert", # CA used to sign certificate.

kafka/__init__.py

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ def emit(self, record):
1919

2020

2121
from kafka.serializer import Serializer, Deserializer
22-
from kafka.structs import TopicPartition, OffsetAndMetadata
2322

2423

2524
__all__ = [

kafka/scram.py

-81
This file was deleted.

kafka/structs.py

-87
This file was deleted.

tests/coordinator/test_assignors.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66

77
import pytest
88

9-
from kafka.structs import TopicPartition
109
from aiokafka.coordinator.assignors.range import RangePartitionAssignor
1110
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1211
from aiokafka.coordinator.assignors.sticky.sticky_assignor import (
1312
StickyPartitionAssignor,
1413
)
1514
from aiokafka.coordinator.protocol import ConsumerProtocolMemberAssignment
15+
from aiokafka.structs import TopicPartition
1616

1717

1818
@pytest.fixture(autouse=True)

tests/coordinator/test_partition_movements.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
from kafka.structs import TopicPartition
2-
31
from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
2+
from aiokafka.structs import TopicPartition
43

54

65
def test_empty_movements_are_sticky():

tests/test_message_accumulator.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import unittest
44
from unittest import mock
55

6-
from kafka.structs import TopicPartition
7-
86
from aiokafka.cluster import ClusterMetadata
97
from aiokafka.errors import (
108
KafkaTimeoutError, NotLeaderForPartitionError, LeaderNotAvailableError
@@ -13,6 +11,7 @@
1311
from aiokafka.producer.message_accumulator import (
1412
MessageAccumulator, MessageBatch, BatchBuilder
1513
)
14+
from aiokafka.structs import TopicPartition
1615

1716
from ._testutil import run_until_complete
1817

0 commit comments

Comments
 (0)