Commit 6ed676a

DylanRussell, emdneto, and xrmx authored
Make exporter timeout encompass retries/backoffs, add jitter to backoffs, cleanup code a bit (#4564)
* Initial commit to add timeout as a parm to export, make retries encompass timeout
* Fix lint issues
* Fix a bunch of failing style/lint/spellcheck checks
* Remove timeout param from the export calls.
* Fix flaky windows test ?
* Respond to review comments..
* Delete exponential backoff code that is now unused
* Add changelog and remove some unused imports..
* fix typo and unit test flaking on windows
* Refactor tests, HTTP exporters a bit
* Remove unneeded test reqs
* Remove gRPC retry config
* Tweak backoff calculation
* Lint and precommit
* Empty commit
* Another empty commit
* Calculate backoff in 1 place instead of 2
* Update changelog
* Update changelog
* Make new _common directory in the http exporter for shared code
* precommit
* Respond to comments on PR
* Fix broken test, execute precommit
* Skip some tests on windows
* Explain why test is skipped
* Update exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py
  Co-authored-by: Emídio Neto <[email protected]>
* Revert change to start respecting timeout passed into metric exporter

---------

Co-authored-by: Emídio Neto <[email protected]>
Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 1689654 commit 6ed676a

19 files changed, +429 -508 lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+- Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs.
+  A +/-20% jitter was added to all backoffs. A pointless 32 second sleep that occurred after all retries
+  had completed/failed was removed.
+  ([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)).
 - Update ConsoleLogExporter.export to handle LogRecord's containing bytes type
   in the body ([#4614](https://github.com/open-telemetry/opentelemetry-python/pull/4614/)).
 - opentelemetry-sdk: Fix invalid `type: ignore` that causes mypy to ignore the whole file
@@ -20,9 +24,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - typecheck: add sdk/resources and drop mypy
   ([#4578](https://github.com/open-telemetry/opentelemetry-python/pull/4578))
-- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
-  clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
-  and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
 - Use PEP702 for marking deprecations
   ([#4522](https://github.com/open-telemetry/opentelemetry-python/pull/4522))
 - Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code
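
To make the "+/-20% jitter" in the changelog entry concrete, here is a minimal sketch of the range each backoff can fall in. It assumes the base-2 schedule that the gRPC exporter uses elsewhere in this commit; `jitter_bounds` and the two constants are illustrative names, not part of the exporter.

```python
from typing import Tuple

# The +/-20% band named in the changelog entry (illustrative constants).
JITTER_LOW, JITTER_HIGH = 0.8, 1.2


def jitter_bounds(retry_num: int) -> Tuple[float, float]:
    """Return the (min, max) backoff in seconds for a zero-based retry number.

    Assumes a nominal backoff of 2**retry_num seconds.
    """
    nominal = 2**retry_num
    return nominal * JITTER_LOW, nominal * JITTER_HIGH


for n in range(5):
    low, high = jitter_bounds(n)
    print(f"retry {n}: nominal {2**n}s, jittered range {low:.1f}s to {high:.1f}s")
```

Spreading each delay over a band rather than a fixed value means many clients that fail at the same moment are less likely to retry in lockstep.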

exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py

Lines changed: 0 additions & 37 deletions
@@ -17,12 +17,10 @@
 
 import logging
 from collections.abc import Sequence
-from itertools import count
 from typing import (
     Any,
     Callable,
     Dict,
-    Iterator,
     List,
     Mapping,
     Optional,
@@ -177,38 +175,3 @@ def _get_resource_data(
             )
         )
     return resource_data
-
-
-def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
-    """
-    Generates an infinite sequence of exponential backoff values. The sequence starts
-    from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified
-    and non-zero, the generated values will not exceed this maximum, capping at max_value
-    instead of growing indefinitely.
-
-    Parameters:
-    - max_value (int, optional): The maximum value to yield. If 0 or not provided, the
-      sequence grows without bound.
-
-    Returns:
-    Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
-    capped at max_value.
-
-    Example:
-    ```
-    gen = _create_exp_backoff_generator(max_value=10)
-    for _ in range(5):
-        print(next(gen))
-    ```
-    This will print:
-    1
-    2
-    4
-    8
-    10
-
-    Note: this functionality used to be handled by the 'backoff' package.
-    """
-    for i in count(0):
-        out = 2**i
-        yield min(out, max_value) if max_value else out
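
The removed generator drove the old gRPC retry loop (also deleted in this commit, in `exporter.py`). The following is a simplified replay of that control flow against an export that always fails, to show where the trailing 32 second sleep mentioned in the changelog came from. It is a sketch only: it ignores shutdown handling and server-supplied `RetryInfo` delays, and `simulate_old_loop` is a hypothetical name.

```python
from itertools import count
from typing import Iterator, List

MAX_RETRY_TIMEOUT = 64  # value of the removed _MAX_RETRY_TIMEOUT attribute


def create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
    """Copy of the removed helper: yields 1, 2, 4, ... capped at max_value."""
    for i in count(0):
        out = 2**i
        yield min(out, max_value) if max_value else out


def simulate_old_loop() -> List[str]:
    """Replay the old loop, recording events instead of exporting or sleeping."""
    events: List[str] = []
    for delay in create_exp_backoff_generator(max_value=MAX_RETRY_TIMEOUT):
        if delay == MAX_RETRY_TIMEOUT:
            events.append("give up")
            return events
        events.append("export attempt fails")
        events.append(f"sleep {delay}s")  # stand-in for time.sleep(delay)
    return events


events = simulate_old_loop()
print(events[-3:])  # ['export attempt fails', 'sleep 32s', 'give up']
```

The last failed attempt is followed by a 32 second sleep, and only then does the loop notice that the next delay equals the cap and return a failure, so that final sleep delays the result without enabling another attempt.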

exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py

Lines changed: 0 additions & 46 deletions
This file was deleted.

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ def __init__(
         headers: Optional[
             Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
         ] = None,
-        timeout: Optional[int] = None,
+        timeout: Optional[float] = None,
         compression: Optional[Compression] = None,
     ):
         if insecure is None:
@@ -79,7 +79,7 @@ def __init__(
 
         environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
         environ_timeout = (
-            int(environ_timeout) if environ_timeout is not None else None
+            float(environ_timeout) if environ_timeout is not None else None
         )
 
         compression = (
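
One practical effect of switching the conversion from `int` to `float` is that fractional timeout strings now parse instead of raising. The sketch below isolates that conversion; `parse_timeout` is a hypothetical helper and `"2.5"` is an illustrative value, not something taken from the commit.

```python
from typing import Callable, Optional


def parse_timeout(raw: Optional[str], cast: Callable[[str], float]) -> Optional[float]:
    """Apply the same conditional conversion as the hunk above."""
    return cast(raw) if raw is not None else None


print(parse_timeout("2.5", float))  # 2.5

try:
    parse_timeout("2.5", int)  # the pre-change conversion
except ValueError as exc:
    print(f"int() rejects a fractional string: {exc}")

print(parse_timeout(None, float))  # None, so the caller falls back to its default
```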

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 53 additions & 68 deletions
@@ -14,12 +14,13 @@
 
 """OTLP Exporter"""
 
+import random
 import threading
 from abc import ABC, abstractmethod
 from collections.abc import Sequence  # noqa: F401
 from logging import getLogger
 from os import environ
-from time import sleep
+from time import sleep, time
 from typing import (  # noqa: F401
     Any,
     Callable,
@@ -47,7 +48,6 @@
     ssl_channel_credentials,
 )
 from opentelemetry.exporter.otlp.proto.common._internal import (
-    _create_exp_backoff_generator,
     _get_resource_data,
 )
 from opentelemetry.exporter.otlp.proto.grpc import (
@@ -74,6 +74,18 @@
 from opentelemetry.sdk.trace import ReadableSpan
 from opentelemetry.util.re import parse_env_headers
 
+_RETRYABLE_ERROR_CODES = frozenset(
+    [
+        StatusCode.CANCELLED,
+        StatusCode.DEADLINE_EXCEEDED,
+        StatusCode.RESOURCE_EXHAUSTED,
+        StatusCode.ABORTED,
+        StatusCode.OUT_OF_RANGE,
+        StatusCode.UNAVAILABLE,
+        StatusCode.DATA_LOSS,
+    ]
+)
+_MAX_RETRYS = 6
 logger = getLogger(__name__)
 SDKDataT = TypeVar("SDKDataT")
 ResourceDataT = TypeVar("ResourceDataT")
@@ -186,8 +198,6 @@ class OTLPExporterMixin(
         compression: gRPC compression method to use
     """
 
-    _MAX_RETRY_TIMEOUT = 64
-
     def __init__(
         self,
         endpoint: Optional[str] = None,
@@ -196,7 +206,7 @@ def __init__(
         headers: Optional[
             Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
         ] = None,
-        timeout: Optional[int] = None,
+        timeout: Optional[float] = None,
         compression: Optional[Compression] = None,
     ):
         super().__init__()
@@ -233,7 +243,7 @@ def __init__(
         else:
             self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)
 
-        self._timeout = timeout or int(
+        self._timeout = timeout or float(
             environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
         )
         self._collector_kwargs = None
@@ -246,7 +256,8 @@ def __init__(
 
         if insecure:
             self._channel = insecure_channel(
-                self._endpoint, compression=compression
+                self._endpoint,
+                compression=compression,
             )
         else:
             credentials = _get_credentials(
@@ -256,7 +267,9 @@ def __init__(
                 OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
             )
             self._channel = secure_channel(
-                self._endpoint, credentials, compression=compression
+                self._endpoint,
+                credentials,
+                compression=compression,
             )
         self._client = self._stub(self._channel)
 
@@ -270,89 +283,61 @@ def _translate_data(
         pass
 
     def _export(
-        self, data: Union[TypingSequence[ReadableSpan], MetricsData]
+        self,
+        data: Union[TypingSequence[ReadableSpan], MetricsData],
     ) -> ExportResultT:
-        # After the call to shutdown, subsequent calls to Export are
-        # not allowed and should return a Failure result.
         if self._shutdown:
             logger.warning("Exporter already shutdown, ignoring batch")
             return self._result.FAILURE
 
         # FIXME remove this check if the export type for traces
         # gets updated to a class that represents the proto
         # TracesData and use the code below instead.
-        # logger.warning(
-        #     "Transient error %s encountered while exporting %s, retrying in %ss.",
-        #     error.code(),
-        #     data.__class__.__name__,
-        #     delay,
-        # )
-        # expo returns a generator that yields delay values which grow
-        # exponentially. Once delay is greater than max_value, the yielded
-        # value will remain constant.
-        for delay in _create_exp_backoff_generator(
-            max_value=self._MAX_RETRY_TIMEOUT
-        ):
-            if delay == self._MAX_RETRY_TIMEOUT or self._shutdown:
-                return self._result.FAILURE
-
-            with self._export_lock:
+        with self._export_lock:
+            deadline_sec = time() + self._timeout
+            for retry_num in range(_MAX_RETRYS):
                 try:
                     self._client.Export(
                         request=self._translate_data(data),
                         metadata=self._headers,
-                        timeout=self._timeout,
+                        timeout=deadline_sec - time(),
                     )
-
                     return self._result.SUCCESS
-
                 except RpcError as error:
-                    if error.code() in [
-                        StatusCode.CANCELLED,
-                        StatusCode.DEADLINE_EXCEEDED,
-                        StatusCode.RESOURCE_EXHAUSTED,
-                        StatusCode.ABORTED,
-                        StatusCode.OUT_OF_RANGE,
-                        StatusCode.UNAVAILABLE,
-                        StatusCode.DATA_LOSS,
-                    ]:
-                        retry_info_bin = dict(error.trailing_metadata()).get(
-                            "google.rpc.retryinfo-bin"
-                        )
-                        if retry_info_bin is not None:
-                            retry_info = RetryInfo()
-                            retry_info.ParseFromString(retry_info_bin)
-                            delay = (
-                                retry_info.retry_delay.seconds
-                                + retry_info.retry_delay.nanos / 1.0e9
-                            )
-
-                        logger.warning(
-                            (
-                                "Transient error %s encountered while exporting "
-                                "%s to %s, retrying in %ss."
-                            ),
-                            error.code(),
-                            self._exporting,
-                            self._endpoint,
-                            delay,
+                    retry_info_bin = dict(error.trailing_metadata()).get(
+                        "google.rpc.retryinfo-bin"
+                    )
+                    # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
+                    backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
+                    if retry_info_bin is not None:
+                        retry_info = RetryInfo()
+                        retry_info.ParseFromString(retry_info_bin)
+                        backoff_seconds = (
+                            retry_info.retry_delay.seconds
+                            + retry_info.retry_delay.nanos / 1.0e9
                         )
-                        sleep(delay)
-                        continue
-                    else:
+                    if (
+                        error.code() not in _RETRYABLE_ERROR_CODES
+                        or retry_num + 1 == _MAX_RETRYS
+                        or backoff_seconds > (deadline_sec - time())
+                    ):
                         logger.error(
                             "Failed to export %s to %s, error code: %s",
                             self._exporting,
                             self._endpoint,
                             error.code(),
                             exc_info=error.code() == StatusCode.UNKNOWN,
                         )
-
-                    if error.code() == StatusCode.OK:
-                        return self._result.SUCCESS
-
-                    return self._result.FAILURE
-
+                        return self._result.FAILURE
+                    logger.warning(
+                        "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
+                        error.code(),
+                        self._exporting,
+                        self._endpoint,
+                        backoff_seconds,
+                    )
+                    sleep(backoff_seconds)
+        # Not possible to reach here but the linter is complaining.
         return self._result.FAILURE
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
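
The new `_export` loop computes one deadline up front, gives each attempt only the time that remains, and stops as soon as the next backoff would overrun that deadline. The standalone sketch below reproduces that shape without gRPC so it can be run directly. `TransientError`, `export_with_deadline`, and the injected `clock` and `sleep` callables are hypothetical names introduced for illustration, and the server-supplied `RetryInfo` override and the retryable-code check are left out.

```python
import random
from typing import Callable, List

MAX_RETRIES = 6  # mirrors _MAX_RETRYS in the diff


class TransientError(Exception):
    """Hypothetical stand-in for a retryable RpcError."""


def export_with_deadline(
    send: Callable[[float], None],
    timeout: float,
    clock: Callable[[], float],
    sleep: Callable[[float], None],
) -> bool:
    """Retry send(remaining_seconds) until success or the budget is spent."""
    deadline = clock() + timeout
    for retry_num in range(MAX_RETRIES):
        try:
            send(deadline - clock())  # each attempt gets only the remaining time
            return True
        except TransientError:
            # Exponential backoff with +/-20% jitter, as in the diff.
            backoff = 2**retry_num * random.uniform(0.8, 1.2)
            if retry_num + 1 == MAX_RETRIES or backoff > deadline - clock():
                return False
            sleep(backoff)
    return False


# Drive the loop with a fake clock so the example finishes instantly.
now = [0.0]
slept: List[float] = []


def fake_sleep(seconds: float) -> None:
    slept.append(seconds)
    now[0] += seconds


def always_fail(remaining: float) -> None:
    raise TransientError()


ok = export_with_deadline(
    always_fail, timeout=10.0, clock=lambda: now[0], sleep=fake_sleep
)
print(ok, [round(s, 2) for s in slept], round(now[0], 2))
```

With a 10 second budget and an export that never succeeds, this sketch always sleeps exactly three times: the fourth backoff is at least 6.4 seconds, which exceeds the at most 4.4 seconds left, so the loop returns a failure before the deadline instead of sleeping past it.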

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

Lines changed: 2 additions & 3 deletions
@@ -99,7 +99,7 @@ def __init__(
         credentials: ChannelCredentials | None = None,
         headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str]
         | None = None,
-        timeout: int | None = None,
+        timeout: float | None = None,
         compression: Compression | None = None,
         preferred_temporality: dict[type, AggregationTemporality]
         | None = None,
@@ -124,7 +124,7 @@ def __init__(
 
         environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
         environ_timeout = (
-            int(environ_timeout) if environ_timeout is not None else None
+            float(environ_timeout) if environ_timeout is not None else None
         )
 
         compression = (
@@ -172,7 +172,6 @@ def export(
 
             if split_export_result is MetricExportResult.FAILURE:
                 export_result = MetricExportResult.FAILURE
-
         return export_result
 
     def _split_metrics_data(

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -91,7 +91,7 @@ def __init__(
         headers: Optional[
             Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
         ] = None,
-        timeout: Optional[int] = None,
+        timeout: Optional[float] = None,
         compression: Optional[Compression] = None,
     ):
         if insecure is None:
@@ -112,7 +112,7 @@ def __init__(
 
         environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
         environ_timeout = (
-            float_placeholder
         )
 
         compression = (
