Skip to content

Add --subscribe-audio flag to join-room #334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions cmd/livekit-cli/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
Expand All @@ -26,13 +27,17 @@ import (
"time"

"github.com/pion/rtcp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/oggwriter"
"github.com/urfave/cli/v2"
"go.uber.org/atomic"

provider2 "github.com/livekit/livekit-cli/pkg/provider"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go/v2"
"github.com/livekit/server-sdk-go/v2/pkg/samplebuilder"
)

var (
Expand All @@ -55,6 +60,11 @@ var (
"can be used multiple times to publish multiple files. " +
"can publish from Unix or TCP socket using the format `codec://socket_name` or `codec://host:address` respectively. Valid codecs are h264, vp8, opus",
},
&cli.StringFlag{
Name: "subscribe-audio",
Usage: "subscribe to audio tracks and write to a local TCP socket." +
"pass the directory to place the socket file. For example --subscribe-audio /tmp/my-audio",
},
&cli.Float64Flag{
Name: "fps",
Usage: "if video files are published, indicates FPS of video",
Expand All @@ -76,12 +86,12 @@ func joinRoom(c *cli.Context) error {
return err
}

audioPath := c.String("subscribe-audio")
done := make(chan os.Signal, 1)
roomCB := &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnDataReceived: func(data []byte, params lksdk.DataReceiveParams) {
identity := params.SenderIdentity
logger.Infow("received data", "data", data, "participant", identity)
OnDataPacket: func(data lksdk.DataPacket, params lksdk.DataReceiveParams) {
logger.Infow("received data", "data", data.ToProto(), "participant", params.SenderIdentity)
},
OnConnectionQualityChanged: func(update *livekit.ConnectionQualityInfo, p lksdk.Participant) {
logger.Debugw("connection quality changed", "participant", p.Identity(), "quality", update.Quality)
Expand All @@ -93,6 +103,9 @@ func joinRoom(c *cli.Context) error {
"source", pub.Source(),
"participant", participant.Identity(),
)
if pub.Kind() == lksdk.TrackKindAudio && audioPath != "" {
go writeTrackToSocket(audioPath, participant.Identity(), pub.SID(), track)
}
},
OnTrackUnsubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, participant *lksdk.RemoteParticipant) {
logger.Infow("track unsubscribed",
Expand Down Expand Up @@ -365,3 +378,63 @@ func publishReader(room *lksdk.Room,
}
return nil
}

func writeTrackToSocket(destPath, identity, trackID string, track *webrtc.TrackRemote) {
if err := os.MkdirAll(destPath, 0755); err != nil {
log.Fatalf("Error creating directory: %v", err)
}
socketFile := fmt.Sprintf("%s/%s_%s.sock", destPath, identity, trackID)

if _, err := os.Stat(socketFile); err == nil {
os.Remove(socketFile)
}

listener, err := net.Listen("unix", socketFile)
if err != nil {
log.Fatalf("could not listen to unix socket: %v", err)
}
defer listener.Close()

connPtr := atomic.NewPointer[net.Conn](nil)
writerPtr := atomic.NewPointer[oggwriter.OggWriter](nil)
go func() {
conn, err := listener.Accept()
if err != nil {
log.Fatalf("could not accept connection: %v", err)
}
connPtr.Store(&conn)
writer, err := oggwriter.NewWith(conn, 48000, track.Codec().Channels)
if err != nil {
log.Fatalf("could not create Ogg writer: %v", err)
}
writerPtr.Store(writer)
}()
defer func() {
if conn := connPtr.Load(); conn != nil {
(*conn).Close()
}
}()

// start reading from the track, but only write if TCP socket is being read
sb := samplebuilder.New(200, &codecs.OpusPacket{}, 48000)
var writer *oggwriter.OggWriter
for {
packet, _, err := track.ReadRTP()
if err != nil {
break
}
if writer == nil {
writer = writerPtr.Load()
if writer != nil {
fmt.Printf("socket %v connected, writing track %v", socketFile, trackID)
}
}

sb.Push(packet)
for _, p := range sb.PopPackets() {
if writer != nil {
writer.WriteRTP(p)
}
}
}
}
29 changes: 14 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ toolchain go1.22.2
require (
github.com/frostbyte73/core v0.0.10
github.com/go-logr/logr v1.4.2
github.com/livekit/protocol v1.15.0
github.com/livekit/server-sdk-go/v2 v2.1.3-0.20240507072004-e3121c9908be
github.com/livekit/protocol v1.17.1-0.20240606023900-429fec77a69b
github.com/livekit/server-sdk-go/v2 v2.1.3
github.com/manifoldco/promptui v0.9.0
github.com/olekukonko/tablewriter v0.0.5
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.5
github.com/pion/rtp v1.8.6
github.com/pion/webrtc/v3 v3.2.40
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -45,29 +45,29 @@ require (
github.com/gorilla/websocket v1.5.1 // indirect
github.com/jxskiss/base62 v1.1.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 // indirect
github.com/livekit/mediatransportutil v0.0.0-20240501132628-6105557bbb9a // indirect
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 // indirect
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/nats-io/nats.go v1.33.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
github.com/pion/interceptor v0.1.27 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.16 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v2 v2.1.4 // indirect
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand All @@ -81,11 +81,10 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.2.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240228224816-df926f6c8641 // indirect
google.golang.org/grpc v1.63.2 // indirect
)
Loading