/**
 * openai.ts - OpenAI-compatible HTTP embedding provider
 *
 * Talks to any endpoint that implements `POST /v1/embeddings` with the OpenAI
 * shape: request `{model, input: string|string[]}`, response
 * `{data: [{embedding: number[], index: number}, ...]}`.
 *
 * Used by qmd to delegate embeddings to a GPU worker (e.g. ai.mm.mk →
 * qmd-embed-worker on `models` LXC, RTX 4090) instead of running
 * node-llama-cpp locally.
 *
 * Features:
 * - Batches input in groups of ≤64 (configurable via QMD_EMBED_BATCH_SIZE)
 * - Dispatches up to 4 batches concurrently (override via QMD_EMBED_CONCURRENCY)
 * - Retries 429 / 503 with exponential backoff (1s, 4s, 16s); a caller abort
 *   interrupts the backoff wait
 * - 4xx (non-429) → no retry, count as failure
 * - Circuit breaker: >50% failures in a 60s window (min 4 samples) → OPEN for
 *   5 min, callers can use this to fall back to a local provider. Caller
 *   aborts are not counted as upstream failures.
 * - Per-attempt timeout via AbortSignal (default QMD_EMBED_TIMEOUT_MS=30000)
 * - Healthcheck via `GET /health`; falls back to a probe embed call when the
 *   route is missing (404/405/501) or the request fails outright
 */

// ─────────────────────────── Configuration ───────────────────────────────────

/**
 * Default batch size — most OpenAI-compatible embedding endpoints accept up to
 * 2048 inputs per call but for memory and latency we cap at 64.
 */
export const DEFAULT_BATCH_SIZE = 64;

/**
 * Default in-flight concurrency cap for `embedBatch`. The qmd-embed-worker
 * exposes a 4-way semaphore (`MAX_CONCURRENT_REQUESTS=4`) and idles at
 * queue-depth 1.0 under sequential clients (i-fkpnar9i baseline). Defaulting
 * to 4 matches the worker's advertised concurrency without overshooting the
 * GPU. Override per-deploy via `QMD_EMBED_CONCURRENCY`. Setting to 1 reverts
 * to the legacy sequential dispatch.
 */
export const DEFAULT_CONCURRENCY = 4;

/**
 * Default per-attempt timeout (30 s). Every HTTP attempt, including each
 * retry, gets its own timer. embeddinggemma-300M on RTX 4090 takes <500ms per
 * batch of 64 in practice; 30s is a safe upper bound.
 */
export const DEFAULT_TIMEOUT_MS = 30_000;

/**
 * Retry backoff schedule (ms) for 429/503 responses. Each entry is the delay
 * before one retry, so a schedule of length N allows N + 1 attempts. With the
 * default that is 4 attempts (initial + 3 retries after 1s, 4s and 16s),
 * matching the issue spec "1s/4s/16s".
 */
export const RETRY_BACKOFFS_MS = [1_000, 4_000, 16_000];

/**
 * Circuit breaker — flips OPEN when the failure rate exceeds the threshold
 * within the window (once at least CIRCUIT_MIN_SAMPLES samples are present).
 * While OPEN, every call fails fast so the caller can fall back.
 */
export const CIRCUIT_WINDOW_MS = 60_000;
export const CIRCUIT_OPEN_DURATION_MS = 5 * 60_000;
export const CIRCUIT_FAILURE_RATE_THRESHOLD = 0.5;
export const CIRCUIT_MIN_SAMPLES = 4;

/**
 * `GET /health` statuses meaning "this server has no health route" rather
 * than "this server is unhealthy". They trigger the embed-probe fallback.
 */
const HEALTH_ROUTE_MISSING_STATUSES = new Set([404, 405, 501]);

// ─────────────────────────── Helpers ─────────────────────────────────────────

function defaultSleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Message text of an Error, or the stringified value for non-Errors. */
function errorMessage(err) {
  return err instanceof Error ? err.message : String(err);
}

/** `lastError` text used when the caller's AbortSignal cancelled a run. */
function describeCallerAbort(signal) {
  return `aborted by caller${signal?.reason ? `: ${String(signal.reason)}` : ""}`;
}

