"""Bridge between AudioEngine and the stt.mm.mk WebSocket STT service.

Runs the STT WebSocket client in a **subprocess** to avoid conflicts with
eventlet. Communication: stdin (length-prefixed PCM binary) / stdout (JSON lines).
"""

from __future__ import annotations

import json
import logging
import os
import subprocess
import threading
import time
from dataclasses import dataclass, fields
from pathlib import Path
from urllib.parse import urlencode

import numpy as np

logger = logging.getLogger("stt_bridge")

_WORKER_PATH = str(Path(__file__).resolve().parent / "stt_worker.py")
_VENV_PYTHON = str(Path(__file__).resolve().parent / ".venv" / "bin" / "python")

# Text spellings treated as True when a bool setting arrives as a string
# (plain bool() would turn "false" / "0" into True).
_TRUTHY_STRINGS = frozenset({"1", "true", "yes", "on"})


@dataclass
class SttSettings:
    """User-tunable STT options.

    ``enabled`` switches the worker on/off; the remaining fields shape the
    worker's WebSocket URL (see ``SttBridge._build_url``).
    """

    enabled: bool = False
    language: str = "pl"
    timestamps: bool = True
    diarize: bool = True
    itn: bool = True
    detect_emotion: bool = False
    server_vad: bool = False
    # The three vad_* values are only sent when server_vad is on.
    vad_threshold: float = 0.3
    vad_pad_ms: int = 400
    vad_min_ms: int = 100


