| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
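"""Bridge between AudioEngine and the stt.mm.mk WebSocket STT service.

Architecture:
- Audio out: the audio callback converts each block to 16-bit PCM and sends
  it to the worker subprocess with a non-blocking UDP sendto(); a block is
  dropped rather than ever blocking the callback.
- Results in: the worker POSTs each result to the Flask /internal/stt
  endpoint, which Flask handles natively under eventlet, so this module needs
  no extra threads, pipes, or listening sockets for the return path.
"""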
- from __future__ import annotations
- import logging
- import os
- import socket
- import subprocess
- import time
- from dataclasses import dataclass
- from pathlib import Path
- import numpy as np
logger = logging.getLogger("stt_bridge")

# Directory of this module; the worker script and the optional virtualenv
# are looked up relative to it.
_MODULE_DIR = Path(__file__).resolve().parent
# Worker script launched as a subprocess by SttBridge.
_WORKER_PATH = str(_MODULE_DIR / "stt_worker.py")
# Interpreter preferred for the worker when this path exists (POSIX venv layout).
_VENV_PYTHON = str(_MODULE_DIR / ".venv" / "bin" / "python")
# Loopback UDP port that audio is sent to; also passed to the worker as --audio-port.
AUDIO_PORT = 19876
@dataclass
class SttSettings:
    """User-adjustable options for the STT stream.

    Every field except ``enabled`` is encoded into the worker's WebSocket URL,
    so changing one of them while a worker is running requires a reconnect.
    """

    enabled: bool = False          # master switch: starts/stops the worker
    language: str = "pl"           # sent as the ``language`` query parameter
    timestamps: bool = True        # adds ``timestamps=1`` when set
    diarize: bool = True           # adds ``diarize=1`` when set
    itn: bool = True               # adds ``itn=1`` when set
    detect_emotion: bool = False   # adds ``detect_emotion=1`` when set
    # Server-side VAD (``vad=1``); the three vad_* values below are only sent
    # when it is enabled.
    server_vad: bool = False
    vad_threshold: float = 0.3
    vad_pad_ms: int = 400
    vad_min_ms: int = 100
class SttBridge:
    """Manage the subprocess STT worker that connects to stt.mm.mk.

    Audio is pushed to the worker as 16-bit PCM UDP datagrams on
    ``127.0.0.1:AUDIO_PORT``. The worker reports results and connection status
    by POSTing to ``CALLBACK_URL``; the Flask handler for that endpoint is
    expected to pass each payload to :meth:`handle_worker_message`.
    """

    STT_URL = "wss://stt.mm.mk/ws/transcribe"
    # Where the worker POSTs results; passed to it as --callback-url.
    # Override on the class or an instance if the Flask app listens elsewhere.
    CALLBACK_URL = "http://127.0.0.1:5000/internal/stt"

    # Settings encoded into the WebSocket URL by _build_url: changing any of
    # them while the worker runs requires a new session.
    _RECONNECT_KEYS = frozenset({
        "language", "timestamps", "diarize", "itn",
        "detect_emotion", "server_vad", "vad_threshold",
        "vad_pad_ms", "vad_min_ms",
    })
    # Accepted spellings (case-insensitive) for boolean settings that arrive
    # as strings, e.g. from form data.
    _TRUE_STRINGS = frozenset({"1", "true", "yes", "on"})
    _FALSE_STRINGS = frozenset({"0", "false", "no", "off", ""})

    def __init__(self, on_message=None):
        """Create a bridge; no worker is started until STT is enabled.

        Args:
            on_message: Optional callable invoked with every message dict the
                worker posts back (see handle_worker_message).
        """
        self._settings = SttSettings()
        self._on_message = on_message
        self._process: subprocess.Popen | None = None
        self._audio_sock: socket.socket | None = None
        self._connected = False
        # Most recent rate seen by feed_audio; sent as the ``rate`` URL param.
        self._sample_rate = 16000

    def get_settings(self) -> dict:
        """Return the settings keyed with an ``stt_`` prefix, plus the
        worker's last reported connection state."""
        return {
            "stt_enabled": self._settings.enabled,
            "stt_language": self._settings.language,
            "stt_timestamps": self._settings.timestamps,
            "stt_diarize": self._settings.diarize,
            "stt_itn": self._settings.itn,
            "stt_detect_emotion": self._settings.detect_emotion,
            "stt_server_vad": self._settings.server_vad,
            "stt_vad_threshold": self._settings.vad_threshold,
            "stt_vad_pad_ms": self._settings.vad_pad_ms,
            "stt_vad_min_ms": self._settings.vad_min_ms,
            "stt_connected": self._connected,
        }

    def handle_worker_message(self, msg: dict):
        """Called by Flask when the worker POSTs a message.

        ``stt_status`` messages update the connection flag; every message is
        then forwarded to ``on_message`` if one was given.
        """
        if msg.get("type") == "stt_status":
            self._connected = bool(msg.get("connected", False))
        if self._on_message:
            self._on_message(msg)

    @classmethod
    def _coerce_setting(cls, current, value):
        """Convert *value* to the type of *current* (the setting's present value).

        Booleans get explicit string parsing because ``bool("false")`` is True.

        Raises:
            ValueError / TypeError: if *value* cannot be converted.
        """
        if isinstance(current, bool):
            if isinstance(value, str):
                text = value.strip().lower()
                if text in cls._TRUE_STRINGS:
                    return True
                if text in cls._FALSE_STRINGS:
                    return False
                raise ValueError(f"invalid boolean setting value: {value!r}")
            return bool(value)
        return type(current)(value)

    def update_settings(self, **kwargs) -> dict:
        """Apply setting overrides and start/stop/restart the worker as needed.

        Keys may carry an ``stt_`` prefix (as returned by get_settings) or be
        bare field names; unknown keys are ignored. Each value is coerced to
        the type of the current setting. All values are validated before any
        is applied, so a bad value leaves every setting untouched.

        Toggling ``enabled`` starts or stops the worker. While enabled with a
        worker running, an actual change to a URL-affecting setting restarts
        it; re-sending an unchanged value does not.

        Returns:
            The settings snapshot from get_settings().

        Raises:
            ValueError / TypeError: if a value cannot be coerced.
        """
        # Pass 1: validate and coerce everything before mutating state.
        updates = {}
        for key, val in kwargs.items():
            attr = key[len("stt_"):] if key.startswith("stt_") else key
            if attr.startswith("_") or not hasattr(self._settings, attr):
                continue
            updates[attr] = self._coerce_setting(getattr(self._settings, attr), val)

        # Pass 2: apply, noting only real changes.
        was_enabled = self._settings.enabled
        need_reconnect = False
        for attr, new in updates.items():
            if getattr(self._settings, attr) != new:
                setattr(self._settings, attr, new)
                if attr in self._RECONNECT_KEYS:
                    need_reconnect = True

        if self._settings.enabled != was_enabled:
            if self._settings.enabled:
                self._start_worker()
            else:
                self._stop_worker()
        elif self._settings.enabled and self._process is not None and need_reconnect:
            self._stop_worker()
            self._start_worker()
        return self.get_settings()

    def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
        """Send one block of float audio to the worker as 16-bit PCM over UDP.

        Meant to be called from the audio callback: it never blocks, and it
        drops the block if no worker is running, STT is disabled, or the send
        fails. Samples are clipped to [-1.0, 1.0] before scaling to int16.
        """
        # Record the rate even while no worker runs so the next start
        # advertises the real rate instead of the 16 kHz default.
        # NOTE: a rate change while a worker runs only takes effect after
        # the worker is restarted.
        self._sample_rate = sample_rate
        # Local copy so a concurrent _stop_worker (if any) cannot swap the
        # attribute out from under us mid-call.
        sock = self._audio_sock
        if sock is None or not self._settings.enabled:
            return
        pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
        try:
            sock.sendto(pcm16.tobytes(), ("127.0.0.1", AUDIO_PORT))
        except OSError:
            # Includes BlockingIOError (send buffer full) and sends on a socket
            # already closed by _stop_worker; dropping a block beats blocking.
            pass

    def _build_url(self) -> str:
        """Return the WebSocket URL with the current settings as query parameters."""
        from urllib.parse import urlencode

        s = self._settings
        params = [
            ("language", s.language),
            ("rate", self._sample_rate),
            ("stream_id", "mic-system-" + str(int(time.time()))),
        ]
        if s.timestamps:
            params.append(("timestamps", "1"))
        if s.diarize:
            params.append(("diarize", "1"))
        if s.itn:
            params.append(("itn", "1"))
        if s.detect_emotion:
            params.append(("detect_emotion", "1"))
        if s.server_vad:
            params.append(("vad", "1"))
            params.append(("vad_threshold", s.vad_threshold))
            params.append(("vad_pad_ms", s.vad_pad_ms))
            params.append(("vad_min_ms", s.vad_min_ms))
        # urlencode escapes values, so e.g. a language containing "&" or "="
        # cannot inject extra query parameters.
        return self.STT_URL + "?" + urlencode(params)

    def _start_worker(self):
        """Open the UDP audio socket and spawn the worker subprocess.

        Any worker still running is stopped first so its process and socket
        are not leaked. On spawn failure the error is logged and the bridge is
        left without a worker.
        """
        if self._process is not None or self._audio_sock is not None:
            self._stop_worker()
        url = self._build_url()
        logger.info("STT starting worker: %s", url)
        python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3"
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setblocking(False)
        try:
            self._process = subprocess.Popen(
                [python, _WORKER_PATH, url,
                 "--audio-port", str(AUDIO_PORT),
                 "--callback-url", self.CALLBACK_URL],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception as e:  # best effort: any spawn failure just disables STT output
            logger.error("Failed to start STT worker: %s", e)
            sock.close()
            return
        # Publish the socket only once a worker exists to receive the audio.
        self._audio_sock = sock

    def _stop_worker(self):
        """Close the audio socket and stop the worker (best effort).

        The worker gets up to 3 s to exit after ``terminate()``; after that it
        is killed and then waited on so the child is reaped rather than left
        as a zombie.
        """
        self._connected = False
        proc, self._process = self._process, None
        sock, self._audio_sock = self._audio_sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
        if proc is None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=3)
            return
        except subprocess.TimeoutExpired:
            logger.warning("STT worker did not exit after terminate; killing it")
        except OSError as e:
            logger.warning("STT worker terminate failed: %s", e)
        try:
            proc.kill()
            proc.wait(timeout=3)
        except (OSError, subprocess.TimeoutExpired) as e:
            logger.error("Failed to kill STT worker (pid %s): %s", proc.pid, e)

    def stop(self):
        """Shut down the worker and close the audio socket; a no-op if none is running."""
        self._stop_worker()
|