/**
 * Build the merged AbortSignal for a single HTTP attempt: combines an
 * external `userSignal` (from caller / withLLMSession) with a per-attempt
 * timeout signal. Returns the merged signal AND a `cleanup` function that
 * clears the timer and detaches the abort listener; call it after the
 * attempt completes to avoid leaks.
 */
function buildAttemptSignal(userSignal, timeoutMs) {
  const ctrl = new AbortController();
  const timeoutId = setTimeout(() => {
    ctrl.abort(new Error(`Request timed out after ${timeoutMs}ms`));
  }, timeoutMs);
  // Don't keep process alive just for this timer
  if (typeof timeoutId === "object" && timeoutId !== null && "unref" in timeoutId) {
    timeoutId.unref();
  }
  const onUserAbort = () => ctrl.abort(userSignal?.reason);
  if (userSignal) {
    if (userSignal.aborted) {
      ctrl.abort(userSignal.reason);
    } else {
      userSignal.addEventListener("abort", onUserAbort, { once: true });
    }
  }
  const cleanup = () => {
    clearTimeout(timeoutId);
    if (userSignal) userSignal.removeEventListener("abort", onUserAbort);
  };
  return { signal: ctrl.signal, cleanup };
}

/**
 * Determine whether an HTTP status is retryable. 429 (Too Many Requests)
 * and 503 (Service Unavailable) are retried; 4xx (other than 429) are not.
 */
export function isRetryableStatus(status) {
  return status === 429 || status === 503;
}

/**
 * Chunk an array into pieces of ≤ size each. Every chunk is a fresh array.
 *
 * @param items  Array to split; it is not mutated.
 * @param size   Maximum chunk length. MUST be ≥ 1; a fractional size is
 *               floored (a raw fractional step would make `slice` repeat
 *               elements across chunks). `Infinity` yields a single chunk.
 * @returns      `[]` for empty input, otherwise the chunks in order.
 * @throws       Error when `size` is NaN or < 1.
 */
export function chunkArray(items, size) {
  // `!(size >= 1)` also rejects NaN, which `size < 1` would let through.
  if (!(size >= 1)) throw new Error(`chunkArray: size must be ≥ 1, got ${size}`);
  const step = Math.floor(size);
  const out = [];
  for (let i = 0; i < items.length; i += step) {
    out.push(items.slice(i, i + step));
  }
  return out;
}

// ─────────────────────────── Circuit Breaker ─────────────────────────────────

/**
 * Sliding-window circuit breaker. Keeps every sample recorded in the last
 * 60 seconds and, once at least 4 samples are present, flips OPEN when the
 * failure rate exceeds 50%. After 5 minutes OPEN it lazily moves to
 * HALF-OPEN, at which point the next recorded sample decides whether to
 * close (success) or re-open (failure).
 */
export class CircuitBreaker {
  samples = [];
  state = "closed";
  openedAt = null;
  windowMs;
  openDurationMs;
  threshold;
  minSamples;
  now;

  constructor(opts = {}) {
    this.windowMs = opts.windowMs ?? CIRCUIT_WINDOW_MS;
    this.openDurationMs = opts.openDurationMs ?? CIRCUIT_OPEN_DURATION_MS;
    this.threshold = opts.threshold ?? CIRCUIT_FAILURE_RATE_THRESHOLD;
    this.minSamples = opts.minSamples ?? CIRCUIT_MIN_SAMPLES;
    this.now = opts.now ?? Date.now;
  }

  getState() {
    this.tickAutoReset();
    return this.state;
  }

  /**
   * Returns true when calls should be short-circuited (skip HTTP, fall back).
   * Side-effects: may transition OPEN → HALF-OPEN if the open window expired.
   */
  shouldFailFast() {
    return this.getState() === "open";
  }

  /** Record a successful call. */
  recordSuccess() {
    // Honor the time-based OPEN→HALF-OPEN transition before deciding what
    // to do with this sample. Without this, a success that lands AFTER the
    // open window expired would still see state==="open" and never close
    // the breaker (a probe call could only flip it via getState()).
    this.tickAutoReset();
    this.pushSample(true);
    if (this.state === "half-open") {
      this.state = "closed";
      this.openedAt = null;
    }
  }

