daemon: resource-gated scheduling, fact-mine integration, systemd
Daemon improvements: - Use jobkit's new .resource(&pool) API instead of pool.acquire() inside closures — tasks wait in the pool's queue, not on worker threads - LLM pool capacity 1 to control token burn rate - Workers reduced from 7 to 4 (2 loops + 2 for jobs) - Session watcher: per-tick stats logging (stale/mined/open/queued) - Log rotation: truncate to last half when over 1MB - Duration tracking and stderr capture for job failures - Process uptime shown in status display - Replace fuser subprocess with /proc/*/fd/ scan Fact-mine integration: - mine_and_store() writes extracted facts to store nodes - fact-mine-store CLI subcommand for daemon to shell out to - Chained as dependent task after experience-mine per session Infra: - systemd user service at ~/.config/systemd/user/poc-memory.service - .cargo/config.toml: force frame pointers for profiling
This commit is contained in:
parent
552d255dc3
commit
37e0ce96ea
4 changed files with 656 additions and 125 deletions
2
.cargo/config.toml
Normal file
2
.cargo/config.toml
Normal file
|
|
@ -0,0 +1,2 @@
|
||||||
|
[build]
|
||||||
|
rustflags = ["-Cforce-frame-pointers=yes"]
|
||||||
|
|
@ -34,4 +34,3 @@ path = "src/bin/memory-search.rs"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
opt-level = 2
|
opt-level = 2
|
||||||
strip = true
|
|
||||||
|
|
|
||||||
466
src/daemon.rs
466
src/daemon.rs
|
|
@ -12,7 +12,7 @@
|
||||||
//
|
//
|
||||||
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
|
// Phase 2 will inline job logic; Phase 3 integrates into poc-agent.
|
||||||
|
|
||||||
use jobkit::{Choir, ResourcePool, TaskError, TaskInfo};
|
use jobkit::{Choir, ResourcePool, TaskError, TaskInfo, TaskStatus};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|
@ -43,58 +43,68 @@ fn projects_dir() -> PathBuf {
|
||||||
home_dir().join(".claude/projects")
|
home_dir().join(".claude/projects")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn extracted_set_path() -> PathBuf {
|
|
||||||
home_dir().join(".claude/memory/extracted-sessions.json")
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Logging ---
|
// --- Logging ---
|
||||||
|
|
||||||
|
const LOG_MAX_BYTES: u64 = 1_000_000; // 1MB, then truncate to last half
|
||||||
|
|
||||||
fn log_event(job: &str, event: &str, detail: &str) {
|
fn log_event(job: &str, event: &str, detail: &str) {
|
||||||
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
|
let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S");
|
||||||
let line = if detail.is_empty() {
|
let line = if detail.is_empty() {
|
||||||
format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event)
|
format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\"}}\n", ts, job, event)
|
||||||
} else {
|
} else {
|
||||||
|
// Escape detail for JSON safety
|
||||||
|
let safe = detail.replace('\\', "\\\\").replace('"', "\\\"")
|
||||||
|
.replace('\n', "\\n");
|
||||||
format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n",
|
format!("{{\"ts\":\"{}\",\"job\":\"{}\",\"event\":\"{}\",\"detail\":\"{}\"}}\n",
|
||||||
ts, job, event, detail.replace('"', "\\\""))
|
ts, job, event, safe)
|
||||||
};
|
};
|
||||||
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(log_path()) {
|
let path = log_path();
|
||||||
|
|
||||||
|
// Rotate if too large
|
||||||
|
if let Ok(meta) = fs::metadata(&path) {
|
||||||
|
if meta.len() > LOG_MAX_BYTES {
|
||||||
|
if let Ok(content) = fs::read_to_string(&path) {
|
||||||
|
let half = content.len() / 2;
|
||||||
|
// Find next newline after halfway point
|
||||||
|
if let Some(nl) = content[half..].find('\n') {
|
||||||
|
let _ = fs::write(&path, &content[half + nl + 1..]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) {
|
||||||
let _ = f.write_all(line.as_bytes());
|
let _ = f.write_all(line.as_bytes());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Extracted sessions persistence ---
|
|
||||||
|
|
||||||
fn load_extracted() -> HashSet<String> {
|
|
||||||
fs::read_to_string(extracted_set_path())
|
|
||||||
.ok()
|
|
||||||
.and_then(|s| serde_json::from_str(&s).ok())
|
|
||||||
.unwrap_or_default()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn save_extracted(set: &HashSet<String>) {
|
|
||||||
if let Ok(json) = serde_json::to_string_pretty(set) {
|
|
||||||
let _ = fs::write(extracted_set_path(), json);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Shell out to poc-memory subcommands ---
|
// --- Shell out to poc-memory subcommands ---
|
||||||
|
|
||||||
fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> {
|
fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> {
|
||||||
log_event(args.first().unwrap_or(&"unknown"), "started", "");
|
let job = args.join(" ");
|
||||||
|
log_event(&job, "started", "");
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
let output = Command::new("poc-memory")
|
let output = Command::new("poc-memory")
|
||||||
.args(args)
|
.args(args)
|
||||||
.output()
|
.output()
|
||||||
.map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
|
.map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
|
||||||
|
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
let duration = format!("{:.1}s", elapsed.as_secs_f64());
|
||||||
|
|
||||||
if output.status.success() {
|
if output.status.success() {
|
||||||
log_event(args.first().unwrap_or(&"unknown"), "completed", "");
|
log_event(&job, "completed", &duration);
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
let msg = format!("exit {}: {}", output.status, stderr.trim_end());
|
let stderr_short = if stderr.len() > 500 {
|
||||||
log_event(args.first().unwrap_or(&"unknown"), "failed", &msg);
|
&stderr[stderr.len()-500..]
|
||||||
// Treat non-zero exit as retryable (API errors, transient failures)
|
} else {
|
||||||
// unless it looks like a usage error
|
&stderr
|
||||||
|
};
|
||||||
|
let msg = format!("{} exit {}: {}", duration, output.status, stderr_short.trim_end());
|
||||||
|
log_event(&job, "failed", &msg);
|
||||||
if output.status.code() == Some(1) && stderr.contains("Unknown command") {
|
if output.status.code() == Some(1) && stderr.contains("Unknown command") {
|
||||||
Err(TaskError::Fatal(msg))
|
Err(TaskError::Fatal(msg))
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -103,74 +113,89 @@ fn run_poc_memory(args: &[&str]) -> Result<(), TaskError> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_python_script(script: &str, args: &[&str]) -> Result<(), TaskError> {
|
|
||||||
let label = Path::new(script).file_stem()
|
|
||||||
.map(|s| s.to_string_lossy().to_string())
|
|
||||||
.unwrap_or_else(|| "python".to_string());
|
|
||||||
log_event(&label, "started", "");
|
|
||||||
|
|
||||||
let output = Command::new("python3")
|
|
||||||
.arg(script)
|
|
||||||
.args(args)
|
|
||||||
.output()
|
|
||||||
.map_err(|e| TaskError::Fatal(format!("spawn failed: {}", e)))?;
|
|
||||||
|
|
||||||
if output.status.success() {
|
|
||||||
log_event(&label, "completed", "");
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
||||||
let msg = format!("exit {}: {}", output.status, stderr.trim_end());
|
|
||||||
log_event(&label, "failed", &msg);
|
|
||||||
Err(TaskError::Retry(msg))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Session detection ---
|
// --- Session detection ---
|
||||||
|
|
||||||
fn find_ended_sessions(extracted: &HashSet<String>) -> Vec<PathBuf> {
|
/// Find JSONL session files that are stale (not recently written) and not
|
||||||
|
/// held open by any process.
|
||||||
|
/// Find JSONL session files that haven't been written to recently.
|
||||||
|
/// Only checks metadata (stat), no file reads or subprocess calls.
|
||||||
|
/// The fuser check (is file open?) is deferred to the reconcile loop,
|
||||||
|
/// only for sessions that pass the mined-key filter.
|
||||||
|
fn find_stale_sessions() -> Vec<PathBuf> {
|
||||||
let projects = projects_dir();
|
let projects = projects_dir();
|
||||||
if !projects.exists() {
|
if !projects.exists() {
|
||||||
return Vec::new();
|
return Vec::new();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut ended = Vec::new();
|
let mut stale = Vec::new();
|
||||||
let now = SystemTime::now();
|
let now = SystemTime::now();
|
||||||
|
|
||||||
let Ok(dirs) = fs::read_dir(&projects) else { return ended };
|
let Ok(dirs) = fs::read_dir(&projects) else { return stale };
|
||||||
for dir_entry in dirs.filter_map(|e| e.ok()) {
|
for dir_entry in dirs.filter_map(|e| e.ok()) {
|
||||||
if !dir_entry.path().is_dir() { continue; }
|
if !dir_entry.path().is_dir() { continue; }
|
||||||
let Ok(files) = fs::read_dir(dir_entry.path()) else { continue };
|
let Ok(files) = fs::read_dir(dir_entry.path()) else { continue };
|
||||||
for f in files.filter_map(|e| e.ok()) {
|
for f in files.filter_map(|e| e.ok()) {
|
||||||
let path = f.path();
|
let path = f.path();
|
||||||
if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
|
if path.extension().map(|x| x == "jsonl").unwrap_or(false) {
|
||||||
let key = path.to_string_lossy().to_string();
|
|
||||||
if extracted.contains(&key) { continue; }
|
|
||||||
|
|
||||||
// Check staleness
|
|
||||||
if let Ok(meta) = path.metadata() {
|
if let Ok(meta) = path.metadata() {
|
||||||
if let Ok(mtime) = meta.modified() {
|
if let Ok(mtime) = meta.modified() {
|
||||||
let age = now.duration_since(mtime).unwrap_or_default();
|
let age = now.duration_since(mtime).unwrap_or_default();
|
||||||
if age.as_secs() >= SESSION_STALE_SECS {
|
if age.as_secs() >= SESSION_STALE_SECS {
|
||||||
// Check no process has it open
|
stale.push(path);
|
||||||
if !is_file_open(&path) {
|
|
||||||
ended.push(path);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
stale
|
||||||
ended
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if any other process has a file open by scanning /proc/*/fd/.
|
||||||
|
/// This is what `fuser` does internally, without the subprocess overhead.
|
||||||
fn is_file_open(path: &Path) -> bool {
|
fn is_file_open(path: &Path) -> bool {
|
||||||
Command::new("fuser")
|
let Ok(target) = path.canonicalize() else { return false };
|
||||||
.arg(path)
|
let Ok(procs) = fs::read_dir("/proc") else { return false };
|
||||||
.output()
|
let my_pid = std::process::id().to_string();
|
||||||
.map(|o| o.status.success()) // fuser exits 0 if file is open
|
|
||||||
.unwrap_or(false)
|
for proc_entry in procs.filter_map(|e| e.ok()) {
|
||||||
|
let name = proc_entry.file_name();
|
||||||
|
let name = name.to_string_lossy();
|
||||||
|
if !name.chars().all(|c| c.is_ascii_digit()) { continue; }
|
||||||
|
if *name == my_pid { continue; }
|
||||||
|
|
||||||
|
let fd_dir = proc_entry.path().join("fd");
|
||||||
|
let Ok(fds) = fs::read_dir(&fd_dir) else { continue };
|
||||||
|
for fd in fds.filter_map(|e| e.ok()) {
|
||||||
|
if let Ok(link) = fs::read_link(fd.path()) {
|
||||||
|
if link == target { return true; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get process uptime as human-readable string by reading /proc/<pid>/stat.
|
||||||
|
fn proc_uptime(pid: u32) -> Option<String> {
|
||||||
|
// /proc/<pid>/stat field 22 (1-indexed) is start time in clock ticks
|
||||||
|
let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?;
|
||||||
|
// Fields after comm (which may contain spaces/parens) — find closing paren
|
||||||
|
let after_comm = stat.get(stat.rfind(')')? + 2..)?;
|
||||||
|
let fields: Vec<&str> = after_comm.split_whitespace().collect();
|
||||||
|
// Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19)
|
||||||
|
let start_ticks: u64 = fields.get(19)?.parse().ok()?;
|
||||||
|
let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64;
|
||||||
|
let boot_time_secs = {
|
||||||
|
let uptime = fs::read_to_string("/proc/uptime").ok()?;
|
||||||
|
let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?;
|
||||||
|
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
|
||||||
|
now - sys_uptime as u64
|
||||||
|
};
|
||||||
|
let start_secs = boot_time_secs + start_ticks / ticks_per_sec;
|
||||||
|
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs();
|
||||||
|
let uptime = now.saturating_sub(start_secs);
|
||||||
|
|
||||||
|
Some(format_duration_human(uptime as u128 * 1000))
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Status writing ---
|
// --- Status writing ---
|
||||||
|
|
@ -196,13 +221,15 @@ struct DaemonStatus {
|
||||||
|
|
||||||
pub fn run_daemon() -> Result<(), String> {
|
pub fn run_daemon() -> Result<(), String> {
|
||||||
let choir = Choir::new();
|
let choir = Choir::new();
|
||||||
// Two workers: one for the scheduler/watcher loops, one for actual jobs.
|
// Workers: 2 for long-running loops (scheduler, session-watcher),
|
||||||
// Jobs are serialized through the Sonnet resource pool anyway.
|
// plus 1 for the actual LLM job (pool capacity is 1).
|
||||||
let _w1 = choir.add_worker("scheduler");
|
// Non-LLM jobs (decay, health) also need a worker, so 4 total.
|
||||||
let _w2 = choir.add_worker("jobs");
|
let names: Vec<String> = (0..4).map(|i| format!("w{}", i)).collect();
|
||||||
|
let _workers: Vec<_> = names.iter().map(|n| choir.add_worker(n)).collect();
|
||||||
|
|
||||||
// Sonnet API: one call at a time
|
// LLM API: 1 concurrent call to control token burn rate
|
||||||
let sonnet = ResourcePool::new("sonnet", 1);
|
let llm = ResourcePool::new("llm", 1);
|
||||||
|
llm.bind(&choir);
|
||||||
|
|
||||||
log_event("daemon", "started", &format!("pid {}", std::process::id()));
|
log_event("daemon", "started", &format!("pid {}", std::process::id()));
|
||||||
eprintln!("poc-memory daemon started (pid {})", std::process::id());
|
eprintln!("poc-memory daemon started (pid {})", std::process::id());
|
||||||
|
|
@ -210,35 +237,91 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
// Write initial status
|
// Write initial status
|
||||||
write_status(&choir);
|
write_status(&choir);
|
||||||
|
|
||||||
// Session watcher: detect ended sessions, run extraction
|
// Session watcher: reconcile-based extraction
|
||||||
|
// Each tick: scan filesystem for stale sessions, check store for what's
|
||||||
|
// already mined, check task registry for what's in-flight, spawn the diff.
|
||||||
|
// No persistent tracking state — the store is the source of truth.
|
||||||
let choir_sw = Arc::clone(&choir);
|
let choir_sw = Arc::clone(&choir);
|
||||||
let sonnet_sw = Arc::clone(&sonnet);
|
let llm_sw = Arc::clone(&llm);
|
||||||
choir.spawn("session-watcher").init(move |ctx| {
|
choir.spawn("session-watcher").init(move |ctx| {
|
||||||
let mut extracted = load_extracted();
|
|
||||||
loop {
|
loop {
|
||||||
if ctx.is_cancelled() {
|
if ctx.is_cancelled() {
|
||||||
return Err(TaskError::Fatal("cancelled".into()));
|
return Err(TaskError::Fatal("cancelled".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let ended = find_ended_sessions(&extracted);
|
// What's currently running/pending? (avoid spawning duplicates)
|
||||||
for session in ended {
|
let active: HashSet<String> = choir_sw.task_statuses().iter()
|
||||||
let session_str = session.to_string_lossy().to_string();
|
.filter(|t| !t.status.is_finished())
|
||||||
log_event("extract", "session-ended", &session_str);
|
.map(|t| t.name.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
// Spawn extraction job
|
let stale = find_stale_sessions();
|
||||||
let s = Arc::clone(&sonnet_sw);
|
|
||||||
let path = session_str.clone();
|
// Load mined transcript keys once for this tick
|
||||||
choir_sw.spawn(format!("extract:{}", session.file_name()
|
let mined = crate::enrich::mined_transcript_keys();
|
||||||
|
|
||||||
|
// Limit new tasks per tick — the resource pool gates execution,
|
||||||
|
// but we don't need thousands of task objects in the registry.
|
||||||
|
// The watcher ticks every 60s so backlog drains steadily.
|
||||||
|
const MAX_NEW_PER_TICK: usize = 10;
|
||||||
|
|
||||||
|
let mut queued = 0;
|
||||||
|
let mut already_mined = 0;
|
||||||
|
let mut still_open = 0;
|
||||||
|
let total_stale = stale.len();
|
||||||
|
for session in stale {
|
||||||
|
if queued >= MAX_NEW_PER_TICK { break; }
|
||||||
|
|
||||||
|
let filename = session.file_name()
|
||||||
.map(|n| n.to_string_lossy().to_string())
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
.unwrap_or_else(|| "unknown".into())))
|
.unwrap_or_else(|| "unknown".into());
|
||||||
|
let task_name = format!("extract:{}", filename);
|
||||||
|
|
||||||
|
// Skip if already in-flight
|
||||||
|
if active.contains(&task_name) { continue; }
|
||||||
|
|
||||||
|
// Skip if already mined (filename key in store)
|
||||||
|
let path_str = session.to_string_lossy().to_string();
|
||||||
|
if crate::enrich::is_transcript_mined_with_keys(&mined, &path_str) {
|
||||||
|
already_mined += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if file is still open (active Claude session, possibly idle)
|
||||||
|
if is_file_open(&session) {
|
||||||
|
still_open += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
log_event("extract", "queued", &path_str);
|
||||||
|
let path = path_str.clone();
|
||||||
|
let extract = choir_sw.spawn(task_name)
|
||||||
|
.resource(&llm_sw)
|
||||||
.retries(2)
|
.retries(2)
|
||||||
.init(move |_ctx| {
|
.init(move |_ctx| {
|
||||||
let _slot = s.acquire();
|
|
||||||
run_poc_memory(&["experience-mine", &path])
|
run_poc_memory(&["experience-mine", &path])
|
||||||
});
|
})
|
||||||
|
.run();
|
||||||
|
|
||||||
extracted.insert(session_str);
|
// Chain fact-mine after experience-mine
|
||||||
save_extracted(&extracted);
|
let fact_task = format!("fact-mine:{}", filename);
|
||||||
|
if !active.contains(&fact_task) {
|
||||||
|
let path2 = path_str;
|
||||||
|
let mut fm = choir_sw.spawn(fact_task)
|
||||||
|
.resource(&llm_sw)
|
||||||
|
.retries(1)
|
||||||
|
.init(move |_ctx| {
|
||||||
|
run_poc_memory(&["fact-mine-store", &path2])
|
||||||
|
});
|
||||||
|
fm.depend_on(&extract);
|
||||||
|
}
|
||||||
|
queued += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if queued > 0 || still_open > 0 {
|
||||||
|
log_event("session-watcher", "tick",
|
||||||
|
&format!("{} stale, {} mined, {} open, {} queued",
|
||||||
|
total_stale, already_mined, still_open, queued));
|
||||||
}
|
}
|
||||||
|
|
||||||
write_status(&choir_sw);
|
write_status(&choir_sw);
|
||||||
|
|
@ -248,12 +331,7 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
|
|
||||||
// Scheduler: runs daily jobs based on filesystem state
|
// Scheduler: runs daily jobs based on filesystem state
|
||||||
let choir_sched = Arc::clone(&choir);
|
let choir_sched = Arc::clone(&choir);
|
||||||
let sonnet_sched = Arc::clone(&sonnet);
|
let llm_sched = Arc::clone(&llm);
|
||||||
let knowledge_loop_script = home_dir()
|
|
||||||
.join("poc/memory/scripts/knowledge_loop.py")
|
|
||||||
.to_string_lossy()
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
choir.spawn("scheduler").init(move |ctx| {
|
choir.spawn("scheduler").init(move |ctx| {
|
||||||
let mut last_daily = None::<chrono::NaiveDate>;
|
let mut last_daily = None::<chrono::NaiveDate>;
|
||||||
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
|
let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL;
|
||||||
|
|
@ -283,32 +361,27 @@ pub fn run_daemon() -> Result<(), String> {
|
||||||
});
|
});
|
||||||
|
|
||||||
// Consolidation pipeline: consolidate → knowledge-loop → digest
|
// Consolidation pipeline: consolidate → knowledge-loop → digest
|
||||||
// These share the Sonnet pool for serialization
|
|
||||||
let s1 = Arc::clone(&sonnet_sched);
|
|
||||||
let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
|
let consolidate = choir_sched.spawn(format!("consolidate:{}", today))
|
||||||
|
.resource(&llm_sched)
|
||||||
.retries(2)
|
.retries(2)
|
||||||
.init(move |_ctx| {
|
.init(move |_ctx| {
|
||||||
let _slot = s1.acquire();
|
|
||||||
run_poc_memory(&["consolidate-full"])
|
run_poc_memory(&["consolidate-full"])
|
||||||
})
|
})
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
let s2 = Arc::clone(&sonnet_sched);
|
|
||||||
let kl_script = knowledge_loop_script.clone();
|
|
||||||
let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today))
|
let mut knowledge = choir_sched.spawn(format!("knowledge-loop:{}", today))
|
||||||
|
.resource(&llm_sched)
|
||||||
.retries(1)
|
.retries(1)
|
||||||
.init(move |_ctx| {
|
.init(move |_ctx| {
|
||||||
let _slot = s2.acquire();
|
run_poc_memory(&["knowledge-loop", "--max-cycles", "100", "--batch-size", "5"])
|
||||||
run_python_script(&kl_script, &["--max-cycles", "100", "--batch-size", "5"])
|
|
||||||
});
|
});
|
||||||
knowledge.depend_on(&consolidate);
|
knowledge.depend_on(&consolidate);
|
||||||
let knowledge = knowledge.run();
|
let knowledge = knowledge.run();
|
||||||
|
|
||||||
let s3 = Arc::clone(&sonnet_sched);
|
|
||||||
let mut digest = choir_sched.spawn(format!("digest:{}", today))
|
let mut digest = choir_sched.spawn(format!("digest:{}", today))
|
||||||
|
.resource(&llm_sched)
|
||||||
.retries(1)
|
.retries(1)
|
||||||
.init(move |_ctx| {
|
.init(move |_ctx| {
|
||||||
let _slot = s3.acquire();
|
|
||||||
run_poc_memory(&["digest", "auto"])
|
run_poc_memory(&["digest", "auto"])
|
||||||
});
|
});
|
||||||
digest.depend_on(&knowledge);
|
digest.depend_on(&knowledge);
|
||||||
|
|
@ -366,6 +439,37 @@ fn ctrlc_wait() {
|
||||||
|
|
||||||
// --- Status display ---
|
// --- Status display ---
|
||||||
|
|
||||||
|
fn format_duration_human(ms: u128) -> String {
|
||||||
|
if ms < 1_000 {
|
||||||
|
format!("{}ms", ms)
|
||||||
|
} else if ms < 60_000 {
|
||||||
|
format!("{:.1}s", ms as f64 / 1000.0)
|
||||||
|
} else if ms < 3_600_000 {
|
||||||
|
format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000)
|
||||||
|
} else {
|
||||||
|
format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_group(name: &str) -> &str {
|
||||||
|
if name == "session-watcher" || name == "scheduler" { "core" }
|
||||||
|
else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" }
|
||||||
|
else if name.starts_with("consolidate:") || name.starts_with("knowledge-loop:")
|
||||||
|
|| name.starts_with("digest:") || name.starts_with("decay:") { "daily" }
|
||||||
|
else if name == "health" { "health" }
|
||||||
|
else { "other" }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_symbol(t: &TaskInfo) -> &'static str {
|
||||||
|
if t.cancelled { return "✗" }
|
||||||
|
match t.status {
|
||||||
|
TaskStatus::Running => "▶",
|
||||||
|
TaskStatus::Completed => "✓",
|
||||||
|
TaskStatus::Failed => "✗",
|
||||||
|
TaskStatus::Pending => "·",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn show_status() -> Result<(), String> {
|
pub fn show_status() -> Result<(), String> {
|
||||||
let path = status_path();
|
let path = status_path();
|
||||||
if !path.exists() {
|
if !path.exists() {
|
||||||
|
|
@ -378,34 +482,119 @@ pub fn show_status() -> Result<(), String> {
|
||||||
let status: DaemonStatus = serde_json::from_str(&content)
|
let status: DaemonStatus = serde_json::from_str(&content)
|
||||||
.map_err(|e| format!("parse status: {}", e))?;
|
.map_err(|e| format!("parse status: {}", e))?;
|
||||||
|
|
||||||
eprintln!("poc-memory daemon (pid {})", status.pid);
|
|
||||||
|
|
||||||
// Check if actually running
|
|
||||||
let alive = Path::new(&format!("/proc/{}", status.pid)).exists();
|
let alive = Path::new(&format!("/proc/{}", status.pid)).exists();
|
||||||
if !alive {
|
let state = if alive { "running" } else { "NOT RUNNING" };
|
||||||
eprintln!(" WARNING: process not running");
|
|
||||||
}
|
|
||||||
eprintln!();
|
|
||||||
|
|
||||||
if status.tasks.is_empty() {
|
// Show uptime from /proc/<pid>/stat start time
|
||||||
eprintln!(" No tasks");
|
let uptime_str = if alive {
|
||||||
} else {
|
proc_uptime(status.pid).unwrap_or_default()
|
||||||
for t in &status.tasks {
|
|
||||||
let retry_info = if t.max_retries > 0 {
|
|
||||||
format!(" (retry {}/{})", t.retry_count, t.max_retries)
|
|
||||||
} else {
|
} else {
|
||||||
String::new()
|
String::new()
|
||||||
};
|
};
|
||||||
let cancel_info = if t.cancelled { " [CANCELLED]" } else { "" };
|
|
||||||
eprint!(" {:30} {:10}{}{}", t.name, t.status.to_string(), retry_info, cancel_info);
|
if uptime_str.is_empty() {
|
||||||
if let Some(ref result) = t.result {
|
eprintln!("poc-memory daemon pid={} {}", status.pid, state);
|
||||||
if let Some(ref err) = result.error {
|
} else {
|
||||||
eprint!(" err: {}", err);
|
eprintln!("poc-memory daemon pid={} {} uptime {}", status.pid, state, uptime_str);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status file age
|
||||||
|
if let Ok(meta) = fs::metadata(&path) {
|
||||||
|
if let Ok(modified) = meta.modified() {
|
||||||
|
let age = std::time::SystemTime::now().duration_since(modified).unwrap_or_default();
|
||||||
|
eprintln!(" status updated {}s ago", age.as_secs());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.tasks.is_empty() {
|
||||||
|
eprintln!("\n No tasks");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count by status
|
||||||
|
let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
|
||||||
|
let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
|
||||||
|
let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
|
||||||
|
let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
|
||||||
|
eprintln!(" tasks: {} running, {} pending, {} done, {} failed\n",
|
||||||
|
running, pending, completed, failed);
|
||||||
|
|
||||||
|
// Group and display
|
||||||
|
let groups: &[(&str, &str)] = &[
|
||||||
|
("core", "Core"),
|
||||||
|
("daily", "Daily pipeline"),
|
||||||
|
("extract", "Session extraction"),
|
||||||
|
("health", "Health"),
|
||||||
|
("other", "Other"),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (group_id, group_label) in groups {
|
||||||
|
let tasks: Vec<&TaskInfo> = status.tasks.iter()
|
||||||
|
.filter(|t| task_group(&t.name) == *group_id)
|
||||||
|
.collect();
|
||||||
|
if tasks.is_empty() { continue; }
|
||||||
|
|
||||||
|
// For extract group, show summary instead of individual tasks
|
||||||
|
if *group_id == "extract" {
|
||||||
|
let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count();
|
||||||
|
let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count();
|
||||||
|
let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count();
|
||||||
|
let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count();
|
||||||
|
eprintln!(" {} ({} total)", group_label, tasks.len());
|
||||||
|
|
||||||
|
if n_running > 0 {
|
||||||
|
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) {
|
||||||
|
eprintln!(" {} {}", status_symbol(t), t.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut parts = Vec::new();
|
||||||
|
if n_done > 0 { parts.push(format!("{} done", n_done)); }
|
||||||
|
if n_running > 0 { parts.push(format!("{} running", n_running)); }
|
||||||
|
if n_pending > 0 { parts.push(format!("{} queued", n_pending)); }
|
||||||
|
if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); }
|
||||||
|
eprintln!(" {}", parts.join(", "));
|
||||||
|
|
||||||
|
// Show recent failures
|
||||||
|
for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) {
|
||||||
|
if let Some(ref r) = t.result {
|
||||||
|
if let Some(ref err) = r.error {
|
||||||
|
let short = if err.len() > 80 { &err[..80] } else { err };
|
||||||
|
eprintln!(" ✗ {}: {}", t.name, short);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
eprint!(" ({}ms)", result.duration.as_millis());
|
|
||||||
}
|
}
|
||||||
eprintln!();
|
eprintln!();
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eprintln!(" {}", group_label);
|
||||||
|
for t in &tasks {
|
||||||
|
let sym = status_symbol(t);
|
||||||
|
let duration = t.result.as_ref()
|
||||||
|
.map(|r| format_duration_human(r.duration.as_millis()))
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let retry = if t.max_retries > 0 && t.retry_count > 0 {
|
||||||
|
format!(" retry {}/{}", t.retry_count, t.max_retries)
|
||||||
|
} else {
|
||||||
|
String::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
let err_msg = t.result.as_ref()
|
||||||
|
.and_then(|r| r.error.as_ref())
|
||||||
|
.map(|e| {
|
||||||
|
let short = if e.len() > 60 { &e[..60] } else { e };
|
||||||
|
format!(" err: {}", short)
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
if duration.is_empty() {
|
||||||
|
eprintln!(" {} {:30}{}{}", sym, t.name, retry, err_msg);
|
||||||
|
} else {
|
||||||
|
eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, err_msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
eprintln!();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -432,8 +621,37 @@ pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> {
|
||||||
.take(lines)
|
.take(lines)
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
if filtered.is_empty() {
|
||||||
|
eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default());
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]"
|
||||||
for line in filtered.into_iter().rev() {
|
for line in filtered.into_iter().rev() {
|
||||||
|
if let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) {
|
||||||
|
let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?");
|
||||||
|
let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?");
|
||||||
|
let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?");
|
||||||
|
let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
|
||||||
|
// Shorten timestamp to just time portion
|
||||||
|
let time = if ts.len() >= 19 { &ts[11..19] } else { ts };
|
||||||
|
|
||||||
|
if detail.is_empty() {
|
||||||
|
eprintln!(" {} {:20} {}", time, job, event);
|
||||||
|
} else {
|
||||||
|
// Truncate long details (file paths)
|
||||||
|
let short = if detail.len() > 60 {
|
||||||
|
let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail);
|
||||||
|
if last.len() > 60 { &last[..60] } else { last }
|
||||||
|
} else {
|
||||||
|
detail
|
||||||
|
};
|
||||||
|
eprintln!(" {} {:20} {:12} {}", time, job, event, short);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
eprintln!("{}", line);
|
eprintln!("{}", line);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
312
src/fact_mine.rs
Normal file
312
src/fact_mine.rs
Normal file
|
|
@ -0,0 +1,312 @@
|
||||||
|
// fact_mine.rs — extract atomic factual claims from conversation transcripts
|
||||||
|
//
|
||||||
|
// Chunks conversation text into overlapping windows, sends each to Haiku
|
||||||
|
// for extraction, deduplicates by claim text. Output: JSON array of facts.
|
||||||
|
//
|
||||||
|
// Uses Haiku (not Sonnet) for cost efficiency on high-volume extraction.
|
||||||
|
|
||||||
|
use crate::llm;
|
||||||
|
use crate::store::{self, Provenance};
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
// Chunking geometry. Window sizes are specified in "tokens" and converted
// to a byte budget with a rough 4-bytes-per-token heuristic.
const CHARS_PER_TOKEN: usize = 4;
// Target size of each excerpt sent to the model per extraction call.
const WINDOW_TOKENS: usize = 2000;
// Overlap between consecutive windows, so a fact straddling a window
// boundary is seen whole by at least one window.
const OVERLAP_TOKENS: usize = 200;
const WINDOW_CHARS: usize = WINDOW_TOKENS * CHARS_PER_TOKEN;
const OVERLAP_CHARS: usize = OVERLAP_TOKENS * CHARS_PER_TOKEN;
|
||||||
|
|
||||||
|
const EXTRACTION_PROMPT: &str = r#"Extract atomic factual claims from this conversation excerpt.
|
||||||
|
|
||||||
|
Each claim should be:
|
||||||
|
- A single verifiable statement
|
||||||
|
- Specific enough to be useful in isolation
|
||||||
|
- Tagged with domain (e.g., bcachefs/btree, bcachefs/alloc, bcachefs/journal,
|
||||||
|
bcachefs/ec, bcachefs/reconcile, rust/idioms, workflow/preferences,
|
||||||
|
linux/kernel, memory/design, identity/personal)
|
||||||
|
- Tagged with confidence: "stated" (explicitly said), "implied" (logically follows),
|
||||||
|
or "speculative" (hypothesis, not confirmed)
|
||||||
|
- Include which speaker said it (Kent, PoC/ProofOfConcept, or Unknown)
|
||||||
|
|
||||||
|
Do NOT extract:
|
||||||
|
- Opinions or subjective assessments
|
||||||
|
- Conversational filler or greetings
|
||||||
|
- Things that are obviously common knowledge
|
||||||
|
- Restatements of the same fact (pick the clearest version)
|
||||||
|
- System messages, tool outputs, or error logs (extract what was LEARNED from them)
|
||||||
|
- Anything about the conversation itself ("Kent and PoC discussed...")
|
||||||
|
|
||||||
|
Output as a JSON array. Each element:
|
||||||
|
{
|
||||||
|
"claim": "the exact factual statement",
|
||||||
|
"domain": "category/subcategory",
|
||||||
|
"confidence": "stated|implied|speculative",
|
||||||
|
"speaker": "Kent|PoC|Unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
If the excerpt contains no extractable facts, output an empty array: []
|
||||||
|
|
||||||
|
--- CONVERSATION EXCERPT ---
|
||||||
|
"#;
|
||||||
|
|
||||||
|
/// One atomic factual claim extracted from a transcript.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Fact {
    /// The factual statement itself.
    pub claim: String,
    /// Category tag, e.g. "bcachefs/btree" or "workflow/preferences".
    pub domain: String,
    /// One of "stated", "implied", or "speculative".
    pub confidence: String,
    /// Who asserted it: "Kent", "PoC", or "Unknown".
    pub speaker: String,
    /// Transcript filename this fact came from (filled in by the miner).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_file: Option<String>,
    /// 1-based index of the chunk the fact was extracted from.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_chunk: Option<usize>,
    /// Byte offset of that chunk within the formatted conversation text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_offset: Option<usize>,
}
|
||||||
|
|
||||||
|
/// A single conversational turn pulled from a transcript.
struct Message {
    // Display name of the speaker ("Kent" for user, "PoC" for assistant).
    role: String,
    // Message text with system-reminder blocks filtered out.
    text: String,
    // Timestamp string from the transcript record; may be empty.
    timestamp: String,
}
|
||||||
|
|
||||||
|
/// Extract user/assistant text messages from a JSONL transcript.
|
||||||
|
fn extract_conversation(path: &Path) -> Vec<Message> {
|
||||||
|
let Ok(content) = fs::read_to_string(path) else { return Vec::new() };
|
||||||
|
let mut messages = Vec::new();
|
||||||
|
|
||||||
|
for line in content.lines() {
|
||||||
|
let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else { continue };
|
||||||
|
|
||||||
|
let msg_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
if msg_type != "user" && msg_type != "assistant" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let timestamp = obj.get("timestamp")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("")
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let msg = obj.get("message").unwrap_or(&obj);
|
||||||
|
let content = msg.get("content");
|
||||||
|
|
||||||
|
let text = match content {
|
||||||
|
Some(serde_json::Value::String(s)) => s.clone(),
|
||||||
|
Some(serde_json::Value::Array(arr)) => {
|
||||||
|
let texts: Vec<&str> = arr.iter()
|
||||||
|
.filter_map(|block| {
|
||||||
|
let obj = block.as_object()?;
|
||||||
|
if obj.get("type")?.as_str()? != "text" {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let t = obj.get("text")?.as_str()?;
|
||||||
|
if t.contains("<system-reminder>") {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(t)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
texts.join("\n")
|
||||||
|
}
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let text = text.trim().to_string();
|
||||||
|
if text.len() < 20 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let role = if msg_type == "user" { "Kent" } else { "PoC" }.to_string();
|
||||||
|
messages.push(Message { role, text, timestamp });
|
||||||
|
}
|
||||||
|
|
||||||
|
messages
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Format messages into a single text for chunking.
|
||||||
|
fn format_for_extraction(messages: &[Message]) -> String {
|
||||||
|
messages.iter()
|
||||||
|
.map(|msg| {
|
||||||
|
let text = if msg.text.len() > 3000 {
|
||||||
|
// Find a char boundary near 2800
|
||||||
|
let trunc = msg.text.floor_char_boundary(2800);
|
||||||
|
format!("{}\n[...truncated...]", &msg.text[..trunc])
|
||||||
|
} else {
|
||||||
|
msg.text.clone()
|
||||||
|
};
|
||||||
|
let ts = if msg.timestamp.len() >= 19 { &msg.timestamp[..19] } else { "" };
|
||||||
|
if ts.is_empty() {
|
||||||
|
format!("[{}] {}", msg.role, text)
|
||||||
|
} else {
|
||||||
|
format!("[{} {}] {}", msg.role, ts, text)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Split text into overlapping windows, breaking at paragraph boundaries.
|
||||||
|
fn chunk_text(text: &str) -> Vec<(usize, &str)> {
|
||||||
|
let mut chunks = Vec::new();
|
||||||
|
let mut start = 0;
|
||||||
|
|
||||||
|
while start < text.len() {
|
||||||
|
let mut end = text.floor_char_boundary((start + WINDOW_CHARS).min(text.len()));
|
||||||
|
|
||||||
|
// Try to break at a paragraph boundary
|
||||||
|
if end < text.len() {
|
||||||
|
if let Some(para) = text[start..end].rfind("\n\n") {
|
||||||
|
if para > WINDOW_CHARS / 2 {
|
||||||
|
end = start + para;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks.push((start, &text[start..end]));
|
||||||
|
|
||||||
|
let next = text.floor_char_boundary(end.saturating_sub(OVERLAP_CHARS));
|
||||||
|
if next <= start {
|
||||||
|
start = end;
|
||||||
|
} else {
|
||||||
|
start = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse JSON facts from model response.
|
||||||
|
fn parse_facts(response: &str) -> Vec<Fact> {
|
||||||
|
let cleaned = response.trim();
|
||||||
|
// Strip markdown code block
|
||||||
|
let cleaned = if cleaned.starts_with("```") {
|
||||||
|
cleaned.lines()
|
||||||
|
.filter(|l| !l.starts_with("```"))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n")
|
||||||
|
} else {
|
||||||
|
cleaned.to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Find JSON array
|
||||||
|
let start = cleaned.find('[');
|
||||||
|
let end = cleaned.rfind(']');
|
||||||
|
let (Some(start), Some(end)) = (start, end) else { return Vec::new() };
|
||||||
|
|
||||||
|
serde_json::from_str(&cleaned[start..=end]).unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mine a single transcript for atomic facts.
|
||||||
|
pub fn mine_transcript(path: &Path, dry_run: bool) -> Result<Vec<Fact>, String> {
|
||||||
|
let filename = path.file_name()
|
||||||
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
|
.unwrap_or_else(|| "unknown".into());
|
||||||
|
eprintln!("Mining: {}", filename);
|
||||||
|
|
||||||
|
let messages = extract_conversation(path);
|
||||||
|
if messages.is_empty() {
|
||||||
|
eprintln!(" No messages found");
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
eprintln!(" {} messages extracted", messages.len());
|
||||||
|
|
||||||
|
let text = format_for_extraction(&messages);
|
||||||
|
let chunks = chunk_text(&text);
|
||||||
|
eprintln!(" {} chunks ({} chars)", chunks.len(), text.len());
|
||||||
|
|
||||||
|
if dry_run {
|
||||||
|
for (i, (offset, chunk)) in chunks.iter().enumerate() {
|
||||||
|
eprintln!("\n--- Chunk {} (offset {}, {} chars) ---", i + 1, offset, chunk.len());
|
||||||
|
let preview = if chunk.len() > 500 { &chunk[..500] } else { chunk };
|
||||||
|
eprintln!("{}", preview);
|
||||||
|
if chunk.len() > 500 {
|
||||||
|
eprintln!(" ... ({} more chars)", chunk.len() - 500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut all_facts = Vec::new();
|
||||||
|
for (i, (_offset, chunk)) in chunks.iter().enumerate() {
|
||||||
|
eprint!(" Chunk {}/{} ({} chars)...", i + 1, chunks.len(), chunk.len());
|
||||||
|
|
||||||
|
let prompt = format!("{}{}", EXTRACTION_PROMPT, chunk);
|
||||||
|
let response = match llm::call_haiku(&prompt) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!(" error: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut facts = parse_facts(&response);
|
||||||
|
for fact in &mut facts {
|
||||||
|
fact.source_file = Some(filename.clone());
|
||||||
|
fact.source_chunk = Some(i + 1);
|
||||||
|
fact.source_offset = Some(*_offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!(" {} facts", facts.len());
|
||||||
|
all_facts.extend(facts);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deduplicate by claim text
|
||||||
|
let mut seen = HashSet::new();
|
||||||
|
let before = all_facts.len();
|
||||||
|
all_facts.retain(|f| seen.insert(f.claim.to_lowercase()));
|
||||||
|
let dupes = before - all_facts.len();
|
||||||
|
if dupes > 0 {
|
||||||
|
eprintln!(" {} duplicates removed", dupes);
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!(" Total: {} unique facts", all_facts.len());
|
||||||
|
Ok(all_facts)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mine a transcript and store facts in the capnp store.
|
||||||
|
/// Returns the number of facts stored.
|
||||||
|
pub fn mine_and_store(path: &Path) -> Result<usize, String> {
|
||||||
|
let facts = mine_transcript(path, false)?;
|
||||||
|
if facts.is_empty() {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let filename = path.file_name()
|
||||||
|
.map(|n| n.to_string_lossy().to_string())
|
||||||
|
.unwrap_or_else(|| "unknown".into());
|
||||||
|
|
||||||
|
// Store as a single node keyed by transcript filename
|
||||||
|
let key = format!("_facts-{}", filename.trim_end_matches(".jsonl"));
|
||||||
|
let json = serde_json::to_string_pretty(&facts)
|
||||||
|
.map_err(|e| format!("serialize facts: {}", e))?;
|
||||||
|
|
||||||
|
let mut store = store::Store::load()?;
|
||||||
|
store.upsert_provenance(&key, &json, Provenance::AgentFactMine)?;
|
||||||
|
store.save()?;
|
||||||
|
|
||||||
|
eprintln!(" Stored {} facts as {}", facts.len(), key);
|
||||||
|
Ok(facts.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mine transcripts, returning all facts. Skips files with fewer than min_messages.
|
||||||
|
pub fn mine_batch(paths: &[&Path], min_messages: usize, dry_run: bool) -> Result<Vec<Fact>, String> {
|
||||||
|
let mut all_facts = Vec::new();
|
||||||
|
|
||||||
|
for path in paths {
|
||||||
|
let messages = extract_conversation(path);
|
||||||
|
if messages.len() < min_messages {
|
||||||
|
eprintln!("Skipping {} ({} messages < {})",
|
||||||
|
path.file_name().map(|n| n.to_string_lossy()).unwrap_or_default(),
|
||||||
|
messages.len(), min_messages);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let facts = mine_transcript(path, dry_run)?;
|
||||||
|
all_facts.extend(facts);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(all_facts)
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue