- /**
- * autofallback.ts - AutoFallbackEmbeddingProvider.
- *
- * Composes a primary `EmbeddingProvider` (typically `OpenAIEmbeddingsProvider`)
- * and a fallback (typically `LocalLlamaCppProvider`). When the primary trips
- * its circuit breaker — or when persistent failures cross a threshold — calls
- * are routed to the fallback. After a recovery cooldown, the primary is
- * probed again; success closes the breaker and routing returns.
- *
- * Acceptance criterion 4 from i-qkarfffa: "Endpoint down → fallback local + WARN".
- *
- * Behavior summary:
- * - Primary call succeeds → return; record success.
- * - Primary throws CircuitOpenError → fall back, log WARN once per transition.
- * - Primary throws any other error → fall back for THIS call only;
- * count toward the failure-streak threshold.
- * - When the failure streak reaches the threshold (default 3) → set our own
- * "open until" timestamp; until expiry, route directly to fallback
- * (skip primary entirely).
- * - On expiry, retry primary opportunistically.
- * - getModelId is always delegated to the primary; getDimensions prefers
- * the primary and falls back to the fallback's value; dispose disposes
- * both providers.
- */
- import type {
- EmbeddingProvider,
- ProviderEmbedOptions,
- ProviderEmbedding,
- ProviderHealth,
- ProviderKind,
- } from "./provider.js";
- import { CircuitOpenError } from "./openai.js";
- export type AutoFallbackProviderConfig = {
- primary: EmbeddingProvider;
- fallback: EmbeddingProvider;
- /**
- * Number of consecutive non-CircuitOpenError failures before we suppress
- * primary calls and route directly to fallback. Default: 3.
- */
- failureStreakThreshold?: number;
- /**
- * Time in ms to keep routing through fallback after the breaker opens.
- * Default: 5 minutes (matches `OpenAIEmbeddingsProvider`'s circuit duration).
- */
- cooldownMs?: number;
- /**
- * Optional WARN sink, called once per routing transition (closed→open and
- * open→closed). Defaults to writing to `process.stderr`.
- */
- warn?: (msg: string) => void;
- /** Custom clock for tests */
- now?: () => number;
- };
- const DEFAULT_FAILURE_STREAK = 3;
- const DEFAULT_COOLDOWN_MS = 5 * 60_000;
- function defaultWarn(msg: string): void {
- process.stderr.write(`${msg}\n`);
- }
- export type FallbackState = "primary" | "fallback";
- export class AutoFallbackEmbeddingProvider implements EmbeddingProvider {
- readonly kind: ProviderKind;
- readonly primary: EmbeddingProvider;
- readonly fallback: EmbeddingProvider;
- private readonly failureStreakThreshold: number;
- private readonly cooldownMs: number;
- private readonly warn: (msg: string) => void;
- private readonly now: () => number;
- private failureStreak = 0;
- private fallbackUntil: number | null = null;
- private lastTransitionState: FallbackState = "primary";
- constructor(config: AutoFallbackProviderConfig) {
- if (!config.primary) throw new Error("AutoFallbackEmbeddingProvider: primary is required");
- if (!config.fallback) throw new Error("AutoFallbackEmbeddingProvider: fallback is required");
- if (config.primary === config.fallback) {
- throw new Error("AutoFallbackEmbeddingProvider: primary and fallback must differ");
- }
- this.primary = config.primary;
- this.fallback = config.fallback;
- // Inherit the primary's kind for callers introspecting `provider.kind`.
- this.kind = config.primary.kind;
- this.failureStreakThreshold = config.failureStreakThreshold ?? DEFAULT_FAILURE_STREAK;
- this.cooldownMs = config.cooldownMs ?? DEFAULT_COOLDOWN_MS;
- this.warn = config.warn ?? defaultWarn;
- this.now = config.now ?? Date.now;
- }
- /**
- * Stable model id reported by the primary. The model-id guard runs against
- * the primary's id because that's what callers actually want when the
- * remote endpoint is online; on fallback-only operation, the local
- * provider should report a compatible id (in the default config, both
- * report "embeddinggemma" so this is moot).
- */
- getModelId(): string {
- return this.primary.getModelId();
- }
- getDimensions(): number | undefined {
- return this.primary.getDimensions() ?? this.fallback.getDimensions();
- }
- /**
- * Combined last-error from primary + fallback. Either, neither, or both legs
- * may have a tracked error after `embed()`/`embedBatch()` runs:
- * - Both clean → undefined
- * - Primary failed, fallback rescued → returns primary error (most useful)
- * - Both failed → returns "primary: <msg> | fallback: <msg>"
- * - Primary skipped (cooldown), fallback failed → returns the fallback error,
- * combined as above if the primary still reports an earlier error
- */
- getLastError(): string | undefined {
- const primaryErr = this.primary.getLastError?.();
- const fallbackErr = this.fallback.getLastError?.();
- if (primaryErr && fallbackErr) {
- return `primary: ${primaryErr} | fallback: ${fallbackErr}`;
- }
- return primaryErr ?? fallbackErr;
- }
- /** Current routing state (mostly for tests + observability) */
- getRoutingState(): FallbackState {
- if (this.fallbackUntil !== null && this.now() < this.fallbackUntil) {
- return "fallback";
- }
- return "primary";
- }
- /** Reset failure-streak + cooldown (mostly for tests / admin) */
- reset(): void {
- this.failureStreak = 0;
- this.fallbackUntil = null;
- this.transition("primary");
- }
- async healthcheck(signal?: AbortSignal): Promise<ProviderHealth> {
- // Primary first; if degraded, check fallback so callers can still tell
- // whether they have *any* working backend.
- const primaryHealth = await this.primary.healthcheck(signal);
- if (primaryHealth.ok) return primaryHealth;
- const fallbackHealth = await this.fallback.healthcheck(signal);
- return {
- ok: fallbackHealth.ok,
- model: this.primary.getModelId(),
- dimensions: primaryHealth.dimensions ?? fallbackHealth.dimensions,
- detail:
- `primary: ${primaryHealth.detail ?? "fail"} | fallback: ${fallbackHealth.detail ?? (fallbackHealth.ok ? "ok" : "fail")}`,
- };
- }
- async embed(
- text: string,
- options: ProviderEmbedOptions = {},
- ): Promise<ProviderEmbedding | null> {
- return this.run(
- (p, opts) => p.embed(text, opts),
- options,
- );
- }
- async embedBatch(
- texts: string[],
- options: ProviderEmbedOptions = {},
- ): Promise<(ProviderEmbedding | null)[]> {
- if (texts.length === 0) return [];
- return this.run(
- (p, opts) => p.embedBatch(texts, opts),
- options,
- () => texts.map(() => null),
- );
- }
- async dispose(): Promise<void> {
- await Promise.allSettled([this.primary.dispose(), this.fallback.dispose()]);
- }
- // ────────────────────── Internals ──────────────────────
- /**
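- * Generic dispatcher: try the primary unless in cooldown, and fall back on
- * any primary error. `CircuitOpenError` opens the cooldown immediately;
- * other errors count against the failure streak. `op` is invoked with
- * whichever provider is selected; if the fallback also fails, `onTotalFail`
- * supplies the result when given, otherwise the fallback error is rethrown.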
- */
- private async run<T>(
- op: (provider: EmbeddingProvider, opts: ProviderEmbedOptions) => Promise<T>,
- options: ProviderEmbedOptions,
- onTotalFail?: () => T,
- ): Promise<T> {
- const inCooldown =
- this.fallbackUntil !== null && this.now() < this.fallbackUntil;
- if (inCooldown) {
- // Skip primary entirely
- this.transition("fallback");
- try {
- return await op(this.fallback, options);
- } catch (err) {
- if (onTotalFail) return onTotalFail();
- throw err;
- }
- }
- // Try primary first
- try {
- const result = await op(this.primary, options);
- // Success — clear streak and ensure routing reads "primary"
- this.failureStreak = 0;
- this.fallbackUntil = null;
- this.transition("primary");
- return result;
- } catch (err) {
- if (err instanceof CircuitOpenError) {
- // Primary circuit is open — open our own cooldown matching its
- // expected duration so subsequent calls skip the primary.
- this.openCooldown(`primary CircuitOpenError`);
- } else {
- this.failureStreak++;
- if (this.failureStreak >= this.failureStreakThreshold) {
- this.openCooldown(
- `primary failure streak ${this.failureStreak} ≥ ${this.failureStreakThreshold}`,
- );
- }
- }
- // Try fallback for THIS call regardless
- try {
- this.transition("fallback");
- return await op(this.fallback, options);
- } catch (fbErr) {
- if (onTotalFail) return onTotalFail();
- // Both providers failed — surface the fallback error (the primary
- // failure already informed the breaker).
- throw fbErr;
- }
- }
- }
- private openCooldown(reason: string): void {
- if (this.fallbackUntil === null || this.now() >= this.fallbackUntil) {
- this.fallbackUntil = this.now() + this.cooldownMs;
- this.warn(
- `[AutoFallbackEmbeddingProvider] WARN — falling back to "${this.fallback.kind}" provider for ${Math.round(this.cooldownMs / 1000)}s (reason: ${reason})`,
- );
- }
- }
- private transition(to: FallbackState): void {
- if (this.lastTransitionState === to) return;
- this.lastTransitionState = to;
- if (to === "primary") {
- this.warn(
- `[AutoFallbackEmbeddingProvider] WARN — primary "${this.primary.kind}" recovered, routing restored`,
- );
- }
- // The "fallback" transition WARN is already emitted by openCooldown
- // (with a richer message). No second WARN here.
- }
- }
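`run()` and `openCooldown()` depend on `./provider.js` and `./openai.js`, which are not shown here, so the routing policy cannot be exercised from this file alone. The following is a minimal synchronous sketch of the same policy under simplified assumptions. `FallbackRouter`, its `call()`/`state()` methods, and the local `CircuitOpenError` class are hypothetical stand-ins, not exports of the module. WARN messages are collected in an array instead of written to `process.stderr`. The sketch mirrors the control flow above: it skips the primary during cooldown, opens the cooldown immediately on `CircuitOpenError`, counts other errors toward the streak, and resets on success.

```typescript
// Sketch only: `FallbackRouter` and this local `CircuitOpenError` are
// hypothetical stand-ins, not exports of autofallback.ts or openai.js.
class CircuitOpenError extends Error {
  constructor(message = "circuit open") {
    super(message);
    this.name = "CircuitOpenError";
  }
}

type Route = "primary" | "fallback";

class FallbackRouter {
  private readonly threshold: number;
  private readonly cooldownMs: number;
  private readonly now: () => number;
  private failureStreak = 0;
  private fallbackUntil: number | null = null;
  // Collected instead of written to stderr so the behavior is easy to inspect.
  readonly warnings: string[] = [];

  constructor(threshold: number, cooldownMs: number, now: () => number) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.now = now;
  }

  // "fallback" while the cooldown window is open, "primary" otherwise.
  state(): Route {
    return this.fallbackUntil !== null && this.now() < this.fallbackUntil
      ? "fallback"
      : "primary";
  }

  call<T>(primary: () => T, fallback: () => T): T {
    // Cooldown active: skip the primary entirely.
    if (this.state() === "fallback") return fallback();
    try {
      const result = primary();
      // Success resets the streak and closes the cooldown.
      this.failureStreak = 0;
      this.fallbackUntil = null;
      return result;
    } catch (err) {
      // Matched by name (not instanceof) so the sketch behaves the same
      // under any compile target.
      if (err instanceof Error && err.name === "CircuitOpenError") {
        this.openCooldown("primary CircuitOpenError");
      } else {
        this.failureStreak++;
        if (this.failureStreak >= this.threshold) {
          this.openCooldown(`failure streak ${this.failureStreak}`);
        }
      }
      // Any primary error falls back for this call; a fallback error propagates.
      return fallback();
    }
  }

  private openCooldown(reason: string): void {
    // Only open when no cooldown is active, so each opening warns once.
    if (this.state() === "primary") {
      this.fallbackUntil = this.now() + this.cooldownMs;
      this.warnings.push(`falling back for ${this.cooldownMs}ms (${reason})`);
    }
  }
}
```

With a manual clock and a threshold of 2, two plain failures open a 1000 ms cooldown. During that window the primary thunk is never invoked. Once the clock reaches the expiry, the next successful primary call resets the streak. The real class is async and adds an `onTotalFail` hook for batch calls; both are left out of this sketch for brevity.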