| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168 |
- from __future__ import annotations
- from dataclasses import asdict, dataclass
- from datetime import datetime
- import base64
- from collections import deque
- from concurrent.futures import Future, ProcessPoolExecutor
- import math
- import os
- from pathlib import Path
- import threading
- import time
- import numpy as np
- from scipy import signal
- import sounddevice as sd
- from agc import AgcProcessor
- from beamforming import beamform_delay_and_sum
- from recorder import RecorderStatus, WavRecorder
- def _gcc_phat_job(sig: np.ndarray, refsig: np.ndarray, sample_rate: int, max_tau: float) -> float:
- n = sig.size + refsig.size
- sig_fft = np.fft.rfft(sig, n=n)
- ref_fft = np.fft.rfft(refsig, n=n)
- cross = sig_fft * np.conj(ref_fft)
- denom = np.abs(cross)
- cross = cross / np.maximum(denom, 1e-10)
- cc = np.fft.irfft(cross, n=n)
- max_shift = int(min(n // 2, max_tau * sample_rate))
- if max_shift <= 0:
- return 0.0
- cc_window = np.concatenate((cc[-max_shift:], cc[: max_shift + 1]))
- shift = int(np.argmax(np.abs(cc_window)) - max_shift)
- return float(shift) / float(sample_rate)
- def _estimate_speech_angle_job(
- mic1: np.ndarray,
- mic2: np.ndarray,
- sample_rate: int,
- mic_spacing: float,
- prev_angle_deg: float,
- prev_noise_floor: float,
- ) -> tuple[float, bool, float]:
- if mic1.size < 64 or mic2.size < 64:
- return float(prev_angle_deg), False, float(prev_noise_floor)
- high = min(3400.0, 0.45 * sample_rate)
- low = min(300.0, high * 0.5)
- if high <= low + 1.0:
- return float(prev_angle_deg), False, float(prev_noise_floor)
- try:
- sos = signal.butter(4, [low, high], btype="bandpass", fs=sample_rate, output="sos")
- except ValueError:
- return float(prev_angle_deg), False, float(prev_noise_floor)
- speech1 = signal.sosfilt(sos, mic1).astype(np.float32, copy=False)
- speech2 = signal.sosfilt(sos, mic2).astype(np.float32, copy=False)
- speech_energy = 0.5 * (np.mean(speech1 * speech1) + np.mean(speech2 * speech2))
- full_energy = 0.5 * (np.mean(mic1 * mic1) + np.mean(mic2 * mic2))
- speech_ratio = float(speech_energy / max(full_energy, 1e-12))
- noise_floor = 0.995 * float(prev_noise_floor) + 0.005 * float(speech_energy)
- speech_threshold = max(2.5e-7, noise_floor * 2.0)
- speech_detected = bool(speech_energy > speech_threshold and speech_ratio > 0.08)
- if not speech_detected:
- return float(prev_angle_deg), False, float(noise_floor)
- max_tau = mic_spacing / 343.0
- tau = _gcc_phat_job(speech1, speech2, sample_rate, max_tau=max_tau)
- sin_theta = np.clip((tau * 343.0) / max(mic_spacing, 1e-6), -1.0, 1.0)
- raw_angle = float(np.rad2deg(np.arcsin(sin_theta)))
- raw_angle = float(np.clip(raw_angle, -90.0, 90.0))
- angle = 0.88 * float(prev_angle_deg) + 0.12 * raw_angle
- return float(angle), True, float(noise_floor)
# Processing sample rates (Hz) that AudioEngine.update_settings accepts.
ALLOWED_SAMPLE_RATES = {16000, 22050, 24000, 32000, 48000}
# Values accepted for AudioSettings.mode.
ALLOWED_MODES = {"mic1", "mic2", "mono_mix", "beamforming"}
# Recording sources accepted by AudioEngine.start_recording.
ALLOWED_SOURCES = {"mic1", "mic2", "mono_mix", "beam", "compare_all", "hifi_raw"}
# Signals that can be routed to the live monitor.
ALLOWED_MONITOR_SOURCES = {"mic1", "mic2", "mono_mix", "beam"}
# Microphones selectable in hi-fi mode.
ALLOWED_HIFI_MICS = {"mic1", "mic2"}
@dataclass
class AudioSettings:
    """User-adjustable settings of the audio engine.

    Values are validated and clamped by ``AudioEngine.update_settings``; the
    defaults below are what a freshly created engine starts with.
    """

    # One of ALLOWED_MODES.
    mode: str = "beamforming"
    # Input gain in dB (update_settings clamps it to 0..30).
    gain_db: float = 0.0
    # Enable automatic gain control.
    agc: bool = True
    # AGC attack time in ms (clamped to 1..50).
    attack_ms: float = 6.0
    # AGC release time in ms (clamped to 50..1000).
    release_ms: float = 280.0
    # Enable the energy-based noise suppression stage.
    noise_suppression: bool = True
    # Enable the speech gate that attenuates output when no speech is detected.
    speech_gate: bool = False
    # Enable the high-pass + notch hum filter.
    hum_filter: bool = True
    # Enable the output limiter.
    limiter: bool = True
    # Blend some mono mix and a presence boost into the beam output.
    beam_clarity: bool = True
    # Bypass the DSP chain and work at the hardware sample rate.
    hifi_mode: bool = False
    # Microphone used in hi-fi mode; one of ALLOWED_HIFI_MICS.
    hifi_mic: str = "mic1"
    # Manual beam steering angle in degrees (clamped to -90..90).
    angle: float = 0.0
    # Steer the beam from the estimated speech direction instead of `angle`.
    auto_beam: bool = True
    # Stream a monitor signal to the client (forced off while hifi_mode is on).
    monitor_on: bool = False
    # One of ALLOWED_MONITOR_SOURCES.
    monitor_source: str = "beam"
    # Processing sample rate in Hz; one of ALLOWED_SAMPLE_RATES.
    sample_rate: int = 16000
class AudioEngine:
    """Two-microphone capture engine: input stream, DSP chain, monitoring and WAV recording."""

    # Expected microphone count; reported as "mic_count" by get_status().
    CHANNELS = 2
    # NOTE(review): not referenced in the visible code; presumably the capture
    # word size (the stream is opened with dtype "int32") - confirm.
    BIT_DEPTH = 32
    # Block size (frames) requested from the input stream.
    CHUNK_SIZE = 2048
    # Sample rate (Hz) the input stream is opened with.
    HARDWARE_SAMPLE_RATE = 48000
    # Geometry of the microphone ring used to derive the spacing of the mic pair.
    RING_DIAMETER_M = 0.06
    RING_SLOTS = 4
    SLOT_STEP = 1 # neighboring slots (90 degrees). Use 2 for opposite mics.
    # Chord length between two slots SLOT_STEP apart on the ring.
    MIC_SPACING = RING_DIAMETER_M * math.sin(math.pi * SLOT_STEP / RING_SLOTS)
    # Chunks arriving this long after the stream opens are discarded.
    STARTUP_IGNORE_SECONDS = 0.30
    # Speech gate: hold time after speech, closed-gate gain, and smoothing time constants.
    SPEECH_GATE_HOLD_SECONDS = 0.85
    SPEECH_GATE_FLOOR = 0.55
    SPEECH_GATE_ATTACK_SECONDS = 0.012
    SPEECH_GATE_RELEASE_SECONDS = 0.360
    # Noise suppression: minimum gain and attenuation strength, for gate open/closed.
    NOISE_SUPPRESS_OPEN_FLOOR = 0.72
    NOISE_SUPPRESS_CLOSED_FLOOR = 0.40
    NOISE_SUPPRESS_OPEN_STRENGTH = 0.30
    NOISE_SUPPRESS_CLOSED_STRENGTH = 0.55
    # Hum filter: high-pass cutoff plus notch frequency and Q.
    HUM_HPF_CUTOFF_HZ = 75.0
    HUM_NOTCH_HZ = 50.0
    HUM_NOTCH_Q = 22.0
    # Beam clarity: share of mono mix blended into the beam, and first-difference boost.
    BEAM_CLARITY_BLEND = 0.22
    BEAM_PRESENCE_BOOST = 0.20
def __init__(self, recordings_dir: Path) -> None:
    """Initialise engine state; the input stream is only opened by ``start()``.

    Args:
        recordings_dir: Directory handed to every ``WavRecorder``.
    """
    # Guards the shared state touched by both API calls and the audio callback.
    self._lock = threading.Lock()
    self._stream: sd.InputStream | None = None
    self._stream_channels = self.CHANNELS
    self._input_device_name = "unknown"
    self._settings = AudioSettings()
    self._running = False
    # time.monotonic() value before which incoming chunks are ignored.
    self._startup_deadline = 0.0
    # Current beam angle: the auto-estimated one, or the manual one when auto_beam is off.
    self._auto_angle_deg = 0.0
    self._noise_floor = 1e-7
    self._vad_noise_floor = 1e-7
    # Filter designs cached per sample rate (None = no usable filter for that rate).
    self._speech_sos_cache: dict[int, np.ndarray | None] = {}
    self._hpf_sos_cache: dict[int, np.ndarray | None] = {}
    self._notch_cache: dict[int, tuple[np.ndarray, np.ndarray] | None] = {}
    # Filter memories keyed by (channel name, sample rate) so chunks join seamlessly.
    self._hpf_state: dict[tuple[str, int], np.ndarray] = {}
    self._notch_state: dict[tuple[str, int], np.ndarray] = {}
    self._last_speech_detected = False
    # An angle-estimation job is submitted every `_angle_update_interval` chunks.
    self._angle_update_counter = 0
    self._angle_update_interval = 4
    cpu_total = max(1, int(os.cpu_count() or 1))
    self._cpu_workers = max(1, min(4, cpu_total))
    self._angle_workers = max(1, min(3, self._cpu_workers - 1))
    self._max_pending_angle_jobs = max(1, min(2, self._angle_workers))
    # Angle estimation runs in worker processes, off the audio callback thread.
    self._angle_executor: ProcessPoolExecutor | None = ProcessPoolExecutor(max_workers=self._angle_workers)
    self._angle_futures: deque[Future] = deque()
    self._speech_gate_hold_chunks = 0
    self._speech_gate_gain = 1.0
    # Running noise-power estimate per output signal for the noise suppressor.
    self._ns_noise_power = {
        "mic1": 1e-7,
        "mic2": 1e-7,
        "mono_mix": 1e-7,
        "beam": 1e-7,
    }
    # Last blended beam sample, carried across chunks for the presence boost.
    self._presence_prev = {"beam": 0.0}
    self._recordings_dir = Path(recordings_dir)
    # Single-source recorder; "compare_all" uses the per-signal recorders below.
    self._recorder = WavRecorder(recordings_dir)
    self._compare_recorders: dict[str, WavRecorder] = {}
    self._compare_filenames: dict[str, str] = {}
    self._record_duration_limit_sec: float | None = None
    self._auto_stop_requested = False
    self._monitor_queue: deque[np.ndarray] = deque()
    self._monitor_queue_samples = 0
    self._agc_mic1 = AgcProcessor(self._settings.sample_rate)
    self._agc_mic2 = AgcProcessor(self._settings.sample_rate)
    self._agc_beam = AgcProcessor(self._settings.sample_rate)
    self._latest_frame: dict[str, object] = self._make_empty_frame()
def start(self) -> None:
    """Open the input stream unless the engine is already running.

    ``stop()`` shuts down the angle-estimation process pool and clears the
    reference to it; it is recreated here so that automatic beam steering
    keeps working after a stop/start cycle.
    """
    with self._lock:
        if self._running:
            return
        if self._angle_executor is None:
            self._angle_executor = ProcessPoolExecutor(max_workers=self._angle_workers)
    # _open_stream() takes the (non-reentrant) lock itself, so call it unlocked.
    self._open_stream()
def stop(self) -> None:
    """Stop capturing: close the stream, end any recording, drop angle jobs.

    Shared references are detached under the lock; the potentially slow
    teardown calls then run without holding it.
    """
    with self._lock:
        stream, self._stream = self._stream, None
        self._running = False
        executor, self._angle_executor = self._angle_executor, None
        pending_jobs = list(self._angle_futures)
        self._angle_futures.clear()

    if stream is not None:
        stream.stop()
        stream.close()

    self.stop_recording()

    for job in pending_jobs:
        job.cancel()
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
def is_running(self) -> bool:
    """Return whether the input stream is currently marked as running."""
    with self._lock:
        running = self._running
    return running
def get_settings(self) -> dict[str, object]:
    """Return a plain-dict copy of the current settings."""
    with self._lock:
        snapshot = asdict(self._settings)
    return snapshot
def get_status(self) -> dict[str, object]:
    """Return a status summary: device info, settings and recording state."""
    recording_state = self._current_recording_status()
    summary: dict[str, object] = {
        "recording": recording_state.recording,
        "mic_count": self.CHANNELS,
        "hardware_sample_rate": self.HARDWARE_SAMPLE_RATE,
        "mic_spacing_m": self.MIC_SPACING,
        "auto_beam_angle_deg": self._auto_angle_deg,
        "input_device": self._input_device_name,
    }
    summary["settings"] = self.get_settings()
    summary["recording_status"] = asdict(recording_state)
    summary["audio_running"] = self.is_running()
    return summary
def get_latest_packet(self) -> dict[str, object]:
    """Build the ``audio_data`` packet for the UI from the latest processed frame.

    Waveforms are passed through ``_downsample_for_ui``; signals the frame
    marks as hidden are sent as empty lists with an RMS of 0.0. When
    monitoring is on, a chunk is popped from the monitor queue and attached
    via ``_encode_pcm16_base64``.
    """
    with self._lock:
        snapshot = dict(self._latest_frame)

    no_audio = np.empty(0, dtype=np.float32)

    def channel(name: str) -> np.ndarray:
        return np.asarray(snapshot.get(name, no_audio), dtype=np.float32)

    mic1 = channel("mic1")
    mic2 = channel("mic2")
    beam = channel("beam")
    mono_mix = channel("mono_mix")

    show_mic2 = bool(snapshot.get("show_mic2", True))
    show_beam = bool(snapshot.get("show_beam", False))
    show_mono_mix = bool(snapshot.get("show_mono_mix", False))
    beam_angle_deg = float(snapshot.get("beam_angle_deg", 0.0))
    auto_beam = bool(snapshot.get("auto_beam", True))
    speech_detected = bool(snapshot.get("speech_detected", False))
    speech_gate_open = bool(snapshot.get("speech_gate_open", True))
    hifi_mode = bool(snapshot.get("hifi_mode", False))
    monitor_on = bool(snapshot.get("monitor_on", False))
    monitor_source = str(snapshot.get("monitor_source", "beam"))
    recording = bool(snapshot.get("recording", False))
    rec_duration = float(snapshot.get("rec_duration", 0.0))

    if hifi_mode:
        monitor_rate = int(self.HARDWARE_SAMPLE_RATE)
    else:
        monitor_rate = int(self._settings.sample_rate)

    # At most 80 ms of monitor audio per packet, never fewer than 512 samples.
    monitor_chunk = None
    if monitor_on:
        monitor_chunk = self._pop_monitor_chunk(max_samples=max(512, int(monitor_rate * 0.08)))

    def waveform(samples: np.ndarray, visible: bool) -> list:
        return self._downsample_for_ui(samples).tolist() if visible else []

    def level(samples: np.ndarray, visible: bool) -> float:
        return self._rms(samples) if visible else 0.0

    packet = {
        "type": "audio_data",
        "mic1": self._downsample_for_ui(mic1).tolist(),
        "mic2": waveform(mic2, show_mic2),
        "beam": waveform(beam, show_beam),
        "mono_mix": waveform(mono_mix, show_mono_mix),
        "rms_mic1": self._rms(mic1),
        "rms_mic2": level(mic2, show_mic2),
        "rms_beam": level(beam, show_beam),
        "rms_mono_mix": level(mono_mix, show_mono_mix),
        "beam_angle_deg": beam_angle_deg,
        "auto_beam": auto_beam,
        "speech_detected": speech_detected,
        "speech_gate_open": speech_gate_open,
        "hifi_mode": hifi_mode,
        "monitor_on": monitor_on,
        "monitor_source": monitor_source,
        "monitor_sr": monitor_rate,
        "monitor_chunk_b64": "",
        "recording": recording,
        "rec_duration": rec_duration,
    }
    has_monitor_audio = monitor_chunk is not None and monitor_chunk.size > 0
    if has_monitor_audio:
        packet["monitor_chunk_b64"] = self._encode_pcm16_base64(monitor_chunk)
    return packet
def update_settings(self, updates: dict[str, object]) -> dict[str, object]:
    """Merge ``updates`` into the current settings and return the result.

    Keys missing from ``updates`` keep their current value. Choice fields
    with an unknown value keep the current value, numeric fields are clamped
    to their allowed range, and enabling hi-fi mode forces monitoring off.
    The three AGC processors are reconfigured with the new timing values.

    Args:
        updates: Mapping of setting name to requested value.

    Returns:
        The settings after the update, as a plain dict.
    """
    with self._lock:
        previous = self._settings

        def requested(field_name: str) -> object:
            return updates.get(field_name, getattr(previous, field_name))

        def one_of(field_name: str, allowed: set) -> str:
            candidate = str(requested(field_name))
            return candidate if candidate in allowed else getattr(previous, field_name)

        def bounded(field_name: str, low: float, high: float) -> float:
            return float(np.clip(float(requested(field_name)), low, high))

        def flag(field_name: str) -> bool:
            return bool(requested(field_name))

        def valid_rate() -> int:
            rate = int(requested("sample_rate"))
            return rate if rate in ALLOWED_SAMPLE_RATES else previous.sample_rate

        # Entries are evaluated top to bottom, matching the validation order.
        values = {
            "mode": one_of("mode", ALLOWED_MODES),
            "sample_rate": valid_rate(),
            "gain_db": bounded("gain_db", 0.0, 30.0),
            "agc": flag("agc"),
            "attack_ms": bounded("attack_ms", 1.0, 50.0),
            "release_ms": bounded("release_ms", 50.0, 1000.0),
            "noise_suppression": flag("noise_suppression"),
            "speech_gate": flag("speech_gate"),
            "hum_filter": flag("hum_filter"),
            "limiter": flag("limiter"),
            "beam_clarity": flag("beam_clarity"),
            "hifi_mode": flag("hifi_mode"),
            "hifi_mic": one_of("hifi_mic", ALLOWED_HIFI_MICS),
            "angle": bounded("angle", -90.0, 90.0),
            "auto_beam": flag("auto_beam"),
            "monitor_on": flag("monitor_on"),
            "monitor_source": one_of("monitor_source", ALLOWED_MONITOR_SOURCES),
        }
        # Monitoring is not available in hi-fi mode.
        if values["hifi_mode"]:
            values["monitor_on"] = False

        self._settings = AudioSettings(**values)

        # With manual steering the "auto" angle simply mirrors the requested one.
        if not values["auto_beam"]:
            self._auto_angle_deg = values["angle"]
        if not values["monitor_on"] or values["hifi_mode"]:
            self._clear_monitor_queue_locked()

        for processor in (self._agc_mic1, self._agc_mic2, self._agc_beam):
            processor.update(
                sample_rate=values["sample_rate"],
                attack_ms=values["attack_ms"],
                release_ms=values["release_ms"],
            )
    # get_settings() takes the lock itself, so it must run after release.
    return self.get_settings()
def start_recording(self, source: str, duration_sec: float | None = None) -> dict[str, object]:
    """Start a recording and return its description.

    In hi-fi mode the requested source is ignored and the selected hi-fi
    microphone is recorded at the hardware sample rate. ``"compare_all"``
    records mic1, the mono mix and the beam into three separate files.

    Args:
        source: One of ``ALLOWED_SOURCES`` (case and surrounding whitespace
            are ignored).
        duration_sec: Optional auto-stop limit. Positive values are clamped
            to 1..3600 seconds; ``None``, non-numeric or non-positive values
            mean no limit.

    Returns:
        Dict with ``"source"``, ``"filenames"`` and ``"duration_limit_sec"``
        (0.0 when unlimited).

    Raises:
        ValueError: If ``source`` is not an allowed source.
        RuntimeError: If a recording is already active.
    """
    source = source.lower().strip()
    if source not in ALLOWED_SOURCES:
        raise ValueError("Invalid recording source")
    if self._current_recording_status().recording:
        raise RuntimeError("Recording is already active")

    limit = None
    if duration_sec is not None:
        try:
            duration_value = float(duration_sec)
        except (TypeError, ValueError):
            duration_value = 0.0
        if duration_value > 0.0:
            limit = float(np.clip(duration_value, 1.0, 3600.0))
    with self._lock:
        self._record_duration_limit_sec = limit
        self._auto_stop_requested = False

    settings = self.get_settings()
    sample_rate = int(settings["sample_rate"])
    hifi_mode = bool(settings.get("hifi_mode", False))
    hifi_mic = str(settings.get("hifi_mic", "mic1"))
    if hifi_mic not in ALLOWED_HIFI_MICS:
        hifi_mic = "mic1"

    if hifi_mode:
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = self._recorder.start(
            source="hifi_raw",
            sample_rate=self.HARDWARE_SAMPLE_RATE,
            channels=1,
            filename=f"rec_{timestamp}_hifi_{hifi_mic}_48k.wav",
        )
        with self._lock:
            self._compare_recorders = {}
            self._compare_filenames = {}
        return {
            "source": "hifi_raw",
            "filenames": [filename],
            "duration_limit_sec": limit or 0.0,
        }

    # Outside hi-fi mode "hifi_raw" falls back to the selected microphone.
    if source == "hifi_raw":
        source = hifi_mic

    if source == "compare_all":
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        recorders = {
            "mic1": WavRecorder(self._recordings_dir),
            "mono_mix": WavRecorder(self._recordings_dir),
            "beam": WavRecorder(self._recordings_dir),
        }
        filenames = {
            "mic1": f"rec_{timestamp}_mic1.wav",
            "mono_mix": f"rec_{timestamp}_mono_mix.wav",
            "beam": f"rec_{timestamp}_beam.wav",
        }
        started = []
        try:
            for key, recorder in recorders.items():
                recorder.start(
                    source=key,
                    sample_rate=sample_rate,
                    channels=1,
                    filename=filenames[key],
                )
                started.append(recorder)
        except Exception:
            # A later recorder failed to start: close the ones already opened
            # so no file handle leaks, then surface the original error.
            for recorder in started:
                try:
                    recorder.stop()
                except Exception:
                    pass  # best-effort cleanup; the start error is re-raised below
            raise
        with self._lock:
            self._compare_recorders = recorders
            self._compare_filenames = filenames
        return {
            "source": source,
            "filenames": [filenames["mic1"], filenames["mono_mix"], filenames["beam"]],
            "duration_limit_sec": limit or 0.0,
        }

    channels = 1
    filename = self._recorder.start(
        source=source,
        sample_rate=sample_rate,
        channels=channels,
    )
    with self._lock:
        self._compare_recorders = {}
        self._compare_filenames = {}
    return {
        "source": source,
        "filenames": [filename],
        "duration_limit_sec": limit or 0.0,
    }
def stop_recording(self) -> RecorderStatus:
    """Stop the active recording and return its final status.

    For a ``compare_all`` recording every per-signal recorder is stopped and
    one combined status is returned: comma-joined filenames, the longest
    duration, and the sample rate of the recorder stopped last. Otherwise the
    single recorder's own ``stop()`` result is returned.
    """
    with self._lock:
        active = self._compare_recorders
        names = dict(self._compare_filenames)
        self._compare_recorders = {}
        self._compare_filenames = {}
        self._record_duration_limit_sec = None
        self._auto_stop_requested = False

    if not active:
        return self._recorder.stop()

    longest = 0.0
    rate = self._settings.sample_rate
    for recorder in active.values():
        final = recorder.stop()
        longest = max(longest, final.duration_sec)
        rate = final.sample_rate
    return RecorderStatus(
        recording=False,
        filename=",".join(names.values()),
        duration_sec=longest,
        channels=1,
        sample_rate=rate,
        source="compare_all",
    )
def get_recording_status(self) -> RecorderStatus:
    """Public accessor for the current recording status."""
    status = self._current_recording_status()
    return status
- def _current_recording_status(self) -> RecorderStatus:
- with self._lock:
- compare_recorders = dict(self._compare_recorders)
- compare_filenames = dict(self._compare_filenames)
- if compare_recorders:
- statuses = [recorder.get_status() for recorder in compare_recorders.values()]
- is_recording = any(status.recording for status in statuses)
- duration = max((status.duration_sec for status in statuses), default=0.0)
- sample_rate = statuses[0].sample_rate if statuses else self._settings.sample_rate
- return RecorderStatus(
- recording=is_recording,
- filename=",".join(compare_filenames.values()),
- duration_sec=duration,
- channels=1,
- sample_rate=sample_rate,
- source="compare_all",
- )
- return self._recorder.get_status()
def _open_stream(self) -> None:
    """Open and start the input stream, then publish it as the running stream.

    The stream is started before the shared state is updated under the lock;
    the startup deadline makes the callback ignore the first chunks.
    """
    device_index, device_label, channel_count = self._resolve_input_device()
    new_stream = sd.InputStream(
        samplerate=self.HARDWARE_SAMPLE_RATE,
        device=device_index,
        channels=channel_count,
        dtype="int32",
        blocksize=self.CHUNK_SIZE,
        callback=self._audio_callback,
    )
    new_stream.start()

    with self._lock:
        self._stream = new_stream
        self._stream_channels = channel_count
        self._input_device_name = device_label
        self._running = True
        self._startup_deadline = time.monotonic() + self.STARTUP_IGNORE_SECONDS
- def _restart_stream(self) -> None:
- with self._lock:
- stream = self._stream
- self._stream = None
- self._running = False
- if stream is not None:
- stream.stop()
- stream.close()
- self._open_stream()
def _audio_callback(self, indata: np.ndarray, frames: int, time_info: dict, status: sd.CallbackFlags) -> None:
    """Process one captured chunk: DSP chain, recording, monitor queue, UI frame.

    Passed to ``sd.InputStream`` as its ``callback``. The chunk is dropped
    when ``status`` is truthy, when the engine is not running, or while the
    startup grace period is still active.

    Args:
        indata: Captured samples, indexed as ``[frame, channel]``.
        frames: Unused.
        time_info: Unused.
        status: Stream status flags for this chunk.
    """
    del frames, time_info
    # A truthy status drops the whole chunk.
    if status:
        return
    # Snapshot everything needed from shared state, then release the lock.
    with self._lock:
        if not self._running:
            return
        if time.monotonic() < self._startup_deadline:
            return
        settings = self._settings
        stream_channels = self._stream_channels
    # Keep the upper 24 bits of each 32-bit word and scale by 2**23 to [-1, 1].
    # NOTE(review): presumably the device delivers 24-bit samples left-justified
    # in 32-bit words - confirm.
    pcm32 = indata.astype(np.int32, copy=False)
    pcm24 = pcm32 >> 8
    normalized = np.clip(pcm24.astype(np.float32) / 8388608.0, -1.0, 1.0)
    mic1_raw = normalized[:, 0]
    # With a single-channel device, mic2 is a copy of mic1.
    mic2_raw = normalized[:, 1] if stream_channels > 1 else mic1_raw.copy()
    processing_rate = self.HARDWARE_SAMPLE_RATE if settings.hifi_mode else settings.sample_rate
    if (not settings.hifi_mode) and settings.sample_rate != self.HARDWARE_SAMPLE_RATE:
        mic1_raw = self._resample_audio(mic1_raw, self.HARDWARE_SAMPLE_RATE, settings.sample_rate)
        mic2_raw = self._resample_audio(mic2_raw, self.HARDWARE_SAMPLE_RATE, settings.sample_rate)
        # Trim both channels to the same length after resampling.
        common_len = min(mic1_raw.size, mic2_raw.size)
        mic1_raw = mic1_raw[:common_len]
        mic2_raw = mic2_raw[:common_len]
    if settings.hifi_mode:
        # Hi-fi mode: no DSP, every output carries the selected raw microphone.
        hifi_mic = settings.hifi_mic if settings.hifi_mic in ALLOWED_HIFI_MICS else "mic1"
        hifi_signal = mic1_raw if hifi_mic == "mic1" else mic2_raw
        mic1_proc = hifi_signal
        mic2_proc = mic2_raw if hifi_mic == "mic1" else mic1_raw
        mono_mix_proc = hifi_signal
        beam_proc = hifi_signal
        angle_to_use = 0.0
        speech_active = True
        gate_open = True
    else:
        gain_linear = 10.0 ** (settings.gain_db / 20.0)
        mic1_base = np.clip(mic1_raw * gain_linear, -1.0, 1.0)
        mic2_base = np.clip(mic2_raw * gain_linear, -1.0, 1.0)
        mono_base = np.clip(0.5 * (mic1_base + mic2_base), -1.0, 1.0)
        angle_to_use = settings.angle
        speech_detected = False
        if settings.mode == "beamforming" and settings.auto_beam:
            # Collect finished angle jobs; only the most recently queued finished result is used.
            latest_done: Future | None = None
            pending: deque[Future] = deque()
            while self._angle_futures:
                fut = self._angle_futures.popleft()
                if fut.done():
                    latest_done = fut
                else:
                    pending.append(fut)
            self._angle_futures = pending
            if latest_done is not None:
                try:
                    angle_est, speech_est, noise_floor = latest_done.result(timeout=0)
                    self._auto_angle_deg = float(angle_est)
                    self._last_speech_detected = bool(speech_est)
                    self._noise_floor = max(float(noise_floor), 1e-9)
                except Exception:
                    # A failed or cancelled job just keeps the previous estimate.
                    pass
            # Submit a new job every `_angle_update_interval` chunks, bounded by
            # the number of jobs still in flight.
            self._angle_update_counter += 1
            if (
                self._angle_update_counter >= self._angle_update_interval
                and len(self._angle_futures) < self._max_pending_angle_jobs
                and self._angle_executor is not None
            ):
                self._angle_update_counter = 0
                try:
                    fut = self._angle_executor.submit(
                        _estimate_speech_angle_job,
                        mic1_base.astype(np.float32, copy=True),
                        mic2_base.astype(np.float32, copy=True),
                        int(processing_rate),
                        float(self.MIC_SPACING),
                        float(self._auto_angle_deg),
                        float(self._noise_floor),
                    )
                    self._angle_futures.append(fut)
                except Exception:
                    # Submission failures are ignored; the angle is simply not refreshed.
                    pass
            angle_to_use = self._auto_angle_deg
            speech_detected = self._last_speech_detected
        else:
            # Auto steering is not in use: drop any queued angle jobs.
            while self._angle_futures:
                self._angle_futures.popleft().cancel()
            if not settings.auto_beam:
                self._auto_angle_deg = settings.angle
            speech_detected = False
        speech_presence = self._estimate_speech_presence_fast(mic1_base, mic2_base)
        speech_active = bool(speech_detected or speech_presence)
        beam_proc = beamform_delay_and_sum(
            mic1_base,
            mic2_base,
            angle_deg=angle_to_use,
            sample_rate=processing_rate,
            mic_spacing=self.MIC_SPACING,
        )
        if settings.beam_clarity:
            beam_proc = self._apply_beam_clarity_blend(beam_proc, mono_base)
        gate_open, gate_gain = self._update_speech_gate(
            speech_detected=speech_active,
            sample_rate=processing_rate,
            chunk_len=max(beam_proc.size, 1),
            enabled=settings.speech_gate,
        )
        if settings.agc:
            mic1_proc = self._agc_mic1.process(mic1_base, speech_hint=gate_open)
            mic2_proc = self._agc_mic2.process(mic2_base, speech_hint=gate_open)
            beam_proc = self._agc_beam.process(beam_proc, speech_hint=gate_open)
        else:
            mic1_proc = mic1_base
            mic2_proc = mic2_base
        mono_mix_proc = np.clip(0.5 * (mic1_proc + mic2_proc), -1.0, 1.0)
        if settings.hum_filter:
            mic1_proc = self._apply_hum_filter(mic1_proc, processing_rate, "mic1")
            mic2_proc = self._apply_hum_filter(mic2_proc, processing_rate, "mic2")
            beam_proc = self._apply_hum_filter(beam_proc, processing_rate, "beam")
            # The mix is rebuilt from the filtered mics and then filtered again
            # with its own filter state.
            mono_mix_proc = np.clip(0.5 * (mic1_proc + mic2_proc), -1.0, 1.0)
            mono_mix_proc = self._apply_hum_filter(mono_mix_proc, processing_rate, "mono_mix")
        if settings.noise_suppression:
            mic1_proc = self._apply_noise_suppression(mic1_proc, "mic1", speech_active=gate_open)
            mic2_proc = self._apply_noise_suppression(mic2_proc, "mic2", speech_active=gate_open)
            mono_mix_proc = self._apply_noise_suppression(mono_mix_proc, "mono_mix", speech_active=gate_open)
            beam_proc = self._apply_noise_suppression(beam_proc, "beam", speech_active=gate_open)
        if settings.speech_gate:
            mic1_proc = np.clip(mic1_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False)
            mic2_proc = np.clip(mic2_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False)
            mono_mix_proc = np.clip(mono_mix_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False)
            beam_proc = np.clip(beam_proc * gate_gain, -1.0, 1.0).astype(np.float32, copy=False)
        if settings.limiter:
            mic1_proc = self._apply_limiter(mic1_proc)
            mic2_proc = self._apply_limiter(mic2_proc)
            mono_mix_proc = self._apply_limiter(mono_mix_proc)
            beam_proc = self._apply_limiter(beam_proc)
    # _current_recording_status() takes the lock itself, so it is called unlocked.
    rec_status = self._current_recording_status()
    if rec_status.recording:
        self._write_recording_chunk(rec_status.source, mic1_raw, mic2_raw, mic1_proc, mono_mix_proc, beam_proc)
        rec_status = self._current_recording_status()
    should_auto_stop = False
    with self._lock:
        duration_limit = self._record_duration_limit_sec
        if (
            rec_status.recording
            and duration_limit is not None
            and rec_status.duration_sec >= duration_limit
            and not self._auto_stop_requested
        ):
            # Latch the request so only one auto-stop thread is spawned.
            self._auto_stop_requested = True
            should_auto_stop = True
    if should_auto_stop:
        # Stop on a separate thread rather than inside the audio callback.
        threading.Thread(target=self.stop_recording, daemon=True).start()
    show_beam = (settings.mode == "beamforming") and (not settings.hifi_mode)
    show_mono_mix = (settings.mode == "mono_mix") and (not settings.hifi_mode)
    show_mic2 = not settings.hifi_mode
    if settings.monitor_on:
        monitor_map = {
            "mic1": mic1_proc,
            "mic2": mic2_proc,
            "mono_mix": mono_mix_proc,
            "beam": beam_proc,
        }
        monitor_signal = monitor_map.get(settings.monitor_source, beam_proc)
        with self._lock:
            self._push_monitor_chunk_locked(monitor_signal, processing_rate)
    # Frame read by get_latest_packet().
    frame = {
        "mic1": mic1_proc.astype(np.float32, copy=False),
        "mic2": mic2_proc.astype(np.float32, copy=False),
        "beam": beam_proc.astype(np.float32, copy=False),
        "mono_mix": mono_mix_proc.astype(np.float32, copy=False),
        "show_mic2": show_mic2,
        "show_beam": show_beam,
        "show_mono_mix": show_mono_mix,
        "beam_angle_deg": float(angle_to_use),
        "auto_beam": settings.auto_beam,
        "speech_detected": speech_active,
        "speech_gate_open": gate_open,
        "hifi_mode": settings.hifi_mode,
        "monitor_on": settings.monitor_on,
        "monitor_source": settings.monitor_source,
        "recording": rec_status.recording,
        "rec_duration": rec_status.duration_sec,
    }
    with self._lock:
        self._latest_frame = frame
- def _write_recording_chunk(
- self,
- source: str,
- mic1_raw: np.ndarray,
- mic2_raw: np.ndarray,
- mic1_proc: np.ndarray,
- mono_mix_proc: np.ndarray,
- beam_proc: np.ndarray,
- ) -> None:
- if source == "compare_all":
- with self._lock:
- mic1_rec = self._compare_recorders.get("mic1")
- mix_rec = self._compare_recorders.get("mono_mix")
- beam_rec = self._compare_recorders.get("beam")
- if mic1_rec is not None:
- mic1_rec.write(mic1_proc)
- if mix_rec is not None:
- mix_rec.write(mono_mix_proc)
- if beam_rec is not None:
- beam_rec.write(beam_proc)
- return
- if source == "mic1":
- self._recorder.write(mic1_raw)
- return
- if source == "mic2":
- self._recorder.write(mic2_raw)
- return
- if source == "mono_mix":
- self._recorder.write(mono_mix_proc)
- return
- if source == "beam":
- self._recorder.write(beam_proc)
- return
- if source == "hifi_raw":
- with self._lock:
- hifi_mic = self._settings.hifi_mic
- if hifi_mic == "mic2":
- self._recorder.write(mic2_raw)
- else:
- self._recorder.write(mic1_raw)
- return
- def _apply_beam_clarity_blend(self, beam: np.ndarray, mono: np.ndarray) -> np.ndarray:
- if beam.size == 0 or mono.size == 0:
- return beam.astype(np.float32, copy=False)
- n = min(beam.size, mono.size)
- if n <= 0:
- return beam.astype(np.float32, copy=False)
- blend = float(np.clip(self.BEAM_CLARITY_BLEND, 0.0, 0.5))
- out = ((1.0 - blend) * beam[:n] + blend * mono[:n]).astype(np.float32, copy=False)
- prev = float(self._presence_prev.get("beam", 0.0))
- hp = np.empty_like(out)
- hp[0] = out[0] - prev
- if out.size > 1:
- hp[1:] = out[1:] - out[:-1]
- self._presence_prev["beam"] = float(out[-1])
- out = out + float(self.BEAM_PRESENCE_BOOST) * hp
- return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False)
- def _update_speech_gate(
- self,
- *,
- speech_detected: bool,
- sample_rate: int,
- chunk_len: int,
- enabled: bool,
- ) -> tuple[bool, float]:
- if not enabled:
- self._speech_gate_hold_chunks = 0
- self._speech_gate_gain = 1.0
- return True, 1.0
- chunk_seconds = max(float(chunk_len) / float(sample_rate), 1e-6)
- hold_chunks = max(1, int(round(self.SPEECH_GATE_HOLD_SECONDS / chunk_seconds)))
- if speech_detected:
- self._speech_gate_hold_chunks = hold_chunks
- elif self._speech_gate_hold_chunks > 0:
- self._speech_gate_hold_chunks -= 1
- gate_open = bool(speech_detected or self._speech_gate_hold_chunks > 0)
- target_gain = 1.0 if gate_open else self.SPEECH_GATE_FLOOR
- tau = self.SPEECH_GATE_ATTACK_SECONDS if target_gain > self._speech_gate_gain else self.SPEECH_GATE_RELEASE_SECONDS
- coeff = np.exp(-chunk_seconds / max(tau, 1e-4))
- self._speech_gate_gain = float(coeff * self._speech_gate_gain + (1.0 - coeff) * target_gain)
- return gate_open, self._speech_gate_gain
- def _apply_noise_suppression(self, audio: np.ndarray, key: str, *, speech_active: bool) -> np.ndarray:
- if audio.size == 0:
- return audio.astype(np.float32, copy=False)
- power = float(np.mean(audio * audio, dtype=np.float64))
- prev_noise = float(self._ns_noise_power.get(key, 1e-7))
- alpha = 0.01 if speech_active else 0.07
- noise = (1.0 - alpha) * prev_noise + alpha * power
- noise = max(noise, 1e-9)
- self._ns_noise_power[key] = noise
- ratio = noise / max(power, 1e-9)
- if speech_active:
- strength = self.NOISE_SUPPRESS_OPEN_STRENGTH
- floor = self.NOISE_SUPPRESS_OPEN_FLOOR
- else:
- strength = self.NOISE_SUPPRESS_CLOSED_STRENGTH
- floor = self.NOISE_SUPPRESS_CLOSED_FLOOR
- gain = float(np.clip(1.0 - strength * ratio, floor, 1.0))
- out = audio.astype(np.float32, copy=False) * gain
- return np.clip(out, -1.0, 1.0).astype(np.float32, copy=False)
- def _apply_hum_filter(self, audio: np.ndarray, sample_rate: int, channel_key: str) -> np.ndarray:
- if audio.size == 0:
- return audio.astype(np.float32, copy=False)
- out = audio.astype(np.float32, copy=False)
- state_key = (channel_key, int(sample_rate))
- sos = self._get_hpf_sos(sample_rate)
- if sos is not None:
- zi = self._hpf_state.get(state_key)
- if zi is None or zi.shape != (sos.shape[0], 2):
- zi = np.zeros((sos.shape[0], 2), dtype=np.float32)
- out, zi_new = signal.sosfilt(sos, out, zi=zi)
- self._hpf_state[state_key] = zi_new.astype(np.float32, copy=False)
- out = out.astype(np.float32, copy=False)
- notch = self._get_notch_coeff(sample_rate)
- if notch is not None:
- b, a = notch
- zi = self._notch_state.get(state_key)
- expected = max(len(a), len(b)) - 1
- if zi is None or zi.size != expected:
- zi = np.zeros(expected, dtype=np.float32)
- out, zi_new = signal.lfilter(b, a, out, zi=zi)
- self._notch_state[state_key] = zi_new.astype(np.float32, copy=False)
- out = out.astype(np.float32, copy=False)
- return out
- def _get_hpf_sos(self, sample_rate: int) -> np.ndarray | None:
- cached = self._hpf_sos_cache.get(sample_rate, "__missing__")
- if isinstance(cached, np.ndarray):
- return cached
- if cached is None:
- return None
- cutoff = min(self.HUM_HPF_CUTOFF_HZ, 0.45 * sample_rate)
- if cutoff < 20.0:
- self._hpf_sos_cache[sample_rate] = None
- return None
- try:
- sos = signal.butter(2, cutoff, btype="highpass", fs=sample_rate, output="sos")
- except ValueError:
- self._hpf_sos_cache[sample_rate] = None
- return None
- self._hpf_sos_cache[sample_rate] = sos
- return sos
- def _get_notch_coeff(self, sample_rate: int) -> tuple[np.ndarray, np.ndarray] | None:
- cached = self._notch_cache.get(sample_rate, "__missing__")
- if isinstance(cached, tuple):
- return cached
- if cached is None:
- return None
- nyquist = 0.5 * sample_rate
- if nyquist <= self.HUM_NOTCH_HZ * 1.2:
- self._notch_cache[sample_rate] = None
- return None
- w0 = self.HUM_NOTCH_HZ / nyquist
- try:
- b, a = signal.iirnotch(w0, self.HUM_NOTCH_Q)
- except ValueError:
- self._notch_cache[sample_rate] = None
- return None
- coeff = (b.astype(np.float32), a.astype(np.float32))
- self._notch_cache[sample_rate] = coeff
- return coeff
- @staticmethod
- def _apply_limiter(audio: np.ndarray) -> np.ndarray:
- if audio.size == 0:
- return audio.astype(np.float32, copy=False)
- x = np.clip(audio.astype(np.float32, copy=False), -1.0, 1.0)
- threshold = 0.82
- abs_x = np.abs(x)
- if not np.any(abs_x > threshold):
- return x.astype(np.float32, copy=False)
- out = x.copy()
- over = abs_x > threshold
- norm = (abs_x[over] - threshold) / max(1.0 - threshold, 1e-6)
- compressed = threshold + (1.0 - threshold) * (np.tanh(2.2 * norm) / np.tanh(2.2))
- out[over] = np.sign(x[over]) * compressed
- return out.astype(np.float32, copy=False)
- @staticmethod
- def _downsample_for_ui(audio: np.ndarray, target_points: int = 320) -> np.ndarray:
- if audio.size <= target_points:
- return audio.astype(np.float32, copy=False)
- step = int(np.ceil(audio.size / target_points))
- sampled = audio[::step]
- if sampled.size > target_points:
- sampled = sampled[:target_points]
- return sampled.astype(np.float32, copy=False)
- @staticmethod
- def _rms(audio: np.ndarray) -> float:
- if audio.size == 0:
- return 0.0
- return float(np.sqrt(np.mean(np.square(audio), dtype=np.float64)))
- @staticmethod
- def _resample_audio(audio: np.ndarray, source_rate: int, target_rate: int) -> np.ndarray:
- if audio.size == 0 or source_rate == target_rate:
- return audio.astype(np.float32, copy=False)
- if source_rate == 48000 and target_rate == 16000:
- usable = audio.size - (audio.size % 3)
- if usable <= 0:
- return audio.astype(np.float32, copy=False)
- # Fast decimation-by-3 tuned for speech workloads on Zero 2W.
- grouped = audio[:usable].reshape(-1, 3)
- return grouped.mean(axis=1, dtype=np.float32).astype(np.float32, copy=False)
- gcd = math.gcd(source_rate, target_rate)
- up = target_rate // gcd
- down = source_rate // gcd
- resampled = signal.resample_poly(audio, up=up, down=down)
- return resampled.astype(np.float32, copy=False)
def _resolve_input_device(self) -> tuple[int | None, str, int]:
    """Pick the audio input device to capture from.

    Selection order:
      1. The sounddevice default input, if it reports at least one input
         channel.
      2. The first device with at least ``self.CHANNELS`` inputs whose
         lower-cased name contains "google", "voicehat", "i2s" or "mic".
      3. The first device with at least ``self.CHANNELS`` inputs.
      4. The first device with any input channel.

    Returns:
        ``(device_index, device_name, channels)`` where ``channels`` is
        ``self.CHANNELS`` capped at the device's input channel count.

    Raises:
        RuntimeError: If no device exposes an input channel.
    """
    try:
        default_input = sd.default.device[0]
        if isinstance(default_input, int) and default_input >= 0:
            info = sd.query_devices(default_input, "input")
            max_inputs = int(info.get("max_input_channels", 0))
            if max_inputs > 0:
                return int(default_input), str(info.get("name", f"device-{default_input}")), min(self.CHANNELS, max_inputs)
    except Exception:
        # Deliberate best-effort: any failure probing the default device
        # falls through to scanning the full device list below.
        pass

    name_tags = ("google", "voicehat", "i2s", "mic")
    preferred: tuple[int, dict] | None = None
    full_width: tuple[int, dict] | None = None
    any_input: tuple[int, dict] | None = None
    for idx, raw_info in enumerate(sd.query_devices()):
        info = dict(raw_info)
        max_inputs = int(info.get("max_input_channels", 0))
        if max_inputs <= 0:
            continue
        device = (idx, info)
        if any_input is None:
            any_input = device
        if max_inputs < self.CHANNELS:
            continue
        name = str(info.get("name", "")).lower()
        if any(tag in name for tag in name_tags):
            preferred = device
            break
        # Tracked separately from any_input so that an under-provisioned
        # device listed earlier cannot shadow one with enough channels.
        if full_width is None:
            full_width = device

    chosen = preferred or full_width or any_input
    if chosen is None:
        raise RuntimeError("No audio input device available")
    idx, info = chosen
    max_inputs = int(info.get("max_input_channels", 1))
    channels = min(self.CHANNELS, max_inputs)
    return int(idx), str(info.get("name", f"device-{idx}")), channels
- def _push_monitor_chunk_locked(self, chunk: np.ndarray, sample_rate: int) -> None:
- samples = chunk.astype(np.float32, copy=True)
- if samples.size == 0:
- return
- self._monitor_queue.append(samples)
- self._monitor_queue_samples += samples.size
- max_samples = max(sample_rate * 2, 2048)
- while self._monitor_queue_samples > max_samples and self._monitor_queue:
- dropped = self._monitor_queue.popleft()
- self._monitor_queue_samples -= dropped.size
- def _pop_monitor_chunk(self, max_samples: int) -> np.ndarray | None:
- if self._monitor_queue_samples <= 0 or not self._monitor_queue:
- return None
- take: list[np.ndarray] = []
- collected = 0
- while self._monitor_queue and collected < max_samples:
- chunk = self._monitor_queue[0]
- remaining = max_samples - collected
- if chunk.size <= remaining:
- take.append(chunk)
- collected += chunk.size
- self._monitor_queue.popleft()
- else:
- take.append(chunk[:remaining])
- self._monitor_queue[0] = chunk[remaining:]
- collected += remaining
- break
- self._monitor_queue_samples -= collected
- if not take:
- return None
- if len(take) == 1:
- return take[0]
- return np.concatenate(take).astype(np.float32, copy=False)
- @staticmethod
- def _encode_pcm16_base64(audio: np.ndarray) -> str:
- pcm16 = (np.clip(audio, -1.0, 1.0) * 32767.0).astype(np.int16)
- return base64.b64encode(pcm16.tobytes()).decode("ascii")
- def _clear_monitor_queue_locked(self) -> None:
- self._monitor_queue.clear()
- self._monitor_queue_samples = 0
- def _estimate_speech_presence_fast(self, mic1: np.ndarray, mic2: np.ndarray) -> bool:
- if mic1.size == 0 or mic2.size == 0:
- return False
- energy = 0.5 * (np.mean(mic1 * mic1) + np.mean(mic2 * mic2))
- energy = float(max(energy, 1e-10))
- if energy < self._vad_noise_floor:
- alpha = 0.05
- else:
- alpha = 0.004
- self._vad_noise_floor = (1.0 - alpha) * self._vad_noise_floor + alpha * energy
- threshold = max(2.2e-7, self._vad_noise_floor * 1.9)
- return bool(energy > threshold)
- @staticmethod
- def _gcc_phat(sig: np.ndarray, refsig: np.ndarray, sample_rate: int, max_tau: float) -> float:
- n = sig.size + refsig.size
- sig_fft = np.fft.rfft(sig, n=n)
- ref_fft = np.fft.rfft(refsig, n=n)
- cross = sig_fft * np.conj(ref_fft)
- denom = np.abs(cross)
- cross = cross / np.maximum(denom, 1e-10)
- cc = np.fft.irfft(cross, n=n)
- max_shift = int(min(n // 2, max_tau * sample_rate))
- if max_shift <= 0:
- return 0.0
- cc_window = np.concatenate((cc[-max_shift:], cc[: max_shift + 1]))
- shift = int(np.argmax(np.abs(cc_window)) - max_shift)
- return float(shift) / float(sample_rate)
- def _estimate_speech_angle(self, mic1: np.ndarray, mic2: np.ndarray, sample_rate: int) -> tuple[float, bool]:
- if mic1.size < 64 or mic2.size < 64:
- return self._auto_angle_deg, False
- high = min(3400.0, 0.45 * sample_rate)
- low = min(300.0, high * 0.5)
- if high <= low + 1.0:
- return self._auto_angle_deg, False
- sos = self._get_speech_sos(sample_rate, low, high)
- if sos is None:
- return self._auto_angle_deg, False
- speech1 = signal.sosfilt(sos, mic1).astype(np.float32, copy=False)
- speech2 = signal.sosfilt(sos, mic2).astype(np.float32, copy=False)
- speech_energy = 0.5 * (np.mean(speech1 * speech1) + np.mean(speech2 * speech2))
- full_energy = 0.5 * (np.mean(mic1 * mic1) + np.mean(mic2 * mic2))
- speech_ratio = float(speech_energy / max(full_energy, 1e-12))
- self._noise_floor = 0.995 * self._noise_floor + 0.005 * float(speech_energy)
- speech_threshold = max(2.5e-7, self._noise_floor * 2.0)
- speech_detected = bool(speech_energy > speech_threshold and speech_ratio > 0.08)
- if not speech_detected:
- return self._auto_angle_deg, False
- max_tau = self.MIC_SPACING / 343.0
- tau = self._gcc_phat(speech1, speech2, sample_rate, max_tau=max_tau)
- sin_theta = np.clip((tau * 343.0) / self.MIC_SPACING, -1.0, 1.0)
- raw_angle = float(np.rad2deg(np.arcsin(sin_theta)))
- raw_angle = float(np.clip(raw_angle, -90.0, 90.0))
- self._auto_angle_deg = 0.88 * self._auto_angle_deg + 0.12 * raw_angle
- return self._auto_angle_deg, True
- def _get_speech_sos(self, sample_rate: int, low: float, high: float) -> np.ndarray | None:
- cached = self._speech_sos_cache.get(sample_rate, "__missing__")
- if isinstance(cached, np.ndarray):
- return cached
- if cached is None:
- return None
- try:
- sos = signal.butter(4, [low, high], btype="bandpass", fs=sample_rate, output="sos")
- except ValueError:
- self._speech_sos_cache[sample_rate] = None
- return None
- self._speech_sos_cache[sample_rate] = sos
- return sos
- @staticmethod
- def _make_empty_frame() -> dict[str, object]:
- empty = np.empty(0, dtype=np.float32)
- return {
- "mic1": empty,
- "mic2": empty,
- "beam": empty,
- "mono_mix": empty,
- "show_mic2": True,
- "show_beam": False,
- "show_mono_mix": False,
- "beam_angle_deg": 0.0,
- "auto_beam": True,
- "speech_detected": False,
- "speech_gate_open": True,
- "hifi_mode": False,
- "monitor_on": False,
- "monitor_source": "beam",
- "recording": False,
- "rec_duration": 0.0,
- }
|