/** * embedding-openai.test.ts - Tests for OpenAIEmbeddingsProvider (HTTP backend). * * Uses a mock fetch — no network required. Covers: * - 200 happy path * - 429 → retry → success * - 503 persistent → exhausted retries → null * - 4xx (non-429) → no retry, immediate failure * - batch chunking (>64 items → multiple HTTP calls) * - timeout / abort * - malformed JSON / missing data array * - circuit breaker open + half-open recovery * - dimension probing * - healthcheck endpoint */ import { describe, test, expect, vi } from "vitest"; import { OpenAIEmbeddingsProvider, CircuitBreaker, CircuitOpenError, HttpError, isRetryableStatus, chunkArray, RETRY_BACKOFFS_MS, } from "../src/embedding/openai.js"; // ─────────────────────────── Helpers ───────────────────────────────────────── function mockResponse(status: number, body: unknown, opts?: { delayMs?: number }): Response { const text = typeof body === "string" ? body : JSON.stringify(body); const init: ResponseInit = { status, headers: { "content-type": "application/json" }, }; if (opts?.delayMs) { // Synchronous Response — test code awaits it directly, so delayMs would // need to be implemented in the fetch wrapper, not here. } return new Response(text, init); } function makeFetchSequence(responses: Array<() => Promise | Response>): { fetchImpl: typeof fetch; calls: { url: string; init?: RequestInit }[]; } { const calls: { url: string; init?: RequestInit }[] = []; let i = 0; const fetchImpl = (async (input: RequestInfo | URL, init?: RequestInit) => { const url = typeof input === "string" ? input : input.toString(); calls.push({ url, init }); if (i >= responses.length) throw new Error(`Mock fetch exhausted at call ${i + 1}`); const r = responses[i++]!(); return r instanceof Promise ? r : r; }) as typeof fetch; return { fetchImpl, calls }; } function fakeEmbedding(dim: number, seed = 0): number[] { return Array.from({ length: dim }, (_, i) => Math.sin(seed + i) * 0.5); } function embeddingsResponse(texts: string[], dim = 4): Response { return mockResponse(200, { object: "list", model: "embeddinggemma:300m", data: texts.map((_, i) => ({ object: "embedding", index: i, embedding: fakeEmbedding(dim, i * 7), })), }); } // ─────────────────────────── Pure helpers ──────────────────────────────────── describe("isRetryableStatus", () => { test("429 retryable", () => expect(isRetryableStatus(429)).toBe(true)); test("503 retryable", () => expect(isRetryableStatus(503)).toBe(true)); test("400 NOT retryable", () => expect(isRetryableStatus(400)).toBe(false)); test("401 NOT retryable", () => expect(isRetryableStatus(401)).toBe(false)); test("404 NOT retryable", () => expect(isRetryableStatus(404)).toBe(false)); test("500 NOT retryable", () => expect(isRetryableStatus(500)).toBe(false)); test("502 NOT retryable", () => expect(isRetryableStatus(502)).toBe(false)); test("200 NOT retryable", () => expect(isRetryableStatus(200)).toBe(false)); }); describe("chunkArray", () => { test("empty input → empty output", () => { expect(chunkArray([], 5)).toEqual([]); }); test("input ≤ size → single chunk", () => { expect(chunkArray([1, 2, 3], 5)).toEqual([[1, 2, 3]]); }); test("input = size → single chunk", () => { expect(chunkArray([1, 2, 3, 4, 5], 5)).toEqual([[1, 2, 3, 4, 5]]); }); test("input > size → multiple chunks", () => { expect(chunkArray([1, 2, 3, 4, 5, 6, 7], 3)).toEqual([ [1, 2, 3], [4, 5, 6], [7], ]); }); test("65 items at size 64 → 64 + 1", () => { const items = Array.from({ length: 65 }, (_, i) => i); const chunks = chunkArray(items, 64); expect(chunks.length).toBe(2); expect(chunks[0]!.length).toBe(64); expect(chunks[1]!.length).toBe(1); }); test("size < 1 throws", () => { expect(() => chunkArray([1, 2, 3], 0)).toThrow(); expect(() => chunkArray([1, 2, 3], -1)).toThrow(); }); }); // ─────────────────────────── Circuit Breaker ───────────────────────────────── describe("CircuitBreaker", () => { test("starts closed", () => { const cb = new CircuitBreaker(); expect(cb.getState()).toBe("closed"); expect(cb.shouldFailFast()).toBe(false); }); test("stays closed below minSamples even with all-failures", () => { const cb = new CircuitBreaker({ minSamples: 4, threshold: 0.5 }); cb.recordFailure(); cb.recordFailure(); cb.recordFailure(); expect(cb.getState()).toBe("closed"); }); test("opens when failure rate exceeds threshold", () => { const cb = new CircuitBreaker({ minSamples: 4, threshold: 0.5 }); cb.recordFailure(); cb.recordFailure(); cb.recordFailure(); cb.recordFailure(); expect(cb.getState()).toBe("open"); expect(cb.shouldFailFast()).toBe(true); }); test("transitions OPEN → HALF-OPEN after openDurationMs", () => { let now = 1_000_000; const cb = new CircuitBreaker({ minSamples: 4, threshold: 0.5, openDurationMs: 5000, now: () => now, }); for (let i = 0; i < 4; i++) cb.recordFailure(); expect(cb.getState()).toBe("open"); now += 5001; expect(cb.getState()).toBe("half-open"); }); test("HALF-OPEN + success → CLOSED", () => { let now = 1_000_000; const cb = new CircuitBreaker({ minSamples: 4, threshold: 0.5, openDurationMs: 5000, now: () => now, }); for (let i = 0; i < 4; i++) cb.recordFailure(); now += 5001; cb.recordSuccess(); // half-open probe expect(cb.getState()).toBe("closed"); }); test("HALF-OPEN + failure → re-OPEN", () => { let now = 1_000_000; const cb = new CircuitBreaker({ minSamples: 4, threshold: 0.5, openDurationMs: 5000, now: () => now, }); for (let i = 0; i < 4; i++) cb.recordFailure(); now += 5001; expect(cb.getState()).toBe("half-open"); cb.recordFailure(); expect(cb.getState()).toBe("open"); }); test("samples outside window are dropped", () => { let now = 1_000_000; const cb = new CircuitBreaker({ minSamples: 4, threshold: 0.5, windowMs: 1000, now: () => now, }); cb.recordFailure(); cb.recordFailure(); now += 1500; // window expired cb.recordSuccess(); cb.recordSuccess(); cb.recordSuccess(); cb.recordSuccess(); // Old failures should be discarded; rate = 0/4 < threshold expect(cb.getState()).toBe("closed"); }); test("reset() clears state", () => { const cb = new CircuitBreaker({ minSamples: 2, threshold: 0.5 }); cb.recordFailure(); cb.recordFailure(); expect(cb.getState()).toBe("open"); cb.reset(); expect(cb.getState()).toBe("closed"); }); }); // ─────────────────────────── HappyPath ─────────────────────────────────────── describe("OpenAIEmbeddingsProvider — happy path", () => { test("single embed call → 200 success", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["hello"], 4), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); const r = await p.embed("hello"); expect(r).not.toBeNull(); expect(r!.embedding.length).toBe(4); expect(r!.model).toBe("embeddinggemma"); expect(p.getDimensions()).toBe(4); expect(calls.length).toBe(1); expect(calls[0]!.url).toBe("https://ai.example.com/v1/embeddings"); }); test("strips trailing slashes from endpoint", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["x"], 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com////", fetchImpl, }); await p.embed("x"); expect(calls[0]!.url).toBe("https://ai.example.com/v1/embeddings"); }); test("batch of 3 → 1 HTTP call", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["a", "b", "c"], 3), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); const result = await p.embedBatch(["a", "b", "c"]); expect(result.length).toBe(3); expect(result.every((r) => r !== null)).toBe(true); expect(calls.length).toBe(1); }); test("respects custom modelId / upstreamModel in request body", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["x"], 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", modelId: "embeddinggemma", upstreamModel: "embeddinggemma:300m", fetchImpl, }); const r = await p.embed("x"); expect(r!.model).toBe("embeddinggemma"); const body = JSON.parse(calls[0]!.init!.body as string); expect(body.model).toBe("embeddinggemma:300m"); }); test("Authorization header set when apiKey provided", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["x"], 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", apiKey: "sk-test-123", fetchImpl, }); await p.embed("x"); const headers = calls[0]!.init!.headers as Record; expect(headers["Authorization"]).toBe("Bearer sk-test-123"); }); test("Authorization header omitted when apiKey not provided", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["x"], 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); await p.embed("x"); const headers = calls[0]!.init!.headers as Record; expect(headers["Authorization"]).toBeUndefined(); }); }); // ─────────────────────────── Batch chunking ────────────────────────────────── describe("OpenAIEmbeddingsProvider — batch chunking", () => { test("100 items at batchSize=64 → 2 HTTP calls (64 + 36)", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(Array.from({ length: 64 }, () => "x"), 4), () => embeddingsResponse(Array.from({ length: 36 }, () => "x"), 4), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 64, }); const texts = Array.from({ length: 100 }, (_, i) => `text-${i}`); const result = await p.embedBatch(texts); expect(result.length).toBe(100); expect(result.every((r) => r !== null)).toBe(true); expect(calls.length).toBe(2); const body0 = JSON.parse(calls[0]!.init!.body as string); const body1 = JSON.parse(calls[1]!.init!.body as string); expect(body0.input.length).toBe(64); expect(body1.input.length).toBe(36); }); test("custom batchSize=10 → multiple smaller calls", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(Array.from({ length: 10 }, () => "x"), 2), () => embeddingsResponse(Array.from({ length: 10 }, () => "x"), 2), () => embeddingsResponse(Array.from({ length: 5 }, () => "x"), 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 10, }); const texts = Array.from({ length: 25 }, (_, i) => `t${i}`); const result = await p.embedBatch(texts); expect(result.length).toBe(25); expect(result.every((r) => r !== null)).toBe(true); expect(calls.length).toBe(3); }); test("empty input → no HTTP calls", async () => { const { fetchImpl, calls } = makeFetchSequence([]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); const result = await p.embedBatch([]); expect(result).toEqual([]); expect(calls.length).toBe(0); }); }); // ─────────────────────────── Retry behavior ────────────────────────────────── describe("OpenAIEmbeddingsProvider — retry on 429/503", () => { test("429 → retry → success", async () => { const sleepCalls: number[] = []; const { fetchImpl, calls } = makeFetchSequence([ () => mockResponse(429, { error: "rate limit" }), () => embeddingsResponse(["x"], 4), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [10, 20, 40], sleep: async (ms) => { sleepCalls.push(ms); }, }); const r = await p.embed("x"); expect(r).not.toBeNull(); expect(calls.length).toBe(2); expect(sleepCalls).toEqual([10]); }); test("503 → retry → success", async () => { const sleepCalls: number[] = []; const { fetchImpl, calls } = makeFetchSequence([ () => mockResponse(503, { error: "service unavailable" }), () => embeddingsResponse(["x"], 4), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [5, 10, 20], sleep: async (ms) => { sleepCalls.push(ms); }, }); const r = await p.embed("x"); expect(r).not.toBeNull(); expect(calls.length).toBe(2); expect(sleepCalls).toEqual([5]); }); test("503 persistent → exhausted retries → null result", async () => { const sleepCalls: number[] = []; const { fetchImpl, calls } = makeFetchSequence([ () => mockResponse(503, "down"), () => mockResponse(503, "down"), () => mockResponse(503, "down"), () => mockResponse(503, "down"), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [1, 2, 4], sleep: async (ms) => { sleepCalls.push(ms); }, }); const r = await p.embed("x"); expect(r).toBeNull(); expect(calls.length).toBe(4); // initial + 3 retries expect(sleepCalls).toEqual([1, 2, 4]); }); test("default backoff schedule is 1s/4s/16s", () => { expect(RETRY_BACKOFFS_MS).toEqual([1000, 4000, 16000]); }); test("4xx (non-429) → immediate failure, no retry", async () => { const sleepCalls: number[] = []; const { fetchImpl, calls } = makeFetchSequence([ () => mockResponse(401, { error: "unauthorized" }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [10, 20, 40], sleep: async (ms) => { sleepCalls.push(ms); }, }); const r = await p.embed("x"); expect(r).toBeNull(); expect(calls.length).toBe(1); // no retries expect(sleepCalls).toEqual([]); }); test("404 → immediate failure, no retry", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => mockResponse(404, "not found"), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [1], sleep: async () => {}, }); await p.embed("x"); expect(calls.length).toBe(1); }); }); // ─────────────────────────── Malformed responses ───────────────────────────── describe("OpenAIEmbeddingsProvider — malformed responses", () => { test("malformed JSON → null result", async () => { const { fetchImpl } = makeFetchSequence([ () => new Response("not-json{}", { status: 200 }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [], sleep: async () => {}, }); const r = await p.embed("x"); expect(r).toBeNull(); }); test("missing data array → null result", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(200, { object: "list", model: "x" }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [], sleep: async () => {}, }); const r = await p.embed("x"); expect(r).toBeNull(); }); test("data item index out of range → null result", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(200, { object: "list", data: [ { index: 5, embedding: [0.1, 0.2] }, // out of range for 1 input ], }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [], sleep: async () => {}, }); const r = await p.embed("x"); expect(r).toBeNull(); }); test("response handles out-of-order data array (sorts by index)", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(200, { object: "list", data: [ { index: 1, embedding: [0.7, 0.8] }, { index: 0, embedding: [0.1, 0.2] }, ], }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); const result = await p.embedBatch(["zero", "one"]); expect(result.length).toBe(2); expect(result[0]!.embedding).toEqual([0.1, 0.2]); expect(result[1]!.embedding).toEqual([0.7, 0.8]); }); }); // ─────────────────────────── Timeout / abort ───────────────────────────────── describe("OpenAIEmbeddingsProvider — timeout and abort", () => { test("user abort signal → null result + no further calls", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => embeddingsResponse(["a", "b"], 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 1, }); const ctrl = new AbortController(); ctrl.abort(new Error("user cancelled")); const result = await p.embedBatch(["a", "b"], { signal: ctrl.signal }); expect(result).toEqual([null, null]); expect(calls.length).toBe(0); // signal aborted before first call }); test("per-attempt timeout aborts a slow request", async () => { let aborted = false; const fetchImpl = (async (_url: any, init?: RequestInit) => { return await new Promise((_resolve, reject) => { const sig = init?.signal; sig?.addEventListener("abort", () => { aborted = true; reject(new DOMException("aborted", "AbortError")); }); }); }) as typeof fetch; const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, timeoutMs: 50, retryBackoffsMs: [], sleep: async () => {}, }); const r = await p.embed("hello"); expect(r).toBeNull(); expect(aborted).toBe(true); }); }); // ─────────────────────────── Circuit breaker integration ───────────────────── describe("OpenAIEmbeddingsProvider — circuit breaker integration", () => { test("repeated failures eventually trip breaker → CircuitOpenError", async () => { // 4 chunks of size 1 = 4 sample slots. All fail with 401 → 4 failures → breaker opens. const { fetchImpl } = makeFetchSequence([ () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), // shouldn't be reached ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 1, retryBackoffsMs: [], sleep: async () => {}, }); // First call: 4 sub-chunks, all fail, breaker opens during 4th const result1 = await p.embedBatch(["a", "b", "c", "d"]); expect(result1.every((x) => x === null)).toBe(true); expect(p.breaker.getState()).toBe("open"); // Second call: breaker fails fast await expect(p.embedBatch(["e"])).rejects.toBeInstanceOf(CircuitOpenError); }); test("breaker recovers after openDuration → success closes it", async () => { let now = 1_000_000; const { fetchImpl } = makeFetchSequence([ () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => embeddingsResponse(["recovered"], 2), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 1, retryBackoffsMs: [], sleep: async () => {}, now: () => now, }); // Override breaker with a shorter open duration (p as any).breaker = new CircuitBreaker({ minSamples: 4, threshold: 0.5, openDurationMs: 1000, now: () => now, }); await p.embedBatch(["a", "b", "c", "d"]); expect((p as any).breaker.getState()).toBe("open"); now += 1500; expect((p as any).breaker.getState()).toBe("half-open"); const r = await p.embed("recovered"); expect(r).not.toBeNull(); expect((p as any).breaker.getState()).toBe("closed"); }); }); // ─────────────────────────── Healthcheck ───────────────────────────────────── describe("OpenAIEmbeddingsProvider — healthcheck", () => { test("healthcheck pings GET /health when available", async () => { const { fetchImpl, calls } = makeFetchSequence([ () => mockResponse(200, { status: "ok", model: "embeddinggemma:300m", }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); const h = await p.healthcheck(); expect(h.ok).toBe(true); expect(calls.length).toBe(1); expect(calls[0]!.url).toBe("https://ai.example.com/health"); expect(calls[0]!.init!.method).toBe("GET"); }); test("healthcheck failure → falls through to embed probe", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(404, "no /health"), // Then fall back to /v1/embeddings probe () => embeddingsResponse(["healthcheck"], 4), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); const h = await p.healthcheck(); // 404 isn't an exception, it returns ok:false from the /health branch // The fallback probe is only triggered on actual exceptions. expect(h.ok).toBe(false); expect(h.detail).toContain("404"); }); }); // ─────────────────────────── HttpError ─────────────────────────────────────── describe("HttpError", () => { test("preserves status and body preview", () => { const err = new HttpError(429, "rate limit exceeded"); expect(err.status).toBe(429); expect(err.bodyPreview).toBe("rate limit exceeded"); expect(err.message).toContain("HTTP 429"); }); test("truncates long bodies in message", () => { const longBody = "x".repeat(500); const err = new HttpError(500, longBody); expect(err.message.length).toBeLessThan(longBody.length + 200); }); }); // ─────────────────────────── lastError tracking (i-vm1lxwry) ──────────────── describe("OpenAIEmbeddingsProvider — getLastError (i-vm1lxwry)", () => { test("returns undefined before first call", () => { const { fetchImpl } = makeFetchSequence([]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, }); expect(p.getLastError()).toBeUndefined(); }); test("captures HTTP status + endpoint after non-retryable failure", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(500, "internal error: GPU OOM"), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [], sleep: async () => {}, }); const r = await p.embed("hello"); expect(r).toBeNull(); const lastErr = p.getLastError(); expect(lastErr).toBeDefined(); expect(lastErr).toContain("https://ai.example.com/v1/embeddings"); expect(lastErr).toContain("status=500"); expect(lastErr).toContain("internal error: GPU OOM"); }); test("captures malformed-JSON error message", async () => { const { fetchImpl } = makeFetchSequence([ () => new Response("not json at all", { status: 200, headers: { "content-type": "application/json" } }), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [], sleep: async () => {}, }); const r = await p.embed("hello"); expect(r).toBeNull(); const lastErr = p.getLastError(); expect(lastErr).toBeDefined(); expect(lastErr).toContain("https://ai.example.com/v1/embeddings"); expect(lastErr).toMatch(/error="/); }); test("clears lastError after a fully-successful sweep", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(500, "fail"), () => embeddingsResponse(["recovered"], 4), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, retryBackoffsMs: [], sleep: async () => {}, }); // First call fails — lastError set const r1 = await p.embed("first"); expect(r1).toBeNull(); expect(p.getLastError()).toBeDefined(); // Second call succeeds — lastError cleared const r2 = await p.embed("recovered"); expect(r2).not.toBeNull(); expect(p.getLastError()).toBeUndefined(); }); test("getEndpoint() exposes configured endpoint (no trailing slash)", () => { const { fetchImpl } = makeFetchSequence([]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com//", fetchImpl, }); expect(p.getEndpoint()).toBe("https://ai.example.com"); }); }); // ─────────────────────────── dispose ───────────────────────────────────────── // ─────────────────────────── Concurrent dispatch (i-fkpnar9i Phase 1 #1) ───── /** * Fetch helper that lets a test: * - count how many requests are in-flight at any moment * - control resolution order via per-call deferred promises * - inspect the start order vs resolution order * * Each `responses[i]` returns a Response (or Promise); the helper * wraps each call so it awaits a `gate[i]` deferred BEFORE responding. Tests * call `release(i)` to let the i-th request settle. Useful for testing that * concurrent dispatch actually overlaps requests. */ function makeGatedFetchSequence(count: number): { fetchImpl: typeof fetch; inFlight: () => number; startOrder: number[]; release: (idx: number, response: Response) => void; releaseAll: (responseFor: (idx: number) => Response) => void; } { const gates: Array<{ resolve: (r: Response) => void }> = []; const startOrder: number[] = []; let inFlight = 0; let nextStart = 0; for (let i = 0; i < count; i++) { let resolveFn: (r: Response) => void = () => {}; new Promise((resolve) => { resolveFn = resolve; }); // re-create properly: let r2: (x: Response) => void = () => {}; const p = new Promise((resolve) => { r2 = resolve; }); gates.push({ resolve: r2 }); // attach the unresolved promise back to the slot via a closure (below) (gates[i] as any).promise = p; } const fetchImpl = (async (_input: RequestInfo | URL, _init?: RequestInit) => { const idx = nextStart++; if (idx >= count) throw new Error(`gated fetch exhausted at ${idx + 1}`); startOrder.push(idx); inFlight++; try { const r = await (gates[idx] as any).promise; return r as Response; } finally { inFlight--; } }) as typeof fetch; return { fetchImpl, inFlight: () => inFlight, startOrder, release: (idx: number, response: Response) => gates[idx]!.resolve(response), releaseAll: (responseFor: (idx: number) => Response) => { for (let i = 0; i < count; i++) gates[i]!.resolve(responseFor(i)); }, }; } describe("OpenAIEmbeddingsProvider — concurrent dispatch (i-fkpnar9i)", () => { test("default concurrency is 4 — 8 chunks of size 1, max in-flight = 4", async () => { const N = 8; const gated = makeGatedFetchSequence(N); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: gated.fetchImpl, batchSize: 1, // each text becomes its own chunk }); // Expected concurrency=4 default expect((p as any).concurrency).toBe(4); const texts = Array.from({ length: N }, (_, i) => `t${i}`); const promise = p.embedBatch(texts); // Yield to the microtask queue so workers can start their first dispatch. // Multiple yields needed for chained `await this.requestWithRetry → await fetch`. for (let i = 0; i < 5; i++) await Promise.resolve(); expect(gated.inFlight()).toBe(4); expect(gated.startOrder).toEqual([0, 1, 2, 3]); // Release first 4 in reverse order — concurrent dispatch should still // preserve input order in `results` because each worker writes to its // pre-computed slot. for (let i = 3; i >= 0; i--) { gated.release(i, embeddingsResponse([`t${i}`], 4)); } // Yield to let workers pick up next chunks for (let i = 0; i < 10; i++) await Promise.resolve(); expect(gated.inFlight()).toBeGreaterThan(0); // 4 more workers should be in flight expect(gated.startOrder.length).toBe(8); // all 8 dispatched // Release the rest for (let i = 4; i < N; i++) { gated.release(i, embeddingsResponse([`t${i}`], 4)); } const result = await promise; expect(result.length).toBe(N); // Critical: input order preserved despite out-of-order resolution for (let i = 0; i < N; i++) { expect(result[i]).not.toBeNull(); expect(result[i]!.model).toBe("embeddinggemma"); } expect(gated.inFlight()).toBe(0); }); test("explicit concurrency=2 — only 2 in-flight at any moment", async () => { const N = 6; const gated = makeGatedFetchSequence(N); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: gated.fetchImpl, batchSize: 1, concurrency: 2, }); const texts = Array.from({ length: N }, (_, i) => `t${i}`); const promise = p.embedBatch(texts); for (let i = 0; i < 5; i++) await Promise.resolve(); expect(gated.inFlight()).toBe(2); // Cycle: release one, wait, expect new one started gated.release(0, embeddingsResponse(["t0"], 4)); for (let i = 0; i < 10; i++) await Promise.resolve(); expect(gated.inFlight()).toBe(2); // still 2 — slot filled by t2 // Drain the rest for (let i = 1; i < N; i++) { gated.release(i, embeddingsResponse([`t${i}`], 4)); for (let j = 0; j < 5; j++) await Promise.resolve(); } const result = await promise; expect(result.every((r) => r !== null)).toBe(true); }); test("concurrency=1 reproduces legacy sequential behavior", async () => { const N = 4; const gated = makeGatedFetchSequence(N); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: gated.fetchImpl, batchSize: 1, concurrency: 1, }); const texts = Array.from({ length: N }, (_, i) => `t${i}`); const promise = p.embedBatch(texts); for (let i = 0; i < 5; i++) await Promise.resolve(); expect(gated.inFlight()).toBe(1); expect(gated.startOrder).toEqual([0]); // Release one at a time, confirm the next starts only after. for (let i = 0; i < N; i++) { gated.release(i, embeddingsResponse([`t${i}`], 4)); for (let j = 0; j < 5; j++) await Promise.resolve(); } await promise; // All started in order (sequential) expect(gated.startOrder).toEqual([0, 1, 2, 3]); }); test("results in input order even when the LAST chunk resolves first", async () => { const N = 4; const gated = makeGatedFetchSequence(N); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: gated.fetchImpl, batchSize: 1, concurrency: 4, }); const texts = ["alpha", "beta", "gamma", "delta"]; const promise = p.embedBatch(texts); // Wait for all 4 to be in flight, then resolve LAST first for (let i = 0; i < 5; i++) await Promise.resolve(); expect(gated.inFlight()).toBe(4); gated.release(3, embeddingsResponse(["delta"], 4)); gated.release(2, embeddingsResponse(["gamma"], 4)); gated.release(1, embeddingsResponse(["beta"], 4)); gated.release(0, embeddingsResponse(["alpha"], 4)); const result = await promise; expect(result.length).toBe(N); // Each input slot got its own embedding — input order preserved expect(result[0]).not.toBeNull(); expect(result[1]).not.toBeNull(); expect(result[2]).not.toBeNull(); expect(result[3]).not.toBeNull(); }); test("dimensions recorded correctly even if the first-resolving chunk is not chunk 0", async () => { const gated = makeGatedFetchSequence(2); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: gated.fetchImpl, batchSize: 1, concurrency: 2, }); const promise = p.embedBatch(["a", "b"]); for (let i = 0; i < 5; i++) await Promise.resolve(); // Resolve chunk 1 first with 7-dim, then chunk 0 with 7-dim gated.release(1, embeddingsResponse(["b"], 7)); for (let i = 0; i < 5; i++) await Promise.resolve(); gated.release(0, embeddingsResponse(["a"], 7)); await promise; expect(p.getDimensions()).toBe(7); }); test("abort signal during concurrent run stops new dispatches; in-flight settle", async () => { const gated = makeGatedFetchSequence(8); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: gated.fetchImpl, batchSize: 1, concurrency: 4, }); const ctrl = new AbortController(); const texts = Array.from({ length: 8 }, (_, i) => `t${i}`); const promise = p.embedBatch(texts, { signal: ctrl.signal }); for (let i = 0; i < 5; i++) await Promise.resolve(); expect(gated.startOrder.length).toBe(4); // Resolve in-flight, then abort — remaining 4 should NOT dispatch gated.release(0, embeddingsResponse(["t0"], 4)); gated.release(1, embeddingsResponse(["t1"], 4)); gated.release(2, embeddingsResponse(["t2"], 4)); gated.release(3, embeddingsResponse(["t3"], 4)); ctrl.abort(new Error("operator cancelled")); for (let i = 0; i < 20; i++) await Promise.resolve(); const result = await promise; // First 4 succeeded, last 4 are null (never dispatched after abort) expect(result.slice(0, 4).every((r) => r !== null)).toBe(true); expect(result.slice(4).every((r) => r === null)).toBe(true); // Total dispatched MUST be ≤ 5 (the abort can race with one extra // worker pulling the next idx before the abort flag is set; we cap // at first 4 + at-most-1 grace). expect(gated.startOrder.length).toBeLessThanOrEqual(5); // Audit string captured expect(p.getLastError()).toMatch(/aborted by caller/); }); test("ctor rejects concurrency < 1", () => { expect(() => new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: (async () => mockResponse(200, {})) as typeof fetch, concurrency: 0, })).toThrow(/concurrency must be ≥ 1/); expect(() => new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl: (async () => mockResponse(200, {})) as typeof fetch, concurrency: -3, })).toThrow(/concurrency must be ≥ 1/); }); test("circuit-open observed mid-run is thrown after in-flight settle", async () => { // 8 chunks, all fail → breaker opens after the first 4 (minSamples=4). // Workers 0-3 dispatch in parallel, all fail, recordFailure × 4 → breaker // OPEN. Remaining workers see shouldFailFast() and set circuitTrippedDuringRun. const N = 8; const { fetchImpl } = makeFetchSequence( Array.from({ length: N }, () => () => mockResponse(401, "fail")) ); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 1, concurrency: 4, retryBackoffsMs: [], sleep: async () => {}, }); // First 4 land before breaker opens (concurrent dispatch); after they all // fail the breaker tips OPEN. The next pull observes shouldFailFast(). // Either the result resolves with all-null (legacy semantics — breaker // tripped AFTER all workers grabbed their chunk) OR throws CircuitOpenError // (breaker observed before next pull). Both are valid post-condition; // we just assert the state ends OPEN and the call completes. let res: Awaited> | undefined; let err: unknown; try { res = await p.embedBatch(Array.from({ length: N }, (_, i) => `t${i}`)); } catch (e) { err = e; } expect(p.breaker.getState()).toBe("open"); if (err) { expect(err).toBeInstanceOf(CircuitOpenError); } else { expect(res!.every((r) => r === null)).toBe(true); } }); }); describe("OpenAIEmbeddingsProvider — dispose", () => { test("dispose resets the breaker", async () => { const { fetchImpl } = makeFetchSequence([ () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), () => mockResponse(401, "fail"), ]); const p = new OpenAIEmbeddingsProvider({ endpoint: "https://ai.example.com", fetchImpl, batchSize: 1, retryBackoffsMs: [], sleep: async () => {}, }); await p.embedBatch(["a", "b", "c", "d"]); expect(p.breaker.getState()).toBe("open"); await p.dispose(); expect(p.breaker.getState()).toBe("closed"); }); });