class SttBridge:
    """Manages a subprocess STT worker that connects to stt.mm.mk.

    Audio is pushed with :meth:`feed_audio`; JSON messages printed by the
    worker are forwarded to ``on_message`` from a background reader thread.
    """

    STT_URL = "wss://stt.mm.mk/ws/transcribe"

    # Settings baked into the worker URL: changing one restarts the worker.
    _RECONNECT_KEYS = frozenset({
        "language",
        "timestamps",
        "diarize",
        "itn",
        "detect_emotion",
        "server_vad",
        "vad_threshold",
        "vad_pad_ms",
        "vad_min_ms",
    })

    def __init__(self, on_message=None):
        self._lock = threading.Lock()
        self._settings = SttSettings()
        self._on_message = on_message
        self._process: subprocess.Popen | None = None
        self._reader_thread: threading.Thread | None = None
        self._connected = False
        # Last sample rate seen from the audio engine; used when building the URL.
        self._sample_rate = 16000

    def get_settings(self) -> dict:
        """Return a snapshot of the settings plus the connection flag.

        Keys carry an ``stt_`` prefix; ``stt_connected`` reflects the last
        ``stt_status`` message reported by the current worker.
        """
        with self._lock:
            return {
                "stt_enabled": self._settings.enabled,
                "stt_language": self._settings.language,
                "stt_timestamps": self._settings.timestamps,
                "stt_diarize": self._settings.diarize,
                "stt_itn": self._settings.itn,
                "stt_detect_emotion": self._settings.detect_emotion,
                "stt_server_vad": self._settings.server_vad,
                "stt_vad_threshold": self._settings.vad_threshold,
                "stt_vad_pad_ms": self._settings.vad_pad_ms,
                "stt_vad_min_ms": self._settings.vad_min_ms,
                "stt_connected": self._connected,
            }

    def update_settings(self, **kwargs) -> dict:
        """Apply setting changes and start/stop/restart the worker as needed.

        Keys may be given with or without the ``stt_`` prefix (``stt_language``
        or ``language``); keys that are not settings fields are ignored. Values
        are converted to the type of the current setting; for bool settings a
        string is true only if it is one of "1", "true", "yes", "on".

        Toggling ``enabled`` starts or stops the worker. Changing any other
        setting restarts a running worker, but only if its value actually changed.

        Returns:
            The resulting settings, in the same shape as :meth:`get_settings`.

        Raises:
            ValueError / TypeError: if a value cannot be converted.
        """
        field_names = {f.name for f in fields(self._settings)}
        changed_enabled = False
        need_reconnect = False
        # Held across start/stop too, so concurrent updates cannot interleave
        # worker lifecycle changes.
        with self._lock:
            for key, val in kwargs.items():
                attr = key[len("stt_"):] if key.startswith("stt_") else key
                if attr not in field_names:
                    continue
                old = getattr(self._settings, attr)
                new = self._coerce(old, val)
                setattr(self._settings, attr, new)
                if new == old:
                    continue  # no-op update: don't bounce the worker
                if attr == "enabled":
                    changed_enabled = True
                elif attr in self._RECONNECT_KEYS:
                    need_reconnect = True
            if changed_enabled:
                if self._settings.enabled:
                    self._start_worker()
                else:
                    self._stop_worker()
            elif need_reconnect and self._settings.enabled and self._process is not None:
                self._stop_worker()
                self._start_worker()
        # Outside the lock: get_settings() acquires it (threading.Lock is not reentrant).
        return self.get_settings()

    @staticmethod
    def _coerce(old, val):
        """Convert *val* to the type of the current setting value *old*."""
        if isinstance(old, bool) and isinstance(val, str):
            return val.strip().lower() in _TRUTHY_STRINGS
        return type(old)(val)

    def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
        """Send one chunk of float audio to the STT worker (best effort).

        Samples are clipped to [-1, 1] and converted to 16-bit PCM. The chunk
        is written as a 4-byte little-endian length header followed by the PCM
        bytes. Audio is dropped when no worker is running or STT is disabled.

        This does not raise when the worker has died or is being stopped
        concurrently. The pipe write itself is blocking, though: if the worker
        stops reading and the pipe fills, this call waits.
        """
        # Record the rate even while idle, so the next worker start advertises
        # the engine's real rate instead of the 16 kHz default.
        # NOTE(review): a rate change while a worker runs only takes effect on
        # the next (re)start.
        self._sample_rate = sample_rate
        proc = self._process
        if proc is None or proc.poll() is not None:
            return
        if not self._settings.enabled:
            return
        pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
        payload = pcm16.tobytes()
        header = len(payload).to_bytes(4, "little")
        try:
            proc.stdin.write(header + payload)
            proc.stdin.flush()
        except (BrokenPipeError, OSError, ValueError):
            # BrokenPipe/OSError: the worker exited. ValueError: _stop_worker
            # closed stdin between our poll() and write().
            pass

    def _build_url(self) -> str:
        """Build the WebSocket URL (with query string) for a new worker."""
        s = self._settings
        params: list[tuple[str, object]] = [
            ("language", s.language),
            ("rate", self._sample_rate),
            ("stream_id", "mic-system-" + str(int(time.time()))),
        ]
        flags = (
            ("timestamps", s.timestamps),
            ("diarize", s.diarize),
            ("itn", s.itn),
            ("detect_emotion", s.detect_emotion),
        )
        params.extend((name, 1) for name, on in flags if on)
        if s.server_vad:
            params.extend([
                ("vad", 1),
                ("vad_threshold", s.vad_threshold),
                ("vad_pad_ms", s.vad_pad_ms),
                ("vad_min_ms", s.vad_min_ms),
            ])
        # urlencode escapes values, so a language string containing "&" or "="
        # cannot inject extra query parameters.
        return self.STT_URL + "?" + urlencode(params)

    def _start_worker(self):
        """Spawn the worker process and its stdout reader thread.

        Prefers the project's ``.venv`` interpreter and falls back to
        ``python3``. If spawning fails, the error is logged and no worker is
        recorded.
        """
        url = self._build_url()
        logger.info("STT starting worker subprocess: %s", url)
        python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3"
        try:
            proc = subprocess.Popen(
                [python, _WORKER_PATH, url],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=0,
            )
        except Exception as e:
            logger.error("Failed to start STT worker: %s", e)
            return
        self._process = proc
        # Bind the reader to this specific process, so that after a restart it
        # can tell it has been superseded.
        self._reader_thread = threading.Thread(
            target=self._read_stdout,
            args=(proc,),
            daemon=True,
        )
        self._reader_thread.start()

    def _read_stdout(self, proc: subprocess.Popen | None = None):
        """Forward JSON lines from a worker's stdout to ``on_message``.

        Runs until the worker's stdout reaches EOF. Blank lines, non-JSON lines
        and JSON values that are not objects are skipped.

        ``stt_status`` messages from a worker that is no longer current are
        ignored, so they cannot overwrite a restarted worker's state. The final
        "disconnected" notice is sent only if this worker has not been replaced
        by a newer one.

        Args:
            proc: the worker to read from; defaults to the current process.
        """
        if proc is None:
            proc = self._process
        if proc is None:
            return
        try:
            for raw in proc.stdout:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    msg = json.loads(raw)
                except ValueError:  # JSONDecodeError / UnicodeDecodeError
                    continue
                if not isinstance(msg, dict):
                    continue
                if msg.get("type") == "stt_status":
                    if self._process is not proc:
                        continue  # stale status from a stopped/replaced worker
                    self._connected = bool(msg.get("connected", False))
                if self._on_message:
                    self._on_message(msg)
        except Exception:
            logger.exception("STT worker reader failed")
        finally:
            if proc.stdout is not None:
                try:
                    proc.stdout.close()
                except OSError:
                    pass
            with self._lock:
                superseded = self._process is not None and self._process is not proc
                if not superseded:
                    self._connected = False
            # Callback runs outside the lock: it may call get_settings().
            if not superseded and self._on_message:
                self._on_message({"type": "stt_status", "connected": False})

    def _stop_worker(self):
        """Stop the current worker: close stdin, terminate, escalate to kill.

        The reader thread is not joined; it finishes on its own once the
        worker's stdout reaches EOF.
        """
        self._connected = False
        proc = self._process
        self._process = None
        self._reader_thread = None
        if proc is None:
            return
        try:
            proc.stdin.close()
        except OSError:
            pass
        try:
            proc.terminate()
            proc.wait(timeout=3)
        except (OSError, subprocess.SubprocessError):
            try:
                proc.kill()
                proc.wait(timeout=1)  # reap, so no zombie is left behind
            except (OSError, subprocess.SubprocessError):
                pass

    def stop(self):
        """Stop the worker, if one is running."""
        self._stop_worker()