from __future__ import annotations

from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
import queue
import threading
import wave

import numpy as np


@dataclass
class RecorderStatus:
    """Plain-value snapshot of the recorder state."""

    # True between a successful start() and the matching stop().
    recording: bool = False
    # File name relative to the recordings directory.
    filename: str = ""
    # Seconds of audio accepted into the write queue so far.
    duration_sec: float = 0.0
    channels: int = 1
    sample_rate: int = 16000
    # Source label with spaces and "/" replaced by "_".
    source: str = "mic1"


class WavRecorder:
    """Threaded WAV writer to avoid blocking the audio callback.

    ``write()`` converts audio to 16-bit PCM and pushes it onto a bounded
    queue; a background thread drains the queue into the WAV file.

    Locking:
      * ``_lock`` guards the status and frame counter. It is only held for
        short in-memory sections, never during disk I/O, so ``write()`` and
        ``get_status()`` do not wait on the disk.
      * ``_io_lock`` guards the WAV handle (writes and close).
      * ``_control_lock`` serializes ``start()`` and ``stop()``.
    """

    def __init__(self, recordings_dir: Path) -> None:
        """Create the recordings directory if needed and set up idle state."""
        self.recordings_dir = Path(recordings_dir)
        self.recordings_dir.mkdir(parents=True, exist_ok=True)

        self._lock = threading.Lock()
        self._io_lock = threading.Lock()
        self._control_lock = threading.Lock()
        self._queue: queue.Queue[bytes] = queue.Queue(maxsize=512)
        self._stop_event = threading.Event()
        self._worker: threading.Thread | None = None
        self._wave_file: wave.Wave_write | None = None
        self._frames_written = 0
        self._status = RecorderStatus()

    def get_status(self) -> RecorderStatus:
        """Return a copy of the current status."""
        with self._lock:
            return self._snapshot_locked()

    def start(
        self,
        source: str,
        sample_rate: int,
        channels: int,
        filename: str | None = None,
    ) -> str:
        """Begin a new recording and return its file name.

        Args:
            source: Label for the audio source; spaces and "/" become "_".
            sample_rate: Frame rate written to the WAV header.
            channels: Channel count written to the WAV header.
            filename: Target name inside the recordings directory. When
                omitted, ``rec_<timestamp>_<source>.wav`` is generated.

        Raises:
            RuntimeError: A recording is active, or the writer thread of the
                previous recording has not finished yet.
            ValueError: The file name resolves outside the recordings
                directory.
            wave.Error: The ``wave`` module rejected the channel count or
                sample rate.
            OSError: The file could not be opened.
        """
        with self._control_lock:
            with self._lock:
                if self._status.recording:
                    raise RuntimeError("Recording is already active")
            # A worker that outlived stop()'s join timeout is still draining
            # the shared queue; reusing the queue/event now would revive it.
            if self._worker is not None and self._worker.is_alive():
                raise RuntimeError("Previous recording is still being finalized")

            source_clean = source.replace(" ", "_").replace("/", "_")
            if filename is None:
                timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                filename = f"rec_{timestamp}_{source_clean}.wav"

            # Same containment rule as for reading recordings back.
            filepath = resolve_recording_path(self.recordings_dir, filename)

            # Opened outside _lock so the audio callback never waits on it.
            wave_file = self._open_wave_file(filepath, channels, sample_rate)

            self._stop_event.clear()
            with self._io_lock:
                self._wave_file = wave_file
            with self._lock:
                self._clear_queue_locked()
                self._frames_written = 0
                self._status = RecorderStatus(
                    recording=True,
                    filename=filename,
                    duration_sec=0.0,
                    channels=channels,
                    sample_rate=sample_rate,
                    source=source_clean,
                )

            self._worker = threading.Thread(target=self._writer_loop, daemon=True)
            self._worker.start()
            return filename

    def write(self, audio: np.ndarray) -> None:
        """Queue one chunk of float audio for writing.

        ``audio`` is converted to float32, clipped to [-1.0, 1.0] and scaled
        to int16. A 1-D array is accepted only for mono recordings; a 2-D
        array must have ``shape[1]`` equal to the channel count. The chunk is
        silently dropped when no recording is active, the shape does not
        match, or the queue is full.
        """
        with self._lock:
            if not self._status.recording:
                return
            channels = self._status.channels

        data = np.asarray(audio, dtype=np.float32)
        if data.ndim == 1:
            if channels != 1:
                return
        elif data.ndim == 2:
            if data.shape[1] != channels:
                return
        else:
            return

        frames = data.shape[0]
        if frames == 0:
            return
        # tobytes() emits C order, i.e. interleaved samples for 2-D input.
        payload = (np.clip(data, -1.0, 1.0) * 32767.0).astype(np.int16).tobytes()

        with self._lock:
            # Re-check under the lock: stop() may have run during conversion,
            # and nothing would drain a chunk queued after the worker exits.
            if not self._status.recording:
                return
            try:
                self._queue.put_nowait(payload)
            except queue.Full:
                return
            self._frames_written += frames
            self._status.duration_sec = self._frames_written / float(
                self._status.sample_rate
            )

    def stop(self) -> RecorderStatus:
        """Stop the active recording, close the file and return the status.

        Waits up to two seconds for the writer thread to drain the queue.
        Returns the current status unchanged when nothing is being recorded.
        """
        with self._control_lock:
            with self._lock:
                if not self._status.recording:
                    return self._snapshot_locked()
                self._status.recording = False

            worker = self._worker
            # The event stays set until the next start(): clearing it here
            # would trap a worker that outlived the join timeout in its loop.
            self._stop_event.set()
            if worker is not None:
                worker.join(timeout=2.0)

            with self._io_lock:
                wave_file, self._wave_file = self._wave_file, None
                if wave_file is not None:
                    wave_file.close()

            # Keep the reference while the thread is alive so start() can
            # detect it.
            if worker is None or not worker.is_alive():
                self._worker = None

            with self._lock:
                return self._snapshot_locked()

    @staticmethod
    def _open_wave_file(
        filepath: Path, channels: int, sample_rate: int
    ) -> wave.Wave_write:
        """Open ``filepath`` for 16-bit PCM writing; close it again on failure."""
        wave_file = wave.open(str(filepath), "wb")
        try:
            wave_file.setnchannels(channels)
            wave_file.setsampwidth(2)
            wave_file.setframerate(sample_rate)
        except Exception:
            # close() can itself raise wave.Error on an incomplete header;
            # ignore that so the original error is the one that propagates.
            try:
                wave_file.close()
            except wave.Error:
                pass
            raise
        return wave_file

    def _snapshot_locked(self) -> RecorderStatus:
        """Copy the status; the caller must hold ``_lock``."""
        return RecorderStatus(**asdict(self._status))

    def _writer_loop(self) -> None:
        """Drain the queue into the WAV file until stopped and empty."""
        while not self._stop_event.is_set() or not self._queue.empty():
            try:
                payload = self._queue.get(timeout=0.1)
            except queue.Empty:
                continue
            # Only _io_lock is held during the disk write, so write() and
            # get_status() are not blocked by it.
            with self._io_lock:
                if self._wave_file is not None:
                    self._wave_file.writeframes(payload)

    def _clear_queue_locked(self) -> None:
        """Discard everything currently queued; the caller must hold ``_lock``."""
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break


