|
|
@@ -1337,6 +1337,12 @@ export async function generateEmbeddings(
|
|
|
const batches = buildEmbeddingBatches(docsToEmbed, maxDocsPerBatch, maxBatchBytes);
|
|
|
|
|
|
for (const batchMeta of batches) {
|
|
|
+ // Abort early if session has been invalidated
|
|
|
+ if (!session.isValid) {
|
|
|
+ console.warn(`⚠ Session expired — skipping remaining document batches`);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
const batchDocs = getEmbeddingDocsForBatch(db, batchMeta);
|
|
|
const batchChunks: ChunkItem[] = [];
|
|
|
const batchBytes = batchMeta.reduce((sum, doc) => sum + Math.max(0, doc.bytes), 0);
|
|
|
@@ -1345,7 +1351,7 @@ export async function generateEmbeddings(
|
|
|
if (!doc.body.trim()) continue;
|
|
|
|
|
|
const title = extractTitle(doc.body, doc.path);
|
|
|
- const chunks = await chunkDocumentByTokens(doc.body);
|
|
|
+ const chunks = await chunkDocumentByTokens(doc.body, undefined, undefined, undefined, session.signal);
|
|
|
|
|
|
for (let seq = 0; seq < chunks.length; seq++) {
|
|
|
batchChunks.push({
|
|
|
@@ -1383,6 +1389,23 @@ export async function generateEmbeddings(
|
|
|
let batchChunkBytesProcessed = 0;
|
|
|
|
|
|
for (let batchStart = 0; batchStart < batchChunks.length; batchStart += BATCH_SIZE) {
|
|
|
+ // Abort early if session has been invalidated (e.g. max duration exceeded)
|
|
|
+ if (!session.isValid) {
|
|
|
+ const remaining = batchChunks.length - batchStart;
|
|
|
+ errors += remaining;
|
|
|
+ console.warn(`⚠ Session expired — skipping ${remaining} remaining chunks`);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Abort early if error rate is too high (>80% of processed chunks failed)
|
|
|
+ const processed = chunksEmbedded + errors;
|
|
|
+ if (processed >= BATCH_SIZE && errors > processed * 0.8) {
|
|
|
+ const remaining = batchChunks.length - batchStart;
|
|
|
+ errors += remaining;
|
|
|
+ console.warn(`⚠ Error rate too high (${errors}/${processed}) — aborting embedding`);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
const batchEnd = Math.min(batchStart + BATCH_SIZE, batchChunks.length);
|
|
|
const chunkBatch = batchChunks.slice(batchStart, batchEnd);
|
|
|
const texts = chunkBatch.map(chunk => formatDocForEmbedding(chunk.text, chunk.title));
|
|
|
@@ -1402,20 +1425,26 @@ export async function generateEmbeddings(
|
|
|
}
|
|
|
} catch {
|
|
|
// Batch failed — try individual embeddings as fallback
|
|
|
- for (const chunk of chunkBatch) {
|
|
|
- try {
|
|
|
- const text = formatDocForEmbedding(chunk.text, chunk.title);
|
|
|
- const result = await session.embed(text);
|
|
|
- if (result) {
|
|
|
- insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now);
|
|
|
- chunksEmbedded++;
|
|
|
- } else {
|
|
|
+ // But skip if session is already invalid (avoids N doomed retries)
|
|
|
+ if (!session.isValid) {
|
|
|
+ errors += chunkBatch.length;
|
|
|
+ batchChunkBytesProcessed += chunkBatch.reduce((sum, c) => sum + c.bytes, 0);
|
|
|
+ } else {
|
|
|
+ for (const chunk of chunkBatch) {
|
|
|
+ try {
|
|
|
+ const text = formatDocForEmbedding(chunk.text, chunk.title);
|
|
|
+ const result = await session.embed(text);
|
|
|
+ if (result) {
|
|
|
+ insertEmbedding(db, chunk.hash, chunk.seq, chunk.pos, new Float32Array(result.embedding), model, now);
|
|
|
+ chunksEmbedded++;
|
|
|
+ } else {
|
|
|
+ errors++;
|
|
|
+ }
|
|
|
+ } catch {
|
|
|
errors++;
|
|
|
}
|
|
|
- } catch {
|
|
|
- errors++;
|
|
|
+ batchChunkBytesProcessed += chunk.bytes;
|
|
|
}
|
|
|
- batchChunkBytesProcessed += chunk.bytes;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -2091,7 +2120,8 @@ export async function chunkDocumentByTokens(
|
|
|
content: string,
|
|
|
maxTokens: number = CHUNK_SIZE_TOKENS,
|
|
|
overlapTokens: number = CHUNK_OVERLAP_TOKENS,
|
|
|
- windowTokens: number = CHUNK_WINDOW_TOKENS
|
|
|
+ windowTokens: number = CHUNK_WINDOW_TOKENS,
|
|
|
+ signal?: AbortSignal
|
|
|
): Promise<{ text: string; pos: number; tokens: number }[]> {
|
|
|
const llm = getDefaultLlamaCpp();
|
|
|
|
|
|
@@ -2109,6 +2139,9 @@ export async function chunkDocumentByTokens(
|
|
|
const results: { text: string; pos: number; tokens: number }[] = [];
|
|
|
|
|
|
for (const chunk of charChunks) {
|
|
|
+ // Respect abort signal to avoid runaway tokenization
|
|
|
+ if (signal?.aborted) break;
|
|
|
+
|
|
|
const tokens = await llm.tokenize(chunk.text);
|
|
|
|
|
|
if (tokens.length <= maxTokens) {
|
|
|
@@ -2122,6 +2155,7 @@ export async function chunkDocumentByTokens(
|
|
|
const subChunks = chunkDocument(chunk.text, safeMaxChars, Math.floor(overlapChars * actualCharsPerToken / 2), Math.floor(windowChars * actualCharsPerToken / 2));
|
|
|
|
|
|
for (const subChunk of subChunks) {
|
|
|
+ if (signal?.aborted) break;
|
|
|
const subTokens = await llm.tokenize(subChunk.text);
|
|
|
results.push({
|
|
|
text: subChunk.text,
|