| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- from __future__ import annotations
- from dataclasses import dataclass
- import numpy as np
@dataclass
class AgcConfig:
    """Tunable parameters read by ``AgcProcessor``.

    All fields are plain floats with defaults, so ``AgcConfig()`` yields a
    ready-to-use configuration and individual values can be overridden by
    keyword.
    """

    # --- Level target and time constants ---------------------------------
    # The gain the processor aims for is ``target_level / envelope``.
    target_level: float = 0.22
    # Smoothing time constant (milliseconds) used when a tracked value rises.
    attack_ms: float = 6.0
    # Smoothing time constant (milliseconds) used when a tracked value falls.
    release_ms: float = 280.0

    # --- Gain limits -----------------------------------------------------
    # Lower and upper clip bounds for the gain derived from the envelope.
    min_gain: float = 0.2
    max_gain: float = 20.0

    # --- Noise-floor tracking --------------------------------------------
    # Starting value of the noise-floor estimate (also restored on reset).
    noise_floor_init: float = 0.00008
    # Blend weight toward the chunk level when it is above the noise floor.
    noise_floor_rise_alpha: float = 0.03
    # Blend weight used when the chunk level is at/below the noise floor, and
    # whenever the caller passes ``speech_hint=True``.
    noise_floor_fall_alpha: float = 0.002

    # --- Gate ------------------------------------------------------------
    # A chunk counts as voiced once level / noise_floor reaches this ratio.
    gate_open_ratio: float = 1.2
    # Output multiplier for an unvoiced chunk whose level is at or below the
    # noise floor; the multiplier ramps toward 1.0 as the ratio approaches
    # ``gate_open_ratio``.
    silence_attenuation: float = 0.85
