-
Notifications
You must be signed in to change notification settings - Fork 689
How can the LiveKit transport support video? #1380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
We're happy to add better support for LiveKit, but we also need some helping hands; ideally, someone with LiveKit expertise. Is anyone available to help out? |
@ZaymeShaw # src/pipecat/transports/services/livekit.py
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """Override process_frame to handle video frames published to LiveKit.

    The frame is first handed to the parent implementation. Afterwards, an
    ``OutputImageRawFrame`` whose ``format`` is ``"RGB"`` is converted with
    ``_convert_image_to_livekit`` and published through ``self._client``.
    Every other frame receives no additional handling here.

    Args:
        frame: The frame travelling through the pipeline.
        direction: The direction the frame is travelling in.
    """
    await super().process_frame(frame, direction)

    is_rgb_image = isinstance(frame, OutputImageRawFrame) and frame.format == "RGB"
    if not is_rgb_image:
        # Other frame types (and non-RGB images) need no extra work.
        return

    # The RGB data is used directly, without any pixel-format conversion.
    video_frame = self._convert_image_to_livekit(frame.image, frame.size)
    await self._client.publish_video(video_frame)  # push video to livekit
def _convert_image_to_livekit(self, image_data: bytes, size: Tuple[int, int]) -> rtc.VideoFrame:
    """Wrap the image data of an OutputImageRawFrame in a LiveKit VideoFrame.

    Args:
        image_data: Raw pixel bytes, passed through unchanged as the frame data.
        size: Image dimensions; index 0 is used as the width and index 1 as
            the height.

    Returns:
        An ``rtc.VideoFrame`` tagged with the ``RGB24`` buffer type.
    """
    frame_width = size[0]
    frame_height = size[1]
    return rtc.VideoFrame(
        width=frame_width,
        height=frame_height,
        type=rtc.VideoBufferType.RGB24,
        data=image_data,
    )
@ZaymeShaw # src/pipecat/transports/services/livekit.py
async def _async_on_track_subscribed(
    self,
    track: rtc.Track,
    publication: rtc.RemoteTrackPublication,
    participant: rtc.RemoteParticipant,
):
    """Start a background reader task for a newly subscribed remote track.

    Audio tracks are recorded in ``self._audio_tracks`` and video tracks in
    ``self._video_tracks``, both keyed by the participant's sid. A stream is
    then opened on the track and handed to the matching ``_process_*_stream``
    coroutine, which is scheduled through ``self._task_manager``. Tracks of
    any other kind are ignored.

    Args:
        track: The track that was subscribed.
        publication: The publication the track belongs to (not used here).
        participant: The remote participant that owns the track.
    """
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        logger.info(f"Audio track subscribed: {track.sid} from participant {participant.sid}")
        self._audio_tracks[participant.sid] = track
        audio_reader = self._process_audio_stream(rtc.AudioStream(track), participant.sid)
        self._task_manager.create_task(audio_reader, f"{self}::_process_audio_stream")
        return

    if track.kind != rtc.TrackKind.KIND_VIDEO:
        # Neither audio nor video: nothing to set up.
        return

    # Pull the remote video stream.
    logger.info(f"Video track subscribed: {track.sid} from participant {participant.sid}")
    self._video_tracks[participant.sid] = track
    video_reader = self._process_video_stream(rtc.VideoStream(track), participant.sid)
    self._task_manager.create_task(video_reader, f"{self}::_process_video_stream")
async def _process_video_stream(self, video_stream: rtc.VideoStream, participant_id: str):
    """Consume a video stream, keeping only the most recent frame.

    Each ``rtc.VideoFrameEvent`` read from the stream overwrites
    ``self._latest_image`` with its frame. Any other event type is logged as
    a warning and skipped.

    Args:
        video_stream: Async-iterable stream of video events to read from.
        participant_id: Identifier of the participant that owns the stream;
            used only for logging.
    """
    logger.info(f"Started processing video stream for participant {participant_id}")
    async for event in video_stream:
        if not isinstance(event, rtc.VideoFrameEvent):
            logger.warning(f"Received unexpected event type: {type(event)}")
            continue
        # Deliberately no sleep here. Pausing between frames would consume at
        # most one frame per second, so undelivered frames would back up in
        # the stream and ``_latest_image`` would drift further and further
        # behind real time. The ``async for`` already yields to the event
        # loop while waiting for the next frame, so this is not a busy loop.
        # Readers that need a lower rate should sample ``_latest_image`` at
        # their own pace.
        self._latest_image = event.frame
Is the Daily transport currently the only one that supports video?
The text was updated successfully, but these errors were encountered: