use crate::errors::*;
use bounding_box::Aabb2;
use error_stack::ResultExt;
use mnn_bridge::ndarray::*;
use nalgebra::{Point2, Vector2};
use ndarray_resize::NdFir;
use std::path::Path;

pub struct FaceDetectionConfig {
    min_sizes: Vec<Vec<usize>>,
    steps: Vec<usize>,
    variance: Vec<f32>,
}

pub struct FaceDetection {
    handle: mnn_sync::SessionHandle,
}

/// Raw tensors produced by the detector: one box, score, and landmark set
/// per prior anchor, each with a leading batch axis.
pub struct FaceDetectionModelOutput {
    pub bbox: ndarray::Array3<f32>,
    pub confidence: ndarray::Array3<f32>,
    pub landmark: ndarray::Array3<f32>,
}

impl FaceDetectionModelOutput {
    /// Builds the RetinaFace-style prior anchors for a 640x640 input.
    /// Decoding the raw boxes against these anchors is not implemented yet,
    /// so this currently returns an empty list.
    pub fn postprocess(self, config: FaceDetectionConfig) -> Result<Vec<Aabb2<f32>>> {
        // Python reference:
        // for k, step in enumerate(cfg['steps']):
        //     feature_size = 640 // step
        //     for i in range(feature_size):
        //         for j in range(feature_size):
        //             for min_size in cfg['min_sizes'][k]:
        //                 cx = (j + 0.5) * step / 640
        //                 cy = (i + 0.5) * step / 640
        //                 s_kx = s_ky = min_size / 640
        //                 anchors.append([cx, cy, s_kx, s_ky])
        let mut anchors = Vec::new();
        for (k, &step) in config.steps.iter().enumerate() {
            let feature_size = 640 / step;
            for i in 0..feature_size {
                for j in 0..feature_size {
                    for &min_size in &config.min_sizes[k] {
                        // Anchor center (cx, cy) and size (s_kx, s_ky),
                        // all normalized to the 640x640 input.
                        let cx = (j as f32 + 0.5) * step as f32 / 640.0;
                        let cy = (i as f32 + 0.5) * step as f32 / 640.0;
                        let s_kx = min_size as f32 / 640.0;
                        let s_ky = min_size as f32 / 640.0;
                        anchors.push([cx, cy, s_kx, s_ky]);
                    }
                }
            }
        }
        // TODO: decode `self.bbox` against `anchors` using `config.variance`
        // and apply NMS; until then no detections are returned.
        let _ = anchors;
        Ok(Vec::new())
    }

    pub fn print(&self, limit: usize) {
        tracing::info!("Detected {} faces", self.bbox.shape()[1]);
        // Drop the batch axis, pair each box with its face-class score
        // (index 1 of the two-class output), and keep scores above 0.1.
        for (bbox, confidence) in self
            .bbox
            .clone()
            .remove_axis(ndarray::Axis(0))
            .axis_iter(ndarray::Axis(0))
            .zip(
                self.confidence
                    .clone()
                    .remove_axis(ndarray::Axis(0))
                    .axis_iter(ndarray::Axis(0))
                    .map(|c| c[1]),
            )
            .filter(|(_, c)| *c > 0.1)
            .take(limit)
        {
            tracing::info!("Face BBox: {:?}, Confidence: {:.2}", bbox, confidence);
        }
    }
}

impl FaceDetection {
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        let model = std::fs::read(path)
            .change_context(Error)
            .attach_printable("Failed to read model file")?;
        Self::new_from_bytes(&model)
    }

    pub fn new_from_bytes(model: &[u8]) -> Result<Self> {
        tracing::info!("Loading face detection model from bytes");
        let mut model = mnn::Interpreter::from_bytes(model)
            .map_err(|e| e.into_inner())
            .change_context(Error)
            .attach_printable("Failed to load model from bytes")?;
        model.set_session_mode(mnn::SessionMode::Release);
        let bc = mnn::BackendConfig::default().with_memory_mode(mnn::MemoryMode::High);
        let sc = mnn::ScheduleConfig::new()
            .with_type(mnn::ForwardType::CPU)
            .with_backend_config(bc);
        tracing::info!("Creating session handle for face detection model");
        let handle = mnn_sync::SessionHandle::new(model, sc)
            .change_context(Error)
            .attach_printable("Failed to create session handle")?;
        Ok(FaceDetection { handle })
    }

    pub fn detect_faces(&self, image: ndarray::Array3<u8>) -> Result<FaceDetectionModelOutput> {
        use ::tap::*;
        let output = self
            .handle
            .run(move |sr| {
                let mut resized = image
                    .fast_resize(640, 640, None)
                    .change_context(mnn::ErrorKind::TensorError)?
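                    // Convert to f32 and subtract the per-channel means
                    // (104, 117, 123) used by the RetinaFace reference
                    // preprocessing, then permute HWC -> CHW and add a batch
                    // axis so the buffer is laid out as NCHW for the model.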
                    .mapv(|f| f as f32)
                    .tap_mut(|arr| {
                        arr.axis_iter_mut(ndarray::Axis(2))
                            .zip([104, 117, 123])
                            .for_each(|(mut channel, mean)| {
                                let mean = mean as f32;
                                channel.map_inplace(|v| *v -= mean);
                            });
                    })
                    .permuted_axes((2, 0, 1))
                    .insert_axis(ndarray::Axis(0))
                    .as_standard_layout()
                    .into_owned();
                let tensor = resized
                    .as_mnn_tensor_mut()
                    .attach_printable("Failed to convert ndarray to mnn tensor")
                    .change_context(mnn::error::ErrorKind::TensorError)?;
                tracing::trace!("Image Tensor shape: {:?}", tensor.shape());
                let (intptr, session) = sr.both_mut();
                tracing::trace!("Resizing input tensor to 1x3x640x640");
                unsafe {
                    let mut input = intptr.input_unresized::<f32>(session, "input")?;
                    tracing::trace!("Input shape: {:?}", input.shape());
                    intptr.resize_tensor_by_nchw::<f32, _>(input.view_mut(), 1, 3, 640, 640);
                }
                intptr.resize_session(session);
                let mut input = intptr.input::<f32>(session, "input")?;
                tracing::trace!("Input shape: {:?}", input.shape());
                input.copy_from_host_tensor(tensor.view())?;
                tracing::info!("Running face detection session");
                intptr.run_session(&session)?;
                // Copy the three output heads back to host memory as owned
                // ndarrays: box regressions, two-class scores, and landmarks.
                let output_bbox = intptr
                    .output::<f32>(&session, "bbox")?
                    .create_host_tensor_from_device(true)
                    .as_ndarray()
                    .to_owned();
                tracing::trace!("Output Bbox: \t\t{:?}", output_bbox.shape());
                let output_confidence = intptr
                    .output::<f32>(&session, "confidence")?
                    .create_host_tensor_from_device(true)
                    .as_ndarray::<f32>()
                    .to_owned();
                tracing::trace!("Output Confidence: \t{:?}", output_confidence.shape());
                let output_landmark = intptr
                    .output::<f32>(&session, "landmark")?
                    .create_host_tensor_from_device(true)
                    .as_ndarray::<f32>()
                    .to_owned();
                tracing::trace!("Output Landmark: \t{:?}", output_landmark.shape());
                Ok(FaceDetectionModelOutput {
                    bbox: output_bbox,
                    confidence: output_confidence,
                    landmark: output_landmark,
                })
            })
            .map_err(|e| e.into_inner())
            .change_context(Error)?;
        Ok(output)
    }
}
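
// A minimal usage sketch, not part of the original file: the model path
// "models/face_detection.mnn" is hypothetical, and the zero-filled image
// stands in for a real decoded photo. It assumes the crate's `Result` alias
// from `crate::errors` and exercises the public API above: load a model,
// run detection on an (H, W, 3) u8 frame, and log the results.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[ignore = "requires a face detection model file on disk"]
    fn detect_and_print() -> Result<()> {
        let detector = FaceDetection::new("models/face_detection.mnn")?;
        // A blank 480x640 RGB frame; a real caller would decode an image
        // into an (H, W, 3) u8 ndarray instead.
        let image = ndarray::Array3::<u8>::zeros((480, 640, 3));
        let output = detector.detect_faces(image)?;
        // Log up to ten detections whose face score exceeds 0.1.
        output.print(10);
        Ok(())
    }
}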