Explorar o código

Fix STT: non-blocking audio feed via queue + sender thread

feed_audio() was doing synchronous ws.send() inside the audio callback,
blocking the entire DSP pipeline on RPi Zero 2W. Now uses a bounded
queue (max 50 chunks) drained by a dedicated sender thread. Drops
oldest chunk on overflow instead of blocking.
Paweł Chodaczek hai 1 mes
pai
achega
b2694d72aa
Modificáronse 1 ficheiros con 57 adicións e 5 borrados
  1. 57 5
      stt_bridge.py

+ 57 - 5
stt_bridge.py

@@ -2,11 +2,13 @@
 
 Connects to wss://stt.mm.mk/ws/transcribe, streams processed audio,
 and forwards transcription results back via a callback.
+Uses a queue + sender thread so feed_audio() never blocks the audio callback.
 """
 from __future__ import annotations
 
 import json
 import logging
+import queue
 import threading
 import time
 from dataclasses import dataclass
@@ -40,6 +42,8 @@ class SttBridge:
     """Manages WebSocket connection to stt.mm.mk and streams audio."""
 
     STT_URL = "wss://stt.mm.mk/ws/transcribe"
+    # Max queued audio chunks before dropping (prevent memory buildup)
+    MAX_QUEUE = 50
 
     def __init__(self, on_message=None):
         self._lock = threading.Lock()
@@ -47,6 +51,8 @@ class SttBridge:
         self._on_message = on_message
         self._ws = None
         self._ws_thread: threading.Thread | None = None
+        self._sender_thread: threading.Thread | None = None
+        self._audio_queue: queue.Queue[bytes] = queue.Queue(maxsize=self.MAX_QUEUE)
         self._connected = False
         self._should_run = False
         self._sample_rate = 16000
@@ -99,7 +105,7 @@ class SttBridge:
         return self.get_settings()
 
     def feed_audio(self, audio: np.ndarray, sample_rate: int) -> None:
-        """Feed processed audio (post-beamforming/AGC) to STT."""
+        """Feed processed audio to STT. Non-blocking — drops if queue is full."""
         if not self._connected or not self._settings.enabled:
             return
 
@@ -107,10 +113,33 @@ class SttBridge:
         pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
 
         try:
-            if self._ws and self._connected:
-                self._ws.send(pcm16.tobytes(), opcode=0x2)
-        except Exception:
-            pass
+            self._audio_queue.put_nowait(pcm16.tobytes())
+        except queue.Full:
+            # Drop oldest chunk to make room
+            try:
+                self._audio_queue.get_nowait()
+            except queue.Empty:
+                pass
+            try:
+                self._audio_queue.put_nowait(pcm16.tobytes())
+            except queue.Full:
+                pass
+
+    def _sender_loop(self):
+        """Background thread that drains the queue and sends to WebSocket."""
+        while self._should_run:
+            try:
+                data = self._audio_queue.get(timeout=0.2)
+            except queue.Empty:
+                continue
+
+            if not self._connected or self._ws is None:
+                continue
+
+            try:
+                self._ws.send(data, opcode=0x2)
+            except Exception:
+                pass
 
     def _build_url(self) -> str:
         s = self._settings
@@ -140,6 +169,14 @@ class SttBridge:
             return
 
         self._should_run = True
+
+        # Clear queue
+        while not self._audio_queue.empty():
+            try:
+                self._audio_queue.get_nowait()
+            except queue.Empty:
+                break
+
         url = self._build_url()
         logger.info("STT connecting to %s", url)
 
@@ -179,6 +216,7 @@ class SttBridge:
             on_error=on_error,
             on_close=on_close,
         )
+
         self._ws_thread = threading.Thread(
             target=self._ws.run_forever,
             kwargs={"ping_interval": 20, "ping_timeout": 10},
@@ -186,6 +224,13 @@ class SttBridge:
         )
         self._ws_thread.start()
 
+        # Start sender thread
+        self._sender_thread = threading.Thread(
+            target=self._sender_loop,
+            daemon=True,
+        )
+        self._sender_thread.start()
+
     def _stop_connection(self):
         self._should_run = False
         self._connected = False
@@ -196,6 +241,13 @@ class SttBridge:
                 pass
             self._ws = None
         self._ws_thread = None
+        self._sender_thread = None
+        # Drain queue
+        while not self._audio_queue.empty():
+            try:
+                self._audio_queue.get_nowait()
+            except queue.Empty:
+                break
 
     def stop(self):
         self._stop_connection()