Skip to content

Commit 57d883e

Browse files
Release 2.2.1 (#45)
* Fix prints for the HA stream's client: 1st implementation * Using subscription id instead of full path * Fixed reference before assignment * Fixed param in listener and updated changelog --------- Co-authored-by: Carlos Blazquez <[email protected]>
1 parent 0129f71 commit 57d883e

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,8 @@ the subscription *and* consuming it. Instead they will just consume an already e
126126

127127
2.2.0 / 2024-05-21
128128
==================
129-
- [added] - Support for highly-available streams with a new method under the `Listener` class: `listen_async_ha`
129+
- [added] - Support for highly-available streams with a new method under the `Listener` class: `listen_async_ha`
130+
131+
2.2.1 / 2024-07-01
132+
==================
133+
- [fixed] - Fixed the `subscription_id` parameter of the listener callback so it changes if the listener switches regions (for HA streams only)

dnaStreaming/demo/show_ha_stream_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@
44

55
listener = Listener()
66
quiet_demo = os.getenv('QUIET_DEMO', "false") == "true"
7-
max_secs = 5
7+
max_secs = 1000
88
print("\n[ACTIVITY] Receiving messages (ASYNC) for {} seconds...\n[0]".format(max_secs), end='')
99

1010

1111
def callback(message, subscription_id):
1212
callback.counter += 1
1313
if not quiet_demo:
1414
if message['action'] != 'del':
15-
print('[INFO] [MSG] [{}]: AN: {}, TITLE: {}'.format(callback.counter, message['an'], message['title']))
15+
print('[INFO] [SUBSCRIPTION]: {} [MSG] [{}]: AN: {}, TITLE: {}'.format(subscription_id, callback.counter, message['an'], message['title']))
1616
else:
17-
print('[INFO] [MSG] [{}]: AN: {}, *** DELETE ***'.format(callback.counter, message['an']))
17+
print('[INFO] [SUBSCRIPTION]: {} [MSG] [{}]: AN: {}, *** DELETE ***'.format(subscription_id, callback.counter, message['an']))
1818
else:
1919
if callback.counter % 10 == 0:
2020
print('[{}]'.format(callback.counter), end='')

dnaStreaming/listener.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,19 +125,20 @@ def ack_message_and_callback(message):
125125
'Listeners for subscriptions have been configured, set and await message arrival.')
126126
return subscription
127127

128-
def listen_async_ha(self, on_message_callback, subscription_id=""):
129-
def ack_message_and_callback(message):
128+
def listen_async_ha(self, on_message_callback):
129+
def ack_message_and_callback(message, subscription_path):
130130
pubsub_msg = json.loads(message.data)
131131
logger.info("Received news message with ID: {}".format(
132132
pubsub_msg['data'][0]['id']))
133133
news_msg = pubsub_msg['data'][0]['attributes']
134-
on_message_callback(news_msg, subscription_id)
134+
short_subscription_id = subscription_path.split("/")[-1]
135+
on_message_callback(news_msg, short_subscription_id)
135136
message.ack()
136137

137138
main_pubsub_client = pubsub_service.get_client(self.config, MAIN_REGION)
138139
backup_pubsub_client = pubsub_service.get_client(self.config, BACKUP_REGION)
139140

140-
subscription_id = subscription_id or self.config.subscription()
141+
subscription_id = self.config.subscription()
141142

142143
if not subscription_id:
143144
raise Exception(

dnaStreaming/services/availability_service.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ def ha_listen(api_host, user_key, subscription_id, stop_event, main_subscription
3838

3939
current_region = MAIN_REGION
4040

41+
def wrapped_callback(subscription_path):
42+
def inner_callback(message):
43+
callback(message, subscription_path)
44+
45+
return inner_callback
46+
4147
streaming_pull_future = main_pubsub_client.subscribe(
42-
main_subscription_path, callback=callback)
48+
main_subscription_path, callback=wrapped_callback(main_subscription_path))
4349

4450
while not stop_event.is_set():
4551

@@ -65,10 +71,10 @@ def ha_listen(api_host, user_key, subscription_id, stop_event, main_subscription
6571

6672
if active_region == MAIN_REGION:
6773
streaming_pull_future = main_pubsub_client.subscribe(
68-
main_subscription_path, callback=callback)
74+
main_subscription_path, callback=wrapped_callback(main_subscription_path))
6975
else: # active_region == BACKUP_REGION
7076
streaming_pull_future = backup_pubsub_client.subscribe(
71-
backup_subscription_path, callback=callback)
77+
backup_subscription_path, callback=wrapped_callback(backup_subscription_path))
7278

7379
logger.warning(f"Started listener in region {active_region}")
7480

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
with open(path.join(this_directory, 'README.rst'), encoding='utf-8') as f:
77
long_description = f.read()
88

9-
VERSION = "2.2.0"
9+
VERSION = "2.2.1"
1010
RELEASE_TAG = f"release-{VERSION}"
1111

1212
setup(

0 commit comments

Comments
 (0)