// poc-memory daemon: background job orchestration for memory maintenance // // Replaces the fragile cron+shell approach with a single long-running process // that owns all background memory work. Uses jobkit for worker pool, status // tracking, retry, and cancellation. // // Architecture: // - Scheduler task: runs every 60s, scans filesystem state, spawns jobs // - Session watcher task: detects ended Claude sessions, triggers extraction // - Jobs shell out to existing poc-memory subcommands (Phase 1) // - Status written to daemon-status.json for `poc-memory daemon status` // // Phase 2 will inline job logic; Phase 3 integrates into poc-agent. use jobkit::{Choir, ExecutionContext, TaskError, TaskInfo, TaskStatus}; use std::collections::{HashMap, HashSet}; use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; const SESSION_STALE_SECS: u64 = 600; // 10 minutes const SCHEDULER_INTERVAL: Duration = Duration::from_secs(60); const HEALTH_INTERVAL: Duration = Duration::from_secs(3600); // --- Persistent task queue --- #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct PendingTask { id: String, agent: String, batch: usize, #[serde(default)] target_key: Option, } struct TaskQueue { path: PathBuf, tasks: Mutex>, } impl TaskQueue { fn load(data_dir: &Path) -> Arc { let path = data_dir.join("pending-tasks.jsonl"); let tasks = if path.exists() { fs::read_to_string(&path) .unwrap_or_default() .lines() .filter_map(|l| serde_json::from_str(l).ok()) .collect() } else { Vec::new() }; let count = tasks.len(); if count > 0 { log_event("task-queue", "loaded", &format!("{} pending tasks", count)); } Arc::new(Self { path, tasks: Mutex::new(tasks) }) } fn push(&self, task: PendingTask) { let mut tasks = self.tasks.lock().unwrap(); tasks.push(task); self.write_locked(&tasks); } fn remove(&self, id: &str) { let mut tasks = self.tasks.lock().unwrap(); tasks.retain(|t| t.id != id); self.write_locked(&tasks); } fn 
drain(&self) -> Vec { let tasks = self.tasks.lock().unwrap(); tasks.clone() } fn write_locked(&self, tasks: &[PendingTask]) { let content: String = tasks.iter() .filter_map(|t| serde_json::to_string(t).ok()) .collect::>() .join("\n"); let _ = fs::write(&self.path, if content.is_empty() { String::new() } else { content + "\n" }); } } fn log_path() -> PathBuf { crate::config::get().data_dir.join("daemon.log") } // --- Logging --- fn log_event(job: &str, event: &str, detail: &str) { jobkit::daemon::event_log::log(&crate::config::get().data_dir, job, event, detail); } /// Public wrapper for logging from other agent modules. pub fn log_event_pub(job: &str, event: &str, detail: &str) { log_event(job, event, detail); } /// Verbose log — only written if verbose logging is enabled. pub fn log_verbose(job: &str, event: &str, detail: &str) { jobkit::daemon::event_log::verbose(&crate::config::get().data_dir, job, event, detail); } // --- Job functions (direct, no subprocess) --- static DAEMON_POOL: std::sync::OnceLock> = std::sync::OnceLock::new(); /// Run a named job with logging, progress reporting, and error mapping. fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), String>) -> Result<(), TaskError> { let pool = DAEMON_POOL.get().map(|p| p.as_ref()); jobkit::daemon::Daemon::run_job(&crate::config::get().data_dir, ctx, name, pool, f) } // experience_mine and fact_mine removed — observation.agent handles all transcript mining /// Run an agent targeted at a specific node key. 
fn job_targeted_agent( ctx: &ExecutionContext, agent_type: &str, target_key: &str, ) -> Result<(), TaskError> { let agent = agent_type.to_string(); let key = target_key.to_string(); let job_name = format!("c-{}-target({})", agent, key); run_job(ctx, &job_name, || { let mut store = crate::store::Store::load()?; ctx.log_line(&format!("targeting: {}", key)); let job = job_name.clone(); let log = |msg: &str| { ctx.log_line(msg); log_event(&job, "progress", msg); }; super::knowledge::run_one_agent_with_keys( &mut store, &agent, &[key.clone()], 5, "daemon", &log, false, )?; ctx.log_line("done"); Ok(()) }) } /// Run a single consolidation agent (replay, linker, separator, transfer, health). /// Shared set of node keys currently being processed by agents. /// Prevents concurrent agents from working on overlapping graph regions. type InFlightNodes = Arc>>; fn job_consolidation_agent( ctx: &ExecutionContext, agent_type: &str, batch_size: usize, in_flight: &InFlightNodes, ) -> Result<(), TaskError> { let agent = agent_type.to_string(); let batch = batch_size; let job_name = format!("c-{}", agent); let job_name2 = job_name.clone(); let in_flight = Arc::clone(in_flight); run_job(ctx, &job_name, || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; // Claim seeds: lock in-flight set, run query excluding it, // add selected seeds + strongly-connected neighbors, then unlock. 
let mut claimed_keys: Vec; let graph = store.build_graph(); { let mut locked = in_flight.lock().unwrap(); ctx.log_line(&format!("running agent: {} (batch={}, {} in-flight)", agent, batch, locked.len())); // Run the agent's query, filtering out in-flight nodes let def = super::defs::get_def(&agent) .ok_or_else(|| format!("no .agent file for {}", agent))?; let query = &def.query; let keys = if !query.is_empty() { use crate::query::engine as search; let mut stages = search::Stage::parse_pipeline(query)?; let padded = batch + locked.len().min(100); if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) { stages.push(search::Stage::Transform(search::Transform::Limit(padded))); } let results = search::run_query(&stages, vec![], &graph, &store, false, padded); results.into_iter() .map(|(k, _)| k) .filter(|k| !locked.contains(k)) .take(batch) .collect::>() } else { vec![] }; if keys.is_empty() { return Err("query returned no results (after exclusion)".into()); } // Claim seeds + strongly-connected neighbors. // Only exclude neighbors with score > threshold to avoid // blacking out the graph via high-degree hub nodes. claimed_keys = Vec::with_capacity(batch * 20); for key in &keys { claimed_keys.push(key.clone()); locked.insert(key.clone()); for (nbr, strength) in graph.neighbors(key) { let weight = store.nodes.get(nbr.as_str()) .map(|n| n.weight).unwrap_or(0.1); if strength * weight > 0.3 { claimed_keys.push(nbr.clone()); locked.insert(nbr.clone()); } } } } // in_flight lock released — run LLM without holding it let log = |msg: &str| { ctx.log_line(msg); log_event(&job_name2, "progress", msg); }; // Use run_one_agent_with_keys — we already selected seeds above, // no need to re-run the query. 
let result = super::knowledge::run_one_agent_with_keys( &mut store, &agent, &claimed_keys, batch, "consolidate", &log, false, ).map(|_| ()); // Release all claimed keys (seeds + neighbors) { let mut locked = in_flight.lock().unwrap(); for key in &claimed_keys { locked.remove(key); } } result?; ctx.log_line("done"); Ok(()) }) } /// Run the rename agent: generates renames via LLM, applies them directly. fn job_rename_agent( ctx: &ExecutionContext, batch_size: usize, ) -> Result<(), TaskError> { run_job(ctx, "c-rename", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; let batch = if batch_size == 0 { 10 } else { batch_size }; ctx.log_line(&format!("running rename agent (batch={})", batch)); let log = |msg: &str| ctx.log_line(msg); let result = super::knowledge::run_one_agent(&mut store, "rename", batch, "consolidate", &log, false)?; // Parse RENAME actions from response (rename uses its own format, not WRITE_NODE/LINK/REFINE) let mut applied = 0; let mut skipped = 0; for line in result.output.lines() { let trimmed = line.trim(); if !trimmed.starts_with("RENAME ") { continue; } let parts: Vec<&str> = trimmed[7..].splitn(2, ' ').collect(); if parts.len() != 2 { skipped += 1; continue; } let old_key = parts[0].trim(); let new_key = parts[1].trim(); if old_key.is_empty() || new_key.is_empty() { skipped += 1; continue; } let resolved = match store.resolve_key(old_key) { Ok(k) => k, Err(e) => { ctx.log_line(&format!("skip: {} → {}: {}", old_key, new_key, e)); skipped += 1; continue; } }; if store.nodes.contains_key(new_key) { ctx.log_line(&format!("skip: {} already exists", new_key)); skipped += 1; continue; } match store.rename_node(&resolved, new_key) { Ok(()) => { ctx.log_line(&format!("renamed: {} → {}", resolved, new_key)); applied += 1; } Err(e) => { ctx.log_line(&format!("error: {} → {}: {}", resolved, new_key, e)); skipped += 1; } } } if applied > 0 { store.save()?; } ctx.log_line(&format!("done: {} applied, {} skipped", applied, 
skipped)); Ok(()) }) } /// Run the split agent: two-phase decomposition of large nodes. /// /// Phase 1: Send node + neighbors to LLM, get back a JSON split plan /// (child keys, descriptions, section hints). /// Phase 2: For each child, send parent content + child description to LLM, /// get back the extracted/reorganized content for that child. /// /// This handles arbitrarily large nodes because the output of each phase 2 /// call is proportional to one child, not the whole parent. /// Split a single node by key. Called as an independent task so multiple /// splits can run in parallel. Each task loads the store fresh, checks the /// node still exists and hasn't been split, does the LLM work, then saves. fn job_split_one( ctx: &ExecutionContext, parent_key: String, ) -> Result<(), TaskError> { run_job(ctx, "c-split", || { ctx.log_line(&format!("loading store for {}", parent_key)); let mut store = crate::store::Store::load()?; // Check node still exists and hasn't been deleted/split already let content_len = match store.nodes.get(parent_key.as_str()) { Some(n) if !n.deleted => n.content.len(), _ => { ctx.log_line(&format!("skip: {} no longer exists or deleted", parent_key)); return Ok(()); } }; ctx.log_line(&format!("--- splitting: {} ({} chars)", parent_key, content_len)); // Phase 1: get split plan let plan_prompt = super::prompts::split_plan_prompt(&store, &parent_key)?; ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len())); let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?; let plan = match super::llm::parse_json_response(&plan_response) { Ok(v) => v, Err(e) => { ctx.log_line(&format!("phase 1 parse error: {}", e)); return Ok(()); } }; let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or(""); if action == "keep" { let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or(""); ctx.log_line(&format!("keep: {} ({})", parent_key, reason)); return Ok(()); } if action != "split" { 
ctx.log_line(&format!("unexpected action: {}", action)); return Ok(()); } let children_plan = match plan.get("children").and_then(|v| v.as_array()) { Some(c) if c.len() >= 2 => c, _ => { ctx.log_line("plan has fewer than 2 children, skipping"); return Ok(()); } }; ctx.log_line(&format!("phase 1: {} children planned", children_plan.len())); for child in children_plan { let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?"); let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or(""); ctx.log_line(&format!(" planned: {} — {}", key, desc)); } // Phase 2: extract content for each child let mut children: Vec<(String, String)> = Vec::new(); // Collect neighbor assignments from plan: child_key -> [neighbor_keys] let mut neighbor_map: HashMap> = HashMap::new(); for child_plan in children_plan { let child_key = match child_plan.get("key").and_then(|v| v.as_str()) { Some(k) => k.to_string(), None => continue, }; let child_desc = child_plan.get("description") .and_then(|v| v.as_str()).unwrap_or(""); let child_sections = child_plan.get("sections") .and_then(|v| v.as_array()) .map(|arr| arr.iter() .filter_map(|v| v.as_str()) .collect::>() .join(", ")) .unwrap_or_default(); let child_neighbors: Vec = child_plan.get("neighbors") .and_then(|v| v.as_array()) .map(|arr| arr.iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect()) .unwrap_or_default(); neighbor_map.insert(child_key.clone(), child_neighbors); ctx.log_line(&format!("phase 2: extracting {}", child_key)); let extract_prompt = super::prompts::split_extract_prompt( &store, &parent_key, &child_key, child_desc, &child_sections)?; ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len())); let content = match super::llm::call_sonnet("split-extract", &extract_prompt) { Ok(c) => c, Err(e) => { ctx.log_line(&format!(" extract error: {}", e)); continue; } }; ctx.log_line(&format!(" extracted: {} chars", content.len())); children.push((child_key, content)); } if children.len() 
< 2 { ctx.log_line(&format!("only {} children extracted, skipping", children.len())); return Ok(()); } // Reload store before mutations — another split may have saved meanwhile store = crate::store::Store::load()?; // Re-check parent still exists after reload if !store.nodes.contains_key(parent_key.as_str()) || store.nodes.get(parent_key.as_str()).map_or(true, |n| n.deleted) { ctx.log_line(&format!("skip: {} was split by another task", parent_key)); return Ok(()); } // Collect parent's edges before modifications let parent_edges: Vec<_> = store.relations.iter() .filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key)) .cloned() .collect(); // Create child nodes let mut child_uuids: Vec<([u8; 16], String)> = Vec::new(); for (child_key, content) in &children { if store.nodes.contains_key(child_key.as_str()) { ctx.log_line(&format!(" skip: {} already exists", child_key)); continue; } store.upsert_provenance(child_key, content, "consolidate:write")?; let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid; child_uuids.push((uuid, child_key.clone())); ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len())); } // Inherit edges using agent's neighbor assignments from the plan for (child_uuid, child_key) in &child_uuids { let neighbors = match neighbor_map.get(child_key) { Some(n) => n, None => continue, }; for neighbor_key in neighbors { // Find the parent edge for this neighbor to inherit its strength let parent_edge = parent_edges.iter().find(|r| { r.source_key == *neighbor_key || r.target_key == *neighbor_key }); let strength = parent_edge.map(|e| e.strength).unwrap_or(0.3); let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) { Some(n) => n.uuid, None => continue, }; let rel = crate::store::new_relation( *child_uuid, neighbor_uuid, crate::store::RelationType::Auto, strength, child_key, neighbor_key, ); store.add_relation(rel).ok(); } } // Link siblings for i in 0..child_uuids.len() { for j in 
(i+1)..child_uuids.len() { let rel = crate::store::new_relation( child_uuids[i].0, child_uuids[j].0, crate::store::RelationType::Auto, 0.5, &child_uuids[i].1, &child_uuids[j].1, ); store.add_relation(rel).ok(); } } // Tombstone parent if let Some(parent) = store.nodes.get_mut(parent_key.as_str()) { parent.deleted = true; parent.version += 1; let tombstone = parent.clone(); store.append_nodes(std::slice::from_ref(&tombstone)).ok(); } store.nodes.remove(parent_key.as_str()); ctx.log_line(&format!("split complete: {} → {} children", parent_key, child_uuids.len())); store.save()?; Ok(()) }) } /// Link orphan nodes (CPU-heavy, no LLM). fn job_link_orphans(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "c-orphans", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("linking orphans"); let (orphans, added) = crate::neuro::link_orphans(&mut store, 2, 3, 0.15); ctx.log_line(&format!("{} orphans, {} links added", orphans, added)); Ok(()) }) } /// Cap node degree to prevent mega-hubs. fn job_cap_degree(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "c-cap", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("capping degree"); match store.cap_degree(50) { Ok((hubs, pruned)) => { store.save()?; ctx.log_line(&format!("{} hubs capped, {} edges pruned", hubs, pruned)); Ok(()) } Err(e) => Err(e), } }) } /// Apply links extracted from digests. 
fn job_digest_links(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "c-digest-links", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("applying digest links"); let links = super::digest::parse_all_digest_links(&store); let (applied, skipped, fallbacks) = super::digest::apply_digest_links(&mut store, &links); store.save()?; ctx.log_line(&format!("{} applied, {} skipped, {} fallbacks", applied, skipped, fallbacks)); Ok(()) }) } fn job_knowledge_loop(_ctx: &ExecutionContext) -> Result<(), TaskError> { // Knowledge loop removed — agents now use tool calls directly. // Consolidation is handled by consolidate_full() in the consolidate job. Ok(()) } fn job_digest(ctx: &ExecutionContext) -> Result<(), TaskError> { run_job(ctx, "digest", || { ctx.log_line("loading store"); let mut store = crate::store::Store::load()?; ctx.log_line("generating digests"); crate::digest::digest_auto(&mut store) }) } fn job_daily_check( ctx: &ExecutionContext, graph_health: &Arc>>, ) -> Result<(), TaskError> { let gh = Arc::clone(graph_health); run_job(ctx, "daily-check", || { ctx.log_line("loading store"); let store = crate::store::Store::load()?; ctx.log_line("checking health"); let _report = crate::neuro::daily_check(&store); // Decay search hit counters (10% daily decay) ctx.log_line("decaying search counters"); match crate::counters::decay_all(0.9) { Ok(removed) => ctx.log_line(&format!("decayed counters, removed {}", removed)), Err(e) => ctx.log_line(&format!("counter decay failed: {}", e)), } // Compute graph health metrics for status display ctx.log_line("computing graph health"); let health = compute_graph_health(&store); *gh.lock().unwrap() = Some(health); Ok(()) }) } fn compute_graph_health(store: &crate::store::Store) -> GraphHealth { let graph = store.build_graph(); let snap = crate::graph::current_metrics(&graph); let episodic_count = store.nodes.iter() .filter(|(_, n)| matches!(n.node_type, 
crate::store::NodeType::EpisodicSession)) .count(); let episodic_ratio = if store.nodes.is_empty() { 0.0 } else { episodic_count as f32 / store.nodes.len() as f32 }; // Use the same planning logic as consolidation (skip O(n²) interference) let plan = crate::neuro::consolidation_plan_quick(store); GraphHealth { nodes: snap.nodes, edges: snap.edges, communities: snap.communities, alpha: snap.alpha, gini: snap.gini, avg_cc: snap.avg_cc, sigma: snap.sigma, episodic_ratio, interference: 0, plan_counts: plan.counts, plan_rationale: plan.rationale, computed_at: crate::store::format_datetime_space(crate::store::now_epoch()), } } // --- Session detection --- /// Find JSONL session files that are stale (not recently written) and not /// held open by any process. /// Find JSONL session files that haven't been written to recently. /// Only checks metadata (stat), no file reads or subprocess calls. /// The fuser check (is file open?) is deferred to the reconcile loop, /// only for sessions that pass the mined-key filter. /// Minimum file size for a session to be worth mining. /// Daemon-spawned LLM calls are ~55KB/5 lines; real interactive /// sessions are much larger. Skip anything too small to contain /// meaningful conversation. 
const MIN_SESSION_BYTES: u64 = 100_000; fn find_stale_sessions() -> Vec { let projects = crate::config::get().projects_dir.clone(); if !projects.exists() { return Vec::new(); } let mut stale = Vec::new(); let now = SystemTime::now(); let Ok(dirs) = fs::read_dir(&projects) else { return stale }; for dir_entry in dirs.filter_map(|e| e.ok()) { if !dir_entry.path().is_dir() { continue; } let Ok(files) = fs::read_dir(dir_entry.path()) else { continue }; for f in files.filter_map(|e| e.ok()) { let path = f.path(); if path.extension().map(|x| x == "jsonl").unwrap_or(false) { if let Ok(meta) = path.metadata() { // Skip tiny sessions (daemon-spawned LLM calls, aborted sessions) if meta.len() < MIN_SESSION_BYTES { continue; } if let Ok(mtime) = meta.modified() { let age = now.duration_since(mtime).unwrap_or_default(); if age.as_secs() >= SESSION_STALE_SECS { stale.push(path); } } } } } } stale } /// Check if any other process has a file open by scanning /proc/*/fd/. /// This is what `fuser` does internally, without the subprocess overhead. fn is_file_open(path: &Path) -> bool { let Ok(target) = path.canonicalize() else { return false }; let Ok(procs) = fs::read_dir("/proc") else { return false }; let my_pid = std::process::id().to_string(); for proc_entry in procs.filter_map(|e| e.ok()) { let name = proc_entry.file_name(); let name = name.to_string_lossy(); if !name.chars().all(|c| c.is_ascii_digit()) { continue; } if *name == my_pid { continue; } let fd_dir = proc_entry.path().join("fd"); let Ok(fds) = fs::read_dir(&fd_dir) else { continue }; for fd in fds.filter_map(|e| e.ok()) { if let Ok(link) = fs::read_link(fd.path()) { if link == target { return true; } } } } false } /// Get process uptime as human-readable string by reading /proc//stat. 
fn proc_uptime(pid: u32) -> Option { // /proc//stat field 22 (1-indexed) is start time in clock ticks let stat = fs::read_to_string(format!("/proc/{}/stat", pid)).ok()?; // Fields after comm (which may contain spaces/parens) — find closing paren let after_comm = stat.get(stat.rfind(')')? + 2..)?; let fields: Vec<&str> = after_comm.split_whitespace().collect(); // Field 22 in stat is index 19 after comm (fields[0] = state, field 22 = starttime = index 19) let start_ticks: u64 = fields.get(19)?.parse().ok()?; let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as u64; let boot_time_secs = { let uptime = fs::read_to_string("/proc/uptime").ok()?; let sys_uptime: f64 = uptime.split_whitespace().next()?.parse().ok()?; let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); now - sys_uptime as u64 }; let start_secs = boot_time_secs + start_ticks / ticks_per_sec; let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok()?.as_secs(); let uptime = now.saturating_sub(start_secs); Some(format_duration_human(uptime as u128 * 1000)) } // --- Status writing --- fn write_status( choir: &Choir, last_daily: Option, graph_health: &Arc>>, ) { let status = build_status(choir, last_daily, graph_health); jobkit::daemon::status::write(&crate::config::get().data_dir, &status); } #[derive(Clone, Default, serde::Serialize, serde::Deserialize)] pub struct GraphHealth { pub nodes: usize, pub edges: usize, pub communities: usize, pub alpha: f32, // power-law exponent (target ≥2.5) pub gini: f32, // degree inequality (target ≤0.4) pub avg_cc: f32, // clustering coefficient (target ≥0.2) pub sigma: f32, // small-world sigma pub episodic_ratio: f32, // episodic/total nodes (target <0.4) pub interference: usize, // interfering pairs (target <50) // Consolidation work estimate from plan #[serde(default)] pub plan_counts: std::collections::HashMap, pub plan_rationale: Vec, pub computed_at: String, } #[derive(serde::Serialize, serde::Deserialize)] 
struct DaemonStatus { pid: u32, tasks: Vec, #[serde(default)] last_daily: Option, #[serde(default)] graph_health: Option, } // --- The daemon --- pub fn run_daemon() -> Result<(), String> { let config = crate::config::get(); let mut daemon = jobkit::daemon::Daemon::new(jobkit::daemon::DaemonConfig { data_dir: config.data_dir.clone(), resource_slots: config.llm_concurrency, resource_name: "llm".to_string(), }); let choir = Arc::clone(&daemon.choir); let llm = Arc::clone(&daemon.resource); let _ = DAEMON_POOL.set(Arc::clone(&llm)); let task_log_dir = config.data_dir.join("logs"); let _ = fs::create_dir_all(&task_log_dir); // Enable verbose logging if POC_MEMORY_VERBOSE is set if std::env::var("POC_MEMORY_VERBOSE").is_ok() { jobkit::daemon::event_log::set_level(jobkit::daemon::event_log::LogLevel::Verbose); } // Recover last_daily from previous status file let last_daily: Arc>> = Arc::new(Mutex::new( jobkit::daemon::status::load::(&config.data_dir) .and_then(|s| s.last_daily) .and_then(|d| d.parse().ok()) )); let graph_health: Arc>> = Arc::new(Mutex::new(None)); // Persistent task queue — survives daemon restarts let task_queue = TaskQueue::load(&config.data_dir); // Nodes currently being processed by agents — prevents concurrent // agents from working on overlapping graph regions. 
let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new())); log_event("daemon", "started", &format!("pid {}", std::process::id())); eprintln!("poc-memory daemon started (pid {})", std::process::id()); // Recover pending tasks from previous run { let recovered = task_queue.drain(); if !recovered.is_empty() { log_event("task-queue", "recovering", &format!("{} tasks", recovered.len())); for pt in &recovered { let agent = pt.agent.clone(); let b = pt.batch; let task_id = pt.id.clone(); let in_flight_clone = Arc::clone(&in_flight); let queue_clone = Arc::clone(&task_queue); choir.spawn(pt.id.clone()) .resource(&llm) .log_dir(&task_log_dir) .retries(1) .init(move |ctx| { let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); queue_clone.remove(&task_id); result }); // Drop schedules via IdleTask::Drop } } } // Write initial status write_status(&choir, *last_daily.lock().unwrap(), &graph_health); // Session watcher: reconcile-based extraction // Each tick: scan filesystem for stale sessions, check store for what's // already mined, check task registry for what's in-flight, spawn the diff. // No persistent tracking state — the store is the source of truth. let choir_sw = Arc::clone(&choir); let _llm_sw = Arc::clone(&llm); // kept for future use let last_daily_sw = Arc::clone(&last_daily); let graph_health_sw = Arc::clone(&graph_health); choir.spawn("session-watcher").init(move |ctx| { ctx.set_progress("idle"); // Cache: path → (file_size, segment_count). Invalidated when size changes. let mut seg_cache: HashMap = HashMap::new(); // Retry backoff: filename → (next_retry_after, current_backoff). // Exponential from 5min, cap 30min. Resets on daemon restart. 
let mut retry_backoff: HashMap = HashMap::new(); const BACKOFF_INITIAL: Duration = Duration::from_secs(300); // 5 min const BACKOFF_MAX: Duration = Duration::from_secs(1800); // 30 min loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } ctx.set_progress("scanning"); // Check for failed tasks and update backoff. // Task names are "extract:{filename}.{segment}" — extract the // filename (UUID.jsonl) by stripping the trailing .N segment suffix. let task_statuses = choir_sw.task_statuses(); for t in &task_statuses { if let Some(label) = t.name.strip_prefix("extract:") { // label is "UUID.jsonl.N" — strip last ".N" to get filename let filename = match label.rfind('.') { Some(pos) if label[pos+1..].chars().all(|c| c.is_ascii_digit()) => { &label[..pos] } _ => label, }; match t.status { TaskStatus::Failed => { let entry = retry_backoff.entry(filename.to_string()) .or_insert((std::time::Instant::now(), BACKOFF_INITIAL)); entry.1 = (entry.1 * 2).min(BACKOFF_MAX); entry.0 = std::time::Instant::now() + entry.1; } TaskStatus::Completed => { retry_backoff.remove(filename); } _ => {} } } } // What's currently running/pending? 
(avoid spawning duplicates) let active: HashSet = task_statuses.iter() .filter(|t| !t.status.is_finished()) .map(|t| t.name.clone()) .collect(); let stale = find_stale_sessions(); // Load mined transcript keys once for this tick let mined = std::collections::HashSet::::new(); // mining removed // MAX_NEW_PER_TICK removed — mining handled by observation agent // Load fact-mined keys too let fact_keys: HashSet = { use crate::store::StoreView; let view = crate::store::AnyView::load().ok(); view.map(|v| { let mut keys = HashSet::new(); v.for_each_node(|key, _, _| { if key.starts_with("_facts-") { keys.insert(key.to_string()); } }); keys }).unwrap_or_default() }; let _extract_queued = 0usize; let mut _extract_remaining = 0usize; let mut _fact_remaining = 0usize; let mut already_mined = 0; let mut still_open = 0; let mut backed_off = 0; let total_stale = stale.len(); // Sessions with old whole-file keys that need per-segment migration let mut migrate_keys: Vec<(String, String, usize)> = Vec::new(); let mut needs_extract: Vec<(String, String, Option)> = Vec::new(); let mut needs_fact: Vec<(String, String)> = Vec::new(); let now = std::time::Instant::now(); for session in stale { let filename = session.file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); let path_str = session.to_string_lossy().to_string(); // Check retry backoff before doing any work if let Some((next_retry, _)) = retry_backoff.get(&filename) { if now < *next_retry { backed_off += 1; continue; } } if is_file_open(&session) { still_open += 1; continue; } // Get file size for cache invalidation let file_size = fs::metadata(&session).map(|m| m.len()).unwrap_or(0); // Get segment count, using cache with size-based invalidation let seg_count = if let Some(&(cached_size, cached_count)) = seg_cache.get(&path_str) { if cached_size == file_size { cached_count } else { // File changed — re-parse let messages = match super::enrich::extract_conversation(&path_str) { Ok(m) => m, 
Err(_) => continue, }; let count = super::enrich::split_on_compaction(messages).len(); seg_cache.insert(path_str.clone(), (file_size, count)); count } } else { let messages = match super::enrich::extract_conversation(&path_str) { Ok(m) => m, Err(_) => continue, }; let count = super::enrich::split_on_compaction(messages).len(); seg_cache.insert(path_str.clone(), (file_size, count)); count }; // No extractable messages — skip entirely if seg_count == 0 { already_mined += 1; continue; } let fname_key = format!("_experience-{}", filename.trim_end_matches(".jsonl")); let has_whole_file_key = mined.contains(&fname_key); // Check per-segment keys, find unmined segments let mut unmined_segs: Vec = Vec::new(); let mut has_any_seg_key = false; for i in 0..seg_count { let seg_key = format!("{}.{}", fname_key, i); if mined.contains(&seg_key) { has_any_seg_key = true; } else { unmined_segs.push(i); } } // Migrate old whole-file key: if it exists but no per-segment keys, // write per-segment keys for all current segments (they were mined // under the old scheme) if has_whole_file_key && !has_any_seg_key && seg_count > 0 { migrate_keys.push((fname_key.clone(), path_str.clone(), seg_count)); // After migration, all current segments are covered unmined_segs.clear(); } if unmined_segs.is_empty() { // All segments mined — check fact-mining let fact_key = format!("_facts-{}", filename.trim_end_matches(".jsonl")); if !fact_keys.contains(&fact_key) { let task_name = format!("fact-mine:{}", filename); if !active.contains(&task_name) { needs_fact.push((filename, path_str)); } } else { already_mined += 1; } } else { // Queue unmined segments for i in unmined_segs { let task_name = format!("extract:{}.{}", filename, i); if active.contains(&task_name) { continue; } needs_extract.push(( format!("{}.{}", filename, i), path_str.clone(), Some(i), )); } } } // Migrate old whole-file keys to per-segment keys if !migrate_keys.is_empty() { match crate::store::Store::load() { Ok(mut store) => { let 
mut ok = 0; let mut fail = 0; for (fname_key, path_str, seg_count) in &migrate_keys { for i in 0..*seg_count { let seg_key = format!("{}.{}", fname_key, i); let content = format!("Migrated from whole-file key for {}", path_str); let mut node = crate::store::new_node(&seg_key, &content); node.provenance = "experience-mine:write".to_string(); match store.upsert_node(node) { Ok(()) => ok += 1, Err(e) => { if fail == 0 { eprintln!("migration upsert_node error: {}", e); } fail += 1; } } } } if let Err(e) = store.save() { eprintln!("migration save error: {}", e); } log_event("session-watcher", "migrated", &format!("{} whole-file keys → per-segment ({} ok, {} fail)", migrate_keys.len(), ok, fail)); } Err(e) => { eprintln!("migration store load error: {}", e); } } } // experience_mine and fact_mine killed — observation.agent handles transcript mining _extract_remaining = needs_extract.len(); _fact_remaining = needs_fact.len(); let extract_pending = _extract_queued + _extract_remaining; let fact_pending = _fact_remaining; if extract_pending > 0 || fact_pending > 0 || still_open > 0 || backed_off > 0 { log_event("session-watcher", "tick", &format!("{} stale, {} mined, {} extract, {} fact, {} open, {} backoff", total_stale, already_mined, extract_pending, fact_pending, still_open, backed_off)); let mut parts = Vec::new(); if extract_pending > 0 { parts.push(format!("{} extract", extract_pending)); } if fact_pending > 0 { parts.push(format!("{} fact", fact_pending)); } if still_open > 0 { parts.push(format!("{} open", still_open)); } if backed_off > 0 { parts.push(format!("{} backoff", backed_off)); } ctx.set_progress(parts.join(", ")); } else { ctx.set_progress("idle"); } write_status(&choir_sw, *last_daily_sw.lock().unwrap(), &graph_health_sw); std::thread::sleep(SCHEDULER_INTERVAL); } }); // Scheduler: runs daily jobs based on filesystem state let choir_sched = Arc::clone(&choir); let llm_sched = Arc::clone(&llm); let last_daily_sched = Arc::clone(&last_daily); let 
graph_health_sched = Arc::clone(&graph_health); let in_flight_sched = Arc::clone(&in_flight); let log_dir_sched = task_log_dir.clone(); let queue_sched = Arc::clone(&task_queue); const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours choir.spawn("scheduler").init(move |ctx| { let mut last_health = std::time::Instant::now() - HEALTH_INTERVAL; let mut last_consolidation = std::time::Instant::now() - CONSOLIDATION_INTERVAL; // run on first tick ctx.set_progress("idle"); loop { if ctx.is_cancelled() { return Err(TaskError::Fatal("cancelled".into())); } let today = chrono::Local::now().date_naive(); // Health check: every hour — also updates graph health metrics if last_health.elapsed() >= HEALTH_INTERVAL { let gh = Arc::clone(&graph_health_sched); choir_sched.spawn("health").init(move |ctx| { job_daily_check(ctx, &gh) }); last_health = std::time::Instant::now(); } // Consolidation cycle: every 6 hours (wait for health check to cache metrics first) let gh = graph_health_sched.lock().unwrap().clone(); if last_consolidation.elapsed() >= CONSOLIDATION_INTERVAL && gh.is_some() { log_event("scheduler", "consolidation-trigger", &format!("{} (every 6h)", today)); // Use cached graph health plan (from consolidation_plan_quick). let h = gh.as_ref().unwrap(); // guarded by gh.is_some() above let plan = crate::neuro::ConsolidationPlan { counts: h.plan_counts.clone(), run_health: true, rationale: Vec::new(), }; let runs = plan.to_agent_runs(5); let summary: Vec = h.plan_counts.iter() .filter(|(_, c)| **c > 0) .map(|(a, c)| format!("{}{}", &a[..1], c)) .collect(); log_event("scheduler", "consolidation-plan", &format!("{} agents ({})", runs.len(), summary.join(" "))); // Phase 1: Agent runs — all concurrent, in-flight exclusion // prevents overlapping graph regions. 
let mut all_tasks: Vec = Vec::new(); for (i, (agent_type, batch)) in runs.iter().enumerate() { let agent = agent_type.to_string(); let b = *batch; let in_flight_clone = Arc::clone(&in_flight_sched); let task_name = format!("c-{}-{}:{}", agent, i, today); let task_id = task_name.clone(); let queue_clone = Arc::clone(&queue_sched); queue_sched.push(PendingTask { id: task_id.clone(), agent: agent.clone(), batch: b, target_key: None, }); let task = choir_sched.spawn(task_name) .resource(&llm_sched) .log_dir(&log_dir_sched) .retries(1) .init(move |ctx| { let result = job_consolidation_agent(ctx, &agent, b, &in_flight_clone); queue_clone.remove(&task_id); result }) .run(); all_tasks.push(task); } // Orphans phase depends on all agent tasks completing let prev_agent = all_tasks.last().cloned(); // Phase 2: Link orphans (CPU-only, no LLM) let mut orphans = choir_sched.spawn(format!("c-orphans:{}", today)) .retries(1) .init(move |ctx| job_link_orphans(ctx)); if let Some(ref dep) = prev_agent { orphans.depend_on(dep); } let orphans = orphans.run(); // Phase 3: Cap degree let mut cap = choir_sched.spawn(format!("c-cap:{}", today)) .retries(1) .init(move |ctx| job_cap_degree(ctx)); cap.depend_on(&orphans); let cap = cap.run(); // Phase 4: Generate digests let mut digest = choir_sched.spawn(format!("c-digest:{}", today)) .resource(&llm_sched) .retries(1) .init(move |ctx| job_digest(ctx)); digest.depend_on(&cap); let digest = digest.run(); // Phase 5: Apply digest links let mut digest_links = choir_sched.spawn(format!("c-digest-links:{}", today)) .retries(1) .init(move |ctx| job_digest_links(ctx)); digest_links.depend_on(&digest); let digest_links = digest_links.run(); // Phase 7: Knowledge loop let mut knowledge = choir_sched.spawn(format!("c-knowledge:{}", today)) .resource(&llm_sched) .retries(1) .init(move |ctx| job_knowledge_loop(ctx)); knowledge.depend_on(&digest_links); *last_daily_sched.lock().unwrap() = Some(today); last_consolidation = std::time::Instant::now(); 
ctx.set_progress(format!("daily pipeline triggered ({today})")); } // Prune finished tasks from registry let pruned = choir_sched.gc_finished(); if pruned > 0 { log::trace!("pruned {} finished tasks", pruned); } write_status(&choir_sched, *last_daily_sched.lock().unwrap(), &graph_health_sched); std::thread::sleep(SCHEDULER_INTERVAL); } }); // Register RPC handlers { let last_daily_rpc = Arc::clone(&last_daily); daemon.add_rpc_handler(move |cmd, _ctx| { if cmd == "consolidate" { *last_daily_rpc.lock().unwrap() = None; log_event("rpc", "consolidate", "triggered via socket"); Some("{\"ok\":true,\"action\":\"consolidation scheduled\"}\n".into()) } else { None } }); } daemon.add_rpc_handler(|cmd, _ctx| { if cmd != "reload-config" { return None; } let changed = crate::config::reload(); let config = crate::config::get(); let api = config.api_base_url.as_deref().unwrap_or("(none)"); let model = config.api_model.as_deref().unwrap_or("(default)"); log_event("daemon", "config-reload", &format!("changed={}, api={}, model={}", changed, api, model)); Some(format!("{{\"ok\":true,\"changed\":{},\"api_base_url\":\"{}\",\"api_model\":\"{}\"}}\n", changed, api, model)) }); daemon.add_rpc_handler(|cmd, _ctx| { if !cmd.starts_with("record-hits ") { return None; } let keys: Vec<&str> = cmd.strip_prefix("record-hits ") .unwrap_or("") .split('\t') .filter(|k| !k.is_empty()) .collect(); if keys.is_empty() { return Some("{\"ok\":false,\"error\":\"no keys\"}\n".into()); } let n = keys.len(); match crate::counters::record_search_hits(&keys) { Ok(()) => Some(format!("{{\"ok\":true,\"recorded\":{}}}\n", n)), Err(e) => Some(format!("{{\"ok\":false,\"error\":\"{}\"}}\n", e.replace('"', "'"))), } }); { let choir_rpc = Arc::clone(&choir); let llm_rpc = Arc::clone(&llm); let log_dir_rpc = task_log_dir.clone(); let in_flight_rpc = Arc::clone(&in_flight); daemon.add_rpc_handler(move |cmd, _ctx| { if !cmd.starts_with("run-agent ") { return None; } let parts: Vec<&str> = cmd.splitn(4, ' ').collect(); 
let agent_type = parts.get(1).unwrap_or(&"replay"); let count: usize = parts.get(2) .and_then(|s| s.parse().ok()) .unwrap_or(1); // Optional target key: "run-agent linker 1 target:KEY" let target_key: Option = parts.get(3) .and_then(|s| s.strip_prefix("target:")) .map(|s| s.to_string()); let batch_size = 5; let today = chrono::Local::now().format("%Y-%m-%d"); let ts = chrono::Local::now().format("%H%M%S"); let mut prev = None; let mut spawned = 0; let mut remaining = count; let is_rename = *agent_type == "rename"; // Targeted run: one task for a specific node if let Some(ref key) = target_key { let agent = agent_type.to_string(); let key = key.clone(); let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::(), today); if jobkit::daemon::event_log::enabled(jobkit::daemon::event_log::LogLevel::Verbose) { log_event("daemon", "spawn-targeted", &format!("{} (pool: {}/{})", task_name, llm_rpc.available(), llm_rpc.capacity())); } choir_rpc.spawn(task_name) .resource(&llm_rpc) .log_dir(&log_dir_rpc) .retries(1) .init(move |ctx| { job_targeted_agent(ctx, &agent, &key) }) .run(); spawned = 1; remaining = 0; } while remaining > 0 { let batch = remaining.min(batch_size); let agent = agent_type.to_string(); let in_flight_clone = Arc::clone(&in_flight_rpc); let task_name = format!("c-{}-rpc{}:{}", agent, ts, today); let mut builder = choir_rpc.spawn(task_name) .resource(&llm_rpc) .log_dir(&log_dir_rpc) .retries(1) .init(move |ctx| { if is_rename { job_rename_agent(ctx, batch) } else { job_consolidation_agent(ctx, &agent, batch, &in_flight_clone) } }); if let Some(ref dep) = prev { builder.depend_on(dep); } prev = Some(builder.run()); remaining -= batch; spawned += 1; } log_event("rpc", "run-agent", &format!("{} x{}", agent_type, count)); Some(format!("{{\"ok\":true,\"action\":\"queued {} {} run(s) ({} tasks)\"}}\n", count, agent_type, spawned)) }); } // Main thread: socket server + signal handling let last_daily_status = Arc::clone(&last_daily); let 
graph_health_status = Arc::clone(&graph_health);
    // Tail of the daemon entry point (its beginning lies outside this chunk).
    // The closure handed to `daemon.run` builds a fresh status snapshot from
    // the choir, the last-daily marker and the cached graph health.
    daemon.run(move |ctx| {
        build_status(&ctx.choir, *last_daily_status.lock().unwrap(), &graph_health_status)
    });

    log_event("daemon", "stopping", "");
    eprintln!("Shutting down...");

    log_event("daemon", "stopped", "");
    // Terminates the whole process with exit status 0.
    std::process::exit(0)
}