  /** Record a failed call. May trigger OPEN. */
  recordFailure() {
    // Same reasoning as recordSuccess — apply lazy auto-reset before
    // classifying the sample.
    this.tickAutoReset();
    this.pushSample(false);
    if (this.state === "half-open") {
      // Probe failed — re-open
      this.state = "open";
      this.openedAt = this.now();
      return;
    }
    if (this.state === "closed") this.evaluate();
  }

  /** Force-reset the breaker (used by tests / admin) */
  reset() {
    this.samples = [];
    this.state = "closed";
    this.openedAt = null;
  }

  pushSample(ok) {
    const ts = this.now();
    this.samples.push({ ts, ok });
    // Drop samples outside the window
    const cutoff = ts - this.windowMs;
    while (this.samples.length > 0 && this.samples[0].ts < cutoff) {
      this.samples.shift();
    }
  }

  evaluate() {
    if (this.samples.length < this.minSamples) return;
    const failures = this.samples.filter((s) => !s.ok).length;
    const rate = failures / this.samples.length;
    if (rate > this.threshold) {
      this.state = "open";
      this.openedAt = this.now();
    }
  }

  tickAutoReset() {
    if (this.state === "open" && this.openedAt !== null) {
      if (this.now() - this.openedAt >= this.openDurationMs) {
        this.state = "half-open";
      }
    }
  }
}

// ─────────────────────────── Errors ──────────────────────────────────────────

/**
 * Raised when the circuit breaker is OPEN and a call is short-circuited.
 * Callers (e.g. fallback wrapper) can catch this to switch to local provider.
 */
export class CircuitOpenError extends Error {
  constructor(message = "OpenAIEmbeddingsProvider circuit is OPEN") {
    super(message);
    this.name = "CircuitOpenError";
  }
}

/**
 * Non-2xx HTTP response from upstream. Carries the status code and a body
 * preview (≤ 1024 chars; the message itself embeds ≤ 200 chars).
 */
export class HttpError extends Error {
  status;
  bodyPreview;

  constructor(status, bodyPreview) {
    super(`HTTP ${status}: ${bodyPreview.slice(0, 200)}`);
    this.name = "HttpError";
    this.status = status;
    this.bodyPreview = bodyPreview.slice(0, 1024);
  }
}

// ─────────────────────────── Provider ────────────────────────────────────────

export class OpenAIEmbeddingsProvider {
  kind = "openai";
  endpoint;
  apiKey;
  modelId;
  upstreamModel;
  batchSize;
  concurrency;
  timeoutMs;
  fetchImpl;
  retryBackoffsMs;
  sleep;
  now;
  dimensions = undefined;
  lastError = undefined;
  breaker;

  /**
   * @param config  `endpoint` (required; trailing slashes stripped), plus
   *   optional `apiKey`, `modelId`, `upstreamModel`, `batchSize`,
   *   `concurrency`, `timeoutMs`, `fetchImpl`, `retryBackoffsMs`, `sleep`
   *   and `now` (the last two are injection points for tests).
   * @throws Error when the endpoint is missing, no fetch is available, or
   *   `batchSize` / `concurrency` is NaN or < 1.
   */
  constructor(config) {
    if (!config.endpoint) {
      throw new Error("OpenAIEmbeddingsProvider: endpoint is required");
    }
    this.endpoint = config.endpoint.replace(/\/+$/, "");
    this.apiKey = config.apiKey;
    this.modelId = config.modelId ?? "embeddinggemma";
    this.upstreamModel = config.upstreamModel ?? this.modelId;
    this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
    this.concurrency = config.concurrency ?? DEFAULT_CONCURRENCY;
    this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
    // Bind the global fetch: it is invoked as `this.fetchImpl(...)`, and some
    // runtimes (e.g. browsers) reject fetch called with a foreign `this`
    // ("Illegal invocation").
    this.fetchImpl =
      config.fetchImpl ??
      (typeof globalThis.fetch === "function" ? globalThis.fetch.bind(globalThis) : undefined);
    this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
    this.sleep = config.sleep ?? defaultSleep;
    this.now = config.now ?? Date.now;
    this.breaker = new CircuitBreaker({ now: this.now });
    if (!this.fetchImpl) {
      throw new Error("OpenAIEmbeddingsProvider: global fetch is unavailable. " +
        "Provide a `fetchImpl` config option (Node ≥18 ships fetch by default).");
    }
    // `!(x >= 1)` also rejects NaN (e.g. a mis-parsed env var). NaN
    // concurrency would spawn zero workers and return all-null results.
    if (!(this.batchSize >= 1)) {
      throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
    }
    if (!(this.concurrency >= 1)) {
      throw new Error(`OpenAIEmbeddingsProvider: concurrency must be ≥ 1, got ${this.concurrency}`);
    }
  }

