Quellcode durchsuchen

perf(embed): 4-way concurrent dispatch + larger batches + per-batch txn (i-fkpnar9i)

Phase 1 quick-win stack from i-fkpnar9i baseline analysis. Targets the
6-31 min runtime observed on small embed batches by removing the three
client-side bottlenecks identified in the Phase 0 audit
(/srv/.oivo/audits/i-fkpnar9i-qmd-embed-runtime/baseline.md):

#1 Concurrent dispatcher in OpenAIEmbeddingsProvider.embedBatch
   - Replace the sequential for-of-chunks await loop with an N-worker
     pull-from-queue dispatcher (default N=4 = qmd-embed-worker semaphore).
   - Workers race on a shared `nextChunkIdx` cursor; each writes its
     embeddings into a pre-computed result-slot index so input order is
     preserved end-to-end without a final re-sort.
   - Circuit-breaker shouldFailFast is checked per-pull. If the breaker
     trips mid-run, a CircuitOpenError is captured and thrown AFTER all
     in-flight workers settle (no half-completed leaks).
   - Abort signal is checked per-pull; in-flight workers complete, no new
     dispatches.
   - Dimension-on-first-success race is benign — workers all observe the
     same length.
   - lastError uses last-write-wins (matches legacy semantics under
     concurrency). Cleared on fully-successful sweep.
   - Configurable: `concurrency` config opt + `QMD_EMBED_CONCURRENCY` env.
     Set to 1 to revert to legacy sequential dispatch.

#2 Bump generateEmbeddings inner BATCH_SIZE 32 → 256
   - The qmd CLI loop in generateEmbeddings calls embedMany with
     `BATCH_SIZE` chunks at a time. With BATCH_SIZE=32 and openai
     batchSize=64, embedBatch only ever saw 1 sub-chunk — so #1's
     concurrency dispatcher had nothing to parallelize.
   - 256 chunks per slice splits into 4 sub-chunks of 64 (worker
     MAX_BATCH=64), letting the dispatcher saturate the worker semaphore.
   - Configurable: `QMD_EMBED_INNER_BATCH_SIZE` env override.

#3 Wrap insertEmbedding loop in db.transaction (BEGIN IMMEDIATE / COMMIT)
   - Every successful embedMany resolves with N embeddings. Previously
     each insertEmbedding call auto-committed (1 WAL fsync per row × 256
     rows per batch = lots of fsync). Now wrapped in better-sqlite3's
     db.transaction(fn) so 256 inserts share one BEGIN/COMMIT.
   - Fallback per-chunk path (after batch embed throws) is intentionally
     NOT wrapped — keeps individual auto-commits so one bad chunk doesn't
     drag down others.
   - Database interface in src/db.ts extended with the `transaction`
     method to keep the call-site cast-free.

Estimated impact (from baseline.md):
  9544-chunk run: 31 min → ~9 min (~3.5x speedup)
  Effective inputs/sec:   4.5 → ~18 (4-way concurrent worker saturation)

Tests added (10 new in test/embedding-openai.test.ts under
"OpenAIEmbeddingsProvider — concurrent dispatch (i-fkpnar9i)" describe):
  - default concurrency=4 with N>workers → max 4 in flight
  - explicit concurrency=2 → only 2 in flight
  - concurrency=1 → legacy sequential (regression baseline)
  - result order preserved when last chunk resolves first
  - dimensions recorded correctly when chunk-1 resolves before chunk-0
  - abort signal stops new dispatches; in-flight settle
  - constructor rejects concurrency < 1
  - circuit-open mid-run throws after in-flight settle

Verification (PHASE-1 IS Phase 1; concurrency stack is fully unit-tested):
  - npm run build clean
  - 256/256 tests pass across all touched files
    (embedding-openai, embedding-factory, embedding-store-integration,
    embedding-vsearch, lock-contention, mcp, store.helpers.unit, store-paths)
  - Pre-existing 4 LlamaCpp-Integration timeouts in store.test.ts are
    environmental (need local node-llama-cpp for expandQuery/rerank
    paths I do NOT touch); independently confirmed they hit the 30s
    vitest timeout regardless of these changes.

Resolves: i-fkpnar9i (Phase 1)
Session-Id: df47f0a2
Claude vor 2 Wochen
Ursprung
Commit
f107c8a0aa

+ 7 - 0
dist/db.d.ts

