Skip to content

Commit fa7c4fd

Browse files
authored
feat!: Switch to a single Producer, wrapped in an API singleton (#32)
Purpose:

- Revisit #16 since I finally figured out a clean way to have a single producer.
- Reduce the burden on future code that will need to adjust how polling is done (#31) and maybe handle shutdown (#11)
- Prepare for configurable implementation loading, which will need a singleton and getter: openedx/openedx-events#87
- Get rid of the `sync` argument (which didn't fit the abstraction) and move it to a dedicated method.

Relying code should now call `get_producer().send(...)` rather than `send_to_event_bus(...)`. The return value is an object that wraps a `Producer` instance (not a `SerializingProducer`) and that handles the serialization itself.

Serialization logic is moved to a cached `get_serializers(...)` that expands upon the previous `get_serializer` function; it now returns a pair of key and value serializers. This also acts as a patch point for mocking. I'd like to test the serializers themselves, but they want to talk to a server.

`send_to_event_bus` gets a shorter name (now it's just a `send` method) and loses the `sync` keyword argument; there is instead now a `prepare_for_shutdown` method.

Other refactoring:

- Cache `create_schema_registry_client` and rename to `get_...`
- Lift producer test data to be instance variables
1 parent 10aa36a commit fa7c4fd

File tree

8 files changed

+187
-147
lines changed

8 files changed

+187
-147
lines changed

CHANGELOG.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,20 @@ Unreleased
1616

1717
*
1818

19+
[0.5.0] - 2022-08-31
20+
********************
21+
22+
Changed
23+
=======
24+
25+
* **Breaking changes** in the producer module, refactored to expose a better API:
26+
27+
* Rather than `send_to_event_bus(...)`, relying code should now call `get_producer().send(...)`.
28+
* The `sync` kwarg is gone; to flush and sync messages before shutdown, call `get_producer().prepare_for_shutdown()` instead.
29+
30+
* Clarify that config module is for internal use only.
31+
* Implementation changes: Only a single Producer is created, and is used for all signals.
32+
1933
[0.4.4] - 2022-08-26
2034
********************
2135

edx_event_bus_kafka/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
Kafka implementation for Open edX event bus.
33
"""
44

5-
__version__ = '0.4.4'
5+
__version__ = '0.5.0'

edx_event_bus_kafka/config.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
"""
22
Configuration loading and validation.
3+
4+
This module is for internal use only.
35
"""
46

57
import warnings
8+
from functools import lru_cache
69
from typing import Optional
710

811
from django.conf import settings
12+
from django.dispatch import receiver
13+
from django.test.signals import setting_changed
914

1015
# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
1116
try:
@@ -16,10 +21,15 @@
1621

1722

1823
# return type (Optional[SchemaRegistryClient]) removed from signature to avoid error on import
19-
def create_schema_registry_client():
24+
@lru_cache # will just be one cache entry, in practice
25+
def get_schema_registry_client():
2026
"""
2127
Create a schema registry client from common settings.
2228
29+
This is cached on the assumption of a performance benefit (avoid reloading settings and
30+
reconstructing client) but it may also be that the client keeps around long-lived
31+
connections that we could benefit from.
32+
2333
Returns
2434
None if confluent_kafka library is not available or the settings are invalid.
2535
SchemaRegistryClient if it is.
@@ -69,3 +79,9 @@ def load_common_settings() -> Optional[dict]:
6979
})
7080

7181
return base_settings
82+
83+
84+
@receiver(setting_changed)
85+
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
86+
"""Reset caches when settings change during unit tests."""
87+
get_schema_registry_client.cache_clear()

edx_event_bus_kafka/consumer/event_consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
1414
from openedx_events.tooling import OpenEdxPublicSignal
1515

16-
from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
16+
from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings
1717

1818
logger = logging.getLogger(__name__)
1919

@@ -68,7 +68,7 @@ def _create_consumer(self):
6868
DeserializingConsumer if it is.
6969
"""
7070

71-
schema_registry_client = create_schema_registry_client()
71+
schema_registry_client = get_schema_registry_client()
7272

7373
# TODO (EventBus):
7474
# 1. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset)

edx_event_bus_kafka/management/commands/produce_event.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from django.utils.module_loading import import_string
1010
from openedx_events.tooling import OpenEdxPublicSignal
1111

12-
from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus
12+
from edx_event_bus_kafka.publishing.event_producer import get_producer
1313

1414
logger = logging.getLogger(__name__)
1515

@@ -53,12 +53,13 @@ def add_arguments(self, parser):
5353

5454
def handle(self, *args, **options):
5555
try:
56-
send_to_event_bus(
56+
producer = get_producer()
57+
producer.send(
5758
signal=import_string(options['signal'][0]),
5859
topic=options['topic'][0],
5960
event_key_field=options['key_field'][0],
6061
event_data=json.loads(options['data'][0]),
61-
sync=True, # otherwise command may exit before delivery is complete
6262
)
63+
producer.prepare_for_shutdown() # otherwise command may exit before delivery is complete
6364
except Exception: # pylint: disable=broad-except
6465
logger.exception("Error producing Kafka event")
edx_event_bus_kafka/publishing/event_producer.py

Lines changed: 100 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
"""
22
Produce Kafka events from signals.
33
4-
Main function is ``send_to_event_bus``.
4+
Main function is ``get_producer()``.
55
"""
66

77
import json
88
import logging
99
from functools import lru_cache
10-
from typing import Any, List
10+
from typing import Any, List, Optional
1111

1212
from django.dispatch import receiver
1313
from django.test.signals import setting_changed
1414
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
1515
from openedx_events.tooling import OpenEdxPublicSignal
1616

17-
from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
17+
from edx_event_bus_kafka.config import get_schema_registry_client, load_common_settings
1818

1919
logger = logging.getLogger(__name__)
2020

2121
# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
2222
try:
2323
import confluent_kafka
24-
from confluent_kafka import SerializingProducer
24+
from confluent_kafka import Producer
2525
from confluent_kafka.schema_registry.avro import AvroSerializer
26+
from confluent_kafka.serialization import MessageField, SerializationContext
2627
except ImportError: # pragma: no cover
2728
confluent_kafka = None
2829

@@ -113,63 +114,25 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field:
113114

114115

115116
@lru_cache
116-
def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
117+
def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
117118
"""
118-
Get the serializer for a signal.
119+
Get the key and value serializers for a signal and a key field path.
119120
120121
This is cached in order to save work re-transforming classes into Avro schemas.
121-
"""
122-
return AvroSignalSerializer(signal)
123-
124-
125-
# Note: This caching is required, since otherwise the Producer will
126-
# fall out of scope and be garbage-collected, destroying the
127-
# outbound-message queue and threads. The use of this cache allows the
128-
# producers to be long-lived.
129-
#
130-
# We are also likely to need to iterate through this cache at server
131-
# shutdown in order to flush each of the producers, which means the
132-
# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11
133-
# for more details.
134-
#
135-
# (Why not change the code to use a single Producer rather than multiple
136-
# SerializerProducer? Because the code actually turns out to be significantly
137-
# uglier that way due to the number of separate values that need to be passed
138-
# around in bundles. There aren't clear "cut-off" points. Additionally, it
139-
# makes unit testing harder/uglier since now the mocks need to either deal with
140-
# serialized bytes or mock out the serializers. Getting this down to a single
141-
# Producer doesn't really seem worth the trouble.)
142-
143-
# return type (Optional[SerializingProducer]) removed from signature to avoid error on import
144-
145-
@lru_cache(maxsize=None) # Never evict an entry -- it's a small set and we need to keep all of them.
146-
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
147-
"""
148-
Create the producer for a signal and a key field path.
149-
150-
If essential settings are missing or invalid, warn and return None.
151122
152123
Arguments:
153-
signal: The OpenEdxPublicSignal to make a producer for
154-
event_key_field: Path to the event data field to use as the event key (period-delimited
155-
string naming the dictionary keys to descend)
124+
signal: The OpenEdxPublicSignal to make a serializer for.
125+
event_key_field: Path to descend in the signal schema to find the subschema for the key
126+
(period-delimited string naming the field names to descend).
127+
156128
Returns:
157-
None if confluent_kafka is not defined or the settings are invalid.
158-
SerializingProducer if it is.
129+
2-tuple of AvroSignalSerializers, for event key and value
159130
"""
160-
if not confluent_kafka: # pragma: no cover
161-
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
162-
return None
131+
client = get_schema_registry_client()
132+
if client is None:
133+
raise Exception('Cannot create Kafka serializers -- missing library or settings')
163134

164-
schema_registry_client = create_schema_registry_client()
165-
if schema_registry_client is None:
166-
return None
167-
168-
producer_settings = load_common_settings()
169-
if producer_settings is None:
170-
return None
171-
172-
signal_serializer = get_serializer(signal)
135+
signal_serializer = AvroSignalSerializer(signal)
173136

174137
def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
175138
"""Tells Avro how to turn objects into dictionaries."""
@@ -178,21 +141,95 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
178141
# Serializers for key and value components of Kafka event
179142
key_serializer = AvroSerializer(
180143
schema_str=extract_key_schema(signal_serializer, event_key_field),
181-
schema_registry_client=schema_registry_client,
144+
schema_registry_client=client,
182145
to_dict=inner_to_dict,
183146
)
184147
value_serializer = AvroSerializer(
185148
schema_str=signal_serializer.schema_string(),
186-
schema_registry_client=schema_registry_client,
149+
schema_registry_client=client,
187150
to_dict=inner_to_dict,
188151
)
189152

190-
producer_settings.update({
191-
'key.serializer': key_serializer,
192-
'value.serializer': value_serializer,
193-
})
153+
return key_serializer, value_serializer
154+
155+
156+
class EventProducerKafka():
157+
"""
158+
API singleton for event production to Kafka.
159+
160+
This is just a wrapper around a confluent_kafka Producer that knows how to
161+
serialize a signal to event wire format.
162+
163+
Only one instance (of Producer or this wrapper) should be created,
164+
since it is stateful and needs lifecycle management.
165+
"""
166+
167+
def __init__(self, producer):
168+
self.producer = producer
169+
170+
def send(
171+
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
172+
) -> None:
173+
"""
174+
Send a signal event to the event bus under the specified topic.
175+
176+
Arguments:
177+
signal: The original OpenEdxPublicSignal the event was sent to
178+
topic: The event bus topic for the event
179+
event_key_field: Path to the event data field to use as the event key (period-delimited
180+
string naming the dictionary keys to descend)
181+
event_data: The event data (kwargs) sent to the signal
182+
"""
183+
event_key = extract_event_key(event_data, event_key_field)
184+
headers = {EVENT_TYPE_HEADER_KEY: signal.event_type}
185+
186+
key_serializer, value_serializer = get_serializers(signal, event_key_field)
187+
key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
188+
value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))
189+
190+
self.producer.produce(
191+
topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
192+
)
193+
194+
# Opportunistically ensure any pending callbacks from recent event-sends are triggered.
195+
#
196+
# This assumes events come regularly, or that we're not concerned about
197+
# high latency between delivery and callback. If those assumptions are
198+
# false, we should switch to calling poll(1.0) or similar in a loop on
199+
# a separate thread. Or do both.
200+
#
201+
# Issue: https://github.com/openedx/event-bus-kafka/issues/31
202+
self.producer.poll(0)
203+
204+
def prepare_for_shutdown(self):
205+
"""
206+
Prepare producer for a clean shutdown.
194207
195-
return SerializingProducer(producer_settings)
208+
Flush pending outbound events, wait for acknowledgement, and process callbacks.
209+
"""
210+
self.producer.flush(-1)
211+
212+
213+
# Note: This caching is required, since otherwise the Producer will
214+
# fall out of scope and be garbage-collected, destroying the
215+
# outbound-message queue and threads. The use of this cache allows the
216+
# producer to be long-lived.
217+
@lru_cache # will just be one cache entry, in practice
218+
def get_producer() -> Optional[EventProducerKafka]:
219+
"""
220+
Create or retrieve Producer API singleton.
221+
222+
If confluent-kafka library or essential settings are missing, warn and return None.
223+
"""
224+
if not confluent_kafka: # pragma: no cover
225+
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
226+
return None
227+
228+
producer_settings = load_common_settings()
229+
if producer_settings is None:
230+
return None
231+
232+
return EventProducerKafka(Producer(producer_settings))
196233

197234

198235
def on_event_deliver(err, evt):
@@ -214,51 +251,8 @@ def on_event_deliver(err, evt):
214251
f"partition={evt.partition()}")
215252

216253

217-
def send_to_event_bus(
218-
signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
219-
sync: bool = False,
220-
) -> None:
221-
"""
222-
Send a signal event to the event bus under the specified topic.
223-
224-
If the Kafka settings are missing or invalid, return with a warning.
225-
226-
Arguments:
227-
signal: The original OpenEdxPublicSignal the event was sent to
228-
topic: The event bus topic for the event
229-
event_key_field: Path to the event data field to use as the event key (period-delimited
230-
string naming the dictionary keys to descend)
231-
event_data: The event data (kwargs) sent to the signal
232-
sync: Whether to wait indefinitely for event to be received by the message bus (probably
233-
only want to use this for testing)
234-
"""
235-
producer = get_producer_for_signal(signal, event_key_field)
236-
if producer is None: # Note: SerializingProducer has False truthiness when len() == 0
237-
return
238-
239-
event_key = extract_event_key(event_data, event_key_field)
240-
producer.produce(topic, key=event_key, value=event_data,
241-
on_delivery=on_event_deliver,
242-
headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
243-
244-
if sync:
245-
# Wait for all buffered events to send, then wait for all of
246-
# them to be acknowledged, and trigger all callbacks.
247-
producer.flush(-1)
248-
else:
249-
# Opportunistically ensure any pending callbacks from recent events are triggered.
250-
#
251-
# This assumes events come regularly, or that we're not concerned about
252-
# high latency between delivery and callback. If those assumptions are
253-
# false, we should switch to calling poll(1.0) or similar in a loop on
254-
# a separate thread.
255-
#
256-
# Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
257-
producer.poll(0)
258-
259-
260254
@receiver(setting_changed)
261255
def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument
262-
"""Reset caches during testing when settings change."""
263-
get_serializer.cache_clear()
264-
get_producer_for_signal.cache_clear()
256+
"""Reset caches when settings change during unit tests."""
257+
get_serializers.cache_clear()
258+
get_producer.cache_clear()

0 commit comments

Comments
 (0)