// knowledge.rs — agent execution and conversation fragment selection // // Agent prompts live in agents/*.agent files, dispatched via defs.rs. // This module handles: // - Agent execution (build prompt → call LLM with tools → log) // - Conversation fragment selection (for observation agent) // // Agents apply changes via tool calls (poc-memory write/link-add/etc) // during the LLM call — no action parsing needed. use super::llm; use crate::store::{self, Store}; use std::fs; use std::path::PathBuf; use std::sync::atomic::{AtomicPtr, Ordering}; // Global pid path for signal handler cleanup — stored as a leaked CString // so the signal handler can unlink it without allocation. static PID_CPATH: AtomicPtr = AtomicPtr::new(std::ptr::null_mut()); /// RAII guard that removes the pid file on drop (normal exit, panic). struct PidGuard; impl Drop for PidGuard { fn drop(&mut self) { let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst); if !ptr.is_null() { unsafe { libc::unlink(ptr); } // Reclaim the leaked CString unsafe { drop(std::ffi::CString::from_raw(ptr)); } } } } /// Register signal handlers to clean up pid file on SIGTERM/SIGINT. fn register_pid_cleanup(pid_path: &std::path::Path) { let c_path = std::ffi::CString::new(pid_path.to_string_lossy().as_bytes()) .expect("pid path contains null"); // Leak the CString so the signal handler can access it let old = PID_CPATH.swap(c_path.into_raw(), Ordering::SeqCst); if !old.is_null() { unsafe { drop(std::ffi::CString::from_raw(old)); } } unsafe { libc::signal(libc::SIGTERM, pid_cleanup_handler as libc::sighandler_t); libc::signal(libc::SIGINT, pid_cleanup_handler as libc::sighandler_t); } } extern "C" fn pid_cleanup_handler(sig: libc::c_int) { let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst); if !ptr.is_null() { unsafe { libc::unlink(ptr); } // Don't free — we're in a signal handler, just leak it } unsafe { libc::signal(sig, libc::SIG_DFL); libc::raise(sig); } } // --------------------------------------------------------------------------- // Agent execution // --------------------------------------------------------------------------- /// Result of running a single agent. pub struct AgentResult { pub output: String, pub node_keys: Vec, /// Directory containing output() files from the agent run. pub state_dir: std::path::PathBuf, } /// Run a single agent and return the result (no action application — tools handle that). pub fn run_and_apply( store: &mut Store, agent_name: &str, batch_size: usize, llm_tag: &str, ) -> Result<(), String> { run_and_apply_with_log(store, agent_name, batch_size, llm_tag, &|_| {}) } pub fn run_and_apply_with_log( store: &mut Store, agent_name: &str, batch_size: usize, llm_tag: &str, log: &(dyn Fn(&str) + Sync), ) -> Result<(), String> { run_and_apply_excluded(store, agent_name, batch_size, llm_tag, log, &Default::default()) } /// Like run_and_apply_with_log but with an in-flight exclusion set. /// Returns the keys that were processed (for the daemon to track). pub fn run_and_apply_excluded( store: &mut Store, agent_name: &str, batch_size: usize, llm_tag: &str, log: &(dyn Fn(&str) + Sync), exclude: &std::collections::HashSet, ) -> Result<(), String> { let result = run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, exclude)?; // Mark conversation segments as mined after successful processing if agent_name == "observation" { mark_observation_done(&result.node_keys); } Ok(()) } /// Run an agent with explicit target keys, bypassing the agent's query. pub fn run_one_agent_with_keys( store: &mut Store, agent_name: &str, keys: &[String], count: usize, llm_tag: &str, log: &(dyn Fn(&str) + Sync), ) -> Result { let def = super::defs::get_def(agent_name) .ok_or_else(|| format!("no .agent file for {}", agent_name))?; let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?; log(&format!("targeting: {}", keys.join(", "))); let graph = store.build_graph(); let mut resolved_steps = Vec::new(); let mut all_keys: Vec = keys.to_vec(); for step in &def.steps { let (prompt, extra_keys) = super::defs::resolve_placeholders( &step.prompt, store, &graph, keys, count, ); all_keys.extend(extra_keys); resolved_steps.push(super::prompts::ResolvedStep { prompt, phase: step.phase.clone(), }); } let agent_batch = super::prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }; // Record visits eagerly so concurrent agents pick different seeds if !agent_batch.node_keys.is_empty() { store.record_agent_visits(&agent_batch.node_keys, agent_name).ok(); } run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log) } pub fn run_one_agent( store: &mut Store, agent_name: &str, batch_size: usize, llm_tag: &str, log: &(dyn Fn(&str) + Sync), ) -> Result { run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, &Default::default()) } /// Like run_one_agent but excludes nodes currently being worked on by other agents. pub fn run_one_agent_excluded( store: &mut Store, agent_name: &str, batch_size: usize, llm_tag: &str, log: &(dyn Fn(&str) + Sync), exclude: &std::collections::HashSet, ) -> Result { let def = super::defs::get_def(agent_name) .ok_or_else(|| format!("no .agent file for {}", agent_name))?; // Set up output dir and write pid file BEFORE prompt building let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?; log("building prompt"); let effective_count = def.count.unwrap_or(batch_size); let agent_batch = super::defs::run_agent(store, &def, effective_count, exclude)?; run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log) } /// Set up agent state dir, write initial pid file, register cleanup handlers. /// Returns (state_dir, pid_path, guard). The guard removes the pid file on drop. fn setup_agent_state( agent_name: &str, def: &super::defs::AgentDef, ) -> Result<(PathBuf, PathBuf, PidGuard), String> { let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR") .map(PathBuf::from) .unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name)); fs::create_dir_all(&state_dir) .map_err(|e| format!("create state dir: {}", e))?; unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); } // Clean up stale pid files from dead processes scan_pid_files(&state_dir, 0); let pid = std::process::id(); let pid_path = state_dir.join(format!("pid-{}", pid)); let first_phase = def.steps.first() .map(|s| s.phase.as_str()) .unwrap_or("step-0"); fs::write(&pid_path, first_phase).ok(); // Register for cleanup on signals and normal exit register_pid_cleanup(&pid_path); Ok((state_dir, pid_path, PidGuard)) } /// Check for live agent processes in a state dir. Returns (phase, pid) pairs. /// Cleans up stale pid files and kills timed-out processes. pub fn scan_pid_files(state_dir: &std::path::Path, timeout_secs: u64) -> Vec<(String, u32)> { let mut live = Vec::new(); let Ok(entries) = fs::read_dir(state_dir) else { return live }; for entry in entries.flatten() { let name = entry.file_name(); let name_str = name.to_string_lossy(); if !name_str.starts_with("pid-") { continue; } let pid: u32 = name_str.strip_prefix("pid-") .and_then(|s| s.parse().ok()) .unwrap_or(0); if pid == 0 { continue; } if unsafe { libc::kill(pid as i32, 0) } != 0 { fs::remove_file(entry.path()).ok(); continue; } if timeout_secs > 0 { if let Ok(meta) = entry.metadata() { if let Ok(modified) = meta.modified() { if modified.elapsed().unwrap_or_default().as_secs() > timeout_secs { unsafe { libc::kill(pid as i32, libc::SIGTERM); } fs::remove_file(entry.path()).ok(); continue; } } } } let phase = fs::read_to_string(entry.path()) .unwrap_or_default() .trim().to_string(); live.push((phase, pid)); } live } /// Spawn an agent asynchronously. Writes the pid file before returning /// so the caller immediately sees the agent as running. pub fn spawn_agent( agent_name: &str, state_dir: &std::path::Path, session_id: &str, ) -> Option { let def = super::defs::get_def(agent_name)?; let first_phase = def.steps.first() .map(|s| s.phase.as_str()) .unwrap_or("step-0"); let log_dir = dirs::home_dir().unwrap_or_default() .join(format!(".consciousness/logs/{}", agent_name)); fs::create_dir_all(&log_dir).ok(); let agent_log = fs::File::create( log_dir.join(format!("{}.log", store::compact_timestamp()))) .unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()); let child = std::process::Command::new("poc-memory") .args(["agent", "run", agent_name, "--count", "1", "--local", "--state-dir", &state_dir.to_string_lossy()]) .env("POC_SESSION_ID", session_id) .stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap())) .stderr(agent_log) .spawn() .ok()?; let pid = child.id(); let pid_path = state_dir.join(format!("pid-{}", pid)); fs::write(&pid_path, first_phase).ok(); Some(pid) } fn run_one_agent_inner( _store: &mut Store, agent_name: &str, def: &super::defs::AgentDef, agent_batch: super::prompts::AgentBatch, state_dir: std::path::PathBuf, pid_path: std::path::PathBuf, _llm_tag: &str, log: &(dyn Fn(&str) + Sync), ) -> Result { let tools_desc = if def.tools.is_empty() { "no tools".into() } else { format!("{} tools", def.tools.len()) }; let n_steps = agent_batch.steps.len(); for key in &agent_batch.node_keys { log(&format!(" node: {}", key)); } // Guard: reject oversized first prompt (later steps grow via conversation) let max_prompt_bytes = 800_000; let first_len = agent_batch.steps[0].prompt.len(); if first_len > max_prompt_bytes { let prompt_kb = first_len / 1024; let oversize_dir = store::memory_dir().join("llm-logs").join("oversized"); fs::create_dir_all(&oversize_dir).ok(); let oversize_path = oversize_dir.join(format!("{}-{}.txt", agent_name, store::compact_timestamp())); let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n", agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys); fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok(); log(&format!("oversized prompt logged to {}", oversize_path.display())); return Err(format!( "prompt too large: {}KB (max {}KB) — seed nodes may be oversized", prompt_kb, max_prompt_bytes / 1024, )); } let write_pid = |phase: &str| { fs::write(&pid_path, phase).ok(); }; let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect(); log(&format!("{} step(s) {:?}, {}KB initial, model={}, {}, {} nodes, output={}", n_steps, phases, first_len / 1024, def.model, tools_desc, agent_batch.node_keys.len(), state_dir.display())); let prompts: Vec = agent_batch.steps.iter() .map(|s| s.prompt.clone()).collect(); let step_phases: Vec = agent_batch.steps.iter() .map(|s| s.phase.clone()).collect(); let step_phases_for_bail = step_phases.clone(); if std::env::var("POC_AGENT_VERBOSE").is_ok() { for (i, s) in agent_batch.steps.iter().enumerate() { log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt)); } } log("\n=== CALLING LLM ==="); // Bail check: if the agent defines a bail script, run it between steps. // The script receives the pid file path as $1, cwd = state dir. let bail_script = def.bail.as_ref().map(|name| { // Look for the script next to the .agent file let agents_dir = super::defs::agents_dir(); agents_dir.join(name) }); let state_dir_for_bail = state_dir.clone(); let pid_path_for_bail = pid_path.clone(); let bail_fn = move |step_idx: usize| -> Result<(), String> { // Update phase in pid file and provenance tracking if step_idx < step_phases_for_bail.len() { write_pid(&step_phases_for_bail[step_idx]); } // Run bail script if defined if let Some(ref script) = bail_script { let status = std::process::Command::new(script) .arg(&pid_path_for_bail) .current_dir(&state_dir_for_bail) .status() .map_err(|e| format!("bail script {:?} failed: {}", script, e))?; if !status.success() { return Err(format!("bailed at step {}: {:?} exited {}", step_idx + 1, script.file_name().unwrap_or_default(), status.code().unwrap_or(-1))); } } Ok(()) }; let output = llm::call_for_def_multi(def, &prompts, &step_phases, Some(&bail_fn), log)?; Ok(AgentResult { output, node_keys: agent_batch.node_keys, state_dir, }) } // --------------------------------------------------------------------------- // Conversation fragment selection // --------------------------------------------------------------------------- /// Select conversation fragments (per-segment) for the observation extractor. /// Uses the transcript-progress.capnp log for dedup — no stub nodes. /// Does NOT pre-mark segments; caller must call mark_observation_done() after success. pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> { let projects = crate::config::get().projects_dir.clone(); if !projects.exists() { return Vec::new(); } let store = match crate::store::Store::load() { Ok(s) => s, Err(_) => return Vec::new(), }; let mut jsonl_files: Vec = Vec::new(); if let Ok(dirs) = fs::read_dir(&projects) { for dir in dirs.filter_map(|e| e.ok()) { if !dir.path().is_dir() { continue; } if let Ok(files) = fs::read_dir(dir.path()) { for f in files.filter_map(|e| e.ok()) { let p = f.path(); if p.extension().map(|x| x == "jsonl").unwrap_or(false) && let Ok(meta) = p.metadata() && meta.len() > 50_000 { jsonl_files.push(p); } } } } } // Collect unmined segments across all transcripts let mut candidates: Vec<(String, String)> = Vec::new(); for path in &jsonl_files { let path_str = path.to_string_lossy(); let messages = match super::enrich::extract_conversation(&path_str) { Ok(m) => m, Err(_) => continue, }; let session_id = path.file_stem() .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|| "unknown".into()); let segments = super::enrich::split_on_compaction(messages); for (seg_idx, segment) in segments.into_iter().enumerate() { if store.is_segment_mined(&session_id, seg_idx as u32, "observation") { continue; } // Skip segments with too few assistant messages (rate limits, errors) let assistant_msgs = segment.iter() .filter(|(_, role, _, _)| role == "assistant") .count(); if assistant_msgs < 2 { continue; } // Skip segments that are just rate limit errors let has_rate_limit = segment.iter().any(|(_, _, text, _)| text.contains("hit your limit") || text.contains("rate limit")); if has_rate_limit && assistant_msgs < 3 { continue; } let text = format_segment(&segment); if text.len() < 500 { continue; } const CHUNK_SIZE: usize = 50_000; const OVERLAP: usize = 10_000; if text.len() <= CHUNK_SIZE { let id = format!("{}.{}", session_id, seg_idx); candidates.push((id, text)); } else { // Split on line boundaries with overlap let lines: Vec<&str> = text.lines().collect(); let mut start_line = 0; let mut chunk_idx = 0; while start_line < lines.len() { let mut end_line = start_line; let mut size = 0; while end_line < lines.len() && size < CHUNK_SIZE { size += lines[end_line].len() + 1; end_line += 1; } let chunk: String = lines[start_line..end_line].join("\n"); let id = format!("{}.{}.{}", session_id, seg_idx, chunk_idx); candidates.push((id, chunk)); if end_line >= lines.len() { break; } // Back up by overlap amount for next chunk let mut overlap_size = 0; let mut overlap_start = end_line; while overlap_start > start_line && overlap_size < OVERLAP { overlap_start -= 1; overlap_size += lines[overlap_start].len() + 1; } start_line = overlap_start; chunk_idx += 1; } } } if candidates.len() >= n { break; } } candidates.truncate(n); candidates } /// Mark observation segments as successfully mined (call AFTER the agent succeeds). pub fn mark_observation_done(fragment_ids: &[String]) { let mut store = match crate::store::Store::load() { Ok(s) => s, Err(_) => return, }; for id in fragment_ids { if let Some((session_id, seg_str)) = id.rsplit_once('.') && let Ok(seg) = seg_str.parse::() { let _ = store.mark_segment_mined(session_id, seg, "observation"); } } } /// Format a segment's messages into readable text for the observation agent. fn format_segment(messages: &[(usize, String, String, String)]) -> String { let cfg = crate::config::get(); let mut fragments = Vec::new(); for (_, role, text, ts) in messages { let min_len = if role == "user" { 5 } else { 10 }; if text.len() <= min_len { continue; } let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name }; if ts.is_empty() { fragments.push(format!("**{}:** {}", name, text)); } else { fragments.push(format!("**{}** {}: {}", name, &ts[..ts.len().min(19)], text)); } } fragments.join("\n\n") }