  getModelId() {
    return this.modelId;
  }

  getDimensions() {
    return this.dimensions;
  }

  /**
   * Most recent per-chunk failure message (HTTP status + body preview, malformed
   * JSON, timeout, abort reason). Returns `undefined` before the first failure
   * and after a call in which every input received an embedding.
   * See `EmbeddingProvider.getLastError`.
   */
  getLastError() {
    return this.lastError;
  }

  /** Endpoint URL configured at construction time — used by callers when
   * building error messages for failed first-chunk probes. */
  getEndpoint() {
    return this.endpoint;
  }

  /**
   * Check upstream health. Tries `GET /health` first. A 2xx is healthy. A
   * 404/405/501 means the server simply has no health route, so it falls
   * back to a single probe embed, as does a network error or timeout. Any
   * other status is reported as unhealthy without probing.
   */
  async healthcheck(signal) {
    let healthErr;
    try {
      const { signal: attemptSig, cleanup } = buildAttemptSignal(signal, this.timeoutMs);
      try {
        const resp = await this.fetchImpl(`${this.endpoint}/health`, {
          method: "GET",
          headers: this.buildHeaders(),
          signal: attemptSig,
        });
        if (resp.ok) {
          return {
            ok: true,
            model: this.modelId,
            dimensions: this.dimensions,
            detail: `GET /health → ${resp.status}`,
          };
        }
        if (!HEALTH_ROUTE_MISSING_STATUSES.has(resp.status)) {
          return {
            ok: false,
            model: this.modelId,
            detail: `GET /health → HTTP ${resp.status}`,
          };
        }
        healthErr = new Error(`GET /health → HTTP ${resp.status}`);
      } finally {
        cleanup();
      }
    } catch (err) {
      healthErr = err;
    }
    return this.probeHealth(healthErr, signal);
  }

  /**
   * Embed-probe fallback for `healthcheck`. `healthErr` explains why
   * `/health` was not usable and is folded into the detail on failure.
   */
  async probeHealth(healthErr, signal) {
    try {
      const probe = await this.embed("healthcheck", { signal });
      if (probe) {
        return {
          ok: true,
          model: this.modelId,
          dimensions: probe.embedding.length,
          detail: "embed probe ok",
        };
      }
      return {
        ok: false,
        model: this.modelId,
        detail: "embed probe returned null",
      };
    } catch (probeErr) {
      return {
        ok: false,
        model: this.modelId,
        detail: errorMessage(healthErr) + " | probe: " + errorMessage(probeErr),
      };
    }
  }

  async embed(text, options = {}) {
    const batch = await this.embedBatch([text], options);
    return batch[0] ?? null;
  }

