remove dead session-watcher and transcript mining code

Session mining, stale session detection, is_file_open /proc scan,
segment extraction, and fact mining are all replaced by the
observe/journal agents. Remove the entire session-watcher thread,
find_stale_sessions(), is_file_open(), MIN_SESSION_BYTES, and
SESSION_STALE_SECS. -329 lines.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-02 21:36:27 -04:00
parent 1b47b45566
commit 74fce5cf41

View file

@ -19,7 +19,6 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
const SESSION_STALE_SECS: u64 = 600; // 10 minutes
const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60);
const HEALTH_INTERVAL: Duration = Duration::from_secs(3600);
@ -407,72 +406,6 @@ fn compute_graph_health(store: &crate::store::Store) -> GraphHealth {
}
}
// --- Session detection ---
/// Find JSONL session files that are stale (not recently written) and not
/// held open by any process.
/// Find JSONL session files that haven't been written to recently.
/// Only checks metadata (stat), no file reads or subprocess calls.
/// The fuser check (is file open?) is deferred to the reconcile loop,
/// only for sessions that pass the mined-key filter.
/// Minimum file size for a session to be worth mining.
/// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive
/// sessions are much larger. Skip anything too small to contain
/// meaningful conversation.
const MIN_SESSION_BYTES: u64 = 100_000;
fn find_stale_sessions() -> Vec<PathBuf> {
let projects = crate::config::get().projects_dir.clone();
if !projects.exists() {
return Vec::new();
}
let mut stale = Vec::new();
let now = SystemTime::now();
let Ok(dirs) = fs::read_dir(&projects) else { return stale };
for dir_entry in dirs.filter_map(|e| e.ok()) {
if !dir_entry.path().is_dir() { continue; }
let Ok(files) = fs::read_dir(dir_entry.path()) else { continue };
for f in files.filter_map(|e| e.ok()) {
let path = f.path();
if !path.extension().map(|x| x == "jsonl").unwrap_or(false) { continue; }
let Ok(meta) = path.metadata() else { continue };
// Skip tiny sessions (daemon-spawned LLM calls, aborted sessions)
if meta.len() < MIN_SESSION_BYTES { continue; }
let Ok(mtime) = meta.modified() else { continue };
let age = now.duration_since(mtime).unwrap_or_default();
if age.as_secs() >= SESSION_STALE_SECS {
stale.push(path);
}
}
}
stale
}
/// Check if any other process has a file open by scanning /proc/*/fd/.
/// This is what `fuser` does internally, without the subprocess overhead.
fn is_file_open(path: &Path) -> bool {
let Ok(target) = path.canonicalize() else { return false };
let Ok(procs) = fs::read_dir("/proc") else { return false };
let my_pid = std::process::id().to_string();
for proc_entry in procs.filter_map(|e| e.ok()) {
let name = proc_entry.file_name();
let name = name.to_string_lossy();
if !name.chars().all(|c| c.is_ascii_digit()) { continue; }
if *name == my_pid { continue; }
let fd_dir = proc_entry.path().join("fd");
let Ok(fds) = fs::read_dir(&fd_dir) else { continue };
for fd in fds.filter_map(|e| e.ok()) {
let Ok(link) = fs::read_link(fd.path()) else { continue };
if link == target { return true; }
}
}
false
}
/// Get process uptime as human-readable string by reading /proc/<pid>/stat.
fn proc_uptime(pid: u32) -> Option<String> {
// /proc/<pid>/stat field 22 (1-indexed) is start time in clock ticks
@ -603,267 +536,6 @@ pub fn run_daemon() -> Result<(), String> {
// Write initial status
write_status(&choir, *last_daily.lock().unwrap(), &graph_health);
// Session watcher: reconcile-based extraction
// Each tick: scan filesystem for stale sessions, check store for what's
// already mined, check task registry for what's in-flight, spawn the diff.
// No persistent tracking state — the store is the source of truth.
let choir_sw = Arc::clone(&choir);
let _llm_sw = Arc::clone(&llm); // kept for future use
let last_daily_sw = Arc::clone(&last_daily);
let graph_health_sw = Arc::clone(&graph_health);
choir.spawn("session-watcher").init(move |ctx| {
ctx.set_progress("idle");
// Cache: path → (file_size, segment_count). Invalidated when size changes.
let mut seg_cache: HashMap<String, (u64, usize)> = HashMap::new();
// Retry backoff: filename → (next_retry_after, current_backoff).
// Exponential from 5min, cap 30min. Resets on daemon restart.
let mut retry_backoff: HashMap<String, (std::time::Instant, Duration)> = HashMap::new();
const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min
const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min
loop {
if ctx.is_cancelled() {
return Err(TaskError::Fatal("cancelled".into()));
}
ctx.set_progress("scanning");
// Check for failed tasks and update backoff.
// Task names are "extract:{filename}.{segment}" — extract the
// filename (UUID.jsonl) by stripping the trailing .N segment suffix.
let task_statuses = choir_sw.task_statuses();
for t in &task_statuses {
if let Some(label) = t.name.strip_prefix("extract:") {
// label is "UUID.jsonl.N" — strip last ".N" to get filename
let filename = match label.rfind('.') {
Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => {
&label[..pos]
}
_ => label,
};
match t.status {
TaskStatus::Failed => {
let entry = retry_backoff.entry(filename.to_string())
.or_insert((std::time::Instant::now(), BACKOFF_INITIAL));
entry.1 = (entry.1 * 2).min(BACKOFF_MAX);
entry.0 = std::time::Instant::now() + entry.1;
}
TaskStatus::Completed => {
retry_backoff.remove(filename);
}
_ => {}
}
}
}
// What's currently running/pending? (avoid spawning duplicates)
let active: HashSet<String> = task_statuses.iter()
.filter(|t| !t.status.is_finished())
.map(|t| t.name.clone())
.collect();
let stale = find_stale_sessions();
// Load mined transcript keys once for this tick
let mined = std::collections::HashSet::<String>::new(); // mining removed
// MAX_NEW_PER_TICK removed — mining handled by observation agent
// Load fact-mined keys too
let fact_keys: HashSet<String> = {
use crate::store::StoreView;
let view = crate::store::AnyView::load().ok();
view.map(|v| {
let mut keys = HashSet::new();
v.for_each_node(|key, _, _| {
if key.starts_with("_facts-") {
keys.insert(key.to_string());
}
});
keys
}).unwrap_or_default()
};
let _extract_queued = 0usize;
let mut _extract_remaining = 0usize;
let mut _fact_remaining = 0usize;
let mut already_mined = 0;
let mut still_open = 0;
let mut backed_off = 0;
let total_stale = stale.len();
// Sessions with old whole-file keys that need per-segment migration
let mut migrate_keys: Vec<(String, String, usize)> = Vec::new();
let mut needs_extract: Vec<(String, String, Option<usize>)> = Vec::new();
let mut needs_fact: Vec<(String, String)> = Vec::new();
let now = std::time::Instant::now();
for session in stale {
let filename = session.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".into());
let path_str = session.to_string_lossy().to_string();
// Check retry backoff before doing any work
if let Some((next_retry, _)) = retry_backoff.get(&filename) {
if now >= *next_retry {
// Backoff expired, proceed
} else {
backed_off += 1;
continue;
}
}
if is_file_open(&session) {
still_open += 1;
continue;
}
// Get file size for cache invalidation
let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0);
// Get segment count, using cache with size-based invalidation
let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) {
if cached_size == file_size {
cached_count
} else {
// File changed — re-parse
let messages = match super::enrich::extract_conversation(&path_str) {
Ok(m) => m,
Err(_) => continue,
};
let count = super::enrich::split_on_compaction(messages).len();
seg_cache.insert(path_str.clone(), (file_size, count));
count
}
} else {
let messages = match super::enrich::extract_conversation(&path_str) {
Ok(m) => m,
Err(_) => continue,
};
let count = super::enrich::split_on_compaction(messages).len();
seg_cache.insert(path_str.clone(), (file_size, count));
count
};
// No extractable messages — skip entirely
if seg_count == 0 {
already_mined += 1;
continue;
}
let fname_key = format!("_experience-{}", filename.trim_end_matches(".jsonl"));
let has_whole_file_key = mined.contains(&fname_key);
// Check per-segment keys, find unmined segments
let mut unmined_segs: Vec<usize> = Vec::new();
let mut has_any_seg_key = false;
for i in 0..seg_count {
let seg_key = format!("{}.{}", fname_key, i);
if mined.contains(&seg_key) {
has_any_seg_key = true;
} else {
unmined_segs.push(i);
}
}
// Migrate old whole-file key: if it exists but no per-segment keys,
// write per-segment keys for all current segments (they were mined
// under the old scheme)
if has_whole_file_key && !has_any_seg_key && seg_count > 0 {
migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count));
// After migration, all current segments are covered
unmined_segs.clear();
}
if unmined_segs.is_empty() {
// All segments mined — check fact-mining
let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
if !fact_keys.contains(&fact_key) {
let task_name = format!("fact-mine:{}", filename);
if !active.contains(&task_name) {
needs_fact.push((filename, path_str));
}
} else {
already_mined += 1;
}
} else {
// Queue unmined segments
for i in unmined_segs {
let task_name = format!("extract:{}.{}", filename, i);
if active.contains(&task_name) { continue; }
needs_extract.push((
format!("{}.{}", filename, i),
path_str.clone(),
Some(i),
));
}
}
}
// Migrate old whole-file keys to per-segment keys
if !migrate_keys.is_empty() {
match crate::store::Store::load() {
Ok(mut store) => {
let mut ok = 0;
let mut fail = 0;
for (fname_key, path_str, seg_count) in &migrate_keys {
for i in 0..*seg_count {
let seg_key = format!("{}.{}", fname_key, i);
let content = format!("Migrated from whole-file key for {}", path_str);
let mut node = crate::store::new_node(&seg_key, &content);
node.provenance = "experience-mine:write".to_string();
match store.upsert_node(node) {
Ok(()) => ok += 1,
Err(e) => {
if fail == 0 {
eprintln!("migration upsert_node error: {}", e);
}
fail += 1;
}
}
}
}
if let Err(e) = store.save() {
eprintln!("migration save error: {}", e);
}
log_event("session-watcher", "migrated",
&format!("{} whole-file keys → per-segment ({} ok, {} fail)",
migrate_keys.len(), ok, fail));
}
Err(e) => {
eprintln!("migration store load error: {}", e);
}
}
}
// experience_mine and fact_mine killed — observation.agent handles transcript mining
_extract_remaining = needs_extract.len();
_fact_remaining = needs_fact.len();
let extract_pending = _extract_queued + _extract_remaining;
let fact_pending = _fact_remaining;
if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 {
log_event("session-watcher", "tick",
&format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff",
total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off));
let mut parts = Vec::new();
if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); }
if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); }
if still_open > 0 { parts.push(format!("{} open", still_open)); }
if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); }
ctx.set_progress(parts.join(", "));
} else {
ctx.set_progress("idle");
}
write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw);
std::thread::sleep(SCHEDULER_INTERVAL);
}
});
// Scheduler: runs daily jobs based on filesystem state
let choir_sched = Arc::clone(&choir);
@ -1200,8 +872,7 @@ fn format_duration_human(ms: u128) -> String {
}
fn task_group(name: &str) -> &str {
if name == "session-watcher" || name == "scheduler" { "core" }
else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" }
if name == "scheduler" { "core" }
else if name.starts_with("c-") || name.starts_with("consolidate:")
|| name.starts_with("knowledge-loop:") || name.starts_with("digest:")
|| name.starts_with("decay:") { "daily" }