class AgcProcessor:
    """Chunk-based automatic gain control that keeps state between chunks.

    Three values persist across ``process`` calls: a smoothed level envelope,
    the smoothed gain, and a noise-floor estimate. Each call measures the
    chunk RMS, updates those values once, and applies a single gain to the
    whole chunk.
    """

    # Gain is limited so that ``peak * gain`` does not exceed this value
    # (before smoothing).
    _PEAK_CEILING = 0.92
    # Upper bound on the gain while a chunk is classified as unvoiced.
    _UNVOICED_MAX_GAIN = 2.2
    # Lower bounds that keep the divisions in ``process`` well defined.
    _MIN_LEVEL = 1e-6
    _MIN_NOISE_FLOOR = 1e-7

    def __init__(self, sample_rate: int, config: AgcConfig | None = None) -> None:
        """Create a processor.

        Args:
            sample_rate: Samples per second; used to turn a chunk length into
                a duration.
            config: Parameters to use. The object is stored as-is (not
                copied), so ``update`` mutates it in place. A fresh
                ``AgcConfig()`` is created when omitted.
        """
        self.sample_rate = int(sample_rate)
        self.config = config if config is not None else AgcConfig()
        self._envelope = 0.0
        self._gain = 1.0
        self._noise_floor = max(self.config.noise_floor_init, self._MIN_NOISE_FLOOR)
        # Placeholders only; the real values are derived from the config.
        self._attack_seconds = 0.0
        self._release_seconds = 0.0
        self._recompute_coeffs()

    def update(
        self,
        *,
        sample_rate: int | None = None,
        attack_ms: float | None = None,
        release_ms: float | None = None,
        target_level: float | None = None,
    ) -> None:
        """Change selected parameters; arguments left as ``None`` are kept.

        ``attack_ms``, ``release_ms`` and ``target_level`` are written to
        ``self.config``. The smoothing time constants are recomputed
        afterwards; envelope, gain and noise floor are not reset.
        """
        if sample_rate is not None:
            self.sample_rate = int(sample_rate)
        if attack_ms is not None:
            self.config.attack_ms = float(attack_ms)
        if release_ms is not None:
            self.config.release_ms = float(release_ms)
        if target_level is not None:
            self.config.target_level = float(target_level)
        self._recompute_coeffs()

    def reset(self) -> None:
        """Return envelope, gain and noise floor to their initial values."""
        self._envelope = 0.0
        self._gain = 1.0
        self._noise_floor = max(self.config.noise_floor_init, self._MIN_NOISE_FLOOR)

    def _recompute_coeffs(self) -> None:
        """Convert the configured attack/release times from ms to seconds.

        Each time is floored at 1e-3 ms so the later division by the time
        constant cannot hit zero.
        """
        self._attack_seconds = max(self.config.attack_ms, 1e-3) / 1000.0
        self._release_seconds = max(self.config.release_ms, 1e-3) / 1000.0

    def _smooth(self, current: float, target: float, chunk_seconds: float) -> float:
        """Move ``current`` toward ``target`` with a one-pole smoother.

        The attack time constant is used when ``target`` is above ``current``
        and the release time constant otherwise.
        """
        if target > current:
            tau = self._attack_seconds
        else:
            tau = self._release_seconds
        # float() keeps the persistent state as built-in floats rather than
        # NumPy scalars.
        coeff = float(np.exp(-chunk_seconds / tau))
        return coeff * current + (1.0 - coeff) * target

    def _track_noise_floor(self, level: float, speech_hint: bool) -> float:
        """Return the noise-floor estimate blended toward ``level``."""
        cfg = self.config
        previous = self._noise_floor
        if not speech_hint and level > previous:
            alpha = cfg.noise_floor_rise_alpha
        else:
            # Hinted speech, or a level at/below the floor: use the fall rate.
            alpha = cfg.noise_floor_fall_alpha
        blended = (1.0 - alpha) * previous + alpha * level
        return max(blended, self._MIN_NOISE_FLOOR)

    def process(self, audio: np.ndarray, *, speech_hint: bool = False) -> np.ndarray:
        """Apply gain control to one chunk and update the internal state.

        Args:
            audio: Samples of the chunk. Converted to float32; the input
                array itself is never modified.
            speech_hint: When true the chunk is treated as voiced regardless
                of the level/noise-floor ratio, and the noise floor uses the
                fall rate.

        Returns:
            A float32 array clipped to [-1.0, 1.0]. An empty input is
            returned as float32 without touching the state.
        """
        if audio.size == 0:
            return audio.astype(np.float32, copy=False)

        samples = audio.astype(np.float32, copy=False)
        if not np.isfinite(samples).all():
            # One NaN/Inf would otherwise be folded into the envelope, noise
            # floor and gain and corrupt every later chunk until reset().
            # nan_to_num returns a copy, so the caller's buffer is untouched.
            samples = np.nan_to_num(samples, nan=0.0, posinf=1.0, neginf=-1.0)

        cfg = self.config
        # NOTE(review): samples.size counts every element, so this presumes a
        # mono chunk -- confirm for multi-channel input.
        chunk_seconds = max(samples.size / float(self.sample_rate), 1e-6)
        chunk_rms = float(np.sqrt(np.mean(samples * samples, dtype=np.float64)))
        level = max(chunk_rms, self._MIN_LEVEL)

        envelope = max(
            self._smooth(self._envelope, level, chunk_seconds), self._MIN_LEVEL
        )

        noise_floor = self._track_noise_floor(level, speech_hint)
        speech_ratio = level / noise_floor
        voiced = bool(speech_hint or speech_ratio >= cfg.gate_open_ratio)

        desired_gain = float(
            np.clip(cfg.target_level / envelope, cfg.min_gain, cfg.max_gain)
        )
        peak = float(np.max(np.abs(samples)))
        if peak > 1e-6:
            # Can push the request below min_gain so the peak stays in range.
            desired_gain = min(desired_gain, self._PEAK_CEILING / peak)
        if not voiced:
            desired_gain = min(desired_gain, self._UNVOICED_MAX_GAIN)

        # NOTE(review): rising gain uses the attack constant and falling gain
        # the release constant, mirroring the envelope rule -- confirm that
        # this direction is intended.
        gain = self._smooth(self._gain, desired_gain, chunk_seconds)
        if not voiced:
            gain = min(gain, self._UNVOICED_MAX_GAIN)

        self._envelope = envelope
        self._gain = gain
        self._noise_floor = noise_floor

        # gain is a built-in float, so the product stays float32.
        out = samples * gain
        if not voiced:
            # Attenuation fades from silence_attenuation (ratio <= 1) toward
            # 1.0 as the ratio approaches gate_open_ratio, on a squared ramp.
            gate_span = max(cfg.gate_open_ratio - 1.0, 1e-6)
            gate_t = float(np.clip((speech_ratio - 1.0) / gate_span, 0.0, 1.0))
            gate = cfg.silence_attenuation + (1.0 - cfg.silence_attenuation) * (
                gate_t * gate_t
            )
            out *= gate
        return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False)
|