Skip to content

Implement PropertiesChanged event for dbus_mpris #1025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 11, 2022
233 changes: 157 additions & 76 deletions src/dbus_mpris.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::prelude::*;
use dbus::arg::{RefArg, Variant};
use dbus::channel::MatchingReceiver;
use dbus::message::MatchRule;
use dbus::channel::{MatchingReceiver, Sender};
use dbus::message::{MatchRule, SignalArgs};
use dbus_crossroads::{Crossroads, IfaceToken};
use dbus_tokio::connection;
use futures::task::{Context, Poll};
Expand All @@ -12,15 +12,20 @@ use librespot_core::{
mercury::MercuryError,
session::Session,
};
use librespot_playback::player::PlayerEvent;
use log::info;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use rspotify::spotify::{
client::Spotify, model::offset::for_position, oauth2::TokenInfo as RspotifyToken, senum::*,
client::Spotify,
model::{offset::for_position, track::FullTrack},
oauth2::TokenInfo as RspotifyToken,
senum::*,
util::datetime_to_timestamp,
};
use std::pin::Pin;
use std::sync::Arc;
use std::{collections::HashMap, env};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

pub struct DbusServer {
session: Session,
Expand All @@ -30,6 +35,8 @@ pub struct DbusServer {
token_request: Option<Pin<Box<dyn Future<Output = Result<LibrespotToken, MercuryError>>>>>,
dbus_future: Option<Pin<Box<dyn Future<Output = ()>>>>,
device_name: String,
event_rx: UnboundedReceiver<PlayerEvent>,
event_tx: Option<UnboundedSender<PlayerEvent>>,
}

const CLIENT_ID: &str = "2c1ea588dfbc4a989e2426f8385297c3";
Expand All @@ -41,14 +48,21 @@ const SCOPE: &str = "user-read-playback-state,user-read-private,\
user-read-recently-played";

impl DbusServer {
pub fn new(session: Session, spirc: Arc<Spirc>, device_name: String) -> DbusServer {
pub fn new(
session: Session,
spirc: Arc<Spirc>,
device_name: String,
event_rx: UnboundedReceiver<PlayerEvent>,
) -> DbusServer {
DbusServer {
session,
spirc,
api_token: RspotifyToken::default(),
token_request: None,
dbus_future: None,
device_name,
event_rx,
event_tx: None,
}
}

Expand All @@ -65,6 +79,11 @@ impl Future for DbusServer {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.event_tx.is_some() {
if let Poll::Ready(Some(msg)) = self.event_rx.poll_recv(cx) {
self.event_tx.as_ref().unwrap().send(msg).unwrap();
}
}
let mut got_new_token = false;
if self.is_token_expired() {
if let Some(ref mut fut) = self.token_request {
Expand All @@ -73,10 +92,13 @@ impl Future for DbusServer {
.access_token(&token.access_token)
.expires_in(token.expires_in)
.expires_at(datetime_to_timestamp(token.expires_in));
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.event_tx = Some(tx);
self.dbus_future = Some(Box::pin(create_dbus_server(
self.api_token.clone(),
self.spirc.clone(),
self.device_name.clone(),
rx,
)));
// TODO: for reasons I don't _entirely_ understand, the token request completing
// convinces callers that they don't need to re-check the status of this future
Expand Down Expand Up @@ -110,7 +132,12 @@ fn create_spotify_api(token: &RspotifyToken) -> Spotify {
Spotify::default().access_token(&token.access_token).build()
}

async fn create_dbus_server(api_token: RspotifyToken, spirc: Arc<Spirc>, device_name: String) {
async fn create_dbus_server(
api_token: RspotifyToken,
spirc: Arc<Spirc>,
device_name: String,
mut event_rx: UnboundedReceiver<PlayerEvent>,
) {
// TODO: allow other DBus types through CLI and config entry.
let (resource, conn) =
connection::new_session_sync().expect("Failed to initialize DBus connection");
Expand Down Expand Up @@ -357,74 +384,7 @@ async fn create_dbus_server(api_token: RspotifyToken, spirc: Arc<Spirc>, device_

if let Ok(Some(playing)) = v {
if let Some(track) = playing.item {
m.insert("mpris:trackid".to_string(), Variant(Box::new(track.uri)));

m.insert(
"mpris:length".to_string(),
Variant(Box::new(i64::from(track.duration_ms) * 1000)),
);

m.insert(
"mpris:artUrl".to_string(),
Variant(Box::new(track.album.images.first().unwrap().url.clone())),
);

m.insert("xesam:title".to_string(), Variant(Box::new(track.name)));

m.insert(
"xesam:album".to_string(),
Variant(Box::new(track.album.name)),
);

m.insert(
"xesam:artist".to_string(),
Variant(Box::new(
track
.artists
.iter()
.map(|a| a.name.to_string())
.collect::<Vec<_>>(),
)),
);

m.insert(
"xesam:albumArtist".to_string(),
Variant(Box::new(
track
.album
.artists
.iter()
.map(|a| a.name.to_string())
.collect::<Vec<_>>(),
)),
);

m.insert(
"xesam:autoRating".to_string(),
Variant(Box::new((f64::from(track.popularity) / 100.0) as f64)),
);

m.insert(
"xesam:trackNumber".to_string(),
Variant(Box::new(track.track_number)),
);

m.insert(
"xesam:discNumber".to_string(),
Variant(Box::new(track.disc_number)),
);

m.insert(
"xesam:url".to_string(),
Variant(Box::new(
track
.external_urls
.iter()
.next()
.map_or("", |(_, v)| v)
.to_string(),
)),
);
insert_metadata(&mut m, track);
}
} else {
info!("Couldn't fetch metadata from spotify: {:?}", v);
Expand Down Expand Up @@ -459,7 +419,128 @@ async fn create_dbus_server(api_token: RspotifyToken, spirc: Arc<Spirc>, device_
}),
);

// run forever
futures::future::pending::<()>().await;
unreachable!();
loop {
let event = event_rx
.recv()
.await
.expect("Changed track channel was unexpectedly closed");
let mut changed_properties: HashMap<String, Variant<Box<dyn RefArg>>> = HashMap::new();
let track_id = match event {
PlayerEvent::Changed { new_track_id, .. } => Some(new_track_id),
PlayerEvent::Started { track_id, .. } => {
changed_properties.insert(
"PlaybackStatus".to_owned(),
Variant(Box::new("Playing".to_owned())),
);
Some(track_id)
}
PlayerEvent::Stopped { .. } => {
changed_properties.insert(
"PlaybackStatus".to_owned(),
Variant(Box::new("Stopped".to_owned())),
);
None
}
PlayerEvent::Paused { .. } => {
changed_properties.insert(
"PlaybackStatus".to_owned(),
Variant(Box::new("Paused".to_owned())),
);
None
}
_ => None,
};
if let Some(track_id) = track_id {
let sp = create_spotify_api(&api_token);
let track = sp.track(&track_id.to_base62());
if let Ok(track) = track {
let mut m: HashMap<String, Variant<Box<dyn RefArg>>> = HashMap::new();
insert_metadata(&mut m, track);

changed_properties.insert("Metadata".to_owned(), Variant(Box::new(m)));
} else {
info!("Couldn't fetch metadata from spotify: {:?}", track);
}
}
if !changed_properties.is_empty() {
let msg = dbus::nonblock::stdintf::org_freedesktop_dbus::PropertiesPropertiesChanged {
interface_name: "org.mpris.MediaPlayer2.Player".to_owned(),
changed_properties,
invalidated_properties: Vec::new(),
};
conn.send(msg.to_emit_message(&dbus::Path::new("/org/mpris/MediaPlayer2").unwrap()))
.unwrap();
}
}
}

fn insert_metadata(m: &mut HashMap<String, Variant<Box<dyn RefArg>>>, track: FullTrack) {
m.insert("mpris:trackid".to_string(), Variant(Box::new(track.uri)));

m.insert(
"mpris:length".to_string(),
Variant(Box::new(i64::from(track.duration_ms) * 1000)),
);

m.insert(
"mpris:artUrl".to_string(),
Variant(Box::new(track.album.images.first().unwrap().url.clone())),
);

m.insert("xesam:title".to_string(), Variant(Box::new(track.name)));

m.insert(
"xesam:album".to_string(),
Variant(Box::new(track.album.name)),
);

m.insert(
"xesam:artist".to_string(),
Variant(Box::new(
track
.artists
.iter()
.map(|a| a.name.to_string())
.collect::<Vec<_>>(),
)),
);

m.insert(
"xesam:albumArtist".to_string(),
Variant(Box::new(
track
.album
.artists
.iter()
.map(|a| a.name.to_string())
.collect::<Vec<_>>(),
)),
);

m.insert(
"xesam:autoRating".to_string(),
Variant(Box::new((f64::from(track.popularity) / 100.0) as f64)),
);

m.insert(
"xesam:trackNumber".to_string(),
Variant(Box::new(track.track_number)),
);

m.insert(
"xesam:discNumber".to_string(),
Variant(Box::new(track.disc_number)),
);

m.insert(
"xesam:url".to_string(),
Variant(Box::new(
track
.external_urls
.iter()
.next()
.map_or("", |(_, v)| &v)
.to_string(),
)),
);
}
29 changes: 19 additions & 10 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(feature = "dbus_mpris")]
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;

pub struct LibreSpotConnection {
Expand Down Expand Up @@ -67,17 +69,14 @@ fn new_dbus_server(
session: Session,
spirc: Arc<Spirc>,
device_name: String,
event_rx: UnboundedReceiver<PlayerEvent>,
) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
Some(Box::pin(DbusServer::new(session, spirc, device_name)))
}

#[cfg(not(feature = "dbus_mpris"))]
fn new_dbus_server(
_: Session,
_: Arc<Spirc>,
_: String,
) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
None
Some(Box::pin(DbusServer::new(
session,
spirc,
device_name,
event_rx,
)))
}

pub(crate) struct MainLoopState {
Expand All @@ -93,6 +92,8 @@ pub(crate) struct MainLoopState {
pub(crate) shell: String,
pub(crate) device_type: DeviceType,
pub(crate) use_mpris: bool,
#[cfg(feature = "dbus_mpris")]
pub(crate) mpris_event_tx: Option<UnboundedSender<PlayerEvent>>,
}

impl Future for MainLoopState {
Expand Down Expand Up @@ -130,6 +131,10 @@ impl Future for MainLoopState {
if let Some(ref mut player_event_channel) = self.spotifyd_state.player_event_channel
{
if let Poll::Ready(Some(event)) = player_event_channel.poll_next_unpin(cx) {
#[cfg(feature = "dbus_mpris")]
if let Some(ref tx) = self.mpris_event_tx {
tx.send(event.clone()).unwrap();
}
if let Some(ref cmd) = self.spotifyd_state.player_event_program {
match spawn_program_on_event(&self.shell, cmd, event) {
Ok(child) => self.running_event_program = Some(child),
Expand Down Expand Up @@ -179,11 +184,15 @@ impl Future for MainLoopState {
let shared_spirc = Arc::new(spirc);
self.librespot_connection.spirc = Some(shared_spirc.clone());

#[cfg(feature = "dbus_mpris")]
if self.use_mpris {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.mpris_event_tx = Some(tx);
self.spotifyd_state.dbus_mpris_server = new_dbus_server(
session,
shared_spirc,
self.spotifyd_state.device_name.clone(),
rx,
);
}
} else if self
Expand Down
2 changes: 2 additions & 0 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ pub(crate) fn initial_state(config: config::SpotifydConfig) -> main_loop::MainLo
device_type,
autoplay,
use_mpris: config.use_mpris,
#[cfg(feature = "dbus_mpris")]
mpris_event_tx: None,
}
}

Expand Down