| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- """Bridge between AudioEngine and stt.mm.mk WebSocket STT service.
- Runs STT WebSocket in a **subprocess** to avoid conflicts with eventlet.
- Communication: stdin (length-prefixed PCM binary) / stdout (JSON lines).
- CRITICAL: This module runs inside an eventlet-patched process.
- threading.Thread = green thread, so we CANNOT use threads for I/O.
- Instead, feed_audio() uses os.write() with O_NONBLOCK on the pipe fd
- to avoid blocking the audio callback or the eventlet hub.
- """
- from __future__ import annotations
- import fcntl
- import json
- import logging
- import os
- import subprocess
- import time
- from dataclasses import dataclass
- from pathlib import Path
- import numpy as np
logger = logging.getLogger("stt_bridge")

# The worker script lives next to this module; the interpreter path points at
# an optional project virtualenv in the same directory (_start_worker falls
# back to "python3" when that file does not exist).
_WORKER_PATH = str(Path(__file__).resolve().with_name("stt_worker.py"))
_VENV_PYTHON = str(Path(_WORKER_PATH).parent.joinpath(".venv", "bin", "python"))
@dataclass()
class SttSettings:
    """User-tunable options for the STT bridge.

    ``enabled`` controls whether the worker subprocess runs; the remaining
    fields are turned into query parameters of the STT WebSocket URL.
    """

    # Master switch: toggling it starts/stops the worker.
    enabled: bool = False
    # Language code sent as the ``language`` URL parameter.
    language: str = "pl"
    # Optional features, each sent as ``<name>=1`` only when True.
    timestamps: bool = True
    diarize: bool = True
    itn: bool = True
    detect_emotion: bool = False
    # Server-side VAD; the three vad_* values are only sent when it is on.
    server_vad: bool = False
    vad_threshold: float = 0.3
    vad_pad_ms: int = 400
    vad_min_ms: int = 100
