from __future__ import annotations from dataclasses import asdict, dataclass from datetime import datetime import base64 from collections import deque from concurrent.futures import Future, ProcessPoolExecutor import math import os from pathlib import Path import threading import time import numpy as np from scipy import signal import sounddevice as sd from agc import AgcProcessor from beamforming import beamform_delay_and_sum from recorder import RecorderStatus, WavRecorder def _gcc_phat_job(sig: np.ndarray, refsig: np.ndarray, sample_rate: int, max_tau: float) -> float: n = sig.size + refsig.size sig_fft = np.fft.rfft(sig, n=n) ref_fft = np.fft.rfft(refsig, n=n) cross = sig_fft * np.conj(ref_fft) denom = np.abs(cross) cross = cross / np.maximum(denom, 1e-10) cc = np.fft.irfft(cross, n=n) max_shift = int(min(n // 2, max_tau * sample_rate)) if max_shift <= 0: return 0.0 cc_window = np.concatenate((cc[-max_shift:], cc[: max_shift + 1])) shift = int(np.argmax(np.abs(cc_window)) - max_shift) return float(shift) / float(sample_rate) def _estimate_speech_angle_job( mic1: np.ndarray, mic2: np.ndarray, sample_rate: int, mic_spacing: float, prev_angle_deg: float, prev_noise_floor: float, ) -> tuple[float, bool, float]: if mic1.size < 64 or mic2.size < 64: return float(prev_angle_deg), False, float(prev_noise_floor) high = min(3400.0, 0.45 * sample_rate) low = min(300.0, high * 0.5) if high <= low + 1.0: return float(prev_angle_deg), False, float(prev_noise_floor) try: sos = signal.butter(4, [low, high], btype="bandpass", fs=sample_rate, output="sos") except ValueError: return float(prev_angle_deg), False, float(prev_noise_floor) speech1 = signal.sosfilt(sos, mic1).astype(np.float32, copy=False) speech2 = signal.sosfilt(sos, mic2).astype(np.float32, copy=False) speech_energy = 0.5 * (np.mean(speech1 * speech1) + np.mean(speech2 * speech2)) full_energy = 0.5 * (np.mean(mic1 * mic1) + np.mean(mic2 * mic2)) speech_ratio = float(speech_energy / max(full_energy, 1e-12)) noise_floor = 0.995 * float(prev_noise_floor) + 0.005 * float(speech_energy) speech_threshold = max(2.5e-7, noise_floor * 2.0) speech_detected = bool(speech_energy > speech_threshold and speech_ratio > 0.08) if not speech_detected: return float(prev_angle_deg), False, float(noise_floor) max_tau = mic_spacing / 343.0 tau = _gcc_phat_job(speech1, speech2, sample_rate, max_tau=max_tau) sin_theta = np.clip((tau * 343.0) / max(mic_spacing, 1e-6), -1.0, 1.0) raw_angle = float(np.rad2deg(np.arcsin(sin_theta))) raw_angle = float(np.clip(raw_angle, -90.0, 90.0)) angle = 0.88 * float(prev_angle_deg) + 0.12 * raw_angle return float(angle), True, float(noise_floor) ALLOWED_SAMPLE_RATES = {16000, 22050, 24000, 32000, 48000} ALLOWED_MODES = {"mic1", "mic2", "mono_mix", "beamforming"} ALLOWED_SOURCES = {"mic1", "mic2", "mono_mix", "beam", "compare_all", "hifi_raw"} ALLOWED_MONITOR_SOURCES = {"mic1", "mic2", "mono_mix", "beam"} ALLOWED_HIFI_MICS = {"mic1", "mic2"} @dataclass class AudioSettings: mode: str = "beamforming" gain_db: float = 0.0 agc: bool = True attack_ms: float = 6.0 release_ms: float = 280.0 noise_suppression: bool = True speech_gate: bool = False hum_filter: bool = True limiter: bool = True beam_clarity: bool = True hifi_mode: bool = False hifi_mic: str = "mic1" angle: float = 0.0 auto_beam: bool = True monitor_on: bool = False monitor_source: str = "beam" sample_rate: int = 16000 class AudioEngine: CHANNELS = 2 BIT_DEPTH = 32 CHUNK_SIZE = 2048 HARDWARE_SAMPLE_RATE = 48000 RING_DIAMETER_M = 0.06 RING_SLOTS = 4 SLOT_STEP = 1 # neighboring slots (90 degrees). Use 2 for opposite mics. MIC_SPACING = RING_DIAMETER_M * math.sin(math.pi * SLOT_STEP / RING_SLOTS) STARTUP_IGNORE_SECONDS = 0.30 SPEECH_GATE_HOLD_SECONDS = 0.85 SPEECH_GATE_FLOOR = 0.55 SPEECH_GATE_ATTACK_SECONDS = 0.012 SPEECH_GATE_RELEASE_SECONDS = 0.360 NOISE_SUPPRESS_OPEN_FLOOR = 0.72 NOISE_SUPPRESS_CLOSED_FLOOR = 0.40 NOISE_SUPPRESS_OPEN_STRENGTH = 0.30 NOISE_SUPPRESS_CLOSED_STRENGTH = 0.55 HUM_HPF_CUTOFF_HZ = 75.0 HUM_NOTCH_HZ = 50.0 HUM_NOTCH_Q = 22.0 BEAM_CLARITY_BLEND = 0.22 BEAM_PRESENCE_BOOST = 0.20 def __init__(self, recordings_dir: Path) -> None: self._lock = threading.Lock() self._stream: sd.InputStream | None = None self._stream_channels = self.CHANNELS self._input_device_name = "unknown" self._settings = AudioSettings() self._running = False self._startup_deadline = 0.0 self._auto_angle_deg = 0.0 self._noise_floor = 1e-7 self._vad_noise_floor = 1e-7 self._speech_sos_cache: dict[int, np.ndarray | None] = {} self._hpf_sos_cache: dict[int, np.ndarray | None] = {} self._notch_cache: dict[int, tuple[np.ndarray, np.ndarray] | None] = {} self._hpf_state: dict[tuple[str, int], np.ndarray] = {} self._notch_state: dict[tuple[str, int], np.ndarray] = {} self._last_speech_detected = False self._angle_update_counter = 0 self._angle_update_interval = 4 cpu_total = max(1, int(os.cpu_count() or 1)) self._cpu_workers = max(1, min(4, cpu_total)) self._angle_workers = max(1, min(3, self._cpu_workers - 1)) self._max_pending_angle_jobs = max(1, min(2, self._angle_workers)) self._angle_executor: ProcessPoolExecutor | None = ProcessPoolExecutor(max_workers=self._angle_workers) self._angle_futures: deque[Future] = deque() self._speech_gate_hold_chunks = 0 self._speech_gate_gain = 1.0 self._ns_noise_power = { "mic1": 1e-7, "mic2": 1e-7, "mono_mix": 1e-7, "beam": 1e-7, } self._presence_prev = {"beam": 0.0} self._recordings_dir = Path(recordings_dir) self._recorder = WavRecorder(recordings_dir) self._compare_recorders: dict[str, WavRecorder] = {} self._compare_filenames: dict[str, str] = {} self._record_duration_limit_sec: float | None = None self._auto_stop_requested = False self._monitor_queue: deque[np.ndarray] = deque() self._monitor_queue_samples = 0 self._agc_mic1 = AgcProcessor(self._settings.sample_rate) self._agc_mic2 = AgcProcessor(self._settings.sample_rate) self._agc_beam = AgcProcessor(self._settings.sample_rate) self._latest_frame: dict[str, object] = self._make_empty_frame() def start(self) -> None: with self._lock: if self._running: return self._open_stream() def stop(self) -> None: with self._lock: stream = self._stream self._stream = None self._running = False angle_executor = self._angle_executor self._angle_executor = None angle_futures = list(self._angle_futures) self._angle_futures.clear() if stream is not None: stream.stop() stream.close() self.stop_recording() for fut in angle_futures: fut.cancel() if angle_executor is not None: angle_executor.shutdown(wait=False, cancel_futures=True) def is_running(self) -> bool: with self._lock: return self._running def get_settings(self) -> dict[str, object]: with self._lock: return asdict(self._settings) def get_status(self) -> dict[str, object]: rec_status = self._current_recording_status() return { "recording": rec_status.recording, "mic_count": self.CHANNELS, "hardware_sample_rate": self.HARDWARE_SAMPLE_RATE, "mic_spacing_m": self.MIC_SPACING, "auto_beam_angle_deg": self._auto_angle_deg, "input_device": self._input_device_name, "settings": self.get_settings(), "recording_status": asdict(rec_status), "audio_running": self.is_running(), } def get_latest_packet(self) -> dict[str, object]: empty = np.empty(0, dtype=np.float32) with self._lock: frame = dict(self._latest_frame) mic1 = np.asarray(frame.get("mic1", empty), dtype=np.float32) mic2 = np.asarray(frame.get("mic2", empty), dtype=np.float32) beam = np.asarray(frame.get("beam", empty), dtype=np.float32) mono_mix = np.asarray(frame.get("mono_mix", empty), dtype=np.float32) show_mic2 = bool(frame.get("show_mic2", True)) show_beam = bool(frame.get("show_beam", False)) show_mono_mix = bool(frame.get("show_mono_mix", False)) beam_angle_deg = float(frame.get("beam_angle_deg", 0.0)) auto_beam = bool(frame.get("auto_beam", True)) speech_detected = bool(frame.get("speech_detected", False)) speech_gate_open = bool(frame.get("speech_gate_open", True)) hifi_mode = bool(frame.get("hifi_mode", False)) monitor_on = bool(frame.get("monitor_on", False)) monitor_source = str(frame.get("monitor_source", "beam")) recording = bool(frame.get("recording", False)) rec_duration = float(frame.get("rec_duration", 0.0)) monitor_rate = int(self.HARDWARE_SAMPLE_RATE if hifi_mode else self._settings.sample_rate) monitor_chunk = None if monitor_on: monitor_chunk = self._pop_monitor_chunk(max_samples=max(512, int(monitor_rate * 0.08))) packet = { "type": "audio_data", "mic1": self._downsample_for_ui(mic1).tolist(), "mic2": self._downsample_for_ui(mic2).tolist() if show_mic2 else [], "beam": self._downsample_for_ui(beam).tolist() if show_beam else [], "mono_mix": self._downsample_for_ui(mono_mix).tolist() if show_mono_mix else [], "rms_mic1": self._rms(mic1), "rms_mic2": self._rms(mic2) if show_mic2 else 0.0, "rms_beam": self._rms(beam) if show_beam else 0.0, "rms_mono_mix": self._rms(mono_mix) if show_mono_mix else 0.0, "beam_angle_deg": beam_angle_deg, "auto_beam": auto_beam, "speech_detected": speech_detected, "speech_gate_open": speech_gate_open, "hifi_mode": hifi_mode, "monitor_on": monitor_on, "monitor_source": monitor_source, "monitor_sr": monitor_rate, "monitor_chunk_b64": "", "recording": recording, "rec_duration": rec_duration, } if monitor_chunk is not None and monitor_chunk.size > 0: packet["monitor_chunk_b64"] = self._encode_pcm16_base64(monitor_chunk) return packet def update_settings(self, updates: dict[str, object]) -> dict[str, object]: with self._lock: current = self._settings mode = str(updates.get("mode", current.mode)) if mode not in ALLOWED_MODES: mode = current.mode sample_rate = int(updates.get("sample_rate", current.sample_rate)) if sample_rate not in ALLOWED_SAMPLE_RATES: sample_rate = current.sample_rate gain_db = float(updates.get("gain_db", current.gain_db)) gain_db = float(np.clip(gain_db, 0.0, 30.0)) agc = bool(updates.get("agc", current.agc)) attack_ms = float(updates.get("attack_ms", current.attack_ms)) attack_ms = float(np.clip(attack_ms, 1.0, 50.0)) release_ms = float(updates.get("release_ms", current.release_ms)) release_ms = float(np.clip(release_ms, 50.0, 1000.0)) noise_suppression = bool(updates.get("noise_suppression", current.noise_suppression)) speech_gate = bool(updates.get("speech_gate", current.speech_gate)) hum_filter = bool(updates.get("hum_filter", current.hum_filter)) limiter = bool(updates.get("limiter", current.limiter)) beam_clarity = bool(updates.get("beam_clarity", current.beam_clarity)) hifi_mode = bool(updates.get("hifi_mode", current.hifi_mode)) hifi_mic = str(updates.get("hifi_mic", current.hifi_mic)) if hifi_mic not in ALLOWED_HIFI_MICS: hifi_mic = current.hifi_mic angle = float(updates.get("angle", current.angle)) angle = float(np.clip(angle, -90.0, 90.0)) auto_beam = bool(updates.get("auto_beam", current.auto_beam)) monitor_on = bool(updates.get("monitor_on", current.monitor_on)) monitor_source = str(updates.get("monitor_source", current.monitor_source)) if monitor_source not in ALLOWED_MONITOR_SOURCES: monitor_source = current.monitor_source if hifi_mode: monitor_on = False self._settings = AudioSettings( mode=mode, gain_db=gain_db, agc=agc, attack_ms=attack_ms, release_ms=release_ms, noise_suppression=noise_suppression, speech_gate=speech_gate, hum_filter=hum_filter, limiter=limiter, beam_clarity=beam_clarity, hifi_mode=hifi_mode, hifi_mic=hifi_mic, angle=angle, auto_beam=auto_beam, monitor_on=monitor_on, monitor_source=monitor_source, sample_rate=sample_rate, ) if not auto_beam: self._auto_angle_deg = angle if not monitor_on or hifi_mode: self._clear_monitor_queue_locked() self._agc_mic1.update( sample_rate=sample_rate, attack_ms=attack_ms, release_ms=release_ms, ) self._agc_mic2.update( sample_rate=sample_rate, attack_ms=attack_ms, release_ms=release_ms, ) self._agc_beam.update( sample_rate=sample_rate, attack_ms=attack_ms, release_ms=release_ms, ) return self.get_settings() def start_recording(self, source: str, duration_sec: float | None = None) -> dict[str, object]: source = source.lower().strip() if source not in ALLOWED_SOURCES: raise ValueError("Invalid recording source") if self._current_recording_status().recording: raise RuntimeError("Recording is already active") limit = None if duration_sec is not None: try: duration_value = float(duration_sec) except (TypeError, ValueError): duration_value = 0.0 if duration_value > 0.0: limit = float(np.clip(duration_value, 1.0, 3600.0)) with self._lock: self._record_duration_limit_sec = limit self._auto_stop_requested = False settings = self.get_settings() sample_rate = int(settings["sample_rate"]) hifi_mode = bool(settings.get("hifi_mode", False)) hifi_mic = str(settings.get("hifi_mic", "mic1")) if hifi_mic not in ALLOWED_HIFI_MICS: hifi_mic = "mic1" if hifi_mode: timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") filename = self._recorder.start( source="hifi_raw", sample_rate=self.HARDWARE_SAMPLE_RATE, channels=1, filename=f"rec_{timestamp}_hifi_{hifi_mic}_48k.wav", ) with self._lock: self._compare_recorders = {} self._compare_filenames = {} return { "source": "hifi_raw", "filenames": [filename], "duration_limit_sec": limit or 0.0, } if source == "hifi_raw": source = hifi_mic if source == "compare_all": timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") recorders = { "mic1": WavRecorder(self._recordings_dir), "mono_mix": WavRecorder(self._recordings_dir), "beam": WavRecorder(self._recordings_dir), } filenames = { "mic1": f"rec_{timestamp}_mic1.wav", "mono_mix": f"rec_{timestamp}_mono_mix.wav", "beam": f"rec_{timestamp}_beam.wav", } for key, recorder in recorders.items(): recorder.start( source=key, sample_rate=sample_rate, channels=1, filename=filenames[key], ) with self._lock: self._compare_recorders = recorders self._compare_filenames = filenames return { "source": source, "filenames": [filenames["mic1"], filenames["mono_mix"], filenames["beam"]], "duration_limit_sec": limit or 0.0, } channels = 1 filename = self._recorder.start( source=source, sample_rate=sample_rate, channels=channels, ) with self._lock: self._compare_recorders = {} self._compare_filenames = {} return { "source": source, "filenames": [filename], "duration_limit_sec": limit or 0.0, } def stop_recording(self) -> RecorderStatus: with self._lock: compare_recorders = self._compare_recorders compare_filenames = dict(self._compare_filenames) self._compare_recorders = {} self._compare_filenames = {} self._record_duration_limit_sec = None self._auto_stop_requested = False if compare_recorders: max_duration = 0.0 sample_rate = self._settings.sample_rate for recorder in compare_recorders.values(): status = recorder.stop() max_duration = max(max_duration, status.duration_sec) sample_rate = status.sample_rate return RecorderStatus( recording=False, filename=",".join(compare_filenames.values()), duration_sec=max_duration, channels=1, sample_rate=sample_rate, source="compare_all", ) return self._recorder.stop() def get_recording_status(self) -> RecorderStatus: return self._current_recording_status() def _current_recording_status(self) -> RecorderStatus: with self._lock: compare_recorders = dict(self._compare_recorders) compare_filenames = dict(self._compare_filenames) if compare_recorders: statuses = [recorder.get_status() for recorder in compare_recorders.values()] is_recording = any(status.recording for status in statuses) duration = max((status.duration_sec for status in statuses), default=0.0) sample_rate = statuses[0].sample_rate if statuses else self._settings.sample_rate return RecorderStatus( recording=is_recording, filename=",".join(compare_filenames.values()), duration_sec=duration, channels=1, sample_rate=sample_rate, source="compare_all", ) return self._recorder.get_status() def _open_stream(self) -> None: device_idx, device_name, input_channels = self._resolve_input_device() stream = sd.InputStream( samplerate=self.HARDWARE_SAMPLE_RATE, device=device_idx, channels=input_channels, dtype="int32", blocksize=self.CHUNK_SIZE, callback=self._audio_callback, ) stream.start() with self._lock: self._stream = stream self._stream_channels = input_channels self._input_device_name = device_name self._running = True self._startup_deadline = time.monotonic() + self.STARTUP_IGNORE_SECONDS def _restart_stream(self) -> None: with self._lock: stream = self._stream self._stream = None self._running = False if stream is not None: stream.stop() stream.close() self._open_stream() def _audio_callback(self, indata: np.ndarray, frames: int, time_info: dict, status: sd.CallbackFlags) -> None: del frames, time_info if status: return with self._lock: if not self._running: return if time.monotonic() < self._startup_deadline: return settings = self._settings stream_channels = self._stream_channels pcm32 = indata.astype(np.int32, copy=False) pcm24 = pcm32 >> 8 normalized = np.clip(pcm24.astype(np.float32) / 8388608.0, -1.0, 1.0) mic1_raw = normalized[:, 0] mic2_raw = normalized[:, 1] if stream_channels > 1 else mic1_raw.copy() processing_rate = self.HARDWARE_SAMPLE_RATE if settings.hifi_mode else settings.sample_rate if (not settings.hifi_mode) and settings.sample_rate != self.HARDWARE_SAMPLE_RATE: mic1_raw = self._resample_audio(mic1_raw, self.HARDWARE_SAMPLE_RATE, settings.sample_rate) mic2_raw = self._resample_audio(mic2_raw, self.HARDWARE_SAMPLE_RATE, settings.sample_rate) common_len = min(mic1_raw.size, mic2_raw.size) mic1_raw = mic1_raw[:common_len] mic2_raw = mic2_raw[:common_len] if settings.hifi_mode: hifi_mic = settings.hifi_mic if settings.hifi_mic in ALLOWED_HIFI_MICS else "mic1" hifi_signal = mic1_raw if hifi_mic == "mic1" else mic2_raw mic1_proc = hifi_signal mic2_proc = mic2_raw if hifi_mic == "mic1" else mic1_raw mono_mix_proc = hifi_signal beam_proc = hifi_signal angle_to_use = 0.0 speech_active = True gate_open = True else: gain_linear = 10.0 ** (settings.gain_db / 20.0) mic1_base = np.clip(mic1_raw * gain_linear, -1.0, 1.0) mic2_base = np.clip(mic2_raw * gain_linear, -1.0, 1.0) mono_base = np.clip(0.5 * (mic1_base + mic2_base), -1.0, 1.0) angle_to_use = settings.angle speech_detected = False if settings.mode == "beamforming" and settings.auto_beam: latest_done: Future | None = None pending: deque[Future] = deque() while self._angle_futures: fut = self._angle_futures.popleft() if fut.done(): latest_done = fut else: pending.append(fut) self._angle_futures = pending if latest_done is not None: try: angle_est, speech_est, noise_floor = latest_done.result(timeout=0) self._auto_angle_deg = float(angle_est) self._last_speech_detected = bool(speech_est) self._noise_floor = max(float(noise_floor), 1e-9) except Exception: pass self._angle_update_counter += 1 if ( self._angle_update_counter >= self._angle_update_interval and len(self._angle_futures) < self._max_pending_angle_jobs and self._angle_executor is not None ): self._angle_update_counter = 0 try: fut = self._angle_executor.submit( _estimate_speech_angle_job, mic1_base.astype(np.float32, copy=True), mic2_base.astype(np.float32, copy=True), int(processing_rate), float(self.MIC_SPACING), float(self._auto_angle_deg), float(self._noise_floor), ) self._angle_futures.append(fut) except Exception: pass angle_to_use = self._auto_angle_deg speech_detected = self._last_speech_detected else: while self._angle_futures: self._angle_futures.popleft().cancel() if not settings.auto_beam: self._auto_angle_deg = settings.angle speech_detected = False speech_presence = self._estimate_speech_presence_fast(mic1_base, mic2_base) speech_active = bool(speech_detected or speech_presence) beam_proc = beamform_delay_and_sum( mic1_base, mic2_base, angle_deg=angle_to_use, sample_rate=processing_rate, mic_spacing=self.MIC_SPACING, ) if settings.beam_clarity: beam_proc = self._apply_beam_clarity_blend(beam_proc, mono_base) gate_open, gate_gain = self._update_speech_gate( speech_detected=speech_active, sample_rate=processing_rate, chunk_len=max(beam_proc.size, 1), enabled=settings.speech_gate, ) if settings.agc: mic1_proc = self._agc_mic1.process(mic1_base, speech_hint=gate_open) mic2_proc = self._agc_mic2.process(mic2_base, speech_hint=gate_open) beam_proc = self._agc_beam.process(beam_proc, speech_hint=gate_open) else: mic1_proc = mic1_base mic2_proc = mic2_base mono_mix_proc = np.clip(0.5 * (mic1_proc + mic2_proc), -1.0, 1.0) if settings.hum_filter: mic1_proc = self._apply_hum_filter(mic1_proc, processing_rate, "mic1") mic2_proc = self._apply_hum_filter(mic2_proc, processing_rate, "mic2") beam_proc = self._apply_hum_filter(beam_proc, processing_rate, "beam") mono_mix_proc = np.clip(0.5 * (mic1_proc + mic2_proc), -1.0, 1.0) mono_mix_proc = self._apply_hum_filter(mono_mix_proc, processing_rate, "mono_mix") if settings.noise_suppression: mic1_proc = self._apply_noise_suppression(mic1_proc, "mic1", speech_active=gate_open) mic2_proc = self._apply_noise_suppression(mic2_proc, "mic2", speech_active=gate_open) mono_mix_proc = self._apply_noise_suppression(mono_mix_proc, "mono_mix", speech_active=gate_open) beam_proc = self._apply_noise_suppression(beam_proc, "beam", speech_active=gate_open) if settings.speech_gate: mic1_proc = np.clip(mic1_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False) mic2_proc = np.clip(mic2_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False) mono_mix_proc = np.clip(mono_mix_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False) beam_proc = np.clip(beam_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False) if settings.limiter: mic1_proc = self._apply_limiter(mic1_proc) mic2_proc = self._apply_limiter(mic2_proc) mono_mix_proc = self._apply_limiter(mono_mix_proc) beam_proc = self._apply_limiter(beam_proc) rec_status = self._current_recording_status() if rec_status.recording: self._write_recording_chunk(rec_status.source, mic1_raw, mic2_raw, mic1_proc, mono_mix_proc, beam_proc) rec_status = self._current_recording_status() should_auto_stop = False with self._lock: duration_limit = self._record_duration_limit_sec if ( rec_status.recording and duration_limit is not None and rec_status.duration_sec >= duration_limit and not self._auto_stop_requested ): self._auto_stop_requested = True should_auto_stop = True if should_auto_stop: threading.Thread(target=self.stop_recording, daemon=True).start() show_beam = (settings.mode == "beamforming") and (not settings.hifi_mode) show_mono_mix = (settings.mode == "mono_mix") and (not settings.hifi_mode) show_mic2 = not settings.hifi_mode if settings.monitor_on: monitor_map = { "mic1": mic1_proc, "mic2": mic2_proc, "mono_mix": mono_mix_proc, "beam": beam_proc, } monitor_signal = monitor_map.get(settings.monitor_source, beam_proc) with self._lock: self._push_monitor_chunk_locked(monitor_signal, processing_rate) frame = { "mic1": mic1_proc.astype(np.float32, copy=False), "mic2": mic2_proc.astype(np.float32, copy=False), "beam": beam_proc.astype(np.float32, copy=False), "mono_mix": mono_mix_proc.astype(np.float32, copy=False), "show_mic2": show_mic2, "show_beam": show_beam, "show_mono_mix": show_mono_mix, "beam_angle_deg": float(angle_to_use), "auto_beam": settings.auto_beam, "speech_detected": speech_active, "speech_gate_open": gate_open, "hifi_mode": settings.hifi_mode, "monitor_on": settings.monitor_on, "monitor_source": settings.monitor_source, "recording": rec_status.recording, "rec_duration": rec_status.duration_sec, } with self._lock: self._latest_frame = frame def _write_recording_chunk( self, source: str, mic1_raw: np.ndarray, mic2_raw: np.ndarray, mic1_proc: np.ndarray, mono_mix_proc: np.ndarray, beam_proc: np.ndarray, ) -> None: if source == "compare_all": with self._lock: mic1_rec = self._compare_recorders.get("mic1") mix_rec = self._compare_recorders.get("mono_mix") beam_rec = self._compare_recorders.get("beam") if mic1_rec is not None: mic1_rec.write(mic1_proc) if mix_rec is not None: mix_rec.write(mono_mix_proc) if beam_rec is not None: beam_rec.write(beam_proc) return if source == "mic1": self._recorder.write(mic1_raw) return if source == "mic2": self._recorder.write(mic2_raw) return if source == "mono_mix": self._recorder.write(mono_mix_proc) return if source == "beam": self._recorder.write(beam_proc) return if source == "hifi_raw": with self._lock: hifi_mic = self._settings.hifi_mic if hifi_mic == "mic2": self._recorder.write(mic2_raw) else: self._recorder.write(mic1_raw) return def _apply_beam_clarity_blend(self, beam: np.ndarray, mono: np.ndarray) -> np.ndarray: if beam.size == 0 or mono.size == 0: return beam.astype(np.float32, copy=False) n = min(beam.size, mono.size) if n <= 0: return beam.astype(np.float32, copy=False) blend = float(np.clip(self.BEAM_CLARITY_BLEND, 0.0, 0.5)) out = ((1.0 - blend) * beam[:n] + blend * mono[:n]).astype(np.float32, copy=False) prev = float(self._presence_prev.get("beam", 0.0)) hp = np.empty_like(out) hp[0] = out[0] - prev if out.size > 1: hp[1:] = out[1:] - out[:-1] self._presence_prev["beam"] = float(out[-1]) out = out + float(self.BEAM_PRESENCE_BOOST) * hp return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False) def _update_speech_gate( self, *, speech_detected: bool, sample_rate: int, chunk_len: int, enabled: bool, ) -> tuple[bool, float]: if not enabled: self._speech_gate_hold_chunks = 0 self._speech_gate_gain = 1.0 return True, 1.0 chunk_seconds = max(float(chunk_len) / float(sample_rate), 1e-6) hold_chunks = max(1, int(round(self.SPEECH_GATE_HOLD_SECONDS / chunk_seconds))) if speech_detected: self._speech_gate_hold_chunks = hold_chunks elif self._speech_gate_hold_chunks > 0: self._speech_gate_hold_chunks -= 1 gate_open = bool(speech_detected or self._speech_gate_hold_chunks > 0) target_gain = 1.0 if gate_open else self.SPEECH_GATE_FLOOR tau = self.SPEECH_GATE_ATTACK_SECONDS if target_gain > self._speech_gate_gain else self.SPEECH_GATE_RELEASE_SECONDS coeff = np.exp(-chunk_seconds / max(tau, 1e-4)) self._speech_gate_gain = float(coeff * self._speech_gate_gain + (1.0 - coeff) * target_gain) return gate_open, self._speech_gate_gain def _apply_noise_suppression(self, audio: np.ndarray, key: str, *, speech_active: bool) -> np.ndarray: if audio.size == 0: return audio.astype(np.float32, copy=False) power = float(np.mean(audio * audio, dtype=np.float64)) prev_noise = float(self._ns_noise_power.get(key, 1e-7)) alpha = 0.01 if speech_active else 0.07 noise = (1.0 - alpha) * prev_noise + alpha * power noise = max(noise, 1e-9) self._ns_noise_power[key] = noise ratio = noise / max(power, 1e-9) if speech_active: strength = self.NOISE_SUPPRESS_OPEN_STRENGTH floor = self.NOISE_SUPPRESS_OPEN_FLOOR else: strength = self.NOISE_SUPPRESS_CLOSED_STRENGTH floor = self.NOISE_SUPPRESS_CLOSED_FLOOR gain = float(np.clip(1.0 - strength * ratio, floor, 1.0)) out = audio.astype(np.float32, copy=False) * gain return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False) def _apply_hum_filter(self, audio: np.ndarray, sample_rate: int, channel_key: str) -> np.ndarray: if audio.size == 0: return audio.astype(np.float32, copy=False) out = audio.astype(np.float32, copy=False) state_key = (channel_key, int(sample_rate)) sos = self._get_hpf_sos(sample_rate) if sos is not None: zi = self._hpf_state.get(state_key) if zi is None or zi.shape != (sos.shape[0], 2): zi = np.zeros((sos.shape[0], 2), dtype=np.float32) out, zi_new = signal.sosfilt(sos, out, zi=zi) self._hpf_state[state_key] = zi_new.astype(np.float32, copy=False) out = out.astype(np.float32, copy=False) notch = self._get_notch_coeff(sample_rate) if notch is not None: b, a = notch zi = self._notch_state.get(state_key) expected = max(len(a), len(b)) - 1 if zi is None or zi.size != expected: zi = np.zeros(expected, dtype=np.float32) out, zi_new = signal.lfilter(b, a, out, zi=zi) self._notch_state[state_key] = zi_new.astype(np.float32, copy=False) out = out.astype(np.float32, copy=False) return out def _get_hpf_sos(self, sample_rate: int) -> np.ndarray | None: cached = self._hpf_sos_cache.get(sample_rate, "__missing__") if isinstance(cached, np.ndarray): return cached if cached is None: return None cutoff = min(self.HUM_HPF_CUTOFF_HZ, 0.45 * sample_rate) if cutoff < 20.0: self._hpf_sos_cache[sample_rate] = None return None try: sos = signal.butter(2, cutoff, btype="highpass", fs=sample_rate, output="sos") except ValueError: self._hpf_sos_cache[sample_rate] = None return None self._hpf_sos_cache[sample_rate] = sos return sos def _get_notch_coeff(self, sample_rate: int) -> tuple[np.ndarray, np.ndarray] | None: cached = self._notch_cache.get(sample_rate, "__missing__") if isinstance(cached, tuple): return cached if cached is None: return None nyquist = 0.5 * sample_rate if nyquist <= self.HUM_NOTCH_HZ * 1.2: self._notch_cache[sample_rate] = None return None w0 = self.HUM_NOTCH_HZ / nyquist try: b, a = signal.iirnotch(w0, self.HUM_NOTCH_Q) except ValueError: self._notch_cache[sample_rate] = None return None coeff = (b.astype(np.float32), a.astype(np.float32)) self._notch_cache[sample_rate] = coeff return coeff @staticmethod def _apply_limiter(audio: np.ndarray) -> np.ndarray: if audio.size == 0: return audio.astype(np.float32, copy=False) x = np.clip(audio.astype(np.float32, copy=False), -1.0, 1.0) threshold = 0.82 abs_x = np.abs(x) if not np.any(abs_x > threshold): return x.astype(np.float32, copy=False) out = x.copy() over = abs_x > threshold norm = (abs_x[over] - threshold) / max(1.0 - threshold, 1e-6) compressed = threshold + (1.0 - threshold) * (np.tanh(2.2 * norm) / np.tanh(2.2)) out[over] = np.sign(x[over]) * compressed return out.astype(np.float32, copy=False) @staticmethod def _downsample_for_ui(audio: np.ndarray, target_points: int = 320) -> np.ndarray: if audio.size <= target_points: return audio.astype(np.float32, copy=False) step = int(np.ceil(audio.size / target_points)) sampled = audio[::step] if sampled.size > target_points: sampled = sampled[:target_points] return sampled.astype(np.float32, copy=False) @staticmethod def _rms(audio: np.ndarray) -> float: if audio.size == 0: return 0.0 return float(np.sqrt(np.mean(np.square(audio), dtype=np.float64))) @staticmethod def _resample_audio(audio: np.ndarray, source_rate: int, target_rate: int) -> np.ndarray: if audio.size == 0 or source_rate == target_rate: return audio.astype(np.float32, copy=False) if source_rate == 48000 and target_rate == 16000: usable = audio.size - (audio.size % 3) if usable <= 0: return audio.astype(np.float32, copy=False) # Fast decimation-by-3 tuned for speech workloads on Zero 2W. grouped = audio[:usable].reshape(-1, 3) return grouped.mean(axis=1, dtype=np.float32).astype(np.float32, copy=False) gcd = math.gcd(source_rate, target_rate) up = target_rate // gcd down = source_rate // gcd resampled = signal.resample_poly(audio, up=up, down=down) return resampled.astype(np.float32, copy=False) def _resolve_input_device(self) -> tuple[int | None, str, int]: try: default_input = sd.default.device[0] if isinstance(default_input, int) and default_input >= 0: info = sd.query_devices(default_input, "input") max_inputs = int(info.get("max_input_channels", 0)) if max_inputs > 0: return int(default_input), str(info.get("name", f"device-{default_input}")), min(self.CHANNELS, max_inputs) except Exception: pass all_devices = sd.query_devices() preferred: tuple[int, dict] | None = None fallback: tuple[int, dict] | None = None for idx, raw_info in enumerate(all_devices): info = dict(raw_info) max_inputs = int(info.get("max_input_channels", 0)) if max_inputs <= 0: continue device = (idx, info) name = str(info.get("name", "")).lower() if max_inputs >= self.CHANNELS and any(tag in name for tag in ("google", "voicehat", "i2s", "mic")): preferred = device break if max_inputs >= self.CHANNELS and fallback is None: fallback = device if fallback is None: fallback = device chosen = preferred or fallback if chosen is None: raise RuntimeError("No audio input device available") idx, info = chosen max_inputs = int(info.get("max_input_channels", 1)) channels = min(self.CHANNELS, max_inputs) return int(idx), str(info.get("name", f"device-{idx}")), channels def _push_monitor_chunk_locked(self, chunk: np.ndarray, sample_rate: int) -> None: samples = chunk.astype(np.float32, copy=True) if samples.size == 0: return self._monitor_queue.append(samples) self._monitor_queue_samples += samples.size max_samples = max(sample_rate * 2, 2048) while self._monitor_queue_samples > max_samples and self._monitor_queue: dropped = self._monitor_queue.popleft() self._monitor_queue_samples -= dropped.size def _pop_monitor_chunk(self, max_samples: int) -> np.ndarray | None: if self._monitor_queue_samples <= 0 or not self._monitor_queue: return None take: list[np.ndarray] = [] collected = 0 while self._monitor_queue and collected < max_samples: chunk = self._monitor_queue[0] remaining = max_samples - collected if chunk.size <= remaining: take.append(chunk) collected += chunk.size self._monitor_queue.popleft() else: take.append(chunk[:remaining]) self._monitor_queue[0] = chunk[remaining:] collected += remaining break self._monitor_queue_samples -= collected if not take: return None if len(take) == 1: return take[0] return np.concatenate(take).astype(np.float32, copy=False) @staticmethod def _encode_pcm16_base64(audio: np.ndarray) -> str: pcm16 = (np.clip(audio, -1.0, 1.0) * 32767.0).astype(np.int16) return base64.b64encode(pcm16.tobytes()).decode("ascii") def _clear_monitor_queue_locked(self) -> None: self._monitor_queue.clear() self._monitor_queue_samples = 0 def _estimate_speech_presence_fast(self, mic1: np.ndarray, mic2: np.ndarray) -> bool: if mic1.size == 0 or mic2.size == 0: return False energy = 0.5 * (np.mean(mic1 * mic1) + np.mean(mic2 * mic2)) energy = float(max(energy, 1e-10)) if energy < self._vad_noise_floor: alpha = 0.05 else: alpha = 0.004 self._vad_noise_floor = (1.0 - alpha) * self._vad_noise_floor + alpha * energy threshold = max(2.2e-7, self._vad_noise_floor * 1.9) return bool(energy > threshold) @staticmethod def _gcc_phat(sig: np.ndarray, refsig: np.ndarray, sample_rate: int, max_tau: float) -> float: n = sig.size + refsig.size sig_fft = np.fft.rfft(sig, n=n) ref_fft = np.fft.rfft(refsig, n=n) cross = sig_fft * np.conj(ref_fft) denom = np.abs(cross) cross = cross / np.maximum(denom, 1e-10) cc = np.fft.irfft(cross, n=n) max_shift = int(min(n // 2, max_tau * sample_rate)) if max_shift <= 0: return 0.0 cc_window = np.concatenate((cc[-max_shift:], cc[: max_shift + 1])) shift = int(np.argmax(np.abs(cc_window)) - max_shift) return float(shift) / float(sample_rate) def _estimate_speech_angle(self, mic1: np.ndarray, mic2: np.ndarray, sample_rate: int) -> tuple[float, bool]: if mic1.size < 64 or mic2.size < 64: return self._auto_angle_deg, False high = min(3400.0, 0.45 * sample_rate) low = min(300.0, high * 0.5) if high <= low + 1.0: return self._auto_angle_deg, False sos = self._get_speech_sos(sample_rate, low, high) if sos is None: return self._auto_angle_deg, False speech1 = signal.sosfilt(sos, mic1).astype(np.float32, copy=False) speech2 = signal.sosfilt(sos, mic2).astype(np.float32, copy=False) speech_energy = 0.5 * (np.mean(speech1 * speech1) + np.mean(speech2 * speech2)) full_energy = 0.5 * (np.mean(mic1 * mic1) + np.mean(mic2 * mic2)) speech_ratio = float(speech_energy / max(full_energy, 1e-12)) self._noise_floor = 0.995 * self._noise_floor + 0.005 * float(speech_energy) speech_threshold = max(2.5e-7, self._noise_floor * 2.0) speech_detected = bool(speech_energy > speech_threshold and speech_ratio > 0.08) if not speech_detected: return self._auto_angle_deg, False max_tau = self.MIC_SPACING / 343.0 tau = self._gcc_phat(speech1, speech2, sample_rate, max_tau=max_tau) sin_theta = np.clip((tau * 343.0) / self.MIC_SPACING, -1.0, 1.0) raw_angle = float(np.rad2deg(np.arcsin(sin_theta))) raw_angle = float(np.clip(raw_angle, -90.0, 90.0)) self._auto_angle_deg = 0.88 * self._auto_angle_deg + 0.12 * raw_angle return self._auto_angle_deg, True def _get_speech_sos(self, sample_rate: int, low: float, high: float) -> np.ndarray | None: cached = self._speech_sos_cache.get(sample_rate, "__missing__") if isinstance(cached, np.ndarray): return cached if cached is None: return None try: sos = signal.butter(4, [low, high], btype="bandpass", fs=sample_rate, output="sos") except ValueError: self._speech_sos_cache[sample_rate] = None return None self._speech_sos_cache[sample_rate] = sos return sos @staticmethod def _make_empty_frame() -> dict[str, object]: empty = np.empty(0, dtype=np.float32) return { "mic1": empty, "mic2": empty, "beam": empty, "mono_mix": empty, "show_mic2": True, "show_beam": False, "show_mono_mix": False, "beam_angle_deg": 0.0, "auto_beam": True, "speech_detected": False, "speech_gate_open": True, "hifi_mode": False, "monitor_on": False, "monitor_source": "beam", "recording": False, "rec_duration": 0.0, }