from __future__ import annotations

from dataclasses import dataclass

import numpy as np


@dataclass
class AgcConfig:
    """Tunable parameters read by :class:`AgcProcessor`."""

    # Numerator of the raw gain: desired gain starts as target_level / envelope.
    target_level: float = 0.22
    # Time constant (milliseconds) used when the chunk level exceeds the
    # envelope, and when the desired gain exceeds the current gain.
    attack_ms: float = 6.0
    # Time constant (milliseconds) used in the opposite direction.
    release_ms: float = 280.0
    # Bounds applied to the raw gain before the peak and unvoiced limits.
    min_gain: float = 0.2
    max_gain: float = 20.0
    # Starting value of the noise-floor estimate (floored at 1e-7).
    noise_floor_init: float = 8e-5
    # Blend factor when the chunk level is above the floor and no speech hint
    # was given.
    noise_floor_rise_alpha: float = 0.03
    # Blend factor when a speech hint was given or the chunk level is at or
    # below the floor.
    noise_floor_fall_alpha: float = 0.002
    # Chunk-level / noise-floor ratio at or above which a chunk counts as voiced.
    gate_open_ratio: float = 1.20
    # Multiplier applied to unvoiced chunks whose ratio is <= 1; it ramps
    # quadratically towards 1.0 as the ratio approaches gate_open_ratio.
    silence_attenuation: float = 0.85


class AgcProcessor:
    """Stateful AGC to preserve envelope between chunks.

    The envelope, the smoothed gain and the noise-floor estimate are carried
    from one :meth:`process` call to the next; :meth:`reset` restores them to
    their initial values.
    """

    # Desired gain is capped so that peak * gain does not exceed this value.
    _PEAK_CEILING = 0.92
    # Upper bound on the gain applied to chunks classified as unvoiced.
    _UNVOICED_MAX_GAIN = 2.2

    def __init__(self, sample_rate: int, config: AgcConfig | None = None) -> None:
        """Create a processor.

        Args:
            sample_rate: Samples per second of the audio passed to
                :meth:`process`.
            config: Parameters to use. The object is stored by reference (not
                copied), and :meth:`update` writes to it. A fresh
                :class:`AgcConfig` is created when omitted.
        """
        self.sample_rate = int(sample_rate)
        self.config = config or AgcConfig()
        self.reset()
        self._recompute_coeffs()

    def update(
        self,
        *,
        sample_rate: int | None = None,
        attack_ms: float | None = None,
        release_ms: float | None = None,
        target_level: float | None = None,
    ) -> None:
        """Change selected settings; arguments left as ``None`` are untouched.

        The running envelope, gain and noise floor are not reset.
        """
        if sample_rate is not None:
            self.sample_rate = int(sample_rate)
        if attack_ms is not None:
            self.config.attack_ms = float(attack_ms)
        if release_ms is not None:
            self.config.release_ms = float(release_ms)
        if target_level is not None:
            self.config.target_level = float(target_level)
        self._recompute_coeffs()

    def reset(self) -> None:
        """Restore envelope, gain and noise floor to their initial values."""
        self._envelope = 0.0
        self._gain = 1.0
        self._noise_floor = max(self.config.noise_floor_init, 1e-7)

    def _recompute_coeffs(self) -> None:
        """Cache the attack/release times in seconds (at least 1e-6 s each)."""
        self._attack_seconds = max(self.config.attack_ms, 1e-3) / 1000.0
        self._release_seconds = max(self.config.release_ms, 1e-3) / 1000.0

    def _smoothing_coeff(self, chunk_seconds: float, use_attack: bool):
        """Return exp(-chunk_seconds / tau) for the attack or release time."""
        tau = self._attack_seconds if use_attack else self._release_seconds
        return np.exp(-chunk_seconds / tau)

    def process(self, audio: np.ndarray, *, speech_hint: bool = False) -> np.ndarray:
        """Apply gain control to one chunk and advance the internal state.

        Args:
            audio: Samples of one chunk. Any array-like is accepted and is
                converted to ``float32``; the input is not modified.
            speech_hint: When true the chunk is treated as voiced regardless
                of its level, and the noise floor adapts with
                ``noise_floor_fall_alpha``.

        Returns:
            A ``float32`` array with the shape of ``audio``, clipped to
            ``[-1, 1]``. An empty input is returned (as ``float32``) without
            touching the state.
        """
        samples = np.asarray(audio).astype(np.float32, copy=False)
        if samples.size == 0:
            return samples

        # A single NaN/inf would otherwise propagate into the envelope, gain
        # and noise floor and corrupt every later chunk until reset().
        if not np.isfinite(samples).all():
            samples = np.nan_to_num(samples, nan=0.0, posinf=1.0, neginf=-1.0)

        # A non-positive sample rate falls back to the minimum chunk duration
        # instead of dividing by zero.
        if self.sample_rate > 0:
            chunk_seconds = max(samples.size / float(self.sample_rate), 1e-6)
        else:
            chunk_seconds = 1e-6

        chunk_rms = float(np.sqrt(np.mean(samples * samples, dtype=np.float64)))
        target_env = max(chunk_rms, 1e-6)

        # Envelope follower: attack time when the level rises, release time
        # when it falls.
        env = self._envelope
        coeff = self._smoothing_coeff(chunk_seconds, target_env > env)
        env = coeff * env + (1.0 - coeff) * target_env
        env = max(env, 1e-6)

        # Noise-floor tracker.
        cfg = self.config
        noise_floor = self._noise_floor
        if speech_hint:
            alpha = cfg.noise_floor_fall_alpha
        elif target_env > noise_floor:
            alpha = cfg.noise_floor_rise_alpha
        else:
            alpha = cfg.noise_floor_fall_alpha
        noise_floor = (1.0 - alpha) * noise_floor + alpha * target_env
        noise_floor = max(noise_floor, 1e-7)

        speech_ratio = target_env / noise_floor
        voiced = bool(speech_hint or speech_ratio >= cfg.gate_open_ratio)

        desired_gain = float(
            np.clip(cfg.target_level / env, cfg.min_gain, cfg.max_gain)
        )
        peak = float(np.max(np.abs(samples)))
        if peak > 1e-6:
            desired_gain = min(desired_gain, self._PEAK_CEILING / peak)
        if not voiced:
            desired_gain = min(desired_gain, self._UNVOICED_MAX_GAIN)

        # NOTE(review): gain increases use the attack time and gain decreases
        # use the release time, which is the reverse of the usual AGC
        # convention. Kept as-is; confirm that this is intentional.
        current_gain = self._gain
        gain_coeff = self._smoothing_coeff(chunk_seconds, desired_gain > current_gain)
        gain = gain_coeff * current_gain + (1.0 - gain_coeff) * desired_gain
        if not voiced:
            gain = min(gain, self._UNVOICED_MAX_GAIN)

        self._envelope = env
        self._gain = gain
        self._noise_floor = noise_floor

        # The multiplication allocates a new array, so the caller's buffer is
        # never written to.
        out = samples * gain
        if not voiced:
            # Soft gate: silence_attenuation at ratio <= 1, rising
            # quadratically towards 1.0 as the ratio nears gate_open_ratio.
            gate_span = max(cfg.gate_open_ratio - 1.0, 1e-6)
            gate_t = float(np.clip((speech_ratio - 1.0) / gate_span, 0.0, 1.0))
            gate = cfg.silence_attenuation + (1.0 - cfg.silence_attenuation) * (
                gate_t * gate_t
            )
            out *= gate
        return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False)