Răsfoiți Sursa

Fix STT: UDP audio + HTTP callback — zero blocking in eventlet

All previous approaches blocked the eventlet hub:
- threads: eventlet monkey-patches threading.Thread → green threads
- pipes: stdin.write() blocks when buffer full
- O_NONBLOCK pipe: proc.stdout read still blocks hub
- UDP recv: socket.recvfrom blocks hub even with eventlet

Solution: worker receives audio via UDP (non-blocking sendto from
audio callback), sends results back via HTTP POST to /internal/stt
Flask endpoint. Flask handles HTTP natively in eventlet — no blocking.
Paweł Chodaczek 1 lună în urmă
părinte
comite
b12a327ce9
3 a modificat fișierele cu 89 adăugiri și 122 ștergeri
  1. 10 2
      app.py
  2. 37 86
      stt_bridge.py
  3. 42 34
      stt_worker.py

+ 10 - 2
app.py

@@ -23,7 +23,7 @@ audio_engine = AudioEngine(RECORDINGS_DIR)
 
 
 def _stt_message_callback(msg):
-    """Called from SttBridge background thread when STT sends a message."""
+    """Called when STT worker posts a result via /internal/stt."""
     socketio.emit("stt_message", msg)
 
 
@@ -70,6 +70,15 @@ def api_status():
     return jsonify(status)
 
 
+@app.route("/internal/stt", methods=["POST"])
+def internal_stt():
+    """Receives JSON messages from stt_worker subprocess via HTTP POST."""
+    msg = request.get_json(silent=True)
+    if msg:
+        stt_bridge.handle_worker_message(msg)
+    return "", 204
+
+
 @app.route("/api/recordings")
 def api_list_recordings():
     return jsonify(list_recordings(RECORDINGS_DIR))
@@ -108,7 +117,6 @@ def _handle_command(payload: dict) -> dict:
 
     if msg_type == "settings":
         settings = audio_engine.update_settings(payload)
-        # Handle STT settings
         stt_keys = {k: v for k, v in payload.items() if k.startswith("stt_")}
         if stt_keys:
             stt_settings = stt_bridge.update_settings(**stt_keys)

+ 37 - 86
stt_bridge.py

@@ -1,19 +1,15 @@
 """Bridge between AudioEngine and stt.mm.mk WebSocket STT service.
 
-Runs STT WebSocket in a **subprocess** to avoid conflicts with eventlet.
-Communication: stdin (length-prefixed PCM binary) / stdout (JSON lines).
-
-CRITICAL: This module runs inside an eventlet-patched process.
-threading.Thread = green thread, so we CANNOT use threads for I/O.
-Instead, feed_audio() uses os.write() with O_NONBLOCK on the pipe fd
-to avoid blocking the audio callback or the eventlet hub.
+Architecture:
+- Audio out: UDP sendto() from audio callback — non-blocking, zero copies
+- Results in: Worker does HTTP POST to Flask /internal/stt endpoint
+  Flask handles it natively in eventlet — no threads, no pipes, no sockets to manage.
 """
 from __future__ import annotations
 
-import fcntl
-import json
 import logging
 import os
+import socket
 import subprocess
 import time
 from dataclasses import dataclass
@@ -26,6 +22,8 @@ logger = logging.getLogger("stt_bridge")
 _WORKER_PATH = str(Path(__file__).resolve().parent / "stt_worker.py")
 _VENV_PYTHON = str(Path(__file__).resolve().parent / ".venv" / "bin" / "python")
 
+AUDIO_PORT = 19876
+
 
 @dataclass
 class SttSettings:
@@ -41,12 +39,6 @@ class SttSettings:
     vad_min_ms: int = 100
 
 
-def _set_nonblock(fd):
-    """Set a file descriptor to non-blocking mode."""
-    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
-    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
-
 class SttBridge:
     """Manages subprocess STT worker that connects to stt.mm.mk."""
 
@@ -56,10 +48,9 @@ class SttBridge:
         self._settings = SttSettings()
         self._on_message = on_message
         self._process: subprocess.Popen | None = None
-        self._stdin_fd: int | None = None  # raw fd for non-blocking writes
+        self._audio_sock: socket.socket | None = None
         self._connected = False
         self._sample_rate = 16000
