# stt_bridge.py
"""Bridge between AudioEngine and the stt.mm.mk WebSocket STT service.

Runs the STT WebSocket client in a **subprocess** to avoid conflicts with
eventlet. Communication: stdin (length-prefixed PCM binary) / stdout (JSON
lines).

CRITICAL: This module runs inside an eventlet-patched process, where
threading.Thread is a green thread, so we CANNOT use threads for I/O.
Instead, feed_audio() uses os.write() with O_NONBLOCK on the pipe fd
to avoid blocking the audio callback or the eventlet hub.
"""
from __future__ import annotations

import fcntl
import json
import logging
import os
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlencode

import numpy as np
  19. logger = logging.getLogger("stt_bridge")
  20. _WORKER_PATH = str(Path(__file__).resolve().parent / "stt_worker.py")
  21. _VENV_PYTHON = str(Path(__file__).resolve().parent / ".venv" / "bin" / "python")
  22. @dataclass
  23. class SttSettings:
  24. enabled: bool = False
  25. language: str = "pl"
  26. timestamps: bool = True
  27. diarize: bool = True
  28. itn: bool = True
  29. detect_emotion: bool = False
  30. server_vad: bool = False
  31. vad_threshold: float = 0.3
  32. vad_pad_ms: int = 400
  33. vad_min_ms: int = 100
  34. def _set_nonblock(fd):
  35. """Set a file descriptor to non-blocking mode."""
  36. flags = fcntl.fcntl(fd, fcntl.F_GETFL)
  37. fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
  38. class SttBridge:
  39. """Manages subprocess STT worker that connects to stt.mm.mk."""
  40. STT_URL = "wss://stt.mm.mk/ws/transcribe"
  41. def __init__(self, on_message=None):
  42. self._settings = SttSettings()
  43. self._on_message = on_message
  44. self._process: subprocess.Popen | None = None
  45. self._stdin_fd: int | None = None # raw fd for non-blocking writes
  46. self._connected = False
  47. self._sample_rate = 16000
  48. self._reader_greenlet = None
  49. def get_settings(self) -> dict:
  50. return {
  51. "stt_enabled": self._settings.enabled,
  52. "stt_language": self._settings.language,
  53. "stt_timestamps": self._settings.timestamps,
  54. "stt_diarize": self._settings.diarize,
  55. "stt_itn": self._settings.itn,
  56. "stt_detect_emotion": self._settings.detect_emotion,
  57. "stt_server_vad": self._settings.server_vad,
  58. "stt_vad_threshold": self._settings.vad_threshold,
  59. "stt_vad_pad_ms": self._settings.vad_pad_ms,
  60. "stt_vad_min_ms": self._settings.vad_min_ms,
  61. "stt_connected": self._connected,
  62. }
  63. def update_settings(self, **kwargs) -> dict:
  64. reconnect_keys = {
  65. "language", "timestamps", "diarize", "itn",
  66. "detect_emotion", "server_vad", "vad_threshold",
  67. "vad_pad_ms", "vad_min_ms",
  68. }
  69. changed_enabled = False
  70. need_reconnect = False
  71. for key, val in kwargs.items():
  72. attr = key.replace("stt_", "")
  73. if hasattr(self._settings, attr):
  74. old = getattr(self._settings, attr)
  75. setattr(self._settings, attr, type(old)(val))
  76. if attr == "enabled" and old != self._settings.enabled:
  77. changed_enabled = True
  78. if attr in reconnect_keys:
  79. need_reconnect = True
  80. if changed_enabled:
  81. if self._settings.enabled:
  82. self._start_worker()
  83. else:
  84. self._stop_worker()
  85. elif self._settings.enabled and self._process is not None and need_reconnect:
  86. self._stop_worker()
  87. self._start_worker()
  88. return self.get_settings()
  89. def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
  90. """Feed processed audio to STT. Completely non-blocking.
  91. Uses os.write() with O_NONBLOCK on the pipe fd.
  92. If pipe is full, silently drops the chunk.
  93. """
  94. fd = self._stdin_fd
  95. if fd is None or not self._settings.enabled:
  96. return
  97. self._sample_rate = sample_rate
  98. pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
  99. payload = pcm16.tobytes()
  100. header = len(payload).to_bytes(4, "little")
  101. data = header + payload
  102. try:
  103. os.write(fd, data)
  104. except BlockingIOError:
  105. # Pipe full — drop chunk, never block
  106. pass
  107. except OSError:
  108. # Pipe broken — subprocess died
  109. pass
  110. def _build_url(self) -> str:
  111. s = self._settings
  112. parts = [
  113. "language=" + s.language,
  114. "rate=" + str(self._sample_rate),
  115. "stream_id=mic-system-" + str(int(time.time())),
  116. ]
  117. if s.timestamps:
  118. parts.append("timestamps=1")
  119. if s.diarize:
  120. parts.append("diarize=1")
  121. if s.itn:
  122. parts.append("itn=1")
  123. if s.detect_emotion:
  124. parts.append("detect_emotion=1")
  125. if s.server_vad:
  126. parts.append("vad=1")
  127. parts.append("vad_threshold=" + str(s.vad_threshold))
  128. parts.append("vad_pad_ms=" + str(s.vad_pad_ms))
  129. parts.append("vad_min_ms=" + str(s.vad_min_ms))
  130. return self.STT_URL + "?" + "&".join(parts)
  131. def _start_worker(self):
  132. url = self._build_url()
  133. logger.info("STT starting worker subprocess: %s", url)
  134. python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3"
  135. try:
  136. self._process = subprocess.Popen(
  137. [python, _WORKER_PATH, url],
  138. stdin=subprocess.PIPE,
  139. stdout=subprocess.PIPE,
  140. stderr=subprocess.DEVNULL,
  141. bufsize=0,
  142. )
  143. except Exception as e:
  144. logger.error("Failed to start STT worker: %s", e)
  145. return
  146. # Set stdin pipe to non-blocking so os.write() never blocks
  147. self._stdin_fd = self._process.stdin.fileno()
  148. _set_nonblock(self._stdin_fd)
  149. # Read stdout in eventlet greenlet (this is fine — reading JSON lines
  150. # from a pipe cooperates well with eventlet)
  151. try:
  152. import eventlet
  153. self._reader_greenlet = eventlet.spawn(self._read_stdout)
  154. except ImportError:
  155. # Fallback if eventlet not available
  156. import threading
  157. t = threading.Thread(target=self._read_stdout, daemon=True)
  158. t.start()
  159. def _read_stdout(self):
  160. """Read JSON lines from worker stdout and forward via callback."""
  161. proc = self._process
  162. if proc is None:
  163. return
  164. try:
  165. for line in proc.stdout:
  166. line = line.strip()
  167. if not line:
  168. continue
  169. try:
  170. msg = json.loads(line)
  171. except (json.JSONDecodeError, ValueError):
  172. continue
  173. if msg.get("type") == "stt_status":
  174. self._connected = bool(msg.get("connected", False))
  175. if self._on_message:
  176. self._on_message(msg)
  177. except Exception:
  178. pass
  179. finally:
  180. self._connected = False
  181. if self._on_message:
  182. self._on_message({"type": "stt_status", "connected": False})
  183. def _stop_worker(self):
  184. self._connected = False
  185. self._stdin_fd = None
  186. proc = self._process
  187. self._process = None
  188. if proc is not None:
  189. try:
  190. proc.stdin.close()
  191. except Exception:
  192. pass
  193. try:
  194. proc.terminate()
  195. proc.wait(timeout=3)
  196. except Exception:
  197. try:
  198. proc.kill()
  199. except Exception:
  200. pass
  201. self._reader_greenlet = None
  202. def stop(self):
  203. self._stop_worker()