Skip to content

Commit c3b9327

Browse files
authored
feat(pubsub): add delivery attempt property to message object received by user code (#10205)
- Return None when a DeadLetterPolicy hasn't been set on the subscription.
1 parent ad93c6c commit c3b9327

File tree

4 files changed

+80
-6
lines changed

4 files changed

+80
-6
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,10 @@ def _on_response(self, response):
569569

570570
for received_message in response.received_messages:
571571
message = google.cloud.pubsub_v1.subscriber.message.Message(
572-
received_message.message, received_message.ack_id, self._scheduler.queue
572+
received_message.message,
573+
received_message.ack_id,
574+
received_message.delivery_attempt,
575+
self._scheduler.queue,
573576
)
574577
# Making a decision based on the load, and modifying the data that
575578
# affects the load -> needs a lock, as that state can be modified

pubsub/google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Message(object):
7070
published.
7171
"""
7272

73-
def __init__(self, message, ack_id, request_queue):
73+
def __init__(self, message, ack_id, delivery_attempt, request_queue):
7474
"""Construct the Message.
7575
7676
.. note::
@@ -82,12 +82,16 @@ def __init__(self, message, ack_id, request_queue):
8282
message (~.pubsub_v1.types.PubsubMessage): The message received
8383
from Pub/Sub.
8484
ack_id (str): The ack_id received from Pub/Sub.
85+
delivery_attempt (int): The delivery attempt counter received
86+
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
87+
and zero otherwise.
8588
request_queue (queue.Queue): A queue provided by the policy that
8689
can accept requests; the policy is responsible for handling
8790
those requests.
8891
"""
8992
self._message = message
9093
self._ack_id = ack_id
94+
self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
9195
self._request_queue = request_queue
9296
self.message_id = message.message_id
9397

@@ -162,6 +166,30 @@ def ack_id(self):
162166
"""str: the ID used to ack the message."""
163167
return self._ack_id
164168

169+
@property
170+
def delivery_attempt(self):
171+
"""The delivery attempt counter is 1 + (the sum of number of NACKs
172+
and number of ack_deadline exceeds) for this message. It is set to None
173+
if a DeadLetterPolicy is not set on the subscription.
174+
175+
A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
176+
exceeds event is whenever a message is not acknowledged within
177+
ack_deadline. Note that ack_deadline is initially
178+
Subscription.ackDeadlineSeconds, but may get extended automatically by
179+
the client library.
180+
181+
The first delivery of a given message will have this value as 1. The value
182+
is calculated at best effort and is approximate.
183+
184+
EXPERIMENTAL: This feature is part of a closed alpha release. This
185+
API might be changed in backward-incompatible ways and is not recommended
186+
for production use. It is not subject to any SLA or deprecation policy.
187+
188+
Returns:
189+
Optional[int]: The delivery attempt counter or None.
190+
"""
191+
return self._delivery_attempt
192+
165193
def ack(self):
166194
"""Acknowledge the given message.
167195

pubsub/tests/unit/pubsub_v1/subscriber/test_message.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,21 @@
3333
PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000
3434

3535

36-
def create_message(data, ack_id="ACKID", **attrs):
36+
def create_message(data, ack_id="ACKID", delivery_attempt=0, **attrs):
3737
with mock.patch.object(time, "time") as time_:
3838
time_.return_value = RECEIVED_SECONDS
3939
msg = message.Message(
40-
types.PubsubMessage(
40+
message=types.PubsubMessage(
4141
attributes=attrs,
4242
data=data,
4343
message_id="message_id",
4444
publish_time=timestamp_pb2.Timestamp(
4545
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
4646
),
4747
),
48-
ack_id,
49-
queue.Queue(),
48+
ack_id=ack_id,
49+
delivery_attempt=delivery_attempt,
50+
request_queue=queue.Queue(),
5051
)
5152
return msg
5253

@@ -72,6 +73,17 @@ def test_ack_id():
7273
assert msg.ack_id == ack_id
7374

7475

76+
def test_delivery_attempt():
77+
delivery_attempt = 10
78+
msg = create_message(b"foo", delivery_attempt=delivery_attempt)
79+
assert msg.delivery_attempt == delivery_attempt
80+
81+
82+
def test_delivery_attempt_is_none():
83+
msg = create_message(b"foo", delivery_attempt=0)
84+
assert msg.delivery_attempt is None
85+
86+
7587
def test_publish_time():
7688
msg = create_message(b"foo")
7789
assert msg.publish_time == PUBLISHED

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,37 @@ def test__get_initial_request_wo_leaser():
627627
assert initial_request.modify_deadline_seconds == []
628628

629629

630+
def test__on_response_delivery_attempt():
631+
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
632+
manager._callback = mock.sentinel.callback
633+
634+
# Set up the messages.
635+
response = types.StreamingPullResponse(
636+
received_messages=[
637+
types.ReceivedMessage(
638+
ack_id="fack", message=types.PubsubMessage(data=b"foo", message_id="1")
639+
),
640+
types.ReceivedMessage(
641+
ack_id="back",
642+
message=types.PubsubMessage(data=b"bar", message_id="2"),
643+
delivery_attempt=6,
644+
),
645+
]
646+
)
647+
648+
# adjust message bookkeeping in leaser
649+
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)
650+
651+
manager._on_response(response)
652+
653+
schedule_calls = scheduler.schedule.mock_calls
654+
assert len(schedule_calls) == 2
655+
msg1 = schedule_calls[0][1][1]
656+
assert msg1.delivery_attempt is None
657+
msg2 = schedule_calls[1][1][1]
658+
assert msg2.delivery_attempt == 6
659+
660+
630661
def test__on_response_no_leaser_overload():
631662
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
632663
manager._callback = mock.sentinel.callback

0 commit comments

Comments
 (0)