| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- /**
- * openai.ts - OpenAI-compatible HTTP embedding provider
- *
- * Talks to any endpoint that implements `POST /v1/embeddings` with the OpenAI
- * shape: request `{model, input: string|string[]}`, response
- * `{data: [{embedding: number[], index: number}, ...]}`.
- *
- * Used by qmd to delegate embeddings to a GPU worker (e.g. ai.mm.mk →
- * qmd-embed-worker on `models` LXC, RTX 4090) instead of running
- * node-llama-cpp locally.
- *
- * Features:
- * - Batches input in groups of ≤64 (configurable via QMD_EMBED_BATCH_SIZE)
- * - Retries 429 / 503 with exponential backoff (1s, 4s, 16s)
- * - 4xx (non-429) → no retry, count as failure
- * - Circuit breaker: >50% failures in a 60s window → OPEN for 5 min,
- * callers can use this to fall back to a local provider
- * - Per-call timeout via AbortSignal (default QMD_EMBED_TIMEOUT_MS=30000)
- * - Healthcheck via `GET /health` if available, else a probe embed call
- */
- // ─────────────────────────── Configuration ───────────────────────────────────
/**
 * Default batch size — most OpenAI-compatible embedding endpoints accept up to
 * 2048 inputs per call but for memory and latency we cap at 64.
 */
export const DEFAULT_BATCH_SIZE = 64;
/**
 * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
 * <500ms per batch of 64 in practice; 30s is a safe upper bound.
 */
export const DEFAULT_TIMEOUT_MS = 30_000;
/**
 * Retry backoff schedule (ms) for 429/503 responses. Each entry is the delay
 * before one retry, so a request is attempted up to `length + 1` = 4 times
 * (initial call + 3 retries after 1s / 4s / 16s).
 *
 * Frozen because this array is the shared default for every provider
 * instance; an accidental in-place mutation would silently change the retry
 * policy everywhere.
 */
export const RETRY_BACKOFFS_MS = Object.freeze([1_000, 4_000, 16_000]);
/**
 * Circuit breaker — flips OPEN when error rate exceeds threshold within
 * window. While OPEN, every call fails fast so the caller can fall back.
 */
/** Length of the sliding window over which the failure rate is computed. */
export const CIRCUIT_WINDOW_MS = 60_000;
/** How long the breaker stays OPEN before a probe is allowed through. */
export const CIRCUIT_OPEN_DURATION_MS = 5 * 60_000;
/** Failure rate (0..1) that must be exceeded for the breaker to open. */
export const CIRCUIT_FAILURE_RATE_THRESHOLD = 0.5;
/** Minimum number of samples in the window before the rate is evaluated. */
export const CIRCUIT_MIN_SAMPLES = 4;
- // ─────────────────────────── Helpers ─────────────────────────────────────────
/**
 * Promise-based delay: the returned promise settles (with no value) once a
 * `setTimeout` timer of `ms` milliseconds fires.
 */
function defaultSleep(ms) {
    return new Promise((wake) => {
        setTimeout(wake, ms);
    });
}
/**
 * Build the AbortSignal for a single HTTP attempt by merging an optional
 * caller-supplied `userSignal` with a per-attempt timeout.
 *
 * @param {AbortSignal | undefined | null} userSignal - external cancellation
 *   signal; its abort reason is forwarded to the merged signal.
 * @param {number} timeoutMs - delay after which the attempt is aborted with a
 *   "Request timed out" error.
 * @returns {{ signal: AbortSignal, cleanup: () => void }} the merged signal
 *   plus a `cleanup` function. Call `cleanup` once the attempt has settled:
 *   it clears the timeout timer and detaches the listener from `userSignal`
 *   so neither outlives the request.
 */
