@@ -1,13 +1,13 @@
 """
 Produce Kafka events from signals.
 
-Main function is ``send_to_event_bus``.
+Main function is ``get_producer()``.
 """
 
 import json
 import logging
 from functools import lru_cache
-from typing import Any, List
+from typing import Any, List, Optional
 
 from django.dispatch import receiver
 from django.test.signals import setting_changed
@@ -21,8 +21,9 @@
 # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
 try:
     import confluent_kafka
-    from confluent_kafka import SerializingProducer
+    from confluent_kafka import Producer
     from confluent_kafka.schema_registry.avro import AvroSerializer
+    from confluent_kafka.serialization import MessageField, SerializationContext
 except ImportError:  # pragma: no cover
     confluent_kafka = None
 
@@ -113,63 +114,22 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field:
 
 
 @lru_cache
-def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
+def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
     """
-    Get the serializer for a signal.
+    Get the key and value serializers for a signal and a key field path.
 
     This is cached in order to save work re-transforming classes into Avro schemas.
-    """
-    return AvroSignalSerializer(signal)
-
-
-# Note: This caching is required, since otherwise the Producer will
-# fall out of scope and be garbage-collected, destroying the
-# outbound-message queue and threads. The use of this cache allows the
-# producers to be long-lived.
-#
-# We are also likely to need to iterate through this cache at server
-# shutdown in order to flush each of the producers, which means the
-# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11
-# for more details.
-#
-# (Why not change the code to use a single Producer rather than multiple
-# SerializerProducer? Because the code actually turns out to be significantly
-# uglier that way due to the number of separate values that need to be passed
-# around in bundles. There aren't clear "cut-off" points. Additionally, it
-# makes unit testing harder/uglier since now the mocks need to either deal with
-# serialized bytes or mock out the serializers. Getting this down to a single
-# Producer doesn't really seem worth the trouble.)
-
-# return type (Optional[SerializingProducer]) removed from signature to avoid error on import
-
-@lru_cache(maxsize=None)  # Never evict an entry -- it's a small set and we need to keep all of them.
-def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
-    """
-    Create the producer for a signal and a key field path.
-
-    If essential settings are missing or invalid, warn and return None.
 
     Arguments:
-        signal: The OpenEdxPublicSignal to make a producer for
-        event_key_field: Path to the event data field to use as the event key (period-delimited
-            string naming the dictionary keys to descend)
+        signal: The OpenEdxPublicSignal to make a serializer for.
+        event_key_field: Path to descend in the signal schema to find the subschema for the key
+            (period-delimited string naming the field names to descend).
+
     Returns:
-        None if confluent_kafka is not defined or the settings are invalid.
-        SerializingProducer if it is.
+        2-tuple of AvroSignalSerializers, for event key and value
     """
-    if not confluent_kafka:  # pragma: no cover
-        logger.warning('Library confluent-kafka not available. Cannot create event producer.')
-        return None
-
-    schema_registry_client = get_schema_registry_client()
-    if schema_registry_client is None:
-        return None
-
-    producer_settings = load_common_settings()
-    if producer_settings is None:
-        return None
-
-    signal_serializer = get_serializer(signal)
+    client = get_schema_registry_client()
+    signal_serializer = AvroSignalSerializer(signal)
 
     def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
         """Tells Avro how to turn objects into dictionaries."""
@@ -178,21 +138,95 @@ def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
     # Serializers for key and value components of Kafka event
     key_serializer = AvroSerializer(
         schema_str=extract_key_schema(signal_serializer, event_key_field),
-        schema_registry_client=schema_registry_client,
+        schema_registry_client=client,
         to_dict=inner_to_dict,
     )
     value_serializer = AvroSerializer(
         schema_str=signal_serializer.schema_string(),
-        schema_registry_client=schema_registry_client,
+        schema_registry_client=client,
         to_dict=inner_to_dict,
     )
 
-    producer_settings.update({
-        'key.serializer': key_serializer,
-        'value.serializer': value_serializer,
-    })
+    return key_serializer, value_serializer
+
+
+class EventProducerKafka():
+    """
+    API singleton for event production to Kafka.
+
+    This is just a wrapper around a confluent_kafka Producer that knows how to
+    serialize a signal to event wire format.
+
+    Only one instance (of Producer or this wrapper) should be created,
+    since it is stateful and needs lifecycle management.
+    """
+
+    def __init__(self, producer):
+        self.producer = producer
+
+    def send(
+            self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
+    ) -> None:
+        """
+        Send a signal event to the event bus under the specified topic.
+
+        Arguments:
+            signal: The original OpenEdxPublicSignal the event was sent to
+            topic: The event bus topic for the event
+            event_key_field: Path to the event data field to use as the event key (period-delimited
+                string naming the dictionary keys to descend)
+            event_data: The event data (kwargs) sent to the signal
+        """
+        event_key = extract_event_key(event_data, event_key_field)
+        headers = {EVENT_TYPE_HEADER_KEY: signal.event_type}
+
+        key_serializer, value_serializer = get_serializers(signal, event_key_field)
+        key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
+        value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))
+
+        self.producer.produce(
+            topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
+        )
+
+        # Opportunistically ensure any pending callbacks from recent event-sends are triggered.
+        #
+        # This assumes events come regularly, or that we're not concerned about
+        # high latency between delivery and callback. If those assumptions are
+        # false, we should switch to calling poll(1.0) or similar in a loop on
+        # a separate thread. Or do both.
+        #
+        # Issue: https://github.com/openedx/event-bus-kafka/issues/31
+        self.producer.poll(0)
+
+    def pre_shutdown(self):
+        """
+        Prepare producer for a clean shutdown.
 
-    return SerializingProducer(producer_settings)
+        Flush pending outbound events, wait for acknowledgement, and process callbacks.
+        """
+        self.producer.flush(-1)
+
+
+# Note: This caching is required, since otherwise the Producer will
+# fall out of scope and be garbage-collected, destroying the
+# outbound-message queue and threads. The use of this cache allows the
+# producer to be long-lived.
+@lru_cache
+def get_producer() -> Optional[EventProducerKafka]:
+    """
+    Create or retrieve Producer API singleton.
+
+    If confluent-kafka library or essential settings are missing, warn and return None.
+    """
+    if not confluent_kafka:  # pragma: no cover
+        logger.warning('Library confluent-kafka not available. Cannot create event producer.')
+        return None
+
+    producer_settings = load_common_settings()
+    if producer_settings is None:
+        return None
+
+    return EventProducerKafka(Producer(producer_settings))
 
 
 def on_event_deliver(err, evt):
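With these pieces in place, the producer API would be exercised roughly as follows (a hedged sketch: the choice of signal, the topic name, key field, and event payload are illustrative, and working Kafka and schema-registry settings are assumed):

from openedx_events.learning.data import UserData, UserPersonalData
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED  # an arbitrary example signal

# Hypothetical event payload matching the signal's declared data.
user_data = UserData(
    id=123,
    is_active=True,
    pii=UserPersonalData(username='someone', email='someone@example.com', name='Someone'),
)

producer = get_producer()
if producer is not None:  # None when confluent-kafka or essential settings are missing
    producer.send(
        signal=SESSION_LOGIN_COMPLETED,
        topic='user-logins',                  # hypothetical topic name
        event_key_field='user.pii.username',  # hypothetical key field path
        event_data={'user': user_data},       # the kwargs that were sent to the signal
    )

# At server shutdown, flush anything still queued:
#     get_producer().pre_shutdown()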
@@ -214,51 +248,8 @@ def on_event_deliver(err, evt):
                     f"partition={evt.partition()}")
 
 
-def send_to_event_bus(
-        signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
-        sync: bool = False,
-) -> None:
-    """
-    Send a signal event to the event bus under the specified topic.
-
-    If the Kafka settings are missing or invalid, return with a warning.
-
-    Arguments:
-        signal: The original OpenEdxPublicSignal the event was sent to
-        topic: The event bus topic for the event
-        event_key_field: Path to the event data field to use as the event key (period-delimited
-            string naming the dictionary keys to descend)
-        event_data: The event data (kwargs) sent to the signal
-        sync: Whether to wait indefinitely for event to be received by the message bus (probably
-            only want to use this for testing)
-    """
-    producer = get_producer_for_signal(signal, event_key_field)
-    if producer is None:  # Note: SerializingProducer has False truthiness when len() == 0
-        return
-
-    event_key = extract_event_key(event_data, event_key_field)
-    producer.produce(topic, key=event_key, value=event_data,
-                     on_delivery=on_event_deliver,
-                     headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
-
-    if sync:
-        # Wait for all buffered events to send, then wait for all of
-        # them to be acknowledged, and trigger all callbacks.
-        producer.flush(-1)
-    else:
-        # Opportunistically ensure any pending callbacks from recent events are triggered.
-        #
-        # This assumes events come regularly, or that we're not concerned about
-        # high latency between delivery and callback. If those assumptions are
-        # false, we should switch to calling poll(1.0) or similar in a loop on
-        # a separate thread.
-        #
-        # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
-        producer.poll(0)
-
-
 @receiver(setting_changed)
 def _reset_caches(sender, **kwargs):  # pylint: disable=unused-argument
     """Reset caches during testing when settings change."""
-    get_serializer.cache_clear()
-    get_producer_for_signal.cache_clear()
+    get_serializers.cache_clear()
+    get_producer.cache_clear()
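One last pattern worth calling out: a zero-argument ``@lru_cache`` function doubles as a lazily created singleton, and ``cache_clear()`` (as ``_reset_caches`` uses above) discards the cached instance so the next call rebuilds it. A minimal sketch of the idiom:

from functools import lru_cache

@lru_cache
def get_singleton():
    # The body runs once; every later call returns the same cached object,
    # which also keeps that object (and any threads or queues it owns) alive.
    return object()

assert get_singleton() is get_singleton()
get_singleton.cache_clear()  # e.g. when settings change under test; next call re-creates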