| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- from __future__ import annotations
- import argparse
- import json
- import time
- import wave
- from pathlib import Path
- import numpy as np
- import socketio
def wav_stats(path: Path) -> dict[str, float | int | str]:
    """Return basic level statistics for a PCM WAV file.

    Only the first channel is analysed. Samples are normalised to the
    range [-1.0, 1.0) before the RMS and peak values are computed.

    Args:
        path: Location of the WAV file to inspect.

    Returns:
        A dict with the keys ``file`` (base name), ``channels``,
        ``sample_rate``, ``frames`` (frame count from the header),
        ``duration_sec``, ``rms`` and ``peak``. When the file holds no
        complete audio frame, ``duration_sec``, ``rms`` and ``peak`` are 0.0.

    Raises:
        ValueError: If the sample width is not 1, 2, 3 or 4 bytes.
        wave.Error: If the file is not a WAV file the ``wave`` module can read.
    """
    import sys  # local import: only needed to decode 24-bit samples

    with wave.open(str(path), "rb") as wf:
        channels = wf.getnchannels()
        sample_rate = wf.getframerate()
        sample_width = wf.getsampwidth()
        frames = wf.getnframes()
        raw = wf.readframes(frames)

    # A truncated file can end in the middle of a frame; drop the partial
    # frame so the per-channel reshape below cannot fail.
    frame_bytes = channels * sample_width
    if frame_bytes > 0:
        raw = raw[: len(raw) - len(raw) % frame_bytes]

    if not raw:
        return {
            "file": path.name,
            "channels": channels,
            "sample_rate": sample_rate,
            "frames": frames,
            "duration_sec": 0.0,
            "rms": 0.0,
            "peak": 0.0,
        }

    # wave.readframes() hands back samples in native byte order, so native
    # NumPy dtypes are the right choice for the 2- and 4-byte cases.
    if sample_width == 1:
        # 8-bit WAV audio is unsigned with the zero level at 128.
        pcm = np.frombuffer(raw, dtype=np.uint8).astype(np.int16) - 128
        full_scale = 128.0
    elif sample_width == 2:
        pcm = np.frombuffer(raw, dtype=np.int16)
        full_scale = 32768.0
    elif sample_width == 3:
        octets = np.frombuffer(raw, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        if sys.byteorder == "big":
            octets = octets[:, ::-1]
        pcm = octets[:, 0] | (octets[:, 1] << 8) | (octets[:, 2] << 16)
        # Sign-extend the 24-bit value into the 32-bit container.
        pcm = pcm - ((pcm & 0x800000) << 1)
        full_scale = 8388608.0
    elif sample_width == 4:
        pcm = np.frombuffer(raw, dtype=np.int32)
        full_scale = 2147483648.0
    else:
        raise ValueError(f"unsupported sample width: {sample_width} bytes")

    if channels > 1:
        # Samples are interleaved; keep channel 0 only.
        pcm = pcm.reshape(-1, channels)[:, 0]

    norm = pcm.astype(np.float32) / full_scale
    rms = float(np.sqrt(np.mean(norm * norm, dtype=np.float64)))
    peak = float(np.max(np.abs(norm)))
    duration = float(frames / sample_rate) if sample_rate > 0 else 0.0

    return {
        "file": path.name,
        "channels": channels,
        "sample_rate": sample_rate,
        "frames": frames,
        "duration_sec": duration,
        "rms": rms,
        "peak": peak,
    }
def main() -> int:
    """Run one recording session against a Socket.IO server and print a JSON report.

    The function connects to ``--url``, sends a ``settings`` message followed
    by ``record_start``, then polls the state updated by the ``audio_data`` and
    ``server_ack`` handlers until recording has been idle for a while or
    ``--timeout`` expires. Afterwards it looks up the file names announced by
    the server under ``--recordings-dir``, computes per-file statistics with
    ``wav_stats`` and prints a single JSON object to stdout.

    Returns:
        Always 0. Per-file problems are reported as ``"error"`` entries in the
        ``recordings`` list of the JSON output.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", default="http://127.0.0.1:5000")
    parser.add_argument("--duration", type=float, default=10.0)
    parser.add_argument("--source", default="compare_all")
    parser.add_argument("--agc", action="store_true")
    parser.add_argument("--hifi", action="store_true")
    parser.add_argument("--hifi-mic", default="mic1", choices=["mic1", "mic2"])
    parser.add_argument("--sample-rate", type=int, default=16000)
    parser.add_argument("--timeout", type=float, default=60.0)
    parser.add_argument("--recordings-dir", default="/home/pch/mic_system/recordings")
    args = parser.parse_args()

    sio = socketio.Client(reconnection=False, logger=False, engineio_logger=False)

    # Shared between the Socket.IO event handlers and the polling loop below.
    state: dict[str, object] = {
        "record_started": False,
        "recording_now": False,
        "filenames": [],
        "last_rec_duration": 0.0,
        "max_rec_duration": 0.0,
        "first_rec_wall": None,
        "last_rec_wall": None,
        "events": [],
    }

    @sio.on("audio_data")
    def on_audio_data(payload: dict) -> None:  # type: ignore[no-redef]
        """Track the recording flag and duration reported with each audio frame."""
        now = time.monotonic()
        recording = bool(payload.get("recording", False))
        rec_duration = float(payload.get("rec_duration", 0.0))
        state["recording_now"] = recording
        state["last_rec_duration"] = rec_duration
        if rec_duration > float(state["max_rec_duration"]):
            state["max_rec_duration"] = rec_duration
        if recording:
            # Remember the first and the most recent moment at which the
            # server reported an active recording.
            if state["first_rec_wall"] is None:
                state["first_rec_wall"] = now
            state["last_rec_wall"] = now

    @sio.on("server_ack")
    def on_server_ack(payload: dict) -> None:  # type: ignore[no-redef]
        """Log every acknowledgement and react to record start/stop notices."""
        events = state["events"]
        assert isinstance(events, list)
        events.append(payload)
        if payload.get("type") == "record_started":
            state["record_started"] = True
            state["filenames"] = list(payload.get("filenames", []))
        if payload.get("type") == "record_stopped":
            state["recording_now"] = False

    sio.connect(args.url)
    try:
        sio.emit(
            "client_message",
            {
                "type": "settings",
                "mode": "beamforming",
                "auto_beam": True,
                "sample_rate": int(args.sample_rate),
                "gain_db": 0.0,
                "agc": bool(args.agc),
                "attack_ms": 5.0,
                "release_ms": 300.0,
                "hifi_mode": bool(args.hifi),
                "hifi_mic": str(args.hifi_mic),
                "monitor_on": False,
            },
        )
        # Short pause between sending the settings and starting the recording.
        time.sleep(0.4)

        wall_start = time.monotonic()
        sio.emit(
            "client_message",
            {
                "type": "record_start",
                "source": args.source,
                "duration_sec": float(args.duration),
            },
        )

        deadline = wall_start + float(args.timeout)
        saw_recording = False
        quiet_ticks = 0
        while time.monotonic() < deadline:
            time.sleep(0.2)
            if bool(state["recording_now"]):
                saw_recording = True
                quiet_ticks = 0
                continue
            if saw_recording:
                # Recording was seen and is now off: stop after 8 consecutive
                # idle polls (8 * 0.2 s).
                quiet_ticks += 1
                if quiet_ticks >= 8:
                    break

        if bool(state["recording_now"]):
            # Still recording when the loop ended (timeout): ask for a stop.
            sio.emit("client_message", {"type": "record_stop"})
            time.sleep(0.5)

        wall_end = time.monotonic()
    finally:
        # Always release the connection, even if an emit fails or the wait is
        # interrupted (e.g. Ctrl-C).
        sio.disconnect()

    filenames = [str(x) for x in state["filenames"] if isinstance(x, str) and x]
    rec_dir = Path(args.recordings_dir)
    stats = []
    for name in filenames:
        path = rec_dir / name
        # Wait up to 5 s for the file to show up on disk.
        wait_deadline = time.monotonic() + 5.0
        while (not path.exists()) and time.monotonic() < wait_deadline:
            time.sleep(0.1)
        if not path.exists():
            stats.append({"file": name, "error": "missing"})
            continue
        try:
            stats.append(wav_stats(path))
        except (wave.Error, EOFError, OSError, ValueError) as exc:
            # One unreadable file must not discard the rest of the report.
            stats.append({"file": name, "error": f"unreadable: {exc}"})

    first_rec_wall = state["first_rec_wall"]
    last_rec_wall = state["last_rec_wall"]
    rec_wall = 0.0
    if (
        isinstance(first_rec_wall, float)
        and isinstance(last_rec_wall, float)
        and last_rec_wall >= first_rec_wall
    ):
        rec_wall = last_rec_wall - first_rec_wall

    out = {
        "agc": bool(args.agc),
        "hifi": bool(args.hifi),
        "hifi_mic": str(args.hifi_mic),
        "source": args.source,
        "target_duration_sec": float(args.duration),
        "wall_elapsed_sec": wall_end - wall_start,
        "recording_wall_sec": rec_wall,
        "reported_rec_duration_sec": float(state["last_rec_duration"]),
        "reported_max_rec_duration_sec": float(state["max_rec_duration"]),
        "record_started": bool(state["record_started"]),
        "recordings": stats,
    }
    print(json.dumps(out, ensure_ascii=True))
    return 0
# Script entry point: run main() and use its return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|