|
|
@@ -0,0 +1,477 @@
|
|
|
+/**
|
|
|
+ * openai.ts - OpenAI-compatible HTTP embedding provider
|
|
|
+ *
|
|
|
+ * Talks to any endpoint that implements `POST /v1/embeddings` with the OpenAI
|
|
|
+ * shape: request `{model, input: string|string[]}`, response
|
|
|
+ * `{data: [{embedding: number[], index: number}, ...]}`.
|
|
|
+ *
|
|
|
+ * Used by qmd to delegate embeddings to a GPU worker (e.g. ai.mm.mk →
|
|
|
+ * qmd-embed-worker on `models` LXC, RTX 4090) instead of running
|
|
|
+ * node-llama-cpp locally.
|
|
|
+ *
|
|
|
+ * Features:
|
|
|
+ * - Batches input in groups of ≤64 (configurable via QMD_EMBED_BATCH_SIZE)
|
|
|
+ * - Retries 429 / 503 with exponential backoff (1s, 4s, 16s)
|
|
|
+ * - 4xx (non-429) → no retry, count as failure
|
|
|
+ * - Circuit breaker: >50% failures in a 60s window → OPEN for 5 min,
|
|
|
+ * callers can use this to fall back to a local provider
|
|
|
+ * - Per-call timeout via AbortSignal (default QMD_EMBED_TIMEOUT_MS=30000)
|
|
|
+ * - Healthcheck via `GET /health` if available, else a probe embed call
|
|
|
+ */
|
|
|
+// ─────────────────────────── Configuration ───────────────────────────────────
|
|
|
+/**
|
|
|
+ * Default batch size — most OpenAI-compatible embedding endpoints accept up to
|
|
|
+ * 2048 inputs per call but for memory and latency we cap at 64.
|
|
|
+ */
|
|
|
+export const DEFAULT_BATCH_SIZE = 64;
|
|
|
+/**
|
|
|
+ * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
|
|
|
+ * <500ms per batch of 64 in practice; 30s is a safe upper bound.
|
|
|
+ */
|
|
|
+export const DEFAULT_TIMEOUT_MS = 30_000;
|
|
|
+/**
|
|
|
+ * Retry backoff schedule (ms) for 429/503 responses. 3 attempts total
|
|
|
+ * (initial + 2 retries) — aligns with issue spec "1s/4s/16s".
|
|
|
+ */
|
|
|
+export const RETRY_BACKOFFS_MS = [1_000, 4_000, 16_000];
|
|
|
+/**
|
|
|
+ * Circuit breaker — flips OPEN when error rate exceeds threshold within
|
|
|
+ * window. While OPEN, every call fails fast so the caller can fall back.
|
|
|
+ */
|
|
|
+export const CIRCUIT_WINDOW_MS = 60_000;
|
|
|
+export const CIRCUIT_OPEN_DURATION_MS = 5 * 60_000;
|
|
|
+export const CIRCUIT_FAILURE_RATE_THRESHOLD = 0.5;
|
|
|
+export const CIRCUIT_MIN_SAMPLES = 4;
|
|
|
+// ─────────────────────────── Helpers ─────────────────────────────────────────
|
|
|
+function defaultSleep(ms) {
|
|
|
+ return new Promise((resolve) => setTimeout(resolve, ms));
|
|
|
+}
|
|
|
+/**
|
|
|
+ * Build the merged AbortSignal for a single HTTP attempt: combines an
|
|
|
+ * external `userSignal` (from caller / withLLMSession) with a per-attempt
|
|
|
+ * timeout signal. Returns the merged signal AND the timeout id so the
|
|
|
+ * caller can `clearTimeout` after the attempt completes (avoids leaks).
|
|
|
+ */
|
|
|
+function buildAttemptSignal(userSignal, timeoutMs) {
|
|
|
+ const ctrl = new AbortController();
|
|
|
+ const timeoutId = setTimeout(() => {
|
|
|
+ ctrl.abort(new Error(`Request timed out after ${timeoutMs}ms`));
|
|
|
+ }, timeoutMs);
|
|
|
+ // Don't keep process alive just for this timer
|
|
|
+ if (typeof timeoutId === "object" && timeoutId !== null && "unref" in timeoutId) {
|
|
|
+ timeoutId.unref();
|
|
|
+ }
|
|
|
+ const onUserAbort = () => ctrl.abort(userSignal?.reason);
|
|
|
+ if (userSignal) {
|
|
|
+ if (userSignal.aborted) {
|
|
|
+ ctrl.abort(userSignal.reason);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ userSignal.addEventListener("abort", onUserAbort, { once: true });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ const cleanup = () => {
|
|
|
+ clearTimeout(timeoutId);
|
|
|
+ if (userSignal)
|
|
|
+ userSignal.removeEventListener("abort", onUserAbort);
|
|
|
+ };
|
|
|
+ return { signal: ctrl.signal, cleanup };
|
|
|
+}
|
|
|
+/**
|
|
|
+ * Determine whether an HTTP status is retryable. 429 (Too Many Requests)
|
|
|
+ * and 503 (Service Unavailable) are retried; 4xx (other than 429) are not.
|
|
|
+ */
|
|
|
+export function isRetryableStatus(status) {
|
|
|
+ return status === 429 || status === 503;
|
|
|
+}
|
|
|
+/**
|
|
|
+ * Chunk an array into pieces of ≤ size each. `size` MUST be ≥ 1.
|
|
|
+ */
|
|
|
+export function chunkArray(items, size) {
|
|
|
+ if (size < 1)
|
|
|
+ throw new Error(`chunkArray: size must be ≥ 1, got ${size}`);
|
|
|
+ if (items.length <= size)
|
|
|
+ return items.length === 0 ? [] : [items];
|
|
|
+ const out = [];
|
|
|
+ for (let i = 0; i < items.length; i += size) {
|
|
|
+ out.push(items.slice(i, i + size));
|
|
|
+ }
|
|
|
+ return out;
|
|
|
+}
|
|
|
+// ─────────────────────────── Circuit Breaker ─────────────────────────────────
|
|
|
+/**
|
|
|
+ * Sliding-window circuit breaker. Tracks the last N samples (min 4) over a
|
|
|
+ * 60-second window; flips OPEN when failure rate exceeds 50%, then auto-
|
|
|
+ * resets to HALF-OPEN after 5 minutes — at which point the next probe
|
|
|
+ * decides whether to close (success) or re-open (failure).
|
|
|
+ */
|
|
|
+export class CircuitBreaker {
|
|
|
+ samples = [];
|
|
|
+ state = "closed";
|
|
|
+ openedAt = null;
|
|
|
+ windowMs;
|
|
|
+ openDurationMs;
|
|
|
+ threshold;
|
|
|
+ minSamples;
|
|
|
+ now;
|
|
|
+ constructor(opts = {}) {
|
|
|
+ this.windowMs = opts.windowMs ?? CIRCUIT_WINDOW_MS;
|
|
|
+ this.openDurationMs = opts.openDurationMs ?? CIRCUIT_OPEN_DURATION_MS;
|
|
|
+ this.threshold = opts.threshold ?? CIRCUIT_FAILURE_RATE_THRESHOLD;
|
|
|
+ this.minSamples = opts.minSamples ?? CIRCUIT_MIN_SAMPLES;
|
|
|
+ this.now = opts.now ?? Date.now;
|
|
|
+ }
|
|
|
+ getState() {
|
|
|
+ this.tickAutoReset();
|
|
|
+ return this.state;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Returns true when calls should be short-circuited (skip HTTP, fall back).
|
|
|
+ * Side-effects: may transition OPEN → HALF-OPEN if the open window expired.
|
|
|
+ */
|
|
|
+ shouldFailFast() {
|
|
|
+ return this.getState() === "open";
|
|
|
+ }
|
|
|
+ /** Record a successful call. */
|
|
|
+ recordSuccess() {
|
|
|
+ // Honor the time-based OPEN→HALF-OPEN transition before deciding what
|
|
|
+ // to do with this sample. Without this, a success that lands AFTER the
|
|
|
+ // open window expired would still see state==="open" and never close
|
|
|
+ // the breaker (a probe call could only flip it via getState()).
|
|
|
+ this.tickAutoReset();
|
|
|
+ this.pushSample(true);
|
|
|
+ if (this.state === "half-open") {
|
|
|
+ this.state = "closed";
|
|
|
+ this.openedAt = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /** Record a failed call. May trigger OPEN. */
|
|
|
+ recordFailure() {
|
|
|
+ // Same reasoning as recordSuccess — apply lazy auto-reset before
|
|
|
+ // classifying the sample.
|
|
|
+ this.tickAutoReset();
|
|
|
+ this.pushSample(false);
|
|
|
+ if (this.state === "half-open") {
|
|
|
+ // Probe failed — re-open
|
|
|
+ this.state = "open";
|
|
|
+ this.openedAt = this.now();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (this.state === "closed")
|
|
|
+ this.evaluate();
|
|
|
+ }
|
|
|
+ /** Force-reset the breaker (used by tests / admin) */
|
|
|
+ reset() {
|
|
|
+ this.samples = [];
|
|
|
+ this.state = "closed";
|
|
|
+ this.openedAt = null;
|
|
|
+ }
|
|
|
+ pushSample(ok) {
|
|
|
+ const ts = this.now();
|
|
|
+ this.samples.push({ ts, ok });
|
|
|
+ // Drop samples outside the window
|
|
|
+ const cutoff = ts - this.windowMs;
|
|
|
+ while (this.samples.length > 0 && this.samples[0].ts < cutoff) {
|
|
|
+ this.samples.shift();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ evaluate() {
|
|
|
+ if (this.samples.length < this.minSamples)
|
|
|
+ return;
|
|
|
+ const failures = this.samples.filter((s) => !s.ok).length;
|
|
|
+ const rate = failures / this.samples.length;
|
|
|
+ if (rate > this.threshold) {
|
|
|
+ this.state = "open";
|
|
|
+ this.openedAt = this.now();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tickAutoReset() {
|
|
|
+ if (this.state === "open" && this.openedAt !== null) {
|
|
|
+ if (this.now() - this.openedAt >= this.openDurationMs) {
|
|
|
+ this.state = "half-open";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+// ─────────────────────────── Errors ──────────────────────────────────────────
|
|
|
+/**
|
|
|
+ * Raised when the circuit breaker is OPEN and a call is short-circuited.
|
|
|
+ * Callers (e.g. fallback wrapper) can catch this to switch to local provider.
|
|
|
+ */
|
|
|
+export class CircuitOpenError extends Error {
|
|
|
+ constructor(message = "OpenAIEmbeddingsProvider circuit is OPEN") {
|
|
|
+ super(message);
|
|
|
+ this.name = "CircuitOpenError";
|
|
|
+ }
|
|
|
+}
|
|
|
+/**
|
|
|
+ * Persistent (non-retryable) HTTP error from upstream. Includes status code.
|
|
|
+ */
|
|
|
+export class HttpError extends Error {
|
|
|
+ status;
|
|
|
+ bodyPreview;
|
|
|
+ constructor(status, bodyPreview) {
|
|
|
+ super(`HTTP ${status}: ${bodyPreview.slice(0, 200)}`);
|
|
|
+ this.name = "HttpError";
|
|
|
+ this.status = status;
|
|
|
+ this.bodyPreview = bodyPreview.slice(0, 1024);
|
|
|
+ }
|
|
|
+}
|
|
|
+// ─────────────────────────── Provider ────────────────────────────────────────
|
|
|
+export class OpenAIEmbeddingsProvider {
|
|
|
+ kind = "openai";
|
|
|
+ endpoint;
|
|
|
+ apiKey;
|
|
|
+ modelId;
|
|
|
+ upstreamModel;
|
|
|
+ batchSize;
|
|
|
+ timeoutMs;
|
|
|
+ fetchImpl;
|
|
|
+ retryBackoffsMs;
|
|
|
+ sleep;
|
|
|
+ now;
|
|
|
+ dimensions = undefined;
|
|
|
+ breaker;
|
|
|
+ constructor(config) {
|
|
|
+ if (!config.endpoint) {
|
|
|
+ throw new Error("OpenAIEmbeddingsProvider: endpoint is required");
|
|
|
+ }
|
|
|
+ this.endpoint = config.endpoint.replace(/\/+$/, "");
|
|
|
+ this.apiKey = config.apiKey;
|
|
|
+ this.modelId = config.modelId ?? "embeddinggemma";
|
|
|
+ this.upstreamModel = config.upstreamModel ?? this.modelId;
|
|
|
+ this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
|
|
|
+ this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
|
|
+ this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
|
|
|
+ this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
|
|
|
+ this.sleep = config.sleep ?? defaultSleep;
|
|
|
+ this.now = config.now ?? Date.now;
|
|
|
+ this.breaker = new CircuitBreaker({ now: this.now });
|
|
|
+ if (!this.fetchImpl) {
|
|
|
+ throw new Error("OpenAIEmbeddingsProvider: global fetch is unavailable. " +
|
|
|
+ "Provide a `fetchImpl` config option (Node ≥18 ships fetch by default).");
|
|
|
+ }
|
|
|
+ if (this.batchSize < 1) {
|
|
|
+ throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ getModelId() {
|
|
|
+ return this.modelId;
|
|
|
+ }
|
|
|
+ getDimensions() {
|
|
|
+ return this.dimensions;
|
|
|
+ }
|
|
|
+ async healthcheck(signal) {
|
|
|
+ // Try GET /health first (worker exposes it). Fall back to probe embed.
|
|
|
+ try {
|
|
|
+ const { signal: attemptSig, cleanup } = buildAttemptSignal(signal, this.timeoutMs);
|
|
|
+ try {
|
|
|
+ const resp = await this.fetchImpl(`${this.endpoint}/health`, {
|
|
|
+ method: "GET",
|
|
|
+ headers: this.buildHeaders(),
|
|
|
+ signal: attemptSig,
|
|
|
+ });
|
|
|
+ if (resp.ok) {
|
|
|
+ return {
|
|
|
+ ok: true,
|
|
|
+ model: this.modelId,
|
|
|
+ dimensions: this.dimensions,
|
|
|
+ detail: `GET /health → ${resp.status}`,
|
|
|
+ };
|
|
|
+ }
|
|
|
+ return {
|
|
|
+ ok: false,
|
|
|
+ model: this.modelId,
|
|
|
+ detail: `GET /health → HTTP ${resp.status}`,
|
|
|
+ };
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ cleanup();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ // Endpoint may not implement /health — try a single embed probe instead.
|
|
|
+ try {
|
|
|
+ const probe = await this.embed("healthcheck", { signal });
|
|
|
+ if (probe) {
|
|
|
+ return {
|
|
|
+ ok: true,
|
|
|
+ model: this.modelId,
|
|
|
+ dimensions: probe.embedding.length,
|
|
|
+ detail: "embed probe ok",
|
|
|
+ };
|
|
|
+ }
|
|
|
+ return {
|
|
|
+ ok: false,
|
|
|
+ model: this.modelId,
|
|
|
+ detail: "embed probe returned null",
|
|
|
+ };
|
|
|
+ }
|
|
|
+ catch (probeErr) {
|
|
|
+ return {
|
|
|
+ ok: false,
|
|
|
+ model: this.modelId,
|
|
|
+ detail: (err instanceof Error ? err.message : String(err)) +
|
|
|
+ " | probe: " +
|
|
|
+ (probeErr instanceof Error ? probeErr.message : String(probeErr)),
|
|
|
+ };
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ async embed(text, options = {}) {
|
|
|
+ const batch = await this.embedBatch([text], options);
|
|
|
+ return batch[0] ?? null;
|
|
|
+ }
|
|
|
+ async embedBatch(texts, options = {}) {
|
|
|
+ if (texts.length === 0)
|
|
|
+ return [];
|
|
|
+ if (this.breaker.shouldFailFast()) {
|
|
|
+ throw new CircuitOpenError();
|
|
|
+ }
|
|
|
+ const chunks = chunkArray(texts, this.batchSize);
|
|
|
+ const results = new Array(texts.length).fill(null);
|
|
|
+ let cursor = 0;
|
|
|
+ for (const chunk of chunks) {
|
|
|
+ const start = cursor;
|
|
|
+ cursor += chunk.length;
|
|
|
+ // Abort early if signal already fired
|
|
|
+ if (options.signal?.aborted) {
|
|
|
+ // Leave remaining slots as null (caller treats as errors)
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+ // Fail-fast if breaker tripped mid-loop
|
|
|
+ if (this.breaker.shouldFailFast()) {
|
|
|
+ throw new CircuitOpenError();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ const embeddings = await this.requestWithRetry(chunk, options);
|
|
|
+ for (let i = 0; i < chunk.length; i++) {
|
|
|
+ const embedding = embeddings[i];
|
|
|
+ if (embedding) {
|
|
|
+ results[start + i] = {
|
|
|
+ embedding,
|
|
|
+ model: this.modelId,
|
|
|
+ };
|
|
|
+ // Record dimensions on first success
|
|
|
+ if (this.dimensions === undefined) {
|
|
|
+ this.dimensions = embedding.length;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ this.breaker.recordSuccess();
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ this.breaker.recordFailure();
|
|
|
+ // CircuitOpenError must propagate so the caller can fall back
|
|
|
+ if (err instanceof CircuitOpenError)
|
|
|
+ throw err;
|
|
|
+ // Other errors mark the chunk as null and continue with next chunk.
|
|
|
+ // (The store layer already handles per-text nulls as errors.)
|
|
|
+ if (process.env.QMD_EMBED_DEBUG) {
|
|
|
+ process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+ async dispose() {
|
|
|
+ // Nothing to release — fetch handles its own connection pooling.
|
|
|
+ // Reset the breaker so a re-instantiation starts fresh.
|
|
|
+ this.breaker.reset();
|
|
|
+ }
|
|
|
+ // ────────────────────── Internals ──────────────────────
|
|
|
+ buildHeaders() {
|
|
|
+ const headers = {
|
|
|
+ "Content-Type": "application/json",
|
|
|
+ "Accept": "application/json",
|
|
|
+ };
|
|
|
+ if (this.apiKey) {
|
|
|
+ headers["Authorization"] = `Bearer ${this.apiKey}`;
|
|
|
+ }
|
|
|
+ return headers;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Single HTTP request with retry on 429/503. Returns embeddings indexed
|
|
|
+ * the same as `texts`. Throws on non-retryable failure or all attempts
|
|
|
+ * exhausted.
|
|
|
+ */
|
|
|
+ async requestWithRetry(texts, options) {
|
|
|
+ let lastErr = null;
|
|
|
+ const maxAttempts = this.retryBackoffsMs.length + 1;
|
|
|
+ for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
|
|
+ // Honor user abort BEFORE issuing the call (avoids wasted network)
|
|
|
+ if (options.signal?.aborted) {
|
|
|
+ throw new Error("aborted by caller");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ return await this.requestOnce(texts, options);
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ lastErr = err;
|
|
|
+ const retryable = err instanceof HttpError ? isRetryableStatus(err.status) : false;
|
|
|
+ if (!retryable)
|
|
|
+ throw err;
|
|
|
+ if (attempt < this.retryBackoffsMs.length) {
|
|
|
+ await this.sleep(this.retryBackoffsMs[attempt]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Exhausted retries → throw the last error so caller marks the chunk null
|
|
|
+ throw lastErr ?? new Error("requestWithRetry exhausted");
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Issue one HTTP attempt to `POST /v1/embeddings`. Does NOT retry.
|
|
|
+ */
|
|
|
+ async requestOnce(texts, options) {
|
|
|
+ const { signal: attemptSig, cleanup } = buildAttemptSignal(options.signal, this.timeoutMs);
|
|
|
+ try {
|
|
|
+ const body = JSON.stringify({
|
|
|
+ model: options.model ?? this.upstreamModel,
|
|
|
+ input: texts,
|
|
|
+ });
|
|
|
+ const resp = await this.fetchImpl(`${this.endpoint}/v1/embeddings`, {
|
|
|
+ method: "POST",
|
|
|
+ headers: this.buildHeaders(),
|
|
|
+ body,
|
|
|
+ signal: attemptSig,
|
|
|
+ });
|
|
|
+ if (!resp.ok) {
|
|
|
+ const text = await resp.text().catch(() => "");
|
|
|
+ throw new HttpError(resp.status, text);
|
|
|
+ }
|
|
|
+ let parsed;
|
|
|
+ try {
|
|
|
+ parsed = (await resp.json());
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ throw new Error(`OpenAIEmbeddingsProvider: malformed JSON from ${this.endpoint}/v1/embeddings: ${err instanceof Error ? err.message : String(err)}`);
|
|
|
+ }
|
|
|
+ if (!parsed || !Array.isArray(parsed.data)) {
|
|
|
+ throw new Error(`OpenAIEmbeddingsProvider: response missing "data" array (got ${typeof parsed})`);
|
|
|
+ }
|
|
|
+ // Sort by index to match input order (in case server returns out-of-order).
|
|
|
+ const out = new Array(texts.length);
|
|
|
+ for (const item of parsed.data) {
|
|
|
+ if (typeof item.index !== "number" ||
|
|
|
+ item.index < 0 ||
|
|
|
+ item.index >= texts.length) {
|
|
|
+ throw new Error(`OpenAIEmbeddingsProvider: data item index out of range (${item.index}, expected 0..${texts.length - 1})`);
|
|
|
+ }
|
|
|
+ if (!Array.isArray(item.embedding)) {
|
|
|
+ throw new Error(`OpenAIEmbeddingsProvider: data[${item.index}].embedding is not an array`);
|
|
|
+ }
|
|
|
+ out[item.index] = item.embedding;
|
|
|
+ }
|
|
|
+ // Sanity check — every slot must be filled
|
|
|
+ for (let i = 0; i < texts.length; i++) {
|
|
|
+ if (!out[i]) {
|
|
|
+ throw new Error(`OpenAIEmbeddingsProvider: response missing embedding for index ${i}`);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return out;
|
|
|
+ }
|
|
|
+ finally {
|
|
|
+ cleanup();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|