function buildAttemptSignal(userSignal, timeoutMs) {
    const ctrl = new AbortController();
    if (userSignal?.aborted) {
        // Already cancelled: no timer to arm and no listener to attach, so
        // there is nothing for cleanup to release.
        ctrl.abort(userSignal.reason);
        return { signal: ctrl.signal, cleanup: () => { } };
    }
    const timeoutId = setTimeout(() => {
        ctrl.abort(new Error(`Request timed out after ${timeoutMs}ms`));
    }, timeoutMs);
    // Don't keep the process alive just for this timer. Only timer handles
    // that are objects (as in Node) expose `unref`; numeric ids do not.
    timeoutId?.unref?.();
    const onUserAbort = () => ctrl.abort(userSignal?.reason);
    userSignal?.addEventListener("abort", onUserAbort, { once: true });
    const cleanup = () => {
        clearTimeout(timeoutId);
        userSignal?.removeEventListener("abort", onUserAbort);
    };
    return { signal: ctrl.signal, cleanup };
}
/**
 * Tell whether a failed response is worth retrying. Only 429 (Too Many
 * Requests) and 503 (Service Unavailable) qualify; every other status,
 * including the remaining 4xx codes, is treated as final.
 */
export function isRetryableStatus(status) {
    switch (status) {
        case 429:
        case 503:
            return true;
        default:
            return false;
    }
}
/**
 * Chunk an array into consecutive pieces of at most `size` elements each.
 *
 * @param {Array} items - the array to split; it is never modified.
 * @param {number} size - maximum chunk length, must be ≥ 1. A fractional
 *   size is rounded down so no chunk ever exceeds it.
 * @returns {Array[]} the chunks in input order. An empty input yields `[]`;
 *   an input that already fits in one chunk yields `[items]` (the same array
 *   instance, not a copy).
 * @throws {Error} when `size` is below 1 or is NaN.
 */
export function chunkArray(items, size) {
    // `!(size >= 1)` instead of `size < 1`: every comparison with NaN is
    // false, so the latter would let NaN through and the loop below would
    // then emit a single empty chunk, silently dropping all input.
    if (!(size >= 1)) {
        throw new Error(`chunkArray: size must be ≥ 1, got ${size}`);
    }
    if (items.length === 0) {
        return [];
    }
    if (items.length <= size) {
        return [items];
    }
    // Only finite sizes reach this point (Infinity is handled above).
    const step = Math.floor(size);
    const out = [];
    for (let i = 0; i < items.length; i += step) {
        out.push(items.slice(i, i + step));
    }
    return out;
}
- // ─────────────────────────── Circuit Breaker ─────────────────────────────────
/**
 * Sliding-window circuit breaker.
 *
 * Outcomes are recorded as timestamped samples and kept for `windowMs`.
 * While CLOSED, once at least `minSamples` samples are in the window and the
 * failure rate exceeds `threshold`, the breaker flips to OPEN. After
 * `openDurationMs` it lazily moves to HALF-OPEN (the transition happens the
 * next time the state is read or an outcome is recorded); the next recorded
 * outcome then decides whether it closes (success) or re-opens (failure).
 *
 * Constructor options (all optional): `windowMs`, `openDurationMs`,
 * `threshold`, `minSamples`, and `now` (clock function returning epoch ms,
 * injectable for tests). Missing options fall back to the module-level
 * CIRCUIT_* constants and `Date.now`.
 */
