18
18
package kafka .server
19
19
20
20
import java .util .concurrent .ConcurrentHashMap
21
- import java .util .concurrent .atomic .AtomicReference
22
21
import java .util .{Collections , Properties }
23
- import kafka .controller .KafkaController
24
22
import kafka .coordinator .transaction .TransactionCoordinator
25
23
import kafka .utils .Logging
26
24
import org .apache .kafka .clients .ClientResponse
@@ -31,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
31
29
import org .apache .kafka .common .message .CreateTopicsRequestData .{CreatableTopic , CreatableTopicConfig , CreatableTopicConfigCollection }
32
30
import org .apache .kafka .common .message .MetadataResponseData .MetadataResponseTopic
33
31
import org .apache .kafka .common .protocol .{ApiKeys , Errors }
34
- import org .apache .kafka .common .requests .{ApiError , CreateTopicsRequest , RequestContext , RequestHeader }
32
+ import org .apache .kafka .common .requests .{CreateTopicsRequest , RequestContext , RequestHeader }
35
33
import org .apache .kafka .coordinator .group .GroupCoordinator
36
34
import org .apache .kafka .coordinator .share .ShareCoordinator
37
35
import org .apache .kafka .server .common .{ControllerRequestCompletionHandler , NodeToControllerChannelManager }
@@ -49,34 +47,13 @@ trait AutoTopicCreationManager {
49
47
): Seq [MetadataResponseTopic ]
50
48
}
51
49
52
- object AutoTopicCreationManager {
53
-
54
- def apply (
55
- config : KafkaConfig ,
56
- channelManager : Option [NodeToControllerChannelManager ],
57
- adminManager : Option [ZkAdminManager ],
58
- controller : Option [KafkaController ],
59
- groupCoordinator : GroupCoordinator ,
60
- txnCoordinator : TransactionCoordinator ,
61
- shareCoordinator : Option [ShareCoordinator ],
62
- ): AutoTopicCreationManager = {
63
- new DefaultAutoTopicCreationManager (config, channelManager, adminManager,
64
- controller, groupCoordinator, txnCoordinator, shareCoordinator)
65
- }
66
- }
67
-
68
50
class DefaultAutoTopicCreationManager (
69
51
config : KafkaConfig ,
70
- channelManager : Option [NodeToControllerChannelManager ],
71
- adminManager : Option [ZkAdminManager ],
72
- controller : Option [KafkaController ],
52
+ channelManager : NodeToControllerChannelManager ,
73
53
groupCoordinator : GroupCoordinator ,
74
54
txnCoordinator : TransactionCoordinator ,
75
55
shareCoordinator : Option [ShareCoordinator ]
76
56
) extends AutoTopicCreationManager with Logging {
77
- if (controller.isEmpty && channelManager.isEmpty) {
78
- throw new IllegalArgumentException (" Must supply a channel manager if not supplying a controller" )
79
- }
80
57
81
58
private val inflightTopics = Collections .newSetFromMap(new ConcurrentHashMap [String , java.lang.Boolean ]())
82
59
@@ -99,65 +76,13 @@ class DefaultAutoTopicCreationManager(
99
76
100
77
val creatableTopicResponses = if (creatableTopics.isEmpty) {
101
78
Seq .empty
102
- } else if (controller.isEmpty || ! controller.get.isActive && channelManager.isDefined) {
103
- sendCreateTopicRequest(creatableTopics, metadataRequestContext)
104
79
} else {
105
- createTopicsInZk (creatableTopics, controllerMutationQuota )
80
+ sendCreateTopicRequest (creatableTopics, metadataRequestContext )
106
81
}
107
82
108
83
uncreatableTopicResponses ++ creatableTopicResponses
109
84
}
110
85
111
- private def createTopicsInZk (
112
- creatableTopics : Map [String , CreatableTopic ],
113
- controllerMutationQuota : ControllerMutationQuota
114
- ): Seq [MetadataResponseTopic ] = {
115
- val topicErrors = new AtomicReference [Map [String , ApiError ]]()
116
- try {
117
- // Note that we use timeout = 0 since we do not need to wait for metadata propagation
118
- // and we want to get the response error immediately.
119
- adminManager.get.createTopics(
120
- timeout = 0 ,
121
- validateOnly = false ,
122
- creatableTopics,
123
- Map .empty,
124
- controllerMutationQuota,
125
- topicErrors.set
126
- )
127
-
128
- val creatableTopicResponses = Option (topicErrors.get) match {
129
- case Some (errors) =>
130
- errors.toSeq.map { case (topic, apiError) =>
131
- val error = apiError.error match {
132
- case Errors .TOPIC_ALREADY_EXISTS | Errors .REQUEST_TIMED_OUT =>
133
- // The timeout error is expected because we set timeout=0. This
134
- // nevertheless indicates that the topic metadata was created
135
- // successfully, so we return LEADER_NOT_AVAILABLE.
136
- Errors .LEADER_NOT_AVAILABLE
137
- case error => error
138
- }
139
-
140
- new MetadataResponseTopic ()
141
- .setErrorCode(error.code)
142
- .setName(topic)
143
- .setIsInternal(Topic .isInternal(topic))
144
- }
145
-
146
- case None =>
147
- creatableTopics.keySet.toSeq.map { topic =>
148
- new MetadataResponseTopic ()
149
- .setErrorCode(Errors .UNKNOWN_TOPIC_OR_PARTITION .code)
150
- .setName(topic)
151
- .setIsInternal(Topic .isInternal(topic))
152
- }
153
- }
154
-
155
- creatableTopicResponses
156
- } finally {
157
- clearInflightRequests(creatableTopics)
158
- }
159
- }
160
-
161
86
private def sendCreateTopicRequest (
162
87
creatableTopics : Map [String , CreatableTopic ],
163
88
metadataRequestContext : Option [RequestContext ]
@@ -189,10 +114,6 @@ class DefaultAutoTopicCreationManager(
189
114
}
190
115
}
191
116
192
- val channelManager = this .channelManager.getOrElse {
193
- throw new IllegalStateException (" Channel manager must be defined in order to send CreateTopic requests." )
194
- }
195
-
196
117
val request = metadataRequestContext.map { context =>
197
118
val requestVersion =
198
119
channelManager.controllerApiVersions.toScala match {
0 commit comments