-        self._reader_greenlet = None
 
     def get_settings(self) -> dict:
         return {
@@ -76,6 +67,13 @@ class SttBridge:
             "stt_connected": self._connected,
         }
 
+    def handle_worker_message(self, msg: dict):
+        """Called by Flask when worker POSTs a result."""
+        if msg.get("type") == "stt_status":
+            self._connected = bool(msg.get("connected", False))
+        if self._on_message:
+            self._on_message(msg)
+
     def update_settings(self, **kwargs) -> dict:
         reconnect_keys = {
             "language", "timestamps", "diarize", "itn",
@@ -107,28 +105,17 @@ class SttBridge:
         return self.get_settings()
 
     def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
-        """Feed processed audio to STT. Completely non-blocking.
-
-        Uses os.write() with O_NONBLOCK on the pipe fd.
-        If pipe is full, silently drops the chunk.
-        """
-        fd = self._stdin_fd
-        if fd is None or not self._settings.enabled:
+        """Feed processed audio to STT via UDP. Non-blocking."""
+        sock = self._audio_sock
+        if sock is None or not self._settings.enabled:
             return
 
         self._sample_rate = sample_rate
         pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
-        payload = pcm16.tobytes()
-        header = len(payload).to_bytes(4, "little")
-        data = header + payload
 
         try:
-            os.write(fd, data)
-        except BlockingIOError:
-            # Pipe full — drop chunk, never block
-            pass
-        except OSError:
-            # Pipe broken — subprocess died
+            sock.sendto(pcm16.tobytes(), ("127.0.0.1", AUDIO_PORT))
+        except (OSError, BlockingIOError):
             pass
 
     def _build_url(self) -> str:
@@ -155,76 +142,42 @@ class SttBridge:
 
     def _start_worker(self):
         url = self._build_url()
-        logger.info("STT starting worker subprocess: %s", url)
+        logger.info("STT starting worker: %s", url)
 
         python = _VENV_PYTHON if os.path.exists(_VENV_PYTHON) else "python3"
 
+        self._audio_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self._audio_sock.setblocking(False)
+
         try:
             self._process = subprocess.Popen(
-                [python, _WORKER_PATH, url],
-                stdin=subprocess.PIPE,
-                stdout=subprocess.PIPE,
+                [python, _WORKER_PATH, url,
+                 "--audio-port", str(AUDIO_PORT),
+                 "--callback-url", "http://127.0.0.1:5000/internal/stt"],
+                stdin=subprocess.DEVNULL,
+                stdout=subprocess.DEVNULL,
                 stderr=subprocess.DEVNULL,
-                bufsize=0,
             )
         except Exception as e:
             logger.error("Failed to start STT worker: %s", e)
+            if self._audio_sock:
+                self._audio_sock.close()
+                self._audio_sock = None
             return
 
-        # Set stdin pipe to non-blocking so os.write() never blocks
-        self._stdin_fd = self._process.stdin.fileno()
-        _set_nonblock(self._stdin_fd)
-
-        # Read stdout in eventlet greenlet (this is fine — reading JSON lines
-        # from a pipe cooperates well with eventlet)
-        try:
-            import eventlet
-            self._reader_greenlet = eventlet.spawn(self._read_stdout)
-        except ImportError:
-            # Fallback if eventlet not available
-            import threading
-            t = threading.Thread(target=self._read_stdout, daemon=True)
-            t.start()
-
-    def _read_stdout(self):
-        """Read JSON lines from worker stdout and forward via callback."""
-        proc = self._process
-        if proc is None:
-            return
-
-        try:
-            for line in proc.stdout:
-                line = line.strip()
-                if not line:
-                    continue
-                try:
-                    msg = json.loads(line)
-                except (json.JSONDecodeError, ValueError):
-                    continue
-
-                if msg.get("type") == "stt_status":
-                    self._connected = bool(msg.get("connected", False))
-
-                if self._on_message:
-                    self._on_message(msg)
-        except Exception:
-            pass
-        finally:
-            self._connected = False
-            if self._on_message:
-                self._on_message({"type": "stt_status", "connected": False})
-
     def _stop_worker(self):
         self._connected = False
-        self._stdin_fd = None
         proc = self._process
         self._process = None
 
-        if proc is not None:
+        if self._audio_sock:
             try:
-                proc.stdin.close()
+                self._audio_sock.close()
             except Exception:
                 pass
+            self._audio_sock = None
+
+        if proc is not None:
             try:
                 proc.terminate()
                 proc.wait(timeout=3)
@@ -234,7 +187,5 @@ class SttBridge:
                 except Exception:
                     pass
 
-        self._reader_greenlet = None
-
     def stop(self):
         self._stop_worker()

+ 42 - 34
stt_worker.py

@@ -1,63 +1,70 @@
 #!/usr/bin/env python3
 """STT worker subprocess — connects to stt.mm.mk via WebSocket.
 
-Reads raw PCM int16 binary chunks from stdin, sends to STT server.
-Writes JSON messages (one per line) to stdout.
+Receives PCM via UDP, sends to STT server.
+Posts JSON results back to Flask via HTTP POST.
 
-Usage: python3 stt_worker.py <ws_url>
+Usage: python3 stt_worker.py <ws_url> --audio-port 19876 --callback-url http://127.0.0.1:5000/internal/stt
 """
+import argparse
 import json
-import sys
+import socket
 import threading
-import time
+import urllib.request
 
 import websocket
 
+
 def main():
-    if len(sys.argv) < 2:
-        sys.exit(1)
+    parser = argparse.ArgumentParser()
+    parser.add_argument("url")
+    parser.add_argument("--audio-port", type=int, default=19876)
+    parser.add_argument("--callback-url", default="http://127.0.0.1:5000/internal/stt")
+    args = parser.parse_args()
 
-    url = sys.argv[1]
     connected = False
     ws = None
 
+    def emit(obj):
+        try:
+            data = json.dumps(obj).encode("utf-8")
+            req = urllib.request.Request(
+                args.callback_url,
+                data=data,
+                headers={"Content-Type": "application/json"},
+                method="POST",
+            )
+            urllib.request.urlopen(req, timeout=2)
+        except Exception:
+            pass
+
     def on_open(w):
         nonlocal connected
         connected = True
-        _emit({"type": "stt_status", "connected": True})
+        emit({"type": "stt_status", "connected": True})
 
     def on_message(w, message):
         try:
-            msg = json.loads(message)
-            _emit(msg)
+            emit(json.loads(message))
         except Exception:
             pass
 
     def on_error(w, error):
-        _emit({"type": "stt_error", "error": str(error)})
+        pass
 
     def on_close(w, code, msg):
         nonlocal connected
         connected = False
-        _emit({"type": "stt_status", "connected": False})
-
-    def _emit(obj):
-        try:
-            line = json.dumps(obj, ensure_ascii=True)
-            sys.stdout.write(line + "\n")
-            sys.stdout.flush()
-        except Exception:
-            pass
+        emit({"type": "stt_status", "connected": False})
 
     ws = websocket.WebSocketApp(
-        url,
+        args.url,
         on_open=on_open,
         on_message=on_message,
         on_error=on_error,
         on_close=on_close,
     )
 
-    # Run WebSocket in background thread
     ws_thread = threading.Thread(
         target=ws.run_forever,
         kwargs={"ping_interval": 20, "ping_timeout": 10},
@@ -65,29 +72,30 @@ def main():
     )
     ws_thread.start()
 
-    # Read PCM chunks from stdin and forward to WebSocket
+    # Listen for audio on UDP
+    audio_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    audio_sock.bind(("127.0.0.1", args.audio_port))
+    audio_sock.settimeout(1.0)
+
     try:
         while True:
-            # Read 4-byte length prefix, then that many bytes of PCM
-            header = sys.stdin.buffer.read(4)
-            if not header or len(header) < 4:
-                break
-            length = int.from_bytes(header, "little")
-            if length <= 0 or length > 1_000_000:
+            try:
+                data, _ = audio_sock.recvfrom(65536)
+            except socket.timeout:
                 continue
-            data = sys.stdin.buffer.read(length)
-            if not data or len(data) < length:
+            except OSError:
                 break
-            if connected and ws:
+            if connected and ws and data:
                 try:
                     ws.send(data, opcode=0x2)
                 except Exception:
                     pass
-    except (BrokenPipeError, KeyboardInterrupt):
+    except KeyboardInterrupt:
         pass
     finally:
         if ws:
             ws.close()
+        audio_sock.close()
 
 
 if __name__ == "__main__":