| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- from __future__ import annotations
- from dataclasses import asdict, dataclass
- from datetime import datetime
- from pathlib import Path
- import queue
- import threading
- import wave
- import numpy as np
@dataclass
class RecorderStatus:
    """Plain snapshot of the recorder state handed out to callers."""

    recording: bool = False  # set True by start(), False by stop()
    filename: str = ""  # file name as returned by start()
    duration_sec: float = 0.0  # frames accepted by write() / sample_rate
    channels: int = 1
    sample_rate: int = 16000
    source: str = "mic1"  # source label with " " and "/" replaced by "_"


class WavRecorder:
    """Threaded WAV writer to avoid blocking the audio callback.

    ``write()`` converts a chunk to 16-bit PCM and enqueues it; a background
    thread drains the queue into the WAV file. Every recording gets its own
    queue, stop event and file handle, all owned by that recording's writer
    thread. As a result no disk I/O happens while ``self._lock`` is held, and
    a writer that outlives the join timeout in ``stop()`` can neither consume
    audio of a later recording nor have its file closed underneath it.
    """

    # Upper bound on queued chunks; write() drops chunks once it is reached.
    _QUEUE_MAXSIZE = 512

    def __init__(self, recordings_dir: Path) -> None:
        """Create ``recordings_dir`` if needed and initialise idle state."""
        self.recordings_dir = Path(recordings_dir)
        self.recordings_dir.mkdir(parents=True, exist_ok=True)
        # Guards the attributes below; only ever held for short bookkeeping.
        self._lock = threading.Lock()
        self._queue: queue.Queue[bytes] = queue.Queue(maxsize=self._QUEUE_MAXSIZE)
        self._stop_event = threading.Event()
        self._worker: threading.Thread | None = None
        self._wave_file: wave.Wave_write | None = None
        self._frames_written = 0
        self._status = RecorderStatus()

    def get_status(self) -> RecorderStatus:
        """Return a copy of the current status (safe to mutate by the caller)."""
        with self._lock:
            return RecorderStatus(**asdict(self._status))

    def start(self, source: str, sample_rate: int, channels: int, filename: str | None = None) -> str:
        """Open a new 16-bit PCM WAV file and launch its writer thread.

        Args:
            source: Label of the audio source; spaces and slashes become
                underscores before it is used in the file name and status.
            sample_rate: Frame rate written to the WAV header.
            channels: Channel count written to the WAV header.
            filename: File name inside ``recordings_dir``. When ``None`` a
                name of the form ``rec_<timestamp>_<source>.wav`` is generated.

        Returns:
            The file name used for the recording.

        Raises:
            RuntimeError: If a recording is already active.
            wave.Error: If ``channels`` or ``sample_rate`` is rejected by the
                ``wave`` module. The file handle is released before raising.
            OSError: If the file cannot be opened.
        """
        with self._lock:
            if self._status.recording:
                raise RuntimeError("Recording is already active")

            source_clean = source.replace(" ", "_").replace("/", "_")
            if filename is None:
                timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                filename = f"rec_{timestamp}_{source_clean}.wav"
            # NOTE(review): a caller-supplied filename is joined unchecked, so
            # "../x.wav" escapes recordings_dir. Validate upstream if the
            # name can come from untrusted input.
            filepath = self.recordings_dir / filename

            pending: queue.Queue[bytes] = queue.Queue(maxsize=self._QUEUE_MAXSIZE)
            stop_event = threading.Event()
            wave_file = wave.open(str(filepath), "wb")
            try:
                wave_file.setnchannels(channels)
                wave_file.setsampwidth(2)
                wave_file.setframerate(sample_rate)
                worker = threading.Thread(
                    target=self._writer_loop,
                    args=(wave_file, stop_event, pending),
                    daemon=True,
                )
                worker.start()
            except Exception:
                # Do not leak the handle. close() on a half-configured writer
                # raises wave.Error itself, but only after releasing the file.
                try:
                    wave_file.close()
                except (wave.Error, OSError):
                    pass
                raise

            # Publish the new session only once everything above succeeded.
            self._queue = pending
            self._stop_event = stop_event
            self._wave_file = wave_file
            self._worker = worker
            self._frames_written = 0
            self._status = RecorderStatus(
                recording=True,
                filename=filename,
                duration_sec=0.0,
                channels=channels,
                sample_rate=sample_rate,
                source=source_clean,
            )
            return filename

    def write(self, audio: np.ndarray) -> None:
        """Queue one chunk of audio for the active recording.

        ``audio`` is converted to float32, clipped to [-1.0, 1.0] and scaled
        to int16. A 1-D array is accepted only for mono recordings; a 2-D
        array must be shaped ``(frames, channels)`` with the recording's
        channel count. Chunks of any other shape, chunks arriving while no
        recording is active, and chunks arriving while the queue is full are
        silently dropped.
        """
        with self._lock:
            status = self._status
            if not status.recording:
                return
            channels = status.channels
            sample_rate = status.sample_rate
            pending = self._queue

        data = np.asarray(audio, dtype=np.float32)
        if data.ndim == 1:
            if channels != 1:
                return
        elif data.ndim == 2:
            if data.shape[1] != channels:
                return
        else:
            return

        frames = data.shape[0]
        # tobytes() emits C order, i.e. interleaved samples for 2-D input.
        payload = (np.clip(data, -1.0, 1.0) * 32767.0).astype(np.int16).tobytes()

        try:
            pending.put_nowait(payload)
        except queue.Full:
            return

        with self._lock:
            # start() installs a fresh status object, so identity tells us the
            # chunk still belongs to the recording we validated against.
            if self._status is status:
                self._frames_written += frames
                status.duration_sec = self._frames_written / float(sample_rate)

    def stop(self) -> RecorderStatus:
        """Stop the active recording and return its final status.

        Signals the writer thread and waits up to two seconds for it to drain
        the queue and close the file. If the writer needs longer, it keeps
        draining in the background and closes the file when done. Calling
        ``stop()`` while idle just returns the current status.
        """
        with self._lock:
            status = self._status
            if not status.recording:
                return RecorderStatus(**asdict(status))
            status.recording = False
            worker = self._worker
            stop_event = self._stop_event
            # Detach the session here so a concurrent start() cannot have its
            # new file or worker clobbered by the tail of this call.
            self._worker = None
            self._wave_file = None

        stop_event.set()
        if worker is not None:
            worker.join(timeout=2.0)

        with self._lock:
            return RecorderStatus(**asdict(status))

    def _writer_loop(
        self,
        wave_file: wave.Wave_write,
        stop_event: threading.Event,
        pending: queue.Queue[bytes],
    ) -> None:
        """Drain ``pending`` into ``wave_file`` until stopped and empty.

        The thread works only on the objects passed in, never on ``self``
        state, and always closes the file on exit, which also patches the
        WAV header with the final length.
        """
        try:
            while not stop_event.is_set() or not pending.empty():
                try:
                    payload = pending.get(timeout=0.1)
                except queue.Empty:
                    continue
                wave_file.writeframes(payload)
        finally:
            wave_file.close()

    def _clear_queue_locked(self) -> None:
        """Discard everything waiting in the current queue.

        No longer needed by ``start()``, which now creates a fresh queue per
        recording; kept so existing internal callers keep working.
        """
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break
def resolve_recording_path(recordings_dir: Path, filename: str) -> Path:
    """Resolve ``filename`` inside ``recordings_dir`` and refuse escapes.

    Both paths are fully resolved first, so ``..`` segments, absolute file
    names and symlinks are judged by where they actually point.

    Returns:
        The resolved path, which is the recordings root itself or a path
        below it.

    Raises:
        ValueError: If the resolved path lies outside the recordings root.
    """
    base_dir = Path(recordings_dir).resolve()
    target = base_dir.joinpath(filename).resolve()
    try:
        # relative_to() succeeds exactly when target is base_dir or below it.
        target.relative_to(base_dir)
    except ValueError:
        raise ValueError("Invalid recording path") from None
    return target
