Driver: Remove RwLock from ThreadPool (#206)
This commit is contained in:
@@ -6,7 +6,7 @@ use crate::{
|
|||||||
Config,
|
Config,
|
||||||
};
|
};
|
||||||
use flume::Sender;
|
use flume::Sender;
|
||||||
use parking_lot::RwLock;
|
use rusty_pool::ThreadPool;
|
||||||
use std::{result::Result as StdResult, sync::Arc, time::Duration};
|
use std::{result::Result as StdResult, sync::Arc, time::Duration};
|
||||||
use symphonia_core::{
|
use symphonia_core::{
|
||||||
formats::{SeekMode, SeekTo},
|
formats::{SeekMode, SeekTo},
|
||||||
@@ -16,18 +16,14 @@ use tokio::runtime::Handle;
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BlockyTaskPool {
|
pub struct BlockyTaskPool {
|
||||||
pool: Arc<RwLock<rusty_pool::ThreadPool>>,
|
pool: ThreadPool,
|
||||||
handle: Handle,
|
handle: Handle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockyTaskPool {
|
impl BlockyTaskPool {
|
||||||
pub fn new(handle: Handle) -> Self {
|
pub fn new(handle: Handle) -> Self {
|
||||||
Self {
|
Self {
|
||||||
pool: Arc::new(RwLock::new(rusty_pool::ThreadPool::new(
|
pool: ThreadPool::new(0, 64, Duration::from_secs(5)),
|
||||||
0,
|
|
||||||
64,
|
|
||||||
Duration::from_secs(5),
|
|
||||||
))),
|
|
||||||
handle,
|
handle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -52,8 +48,7 @@ impl BlockyTaskPool {
|
|||||||
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
|
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
let pool = self.pool.read();
|
self.pool.execute(move || {
|
||||||
pool.execute(move || {
|
|
||||||
let out = lazy.create();
|
let out = lazy.create();
|
||||||
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
|
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
|
||||||
});
|
});
|
||||||
@@ -91,10 +86,9 @@ impl BlockyTaskPool {
|
|||||||
seek_time: Option<SeekTo>,
|
seek_time: Option<SeekTo>,
|
||||||
) {
|
) {
|
||||||
let pool_clone = self.clone();
|
let pool_clone = self.clone();
|
||||||
let pool = self.pool.read();
|
|
||||||
|
|
||||||
pool.execute(
|
self.pool.execute(move || {
|
||||||
move || match input.promote(config.codec_registry, config.format_registry) {
|
match input.promote(config.codec_registry, config.format_registry) {
|
||||||
Ok(LiveInput::Parsed(parsed)) => match seek_time {
|
Ok(LiveInput::Parsed(parsed)) => match seek_time {
|
||||||
// If seek time is zero, then wipe it out.
|
// If seek time is zero, then wipe it out.
|
||||||
// Some formats (MKV) make SeekTo(0) require a backseek to realign with the
|
// Some formats (MKV) make SeekTo(0) require a backseek to realign with the
|
||||||
@@ -110,8 +104,8 @@ impl BlockyTaskPool {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
drop(callback.send(MixerInputResultMessage::ParseErr(e.into())));
|
drop(callback.send(MixerInputResultMessage::ParseErr(e.into())));
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
);
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn seek(
|
pub fn seek(
|
||||||
@@ -126,9 +120,8 @@ impl BlockyTaskPool {
|
|||||||
config: Arc<Config>,
|
config: Arc<Config>,
|
||||||
) {
|
) {
|
||||||
let pool_clone = self.clone();
|
let pool_clone = self.clone();
|
||||||
let pool = self.pool.read();
|
|
||||||
|
|
||||||
pool.execute(move || match rec {
|
self.pool.execute(move || match rec {
|
||||||
Some(rec) if (!input.supports_backseek) && backseek_needed => {
|
Some(rec) if (!input.supports_backseek) && backseek_needed => {
|
||||||
pool_clone.create(callback, Input::Lazy(rec), Some(seek_time), config);
|
pool_clone.create(callback, Input::Lazy(rec), Some(seek_time), config);
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user