"""Bridge between AudioEngine and stt.mm.mk WebSocket STT service. Architecture: - Audio out: UDP sendto() from audio callback — non-blocking, zero copies - Results in: Worker does HTTP POST to Flask /internal/stt endpoint Flask handles it natively in eventlet — no threads, no pipes, no sockets to manage. """ from __future__ import annotations import logging import os import socket import subprocess import time from dataclasses import dataclass from pathlib import Path import numpy as np logger = logging.getLogger("stt_bridge") _WORKER_PATH = str(Path(__file__).resolve().parent / "stt_worker.py") _VENV_PYTHON = str(Path(__file__).resolve().parent / ".venv" / "bin" / "python") AUDIO_PORT = 19876 @dataclass class SttSettings: enabled: bool = False language: str = "pl" timestamps: bool = True diarize: bool = True itn: bool = True detect_emotion: bool = False server_vad: bool = False vad_threshold: float = 0.3 vad_pad_ms: int = 400 vad_min_ms: int = 100 class SttBridge: """Manages subprocess STT worker that connects to stt.mm.mk.""" STT_URL = "wss://stt.mm.mk/ws/transcribe" def __init__(self, on_message=None): self._settings = SttSettings() self._on_message = on_message self._process: subprocess.Popen | None = None self._audio_sock: socket.socket | None = None self._connected = False self._sample_rate = 16000 def get_settings(self) -> dict: return { "stt_enabled": self._settings.enabled, "stt_language": self._settings.language, "stt_timestamps": self._settings.timestamps, "stt_diarize": self._settings.diarize, "stt_itn": self._settings.itn, "stt_detect_emotion": self._settings.detect_emotion, "stt_server_vad": self._settings.server_vad, "stt_vad_threshold": self._settings.vad_threshold, "stt_vad_pad_ms": self._settings.vad_pad_ms, "stt_vad_min_ms": self._settings.vad_min_ms, "stt_connected": self._connected, } def handle_worker_message(self, msg: dict): """Called by Flask when worker POSTs a result.""" if msg.get("type") == "stt_status": self._connected = bool(msg.get("connected", False)) if self._on_message: self._on_message(msg) def update_settings(self, **kwargs) -> dict: reconnect_keys = { "language", "timestamps", "diarize", "itn", "detect_emotion", "server_vad", "vad_threshold", "vad_pad_ms", "vad_min_ms", } changed_enabled = False need_reconnect = False for key, val in kwargs.items(): attr = key.replace("stt_", "") if hasattr(self._settings, attr): old = getattr(self._settings, attr) setattr(self._settings, attr, type(old)(val)) if attr == "enabled" and old != self._settings.enabled: changed_enabled = True if attr in reconnect_keys: need_reconnect = True if changed_enabled: if self._settings.enabled: self._start_worker() else: self._stop_worker() elif self._settings.enabled and self._process is not None and need_reconnect: self._stop_worker() self._start_worker() return self.get_settings() def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None: """Feed processed audio to STT via UDP. Non-blocking.""" sock = self._audio_sock if sock is None or not self._settings.enabled: return self._sample_rate = sample_rate pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16) try: sock.sendto(pcm16.tobytes(), ("127.0.0.1", AUDIO_PORT)) except (OSError, BlockingIOError): pass def _build_url(self) -> str: s = self._settings parts = [ "language=" + s.language, "rate=" + str(self._sample_rate), "stream_id=mic-system-" + str(int(time.time())), ] if s.timestamps: parts.append("timestamps=1") if s.diarize: parts.append("diarize=1") if s.itn: parts.append("itn=1") if s.detect_emotion: parts.append("detect_emotion=1") if s.server_vad: parts.append("vad=1") parts.append("vad_threshold=" + str(s.vad_threshold)) parts.append("vad_pad_ms=" + str(s.vad_pad_ms)) parts.append("vad_min_ms=" + str(s.vad_min_ms)) return self.STT_URL + "?" + "&".join(parts) def _start_worker(self): url = self._build_url() logger.info("STT starting worker: %s", url) python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3" self._audio_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._audio_sock.setblocking(False) try: self._process = subprocess.Popen( [python, _WORKER_PATH, url, "--audio-port", str(AUDIO_PORT), "--callback-url", "http://127.0.0.1:5000/internal/stt"], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) except Exception as e: logger.error("Failed to start STT worker: %s", e) if self._audio_sock: self._audio_sock.close() self._audio_sock = None return def _stop_worker(self): self._connected = False proc = self._process self._process = None if self._audio_sock: try: self._audio_sock.close() except Exception: pass self._audio_sock = None if proc is not None: try: proc.terminate() proc.wait(timeout=3) except Exception: try: proc.kill() except Exception: pass def stop(self): self._stop_worker()