mod state; use std::{cell::RefCell, collections::HashMap, rc::Rc}; use gtk::glib::{self, clone}; use log::{debug, info, warn}; use pipewire::{ link::{Link, LinkListener}, prelude::*, properties, registry::{GlobalObject, Registry}, spa::{Direction, ForeignDict}, types::ObjectType, Context, Core, MainLoop, }; use crate::{GtkMessage, MediaType, PipewireMessage}; use state::{Item, State}; enum ProxyItem { Link { _proxy: Link, _listener: LinkListener, }, } /// The "main" function of the pipewire thread. pub(super) fn thread_main( gtk_sender: glib::Sender, pw_receiver: pipewire::channel::Receiver, ) { let mainloop = MainLoop::new().expect("Failed to create mainloop"); let context = Context::new(&mainloop).expect("Failed to create context"); let core = Rc::new(context.connect(None).expect("Failed to connect to remote")); let registry = Rc::new(core.get_registry().expect("Failed to get registry")); // Keep proxies and their listeners alive so that we can receive info events. let proxies = Rc::new(RefCell::new(HashMap::new())); let state = Rc::new(RefCell::new(State::new())); let _receiver = pw_receiver.attach(&mainloop, { clone!(@strong mainloop, @weak core, @weak registry, @strong state => move |msg| match msg { GtkMessage::ToggleLink { port_from, port_to } => toggle_link(port_from, port_to, &core, ®istry, &state), GtkMessage::Terminate => mainloop.quit(), }) }); let _listener = registry .add_listener_local() .global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state => move |global| match global.type_ { ObjectType::Node => handle_node(global, >k_sender, &state), ObjectType::Port => handle_port(global, >k_sender, &state), ObjectType::Link => handle_link(global, >k_sender, ®istry, &proxies, &state), _ => { // Other objects are not interesting to us } } )) .global_remove(clone!(@strong proxies, @strong state => move |id| { if let Some(item) = state.borrow_mut().remove(id) { gtk_sender.send(match item { Item::Node { .. } => PipewireMessage::NodeRemoved {id}, Item::Port { node_id } => PipewireMessage::PortRemoved {id, node_id}, Item::Link { .. } => PipewireMessage::LinkRemoved {id}, }).expect("Failed to send message"); } else { warn!( "Attempted to remove item with id {} that is not saved in state", id ); } proxies.borrow_mut().remove(&id); })) .register(); mainloop.run(); } /// Handle a new node being added fn handle_node( node: &GlobalObject, sender: &glib::Sender, state: &Rc>, ) { let props = node .props .as_ref() .expect("Node object is missing properties"); // Get the nicest possible name for the node, using a fallback chain of possible name attributes. let name = String::from( props .get("node.nick") .or_else(|| props.get("node.description")) .or_else(|| props.get("node.name")) .unwrap_or_default(), ); // FIXME: Instead of checking these props, the "EnumFormat" parameter should be checked instead. let media_type = props .get("media.class") .map(|class| { if class.contains("Audio") { Some(MediaType::Audio) } else if class.contains("Video") { Some(MediaType::Video) } else if class.contains("Midi") { Some(MediaType::Midi) } else { None } }) .flatten(); state.borrow_mut().insert( node.id, Item::Node { // widget: node_widget, media_type, }, ); sender .send(PipewireMessage::NodeAdded { id: node.id, name }) .expect("Failed to send message"); } /// Handle a new port being added fn handle_port( port: &GlobalObject, sender: &glib::Sender, state: &Rc>, ) { let props = port .props .as_ref() .expect("Port object is missing properties"); let name = props.get("port.name").unwrap_or_default().to_string(); let node_id: u32 = props .get("node.id") .expect("Port has no node.id property!") .parse() .expect("Could not parse node.id property"); let direction = if matches!(props.get("port.direction"), Some("in")) { Direction::Input } else { Direction::Output }; // Find out the nodes media type so that the port can be colored. let media_type = if let Some(Item::Node { media_type, .. }) = state.borrow().get(node_id) { media_type.to_owned() } else { warn!("Node not found for Port {}", port.id); None }; // Save node_id so we can delete this port easily. state.borrow_mut().insert(port.id, Item::Port { node_id }); sender .send(PipewireMessage::PortAdded { id: port.id, node_id, name, direction, media_type, }) .expect("Failed to send message"); } /// Handle a new link being added fn handle_link( link: &GlobalObject, sender: &glib::Sender, registry: &Rc, proxies: &Rc>>, state: &Rc>, ) { debug!( "New link (id:{}) appeared, setting up info listener.", link.id ); let proxy: Link = registry.bind(link).expect("Failed to bind to link proxy"); let listener = proxy .add_listener_local() .info(clone!(@strong state, @strong sender => move |info| { debug!("Received link info: {:?}", info); let id = info.id(); let mut state = state.borrow_mut(); if let Some(Item::Link { .. }) = state.get(id) { // Info was an update - figure out if we should notify the gtk thread // TODO } else { // First time we get info. We can now notify the gtk thread of a new link. let node_from = info.output_node_id(); let port_from = info.output_port_id(); let node_to = info.input_node_id(); let port_to = info.input_port_id(); state.insert(id, Item::Link { port_from, port_to }); sender.send(PipewireMessage::LinkAdded { id, node_from, port_from, node_to, port_to }).expect( "Failed to send message" ); } })) .register(); proxies.borrow_mut().insert( link.id, ProxyItem::Link { _proxy: proxy, _listener: listener, }, ); } /// Toggle a link between the two specified ports. fn toggle_link( port_from: u32, port_to: u32, core: &Rc, registry: &Rc, state: &Rc>, ) { let state = state.borrow_mut(); if let Some(id) = state.get_link_id(port_from, port_to) { info!("Requesting removal of link with id {}", id); // FIXME: Handle error registry.destroy_global(id); } else { info!( "Requesting creation of link from port id:{} to port id:{}", port_from, port_to ); let node_from = state .get_node_of_port(port_from) .expect("Requested port not in state"); let node_to = state .get_node_of_port(port_to) .expect("Requested port not in state"); if let Err(e) = core.create_object::( "link-factory", &properties! { "link.output.node" => node_from.to_string(), "link.output.port" => port_from.to_string(), "link.input.node" => node_to.to_string(), "link.input.port" => port_to.to_string(), "object.linger" => "1" }, ) { warn!("Failed to create link: {}", e); } } }