consciousness/poc-memory/src/agents/llm.rs

252 lines
9.5 KiB
Rust
Raw Normal View History

// LLM utilities: model invocation and response parsing
//
// Calls claude CLI as a subprocess. Uses prctl(PR_SET_PDEATHSIG)
// so child processes die when the daemon exits, preventing orphans.
use crate::store::Store;
use regex::Regex;
use std::fs;
use std::os::unix::process::CommandExt;
use std::process::Command;
fn log_usage(agent: &str, model: &str, prompt: &str, response: &str,
duration_ms: u128, ok: bool) {
let dir = crate::config::get().data_dir.join("llm-logs").join(agent);
let _ = fs::create_dir_all(&dir);
let date = chrono::Local::now().format("%Y-%m-%d");
let path = dir.join(format!("{}.md", date));
let ts = chrono::Local::now().format("%H:%M:%S");
let status = if ok { "ok" } else { "ERROR" };
let entry = format!(
"\n## {} — {} ({}, {:.1}s, {})\n\n\
### Prompt ({} chars)\n\n\
```\n{}\n```\n\n\
### Response ({} chars)\n\n\
```\n{}\n```\n\n---\n",
ts, agent, model, duration_ms as f64 / 1000.0, status,
prompt.len(), prompt,
response.len(), response,
);
use std::io::Write;
if let Ok(mut f) = fs::OpenOptions::new().create(true).append(true).open(&path) {
let _ = f.write_all(entry.as_bytes());
}
}
/// Maximum time to wait for a claude subprocess before killing it.
const SUBPROCESS_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // 5 minutes
/// Longer timeout for agents with tool access (multi-turn conversations).
const TOOL_AGENT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(900); // 15 minutes
/// Call a model via claude CLI. Returns the response text.
///
/// Sets PR_SET_PDEATHSIG on the child so it gets SIGTERM if the
/// parent daemon exits — no more orphaned claude processes.
/// Times out after 5 minutes to prevent blocking the daemon forever.
fn call_model(agent: &str, model: &str, prompt: &str) -> Result<String, String> {
call_model_with_tools(agent, model, prompt, &[])
}
/// Call a model via claude CLI, optionally with allowed tools.
///
/// When `tools` is empty, all tools are disabled (`--tools ""`).
/// When `tools` has entries, they're passed as `--allowedTools` patterns
/// (e.g. `["Bash(poc-memory:*)"]`), letting the agent call those tools
/// in Claude's native tool loop.
fn call_model_with_tools(agent: &str, model: &str, prompt: &str,
tools: &[String]) -> Result<String, String> {
let timeout = if tools.is_empty() { SUBPROCESS_TIMEOUT } else { TOOL_AGENT_TIMEOUT };
// Write prompt to temp file (claude CLI needs file input for large prompts)
let tmp = std::env::temp_dir().join(format!("poc-llm-{}-{:?}.txt",
std::process::id(), std::thread::current().id()));
fs::write(&tmp, prompt)
.map_err(|e| format!("write temp prompt: {}", e))?;
let mut cmd = Command::new("claude");
if tools.is_empty() {
cmd.args(["-p", "--model", model, "--tools", "", "--no-session-persistence",
"--strict-mcp-config"]);
} else {
cmd.args(["-p", "--model", model, "--no-session-persistence",
"--strict-mcp-config", "--allowedTools"]);
for tool in tools {
cmd.arg(tool);
}
}
cmd
.stdin(fs::File::open(&tmp).map_err(|e| format!("open temp: {}", e))?)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.env_remove("CLAUDECODE");
// Use separate OAuth credentials for agent work if configured
if let Some(ref dir) = crate::config::get().agent_config_dir {
cmd.env("CLAUDE_CONFIG_DIR", dir);
}
// Tell hooks this is a daemon agent call, not interactive
cmd.env("POC_AGENT", "1");
// Set provenance so any nodes/links created by tool calls are tagged
cmd.env("POC_PROVENANCE", format!("agent:{}", agent));
let start = std::time::Instant::now();
let child = unsafe {
cmd.pre_exec(|| {
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);
Ok(())
})
.spawn()
.map_err(|e| format!("spawn claude: {}", e))?
};
// Spawn a watchdog thread that kills the child after the timeout.
// Uses a cancellation flag so the thread exits promptly when the child finishes.
let child_id = child.id();
let cancel = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let cancel_flag = cancel.clone();
let watchdog = std::thread::spawn(move || {
// Sleep in 1s increments so we can check the cancel flag
let deadline = std::time::Instant::now() + timeout;
while std::time::Instant::now() < deadline {
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
std::thread::sleep(std::time::Duration::from_secs(1));
}
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
// Send SIGTERM, then SIGKILL after 5s grace period
unsafe { libc::kill(child_id as i32, libc::SIGTERM); }
for _ in 0..5 {
std::thread::sleep(std::time::Duration::from_secs(1));
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
}
unsafe { libc::kill(child_id as i32, libc::SIGKILL); }
});
let result = child.wait_with_output();
// Cancel the watchdog thread
cancel.store(true, std::sync::atomic::Ordering::Relaxed);
watchdog.join().ok();
fs::remove_file(&tmp).ok();
match result {
Ok(output) => {
let elapsed = start.elapsed().as_millis();
if elapsed > timeout.as_millis() - 1000 {
log_usage(agent, model, prompt, "TIMEOUT", elapsed, false);
return Err(format!("claude timed out after {:.0}s", elapsed as f64 / 1000.0));
}
if output.status.success() {
let response = String::from_utf8_lossy(&output.stdout).trim().to_string();
if response.is_empty() {
log_usage(agent, model, prompt, "EMPTY", elapsed, false);
return Err("claude returned empty response".into());
}
if response.contains(": You've hit your limit \u{00b7} resets") {
log_usage(agent, model, prompt, "RATE_LIMITED", elapsed, false);
return Err(format!("rate limited: {}", crate::util::first_n_chars(&response, 200)));
}
log_usage(agent, model, prompt, &response, elapsed, true);
Ok(response)
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
let preview = crate::util::first_n_chars(&stderr, 500);
log_usage(agent, model, prompt, &preview, elapsed, false);
Err(format!("claude exited {}: {}", output.status, preview.trim()))
}
}
Err(e) => Err(format!("wait claude: {}", e)),
}
}
/// Call Sonnet via claude CLI.
pub(crate) fn call_sonnet(agent: &str, prompt: &str) -> Result<String, String> {
call_model(agent, "sonnet", prompt)
}
/// Call Haiku via claude CLI (cheaper, faster — good for high-volume extraction).
pub(crate) fn call_haiku(agent: &str, prompt: &str) -> Result<String, String> {
call_model(agent, "haiku", prompt)
}
/// Simple LLM call for non-agent uses (audit, digest, compare).
/// Logs to llm-logs/{caller}/ file.
pub(crate) fn call_simple(caller: &str, prompt: &str) -> Result<String, String> {
let log_dir = crate::store::memory_dir().join("llm-logs").join(caller);
fs::create_dir_all(&log_dir).ok();
let log_path = log_dir.join(format!("{}.txt", crate::store::compact_timestamp()));
use std::io::Write;
let log = move |msg: &str| {
if let Ok(mut f) = fs::OpenOptions::new()
.create(true).append(true).open(&log_path)
{
let _ = writeln!(f, "{}", msg);
}
};
super::api::call_api_with_tools_sync(caller, prompt, &log)
}
/// Call a model using an agent definition's configuration.
pub(crate) fn call_for_def(
def: &super::defs::AgentDef,
prompt: &str,
log: &(dyn Fn(&str) + Sync),
) -> Result<String, String> {
super::api::call_api_with_tools_sync(&def.agent, prompt, log)
}
/// Parse a JSON response, handling markdown fences.
pub(crate) fn parse_json_response(response: &str) -> Result<serde_json::Value, String> {
let cleaned = response.trim();
let cleaned = cleaned.strip_prefix("```json").unwrap_or(cleaned);
let cleaned = cleaned.strip_prefix("```").unwrap_or(cleaned);
let cleaned = cleaned.strip_suffix("```").unwrap_or(cleaned);
let cleaned = cleaned.trim();
if let Ok(v) = serde_json::from_str(cleaned) {
return Ok(v);
}
// Try to find JSON object or array
let re_obj = Regex::new(r"\{[\s\S]*\}").unwrap();
let re_arr = Regex::new(r"\[[\s\S]*\]").unwrap();
if let Some(m) = re_obj.find(cleaned)
&& let Ok(v) = serde_json::from_str(m.as_str()) {
return Ok(v);
}
if let Some(m) = re_arr.find(cleaned)
&& let Ok(v) = serde_json::from_str(m.as_str()) {
return Ok(v);
}
let preview = crate::util::first_n_chars(cleaned, 200);
Err(format!("no valid JSON in response: {preview}..."))
}
/// Get all keys for prompt context.
pub(crate) fn semantic_keys(store: &Store) -> Vec<String> {
let mut keys: Vec<String> = store.nodes.keys()
.cloned()
.collect();
keys.sort();
keys.truncate(200);
keys
}