# agc.py (4.2 KB)
  1. from __future__ import annotations
  2. from dataclasses import dataclass
  3. import numpy as np
  4. @dataclass
  5. class AgcConfig:
  6. target_level: float = 0.22
  7. attack_ms: float = 6.0
  8. release_ms: float = 280.0
  9. min_gain: float = 0.2
  10. max_gain: float = 20.0
  11. noise_floor_init: float = 8e-5
  12. noise_floor_rise_alpha: float = 0.03
  13. noise_floor_fall_alpha: float = 0.002
  14. gate_open_ratio: float = 1.20
  15. silence_attenuation: float = 0.85
  16. class AgcProcessor:
  17. """Stateful AGC to preserve envelope between chunks."""
  18. def __init__(self, sample_rate: int, config: AgcConfig | None = None) -> None:
  19. self.sample_rate = int(sample_rate)
  20. self.config = config or AgcConfig()
  21. self._envelope = 0.0
  22. self._gain = 1.0
  23. self._noise_floor = max(self.config.noise_floor_init, 1e-7)
  24. self._attack_seconds = 0.005
  25. self._release_seconds = 0.300
  26. self._recompute_coeffs()
  27. def update(
  28. self,
  29. *,
  30. sample_rate: int | None = None,
  31. attack_ms: float | None = None,
  32. release_ms: float | None = None,
  33. target_level: float | None = None,
  34. ) -> None:
  35. if sample_rate is not None:
  36. self.sample_rate = int(sample_rate)
  37. if attack_ms is not None:
  38. self.config.attack_ms = float(attack_ms)
  39. if release_ms is not None:
  40. self.config.release_ms = float(release_ms)
  41. if target_level is not None:
  42. self.config.target_level = float(target_level)
  43. self._recompute_coeffs()
  44. def reset(self) -> None:
  45. self._envelope = 0.0
  46. self._gain = 1.0
  47. self._noise_floor = max(self.config.noise_floor_init, 1e-7)
  48. def _recompute_coeffs(self) -> None:
  49. self._attack_seconds = max(self.config.attack_ms, 1e-3) / 1000.0
  50. self._release_seconds = max(self.config.release_ms, 1e-3) / 1000.0
  51. def process(self, audio: np.ndarray, *, speech_hint: bool = False) -> np.ndarray:
  52. if audio.size == 0:
  53. return audio.astype(np.float32, copy=False)
  54. samples = audio.astype(np.float32, copy=False)
  55. chunk_seconds = max(samples.size / float(self.sample_rate), 1e-6)
  56. chunk_rms = float(np.sqrt(np.mean(samples * samples, dtype=np.float64)))
  57. target_env = max(chunk_rms, 1e-6)
  58. env = self._envelope
  59. if target_env > env:
  60. coeff = np.exp(-chunk_seconds / self._attack_seconds)
  61. else:
  62. coeff = np.exp(-chunk_seconds / self._release_seconds)
  63. env = coeff * env + (1.0 - coeff) * target_env
  64. env = max(env, 1e-6)
  65. noise_floor = self._noise_floor
  66. if speech_hint:
  67. alpha = self.config.noise_floor_fall_alpha
  68. elif target_env > noise_floor:
  69. alpha = self.config.noise_floor_rise_alpha
  70. else:
  71. alpha = self.config.noise_floor_fall_alpha
  72. noise_floor = (1.0 - alpha) * noise_floor + alpha * target_env
  73. noise_floor = max(noise_floor, 1e-7)
  74. speech_ratio = target_env / noise_floor
  75. voiced = bool(speech_hint or speech_ratio >= self.config.gate_open_ratio)
  76. desired_gain = float(np.clip(self.config.target_level / env, self.config.min_gain, self.config.max_gain))
  77. peak = float(np.max(np.abs(samples)))
  78. if peak > 1e-6:
  79. desired_gain = min(desired_gain, 0.92 / peak)
  80. if not voiced:
  81. desired_gain = min(desired_gain, 2.2)
  82. current_gain = self._gain
  83. if desired_gain > current_gain:
  84. gain_coeff = np.exp(-chunk_seconds / self._attack_seconds)
  85. else:
  86. gain_coeff = np.exp(-chunk_seconds / self._release_seconds)
  87. gain = gain_coeff * current_gain + (1.0 - gain_coeff) * desired_gain
  88. if not voiced:
  89. gain = min(gain, 2.2)
  90. self._envelope = env
  91. self._gain = gain
  92. self._noise_floor = noise_floor
  93. out = samples * gain
  94. if not voiced:
  95. gate_span = max(self.config.gate_open_ratio - 1.0, 1e-6)
  96. gate_t = float(np.clip((speech_ratio - 1.0) / gate_span, 0.0, 1.0))
  97. gate = self.config.silence_attenuation + (1.0 - self.config.silence_attenuation) * (gate_t * gate_t)
  98. out *= gate
  99. return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False)