// NOTE(review): the `Option` return types below have lost their generic
// arguments in this view of the file — confirm against the original source.

/// Public wrapper around `send_rpc` for callers outside this module.
pub fn send_rpc_pub(cmd: &str) -> Option {
    send_rpc(cmd)
}

/// Send `cmd` through jobkit's socket RPC helper, using the configured data
/// directory, and return whatever that helper returns.
///
/// Callers in this file treat `None` as "daemon not running".
fn send_rpc(cmd: &str) -> Option {
    jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
}

/// Send the `consolidate` command and print the trimmed reply to stdout.
///
/// Returns `Err("Daemon not running.")` when no reply is received.
pub fn rpc_consolidate() -> Result<(), String> {
    match send_rpc("consolidate") {
        Some(resp) => {
            println!("{}", resp.trim());
            Ok(())
        }
        None => Err("Daemon not running.".into()),
    }
}

/// Record search hits for the given keys (fire-and-forget from memory-search).
///
/// Keys are joined with tabs into a single `record-hits` command. An empty
/// slice is a no-op that returns `Ok(())` without contacting the daemon; the
/// content of the reply is ignored.
pub fn rpc_record_hits(keys: &[&str]) -> Result<(), String> {
    if keys.is_empty() {
        return Ok(());
    }
    let cmd = format!("record-hits {}", keys.join("\t"));
    match send_rpc(&cmd) {
        Some(_) => Ok(()),
        None => Err("Daemon not running.".into()),
    }
}

