Skip to content

Commit 037e9cb

Browse files
DylanRussell and lzchen
authored
Make a BatchProcessor class which both BatchSpanRecordProcessor and BatchLogRecordProcessor can use (#4562)
* Refactor BatchLogRecordProcessor * Respond to comments * Fix lint * Add delay for windows test. * Fix fork test * Initial Commit * Another commit * Fix lint / precommit * Revert some old changes * Fix lint issues * Fix typo * Fix lint and spellcheck * Update test to use BatchLogRecordProcessor instead of BatchProcessor * Add a sleep to see if it helps this test pass on pypy 3.8 * fix lint and precommit * Add sleep to try to fix test.. * Fix flaky test attempt #2 * Fix test again.. * Fix test again * Try again * Fix again.. * Reintroduce weakref, I accidentally undid that change in my last PR * Add changelog * Respond to comments on PR * Make BatchProcessor a member of BLRP instead of having BLRP subclass it * Run precommit * Use a generic Protocol for the Exporter * Minor change * Fix bad changelog merge * Respond to comments.. --------- Co-authored-by: Leighton Chen <[email protected]>
1 parent f4f3253 commit 037e9cb

File tree

6 files changed

+520
-334
lines changed

6 files changed

+520
-334
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
11+
clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
12+
and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
13+
14+
1015
## Version 1.33.0/0.54b0 (2025-05-09)
1116

1217
- Fix intermittent `Connection aborted` error when using otlp/http exporters

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 16 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,11 @@
1414
from __future__ import annotations
1515

1616
import abc
17-
import collections
1817
import enum
1918
import logging
20-
import os
2119
import sys
22-
import threading
23-
import weakref
2420
from os import environ, linesep
25-
from typing import IO, Callable, Deque, Optional, Sequence
21+
from typing import IO, Callable, Optional, Sequence
2622

2723
from opentelemetry.context import (
2824
_SUPPRESS_INSTRUMENTATION_KEY,
@@ -31,13 +27,13 @@
3127
set_value,
3228
)
3329
from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
30+
from opentelemetry.sdk._shared_internal import BatchProcessor
3431
from opentelemetry.sdk.environment_variables import (
3532
OTEL_BLRP_EXPORT_TIMEOUT,
3633
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
3734
OTEL_BLRP_MAX_QUEUE_SIZE,
3835
OTEL_BLRP_SCHEDULE_DELAY,
3936
)
40-
from opentelemetry.util._once import Once
4137

4238
_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
4339
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -46,7 +42,6 @@
4642
_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
4743
"Unable to parse value for %s as integer. Defaulting to %s."
4844
)
49-
5045
_logger = logging.getLogger(__name__)
5146

5247

@@ -55,29 +50,19 @@ class LogExportResult(enum.Enum):
5550
FAILURE = 1
5651

5752

58-
class BatchLogExportStrategy(enum.Enum):
59-
EXPORT_ALL = 0
60-
EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
61-
EXPORT_AT_LEAST_ONE_BATCH = 2
62-
63-
6453
class LogExporter(abc.ABC):
6554
"""Interface for exporting logs.
66-
6755
Interface to be implemented by services that want to export logs received
6856
in their own format.
69-
7057
To export data this MUST be registered to the :class:`opentelemetry.sdk._logs.Logger` using a
7158
log processor.
7259
"""
7360

7461
@abc.abstractmethod
7562
def export(self, batch: Sequence[LogData]):
7663
"""Exports a batch of logs.
77-
7864
Args:
7965
batch: The list of `LogData` objects to be exported
80-
8166
Returns:
8267
The result of the export
8368
"""
@@ -146,9 +131,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
146131
return True
147132

148133

