|
|
@@ -24,7 +24,6 @@ import {
|
|
|
formatQueryForEmbedding,
|
|
|
formatDocForEmbedding,
|
|
|
withLLMSessionForLlm,
|
|
|
- type LLMSessionOptions,
|
|
|
type RerankDocument,
|
|
|
type ILLMSession,
|
|
|
} from "./llm.js";
|
|
|
@@ -45,6 +44,8 @@ export const DEFAULT_RERANK_MODEL = "ExpedientFalcon/qwen3-reranker:0.6b-q8_0";
|
|
|
export const DEFAULT_QUERY_MODEL = "Qwen/Qwen3-1.7B";
|
|
|
export const DEFAULT_GLOB = "**/*.md";
|
|
|
export const DEFAULT_MULTI_GET_MAX_BYTES = 10 * 1024; // 10KB
|
|
|
+export const DEFAULT_EMBED_MAX_DOCS_PER_BATCH = 64;
|
|
|
+export const DEFAULT_EMBED_MAX_BATCH_BYTES = 64 * 1024 * 1024; // 64MB
|
|
|
|
|
|
// Chunking: 900 tokens per chunk with 15% overlap
|
|
|
// Increased from 800 to accommodate smart chunking finding natural break points
|
|
|
@@ -1179,6 +1180,109 @@ export type EmbedResult = {
|
|
|
durationMs: number;
|
|
|
};
|
|
|
|
|
|
+export type EmbedOptions = {
|
|
|
+ force?: boolean;
|
|
|
+ model?: string;
|
|
|
+ maxDocsPerBatch?: number;
|
|
|
+ maxBatchBytes?: number;
|
|
|
+ onProgress?: (info: EmbedProgress) => void;
|
|
|
+};
|
|
|
+
|
|
|
+type PendingEmbeddingDoc = {
|
|
|
+ hash: string;
|
|
|
+ path: string;
|
|
|
+ bytes: number;
|
|
|
+};
|
|
|
+
|
|
|
+type EmbeddingDoc = PendingEmbeddingDoc & {
|
|
|
+ body: string;
|
|
|
+};
|
|
|
+
|
|
|
+type ChunkItem = {
|
|
|
+ hash: string;
|
|
|
+ title: string;
|
|
|
+ text: string;
|
|
|
+ seq: number;
|
|
|
+ pos: number;
|
|
|
+ tokens: number;
|
|
|
+ bytes: number;
|
|
|
+};
|
|
|
+
|
|
|
+function validatePositiveIntegerOption(name: string, value: number | undefined, fallback: number): number {
|
|
|
+ if (value === undefined) return fallback;
|
|
|
+ if (!Number.isInteger(value) || value < 1) {
|
|
|
+ throw new Error(`${name} must be a positive integer`);
|
|
|
+ }
|
|
|
+ return value;
|
|
|
+}
|
|
|
+
|
|
|
+function resolveEmbedOptions(options?: EmbedOptions): Required<Pick<EmbedOptions, "maxDocsPerBatch" | "maxBatchBytes">> {
|
|
|
+ return {
|
|
|
+ maxDocsPerBatch: validatePositiveIntegerOption("maxDocsPerBatch", options?.maxDocsPerBatch, DEFAULT_EMBED_MAX_DOCS_PER_BATCH),
|
|
|
+ maxBatchBytes: validatePositiveIntegerOption("maxBatchBytes", options?.maxBatchBytes, DEFAULT_EMBED_MAX_BATCH_BYTES),
|
|
|
+ };
|
|
|
+}
|
|
|
+
|
|
|
+function getPendingEmbeddingDocs(db: Database): PendingEmbeddingDoc[] {
|
|
|
+ return db.prepare(`
|
|
|
+ SELECT d.hash, MIN(d.path) as path, length(CAST(c.doc AS BLOB)) as bytes
|
|
|
+ FROM documents d
|
|
|
+ JOIN content c ON d.hash = c.hash
|
|
|
+ LEFT JOIN content_vectors v ON d.hash = v.hash AND v.seq = 0
|
|
|
+ WHERE d.active = 1 AND v.hash IS NULL
|
|
|
+ GROUP BY d.hash
|
|
|
+ ORDER BY MIN(d.path)
|
|
|
+ `).all() as PendingEmbeddingDoc[];
|
|
|
+}
|
|
|
+
|
|
|
+function buildEmbeddingBatches(
|
|
|
+ docs: PendingEmbeddingDoc[],
|
|
|
+ maxDocsPerBatch: number,
|
|
|
+ maxBatchBytes: number,
|
|
|
+): PendingEmbeddingDoc[][] {
|
|
|
+ const batches: PendingEmbeddingDoc[][] = [];
|
|
|
+ let currentBatch: PendingEmbeddingDoc[] = [];
|
|
|
+ let currentBytes = 0;
|
|
|
+
|
|
|
+ for (const doc of docs) {
|
|
|
+ const docBytes = Math.max(0, doc.bytes);
|
|
|
+ const wouldExceedDocs = currentBatch.length >= maxDocsPerBatch;
|
|
|
+ const wouldExceedBytes = currentBatch.length > 0 && (currentBytes + docBytes) > maxBatchBytes;
|
|
|
+
|
|
|
+ if (wouldExceedDocs || wouldExceedBytes) {
|
|
|
+ batches.push(currentBatch);
|
|
|
+ currentBatch = [];
|
|
|
+ currentBytes = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ currentBatch.push(doc);
|
|
|
+ currentBytes += docBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (currentBatch.length > 0) {
|
|
|
+ batches.push(currentBatch);
|
|
|
+ }
|
|
|
+
|
|
|
+ return batches;
|
|
|
+}
|
|
|
+
|
|
|
+function getEmbeddingDocsForBatch(db: Database, batch: PendingEmbeddingDoc[]): EmbeddingDoc[] {
|
|
|
+ if (batch.length === 0) return [];
|
|
|
+
|
|
|
+ const placeholders = batch.map(() => "?").join(",");
|
|
|
+ const rows = db.prepare(`
|
|
|
+ SELECT hash, doc as body
|
|
|
+ FROM content
|
|
|
+ WHERE hash IN (${placeholders})
|
|
|
+ `).all(...batch.map(doc => doc.hash)) as { hash: string; body: string }[];
|
|
|
+ const bodyByHash = new Map(rows.map(row => [row.hash, row.body]));
|
|
|
+
|
|
|
+ return batch.map((doc) => ({
|
|
|
+ ...doc,
|
|
|
+ body: bodyByHash.get(doc.hash) ?? "",
|
|
|
+ }));
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* Generate vector embeddings for documents that need them.
|
|
|
* Pure function — no console output, no db lifecycle management.
|
|
|
@@ -1186,120 +1290,141 @@ export type EmbedResult = {
|
|
|
*/
|
|
|
export async function generateEmbeddings(
|
|
|
store: Store,
|
|
|
- options?: {
|
|
|
- force?: boolean;
|
|
|
- model?: string;
|
|
|
- onProgress?: (info: EmbedProgress) => void;
|
|
|
- }
|
|
|
+ options?: EmbedOptions
|
|
|
): Promise<EmbedResult> {
|
|
|
const db = store.db;
|
|
|
const model = options?.model ?? DEFAULT_EMBED_MODEL;
|
|
|
const now = new Date().toISOString();
|
|
|
+ const { maxDocsPerBatch, maxBatchBytes } = resolveEmbedOptions(options);
|
|
|
+ const encoder = new TextEncoder();
|
|
|
|
|
|
if (options?.force) {
|
|
|
clearAllEmbeddings(db);
|
|
|
}
|
|
|
|
|
|
- const hashesToEmbed = getHashesForEmbedding(db);
|
|
|
+ const docsToEmbed = getPendingEmbeddingDocs(db);
|
|
|
|
|
|
- if (hashesToEmbed.length === 0) {
|
|
|
+ if (docsToEmbed.length === 0) {
|
|
|
return { docsProcessed: 0, chunksEmbedded: 0, errors: 0, durationMs: 0 };
|
|
|
}
|
|
|
-
|
|
|
- // Chunk all documents
|
|
|
- type ChunkItem = { hash: string; title: string; text: string; seq: number; pos: number; tokens: number; bytes: number };
|
|
|
- const allChunks: ChunkItem[] = [];
|
|
|
-
|
|
|
- for (const item of hashesToEmbed) {
|
|
|
- const encoder = new TextEncoder();
|
|
|
- const bodyBytes = encoder.encode(item.body).length;
|
|
|
- if (bodyBytes === 0) continue;
|
|
|
-
|
|
|
- const title = extractTitle(item.body, item.path);
|
|
|
- const chunks = await chunkDocumentByTokens(item.body);
|
|
|
-
|
|
|
- for (let seq = 0; seq < chunks.length; seq++) {
|
|
|
- allChunks.push({
|
|
|
- hash: item.hash,
|
|
|
- title,
|
|
|
- text: chunks[seq]!.text,
|
|
|
- seq,
|
|
|
- pos: chunks[seq]!.pos,
|
|
|
- tokens: chunks[seq]!.tokens,
|
|
|
- bytes: encoder.encode(chunks[seq]!.text).length,
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (allChunks.length === 0) {
|
|
|
- return { docsProcessed: 0, chunksEmbedded: 0, errors: 0, durationMs: 0 };
|
|
|
- }
|
|
|
-
|
|
|
- const totalBytes = allChunks.reduce((sum, chk) => sum + chk.bytes, 0);
|
|
|
- const totalChunks = allChunks.length;
|
|
|
- const totalDocs = hashesToEmbed.length;
|
|
|
+ const totalBytes = docsToEmbed.reduce((sum, doc) => sum + Math.max(0, doc.bytes), 0);
|
|
|
+ const totalDocs = docsToEmbed.length;
|
|
|
const startTime = Date.now();
|
|
|
|
|
|
// Use store's LlamaCpp or global singleton, wrapped in a session
|
|
|
const llm = getLlm(store);
|
|
|
- const sessionOptions: LLMSessionOptions = { maxDuration: 30 * 60 * 1000, name: 'generateEmbeddings' };
|
|
|
|
|
|
// Create a session manager for this llm instance
|
|
|
const result = await withLLMSessionForLlm(llm, async (session) => {
|
|
|
- // Get embedding dimensions from first chunk
|
|
|
- const firstChunk = allChunks[0]!;
|
|
|
- const firstText = formatDocForEmbedding(firstChunk.text, firstChunk.title);
|
|
|
- const firstResult = await session.embed(firstText);
|
|
|
- if (!firstResult) {
|
|
|
- throw new Error("Failed to get embedding dimensions from first chunk");
|
|
|
- }
|
|
|
- store.ensureVecTable(firstResult.embedding.length);
|
|
|
-
|
|
|
- let chunksEmbedded = 0, errors = 0, bytesProcessed = 0;
|
|
|
+ let chunksEmbedded = 0;
|
|
|
+ let errors = 0;
|
|
|
+ let bytesProcessed = 0;
|
|
|
+ let totalChunks = 0;
|
|
|
+ let vectorTableInitialized = false;
|
|
|
const BATCH_SIZE = 32;
|
|
|
+ const batches = buildEmbeddingBatches(docsToEmbed, maxDocsPerBatch, maxBatchBytes);
|
|
|
+
|
|
|
+ for (const batchMeta of batches) {
|
|
|
+ const batchDocs = getEmbeddingDocsForBatch(db, batchMeta);
|
|
|
+ const batchChunks: ChunkItem[] = [];
|
|
|
+ const batchBytes = batchMeta.reduce((sum, doc) => sum + Math.max(0, doc.bytes), 0);
|
|
|
+
|
|
|
+ for (const doc of batchDocs) {
|
|
|
+ if (!doc.body.trim()) continue;
|
|
|
+
|
|
|
+ const title = extractTitle(doc.body, doc.path);
|
|
|
+ const chunks = await chunkDocumentByTokens(doc.body);
|
|
|
+
|
|
|
+ for (let seq = 0; seq < chunks.length; seq++) {
|
|
|
+ batchChunks.push({
|
|
|
+ hash: doc.hash,
|
|
|
+ title,
|
|
|
+ text: chunks[seq]!.text,
|
|
|
+ seq,
|
|
|
+ pos: chunks[seq]!.pos,
|
|
|
+ tokens: chunks[seq]!.tokens,
|
|
|
+ bytes: encoder.encode(chunks[seq]!.text).length,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- for (let batchStart = 0; batchStart < allChunks.length; batchStart += BATCH_SIZE) {
|
|
|
- const batchEnd = Math.min(batchStart + BATCH_SIZE, allChunks.length);
|
|
|
- const batch = allChunks.slice(batchStart, batchEnd);
|
|
|
- const texts = batch.map(chunk => formatDocForEmbedding(chunk.text, chunk.title));
|
|
|
-
|
|
|
- try {
|
|
|
- const embeddings = await session.embedBatch(texts);
|
|
|
- for (let i = 0; i < batch.length; i++) {
|
|
|
- const chunk = batch[i]!;
|
|
|
- const embedding = embeddings[i];
|
|
|
- if (embedding) {
|
|
|
- insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), model, now);
|
|
|
- chunksEmbedded++;
|
|
|
- } else {
|
|
|
- errors++;
|
|
|
- }
|
|
|
- bytesProcessed += chunk.bytes;
|
|
|
+ totalChunks += batchChunks.length;
|
|
|
+
|
|
|
+ if (batchChunks.length === 0) {
|
|
|
+ bytesProcessed += batchBytes;
|
|
|
+ options?.onProgress?.({ chunksEmbedded, totalChunks, bytesProcessed, totalBytes, errors });
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!vectorTableInitialized) {
|
|
|
+ const firstChunk = batchChunks[0]!;
|
|
|
+ const firstText = formatDocForEmbedding(firstChunk.text, firstChunk.title);
|
|
|
+ const firstResult = await session.embed(firstText);
|
|
|
+ if (!firstResult) {
|
|
|
+ throw new Error("Failed to get embedding dimensions from first chunk");
|
|
|
}
|
|
|
- } catch {
|
|
|
- // Batch failed — try individual embeddings as fallback
|
|
|
- for (const chunk of batch) {
|
|
|
- try {
|
|
|
- const text = formatDocForEmbedding(chunk.text, chunk.title);
|
|
|
- const result = await session.embed(text);
|
|
|
- if (result) {
|
|
|
- insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now);
|
|
|
+ store.ensureVecTable(firstResult.embedding.length);
|
|
|
+ vectorTableInitialized = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ const totalBatchChunkBytes = batchChunks.reduce((sum, chunk) => sum + chunk.bytes, 0);
|
|
|
+ let batchChunkBytesProcessed = 0;
|
|
|
+
|
|
|
+ for (let batchStart = 0; batchStart < batchChunks.length; batchStart += BATCH_SIZE) {
|
|
|
+ const batchEnd = Math.min(batchStart + BATCH_SIZE, batchChunks.length);
|
|
|
+ const chunkBatch = batchChunks.slice(batchStart, batchEnd);
|
|
|
+ const texts = chunkBatch.map(chunk => formatDocForEmbedding(chunk.text, chunk.title));
|
|
|
+
|
|
|
+ try {
|
|
|
+ const embeddings = await session.embedBatch(texts);
|
|
|
+ for (let i = 0; i < chunkBatch.length; i++) {
|
|
|
+ const chunk = chunkBatch[i]!;
|
|
|
+ const embedding = embeddings[i];
|
|
|
+ if (embedding) {
|
|
|
+ insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(embedding.embedding), model, now);
|
|
|
chunksEmbedded++;
|
|
|
} else {
|
|
|
errors++;
|
|
|
}
|
|
|
- } catch {
|
|
|
- errors++;
|
|
|
+ batchChunkBytesProcessed += chunk.bytes;
|
|
|
+ }
|
|
|
+ } catch {
|
|
|
+ // Batch failed — try individual embeddings as fallback
|
|
|
+ for (const chunk of chunkBatch) {
|
|
|
+ try {
|
|
|
+ const text = formatDocForEmbedding(chunk.text, chunk.title);
|
|
|
+ const result = await session.embed(text);
|
|
|
+ if (result) {
|
|
|
+ insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now);
|
|
|
+ chunksEmbedded++;
|
|
|
+ } else {
|
|
|
+ errors++;
|
|
|
+ }
|
|
|
+ } catch {
|
|
|
+ errors++;
|
|
|
+ }
|
|
|
+ batchChunkBytesProcessed += chunk.bytes;
|
|
|
}
|
|
|
- bytesProcessed += chunk.bytes;
|
|
|
}
|
|
|
+
|
|
|
+ const proportionalBytes = totalBatchChunkBytes === 0
|
|
|
+ ? batchBytes
|
|
|
+ : Math.min(batchBytes, Math.round((batchChunkBytesProcessed / totalBatchChunkBytes) * batchBytes));
|
|
|
+ options?.onProgress?.({
|
|
|
+ chunksEmbedded,
|
|
|
+ totalChunks,
|
|
|
+ bytesProcessed: bytesProcessed + proportionalBytes,
|
|
|
+ totalBytes,
|
|
|
+ errors,
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
+ bytesProcessed += batchBytes;
|
|
|
options?.onProgress?.({ chunksEmbedded, totalChunks, bytesProcessed, totalBytes, errors });
|
|
|
}
|
|
|
|
|
|
return { chunksEmbedded, errors };
|
|
|
- }, sessionOptions);
|
|
|
+ }, { maxDuration: 30 * 60 * 1000, name: 'generateEmbeddings' });
|
|
|
|
|
|
return {
|
|
|
docsProcessed: totalDocs,
|