/// Send `run-agent <agent> <count>` and print the trimmed reply to stdout.
///
/// Returns `Err("Daemon not running.")` when no reply is received.
pub fn rpc_run_agent(agent: &str, count: usize) -> Result<(), String> {
    let cmd = format!("run-agent {} {}", agent, count);
    match send_rpc(&cmd) {
        Some(resp) => {
            println!("{}", resp.trim());
            Ok(())
        }
        None => Err("Daemon not running.".into()),
    }
}

/// Send an empty command over the socket and deserialize the JSON reply.
///
/// Yields `None` when no reply is received or the reply fails to parse.
/// NOTE(review): presumably the empty command is answered with the status
/// snapshot built by `build_status` — confirm in jobkit-daemon.
fn read_status_socket() -> Option {
    let json = jobkit::daemon::socket::send_rpc(&crate::config::get().data_dir, "")?;
    serde_json::from_str(&json).ok()
}

// status_socket_loop has been replaced by daemon.run() in jobkit-daemon.
/// Assemble a `DaemonStatus` snapshot: current process id, the choir's task
/// statuses, the last-daily marker rendered via `to_string()`, and a clone of
/// the graph-health value currently behind the mutex.
///
/// Panics if the graph-health mutex is poisoned (`lock().unwrap()`).
// NOTE(review): the generic arguments of `last_daily` and `graph_health` are
// missing in this view of the file — confirm the exact types against the
// original source before touching this signature.
fn build_status(
    choir: &Choir,
    last_daily: Option,
    graph_health: &Arc>>,
) -> DaemonStatus {
    DaemonStatus {
        pid: std::process::id(),
        tasks: choir.task_statuses(),
        last_daily: last_daily.map(|d| d.to_string()),
        graph_health: graph_health.lock().unwrap().clone(),
    }
}

