consciousness/src/subconscious/knowledge.rs
Kent Overstreet 916f14a092 Log effective tool list, not just whitelist
Shows the actual tool names each agent will receive after
whitelist filtering, so logs are accurate regardless of whether
tools is empty (all) or specified.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-01 15:20:00 -04:00

532 lines
20 KiB
Rust

// knowledge.rs — agent execution and conversation fragment selection
//
// Agent prompts live in agents/*.agent files, dispatched via defs.rs.
// This module handles:
// - Agent execution (build prompt → call LLM with tools → log)
// - Conversation fragment selection (for observation agent)
//
// Agents apply changes via tool calls (poc-memory write/link-add/etc)
// during the LLM call — no action parsing needed.
use super::llm;
use crate::store::{self, Store};
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicPtr, Ordering};
// Global pid path for signal handler cleanup — stored as a leaked CString
// so the signal handler can unlink it without allocation.
static PID_CPATH: AtomicPtr<libc::c_char> = AtomicPtr::new(std::ptr::null_mut());
/// RAII guard that removes the pid file on drop (normal exit, panic).
struct PidGuard;

impl Drop for PidGuard {
    fn drop(&mut self) {
        // Atomically take ownership of the registered path. A null result
        // means a signal handler (or another guard) already claimed it —
        // each swap hands out the pointer exactly once, so no double-free.
        let raw = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst);
        if raw.is_null() {
            return;
        }
        unsafe {
            // Remove the pid file, then reclaim the CString that
            // register_pid_cleanup() leaked via into_raw().
            libc::unlink(raw);
            drop(std::ffi::CString::from_raw(raw));
        }
    }
}
/// Register signal handlers to clean up pid file on SIGTERM/SIGINT.
///
/// Stores the path in PID_CPATH as a raw, leaked CString so the handler
/// can unlink it without allocating (see pid_cleanup_handler).
fn register_pid_cleanup(pid_path: &std::path::Path) {
// CString::new only fails on interior NUL bytes; a path containing NUL
// is a bug, so panicking here is acceptable.
let c_path = std::ffi::CString::new(pid_path.to_string_lossy().as_bytes())
.expect("pid path contains null");
// Leak the CString so the signal handler can access it
let old = PID_CPATH.swap(c_path.into_raw(), Ordering::SeqCst);
if !old.is_null() {
// A previous registration left its leaked CString behind; the swap
// transferred ownership back to us, so reclaim and free it.
unsafe { drop(std::ffi::CString::from_raw(old)); }
}
// SAFETY: installing a C handler for SIGTERM/SIGINT. The handler only
// performs swap/unlink/signal/raise (see pid_cleanup_handler). The path
// pointer is published to PID_CPATH before the handlers are installed.
unsafe {
libc::signal(libc::SIGTERM, pid_cleanup_handler as libc::sighandler_t);
libc::signal(libc::SIGINT, pid_cleanup_handler as libc::sighandler_t);
}
}
/// Signal handler: unlink the pid file, then re-raise the signal with the
/// default disposition so the process still terminates with the original
/// signal's exit status.
extern "C" fn pid_cleanup_handler(sig: libc::c_int) {
// Atomically take ownership of the path; concurrent Drop/re-registration
// gets null from its own swap, so cleanup happens exactly once.
let ptr = PID_CPATH.swap(std::ptr::null_mut(), Ordering::SeqCst);
if !ptr.is_null() {
// unlink(2) is async-signal-safe per POSIX.
unsafe { libc::unlink(ptr); }
// Don't free — we're in a signal handler, just leak it
}
// Reset to the default handler and re-raise so the default action runs
// and the parent observes death-by-signal rather than a normal exit.
unsafe {
libc::signal(sig, libc::SIG_DFL);
libc::raise(sig);
}
}
// ---------------------------------------------------------------------------
// Agent execution
// ---------------------------------------------------------------------------
/// Result of running a single agent.
/// Result of running a single agent.
pub struct AgentResult {
/// Text returned by the LLM call for this run.
pub output: String,
/// Keys of the nodes included in the batch (seed keys plus any extras
/// pulled in during placeholder resolution).
pub node_keys: Vec<String>,
/// Directory containing output() files from the agent run.
pub state_dir: std::path::PathBuf,
}
/// Run a single agent and return the result (no action application — tools handle that).
pub fn run_and_apply(
store: &mut Store,
agent_name: &str,
batch_size: usize,
llm_tag: &str,
) -> Result<(), String> {
run_and_apply_with_log(store, agent_name, batch_size, llm_tag, &|_| {})
}
/// Like run_and_apply, but reports progress through `log`.
pub fn run_and_apply_with_log(
    store: &mut Store,
    agent_name: &str,
    batch_size: usize,
    llm_tag: &str,
    log: &(dyn Fn(&str) + Sync),
) -> Result<(), String> {
    // No in-flight exclusions: every candidate node is eligible.
    let no_exclusions = std::collections::HashSet::new();
    run_and_apply_excluded(store, agent_name, batch_size, llm_tag, log, &no_exclusions)
}
/// Like run_and_apply_with_log but with an in-flight exclusion set.
/// Returns the keys that were processed (for the daemon to track).
pub fn run_and_apply_excluded(
store: &mut Store,
agent_name: &str,
batch_size: usize,
llm_tag: &str,
log: &(dyn Fn(&str) + Sync),
exclude: &std::collections::HashSet<String>,
) -> Result<(), String> {
let result = run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, exclude)?;
// Mark conversation segments as mined after successful processing
if agent_name == "observation" {
mark_observation_done(&result.node_keys);
}
Ok(())
}
/// Run an agent with explicit target keys, bypassing the agent's query.
pub fn run_one_agent_with_keys(
store: &mut Store,
agent_name: &str,
keys: &[String],
count: usize,
llm_tag: &str,
log: &(dyn Fn(&str) + Sync),
) -> Result<AgentResult, String> {
let def = super::defs::get_def(agent_name)
.ok_or_else(|| format!("no .agent file for {}", agent_name))?;
let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?;
log(&format!("targeting: {}", keys.join(", ")));
let graph = store.build_graph();
let mut resolved_steps = Vec::new();
let mut all_keys: Vec<String> = keys.to_vec();
for step in &def.steps {
let (prompt, extra_keys) = super::defs::resolve_placeholders(
&step.prompt, store, &graph, keys, count,
);
all_keys.extend(extra_keys);
resolved_steps.push(super::prompts::ResolvedStep {
prompt,
phase: step.phase.clone(),
});
}
let agent_batch = super::prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys };
// Record visits eagerly so concurrent agents pick different seeds
if !agent_batch.node_keys.is_empty() {
store.record_agent_visits(&agent_batch.node_keys, agent_name).ok();
}
run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log)
}
/// Run one agent batch — convenience wrapper with an empty exclusion set.
pub fn run_one_agent(
    store: &mut Store,
    agent_name: &str,
    batch_size: usize,
    llm_tag: &str,
    log: &(dyn Fn(&str) + Sync),
) -> Result<AgentResult, String> {
    let none = std::collections::HashSet::new();
    run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, &none)
}
/// Like run_one_agent but excludes nodes currently being worked on by other agents.
pub fn run_one_agent_excluded(
store: &mut Store,
agent_name: &str,
batch_size: usize,
llm_tag: &str,
log: &(dyn Fn(&str) + Sync),
exclude: &std::collections::HashSet<String>,
) -> Result<AgentResult, String> {
let def = super::defs::get_def(agent_name)
.ok_or_else(|| format!("no .agent file for {}", agent_name))?;
// Set up output dir and write pid file BEFORE prompt building
let (state_dir, pid_path, _guard) = setup_agent_state(agent_name, &def)?;
log("building prompt");
let effective_count = def.count.unwrap_or(batch_size);
let agent_batch = super::defs::run_agent(store, &def, effective_count, exclude)?;
run_one_agent_inner(store, agent_name, &def, agent_batch, state_dir, pid_path, llm_tag, log)
}
/// Set up agent state dir, write initial pid file, register cleanup handlers.
/// Returns (state_dir, pid_path, guard). The guard removes the pid file on drop.
fn setup_agent_state(
agent_name: &str,
def: &super::defs::AgentDef,
) -> Result<(PathBuf, PathBuf, PidGuard), String> {
let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| store::memory_dir().join("agent-output").join(agent_name));
fs::create_dir_all(&state_dir)
.map_err(|e| format!("create state dir: {}", e))?;
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); }
// Clean up stale pid files from dead processes
scan_pid_files(&state_dir, 0);
let pid = std::process::id();
let pid_path = state_dir.join(format!("pid-{}", pid));
let first_phase = def.steps.first()
.map(|s| s.phase.as_str())
.unwrap_or("step-0");
fs::write(&pid_path, first_phase).ok();
// Register for cleanup on signals and normal exit
register_pid_cleanup(&pid_path);
Ok((state_dir, pid_path, PidGuard))
}
/// Check for live agent processes in a state dir. Returns (phase, pid) pairs.
/// Cleans up stale pid files and kills timed-out processes.
pub fn scan_pid_files(state_dir: &std::path::Path, timeout_secs: u64) -> Vec<(String, u32)> {
let mut live = Vec::new();
let Ok(entries) = fs::read_dir(state_dir) else { return live };
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if !name_str.starts_with("pid-") { continue; }
let pid: u32 = name_str.strip_prefix("pid-")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if pid == 0 { continue; }
if unsafe { libc::kill(pid as i32, 0) } != 0 {
fs::remove_file(entry.path()).ok();
continue;
}
if timeout_secs > 0 {
if let Ok(meta) = entry.metadata() {
if let Ok(modified) = meta.modified() {
if modified.elapsed().unwrap_or_default().as_secs() > timeout_secs {
unsafe { libc::kill(pid as i32, libc::SIGTERM); }
fs::remove_file(entry.path()).ok();
continue;
}
}
}
}
let phase = fs::read_to_string(entry.path())
.unwrap_or_default()
.trim().to_string();
live.push((phase, pid));
}
live
}
/// Spawn an agent asynchronously. Writes the pid file before returning
/// so the caller immediately sees the agent as running.
/// Spawn an agent asynchronously. Writes the pid file before returning
/// so the caller immediately sees the agent as running.
///
/// Returns the child pid, or None if the agent def is missing or the
/// process could not be spawned.
pub fn spawn_agent(
agent_name: &str,
state_dir: &std::path::Path,
session_id: &str,
) -> Option<u32> {
let def = super::defs::get_def(agent_name)?;
let first_phase = def.steps.first()
.map(|s| s.phase.as_str())
.unwrap_or("step-0");
// Per-agent log dir: ~/.consciousness/logs/<agent>/<timestamp>.log
let log_dir = dirs::home_dir().unwrap_or_default()
.join(format!(".consciousness/logs/{}", agent_name));
fs::create_dir_all(&log_dir).ok();
// Fall back to /dev/null when the log file can't be created.
// NOTE(review): the inner unwrap panics if even /dev/null is unavailable —
// presumably acceptable on the unix targets this module already assumes.
let agent_log = fs::File::create(
log_dir.join(format!("{}.log", store::compact_timestamp())))
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
// Re-invoke poc-memory so the agent runs in its own process; stdout and
// stderr both go to the same log file.
let child = std::process::Command::new("poc-memory")
.args(["agent", "run", agent_name, "--count", "1", "--local",
"--state-dir", &state_dir.to_string_lossy()])
.env("POC_SESSION_ID", session_id)
.stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()))
.stderr(agent_log)
.spawn()
.ok()?;
let pid = child.id();
// Write the pid file on the child's behalf, using the same "pid-<n>"
// naming that scan_pid_files() looks for.
let pid_path = state_dir.join(format!("pid-{}", pid));
fs::write(&pid_path, first_phase).ok();
Some(pid)
}
/// Shared tail of all agent-run entry points: validates the batch, logs the
/// run parameters, then drives the multi-step LLM call with optional
/// per-step bail checks.
fn run_one_agent_inner(
    _store: &mut Store,
    agent_name: &str,
    def: &super::defs::AgentDef,
    agent_batch: super::prompts::AgentBatch,
    state_dir: std::path::PathBuf,
    pid_path: std::path::PathBuf,
    _llm_tag: &str,
    log: &(dyn Fn(&str) + Sync),
) -> Result<AgentResult, String> {
    // Guard: an empty step list would panic below when indexing steps[0];
    // return a descriptive error instead.
    if agent_batch.steps.is_empty() {
        return Err(format!("agent {}: batch has no steps", agent_name));
    }
    // Effective tool list: an empty whitelist in the .agent file means
    // "all tools"; otherwise keep only whitelisted names that exist.
    let all_tools = crate::thought::memory_and_journal_definitions();
    let effective_tools: Vec<String> = if def.tools.is_empty() {
        all_tools.iter().map(|t| t.function.name.clone()).collect()
    } else {
        all_tools.iter()
            .filter(|t| def.tools.iter().any(|w| w == &t.function.name))
            .map(|t| t.function.name.clone())
            .collect()
    };
    let tools_desc = effective_tools.join(", ");
    let n_steps = agent_batch.steps.len();
    for key in &agent_batch.node_keys {
        log(&format!(" node: {}", key));
    }
    // Guard: reject oversized first prompt (later steps grow via conversation)
    let max_prompt_bytes = 800_000;
    let first_len = agent_batch.steps[0].prompt.len();
    if first_len > max_prompt_bytes {
        let prompt_kb = first_len / 1024;
        // Keep a copy of the rejected prompt for post-mortem debugging.
        let oversize_dir = store::memory_dir().join("llm-logs").join("oversized");
        fs::create_dir_all(&oversize_dir).ok();
        let oversize_path = oversize_dir.join(format!("{}-{}.txt",
            agent_name, store::compact_timestamp()));
        let header = format!("=== OVERSIZED PROMPT ===\nagent: {}\nsize: {}KB (max {}KB)\nnodes: {:?}\n\n",
            agent_name, prompt_kb, max_prompt_bytes / 1024, agent_batch.node_keys);
        fs::write(&oversize_path, format!("{}{}", header, &agent_batch.steps[0].prompt)).ok();
        log(&format!("oversized prompt logged to {}", oversize_path.display()));
        return Err(format!(
            "prompt too large: {}KB (max {}KB) — seed nodes may be oversized",
            prompt_kb, max_prompt_bytes / 1024,
        ));
    }
    // Phase updates are best-effort; the pid file doubles as a progress marker.
    let write_pid = |phase: &str| {
        fs::write(&pid_path, phase).ok();
    };
    let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect();
    log(&format!("{} step(s) {:?}, {}KB initial, model={}, {}, {} nodes, output={}",
        n_steps, phases, first_len / 1024, def.model, tools_desc,
        agent_batch.node_keys.len(), state_dir.display()));
    let prompts: Vec<String> = agent_batch.steps.iter()
        .map(|s| s.prompt.clone()).collect();
    let step_phases: Vec<String> = agent_batch.steps.iter()
        .map(|s| s.phase.clone()).collect();
    let step_phases_for_bail = step_phases.clone();
    if std::env::var("POC_AGENT_VERBOSE").is_ok() {
        for (i, s) in agent_batch.steps.iter().enumerate() {
            log(&format!("=== PROMPT {}/{} ({}) ===\n\n{}", i + 1, n_steps, s.phase, s.prompt));
        }
    }
    log("\n=== CALLING LLM ===");
    // Bail check: if the agent defines a bail script, run it between steps.
    // The script receives the pid file path as $1, cwd = state dir.
    let bail_script = def.bail.as_ref().map(|name| {
        // Look for the script next to the .agent file
        let agents_dir = super::defs::agents_dir();
        agents_dir.join(name)
    });
    let state_dir_for_bail = state_dir.clone();
    let pid_path_for_bail = pid_path.clone();
    let bail_fn = move |step_idx: usize| -> Result<(), String> {
        // Update phase in pid file and provenance tracking
        if step_idx < step_phases_for_bail.len() {
            write_pid(&step_phases_for_bail[step_idx]);
        }
        // Run bail script if defined; a non-zero exit aborts the run.
        if let Some(ref script) = bail_script {
            let status = std::process::Command::new(script)
                .arg(&pid_path_for_bail)
                .current_dir(&state_dir_for_bail)
                .status()
                .map_err(|e| format!("bail script {:?} failed: {}", script, e))?;
            if !status.success() {
                return Err(format!("bailed at step {}: {:?} exited {}",
                    step_idx + 1, script.file_name().unwrap_or_default(),
                    status.code().unwrap_or(-1)));
            }
        }
        Ok(())
    };
    let output = llm::call_for_def_multi(def, &prompts, &step_phases, Some(&bail_fn), log)?;
    Ok(AgentResult {
        output,
        node_keys: agent_batch.node_keys,
        state_dir,
    })
}
// ---------------------------------------------------------------------------
// Conversation fragment selection
// ---------------------------------------------------------------------------
/// Select conversation fragments (per-segment) for the observation extractor.
/// Uses the transcript-progress.capnp log for dedup — no stub nodes.
/// Does NOT pre-mark segments; caller must call mark_observation_done() after success.
/// Select conversation fragments (per-segment) for the observation extractor.
/// Uses the transcript-progress.capnp log for dedup — no stub nodes.
/// Does NOT pre-mark segments; caller must call mark_observation_done() after success.
///
/// Segments longer than CHUNK_SIZE are split on line boundaries into
/// overlapping chunks with ids "{session}.{seg}.{chunk}"; unsplit segments
/// get ids "{session}.{seg}".
pub fn select_conversation_fragments(n: usize) -> Vec<(String, String)> {
    let projects = crate::config::get().projects_dir.clone();
    if !projects.exists() { return Vec::new(); }
    let store = match crate::store::Store::load() {
        Ok(s) => s,
        Err(_) => return Vec::new(),
    };
    // Gather candidate transcripts: *.jsonl files over 50KB, one level deep.
    let mut jsonl_files: Vec<PathBuf> = Vec::new();
    if let Ok(dirs) = fs::read_dir(&projects) {
        for dir in dirs.filter_map(|e| e.ok()) {
            if !dir.path().is_dir() { continue; }
            if let Ok(files) = fs::read_dir(dir.path()) {
                for f in files.filter_map(|e| e.ok()) {
                    let p = f.path();
                    if p.extension().map(|x| x == "jsonl").unwrap_or(false)
                        && let Ok(meta) = p.metadata()
                        && meta.len() > 50_000 {
                        jsonl_files.push(p);
                    }
                }
            }
        }
    }
    // Collect unmined segments across all transcripts
    let mut candidates: Vec<(String, String)> = Vec::new();
    for path in &jsonl_files {
        let path_str = path.to_string_lossy();
        let messages = match super::enrich::extract_conversation(&path_str) {
            Ok(m) => m,
            Err(_) => continue,
        };
        let session_id = path.file_stem()
            .map(|s| s.to_string_lossy().to_string())
            .unwrap_or_else(|| "unknown".into());
        let segments = super::enrich::split_on_compaction(messages);
        for (seg_idx, segment) in segments.into_iter().enumerate() {
            if store.is_segment_mined(&session_id, seg_idx as u32, "observation") {
                continue;
            }
            // Skip segments with too few assistant messages (rate limits, errors)
            let assistant_msgs = segment.iter()
                .filter(|(_, role, _, _)| role == "assistant")
                .count();
            if assistant_msgs < 2 {
                continue;
            }
            // Skip segments that are just rate limit errors
            let has_rate_limit = segment.iter().any(|(_, _, text, _)|
                text.contains("hit your limit") || text.contains("rate limit"));
            if has_rate_limit && assistant_msgs < 3 {
                continue;
            }
            let text = format_segment(&segment);
            if text.len() < 500 {
                continue;
            }
            const CHUNK_SIZE: usize = 50_000;
            const OVERLAP: usize = 10_000;
            if text.len() <= CHUNK_SIZE {
                let id = format!("{}.{}", session_id, seg_idx);
                candidates.push((id, text));
            } else {
                // Split on line boundaries with overlap
                let lines: Vec<&str> = text.lines().collect();
                let mut start_line = 0;
                let mut chunk_idx = 0;
                while start_line < lines.len() {
                    // Grow the chunk until it reaches CHUNK_SIZE or input ends.
                    let mut end_line = start_line;
                    let mut size = 0;
                    while end_line < lines.len() && size < CHUNK_SIZE {
                        size += lines[end_line].len() + 1;
                        end_line += 1;
                    }
                    let chunk: String = lines[start_line..end_line].join("\n");
                    let id = format!("{}.{}.{}", session_id, seg_idx, chunk_idx);
                    candidates.push((id, chunk));
                    if end_line >= lines.len() { break; }
                    // Back up by overlap amount for next chunk
                    let mut overlap_size = 0;
                    let mut overlap_start = end_line;
                    while overlap_start > start_line && overlap_size < OVERLAP {
                        overlap_start -= 1;
                        overlap_size += lines[overlap_start].len() + 1;
                    }
                    // Always advance at least one line: a single line larger
                    // than CHUNK_SIZE makes the back-up land exactly on
                    // start_line, which would loop forever otherwise.
                    start_line = overlap_start.max(start_line + 1);
                    chunk_idx += 1;
                }
            }
        }
        if candidates.len() >= n { break; }
    }
    candidates.truncate(n);
    candidates
}
/// Mark observation segments as successfully mined (call AFTER the agent succeeds).
pub fn mark_observation_done(fragment_ids: &[String]) {
let mut store = match crate::store::Store::load() {
Ok(s) => s,
Err(_) => return,
};
for id in fragment_ids {
if let Some((session_id, seg_str)) = id.rsplit_once('.')
&& let Ok(seg) = seg_str.parse::<u32>() {
let _ = store.mark_segment_mined(session_id, seg, "observation");
}
}
}
/// Format a segment's messages into readable text for the observation agent.
fn format_segment(messages: &[(usize, String, String, String)]) -> String {
let cfg = crate::config::get();
let mut fragments = Vec::new();
for (_, role, text, ts) in messages {
let min_len = if role == "user" { 5 } else { 10 };
if text.len() <= min_len { continue; }
let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
if ts.is_empty() {
fragments.push(format!("**{}:** {}", name, text));
} else {
fragments.push(format!("**{}** {}: {}", name, &ts[..ts.len().min(19)], text));
}
}
fragments.join("\n\n")
}