Change architecture to controller-centered arch

struct PipewireConnection is now decoupled from any other components, another component (the controller)
can receive updates by registering a callback.

struct PipewireState has been refactored to a struct Controller.
It still keeps state and manages the view, but now also actively requests updates from the pipewire connection via callback.
This commit is contained in:
Tom A. Wagner
2021-03-28 12:10:13 +02:00
parent d75dee5ea8
commit 9519eefa6e
5 changed files with 188 additions and 98 deletions

1
Cargo.lock generated
View File

@@ -583,6 +583,7 @@ dependencies = [
"gtk4", "gtk4",
"libspa", "libspa",
"log", "log",
"once_cell",
"pipewire", "pipewire",
] ]

View File

@@ -20,3 +20,5 @@ libspa = "0.3.0"
log = "0.4.11" log = "0.4.11"
env_logger = "0.8.2" env_logger = "0.8.2"
once_cell = "1.7.2"

View File

@@ -1,50 +1,96 @@
use crate::{view, PipewireLink};
use gtk::WidgetExt;
use libspa::{dict::ReadableDict, ForeignDict};
use log::warn;
use pipewire::{port::Direction, registry::GlobalObject, types::ObjectType};
use std::{cell::RefCell, collections::HashMap, rc::Rc}; use std::{cell::RefCell, collections::HashMap, rc::Rc};
enum MediaType { use gtk::{
glib::{self, clone},
prelude::*,
};
use libspa::{ForeignDict, ReadableDict};
use log::{info, warn};
use pipewire::{port::Direction, registry::GlobalObject, types::ObjectType};
use crate::{pipewire_connection::PipewireConnection, view};
pub enum MediaType {
Audio, Audio,
Video, Video,
Midi, Midi,
} }
/// Any pipewire item we need to keep track of.
/// These will be saved in the controllers `state` map associated with their id.
enum Item { enum Item {
Node { Node {
// Keep track of the widget to easily remove ports on it later.
widget: view::Node, widget: view::Node,
// Keep track of the nodes media type to color ports on it.
media_type: Option<MediaType>, media_type: Option<MediaType>,
}, },
Port { Port {
// Save the id of the node this is on so we can remove the port from it
// when it is deleted.
node_id: u32, node_id: u32,
}, },
// We don't need to memorize anything about links right now, but we need to
// be able to find out an id is a link.
Link, Link,
} }
/// This struct stores the state of the pipewire graph. /// Mediater between the pipewire connection and the view.
/// ///
/// It receives updates from the [`PipewireConnection`](crate::pipewire_connection::PipewireConnection) /// The Controller is the central piece of the architecture.
/// responsible for updating it and applies them to its internal state. /// It manages the view, receives updates from the pipewire connection
/// and relays changes the user made to the pipewire connection.
/// ///
/// It also keeps the view updated to always reflect this internal state. /// It also keeps and manages a state object that contains the current state of objects present on the remote.
pub struct PipewireState { pub struct Controller {
graphview: Rc<RefCell<view::GraphView>>, con: Rc<RefCell<PipewireConnection>>,
items: HashMap<u32, Item>, state: HashMap<u32, Item>,
view: view::GraphView,
} }
impl PipewireState { impl Controller {
pub fn new(graphview: Rc<RefCell<view::GraphView>>) -> Self { /// Create a new controller.
Self { ///
graphview, /// This function returns an `Rc`, because `Weak` references are needed inside closures the controller
items: HashMap::new(), /// passes to other components.
} ///
/// The returned `Rc` will be the only strong reference kept to the controller, so dropping the `Rc`
/// will also drop the controller, unless the `Rc` is cloned outside of this function.
pub(super) fn new(
view: view::GraphView,
con: Rc<RefCell<PipewireConnection>>,
) -> Rc<RefCell<Controller>> {
let result = Rc::new(RefCell::new(Controller {
con,
view,
state: HashMap::new(),
}));
result
.borrow()
.con
.borrow_mut()
.on_global_add(Some(Box::new(
clone!(@weak result as this => move |global| {
this.borrow_mut().global_add(global);
}),
)));
result
.borrow()
.con
.borrow_mut()
.on_global_remove(Some(Box::new(clone!(@weak result as this => move |id| {
this.borrow_mut().global_remove(id);
}))));
result
} }
/// This function is called from the `PipewireConnection` struct responsible for updating this struct. /// Handle a new global object being added.
pub fn global(&mut self, global: &GlobalObject<ForeignDict>) { /// Relevant objects are displayed to the user and/or stored to the state.
///
/// It is called from the `PipewireConnection` via callback.
fn global_add(&mut self, global: &GlobalObject<ForeignDict>) {
match global.type_ { match global.type_ {
ObjectType::Node => { ObjectType::Node => {
self.add_node(global); self.add_node(global);
@@ -59,7 +105,10 @@ impl PipewireState {
} }
} }
/// Handle a node object being added.
fn add_node(&mut self, node: &GlobalObject<ForeignDict>) { fn add_node(&mut self, node: &GlobalObject<ForeignDict>) {
info!("Adding node to graph: id {}", node.id);
// Update graph to contain the new node. // Update graph to contain the new node.
let node_widget = crate::view::Node::new( let node_widget = crate::view::Node::new(
&node &node
@@ -96,12 +145,9 @@ impl PipewireState {
.flatten() .flatten()
.flatten(); .flatten();
self.graphview self.view.add_node(node.id, node_widget.clone());
.borrow_mut()
.add_node(node.id, node_widget.clone());
// Save the created widget so we can delete ports easier. self.state.insert(
self.items.insert(
node.id, node.id,
Item::Node { Item::Node {
widget: node_widget, widget: node_widget,
@@ -110,7 +156,10 @@ impl PipewireState {
); );
} }
/// Handle a port object being added.
fn add_port(&mut self, port: &GlobalObject<ForeignDict>) { fn add_port(&mut self, port: &GlobalObject<ForeignDict>) {
info!("Adding port to graph: id {}", port.id);
// Update graph to contain the new port. // Update graph to contain the new port.
let props = port let props = port
.props .props
@@ -133,7 +182,7 @@ impl PipewireState {
); );
// Color the port accordingly to its media class. // Color the port accordingly to its media class.
if let Some(Item::Node { media_type, .. }) = self.items.get(&node_id) { if let Some(Item::Node { media_type, .. }) = self.state.get(&node_id) {
match media_type { match media_type {
Some(MediaType::Audio) => new_port.widget.add_css_class("audio"), Some(MediaType::Audio) => new_port.widget.add_css_class("audio"),
Some(MediaType::Video) => new_port.widget.add_css_class("video"), Some(MediaType::Video) => new_port.widget.add_css_class("video"),
@@ -144,17 +193,19 @@ impl PipewireState {
warn!("Node not found for Port {}", port.id); warn!("Node not found for Port {}", port.id);
} }
self.graphview self.view.add_port_to_node(node_id, new_port.id, new_port);
.borrow_mut()
.add_port_to_node(node_id, new_port.id, new_port);
// Save node_id so we can delete this port easily. // Save node_id so we can delete this port easily.
self.items.insert(port.id, Item::Port { node_id }); self.state.insert(port.id, Item::Port { node_id });
} }
/// Handle a link object being added.
fn add_link(&mut self, link: &GlobalObject<ForeignDict>) { fn add_link(&mut self, link: &GlobalObject<ForeignDict>) {
info!("Adding link to graph: id {}", link.id);
// FIXME: Links should be colored depending on the data they carry (video, audio, midi) like ports are. // FIXME: Links should be colored depending on the data they carry (video, audio, midi) like ports are.
self.items.insert(link.id, Item::Link);
self.state.insert(link.id, Item::Link);
// Update graph to contain the new link. // Update graph to contain the new link.
let props = link let props = link
@@ -181,9 +232,9 @@ impl PipewireState {
.expect("Link has no link.output.port property") .expect("Link has no link.output.port property")
.parse() .parse()
.expect("Could not parse link.output.port property"); .expect("Could not parse link.output.port property");
self.graphview.borrow_mut().add_link( self.view.add_link(
link.id, link.id,
PipewireLink { crate::PipewireLink {
node_from: output_node, node_from: output_node,
port_from: output_port, port_from: output_port,
node_to: input_node, node_to: input_node,
@@ -192,18 +243,19 @@ impl PipewireState {
); );
} }
/// This function is called from the `PipewireConnection` struct responsible for updating this struct. /// Handle a globalobject being removed.
pub fn global_remove(&mut self, id: u32) { /// Relevant objects are removed from the view and/or the state.
if let Some(item) = self.items.get(&id) { ///
/// This is called from the `PipewireConnection` via callback.
fn global_remove(&mut self, id: u32) {
if let Some(item) = self.state.remove(&id) {
match item { match item {
Item::Node { .. } => self.remove_node(id), Item::Node { .. } => self.remove_node(id),
Item::Port { node_id } => self.remove_port(id, *node_id), Item::Port { node_id } => self.remove_port(id, node_id),
Item::Link => self.remove_link(id), Item::Link => self.remove_link(id),
} }
self.items.remove(&id);
} else { } else {
log::warn!( warn!(
"Attempted to remove item with id {} that is not saved in state", "Attempted to remove item with id {} that is not saved in state",
id id
); );
@@ -211,16 +263,22 @@ impl PipewireState {
} }
fn remove_node(&self, id: u32) { fn remove_node(&self, id: u32) {
self.graphview.borrow().remove_node(id); info!("Removing node from graph: id {}", id);
self.view.remove_node(id);
} }
fn remove_port(&self, id: u32, node_id: u32) { fn remove_port(&self, id: u32, node_id: u32) {
if let Some(Item::Node { widget, .. }) = self.items.get(&node_id) { info!("Removing port from graph: id {}, node_id: {}", id, node_id);
if let Some(Item::Node { widget, .. }) = self.state.get(&node_id) {
widget.remove_port(id); widget.remove_port(id);
} }
} }
fn remove_link(&self, id: u32) { fn remove_link(&self, id: u32) {
self.graphview.borrow().remove_link(id); info!("Removing link from graph: id {}", id);
self.view.remove_link(id);
} }
} }

View File

@@ -1,12 +1,10 @@
mod controller;
mod pipewire_connection; mod pipewire_connection;
mod pipewire_state;
mod view; mod view;
use gtk::glib::{self, clone}; use gtk::glib::{self, clone};
use gtk::prelude::*; use gtk::prelude::*;
use std::{cell::RefCell, rc::Rc};
// FIXME: This should be in its own .css file. // FIXME: This should be in its own .css file.
static STYLE: &str = " static STYLE: &str = "
.audio { .audio {
@@ -37,18 +35,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init(); env_logger::init();
gtk::init()?; gtk::init()?;
let graphview = Rc::new(RefCell::new(view::GraphView::new())); let graphview = view::GraphView::new();
// Create the connection to the pipewire server and do an initial roundtrip before showing the view, let pw_con = pipewire_connection::PipewireConnection::new()?;
let _controller = controller::Controller::new(graphview.clone(), pw_con.clone());
// Do an initial roundtrip before showing the view,
// so that the graph is already populated when the window opens. // so that the graph is already populated when the window opens.
let pw_con = pipewire_connection::PipewireConnection::new(pipewire_state::PipewireState::new( pw_con.borrow().roundtrip();
graphview.clone(),
))
.expect("Failed to initialize pipewire connection");
pw_con.roundtrip();
// From now on, call roundtrip() every second. // From now on, call roundtrip() every second.
gtk::glib::timeout_add_seconds_local(1, move || { glib::timeout_add_seconds_local(1, move || {
pw_con.roundtrip(); pw_con.borrow().roundtrip();
Continue(true) Continue(true)
}); });
@@ -67,9 +64,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}); });
app.connect_activate(move |app| { app.connect_activate(move |app| {
let scrollwindow = gtk::ScrolledWindowBuilder::new() let scrollwindow = gtk::ScrolledWindowBuilder::new().child(&graphview).build();
.child(&*graphview.borrow())
.build();
let window = gtk::ApplicationWindowBuilder::new() let window = gtk::ApplicationWindowBuilder::new()
.application(app) .application(app)
.default_width(1280) .default_width(1280)

View File

@@ -1,7 +1,9 @@
use crate::pipewire_state::PipewireState;
use gtk::glib::{self, clone}; use gtk::glib::{self, clone};
use libspa::ForeignDict;
use log::trace;
use once_cell::unsync::OnceCell;
use pipewire as pw; use pipewire as pw;
use pw::registry::GlobalObject;
use std::{ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
@@ -9,59 +11,79 @@ use std::{
}; };
/// This struct is responsible for communication with the pipewire server. /// This struct is responsible for communication with the pipewire server.
/// It handles new globals appearing as well as globals being removed. /// The owner of this struct can subscribe to notifications for globals added or removed.
/// ///
/// It's `roundtrip` function must be called regularly to receive updates. /// It's `roundtrip` function must be called regularly to receive updates.
pub struct PipewireConnection { pub struct PipewireConnection {
mainloop: pw::MainLoop, mainloop: pw::MainLoop,
_context: pw::Context<pw::MainLoop>, _context: pw::Context<pw::MainLoop>,
core: Rc<pw::Core>, core: pw::Core,
_registry: pw::registry::Registry, registry: pw::registry::Registry,
_listeners: pw::registry::Listener, listeners: OnceCell<pw::registry::Listener>,
_state: Rc<RefCell<PipewireState>>, on_global_add: Option<Box<dyn Fn(&GlobalObject<ForeignDict>)>>,
on_global_remove: Option<Box<dyn Fn(u32)>>,
} }
impl PipewireConnection { impl PipewireConnection {
pub fn new(state: PipewireState) -> Result<Self, String> { /// Create a new Pipewire Connection.
///
/// This returns an `Rc`, because weak references to the result are needed inside closures set up during creation.
pub fn new() -> Result<Rc<RefCell<Self>>, pw::Error> {
// Initialize pipewire lib and obtain needed pipewire objects. // Initialize pipewire lib and obtain needed pipewire objects.
pw::init(); pw::init();
let mainloop = pw::MainLoop::new().map_err(|_| "Failed to create pipewire mainloop!")?; let mainloop = pw::MainLoop::new()?;
let context = let context = pw::Context::new(&mainloop)?;
pw::Context::new(&mainloop).map_err(|_| "Failed to create pipewire context")?; let core = context.connect(None)?;
let core = Rc::new( let registry = core.get_registry()?;
context
.connect(None)
.map_err(|_| "Failed to connect to pipewire core")?,
);
let registry = core
.get_registry()
.map_err(|_| "Failed to get pipewire registry")?;
let state = Rc::new(RefCell::new(state)); let result = Rc::new(RefCell::new(Self {
// Notify state on globals added / removed
let _listeners = registry
.add_listener_local()
.global(clone!(@weak state => @default-panic, move |global| {
state.borrow_mut().global(global);
}))
.global_remove(clone!(@weak state => @default-panic, move |id| {
state.borrow_mut().global_remove(id);
}))
.register();
Ok(Self {
mainloop, mainloop,
_context: context, _context: context,
core, core,
_registry: registry, registry,
_listeners, listeners: OnceCell::new(),
_state: state, on_global_add: None,
}) on_global_remove: None,
}));
// Notify state on globals added / removed
let listeners = result
.borrow()
.registry
.add_listener_local()
.global(clone!(@weak result as this => move |global| {
trace!("Global is added: {}", global.id);
let con = this.borrow();
if let Some(callback) = con.on_global_add.as_ref() {
callback(global)
} else {
trace!("No on_global_add callback registered");
}
}))
.global_remove(clone!(@weak result as this => move |id| {
trace!("Global is removed: {}", id);
let con = this.borrow();
if let Some(callback) = con.on_global_remove.as_ref() {
callback(id)
} else {
trace!("No on_global_remove callback registered");
}
}))
.register();
// Makeshift `expect()`: listeners does not implement `Debug`, so we can not use `expect`.
assert!(
result.borrow_mut().listeners.set(listeners).is_ok(),
"PipewireConnection.listeners field already set"
);
Ok(result)
} }
/// Receive all events from the pipewire server, sending them to the `pipewire_state` struct for processing. /// Receive all events from the pipewire server, sending them to the `pipewire_state` struct for processing.
pub fn roundtrip(&self) { pub fn roundtrip(&self) {
trace!("Starting roundtrip");
let done = Rc::new(Cell::new(false)); let done = Rc::new(Cell::new(false));
let pending = self let pending = self
.core .core
@@ -85,5 +107,17 @@ impl PipewireConnection {
while !done.get() { while !done.get() {
self.mainloop.run(); self.mainloop.run();
} }
trace!("Roundtrip finished");
}
/// Set or unset a callback that gets called when a new global is added.
pub fn on_global_add(&mut self, callback: Option<Box<dyn Fn(&GlobalObject<ForeignDict>)>>) {
self.on_global_add = callback;
}
/// Set or unset a callback that gets called when a global is removed.
pub fn on_global_remove(&mut self, callback: Option<Box<dyn Fn(u32)>>) {
self.on_global_remove = callback;
} }
} }