openai.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. /**
  2. * openai.ts - OpenAI-compatible HTTP embedding provider
  3. *
  4. * Talks to any endpoint that implements `POST /v1/embeddings` with the OpenAI
  5. * shape: request `{model, input: string|string[]}`, response
  6. * `{data: [{embedding: number[], index: number}, ...]}`.
  7. *
  8. * Used by qmd to delegate embeddings to a GPU worker (e.g. ai.mm.mk →
  9. * qmd-embed-worker on `models` LXC, RTX 4090) instead of running
  10. * node-llama-cpp locally.
  11. *
  12. * Features:
  13. * - Batches input in groups of ≤64 (configurable via QMD_EMBED_BATCH_SIZE)
  14. * - Retries 429 / 503 with exponential backoff (1s, 4s, 16s)
  15. * - 4xx (non-429) → no retry, count as failure
  16. * - Circuit breaker: >50% failures in a 60s window → OPEN for 5 min,
  17. * callers can use this to fall back to a local provider
  18. * - Per-call timeout via AbortSignal (default QMD_EMBED_TIMEOUT_MS=30000)
  19. * - Healthcheck via `GET /health` if available, else a probe embed call
  20. */
  21. // ─────────────────────────── Configuration ───────────────────────────────────
  /**
   * Default batch size — most OpenAI-compatible embedding endpoints accept up to
   * 2048 inputs per call but for memory and latency we cap at 64.
   */
  export const DEFAULT_BATCH_SIZE = 64;
  /**
   * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
   * <500ms per batch of 64 in practice; 30s is a safe upper bound.
   */
  export const DEFAULT_TIMEOUT_MS = 30_000;
  /**
   * Retry backoff schedule (ms) for 429/503 responses. Each entry is the sleep
   * before one retry, so this schedule allows up to 4 attempts in total
   * (initial + 3 retries) with 1s, 4s and 16s waits in between — the issue
   * spec's "1s/4s/16s".
   */
  export const RETRY_BACKOFFS_MS = [1_000, 4_000, 16_000];
  /**
   * Circuit breaker — flips OPEN when error rate exceeds threshold within
   * window. While OPEN, every call fails fast so the caller can fall back.
   */
  // Sliding window over which success/failure samples are kept (60 s).
  export const CIRCUIT_WINDOW_MS = 60_000;
  // How long the breaker stays OPEN before allowing traffic again (5 min).
  export const CIRCUIT_OPEN_DURATION_MS = 5 * 60_000;
  // Failure rate that must be strictly exceeded for the breaker to open.
  export const CIRCUIT_FAILURE_RATE_THRESHOLD = 0.5;
  // Minimum samples in the window before the failure rate is evaluated.
  export const CIRCUIT_MIN_SAMPLES = 4;
  45. // ─────────────────────────── Helpers ─────────────────────────────────────────
  /**
   * Default `sleep` implementation: a promise that settles (with `undefined`)
   * once `ms` milliseconds have passed.
   */
  function defaultSleep(ms) {
    const delayed = new Promise((done) => {
      setTimeout(done, ms);
    });
    return delayed;
  }
  /**
   * Build the AbortSignal for a single HTTP attempt by merging an optional
   * caller-supplied `userSignal` (e.g. from withLLMSession) with a
   * per-attempt timeout.
   *
   * Returns `{ signal, cleanup }`. The caller must invoke `cleanup()` once the
   * attempt settles: it clears the timeout and detaches the listener that was
   * added to `userSignal`, so long-lived user signals don't accumulate
   * listeners and no stray timer fires later.
   *
   * @param {AbortSignal | undefined} userSignal - caller cancellation signal
   * @param {number} timeoutMs - per-attempt timeout in milliseconds
   * @returns {{ signal: AbortSignal, cleanup: () => void }}
   */
  function buildAttemptSignal(userSignal, timeoutMs) {
    const ctrl = new AbortController();
    const onUserAbort = () => ctrl.abort(userSignal?.reason);
    let timeoutId;
    if (userSignal?.aborted) {
      // Already cancelled: abort right away; no timer or listener needed.
      ctrl.abort(userSignal.reason);
    }
    else {
      timeoutId = setTimeout(() => {
        ctrl.abort(new Error(`Request timed out after ${timeoutMs}ms`));
      }, timeoutMs);
      // Don't keep the process alive just for this timer (Node returns a
      // Timeout object exposing unref(); browsers return a number).
      if (typeof timeoutId === "object" && timeoutId !== null && "unref" in timeoutId) {
        timeoutId.unref();
      }
      userSignal?.addEventListener("abort", onUserAbort, { once: true });
    }
    const cleanup = () => {
      // clearTimeout(undefined) is a no-op, so this is safe on the pre-aborted path.
      clearTimeout(timeoutId);
      userSignal?.removeEventListener("abort", onUserAbort);
    };
    return { signal: ctrl.signal, cleanup };
  }
  /**
   * Decide whether an HTTP status is worth retrying. Only 429 (Too Many
   * Requests) and 503 (Service Unavailable) qualify; every other status —
   * including other 4xx and 5xx codes — is treated as final.
   */
  export function isRetryableStatus(status) {
    switch (status) {
      case 429:
      case 503:
        return true;
      default:
        return false;
    }
  }
  /**
   * Split `items` into consecutive chunks of at most `size` elements.
   *
   * Every chunk is a fresh array (a `slice` of `items`), so callers may
   * mutate a chunk without touching the input. An empty input yields `[]`.
   *
   * @param {Array} items - elements to split (not modified)
   * @param {number} size - maximum chunk length; must be an integer ≥ 1
   * @returns {Array[]} chunks in input order
   * @throws {Error} if `size` is not an integer ≥ 1 (including NaN)
   */
  export function chunkArray(items, size) {
    // `size < 1` alone lets NaN through; with NaN the loop below would emit a
    // single empty chunk and silently drop every item.
    if (!Number.isInteger(size) || size < 1)
      throw new Error(`chunkArray: size must be ≥ 1, got ${size}`);
    const out = [];
    for (let i = 0; i < items.length; i += size) {
      out.push(items.slice(i, i + size));
    }
    return out;
  }
  101. // ─────────────────────────── Circuit Breaker ─────────────────────────────────
  /**
   * Sliding-window circuit breaker.
   *
   * Each recorded outcome is stored as a timestamped sample; samples older
   * than `windowMs` are dropped whenever a new sample is pushed. While CLOSED,
   * every failure triggers an evaluation: with at least `minSamples` samples
   * in the window and a failure rate strictly above `threshold`, the breaker
   * flips OPEN. Once `openDurationMs` has elapsed it lazily becomes HALF-OPEN;
   * the next recorded outcome closes it (success, starting a fresh window) or
   * re-opens it (failure). HALF-OPEN does not throttle traffic —
   * `shouldFailFast()` lets every call through until an outcome is recorded.
   *
   * Defaults come from the CIRCUIT_* constants; `now` is an injectable clock.
   */
  export class CircuitBreaker {
    samples = [];
    state = "closed";
    openedAt = null;
    windowMs;
    openDurationMs;
    threshold;
    minSamples;
    now;
    /**
     * @param {object} [opts]
     * @param {number} [opts.windowMs] - sliding window length (ms)
     * @param {number} [opts.openDurationMs] - time spent OPEN before HALF-OPEN (ms)
     * @param {number} [opts.threshold] - failure rate that must be exceeded to open
     * @param {number} [opts.minSamples] - samples required before the rate is evaluated
     * @param {() => number} [opts.now] - clock in ms (defaults to Date.now)
     */
    constructor(opts = {}) {
      this.windowMs = opts.windowMs ?? CIRCUIT_WINDOW_MS;
      this.openDurationMs = opts.openDurationMs ?? CIRCUIT_OPEN_DURATION_MS;
      this.threshold = opts.threshold ?? CIRCUIT_FAILURE_RATE_THRESHOLD;
      this.minSamples = opts.minSamples ?? CIRCUIT_MIN_SAMPLES;
      this.now = opts.now ?? Date.now;
    }
    /** Current state ("closed" | "open" | "half-open") after any due OPEN → HALF-OPEN transition. */
    getState() {
      this.tickAutoReset();
      return this.state;
    }
    /**
     * Returns true when calls should be short-circuited (skip HTTP, fall back).
     * Side-effects: may transition OPEN → HALF-OPEN if the open window expired.
     */
    shouldFailFast() {
      return this.getState() === "open";
    }
    /** Record a successful call. Closes the breaker if it was HALF-OPEN. */
    recordSuccess() {
      // Honor the time-based OPEN→HALF-OPEN transition before deciding what
      // to do with this sample. Without this, a success that lands AFTER the
      // open window expired would still see state==="open" and never close
      // the breaker (a probe call could only flip it via getState()).
      this.tickAutoReset();
      if (this.state === "half-open") {
        // Probe succeeded: close and start a fresh window. Keeping the
        // pre-outage failures would let one new failure re-trip the breaker
        // immediately whenever openDurationMs < windowMs.
        this.state = "closed";
        this.openedAt = null;
        this.samples = [];
      }
      this.pushSample(true);
    }
    /** Record a failed call. May trigger OPEN. */
    recordFailure() {
      // Same reasoning as recordSuccess — apply lazy auto-reset before
      // classifying the sample.
      this.tickAutoReset();
      this.pushSample(false);
      if (this.state === "half-open") {
        // Probe failed — re-open
        this.state = "open";
        this.openedAt = this.now();
        return;
      }
      if (this.state === "closed")
        this.evaluate();
    }
    /** Force-reset the breaker (used by tests / admin) */
    reset() {
      this.samples = [];
      this.state = "closed";
      this.openedAt = null;
    }
    /** Append a sample stamped with `now()` and evict samples older than the window. */
    pushSample(ok) {
      const ts = this.now();
      this.samples.push({ ts, ok });
      // Samples are appended in time order, so expired ones sit at the front.
      const cutoff = ts - this.windowMs;
      while (this.samples.length > 0 && this.samples[0].ts < cutoff) {
        this.samples.shift();
      }
    }
    /** Open the breaker if enough samples exist and the failure rate exceeds the threshold. */
    evaluate() {
      if (this.samples.length < this.minSamples)
        return;
      const failures = this.samples.filter((s) => !s.ok).length;
      const rate = failures / this.samples.length;
      if (rate > this.threshold) {
        this.state = "open";
        this.openedAt = this.now();
      }
    }
    /** Move OPEN → HALF-OPEN once openDurationMs has elapsed since opening. */
    tickAutoReset() {
      if (this.state === "open" && this.openedAt !== null) {
        if (this.now() - this.openedAt >= this.openDurationMs) {
          this.state = "half-open";
        }
      }
    }
  }
  196. // ─────────────────────────── Errors ──────────────────────────────────────────
  /**
   * Thrown when a call is short-circuited because the circuit breaker is
   * OPEN. A fallback wrapper can catch it and switch to the local provider.
   */
  export class CircuitOpenError extends Error {
    name = "CircuitOpenError";
    constructor(message = "OpenAIEmbeddingsProvider circuit is OPEN") {
      super(message);
    }
  }
  /**
   * Non-2xx HTTP response from upstream.
   *
   * `message` embeds the status plus the first 200 characters of the body;
   * `bodyPreview` keeps up to 1024 characters for diagnostics.
   *
   * @param {number} status - HTTP status code
   * @param {string} [bodyPreview=""] - response body text; null/undefined are
   *   treated as empty so a missing body can't crash error construction
   */
  export class HttpError extends Error {
    status;
    bodyPreview;
    constructor(status, bodyPreview = "") {
      const body = String(bodyPreview ?? "");
      super(`HTTP ${status}: ${body.slice(0, 200)}`);
      this.name = "HttpError";
      this.status = status;
      this.bodyPreview = body.slice(0, 1024);
    }
  }
  220. // ─────────────────────────── Provider ────────────────────────────────────────
  /**
   * Embedding provider backed by an OpenAI-compatible `POST /v1/embeddings`
   * endpoint. Inputs are sent in batches of `batchSize`; 429/503 responses are
   * retried per `retryBackoffsMs`; every attempt has a `timeoutMs` timeout;
   * and a CircuitBreaker makes calls throw CircuitOpenError while it is OPEN
   * so callers can fall back to another provider.
   */
  export class OpenAIEmbeddingsProvider {
    kind = "openai";
    endpoint;
    apiKey;
    modelId;
    upstreamModel;
    batchSize;
    timeoutMs;
    fetchImpl;
    retryBackoffsMs;
    sleep;
    now;
    // Embedding length, learned from the first successful response.
    dimensions = undefined;
    breaker;
    /**
     * @param {object} config
     * @param {string} config.endpoint - base URL; trailing slashes are stripped
     * @param {string} [config.apiKey] - sent as `Authorization: Bearer …` when set
     * @param {string} [config.modelId="embeddinggemma"] - model id reported to callers
     * @param {string} [config.upstreamModel] - `model` sent upstream (defaults to modelId)
     * @param {number} [config.batchSize] - max inputs per request; integer ≥ 1
     * @param {number} [config.timeoutMs] - per-attempt timeout (ms)
     * @param {Function} [config.fetchImpl] - fetch implementation (defaults to global fetch)
     * @param {number[]} [config.retryBackoffsMs] - sleep (ms) before each retry
     * @param {(ms: number) => Promise<void>} [config.sleep] - injectable sleep
     * @param {() => number} [config.now] - injectable clock for the breaker
     * @throws {Error} if endpoint is missing, no fetch is available, or batchSize is invalid
     */
    constructor(config) {
      if (!config.endpoint) {
        throw new Error("OpenAIEmbeddingsProvider: endpoint is required");
      }
      this.endpoint = config.endpoint.replace(/\/+$/, "");
      this.apiKey = config.apiKey;
      this.modelId = config.modelId ?? "embeddinggemma";
      this.upstreamModel = config.upstreamModel ?? this.modelId;
      this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
      this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
      // Bind the global fetch: it is later invoked as `this.fetchImpl(...)`,
      // and browser fetch throws "Illegal invocation" when its receiver is
      // not the global object.
      this.fetchImpl = config.fetchImpl ?? globalThis.fetch?.bind(globalThis);
      this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
      this.sleep = config.sleep ?? defaultSleep;
      this.now = config.now ?? Date.now;
      this.breaker = new CircuitBreaker({ now: this.now });
      if (!this.fetchImpl) {
        throw new Error("OpenAIEmbeddingsProvider: global fetch is unavailable. " +
          "Provide a `fetchImpl` config option (Node ≥18 ships fetch by default).");
      }
      // `< 1` alone would accept NaN (e.g. from an unparsable env var).
      if (!Number.isInteger(this.batchSize) || this.batchSize < 1) {
        throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
      }
    }
    getModelId() {
      return this.modelId;
    }
    /** Embedding length, or undefined until a successful embed has been seen. */
    getDimensions() {
      return this.dimensions;
    }
    /**
     * Check whether the upstream is usable. Never throws.
     *
     * Tries `GET /health` first. If that route does not exist (404/405/501)
     * or the request throws (network error, timeout), falls back to embedding
     * a single probe string. Any other non-2xx /health status is reported as
     * unhealthy without probing. If the caller's signal is already aborted
     * after /health, no probe is attempted.
     *
     * @param {AbortSignal} [signal] - optional caller cancellation
     * @returns {Promise<{ok: boolean, model: string, dimensions?: number, detail: string}>}
     */
    async healthcheck(signal) {
      let healthDetail;
      try {
        const { signal: attemptSig, cleanup } = buildAttemptSignal(signal, this.timeoutMs);
        try {
          const resp = await this.fetchImpl(`${this.endpoint}/health`, {
            method: "GET",
            headers: this.buildHeaders(),
            signal: attemptSig,
          });
          if (resp.ok) {
            return {
              ok: true,
              model: this.modelId,
              dimensions: this.dimensions,
              detail: `GET /health → ${resp.status}`,
            };
          }
          const failure = `GET /health → HTTP ${resp.status}`;
          // Only "route not available" statuses justify probing; anything
          // else (e.g. 500/503) is a genuine health failure.
          if (![404, 405, 501].includes(resp.status)) {
            return {
              ok: false,
              model: this.modelId,
              detail: failure,
            };
          }
          healthDetail = failure;
        }
        finally {
          cleanup();
        }
      }
      catch (err) {
        healthDetail = err instanceof Error ? err.message : String(err);
      }
      // Caller cancelled — don't start a probe request.
      if (signal?.aborted) {
        return {
          ok: false,
          model: this.modelId,
          detail: healthDetail,
        };
      }
      // Endpoint has no usable /health — try a single embed probe instead.
      try {
        const probe = await this.embed("healthcheck", { signal });
        if (probe) {
          return {
            ok: true,
            model: this.modelId,
            dimensions: probe.embedding.length,
            detail: "embed probe ok",
          };
        }
        return {
          ok: false,
          model: this.modelId,
          detail: "embed probe returned null",
        };
      }
      catch (probeErr) {
        return {
          ok: false,
          model: this.modelId,
          detail: healthDetail +
            " | probe: " +
            (probeErr instanceof Error ? probeErr.message : String(probeErr)),
        };
      }
    }
    /** Embed one text; resolves to `{embedding, model}` or null on failure. */
    async embed(text, options = {}) {
      const batch = await this.embedBatch([text], options);
      return batch[0] ?? null;
    }
    /**
     * Embed many texts, in chunks of `batchSize`.
     *
     * Returns one entry per input: `{embedding, model}` on success, or null
     * when that input's chunk failed or the caller aborted before it ran.
     *
     * @throws {CircuitOpenError} when the breaker is OPEN (before or mid-loop)
     */
    async embedBatch(texts, options = {}) {
      if (texts.length === 0)
        return [];
      if (this.breaker.shouldFailFast()) {
        throw new CircuitOpenError();
      }
      const results = new Array(texts.length).fill(null);
      let cursor = 0;
      for (const chunk of chunkArray(texts, this.batchSize)) {
        const start = cursor;
        cursor += chunk.length;
        // Abort early if signal already fired
        if (options.signal?.aborted) {
          // Leave remaining slots as null (caller treats as errors)
          return results;
        }
        // Fail-fast if breaker tripped mid-loop
        if (this.breaker.shouldFailFast()) {
          throw new CircuitOpenError();
        }
        try {
          const embeddings = await this.requestWithRetry(chunk, options);
          for (let i = 0; i < chunk.length; i++) {
            const embedding = embeddings[i];
            if (embedding) {
              results[start + i] = {
                embedding,
                model: this.modelId,
              };
              // Record dimensions on first success
              if (this.dimensions === undefined) {
                this.dimensions = embedding.length;
              }
            }
          }
          this.breaker.recordSuccess();
        }
        catch (err) {
          // A caller-initiated abort says nothing about upstream health; don't
          // feed it to the breaker, or repeated cancellations could trip it.
          if (options.signal?.aborted) {
            return results;
          }
          this.breaker.recordFailure();
          // CircuitOpenError must propagate so the caller can fall back
          if (err instanceof CircuitOpenError)
            throw err;
          // Other errors mark the chunk as null and continue with next chunk.
          // (The store layer already handles per-text nulls as errors.)
          // `process` is guarded so this also works outside Node.
          if (globalThis.process?.env?.QMD_EMBED_DEBUG) {
            globalThis.process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`);
          }
        }
      }
      return results;
    }
    async dispose() {
      // Nothing to release — fetch handles its own connection pooling.
      // Reset the breaker so a re-instantiation starts fresh.
      this.breaker.reset();
    }
    // ────────────────────── Internals ──────────────────────
    /** JSON request headers, plus a Bearer token when an apiKey is configured. */
    buildHeaders() {
      const headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
      };
      if (this.apiKey) {
        headers["Authorization"] = `Bearer ${this.apiKey}`;
      }
      return headers;
    }
    /**
     * Single HTTP request with retry on 429/503. Returns embeddings indexed
     * the same as `texts`. Makes at most `retryBackoffsMs.length + 1`
     * attempts, sleeping `retryBackoffsMs[k]` after the k-th failed attempt.
     * Throws on non-retryable failure, caller abort, or all attempts
     * exhausted.
     */
    async requestWithRetry(texts, options) {
      let lastErr = null;
      const maxAttempts = this.retryBackoffsMs.length + 1;
      for (let attempt = 0; attempt < maxAttempts; attempt++) {
        // Honor user abort BEFORE issuing the call (avoids wasted network)
        if (options.signal?.aborted) {
          throw new Error("aborted by caller", { cause: options.signal.reason });
        }
        try {
          return await this.requestOnce(texts, options);
        }
        catch (err) {
          lastErr = err;
          const retryable = err instanceof HttpError && isRetryableStatus(err.status);
          if (!retryable)
            throw err;
          if (attempt < this.retryBackoffsMs.length) {
            await this.sleep(this.retryBackoffsMs[attempt]);
          }
        }
      }
      // Exhausted retries → throw the last error so caller marks the chunk null
      throw lastErr ?? new Error("requestWithRetry exhausted");
    }
    /**
     * Issue one HTTP attempt to `POST /v1/embeddings`. Does NOT retry.
     *
     * Validates the response shape and returns the embeddings ordered by each
     * item's `index`, so out-of-order responses are handled.
     *
     * @throws {HttpError} on a non-2xx response
     * @throws {Error} on malformed JSON, a missing/invalid `data` array, a bad
     *   index, or an input left without an embedding
     */
    async requestOnce(texts, options) {
      const { signal: attemptSig, cleanup } = buildAttemptSignal(options.signal, this.timeoutMs);
      try {
        const body = JSON.stringify({
          model: options.model ?? this.upstreamModel,
          input: texts,
        });
        const resp = await this.fetchImpl(`${this.endpoint}/v1/embeddings`, {
          method: "POST",
          headers: this.buildHeaders(),
          body,
          signal: attemptSig,
        });
        if (!resp.ok) {
          const text = await resp.text().catch(() => "");
          throw new HttpError(resp.status, text);
        }
        let parsed;
        try {
          parsed = (await resp.json());
        }
        catch (err) {
          throw new Error(`OpenAIEmbeddingsProvider: malformed JSON from ${this.endpoint}/v1/embeddings: ${err instanceof Error ? err.message : String(err)}`, { cause: err });
        }
        if (!parsed || !Array.isArray(parsed.data)) {
          throw new Error(`OpenAIEmbeddingsProvider: response missing "data" array (got ${typeof parsed})`);
        }
        // Place each embedding at its declared index (server order may differ).
        const out = new Array(texts.length);
        for (const item of parsed.data) {
          // `item?.` tolerates null entries; Number.isInteger rejects
          // fractional indices that would otherwise become stray properties.
          if (!Number.isInteger(item?.index) ||
            item.index < 0 ||
            item.index >= texts.length) {
            throw new Error(`OpenAIEmbeddingsProvider: data item index out of range (${item?.index}, expected 0..${texts.length - 1})`);
          }
          if (!Array.isArray(item.embedding)) {
            throw new Error(`OpenAIEmbeddingsProvider: data[${item.index}].embedding is not an array`);
          }
          out[item.index] = item.embedding;
        }
        // Sanity check — every slot must be filled
        for (let i = 0; i < texts.length; i++) {
          if (!out[i]) {
            throw new Error(`OpenAIEmbeddingsProvider: response missing embedding for index ${i}`);
          }
        }
        return out;
      }
      finally {
        cleanup();
      }
    }
  }