Receive: Fix handling for large timestamp/sequence jumps (#266)
This PR addresses some issues which have cropped up on voice receive at scale: * In unknown circumstances, we can be left with adjacent packets queued which have very different timestamps. The playout buffer would withhold its held packets, leading to the loss of many subsequent packets if the timestamp jump is larger than 64 frames. This seems to occur for some specific clients which join before a bot, suggesting the DAVE -> legacy switchover is involved. * Some loss patterns can leave us unable to correctly track the next expected sequence number (i.e., large loss runs), leaving the playout buffer unable to accept any packets if the packet sequence differed by over 64 entries. The fixes are fallbacks which treat sufficiently large desynchronisation, and allow the playout to get back into a consistent state in both cases. Large timestamp jumps on adjacent packets now update the next expected TS (noting that we only want to withhold a few playout delays at most ). Failure to insert 0.25s of packets (or attempting to add a new sequence number into an empty buffer) can now take precedence. Closes #261.
This commit is contained in:
@@ -45,6 +45,7 @@ pub struct PlayoutBuffer {
|
||||
playout_mode: PlayoutMode,
|
||||
next_seq: RtpSequence,
|
||||
current_timestamp: Option<RtpTimestamp>,
|
||||
consecutive_store_fails: usize,
|
||||
}
|
||||
|
||||
impl PlayoutBuffer {
|
||||
@@ -54,6 +55,7 @@ impl PlayoutBuffer {
|
||||
playout_mode: PlayoutMode::Fill,
|
||||
next_seq,
|
||||
current_timestamp: None,
|
||||
consecutive_store_fails: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,20 +72,45 @@ impl PlayoutBuffer {
|
||||
}
|
||||
|
||||
// compute index by taking wrapping difference between both seq numbers.
|
||||
// If the difference is *too big*, or in the past [also 'too big, in a way],
|
||||
// If the difference is *too big*, or in the past [also too big, in a way],
|
||||
// ignore the packet
|
||||
let desired_index = (rtp.get_sequence().0 - self.next_seq).0 as i16;
|
||||
let pkt_seq = rtp.get_sequence().0;
|
||||
let desired_index = (pkt_seq - self.next_seq).0 as i16;
|
||||
|
||||
// Similar concept to fetch_packet -- if there's a critical desync, and we're unwilling
|
||||
// to slot this packet into an empty/stuck buffer then behave as though this packet is the next
|
||||
// sequence number we're releasing.
|
||||
let err_threshold = i16::try_from(config.playout_buffer_length.get() * 5).unwrap_or(32);
|
||||
let handling_desync = (self.buffer.is_empty()
|
||||
|| self.consecutive_store_fails >= (err_threshold as usize))
|
||||
&& desired_index >= err_threshold;
|
||||
|
||||
if desired_index < 0 {
|
||||
trace!("Missed packet arrived late, discarding from playout.");
|
||||
} else if desired_index >= 64 {
|
||||
trace!("Packet arrived beyond playout max length: wanted slot {desired_index}.");
|
||||
} else if !handling_desync && desired_index >= 64 {
|
||||
trace!(
|
||||
"Packet arrived beyond playout max length({}): wanted slot {desired_index}.\
|
||||
ts {}, seq {}, next_out_seq {}",
|
||||
rtp.get_ssrc(),
|
||||
rtp.get_timestamp().0,
|
||||
rtp.get_sequence().0,
|
||||
self.next_seq,
|
||||
);
|
||||
self.consecutive_store_fails += 1;
|
||||
} else {
|
||||
let index = desired_index as usize;
|
||||
let index = if handling_desync {
|
||||
self.buffer.clear();
|
||||
self.next_seq = pkt_seq;
|
||||
|
||||
0
|
||||
} else {
|
||||
desired_index as usize
|
||||
};
|
||||
while self.buffer.len() <= index {
|
||||
self.buffer.push_back(None);
|
||||
}
|
||||
self.buffer[index] = Some(packet);
|
||||
self.consecutive_store_fails = 0;
|
||||
}
|
||||
|
||||
if self.buffer.len() >= config.playout_buffer_length.get() {
|
||||
@@ -91,7 +118,7 @@ impl PlayoutBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fetch_packet(&mut self) -> PacketLookup {
|
||||
pub fn fetch_packet(&mut self, config: &Config) -> PacketLookup {
|
||||
if self.playout_mode == PlayoutMode::Fill {
|
||||
return PacketLookup::Filling;
|
||||
}
|
||||
@@ -106,12 +133,37 @@ impl PlayoutBuffer {
|
||||
// However, we need to handle this in a wrap-safe way.
|
||||
// ts_diff shows where the current time lies if we treat packet_ts
|
||||
// as 0, s.t. ts_diff >= 0 (equiv) packet_time <= curr_time.
|
||||
let curr_ts = self.current_timestamp.unwrap();
|
||||
let ts_diff = (curr_ts - rtp.get_timestamp().0).0 as i32;
|
||||
let curr_ts = self.current_timestamp.as_mut().unwrap();
|
||||
let pkt_ts = rtp.get_timestamp().0;
|
||||
let ts_diff = (*curr_ts - pkt_ts).0 as i32;
|
||||
|
||||
// At least one client in the wild has seen unusual timestamp behaviour: the
|
||||
// first packet sent out in a run of audio may have an older timestamp.
|
||||
// This could be badly timestamped, or could conceivably be an orphaned packet
|
||||
// from a prior run, or e.g.:
|
||||
// (n x RTP) -> [>100ms delay] -> (RTP) -> [long O(s) delay] -> (m x RTP)
|
||||
// This leaves us with two adjacent packets in the same playout with wildly varying
|
||||
// timestamps. We have a slightly tricky situation -- we need to preserve accurate
|
||||
// timing to correctly drain/refill/recreate very small pauses in audio, but don't
|
||||
// want to block indefinitely.
|
||||
//
|
||||
// We have a compromise here -- if an adjacent (Drain) packet has a ts gap
|
||||
// larger than it would take to go through multiple Fill/Drain cycles, then
|
||||
// treat its TS as the next expected value to avoid jamming the buffer and losing
|
||||
// later audio.
|
||||
let skip_after =
|
||||
i32::try_from(config.playout_buffer_length.get() * 5 * MONO_FRAME_SIZE)
|
||||
.unwrap_or((AUDIO_FRAME_RATE * 2 * MONO_FRAME_SIZE) as i32);
|
||||
|
||||
if ts_diff >= 0 {
|
||||
// At or before expected timestamp.
|
||||
self.next_seq = (rtp.get_sequence() + 1).0;
|
||||
|
||||
PacketLookup::Packet(pkt)
|
||||
} else if ts_diff <= -skip_after {
|
||||
// >5 playouts ahead.
|
||||
self.next_seq = (rtp.get_sequence() + 1).0;
|
||||
*curr_ts = pkt_ts;
|
||||
PacketLookup::Packet(pkt)
|
||||
} else {
|
||||
trace!("Witholding packet: ts_diff is {ts_diff}");
|
||||
|
||||
@@ -68,7 +68,7 @@ impl SsrcState {
|
||||
// Acquire a packet from the playout buffer:
|
||||
// Update nexts, lasts...
|
||||
// different cases: null packet who we want to decode as a miss, and packet who we must ignore temporarily.
|
||||
let m_pkt = self.playout_buffer.fetch_packet();
|
||||
let m_pkt = self.playout_buffer.fetch_packet(config);
|
||||
let pkt = match m_pkt {
|
||||
PacketLookup::Packet(StoredPacket { packet, decrypted }) => Some((packet, decrypted)),
|
||||
PacketLookup::MissedPacket => None,
|
||||
|
||||
Reference in New Issue
Block a user