Color links and ports according to their formats

Format params for links and ports are now being watched for in the pipewire connection code.

The parsed media type is then set on the port widget / link object
and they are colored accordingly.

For ports, which were already colored before, this new method of determining the media type
should be more reliable and accurate as this uses the real Format/EnumFormat
params instead of parsing optional properties.
This commit is contained in:
Tom A. Wagner
2023-07-30 12:09:08 +02:00
parent ba73d8cdcc
commit 7f754b207c
8 changed files with 362 additions and 148 deletions

View File

@@ -21,11 +21,15 @@ use std::{cell::RefCell, collections::HashMap, rc::Rc};
use gtk::glib::{self, clone};
use log::{debug, info, warn};
use pipewire::{
link::{Link, LinkChangeMask, LinkListener, LinkState},
link::{Link, LinkChangeMask, LinkInfo, LinkListener, LinkState},
port::{Port, PortChangeMask, PortInfo, PortListener},
prelude::*,
properties,
registry::{GlobalObject, Registry},
spa::{Direction, ForeignDict},
spa::{
param::{ParamInfoFlags, ParamType},
ForeignDict,
},
types::ObjectType,
Context, Core, MainLoop,
};
@@ -34,6 +38,10 @@ use crate::{GtkMessage, MediaType, NodeType, PipewireMessage};
use state::{Item, State};
enum ProxyItem {
Port {
proxy: Port,
_listener: PortListener,
},
Link {
_proxy: Link,
_listener: LinkListener,
@@ -67,7 +75,7 @@ pub(super) fn thread_main(
.global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state =>
move |global| match global.type_ {
ObjectType::Node => handle_node(global, &gtk_sender, &state),
ObjectType::Port => handle_port(global, &gtk_sender, &state),
ObjectType::Port => handle_port(global, &gtk_sender, &registry, &proxies, &state),
ObjectType::Link => handle_link(global, &gtk_sender, &registry, &proxies, &state),
_ => {
// Other objects are not interesting to us
@@ -115,19 +123,6 @@ fn handle_node(
.unwrap_or_default(),
);
// FIXME: Instead of checking these props, the "EnumFormat" parameter should be checked instead.
let media_type = props.get("media.class").and_then(|class| {
if class.contains("Audio") {
Some(MediaType::Audio)
} else if class.contains("Video") {
Some(MediaType::Video)
} else if class.contains("Midi") {
Some(MediaType::Midi)
} else {
None
}
});
let media_class = |class: &str| {
if class.contains("Sink") || class.contains("Input") {
Some(NodeType::Input)
@@ -149,13 +144,7 @@ fn handle_node(
})
.or_else(|| props.get("media.class").and_then(media_class));
state.borrow_mut().insert(
node.id,
Item::Node {
// widget: node_widget,
media_type,
},
);
state.borrow_mut().insert(node.id, Item::Node);
sender
.send(PipewireMessage::NodeAdded {
@@ -170,44 +159,106 @@ fn handle_node(
fn handle_port(
port: &GlobalObject<ForeignDict>,
sender: &glib::Sender<PipewireMessage>,
registry: &Rc<Registry>,
proxies: &Rc<RefCell<HashMap<u32, ProxyItem>>>,
state: &Rc<RefCell<State>>,
) {
let props = port
.props
.as_ref()
.expect("Port object is missing properties");
let name = props.get("port.name").unwrap_or_default().to_string();
let node_id: u32 = props
.get("node.id")
.expect("Port has no node.id property!")
.parse()
.expect("Could not parse node.id property");
let direction = if matches!(props.get("port.direction"), Some("in")) {
Direction::Input
} else {
Direction::Output
let port_id = port.id;
let proxy: Port = registry.bind(port).expect("Failed to bind to port proxy");
let listener = proxy
.add_listener_local()
.info(
clone!(@strong proxies, @strong state, @strong sender => move |info| {
handle_port_info(info, &proxies, &state, &sender);
}),
)
.param(clone!(@strong sender => move |_, param_id, _, _, param| {
if param_id == ParamType::EnumFormat {
handle_port_enum_format(port_id, param, &sender)
}
}))
.register();
proxies.borrow_mut().insert(
port.id,
ProxyItem::Port {
proxy,
_listener: listener,
},
);
}
fn handle_port_info(
info: &PortInfo,
proxies: &Rc<RefCell<HashMap<u32, ProxyItem>>>,
state: &Rc<RefCell<State>>,
sender: &glib::Sender<PipewireMessage>,
) {
debug!("Received port info: {:?}", info);
let id = info.id();
let proxies = proxies.borrow();
let Some(ProxyItem::Port { proxy, .. }) = proxies.get(&id) else {
log::error!("Received info on unknown port with id {id}");
return;
};
// Find out the nodes media type so that the port can be colored.
let media_type = if let Some(Item::Node { media_type, .. }) = state.borrow().get(node_id) {
media_type.to_owned()
} else {
warn!("Node not found for Port {}", port.id);
None
};
let mut state = state.borrow_mut();
// Save node_id so we can delete this port easily.
state.borrow_mut().insert(port.id, Item::Port { node_id });
if let Some(Item::Port { .. }) = state.get(id) {
// Info was an update, figure out if we should notify the GTK thread
if info.change_mask().contains(PortChangeMask::PARAMS) {
// TODO: React to param changes
}
} else {
// First time we get info. We can now notify the gtk thread of a new link.
let props = info.props().expect("Port object is missing properties");
let name = props.get("port.name").unwrap_or_default().to_string();
let node_id: u32 = props
.get("node.id")
.expect("Port has no node.id property!")
.parse()
.expect("Could not parse node.id property");
state.insert(id, Item::Port { node_id });
let params = info.params();
let enum_format_info = params
.iter()
.find(|param| param.id() == ParamType::EnumFormat);
if let Some(enum_format_info) = enum_format_info {
if enum_format_info.flags().contains(ParamInfoFlags::READ) {
proxy.enum_params(0, Some(ParamType::EnumFormat), 0, u32::MAX);
}
}
sender
.send(PipewireMessage::PortAdded {
id,
node_id,
name,
direction: info.direction(),
})
.expect("Failed to send message");
}
}
fn handle_port_enum_format(
port_id: u32,
param: Option<&pipewire::spa::pod::Pod>,
sender: &glib::Sender<PipewireMessage>,
) {
let media_type = param
.and_then(|param| pipewire::spa::param::format_utils::parse_format(param).ok())
.map(|(media_type, _media_subtype)| media_type)
.unwrap_or(MediaType::Unknown);
sender
.send(PipewireMessage::PortAdded {
id: port.id,
node_id,
name,
direction,
.send(PipewireMessage::PortFormatChanged {
id: port_id,
media_type,
})
.expect("Failed to send message");
.expect("Failed to send message")
}
/// Handle a new link being added
@@ -227,38 +278,7 @@ fn handle_link(
let listener = proxy
.add_listener_local()
.info(clone!(@strong state, @strong sender => move |info| {
debug!("Received link info: {:?}", info);
let id = info.id();
let mut state = state.borrow_mut();
if let Some(Item::Link { .. }) = state.get(id) {
// Info was an update - figure out if we should notify the gtk thread
if info.change_mask().contains(LinkChangeMask::STATE) {
sender.send(PipewireMessage::LinkStateChanged {
id,
active: matches!(info.state(), LinkState::Active)
}).expect("Failed to send message");
}
// TODO -- check other values that might have changed
} else {
// First time we get info. We can now notify the gtk thread of a new link.
let port_from = info.output_port_id();
let port_to = info.input_port_id();
state.insert(id, Item::Link {
port_from, port_to
});
sender.send(PipewireMessage::LinkAdded {
id,
port_from,
port_to,
active: matches!(info.state(), LinkState::Active)
}).expect(
"Failed to send message"
);
}
handle_link_info(info, &state, &sender);
}))
.register();
@@ -271,6 +291,53 @@ fn handle_link(
);
}
fn handle_link_info(
info: &LinkInfo,
state: &Rc<RefCell<State>>,
sender: &glib::Sender<PipewireMessage>,
) {
debug!("Received link info: {:?}", info);
let id = info.id();
let mut state = state.borrow_mut();
if let Some(Item::Link { .. }) = state.get(id) {
// Info was an update - figure out if we should notify the gtk thread
if info.change_mask().contains(LinkChangeMask::STATE) {
sender
.send(PipewireMessage::LinkStateChanged {
id,
active: matches!(info.state(), LinkState::Active),
})
.expect("Failed to send message");
}
if info.change_mask().contains(LinkChangeMask::FORMAT) {
sender
.send(PipewireMessage::LinkFormatChanged {
id,
media_type: get_link_media_type(info),
})
.expect("Failed to send message");
}
} else {
// First time we get info. We can now notify the gtk thread of a new link.
let port_from = info.output_port_id();
let port_to = info.input_port_id();
state.insert(id, Item::Link { port_from, port_to });
sender
.send(PipewireMessage::LinkAdded {
id,
port_from,
port_to,
active: matches!(info.state(), LinkState::Active),
media_type: get_link_media_type(info),
})
.expect("Failed to send message");
}
}
/// Toggle a link between the two specified ports.
fn toggle_link(
port_from: u32,
@@ -312,3 +379,13 @@ fn toggle_link(
}
}
}
fn get_link_media_type(link_info: &LinkInfo) -> MediaType {
let media_type = link_info
.format()
.and_then(|format| pipewire::spa::param::format_utils::parse_format(format).ok())
.map(|(media_type, _media_subtype)| media_type)
.unwrap_or(MediaType::Unknown);
media_type
}