// --- Status display ---

/// Render a millisecond count as a short human-readable duration.
///
/// * below 1 s: `"<n>ms"`
/// * below 1 min: seconds with one decimal, e.g. `"1.5s"`
/// * below 1 h: whole minutes and seconds, e.g. `"2m5s"`
/// * otherwise: whole hours and minutes, e.g. `"1h1m"`
fn format_duration_human(ms: u128) -> String {
    if ms < 1_000 {
        format!("{}ms", ms)
    } else if ms < 60_000 {
        format!("{:.1}s", ms as f64 / 1000.0)
    } else if ms < 3_600_000 {
        // Both operands are integers, so the values are already whole numbers.
        format!("{:.0}m{:.0}s", ms / 60_000, (ms % 60_000) / 1000)
    } else {
        format!("{:.0}h{:.0}m", ms / 3_600_000, (ms % 3_600_000) / 60_000)
    }
}

/// Map a task name to the id of the display group it is listed under:
/// `"core"`, `"extract"`, `"daily"`, `"health"`, or `"other"` as the fallback.
fn task_group(name: &str) -> &str {
    if name == "session-watcher" || name == "scheduler" { "core" }
    else if name.starts_with("extract:") || name.starts_with("fact-mine:") { "extract" }
    else if name.starts_with("c-") || name.starts_with("consolidate:")
        || name.starts_with("knowledge-loop:") || name.starts_with("digest:")
        || name.starts_with("decay:") { "daily" }
    else if name == "health" { "health" }
    else { "other" }
}

