Browse Source

Fix STT lag: decouple audio callback from pipe writes

feed_audio() was writing to subprocess stdin synchronously inside the
audio callback. When the pipe buffer filled up (subprocess slow to read),
it blocked the entire DSP pipeline causing ~1 FPS waveform updates.

Now feed_audio() puts data in a bounded queue (non-blocking, drops on
overflow) and a separate writer thread drains the queue to stdin.
Paweł Chodaczek 1 month ago
parent
commit
1a6a61fd7f
1 changed files with 70 additions and 11 deletions
  1. 70 11
      stt_bridge.py

+ 70 - 11
stt_bridge.py

@@ -2,12 +2,15 @@
 
 Runs STT WebSocket in a **subprocess** to avoid conflicts with eventlet.
 Communication: stdin (length-prefixed PCM binary) / stdout (JSON lines).
+feed_audio() is non-blocking: puts data in a bounded queue, a writer
+thread drains the queue to subprocess stdin.
 """
 from __future__ import annotations
 
 import json
 import logging
 import os
+import queue
 import subprocess
 import threading
 import time
@@ -40,6 +43,7 @@ class SttBridge:
     """Manages subprocess STT worker that connects to stt.mm.mk."""
 
     STT_URL = "wss://stt.mm.mk/ws/transcribe"
+    MAX_QUEUE = 30
 
     def __init__(self, on_message=None):
         self._lock = threading.Lock()
@@ -47,6 +51,9 @@ class SttBridge:
         self._on_message = on_message
         self._process: subprocess.Popen | None = None
         self._reader_thread: threading.Thread | None = None
+        self._writer_thread: threading.Thread | None = None
+        self._audio_queue: queue.Queue[bytes] = queue.Queue(maxsize=self.MAX_QUEUE)
+        self._should_run = False
         self._connected = False
         self._sample_rate = 16000
 
@@ -98,24 +105,49 @@ class SttBridge:
         return self.get_settings()
 
     def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
-        """Feed processed audio to STT subprocess. Non-blocking."""
-        proc = self._process
-        if proc is None or proc.poll() is not None:
-            return
-        if not self._settings.enabled:
+        """Feed processed audio to STT. Completely non-blocking.
+
+        Puts length-prefixed PCM into a bounded queue. If queue is full,
+        drops the chunk silently (better than blocking the audio callback).
+        """
+        if not self._should_run or not self._settings.enabled:
             return
 
         self._sample_rate = sample_rate
         pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
         payload = pcm16.tobytes()
+        header = len(payload).to_bytes(4, "little")
 
         try:
-            # Length-prefixed binary: 4 bytes little-endian length + PCM data
-            header = len(payload).to_bytes(4, "little")
-            proc.stdin.write(header + payload)
-            proc.stdin.flush()
-        except (BrokenPipeError, OSError):
-            pass
+            self._audio_queue.put_nowait(header + payload)
+        except queue.Full:
+            # Drop — never block the audio callback
+            try:
+                self._audio_queue.get_nowait()  # drop oldest
+            except queue.Empty:
+                pass
+            try:
+                self._audio_queue.put_nowait(header + payload)
+            except queue.Full:
+                pass
+
+    def _writer_loop(self):
+        """Background thread: drains queue → writes to subprocess stdin."""
+        while self._should_run:
+            try:
+                data = self._audio_queue.get(timeout=0.2)
+            except queue.Empty:
+                continue
+
+            proc = self._process
+            if proc is None or proc.poll() is not None:
+                continue
+
+            try:
+                proc.stdin.write(data)
+                proc.stdin.flush()
+            except (BrokenPipeError, OSError):
+                pass
 
     def _build_url(self) -> str:
         s = self._settings
@@ -140,6 +172,15 @@ class SttBridge:
         return self.STT_URL + "?" + "&".join(parts)
 
     def _start_worker(self):
+        self._should_run = True
+
+        # Drain queue
+        while not self._audio_queue.empty():
+            try:
+                self._audio_queue.get_nowait()
+            except queue.Empty:
+                break
+
         url = self._build_url()
         logger.info("STT starting worker subprocess: %s", url)
 
@@ -155,6 +196,7 @@ class SttBridge:
             )
         except Exception as e:
             logger.error("Failed to start STT worker: %s", e)
+            self._should_run = False
             return
 
         self._reader_thread = threading.Thread(
@@ -163,6 +205,12 @@ class SttBridge:
         )
         self._reader_thread.start()
 
+        self._writer_thread = threading.Thread(
+            target=self._writer_loop,
+            daemon=True,
+        )
+        self._writer_thread.start()
+
     def _read_stdout(self):
         """Read JSON lines from worker stdout and forward via callback."""
         proc = self._process
@@ -192,9 +240,18 @@ class SttBridge:
                 self._on_message({"type": "stt_status", "connected": False})
 
     def _stop_worker(self):
+        self._should_run = False
         self._connected = False
         proc = self._process
         self._process = None
+
+        # Drain queue
+        while not self._audio_queue.empty():
+            try:
+                self._audio_queue.get_nowait()
+            except queue.Empty:
+                break
+
         if proc is not None:
             try:
                 proc.stdin.close()
@@ -208,7 +265,9 @@ class SttBridge:
                     proc.kill()
                 except Exception:
                     pass
+
         self._reader_thread = None
+        self._writer_thread = None
 
     def stop(self):
         self._stop_worker()