-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.py
26 lines (19 loc) · 923 Bytes
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from confluent_kafka import Producer
# Module-level producer shared by the rest of this script; it connects to
# the broker address given under 'bootstrap.servers'.
p = Producer(
    {
        'bootstrap.servers': 'localhost:9092',
    }
)
def delivery_report(err, msg):
    """Print the delivery result of one produced message.

    Intended to be passed as the ``callback`` argument of ``produce()``;
    it is called once per message, from within ``poll()`` or ``flush()``.

    Args:
        err: ``None`` when delivery succeeded, otherwise the error to report.
        msg: The message the report is about; only ``topic()`` and
            ``partition()`` are read, and only on success.
    """
    if err is not None:
        print(f'Message delivery failed: {err}')
        return
    print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
# Serve delivery report callbacks left over from earlier produce() calls.
# Nothing has been produced yet at this point in the script, so this call
# only matters once the code is run repeatedly (e.g. inside a loop).
p.poll(0)

data = "hello world"

# produce() is asynchronous: it only enqueues the message. delivery_report
# runs later, from a poll() or flush() call, once the message has been
# delivered or has failed permanently.
p.produce(
    'chatroom1',
    data.encode('utf-8'),
    key="TEST",
    callback=delivery_report,
)

# Block until every outstanding message is delivered and its delivery
# report callback has been triggered.
p.flush()