"""Bridge between AudioEngine and stt.mm.mk WebSocket STT service. Runs STT WebSocket in a **subprocess** to avoid conflicts with eventlet. Communication: stdin (length-prefixed PCM binary) / stdout (JSON lines). CRITICAL: This module runs inside an eventlet-patched process. threading.Thread = green thread, so we CANNOT use threads for I/O. Instead, feed_audio() uses os.write() with O_NONBLOCK on the pipe fd to avoid blocking the audio callback or the eventlet hub. """ from __future__ import annotations import fcntl import json import logging import os import subprocess import time from dataclasses import dataclass from pathlib import Path import numpy as np logger = logging.getLogger("stt_bridge") _WORKER_PATH = str(Path(__file__).resolve().parent / "stt_worker.py") _VENV_PYTHON = str(Path(__file__).resolve().parent / ".venv" / "bin" / "python") @dataclass class SttSettings: enabled: bool = False language: str = "pl" timestamps: bool = True diarize: bool = True itn: bool = True detect_emotion: bool = False server_vad: bool = False vad_threshold: float = 0.3 vad_pad_ms: int = 400 vad_min_ms: int = 100 def _set_nonblock(fd): """Set a file descriptor to non-blocking mode.""" flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class SttBridge: """Manages subprocess STT worker that connects to stt.mm.mk.""" STT_URL = "wss://stt.mm.mk/ws/transcribe" def __init__(self, on_message=None): self._settings = SttSettings() self._on_message = on_message self._process: subprocess.Popen | None = None self._stdin_fd: int | None = None # raw fd for non-blocking writes self._connected = False self._sample_rate = 16000 self._reader_greenlet = None def get_settings(self) -> dict: return { "stt_enabled": self._settings.enabled, "stt_language": self._settings.language, "stt_timestamps": self._settings.timestamps, "stt_diarize": self._settings.diarize, "stt_itn": self._settings.itn, "stt_detect_emotion": self._settings.detect_emotion, "stt_server_vad": self._settings.server_vad, "stt_vad_threshold": self._settings.vad_threshold, "stt_vad_pad_ms": self._settings.vad_pad_ms, "stt_vad_min_ms": self._settings.vad_min_ms, "stt_connected": self._connected, } def update_settings(self, **kwargs) -> dict: reconnect_keys = { "language", "timestamps", "diarize", "itn", "detect_emotion", "server_vad", "vad_threshold", "vad_pad_ms", "vad_min_ms", } changed_enabled = False need_reconnect = False for key, val in kwargs.items(): attr = key.replace("stt_", "") if hasattr(self._settings, attr): old = getattr(self._settings, attr) setattr(self._settings, attr, type(old)(val)) if attr == "enabled" and old != self._settings.enabled: changed_enabled = True if attr in reconnect_keys: need_reconnect = True if changed_enabled: if self._settings.enabled: self._start_worker() else: self._stop_worker() elif self._settings.enabled and self._process is not None and need_reconnect: self._stop_worker() self._start_worker() return self.get_settings() def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None: """Feed processed audio to STT. Completely non-blocking. Uses os.write() with O_NONBLOCK on the pipe fd. If pipe is full, silently drops the chunk. """ fd = self._stdin_fd if fd is None or not self._settings.enabled: return self._sample_rate = sample_rate pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16) payload = pcm16.tobytes() header = len(payload).to_bytes(4, "little") data = header + payload try: os.write(fd, data) except BlockingIOError: # Pipe full — drop chunk, never block pass except OSError: # Pipe broken — subprocess died pass def _build_url(self) -> str: s = self._settings parts = [ "language=" + s.language, "rate=" + str(self._sample_rate), "stream_id=mic-system-" + str(int(time.time())), ] if s.timestamps: parts.append("timestamps=1") if s.diarize: parts.append("diarize=1") if s.itn: parts.append("itn=1") if s.detect_emotion: parts.append("detect_emotion=1") if s.server_vad: parts.append("vad=1") parts.append("vad_threshold=" + str(s.vad_threshold)) parts.append("vad_pad_ms=" + str(s.vad_pad_ms)) parts.append("vad_min_ms=" + str(s.vad_min_ms)) return self.STT_URL + "?" + "&".join(parts) def _start_worker(self): url = self._build_url() logger.info("STT starting worker subprocess: %s", url) python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3" try: self._process = subprocess.Popen( [python, _WORKER_PATH, url], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=0, ) except Exception as e: logger.error("Failed to start STT worker: %s", e) return # Set stdin pipe to non-blocking so os.write() never blocks self._stdin_fd = self._process.stdin.fileno() _set_nonblock(self._stdin_fd) # Read stdout in eventlet greenlet (this is fine — reading JSON lines # from a pipe cooperates well with eventlet) try: import eventlet self._reader_greenlet = eventlet.spawn(self._read_stdout) except ImportError: # Fallback if eventlet not available import threading t = threading.Thread(target=self._read_stdout, daemon=True) t.start() def _read_stdout(self): """Read JSON lines from worker stdout and forward via callback.""" proc = self._process if proc is None: return try: for line in proc.stdout: line = line.strip() if not line: continue try: msg = json.loads(line) except (json.JSONDecodeError, ValueError): continue if msg.get("type") == "stt_status": self._connected = bool(msg.get("connected", False)) if self._on_message: self._on_message(msg) except Exception: pass finally: self._connected = False if self._on_message: self._on_message({"type": "stt_status", "connected": False}) def _stop_worker(self): self._connected = False self._stdin_fd = None proc = self._process self._process = None if proc is not None: try: proc.stdin.close() except Exception: pass try: proc.terminate() proc.wait(timeout=3) except Exception: try: proc.kill() except Exception: pass self._reader_greenlet = None def stop(self): self._stop_worker()