// autofallback.js (7.5 KB)

/**
 * autofallback.ts - AutoFallbackEmbeddingProvider.
 *
 * Composes a primary `EmbeddingProvider` (typically `OpenAIEmbeddingsProvider`)
 * and a fallback (typically `LocalLlamaCppProvider`). When the primary trips
 * its circuit breaker — or when persistent failures cross a threshold — calls
 * are routed to the fallback. After a recovery cooldown, the primary is
 * probed again; success closes the breaker and routing returns.
 *
 * Acceptance criterion 4 from i-qkarfffa: "Endpoint down → fallback local + WARN".
 *
 * Behavior summary:
 * - Primary call succeeds → return; record success.
 * - Primary throws CircuitOpenError → open a cooldown immediately and fall
 *   back; WARN once when the cooldown opens.
 * - Primary throws any other error → fall back for THIS call only;
 *   count toward the failure-streak threshold.
 * - When the failure streak reaches the threshold (default 3) → set our own
 *   "open until" timestamp (default cooldown 5 minutes); until expiry, route
 *   directly to fallback (skip primary entirely).
 * - On expiry, retry primary opportunistically.
 * - getModelId always reports the primary's id; getDimensions prefers the
 *   primary's value and uses the fallback's only when the primary reports
 *   null/undefined; dispose disposes both providers.
 */
  24. import { CircuitOpenError } from "./openai.js";
  25. const DEFAULT_FAILURE_STREAK = 3;
  26. const DEFAULT_COOLDOWN_MS = 5 * 60_000;
  27. function defaultWarn(msg) {
  28. process.stderr.write(`${msg}\n`);
  29. }
  30. export class AutoFallbackEmbeddingProvider {
  31. kind;
  32. primary;
  33. fallback;
  34. failureStreakThreshold;
  35. cooldownMs;
  36. warn;
  37. now;
  38. failureStreak = 0;
  39. fallbackUntil = null;
  40. lastTransitionState = "primary";
  41. constructor(config) {
  42. if (!config.primary)
  43. throw new Error("AutoFallbackEmbeddingProvider: primary is required");
  44. if (!config.fallback)
  45. throw new Error("AutoFallbackEmbeddingProvider: fallback is required");
  46. if (config.primary === config.fallback) {
  47. throw new Error("AutoFallbackEmbeddingProvider: primary and fallback must differ");
  48. }
  49. this.primary = config.primary;
  50. this.fallback = config.fallback;
  51. // Inherit the primary's kind for callers introspecting `provider.kind`.
  52. this.kind = config.primary.kind;
  53. this.failureStreakThreshold = config.failureStreakThreshold ?? DEFAULT_FAILURE_STREAK;
  54. this.cooldownMs = config.cooldownMs ?? DEFAULT_COOLDOWN_MS;
  55. this.warn = config.warn ?? defaultWarn;
  56. this.now = config.now ?? Date.now;
  57. }
  58. /**
  59. * Stable model id reported by the primary. The model-id guard runs against
  60. * the primary's id because that's what callers actually want when the
  61. * remote endpoint is online; on fallback-only operation, the local
  62. * provider should report a compatible id (in the default config, both
  63. * report "embeddinggemma" so this is moot).
  64. */
  65. getModelId() {
  66. return this.primary.getModelId();
  67. }
  68. getDimensions() {
  69. return this.primary.getDimensions() ?? this.fallback.getDimensions();
  70. }
  71. /** Current routing state (mostly for tests + observability) */
  72. getRoutingState() {
  73. if (this.fallbackUntil !== null && this.now() < this.fallbackUntil) {
  74. return "fallback";
  75. }
  76. return "primary";
  77. }
  78. /** Reset failure-streak + cooldown (mostly for tests / admin) */
  79. reset() {
  80. this.failureStreak = 0;
  81. this.fallbackUntil = null;
  82. this.transition("primary");
  83. }
  84. async healthcheck(signal) {
  85. // Primary first; if degraded, check fallback so callers can still tell
  86. // whether they have *any* working backend.
  87. const primaryHealth = await this.primary.healthcheck(signal);
  88. if (primaryHealth.ok)
  89. return primaryHealth;
  90. const fallbackHealth = await this.fallback.healthcheck(signal);
  91. return {
  92. ok: fallbackHealth.ok,
  93. model: this.primary.getModelId(),
  94. dimensions: primaryHealth.dimensions ?? fallbackHealth.dimensions,
  95. detail: `primary: ${primaryHealth.detail ?? "fail"} | fallback: ${fallbackHealth.detail ?? (fallbackHealth.ok ? "ok" : "fail")}`,
  96. };
  97. }
  98. async embed(text, options = {}) {
  99. return this.run((p, opts) => p.embed(text, opts), options);
  100. }
  101. async embedBatch(texts, options = {}) {
  102. if (texts.length === 0)
  103. return [];
  104. return this.run((p, opts) => p.embedBatch(texts, opts), options, () => texts.map(() => null));
  105. }
  106. async dispose() {
  107. await Promise.allSettled([this.primary.dispose(), this.fallback.dispose()]);
  108. }
  109. // ────────────────────── Internals ──────────────────────
  110. /**
  111. * Generic dispatcher: try primary if not in cooldown, fall back on
  112. * `CircuitOpenError`, count other errors against the failure streak.
  113. * `op` is invoked with whichever provider is selected.
  114. */
  115. async run(op, options, onTotalFail) {
  116. const inCooldown = this.fallbackUntil !== null && this.now() < this.fallbackUntil;
  117. if (inCooldown) {
  118. // Skip primary entirely
  119. this.transition("fallback");
  120. try {
  121. return await op(this.fallback, options);
  122. }
  123. catch (err) {
  124. if (onTotalFail)
  125. return onTotalFail();
  126. throw err;
  127. }
  128. }
  129. // Try primary first
  130. try {
  131. const result = await op(this.primary, options);
  132. // Success — clear streak and ensure routing reads "primary"
  133. this.failureStreak = 0;
  134. this.fallbackUntil = null;
  135. this.transition("primary");
  136. return result;
  137. }
  138. catch (err) {
  139. if (err instanceof CircuitOpenError) {
  140. // Primary circuit is open — open our own cooldown matching its
  141. // expected duration so subsequent calls skip the primary.
  142. this.openCooldown(`primary CircuitOpenError`);
  143. }
  144. else {
  145. this.failureStreak++;
  146. if (this.failureStreak >= this.failureStreakThreshold) {
  147. this.openCooldown(`primary failure streak ${this.failureStreak} ≥ ${this.failureStreakThreshold}`);
  148. }
  149. }
  150. // Try fallback for THIS call regardless
  151. try {
  152. this.transition("fallback");
  153. return await op(this.fallback, options);
  154. }
  155. catch (fbErr) {
  156. if (onTotalFail)
  157. return onTotalFail();
  158. // Both providers failed — surface the fallback error (the primary
  159. // failure already informed the breaker).
  160. throw fbErr;
  161. }
  162. }
  163. }
  164. openCooldown(reason) {
  165. if (this.fallbackUntil === null || this.now() >= this.fallbackUntil) {
  166. this.fallbackUntil = this.now() + this.cooldownMs;
  167. this.warn(`[AutoFallbackEmbeddingProvider] WARN — falling back to "${this.fallback.kind}" provider for ${Math.round(this.cooldownMs / 1000)}s (reason: ${reason})`);
  168. }
  169. }
  170. transition(to) {
  171. if (this.lastTransitionState === to)
  172. return;
  173. this.lastTransitionState = to;
  174. if (to === "primary") {
  175. this.warn(`[AutoFallbackEmbeddingProvider] WARN — primary "${this.primary.kind}" recovered, routing restored`);
  176. }
  177. // The "fallback" transition WARN is already emitted by openCooldown
  178. // (with a richer message). No second WARN here.
  179. }
  180. }