openai.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  /**
   * openai.ts - OpenAI-compatible HTTP embedding provider
   *
   * Talks to any endpoint that implements `POST /v1/embeddings` with the OpenAI
   * shape: request `{model, input: string|string[]}`, response
   * `{data: [{embedding: number[], index: number}, ...]}`.
   *
   * Used by qmd to delegate embeddings to a GPU worker (e.g. ai.mm.mk →
   * qmd-embed-worker on `models` LXC, RTX 4090) instead of running
   * node-llama-cpp locally.
   *
   * Features:
   * - Batches input in groups of ≤64 (configurable via QMD_EMBED_BATCH_SIZE)
   * - Retries 429 / 503 up to 3 times with exponential backoff (1s, 4s, 16s)
   * - 4xx (non-429) → no retry, count as failure
   * - Circuit breaker: >50% failures (with at least 4 samples) in a 60s window
   *   → OPEN for 5 min; callers can use this to fall back to a local provider
   * - Per-call timeout via AbortSignal (default QMD_EMBED_TIMEOUT_MS=30000)
   * - Healthcheck via `GET /health` if available, else a probe embed call
   */
  // ─────────────────────────── Configuration ───────────────────────────────────
  /**
   * Default batch size — most OpenAI-compatible embedding endpoints accept up to
   * 2048 inputs per call but for memory and latency we cap at 64.
   */
  export const DEFAULT_BATCH_SIZE = 64;
  /**
   * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
   * <500ms per batch of 64 in practice; 30s is a safe upper bound.
   */
  export const DEFAULT_TIMEOUT_MS = 30_000;
  /**
   * Retry backoff schedule (ms) for 429/503 responses — aligns with the issue
   * spec "1s/4s/16s". One retry is made per entry, so a request gets up to
   * 4 attempts total (initial + 3 retries), sleeping 1s, 4s and 16s before
   * retries 1, 2 and 3 respectively.
   *
   * Frozen because it is the shared default for every provider instance; an
   * accidental mutation by one importer would otherwise change all of them.
   */
  export const RETRY_BACKOFFS_MS = Object.freeze([1_000, 4_000, 16_000]);
  /**
   * Circuit breaker — flips OPEN when, within `CIRCUIT_WINDOW_MS`, at least
   * `CIRCUIT_MIN_SAMPLES` outcomes are recorded and the failure rate exceeds
   * `CIRCUIT_FAILURE_RATE_THRESHOLD`. It stays OPEN for
   * `CIRCUIT_OPEN_DURATION_MS`; while OPEN, every call fails fast so the caller
   * can fall back.
   */
  export const CIRCUIT_WINDOW_MS = 60_000;
  export const CIRCUIT_OPEN_DURATION_MS = 5 * 60_000;
  export const CIRCUIT_FAILURE_RATE_THRESHOLD = 0.5;
  export const CIRCUIT_MIN_SAMPLES = 4;
  // ─────────────────────────── Helpers ─────────────────────────────────────────
  /**
   * Production sleep used between retry attempts (tests inject their own).
   * @param {number} ms - Delay in milliseconds.
   * @returns {Promise<void>} Resolves with `undefined` once the delay elapses.
   */
  function defaultSleep(ms) {
    const wait = (done) => {
      setTimeout(done, ms);
    };
    return new Promise(wait);
  }
  /**
   * Build the AbortSignal for a single HTTP attempt by merging an optional
   * external `userSignal` (from the caller / withLLMSession) with a
   * per-attempt timeout.
   *
   * - If `userSignal` is already aborted, the returned signal is aborted
   *   immediately with the same reason.
   * - If `userSignal` aborts later, the reason is forwarded.
   * - After `timeoutMs`, the signal aborts with
   *   `Error("Request timed out after <timeoutMs>ms")`.
   *
   * @param {AbortSignal | undefined} userSignal - Optional caller signal.
   * @param {number} timeoutMs - Per-attempt timeout in milliseconds.
   * @returns {{ signal: AbortSignal, cleanup: () => void }} The merged signal
   *   and a `cleanup` function. The caller MUST call `cleanup` once the
   *   attempt settles: it clears the timer and detaches the listener from
   *   `userSignal`, so neither outlives the attempt.
   */
  function buildAttemptSignal(userSignal, timeoutMs) {
    const ctrl = new AbortController();
    const timeoutId = setTimeout(() => {
      ctrl.abort(new Error(`Request timed out after ${timeoutMs}ms`));
    }, timeoutMs);
    // Node returns a Timeout object: unref it so this timer alone does not keep
    // the process alive. Browsers return a number, which has no `unref`.
    if (typeof timeoutId === "object" && timeoutId !== null && "unref" in timeoutId) {
      timeoutId.unref();
    }
    const onUserAbort = () => ctrl.abort(userSignal?.reason);
    if (userSignal) {
      if (userSignal.aborted) {
        ctrl.abort(userSignal.reason);
      } else {
        userSignal.addEventListener("abort", onUserAbort, { once: true });
      }
    }
    const cleanup = () => {
      clearTimeout(timeoutId);
      if (userSignal) {
        userSignal.removeEventListener("abort", onUserAbort);
      }
    };
    return { signal: ctrl.signal, cleanup };
  }
  /**
   * Decide whether an upstream HTTP status deserves another attempt.
   * Only 429 (Too Many Requests) and 503 (Service Unavailable) qualify; every
   * other status, including the remaining 4xx and 5xx codes, is final.
   * @param {number} status - HTTP status code.
   * @returns {boolean}
   */
  export function isRetryableStatus(status) {
    switch (status) {
      case 429:
      case 503:
        return true;
      default:
        return false;
    }
  }
  /**
   * Split `items` into consecutive chunks of at most `size` elements.
   *
   * When everything fits in one chunk, the input array itself is returned as
   * the only chunk (no copy). Otherwise each chunk is a fresh `slice`.
   *
   * @param {Array} items - Values to split.
   * @param {number} size - Maximum chunk length; must be ≥ 1.
   * @returns {Array[]} The chunks, in order (`[]` for empty input).
   * @throws {Error} When `size` is below 1 or NaN.
   */
  export function chunkArray(items, size) {
    // Written as `!(size >= 1)` so NaN (e.g. a mis-parsed numeric setting) is
    // rejected. `NaN < 1` is false, and a NaN step would make the loop below
    // emit one empty slice, i.e. `[[]]`, silently dropping every item.
    if (!(size >= 1))
      throw new Error(`chunkArray: size must be ≥ 1, got ${size}`);
    if (items.length <= size)
      return items.length === 0 ? [] : [items];
    const out = [];
    for (let i = 0; i < items.length; i += size) {
      out.push(items.slice(i, i + size));
    }
    return out;
  }
  // ─────────────────────────── Circuit Breaker ─────────────────────────────────
  /**
   * Sliding-window circuit breaker.
   *
   * Keeps every recorded outcome from the last `windowMs` (default 60 s). Once
   * the window holds at least `minSamples` outcomes (default 4) and the
   * failure rate exceeds `threshold` (default 50%), the breaker flips OPEN.
   * After `openDurationMs` (default 5 min) it lazily moves to HALF-OPEN. The
   * next recorded outcome then decides: a success closes it, a failure
   * re-opens it. HALF-OPEN does not restrict concurrency; `shouldFailFast()`
   * lets every call through until an outcome is recorded.
   */
  export class CircuitBreaker {
    samples = [];
    state = "closed";
    openedAt = null;
    windowMs;
    openDurationMs;
    threshold;
    minSamples;
    now;
    /**
     * @param {object} [opts]
     * @param {number} [opts.windowMs=CIRCUIT_WINDOW_MS] - Sliding-window length (ms).
     * @param {number} [opts.openDurationMs=CIRCUIT_OPEN_DURATION_MS] - Time spent OPEN before HALF-OPEN.
     * @param {number} [opts.threshold=CIRCUIT_FAILURE_RATE_THRESHOLD] - Failure rate that must be exceeded to trip.
     * @param {number} [opts.minSamples=CIRCUIT_MIN_SAMPLES] - Samples required before the rate is evaluated.
     * @param {() => number} [opts.now=Date.now] - Clock, injectable for tests.
     */
    constructor(opts = {}) {
      this.windowMs = opts.windowMs ?? CIRCUIT_WINDOW_MS;
      this.openDurationMs = opts.openDurationMs ?? CIRCUIT_OPEN_DURATION_MS;
      this.threshold = opts.threshold ?? CIRCUIT_FAILURE_RATE_THRESHOLD;
      this.minSamples = opts.minSamples ?? CIRCUIT_MIN_SAMPLES;
      this.now = opts.now ?? Date.now;
    }
    /** Current state ("closed" | "open" | "half-open") after applying any pending auto-reset. */
    getState() {
      this.tickAutoReset();
      return this.state;
    }
    /**
     * Returns true when calls should be short-circuited (skip HTTP, fall back).
     * Side-effects: may transition OPEN → HALF-OPEN if the open window expired.
     */
    shouldFailFast() {
      return this.getState() === "open";
    }
    /** Record a successful call. Closes the breaker if it was HALF-OPEN. */
    recordSuccess() {
      // Honor the time-based OPEN→HALF-OPEN transition before deciding what
      // to do with this sample. Without this, a success that lands AFTER the
      // open window expired would still see state==="open" and never close
      // the breaker.
      this.tickAutoReset();
      if (this.state === "half-open") {
        // Probe succeeded: close and start from a clean history. Otherwise the
        // failures that caused the previous OPEN stay in the window (whenever
        // openDurationMs < windowMs), and a single later failure would
        // immediately re-trip the breaker.
        this.samples = [];
        this.state = "closed";
        this.openedAt = null;
      }
      this.pushSample(true);
    }
    /** Record a failed call. May trigger OPEN (or re-OPEN from HALF-OPEN). */
    recordFailure() {
      // Same reasoning as recordSuccess — apply lazy auto-reset before
      // classifying the sample.
      this.tickAutoReset();
      this.pushSample(false);
      if (this.state === "half-open") {
        // Probe failed — re-open
        this.state = "open";
        this.openedAt = this.now();
        return;
      }
      if (this.state === "closed")
        this.evaluate();
    }
    /** Force-reset the breaker (used by tests / admin). */
    reset() {
      this.samples = [];
      this.state = "closed";
      this.openedAt = null;
    }
    /** Append an outcome stamped with `now()` and drop outcomes older than the window. */
    pushSample(ok) {
      const ts = this.now();
      this.samples.push({ ts, ok });
      const cutoff = ts - this.windowMs;
      while (this.samples.length > 0 && this.samples[0].ts < cutoff) {
        this.samples.shift();
      }
    }
    /** Trip to OPEN when enough samples exist and the failure rate exceeds the threshold. */
    evaluate() {
      if (this.samples.length < this.minSamples)
        return;
      const failures = this.samples.filter((s) => !s.ok).length;
      const rate = failures / this.samples.length;
      if (rate > this.threshold) {
        this.state = "open";
        this.openedAt = this.now();
      }
    }
    /** Lazily move OPEN → HALF-OPEN once `openDurationMs` has elapsed since opening. */
    tickAutoReset() {
      if (this.state === "open" && this.openedAt !== null) {
        if (this.now() - this.openedAt >= this.openDurationMs) {
          this.state = "half-open";
        }
      }
    }
  }
  // ─────────────────────────── Errors ──────────────────────────────────────────
  /**
   * Thrown instead of issuing an HTTP request when the provider's circuit
   * breaker is OPEN. A fallback wrapper can catch it and route the work to a
   * local provider.
   */
  export class CircuitOpenError extends Error {
    name = "CircuitOpenError";
    /** @param {string} [message] - Defaults to a generic "circuit is OPEN" message. */
    constructor(message = "OpenAIEmbeddingsProvider circuit is OPEN") {
      super(message);
    }
  }
  /**
   * Persistent (non-retryable) HTTP error from upstream.
   *
   * `message` embeds the status and the first 200 chars of the body.
   * `bodyPreview` keeps up to 1024 chars for diagnostics.
   */
  export class HttpError extends Error {
    status;
    bodyPreview;
    /**
     * @param {number} status - HTTP status code.
     * @param {string} [bodyPreview=""] - Response body text. Optional, so
     *   `new HttpError(status)` no longer crashes on `undefined.slice`.
     */
    constructor(status, bodyPreview = "") {
      const body = String(bodyPreview ?? "");
      super(`HTTP ${status}: ${body.slice(0, 200)}`);
      this.name = "HttpError";
      this.status = status;
      this.bodyPreview = body.slice(0, 1024);
    }
  }
  // ─────────────────────────── Provider ────────────────────────────────────────
  /**
   * Embedding provider that delegates to an OpenAI-compatible HTTP endpoint
   * (`POST {endpoint}/v1/embeddings`).
   *
   * Inputs are split into batches of `batchSize`. Each batch is one HTTP
   * request, retried on 429/503 according to `retryBackoffsMs`. Every batch
   * outcome is recorded in a `CircuitBreaker`; while the breaker is OPEN,
   * `embedBatch` throws `CircuitOpenError` so callers can fall back to another
   * provider. A failed batch leaves `null` slots in the result, and the failure
   * context is available from `getLastError()`.
   */
  export class OpenAIEmbeddingsProvider {
    kind = "openai";
    endpoint;
    apiKey;
    modelId;
    upstreamModel;
    batchSize;
    timeoutMs;
    fetchImpl;
    retryBackoffsMs;
    sleep;
    now;
    dimensions = undefined;
    lastError = undefined;
    breaker;
    /**
     * @param {object} config
     * @param {string} config.endpoint - Base URL; trailing slashes are stripped.
     * @param {string} [config.apiKey] - Sent as `Authorization: Bearer <key>` when set.
     * @param {string} [config.modelId="embeddinggemma"] - Model id reported to callers.
     * @param {string} [config.upstreamModel] - `model` sent upstream (defaults to `modelId`).
     * @param {number} [config.batchSize=DEFAULT_BATCH_SIZE] - Max inputs per HTTP call; must be ≥ 1.
     * @param {number} [config.timeoutMs=DEFAULT_TIMEOUT_MS] - Per-attempt timeout in ms.
     * @param {Function} [config.fetchImpl] - fetch implementation (defaults to the global fetch).
     * @param {number[]} [config.retryBackoffsMs=RETRY_BACKOFFS_MS] - Sleep (ms) before each retry.
     * @param {(ms: number) => Promise<void>} [config.sleep] - Injectable sleep (tests).
     * @param {() => number} [config.now=Date.now] - Injectable clock (tests, breaker).
     * @throws {Error} When `endpoint` is missing, no fetch is available, or
     *   `batchSize` is below 1 or NaN.
     */
    constructor(config) {
      if (!config.endpoint) {
        throw new Error("OpenAIEmbeddingsProvider: endpoint is required");
      }
      this.endpoint = config.endpoint.replace(/\/+$/, "");
      this.apiKey = config.apiKey;
      this.modelId = config.modelId ?? "embeddinggemma";
      this.upstreamModel = config.upstreamModel ?? this.modelId;
      this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
      this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
      // Bind the global fetch: it is invoked below as `this.fetchImpl(...)`,
      // i.e. with `this` = provider. Some runtimes (e.g. browsers) reject that
      // with "Illegal invocation".
      this.fetchImpl = config.fetchImpl ?? globalThis.fetch?.bind(globalThis);
      this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
      this.sleep = config.sleep ?? defaultSleep;
      this.now = config.now ?? Date.now;
      this.breaker = new CircuitBreaker({ now: this.now });
      if (!this.fetchImpl) {
        throw new Error("OpenAIEmbeddingsProvider: global fetch is unavailable. " +
          "Provide a `fetchImpl` config option (Node ≥18 ships fetch by default).");
      }
      // `!(x >= 1)` also rejects NaN (e.g. a mis-parsed numeric setting), which
      // `x < 1` lets through and which would make chunking drop every input.
      if (!(this.batchSize >= 1)) {
        throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
      }
    }
    /** Model id reported to callers (not necessarily the upstream `model`). */
    getModelId() {
      return this.modelId;
    }
    /** Embedding length learned from the first successful embedding; `undefined` before that. */
    getDimensions() {
      return this.dimensions;
    }
    /**
     * Most recent per-chunk failure message (HTTP status + body preview, malformed
     * JSON, timeout, abort reason). Returns `undefined` after a successful call
     * or before the first call. See `EmbeddingProvider.getLastError`.
     */
    getLastError() {
      return this.lastError;
    }
    /** Endpoint URL configured at construction time — used by callers when
     * building error messages for failed first-chunk probes. */
    getEndpoint() {
      return this.endpoint;
    }
    /**
     * Check that the upstream is usable.
     *
     * Tries `GET /health` first. Falls back to a single embed probe when that
     * request throws (network error, timeout, abort) or when the route is not
     * implemented (404 / 405 / 501), as on plain OpenAI-compatible servers.
     * Any other non-2xx answer from `/health` is reported as unhealthy.
     *
     * @param {AbortSignal} [signal] - Optional caller cancellation.
     * @returns {Promise<{ok: boolean, model: string, dimensions?: number, detail: string}>}
     */
    async healthcheck(signal) {
      let healthErr;
      try {
        const { signal: attemptSig, cleanup } = buildAttemptSignal(signal, this.timeoutMs);
        try {
          const resp = await this.fetchImpl(`${this.endpoint}/health`, {
            method: "GET",
            headers: this.buildHeaders(),
            signal: attemptSig,
          });
          if (resp.ok) {
            return {
              ok: true,
              model: this.modelId,
              dimensions: this.dimensions,
              detail: `GET /health → ${resp.status}`,
            };
          }
          if (![404, 405, 501].includes(resp.status)) {
            // The route exists and reports a problem — trust it.
            return {
              ok: false,
              model: this.modelId,
              detail: `GET /health → HTTP ${resp.status}`,
            };
          }
          // Route not implemented: fall through to the embed probe.
          healthErr = new Error(`GET /health → HTTP ${resp.status}`);
        }
        finally {
          cleanup();
        }
      }
      catch (err) {
        healthErr = err;
      }
      // `/health` unavailable — try a single embed probe instead.
      try {
        const probe = await this.embed("healthcheck", { signal });
        if (probe) {
          return {
            ok: true,
            model: this.modelId,
            dimensions: probe.embedding.length,
            detail: "embed probe ok",
          };
        }
        // embedBatch turns chunk failures into null slots; surface the reason.
        return {
          ok: false,
          model: this.modelId,
          detail: `embed probe returned null${this.lastError ? ` (${this.lastError})` : ""}`,
        };
      }
      catch (probeErr) {
        return {
          ok: false,
          model: this.modelId,
          detail: (healthErr instanceof Error ? healthErr.message : String(healthErr)) +
            " | probe: " +
            (probeErr instanceof Error ? probeErr.message : String(probeErr)),
        };
      }
    }
    /**
     * Embed a single text.
     * @returns {Promise<{embedding: number[], model: string} | null>} `null`
     *   when the request failed or was aborted (see `getLastError()`).
     * @throws {CircuitOpenError} When the circuit breaker is OPEN.
     */
    async embed(text, options = {}) {
      const batch = await this.embedBatch([text], options);
      return batch[0] ?? null;
    }
    /**
     * Embed many texts, preserving input order.
     *
     * Each element of the result is `{embedding, model}`, or `null` when its
     * chunk failed or was skipped because the caller aborted. A caller abort
     * is not counted as an upstream failure by the circuit breaker.
     *
     * @param {string[]} texts
     * @param {{signal?: AbortSignal, model?: string}} [options]
     * @returns {Promise<Array<{embedding: number[], model: string} | null>>}
     * @throws {CircuitOpenError} When the breaker is OPEN before or between chunks.
     */
    async embedBatch(texts, options = {}) {
      if (texts.length === 0)
        return [];
      if (this.breaker.shouldFailFast()) {
        throw new CircuitOpenError();
      }
      const chunks = chunkArray(texts, this.batchSize);
      const results = new Array(texts.length).fill(null);
      let cursor = 0;
      let anySucceeded = false;
      for (const chunk of chunks) {
        const start = cursor;
        cursor += chunk.length;
        // Abort early if signal already fired
        if (options.signal?.aborted) {
          // Leave remaining slots as null (caller treats as errors)
          this.lastError = this.formatAbortReason(options.signal);
          return results;
        }
        // Fail-fast if breaker tripped mid-loop
        if (this.breaker.shouldFailFast()) {
          throw new CircuitOpenError();
        }
        try {
          const embeddings = await this.requestWithRetry(chunk, options);
          for (let i = 0; i < chunk.length; i++) {
            const embedding = embeddings[i];
            if (embedding) {
              results[start + i] = {
                embedding,
                model: this.modelId,
              };
              anySucceeded = true;
              // Record dimensions on first success
              if (this.dimensions === undefined) {
                this.dimensions = embedding.length;
              }
            }
          }
          this.breaker.recordSuccess();
        }
        catch (err) {
          // A caller-initiated abort says nothing about upstream health: do not
          // feed it to the breaker (repeated cancellations could otherwise trip
          // it). Remaining chunks would be skipped by the check above anyway.
          if (options.signal?.aborted) {
            this.lastError = this.formatAbortReason(options.signal);
            return results;
          }
          this.breaker.recordFailure();
          // CircuitOpenError must propagate so the caller can fall back
          if (err instanceof CircuitOpenError)
            throw err;
          // Capture the underlying error so callers (e.g. the store dimension
          // probe) can surface it instead of "Failed to get embedding
          // dimensions from first chunk" with no context.
          this.lastError = this.formatErrorContext(err);
          // Other errors mark the chunk as null and continue with next chunk.
          // (The store layer already handles per-text nulls as errors.)
          if (process.env.QMD_EMBED_DEBUG) {
            process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`);
          }
        }
      }
      // Clear lastError on a fully-successful sweep (every input got an embedding).
      if (anySucceeded && results.every((r) => r !== null)) {
        this.lastError = undefined;
      }
      return results;
    }
    /** Reset the circuit breaker; fetch manages its own connection pooling, so nothing else is released. */
    async dispose() {
      this.breaker.reset();
    }
    // ────────────────────── Internals ──────────────────────
    /**
     * Build the `lastError` text for a caller abort, e.g.
     * "aborted by caller: Error: stop".
     */
    formatAbortReason(signal) {
      return `aborted by caller${signal.reason ? `: ${String(signal.reason)}` : ""}`;
    }
    /**
     * Format a request-failure context string for `lastError`. Includes endpoint
     * + HTTP status + body preview when the error was an `HttpError`, otherwise
     * falls back to the message of the underlying error (or the value itself
     * when not an Error). Kept short — body preview is already capped at 1024
     * chars by `HttpError`, but we trim further here for the dimension-probe
     * thrown error which surfaces directly to users.
     */
    formatErrorContext(err) {
      if (err instanceof HttpError) {
        const preview = err.bodyPreview.replace(/\s+/g, " ").trim().slice(0, 240);
        return `endpoint=${this.endpoint}/v1/embeddings status=${err.status}${preview ? ` body="${preview}"` : ""}`;
      }
      if (err instanceof Error) {
        return `endpoint=${this.endpoint}/v1/embeddings error="${err.message}"`;
      }
      return `endpoint=${this.endpoint}/v1/embeddings error="${String(err)}"`;
    }
    /** JSON request headers, plus a Bearer token when an API key is configured. */
    buildHeaders() {
      const headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
      };
      if (this.apiKey) {
        headers["Authorization"] = `Bearer ${this.apiKey}`;
      }
      return headers;
    }
    /**
     * Single logical request with retry on 429/503. Returns embeddings indexed
     * the same as `texts`. Throws on non-retryable failure or once all
     * `retryBackoffsMs.length + 1` attempts are exhausted. The caller signal is
     * checked before each attempt; the backoff sleep itself is not interrupted.
     */
    async requestWithRetry(texts, options) {
      let lastErr = null;
      const maxAttempts = this.retryBackoffsMs.length + 1;
      for (let attempt = 0; attempt < maxAttempts; attempt++) {
        // Honor user abort BEFORE issuing the call (avoids wasted network)
        if (options.signal?.aborted) {
          throw new Error("aborted by caller");
        }
        try {
          return await this.requestOnce(texts, options);
        }
        catch (err) {
          lastErr = err;
          const retryable = err instanceof HttpError ? isRetryableStatus(err.status) : false;
          if (!retryable)
            throw err;
          if (attempt < this.retryBackoffsMs.length) {
            await this.sleep(this.retryBackoffsMs[attempt]);
          }
        }
      }
      // Exhausted retries → throw the last error so caller marks the chunk null
      throw lastErr ?? new Error("requestWithRetry exhausted");
    }
    /**
     * Issue one HTTP attempt to `POST /v1/embeddings`. Does NOT retry.
     * Validates the response shape and returns embeddings ordered by `index`.
     * @throws {HttpError} On a non-2xx status.
     * @throws {Error} On malformed JSON, missing/invalid `data` items, or missing slots.
     */
    async requestOnce(texts, options) {
      const { signal: attemptSig, cleanup } = buildAttemptSignal(options.signal, this.timeoutMs);
      try {
        const body = JSON.stringify({
          model: options.model ?? this.upstreamModel,
          input: texts,
        });
        const resp = await this.fetchImpl(`${this.endpoint}/v1/embeddings`, {
          method: "POST",
          headers: this.buildHeaders(),
          body,
          signal: attemptSig,
        });
        if (!resp.ok) {
          const text = await resp.text().catch(() => "");
          throw new HttpError(resp.status, text);
        }
        let parsed;
        try {
          parsed = (await resp.json());
        }
        catch (err) {
          throw new Error(`OpenAIEmbeddingsProvider: malformed JSON from ${this.endpoint}/v1/embeddings: ${err instanceof Error ? err.message : String(err)}`);
        }
        if (!parsed || !Array.isArray(parsed.data)) {
          throw new Error(`OpenAIEmbeddingsProvider: response missing "data" array (got ${typeof parsed})`);
        }
        // Place by index to match input order (in case server returns out-of-order).
        const out = new Array(texts.length);
        for (const item of parsed.data) {
          // Guard before property access so a `null` entry yields a clear message
          // instead of "Cannot read properties of null".
          if (item === null || typeof item !== "object") {
            throw new Error(`OpenAIEmbeddingsProvider: data item is not an object (got ${item === null ? "null" : typeof item})`);
          }
          if (!Number.isInteger(item.index) ||
            item.index < 0 ||
            item.index >= texts.length) {
            throw new Error(`OpenAIEmbeddingsProvider: data item index out of range (${item.index}, expected 0..${texts.length - 1})`);
          }
          if (!Array.isArray(item.embedding)) {
            throw new Error(`OpenAIEmbeddingsProvider: data[${item.index}].embedding is not an array`);
          }
          out[item.index] = item.embedding;
        }
        // Sanity check — every slot must be filled (catches duplicate/missing indices)
        for (let i = 0; i < texts.length; i++) {
          if (!out[i]) {
            throw new Error(`OpenAIEmbeddingsProvider: response missing embedding for index ${i}`);
          }
        }
        return out;
      }
      finally {
        cleanup();
      }
    }
  }