/**
 * autofallback.ts - AutoFallbackEmbeddingProvider.
 *
 * Composes a primary `EmbeddingProvider` (typically `OpenAIEmbeddingsProvider`)
 * and a fallback (typically `LocalLlamaCppProvider`). When the primary trips
 * its circuit breaker — or when persistent failures cross a threshold — calls
 * are routed to the fallback. After a recovery cooldown, the primary is
 * probed again; success closes the breaker and routing returns.
 *
 * Acceptance criterion 4 from i-qkarfffa: "Endpoint down → fallback local + WARN".
 *
 * Behavior summary:
 * - Primary call succeeds → return; record success.
 * - Primary throws CircuitOpenError → fall back, open the cooldown, log WARN
 *   once per cooldown opening.
 * - Primary throws any other error → fall back for THIS call only;
 *   count toward the failure-streak threshold.
 * - When failure streak crosses threshold (default 3) → set our own
 *   "open until" timestamp; until expiry, route directly to fallback
 *   (skip primary entirely).
 * - On expiry, retry primary opportunistically.
 * - getModelId always reports the primary's id; getDimensions prefers the
 *   primary and falls back to the fallback's value; dispose disposes both
 *   providers.
 */
import type {
  EmbeddingProvider,
  ProviderEmbedOptions,
  ProviderEmbedding,
  ProviderHealth,
  ProviderKind,
} from "./provider.js";
import { CircuitOpenError } from "./openai.js";

export type AutoFallbackProviderConfig = {
  primary: EmbeddingProvider;
  fallback: EmbeddingProvider;
  /**
   * Number of consecutive non-CircuitOpenError failures before we suppress
   * primary calls and route directly to fallback. Default: 3.
   */
  failureStreakThreshold?: number;
  /**
   * Time in ms to keep routing through fallback after the breaker opens.
   * Default: 5 minutes (matches `OpenAIEmbeddingsProvider`'s circuit duration).
   */
  cooldownMs?: number;
  /**
   * Optional WARN sink. Defaults to writing to `process.stderr` once per
   * routing transition (closed→open and open→closed).
   */
  warn?: (msg: string) => void;
  /** Custom clock for tests */
  now?: () => number;
};

const DEFAULT_FAILURE_STREAK = 3;
const DEFAULT_COOLDOWN_MS = 5 * 60_000;

/** Default WARN sink: one line per message on stderr. */
function defaultWarn(msg: string): void {
  process.stderr.write(`${msg}\n`);
}

export type FallbackState = "primary" | "fallback";

export class AutoFallbackEmbeddingProvider implements EmbeddingProvider {
  readonly kind: ProviderKind;
  readonly primary: EmbeddingProvider;
  readonly fallback: EmbeddingProvider;

  private readonly failureStreakThreshold: number;
  private readonly cooldownMs: number;
  private readonly warn: (msg: string) => void;
  private readonly now: () => number;

  /** Consecutive primary failures that were not `CircuitOpenError`. */
  private failureStreak = 0;
  /** Clock value until which the primary is skipped; `null` when no cooldown is open. */
  private fallbackUntil: number | null = null;
  /**
   * Last routing state that was announced via WARN. Used only to emit the
   * recovery WARN once per cooldown; `getRoutingState()` is derived from
   * `fallbackUntil`, not from this field.
   */
  private lastTransitionState: FallbackState = "primary";

  /**
   * @param config Providers plus optional tuning (streak threshold, cooldown,
   *   WARN sink, clock).
   * @throws Error when either provider is missing or both are the same instance.
   */
  constructor(config: AutoFallbackProviderConfig) {
    if (!config.primary) throw new Error("AutoFallbackEmbeddingProvider: primary is required");
    if (!config.fallback) throw new Error("AutoFallbackEmbeddingProvider: fallback is required");
    if (config.primary === config.fallback) {
      throw new Error("AutoFallbackEmbeddingProvider: primary and fallback must differ");
    }
    this.primary = config.primary;
    this.fallback = config.fallback;
    // Inherit the primary's kind for callers introspecting `provider.kind`.
    this.kind = config.primary.kind;
    this.failureStreakThreshold = config.failureStreakThreshold ?? DEFAULT_FAILURE_STREAK;
    this.cooldownMs = config.cooldownMs ?? DEFAULT_COOLDOWN_MS;
    this.warn = config.warn ?? defaultWarn;
    this.now = config.now ?? Date.now;
  }

