    TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
-from pipecat.services.tts_service import InterruptibleWordTTSService, TTSService
+from pipecat.services.tts_service import InterruptibleWordTTSService, WordTTSService
from pipecat.transcriptions.language import Language

# See .env.example for ElevenLabs configuration needed
@@ -441,8 +441,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
            logger.error(f"{self} exception: {e}")


-class ElevenLabsHttpTTSService(TTSService):
-    """ElevenLabs Text-to-Speech service using HTTP streaming.
+class ElevenLabsHttpTTSService(WordTTSService):
+    """ElevenLabs Text-to-Speech service using HTTP streaming with word timestamps.

    Args:
        api_key: ElevenLabs API key
@@ -475,7 +475,13 @@ def __init__(
        params: InputParams = InputParams(),
        **kwargs,
    ):
-        super().__init__(sample_rate=sample_rate, **kwargs)
+        super().__init__(
+            aggregate_sentences=True,
+            push_text_frames=False,
+            push_stop_frames=True,
+            sample_rate=sample_rate,
+            **kwargs,
+        )

        self._api_key = api_key
        self._base_url = base_url
@@ -498,28 +504,109 @@ def __init__(
        self._output_format = ""  # initialized in start()
        self._voice_settings = self._set_voice_settings()

+        # Track cumulative time to properly sequence word timestamps across utterances
+        self._cumulative_time = 0
+        self._started = False
+
+    def language_to_service_language(self, language: Language) -> Optional[str]:
+        """Convert pipecat Language to ElevenLabs language code."""
+        return language_to_elevenlabs_language(language)
+
    def can_generate_metrics(self) -> bool:
+        """Indicate that this service can generate usage metrics."""
        return True

    def _set_voice_settings(self):
        return build_elevenlabs_voice_settings(self._settings)

    async def start(self, frame: StartFrame):
+        """Initialize the service upon receiving a StartFrame."""
        await super().start(frame)
        self._output_format = output_format_from_sample_rate(self.sample_rate)
+        self._cumulative_time = 0
+        self._started = False
+
+    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
+        await super().push_frame(frame, direction)
+        if isinstance(frame, (StartInterruptionFrame, TTSStoppedFrame)):
+            # Reset timing on interruption or stop
+            self._started = False
+            self._cumulative_time = 0
+            if isinstance(frame, TTSStoppedFrame):
+                await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
+
+    def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> List[Tuple[str, float]]:
+        """Calculate word timing from character alignment data.
+
+        Example input data:
+            {
+                "characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
+                "character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
+                "character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
+            }
+
+        Would produce word times (with cumulative_time=0):
+            [("Hello", 0.1), ("world", 0.5)]
+
+        Args:
+            alignment_info: Character timing data from ElevenLabs
+
+        Returns:
+            List of (word, timestamp) pairs
+        """
+        chars = alignment_info.get("characters", [])
+        char_start_times = alignment_info.get("character_start_times_seconds", [])
+
+        if not chars or not char_start_times or len(chars) != len(char_start_times):
+            logger.warning(
+                f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}"
+            )
+            return []
+
+        # Build the words and find their start times
+        words = []
+        word_start_times = []
+        current_word = ""
+        first_char_idx = -1
+
+        for i, char in enumerate(chars):
+            if char == " ":
+                if current_word:  # Only add non-empty words
+                    words.append(current_word)
+                    # Use time of the first character of the word, offset by cumulative time
+                    word_start_times.append(
+                        self._cumulative_time + char_start_times[first_char_idx]
+                    )
+                    current_word = ""
+                    first_char_idx = -1
+            else:
+                if not current_word:  # This is the first character of a new word
+                    first_char_idx = i
+                current_word += char
+
+        # Don't forget the last word if there's no trailing space
+        if current_word and first_char_idx >= 0:
+            words.append(current_word)
+            word_start_times.append(self._cumulative_time + char_start_times[first_char_idx])
+
+        # Create word-time pairs
+        word_times = list(zip(words, word_start_times))
+
+        return word_times

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
-        """Generate speech from text using ElevenLabs streaming API.
+        """Generate speech from text using ElevenLabs streaming API with timestamps.

        Args:
-            text: The text to convert to speech
+            text: Text to convert to speech

        Yields:
-            Frames containing audio data and status information
+            Audio and control frames
        """
        logger.debug(f"{self}: Generating TTS [{text}]")

-        url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream"
+        # Use the with-timestamps endpoint
+        url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream/with-timestamps"

        payload: Dict[str, Union[str, Dict[str, Union[float, bool]]]] = {
            "text": text,
@@ -550,8 +637,6 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        if self._settings["optimize_streaming_latency"] is not None:
            params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]

-        logger.debug(f"{self} ElevenLabs request - payload: {payload}, params: {params}")
-
        try:
            await self.start_ttfb_metrics()

@@ -566,17 +651,59 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:

                await self.start_tts_usage_metrics(text)

-                # Process the streaming response
-                CHUNK_SIZE = 1024
+                # Start TTS sequence if not already started
+                if not self._started:
+                    self.start_word_timestamps()
+                    yield TTSStartedFrame()
+                    self._started = True
+
+                # Track the duration of this utterance based on the last character's end time
+                utterance_duration = 0
+                async for line in response.content:
+                    line_str = line.decode("utf-8").strip()
+                    if not line_str:
+                        continue
+
+                    try:
+                        # Parse the JSON object
+                        data = json.loads(line_str)
+
+                        # Process audio if present
+                        if data and "audio_base64" in data:
+                            await self.stop_ttfb_metrics()
+                            audio = base64.b64decode(data["audio_base64"])
+                            yield TTSAudioRawFrame(audio, self.sample_rate, 1)
+
+                        # Process alignment if present
+                        if data and "alignment" in data:
+                            alignment = data["alignment"]
+                            if alignment:  # Ensure alignment is not None
+                                # Get end time of the last character in this chunk
+                                char_end_times = alignment.get("character_end_times_seconds", [])
+                                if char_end_times:
+                                    chunk_end_time = char_end_times[-1]
+                                    # Update to the longest end time seen so far
+                                    utterance_duration = max(utterance_duration, chunk_end_time)
+
+                                # Calculate word timestamps
+                                word_times = self.calculate_word_times(alignment)
+                                if word_times:
+                                    await self.add_word_timestamps(word_times)
+                    except json.JSONDecodeError as e:
+                        logger.warning(f"Failed to parse JSON from stream: {e}")
+                        continue
+                    except Exception as e:
+                        logger.error(f"Error processing response: {e}", exc_info=True)
+                        continue
+
+                # After processing all chunks, add the total utterance duration
+                # to the cumulative time to ensure next utterance starts after this one
+                if utterance_duration > 0:
+                    self._cumulative_time += utterance_duration

-                yield TTSStartedFrame()
-                async for chunk in response.content.iter_chunked(CHUNK_SIZE):
-                    if len(chunk) > 0:
-                        await self.stop_ttfb_metrics()
-                        yield TTSAudioRawFrame(chunk, self.sample_rate, 1)

        except Exception as e:
            logger.error(f"Error in run_tts: {e}")
            yield ErrorFrame(error=str(e))
        finally:
            await self.stop_ttfb_metrics()
-            yield TTSStoppedFrame()
+            # Let the parent class handle TTSStoppedFrame
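For reference, here is a minimal standalone sketch (not part of the PR) of the word-timing conversion this diff introduces. It mirrors calculate_word_times() as a free function so the alignment-to-word-timestamp logic can be tried in isolation; the function name words_from_alignment and the hard-coded alignment dict are illustrative only, taken from the example in the docstring above.

# Minimal sketch, assuming the alignment shape shown in the docstring example.
from typing import Any, List, Mapping, Tuple


def words_from_alignment(alignment: Mapping[str, Any], cumulative_time: float = 0.0) -> List[Tuple[str, float]]:
    chars = alignment.get("characters", [])
    starts = alignment.get("character_start_times_seconds", [])
    if not chars or len(chars) != len(starts):
        return []

    words, times = [], []
    current, first_idx = "", -1
    for i, ch in enumerate(chars):
        if ch == " ":
            if current:  # close out the word at the space
                words.append(current)
                times.append(cumulative_time + starts[first_idx])
                current, first_idx = "", -1
        else:
            if not current:  # first character of a new word
                first_idx = i
            current += ch
    if current and first_idx >= 0:  # trailing word without a following space
        words.append(current)
        times.append(cumulative_time + starts[first_idx])
    return list(zip(words, times))


# Example data from the docstring: the leading space is skipped and each word
# gets the start time of its first character.
alignment = {
    "characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
    "character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
    "character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
}
print(words_from_alignment(alignment))                        # [('Hello', 0.1), ('world', 0.5)]

# A later utterance is offset by the previous utterance's duration (the last
# character_end_times_seconds value), which is what _cumulative_time tracks in the service.
print(words_from_alignment(alignment, cumulative_time=1.0))   # [('Hello', 1.1), ('world', 1.5)]

This is why the service resets _cumulative_time on StartInterruptionFrame and TTSStoppedFrame: without the reset, timestamps for the next response would keep drifting forward past the audio that was never played.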