Skip to content

Commit c02060f

Browse files
authored
fix: PubSub incompatibility with api-core 1.17.0+ (googleapis#103)
* fix: disable pre-fetching first streaming pull item * Remove api-core version cap, but ban 1.17.0 release * Regenerate gapic layer with synth * Revert "Regenerate gapic layer with synth" This reverts commit 1d24853. * Retain only the relevant fix in generated code * Exclude multiple incompatible api-core versions * Fix syntax error in synth.py * Ban all bugfix versions of problematic api-core minor versions
1 parent 6c7677e commit c02060f

File tree

4 files changed

+34
-3
lines changed

4 files changed

+34
-3
lines changed

google/cloud/pubsub_v1/gapic/subscriber_client.py

+5
Original file line numberDiff line numberDiff line change
@@ -1175,6 +1175,11 @@ def streaming_pull(
11751175
client_info=self._client_info,
11761176
)
11771177

1178+
# Wrappers in api-core should not automatically pre-fetch the first
1179+
# stream result, as this breaks the stream when re-opening it.
1180+
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
1181+
self.transport.streaming_pull._prefetch_first_result_ = False
1182+
11781183
return self._inner_api_calls["streaming_pull"](
11791184
requests, retry=retry, timeout=timeout, metadata=metadata
11801185
)

setup.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
# 'Development Status :: 5 - Production/Stable'
3030
release_status = "Development Status :: 5 - Production/Stable"
3131
dependencies = [
32-
# google-api-core[grpc] 1.17.0 causes problems, thus restricting its
33-
# version until the issue gets fixed.
32+
# google-api-core[grpc] 1.17.0 up to 1.19.1 causes problems with stream
33+
# recovery, thus those versions should not be used.
3434
# https://github.com/googleapis/python-pubsub/issues/74
35-
"google-api-core[grpc] >= 1.14.0, < 1.17.0",
35+
"google-api-core[grpc] >= 1.14.0, != 1.17.*, != 1.18.*, != 1.19.*",
3636
"grpc-google-iam-v1 >= 0.12.3, < 0.13dev",
3737
'enum34; python_version < "3.4"',
3838
]

synth.py

+14
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,20 @@ def _merge_dict(d1, d2):
185185
"from google.iam.v1 import iam_policy_pb2_grpc as iam_policy_pb2",
186186
)
187187

188+
# Monkey patch the streaming_pull() GAPIC method to disable pre-fetching stream
189+
# results.
190+
s.replace(
191+
"google/cloud/pubsub_v1/gapic/subscriber_client.py",
192+
r"return self\._inner_api_calls\['streaming_pull'\]\(.*",
193+
"""
194+
# Wrappers in api-core should not automatically pre-fetch the first
195+
# stream result, as this breaks the stream when re-opening it.
196+
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
197+
self.transport.streaming_pull._prefetch_first_result_ = False
198+
199+
\g<0>"""
200+
)
201+
188202
# Add missing blank line before Attributes: in generated docstrings
189203
# https://github.com/googleapis/protoc-docs-plugin/pull/31
190204
s.replace(

tests/unit/pubsub_v1/subscriber/test_subscriber_client.py

+12
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,15 @@ def test_closes_channel_as_context_manager():
154154
pass
155155

156156
mock_transport.channel.close.assert_called()
157+
158+
159+
def test_streaming_pull_gapic_monkeypatch():
160+
transport = mock.NonCallableMock(spec=["streaming_pull"])
161+
transport.streaming_pull = mock.Mock(spec=[])
162+
client = subscriber.Client(transport=transport)
163+
164+
client.streaming_pull(requests=iter([]))
165+
166+
assert client.api.transport is transport
167+
assert hasattr(transport.streaming_pull, "_prefetch_first_result_")
168+
assert not transport.streaming_pull._prefetch_first_result_

0 commit comments

Comments
 (0)