/**
 * openai.ts - OpenAI-compatible HTTP embedding provider
 *
 * Talks to any endpoint that implements `POST /v1/embeddings` with the OpenAI
 * shape: request `{model, input: string|string[]}`, response
 * `{data: [{embedding: number[], index: number}, ...]}`.
 *
 * Used by qmd to delegate embeddings to a GPU worker (e.g. ai.mm.mk →
 * qmd-embed-worker on `models` LXC, RTX 4090) instead of running
 * node-llama-cpp locally.
 *
 * Features:
 * - Batches input in groups of ≤64 (the `batchSize` option; QMD_EMBED_BATCH_SIZE
 *   is presumably mapped onto it by the caller — this module reads no env var
 *   for it)
 * - Retries 429 / 503 with exponential backoff (1s, 4s, 16s → at most 4 attempts)
 * - 4xx (non-429) → no retry, count as failure; network errors, timeouts and
 *   5xx other than 503 are not retried either
 * - Circuit breaker: >50% failures (once ≥4 samples exist) in a 60s window →
 *   OPEN for 5 min, callers can use this to fall back to a local provider.
 *   Aborts initiated through the caller's own AbortSignal are not counted.
 * - Per-attempt timeout via AbortSignal (the `timeoutMs` option, default 30000;
 *   presumably fed from QMD_EMBED_TIMEOUT_MS by the caller)
 * - Healthcheck via `GET /health` if available (404/405/501 are treated as
 *   "route not implemented"), else a probe embed call
 */

// ─────────────────────────── Configuration ───────────────────────────────────

/**
 * Default batch size — most OpenAI-compatible embedding endpoints accept up to
 * 2048 inputs per call but for memory and latency we cap at 64.
 */
export const DEFAULT_BATCH_SIZE = 64;

/**
 * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
 * <500ms per batch of 64 in practice; 30s is a safe upper bound.
 */
export const DEFAULT_TIMEOUT_MS = 30_000;

/**
 * Retry backoff schedule (ms) for 429/503 responses. One sleep precedes each
 * retry, so this schedule yields 4 attempts total (initial + 3 retries) —
 * aligns with issue spec "1s/4s/16s".
 */
export const RETRY_BACKOFFS_MS = [1_000, 4_000, 16_000];

/**
 * Circuit breaker tuning — the breaker flips OPEN when the failure rate of the
 * samples inside CIRCUIT_WINDOW_MS exceeds CIRCUIT_FAILURE_RATE_THRESHOLD
 * (evaluated only once at least CIRCUIT_MIN_SAMPLES samples are in the
 * window). While OPEN, every call fails fast so the caller can fall back.
 */
export const CIRCUIT_WINDOW_MS = 60_000;
export const CIRCUIT_OPEN_DURATION_MS = 5 * 60_000;
export const CIRCUIT_FAILURE_RATE_THRESHOLD = 0.5;
export const CIRCUIT_MIN_SAMPLES = 4;

// ─────────────────────────── Helpers ─────────────────────────────────────────

function defaultSleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Render an unknown thrown value as a human-readable message. */
function errorMessage(err) {
    return err instanceof Error ? err.message : String(err);
}

/**
 * Statuses from `GET /health` that mean "this server has no such route"
 * rather than "this server is unhealthy". On these the healthcheck falls back
 * to an embed probe instead of reporting failure.
 */
const HEALTH_ROUTE_MISSING_STATUSES = new Set([404, 405, 501]);

/**
 * Build the merged AbortSignal for a single HTTP attempt: combines an
 * external `userSignal` (from caller / withLLMSession) with a per-attempt
 * timeout signal. Returns the merged signal AND a `cleanup` function that the
 * caller must invoke after the attempt completes; it clears the timer and
 * detaches the listener from `userSignal` (avoids leaks).
 */
