Skip to content

Commit e1c9fa3

Browse files
committed
refactor(retries): added new retry policy to prevent retry amplification
1 parent f4735c9 commit e1c9fa3

File tree

10 files changed

+206
-14
lines changed

10 files changed

+206
-14
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ venv*
77
.tox
88
build/
99
dist
10+
.python-version

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,24 @@ sdk = SDK(iam_token="t1.9eu...", endpoint="api.yandexcloud.kz")
156156
set_up_yc_api_endpoint(kz_region_endpoint)
157157
```
158158

159+
### Retries
160+
To retry failed SDK requests, pass a `RetryPolicy` instance as the `retry_policy` argument when constructing the SDK:
161+
162+
```python
163+
import grpc
164+
from yandexcloud import SDK, RetryPolicy
165+
166+
sdk = SDK(retry_policy=RetryPolicy(max_attempts=2, status_codes=(grpc.StatusCode.UNAVAILABLE,)))
167+
```
168+
169+
It is **strongly recommended** to use the default `RetryPolicy` settings to avoid retry amplification:
170+
```python
171+
import grpc
172+
from yandexcloud import SDK, RetryPolicy
173+
174+
sdk = SDK(retry_policy=RetryPolicy())
175+
```
176+
159177
## Contributing
160178
### Dependencies
161179
We use [uv](https://docs.astral.sh/uv) to manage dependencies and run commands in Makefile.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dependencies = [
2727
"requests>=2.32.3,<3",
2828
"six>=1.17.0,<2",
2929
"grpcio-tools>=1.68.1",
30+
"deprecated>=1.2.18",
3031
]
3132

3233
[project.readme]
@@ -57,6 +58,7 @@ type = [
5758
"grpc-stubs>=1.53.0.5",
5859
"types-requests>=2.32.0.20241016",
5960
"types-six>=1.17.0.20241205",
61+
"types-deprecated>=1.2.15.20241117",
6062
]
6163
style = [
6264
"flake8>=7.1.1",

tests/test_retry_policy.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
from concurrent import futures
2+
from unittest.mock import Mock, patch
3+
4+
import grpc
5+
import pytest
6+
7+
from yandex.cloud.vpc.v1.network_pb2 import Network
8+
from yandex.cloud.vpc.v1.network_service_pb2 import GetNetworkRequest
9+
from yandex.cloud.vpc.v1.network_service_pb2_grpc import (
10+
NetworkServiceStub,
11+
add_NetworkServiceServicer_to_server,
12+
)
13+
from yandexcloud import SDK, RetryPolicy
14+
from yandexcloud._channels import Channels
15+
16+
INSECURE_SERVICE_PORT = "50051"
17+
SERVICE_ADDR = "localhost"
18+
19+
20+
def side_effect_internal(_, context):
    """Mark the RPC handled through ``context`` as failed with ``INTERNAL``.

    The first argument (the request) is ignored.
    """
    status = grpc.StatusCode.INTERNAL
    context.set_code(status)
22+
23+
24+
def side_effect_unavailable(_, context):
    """Mark the RPC handled through ``context`` as failed with ``UNAVAILABLE``.

    The first argument (the request) is ignored.
    """
    status = grpc.StatusCode.UNAVAILABLE
    context.set_code(status)
26+
27+
28+
class VPCServiceMock:
    """In-process stand-in for the VPC ``NetworkService`` servicer.

    Every RPC handler is a ``Mock`` so tests can count invocations. ``Get``
    additionally runs ``fn(request, context)`` on each call, which lets a test
    choose the gRPC status code the server answers with.
    """

    def __init__(self, fn):
        network = Network(id="12342314")

        def get(request, context):
            # Run the injected side effect (it sets the status code on the
            # context), then return a valid message for the server to serialize.
            if fn is not None:
                fn(request, context)
            return network

        self.Get = Mock(side_effect=get)
        self.Create = Mock()
        self.Update = Mock()
        self.Delete = Mock()
        self.ListSubnets = Mock()
        self.ListSecurityGroups = Mock()
        self.ListRouteTables = Mock()
        self.ListOperations = Mock()
        self.Move = Mock()
        self.List = Mock()


@pytest.fixture
def mock_channel():
    """Patch ``Channels`` so the SDK talks to the local test server.

    Credentials are replaced with local channel credentials and both the
    ``vpc`` and ``iam`` endpoints resolve to the test server address.
    """
    target = SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT
    with patch.multiple(
        Channels,
        _get_creds=lambda self, endpoint: grpc.local_channel_credentials(),
        _get_endpoints=lambda self: {"vpc": target, "iam": target},
    ) as channel_patch:
        yield channel_patch


def grpc_server(side_effect):
    """Start a local gRPC server backed by ``VPCServiceMock``.

    :param side_effect: callable ``(request, context)`` run on every ``Get``.
    :return: ``(server, service)``; the caller must stop the server.
    """
    service = VPCServiceMock(side_effect)
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("localhost:" + INSECURE_SERVICE_PORT)
    add_NetworkServiceServicer_to_server(service, server)
    server.start()
    return server, service


def _build_sdk(**kwargs):
    """Create an SDK whose endpoints all point at the local test server."""
    target = SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT
    return SDK(endpoint=target, endpoints={"vpc": target, "iam": target}, **kwargs)


def _assert_get_attempts(sdk, service, expected_attempts):
    """Issue one failing ``Get`` and check how many times the server saw it.

    ``pytest.raises`` makes the test fail when no error is raised, so the
    attempt-count assertion can never be skipped silently.
    """
    network_client = sdk.client(NetworkServiceStub, insecure=True)
    with pytest.raises(grpc.RpcError):
        network_client.Get(GetNetworkRequest(network_id="asdf"))
    assert service.Get.call_count == expected_attempts


def test_default_retries(mock_channel):
    """The default policy retries ``UNAVAILABLE`` up to four attempts in total."""
    server, service = grpc_server(side_effect_unavailable)
    try:
        _assert_get_attempts(_build_sdk(retry_policy=RetryPolicy()), service, 4)
    finally:
        # Always release the fixed port, even when the assertion fails.
        server.stop(0).wait()


def test_custom_retries(mock_channel):
    """A custom policy retries the configured status code ``max_attempts`` times."""
    server, service = grpc_server(side_effect_internal)
    try:
        policy = RetryPolicy(status_codes=(grpc.StatusCode.INTERNAL,), max_attempts=4)
        _assert_get_attempts(_build_sdk(retry_policy=policy), service, 4)
    finally:
        server.stop(0).wait()


def test_no_retries(mock_channel):
    """Without a retry policy a failing call reaches the server exactly once."""
    server, service = grpc_server(side_effect_internal)
    try:
        _assert_get_attempts(_build_sdk(), service, 1)
    finally:
        server.stop(0).wait()

uv.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

yandexcloud/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
default_backoff,
99
)
1010
from yandexcloud._retry_interceptor import RetryInterceptor
11+
from yandexcloud._retry_policy import RetryPolicy
1112
from yandexcloud._sdk import SDK
1213

1314
__version__ = "0.333.0"

yandexcloud/_channels.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __init__(
3030
root_certificates: Optional[bytes] = None,
3131
private_key: Optional[bytes] = None,
3232
certificate_chain: Optional[bytes] = None,
33+
service_config: Optional[str] = None,
3334
**_: str,
3435
) -> None:
3536
self._channel_creds = grpc.ssl_channel_credentials(
@@ -48,10 +49,14 @@ def __init__(
4849
self._client_user_agent = client_user_agent
4950
self._config_endpoints = endpoints if endpoints is not None else {}
5051
self._endpoints: Optional[Dict[str, str]] = None
52+
# flake8: noqa
5153
self.channel_options = tuple(
52-
("grpc.primary_user_agent", user_agent)
53-
for user_agent in [self._client_user_agent, SDK_USER_AGENT]
54-
if user_agent is not None
54+
[
55+
("grpc.primary_user_agent", user_agent)
56+
for user_agent in [self._client_user_agent, SDK_USER_AGENT]
57+
if user_agent is not None
58+
]
59+
+ ([("grpc.service_config", service_config)] if service_config is not None else [])
5560
)
5661

5762
def channel(self, service: str, endpoint: Optional[str] = None, insecure: bool = False) -> grpc.Channel:

yandexcloud/_retry_interceptor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Callable, Iterable, Optional
55

66
import grpc
7+
from deprecated import deprecated
78

89

910
class _ClientCallDetails(
@@ -19,6 +20,7 @@ class _RetryCall(Exception):
1920
pass
2021

2122

23+
@deprecated(version="0.334.0", reason="Instead of this class use retry_policy field when building the SDK")
2224
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
2325
"""RetryInterceptor implements grpc retries.
2426
It supports retries quantity, list of retriable codes, backoff function,

yandexcloud/_retry_policy.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import json
2+
from typing import Tuple
3+
4+
import grpc
5+
6+
7+
class RetryPolicy:
    """Builds the gRPC service config that enables client-side retries.

    The policy is applied to every service and method of the channel and is
    serialized with :meth:`to_json`.

    :param max_attempts: total number of attempts per call, including the
        first one. NOTE(review): the gRPC retry design (gRFC A6) requires a
        value greater than 1 and caps it at 5 - confirm against the grpcio
        version in use.
    :param status_codes: gRPC status codes that are considered retryable.
    """

    def __init__(
        self,
        max_attempts: int = 4,
        status_codes: Tuple["grpc.StatusCode", ...] = (grpc.StatusCode.UNAVAILABLE,),
    ):
        self.__policy = {
            "methodConfig": [
                {
                    # A single empty name entry makes this the default config
                    # for all services and methods.
                    "name": [{}],
                    "retryPolicy": {
                        "maxAttempts": max_attempts,
                        "initialBackoff": "0.1s",
                        "maxBackoff": "20s",
                        "backoffMultiplier": 2,
                        "retryableStatusCodes": [status.name for status in status_codes],
                    },
                    # waitForReady is a per-method setting in the service
                    # config schema; at the top level it is not a known field.
                    "waitForReady": True,
                }
            ],
            # Channel-wide token bucket that limits retries when many calls
            # fail, to avoid retry amplification.
            "retryThrottling": {"maxTokens": 100, "tokenRatio": 0.1},
        }

    def to_json(self) -> str:
        """Return the service config as a JSON string."""
        return json.dumps(self.__policy)

yandexcloud/_sdk.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33

44
import grpc
55

6-
from yandexcloud import _channels, _helpers, _operation_waiter
7-
from yandexcloud._backoff import backoff_exponential_with_jitter
8-
from yandexcloud._retry_interceptor import RetryInterceptor
6+
from yandexcloud import _channels, _helpers, _operation_waiter, _retry_policy
97
from yandexcloud._wrappers import Wrappers
108

119
if TYPE_CHECKING:
@@ -41,14 +39,16 @@ def __init__(
4139
root_certificates: Optional[bytes] = None,
4240
private_key: Optional[bytes] = None,
4341
certificate_chain: Optional[bytes] = None,
42+
retry_policy: Optional[_retry_policy.RetryPolicy] = None,
4443
**kwargs: str,
4544
):
4645
"""
4746
API entry-point object.
4847
49-
:param interceptor: GRPC interceptor to be used instead of default RetryInterceptor
48+
:param interceptor: GRPC interceptor to be used
5049
:param user_agent: String to prepend User-Agent metadata header for all GRPC requests made via SDK object
5150
:param endpoints: Dict with services endpoints overrides. Example: {'vpc': 'new.vpc.endpoint:443'}
51+
:param retry_policy: Retry policy configuration object to retry all failed GRPC requests
5252
5353
"""
5454
self._channels = _channels.Channels(
@@ -61,14 +61,9 @@ def __init__(
6161
root_certificates,
6262
private_key,
6363
certificate_chain,
64+
retry_policy.to_json() if retry_policy is not None else None,
6465
**kwargs,
6566
)
66-
if interceptor is None:
67-
interceptor = RetryInterceptor(
68-
max_retry_count=5,
69-
per_call_timeout=30,
70-
back_off_func=backoff_exponential_with_jitter(1, 30),
71-
)
7267
self._default_interceptor = interceptor
7368
self.helpers = _helpers.Helpers(self)
7469
self.wrappers = Wrappers(self)

0 commit comments

Comments
 (0)