  /**
   * Stable model id reported by the primary. The model-id guard runs against
   * the primary's id because that's what callers actually want when the
   * remote endpoint is online; on fallback-only operation, the local
   * provider should report a compatible id (in the default config, both
   * report "embeddinggemma" so this is moot).
   */
  getModelId(): string {
    return this.primary.getModelId();
  }

  /** Dimensions reported by the primary, or by the fallback when the primary has none. */
  getDimensions(): number | undefined {
    return this.primary.getDimensions() ?? this.fallback.getDimensions();
  }

  /**
   * Combined last-error from primary + fallback, as currently tracked by each
   * provider's optional `getLastError()`:
   * - Both report an error → `primary: ${primaryErr} | fallback: ${fallbackErr}`
   * - Otherwise → the primary's value when it is not null/undefined, else the
   *   fallback's value (which may be undefined when both are clean).
   */
  getLastError(): string | undefined {
    const primaryErr = this.primary.getLastError?.();
    const fallbackErr = this.fallback.getLastError?.();
    if (primaryErr && fallbackErr) {
      return `primary: ${primaryErr} | fallback: ${fallbackErr}`;
    }
    return primaryErr ?? fallbackErr;
  }

  /** Current routing state (mostly for tests + observability) */
  getRoutingState(): FallbackState {
    if (this.fallbackUntil !== null && this.now() < this.fallbackUntil) {
      return "fallback";
    }
    return "primary";
  }

  /** Reset failure-streak + cooldown (mostly for tests / admin) */
  reset(): void {
    this.failureStreak = 0;
    this.fallbackUntil = null;
    this.transition("primary");
  }

  /**
   * Health of the composed provider.
   *
   * @param signal Optional abort signal forwarded to both providers' healthchecks.
   * @returns The primary's health when it is ok; otherwise a combined report
   *   whose `ok` reflects the fallback and whose `detail` describes both legs.
   */
  async healthcheck(signal?: AbortSignal): Promise<ProviderHealth> {
    // Primary first; if degraded, check fallback so callers can still tell
    // whether they have *any* working backend.
    const primaryHealth = await this.primary.healthcheck(signal);
    if (primaryHealth.ok) return primaryHealth;

    const fallbackHealth = await this.fallback.healthcheck(signal);
    return {
      ok: fallbackHealth.ok,
      model: this.primary.getModelId(),
      dimensions: primaryHealth.dimensions ?? fallbackHealth.dimensions,
      detail: `primary: ${primaryHealth.detail ?? "fail"} | fallback: ${fallbackHealth.detail ?? (fallbackHealth.ok ? "ok" : "fail")}`,
    };
  }

  /**
   * Embed a single text via the primary, falling back as described on `run`.
   * Rejects with the fallback's error when both providers fail.
   *
   * The return type is derived from the `EmbeddingProvider` interface so this
   * wrapper cannot drift from the contract it implements.
   */
  async embed(
    text: string,
    options: ProviderEmbedOptions = {},
  ): Promise<Awaited<ReturnType<EmbeddingProvider["embed"]>>> {
    return this.run((p, opts) => p.embed(text, opts), options);
  }

  /**
   * Embed a batch of texts. An empty input returns `[]` without touching
   * either provider. When both providers fail, resolves to one `null` per
   * input text instead of rejecting.
   */
  async embedBatch(
    texts: string[],
    options: ProviderEmbedOptions = {},
  ): Promise<(ProviderEmbedding | null)[]> {
    if (texts.length === 0) return [];
    return this.run(
      (p, opts) => p.embedBatch(texts, opts),
      options,
      () => texts.map(() => null),
    );
  }

