consciousness/src/subconscious/defs.rs
Kent Overstreet 7476e9d0db delete rename agent and related code
The organize agents handle renaming as part of their normal work now.
Also simplified resolve_placeholders to build graph internally.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-13 02:05:58 -04:00

654 lines
24 KiB
Rust

// Agent definitions: self-contained files with query + prompt template.
//
// Each agent is a file in the agents/ directory:
// - First line: JSON header (agent, query, model, schedule)
// - After blank line: prompt template with {{placeholder}} lookups
//
// Placeholders are resolved at runtime:
// {{seed}} — rendered content for each seed node
// {{organize}} — seed nodes with content + links via RPC
// {{siblings}} / {{neighborhood}} — seeds plus their top-ranked neighbors
// {{agent-context}} — personality/identity groups from load-context config
// {{input:KEY}} — named output file from the agent's output dir
// {{conversation}} / {{conversation:N}} — session transcript tail
// {{seen_current}} / {{seen_previous}} — surfaced memory keys
// {{memory_ratio}} — what % of current context is recalled memories
// {{tool:NAME ARGS}} — run a tool call; {{bash:CMD}} — run a shell command
//
// The query selects what to operate on; placeholders pull in context.
use crate::graph::Graph;
use crate::store::Store;
use serde::Deserialize;
use std::path::PathBuf;
/// A single step in a multi-step agent.
pub struct AgentStep {
    /// Prompt template text for this step (placeholders not yet resolved).
    pub prompt: String,
    /// Phase label for PID file tracking (e.g. "surface", "observe").
    /// Parsed from `=== PROMPT phase:name ===` or auto-generated as "step-N".
    pub phase: String,
}
/// Agent definition: config (from JSON header) + prompt steps.
pub struct AgentDef {
    /// Agent name, from the `agent` header field; used for lookup in get_def.
    pub agent: String,
    /// Memory query selecting the nodes to operate on (may be empty;
    /// a `limit:` clause is appended at run time if absent).
    pub query: String,
    /// Steps — single-step agents have one entry, multi-step have several.
    /// Steps are separated by `=== PROMPT ===` in the .agent file.
    pub steps: Vec<AgentStep>,
    /// Schedule string from the header (semantics defined by the scheduler).
    pub schedule: String,
    /// Tool names this agent may use.
    pub tools: Vec<String>,
    /// Number of seed nodes / conversation fragments (overrides --count).
    pub count: Option<usize>,
    /// Max size of conversation chunks in bytes.
    pub chunk_size: Option<usize>,
    /// Overlap between consecutive chunks in bytes.
    pub chunk_overlap: Option<usize>,
    /// LLM temperature override.
    pub temperature: Option<f32>,
    /// vLLM scheduling priority (lower = higher priority).
    /// 0 = interactive, 1 = near-realtime, 10 = batch (default).
    pub priority: i32,
    /// Bail check command — run between steps with pid file path as $1,
    /// cwd = state dir. Non-zero exit = stop the pipeline.
    pub bail: Option<String>,
}
/// The JSON header portion (first line of the file).
/// Mirrors AgentDef's config fields; most are optional with serde defaults
/// so minimal headers like `{"agent": "foo"}` parse cleanly.
#[derive(Deserialize)]
struct AgentHeader {
    // Only required field — everything else defaults.
    agent: String,
    #[serde(default)]
    query: String,
    #[serde(default)]
    schedule: String,
    #[serde(default)]
    tools: Vec<String>,
    /// Number of seed nodes / conversation fragments (overrides --count)
    #[serde(default)]
    count: Option<usize>,
    /// Max size of conversation chunks in bytes (default 50000)
    #[serde(default)]
    chunk_size: Option<usize>,
    /// Overlap between chunks in bytes (default 10000)
    #[serde(default)]
    chunk_overlap: Option<usize>,
    /// LLM temperature override
    #[serde(default)]
    temperature: Option<f32>,
    /// vLLM scheduling priority (lower = higher priority, default 10 = batch)
    #[serde(default = "default_priority")]
    priority: i32,
    /// Bail check command — run between steps with pid file path as $1,
    /// cwd = state dir. Non-zero exit = stop the pipeline.
    #[serde(default)]
    bail: Option<String>,
}
fn default_priority() -> i32 { 10 }
/// Parse an agent file: first line is JSON config, rest is the prompt(s).
/// Multiple prompts are separated by `=== PROMPT [phase:name] ===` lines.
///
/// Returns None if the JSON header fails to parse or if no non-blank
/// prompt step exists after the header.
fn parse_agent_file(content: &str) -> Option<AgentDef> {
    let (first_line, rest) = content.split_once('\n')?;
    let header: AgentHeader = serde_json::from_str(first_line.trim()).ok()?;
    // Skip optional blank line between header and prompt body
    let body = rest.strip_prefix('\n').unwrap_or(rest);

    /// Append the accumulated prompt as a step, skipping blank prompts.
    /// Auto-generates "step-N" (N = index of the new step) when the
    /// preceding delimiter carried no explicit `phase:` label.
    fn push_step(steps: &mut Vec<AgentStep>, prompt: &str, phase: Option<String>) {
        let trimmed = prompt.trim();
        if trimmed.is_empty() {
            return;
        }
        let phase = phase.unwrap_or_else(|| format!("step-{}", steps.len()));
        steps.push(AgentStep { prompt: trimmed.to_string(), phase });
    }

    let mut steps: Vec<AgentStep> = Vec::new();
    let mut current_prompt = String::new();
    let mut current_phase: Option<String> = None;
    for line in body.lines() {
        if line.starts_with("=== PROMPT") && line.ends_with("===") {
            // Delimiter: flush the step accumulated so far, then parse the
            // optional `phase:name` label that applies to the NEXT step.
            push_step(&mut steps, &current_prompt, current_phase.take());
            let inner = line.strip_prefix("=== PROMPT").unwrap()
                .strip_suffix("===").unwrap().trim();
            current_phase = inner.strip_prefix("phase:")
                .map(|s| s.trim().to_string());
            current_prompt.clear();
        } else {
            if !current_prompt.is_empty() {
                current_prompt.push('\n');
            }
            current_prompt.push_str(line);
        }
    }
    // Flush the final step (text after the last delimiter, or the whole
    // body for single-step agents).
    push_step(&mut steps, &current_prompt, current_phase.take());
    if steps.is_empty() {
        return None;
    }
    Some(AgentDef {
        agent: header.agent,
        query: header.query,
        steps,
        schedule: header.schedule,
        tools: header.tools,
        count: header.count,
        chunk_size: header.chunk_size,
        chunk_overlap: header.chunk_overlap,
        temperature: header.temperature,
        priority: header.priority,
        bail: header.bail,
    })
}
/// Directory containing the .agent definition files.
/// Prefers the in-repo agents/ directory (dev checkouts); otherwise
/// falls back to `agents/` under the memory dir.
pub fn agents_dir() -> PathBuf {
    let repo = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/subconscious/agents");
    if repo.is_dir() {
        repo
    } else {
        crate::store::memory_dir().join("agents")
    }
}
/// Load all agent definitions from the agents directory.
/// Only `.agent` and `.md` files are considered; unreadable or
/// unparseable files are silently skipped.
pub fn load_defs() -> Vec<AgentDef> {
    let mut defs = Vec::new();
    let Ok(entries) = std::fs::read_dir(agents_dir()) else { return defs };
    for entry in entries.flatten() {
        let path = entry.path();
        let wanted = matches!(
            path.extension().and_then(|x| x.to_str()),
            Some("agent") | Some("md")
        );
        if !wanted {
            continue;
        }
        if let Ok(content) = std::fs::read_to_string(&path) {
            if let Some(def) = parse_agent_file(&content) {
                defs.push(def);
            }
        }
    }
    defs
}
/// Look up a single agent definition by name.
/// Tries `NAME.agent` then `NAME.md` directly; if neither parses,
/// scans all definitions for a matching `agent` header field.
pub fn get_def(name: &str) -> Option<AgentDef> {
    let dir = agents_dir();
    for ext in ["agent", "md"] {
        let candidate = dir.join(format!("{}.{}", name, ext));
        let Ok(content) = std::fs::read_to_string(&candidate) else { continue };
        if let Some(def) = parse_agent_file(&content) {
            return Some(def);
        }
    }
    load_defs().into_iter().find(|d| d.agent == name)
}
/// Result of resolving a placeholder: text + any affected node keys.
struct Resolved {
    /// Replacement text substituted into the prompt template.
    text: String,
    /// Node keys the placeholder touched, for visit tracking.
    keys: Vec<String>,
}
/// Resolve a single {{placeholder}} by name.
/// Returns the replacement text and any node keys it produced (for visit tracking).
/// Returns None for unrecognized names (the caller substitutes an
/// "(unknown: …)" marker) and for known names that produced no content.
fn resolve(
    name: &str,
    store: &Store,
    graph: &Graph,
    keys: &[String],
    _count: usize,
) -> Option<Resolved> {
    match name {
        // seed — render output for each seed node (content + deduped links)
        "seed" => {
            let mut text = String::new();
            let mut result_keys = Vec::new();
            for key in keys {
                // Reuse the memory_render tool so the output matches what
                // the agent would see from an explicit tool call.
                if let Some(r) = resolve_tool(&format!("memory_render {}", key)) {
                    if !text.is_empty() { text.push_str("\n\n---\n\n"); }
                    text.push_str(&format!("## {}\n\n{}", key, r.text));
                    result_keys.push(key.clone());
                }
            }
            if text.is_empty() { return None; }
            Some(Resolved { text, keys: result_keys })
        }
        "organize" => {
            // Show seed nodes with content and links via RPC
            let mut text = format!("### Seed nodes ({} starting points)\n\n", keys.len());
            let mut result_keys = Vec::new();
            for key in keys {
                match crate::mcp_server::memory_rpc(
                    "memory_render",
                    serde_json::json!({"key": key}),
                ) {
                    // Only nodes that actually rendered content count as seeds
                    Ok(content) if !content.trim().is_empty() => {
                        text.push_str(&format!("#### {}\n\n{}\n\n---\n\n", key, content));
                        result_keys.push(key.clone());
                    }
                    _ => continue,
                }
            }
            text.push_str("Use memory_render(KEY) and memory_links(KEY) to explore further.\n");
            Some(Resolved { text, keys: result_keys })
        }
        // siblings/neighborhood — each seed plus its top-ranked neighbors
        "siblings" | "neighborhood" => {
            const MAX_NEIGHBORS: usize = 20;
            const BUDGET: usize = 400_000; // ~100K tokens
            let mut out = String::new();
            let mut all_keys: Vec<String> = Vec::new();
            // Keys already emitted — prevents duplicate content when
            // neighborhoods of different seeds overlap.
            let mut included: std::collections::HashSet<String> = std::collections::HashSet::new();
            for key in keys {
                if included.contains(key) { continue; }
                included.insert(key.clone());
                let Some(node) = store.nodes.get(key.as_str()) else { continue };
                // Seed node with full content
                out.push_str(&format!("## {} (seed)\n\n{}\n\n", key, node.content));
                all_keys.push(key.clone());
                // Rank neighbors by link_strength * node_weight, take top 20
                let mut ranked: Vec<_> = graph.neighbors(key).iter()
                    .filter_map(|(nbr, strength)| {
                        store.nodes.get(nbr.as_str()).map(|n| {
                            // .max(0.01) keeps zero-weight nodes rankable
                            let score = strength * n.weight.max(0.01);
                            (nbr.to_string(), *strength, score)
                        })
                    })
                    .collect();
                ranked.sort_by(|a, b| b.2.total_cmp(&a.2));
                ranked.truncate(MAX_NEIGHBORS);
                if ranked.is_empty() { continue; }
                out.push_str(&format!("### Neighbors (top {})\n\n", ranked.len()));
                for (nbr, strength, _) in &ranked {
                    if included.contains(nbr) { continue; }
                    included.insert(nbr.clone());
                    if let Some(n) = store.nodes.get(nbr.as_str()) {
                        // Budget is checked before appending, so the last
                        // full-content neighbor may overshoot BUDGET slightly.
                        if out.len() > BUDGET {
                            // Header-only past budget
                            let first = n.content.lines()
                                .find(|l| !l.trim().is_empty())
                                .unwrap_or("(empty)");
                            out.push_str(&format!("#### {} ({:.2}) — {}\n", nbr, strength, first));
                        } else {
                            out.push_str(&format!("#### {} ({:.2})\n\n{}\n\n", nbr, strength, n.content));
                        }
                        all_keys.push(nbr.to_string());
                    }
                }
            }
            Some(Resolved { text: out, keys: all_keys })
        }
        // agent-context — personality/identity groups from load-context config
        "agent-context" => {
            let cfg = crate::config::get();
            let mut text = String::new();
            let mut keys = Vec::new();
            for group in &cfg.context_groups {
                // Only groups flagged for agent use
                if !group.agent { continue; }
                let entries = crate::cli::node::get_group_content(group, &cfg);
                for (key, content) in entries {
                    use std::fmt::Write;
                    // .ok(): writes to a String cannot fail; ignore the Result
                    writeln!(text, "--- {} ({}) ---", key, group.label).ok();
                    writeln!(text, "{}\n", content).ok();
                    keys.push(key);
                }
            }
            if text.is_empty() { None }
            else { Some(Resolved { text, keys }) }
        }
        // input:KEY — read a named output file from the agent's output dir
        _ if name.starts_with("input:") => {
            let key = &name[6..];
            let dir = std::env::var("POC_AGENT_OUTPUT_DIR")
                .map(std::path::PathBuf::from)
                .unwrap_or_else(|_| crate::store::memory_dir().join("agent-output").join("default"));
            let path = dir.join(key);
            // A missing file resolves to empty text rather than "(unknown)"
            match std::fs::read_to_string(&path) {
                Ok(text) => Some(Resolved { text, keys: vec![] }),
                Err(_) => Some(Resolved { text: String::new(), keys: vec![] }),
            }
        }
        // conversation — tail of the current session transcript (post-compaction)
        // conversation:N — same, but with an explicit byte budget
        _ if name == "conversation" || name.starts_with("conversation:") => {
            let max_bytes = name.strip_prefix("conversation:")
                .and_then(|s| s.parse::<usize>().ok());
            let text = resolve_conversation(max_bytes);
            if text.is_empty() { None }
            else { Some(Resolved { text, keys: vec![] }) }
        }
        // seen_current — memories surfaced in current (post-compaction) context
        "seen_current" => {
            let text = resolve_seen_list("");
            Some(Resolved { text, keys: vec![] })
        }
        // seen_previous — memories surfaced before last compaction
        "seen_previous" => {
            let text = resolve_seen_list("-prev");
            Some(Resolved { text, keys: vec![] })
        }
        // memory_ratio — what % of current context is recalled memories
        "memory_ratio" => {
            let text = resolve_memory_ratio();
            Some(Resolved { text, keys: vec![] })
        }
        // tool:NAME ARGS — run a tool call and include its output
        _ if name.starts_with("tool:") => {
            let spec = name[5..].trim();
            resolve_tool(spec)
        }
        // bash:COMMAND — run a shell command and include its stdout
        _ if name.starts_with("bash:") => {
            let cmd = &name[5..];
            let output = std::process::Command::new("bash")
                .args(["-c", cmd])
                .output();
            // Failures become inline text so the prompt still renders
            let text = match output {
                Ok(o) if o.status.success() =>
                    String::from_utf8_lossy(&o.stdout).to_string(),
                Ok(o) => format!("(command failed: {})",
                    String::from_utf8_lossy(&o.stderr).trim()),
                Err(e) => format!("(command error: {})", e),
            };
            Some(Resolved { text, keys: vec![] })
        }
        _ => None,
    }
}
/// Get the tail of the current session's conversation.
/// Reads POC_SESSION_ID to find the transcript, extracts the last
/// segment (post-compaction), returns the tail (~100K chars).
///
/// Side effect: sets POC_MEMORIES_OLDER_THAN to the epoch of the oldest
/// included message so surfacing skips nodes created mid-conversation.
fn resolve_conversation(budget: Option<usize>) -> String {
    let session = crate::session::HookSession::from_env();
    let transcript = session.as_ref()
        .map(|s| s.transcript())
        .unwrap_or_else(crate::session::TranscriptInfo::empty);
    if !transcript.exists() { return String::new(); }
    // Tail iterator appears to yield newest-to-oldest (we reverse at the
    // end to restore chronological order) — TODO confirm in transcript mod.
    let Some(iter) = crate::transcript::TailMessages::open(&transcript.path) else {
        return String::new();
    };
    let cfg = crate::config::get();
    // Budget precedence: explicit arg > config override > 100KB default
    let max_bytes = budget.unwrap_or_else(|| cfg.surface_conversation_bytes.unwrap_or(100_000));
    let mut fragments: Vec<String> = Vec::new();
    let mut total_bytes = 0;
    let mut oldest_ts = String::new();
    for (role, content, ts) in iter {
        if total_bytes >= max_bytes { break; }
        let name = if role == "user" { &cfg.user_name } else { &cfg.assistant_name };
        let formatted = if !ts.is_empty() {
            // Keep the first 19 chars ("YYYY-MM-DDTHH:MM:SS" in ISO form);
            // floor_char_boundary guards against slicing mid-codepoint.
            // Written every iteration — the last write wins, which is the
            // oldest message since iteration runs backwards.
            oldest_ts = ts[..ts.floor_char_boundary(ts.len().min(19))].to_string();
            format!("**{}** {}: {}", name, &oldest_ts, content)
        } else {
            format!("**{}:** {}", name, content)
        };
        // Budget counts raw content only, not the formatting overhead
        total_bytes += content.len();
        fragments.push(formatted);
    }
    // Set cutoff so surface doesn't see nodes created during this conversation
    if !oldest_ts.is_empty() {
        if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(&oldest_ts, "%Y-%m-%dT%H:%M:%S") {
            let epoch = dt.and_local_timezone(chrono::Local).unwrap().timestamp();
            // NOTE(review): set_var is unsafe (not thread-safe in Rust 2024);
            // assumes no concurrent env reads/writes here — confirm.
            unsafe { std::env::set_var("POC_MEMORIES_OLDER_THAN", epoch.to_string()); }
        }
    }
    // Reverse back to chronological order
    fragments.reverse();
    fragments.join("\n\n")
}
/// Get surfaced memory keys from a seen-set file.
/// `suffix` is "" for current, "-prev" for pre-compaction.
///
/// Renders the 20 most recent distinct keys as "- key (timestamp)" lines,
/// or "(no session ID)" / "(none)" placeholders.
fn resolve_seen_list(suffix: &str) -> String {
    let session_id = std::env::var("POC_SESSION_ID").unwrap_or_default();
    if session_id.is_empty() {
        return "(no session ID)".to_string();
    }
    let path = crate::store::memory_dir()
        .join("sessions")
        .join(format!("seen{}-{}", suffix, session_id));
    // Each line of the seen-set file is "timestamp\tkey".
    let mut entries: Vec<(String, String)> = Vec::new();
    if let Ok(content) = std::fs::read_to_string(&path) {
        for line in content.lines() {
            if line.is_empty() {
                continue;
            }
            if let Some((ts, key)) = line.split_once('\t') {
                entries.push((ts.to_string(), key.to_string()));
            }
        }
    }
    if entries.is_empty() {
        return "(none)".to_string();
    }
    // Newest first; keep each key's most recent entry; cap at 20.
    entries.sort_by(|a, b| b.0.cmp(&a.0));
    let mut seen = std::collections::HashSet::new();
    let mut lines: Vec<String> = Vec::new();
    for (ts, key) in entries {
        if !seen.insert(key.clone()) {
            continue;
        }
        lines.push(format!("- {} ({})", key, ts));
        if lines.len() == 20 {
            break;
        }
    }
    lines.join("\n")
}
/// Compute what percentage of the current conversation context is recalled memories.
/// Sums rendered size of current seen-set keys vs total post-compaction transcript size.
/// Returns a human-readable summary line; shells out to `poc-memory render`
/// per key, so this is comparatively expensive.
fn resolve_memory_ratio() -> String {
    let session_id = std::env::var("POC_SESSION_ID").unwrap_or_default();
    if session_id.is_empty() {
        return "(no session ID)".to_string();
    }
    let state_dir = crate::store::memory_dir().join("sessions");
    // Get post-compaction transcript size
    let session = crate::session::HookSession::from_env();
    let transcript = session.as_ref()
        .map(|s| s.transcript())
        .unwrap_or_else(crate::session::TranscriptInfo::empty);
    // Byte offset of the last compaction point; 0 if never compacted
    let compaction_offset: u64 = std::fs::read_to_string(
        state_dir.join(format!("compaction-{}", session_id))
    ).ok().and_then(|s| s.trim().parse().ok()).unwrap_or(0);
    let transcript_size = transcript.size.saturating_sub(compaction_offset);
    if transcript_size == 0 {
        return "0% of context is recalled memories (new session)".to_string();
    }
    // Sum rendered size of each key in current seen set
    let seen_path = state_dir.join(format!("seen-{}", session_id));
    let mut seen_keys = std::collections::HashSet::new();
    // Seen-set lines are "timestamp\tkey"; dedup keys in file order
    let keys: Vec<String> = std::fs::read_to_string(&seen_path).ok()
        .map(|content| {
            content.lines()
                .filter(|s| !s.is_empty())
                .filter_map(|line| line.split_once('\t').map(|(_, k)| k.to_string()))
                .filter(|k| seen_keys.insert(k.clone()))
                .collect()
        })
        .unwrap_or_default();
    // Render failures are silently counted as 0 bytes (best-effort ratio)
    let memory_bytes: u64 = keys.iter()
        .filter_map(|key| {
            std::process::Command::new("poc-memory")
                .args(["render", key])
                .output().ok()
        })
        .map(|out| out.stdout.len() as u64)
        .sum();
    let pct = (memory_bytes as f64 / transcript_size as f64 * 100.0).round() as u32;
    format!("{}% of current context is recalled memories ({} memories, ~{}KB of ~{}KB)",
        pct, keys.len(), memory_bytes / 1024, transcript_size / 1024)
}
/// Resolve a {{tool: name {args}}} placeholder by calling the tool
/// handler from the registry. Uses block_in_place to bridge sync→async.
///
/// Tool errors are reported inline as "(tool error: …)" text so the
/// prompt still renders; an unknown tool name resolves to None.
fn resolve_tool(spec: &str) -> Option<Resolved> {
    // Two argument forms: "tool_name {json args}" or "tool_name bare_arg".
    let (name, args) = if let Some(brace) = spec.find('{') {
        // Everything from the first brace onward must be a JSON object.
        let parsed: serde_json::Value = serde_json::from_str(&spec[brace..]).ok()?;
        (spec[..brace].trim(), parsed)
    } else if let Some((name, arg)) = spec.split_once(char::is_whitespace) {
        // A bare argument is shorthand for {"key": arg}.
        (name, serde_json::json!({"key": arg}))
    } else {
        // Just a tool name, no arguments.
        (spec, serde_json::json!({}))
    };
    let registry = crate::agent::tools::tools();
    let tool = registry.iter().find(|t| t.name == name)?;
    // Bridge into the async handler from this sync context.
    let outcome = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current().block_on(
            (tool.handler)(None, args.clone())
        )
    });
    match outcome {
        Ok(text) => Some(Resolved { text, keys: vec![] }),
        Err(e) => {
            eprintln!("[defs] {{{{tool: {}}}}} failed: {}", name, e);
            Some(Resolved { text: format!("(tool error: {})", e), keys: vec![] })
        }
    }
}
/// Resolve all {{placeholder}} patterns in a prompt template.
/// Returns the resolved text and all node keys collected from placeholders.
///
/// Unrecognized placeholders become "(unknown: name)" markers. The scan
/// resumes just past each substitution, so replacement text containing
/// "{{" is never itself re-resolved.
pub fn resolve_placeholders(
    template: &str,
    store: &Store,
    keys: &[String],
    count: usize,
) -> (String, Vec<String>) {
    // Build the graph once and share it with every placeholder lookup.
    let graph = store.build_graph();
    let mut result = template.to_string();
    let mut extra_keys = Vec::new();
    let mut cursor = 0;
    while let Some(offset) = result[cursor..].find("{{") {
        let open = cursor + offset;
        // A dangling "{{" with no closing "}}" ends the scan.
        let Some(close_offset) = result[open + 2..].find("}}") else { break };
        let close = open + 2 + close_offset;
        let name = result[open + 2..close].trim().to_lowercase();
        let replacement = match resolve(&name, store, &graph, keys, count) {
            Some(resolved) => {
                extra_keys.extend(resolved.keys);
                resolved.text
            }
            None => format!("(unknown: {})", name),
        };
        result.replace_range(open..close + 2, &replacement);
        // Continue scanning after the substituted text.
        cursor = open + replacement.len();
    }
    (result, extra_keys)
}
/// Run a config-driven agent: query → resolve placeholders → prompt.
/// `exclude` filters out nodes (and their neighborhoods) already being
/// worked on by other agents, preventing concurrent collisions.
///
/// Returns an AgentBatch with one resolved prompt per step plus every
/// node key touched (query results + placeholder expansions) for visit
/// tracking. Errors if the query yields nothing after exclusion.
pub fn run_agent(
    store: &Store,
    def: &AgentDef,
    count: usize,
    exclude: &std::collections::HashSet<String>,
) -> Result<super::prompts::AgentBatch, String> {
    // Run the query if present, via RPC
    let keys = if !def.query.is_empty() {
        // Over-fetch so exclusion still leaves `count` candidates
        // (capped at +100 to bound query cost).
        let padded = count + exclude.len().min(100);
        let query = if def.query.contains("limit:") {
            def.query.clone()
        } else {
            format!("{} | limit:{}", def.query, padded)
        };
        let result = crate::mcp_server::memory_rpc(
            "memory_query",
            serde_json::json!({"query": query}),
        ).map_err(|e| e.to_string())?;
        // One key per line; drop blanks, the "no results" sentinel,
        // and anything another agent is already working on.
        let filtered: Vec<String> = result.lines()
            .filter(|l| !l.is_empty() && *l != "no results")
            .map(|s| s.to_string())
            .filter(|k| !exclude.contains(k))
            .take(count)
            .collect();
        if filtered.is_empty() {
            return Err(format!("{}: query returned no results (after exclusion)", def.agent));
        }
        filtered
    } else {
        vec![]
    };
    // Resolve placeholders for all steps. The conversation context
    // carries forward between steps naturally via the LLM's message history.
    let cfg = crate::config::get(); // loop-invariant: fetch config once
    let mut all_keys = keys;
    let mut resolved_steps = Vec::new();
    for step in &def.steps {
        // Simple single-brace substitutions first, then {{placeholder}} lookups.
        let template = step.prompt
            .replace("{agent_name}", &def.agent)
            .replace("{user_name}", &cfg.user_name)
            .replace("{assistant_name}", &cfg.assistant_name);
        // Keys accumulate across steps so later steps see earlier expansions.
        let (prompt, extra_keys) = resolve_placeholders(&template, store, &all_keys, count);
        all_keys.extend(extra_keys);
        resolved_steps.push(super::prompts::ResolvedStep {
            prompt,
            phase: step.phase.clone(),
        });
    }
    Ok(super::prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys })
}