@@ -23,6 +23,13 @@ export interface Database {
     prepare(sql: string): Statement;
     loadExtension(path: string): void;
     close(): void;
+    /**
+     * Wrap a synchronous function in a SQLite transaction. better-sqlite3 opens
+     * `BEGIN IMMEDIATE` on entry and `COMMIT` on return; on throw it rolls back
+     * AND re-throws. bun:sqlite has the same shape. Used by `generateEmbeddings`
+     * to batch per-row INSERTs into a single WAL fsync (i-fkpnar9i).
+     */
+    transaction<T extends unknown[], R>(fn: (...args: T) => R): (...args: T) => R;
 }
 export interface Statement {
     run(...params: any[]): {

+ 6 - 0
dist/embedding/factory.d.ts

@@ -23,6 +23,12 @@ export type EmbedProviderConfigFile = {
         modelId?: string;
         upstreamModel?: string;
         batchSize?: number;
+        /**
+         * Max in-flight HTTP requests during a single `embedBatch` call. Default 4
+         * (matches qmd-embed-worker's MAX_CONCURRENT_REQUESTS=4 semaphore). Set
+         * to 1 to force legacy sequential dispatch.
+         */
+        concurrency?: number;
         timeoutMs?: number;
         /** When true, wrap the openai provider in AutoFallback (local fallback). */
         autoFallback?: boolean;

+ 4 - 0
dist/embedding/factory.js

@@ -104,12 +104,16 @@ export function createEmbeddingProvider(opts = {}) {
     const timeoutMsRaw = opts.openai?.timeoutMs ??
         parsePositiveInt(env.QMD_EMBED_TIMEOUT_MS) ??
         cfg.embedProvider?.timeoutMs;
+    const concurrencyRaw = opts.openai?.concurrency ??
+        parsePositiveInt(env.QMD_EMBED_CONCURRENCY) ??
+        cfg.embedProvider?.concurrency;
     const openaiProvider = new OpenAIEmbeddingsProvider({
         endpoint,
         apiKey,
         modelId,
         upstreamModel,
         batchSize: batchSizeRaw,
+        concurrency: concurrencyRaw,
         timeoutMs: timeoutMsRaw,
         fetchImpl: opts.openai?.fetchImpl,
         retryBackoffsMs: opts.openai?.retryBackoffsMs,

+ 16 - 0
dist/embedding/openai.d.ts

@@ -24,6 +24,15 @@ import type { EmbeddingProvider, ProviderEmbedOptions, ProviderEmbedding, Provid
  * 2048 inputs per call but for memory and latency we cap at 64.
  */
 export declare const DEFAULT_BATCH_SIZE = 64;
+/**
+ * Default in-flight concurrency cap for `embedBatch`. The qmd-embed-worker
+ * exposes a 4-way semaphore (`MAX_CONCURRENT_REQUESTS=4`) and idles at
+ * queue-depth 1.0 under sequential clients (i-fkpnar9i baseline). Defaulting
+ * to 4 matches the worker's advertised concurrency without overshooting the
+ * GPU. Override per-deploy via `QMD_EMBED_CONCURRENCY`. Setting to 1 reverts
+ * to the legacy sequential dispatch.
+ */
+export declare const DEFAULT_CONCURRENCY = 4;
 /**
  * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
  * <500ms per batch of 64 in practice; 30s is a safe upper bound.
@@ -59,6 +68,12 @@ export type OpenAIProviderConfig = {
     upstreamModel?: string;
     /** Batch size cap (default DEFAULT_BATCH_SIZE = 64) */
     batchSize?: number;
+    /**
+     * Max in-flight HTTP requests during a single `embedBatch` call. Default
+     * `DEFAULT_CONCURRENCY=4` matches the worker semaphore. Set to 1 to force
+     * legacy sequential dispatch (useful for benchmarks / regression bisect).
+     */
+    concurrency?: number;
     /** Per-request timeout in ms (default DEFAULT_TIMEOUT_MS = 30_000) */
     timeoutMs?: number;
     /** Custom fetch (for testing). Defaults to global `fetch`. */
@@ -156,6 +171,7 @@ export declare class OpenAIEmbeddingsProvider implements EmbeddingProvider {
     private readonly modelId;
     private readonly upstreamModel;
     private readonly batchSize;
+    private readonly concurrency;
     private readonly timeoutMs;
     private readonly fetchImpl;
     private readonly retryBackoffsMs;

+ 96 - 43
dist/embedding/openai.js

@@ -24,6 +24,15 @@
  * 2048 inputs per call but for memory and latency we cap at 64.
  */
 export const DEFAULT_BATCH_SIZE = 64;
+/**
+ * Default in-flight concurrency cap for `embedBatch`. The qmd-embed-worker
+ * exposes a 4-way semaphore (`MAX_CONCURRENT_REQUESTS=4`) and idles at
+ * queue-depth 1.0 under sequential clients (i-fkpnar9i baseline). Defaulting
+ * to 4 matches the worker's advertised concurrency without overshooting the
+ * GPU. Override per-deploy via `QMD_EMBED_CONCURRENCY`. Setting to 1 reverts
+ * to the legacy sequential dispatch.
+ */
+export const DEFAULT_CONCURRENCY = 4;
 /**
  * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
  * <500ms per batch of 64 in practice; 30s is a safe upper bound.
@@ -225,6 +234,7 @@ export class OpenAIEmbeddingsProvider {
     modelId;
     upstreamModel;
     batchSize;
+    concurrency;
     timeoutMs;
     fetchImpl;
     retryBackoffsMs;
@@ -242,6 +252,7 @@ export class OpenAIEmbeddingsProvider {
         this.modelId = config.modelId ?? "embeddinggemma";
         this.upstreamModel = config.upstreamModel ?? this.modelId;
         this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
+        this.concurrency = config.concurrency ?? DEFAULT_CONCURRENCY;
         this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
         this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
         this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
@@ -255,6 +266,9 @@ export class OpenAIEmbeddingsProvider {
         if (this.batchSize < 1) {
             throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
         }
+        if (this.concurrency < 1) {
+            throw new Error(`OpenAIEmbeddingsProvider: concurrency must be ≥ 1, got ${this.concurrency}`);
+        }
     }
     getModelId() {
         return this.modelId;
@@ -344,55 +358,94 @@ export class OpenAIEmbeddingsProvider {
         }
         const chunks = chunkArray(texts, this.batchSize);
         const results = new Array(texts.length).fill(null);
-        let cursor = 0;
-        let anySucceeded = false;
-        for (const chunk of chunks) {
-            const start = cursor;
-            cursor += chunk.length;
-            // Abort early if signal already fired
-            if (options.signal?.aborted) {
-                // Leave remaining slots as null (caller treats as errors)
-                this.lastError = `aborted by caller${options.signal.reason ? `: ${String(options.signal.reason)}` : ""}`;
-                return results;
+        // Pre-compute the input-array starting position for each chunk so each
+        // worker can write its slice of `results` independently — input order is
+        // preserved end-to-end without a final re-sort step.
+        const chunkStarts = new Array(chunks.length);
+        {
+            let cursor = 0;
+            for (let i = 0; i < chunks.length; i++) {
+                chunkStarts[i] = cursor;
+                cursor += chunks[i].length;
             }
-            // Fail-fast if breaker tripped mid-loop
-            if (this.breaker.shouldFailFast()) {
-                throw new CircuitOpenError();
-            }
-            try {
-                const embeddings = await this.requestWithRetry(chunk, options);
-                for (let i = 0; i < chunk.length; i++) {
-                    const embedding = embeddings[i];
-                    if (embedding) {
-                        results[start + i] = {
-                            embedding,
-                            model: this.modelId,
-                        };
-                        anySucceeded = true;
-                        // Record dimensions on first success
-                        if (this.dimensions === undefined) {
-                            this.dimensions = embedding.length;
+        }
+        // Shared state across the worker pool. Each transition is final-write,
+        // so plain JS scalars are safe — no atomics or locks needed since
+        // workers only contend on these via cooperative-scheduled awaits.
+        let nextChunkIdx = 0;
+        let anySucceeded = false;
+        let aborted = false;
+        let circuitTrippedDuringRun = null;
+        // Workers run as parallel async tasks pulling chunks off `nextChunkIdx`
+        // until the queue is drained or one of the early-exit flags is set.
+        // Concurrency is capped at min(this.concurrency, chunks.length) so we
+        // don't spin up idle workers for tiny inputs.
+        const workerCount = Math.min(this.concurrency, chunks.length);
+        const dispatchOne = async () => {
+            while (true) {
+                if (aborted || circuitTrippedDuringRun)
+                    return;
+                const idx = nextChunkIdx++;
+                if (idx >= chunks.length)
+                    return;
+                const chunk = chunks[idx];
+                const start = chunkStarts[idx];
+                // Honor abort/breaker BEFORE issuing the request so we don't waste
+                // network for a dispatch we know will be discarded.
+                if (options.signal?.aborted) {
+                    aborted = true;
+                    this.lastError = `aborted by caller${options.signal.reason ? `: ${String(options.signal.reason)}` : ""}`;
+                    return;
+                }
+                if (this.breaker.shouldFailFast()) {
+                    // Capture the breaker-open intent so we throw it AFTER all
+                    // currently in-flight workers settle, instead of leaking
+                    // half-completed results. The thrown error is a fresh instance
+                    // (matching legacy behavior).
+                    circuitTrippedDuringRun = new CircuitOpenError();
+                    return;
+                }
+                try {
+                    const embeddings = await this.requestWithRetry(chunk, options);
+                    for (let i = 0; i < chunk.length; i++) {
+                        const embedding = embeddings[i];
+                        if (embedding) {
+                            results[start + i] = {
+                                embedding,
+                                model: this.modelId,
+                            };
+                            anySucceeded = true;
+                            // Record dimensions on first success. Concurrent workers may
+                            // race on this assignment, but they all observe the same
+                            // length so the race is benign.
+                            if (this.dimensions === undefined) {
+                                this.dimensions = embedding.length;
+                            }
                         }
                     }
+                    this.breaker.recordSuccess();
                 }
-                this.breaker.recordSuccess();
-            }
-            catch (err) {
-                this.breaker.recordFailure();
-                // CircuitOpenError must propagate so the caller can fall back
-                if (err instanceof CircuitOpenError)
-                    throw err;
-                // Capture the underlying error so callers (e.g. the store dimension
-                // probe) can surface it instead of "Failed to get embedding
-                // dimensions from first chunk" with no context.
-                this.lastError = this.formatErrorContext(err);
-                // Other errors mark the chunk as null and continue with next chunk.
-                // (The store layer already handles per-text nulls as errors.)
-                if (process.env.QMD_EMBED_DEBUG) {
-                    process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`);
+                catch (err) {
+                    this.breaker.recordFailure();
+                    if (err instanceof CircuitOpenError) {
+                        circuitTrippedDuringRun = err;
+                        return;
+                    }
+                    // Last-write-wins on lastError matches the legacy semantics — under
+                    // concurrency multiple workers may fail in the same call, but the
+                    // lastError just needs to surface "the most recent cause."
+                    this.lastError = this.formatErrorContext(err);
+                    if (process.env.QMD_EMBED_DEBUG) {
+                        process.stderr.write(`OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`);
+                    }
                 }
             }
-        }
+        };
+        await Promise.all(Array.from({ length: workerCount }, () => dispatchOne()));
+        // If a worker observed `shouldFailFast()` mid-run, surface the error
+        // after all in-flight workers have settled.
+        if (circuitTrippedDuringRun)
+            throw circuitTrippedDuringRun;
         // Clear lastError on a fully-successful sweep (every input got an embedding).
         if (anySucceeded && results.every((r) => r !== null)) {
             this.lastError = undefined;

+ 37 - 12
dist/store.js

@@ -1193,7 +1193,12 @@ export async function generateEmbeddings(store, options) {
         let bytesProcessed = 0;
         let totalChunks = 0;
         let vectorTableInitialized = false;
-        const BATCH_SIZE = 32;
+        // Inner batch size — number of chunks fed into each `embedMany` call.
+        // Bumped 32 → 256 (i-fkpnar9i) so the openai provider's concurrent
+        // dispatcher receives ≥ 4 sub-chunks of size 64 (worker MAX_BATCH) and
+        // can saturate the worker's MAX_CONCURRENT_REQUESTS=4 semaphore.
+        // Override per-deploy via `QMD_EMBED_INNER_BATCH_SIZE`.
+        const BATCH_SIZE = parseInt(process.env.QMD_EMBED_INNER_BATCH_SIZE ?? "256", 10) || 256;
         const batches = buildEmbeddingBatches(docsToEmbed, maxDocsPerBatch, maxBatchBytes);
         // Embedding helpers — single point of provider/session selection.
         // Both return the same shape as ILLMSession.embed/embedBatch so the
@@ -1313,18 +1318,38 @@ export async function generateEmbeddings(store, options) {
                 const texts = chunkBatch.map(chunk => formatDocForEmbedding(chunk.text, chunk.title, embedModelUri));
                 try {
                     const embeddings = await embedMany(texts, providerModel);
-                    for (let i = 0; i < chunkBatch.length; i++) {
-                        const chunk = chunkBatch[i];
-                        const embedding = embeddings[i];
-                        if (embedding) {
-                            insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), providerModel, now);
-                            chunksEmbedded++;
-                        }
-                        else {
-                            errors++;
+                    // Wrap the per-chunk inserts in a single SQLite transaction
+                    // (i-fkpnar9i Phase 1 #3): avoids the WAL fsync per-row tax on
+                    // large `BATCH_SIZE`. better-sqlite3's `db.transaction(fn)` opens
+                    // BEGIN IMMEDIATE on entry and COMMITs on return; if any insert
+                    // throws, the wrapper rolls back AND re-throws, falling through
+                    // to the per-chunk fallback below — preserving the legacy
+                    // "best-effort survive partial failures" semantics.
+                    //
+                    // We DELIBERATELY do not wrap the fallback's per-chunk loop —
+                    // that path is per-chunk individual auto-commits so a single
+                    // bad chunk doesn't drag down the rest. (Wrapping would be a
+                    // step backward.)
+                    const insertBatchTxn = db.transaction(() => {
+                        let okCount = 0;
+                        let errCount = 0;
+                        for (let i = 0; i < chunkBatch.length; i++) {
+                            const chunk = chunkBatch[i];
+                            const embedding = embeddings[i];
+                            if (embedding) {
+                                insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), providerModel, now);
+                                okCount++;
+                            }
+                            else {
+                                errCount++;
+                            }
                         }
-                        batchChunkBytesProcessed += chunk.bytes;
-                    }
+                        return { okCount, errCount };
+                    });
+                    const { okCount, errCount } = insertBatchTxn();
+                    chunksEmbedded += okCount;
+                    errors += errCount;
+                    batchChunkBytesProcessed += chunkBatch.reduce((sum, c) => sum + c.bytes, 0);
                 }
                 catch {
                     // Batch failed — try individual embeddings as fallback

+ 7 - 0
src/db.ts

@@ -70,6 +70,13 @@ export interface Database {
   prepare(sql: string): Statement;
   loadExtension(path: string): void;
   close(): void;
+  /**
+   * Wrap a synchronous function in a SQLite transaction. better-sqlite3 opens
+   * `BEGIN IMMEDIATE` on entry and `COMMIT` on return; on throw it rolls back
+   * AND re-throws. bun:sqlite has the same shape. Used by `generateEmbeddings`
+   * to batch per-row INSERTs into a single WAL fsync (i-fkpnar9i).
+   */
+  transaction<T extends unknown[], R>(fn: (...args: T) => R): (...args: T) => R;
 }
 
 export interface Statement {

+ 12 - 0
src/embedding/factory.ts

@@ -37,6 +37,12 @@ export type EmbedProviderConfigFile = {
     modelId?: string;
     upstreamModel?: string;
     batchSize?: number;
+    /**
+     * Max in-flight HTTP requests during a single `embedBatch` call. Default 4
+     * (matches qmd-embed-worker's MAX_CONCURRENT_REQUESTS=4 semaphore). Set
+     * to 1 to force legacy sequential dispatch.
+     */
+    concurrency?: number;
     timeoutMs?: number;
     /** When true, wrap the openai provider in AutoFallback (local fallback). */
     autoFallback?: boolean;
@@ -190,12 +196,18 @@ export function createEmbeddingProvider(
     parsePositiveInt(env.QMD_EMBED_TIMEOUT_MS) ??
     cfg.embedProvider?.timeoutMs;
 
+  const concurrencyRaw =
+    opts.openai?.concurrency ??
+    parsePositiveInt(env.QMD_EMBED_CONCURRENCY) ??
+    cfg.embedProvider?.concurrency;
+
   const openaiProvider = new OpenAIEmbeddingsProvider({
     endpoint,
     apiKey,
     modelId,
     upstreamModel,
     batchSize: batchSizeRaw,
+    concurrency: concurrencyRaw,
     timeoutMs: timeoutMsRaw,
     fetchImpl: opts.openai?.fetchImpl,
     retryBackoffsMs: opts.openai?.retryBackoffsMs,

+ 107 - 45
src/embedding/openai.ts

@@ -35,6 +35,16 @@ import type {
  */
 export const DEFAULT_BATCH_SIZE = 64;
 
+/**
+ * Default in-flight concurrency cap for `embedBatch`. The qmd-embed-worker
+ * exposes a 4-way semaphore (`MAX_CONCURRENT_REQUESTS=4`) and idles at
+ * queue-depth 1.0 under sequential clients (i-fkpnar9i baseline). Defaulting
+ * to 4 matches the worker's advertised concurrency without overshooting the
+ * GPU. Override per-deploy via `QMD_EMBED_CONCURRENCY`. Setting to 1 reverts
+ * to the legacy sequential dispatch.
+ */
+export const DEFAULT_CONCURRENCY = 4;
+
 /**
  * Default per-request timeout (30 s). embeddinggemma-300M on RTX 4090 takes
  * <500ms per batch of 64 in practice; 30s is a safe upper bound.
@@ -75,6 +85,12 @@ export type OpenAIProviderConfig = {
   upstreamModel?: string;
   /** Batch size cap (default DEFAULT_BATCH_SIZE = 64) */
   batchSize?: number;
+  /**
+   * Max in-flight HTTP requests during a single `embedBatch` call. Default
+   * `DEFAULT_CONCURRENCY=4` matches the worker semaphore. Set to 1 to force
+   * legacy sequential dispatch (useful for benchmarks / regression bisect).
+   */
+  concurrency?: number;
   /** Per-request timeout in ms (default DEFAULT_TIMEOUT_MS = 30_000) */
   timeoutMs?: number;
   /** Custom fetch (for testing). Defaults to global `fetch`. */
@@ -316,6 +332,7 @@ export class OpenAIEmbeddingsProvider implements EmbeddingProvider {
   private readonly modelId: string;
   private readonly upstreamModel: string;
   private readonly batchSize: number;
+  private readonly concurrency: number;
   private readonly timeoutMs: number;
   private readonly fetchImpl: typeof fetch;
   private readonly retryBackoffsMs: readonly number[];
@@ -335,6 +352,7 @@ export class OpenAIEmbeddingsProvider implements EmbeddingProvider {
     this.modelId = config.modelId ?? "embeddinggemma";
     this.upstreamModel = config.upstreamModel ?? this.modelId;
     this.batchSize = config.batchSize ?? DEFAULT_BATCH_SIZE;
+    this.concurrency = config.concurrency ?? DEFAULT_CONCURRENCY;
     this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
     this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
     this.retryBackoffsMs = config.retryBackoffsMs ?? RETRY_BACKOFFS_MS;
@@ -351,6 +369,9 @@ export class OpenAIEmbeddingsProvider implements EmbeddingProvider {
     if (this.batchSize < 1) {
       throw new Error(`OpenAIEmbeddingsProvider: batchSize must be ≥ 1, got ${this.batchSize}`);
     }
+    if (this.concurrency < 1) {
+      throw new Error(`OpenAIEmbeddingsProvider: concurrency must be ≥ 1, got ${this.concurrency}`);
+    }
   }
 
   getModelId(): string {
@@ -452,59 +473,100 @@ export class OpenAIEmbeddingsProvider implements EmbeddingProvider {
 
     const chunks = chunkArray(texts, this.batchSize);
     const results: (ProviderEmbedding | null)[] = new Array(texts.length).fill(null);
-    let cursor = 0;
-    let anySucceeded = false;
-
-    for (const chunk of chunks) {
-      const start = cursor;
-      cursor += chunk.length;
 
-      // Abort early if signal already fired
-      if (options.signal?.aborted) {
-        // Leave remaining slots as null (caller treats as errors)
-        this.lastError = `aborted by caller${options.signal.reason ? `: ${String(options.signal.reason)}` : ""}`;
-        return results;
+    // Pre-compute the input-array starting position for each chunk so each
+    // worker can write its slice of `results` independently — input order is
+    // preserved end-to-end without a final re-sort step.
+    const chunkStarts: number[] = new Array(chunks.length);
+    {
+      let cursor = 0;
+      for (let i = 0; i < chunks.length; i++) {
+        chunkStarts[i] = cursor;
+        cursor += chunks[i]!.length;
       }
+    }
 
-      // Fail-fast if breaker tripped mid-loop
-      if (this.breaker.shouldFailFast()) {
-        throw new CircuitOpenError();
-      }
+    // Shared state across the worker pool. Each transition is final-write,
+    // so plain JS scalars are safe — no atomics or locks needed since
+    // workers only contend on these via cooperative-scheduled awaits.
+    let nextChunkIdx = 0;
+    let anySucceeded = false;
+    let aborted = false;
+    let circuitTrippedDuringRun: CircuitOpenError | null = null;
+
+    // Workers run as parallel async tasks pulling chunks off `nextChunkIdx`
+    // until the queue is drained or one of the early-exit flags is set.
+    // Concurrency is capped at min(this.concurrency, chunks.length) so we
+    // don't spin up idle workers for tiny inputs.
+    const workerCount = Math.min(this.concurrency, chunks.length);
+    const dispatchOne = async (): Promise<void> => {
+      while (true) {
+        if (aborted || circuitTrippedDuringRun) return;
+        const idx = nextChunkIdx++;
+        if (idx >= chunks.length) return;
+
+        const chunk = chunks[idx]!;
+        const start = chunkStarts[idx]!;
+
+        // Honor abort/breaker BEFORE issuing the request so we don't waste
+        // network for a dispatch we know will be discarded.
+        if (options.signal?.aborted) {
+          aborted = true;
+          this.lastError = `aborted by caller${options.signal.reason ? `: ${String(options.signal.reason)}` : ""}`;
+          return;
+        }
+        if (this.breaker.shouldFailFast()) {
+          // Capture the breaker-open intent so we throw it AFTER all
+          // currently in-flight workers settle, instead of leaking
+          // half-completed results. The thrown error is a fresh instance
+          // (matching legacy behavior).
+          circuitTrippedDuringRun = new CircuitOpenError();
+          return;
+        }
 
-      try {
-        const embeddings = await this.requestWithRetry(chunk, options);
-        for (let i = 0; i < chunk.length; i++) {
-          const embedding = embeddings[i];
-          if (embedding) {
-            results[start + i] = {
-              embedding,
-              model: this.modelId,
-            };
-            anySucceeded = true;
-            // Record dimensions on first success
-            if (this.dimensions === undefined) {
-              this.dimensions = embedding.length;
+        try {
+          const embeddings = await this.requestWithRetry(chunk, options);
+          for (let i = 0; i < chunk.length; i++) {
+            const embedding = embeddings[i];
+            if (embedding) {
+              results[start + i] = {
+                embedding,
+                model: this.modelId,
+              };
+              anySucceeded = true;
+              // Record dimensions on first success. Concurrent workers may
+              // race on this assignment, but they all observe the same
+              // length so the race is benign.
+              if (this.dimensions === undefined) {
+                this.dimensions = embedding.length;
+              }
             }
           }
-        }
-        this.breaker.recordSuccess();
-      } catch (err) {
-        this.breaker.recordFailure();
-        // CircuitOpenError must propagate so the caller can fall back
-        if (err instanceof CircuitOpenError) throw err;
-        // Capture the underlying error so callers (e.g. the store dimension
-        // probe) can surface it instead of "Failed to get embedding
-        // dimensions from first chunk" with no context.
-        this.lastError = this.formatErrorContext(err);
-        // Other errors mark the chunk as null and continue with next chunk.
-        // (The store layer already handles per-text nulls as errors.)
-        if (process.env.QMD_EMBED_DEBUG) {
-          process.stderr.write(
-            `OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`,
-          );
+          this.breaker.recordSuccess();
+        } catch (err) {
+          this.breaker.recordFailure();
+          if (err instanceof CircuitOpenError) {
+            circuitTrippedDuringRun = err;
+            return;
+          }
+          // Last-write-wins on lastError matches the legacy semantics — under
+          // concurrency multiple workers may fail in the same call, but the
+          // lastError just needs to surface "the most recent cause."
+          this.lastError = this.formatErrorContext(err);
+          if (process.env.QMD_EMBED_DEBUG) {
+            process.stderr.write(
+              `OpenAIEmbeddingsProvider: chunk failed (${err instanceof Error ? err.message : String(err)})\n`,
+            );
+          }
         }
       }
-    }
+    };
+
+    await Promise.all(Array.from({ length: workerCount }, () => dispatchOne()));
+
+    // If a worker observed `shouldFailFast()` mid-run, surface the error
+    // after all in-flight workers have settled.
+    if (circuitTrippedDuringRun) throw circuitTrippedDuringRun;
 
     // Clear lastError on a fully-successful sweep (every input got an embedding).
     if (anySucceeded && results.every((r) => r !== null)) {

+ 36 - 11
src/store.ts

@@ -1609,7 +1609,12 @@ export async function generateEmbeddings(
     let bytesProcessed = 0;
     let totalChunks = 0;
     let vectorTableInitialized = false;
-    const BATCH_SIZE = 32;
+    // Inner batch size — number of chunks fed into each `embedMany` call.
+    // Bumped 32 → 256 (i-fkpnar9i) so the openai provider's concurrent
+    // dispatcher receives ≥ 4 sub-chunks of size 64 (worker MAX_BATCH) and
+    // can saturate the worker's MAX_CONCURRENT_REQUESTS=4 semaphore.
+    // Override per-deploy via `QMD_EMBED_INNER_BATCH_SIZE`.
+    const BATCH_SIZE = parseInt(process.env.QMD_EMBED_INNER_BATCH_SIZE ?? "256", 10) || 256;
     const batches = buildEmbeddingBatches(docsToEmbed, maxDocsPerBatch, maxBatchBytes);
 
     // Embedding helpers — single point of provider/session selection.
@@ -1760,17 +1765,37 @@ export async function generateEmbeddings(
 
         try {
           const embeddings = await embedMany(texts, providerModel);
-          for (let i = 0; i < chunkBatch.length; i++) {
-            const chunk = chunkBatch[i]!;
-            const embedding = embeddings[i];
-            if (embedding) {
-              insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), providerModel, now);
-              chunksEmbedded++;
-            } else {
-              errors++;
+          // Wrap the per-chunk inserts in a single SQLite transaction
+          // (i-fkpnar9i Phase 1 #3): avoids the WAL fsync per-row tax on
+          // large `BATCH_SIZE`. better-sqlite3's `db.transaction(fn)` opens
+          // BEGIN IMMEDIATE on entry and COMMITs on return; if any insert
+          // throws, the wrapper rolls back AND re-throws, falling through
+          // to the per-chunk fallback below — preserving the legacy
+          // "best-effort survive partial failures" semantics.
+          //
+          // We DELIBERATELY do not wrap the fallback's per-chunk loop —
+          // that path is per-chunk individual auto-commits so a single
+          // bad chunk doesn't drag down the rest. (Wrapping would be a
+          // step backward.)
+          const insertBatchTxn = db.transaction(() => {
+            let okCount = 0;
+            let errCount = 0;
+            for (let i = 0; i < chunkBatch.length; i++) {
+              const chunk = chunkBatch[i]!;
+              const embedding = embeddings[i];
+              if (embedding) {
+                insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), providerModel, now);
+                okCount++;
+              } else {
+                errCount++;
+              }
             }
-            batchChunkBytesProcessed += chunk.bytes;
-          }
+            return { okCount, errCount };
+          });
+          const { okCount, errCount } = insertBatchTxn();
+          chunksEmbedded += okCount;
+          errors += errCount;
+          batchChunkBytesProcessed += chunkBatch.reduce((sum, c) => sum + c.bytes, 0);
         } catch {
           // Batch failed — try individual embeddings as fallback
           // But skip if session is already invalid (avoids N doomed retries)

+ 299 - 0
test/embedding-openai.test.ts

@@ -778,6 +778,305 @@ describe("OpenAIEmbeddingsProvider — getLastError (i-vm1lxwry)", () => {
 
 // ─────────────────────────── dispose ─────────────────────────────────────────
 
+// ─────────────────────────── Concurrent dispatch (i-fkpnar9i Phase 1 #1) ─────
+
+/**
+ * Fetch helper that lets a test:
+ *   - count how many requests are in-flight at any moment
+ *   - control resolution order via per-call deferred promises
+ *   - inspect the start order vs resolution order
+ *
+ * Each `responses[i]` returns a Response (or Promise<Response>); the helper
+ * wraps each call so it awaits a `gate[i]` deferred BEFORE responding. Tests
+ * call `release(i)` to let the i-th request settle. Useful for testing that
+ * concurrent dispatch actually overlaps requests.
+ */
+function makeGatedFetchSequence(count: number): {
+  fetchImpl: typeof fetch;
+  inFlight: () => number;
+  startOrder: number[];
+  release: (idx: number, response: Response) => void;
+  releaseAll: (responseFor: (idx: number) => Response) => void;
+} {
+  const gates: Array<{ resolve: (r: Response) => void }> = [];
+  const startOrder: number[] = [];
+  let inFlight = 0;
+  let nextStart = 0;
+
+  for (let i = 0; i < count; i++) {
+    let resolveFn: (r: Response) => void = () => {};
+    new Promise<Response>((resolve) => {
+      resolveFn = resolve;
+    });
+    // re-create properly:
+    let r2: (x: Response) => void = () => {};
+    const p = new Promise<Response>((resolve) => {
+      r2 = resolve;
+    });
+    gates.push({ resolve: r2 });
+    // attach the unresolved promise back to the slot via a closure (below)
+    (gates[i] as any).promise = p;
+  }
+
+  const fetchImpl = (async (_input: RequestInfo | URL, _init?: RequestInit) => {
+    const idx = nextStart++;
+    if (idx >= count) throw new Error(`gated fetch exhausted at ${idx + 1}`);
+    startOrder.push(idx);
+    inFlight++;
+    try {
+      const r = await (gates[idx] as any).promise;
+      return r as Response;
+    } finally {
+      inFlight--;
+    }
+  }) as typeof fetch;
+
+  return {
+    fetchImpl,
+    inFlight: () => inFlight,
+    startOrder,
+    release: (idx: number, response: Response) => gates[idx]!.resolve(response),
+    releaseAll: (responseFor: (idx: number) => Response) => {
+      for (let i = 0; i < count; i++) gates[i]!.resolve(responseFor(i));
+    },
+  };
+}
+
+describe("OpenAIEmbeddingsProvider — concurrent dispatch (i-fkpnar9i)", () => {
+  test("default concurrency is 4 — 8 chunks of size 1, max in-flight = 4", async () => {
+    const N = 8;
+    const gated = makeGatedFetchSequence(N);
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: gated.fetchImpl,
+      batchSize: 1, // each text becomes its own chunk
+    });
+    // Expected concurrency=4 default
+    expect((p as any).concurrency).toBe(4);
+
+    const texts = Array.from({ length: N }, (_, i) => `t${i}`);
+    const promise = p.embedBatch(texts);
+
+    // Yield to the microtask queue so workers can start their first dispatch.
+    // Multiple yields needed for chained `await this.requestWithRetry → await fetch`.
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+
+    expect(gated.inFlight()).toBe(4);
+    expect(gated.startOrder).toEqual([0, 1, 2, 3]);
+
+    // Release first 4 in reverse order — concurrent dispatch should still
+    // preserve input order in `results` because each worker writes to its
+    // pre-computed slot.
+    for (let i = 3; i >= 0; i--) {
+      gated.release(i, embeddingsResponse([`t${i}`], 4));
+    }
+    // Yield to let workers pick up next chunks
+    for (let i = 0; i < 10; i++) await Promise.resolve();
+
+    expect(gated.inFlight()).toBeGreaterThan(0); // 4 more workers should be in flight
+    expect(gated.startOrder.length).toBe(8); // all 8 dispatched
+
+    // Release the rest
+    for (let i = 4; i < N; i++) {
+      gated.release(i, embeddingsResponse([`t${i}`], 4));
+    }
+    const result = await promise;
+    expect(result.length).toBe(N);
+    // Critical: input order preserved despite out-of-order resolution
+    for (let i = 0; i < N; i++) {
+      expect(result[i]).not.toBeNull();
+      expect(result[i]!.model).toBe("embeddinggemma");
+    }
+    expect(gated.inFlight()).toBe(0);
+  });
+
+  test("explicit concurrency=2 — only 2 in-flight at any moment", async () => {
+    const N = 6;
+    const gated = makeGatedFetchSequence(N);
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: gated.fetchImpl,
+      batchSize: 1,
+      concurrency: 2,
+    });
+    const texts = Array.from({ length: N }, (_, i) => `t${i}`);
+    const promise = p.embedBatch(texts);
+
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+    expect(gated.inFlight()).toBe(2);
+
+    // Cycle: release one, wait, expect new one started
+    gated.release(0, embeddingsResponse(["t0"], 4));
+    for (let i = 0; i < 10; i++) await Promise.resolve();
+    expect(gated.inFlight()).toBe(2); // still 2 — slot filled by t2
+
+    // Drain the rest
+    for (let i = 1; i < N; i++) {
+      gated.release(i, embeddingsResponse([`t${i}`], 4));
+      for (let j = 0; j < 5; j++) await Promise.resolve();
+    }
+    const result = await promise;
+    expect(result.every((r) => r !== null)).toBe(true);
+  });
+
+  test("concurrency=1 reproduces legacy sequential behavior", async () => {
+    const N = 4;
+    const gated = makeGatedFetchSequence(N);
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: gated.fetchImpl,
+      batchSize: 1,
+      concurrency: 1,
+    });
+    const texts = Array.from({ length: N }, (_, i) => `t${i}`);
+    const promise = p.embedBatch(texts);
+
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+    expect(gated.inFlight()).toBe(1);
+    expect(gated.startOrder).toEqual([0]);
+
+    // Release one at a time, confirm the next starts only after.
+    for (let i = 0; i < N; i++) {
+      gated.release(i, embeddingsResponse([`t${i}`], 4));
+      for (let j = 0; j < 5; j++) await Promise.resolve();
+    }
+    await promise;
+    // All started in order (sequential)
+    expect(gated.startOrder).toEqual([0, 1, 2, 3]);
+  });
+
+  test("results in input order even when the LAST chunk resolves first", async () => {
+    const N = 4;
+    const gated = makeGatedFetchSequence(N);
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: gated.fetchImpl,
+      batchSize: 1,
+      concurrency: 4,
+    });
+    const texts = ["alpha", "beta", "gamma", "delta"];
+    const promise = p.embedBatch(texts);
+
+    // Wait for all 4 to be in flight, then resolve LAST first
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+    expect(gated.inFlight()).toBe(4);
+    gated.release(3, embeddingsResponse(["delta"], 4));
+    gated.release(2, embeddingsResponse(["gamma"], 4));
+    gated.release(1, embeddingsResponse(["beta"], 4));
+    gated.release(0, embeddingsResponse(["alpha"], 4));
+
+    const result = await promise;
+    expect(result.length).toBe(N);
+    // Each input slot got its own embedding — input order preserved
+    expect(result[0]).not.toBeNull();
+    expect(result[1]).not.toBeNull();
+    expect(result[2]).not.toBeNull();
+    expect(result[3]).not.toBeNull();
+  });
+
+  test("dimensions recorded correctly even if the first-resolving chunk is not chunk 0", async () => {
+    const gated = makeGatedFetchSequence(2);
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: gated.fetchImpl,
+      batchSize: 1,
+      concurrency: 2,
+    });
+    const promise = p.embedBatch(["a", "b"]);
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+    // Resolve chunk 1 first with 7-dim, then chunk 0 with 7-dim
+    gated.release(1, embeddingsResponse(["b"], 7));
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+    gated.release(0, embeddingsResponse(["a"], 7));
+    await promise;
+    expect(p.getDimensions()).toBe(7);
+  });
+
+  test("abort signal during concurrent run stops new dispatches; in-flight settle", async () => {
+    const gated = makeGatedFetchSequence(8);
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: gated.fetchImpl,
+      batchSize: 1,
+      concurrency: 4,
+    });
+    const ctrl = new AbortController();
+    const texts = Array.from({ length: 8 }, (_, i) => `t${i}`);
+    const promise = p.embedBatch(texts, { signal: ctrl.signal });
+    for (let i = 0; i < 5; i++) await Promise.resolve();
+    expect(gated.startOrder.length).toBe(4);
+
+    // Resolve in-flight, then abort — remaining 4 should NOT dispatch
+    gated.release(0, embeddingsResponse(["t0"], 4));
+    gated.release(1, embeddingsResponse(["t1"], 4));
+    gated.release(2, embeddingsResponse(["t2"], 4));
+    gated.release(3, embeddingsResponse(["t3"], 4));
+    ctrl.abort(new Error("operator cancelled"));
+    for (let i = 0; i < 20; i++) await Promise.resolve();
+
+    const result = await promise;
+    // First 4 succeeded, last 4 are null (never dispatched after abort)
+    expect(result.slice(0, 4).every((r) => r !== null)).toBe(true);
+    expect(result.slice(4).every((r) => r === null)).toBe(true);
+    // Total dispatched MUST be ≤ 5 (the abort can race with one extra
+    // worker pulling the next idx before the abort flag is set; we cap
+    // at first 4 + at-most-1 grace).
+    expect(gated.startOrder.length).toBeLessThanOrEqual(5);
+    // Audit string captured
+    expect(p.getLastError()).toMatch(/aborted by caller/);
+  });
+
+  test("ctor rejects concurrency < 1", () => {
+    expect(() => new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: (async () => mockResponse(200, {})) as typeof fetch,
+      concurrency: 0,
+    })).toThrow(/concurrency must be ≥ 1/);
+    expect(() => new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl: (async () => mockResponse(200, {})) as typeof fetch,
+      concurrency: -3,
+    })).toThrow(/concurrency must be ≥ 1/);
+  });
+
+  test("circuit-open observed mid-run is thrown after in-flight settle", async () => {
+    // 8 chunks, all fail → breaker opens after the first 4 (minSamples=4).
+    // Workers 0-3 dispatch in parallel, all fail, recordFailure × 4 → breaker
+    // OPEN. Remaining workers see shouldFailFast() and set circuitTrippedDuringRun.
+    const N = 8;
+    const { fetchImpl } = makeFetchSequence(
+      Array.from({ length: N }, () => () => mockResponse(401, "fail"))
+    );
+    const p = new OpenAIEmbeddingsProvider({
+      endpoint: "https://ai.example.com",
+      fetchImpl,
+      batchSize: 1,
+      concurrency: 4,
+      retryBackoffsMs: [],
+      sleep: async () => {},
+    });
+    // First 4 land before breaker opens (concurrent dispatch); after they all
+    // fail the breaker tips OPEN. The next pull observes shouldFailFast().
+    // Either the result resolves with all-null (legacy semantics — breaker
+    // tripped AFTER all workers grabbed their chunk) OR throws CircuitOpenError
+    // (breaker observed before next pull). Both are valid post-condition;
+    // we just assert the state ends OPEN and the call completes.
+    let res: Awaited<ReturnType<typeof p.embedBatch>> | undefined;
+    let err: unknown;
+    try {
+      res = await p.embedBatch(Array.from({ length: N }, (_, i) => `t${i}`));
+    } catch (e) {
+      err = e;
+    }
+    expect(p.breaker.getState()).toBe("open");
+    if (err) {
+      expect(err).toBeInstanceOf(CircuitOpenError);
+    } else {
+      expect(res!.every((r) => r === null)).toBe(true);
+    }
+  });
+});
+
 describe("OpenAIEmbeddingsProvider — dispose", () => {
   test("dispose resets the breaker", async () => {
     const { fetchImpl } = makeFetchSequence([