function buildAttemptSignal(userSignal, timeoutMs) {
    const ctrl = new AbortController();
    const timeoutId = setTimeout(() => {
        ctrl.abort(new Error(`Request timed out after ${timeoutMs}ms`));
    }, timeoutMs);
    // Don't keep process alive just for this timer
    if (typeof timeoutId === "object" && timeoutId !== null && "unref" in timeoutId) {
        timeoutId.unref();
    }
    const onUserAbort = () => ctrl.abort(userSignal?.reason);
    if (userSignal) {
        if (userSignal.aborted) {
            ctrl.abort(userSignal.reason);
        }
        else {
            userSignal.addEventListener("abort", onUserAbort, { once: true });
        }
    }
    const cleanup = () => {
        clearTimeout(timeoutId);
        if (userSignal)
            userSignal.removeEventListener("abort", onUserAbort);
    };
    return { signal: ctrl.signal, cleanup };
}

/**
 * Determine whether an HTTP status is retryable. 429 (Too Many Requests)
 * and 503 (Service Unavailable) are retried; every other status is not.
 */
export function isRetryableStatus(status) {
    return status === 429 || status === 503;
}

/**
 * Chunk an array into pieces of ≤ size each. `size` MUST be an integer ≥ 1
 * (or `Infinity`, meaning "one chunk"). Returns `[]` for empty input.
 *
 * @throws Error when `size` is < 1, NaN, or fractional.
 */
export function chunkArray(items, size) {
    // Written as `!(size >= 1)` so NaN is rejected too (it would yield [[]]).
    if (!(size >= 1))
        throw new Error(`chunkArray: size must be ≥ 1, got ${size}`);
    // A fractional size produces chunks longer than `size` (slice truncates).
    if (size !== Infinity && !Number.isInteger(size))
        throw new Error(`chunkArray: size must be an integer, got ${size}`);
    if (items.length === 0)
        return [];
    if (items.length <= size)
        return [items];
    const out = [];
    for (let i = 0; i < items.length; i += size) {
        out.push(items.slice(i, i + size));
    }
    return out;
}

// ─────────────────────────── Circuit Breaker ─────────────────────────────────

/**
 * Sliding-window circuit breaker. Every recorded call outcome is kept as a
 * timestamped sample; samples older than `windowMs` are dropped. While CLOSED,
 * once at least `minSamples` samples are in the window and the failure rate
 * exceeds `threshold`, it flips OPEN. After `openDurationMs` it lazily moves to
 * HALF-OPEN, where calls are let through again and the next recorded outcome
 * decides: success → CLOSED (with a fresh window), failure → OPEN again.
 */
export class CircuitBreaker {
    samples = [];
    state = "closed";
    openedAt = null;
    windowMs;
    openDurationMs;
    threshold;
    minSamples;
    now;
    constructor(opts = {}) {
        this.windowMs = opts.windowMs ?? CIRCUIT_WINDOW_MS;
        this.openDurationMs = opts.openDurationMs ?? CIRCUIT_OPEN_DURATION_MS;
        this.threshold = opts.threshold ?? CIRCUIT_FAILURE_RATE_THRESHOLD;
        this.minSamples = opts.minSamples ?? CIRCUIT_MIN_SAMPLES;
        this.now = opts.now ?? Date.now;
    }
    /** Current state; may first apply the time-based OPEN → HALF-OPEN move. */
    getState() {
        this.tickAutoReset();
        return this.state;
    }
    /**
     * Returns true when calls should be short-circuited (skip HTTP, fall back).
     * Side-effects: may transition OPEN → HALF-OPEN if the open window expired.
     */
    shouldFailFast() {
        return this.getState() === "open";
    }
    /** Record a successful call. Closes the breaker if it was HALF-OPEN. */
    recordSuccess() {
        // Honor the time-based OPEN→HALF-OPEN transition before deciding what
        // to do with this sample. Without this, a success that lands AFTER the
        // open window expired would still see state==="open" and never close
        // the breaker (a probe call could only flip it via getState()).
        this.tickAutoReset();
        this.pushSample(true);
        if (this.state === "half-open") {
            // Start the closed phase with an empty window: the failures that
            // opened the circuit must not count again (they would otherwise
            // linger whenever openDurationMs < windowMs and re-trip it at once).
            this.reset();
        }
    }
    /** Record a failed call. May trigger OPEN. */
    recordFailure() {
        // Same reasoning as recordSuccess — apply lazy auto-reset before
        // classifying the sample.
        this.tickAutoReset();
        this.pushSample(false);
        if (this.state === "half-open") {
            // Probe failed — re-open
            this.state = "open";
            this.openedAt = this.now();
            return;
        }
        if (this.state === "closed")
            this.evaluate();
    }
    /** Force-reset the breaker (used by tests / admin, and on HALF-OPEN → CLOSED) */
    reset() {
        this.samples = [];
        this.state = "closed";
        this.openedAt = null;
    }
    pushSample(ok) {
        const ts = this.now();
        this.samples.push({ ts, ok });
        // Drop samples outside the window
        const cutoff = ts - this.windowMs;
        while (this.samples.length > 0 && this.samples[0].ts < cutoff) {
            this.samples.shift();
        }
    }
    evaluate() {
        if (this.samples.length < this.minSamples)
            return;
        const failures = this.samples.filter((s) => !s.ok).length;
        const rate = failures / this.samples.length;
        if (rate > this.threshold) {
            this.state = "open";
            this.openedAt = this.now();
        }
    }
    tickAutoReset() {
        if (this.state === "open" && this.openedAt !== null) {
            if (this.now() - this.openedAt >= this.openDurationMs) {
                this.state = "half-open";
            }
        }
    }
}