  /** Dispose both providers; a failure in one does not prevent disposing the other. */
  async dispose(): Promise<void> {
    await Promise.allSettled([this.primary.dispose(), this.fallback.dispose()]);
  }

  // ────────────────────── Internals ──────────────────────

  /**
   * Generic dispatcher: try primary if not in cooldown, fall back on
   * `CircuitOpenError`, count other errors against the failure streak.
   * `op` is invoked with whichever provider is selected.
   *
   * @param op Operation to run against the selected provider.
   * @param options Embed options passed through to `op` unchanged.
   * @param onTotalFail Optional producer of a substitute result used when the
   *   fallback also fails; without it the fallback's error is rethrown.
   */
  private async run<T>(
    op: (provider: EmbeddingProvider, opts: ProviderEmbedOptions) => Promise<T>,
    options: ProviderEmbedOptions,
    onTotalFail?: () => T,
  ): Promise<T> {
    if (this.getRoutingState() === "fallback") {
      // Cooldown is open: skip the primary entirely.
      return this.runOnFallback(op, options, onTotalFail);
    }

    // Try primary first
    try {
      const result = await op(this.primary, options);
      // Success — clear streak and ensure routing reads "primary"
      this.failureStreak = 0;
      this.fallbackUntil = null;
      this.transition("primary");
      return result;
    } catch (err) {
      if (err instanceof CircuitOpenError) {
        // Primary circuit is open — open our own cooldown matching its
        // expected duration so subsequent calls skip the primary.
        this.openCooldown(`primary CircuitOpenError`);
      } else {
        this.failureStreak++;
        if (this.failureStreak >= this.failureStreakThreshold) {
          this.openCooldown(
            `primary failure streak ${this.failureStreak} ≥ ${this.failureStreakThreshold}`,
          );
        }
      }
      // Try fallback for THIS call regardless. A below-threshold failure does
      // not change the announced routing state, so a later primary success
      // will not emit a "recovered" WARN that has no matching "falling back".
      return this.runOnFallback(op, options, onTotalFail);
    }
  }

  /**
   * Run `op` on the fallback provider. On failure, return `onTotalFail()` when
   * provided; otherwise rethrow the fallback's error (any primary failure has
   * already informed the breaker by the time this runs).
   */
  private async runOnFallback<T>(
    op: (provider: EmbeddingProvider, opts: ProviderEmbedOptions) => Promise<T>,
    options: ProviderEmbedOptions,
    onTotalFail?: () => T,
  ): Promise<T> {
    try {
      return await op(this.fallback, options);
    } catch (fbErr) {
      if (onTotalFail) return onTotalFail();
      throw fbErr;
    }
  }

  /**
   * Open (or re-open after expiry) the cooldown window and emit the
   * "falling back" WARN. A no-op while an unexpired cooldown is already open,
   * so concurrent failures do not extend the window or repeat the WARN.
   */
  private openCooldown(reason: string): void {
    if (this.fallbackUntil === null || this.now() >= this.fallbackUntil) {
      this.fallbackUntil = this.now() + this.cooldownMs;
      // Record the announced state here, next to the WARN it pairs with.
      this.transition("fallback");
      this.warn(
        `[AutoFallbackEmbeddingProvider] WARN — falling back to "${this.fallback.kind}" provider for ${Math.round(this.cooldownMs / 1000)}s (reason: ${reason})`,
      );
    }
  }

  /**
   * Record the announced routing state; emits the recovery WARN when moving
   * back to "primary". Calling it with the current state does nothing.
   */
  private transition(to: FallbackState): void {
    if (this.lastTransitionState === to) return;
    this.lastTransitionState = to;
    if (to === "primary") {
      this.warn(
        `[AutoFallbackEmbeddingProvider] WARN — primary "${this.primary.kind}" recovered, routing restored`,
      );
    }
    // The "fallback" transition WARN is emitted by openCooldown
    // (with a richer message). No second WARN here.
  }
}