  /**
   * Embed `texts` in order. Returns one entry per input: `{embedding, model}`,
   * or `null` for inputs whose chunk failed or was skipped because the caller
   * aborted.
   *
   * @throws CircuitOpenError when the breaker is OPEN at entry or opens
   *   mid-run; in-flight chunks are awaited first.
   */
  async embedBatch(texts, options = {}) {
    if (texts.length === 0) return [];
    if (this.breaker.shouldFailFast()) {
      throw new CircuitOpenError();
    }
    const chunks = chunkArray(texts, this.batchSize);
    const results = new Array(texts.length).fill(null);
    // Pre-compute the input-array starting position for each chunk so each
    // worker can write its slice of `results` independently — input order is
    // preserved end-to-end without a final re-sort step.
    const chunkStarts = new Array(chunks.length);
    let cursor = 0;
    for (let i = 0; i < chunks.length; i++) {
      chunkStarts[i] = cursor;
      cursor += chunks[i].length;
    }
    // Shared state across the worker pool. Workers only interleave at
    // `await` points, so plain variables are safe (no locks needed).
    let nextChunkIdx = 0;
    let anySucceeded = false;
    let aborted = false;
    let circuitTrippedDuringRun = null;
    const markCallerAbort = () => {
      aborted = true;
      this.lastError = describeCallerAbort(options.signal);
    };
    // Workers run as parallel async tasks pulling chunks off `nextChunkIdx`
    // until the queue is drained or one of the early-exit flags is set.
    // Concurrency is capped at min(this.concurrency, chunks.length) so we
    // don't spin up idle workers for tiny inputs.
    const workerCount = Math.min(this.concurrency, chunks.length);
    const dispatchOne = async () => {
      while (!aborted && !circuitTrippedDuringRun) {
        const idx = nextChunkIdx++;
        if (idx >= chunks.length) return;
        const chunk = chunks[idx];
        const start = chunkStarts[idx];
        // Honor abort/breaker BEFORE issuing the request so we don't waste
        // network for a dispatch we know will be discarded.
        if (options.signal?.aborted) {
          markCallerAbort();
          return;
        }
        if (this.breaker.shouldFailFast()) {
          // Capture the breaker-open intent so we throw it AFTER all
          // currently in-flight workers settle. The thrown error is a fresh
          // instance (matching legacy behavior).
          circuitTrippedDuringRun = new CircuitOpenError();
          return;
        }
        try {
          const embeddings = await this.requestWithRetry(chunk, options);
          for (let i = 0; i < chunk.length; i++) {
            const embedding = embeddings[i];
            if (embedding) {
              results[start + i] = { embedding, model: this.modelId };
              anySucceeded = true;
              // Record dimensions on first success. Concurrent workers may
              // race on this assignment, but they all observe the same
              // length so the race is benign.
              if (this.dimensions === undefined) {
                this.dimensions = embedding.length;
              }
            }
          }
          this.breaker.recordSuccess();
        } catch (err) {
          if (err instanceof CircuitOpenError) {
            circuitTrippedDuringRun = err;
            return;
          }
          // A failure caused by the caller cancelling says nothing about
          // upstream health — don't let user aborts trip the breaker.
          if (options.signal?.aborted) {
            markCallerAbort();
            return;
          }
          this.breaker.recordFailure();
          // Last-write-wins on lastError matches the legacy semantics — under
          // concurrency multiple workers may fail in the same call, but the
          // lastError just needs to surface "the most recent cause."
          this.lastError = this.formatErrorContext(err);
          if (process.env.QMD_EMBED_DEBUG) {
            process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${errorMessage(err)})\n`);
          }
        }
      }
    };
    await Promise.all(Array.from({ length: workerCount }, () => dispatchOne()));
    // If a worker observed `shouldFailFast()` mid-run, surface the error
    // after all in-flight workers have settled.
    if (circuitTrippedDuringRun) throw circuitTrippedDuringRun;
    // Clear lastError on a fully-successful sweep (every input got an embedding).
    if (anySucceeded && results.every((r) => r !== null)) {
      this.lastError = undefined;
    }
    return results;
  }

  async dispose() {
    // Nothing to release — fetch handles its own connection pooling.
    // Reset the breaker so a re-instantiation starts fresh.
    this.breaker.reset();
  }

  // ────────────────────── Internals ──────────────────────

  /**
   * Format a request-failure context string for `lastError`. Includes endpoint
   * + HTTP status + body preview when the error was an `HttpError`, otherwise
   * falls back to the message of the underlying error (or the value itself
   * when not an Error). Kept short — body preview is already capped at 1024
   * chars by `HttpError`, but we trim further here for the dimension-probe
   * thrown error which surfaces directly to users.
   */
  formatErrorContext(err) {
    if (err instanceof HttpError) {
      const preview = err.bodyPreview.replace(/\s+/g, " ").trim().slice(0, 240);
      return `endpoint=${this.endpoint}/v1/embeddings status=${err.status}${preview ? ` body="${preview}"` : ""}`;
    }
    if (err instanceof Error) {
      return `endpoint=${this.endpoint}/v1/embeddings error="${err.message}"`;
    }
    return `endpoint=${this.endpoint}/v1/embeddings error="${String(err)}"`;
  }

  buildHeaders() {
    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json",
    };
    if (this.apiKey) {
      headers["Authorization"] = `Bearer ${this.apiKey}`;
    }
    return headers;
  }

  /**
   * Single HTTP request with retry on 429/503. Returns embeddings indexed
   * the same as `texts`. Throws on non-retryable failure, on caller abort
   * ("aborted by caller"), or once all attempts are exhausted.
   */
  async requestWithRetry(texts, options) {
    let lastErr = null;
    const maxAttempts = this.retryBackoffsMs.length + 1;
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      // Honor user abort BEFORE issuing the call (avoids wasted network)
      if (options.signal?.aborted) {
        throw new Error("aborted by caller");
      }
      try {
        return await this.requestOnce(texts, options);
      } catch (err) {
        lastErr = err;
        const retryable = err instanceof HttpError && isRetryableStatus(err.status);
        if (!retryable) throw err;
        if (attempt < this.retryBackoffsMs.length) {
          await this.backoff(this.retryBackoffsMs[attempt], options.signal);
        }
      }
    }
    // Exhausted retries → throw the last error so caller marks the chunk null
    throw lastErr ?? new Error("requestWithRetry exhausted");
  }

  /**
   * Wait `ms` via the injected `sleep`, returning early if `signal` aborts so
   * a cancelled caller is not held for a full backoff (up to 16 s by
   * default). The caller re-checks the signal afterwards.
   */
  async backoff(ms, signal) {
    if (!signal) {
      await this.sleep(ms);
      return;
    }
    if (signal.aborted) return;
    let onAbort = () => {};
    const abortedWait = new Promise((resolve) => {
      onAbort = () => resolve();
      signal.addEventListener("abort", onAbort, { once: true });
    });
    try {
      await Promise.race([this.sleep(ms), abortedWait]);
    } finally {
      signal.removeEventListener("abort", onAbort);
    }
  }

  /**
   * Issue one HTTP attempt to `POST /v1/embeddings`. Does NOT retry.
   * Throws `HttpError` on non-2xx, and a plain Error on malformed bodies.
   */
  async requestOnce(texts, options) {
    const { signal: attemptSig, cleanup } = buildAttemptSignal(options.signal, this.timeoutMs);
    try {
      const body = JSON.stringify({
        model: options.model ?? this.upstreamModel,
        input: texts,
      });
      const resp = await this.fetchImpl(`${this.endpoint}/v1/embeddings`, {
        method: "POST",
        headers: this.buildHeaders(),
        body,
        signal: attemptSig,
      });
      if (!resp.ok) {
        const text = await resp.text().catch(() => "");
        throw new HttpError(resp.status, text);
      }
      let parsed;
      try {
        parsed = (await resp.json());
      } catch (err) {
        throw new Error(`OpenAIEmbeddingsProvider: malformed JSON from ${this.endpoint}/v1/embeddings: ${errorMessage(err)}`);
      }
      if (!parsed || !Array.isArray(parsed.data)) {
        throw new Error(`OpenAIEmbeddingsProvider: response missing "data" array (got ${typeof parsed})`);
      }
      // Scatter items into input order by their `index` (the server may
      // return them out of order).
      const out = new Array(texts.length);
      for (const item of parsed.data) {
        if (item === null || typeof item !== "object") {
          throw new Error(`OpenAIEmbeddingsProvider: malformed data item (expected object, got ${item === null ? "null" : typeof item})`);
        }
        if (typeof item.index !== "number" || item.index < 0 || item.index >= texts.length) {
          throw new Error(`OpenAIEmbeddingsProvider: data item index out of range (${item.index}, expected 0..${texts.length - 1})`);
        }
        if (!Array.isArray(item.embedding)) {
          throw new Error(`OpenAIEmbeddingsProvider: data[${item.index}].embedding is not an array`);
        }
        out[item.index] = item.embedding;
      }
      // Sanity check — every slot must be filled
      for (let i = 0; i < texts.length; i++) {
        if (!out[i]) {
          throw new Error(`OpenAIEmbeddingsProvider: response missing embedding for index ${i}`);
        }
      }
      return out;
    } finally {
      cleanup();
    }
  }
}