# stt_bridge.py

"""Bridge between AudioEngine and the stt.mm.mk WebSocket STT service.

Architecture:
- Audio out: UDP sendto() from the audio callback — non-blocking, zero copies.
- Results in: the worker does an HTTP POST to the Flask /internal/stt endpoint.

Flask handles it natively in eventlet — no threads, no pipes, no sockets to manage.
"""
from __future__ import annotations

import logging
import os
import socket
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlencode

import numpy as np
  16. logger = logging.getLogger("stt_bridge")
  17. _WORKER_PATH = str(Path(__file__).resolve().parent / "stt_worker.py")
  18. _VENV_PYTHON = str(Path(__file__).resolve().parent / ".venv" / "bin" / "python")
  19. AUDIO_PORT = 19876
  20. @dataclass
  21. class SttSettings:
  22. enabled: bool = False
  23. language: str = "pl"
  24. timestamps: bool = True
  25. diarize: bool = True
  26. itn: bool = True
  27. detect_emotion: bool = False
  28. server_vad: bool = False
  29. vad_threshold: float = 0.3
  30. vad_pad_ms: int = 400
  31. vad_min_ms: int = 100
  32. class SttBridge:
  33. """Manages subprocess STT worker that connects to stt.mm.mk."""
  34. STT_URL = "wss://stt.mm.mk/ws/transcribe"
  35. def __init__(self, on_message=None):
  36. self._settings = SttSettings()
  37. self._on_message = on_message
  38. self._process: subprocess.Popen | None = None
  39. self._audio_sock: socket.socket | None = None
  40. self._connected = False
  41. self._sample_rate = 16000
  42. def get_settings(self) -> dict:
  43. return {
  44. "stt_enabled": self._settings.enabled,
  45. "stt_language": self._settings.language,
  46. "stt_timestamps": self._settings.timestamps,
  47. "stt_diarize": self._settings.diarize,
  48. "stt_itn": self._settings.itn,
  49. "stt_detect_emotion": self._settings.detect_emotion,
  50. "stt_server_vad": self._settings.server_vad,
  51. "stt_vad_threshold": self._settings.vad_threshold,
  52. "stt_vad_pad_ms": self._settings.vad_pad_ms,
  53. "stt_vad_min_ms": self._settings.vad_min_ms,
  54. "stt_connected": self._connected,
  55. }
  56. def handle_worker_message(self, msg: dict):
  57. """Called by Flask when worker POSTs a result."""
  58. if msg.get("type") == "stt_status":
  59. self._connected = bool(msg.get("connected", False))
  60. if self._on_message:
  61. self._on_message(msg)
  62. def update_settings(self, **kwargs) -> dict:
  63. reconnect_keys = {
  64. "language", "timestamps", "diarize", "itn",
  65. "detect_emotion", "server_vad", "vad_threshold",
  66. "vad_pad_ms", "vad_min_ms",
  67. }
  68. changed_enabled = False
  69. need_reconnect = False
  70. for key, val in kwargs.items():
  71. attr = key.replace("stt_", "")
  72. if hasattr(self._settings, attr):
  73. old = getattr(self._settings, attr)
  74. setattr(self._settings, attr, type(old)(val))
  75. if attr == "enabled" and old != self._settings.enabled:
  76. changed_enabled = True
  77. if attr in reconnect_keys:
  78. need_reconnect = True
  79. if changed_enabled:
  80. if self._settings.enabled:
  81. self._start_worker()
  82. else:
  83. self._stop_worker()
  84. elif self._settings.enabled and self._process is not None and need_reconnect:
  85. self._stop_worker()
  86. self._start_worker()
  87. return self.get_settings()
  88. def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
  89. """Feed processed audio to STT via UDP. Non-blocking."""
  90. sock = self._audio_sock
  91. if sock is None or not self._settings.enabled:
  92. return
  93. self._sample_rate = sample_rate
  94. pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
  95. try:
  96. sock.sendto(pcm16.tobytes(), ("127.0.0.1", AUDIO_PORT))
  97. except (OSError, BlockingIOError):
  98. pass
  99. def _build_url(self) -> str:
  100. s = self._settings
  101. parts = [
  102. "language=" + s.language,
  103. "rate=" + str(self._sample_rate),
  104. "stream_id=mic-system-" + str(int(time.time())),
  105. ]
  106. if s.timestamps:
  107. parts.append("timestamps=1")
  108. if s.diarize:
  109. parts.append("diarize=1")
  110. if s.itn:
  111. parts.append("itn=1")
  112. if s.detect_emotion:
  113. parts.append("detect_emotion=1")
  114. if s.server_vad:
  115. parts.append("vad=1")
  116. parts.append("vad_threshold=" + str(s.vad_threshold))
  117. parts.append("vad_pad_ms=" + str(s.vad_pad_ms))
  118. parts.append("vad_min_ms=" + str(s.vad_min_ms))
  119. return self.STT_URL + "?" + "&".join(parts)
  120. def _start_worker(self):
  121. url = self._build_url()
  122. logger.info("STT starting worker: %s", url)
  123. python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3"
  124. self._audio_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  125. self._audio_sock.setblocking(False)
  126. try:
  127. self._process = subprocess.Popen(
  128. [python, _WORKER_PATH, url,
  129. "--audio-port", str(AUDIO_PORT),
  130. "--callback-url", "http://127.0.0.1:5000/internal/stt"],
  131. stdin=subprocess.DEVNULL,
  132. stdout=subprocess.DEVNULL,
  133. stderr=subprocess.DEVNULL,
  134. )
  135. except Exception as e:
  136. logger.error("Failed to start STT worker: %s", e)
  137. if self._audio_sock:
  138. self._audio_sock.close()
  139. self._audio_sock = None
  140. return
  141. def _stop_worker(self):
  142. self._connected = False
  143. proc = self._process
  144. self._process = None
  145. if self._audio_sock:
  146. try:
  147. self._audio_sock.close()
  148. except Exception:
  149. pass
  150. self._audio_sock = None
  151. if proc is not None:
  152. try:
  153. proc.terminate()
  154. proc.wait(timeout=3)
  155. except Exception:
  156. try:
  157. proc.kill()
  158. except Exception:
  159. pass
  160. def stop(self):
  161. self._stop_worker()