def resolve_recording_path(recordings_dir: Path, filename: str) -> Path:
    """Resolve ``filename`` inside ``recordings_dir``.

    Both paths are resolved first, so ".." segments and symlinks are taken
    into account.

    Raises:
        ValueError: The resolved path is neither the directory itself nor
            located below it.
    """
    root = Path(recordings_dir).resolve()
    candidate = (root / filename).resolve()
    if root not in candidate.parents and candidate != root:
        raise ValueError("Invalid recording path")
    return candidate


def list_recordings(recordings_dir: Path) -> list[dict[str, object]]:
    """Describe the readable ``*.wav`` files in ``recordings_dir``, newest first.

    The directory is created when missing. Files that disappear during the
    scan or cannot be parsed as WAV are skipped. Each entry has the keys
    ``filename``, ``date`` (ISO local time, seconds precision),
    ``duration_sec``, ``size_bytes``, ``channels`` and ``sample_rate``.
    """
    folder = Path(recordings_dir)
    folder.mkdir(parents=True, exist_ok=True)

    # Stat each file once, up front: a file deleted while listing must not
    # abort the whole listing from inside the sort key.
    candidates = []
    for wav_file in folder.glob("*.wav"):
        try:
            info = wav_file.stat()
        except OSError:
            continue
        candidates.append((info.st_mtime, info.st_size, wav_file))
    candidates.sort(key=lambda entry: entry[0], reverse=True)

    results: list[dict[str, object]] = []
    for mtime, size_bytes, wav_file in candidates:
        try:
            with wave.open(str(wav_file), "rb") as wf:
                frames = wf.getnframes()
                sample_rate = wf.getframerate()
                channels = wf.getnchannels()
        except (wave.Error, EOFError, OSError):
            continue

        duration_sec = float(frames) / float(sample_rate) if sample_rate else 0.0
        results.append(
            {
                "filename": wav_file.name,
                "date": datetime.fromtimestamp(mtime).isoformat(timespec="seconds"),
                "duration_sec": round(duration_sec, 3),
                "size_bytes": size_bytes,
                "channels": channels,
                "sample_rate": sample_rate,
            }
        )
    return results