diff --git a/src/driver/tasks/udp_rx/playout_buffer.rs b/src/driver/tasks/udp_rx/playout_buffer.rs index 7717f0d..6ea37f8 100644 --- a/src/driver/tasks/udp_rx/playout_buffer.rs +++ b/src/driver/tasks/udp_rx/playout_buffer.rs @@ -45,6 +45,7 @@ pub struct PlayoutBuffer { playout_mode: PlayoutMode, next_seq: RtpSequence, current_timestamp: Option, + consecutive_store_fails: usize, } impl PlayoutBuffer { @@ -54,6 +55,7 @@ impl PlayoutBuffer { playout_mode: PlayoutMode::Fill, next_seq, current_timestamp: None, + consecutive_store_fails: 0, } } @@ -70,20 +72,45 @@ impl PlayoutBuffer { } // compute index by taking wrapping difference between both seq numbers. - // If the difference is *too big*, or in the past [also 'too big, in a way], + // If the difference is *too big*, or in the past [also too big, in a way], // ignore the packet - let desired_index = (rtp.get_sequence().0 - self.next_seq).0 as i16; + let pkt_seq = rtp.get_sequence().0; + let desired_index = (pkt_seq - self.next_seq).0 as i16; + + // Similar concept to fetch_packet -- if there's a critical desync, and we're unwilling + // to slot this packet into an empty/stuck buffer then behave as though this packet is the next + // sequence number we're releasing. + let err_threshold = i16::try_from(config.playout_buffer_length.get() * 5).unwrap_or(32); + let handling_desync = (self.buffer.is_empty() + || self.consecutive_store_fails >= (err_threshold as usize)) + && desired_index >= err_threshold; if desired_index < 0 { trace!("Missed packet arrived late, discarding from playout."); - } else if desired_index >= 64 { - trace!("Packet arrived beyond playout max length: wanted slot {desired_index}."); + } else if !handling_desync && desired_index >= 64 { + trace!( + "Packet arrived beyond playout max length({}): wanted slot {desired_index}.\ + ts {}, seq {}, next_out_seq {}", + rtp.get_ssrc(), + rtp.get_timestamp().0, + rtp.get_sequence().0, + self.next_seq, + ); + self.consecutive_store_fails += 1; } else { - let index = desired_index as usize; + let index = if handling_desync { + self.buffer.clear(); + self.next_seq = pkt_seq; + + 0 + } else { + desired_index as usize + }; while self.buffer.len() <= index { self.buffer.push_back(None); } self.buffer[index] = Some(packet); + self.consecutive_store_fails = 0; } if self.buffer.len() >= config.playout_buffer_length.get() { @@ -91,7 +118,7 @@ impl PlayoutBuffer { } } - pub fn fetch_packet(&mut self) -> PacketLookup { + pub fn fetch_packet(&mut self, config: &Config) -> PacketLookup { if self.playout_mode == PlayoutMode::Fill { return PacketLookup::Filling; } @@ -106,12 +133,37 @@ impl PlayoutBuffer { // However, we need to handle this in a wrap-safe way. // ts_diff shows where the current time lies if we treat packet_ts // as 0, s.t. ts_diff >= 0 (equiv) packet_time <= curr_time. - let curr_ts = self.current_timestamp.unwrap(); - let ts_diff = (curr_ts - rtp.get_timestamp().0).0 as i32; + let curr_ts = self.current_timestamp.as_mut().unwrap(); + let pkt_ts = rtp.get_timestamp().0; + let ts_diff = (*curr_ts - pkt_ts).0 as i32; + + // At least one client in the wild has seen unusual timestamp behaviour: the + // first packet sent out in a run of audio may have an older timestamp. + // This could be badly timestamped, or could conceivably be an orphaned packet + // from a prior run, or e.g.: + // (n x RTP) -> [>100ms delay] -> (RTP) -> [long O(s) delay] -> (m x RTP) + // This leaves us with two adjacent packets in the same playout with wildly varying + // timestamps. We have a slightly tricky situation -- we need to preserve accurate + // timing to correctly drain/refill/recreate very small pauses in audio, but don't + // want to block indefinitely. + // + // We have a compromise here -- if an adjacent (Drain) packet has a ts gap + // larger than it would take to go through multiple Fill/Drain cycles, then + // treat its TS as the next expected value to avoid jamming the buffer and losing + // later audio. + let skip_after = + i32::try_from(config.playout_buffer_length.get() * 5 * MONO_FRAME_SIZE) + .unwrap_or((AUDIO_FRAME_RATE * 2 * MONO_FRAME_SIZE) as i32); if ts_diff >= 0 { + // At or before expected timestamp. self.next_seq = (rtp.get_sequence() + 1).0; + PacketLookup::Packet(pkt) + } else if ts_diff <= -skip_after { + // >5 playouts ahead. + self.next_seq = (rtp.get_sequence() + 1).0; + *curr_ts = pkt_ts; PacketLookup::Packet(pkt) } else { trace!("Witholding packet: ts_diff is {ts_diff}"); diff --git a/src/driver/tasks/udp_rx/ssrc_state.rs b/src/driver/tasks/udp_rx/ssrc_state.rs index 87b6f1b..32dafe9 100644 --- a/src/driver/tasks/udp_rx/ssrc_state.rs +++ b/src/driver/tasks/udp_rx/ssrc_state.rs @@ -68,7 +68,7 @@ impl SsrcState { // Acquire a packet from the playout buffer: // Update nexts, lasts... // different cases: null packet who we want to decode as a miss, and packet who we must ignore temporarily. - let m_pkt = self.playout_buffer.fetch_packet(); + let m_pkt = self.playout_buffer.fetch_packet(config); let pkt = match m_pkt { PacketLookup::Packet(StoredPacket { packet, decrypted }) => Some((packet, decrypted)), PacketLookup::MissedPacket => None,