From 74fce5cf414f17d34f845a1782b8981390b08e87 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Thu, 2 Apr 2026 21:36:27 -0400 Subject: [PATCH] remove dead session-watcher and transcript mining code Session mining, stale session detection, is_file_open /proc scan, segment extraction, and fact mining are all replaced by the observe/journal agents. Remove the entire session-watcher thread, find_stale_sessions(), is_file_open(), MIN_SESSION_BYTES, and SESSION_STALE_SECS. -329 lines. Co-Authored-By: Proof of Concept --- src/subconscious/daemon.rs | 331 +------------------------------------ 1 file changed, 1 insertion(+), 330 deletions(-) diff --git a/src/subconscious/daemon.rs b/src/subconscious/daemon.rs index 0c17ab8..146dd96 100644 --- a/src/subconscious/daemon.rs +++ b/src/subconscious/daemon.rs @@ -19,7 +19,6 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; -const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); @@ -407,72 +406,6 @@ fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { } } -// --- Session detection --- - -/// Find JSONL session files that are stale (not recently written) and not -/// held open by any process. -/// Find JSONL session files that haven't been written to recently. -/// Only checks metadata (stat), no file reads or subprocess calls. -/// The fuser check (is file open?) is deferred to the reconcile loop, -/// only for sessions that pass the mined-key filter. -/// Minimum file size for a session to be worth mining. -/// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive -/// sessions are much larger. Skip anything too small to contain -/// meaningful conversation. 
-const MIN_SESSION_BYTES: u64 = 100_000; - -fn find_stale_sessions() -> Vec<PathBuf> { - let projects = crate::config::get().projects_dir.clone(); - if !projects.exists() { - return Vec::new(); - } - - let mut stale = Vec::new(); - let now = SystemTime::now(); - - let Ok(dirs) = fs::read_dir(&projects) else { return stale }; - for dir_entry in dirs.filter_map(|e| e.ok()) { - if !dir_entry.path().is_dir() { continue; } - let Ok(files) = fs::read_dir(dir_entry.path()) else { continue }; - for f in files.filter_map(|e| e.ok()) { - let path = f.path(); - if !path.extension().map(|x| x == "jsonl").unwrap_or(false) { continue; } - let Ok(meta) = path.metadata() else { continue }; - // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions) - if meta.len() < MIN_SESSION_BYTES { continue; } - let Ok(mtime) = meta.modified() else { continue }; - let age = now.duration_since(mtime).unwrap_or_default(); - if age.as_secs() >= SESSION_STALE_SECS { - stale.push(path); - } - } - } - stale -} - -/// Check if any other process has a file open by scanning /proc/*/fd/. -/// This is what `fuser` does internally, without the subprocess overhead. -fn is_file_open(path: &Path) -> bool { - let Ok(target) = path.canonicalize() else { return false }; - let Ok(procs) = fs::read_dir("/proc") else { return false }; - let my_pid = std::process::id().to_string(); - - for proc_entry in procs.filter_map(|e| e.ok()) { - let name = proc_entry.file_name(); - let name = name.to_string_lossy(); - if !name.chars().all(|c| c.is_ascii_digit()) { continue; } - if *name == my_pid { continue; } - - let fd_dir = proc_entry.path().join("fd"); - let Ok(fds) = fs::read_dir(&fd_dir) else { continue }; - for fd in fds.filter_map(|e| e.ok()) { - let Ok(link) = fs::read_link(fd.path()) else { continue }; - if link == target { return true; } - } - } - false -} - -/// Get process uptime as human-readable string by reading /proc/<pid>/stat.
fn proc_uptime(pid: u32) -> Option<String> { // /proc/<pid>/stat field 22 (1-indexed) is start time in clock ticks @@ -603,267 +536,6 @@ pub fn run_daemon() -> Result<(), String> { // Write initial status write_status(&choir, *last_daily.lock().unwrap(), &graph_health); - // Session watcher: reconcile-based extraction - // Each tick: scan filesystem for stale sessions, check store for what's - // already mined, check task registry for what's in-flight, spawn the diff. - // No persistent tracking state — the store is the source of truth. - let choir_sw = Arc::clone(&choir); - let _llm_sw = Arc::clone(&llm); // kept for future use - let last_daily_sw = Arc::clone(&last_daily); - let graph_health_sw = Arc::clone(&graph_health); - choir.spawn("session-watcher").init(move |ctx| { - ctx.set_progress("idle"); - // Cache: path → (file_size, segment_count). Invalidated when size changes. - let mut seg_cache: HashMap<String, (u64, usize)> = HashMap::new(); - // Retry backoff: filename → (next_retry_after, current_backoff). - // Exponential from 5min, cap 30min. Resets on daemon restart. - let mut retry_backoff: HashMap<String, (std::time::Instant, Duration)> = HashMap::new(); - - const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min - const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min - - loop { - if ctx.is_cancelled() { - return Err(TaskError::Fatal("cancelled".into())); - } - - ctx.set_progress("scanning"); - - // Check for failed tasks and update backoff. - // Task names are "extract:{filename}.{segment}" — extract the - // filename (UUID.jsonl) by stripping the trailing .N segment suffix.
- let task_statuses = choir_sw.task_statuses(); - for t in &task_statuses { - if let Some(label) = t.name.strip_prefix("extract:") { - // label is "UUID.jsonl.N" — strip last ".N" to get filename - let filename = match label.rfind('.') { - Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => { - &label[..pos] - } - _ => label, - }; - match t.status { - TaskStatus::Failed => { - let entry = retry_backoff.entry(filename.to_string()) - .or_insert((std::time::Instant::now(), BACKOFF_INITIAL)); - entry.1 = (entry.1 * 2).min(BACKOFF_MAX); - entry.0 = std::time::Instant::now() + entry.1; - } - TaskStatus::Completed => { - retry_backoff.remove(filename); - } - _ => {} - } - } - } - - // What's currently running/pending? (avoid spawning duplicates) - let active: HashSet<String> = task_statuses.iter() - .filter(|t| !t.status.is_finished()) - .map(|t| t.name.clone()) - .collect(); - - let stale = find_stale_sessions(); - - // Load mined transcript keys once for this tick - let mined = std::collections::HashSet::<String>::new(); // mining removed - - // MAX_NEW_PER_TICK removed — mining handled by observation agent - - // Load fact-mined keys too - let fact_keys: HashSet<String> = { - use crate::store::StoreView; - let view = crate::store::AnyView::load().ok(); - view.map(|v| { - let mut keys = HashSet::new(); - v.for_each_node(|key, _, _| { - if key.starts_with("_facts-") { - keys.insert(key.to_string()); - } - }); - keys - }).unwrap_or_default() - }; - - let _extract_queued = 0usize; - let mut _extract_remaining = 0usize; - let mut _fact_remaining = 0usize; - let mut already_mined = 0; - let mut still_open = 0; - let mut backed_off = 0; - let total_stale = stale.len(); - - // Sessions with old whole-file keys that need per-segment migration - let mut migrate_keys: Vec<(String, String, usize)> = Vec::new(); - - let mut needs_extract: Vec<(String, String, Option<usize>)> = Vec::new(); - let mut needs_fact: Vec<(String, String)> = Vec::new(); - - let now = std::time::Instant::now(); - - for 
session in stale { - let filename = session.file_name() - .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| "unknown".into()); - let path_str = session.to_string_lossy().to_string(); - - // Check retry backoff before doing any work - if let Some((next_retry, _)) = retry_backoff.get(&filename) { - if now >= *next_retry { - // Backoff expired, proceed - } else { - backed_off += 1; - continue; - } - } - - if is_file_open(&session) { - still_open += 1; - continue; - } - - // Get file size for cache invalidation - let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0); - - // Get segment count, using cache with size-based invalidation - let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) { - if cached_size == file_size { - cached_count - } else { - // File changed — re-parse - let messages = match super::enrich::extract_conversation(&path_str) { - Ok(m) => m, - Err(_) => continue, - }; - let count = super::enrich::split_on_compaction(messages).len(); - seg_cache.insert(path_str.clone(), (file_size, count)); - count - } - } else { - let messages = match super::enrich::extract_conversation(&path_str) { - Ok(m) => m, - Err(_) => continue, - }; - let count = super::enrich::split_on_compaction(messages).len(); - seg_cache.insert(path_str.clone(), (file_size, count)); - count - }; - - // No extractable messages — skip entirely - if seg_count == 0 { - already_mined += 1; - continue; - } - - let fname_key = format!("_experience-{}", filename.trim_end_matches(".jsonl")); - let has_whole_file_key = mined.contains(&fname_key); - - // Check per-segment keys, find unmined segments - let mut unmined_segs: Vec<usize> = Vec::new(); - let mut has_any_seg_key = false; - for i in 0..seg_count { - let seg_key = format!("{}.{}", fname_key, i); - if mined.contains(&seg_key) { - has_any_seg_key = true; - } else { - unmined_segs.push(i); - } - } - - // Migrate old whole-file key: if it exists but no per-segment keys, - // write per-segment 
keys for all current segments (they were mined - // under the old scheme) - if has_whole_file_key && !has_any_seg_key && seg_count > 0 { - migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count)); - // After migration, all current segments are covered - unmined_segs.clear(); - } - - if unmined_segs.is_empty() { - // All segments mined — check fact-mining - let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); - if !fact_keys.contains(&fact_key) { - let task_name = format!("fact-mine:{}", filename); - if !active.contains(&task_name) { - needs_fact.push((filename, path_str)); - } - } else { - already_mined += 1; - } - } else { - // Queue unmined segments - for i in unmined_segs { - let task_name = format!("extract:{}.{}", filename, i); - if active.contains(&task_name) { continue; } - needs_extract.push(( - format!("{}.{}", filename, i), - path_str.clone(), - Some(i), - )); - } - } - } - - // Migrate old whole-file keys to per-segment keys - if !migrate_keys.is_empty() { - match crate::store::Store::load() { - Ok(mut store) => { - let mut ok = 0; - let mut fail = 0; - for (fname_key, path_str, seg_count) in &migrate_keys { - for i in 0..*seg_count { - let seg_key = format!("{}.{}", fname_key, i); - let content = format!("Migrated from whole-file key for {}", path_str); - let mut node = crate::store::new_node(&seg_key, &content); - node.provenance = "experience-mine:write".to_string(); - match store.upsert_node(node) { - Ok(()) => ok += 1, - Err(e) => { - if fail == 0 { - eprintln!("migration upsert_node error: {}", e); - } - fail += 1; - } - } - } - } - if let Err(e) = store.save() { - eprintln!("migration save error: {}", e); - } - log_event("session-watcher", "migrated", - &format!("{} whole-file keys → per-segment ({} ok, {} fail)", - migrate_keys.len(), ok, fail)); - } - Err(e) => { - eprintln!("migration store load error: {}", e); - } - } - } - - // experience_mine and fact_mine killed — observation.agent handles transcript mining - 
_extract_remaining = needs_extract.len(); - _fact_remaining = needs_fact.len(); - - let extract_pending = _extract_queued + _extract_remaining; - let fact_pending = _fact_remaining; - if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 { - log_event("session-watcher", "tick", - &format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff", - total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off)); - let mut parts = Vec::new(); - if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); } - if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); } - if still_open > 0 { parts.push(format!("{} open", still_open)); } - if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); } - ctx.set_progress(parts.join(", ")); - } else { - ctx.set_progress("idle"); - } - - write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw); - std::thread::sleep(SCHEDULER_INTERVAL); - } - }); // Scheduler: runs daily jobs based on filesystem state let choir_sched = Arc::clone(&choir); @@ -1200,8 +872,7 @@ fn format_duration_human(ms: u128) -> String { } fn task_group(name: &str) -> &str { - if name == "session-watcher" || name == "scheduler" { "core" } - else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" } + if name == "scheduler" { "core" } else if name.starts_with("c-") || name.starts_with("consolidate:") || name.starts_with("knowledge-loop:") || name.starts_with("digest:") || name.starts_with("decay:") { "daily" }