Driver: Fix transition to Live on connect (#222)

This PR fixes a case where a call which changes channel or gracefully
reconnects would have been stuck in the Idle state. SetConn events will
now allow a transition straight back to Live if any tracks are found
attached to a mixer.
This commit is contained in:
Kyle Simpson
2024-01-24 09:30:50 +00:00
committed by GitHub
parent 1b98c30746
commit 087e5f2292
4 changed files with 19 additions and 9 deletions

View File

@@ -86,12 +86,18 @@ impl Idle {
self.tasks.insert(id, task); self.tasks.insert(id, task);
}, },
Ok(SchedulerMessage::Do(id, mix_msg)) => { Ok(SchedulerMessage::Do(id, mix_msg)) => {
let now_live = mix_msg.is_mixer_now_live(); let maybe_live = mix_msg.is_mixer_maybe_live();
if let Some(task) = self.tasks.get_mut(&id) { if let Some(task) = self.tasks.get_mut(&id) {
match task.handle_message(mix_msg) { match task.handle_message(mix_msg) {
Ok(false) if now_live => { Ok(false) if maybe_live => {
if task.mixer.tracks.is_empty() {
// No tracks, likely due to SetConn.
// Recreate message forwarding task.
task.spawn_forwarder(self.tx.clone(), id);
} else {
let task = self.tasks.remove(&id).unwrap(); let task = self.tasks.remove(&id).unwrap();
self.schedule_mixer(task, id, None); self.schedule_mixer(task, id, None);
}
}, },
Ok(false) => {}, Ok(false) => {},
Ok(true) | Err(()) => self.to_cull.push(id), Ok(true) | Err(()) => self.to_cull.push(id),

View File

@@ -116,7 +116,7 @@ impl ParkedMixer {
_ = kill_rx.recv_async() => break, _ = kill_rx.recv_async() => break,
msg = remote_rx.recv_async() => { msg = remote_rx.recv_async() => {
let exit = if let Ok(msg) = msg { let exit = if let Ok(msg) = msg {
let remove_self = msg.is_mixer_now_live(); let remove_self = msg.is_mixer_maybe_live();
tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err() || remove_self tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err() || remove_self
} else { } else {
true true
@@ -135,7 +135,8 @@ impl ParkedMixer {
pub fn handle_message(&mut self, msg: MixerMessage) -> Result<bool, ()> { pub fn handle_message(&mut self, msg: MixerMessage) -> Result<bool, ()> {
match msg { match msg {
MixerMessage::SetConn(conn, ssrc) => { MixerMessage::SetConn(conn, ssrc) => {
// Overridden because // Overridden because payload-specific fields are carried
// externally on `ParkedMixer`.
self.ssrc = ssrc; self.ssrc = ssrc;
self.rtp_sequence = random::<u16>(); self.rtp_sequence = random::<u16>();
self.rtp_timestamp = random::<u32>(); self.rtp_timestamp = random::<u32>();

View File

@@ -40,8 +40,11 @@ pub enum MixerMessage {
} }
impl MixerMessage { impl MixerMessage {
pub fn is_mixer_now_live(&self) -> bool { pub fn is_mixer_maybe_live(&self) -> bool {
matches!(self, Self::AddTrack(_) | Self::SetTrack(Some(_))) matches!(
self,
Self::AddTrack(_) | Self::SetTrack(Some(_)) | Self::SetConn(..)
)
} }
} }

View File

@@ -141,7 +141,7 @@ impl SsrcState {
let mut out = vec![0; self.decode_size.len()]; let mut out = vec![0; self.decode_size.len()];
for _ in 0..missed_packets { for _ in 0..missed_packets {
let missing_frame: Option<OpusPacket> = None; let missing_frame: Option<OpusPacket<'_>> = None;
let dest_samples = (&mut out[..]) let dest_samples = (&mut out[..])
.try_into() .try_into()
.expect("Decode logic will cap decode buffer size at i32::MAX."); .expect("Decode logic will cap decode buffer size at i32::MAX.");