26
26
27
27
_missing = object ()
28
28
29
+ ADMINCLIENT_CONFIG_PARAMS = (
30
+ "allow.auto.create.topics" ,
31
+ "bootstrap.servers" ,
32
+ "client.id" ,
33
+ "request.timeout.ms" ,
34
+ "metadata.max.age.ms" ,
35
+ "security.protocol" ,
36
+ "connections.max.idle.ms" ,
37
+ "sasl.mechanism" ,
38
+ "sasl.username" ,
39
+ "sasl.password" ,
40
+ )
41
+
29
42
30
43
class MsgToSend (BaseModel ):
31
44
"""A Pydantic model representing a message to be sent to Kafka.
@@ -214,6 +227,24 @@ async def send_batch(
214
227
]
215
228
await asyncio .gather (* tasks )
216
229
230
+ async def ping (self , timeout : Optional [float ] = 5.0 ) -> bool :
231
+ """Implement ping using AdminClient."""
232
+ try :
233
+ admin_client = AdminClient (
234
+ {
235
+ x : self .config [x ]
236
+ for x in ADMINCLIENT_CONFIG_PARAMS
237
+ if x in self .config
238
+ }
239
+ )
240
+ cluster_metadata = await call_or_await (
241
+ admin_client .list_topics , timeout = timeout
242
+ )
243
+
244
+ return bool (cluster_metadata )
245
+ except Exception :
246
+ return False
247
+
217
248
218
249
class TopicPartition (NamedTuple ):
219
250
"""A named tuple representing a Kafka topic and partition."""
@@ -228,22 +259,8 @@ def create_topics(
228
259
logger : Union ["LoggerProto" , None , object ] = logger ,
229
260
) -> None :
230
261
"""Creates Kafka topics using the provided configuration."""
231
- required_config_params = (
232
- "allow.auto.create.topics" ,
233
- "bootstrap.servers" ,
234
- "client.id" ,
235
- "request.timeout.ms" ,
236
- "metadata.max.age.ms" ,
237
- "security.protocol" ,
238
- "connections.max.idle.ms" ,
239
- "sasl.mechanism" ,
240
- "sasl.username" ,
241
- "sasl.password" ,
242
- "sasl.kerberos.service.name" ,
243
- )
244
-
245
262
admin_client = AdminClient (
246
- {x : config [x ] for x in required_config_params if x in config }
263
+ {x : config [x ] for x in ADMINCLIENT_CONFIG_PARAMS if x in config }
247
264
)
248
265
249
266
fs = admin_client .create_topics (
0 commit comments