149-
_BSP_RESET_ONCE = Once()
150-
151-
152134
class BatchLogRecordProcessor(LogRecordProcessor):
153135
"""This is an implementation of LogRecordProcessor which creates batches of
154136
received logs in the export-friendly LogData representation and
@@ -161,9 +143,9 @@ class BatchLogRecordProcessor(LogRecordProcessor):
161143
- :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
162144
- :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
163145
- :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
164-
"""
165146
166-
_queue: Deque[LogData]
147+
All the logic for emitting logs, shutting down etc. resides in the BatchProcessor class.
148+
"""
167149

168150
def __init__(
169151
self,
@@ -194,127 +176,24 @@ def __init__(
194176
BatchLogRecordProcessor._validate_arguments(
195177
max_queue_size, schedule_delay_millis, max_export_batch_size
196178
)
197-
198-
self._exporter = exporter
199-
self._max_queue_size = max_queue_size
200-
self._schedule_delay = schedule_delay_millis / 1e3
201-
self._max_export_batch_size = max_export_batch_size
202-
# Not used. No way currently to pass timeout to export.
203-
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
204-
self._export_timeout_millis = export_timeout_millis
205-
# Deque is thread safe.
206-
self._queue = collections.deque([], max_queue_size)
207-
self._worker_thread = threading.Thread(
208-
name="OtelBatchLogRecordProcessor",
209-
target=self.worker,
210-
daemon=True,
179+
# Initializes BatchProcessor
180+
self._batch_processor = BatchProcessor(
181+
exporter,
182+
schedule_delay_millis,
183+
max_export_batch_size,
184+
export_timeout_millis,
185+
max_queue_size,
186+
"Log",
211187
)
212188

213-
self._shutdown = False
214-
self._export_lock = threading.Lock()
215-
self._worker_awaken = threading.Event()
216-
self._worker_thread.start()
217-
if hasattr(os, "register_at_fork"):
218-
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
219-
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
220-
self._pid = os.getpid()
221-
222-
def _should_export_batch(
223-
self, batch_strategy: BatchLogExportStrategy, num_iterations: int
224-
) -> bool:
225-
if not self._queue:
226-
return False
227-
# Always continue to export while queue length exceeds max batch size.
228-
if len(self._queue) >= self._max_export_batch_size:
229-
return True
230-
if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
231-
return True
232-
if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
233-
return num_iterations == 0
234-
return False
235-
236-
def _at_fork_reinit(self):
237-
self._export_lock = threading.Lock()
238-
self._worker_awaken = threading.Event()
239-
self._queue.clear()
240-
self._worker_thread = threading.Thread(
241-
name="OtelBatchLogRecordProcessor",
242-
target=self.worker,
243-
daemon=True,
244-
)
245-
self._worker_thread.start()
246-
self._pid = os.getpid()
247-
248-
def worker(self):
249-
while not self._shutdown:
250-
# Lots of strategies in the spec for setting next timeout.
251-
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
252-
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger than threshold.
253-
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
254-
if self._shutdown:
255-
break
256-
self._export(
257-
BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
258-
if sleep_interrupted
259-
else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
260-
)
261-
self._worker_awaken.clear()
262-
self._export(BatchLogExportStrategy.EXPORT_ALL)
263-
264-
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
265-
with self._export_lock:
266-
iteration = 0
267-
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
268-
# once the lock is obtained to see if we still need to make the requested export.
269-
while self._should_export_batch(batch_strategy, iteration):
270-
iteration += 1
271-
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
272-
try:
273-
self._exporter.export(
274-
[
275-
# Oldest records are at the back, so pop from there.
276-
self._queue.pop()
277-
for _ in range(
278-
min(
279-
self._max_export_batch_size,
280-
len(self._queue),
281-
)
282-
)
283-
]
284-
)
285-
except Exception: # pylint: disable=broad-exception-caught
286-
_logger.exception("Exception while exporting logs.")
287-
detach(token)
288-
289189
def emit(self, log_data: LogData) -> None:
290-
if self._shutdown:
291-
_logger.info("Shutdown called, ignoring log.")
292-
return
293-
if self._pid != os.getpid():
294-
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
295-
296-
if len(self._queue) == self._max_queue_size:
297-
_logger.warning("Queue full, dropping log.")
298-
self._queue.appendleft(log_data)
299-
if len(self._queue) >= self._max_export_batch_size:
300-
self._worker_awaken.set()
190+
return self._batch_processor.emit(log_data)
301191

302192
def shutdown(self):
303-
if self._shutdown:
304-
return
305-
# Prevents emit and force_flush from further calling export.
306-
self._shutdown = True
307-
# Interrupts sleep in the worker, if it's sleeping.
308-
self._worker_awaken.set()
309-
# Main worker loop should exit after one final export call with flush all strategy.
310-
self._worker_thread.join()
311-
self._exporter.shutdown()
193+
return self._batch_processor.shutdown()
312194

313-
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
314-
if self._shutdown:
315-
return
316-
# Blocking call to export.
317-
self._export(BatchLogExportStrategy.EXPORT_ALL)
195+
def force_flush(self, timeout_millis: Optional[int] = None):
196+
return self._batch_processor.force_flush(timeout_millis)
318197

319198
@staticmethod
320199
def _default_max_queue_size():

0 commit comments

Comments
 (0)