export class CircuitBreaker {
    /** Recorded outcomes as `{ ts, ok }`, oldest first. */
    samples = [];
    /** One of "closed", "open", "half-open". */
    state = "closed";
    /** Clock reading at which the breaker last opened, or null. */
    openedAt = null;
    windowMs;
    openDurationMs;
    threshold;
    minSamples;
    now;
    constructor(opts = {}) {
        this.windowMs = opts.windowMs ?? CIRCUIT_WINDOW_MS;
        this.openDurationMs = opts.openDurationMs ?? CIRCUIT_OPEN_DURATION_MS;
        this.threshold = opts.threshold ?? CIRCUIT_FAILURE_RATE_THRESHOLD;
        this.minSamples = opts.minSamples ?? CIRCUIT_MIN_SAMPLES;
        this.now = opts.now ?? Date.now;
    }
    /**
     * Current state, after applying the time-based OPEN → HALF-OPEN
     * transition if the open period has elapsed.
     */
    getState() {
        this.tickAutoReset();
        return this.state;
    }
    /**
     * Returns true when calls should be short-circuited (skip HTTP, fall back).
     * Side-effects: may transition OPEN → HALF-OPEN if the open window expired.
     */
    shouldFailFast() {
        return this.getState() === "open";
    }
    /** Record a successful call. Closes the breaker when it is HALF-OPEN. */
    recordSuccess() {
        // Apply the lazy OPEN → HALF-OPEN transition first; otherwise a
        // success landing after the open period expired would still see
        // state === "open" and never close the breaker.
        this.tickAutoReset();
        this.pushSample(true);
        if (this.state === "half-open") {
            this.state = "closed";
            this.openedAt = null;
            // Start the closed period with a clean window holding only the
            // probe's success. When openDurationMs is shorter than windowMs
            // the failures that tripped the breaker are still in the window
            // and a single new failure would re-open it immediately.
            this.samples = this.samples.slice(-1);
        }
    }
    /** Record a failed call. May trigger OPEN. */
    recordFailure() {
        // Same reasoning as recordSuccess — apply the lazy auto-reset before
        // classifying the sample.
        this.tickAutoReset();
        this.pushSample(false);
        if (this.state === "half-open") {
            // Probe failed — re-open and restart the open period.
            this.state = "open";
            this.openedAt = this.now();
            return;
        }
        if (this.state === "closed") {
            this.evaluate();
        }
    }
    /** Force-reset the breaker to CLOSED with an empty window. */
    reset() {
        this.samples = [];
        this.state = "closed";
        this.openedAt = null;
    }
    /** Append an outcome stamped with the current time and prune old samples. */
    pushSample(ok) {
        const ts = this.now();
        this.samples.push({ ts, ok });
        // Drop samples that have fallen out of the window.
        const cutoff = ts - this.windowMs;
        while (this.samples.length > 0 && this.samples[0].ts < cutoff) {
            this.samples.shift();
        }
    }
    /** Open the breaker when the windowed failure rate exceeds the threshold. */
    evaluate() {
        if (this.samples.length < this.minSamples) {
            return;
        }
        const failures = this.samples.filter((s) => !s.ok).length;
        const rate = failures / this.samples.length;
        if (rate > this.threshold) {
            this.state = "open";
            this.openedAt = this.now();
        }
    }
    /** Move OPEN → HALF-OPEN once `openDurationMs` has elapsed since opening. */
    tickAutoReset() {
        if (this.state !== "open" || this.openedAt === null) {
            return;
        }
        if (this.now() - this.openedAt >= this.openDurationMs) {
            this.state = "half-open";
        }
    }
}
- // ─────────────────────────── Errors ──────────────────────────────────────────
/**
 * Thrown when a call is short-circuited because the circuit breaker is OPEN.
 * A fallback wrapper can catch this type to switch to the local provider.
 */
export class CircuitOpenError extends Error {
    name = "CircuitOpenError";
    constructor(message = "OpenAIEmbeddingsProvider circuit is OPEN") {
        super(message);
    }
}
/**
 * HTTP error response from upstream. Carries the status code and a truncated
 * copy of the response body.
 *
 * - `status`: the HTTP status code as passed in.
 * - `bodyPreview`: at most the first 1024 characters of the body.
 * - `message`: `HTTP <status>: <first 200 characters of the body>`.
 */
