-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpipeline.py
124 lines (96 loc) · 4.04 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from config import config, register_functions
# from tools_config import config, register_functions
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService, AnthropicUserContextAggregator, AnthropicAssistantContextAggregator
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import (
RTVIConfig,
RTVIProcessor,
RTVIServiceConfig,
RTVIServiceOptionConfig)
from helpers.bot_rtvi_actions import register_rtvi_actions
from helpers.bot_rtvi_services import register_rtvi_services
from openai._types import NotGiven
from loguru import logger
from dotenv import load_dotenv
# Load variables from a .env file; override=True lets .env values replace
# variables already present in the process environment.
load_dotenv(override=True)

# Replace loguru's default stderr handler (id 0) with one whose level is taken
# from LOG_LEVEL (process environment or .env), defaulting to DEBUG.
# Set LOG_LEVEL=TRACE for the most verbose output.
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG").upper())
async def main():
    """Join a Daily room and run the Claude-backed RTVI voice pipeline.

    Reads DAILY_SAMPLE_ROOM_URL (required), DAILY_ROOM_TOKEN, CARTESIA_API_KEY
    and ANTHROPIC_API_KEY from the environment. It then builds the pipeline
    transport input -> RTVI -> user context -> LLM -> TTS -> transport output
    -> assistant context, and runs it until the runner returns.

    Raises:
        ValueError: if DAILY_SAMPLE_ROOM_URL is not set or is empty.
    """
    # The LLM service is also published as a module-level global (kept for
    # backward compatibility). The participant-join handler below tags it.
    global llm

    room_url = os.getenv("DAILY_SAMPLE_ROOM_URL")
    if not room_url:
        # Fail fast with an actionable message instead of an opaque join error.
        raise ValueError("DAILY_SAMPLE_ROOM_URL environment variable is not set")
    # The token is passed through unchecked; None is forwarded as-is.
    token = os.getenv("DAILY_ROOM_TOKEN")

    # NOTE(review): `session` is not referenced below. It is kept in case a
    # helper needs a shared HTTP session; remove it if not.
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token,
            "Respond bot",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
            sample_rate=16000,
        )

        llm = AnthropicLLMService(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3-5-sonnet-20240620",
            enable_prompt_caching_beta=True,
        )
        register_functions(llm)

        # Start with an empty conversation and no tool schema on the context.
        messages = []
        tools = NotGiven()
        context = OpenAILLMContext(messages, tools)
        context_aggregator = llm.create_context_aggregator(context)
        user_aggregator = context_aggregator.user()
        assistant_aggregator = context_aggregator.assistant()

        rtvi = RTVIProcessor(config=RTVIConfig(config=config))
        await register_rtvi_services(rtvi, user_aggregator)
        await register_rtvi_actions(rtvi, user_aggregator)

        # Register RTVI's function-call handlers under the name None.
        # NOTE(review): presumably a catch-all for functions without a
        # dedicated handler; confirm against the pipecat version in use.
        llm.register_function(
            None,
            rtvi.handle_function_call,
            start_callback=rtvi.handle_function_call_start,
        )

        # Use the same aggregator instances that RTVI was registered with, so
        # RTVI actions and the pipeline operate on one shared context.
        pipeline = Pipeline([
            transport.input(),     # Transport user input
            rtvi,                  # RTVI
            user_aggregator,       # User speech to text
            llm,                   # LLM
            tts,                   # TTS
            transport.output(),    # Transport bot output
            assistant_aggregator,  # Assistant spoken responses and tool context
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            logger.debug("FIRST P JOINED")
            video_participant_id = participant["id"]
            transport.capture_participant_transcription(video_participant_id)
            transport.capture_participant_video(video_participant_id, framerate=0)
            logger.debug("SETTING ATTRIBUTE")
            # Record on the LLM service which participant's video is captured.
            # NOTE(review): presumably read by a registered function; confirm.
            llm.video_participant_id = video_participant_id

        runner = PipelineRunner()
        await runner.run(task)
if __name__ == "__main__":
    # Script entry point: build the main() coroutine and drive it to
    # completion on a fresh event loop.
    entry = main()
    asyncio.run(entry)