Skip to content

Commit 9b1606b

Browse files
Merge pull request #1444 from roboflow/fix/webrtc-latency-improvements
Webrtc latency for heavy workflows
2 parents 1582bcc + 037a552 commit 9b1606b

File tree

4 files changed

+25
-61
lines changed

4 files changed

+25
-61
lines changed

inference/core/interfaces/camera/video_source.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -550,21 +550,12 @@ def read_frame(self, timeout: Optional[float] = None) -> Optional[VideoFrame]:
550550
self._fps = source_metadata.source_properties.fps
551551
if not self._fps or self._fps <= 0 or self._fps > 1000:
552552
self._fps = 30 # sane default
553-
if not self._is_file:
554-
current_timestamp = time.time_ns()
555-
if (current_timestamp - self._last_frame_timestamp) / 1e9 < 1 / self._fps:
556-
time.sleep(
557-
(1 / self._fps)
558-
- (current_timestamp - self._last_frame_timestamp) / 1e9
559-
)
560553
video_frame: Optional[Union[VideoFrame, str]] = get_from_queue(
561554
queue=self._frames_buffer,
562555
on_successful_read=self._video_consumer.notify_frame_consumed,
563556
timeout=timeout,
564557
purge=self._buffer_consumption_strategy is BufferConsumptionStrategy.EAGER,
565558
)
566-
if not self._is_file:
567-
self._last_frame_timestamp = time.time_ns()
568559
if video_frame == POISON_PILL:
569560
raise EndOfStreamError(
570561
"Attempted to retrieve frame from stream that already ended."
@@ -926,7 +917,7 @@ def _set_file_mode_buffering_strategies(self) -> None:
926917

927918
def _set_stream_mode_buffering_strategies(self) -> None:
928919
if self._buffer_filling_strategy is None:
929-
self._buffer_filling_strategy = BufferFillingStrategy.DROP_OLDEST
920+
self._buffer_filling_strategy = BufferFillingStrategy.ADAPTIVE_DROP_OLDEST
930921

931922
def _video_fps_should_be_sub_sampled(self) -> bool:
932923
if self._desired_fps is None:

inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,6 @@ def start_loop(loop: asyncio.AbstractEventLoop):
280280
from_inference_queue=from_inference_queue,
281281
asyncio_loop=loop,
282282
webcam_fps=webcam_fps,
283-
max_consecutive_timeouts=parsed_payload.max_consecutive_timeouts,
284-
min_consecutive_on_time=parsed_payload.min_consecutive_on_time,
285283
processing_timeout=parsed_payload.processing_timeout,
286284
fps_probe_frames=parsed_payload.fps_probe_frames,
287285
data_output=data_output,

inference/core/interfaces/stream_manager/manager_app/webrtc.py

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import concurrent.futures
3+
import datetime
34
import json
45
import time
56
from typing import Any, Dict, List, Optional, Tuple, Union
@@ -62,10 +63,27 @@ def get_frame_from_workflow_output(
6263
) -> Optional[np.ndarray]:
6364
step_output = workflow_output.get(frame_output_key)
6465
if isinstance(step_output, WorkflowImageData):
66+
if (
67+
step_output.video_metadata
68+
and step_output.video_metadata.frame_timestamp is not None
69+
):
70+
latency = (
71+
datetime.datetime.now() - step_output.video_metadata.frame_timestamp
72+
)
73+
logger.info("Processing latency: %ss", latency.total_seconds())
6574
return step_output.numpy_image
6675
elif isinstance(step_output, dict):
6776
for frame_output in step_output.values():
6877
if isinstance(frame_output, WorkflowImageData):
78+
if (
79+
frame_output.video_metadata
80+
and frame_output.video_metadata.frame_timestamp is not None
81+
):
82+
latency = (
83+
datetime.datetime.now()
84+
- frame_output.video_metadata.frame_timestamp
85+
)
86+
logger.info("Processing latency: %ss", latency.total_seconds())
6987
return frame_output.numpy_image
7088

7189

@@ -77,8 +95,6 @@ def __init__(
7795
asyncio_loop: asyncio.AbstractEventLoop,
7896
processing_timeout: float,
7997
fps_probe_frames: int,
80-
min_consecutive_on_time: int,
81-
max_consecutive_timeouts: Optional[int] = None,
8298
webcam_fps: Optional[float] = None,
8399
*args,
84100
**kwargs,
@@ -104,10 +120,6 @@ def __init__(
104120
self.incoming_stream_fps: Optional[float] = webcam_fps
105121

106122
self._last_frame: Optional[VideoFrame] = None
107-
self._consecutive_timeouts: int = 0
108-
self._consecutive_on_time: int = 0
109-
self._max_consecutive_timeouts: Optional[int] = max_consecutive_timeouts
110-
self._min_consecutive_on_time: int = min_consecutive_on_time
111123

112124
self._av_logging_set: bool = False
113125

@@ -146,7 +158,7 @@ async def recv(self):
146158

147159
if not await self.to_inference_queue.async_full():
148160
await self.to_inference_queue.async_put(frame)
149-
elif not self._last_frame:
161+
else:
150162
await self.to_inference_queue.async_get_nowait()
151163
await self.to_inference_queue.async_put_nowait(frame)
152164

@@ -157,55 +169,22 @@ async def recv(self):
157169
)
158170
new_frame = VideoFrame.from_ndarray(np_frame, format="bgr24")
159171
self._last_frame = new_frame
160-
161-
if self._max_consecutive_timeouts:
162-
self._consecutive_on_time += 1
163-
if self._consecutive_on_time >= self._min_consecutive_on_time:
164-
self._consecutive_timeouts = 0
165172
except asyncio.TimeoutError:
166-
while not await self.to_inference_queue.async_empty():
167-
await self.to_inference_queue.async_get_nowait()
168-
if self._last_frame:
169-
if self._max_consecutive_timeouts:
170-
self._consecutive_timeouts += 1
171-
if self._consecutive_timeouts >= self._max_consecutive_timeouts:
172-
self._consecutive_on_time = 0
173-
174-
workflow_too_slow_message = [
175-
"Workflow is too heavy to process all frames on time..."
176-
]
173+
pass
174+
177175
if np_frame is None:
178176
if not self._last_frame:
179177
np_frame = overlay_text_on_np_frame(
180178
frame.to_ndarray(format="bgr24"),
181179
["Inference pipeline is starting..."],
182180
)
183181
new_frame = VideoFrame.from_ndarray(np_frame, format="bgr24")
184-
elif (
185-
self._max_consecutive_timeouts
186-
and self._consecutive_timeouts >= self._max_consecutive_timeouts
187-
):
188-
np_frame = overlay_text_on_np_frame(
189-
self._last_frame.to_ndarray(format="bgr24"),
190-
workflow_too_slow_message,
191-
)
192-
new_frame = VideoFrame.from_ndarray(np_frame, format="bgr24")
193182
else:
194183
new_frame = self._last_frame
195184
else:
196-
if (
197-
self._max_consecutive_timeouts
198-
and self._consecutive_timeouts >= self._max_consecutive_timeouts
199-
):
200-
np_frame = overlay_text_on_np_frame(
201-
self._last_frame.to_ndarray(format="bgr24"),
202-
workflow_too_slow_message,
203-
)
204-
new_frame = VideoFrame.from_ndarray(np_frame, format="bgr24")
205-
else:
206-
new_frame = VideoFrame.from_ndarray(np_frame, format="bgr24")
185+
new_frame = VideoFrame.from_ndarray(np_frame, format="bgr24")
207186

208-
new_frame.pts = self._processed
187+
new_frame.pts = frame.pts
209188
new_frame.time_base = frame.time_base
210189

211190
return new_frame
@@ -312,8 +291,6 @@ async def init_rtc_peer_connection(
312291
asyncio_loop: asyncio.AbstractEventLoop,
313292
processing_timeout: float,
314293
fps_probe_frames: int,
315-
max_consecutive_timeouts: int,
316-
min_consecutive_on_time: int,
317294
webrtc_turn_config: Optional[WebRTCTURNConfig] = None,
318295
webcam_fps: Optional[float] = None,
319296
stream_output: Optional[str] = None,
@@ -326,8 +303,6 @@ async def init_rtc_peer_connection(
326303
webcam_fps=webcam_fps,
327304
processing_timeout=processing_timeout,
328305
fps_probe_frames=fps_probe_frames,
329-
max_consecutive_timeouts=max_consecutive_timeouts,
330-
min_consecutive_on_time=min_consecutive_on_time,
331306
)
332307

333308
if webrtc_turn_config:

inference/core/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "0.51.5"
1+
__version__ = "0.51.6"
22

33

44
if __name__ == "__main__":

0 commit comments

Comments
 (0)