export class HttpError extends Error {
    status;
    bodyPreview;
    /**
     * @param {number} status - HTTP status code of the failed response.
     * @param {string} [bodyPreview] - response body text. A missing or
     *   non-string value is coerced to a string (null/undefined become "")
     *   so building the error can never throw on its own.
     */
    constructor(status, bodyPreview) {
        const body = typeof bodyPreview === "string" ? bodyPreview : String(bodyPreview ?? "");
        super(`HTTP ${status}: ${body.slice(0, 200)}`);
        this.name = "HttpError";
        this.status = status;
        this.bodyPreview = body.slice(0, 1024);
    }
}
- // ─────────────────────────── Provider ────────────────────────────────────────
/**
 * Embedding provider that talks to an OpenAI-compatible HTTP endpoint
 * (`POST <endpoint>/v1/embeddings`).
 *
 * Config fields read by the constructor:
 * - `endpoint` (required): base URL; trailing slashes are stripped.
 * - `apiKey`: sent as `Authorization: Bearer <key>` when set.
 * - `modelId`: id reported on results (default "embeddinggemma").
 * - `upstreamModel`: model name sent upstream (defaults to `modelId`).
 * - `batchSize`: max inputs per HTTP call (default DEFAULT_BATCH_SIZE).
 * - `timeoutMs`: per-attempt timeout (default DEFAULT_TIMEOUT_MS).
 * - `fetchImpl`: fetch implementation (default `globalThis.fetch`).
 * - `retryBackoffsMs`: delays before each retry (default RETRY_BACKOFFS_MS).
 * - `sleep`, `now`: injectable delay and clock functions.
 */
