autofallback.ts 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. /**
  2. * autofallback.ts - AutoFallbackEmbeddingProvider.
  3. *
  4. * Composes a primary `EmbeddingProvider` (typically `OpenAIEmbeddingsProvider`)
  5. * and a fallback (typically `LocalLlamaCppProvider`). When the primary trips
  6. * its circuit breaker — or when persistent failures cross a threshold — calls
  7. * are routed to the fallback. After a recovery cooldown, the primary is
  8. * probed again; success closes the breaker and routing returns.
  9. *
  10. * Acceptance criterion 4 from i-qkarfffa: "Endpoint down → fallback local + WARN".
  11. *
  12. * Behavior summary:
  13. * - Primary call succeeds → return; record success.
  14. * - Primary throws CircuitOpenError → fall back, log WARN once per transition.
  15. * - Primary throws any other error → fall back for THIS call only;
  16. * count toward the failure-streak threshold.
  17. * - When failure streak crosses threshold (default 3) → set our own
  18. * "open until" timestamp; until expiry, route directly to fallback
  19. * (skip primary entirely).
  20. * - On expiry, retry primary opportunistically.
  * - getModelId always reports the primary's id; getDimensions prefers the
  *   primary's value and uses the fallback's only when the primary reports
  *   none; dispose disposes both providers.
  23. */
  24. import type {
  25. EmbeddingProvider,
  26. ProviderEmbedOptions,
  27. ProviderEmbedding,
  28. ProviderHealth,
  29. ProviderKind,
  30. } from "./provider.js";
  31. import { CircuitOpenError } from "./openai.js";
  /**
   * Construction options for `AutoFallbackEmbeddingProvider`.
   *
   * Only `primary` and `fallback` are required; each tuning knob is optional
   * and a module-level default is used when it is omitted.
   */
  export type AutoFallbackProviderConfig = {
    /** Preferred provider; tried first whenever no cooldown window is open. */
    primary: EmbeddingProvider;
    /** Provider that serves calls when the primary fails or is cooling down. */
    fallback: EmbeddingProvider;
    /**
     * How many consecutive primary failures (excluding `CircuitOpenError`)
     * are tolerated before primary calls are suppressed and requests go
     * straight to the fallback. Default: 3.
     */
    failureStreakThreshold?: number;
    /**
     * How long, in milliseconds, requests keep going to the fallback once the
     * breaker opens. Default: 5 minutes (intended to match the circuit
     * duration of `OpenAIEmbeddingsProvider`).
     */
    cooldownMs?: number;
    /**
     * Sink for the WARN messages emitted on routing transitions. When omitted,
     * messages are written to `process.stderr`.
     */
    warn?: (msg: string) => void;
    /** Clock override returning millisecond timestamps; meant for tests. */
    now?: () => number;
  };
  /** Consecutive non-circuit primary failures tolerated before the cooldown opens. */
  const DEFAULT_FAILURE_STREAK = 3;
  /** Cooldown window length in milliseconds (five minutes). */
  const DEFAULT_COOLDOWN_MS = 300_000;
  /** Fallback WARN sink: emits `msg` followed by a newline on stderr. */
  function defaultWarn(msg: string): void {
    const line = msg + "\n";
    process.stderr.write(line);
  }
  /** Announced routing state: which provider calls are currently sent to. */
  export type FallbackState = "fallback" | "primary";
  /**
   * Embedding provider that prefers `primary` and transparently falls back to
   * `fallback` when the primary fails.
   *
   * Routing (implemented in `run`):
   * - No cooldown open: try the primary. On success, the failure streak and
   *   any expired cooldown are cleared.
   * - Primary throws `CircuitOpenError`: a cooldown opens immediately.
   * - Primary throws anything else: the failure streak is bumped; once it
   *   reaches `failureStreakThreshold`, a cooldown opens. Below the threshold
   *   the fallback rescues that one call WITHOUT changing the announced
   *   routing state.
   * - Cooldown open: the primary is skipped and the fallback is called directly.
   * - After the cooldown expires the primary is probed again. The streak is not
   *   reset when a cooldown opens, so a failed probe re-opens it at once.
   *
   * WARNs: one "falling back" message each time a cooldown opens, and one
   * "recovered" message when the primary succeeds after an announced fallback.
   */
  export class AutoFallbackEmbeddingProvider implements EmbeddingProvider {
    readonly kind: ProviderKind;
    readonly primary: EmbeddingProvider;
    readonly fallback: EmbeddingProvider;
    private readonly failureStreakThreshold: number;
    private readonly cooldownMs: number;
    private readonly warn: (msg: string) => void;
    private readonly now: () => number;
    /** Consecutive non-CircuitOpenError primary failures since the last primary success (or reset). */
    private failureStreak = 0;
    /** Timestamp (per `now`) until which the primary is skipped; null when no cooldown has been set. */
    private fallbackUntil: number | null = null;
    /** Routing state last announced via WARN; lets each transition WARN fire once. */
    private lastTransitionState: FallbackState = "primary";

    /**
     * @param config Providers plus optional tuning knobs.
     * @throws Error when `primary` or `fallback` is missing, or both are the same instance.
     * @throws RangeError when `failureStreakThreshold` or `cooldownMs` is NaN or negative.
     */
    constructor(config: AutoFallbackProviderConfig) {
      if (!config.primary) throw new Error("AutoFallbackEmbeddingProvider: primary is required");
      if (!config.fallback) throw new Error("AutoFallbackEmbeddingProvider: fallback is required");
      if (config.primary === config.fallback) {
        throw new Error("AutoFallbackEmbeddingProvider: primary and fallback must differ");
      }
      const threshold = config.failureStreakThreshold ?? DEFAULT_FAILURE_STREAK;
      const cooldownMs = config.cooldownMs ?? DEFAULT_COOLDOWN_MS;
      // A NaN threshold would never be reached (silently disabling streak-based
      // fallback) and a NaN cooldown makes every window comparison false, so
      // reject such values up front instead of misbehaving at runtime.
      if (Number.isNaN(threshold) || threshold < 0) {
        throw new RangeError(
          `AutoFallbackEmbeddingProvider: failureStreakThreshold must be a number >= 0 (got ${threshold})`,
        );
      }
      if (Number.isNaN(cooldownMs) || cooldownMs < 0) {
        throw new RangeError(
          `AutoFallbackEmbeddingProvider: cooldownMs must be a number >= 0 (got ${cooldownMs})`,
        );
      }
      this.primary = config.primary;
      this.fallback = config.fallback;
      // Inherit the primary's kind for callers introspecting `provider.kind`.
      this.kind = config.primary.kind;
      this.failureStreakThreshold = threshold;
      this.cooldownMs = cooldownMs;
      this.warn = config.warn ?? defaultWarn;
      this.now = config.now ?? Date.now;
    }

    /**
     * Model id reported by the primary, regardless of current routing. When
     * the fallback is serving calls it should report a compatible id.
     */
    getModelId(): string {
      return this.primary.getModelId();
    }

    /** The primary's dimensions, or the fallback's when the primary reports none. */
    getDimensions(): number | undefined {
      return this.primary.getDimensions() ?? this.fallback.getDimensions();
    }

    /**
     * Combined last error from both legs, as reported by their optional
     * `getLastError()`:
     * - neither reports an error → undefined
     * - exactly one reports an error → that error
     * - both report an error → "primary: <msg> | fallback: <msg>"
     * NOTE(review): whether a leg clears its error after a later success is up
     * to that provider, so a stale error may still be reported here.
     */
    getLastError(): string | undefined {
      const primaryErr = this.primary.getLastError?.();
      const fallbackErr = this.fallback.getLastError?.();
      if (primaryErr && fallbackErr) {
        return `primary: ${primaryErr} | fallback: ${fallbackErr}`;
      }
      return primaryErr ?? fallbackErr;
    }

    /** Current routing state (mostly for tests + observability). */
    getRoutingState(): FallbackState {
      return this.isInCooldown() ? "fallback" : "primary";
    }

    /**
     * Clear the failure streak and cooldown (mostly for tests / admin). Emits
     * the "recovered" WARN if a fallback had been announced.
     */
    reset(): void {
      this.failureStreak = 0;
      this.fallbackUntil = null;
      this.transition("primary");
    }

    /**
     * Returns the primary's health when it is ok. Otherwise checks the fallback
     * too, so callers can tell whether *any* backend works. The combined result
     * is `ok` iff the fallback is ok.
     * NOTE(review): a rejected `healthcheck()` from either leg propagates as-is;
     * the fallback is not consulted if the primary's check throws.
     */
    async healthcheck(signal?: AbortSignal): Promise<ProviderHealth> {
      const primaryHealth = await this.primary.healthcheck(signal);
      if (primaryHealth.ok) return primaryHealth;
      const fallbackHealth = await this.fallback.healthcheck(signal);
      return {
        ok: fallbackHealth.ok,
        model: this.primary.getModelId(),
        dimensions: primaryHealth.dimensions ?? fallbackHealth.dimensions,
        detail:
          `primary: ${primaryHealth.detail ?? "fail"} | fallback: ${fallbackHealth.detail ?? (fallbackHealth.ok ? "ok" : "fail")}`,
      };
    }

    /** Embed one text; rethrows the fallback's error if both legs fail. */
    async embed(
      text: string,
      options: ProviderEmbedOptions = {},
    ): Promise<ProviderEmbedding | null> {
      return this.run(
        (p, opts) => p.embed(text, opts),
        options,
      );
    }

    /** Embed many texts; resolves to all-null entries if both legs fail. */
    async embedBatch(
      texts: string[],
      options: ProviderEmbedOptions = {},
    ): Promise<(ProviderEmbedding | null)[]> {
      if (texts.length === 0) return [];
      return this.run(
        (p, opts) => p.embedBatch(texts, opts),
        options,
        () => texts.map(() => null),
      );
    }

    /** Dispose both providers; a failure in one does not prevent the other. */
    async dispose(): Promise<void> {
      await Promise.allSettled([this.primary.dispose(), this.fallback.dispose()]);
    }

    // ────────────────────── Internals ──────────────────────

    /** True while a cooldown window is open and the primary must be skipped. */
    private isInCooldown(): boolean {
      return this.fallbackUntil !== null && this.now() < this.fallbackUntil;
    }

    /**
     * Generic dispatcher: try the primary unless a cooldown is open; on primary
     * failure record it (possibly opening a cooldown) and rescue this call via
     * the fallback. `op` is invoked with whichever provider is selected.
     */
    private async run<T>(
      op: (provider: EmbeddingProvider, opts: ProviderEmbedOptions) => Promise<T>,
      options: ProviderEmbedOptions,
      onTotalFail?: () => T,
    ): Promise<T> {
      if (this.isInCooldown()) {
        // Skip the primary entirely while cooling down.
        this.transition("fallback");
        return this.runFallback(op, options, onTotalFail);
      }
      try {
        const result = await op(this.primary, options);
        // Success: clear the streak and any expired cooldown; announce
        // recovery if a fallback had been announced.
        this.failureStreak = 0;
        this.fallbackUntil = null;
        this.transition("primary");
        return result;
      } catch (err) {
        this.recordPrimaryFailure(err);
      }
      // Rescue THIS call via the fallback. The announced routing state is
      // deliberately left unchanged here: only `openCooldown` announces a
      // switch, so a one-off rescue cannot cause an unmatched "recovered" WARN.
      return this.runFallback(op, options, onTotalFail);
    }

    /** Count a primary failure, opening a cooldown when warranted. */
    private recordPrimaryFailure(err: unknown): void {
      if (err instanceof CircuitOpenError) {
        // The primary's own breaker is open: skip it for a full cooldown.
        this.openCooldown(`primary CircuitOpenError`);
        return;
      }
      this.failureStreak++;
      if (this.failureStreak >= this.failureStreakThreshold) {
        this.openCooldown(
          `primary failure streak ${this.failureStreak} ≥ ${this.failureStreakThreshold}`,
        );
      }
    }

    /**
     * Call the fallback. If it throws, return `onTotalFail()` when provided,
     * otherwise rethrow the fallback's error (the primary failure, if any, has
     * already been recorded).
     */
    private async runFallback<T>(
      op: (provider: EmbeddingProvider, opts: ProviderEmbedOptions) => Promise<T>,
      options: ProviderEmbedOptions,
      onTotalFail?: () => T,
    ): Promise<T> {
      try {
        return await op(this.fallback, options);
      } catch (fbErr) {
        if (onTotalFail) return onTotalFail();
        throw fbErr;
      }
    }

    /**
     * Open a cooldown window (no-op while one is already open), mark the
     * routing as announced-fallback, and emit the "falling back" WARN.
     */
    private openCooldown(reason: string): void {
      if (this.isInCooldown()) return;
      this.fallbackUntil = this.now() + this.cooldownMs;
      this.lastTransitionState = "fallback";
      this.warn(
        `[AutoFallbackEmbeddingProvider] WARN — falling back to "${this.fallback.kind}" provider for ${Math.round(this.cooldownMs / 1000)}s (reason: ${reason})`,
      );
    }

    /**
     * Record an announced routing transition. Only the switch back to
     * "primary" warns here; the "fallback" WARN comes from `openCooldown`,
     * which carries the reason.
     */
    private transition(to: FallbackState): void {
      if (this.lastTransitionState === to) return;
      this.lastTransitionState = to;
      if (to === "primary") {
        this.warn(
          `[AutoFallbackEmbeddingProvider] WARN — primary "${this.primary.kind}" recovered, routing restored`,
        );
      }
    }
  }