// face-detector/src/facedet/retinaface.rs
use crate::errors::*;
use bounding_box::{Aabb2, nms::nms};
use error_stack::ResultExt;
use mnn_bridge::ndarray::*;
use nalgebra::{Point2, Vector2};
use ndarray_resize::NdFir;
use std::path::Path;
/// Configuration for RetinaFace anchor generation and detection postprocessing.
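///
/// A usage sketch with the builder methods (values are illustrative, not
/// tuned recommendations):
///
/// ```ignore
/// let config = FaceDetectionConfig::default()
///     .with_threshold(0.7)
///     .with_nms_threshold(0.3);
/// ```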
#[derive(Debug, Clone, PartialEq)]
pub struct FaceDetectionConfig {
/// Minimum confidence to keep a detection
pub threshold: f32,
/// NMS threshold for suppressing overlapping boxes
pub nms_threshold: f32,
/// Variances for bounding box decoding
pub variances: [f32; 2],
/// The step size (stride) for each feature map
pub steps: Vec<usize>,
/// The minimum anchor sizes for each feature map
pub min_sizes: Vec<Vec<usize>>,
/// Whether to clamp bounding boxes to the normalized [0, 1] range
pub clamp: bool,
/// Input image width (used for anchor generation)
pub input_width: usize,
/// Input image height (used for anchor generation)
pub input_height: usize,
}
impl FaceDetectionConfig {
pub fn with_threshold(mut self, threshold: f32) -> Self {
self.threshold = threshold;
self
}
pub fn with_nms_threshold(mut self, nms_threshold: f32) -> Self {
self.nms_threshold = nms_threshold;
self
}
pub fn with_variances(mut self, variances: [f32; 2]) -> Self {
self.variances = variances;
self
}
pub fn with_steps(mut self, steps: Vec<usize>) -> Self {
self.steps = steps;
self
}
pub fn with_min_sizes(mut self, min_sizes: Vec<Vec<usize>>) -> Self {
self.min_sizes = min_sizes;
self
}
pub fn with_clip(mut self, clip: bool) -> Self {
self.clamp = clip;
self
}
pub fn with_input_width(mut self, input_width: usize) -> Self {
self.input_width = input_width;
self
}
pub fn with_input_height(mut self, input_height: usize) -> Self {
self.input_height = input_height;
self
}
}
impl Default for FaceDetectionConfig {
fn default() -> Self {
Self {
threshold: 0.5,
nms_threshold: 0.4,
variances: [0.1, 0.2],
steps: vec![8, 16, 32],
min_sizes: vec![vec![16, 32], vec![64, 128], vec![256, 512]],
clamp: true,
input_width: 1024,
input_height: 1024,
}
}
}
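/// RetinaFace face detector backed by an `mnn_sync::SessionHandle`.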
#[derive(Debug)]
pub struct FaceDetection {
handle: mnn_sync::SessionHandle,
}
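/// Raw model outputs, each shaped `(1, n_anchors, k)`: 4 box offsets,
/// 2 class scores, and 10 landmark coordinates per anchor.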
#[derive(Debug, Clone, PartialEq)]
pub struct FaceDetectionModelOutput {
pub bbox: ndarray::Array3<f32>,
pub confidence: ndarray::Array3<f32>,
pub landmark: ndarray::Array3<f32>,
}
/// Represents the 5 facial landmarks detected by RetinaFace
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct FaceLandmarks {
pub left_eye: Point2<f32>,
pub right_eye: Point2<f32>,
pub nose: Point2<f32>,
pub left_mouth: Point2<f32>,
pub right_mouth: Point2<f32>,
}
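/// Threshold-filtered detections in normalized [0, 1] coordinates, prior to NMS.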
#[derive(Debug, Clone, PartialEq)]
pub struct FaceDetectionProcessedOutput {
pub bbox: Vec<Aabb2<f32>>,
pub confidence: Vec<f32>,
pub landmarks: Vec<FaceLandmarks>,
}
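/// Final detections after NMS, with boxes denormalized to pixel coordinates.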
#[derive(Debug, Clone, PartialEq)]
pub struct FaceDetectionOutput {
pub bbox: Vec<Aabb2<usize>>,
pub confidence: Vec<f32>,
pub landmark: Vec<FaceLandmarks>,
}
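/// Generates the RetinaFace prior (anchor) boxes in normalized [0, 1]
/// coordinates as an `(n, 4)` array of `[xmin, ymin, xmax, ymax]` rows.
///
/// One anchor is emitted per feature-map cell per `min_size`, so the
/// 1024x1024 defaults (strides 8/16/32, two sizes each) yield
/// 2 * (128*128 + 64*64 + 32*32) = 43008 priors.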
fn generate_anchors(config: &FaceDetectionConfig) -> ndarray::Array2<f32> {
let mut anchors = Vec::new();
let feature_maps: Vec<(usize, usize)> = config
.steps
.iter()
.map(|&step| {
(
(config.input_height as f32 / step as f32).ceil() as usize,
(config.input_width as f32 / step as f32).ceil() as usize,
)
})
.collect();
for (k, f) in feature_maps.iter().enumerate() {
let min_sizes = &config.min_sizes[k];
for i in 0..f.0 {
for j in 0..f.1 {
for &min_size in min_sizes {
let s_kx = min_size as f32 / config.input_width as f32;
let s_ky = min_size as f32 / config.input_height as f32;
let dense_cx =
(j as f32 + 0.5) * config.steps[k] as f32 / config.input_width as f32;
let dense_cy =
(i as f32 + 0.5) * config.steps[k] as f32 / config.input_height as f32;
anchors.push([
dense_cx - s_kx / 2.,
dense_cy - s_ky / 2.,
dense_cx + s_kx / 2.,
dense_cy + s_ky / 2.,
]);
}
}
}
}
ndarray::Array2::from_shape_vec((anchors.len(), 4), anchors.into_iter().flatten().collect())
.expect("anchor buffer length always matches (n, 4)")
}
impl FaceDetectionModelOutput {
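/// Decodes the raw network output against the generated priors, keeping
/// only candidates whose score exceeds `config.threshold`.
///
/// Boxes follow the usual SSD/RetinaFace decoding, with `var = variances`:
///
/// ```text
/// cx = prior_cx + loc_x * var[0] * prior_w
/// cy = prior_cy + loc_y * var[0] * prior_h
/// w  = prior_w * exp(loc_w * var[1])
/// h  = prior_h * exp(loc_h * var[1])
/// ```
///
/// Landmarks are decoded with the same center/variance offset scheme.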
pub fn postprocess(self, config: &FaceDetectionConfig) -> Result<FaceDetectionProcessedOutput> {
use ndarray::s;
let priors = generate_anchors(config);
let scores = self.confidence.slice(s![0, .., 1]);
let boxes = self.bbox.slice(s![0, .., ..]);
let landmarks_raw = self.landmark.slice(s![0, .., ..]);
let mut decoded_boxes = Vec::new();
let mut decoded_landmarks = Vec::new();
let mut confidences = Vec::new();
for i in 0..priors.shape()[0] {
if scores[i] > config.threshold {
let prior = priors.row(i);
let loc = boxes.row(i);
let landm = landmarks_raw.row(i);
// Decode bounding box
let prior_cx = (prior[0] + prior[2]) / 2.0;
let prior_cy = (prior[1] + prior[3]) / 2.0;
let prior_w = prior[2] - prior[0];
let prior_h = prior[3] - prior[1];
let var = config.variances;
let cx = prior_cx + loc[0] * var[0] * prior_w;
let cy = prior_cy + loc[1] * var[0] * prior_h;
let w = prior_w * (loc[2] * var[1]).exp();
let h = prior_h * (loc[3] * var[1]).exp();
let xmin = cx - w / 2.0;
let ymin = cy - h / 2.0;
let xmax = cx + w / 2.0;
let ymax = cy + h / 2.0;
let mut bbox =
Aabb2::from_min_max_vertices(Point2::new(xmin, ymin), Point2::new(xmax, ymax));
if config.clamp {
bbox.component_clamp(0.0, 1.0);
}
decoded_boxes.push(bbox);
// Decode landmarks
let mut points = [Point2::new(0.0, 0.0); 5];
for j in 0..5 {
points[j].x = prior_cx + landm[j * 2] * var[0] * prior_w;
points[j].y = prior_cy + landm[j * 2 + 1] * var[0] * prior_h;
}
let landmarks = FaceLandmarks {
left_eye: points[0],
right_eye: points[1],
nose: points[2],
left_mouth: points[3],
right_mouth: points[4],
};
decoded_landmarks.push(landmarks);
confidences.push(scores[i]);
}
}
Ok(FaceDetectionProcessedOutput {
bbox: decoded_boxes,
confidence: confidences,
landmarks: decoded_landmarks,
})
}
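/// Logs up to `limit` raw detections whose confidence exceeds a fixed
/// debug threshold of 0.1.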
pub fn print(&self, limit: usize) {
tracing::info!("Detected {} faces", self.bbox.shape()[1]);
for (bbox, confidence) in self
.bbox
.clone()
.remove_axis(ndarray::Axis(0))
.axis_iter(ndarray::Axis(0))
.zip(
self.confidence
.clone()
.remove_axis(ndarray::Axis(0))
.axis_iter(ndarray::Axis(0))
.map(|c| c[1]),
)
.filter(|(_, c)| *c > 0.1)
.take(limit)
{
tracing::info!("Face BBox: {:?}, Confidence: {:.2}", bbox, confidence);
}
}
}
impl FaceDetection {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let model = std::fs::read(path)
.change_context(Error)
.attach_printable("Failed to read model file")?;
Self::new_from_bytes(&model)
}
pub fn new_from_bytes(model: &[u8]) -> Result<Self> {
tracing::info!("Loading face detection model from bytes");
let mut model = mnn::Interpreter::from_bytes(model)
.map_err(|e| e.into_inner())
.change_context(Error)
.attach_printable("Failed to load model from bytes")?;
model.set_session_mode(mnn::SessionMode::Release);
let bc = mnn::BackendConfig::default().with_memory_mode(mnn::MemoryMode::High);
let sc = mnn::ScheduleConfig::new()
.with_type(mnn::ForwardType::CPU)
.with_backend_config(bc);
tracing::info!("Creating session handle for face detection model");
let handle = mnn_sync::SessionHandle::new(model, sc)
.change_context(Error)
.attach_printable("Failed to create session handle")?;
Ok(FaceDetection { handle })
}
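/// Runs the full pipeline: preprocessing, inference, confidence filtering,
/// NMS, and denormalization of the kept boxes to pixel coordinates.
///
/// A usage sketch (model path and image source are illustrative):
///
/// ```ignore
/// let detector = FaceDetection::new("retinaface.mnn")?;
/// let faces = detector.detect_faces(image.view(), FaceDetectionConfig::default())?;
/// for (bbox, score) in faces.bbox.iter().zip(&faces.confidence) {
///     println!("face at {:?}, confidence {:.2}", bbox, score);
/// }
/// ```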
pub fn detect_faces(
&self,
image: ndarray::ArrayView3<u8>,
config: FaceDetectionConfig,
) -> Result<FaceDetectionOutput> {
let (height, width, _channels) = image.dim();
let output = self
.run_models(image)
.change_context(Error)
.attach_printable("Failed to detect faces")?;
// scale factor for mapping normalized boxes back to pixel coordinates
let factor = Vector2::new(width as f32, height as f32);
let mut processed = output
.postprocess(&config)
.attach_printable("Failed to postprocess")?;
use itertools::Itertools;
let (boxes, scores, landmarks): (Vec<_>, Vec<_>, Vec<_>) = processed
.bbox
.iter()
.cloned()
.zip(processed.confidence.iter().cloned())
.zip(processed.landmarks.iter().cloned())
.sorted_by_key(|((_, score), _)| ordered_float::OrderedFloat(*score))
.map(|((b, s), l)| (b, s, l))
.multiunzip();
let keep_indices =
nms(&boxes, &scores, config.threshold, config.nms_threshold).change_context(Error)?;
let bboxes = boxes
.into_iter()
.enumerate()
.filter(|(i, _)| keep_indices.contains(i))
.flat_map(|(_, x)| x.denormalize(factor).try_cast::<usize>())
.collect();
let confidence = scores
.into_iter()
.enumerate()
.filter(|(i, _)| keep_indices.contains(i))
.map(|(_, score)| score)
.collect();
let landmark = landmarks
.into_iter()
.enumerate()
.filter(|(i, _)| keep_indices.contains(i))
.map(|(_, lm)| lm)
.collect();
Ok(FaceDetectionOutput {
bbox: bboxes,
confidence,
landmark,
})
}
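/// Runs the model forward pass alone and returns the raw output tensors.
///
/// The input is resized to a fixed 1024x1024 here (the config's
/// `input_width`/`input_height` are not consulted), the RetinaFace channel
/// means (104, 117, 123) are subtracted, and the layout is converted from
/// HWC to NCHW before inference.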
pub fn run_models(&self, image: ndarray::ArrayView3<u8>) -> Result<FaceDetectionModelOutput> {
#[rustfmt::skip]
use ::tap::*;
let output = self
.handle
.run(move |sr| {
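// Preprocess: resize to the fixed 1024x1024 network input, subtract the
// per-channel means (104, 117, 123), and reorder HWC -> NCHW before
// adding the batch axis.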
let mut resized = image
.fast_resize(1024, 1024, None)
.change_context(mnn::ErrorKind::TensorError)?
.mapv(|f| f as f32)
.tap_mut(|arr| {
arr.axis_iter_mut(ndarray::Axis(2))
.zip([104, 117, 123])
.for_each(|(mut array, pixel)| {
let pixel = pixel as f32;
array.map_inplace(|v| *v -= pixel);
});
})
.permuted_axes((2, 0, 1))
.insert_axis(ndarray::Axis(0))
.as_standard_layout()
.into_owned();
let tensor = resized
.as_mnn_tensor_mut()
.attach_printable("Failed to convert ndarray to mnn tensor")
.change_context(mnn::error::ErrorKind::TensorError)?;
tracing::trace!("Image Tensor shape: {:?}", tensor.shape());
let (intptr, session) = sr.both_mut();
tracing::trace!("Copying input tensor to host");
unsafe {
let mut input = intptr.input_unresized::<f32>(session, "input")?;
tracing::trace!("Input shape: {:?}", input.shape());
intptr.resize_tensor_by_nchw::<mnn::View<&mut f32>, _>(
input.view_mut(),
1,
3,
1024,
1024,
);
}
intptr.resize_session(session);
let mut input = intptr.input::<f32>(session, "input")?;
tracing::trace!("Input shape: {:?}", input.shape());
input.copy_from_host_tensor(tensor.view())?;
tracing::info!("Running face detection session");
intptr.run_session(&session)?;
let output_tensor = intptr
.output::<f32>(&session, "bbox")?
.create_host_tensor_from_device(true)
.as_ndarray()
.to_owned();
tracing::trace!("Output Bbox: \t\t{:?}", output_tensor.shape());
let output_confidence = intptr
.output::<f32>(&session, "confidence")?
.create_host_tensor_from_device(true)
.as_ndarray::<ndarray::Ix3>()
.to_owned();
tracing::trace!("Output Confidence: \t{:?}", output_confidence.shape());
let output_landmark = intptr
.output::<f32>(&session, "landmark")?
.create_host_tensor_from_device(true)
.as_ndarray::<ndarray::Ix3>()
.to_owned();
tracing::trace!("Output Landmark: \t{:?}", output_landmark.shape());
Ok(FaceDetectionModelOutput {
bbox: output_tensor,
confidence: output_confidence,
landmark: output_landmark,
})
})
.map_err(|e| e.into_inner())
.change_context(Error)?;
Ok(output)
}
}