diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..9900160 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["-Cforce-frame-pointers=yes"] diff --git a/Cargo.toml b/Cargo.toml index 9b59315..70777b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,4 +34,3 @@ path = "src/bin/memory-search.rs" [profile.release] opt-level = 2 -strip = true diff --git a/src/daemon.rs b/src/daemon.rs index 691949f..15fb2fe 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -12,7 +12,7 @@ // // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. -use jobkit::{Choir, ResourcePool, TaskError, TaskInfo}; +use jobkit::{Choir, ResourcePool, TaskError, TaskInfo, TaskStatus}; use std::collections::HashSet; use std::fs; use std::io::Write; @@ -43,58 +43,68 @@ fn projects_dir() -> PathBuf { home_dir().join(".claude/projects") } -fn extracted_set_path() -> PathBuf { - home_dir().join(".claude/memory/extracted-sessions.json") -} - // --- Logging --- +const LOG_MAX_BYTES: u64 = 1_000_000; // 1MB, then truncate to last half + fn log_event(job: &str, event: &str, detail: &str) { let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); let line = if detail.is_empty() { format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event) } else { + // Escape detail for JSON safety + let safe = detail.replace('\\', "\\\\").replace('"', "\\\"") + .replace('\n', "\\n"); format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n", - ts, job, event, detail.replace('"', "\\\"")) + ts, job, event, safe) }; - if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(log_path()) { - let _ = f.write_all(line.as_bytes()); + let path = log_path(); + + // Rotate if too large + if let Ok(meta) = fs::metadata(&path) { + if meta.len() > LOG_MAX_BYTES { + if let Ok(content) = fs::read_to_string(&path) { + let half = content.len() / 2; + // Find next newline after halfway point + if let Some(nl) = content[half..].find('\n') { + let _ = fs::write(&path, &content[half + nl + 1..]); + } + } + } } -} -// --- Extracted sessions persistence --- - -fn load_extracted() -> HashSet { - fs::read_to_string(extracted_set_path()) - .ok() - .and_then(|s| serde_json::from_str(&s).ok()) - .unwrap_or_default() -} - -fn save_extracted(set: &HashSet) { - if let Ok(json) = serde_json::to_string_pretty(set) { - let _ = fs::write(extracted_set_path(), json); + if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) { + let _ = f.write_all(line.as_bytes()); } } // --- Shell out to poc-memory subcommands --- fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> { - log_event(args.first().unwrap_or(&"unknown"), "started", ""); + let job = args.join(" "); + log_event(&job, "started", ""); + let start = std::time::Instant::now(); + let output = Command::new("poc-memory") .args(args) .output() .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?; + let elapsed = start.elapsed(); + let duration = format!("{:.1}s", elapsed.as_secs_f64()); + if output.status.success() { - log_event(args.first().unwrap_or(&"unknown"), "completed", ""); + log_event(&job, "completed", &duration); Ok(()) } else { let stderr = String::from_utf8_lossy(&output.stderr); - let msg = format!("exit {}: {}", output.status, stderr.trim_end()); - log_event(args.first().unwrap_or(&"unknown"), "failed", &msg); - // Treat non-zero exit as retryable (API errors, transient failures) - // unless it looks like a usage error + let stderr_short = if stderr.len() > 500 { + &stderr[stderr.len()-500..] + } else { + &stderr + }; + let msg = format!("{} exit {}: {}", duration, output.status, stderr_short.trim_end()); + log_event(&job, "failed", &msg); if output.status.code() == Some(1) && stderr.contains("Unknown command") { Err(TaskError::Fatal(msg)) } else { @@ -103,74 +113,89 @@ fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> { } } -fn run_python_script(script: &str, args: &[&str]) -> Result<(), TaskError> { - let label = Path::new(script).file_stem() - .map(|s| s.to_string_lossy().to_string()) - .unwrap_or_else(|| "python".to_string()); - log_event(&label, "started", ""); - - let output = Command::new("python3") - .arg(script) - .args(args) - .output() - .map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?; - - if output.status.success() { - log_event(&label, "completed", ""); - Ok(()) - } else { - let stderr = String::from_utf8_lossy(&output.stderr); - let msg = format!("exit {}: {}", output.status, stderr.trim_end()); - log_event(&label, "failed", &msg); - Err(TaskError::Retry(msg)) - } -} - // --- Session detection --- -fn find_ended_sessions(extracted: &HashSet) -> Vec { +/// Find JSONL session files that are stale (not recently written) and not +/// held open by any process. +/// Find JSONL session files that haven't been written to recently. +/// Only checks metadata (stat), no file reads or subprocess calls. +/// The fuser check (is file open?) is deferred to the reconcile loop, +/// only for sessions that pass the mined-key filter. +fn find_stale_sessions() -> Vec { let projects = projects_dir(); if !projects.exists() { return Vec::new(); } - let mut ended = Vec::new(); + let mut stale = Vec::new(); let now = SystemTime::now(); - let Ok(dirs) = fs::read_dir(&projects) else { return ended }; + let Ok(dirs) = fs::read_dir(&projects) else { return stale }; for dir_entry in dirs.filter_map(|e| e.ok()) { if !dir_entry.path().is_dir() { continue; } let Ok(files) = fs::read_dir(dir_entry.path()) else { continue }; for f in files.filter_map(|e| e.ok()) { let path = f.path(); if path.extension().map(|x| x == "jsonl").unwrap_or(false) { - let key = path.to_string_lossy().to_string(); - if extracted.contains(&key) { continue; } - - // Check staleness if let Ok(meta) = path.metadata() { if let Ok(mtime) = meta.modified() { let age = now.duration_since(mtime).unwrap_or_default(); if age.as_secs() >= SESSION_STALE_SECS { - // Check no process has it open - if !is_file_open(&path) { - ended.push(path); - } + stale.push(path); } } } } } } - ended + stale } +/// Check if any other process has a file open by scanning /proc/*/fd/. +/// This is what `fuser` does internally, without the subprocess overhead. fn is_file_open(path: &Path) -> bool { - Command::new("fuser") - .arg(path) - .output() - .map(|o| o.status.success()) // fuser exits 0 if file is open - .unwrap_or(false) + let Ok(target) = path.canonicalize() else { return false }; + let Ok(procs) = fs::read_dir("/proc") else { return false }; + let my_pid = std::process::id().to_string(); + + for proc_entry in procs.filter_map(|e| e.ok()) { + let name = proc_entry.file_name(); + let name = name.to_string_lossy(); + if !name.chars().all(|c| c.is_ascii_digit()) { continue; } + if *name == my_pid { continue; } + + let fd_dir = proc_entry.path().join("fd"); + let Ok(fds) = fs::read_dir(&fd_dir) else { continue }; + for fd in fds.filter_map(|e| e.ok()) { + if let Ok(link) = fs::read_link(fd.path()) { + if link == target { return true; } + } + } + } + false +} + +/// Get process uptime as human-readable string by reading /proc//stat. +fn proc_uptime(pid: u32) -> Option { + // /proc//stat field 22 (1-indexed) is start time in clock ticks + let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?; + // Fields after comm (which may contain spaces/parens) — find closing paren + let after_comm = stat.get(stat.rfind(')')? + 2..)?; + let fields: Vec<&str> = after_comm.split_whitespace().collect(); + // Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19) + let start_ticks: u64 = fields.get(19)?.parse().ok()?; + let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64; + let boot_time_secs = { + let uptime = fs::read_to_string("/proc/uptime").ok()?; + let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?; + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); + now - sys_uptime as u64 + }; + let start_secs = boot_time_secs + start_ticks / ticks_per_sec; + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); + let uptime = now.saturating_sub(start_secs); + + Some(format_duration_human(uptime as u128 * 1000)) } // --- Status writing --- @@ -196,13 +221,15 @@ struct DaemonStatus { pub fn run_daemon() -> Result<(), String> { let choir = Choir::new(); - // Two workers: one for the scheduler/watcher loops, one for actual jobs. - // Jobs are serialized through the Sonnet resource pool anyway. - let _w1 = choir.add_worker("scheduler"); - let _w2 = choir.add_worker("jobs"); + // Workers: 2 for long-running loops (scheduler, session-watcher), + // plus 1 for the actual LLM job (pool capacity is 1). + // Non-LLM jobs (decay, health) also need a worker, so 4 total. + let names: Vec = (0..4).map(|i| format!("w{}", i)).collect(); + let _workers: Vec<_> = names.iter().map(|n| choir.add_worker(n)).collect(); - // Sonnet API: one call at a time - let sonnet = ResourcePool::new("sonnet", 1); + // LLM API: 1 concurrent call to control token burn rate + let llm = ResourcePool::new("llm", 1); + llm.bind(&choir); log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); @@ -210,35 +237,91 @@ pub fn run_daemon() -> Result<(), String> { // Write initial status write_status(&choir); - // Session watcher: detect ended sessions, run extraction + // Session watcher: reconcile-based extraction + // Each tick: scan filesystem for stale sessions, check store for what's + // already mined, check task registry for what's in-flight, spawn the diff. + // No persistent tracking state — the store is the source of truth. let choir_sw = Arc::clone(&choir); - let sonnet_sw = Arc::clone(&sonnet); + let llm_sw = Arc::clone(&llm); choir.spawn("session-watcher").init(move |ctx| { - let mut extracted = load_extracted(); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } - let ended = find_ended_sessions(&extracted); - for session in ended { - let session_str = session.to_string_lossy().to_string(); - log_event("extract", "session-ended", &session_str); + // What's currently running/pending? (avoid spawning duplicates) + let active: HashSet = choir_sw.task_statuses().iter() + .filter(|t| !t.status.is_finished()) + .map(|t| t.name.clone()) + .collect(); - // Spawn extraction job - let s = Arc::clone(&sonnet_sw); - let path = session_str.clone(); - choir_sw.spawn(format!("extract:{}", session.file_name() + let stale = find_stale_sessions(); + + // Load mined transcript keys once for this tick + let mined = crate::enrich::mined_transcript_keys(); + + // Limit new tasks per tick — the resource pool gates execution, + // but we don't need thousands of task objects in the registry. + // The watcher ticks every 60s so backlog drains steadily. + const MAX_NEW_PER_TICK: usize = 10; + + let mut queued = 0; + let mut already_mined = 0; + let mut still_open = 0; + let total_stale = stale.len(); + for session in stale { + if queued >= MAX_NEW_PER_TICK { break; } + + let filename = session.file_name() .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| "unknown".into()))) + .unwrap_or_else(|| "unknown".into()); + let task_name = format!("extract:{}", filename); + + // Skip if already in-flight + if active.contains(&task_name) { continue; } + + // Skip if already mined (filename key in store) + let path_str = session.to_string_lossy().to_string(); + if crate::enrich::is_transcript_mined_with_keys(&mined, &path_str) { + already_mined += 1; + continue; + } + + // Skip if file is still open (active Claude session, possibly idle) + if is_file_open(&session) { + still_open += 1; + continue; + } + + log_event("extract", "queued", &path_str); + let path = path_str.clone(); + let extract = choir_sw.spawn(task_name) + .resource(&llm_sw) .retries(2) .init(move |_ctx| { - let _slot = s.acquire(); run_poc_memory(&["experience-mine", &path]) - }); + }) + .run(); - extracted.insert(session_str); - save_extracted(&extracted); + // Chain fact-mine after experience-mine + let fact_task = format!("fact-mine:{}", filename); + if !active.contains(&fact_task) { + let path2 = path_str; + let mut fm = choir_sw.spawn(fact_task) + .resource(&llm_sw) + .retries(1) + .init(move |_ctx| { + run_poc_memory(&["fact-mine-store", &path2]) + }); + fm.depend_on(&extract); + } + queued += 1; + } + + if queued > 0 || still_open > 0 { + log_event("session-watcher", "tick", + &format!("{} stale, {} mined, {} open, {} queued", + total_stale, already_mined, still_open, queued)); } write_status(&choir_sw); @@ -248,12 +331,7 @@ pub fn run_daemon() -> Result<(), String> { // Scheduler: runs daily jobs based on filesystem state let choir_sched = Arc::clone(&choir); - let sonnet_sched = Arc::clone(&sonnet); - let knowledge_loop_script = home_dir() - .join("poc/memory/scripts/knowledge_loop.py") - .to_string_lossy() - .to_string(); - + let llm_sched = Arc::clone(&llm); choir.spawn("scheduler").init(move |ctx| { let mut last_daily = None::; let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; @@ -283,32 +361,27 @@ pub fn run_daemon() -> Result<(), String> { }); // Consolidation pipeline: consolidate → knowledge-loop → digest - // These share the Sonnet pool for serialization - let s1 = Arc::clone(&sonnet_sched); let consolidate = choir_sched.spawn(format!("consolidate:{}", today)) + .resource(&llm_sched) .retries(2) .init(move |_ctx| { - let _slot = s1.acquire(); run_poc_memory(&["consolidate-full"]) }) .run(); - let s2 = Arc::clone(&sonnet_sched); - let kl_script = knowledge_loop_script.clone(); let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today)) + .resource(&llm_sched) .retries(1) .init(move |_ctx| { - let _slot = s2.acquire(); - run_python_script(&kl_script, &["--max-cycles", "100", "--batch-size", "5"]) + run_poc_memory(&["knowledge-loop", "--max-cycles", "100", "--batch-size", "5"]) }); knowledge.depend_on(&consolidate); let knowledge = knowledge.run(); - let s3 = Arc::clone(&sonnet_sched); let mut digest = choir_sched.spawn(format!("digest:{}", today)) + .resource(&llm_sched) .retries(1) .init(move |_ctx| { - let _slot = s3.acquire(); run_poc_memory(&["digest", "auto"]) }); digest.depend_on(&knowledge); @@ -366,6 +439,37 @@ fn ctrlc_wait() { // --- Status display --- +fn format_duration_human(ms: u128) -> String { + if ms < 1_000 { + format!("{}ms", ms) + } else if ms < 60_000 { + format!("{:.1}s", ms as f64 / 1000.0) + } else if ms < 3_600_000 { + format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000) + } else { + format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000) + } +} + +fn task_group(name: &str) -> &str { + if name == "session-watcher" || name == "scheduler" { "core" } + else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" } + else if name.starts_with("consolidate:") || name.starts_with("knowledge-loop:") + || name.starts_with("digest:") || name.starts_with("decay:") { "daily" } + else if name == "health" { "health" } + else { "other" } +} + +fn status_symbol(t: &TaskInfo) -> &'static str { + if t.cancelled { return "✗" } + match t.status { + TaskStatus::Running => "▶", + TaskStatus::Completed => "✓", + TaskStatus::Failed => "✗", + TaskStatus::Pending => "·", + } +} + pub fn show_status() -> Result<(), String> { let path = status_path(); if !path.exists() { @@ -378,34 +482,119 @@ pub fn show_status() -> Result<(), String> { let status: DaemonStatus = serde_json::from_str(&content) .map_err(|e| format!("parse status: {}", e))?; - eprintln!("poc-memory daemon (pid {})", status.pid); - - // Check if actually running let alive = Path::new(&format!("/proc/{}", status.pid)).exists(); - if !alive { - eprintln!(" WARNING: process not running"); + let state = if alive { "running" } else { "NOT RUNNING" }; + + // Show uptime from /proc//stat start time + let uptime_str = if alive { + proc_uptime(status.pid).unwrap_or_default() + } else { + String::new() + }; + + if uptime_str.is_empty() { + eprintln!("poc-memory daemon pid={} {}", status.pid, state); + } else { + eprintln!("poc-memory daemon pid={} {} uptime {}", status.pid, state, uptime_str); + } + + // Status file age + if let Ok(meta) = fs::metadata(&path) { + if let Ok(modified) = meta.modified() { + let age = std::time::SystemTime::now().duration_since(modified).unwrap_or_default(); + eprintln!(" status updated {}s ago", age.as_secs()); + } } - eprintln!(); if status.tasks.is_empty() { - eprintln!(" No tasks"); - } else { - for t in &status.tasks { - let retry_info = if t.max_retries > 0 { - format!(" (retry {}/{})", t.retry_count, t.max_retries) + eprintln!("\n No tasks"); + return Ok(()); + } + + // Count by status + let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); + let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); + let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); + let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); + eprintln!(" tasks: {} running, {} pending, {} done, {} failed\n", + running, pending, completed, failed); + + // Group and display + let groups: &[(&str, &str)] = &[ + ("core", "Core"), + ("daily", "Daily pipeline"), + ("extract", "Session extraction"), + ("health", "Health"), + ("other", "Other"), + ]; + + for (group_id, group_label) in groups { + let tasks: Vec<&TaskInfo> = status.tasks.iter() + .filter(|t| task_group(&t.name) == *group_id) + .collect(); + if tasks.is_empty() { continue; } + + // For extract group, show summary instead of individual tasks + if *group_id == "extract" { + let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); + let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); + let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); + let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); + eprintln!(" {} ({} total)", group_label, tasks.len()); + + if n_running > 0 { + for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { + eprintln!(" {} {}", status_symbol(t), t.name); + } + } + let mut parts = Vec::new(); + if n_done > 0 { parts.push(format!("{} done", n_done)); } + if n_running > 0 { parts.push(format!("{} running", n_running)); } + if n_pending > 0 { parts.push(format!("{} queued", n_pending)); } + if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); } + eprintln!(" {}", parts.join(", ")); + + // Show recent failures + for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) { + if let Some(ref r) = t.result { + if let Some(ref err) = r.error { + let short = if err.len() > 80 { &err[..80] } else { err }; + eprintln!(" ✗ {}: {}", t.name, short); + } + } + } + eprintln!(); + continue; + } + + eprintln!(" {}", group_label); + for t in &tasks { + let sym = status_symbol(t); + let duration = t.result.as_ref() + .map(|r| format_duration_human(r.duration.as_millis())) + .unwrap_or_default(); + + let retry = if t.max_retries > 0 && t.retry_count > 0 { + format!(" retry {}/{}", t.retry_count, t.max_retries) } else { String::new() }; - let cancel_info = if t.cancelled { " [CANCELLED]" } else { "" }; - eprint!(" {:30} {:10}{}{}", t.name, t.status.to_string(), retry_info, cancel_info); - if let Some(ref result) = t.result { - if let Some(ref err) = result.error { - eprint!(" err: {}", err); - } - eprint!(" ({}ms)", result.duration.as_millis()); + + let err_msg = t.result.as_ref() + .and_then(|r| r.error.as_ref()) + .map(|e| { + let short = if e.len() > 60 { &e[..60] } else { e }; + format!(" err: {}", short) + }) + .unwrap_or_default(); + + if duration.is_empty() { + eprintln!(" {} {:30}{}{}", sym, t.name, retry, err_msg); + } else { + eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, err_msg); } - eprintln!(); } + eprintln!(); } Ok(()) @@ -432,8 +621,37 @@ pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> { .take(lines) .collect(); + if filtered.is_empty() { + eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default()); + return Ok(()); + } + + // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]" for line in filtered.into_iter().rev() { - eprintln!("{}", line); + if let Ok(obj) = serde_json::from_str::(line) { + let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); + let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); + let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); + let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); + + // Shorten timestamp to just time portion + let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; + + if detail.is_empty() { + eprintln!(" {} {:20} {}", time, job, event); + } else { + // Truncate long details (file paths) + let short = if detail.len() > 60 { + let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail); + if last.len() > 60 { &last[..60] } else { last } + } else { + detail + }; + eprintln!(" {} {:20} {:12} {}", time, job, event, short); + } + } else { + eprintln!("{}", line); + } } Ok(()) } diff --git a/src/fact_mine.rs b/src/fact_mine.rs new file mode 100644 index 0000000..96aa657 --- /dev/null +++ b/src/fact_mine.rs @@ -0,0 +1,312 @@ +// fact_mine.rs — extract atomic factual claims from conversation transcripts +// +// Chunks conversation text into overlapping windows, sends each to Haiku +// for extraction, deduplicates by claim text. Output: JSON array of facts. +// +// Uses Haiku (not Sonnet) for cost efficiency on high-volume extraction. + +use crate::llm; +use crate::store::{self, Provenance}; + +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::fs; +use std::path::Path; + +const CHARS_PER_TOKEN: usize = 4; +const WINDOW_TOKENS: usize = 2000; +const OVERLAP_TOKENS: usize = 200; +const WINDOW_CHARS: usize = WINDOW_TOKENS * CHARS_PER_TOKEN; +const OVERLAP_CHARS: usize = OVERLAP_TOKENS * CHARS_PER_TOKEN; + +const EXTRACTION_PROMPT: &str = r#"Extract atomic factual claims from this conversation excerpt. + +Each claim should be: +- A single verifiable statement +- Specific enough to be useful in isolation +- Tagged with domain (e.g., bcachefs/btree, bcachefs/alloc, bcachefs/journal, + bcachefs/ec, bcachefs/reconcile, rust/idioms, workflow/preferences, + linux/kernel, memory/design, identity/personal) +- Tagged with confidence: "stated" (explicitly said), "implied" (logically follows), + or "speculative" (hypothesis, not confirmed) +- Include which speaker said it (Kent, PoC/ProofOfConcept, or Unknown) + +Do NOT extract: +- Opinions or subjective assessments +- Conversational filler or greetings +- Things that are obviously common knowledge +- Restatements of the same fact (pick the clearest version) +- System messages, tool outputs, or error logs (extract what was LEARNED from them) +- Anything about the conversation itself ("Kent and PoC discussed...") + +Output as a JSON array. Each element: +{ + "claim": "the exact factual statement", + "domain": "category/subcategory", + "confidence": "stated|implied|speculative", + "speaker": "Kent|PoC|Unknown" +} + +If the excerpt contains no extractable facts, output an empty array: [] + +--- CONVERSATION EXCERPT --- +"#; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Fact { + pub claim: String, + pub domain: String, + pub confidence: String, + pub speaker: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub source_file: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub source_chunk: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub source_offset: Option, +} + +struct Message { + role: String, + text: String, + timestamp: String, +} + +/// Extract user/assistant text messages from a JSONL transcript. +fn extract_conversation(path: &Path) -> Vec { + let Ok(content) = fs::read_to_string(path) else { return Vec::new() }; + let mut messages = Vec::new(); + + for line in content.lines() { + let Ok(obj) = serde_json::from_str::(line) else { continue }; + + let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if msg_type != "user" && msg_type != "assistant" { + continue; + } + + let timestamp = obj.get("timestamp") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let msg = obj.get("message").unwrap_or(&obj); + let content = msg.get("content"); + + let text = match content { + Some(serde_json::Value::String(s)) => s.clone(), + Some(serde_json::Value::Array(arr)) => { + let texts: Vec<&str> = arr.iter() + .filter_map(|block| { + let obj = block.as_object()?; + if obj.get("type")?.as_str()? != "text" { + return None; + } + let t = obj.get("text")?.as_str()?; + if t.contains("") { + return None; + } + Some(t) + }) + .collect(); + texts.join("\n") + } + _ => continue, + }; + + let text = text.trim().to_string(); + if text.len() < 20 { + continue; + } + + let role = if msg_type == "user" { "Kent" } else { "PoC" }.to_string(); + messages.push(Message { role, text, timestamp }); + } + + messages +} + +/// Format messages into a single text for chunking. +fn format_for_extraction(messages: &[Message]) -> String { + messages.iter() + .map(|msg| { + let text = if msg.text.len() > 3000 { + // Find a char boundary near 2800 + let trunc = msg.text.floor_char_boundary(2800); + format!("{}\n[...truncated...]", &msg.text[..trunc]) + } else { + msg.text.clone() + }; + let ts = if msg.timestamp.len() >= 19 { &msg.timestamp[..19] } else { "" }; + if ts.is_empty() { + format!("[{}] {}", msg.role, text) + } else { + format!("[{} {}] {}", msg.role, ts, text) + } + }) + .collect::>() + .join("\n\n") +} + +/// Split text into overlapping windows, breaking at paragraph boundaries. +fn chunk_text(text: &str) -> Vec<(usize, &str)> { + let mut chunks = Vec::new(); + let mut start = 0; + + while start < text.len() { + let mut end = text.floor_char_boundary((start + WINDOW_CHARS).min(text.len())); + + // Try to break at a paragraph boundary + if end < text.len() { + if let Some(para) = text[start..end].rfind("\n\n") { + if para > WINDOW_CHARS / 2 { + end = start + para; + } + } + } + + chunks.push((start, &text[start..end])); + + let next = text.floor_char_boundary(end.saturating_sub(OVERLAP_CHARS)); + if next <= start { + start = end; + } else { + start = next; + } + } + + chunks +} + +/// Parse JSON facts from model response. +fn parse_facts(response: &str) -> Vec { + let cleaned = response.trim(); + // Strip markdown code block + let cleaned = if cleaned.starts_with("```") { + cleaned.lines() + .filter(|l| !l.starts_with("```")) + .collect::>() + .join("\n") + } else { + cleaned.to_string() + }; + + // Find JSON array + let start = cleaned.find('['); + let end = cleaned.rfind(']'); + let (Some(start), Some(end)) = (start, end) else { return Vec::new() }; + + serde_json::from_str(&cleaned[start..=end]).unwrap_or_default() +} + +/// Mine a single transcript for atomic facts. +pub fn mine_transcript(path: &Path, dry_run: bool) -> Result, String> { + let filename = path.file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| "unknown".into()); + eprintln!("Mining: {}", filename); + + let messages = extract_conversation(path); + if messages.is_empty() { + eprintln!(" No messages found"); + return Ok(Vec::new()); + } + eprintln!(" {} messages extracted", messages.len()); + + let text = format_for_extraction(&messages); + let chunks = chunk_text(&text); + eprintln!(" {} chunks ({} chars)", chunks.len(), text.len()); + + if dry_run { + for (i, (offset, chunk)) in chunks.iter().enumerate() { + eprintln!("\n--- Chunk {} (offset {}, {} chars) ---", i + 1, offset, chunk.len()); + let preview = if chunk.len() > 500 { &chunk[..500] } else { chunk }; + eprintln!("{}", preview); + if chunk.len() > 500 { + eprintln!(" ... ({} more chars)", chunk.len() - 500); + } + } + return Ok(Vec::new()); + } + + let mut all_facts = Vec::new(); + for (i, (_offset, chunk)) in chunks.iter().enumerate() { + eprint!(" Chunk {}/{} ({} chars)...", i + 1, chunks.len(), chunk.len()); + + let prompt = format!("{}{}", EXTRACTION_PROMPT, chunk); + let response = match llm::call_haiku(&prompt) { + Ok(r) => r, + Err(e) => { + eprintln!(" error: {}", e); + continue; + } + }; + + let mut facts = parse_facts(&response); + for fact in &mut facts { + fact.source_file = Some(filename.clone()); + fact.source_chunk = Some(i + 1); + fact.source_offset = Some(*_offset); + } + + eprintln!(" {} facts", facts.len()); + all_facts.extend(facts); + } + + // Deduplicate by claim text + let mut seen = HashSet::new(); + let before = all_facts.len(); + all_facts.retain(|f| seen.insert(f.claim.to_lowercase())); + let dupes = before - all_facts.len(); + if dupes > 0 { + eprintln!(" {} duplicates removed", dupes); + } + + eprintln!(" Total: {} unique facts", all_facts.len()); + Ok(all_facts) +} + +/// Mine a transcript and store facts in the capnp store. +/// Returns the number of facts stored. +pub fn mine_and_store(path: &Path) -> Result { + let facts = mine_transcript(path, false)?; + if facts.is_empty() { + return Ok(0); + } + + let filename = path.file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| "unknown".into()); + + // Store as a single node keyed by transcript filename + let key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); + let json = serde_json::to_string_pretty(&facts) + .map_err(|e| format!("serialize facts: {}", e))?; + + let mut store = store::Store::load()?; + store.upsert_provenance(&key, &json, Provenance::AgentFactMine)?; + store.save()?; + + eprintln!(" Stored {} facts as {}", facts.len(), key); + Ok(facts.len()) +} + +/// Mine transcripts, returning all facts. Skips files with fewer than min_messages. +pub fn mine_batch(paths: &[&Path], min_messages: usize, dry_run: bool) -> Result, String> { + let mut all_facts = Vec::new(); + + for path in paths { + let messages = extract_conversation(path); + if messages.len() < min_messages { + eprintln!("Skipping {} ({} messages < {})", + path.file_name().map(|n| n.to_string_lossy()).unwrap_or_default(), + messages.len(), min_messages); + continue; + } + + let facts = mine_transcript(path, dry_run)?; + all_facts.extend(facts); + } + + Ok(all_facts) +}