Kaynağa Gözat

Fix STT: use O_NONBLOCK pipe instead of threads

Under eventlet monkey-patching, threading.Thread is a green thread, not an
OS thread. The writer thread's blocking proc.stdin.write() was therefore
stalling the eventlet hub.

Now: set the stdin pipe fd to O_NONBLOCK and call os.write() directly from
the audio callback. If the pipe is full, BlockingIOError is caught and the
chunk is silently dropped. Zero blocking, zero threads for writing.
The reader uses eventlet.spawn() (a green thread that cooperates with the hub).
Paweł Chodaczek 1 ay önce
ebeveyn
işleme
80f864bfce
1 değiştirilmiş dosya ile 63 ekleme ve 96 silme
  1. 63 96
      stt_bridge.py

+ 63 - 96
stt_bridge.py

@@ -2,17 +2,19 @@
 
 Runs STT WebSocket in a **subprocess** to avoid conflicts with eventlet.
 Communication: stdin (length-prefixed PCM binary) / stdout (JSON lines).
-feed_audio() is non-blocking: puts data in a bounded queue, a writer
-thread drains the queue to subprocess stdin.
+
+CRITICAL: This module runs inside an eventlet-patched process.
+threading.Thread = green thread, so we CANNOT use threads for I/O.
+Instead, feed_audio() uses os.write() with O_NONBLOCK on the pipe fd
+to avoid blocking the audio callback or the eventlet hub.
 """
 from __future__ import annotations
 
+import fcntl
 import json
 import logging
 import os
-import queue
 import subprocess
-import threading
 import time
 from dataclasses import dataclass
 from pathlib import Path
@@ -39,39 +41,40 @@ class SttSettings:
     vad_min_ms: int = 100
 
 
+def _set_nonblock(fd):
+    """Set a file descriptor to non-blocking mode."""
+    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+
 class SttBridge:
     """Manages subprocess STT worker that connects to stt.mm.mk."""
 
     STT_URL = "wss://stt.mm.mk/ws/transcribe"
-    MAX_QUEUE = 30
 
     def __init__(self, on_message=None):
-        self._lock = threading.Lock()
         self._settings = SttSettings()
         self._on_message = on_message
         self._process: subprocess.Popen | None = None
-        self._reader_thread: threading.Thread | None = None
-        self._writer_thread: threading.Thread | None = None
-        self._audio_queue: queue.Queue[bytes] = queue.Queue(maxsize=self.MAX_QUEUE)
-        self._should_run = False
+        self._stdin_fd: int | None = None  # raw fd for non-blocking writes
         self._connected = False
         self._sample_rate = 16000
+        self._reader_greenlet = None
 
     def get_settings(self) -> dict:
-        with self._lock:
-            return {
-                "stt_enabled": self._settings.enabled,
-                "stt_language": self._settings.language,
-                "stt_timestamps": self._settings.timestamps,
-                "stt_diarize": self._settings.diarize,
-                "stt_itn": self._settings.itn,
-                "stt_detect_emotion": self._settings.detect_emotion,
-                "stt_server_vad": self._settings.server_vad,
-                "stt_vad_threshold": self._settings.vad_threshold,
-                "stt_vad_pad_ms": self._settings.vad_pad_ms,
-                "stt_vad_min_ms": self._settings.vad_min_ms,
-                "stt_connected": self._connected,
-            }
+        return {
+            "stt_enabled": self._settings.enabled,
+            "stt_language": self._settings.language,
+            "stt_timestamps": self._settings.timestamps,
+            "stt_diarize": self._settings.diarize,
+            "stt_itn": self._settings.itn,
+            "stt_detect_emotion": self._settings.detect_emotion,
+            "stt_server_vad": self._settings.server_vad,
+            "stt_vad_threshold": self._settings.vad_threshold,
+            "stt_vad_pad_ms": self._settings.vad_pad_ms,
+            "stt_vad_min_ms": self._settings.vad_min_ms,
+            "stt_connected": self._connected,
+        }
 
     def update_settings(self, **kwargs) -> dict:
         reconnect_keys = {
@@ -82,16 +85,15 @@ class SttBridge:
         changed_enabled = False
         need_reconnect = False
 
-        with self._lock:
-            for key, val in kwargs.items():
-                attr = key.replace("stt_", "")
-                if hasattr(self._settings, attr):
-                    old = getattr(self._settings, attr)
-                    setattr(self._settings, attr, type(old)(val))
-                    if attr == "enabled" and old != self._settings.enabled:
-                        changed_enabled = True
-                    if attr in reconnect_keys:
-                        need_reconnect = True
+        for key, val in kwargs.items():
+            attr = key.replace("stt_", "")
+            if hasattr(self._settings, attr):
+                old = getattr(self._settings, attr)
+                setattr(self._settings, attr, type(old)(val))
+                if attr == "enabled" and old != self._settings.enabled:
+                    changed_enabled = True
+                if attr in reconnect_keys:
+                    need_reconnect = True
 
         if changed_enabled:
             if self._settings.enabled:
@@ -107,47 +109,27 @@ class SttBridge:
     def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
         """Feed processed audio to STT. Completely non-blocking.
 