export class OpenAIEmbeddingsProvider {
    kind = "openai";
    endpoint;
    apiKey;
    modelId;
    upstreamModel;
    batchSize;
    timeoutMs;
    fetchImpl;
    retryBackoffsMs;
    sleep;
    now;
    /** Embedding length, learned from the first successful response. */
    dimensions = undefined;
    breaker;
    /**
     * @throws {Error} when `endpoint` is missing, no fetch implementation is
     *   available, or `batchSize` is below 1 or NaN.
     */
    constructor(config) {
        if (!config.endpoint) {
            throw new Error("OpenAIEmbeddingsProvider: endpoint is required");
        }
        // Strip trailing slashes so `${endpoint}/v1/...` never doubles them.
        this.endpoint = config.endpoint.replace(/\/+$/, "");
        this.apiKey = config.apiKey;
        this.modelId = config.modelId ?? "embeddinggemma";
        this.upstreamModel = config.upstreamModel ?? this.modelId;
        this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
        this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
        this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
        this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
        this.sleep = config.sleep ?? defaultSleep;
        this.now = config.now ?? Date.now;
        this.breaker = new CircuitBreaker({ now: this.now });
        if (!this.fetchImpl) {
            throw new Error("OpenAIEmbeddingsProvider: global fetch is unavailable. " +
                "Provide a `fetchImpl` config option (Node ≥18 ships fetch by default).");
        }
        // `!(x >= 1)` rather than `x < 1` so that NaN (e.g. from parsing a
        // malformed batch-size setting) is rejected instead of producing
        // empty batches later on.
        if (!(this.batchSize >= 1)) {
            throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
        }
    }
    /** Model id reported on every embedding result. */
    getModelId() {
        return this.modelId;
    }
    /** Embedding length, or undefined until a request has succeeded. */
    getDimensions() {
        return this.dimensions;
    }
    /**
     * Check whether the endpoint is usable.
     *
     * Tries `GET /health` first. When that request throws, or the endpoint
     * answers 404/405/501 (route not implemented), falls back to a single
     * probe embed call. Any other non-2xx answer is reported as unhealthy
     * without probing.
     *
     * @param {AbortSignal} [signal] - optional caller cancellation signal.
     * @returns {Promise<{ok: boolean, model: string, dimensions?: number, detail: string}>}
     *   never rejects; failures are described in `detail`.
     */
    async healthcheck(signal) {
        let healthFailure;
        try {
            const { signal: attemptSig, cleanup } = buildAttemptSignal(signal, this.timeoutMs);
            try {
                // Call through a local so fetch is not invoked with the
                // provider as `this` (some runtimes reject that).
                const doFetch = this.fetchImpl;
                const resp = await doFetch(`${this.endpoint}/health`, {
                    method: "GET",
                    headers: this.buildHeaders(),
                    signal: attemptSig,
                });
                if (resp.ok) {
                    return {
                        ok: true,
                        model: this.modelId,
                        dimensions: this.dimensions,
                        detail: `GET /health → ${resp.status}`,
                    };
                }
                const routeMissing = resp.status === 404 || resp.status === 405 || resp.status === 501;
                if (!routeMissing) {
                    return {
                        ok: false,
                        model: this.modelId,
                        detail: `GET /health → HTTP ${resp.status}`,
                    };
                }
                // The endpoint does not implement /health — probe instead.
                healthFailure = `GET /health → HTTP ${resp.status}`;
            }
            finally {
                cleanup();
            }
        }
        catch (err) {
            healthFailure = err instanceof Error ? err.message : String(err);
        }
        // Fallback: a single embed probe.
        try {
            const probe = await this.embed("healthcheck", { signal });
            if (probe) {
                return {
                    ok: true,
                    model: this.modelId,
                    dimensions: probe.embedding.length,
                    detail: "embed probe ok",
                };
            }
            return {
                ok: false,
                model: this.modelId,
                detail: "embed probe returned null",
            };
        }
        catch (probeErr) {
            return {
                ok: false,
                model: this.modelId,
                detail: healthFailure +
                    " | probe: " +
                    (probeErr instanceof Error ? probeErr.message : String(probeErr)),
            };
        }
    }
    /**
     * Embed a single text.
     *
     * @returns {Promise<{embedding: number[], model: string} | null>} null
     *   when the request failed or was aborted.
     * @throws {CircuitOpenError} when the breaker is OPEN.
     */
    async embed(text, options = {}) {
        const batch = await this.embedBatch([text], options);
        return batch[0] ?? null;
    }
    /**
     * Embed many texts, `batchSize` at a time, sequentially.
     *
     * @param {string[]} texts
     * @param {{signal?: AbortSignal, model?: string}} [options]
     * @returns {Promise<Array<{embedding: number[], model: string} | null>>}
     *   one slot per input, in input order; a slot is null when its chunk
     *   failed or the caller aborted before it was processed.
     * @throws {CircuitOpenError} when the breaker is OPEN before or between
     *   chunks, so the caller can fall back to another provider.
     */
    async embedBatch(texts, options = {}) {
        if (texts.length === 0)
            return [];
        if (this.breaker.shouldFailFast()) {
            throw new CircuitOpenError();
        }
        const chunks = chunkArray(texts, this.batchSize);
        const results = new Array(texts.length).fill(null);
        let cursor = 0;
        for (const chunk of chunks) {
            const start = cursor;
            cursor += chunk.length;
            // Abort early if the signal already fired; remaining slots stay
            // null (the caller treats them as errors).
            if (options.signal?.aborted) {
                return results;
            }
            // Fail fast if the breaker tripped mid-loop.
            if (this.breaker.shouldFailFast()) {
                throw new CircuitOpenError();
            }
            try {
                const embeddings = await this.requestWithRetry(chunk, options);
                for (let i = 0; i < chunk.length; i++) {
                    const embedding = embeddings[i];
                    if (embedding) {
                        results[start + i] = {
                            embedding,
                            model: this.modelId,
                        };
                        // Record dimensions on first success.
                        if (this.dimensions === undefined) {
                            this.dimensions = embedding.length;
                        }
                    }
                }
                this.breaker.recordSuccess();
            }
            catch (err) {
                // A caller-initiated abort says nothing about upstream
                // health, so it must not count toward tripping the breaker.
                const cancelledByCaller = options.signal?.aborted === true;
                if (!cancelledByCaller) {
                    this.breaker.recordFailure();
                }
                // CircuitOpenError must propagate so the caller can fall back.
                if (err instanceof CircuitOpenError)
                    throw err;
                // Other errors leave the chunk's slots null and continue with
                // the next chunk.
                if (process.env.QMD_EMBED_DEBUG) {
                    process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`);
                }
            }
        }
        return results;
    }
    /** Release the provider. Only resets the breaker; there is nothing else to free. */
    async dispose() {
        this.breaker.reset();
    }
    // ────────────────────── Internals ──────────────────────
    /** JSON request headers, plus a Bearer token when `apiKey` is set. */
    buildHeaders() {
        const headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        };
        if (this.apiKey) {
            headers["Authorization"] = `Bearer ${this.apiKey}`;
        }
        return headers;
    }
    /**
     * One logical request with retry on 429/503. The request is attempted up
     * to `retryBackoffsMs.length + 1` times, sleeping `retryBackoffsMs[n]`
     * after the n-th failed attempt.
     *
     * @returns {Promise<number[][]>} embeddings indexed the same as `texts`.
     * @throws the original error on a non-retryable failure, the last error
     *   once all attempts are exhausted, or an "aborted by caller" error
     *   when the caller's signal fired before an attempt was issued.
     */
    async requestWithRetry(texts, options) {
        let lastErr = null;
        const maxAttempts = this.retryBackoffsMs.length + 1;
        for (let attempt = 0; attempt < maxAttempts; attempt++) {
            // Honor a caller abort BEFORE issuing the call (avoids wasted
            // network traffic).
            if (options.signal?.aborted) {
                throw new Error("aborted by caller");
            }
            try {
                return await this.requestOnce(texts, options);
            }
            catch (err) {
                lastErr = err;
                const retryable = err instanceof HttpError ? isRetryableStatus(err.status) : false;
                if (!retryable)
                    throw err;
                if (attempt < this.retryBackoffsMs.length) {
                    await this.sleep(this.retryBackoffsMs[attempt]);
                }
            }
        }
        // Exhausted retries → throw the last error so the caller marks the
        // chunk as failed.
        throw lastErr ?? new Error("requestWithRetry exhausted");
    }
    /**
     * Issue one HTTP attempt to `POST /v1/embeddings`. Does NOT retry.
     *
     * @returns {Promise<number[][]>} one embedding per input, reordered by
     *   the `index` field of each response item.
     * @throws {HttpError} on a non-2xx response.
     * @throws {Error} on malformed JSON, a missing `data` array, an invalid
     *   or out-of-range index, a non-array embedding, or a missing slot.
     */
    async requestOnce(texts, options) {
        const { signal: attemptSig, cleanup } = buildAttemptSignal(options.signal, this.timeoutMs);
        try {
            const body = JSON.stringify({
                model: options.model ?? this.upstreamModel,
                input: texts,
            });
            // Call through a local so fetch is not invoked with the provider
            // as `this` (some runtimes reject that).
            const doFetch = this.fetchImpl;
            const resp = await doFetch(`${this.endpoint}/v1/embeddings`, {
                method: "POST",
                headers: this.buildHeaders(),
                body,
                signal: attemptSig,
            });
            if (!resp.ok) {
                const text = await resp.text().catch(() => "");
                throw new HttpError(resp.status, text);
            }
            let parsed;
            try {
                parsed = (await resp.json());
            }
            catch (err) {
                throw new Error(`OpenAIEmbeddingsProvider: malformed JSON from ${this.endpoint}/v1/embeddings: ${err instanceof Error ? err.message : String(err)}`);
            }
            if (!parsed || !Array.isArray(parsed.data)) {
                throw new Error(`OpenAIEmbeddingsProvider: response missing "data" array (got ${typeof parsed})`);
            }
            // Place each item by its index so the output matches input order
            // even when the server answers out of order.
            const out = new Array(texts.length);
            for (const item of parsed.data) {
                const index = item?.index;
                // Number.isInteger also rejects NaN, fractions and non-numbers,
                // none of which address a real slot.
                if (!Number.isInteger(index) || index < 0 || index >= texts.length) {
                    throw new Error(`OpenAIEmbeddingsProvider: data item index out of range (${index}, expected 0..${texts.length - 1})`);
                }
                if (!Array.isArray(item.embedding)) {
                    throw new Error(`OpenAIEmbeddingsProvider: data[${index}].embedding is not an array`);
                }
                out[index] = item.embedding;
            }
            // Sanity check — every slot must be filled.
            for (let i = 0; i < texts.length; i++) {
                if (!out[i]) {
                    throw new Error(`OpenAIEmbeddingsProvider: response missing embedding for index ${i}`);
                }
            }
            return out;
        }
        finally {
            cleanup();
        }
    }
}
|