/// Compute elapsed time for a task, using absolute started_at if available.
fn task_elapsed(t: &TaskInfo) -> Duration { if matches!(t.status, TaskStatus::Running) { if let Some(started) = t.started_at { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap_or_default() .as_secs_f64(); Duration::from_secs_f64((now - started).max(0.0)) } else { t.elapsed } } else { t.result.as_ref() .map(|r| r.duration) .unwrap_or(t.elapsed) } } fn status_symbol(t: &TaskInfo) -> &'static str { if t.cancelled { return "✗" } match t.status { TaskStatus::Running => "▶", TaskStatus::Completed => "✓", TaskStatus::Failed => "✗", TaskStatus::Pending => "·", } } /// Shorten a job name for display: "experience-mine /long/path/uuid.jsonl" → "experience-mine uuid…" fn short_job_name(job: &str) -> String { // Split "verb path" or just return as-is if let Some((verb, path)) = job.split_once(' ') { let file = path.rsplit('/').next().unwrap_or(path); let file = file.strip_suffix(".jsonl").unwrap_or(file); let short = if file.len() > 12 { &file[..12] } else { file }; format!("{} {}", verb, short) } else { job.to_string() } } fn show_recent_completions(n: usize) { let path = log_path(); let content = match fs::read_to_string(&path) { Ok(c) => c, Err(_) => return, }; let recent: Vec<&str> = content.lines().rev() .filter(|line| { line.contains("\"event\":\"completed\"") || line.contains("\"event\":\"failed\"") }) .take(n) .collect(); if recent.is_empty() { return; } eprintln!(" Recent:"); for line in recent.iter().rev() { if let Ok(obj) = serde_json::from_str::(line) { let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; let sym = if event == "completed" { "✓" } else { "✗" }; let name = short_job_name(job); eprintln!(" {} {} {:30} {}", sym, time, name, detail); } } eprintln!(); } pub 
fn show_status() -> Result<(), String> { let status = match read_status_socket() { Some(s) => s, None => { eprintln!("Daemon not running."); return Ok(()); } }; let uptime_str = proc_uptime(status.pid).unwrap_or_default(); if uptime_str.is_empty() { eprintln!("poc-memory daemon pid={}", status.pid); } else { eprintln!("poc-memory daemon pid={} uptime {}", status.pid, uptime_str); } if status.tasks.is_empty() { eprintln!("\n No tasks"); return Ok(()); } // Count by status let running = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); let pending = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); let completed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); let failed = status.tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); eprintln!(" tasks: {} running, {} pending, {} done, {} failed", running, pending, completed, failed); // Graph health if let Some(ref gh) = status.graph_health { eprintln!(); fn indicator(val: f32, target: f32, higher_is_better: bool) -> &'static str { let ok = if higher_is_better { val >= target } else { val <= target }; if ok { "✓" } else { "✗" } } eprintln!(" Graph health ({})", gh.computed_at); eprintln!(" {} nodes, {} edges, {} communities", gh.nodes, gh.edges, gh.communities); eprintln!(" {} α={:.2} (≥2.5) {} gini={:.3} (≤0.4) {} cc={:.3} (≥0.2)", indicator(gh.alpha, 2.5, true), gh.alpha, indicator(gh.gini, 0.4, false), gh.gini, indicator(gh.avg_cc, 0.2, true), gh.avg_cc); eprintln!(" {} episodic={:.0}% (<40%) σ={:.1}", indicator(gh.episodic_ratio, 0.4, false), gh.episodic_ratio * 100.0, gh.sigma); let plan_total: usize = gh.plan_counts.values().sum::() + 1; let plan_summary: Vec = gh.plan_counts.iter() .filter(|(_, c)| **c > 0) .map(|(a, c)| format!("{}{}", &a[..1], c)) .collect(); eprintln!(" consolidation plan: {} agents ({} +health)", plan_total, plan_summary.join(" ")); } eprintln!(); // Group and display let 
groups: &[(&str, &str)] = &[ ("core", "Core"), ("daily", "Daily pipeline"), ("extract", "Session extraction"), ("health", "Health"), ("other", "Other"), ]; // In-flight tasks first (running + pending) let in_flight: Vec<&TaskInfo> = status.tasks.iter() .filter(|t| matches!(t.status, TaskStatus::Running | TaskStatus::Pending)) .collect(); if !in_flight.is_empty() { eprintln!(" In flight:"); for t in &in_flight { let sym = status_symbol(t); let e = task_elapsed(t); let elapsed = if !e.is_zero() { format!(" {}", format_duration_human(e.as_millis())) } else { String::new() }; let progress = t.progress.as_deref() .filter(|p| *p != "idle") .map(|p| format!(" {}", p)) .unwrap_or_default(); let name = short_job_name(&t.name); eprintln!(" {} {:30}{}{}", sym, name, elapsed, progress); if let Some(ref lp) = t.log_path { // tail from log file if matches!(t.status, TaskStatus::Running) { eprintln!(" │ log: {}", lp); } } } eprintln!(); } // Recent completions from log file show_recent_completions(20); // Detailed group view only if there are failures worth showing for (group_id, group_label) in groups { let tasks: Vec<&TaskInfo> = status.tasks.iter() .filter(|t| task_group(&t.name) == *group_id) .collect(); if tasks.is_empty() { continue; } // For extract group, show summary instead of individual tasks if *group_id == "extract" { let n_pending = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Pending)).count(); let n_running = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)).count(); let n_done = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Completed)).count(); let n_failed = tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).count(); eprintln!(" {} ({} total)", group_label, tasks.len()); if n_running > 0 { for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Running)) { let e = task_elapsed(t); let elapsed = if !e.is_zero() { format!(" ({})", format_duration_human(e.as_millis())) } else { String::new() }; let progress = 
t.progress.as_deref().map(|p| format!(" {}", p)).unwrap_or_default(); eprintln!(" {} {}{}{}", status_symbol(t), t.name, elapsed, progress); if let Some(ref lp) = t.log_path { eprintln!(" │ log: {}", lp); } } } let mut parts = Vec::new(); if n_done > 0 { parts.push(format!("{} done", n_done)); } if n_running > 0 { parts.push(format!("{} running", n_running)); } if n_pending > 0 { parts.push(format!("{} queued", n_pending)); } if n_failed > 0 { parts.push(format!("{} FAILED", n_failed)); } eprintln!(" {}", parts.join(", ")); // Show recent failures for t in tasks.iter().filter(|t| matches!(t.status, TaskStatus::Failed)).take(3) { if let Some(ref r) = t.result { if let Some(ref err) = r.error { let short = if err.len() > 80 { &err[..80] } else { err }; eprintln!(" ✗ {}: {}", t.name, short); } } } eprintln!(); continue; } eprintln!(" {}", group_label); for t in &tasks { let sym = status_symbol(t); let e = task_elapsed(t); let duration = if !e.is_zero() { format_duration_human(e.as_millis()) } else { String::new() }; let retry = if t.max_retries > 0 && t.retry_count > 0 { format!(" retry {}/{}", t.retry_count, t.max_retries) } else { String::new() }; let detail = if matches!(t.status, TaskStatus::Failed) { t.result.as_ref() .and_then(|r| r.error.as_ref()) .map(|e| { let short = if e.len() > 60 { &e[..60] } else { e }; format!(" err: {}", short) }) .unwrap_or_default() } else { String::new() }; if duration.is_empty() { eprintln!(" {} {:30}{}{}", sym, t.name, retry, detail); } else { eprintln!(" {} {:30} {:>8}{}{}", sym, t.name, duration, retry, detail); } // Show output log tail for running tasks if let Some(ref lp) = t.log_path { // tail from log file if matches!(t.status, TaskStatus::Running) { eprintln!(" │ log: {}", lp); } } } eprintln!(); } Ok(()) } pub fn install_service() -> Result<(), String> { let exe = std::env::current_exe() .map_err(|e| format!("current_exe: {}", e))?; let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?; let unit_dir = 
PathBuf::from(&home).join(".config/systemd/user"); fs::create_dir_all(&unit_dir) .map_err(|e| format!("create {}: {}", unit_dir.display(), e))?; let unit = format!( r#"[Unit] Description=poc-memory daemon — background memory maintenance After=default.target [Service] Type=simple ExecStart={exe} agent daemon Restart=on-failure RestartSec=30 Environment=HOME={home} Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin [Install] WantedBy=default.target "#, exe = exe.display(), home = home); let unit_path = unit_dir.join("poc-memory.service"); fs::write(&unit_path, &unit) .map_err(|e| format!("write {}: {}", unit_path.display(), e))?; eprintln!("Wrote {}", unit_path.display()); let status = std::process::Command::new("systemctl") .args(["--user", "daemon-reload"]) .status() .map_err(|e| format!("systemctl daemon-reload: {}", e))?; if !status.success() { return Err("systemctl daemon-reload failed".into()); } let status = std::process::Command::new("systemctl") .args(["--user", "enable", "--now", "poc-memory"]) .status() .map_err(|e| format!("systemctl enable: {}", e))?; if !status.success() { return Err("systemctl enable --now failed".into()); } eprintln!("Service enabled and started"); // Install poc-daemon service install_notify_daemon(&unit_dir, &home)?; // Install memory-search + poc-hook into Claude settings install_hook()?; Ok(()) } /// Install the poc-daemon (notification/idle) systemd user service. 
///
/// Looks for the binary at `<home>/.cargo/bin/poc-daemon`. When it is missing
/// a warning is printed and the install is skipped with `Ok(())`. Otherwise
/// `poc-daemon.service` is written into `unit_dir` and enabled via
/// `systemctl --user`.
///
/// # Errors
/// Returns a message when the unit file cannot be written or a `systemctl`
/// invocation fails.
fn install_notify_daemon(unit_dir: &Path, home: &str) -> Result<(), String> {
    let poc_daemon = PathBuf::from(home).join(".cargo/bin/poc-daemon");
    if !poc_daemon.exists() {
        eprintln!("Warning: poc-daemon not found at {} — skipping service install", poc_daemon.display());
        eprintln!(" Build with: cargo install --path .");
        return Ok(());
    }

    // systemd unit files are line-oriented: each section header and each
    // `Key=Value` assignment must sit on its own line.
    // NOTE(review): ExecStart passes the same `agent daemon` arguments as the
    // poc-memory unit — confirm that poc-daemon accepts them.
    let unit = format!(
        r#"[Unit]
Description=poc-daemon — notification routing and idle management
After=default.target

[Service]
Type=simple
ExecStart={exe} agent daemon
Restart=on-failure
RestartSec=10
Environment=HOME={home}
Environment=PATH={home}/.cargo/bin:{home}/.local/bin:{home}/bin:/usr/local/bin:/usr/bin:/bin

[Install]
WantedBy=default.target
"#,
        exe = poc_daemon.display(),
        home = home
    );

    let unit_path = unit_dir.join("poc-daemon.service");
    fs::write(&unit_path, &unit)
        .map_err(|e| format!("write {}: {}", unit_path.display(), e))?;
    eprintln!("Wrote {}", unit_path.display());

    let status = std::process::Command::new("systemctl")
        .args(["--user", "daemon-reload"])
        .status()
        .map_err(|e| format!("systemctl daemon-reload: {}", e))?;
    if !status.success() {
        return Err("systemctl daemon-reload failed".into());
    }

    let status = std::process::Command::new("systemctl")
        .args(["--user", "enable", "--now", "poc-daemon"])
        .status()
        .map_err(|e| format!("systemctl enable: {}", e))?;
    if !status.success() {
        return Err("systemctl enable --now poc-daemon failed".into());
    }

    eprintln!("poc-daemon service enabled and started");
    Ok(())
}

/// Install memory-search and poc-hook into Claude Code settings.json.
/// Public so `poc-memory init` can call it too.
/// /// Hook layout: /// UserPromptSubmit: memory-search (10s), poc-hook (5s) /// PostToolUse: poc-hook (5s) /// Stop: poc-hook (5s) pub fn install_hook() -> Result<(), String> { let home = std::env::var("HOME").map_err(|e| format!("HOME: {}", e))?; let exe = std::env::current_exe() .map_err(|e| format!("current_exe: {}", e))?; let settings_path = PathBuf::from(&home).join(".claude/settings.json"); let memory_search = exe.with_file_name("memory-search"); let poc_hook = exe.with_file_name("poc-hook"); let mut settings: serde_json::Value = if settings_path.exists() { let content = fs::read_to_string(&settings_path) .map_err(|e| format!("read settings: {}", e))?; serde_json::from_str(&content) .map_err(|e| format!("parse settings: {}", e))? } else { serde_json::json!({}) }; let obj = settings.as_object_mut().ok_or("settings not an object")?; let hooks_obj = obj.entry("hooks") .or_insert_with(|| serde_json::json!({})) .as_object_mut().ok_or("hooks not an object")?; let mut changed = false; // Helper: ensure a hook binary is present in an event's hook list let ensure_hook = |hooks_obj: &mut serde_json::Map, event: &str, binary: &Path, timeout: u32, changed: &mut bool| { if !binary.exists() { eprintln!("Warning: {} not found — skipping", binary.display()); return; } let cmd = binary.to_string_lossy().to_string(); let name = binary.file_name().unwrap().to_string_lossy().to_string(); let event_array = hooks_obj.entry(event) .or_insert_with(|| serde_json::json!([{"hooks": []}])) .as_array_mut().unwrap(); if event_array.is_empty() { event_array.push(serde_json::json!({"hooks": []})); } let inner = event_array[0] .as_object_mut().unwrap() .entry("hooks") .or_insert_with(|| serde_json::json!([])) .as_array_mut().unwrap(); // Remove legacy load-memory.sh let before = inner.len(); inner.retain(|h| { let c = h.get("command").and_then(|c| c.as_str()).unwrap_or(""); !c.contains("load-memory") }); if inner.len() < before { eprintln!("Removed load-memory.sh from {event}"); *changed = 
true; } let already = inner.iter().any(|h| { h.get("command").and_then(|c| c.as_str()) .is_some_and(|c| c.contains(&name)) }); if !already { inner.push(serde_json::json!({ "type": "command", "command": cmd, "timeout": timeout })); *changed = true; eprintln!("Installed {name} in {event}"); } }; // UserPromptSubmit: memory-search + poc-hook ensure_hook(hooks_obj, "UserPromptSubmit", &memory_search, 10, &mut changed); ensure_hook(hooks_obj, "UserPromptSubmit", &poc_hook, 5, &mut changed); // PostToolUse + Stop: poc-hook only ensure_hook(hooks_obj, "PostToolUse", &poc_hook, 5, &mut changed); ensure_hook(hooks_obj, "Stop", &poc_hook, 5, &mut changed); if changed { let json = serde_json::to_string_pretty(&settings) .map_err(|e| format!("serialize settings: {}", e))?; fs::write(&settings_path, json) .map_err(|e| format!("write settings: {}", e))?; eprintln!("Updated {}", settings_path.display()); } else { eprintln!("All hooks already installed in {}", settings_path.display()); } Ok(()) } /// Drill down into a task's log file. Finds the log path from: /// 1. Running task status (daemon-status.json) /// 2. 
daemon.log started events (for completed/failed tasks) pub fn show_task_log(task_name: &str, lines: usize) -> Result<(), String> { // Try running tasks first if let Some(status_json) = send_rpc_pub("") { if let Ok(status) = serde_json::from_str::(&status_json) { if let Some(tasks) = status.get("tasks").and_then(|t| t.as_array()) { for t in tasks { let name = t.get("name").and_then(|n| n.as_str()).unwrap_or(""); if name.contains(task_name) { if let Some(lp) = t.get("log_path").and_then(|p| p.as_str()) { return tail_file(lp, lines); } } } } } } // Fall back to searching daemon.log for the most recent started event with a log path let log = log_path(); if log.exists() { let content = fs::read_to_string(&log).map_err(|e| format!("read log: {}", e))?; for line in content.lines().rev() { if let Ok(obj) = serde_json::from_str::(line) { let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or(""); let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or(""); let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); if job.contains(task_name) && event == "started" && detail.starts_with("log: ") { let path = &detail[5..]; return tail_file(path, lines); } } } } Err(format!("no log file found for task '{}'", task_name)) } fn tail_file(path: &str, lines: usize) -> Result<(), String> { let content = fs::read_to_string(path) .map_err(|e| format!("read {}: {}", path, e))?; let all_lines: Vec<&str> = content.lines().collect(); let skip = all_lines.len().saturating_sub(lines); eprintln!("--- {} ({} lines) ---", path, all_lines.len()); for line in &all_lines[skip..] 
{ eprintln!("{}", line); } Ok(()) } pub fn show_log(job_filter: Option<&str>, lines: usize) -> Result<(), String> { let path = log_path(); if !path.exists() { eprintln!("No daemon log found."); return Ok(()); } let content = fs::read_to_string(&path) .map_err(|e| format!("read log: {}", e))?; let filtered: Vec<&str> = content.lines().rev() .filter(|line| { if let Some(job) = job_filter { line.contains(&format!("\"job\":\"{}\"", job)) } else { true } }) .take(lines) .collect(); if filtered.is_empty() { eprintln!("No log entries{}", job_filter.map(|j| format!(" for job '{}'", j)).unwrap_or_default()); return Ok(()); } // Pretty-print: parse JSON and format as "TIME JOB EVENT [DETAIL]" for line in filtered.into_iter().rev() { if let Ok(obj) = serde_json::from_str::(line) { let ts = obj.get("ts").and_then(|v| v.as_str()).unwrap_or("?"); let job = obj.get("job").and_then(|v| v.as_str()).unwrap_or("?"); let event = obj.get("event").and_then(|v| v.as_str()).unwrap_or("?"); let detail = obj.get("detail").and_then(|v| v.as_str()).unwrap_or(""); // Shorten timestamp to just time portion let time = if ts.len() >= 19 { &ts[11..19] } else { ts }; if detail.is_empty() { eprintln!(" {} {:20} {}", time, job, event); } else { // Truncate long details (file paths) let short = if detail.len() > 60 { let last = detail.rfind('/').map(|i| &detail[i+1..]).unwrap_or(detail); if last.len() > 60 { &last[..60] } else { last } } else { detail }; eprintln!(" {} {:20} {:12} {}", time, job, event, short); } } else { eprintln!("{}", line); } } Ok(()) }