- /**
- * autofallback.ts - AutoFallbackEmbeddingProvider.
- *
- * Composes a primary `EmbeddingProvider` (typically `OpenAIEmbeddingsProvider`)
- * and a fallback (typically `LocalLlamaCppProvider`). When the primary trips
- * its circuit breaker — or when persistent failures cross a threshold — calls
- * are routed to the fallback. After a recovery cooldown, the primary is
- * probed again; success closes the breaker and routing returns.
- *
- * Acceptance criterion 4 from i-qkarfffa: "Endpoint down → fallback local + WARN".
- *
- * Behavior summary:
- * - Primary call succeeds → return; record success.
- * - Primary throws CircuitOpenError → fall back and open the cooldown at once
- * (WARN logged once per cooldown window).
- * - Primary throws any other error → fall back for THIS call only;
- * count toward the failure-streak threshold.
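- * - When the failure streak reaches the threshold (default 3) → set our own
- * "open until" timestamp; until it expires, route directly to the fallback
- * (skip the primary entirely).
- * - On expiry, retry the primary opportunistically. Only a primary success
- * resets the streak, so after a streak-opened cooldown a single further
- * failure reopens it.
- * - getModelId always reports the primary's id; getDimensions prefers the
- * primary's value and uses the fallback's only when the primary reports
- * none; dispose disposes both providers.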
- */
- import { CircuitOpenError } from "./openai.js";
- const DEFAULT_FAILURE_STREAK = 3;
- const DEFAULT_COOLDOWN_MS = 5 * 60_000;
- function defaultWarn(msg) {
- process.stderr.write(`${msg}\n`);
- }
- export class AutoFallbackEmbeddingProvider {
- kind;
- primary;
- fallback;
- failureStreakThreshold;
- cooldownMs;
- warn;
- now;
- failureStreak = 0;
- fallbackUntil = null;
- lastTransitionState = "primary";
- constructor(config) {
- if (!config.primary)
- throw new Error("AutoFallbackEmbeddingProvider: primary is required");
- if (!config.fallback)
- throw new Error("AutoFallbackEmbeddingProvider: fallback is required");
- if (config.primary === config.fallback) {
- throw new Error("AutoFallbackEmbeddingProvider: primary and fallback must differ");
- }
- this.primary = config.primary;
- this.fallback = config.fallback;
- // Inherit the primary's kind for callers introspecting `provider.kind`.
- this.kind = config.primary.kind;
- this.failureStreakThreshold = config.failureStreakThreshold ?? DEFAULT_FAILURE_STREAK;
- this.cooldownMs = config.cooldownMs ?? DEFAULT_COOLDOWN_MS;
- this.warn = config.warn ?? defaultWarn;
- this.now = config.now ?? Date.now;
- }
- /**
- * Stable model id reported by the primary. The model-id guard runs against
- * the primary's id because that's what callers actually want when the
- * remote endpoint is online; on fallback-only operation, the local
- * provider should report a compatible id (in the default config, both
- * report "embeddinggemma" so this is moot).
- */
- getModelId() {
- return this.primary.getModelId();
- }
- getDimensions() {
- return this.primary.getDimensions() ?? this.fallback.getDimensions();
- }
- /**
- * Combined last-error from primary + fallback. Either, neither, or both legs
- * may have a tracked error after `embed()`/`embedBatch()` runs:
- * - Both clean → undefined
- * - Primary failed, fallback rescued → returns primary error (most useful)
- * - Both failed → returns "primary: <msg> | fallback: <msg>"
- * - Primary skipped (cooldown), fallback failed → returns the fallback error
- * (combined as above if the primary still reports an earlier error)
- */
- getLastError() {
- const primaryErr = this.primary.getLastError?.();
- const fallbackErr = this.fallback.getLastError?.();
- if (primaryErr && fallbackErr) {
- return `primary: ${primaryErr} | fallback: ${fallbackErr}`;
- }
- return primaryErr ?? fallbackErr;
- }
- /** Current routing state (mostly for tests + observability) */
- getRoutingState() {
- if (this.fallbackUntil !== null && this.now() < this.fallbackUntil) {
- return "fallback";
- }
- return "primary";
- }
- /** Reset failure-streak + cooldown (mostly for tests / admin) */
- reset() {
- this.failureStreak = 0;
- this.fallbackUntil = null;
- this.transition("primary");
- }
- async healthcheck(signal) {
- // Primary first; if degraded, check fallback so callers can still tell
- // whether they have *any* working backend.
- const primaryHealth = await this.primary.healthcheck(signal);
- if (primaryHealth.ok)
- return primaryHealth;
- const fallbackHealth = await this.fallback.healthcheck(signal);
- return {
- ok: fallbackHealth.ok,
- model: this.primary.getModelId(),
- dimensions: primaryHealth.dimensions ?? fallbackHealth.dimensions,
- detail: `primary: ${primaryHealth.detail ?? "fail"} | fallback: ${fallbackHealth.detail ?? (fallbackHealth.ok ? "ok" : "fail")}`,
- };
- }
- async embed(text, options = {}) {
- return this.run((p, opts) => p.embed(text, opts), options);
- }
- async embedBatch(texts, options = {}) {
- if (texts.length === 0)
- return [];
- return this.run((p, opts) => p.embedBatch(texts, opts), options, () => texts.map(() => null));
- }
- async dispose() {
- await Promise.allSettled([this.primary.dispose(), this.fallback.dispose()]);
- }
- // ────────────────────── Internals ──────────────────────
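- /**
- * Generic dispatcher: skip the primary while in cooldown; otherwise try it
- * and fall back on any error. `CircuitOpenError` opens the cooldown at once;
- * other errors count toward the failure streak. If the fallback also fails,
- * `onTotalFail` (when given) supplies the result, else the fallback error is
- * rethrown. `op` is invoked with whichever provider is selected.
- */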
- async run(op, options, onTotalFail) {
- const inCooldown = this.fallbackUntil !== null && this.now() < this.fallbackUntil;
- if (inCooldown) {
- // Skip primary entirely
- this.transition("fallback");
- try {
- return await op(this.fallback, options);
- }
- catch (err) {
- if (onTotalFail)
- return onTotalFail();
- throw err;
- }
- }
- // Try primary first
- try {
- const result = await op(this.primary, options);
- // Success — clear streak and ensure routing reads "primary"
- this.failureStreak = 0;
- this.fallbackUntil = null;
- this.transition("primary");
- return result;
- }
- catch (err) {
- if (err instanceof CircuitOpenError) {
- // Primary circuit is open; open our own cooldown (`cooldownMs`, not
- // derived from the primary breaker's reset time) so that subsequent
- // calls skip the primary.
- this.openCooldown(`primary CircuitOpenError`);
- }
- else {
- this.failureStreak++;
- if (this.failureStreak >= this.failureStreakThreshold) {
- this.openCooldown(`primary failure streak ${this.failureStreak} ≥ ${this.failureStreakThreshold}`);
- }
- }
- // Try fallback for THIS call regardless
- try {
- this.transition("fallback");
- return await op(this.fallback, options);
- }
- catch (fbErr) {
- if (onTotalFail)
- return onTotalFail();
- // Both providers failed; surface the fallback error (the primary
- // failure has already been counted toward the streak/cooldown).
- throw fbErr;
- }
- }
- }
- openCooldown(reason) {
- if (this.fallbackUntil === null || this.now() >= this.fallbackUntil) {
- this.fallbackUntil = this.now() + this.cooldownMs;
- this.warn(`[AutoFallbackEmbeddingProvider] WARN — falling back to "${this.fallback.kind}" provider for ${Math.round(this.cooldownMs / 1000)}s (reason: ${reason})`);
- }
- }
- transition(to) {
- if (this.lastTransitionState === to)
- return;
- this.lastTransitionState = to;
- if (to === "primary") {
- this.warn(`[AutoFallbackEmbeddingProvider] WARN — primary "${this.primary.kind}" recovered, routing restored`);
- }
- // The "fallback" transition WARN is already emitted by openCooldown
- // (with a richer message). No second WARN here.
- }
- }
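The routing policy summarized in the header comment (failure streak, cooldown window, opportunistic retry on expiry) can be sketched without the provider plumbing. This is a minimal sketch under stated assumptions: `FallbackRouter` and `dispatch` are hypothetical names, providers are plain synchronous functions rather than async `EmbeddingProvider`s, and the both-legs-failed handling is left out. As in the module, the clock and the WARN sink are injected, so the cooldown can be exercised without real waiting.

```javascript
// Sketch of the failure-streak + cooldown routing policy. FallbackRouter and
// dispatch() are hypothetical names; providers are plain synchronous
// functions here, whereas the real provider awaits async embed calls.
class FallbackRouter {
  constructor({ threshold = 3, cooldownMs = 5 * 60_000, now = Date.now, warn = () => {} } = {}) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.now = now; // injectable clock, so tests can step time by hand
    this.warn = warn;
    this.streak = 0;
    this.fallbackUntil = null; // null means no cooldown has been opened
  }

  // While the cooldown is active, the primary is skipped entirely.
  inCooldown() {
    return this.fallbackUntil !== null && this.now() < this.fallbackUntil;
  }

  // Any primary success clears both the streak and the cooldown.
  onPrimarySuccess() {
    this.streak = 0;
    this.fallbackUntil = null;
  }

  // A circuit-open error opens the cooldown at once; other errors count
  // toward the streak and open it once the streak reaches the threshold.
  onPrimaryFailure(isCircuitOpen) {
    if (!isCircuitOpen) this.streak++;
    if (isCircuitOpen || this.streak >= this.threshold) this.openCooldown();
  }

  // Opens a new window only if none is active, so there is one WARN per window.
  openCooldown() {
    if (!this.inCooldown()) {
      this.fallbackUntil = this.now() + this.cooldownMs;
      this.warn(`falling back for ${Math.round(this.cooldownMs / 1000)}s`);
    }
  }
}

// One call: the primary unless in cooldown; on any primary error, the
// fallback serves this call. Fallback errors simply propagate in this sketch.
function dispatch(router, primary, fallback, isCircuitOpen = () => false) {
  if (router.inCooldown()) return fallback();
  try {
    const result = primary();
    router.onPrimarySuccess();
    return result;
  } catch (err) {
    router.onPrimaryFailure(isCircuitOpen(err));
    return fallback();
  }
}
```

Stepping the injected clock past `fallbackUntil` shows the opportunistic retry: a primary success resets the streak, while a single further primary failure reopens the cooldown, because the streak was never cleared.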
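The total-failure branch is where `embed()` and `embedBatch()` differ: `embedBatch()` passes an `onTotalFail` callback that returns one `null` per input text, while `embed()` passes none, so the fallback's error is rethrown. A hedged sketch of just that branch, using a hypothetical `runFallback` helper and a hypothetical `nullsFor` placeholder builder:

```javascript
// Hypothetical helper isolating the "fallback also failed" branch. Without
// onTotalFail the fallback error is rethrown (the embed() path); with it, the
// callback's value is returned instead (the embedBatch() path).
async function runFallback(fallbackOp, onTotalFail) {
  try {
    return await fallbackOp();
  } catch (err) {
    if (onTotalFail) return onTotalFail();
    throw err;
  }
}

// embedBatch-style placeholder: one null per input, so result[i] still
// lines up with texts[i].
function nullsFor(texts) {
  return texts.map(() => null);
}
```

Returning nulls keeps the result the same length as `texts`; presumably this lets batch callers skip the missing vectors instead of losing the whole batch, though the module does not state the rationale, and the trade-off is that a total outage no longer surfaces as an exception on that path.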