- def _set_nonblock(fd):
- """Set a file descriptor to non-blocking mode."""
- flags = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
class SttBridge:
    """Manage a subprocess STT worker that connects to stt.mm.mk.

    Audio goes to the worker's stdin as length-prefixed PCM16 frames: a
    4-byte little-endian payload length, then the payload. The worker answers
    with JSON lines on stdout, which are forwarded to ``on_message``.

    NOTE(review): feed_audio() may be invoked from an audio callback while
    start/stop run elsewhere; shared state is not locked. Confirm the
    threading model with the caller.
    """

    STT_URL = "wss://stt.mm.mk/ws/transcribe"

    # Settings encoded in the WebSocket URL (see _build_url). Supplying any of
    # them while the worker runs restarts it with a fresh URL.
    _RECONNECT_KEYS = frozenset({
        "language", "timestamps", "diarize", "itn",
        "detect_emotion", "server_vad", "vad_threshold",
        "vad_pad_ms", "vad_min_ms",
    })

    # Strings that mean False for bool settings (plain bool("false") is True).
    _FALSE_STRINGS = frozenset({"", "0", "false", "off", "no"})

    def __init__(self, on_message=None):
        """Create an idle bridge; no worker runs until STT is enabled.

        Args:
            on_message: optional callable receiving each JSON object emitted
                by the worker, plus a ``{"type": "stt_status",
                "connected": False}`` message when the worker's output ends.
        """
        self._settings = SttSettings()
        self._on_message = on_message
        self._process: subprocess.Popen | None = None
        self._stdin_fd: int | None = None  # raw fd for non-blocking writes
        self._pending = b""  # unwritten tail of a partially written frame
        self._connected = False
        self._sample_rate = 16000
        self._reader_greenlet = None

    def get_settings(self) -> dict:
        """Return the current settings plus connection state, keyed ``stt_*``."""
        s = self._settings
        return {
            "stt_enabled": s.enabled,
            "stt_language": s.language,
            "stt_timestamps": s.timestamps,
            "stt_diarize": s.diarize,
            "stt_itn": s.itn,
            "stt_detect_emotion": s.detect_emotion,
            "stt_server_vad": s.server_vad,
            "stt_vad_threshold": s.vad_threshold,
            "stt_vad_pad_ms": s.vad_pad_ms,
            "stt_vad_min_ms": s.vad_min_ms,
            "stt_connected": self._connected,
        }

    def update_settings(self, **kwargs) -> dict:
        """Update settings from ``stt_<field>=value`` (or ``<field>=value``).

        Each value is converted to its field's type. Bool fields also accept
        strings such as "false", "0" or "off". Keys that are not settings
        fields (e.g. ``stt_connected``) are ignored. Every value is converted
        before any is applied, so a bad value raises (ValueError/TypeError)
        without leaving the settings half-updated.

        Toggling ``enabled`` starts or stops the worker. Supplying any
        URL-affecting setting while the worker runs restarts it.

        Returns:
            The resulting settings, as returned by get_settings().
        """
        from dataclasses import fields

        known = {f.name for f in fields(self._settings)}
        updates = {}
        for key, val in kwargs.items():
            attr = key[len("stt_"):] if key.startswith("stt_") else key
            if attr in known:
                updates[attr] = self._coerce(getattr(self._settings, attr), val)

        changed_enabled = False
        need_reconnect = False
        for attr, new in updates.items():
            old = getattr(self._settings, attr)
            setattr(self._settings, attr, new)
            if attr == "enabled" and old != new:
                changed_enabled = True
            if attr in self._RECONNECT_KEYS:
                need_reconnect = True

        if changed_enabled:
            if self._settings.enabled:
                self._start_worker()
            else:
                self._stop_worker()
        elif need_reconnect and self._settings.enabled and self._process is not None:
            self._stop_worker()
            self._start_worker()
        return self.get_settings()

    @classmethod
    def _coerce(cls, current, value):
        """Convert *value* to the type of *current* (the field's present value)."""
        target = type(current)
        if target is bool and isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_STRINGS
        return target(value)

    def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
        """Send one chunk of processed audio to the STT worker; never blocks.

        The chunk is clipped to [-1, 1], converted to PCM16 and written as a
        single length-prefixed frame with os.write() on the O_NONBLOCK stdin
        pipe. If the pipe is full, the chunk is dropped. If only part of a
        frame fits, the rest is kept and sent before any later frame, so the
        worker's framing never desynchronises.

        Args:
            audio: samples, presumably float in [-1, 1] and mono (TODO:
                confirm). A multi-channel array would be sent interleaved.
            sample_rate: rate of *audio* in Hz. It is used as the ``rate`` URL
                parameter the next time the worker starts.
        """
        # Record the rate even while STT is off, so the first start after
        # enabling uses the rate the engine actually delivers.
        # NOTE(review): a rate change while the worker runs only takes
        # effect after the worker is restarted.
        self._sample_rate = sample_rate
        fd = self._stdin_fd
        if fd is None or not self._settings.enabled:
            return

        # Finish a partially written frame first; a new frame must never
        # start in the middle of the previous one.
        if self._pending:
            written = self._write_some(fd, self._pending)
            if written is None:
                self._abandon_pipe(fd)
                return
            self._pending = self._pending[written:]
            if self._pending:
                return  # pipe still full: drop this chunk

        pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
        payload = pcm16.tobytes()
        frame = len(payload).to_bytes(4, "little") + payload
        written = self._write_some(fd, frame)
        if written is None:
            self._abandon_pipe(fd)
        elif 0 < written < len(frame):
            # Non-blocking pipe writes larger than PIPE_BUF may be partial;
            # keep the remainder for the next call.
            self._pending = frame[written:]

    @staticmethod
    def _write_some(fd: int, data: bytes):
        """Write as much of *data* to *fd* as fits, without blocking.

        Returns the number of bytes written (0 when the pipe is full), or
        None when the pipe is unusable (e.g. the worker has exited).
        """
        try:
            return os.write(fd, data)
        except BlockingIOError:
            return 0
        except OSError:
            return None

    def _abandon_pipe(self, fd):
        """Stop writing to *fd* after a fatal write error.

        The fd is forgotten only if it is still the current one. The pipe
        object itself is closed later by _stop_worker().
        """
        if self._stdin_fd == fd:
            self._stdin_fd = None
            self._pending = b""

    def _build_url(self) -> str:
        """Return the WebSocket URL with query parameters for the settings.

        Values are URL-encoded, so a language string containing "&" cannot
        inject extra parameters. ``stream_id`` embeds the current Unix time
        in whole seconds.
        """
        from urllib.parse import urlencode

        s = self._settings
        params = [
            ("language", s.language),
            ("rate", str(self._sample_rate)),
            ("stream_id", "mic-system-" + str(int(time.time()))),
        ]
        flags = (
            ("timestamps", s.timestamps),
            ("diarize", s.diarize),
            ("itn", s.itn),
            ("detect_emotion", s.detect_emotion),
        )
        params.extend((name, "1") for name, on in flags if on)
        if s.server_vad:
            params += [
                ("vad", "1"),
                ("vad_threshold", str(s.vad_threshold)),
                ("vad_pad_ms", str(s.vad_pad_ms)),
                ("vad_min_ms", str(s.vad_min_ms)),
            ]
        return self.STT_URL + "?" + urlencode(params)

    def _start_worker(self):
        """Spawn the STT worker subprocess and start reading its output.

        A worker that is still running is stopped first. A spawn failure is
        logged and leaves the bridge without a worker.
        """
        if self._process is not None:
            self._stop_worker()  # never orphan a running worker
        url = self._build_url()
        logger.info("STT starting worker subprocess: %s", url)
        python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3"
        try:
            proc = subprocess.Popen(
                [python, _WORKER_PATH, url],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=0,
            )
        except Exception as e:
            logger.error("Failed to start STT worker: %s", e)
            return
        self._process = proc
        # Make the pipe non-blocking *before* publishing the fd, so
        # feed_audio() can never issue a blocking write on it.
        fd = proc.stdin.fileno()
        _set_nonblock(fd)
        self._pending = b""
        self._stdin_fd = fd
        # Read stdout in an eventlet greenlet, or in a daemon thread when
        # eventlet is unavailable. The reader is bound to this specific
        # process, so a later restart cannot redirect it to another pipe.
        try:
            import eventlet
        except ImportError:
            import threading
            threading.Thread(
                target=self._read_stdout, args=(proc,), daemon=True
            ).start()
        else:
            self._reader_greenlet = eventlet.spawn(self._read_stdout, proc)

    def _read_stdout(self, proc=None):
        """Forward JSON lines from a worker's stdout to the message callback.

        Args:
            proc: the worker to read. If omitted, the current worker is used.

        Blank lines, invalid JSON and JSON values that are not objects are
        skipped. ``stt_status`` messages update the connected flag, but only
        while *proc* is the current worker. When the output ends, the pipe is
        closed. Unless a newer worker has replaced *proc*, the bridge is then
        marked disconnected and ``{"type": "stt_status", "connected": False}``
        is emitted.
        """
        if proc is None:
            proc = self._process
        if proc is None:
            return
        try:
            for raw in proc.stdout:
                line = raw.strip()
                if not line:
                    continue
                try:
                    msg = json.loads(line)
                except ValueError:  # JSONDecodeError and bad UTF-8
                    continue
                if not isinstance(msg, dict):
                    continue
                if msg.get("type") == "stt_status":
                    if self._process is not proc:
                        continue  # stale status from a replaced worker
                    self._connected = bool(msg.get("connected", False))
                self._emit(msg)
        except Exception:
            logger.exception("STT worker output reader failed")
        finally:
            try:
                proc.stdout.close()
            except Exception:
                pass
            # A reader for a worker that has already been replaced must not
            # clobber the state of its successor.
            if self._process is proc or self._process is None:
                self._connected = False
                self._emit({"type": "stt_status", "connected": False})

    def _emit(self, msg):
        """Deliver *msg* to the callback; a failing callback is logged, not fatal."""
        if not self._on_message:
            return
        try:
            self._on_message(msg)
        except Exception:
            logger.exception("STT on_message callback failed")

    def _stop_worker(self):
        """Stop the worker, if any.

        Closes its stdin, calls terminate() and waits up to 3 s. If that
        fails, it falls back to kill() and reaps the process.
        """
        self._connected = False
        # Unpublish the fd before closing the pipe so feed_audio() stops
        # using it. NOTE(review): a write already in flight on another thread
        # can still race with the close.
        self._stdin_fd = None
        self._pending = b""
        proc, self._process = self._process, None
        if proc is not None:
            try:
                proc.stdin.close()
            except Exception:
                pass
            try:
                proc.terminate()
                proc.wait(timeout=3)
            except Exception:
                try:
                    proc.kill()
                    # Reap the killed process so it does not linger as a zombie.
                    proc.wait(timeout=1)
                except Exception:
                    pass
        self._reader_greenlet = None

    def stop(self):
        """Shut down the STT worker, if one is running."""
        self._stop_worker()
|