-        Puts length-prefixed PCM into a bounded queue. If queue is full,
-        drops the chunk silently (better than blocking the audio callback).
+        Uses os.write() with O_NONBLOCK on the pipe fd.
+        If pipe is full, silently drops the chunk.
         """
-        if not self._should_run or not self._settings.enabled:
+        fd = self._stdin_fd
+        if fd is None or not self._settings.enabled:
             return
 
         self._sample_rate = sample_rate
         pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
         payload = pcm16.tobytes()
         header = len(payload).to_bytes(4, "little")
+        data = header + payload
 
         try:
-            self._audio_queue.put_nowait(header + payload)
-        except queue.Full:
-            # Drop — never block the audio callback
-            try:
-                self._audio_queue.get_nowait()  # drop oldest
-            except queue.Empty:
-                pass
-            try:
-                self._audio_queue.put_nowait(header + payload)
-            except queue.Full:
-                pass
-
-    def _writer_loop(self):
-        """Background thread: drains queue → writes to subprocess stdin."""
-        while self._should_run:
-            try:
-                data = self._audio_queue.get(timeout=0.2)
-            except queue.Empty:
-                continue
-
-            proc = self._process
-            if proc is None or proc.poll() is not None:
-                continue
-
-            try:
-                proc.stdin.write(data)
-                proc.stdin.flush()
-            except (BrokenPipeError, OSError):
-                pass
+            os.write(fd, data)
+        except BlockingIOError:
+            # Pipe full — drop chunk, never block
+            pass
+        except OSError:
+            # Pipe broken — subprocess died
+            pass
 
     def _build_url(self) -> str:
         s = self._settings
@@ -172,15 +154,6 @@ class SttBridge:
         return self.STT_URL + "?" + "&".join(parts)
 
     def _start_worker(self):
-        self._should_run = True
-
-        # Drain queue
-        while not self._audio_queue.empty():
-            try:
-                self._audio_queue.get_nowait()
-            except queue.Empty:
-                break
-
         url = self._build_url()
         logger.info("STT starting worker subprocess: %s", url)
 
@@ -196,20 +169,22 @@ class SttBridge:
             )
         except Exception as e:
             logger.error("Failed to start STT worker: %s", e)
-            self._should_run = False
             return
 
-        self._reader_thread = threading.Thread(
-            target=self._read_stdout,
-            daemon=True,
-        )
-        self._reader_thread.start()
+        # Set stdin pipe to non-blocking so os.write() never blocks
+        self._stdin_fd = self._process.stdin.fileno()
+        _set_nonblock(self._stdin_fd)
 
-        self._writer_thread = threading.Thread(
-            target=self._writer_loop,
-            daemon=True,
-        )
-        self._writer_thread.start()
+        # Read stdout in eventlet greenlet (this is fine — reading JSON lines
+        # from a pipe cooperates well with eventlet)
+        try:
+            import eventlet
+            self._reader_greenlet = eventlet.spawn(self._read_stdout)
+        except ImportError:
+            # Fallback if eventlet not available
+            import threading
+            t = threading.Thread(target=self._read_stdout, daemon=True)
+            t.start()
 
     def _read_stdout(self):
         """Read JSON lines from worker stdout and forward via callback."""
@@ -240,18 +215,11 @@ class SttBridge:
                 self._on_message({"type": "stt_status", "connected": False})
 
     def _stop_worker(self):
-        self._should_run = False
         self._connected = False
+        self._stdin_fd = None
         proc = self._process
         self._process = None
 
-        # Drain queue
-        while not self._audio_queue.empty():
-            try:
-                self._audio_queue.get_nowait()
-            except queue.Empty:
-                break
-
         if proc is not None:
             try:
                 proc.stdin.close()
@@ -266,8 +234,7 @@ class SttBridge:
                 except Exception:
                     pass
 
-        self._reader_thread = None
-        self._writer_thread = None
+        self._reader_greenlet = None
 
     def stop(self):
         self._stop_worker()