Fix clippy pedantic warnings (#204)
This commit is contained in:
@@ -405,6 +405,7 @@ impl Config {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn test_cfg(raw_output: bool) -> (DriverTestHandle, Config) {
|
pub fn test_cfg(raw_output: bool) -> (DriverTestHandle, Config) {
|
||||||
let (tick_tx, tick_rx) = flume::unbounded();
|
let (tick_tx, tick_rx) = flume::unbounded();
|
||||||
|
|
||||||
@@ -418,8 +419,10 @@ impl Config {
|
|||||||
(OutputMode::Rtp(rtp_tx), OutputReceiver::Rtp(rtp_rx))
|
(OutputMode::Rtp(rtp_tx), OutputReceiver::Rtp(rtp_rx))
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut sc_config = SchedulerConfig::default();
|
let sc_config = SchedulerConfig {
|
||||||
sc_config.strategy = crate::driver::SchedulerMode::MaxPerThread(1.try_into().unwrap());
|
strategy: crate::driver::SchedulerMode::MaxPerThread(1.try_into().unwrap()),
|
||||||
|
move_expensive_tasks: true,
|
||||||
|
};
|
||||||
|
|
||||||
let config = Config::default()
|
let config = Config::default()
|
||||||
.tick_style(TickStyle::UntimedWithExecLimit(tick_rx))
|
.tick_style(TickStyle::UntimedWithExecLimit(tick_rx))
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ pub const RTP_VERSION: u8 = 2;
|
|||||||
pub const RTP_PROFILE_TYPE: RtpType = RtpType::Dynamic(120);
|
pub const RTP_PROFILE_TYPE: RtpType = RtpType::Dynamic(120);
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
#[allow(clippy::doc_markdown)]
|
||||||
pub mod test_data {
|
pub mod test_data {
|
||||||
/// URL for a source which YTDL must extract.
|
/// URL for a source which YTDL must extract.
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -153,7 +153,7 @@ impl CryptoMode {
|
|||||||
|
|
||||||
cipher
|
cipher
|
||||||
.decrypt_in_place_detached(nonce_slice, b"", data_bytes, tag)
|
.decrypt_in_place_detached(nonce_slice, b"", data_bytes, tag)
|
||||||
.map(|_| (body_start, body_tail))
|
.map(|()| (body_start, body_tail))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Encrypts a Discord RT(C)P packet using the given key.
|
/// Encrypts a Discord RT(C)P packet using the given key.
|
||||||
@@ -290,8 +290,7 @@ mod test {
|
|||||||
let mut pkt = MutableRtpPacket::new(&mut buf[..]).unwrap();
|
let mut pkt = MutableRtpPacket::new(&mut buf[..]).unwrap();
|
||||||
let mut crypto_state = CryptoState::from(mode);
|
let mut crypto_state = CryptoState::from(mode);
|
||||||
let payload = pkt.payload_mut();
|
let payload = pkt.payload_mut();
|
||||||
(&mut payload[TAG_SIZE..TAG_SIZE + TRUE_PAYLOAD.len()])
|
payload[TAG_SIZE..TAG_SIZE + TRUE_PAYLOAD.len()].copy_from_slice(&TRUE_PAYLOAD[..]);
|
||||||
.copy_from_slice(&TRUE_PAYLOAD[..]);
|
|
||||||
|
|
||||||
let final_payload_size =
|
let final_payload_size =
|
||||||
crypto_state.write_packet_nonce(&mut pkt, TAG_SIZE + TRUE_PAYLOAD.len());
|
crypto_state.write_packet_nonce(&mut pkt, TAG_SIZE + TRUE_PAYLOAD.len());
|
||||||
|
|||||||
@@ -94,7 +94,7 @@ impl Idle {
|
|||||||
self.schedule_mixer(task, id, None);
|
self.schedule_mixer(task, id, None);
|
||||||
},
|
},
|
||||||
Ok(false) => {},
|
Ok(false) => {},
|
||||||
Ok(true) | Err(_) => self.to_cull.push(id),
|
Ok(true) | Err(()) => self.to_cull.push(id),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("Received post-cull message for {id:?}, discarding.");
|
info!("Received post-cull message for {id:?}, discarding.");
|
||||||
@@ -161,7 +161,7 @@ impl Idle {
|
|||||||
let task = loop_task.take().unwrap();
|
let task = loop_task.take().unwrap();
|
||||||
let (worker, idx) = self.fetch_worker(&task, avoid);
|
let (worker, idx) = self.fetch_worker(&task, avoid);
|
||||||
match worker.schedule_mixer(id, task) {
|
match worker.schedule_mixer(id, task) {
|
||||||
Ok(_) => {
|
Ok(()) => {
|
||||||
self.stats.move_mixer_to_live();
|
self.stats.move_mixer_to_live();
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
@@ -239,8 +239,10 @@ mod test {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn active_mixers_spawn_threads() {
|
async fn active_mixers_spawn_threads() {
|
||||||
let mut config = Config::default();
|
let config = Config {
|
||||||
config.move_expensive_tasks = false;
|
strategy: Mode::default(),
|
||||||
|
move_expensive_tasks: false,
|
||||||
|
};
|
||||||
|
|
||||||
let sched = Scheduler::new(config);
|
let sched = Scheduler::new(config);
|
||||||
let (pkt_tx, _pkt_rx) = flume::unbounded();
|
let (pkt_tx, _pkt_rx) = flume::unbounded();
|
||||||
@@ -270,11 +272,14 @@ mod test {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn excess_threads_are_cleaned_up() {
|
async fn excess_threads_are_cleaned_up() {
|
||||||
let mut config = Config::default();
|
|
||||||
config.strategy = Mode::MaxPerThread(1.try_into().unwrap());
|
|
||||||
let (mut core, tx) = Idle::new(config.clone());
|
|
||||||
|
|
||||||
const TEST_TIMER: Duration = Duration::from_millis(500);
|
const TEST_TIMER: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
|
let config = Config {
|
||||||
|
strategy: Mode::MaxPerThread(1.try_into().unwrap()),
|
||||||
|
move_expensive_tasks: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (mut core, tx) = Idle::new(config.clone());
|
||||||
core.cull_timer = TEST_TIMER;
|
core.cull_timer = TEST_TIMER;
|
||||||
|
|
||||||
let mut next_id = TaskId::new();
|
let mut next_id = TaskId::new();
|
||||||
|
|||||||
@@ -273,7 +273,7 @@ impl Live {
|
|||||||
for (i, mixer) in self.tasks.iter_mut().enumerate() {
|
for (i, mixer) in self.tasks.iter_mut().enumerate() {
|
||||||
let res = mixer
|
let res = mixer
|
||||||
.audio_commands_events()
|
.audio_commands_events()
|
||||||
.and_then(|_| mixer.check_and_send_keepalive(self.start_of_work));
|
.and_then(|()| mixer.check_and_send_keepalive(self.start_of_work));
|
||||||
rebuild_if_err(mixer, res, &mut self.to_cull, i);
|
rebuild_if_err(mixer, res, &mut self.to_cull, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ impl<T> IsEnabled for ResId<T> {}
|
|||||||
#[allow(missing_docs)]
|
#[allow(missing_docs)]
|
||||||
impl<T: Copy> ResId<T> {
|
impl<T: Copy> ResId<T> {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
ResId(0, PhantomData)
|
Self::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn incr(&mut self) -> Self {
|
pub fn incr(&mut self) -> Self {
|
||||||
@@ -49,11 +49,17 @@ impl<T: Copy> ResId<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(any(test, feature = "internals"))]
|
#[cfg(any(test, feature = "internals"))]
|
||||||
pub fn get(&self) -> u64 {
|
pub fn get(self) -> u64 {
|
||||||
self.0
|
self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: Copy> Default for ResId<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self(0, PhantomData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// An idle mixer instance, externally controlled by a `Driver`.
|
/// An idle mixer instance, externally controlled by a `Driver`.
|
||||||
///
|
///
|
||||||
/// Since we do not allocate packet buffers for idle threads, this
|
/// Since we do not allocate packet buffers for idle threads, this
|
||||||
@@ -152,7 +158,7 @@ impl ParkedMixer {
|
|||||||
self.mixer
|
self.mixer
|
||||||
.do_rebuilds(events_failure, conn_failure)
|
.do_rebuilds(events_failure, conn_failure)
|
||||||
.map_err(|_| ())
|
.map_err(|_| ())
|
||||||
.map(|_| should_exit)
|
.map(|()| should_exit)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -576,12 +576,12 @@ impl Mixer {
|
|||||||
let msg = match mix_len {
|
let msg = match mix_len {
|
||||||
MixType::Passthrough(len) if len == SILENT_FRAME.len() => OutputMessage::Silent,
|
MixType::Passthrough(len) if len == SILENT_FRAME.len() => OutputMessage::Silent,
|
||||||
MixType::Passthrough(len) => {
|
MixType::Passthrough(len) => {
|
||||||
let rtp = RtpPacket::new(&packet).expect(
|
let rtp = RtpPacket::new(packet).expect(
|
||||||
"FATAL: Too few bytes in self.packet for RTP header.\
|
"FATAL: Too few bytes in self.packet for RTP header.\
|
||||||
(Blame: VOICE_PACKET_MAX?)",
|
(Blame: VOICE_PACKET_MAX?)",
|
||||||
);
|
);
|
||||||
let payload = rtp.payload();
|
let payload = rtp.payload();
|
||||||
let opus_frame = (&payload[TAG_SIZE..][..len]).to_vec();
|
let opus_frame = (payload[TAG_SIZE..][..len]).to_vec();
|
||||||
|
|
||||||
OutputMessage::Passthrough(opus_frame)
|
OutputMessage::Passthrough(opus_frame)
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ impl AuxNetwork {
|
|||||||
let hb = sleep_until(next_heartbeat);
|
let hb = sleep_until(next_heartbeat);
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
_ = hb => {
|
() = hb => {
|
||||||
ws_error = match self.send_heartbeat().await {
|
ws_error = match self.send_heartbeat().await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
should_reconnect = ws_error_is_not_final(&e);
|
should_reconnect = ws_error_is_not_final(&e);
|
||||||
|
|||||||
@@ -27,14 +27,17 @@ pub enum OutputMessage {
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
impl OutputMessage {
|
impl OutputMessage {
|
||||||
|
#[must_use]
|
||||||
pub fn is_passthrough(&self) -> bool {
|
pub fn is_passthrough(&self) -> bool {
|
||||||
matches!(self, Self::Passthrough(_))
|
matches!(self, Self::Passthrough(_))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn is_mixed(&self) -> bool {
|
pub fn is_mixed(&self) -> bool {
|
||||||
matches!(self, Self::Mixed(_))
|
matches!(self, Self::Mixed(_))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn is_mixed_with_nonzero_signal(&self) -> bool {
|
pub fn is_mixed_with_nonzero_signal(&self) -> bool {
|
||||||
if let Self::Mixed(data) = self {
|
if let Self::Mixed(data) = self {
|
||||||
data.iter().any(|v| *v != 0.0f32)
|
data.iter().any(|v| *v != 0.0f32)
|
||||||
@@ -43,6 +46,7 @@ impl OutputMessage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn is_explicit_silence(&self) -> bool {
|
pub fn is_explicit_silence(&self) -> bool {
|
||||||
*self == Self::Silent
|
*self == Self::Silent
|
||||||
}
|
}
|
||||||
@@ -94,6 +98,7 @@ pub enum OutputPacket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl OutputPacket {
|
impl OutputPacket {
|
||||||
|
#[must_use]
|
||||||
pub fn raw(&self) -> Option<&OutputMessage> {
|
pub fn raw(&self) -> Option<&OutputMessage> {
|
||||||
if let Self::Raw(o) = self {
|
if let Self::Raw(o) = self {
|
||||||
Some(o)
|
Some(o)
|
||||||
@@ -116,6 +121,7 @@ pub struct DriverTestHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DriverTestHandle {
|
impl DriverTestHandle {
|
||||||
|
#[must_use]
|
||||||
pub fn recv(&self) -> OutputPacket {
|
pub fn recv(&self) -> OutputPacket {
|
||||||
match &self.rx {
|
match &self.rx {
|
||||||
OutputReceiver::Raw(rx) => rx.recv().unwrap().into(),
|
OutputReceiver::Raw(rx) => rx.recv().unwrap().into(),
|
||||||
@@ -130,6 +136,7 @@ impl DriverTestHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
match &self.rx {
|
match &self.rx {
|
||||||
OutputReceiver::Raw(rx) => rx.len(),
|
OutputReceiver::Raw(rx) => rx.len(),
|
||||||
@@ -137,6 +144,11 @@ impl DriverTestHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
|
|
||||||
pub fn wait(&self, n_ticks: u64) {
|
pub fn wait(&self, n_ticks: u64) {
|
||||||
for _i in 0..n_ticks {
|
for _i in 0..n_ticks {
|
||||||
drop(self.recv());
|
drop(self.recv());
|
||||||
@@ -149,7 +161,7 @@ impl DriverTestHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn spawn_ticker(&self) {
|
pub fn spawn_ticker(&self) {
|
||||||
let remote = self.clone();
|
let remote = self.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
@@ -179,9 +191,11 @@ impl DriverTestHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn tick(&self, n_ticks: u64) {
|
pub fn tick(&self, n_ticks: u64) {
|
||||||
if n_ticks == 0 {
|
assert!(
|
||||||
panic!("Number of ticks to advance driver/mixer must be >= 1.");
|
n_ticks != 0,
|
||||||
}
|
"Number of ticks to advance driver/mixer must be >= 1."
|
||||||
|
);
|
||||||
|
|
||||||
self.tx.send(n_ticks).unwrap();
|
self.tx.send(n_ticks).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,9 +204,6 @@ impl DriverTestHandle {
|
|||||||
handle: &TrackHandle,
|
handle: &TrackHandle,
|
||||||
tick_wait: Option<Duration>,
|
tick_wait: Option<Duration>,
|
||||||
) -> TrackState {
|
) -> TrackState {
|
||||||
let (tx, rx) = flume::bounded(1);
|
|
||||||
let (err_tx, err_rx) = flume::bounded(1);
|
|
||||||
|
|
||||||
struct SongPlayable {
|
struct SongPlayable {
|
||||||
tx: Sender<TrackState>,
|
tx: Sender<TrackState>,
|
||||||
}
|
}
|
||||||
@@ -223,6 +234,9 @@ impl DriverTestHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let (tx, rx) = flume::bounded(1);
|
||||||
|
let (err_tx, err_rx) = flume::bounded(1);
|
||||||
|
|
||||||
handle
|
handle
|
||||||
.add_event(Event::Track(TrackEvent::Playable), SongPlayable { tx })
|
.add_event(Event::Track(TrackEvent::Playable), SongPlayable { tx })
|
||||||
.expect("Adding track evt should not fail before any ticks.");
|
.expect("Adding track evt should not fail before any ticks.");
|
||||||
@@ -237,7 +251,7 @@ impl DriverTestHandle {
|
|||||||
self.wait_async(1).await;
|
self.wait_async(1).await;
|
||||||
|
|
||||||
match err_rx.try_recv() {
|
match err_rx.try_recv() {
|
||||||
Ok(e) => panic!("Error reported on track: {:?}", e),
|
Ok(e) => panic!("Error reported on track: {e:?}"),
|
||||||
Err(flume::TryRecvError::Empty | flume::TryRecvError::Disconnected) => {},
|
Err(flume::TryRecvError::Empty | flume::TryRecvError::Disconnected) => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ impl Mixer {
|
|||||||
|
|
||||||
#[cfg(feature = "receive")]
|
#[cfg(feature = "receive")]
|
||||||
let fake_conn = MixerConnection {
|
let fake_conn = MixerConnection {
|
||||||
cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(),
|
cipher: Cipher::new_from_slice(&[0u8; KEY_SIZE]).unwrap(),
|
||||||
crypto_state: CryptoState::Normal,
|
crypto_state: CryptoState::Normal,
|
||||||
udp_rx: udp_receiver_tx,
|
udp_rx: udp_receiver_tx,
|
||||||
udp_tx,
|
udp_tx,
|
||||||
@@ -74,7 +74,7 @@ impl Mixer {
|
|||||||
|
|
||||||
#[cfg(not(feature = "receive"))]
|
#[cfg(not(feature = "receive"))]
|
||||||
let fake_conn = MixerConnection {
|
let fake_conn = MixerConnection {
|
||||||
cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(),
|
cipher: Cipher::new_from_slice(&[0u8; KEY_SIZE]).unwrap(),
|
||||||
crypto_state: CryptoState::Normal,
|
crypto_state: CryptoState::Normal,
|
||||||
udp_tx,
|
udp_tx,
|
||||||
};
|
};
|
||||||
@@ -97,7 +97,7 @@ impl Mixer {
|
|||||||
let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into();
|
let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into();
|
||||||
let promoted = match input {
|
let promoted = match input {
|
||||||
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
||||||
_ => panic!("Failed to create a guaranteed source."),
|
Input::Lazy(_) => panic!("Failed to create a guaranteed source."),
|
||||||
};
|
};
|
||||||
let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context();
|
let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context();
|
||||||
_ = out.0.add_track(ctx);
|
_ = out.0.add_track(ctx);
|
||||||
@@ -114,7 +114,7 @@ impl Mixer {
|
|||||||
let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into();
|
let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into();
|
||||||
let promoted = match input {
|
let promoted = match input {
|
||||||
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
||||||
_ => panic!("Failed to create a guaranteed source."),
|
Input::Lazy(_) => panic!("Failed to create a guaranteed source."),
|
||||||
};
|
};
|
||||||
let mut track = Track::from(Input::Live(promoted.unwrap(), None));
|
let mut track = Track::from(Input::Live(promoted.unwrap(), None));
|
||||||
track.loops = LoopState::Infinite;
|
track.loops = LoopState::Infinite;
|
||||||
@@ -133,7 +133,7 @@ impl Mixer {
|
|||||||
let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into();
|
let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into();
|
||||||
let promoted = match input {
|
let promoted = match input {
|
||||||
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
||||||
_ => panic!("Failed to create a guaranteed source."),
|
Input::Lazy(_) => panic!("Failed to create a guaranteed source."),
|
||||||
};
|
};
|
||||||
let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context();
|
let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context();
|
||||||
_ = out.0.add_track(ctx);
|
_ = out.0.add_track(ctx);
|
||||||
@@ -142,7 +142,7 @@ impl Mixer {
|
|||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn test_with_opus(handle: Handle) -> DummyMixer {
|
pub fn test_with_opus(handle: &Handle) -> DummyMixer {
|
||||||
// should add a single opus-based track.
|
// should add a single opus-based track.
|
||||||
// make this fully loaded to prevent any perf cost there.
|
// make this fully loaded to prevent any perf cost there.
|
||||||
let mut out = Self::mock(handle.clone(), false);
|
let mut out = Self::mock(handle.clone(), false);
|
||||||
@@ -161,7 +161,7 @@ impl Mixer {
|
|||||||
|
|
||||||
let promoted = match src.into() {
|
let promoted = match src.into() {
|
||||||
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE),
|
||||||
_ => panic!("Failed to create a guaranteed source."),
|
Input::Lazy(_) => panic!("Failed to create a guaranteed source."),
|
||||||
};
|
};
|
||||||
let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context();
|
let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context();
|
||||||
|
|
||||||
@@ -181,6 +181,7 @@ pub struct MockScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl MockScheduler {
|
impl MockScheduler {
|
||||||
|
#[must_use]
|
||||||
pub fn new(mode: Option<Mode>) -> Self {
|
pub fn new(mode: Option<Mode>) -> Self {
|
||||||
let stats = Arc::new(StatBlock::default());
|
let stats = Arc::new(StatBlock::default());
|
||||||
let local = Arc::new(LiveStatBlock::default());
|
let local = Arc::new(LiveStatBlock::default());
|
||||||
@@ -188,8 +189,10 @@ impl MockScheduler {
|
|||||||
let (task_tx, task_rx) = flume::unbounded();
|
let (task_tx, task_rx) = flume::unbounded();
|
||||||
let (sched_tx, sched_rx) = flume::unbounded();
|
let (sched_tx, sched_rx) = flume::unbounded();
|
||||||
|
|
||||||
let mut cfg = crate::driver::SchedulerConfig::default();
|
let cfg = crate::driver::SchedulerConfig {
|
||||||
cfg.strategy = mode.unwrap_or_default();
|
strategy: mode.unwrap_or_default(),
|
||||||
|
move_expensive_tasks: true,
|
||||||
|
};
|
||||||
|
|
||||||
let core = Live::new(
|
let core = Live::new(
|
||||||
WorkerId::new(),
|
WorkerId::new(),
|
||||||
@@ -214,6 +217,7 @@ impl MockScheduler {
|
|||||||
self.core.add_task_direct(m, id);
|
self.core.add_task_direct(m, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn from_mixers(mode: Option<Mode>, mixers: Vec<DummyMixer>) -> (Self, Vec<Listeners>) {
|
pub fn from_mixers(mode: Option<Mode>, mixers: Vec<DummyMixer>) -> (Self, Vec<Listeners>) {
|
||||||
let mut out = Self::new(mode);
|
let mut out = Self::new(mode);
|
||||||
let mut listeners = vec![];
|
let mut listeners = vec![];
|
||||||
|
|||||||
@@ -57,16 +57,10 @@ impl EventStore {
|
|||||||
|
|
||||||
match evt.event {
|
match evt.event {
|
||||||
Event::Core(c) => {
|
Event::Core(c) => {
|
||||||
self.untimed
|
self.untimed.entry(c.into()).or_default().push(evt);
|
||||||
.entry(c.into())
|
|
||||||
.or_insert_with(Vec::new)
|
|
||||||
.push(evt);
|
|
||||||
},
|
},
|
||||||
Event::Track(t) => {
|
Event::Track(t) => {
|
||||||
self.untimed
|
self.untimed.entry(t.into()).or_default().push(evt);
|
||||||
.entry(t.into())
|
|
||||||
.or_insert_with(Vec::new)
|
|
||||||
.push(evt);
|
|
||||||
},
|
},
|
||||||
Event::Delayed(_) | Event::Periodic(_, _) => {
|
Event::Delayed(_) | Event::Periodic(_, _) => {
|
||||||
self.timed.push(evt);
|
self.timed.push(evt);
|
||||||
@@ -170,7 +164,7 @@ impl GlobalEvents {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn fire_track_event(&mut self, evt: TrackEvent, index: usize) {
|
pub(crate) fn fire_track_event(&mut self, evt: TrackEvent, index: usize) {
|
||||||
let holder = self.awaiting_tick.entry(evt).or_insert_with(Vec::new);
|
let holder = self.awaiting_tick.entry(evt).or_default();
|
||||||
|
|
||||||
holder.push(index);
|
holder.push(index);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -241,7 +241,7 @@ impl Call {
|
|||||||
|
|
||||||
self.update()
|
self.update()
|
||||||
.await
|
.await
|
||||||
.map(|_| Join::new(rx.into_recv_async(), gw_rx.into_recv_async(), timeout))
|
.map(|()| Join::new(rx.into_recv_async(), gw_rx.into_recv_async(), timeout))
|
||||||
} else {
|
} else {
|
||||||
// Skipping the gateway connection implies that the current connection is complete
|
// Skipping the gateway connection implies that the current connection is complete
|
||||||
// AND the channel is a match.
|
// AND the channel is a match.
|
||||||
@@ -304,7 +304,7 @@ impl Call {
|
|||||||
|
|
||||||
self.update()
|
self.update()
|
||||||
.await
|
.await
|
||||||
.map(|_| JoinGateway::new(rx.into_recv_async(), timeout))
|
.map(|()| JoinGateway::new(rx.into_recv_async(), timeout))
|
||||||
} else {
|
} else {
|
||||||
Ok(JoinGateway::new(rx.into_recv_async(), None))
|
Ok(JoinGateway::new(rx.into_recv_async(), None))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ impl AsyncAdapterSink {
|
|||||||
let msg = if blocked || hit_end {
|
let msg = if blocked || hit_end {
|
||||||
let mut fs = FuturesUnordered::new();
|
let mut fs = FuturesUnordered::new();
|
||||||
fs.push(Either::Left(self.req_rx.recv_async()));
|
fs.push(Either::Left(self.req_rx.recv_async()));
|
||||||
fs.push(Either::Right(self.notify_rx.notified().map(|_| {
|
fs.push(Either::Right(self.notify_rx.notified().map(|()| {
|
||||||
let o: Result<AdapterRequest, RecvError> = Ok(AdapterRequest::Wake);
|
let o: Result<AdapterRequest, RecvError> = Ok(AdapterRequest::Wake);
|
||||||
o
|
o
|
||||||
})));
|
})));
|
||||||
@@ -274,7 +274,7 @@ impl Seek for AsyncAdapterStream {
|
|||||||
self.finalised.store(false, Ordering::Relaxed);
|
self.finalised.store(false, Ordering::Relaxed);
|
||||||
match self.handle_messages(Operation::Seek) {
|
match self.handle_messages(Operation::Seek) {
|
||||||
Some(AdapterResponse::SeekClear) => {},
|
Some(AdapterResponse::SeekClear) => {},
|
||||||
None => self.check_dropped().map(|_| unreachable!())?,
|
None => self.check_dropped().map(|()| unreachable!())?,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -284,7 +284,7 @@ impl Seek for AsyncAdapterStream {
|
|||||||
|
|
||||||
match self.handle_messages(Operation::Seek) {
|
match self.handle_messages(Operation::Seek) {
|
||||||
Some(AdapterResponse::SeekResult(a)) => a,
|
Some(AdapterResponse::SeekResult(a)) => a,
|
||||||
None => self.check_dropped().map(|_| unreachable!()),
|
None => self.check_dropped().map(|()| unreachable!()),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -302,7 +302,7 @@ impl MediaSource for AsyncAdapterStream {
|
|||||||
|
|
||||||
match self.handle_messages(Operation::Len) {
|
match self.handle_messages(Operation::Len) {
|
||||||
Some(AdapterResponse::ByteLen(a)) => a,
|
Some(AdapterResponse::ByteLen(a)) => a,
|
||||||
None => self.check_dropped().ok().map(|_| unreachable!()),
|
None => self.check_dropped().ok().map(|()| unreachable!()),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -404,7 +404,7 @@ where
|
|||||||
// However, we can guarantee that reads will be channel aligned at least!
|
// However, we can guarantee that reads will be channel aligned at least!
|
||||||
for el in sample_buf[..samples_in_frame].chunks_mut(interleaved_count) {
|
for el in sample_buf[..samples_in_frame].chunks_mut(interleaved_count) {
|
||||||
match src.read_f32_into::<LittleEndian>(el) {
|
match src.read_f32_into::<LittleEndian>(el) {
|
||||||
Ok(_) => {
|
Ok(()) => {
|
||||||
raw_len += interleaved_count;
|
raw_len += interleaved_count;
|
||||||
},
|
},
|
||||||
Err(e) if e.kind() == IoErrorKind::UnexpectedEof => {
|
Err(e) if e.kind() == IoErrorKind::UnexpectedEof => {
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ fn cleanup_child_processes(mut children: Vec<Child>) {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let attempt = attempt.and_then(|_| {
|
let attempt = attempt.and_then(|()| {
|
||||||
children
|
children
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.rev()
|
.rev()
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ impl<A: MediaSource> Seek for RawAdapter<A> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let out = if target_pos as usize <= self.prepend.len() {
|
let out = if target_pos as usize <= self.prepend.len() {
|
||||||
self.inner.rewind().map(|_| 0)
|
self.inner.rewind().map(|()| 0)
|
||||||
} else {
|
} else {
|
||||||
self.inner.seek(SeekFrom::Start(target_pos))
|
self.inner.seek(SeekFrom::Start(target_pos))
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -94,7 +94,7 @@ where
|
|||||||
// 2) track's position is approx 30s
|
// 2) track's position is approx 30s
|
||||||
// 3) track's play time is considerably less (O(5s))
|
// 3) track's play time is considerably less (O(5s))
|
||||||
let state = handle.get_info();
|
let state = handle.get_info();
|
||||||
t_handle.spawn_ticker().await;
|
t_handle.spawn_ticker();
|
||||||
let state = state.await.expect("Should have received valid state.");
|
let state = state.await.expect("Should have received valid state.");
|
||||||
|
|
||||||
assert_eq!(state.ready, ReadyState::Playable);
|
assert_eq!(state.ready, ReadyState::Playable);
|
||||||
@@ -134,7 +134,7 @@ where
|
|||||||
// 2) track's position is approx 1s
|
// 2) track's position is approx 1s
|
||||||
// 3) track's play time is preserved (About 4s)
|
// 3) track's play time is preserved (About 4s)
|
||||||
let state = handle.get_info();
|
let state = handle.get_info();
|
||||||
t_handle.spawn_ticker().await;
|
t_handle.spawn_ticker();
|
||||||
let state = state.await.expect("Should have received valid state.");
|
let state = state.await.expect("Should have received valid state.");
|
||||||
|
|
||||||
assert_eq!(state.ready, ReadyState::Playable);
|
assert_eq!(state.ready, ReadyState::Playable);
|
||||||
|
|||||||
@@ -102,8 +102,10 @@ pub struct Format {
|
|||||||
pub filename: String,
|
pub filename: String,
|
||||||
pub nb_streams: u64,
|
pub nb_streams: u64,
|
||||||
pub nb_programs: u64,
|
pub nb_programs: u64,
|
||||||
pub format_name: String,
|
#[serde(rename = "format_name")]
|
||||||
pub format_long_name: Option<String>,
|
pub name: String,
|
||||||
|
#[serde(rename = "format_long_name")]
|
||||||
|
pub long_name: Option<String>,
|
||||||
|
|
||||||
#[serde(deserialize_with = "deserialize_option_number_from_string")]
|
#[serde(deserialize_with = "deserialize_option_number_from_string")]
|
||||||
pub start_time: Option<f64>,
|
pub start_time: Option<f64>,
|
||||||
|
|||||||
@@ -188,6 +188,8 @@ impl Songbird {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Creates an iterator for all [`Call`]s currently managed.
|
/// Creates an iterator for all [`Call`]s currently managed.
|
||||||
|
// TODO: Implement IntoIterator
|
||||||
|
#[allow(clippy::iter_without_into_iter)]
|
||||||
pub fn iter(&self) -> Iter<'_> {
|
pub fn iter(&self) -> Iter<'_> {
|
||||||
Iter {
|
Iter {
|
||||||
inner: self.calls.iter().map(|x| (*x.key(), Arc::clone(x.value()))),
|
inner: self.calls.iter().map(|x| (*x.key(), Arc::clone(x.value()))),
|
||||||
@@ -253,7 +255,7 @@ impl Songbird {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match stage_1 {
|
match stage_1 {
|
||||||
Ok(chan) => chan.await.map(|_| call),
|
Ok(chan) => chan.await.map(|()| call),
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
use byteorder::{LittleEndian, WriteBytesExt};
|
use byteorder::{LittleEndian, WriteBytesExt};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn make_sine(float_len: usize, stereo: bool) -> Vec<u8> {
|
pub fn make_sine(float_len: usize, stereo: bool) -> Vec<u8> {
|
||||||
let sample_len = mem::size_of::<f32>();
|
let sample_len = mem::size_of::<f32>();
|
||||||
let byte_len = float_len * sample_len;
|
let byte_len = float_len * sample_len;
|
||||||
@@ -34,6 +35,7 @@ pub fn make_sine(float_len: usize, stereo: bool) -> Vec<u8> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
pub fn make_pcm_sine(i16_len: usize, stereo: bool) -> Vec<u8> {
|
pub fn make_pcm_sine(i16_len: usize, stereo: bool) -> Vec<u8> {
|
||||||
let sample_len = mem::size_of::<i16>();
|
let sample_len = mem::size_of::<i16>();
|
||||||
let byte_len = i16_len * sample_len;
|
let byte_len = i16_len * sample_len;
|
||||||
|
|||||||
@@ -282,7 +282,7 @@ mod tests {
|
|||||||
let handle = driver.play(Track::from(file).pause());
|
let handle = driver.play(Track::from(file).pause());
|
||||||
|
|
||||||
let callback = handle.make_playable();
|
let callback = handle.make_playable();
|
||||||
t_handle.spawn_ticker().await;
|
t_handle.spawn_ticker();
|
||||||
assert!(callback.result_async().await.is_ok());
|
assert!(callback.result_async().await.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -297,7 +297,7 @@ mod tests {
|
|||||||
|
|
||||||
let target = Duration::from_millis(500);
|
let target = Duration::from_millis(500);
|
||||||
let callback = handle.seek(target);
|
let callback = handle.seek(target);
|
||||||
t_handle.spawn_ticker().await;
|
t_handle.spawn_ticker();
|
||||||
|
|
||||||
let answer = callback.result_async().await;
|
let answer = callback.result_async().await;
|
||||||
assert!(answer.is_ok());
|
assert!(answer.is_ok());
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ mod tests {
|
|||||||
let _ = handle.add_event(Event::Track(TrackEvent::Loop), Looper { tx: l_tx });
|
let _ = handle.add_event(Event::Track(TrackEvent::Loop), Looper { tx: l_tx });
|
||||||
let _ = handle.add_event(Event::Track(TrackEvent::End), Looper { tx: e_tx });
|
let _ = handle.add_event(Event::Track(TrackEvent::End), Looper { tx: e_tx });
|
||||||
|
|
||||||
t_handle.spawn_ticker().await;
|
t_handle.spawn_ticker();
|
||||||
|
|
||||||
// CONDITIONS:
|
// CONDITIONS:
|
||||||
// 1) 2 loop events, each changes the loop count.
|
// 1) 2 loop events, each changes the loop count.
|
||||||
@@ -100,7 +100,7 @@ mod tests {
|
|||||||
let (l_tx, l_rx) = flume::unbounded();
|
let (l_tx, l_rx) = flume::unbounded();
|
||||||
let _ = handle.add_event(Event::Track(TrackEvent::Loop), Looper { tx: l_tx });
|
let _ = handle.add_event(Event::Track(TrackEvent::Loop), Looper { tx: l_tx });
|
||||||
|
|
||||||
t_handle.spawn_ticker().await;
|
t_handle.spawn_ticker();
|
||||||
|
|
||||||
// CONDITIONS:
|
// CONDITIONS:
|
||||||
// 1) 3 loop events, each does not change the loop count.
|
// 1) 3 loop events, each does not change the loop count.
|
||||||
|
|||||||
Reference in New Issue
Block a user