// ─────────────────────────── Errors ──────────────────────────────────────────

/**
 * Raised when the circuit breaker is OPEN and a call is short-circuited.
 * Callers (e.g. fallback wrapper) can catch this to switch to local provider.
 */
export class CircuitOpenError extends Error {
    constructor(message = "OpenAIEmbeddingsProvider circuit is OPEN") {
        super(message);
        this.name = "CircuitOpenError";
    }
}

/**
 * Non-2xx HTTP response from upstream. Carries the status code and up to 1 KiB
 * of the response body; `requestWithRetry` retries it only for 429/503.
 */
export class HttpError extends Error {
    status;
    bodyPreview;
    constructor(status, bodyPreview) {
        super(`HTTP ${status}: ${bodyPreview.slice(0, 200)}`);
        this.name = "HttpError";
        this.status = status;
        this.bodyPreview = bodyPreview.slice(0, 1024);
    }
}

// ─────────────────────────── Provider ────────────────────────────────────────

export class OpenAIEmbeddingsProvider {
    kind = "openai";
    endpoint;
    apiKey;
    modelId;
    upstreamModel;
    batchSize;
    timeoutMs;
    fetchImpl;
    retryBackoffsMs;
    sleep;
    now;
    dimensions = undefined;
    breaker;
    /**
     * @param config `endpoint` (required; trailing slashes stripped), plus
     *   optional `apiKey`, `modelId`, `upstreamModel`, `batchSize`, `timeoutMs`,
     *   `fetchImpl`, `retryBackoffsMs`, `sleep`, `now`.
     * @throws Error when `endpoint` is missing, no fetch implementation is
     *   available, `batchSize` is not an integer ≥ 1, or `timeoutMs` is not a
     *   positive finite number.
     */
    constructor(config) {
        if (!config.endpoint) {
            throw new Error("OpenAIEmbeddingsProvider: endpoint is required");
        }
        this.endpoint = config.endpoint.replace(/\/+$/, "");
        this.apiKey = config.apiKey;
        this.modelId = config.modelId ?? "embeddinggemma";
        this.upstreamModel = config.upstreamModel ?? this.modelId;
        this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
        this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
        this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
        this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
        this.sleep = config.sleep ?? defaultSleep;
        this.now = config.now ?? Date.now;
        this.breaker = new CircuitBreaker({ now: this.now });
        if (!this.fetchImpl) {
            throw new Error("OpenAIEmbeddingsProvider: global fetch is unavailable. " +
                "Provide a `fetchImpl` config option (Node ≥18 ships fetch by default).");
        }
        // `!(x >= 1)` also rejects NaN (e.g. a mis-parsed env var), which
        // would otherwise make every request send an empty `input`.
        if (!(this.batchSize >= 1)) {
            throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
        }
        if (this.batchSize !== Infinity && !Number.isInteger(this.batchSize)) {
            throw new Error(`OpenAIEmbeddingsProvider: batchSize must be an integer, got ${this.batchSize}`);
        }
        // A NaN/≤0/infinite timeout makes setTimeout fire almost immediately,
        // aborting every request.
        if (!Number.isFinite(this.timeoutMs) || this.timeoutMs <= 0) {
            throw new Error(`OpenAIEmbeddingsProvider: timeoutMs must be a positive finite number, got ${this.timeoutMs}`);
        }
    }
    getModelId() {
        return this.modelId;
    }
    getDimensions() {
        return this.dimensions;
    }
    /**
     * Check endpoint health. Tries `GET /health` first (the worker exposes
     * it). A 2xx is healthy; 404/405/501 (route not implemented) or a thrown
     * request error fall back to a single embed probe; any other status is
     * reported unhealthy as-is.
     *
     * @returns `{ok, model, dimensions?, detail}`; never throws.
     */
    async healthcheck(signal) {
        // Why GET /health could not vouch for the endpoint; set only when we
        // fall through to the embed probe.
        let healthFailure;
        const { signal: attemptSig, cleanup } = buildAttemptSignal(signal, this.timeoutMs);
        try {
            const resp = await this.fetchImpl(`${this.endpoint}/health`, {
                method: "GET",
                headers: this.buildHeaders(),
                signal: attemptSig,
            });
            if (resp.ok) {
                return {
                    ok: true,
                    model: this.modelId,
                    dimensions: this.dimensions,
                    detail: `GET /health → ${resp.status}`,
                };
            }
            if (!HEALTH_ROUTE_MISSING_STATUSES.has(resp.status)) {
                return {
                    ok: false,
                    model: this.modelId,
                    detail: `GET /health → HTTP ${resp.status}`,
                };
            }
            healthFailure = `GET /health → HTTP ${resp.status}`;
        }
        catch (err) {
            healthFailure = errorMessage(err);
        }
        finally {
            cleanup();
        }
        // Endpoint may not implement /health — try a single embed probe instead.
        try {
            const probe = await this.embed("healthcheck", { signal });
            if (probe) {
                return {
                    ok: true,
                    model: this.modelId,
                    dimensions: probe.embedding.length,
                    detail: "embed probe ok",
                };
            }
            return {
                ok: false,
                model: this.modelId,
                detail: "embed probe returned null",
            };
        }
        catch (probeErr) {
            return {
                ok: false,
                model: this.modelId,
                detail: healthFailure + " | probe: " + errorMessage(probeErr),
            };
        }
    }
    /** Embed one text; resolves to `{embedding, model}` or null on failure. */
    async embed(text, options = {}) {
        const batch = await this.embedBatch([text], options);
        return batch[0] ?? null;
    }
    /**
     * Embed many texts, `batchSize` at a time. The result has one slot per
     * input, in input order; a slot is null when its chunk failed or was
     * skipped because the caller's signal aborted.
     *
     * @throws CircuitOpenError when the breaker is OPEN before or between chunks.
     */
    async embedBatch(texts, options = {}) {
        if (texts.length === 0)
            return [];
        if (this.breaker.shouldFailFast()) {
            throw new CircuitOpenError();
        }
        const chunks = chunkArray(texts, this.batchSize);
        const results = new Array(texts.length).fill(null);
        let cursor = 0;
        for (const chunk of chunks) {
            const start = cursor;
            cursor += chunk.length;
            // Abort early if signal already fired
            if (options.signal?.aborted) {
                // Leave remaining slots as null (caller treats as errors)
                return results;
            }
            // Fail-fast if breaker tripped mid-loop
            if (this.breaker.shouldFailFast()) {
                throw new CircuitOpenError();
            }
            try {
                const embeddings = await this.requestWithRetry(chunk, options);
                for (let i = 0; i < chunk.length; i++) {
                    const embedding = embeddings[i];
                    if (embedding) {
                        results[start + i] = {
                            embedding,
                            model: this.modelId,
                        };
                        // Record dimensions on first success
                        if (this.dimensions === undefined) {
                            this.dimensions = embedding.length;
                        }
                    }
                }
                this.breaker.recordSuccess();
            }
            catch (err) {
                // A caller-initiated abort says nothing about endpoint health:
                // don't feed it to the breaker (repeated cancellations would
                // otherwise open the circuit). Remaining slots stay null.
                if (options.signal?.aborted) {
                    return results;
                }
                this.breaker.recordFailure();
                // CircuitOpenError must propagate so the caller can fall back
                if (err instanceof CircuitOpenError)
                    throw err;
                // Other errors mark the chunk as null and continue with next chunk.
                // (The store layer already handles per-text nulls as errors.)
                if (process.env.QMD_EMBED_DEBUG) {
                    process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${errorMessage(err)})\n`);
                }
            }
        }
        return results;
    }
    async dispose() {
        // Nothing to release — fetch handles its own connection pooling.
        // Reset the breaker so a re-instantiation starts fresh.
        this.breaker.reset();
    }
    // ────────────────────── Internals ──────────────────────
    buildHeaders() {
        const headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        };
        if (this.apiKey) {
            headers["Authorization"] = `Bearer ${this.apiKey}`;
        }
        return headers;
    }
    /**
     * Single HTTP request with retry on 429/503. Makes at most
     * `retryBackoffsMs.length + 1` attempts, sleeping `retryBackoffsMs[i]`
     * after failed attempt i. Returns embeddings indexed the same as `texts`.
     * Throws on non-retryable failure, caller abort, or all attempts exhausted.
     */
    async requestWithRetry(texts, options) {
        let lastErr = null;
        const maxAttempts = this.retryBackoffsMs.length + 1;
        for (let attempt = 0; attempt < maxAttempts; attempt++) {
            // Honor user abort BEFORE issuing the call (avoids wasted network)
            if (options.signal?.aborted) {
                throw new Error("aborted by caller");
            }
            try {
                return await this.requestOnce(texts, options);
            }
            catch (err) {
                lastErr = err;
                const retryable = err instanceof HttpError ? isRetryableStatus(err.status) : false;
                if (!retryable)
                    throw err;
                if (attempt < this.retryBackoffsMs.length) {
                    await this.sleep(this.retryBackoffsMs[attempt]);
                }
            }
        }
        // Exhausted retries → throw the last error so caller marks the chunk null
        throw lastErr ?? new Error("requestWithRetry exhausted");
    }
    /**
     * Issue one HTTP attempt to `POST /v1/embeddings`. Does NOT retry.
     * Throws HttpError on non-2xx, and a plain Error on malformed JSON or a
     * response that does not provide exactly one embedding per input.
     */
    async requestOnce(texts, options) {
        const { signal: attemptSig, cleanup } = buildAttemptSignal(options.signal, this.timeoutMs);
        try {
            const body = JSON.stringify({
                model: options.model ?? this.upstreamModel,
                input: texts,
            });
            const resp = await this.fetchImpl(`${this.endpoint}/v1/embeddings`, {
                method: "POST",
                headers: this.buildHeaders(),
                body,
                signal: attemptSig,
            });
            if (!resp.ok) {
                const text = await resp.text().catch(() => "");
                throw new HttpError(resp.status, text);
            }
            let parsed;
            try {
                parsed = (await resp.json());
            }
            catch (err) {
                throw new Error(`OpenAIEmbeddingsProvider: malformed JSON from ${this.endpoint}/v1/embeddings: ${errorMessage(err)}`);
            }
            if (!parsed || !Array.isArray(parsed.data)) {
                throw new Error(`OpenAIEmbeddingsProvider: response missing "data" array (got ${typeof parsed})`);
            }
            // Place each item at its declared index so output order matches
            // input order even if the server returns items out of order.
            const out = new Array(texts.length);
            for (const item of parsed.data) {
                // `item?.` guards null entries; non-integer indices would set
                // a stray property instead of an array slot.
                const index = item?.index;
                if (!Number.isInteger(index) || index < 0 || index >= texts.length) {
                    throw new Error(`OpenAIEmbeddingsProvider: data item index out of range (${index}, expected 0..${texts.length - 1})`);
                }
                if (!Array.isArray(item.embedding)) {
                    throw new Error(`OpenAIEmbeddingsProvider: data[${index}].embedding is not an array`);
                }
                out[index] = item.embedding;
            }
            // Sanity check — every slot must be filled
            for (let i = 0; i < texts.length; i++) {
                if (!out[i]) {
                    throw new Error(`OpenAIEmbeddingsProvider: response missing embedding for index ${i}`);
                }
            }
            return out;
        }
        finally {
            cleanup();
        }
    }
}