From 1c122ffd10db7674d79c70016a122f4b56b91fbb Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 6 Mar 2026 21:51:48 -0500 Subject: [PATCH] daemon: skip tiny sessions, decouple fact-mine, show type breakdown Skip session files under 100KB (daemon-spawned LLM calls, aborted sessions). This drops ~8000 spurious pending jobs. Decouple fact-mine from experience-mine: fact-mine only queues when the experience-mine backlog is empty, ensuring experiences are processed first. Session-watcher progress now shows breakdown by type: "N extract, N fact, N open" instead of flat "N pending". --- src/daemon.rs | 121 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 31 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 702a2ab..765bd7f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -185,6 +185,12 @@ fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> { /// Only checks metadata (stat), no file reads or subprocess calls. /// The fuser check (is file open?) is deferred to the reconcile loop, /// only for sessions that pass the mined-key filter. +/// Minimum file size for a session to be worth mining. +/// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive +/// sessions are much larger. Skip anything too small to contain +/// meaningful conversation. +const MIN_SESSION_BYTES: u64 = 100_000; + fn find_stale_sessions() -> Vec { let projects = projects_dir(); if !projects.exists() { @@ -202,6 +208,8 @@ fn find_stale_sessions() -> Vec { let path = f.path(); if path.extension().map(|x| x == "jsonl").unwrap_or(false) { if let Ok(meta) = path.metadata() { + // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions) + if meta.len() < MIN_SESSION_BYTES { continue; } if let Ok(mtime) = meta.modified() { let age = now.duration_since(mtime).unwrap_or_default(); if age.as_secs() >= SESSION_STALE_SECS { @@ -337,64 +345,115 @@ pub fn run_daemon() -> Result<(), String> { // The watcher ticks every 60s so backlog drains steadily. const MAX_NEW_PER_TICK: usize = 10; - let mut queued = 0; + // Load fact-mined keys too + let fact_keys: HashSet = { + use crate::store::{AnyView, StoreView}; + let view = crate::store::AnyView::load().ok(); + view.map(|v| { + let mut keys = HashSet::new(); + v.for_each_node(|key, _, _| { + if key.starts_with("_facts-") { + keys.insert(key.to_string()); + } + }); + keys + }).unwrap_or_default() + }; + + let mut extract_queued = 0; + let mut extract_remaining = 0; + let mut fact_remaining = 0; let mut already_mined = 0; let mut still_open = 0; let total_stale = stale.len(); - for session in stale { - if queued >= MAX_NEW_PER_TICK { break; } + // Classify each session + let mut needs_extract: Vec<(PathBuf, String, String)> = Vec::new(); + let mut needs_fact: Vec<(PathBuf, String, String)> = Vec::new(); + + for session in stale { let filename = session.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); - let task_name = format!("extract:{}", filename); + let path_str = session.to_string_lossy().to_string(); // Skip if already in-flight - if active.contains(&task_name) { continue; } + let extract_name = format!("extract:{}", filename); + let fact_name = format!("fact-mine:{}", filename); + if active.contains(&extract_name) || active.contains(&fact_name) { + continue; + } - // Skip if already mined (filename key in store) - let path_str = session.to_string_lossy().to_string(); - if crate::enrich::is_transcript_mined_with_keys(&mined, &path_str) { + let experience_done = crate::enrich::is_transcript_mined_with_keys(&mined, &path_str); + let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); + let fact_done = fact_keys.contains(&fact_key); + + if !experience_done { + if is_file_open(&session) { + still_open += 1; + } else { + needs_extract.push((session, filename, path_str)); + } + } else if !fact_done { + needs_fact.push((session, filename, path_str)); + } else { already_mined += 1; + } + } + + // Spawn experience-mine jobs (priority) + for (_, filename, path_str) in &needs_extract { + if extract_queued >= MAX_NEW_PER_TICK { + extract_remaining += 1; continue; } - - // Skip if file is still open (active Claude session, possibly idle) - if is_file_open(&session) { - still_open += 1; - continue; - } - - log_event("extract", "queued", &path_str); + let task_name = format!("extract:{}", filename); + log_event("extract", "queued", path_str); let path = path_str.clone(); - let extract = choir_sw.spawn(task_name) + choir_sw.spawn(task_name) .resource(&llm_sw) .retries(2) .init(move |ctx| { job_experience_mine(ctx, &path) - }) - .run(); + }); + extract_queued += 1; + } - // Chain fact-mine after experience-mine - let fact_task = format!("fact-mine:{}", filename); - if !active.contains(&fact_task) { - let path2 = path_str; - let mut fm = choir_sw.spawn(fact_task) + // Only queue fact-mine when experience backlog is clear + let mut fact_queued = 0; + if needs_extract.len() == extract_queued { + let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued); + for (_, filename, path_str) in &needs_fact { + if fact_queued >= fact_budget { + fact_remaining += 1; + continue; + } + let task_name = format!("fact-mine:{}", filename); + log_event("fact-mine", "queued", path_str); + let path = path_str.clone(); + choir_sw.spawn(task_name) .resource(&llm_sw) .retries(1) .init(move |ctx| { - job_fact_mine(ctx, &path2) + job_fact_mine(ctx, &path) }); - fm.depend_on(&extract); + fact_queued += 1; } - queued += 1; + } else { + fact_remaining = needs_fact.len(); } - if queued > 0 || still_open > 0 { + let extract_pending = extract_queued + extract_remaining; + let fact_pending = fact_queued + fact_remaining; + if extract_pending > 0 || fact_pending > 0 || still_open > 0 { log_event("session-watcher", "tick", - &format!("{} stale, {} mined, {} open, {} queued", - total_stale, already_mined, still_open, queued)); - ctx.set_progress(&format!("{} queued, {} open", queued, still_open)); + &format!("{} stale, {} mined, {} extract, {} fact, {} open", + total_stale, already_mined, extract_pending, fact_pending, still_open)); + let mut parts = Vec::new(); + if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); } + if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); } + if still_open > 0 { parts.push(format!("{} open", still_open)); } + ctx.set_progress(&parts.join(", ")); } else { ctx.set_progress("idle"); }