def list_recordings(recordings_dir: Path) -> list[dict[str, object]]:
    """List readable ``*.wav`` files in ``recordings_dir``, newest first.

    The directory is created when missing. Each file is stat'ed exactly once;
    entries that cannot be stat'ed (for example a dangling symlink or a file
    removed while listing) and files that are not parseable WAV are skipped
    instead of aborting the whole listing.

    Returns:
        One dict per recording with the keys ``filename``, ``date`` (ISO
        local time of the last modification, second precision),
        ``duration_sec`` (rounded to milliseconds), ``size_bytes``,
        ``channels`` and ``sample_rate``.
    """
    folder = Path(recordings_dir)
    folder.mkdir(parents=True, exist_ok=True)

    # Collect (path, stat) pairs up front so sorting cannot raise and the
    # same stat result serves ordering, size and date.
    candidates = []
    for wav_file in folder.glob("*.wav"):
        try:
            candidates.append((wav_file, wav_file.stat()))
        except OSError:
            continue
    candidates.sort(key=lambda item: item[1].st_mtime, reverse=True)

    results: list[dict[str, object]] = []
    for wav_file, info in candidates:
        try:
            with wave.open(str(wav_file), "rb") as wf:
                frames = wf.getnframes()
                sample_rate = wf.getframerate()
                channels = wf.getnchannels()
        except (wave.Error, EOFError, OSError):
            continue

        duration_sec = float(frames) / float(sample_rate) if sample_rate else 0.0
        results.append(
            {
                "filename": wav_file.name,
                "date": datetime.fromtimestamp(info.st_mtime).isoformat(timespec="seconds"),
                "duration_sec": round(duration_sec, 3),
                "size_bytes": info.st_size,
                "channels": channels,
                "sample_rate": sample_rate,
            }
        )
    return results
|