daemon: skip tiny sessions, decouple fact-mine, show type breakdown

Skip session files under 100KB (daemon-spawned LLM calls, aborted
sessions). This drops ~8000 spurious pending jobs.

Decouple fact-mine from experience-mine: fact-mine only queues when
the experience-mine backlog is empty, ensuring experiences are
processed first.

Session-watcher progress now shows breakdown by type:
"N extract, N fact, N open" instead of flat "N pending".
This commit is contained in:
ProofOfConcept 2026-03-06 21:51:48 -05:00
parent 5e78e5be3f
commit 1c122ffd10

View file

@ -185,6 +185,12 @@ fn job_daily_check(ctx: &ExecutionContext) -> Result<(), TaskError> {
/// Only checks metadata (stat), no file reads or subprocess calls. /// Only checks metadata (stat), no file reads or subprocess calls.
/// The fuser check (is file open?) is deferred to the reconcile loop, /// The fuser check (is file open?) is deferred to the reconcile loop,
/// only for sessions that pass the mined-key filter. /// only for sessions that pass the mined-key filter.
/// Minimum file size for a session to be worth mining.
/// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive
/// sessions are much larger. Skip anything too small to contain
/// meaningful conversation.
const MIN_SESSION_BYTES: u64 = 100_000;
fn find_stale_sessions() -> Vec<PathBuf> { fn find_stale_sessions() -> Vec<PathBuf> {
let projects = projects_dir(); let projects = projects_dir();
if !projects.exists() { if !projects.exists() {
@ -202,6 +208,8 @@ fn find_stale_sessions() -> Vec<PathBuf> {
let path = f.path(); let path = f.path();
if path.extension().map(|x| x == "jsonl").unwrap_or(false) { if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
if let Ok(meta) = path.metadata() { if let Ok(meta) = path.metadata() {
// Skip tiny sessions (daemon-spawned LLM calls, aborted sessions)
if meta.len() < MIN_SESSION_BYTES { continue; }
if let Ok(mtime) = meta.modified() { if let Ok(mtime) = meta.modified() {
let age = now.duration_since(mtime).unwrap_or_default(); let age = now.duration_since(mtime).unwrap_or_default();
if age.as_secs() >= SESSION_STALE_SECS { if age.as_secs() >= SESSION_STALE_SECS {
@ -337,64 +345,115 @@ pub fn run_daemon() -> Result<(), String> {
// The watcher ticks every 60s so backlog drains steadily. // The watcher ticks every 60s so backlog drains steadily.
const MAX_NEW_PER_TICK: usize = 10; const MAX_NEW_PER_TICK: usize = 10;
let mut queued = 0; // Load fact-mined keys too
let fact_keys: HashSet<String> = {
use crate::store::{AnyView, StoreView};
let view = crate::store::AnyView::load().ok();
view.map(|v| {
let mut keys = HashSet::new();
v.for_each_node(|key, _, _| {
if key.starts_with("_facts-") {
keys.insert(key.to_string());
}
});
keys
}).unwrap_or_default()
};
let mut extract_queued = 0;
let mut extract_remaining = 0;
let mut fact_remaining = 0;
let mut already_mined = 0; let mut already_mined = 0;
let mut still_open = 0; let mut still_open = 0;
let total_stale = stale.len(); let total_stale = stale.len();
for session in stale {
if queued >= MAX_NEW_PER_TICK { break; }
// Classify each session
let mut needs_extract: Vec<(PathBuf, String, String)> = Vec::new();
let mut needs_fact: Vec<(PathBuf, String, String)> = Vec::new();
for session in stale {
let filename = session.file_name() let filename = session.file_name()
.map(|n| n.to_string_lossy().to_string()) .map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into()); .unwrap_or_else(|| "unknown".into());
let task_name = format!("extract:{}", filename); let path_str = session.to_string_lossy().to_string();
// Skip if already in-flight // Skip if already in-flight
if active.contains(&task_name) { continue; } let extract_name = format!("extract:{}", filename);
let fact_name = format!("fact-mine:{}", filename);
// Skip if already mined (filename key in store) if active.contains(&extract_name) || active.contains(&fact_name) {
let path_str = session.to_string_lossy().to_string();
if crate::enrich::is_transcript_mined_with_keys(&mined, &path_str) {
already_mined += 1;
continue; continue;
} }
// Skip if file is still open (active Claude session, possibly idle) let experience_done = crate::enrich::is_transcript_mined_with_keys(&mined, &path_str);
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
let fact_done = fact_keys.contains(&fact_key);
if !experience_done {
if is_file_open(&session) { if is_file_open(&session) {
still_open += 1; still_open += 1;
continue; } else {
needs_extract.push((session, filename, path_str));
}
} else if !fact_done {
needs_fact.push((session, filename, path_str));
} else {
already_mined += 1;
}
} }
log_event("extract", "queued", &path_str); // Spawn experience-mine jobs (priority)
for (_, filename, path_str) in &needs_extract {
if extract_queued >= MAX_NEW_PER_TICK {
extract_remaining += 1;
continue;
}
let task_name = format!("extract:{}", filename);
log_event("extract", "queued", path_str);
let path = path_str.clone(); let path = path_str.clone();
let extract = choir_sw.spawn(task_name) choir_sw.spawn(task_name)
.resource(&llm_sw) .resource(&llm_sw)
.retries(2) .retries(2)
.init(move |ctx| { .init(move |ctx| {
job_experience_mine(ctx, &path) job_experience_mine(ctx, &path)
}) });
.run(); extract_queued += 1;
}
// Chain fact-mine after experience-mine // Only queue fact-mine when experience backlog is clear
let fact_task = format!("fact-mine:{}", filename); let mut fact_queued = 0;
if !active.contains(&fact_task) { if needs_extract.len() == extract_queued {
let path2 = path_str; let fact_budget = MAX_NEW_PER_TICK.saturating_sub(extract_queued);
let mut fm = choir_sw.spawn(fact_task) for (_, filename, path_str) in &needs_fact {
if fact_queued >= fact_budget {
fact_remaining += 1;
continue;
}
let task_name = format!("fact-mine:{}", filename);
log_event("fact-mine", "queued", path_str);
let path = path_str.clone();
choir_sw.spawn(task_name)
.resource(&llm_sw) .resource(&llm_sw)
.retries(1) .retries(1)
.init(move |ctx| { .init(move |ctx| {
job_fact_mine(ctx, &path2) job_fact_mine(ctx, &path)
}); });
fm.depend_on(&extract); fact_queued += 1;
} }
queued += 1; } else {
fact_remaining = needs_fact.len();
} }
if queued > 0 || still_open > 0 { let extract_pending = extract_queued + extract_remaining;
let fact_pending = fact_queued + fact_remaining;
if extract_pending > 0 || fact_pending > 0 || still_open > 0 {
log_event("session-watcher", "tick", log_event("session-watcher", "tick",
&format!("{} stale, {} mined, {} open, {} queued", &format!("{} stale, {} mined, {} extract, {} fact, {} open",
total_stale, already_mined, still_open, queued)); total_stale, already_mined, extract_pending, fact_pending, still_open));
ctx.set_progress(&format!("{} queued, {} open", queued, still_open)); let mut parts = Vec::new();
if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); }
if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); }
if still_open > 0 { parts.push(format!("{} open", still_open)); }
ctx.set_progress(&parts.join(", "));
} else { } else {
ctx.set_progress("idle"); ctx.set_progress("idle");
} }