From cbf7653cdf56c003983f8ba3c9e3480053e2cc06 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Tue, 7 Apr 2026 00:57:35 -0400 Subject: [PATCH] call_api_with_tools_sync() -> src/agent/oneshot.rs --- src/agent/oneshot.rs | 218 ++++++++++++++++++- src/claude/agent_cycles.rs | 416 +++++++++++++++++++++++++++++++++++++ src/claude/hook.rs | 2 +- src/claude/mod.rs | 1 + src/cli/agent.rs | 2 +- src/subconscious/api.rs | 228 -------------------- src/subconscious/audit.rs | 2 +- src/subconscious/digest.rs | 2 +- src/subconscious/mod.rs | 1 - 9 files changed, 638 insertions(+), 234 deletions(-) create mode 100644 src/claude/agent_cycles.rs delete mode 100644 src/subconscious/api.rs diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index 10558f9..b09e5ec 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -13,6 +13,27 @@ use crate::subconscious::{defs, prompts}; use std::fs; use std::path::PathBuf; +use std::sync::OnceLock; + +use super::api::ApiClient; +use super::api::types::*; +use super::tools::{self as agent_tools}; + +// --------------------------------------------------------------------------- +// API client — shared across oneshot agent runs +// --------------------------------------------------------------------------- + +static API_CLIENT: OnceLock = OnceLock::new(); + +fn get_client() -> Result<&'static ApiClient, String> { + Ok(API_CLIENT.get_or_init(|| { + let config = crate::config::get(); + let base_url = config.api_base_url.as_deref().unwrap_or(""); + let api_key = config.api_key.as_deref().unwrap_or(""); + let model = config.api_model.as_deref().unwrap_or("qwen-2.5-27b"); + ApiClient::new(base_url, api_key, model) + })) +} // --------------------------------------------------------------------------- // Agent execution @@ -143,7 +164,7 @@ pub fn run_one_agent( Ok(()) }; - let output = crate::subconscious::api::call_api_with_tools_sync( + let output = call_api_with_tools_sync( agent_name, &prompts, &step_phases, def.temperature, def.priority, &effective_tools, Some(&bail_fn), log)?; @@ -154,6 +175,201 @@ pub fn run_one_agent( }) } +// --------------------------------------------------------------------------- +// Multi-step API turn loop +// --------------------------------------------------------------------------- + +/// Run agent prompts through the API with tool support. +/// For multi-step agents, each prompt is injected as a new user message +/// after the previous step's tool loop completes. The conversation +/// context carries forward naturally between steps. +/// Returns the final text response after all steps complete. +pub async fn call_api_with_tools( + agent: &str, + prompts: &[String], + phases: &[String], + temperature: Option, + priority: i32, + tools: &[agent_tools::Tool], + bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, + log: &dyn Fn(&str), +) -> Result { + let client = get_client()?; + + let first_phase = phases.first().map(|s| s.as_str()).unwrap_or(""); + let _provenance = std::cell::RefCell::new( + if first_phase.is_empty() { format!("agent:{}", agent) } + else { format!("agent:{}:{}", agent, first_phase) } + ); + + let mut messages = vec![Message::user(&prompts[0])]; + let mut next_prompt_idx = 1; + let reasoning = crate::config::get().api_reasoning.clone(); + + let max_turns = 50 * prompts.len(); + for turn in 0..max_turns { + log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len())); + + let mut last_err = None; + let mut msg_opt = None; + let mut usage_opt = None; + for attempt in 0..5 { + let sampling = super::api::SamplingParams { + temperature: temperature.unwrap_or(0.6), + top_p: 0.95, + top_k: 20, + }; + match client.chat_completion_stream_temp( + &messages, + tools, + &reasoning, + sampling, + Some(priority), + ).await { + Ok((msg, usage)) => { + msg_opt = Some(msg); + usage_opt = usage; + break; + } + Err(e) => { + let err_str = e.to_string(); + let is_transient = err_str.contains("IncompleteMessage") + || err_str.contains("connection closed") + || err_str.contains("connection reset") + || err_str.contains("timed out") + || err_str.contains("Connection refused"); + if is_transient && attempt < 4 { + log(&format!("transient error (attempt {}): {}, retrying...", + attempt + 1, err_str)); + tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; + last_err = Some(e); + continue; + } + let msg_bytes: usize = messages.iter() + .map(|m| m.content_text().len()) + .sum(); + return Err(format!( + "API error on turn {} (~{}KB payload, {} messages, {} attempts): {}", + turn, msg_bytes / 1024, messages.len(), attempt + 1, e)); + } + } + } + let msg = msg_opt.unwrap(); + if let Some(ref e) = last_err { + log(&format!("succeeded after retry (previous error: {})", e)); + } + + if let Some(u) = &usage_opt { + log(&format!("tokens: {} prompt + {} completion", + u.prompt_tokens, u.completion_tokens)); + } + + let has_content = msg.content.is_some(); + let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); + + if has_tools { + let mut sanitized = msg.clone(); + if let Some(ref mut calls) = sanitized.tool_calls { + for call in calls { + if serde_json::from_str::(&call.function.arguments).is_err() { + log(&format!("sanitizing malformed args for {}: {}", + call.function.name, &call.function.arguments)); + call.function.arguments = "{}".to_string(); + } + } + } + messages.push(sanitized); + + for call in msg.tool_calls.as_ref().unwrap() { + log(&format!("\nTOOL CALL: {}({})", + call.function.name, + &call.function.arguments)); + + let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { + Ok(v) => v, + Err(_) => { + log(&format!("malformed tool call args: {}", &call.function.arguments)); + messages.push(Message::tool_result( + &call.id, + "Error: your tool call had malformed JSON arguments. Please retry with valid JSON.", + )); + continue; + } + }; + + let output = agent_tools::dispatch(&call.function.name, &args).await; + + if std::env::var("POC_AGENT_VERBOSE").is_ok() { + log(&format!("TOOL RESULT ({} chars):\n{}", output.len(), output)); + } else { + let preview: String = output.lines().next().unwrap_or("").chars().take(100).collect(); + log(&format!("Result: {}", preview)); + } + + messages.push(Message::tool_result(&call.id, &output)); + } + continue; + } + + // Text-only response — step complete + let text = msg.content_text().to_string(); + if text.is_empty() && !has_content { + log("empty response, retrying"); + messages.push(Message::user( + "[system] Your previous response was empty. Please respond with text or use a tool." + )); + continue; + } + + log(&format!("\n=== RESPONSE ===\n\n{}", text)); + + // If there are more prompts, check bail condition and inject the next one + if next_prompt_idx < prompts.len() { + if let Some(ref check) = bail_fn { + check(next_prompt_idx)?; + } + if let Some(phase) = phases.get(next_prompt_idx) { + *_provenance.borrow_mut() = format!("agent:{}:{}", agent, phase); + } + messages.push(Message::assistant(&text)); + let next = &prompts[next_prompt_idx]; + next_prompt_idx += 1; + log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len())); + messages.push(Message::user(next)); + continue; + } + + return Ok(text); + } + + Err(format!("agent exceeded {} tool turns", max_turns)) +} + +/// Synchronous wrapper — runs the async function on a dedicated thread +/// with its own tokio runtime. Safe to call from any context. +pub fn call_api_with_tools_sync( + agent: &str, + prompts: &[String], + phases: &[String], + temperature: Option, + priority: i32, + tools: &[agent_tools::Tool], + bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, + log: &(dyn Fn(&str) + Sync), +) -> Result { + std::thread::scope(|s| { + s.spawn(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| format!("tokio runtime: {}", e))?; + rt.block_on( + call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn, log) + ) + }).join().unwrap() + }) +} + // --------------------------------------------------------------------------- // Process management — PID tracking and subprocess spawning // --------------------------------------------------------------------------- diff --git a/src/claude/agent_cycles.rs b/src/claude/agent_cycles.rs new file mode 100644 index 0000000..423a2ad --- /dev/null +++ b/src/claude/agent_cycles.rs @@ -0,0 +1,416 @@ +// agent_cycles.rs — Agent orchestration for the Claude Code hook path +// +// Forked from subconscious/subconscious.rs. This copy handles the +// serialized-to-disk, process-spawning model used by Claude Code hooks. +// The TUI/Mind copy in subconscious/ is free to evolve independently +// (async tasks, integrated with Mind's event loop). + +use std::fs; +use std::fs::File; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant, SystemTime}; + +pub use crate::session::HookSession; + +/// Output from a single agent orchestration cycle. +#[derive(Default)] +pub struct AgentCycleOutput { + /// Memory node keys surfaced by surface-observe. + pub surfaced_keys: Vec, + /// Freeform reflection text from the reflect agent. + pub reflection: Option, + /// How long we slept waiting for observe to catch up, if at all. + pub sleep_secs: Option, +} + +/// Per-agent runtime state. +pub struct AgentInfo { + pub name: &'static str, + pub pid: Option, + pub phase: Option, + pub log_path: Option, + child: Option, +} + +/// Snapshot of agent state — serializable, sendable to TUI. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct AgentSnapshot { + pub name: String, + pub pid: Option, + pub phase: Option, + pub log_path: Option, +} + +impl AgentInfo { + fn snapshot(&self) -> AgentSnapshot { + AgentSnapshot { + name: self.name.to_string(), + pid: self.pid, + phase: self.phase.clone(), + log_path: self.log_path.clone(), + } + } +} + +/// Serializable state for persisting across Claude Code hook invocations. +#[derive(serde::Serialize, serde::Deserialize)] +pub struct SavedAgentState { + pub agents: Vec, +} + +impl SavedAgentState { + fn state_path(session_id: &str) -> PathBuf { + let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/sessions"); + fs::create_dir_all(&dir).ok(); + dir.join(format!("agent-state-{}.json", session_id)) + } + + pub fn load(session_id: &str) -> Self { + let path = Self::state_path(session_id); + let mut state: Self = fs::read_to_string(&path).ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or(SavedAgentState { agents: Vec::new() }); + + for agent in &mut state.agents { + if let Some(pid) = agent.pid { + unsafe { + if libc::kill(pid as i32, 0) != 0 { + agent.pid = None; + agent.phase = None; + } + } + } + } + state + } + + pub fn save(&self, session_id: &str) { + let path = Self::state_path(session_id); + if let Ok(json) = serde_json::to_string(self) { + fs::write(path, json).ok(); + } + } +} + +/// Persistent state for the agent orchestration cycle. +/// Created once per hook invocation, `trigger()` called on each user message. +pub struct AgentCycleState { + output_dir: PathBuf, + log_file: Option, + pub agents: Vec, + pub last_output: AgentCycleOutput, +} + +const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; + +impl AgentCycleState { + pub fn new(session_id: &str) -> Self { + let output_dir = crate::store::memory_dir().join("agent-output"); + let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); + fs::create_dir_all(&log_dir).ok(); + let log_path = log_dir.join(format!("hook-{}", session_id)); + let log_file = fs::OpenOptions::new() + .create(true).append(true).open(log_path).ok(); + + let agents = AGENT_CYCLE_NAMES.iter() + .map(|&name| AgentInfo { name, pid: None, phase: None, log_path: None, child: None }) + .collect(); + + AgentCycleState { + output_dir, + log_file, + agents, + last_output: AgentCycleOutput { + surfaced_keys: vec![], + reflection: None, + sleep_secs: None, + }, + } + } + + fn log(&mut self, msg: std::fmt::Arguments) { + if let Some(ref mut f) = self.log_file { + let _ = write!(f, "{}", msg); + } + } + + fn agent_running(&self, name: &str) -> bool { + self.agents.iter().any(|a| a.name == name && a.pid.is_some()) + } + + fn agent_spawned(&mut self, name: &str, phase: &str, + result: crate::agent::oneshot::SpawnResult) { + if let Some(agent) = self.agents.iter_mut().find(|a| a.name == name) { + agent.pid = Some(result.child.id()); + agent.phase = Some(phase.to_string()); + agent.log_path = Some(result.log_path); + agent.child = Some(result.child); + } + } + + /// Check if any agents have completed. Reap child handles, or + /// check pid liveness for restored-from-disk agents. + fn poll_children(&mut self) { + for agent in &mut self.agents { + if let Some(ref mut child) = agent.child { + if let Ok(Some(_)) = child.try_wait() { + agent.pid = None; + agent.phase = None; + agent.child = None; + } + } else if let Some(pid) = agent.pid { + unsafe { + if libc::kill(pid as i32, 0) != 0 { + agent.pid = None; + agent.phase = None; + } + } + } + } + } + + pub fn snapshots(&self, scoring_in_flight: bool, scored_count: usize) -> Vec { + let mut snaps: Vec = self.agents.iter().map(|a| a.snapshot()).collect(); + snaps.push(AgentSnapshot { + name: "memory-scoring".to_string(), + pid: None, + phase: if scoring_in_flight { + Some("scoring...".into()) + } else if scored_count == 0 { + None + } else { + Some(format!("{} scored", scored_count)) + }, + log_path: None, + }); + snaps + } + + /// Restore agent state from a saved snapshot. + pub fn restore(&mut self, saved: &SavedAgentState) { + for sa in &saved.agents { + if let Some(agent) = self.agents.iter_mut().find(|a| a.name == sa.name) { + agent.pid = sa.pid; + agent.phase = sa.phase.clone(); + agent.log_path = sa.log_path.clone(); + } + } + } + + /// Save current state for next hook invocation. + pub fn save(&self, session_id: &str) { + let state = SavedAgentState { agents: self.snapshots(false, 0) }; + state.save(session_id); + } + + /// Run all agent cycles. Call on each user message. + pub fn trigger(&mut self, session: &HookSession) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + self.log(format_args!("\n=== {} agent_cycles ===\n", ts)); + + self.poll_children(); + cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); + + let (surfaced_keys, sleep_secs) = self.surface_observe_cycle(session); + let reflection = self.reflection_cycle(session); + self.journal_cycle(session); + + self.last_output = AgentCycleOutput { surfaced_keys, reflection, sleep_secs }; + } + + fn agent_dir(&self, name: &str) -> PathBuf { + let dir = self.output_dir.join(name); + fs::create_dir_all(&dir).ok(); + dir + } + + fn surface_observe_cycle(&mut self, session: &HookSession) -> (Vec, Option) { + let state_dir = self.agent_dir("surface-observe"); + let transcript = session.transcript(); + let offset_path = state_dir.join("transcript-offset"); + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + + // Read surfaced keys + let mut surfaced_keys = Vec::new(); + let surface_path = state_dir.join("surface"); + if let Ok(content) = fs::read_to_string(&surface_path) { + let mut seen = session.seen(); + let seen_path = session.path("seen"); + for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { + if !seen.insert(key.to_string()) { + self.log(format_args!(" skip (seen): {}\n", key)); + continue; + } + surfaced_keys.push(key.to_string()); + if let Ok(mut f) = fs::OpenOptions::new() + .create(true).append(true).open(&seen_path) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + writeln!(f, "{}\t{}", ts, key).ok(); + } + self.log(format_args!(" surfaced: {}\n", key)); + } + fs::remove_file(&surface_path).ok(); + } + + // Spawn new agent if not already running + let running = self.agent_running("surface-observe"); + if running { + self.log(format_args!("surface-observe already running\n")); + } else { + if transcript.size > 0 { + fs::write(&offset_path, transcript.size.to_string()).ok(); + } + if let Some(result) = crate::agent::oneshot::spawn_agent( + "surface-observe", &state_dir, &session.session_id) { + self.log(format_args!("spawned surface-observe pid {}\n", result.child.id())); + self.agent_spawned("surface-observe", "surface", result); + } + } + + // Wait if agent is significantly behind + let mut sleep_secs = None; + let conversation_budget: u64 = 50_000; + + if running && transcript.size > 0 { + let behind = transcript.size.saturating_sub(last_offset); + + if behind > conversation_budget / 2 { + let sleep_start = Instant::now(); + self.log(format_args!("agent {}KB behind\n", behind / 1024)); + + for _ in 0..5 { + std::thread::sleep(std::time::Duration::from_secs(1)); + self.poll_children(); + if !self.agent_running("surface-observe") { break; } + } + + let secs = (Instant::now() - sleep_start).as_secs_f64(); + self.log(format_args!("slept {secs:.2}s\n")); + sleep_secs = Some(secs); + } + } + + (surfaced_keys, sleep_secs) + } + + fn reflection_cycle(&mut self, session: &HookSession) -> Option { + let state_dir = self.agent_dir("reflect"); + let offset_path = state_dir.join("transcript-offset"); + let transcript = session.transcript(); + + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + + const REFLECTION_INTERVAL: u64 = 100_000; + if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL { + return None; + } + + if self.agent_running("reflect") { + self.log(format_args!("reflect: already running\n")); + return None; + } + + // Copy walked nodes from surface-observe + let so_state = self.agent_dir("surface-observe"); + if let Ok(walked) = fs::read_to_string(so_state.join("walked")) { + fs::write(state_dir.join("walked"), &walked).ok(); + } + + // Read and consume pending reflection + let reflection = fs::read_to_string(state_dir.join("reflection")).ok() + .filter(|s| !s.trim().is_empty()); + if reflection.is_some() { + fs::remove_file(state_dir.join("reflection")).ok(); + self.log(format_args!("reflect: consumed reflection\n")); + } + + fs::write(&offset_path, transcript.size.to_string()).ok(); + if let Some(result) = crate::agent::oneshot::spawn_agent( + "reflect", &state_dir, &session.session_id) { + self.log(format_args!("reflect: spawned pid {}\n", result.child.id())); + self.agent_spawned("reflect", "step-0", result); + } + + reflection + } + + fn journal_cycle(&mut self, session: &HookSession) { + let state_dir = self.agent_dir("journal"); + let offset_path = state_dir.join("transcript-offset"); + let transcript = session.transcript(); + + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + + const JOURNAL_INTERVAL: u64 = 20_000; + if transcript.size.saturating_sub(last_offset) < JOURNAL_INTERVAL { + return; + } + + if self.agent_running("journal") { + self.log(format_args!("journal: already running\n")); + return; + } + + fs::write(&offset_path, transcript.size.to_string()).ok(); + if let Some(result) = crate::agent::oneshot::spawn_agent( + "journal", &state_dir, &session.session_id) { + self.log(format_args!("journal: spawned pid {}\n", result.child.id())); + self.agent_spawned("journal", "step-0", result); + } + } +} + +/// Format agent cycle output for injection into a Claude Code session. +pub fn format_agent_output(output: &AgentCycleOutput) -> String { + let mut out = String::new(); + + if let Some(secs) = output.sleep_secs { + out.push_str(&format!("Slept {secs:.2}s to let observe catch up\n")); + } + + if !output.surfaced_keys.is_empty() { + if let Ok(store) = crate::store::Store::load() { + for key in &output.surfaced_keys { + if let Some(rendered) = crate::cli::node::render_node(&store, key) { + if !rendered.trim().is_empty() { + use std::fmt::Write as _; + writeln!(out, "--- {} (surfaced) ---", key).ok(); + write!(out, "{}", rendered).ok(); + } + } + } + } + } + + if let Some(ref reflection) = output.reflection { + use std::fmt::Write as _; + writeln!(out, "--- subconscious reflection ---").ok(); + write!(out, "{}", reflection.trim()).ok(); + } + + out +} + +fn cleanup_stale_files(dir: &Path, max_age: Duration) { + let entries = match fs::read_dir(dir) { + Ok(e) => e, + Err(_) => return, + }; + let cutoff = SystemTime::now() - max_age; + for entry in entries.flatten() { + if let Ok(meta) = entry.metadata() { + if let Ok(modified) = meta.modified() { + if modified < cutoff { + fs::remove_file(entry.path()).ok(); + } + } + } + } +} diff --git a/src/claude/hook.rs b/src/claude/hook.rs index dabd98b..86e5ab4 100644 --- a/src/claude/hook.rs +++ b/src/claude/hook.rs @@ -12,7 +12,7 @@ use std::process::Command; use std::time::Instant; pub use crate::session::HookSession; -pub use crate::subconscious::subconscious::*; +pub use super::agent_cycles::*; const CHUNK_SIZE: usize = 9000; diff --git a/src/claude/mod.rs b/src/claude/mod.rs index c672c15..d1eac93 100644 --- a/src/claude/mod.rs +++ b/src/claude/mod.rs @@ -7,6 +7,7 @@ // The daemon protocol (daemon_capnp) and universal infrastructure // (channels, supervisor, notify) remain in thalamus/. +pub mod agent_cycles; pub mod context; pub mod hook; pub mod idle; diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 8c787c5..f5ed7de 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -399,7 +399,7 @@ fn llm_compare( let prompt = build_compare_prompt(a, b); let _ = model; // model selection handled by API backend config - let response = crate::subconscious::api::call_api_with_tools_sync( + let response = crate::agent::oneshot::call_api_with_tools_sync( "compare", &[prompt], &[], None, 10, &[], None, &|_| {})?; let response = response.trim().to_uppercase(); diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs deleted file mode 100644 index d0535c0..0000000 --- a/src/subconscious/api.rs +++ /dev/null @@ -1,228 +0,0 @@ -// agents/api.rs — Direct API backend for agent execution -// -// Uses poc-agent's OpenAI-compatible API client to call models directly -// (vllm, llama.cpp, OpenRouter, etc.) instead of shelling out to claude CLI. -// Implements the tool loop: send prompt → if tool_calls, execute them → -// send results back → repeat until text response. -// -// Activated when config has api_base_url set. - -use crate::agent::api::ApiClient; -use crate::agent::api::types::*; -use crate::agent::tools::{self as agent_tools}; - -use std::sync::OnceLock; - -static API_CLIENT: OnceLock = OnceLock::new(); - -fn get_client() -> Result<&'static ApiClient, String> { - Ok(API_CLIENT.get_or_init(|| { - let config = crate::config::get(); - let base_url = config.api_base_url.as_deref().unwrap_or(""); - let api_key = config.api_key.as_deref().unwrap_or(""); - let model = config.api_model.as_deref().unwrap_or("qwen-2.5-27b"); - ApiClient::new(base_url, api_key, model) - })) -} - -/// Run agent prompts through the direct API with tool support. -/// For multi-step agents, each prompt is injected as a new user message -/// after the previous step's tool loop completes. The conversation -/// context carries forward naturally between steps. -/// Returns the final text response after all steps complete. -pub async fn call_api_with_tools( - agent: &str, - prompts: &[String], - phases: &[String], - temperature: Option, - priority: i32, - tools: &[agent_tools::Tool], - bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, - log: &dyn Fn(&str), -) -> Result { - let client = get_client()?; - - // Tools are already filtered by the caller - // Provenance tracks which agent:phase is making writes. - // Updated between steps by the bail function via set_provenance(). - let first_phase = phases.first().map(|s| s.as_str()).unwrap_or(""); - let provenance = std::cell::RefCell::new( - if first_phase.is_empty() { format!("agent:{}", agent) } - else { format!("agent:{}:{}", agent, first_phase) } - ); - - // Start with the first prompt as a user message - let mut messages = vec![Message::user(&prompts[0])]; - let mut next_prompt_idx = 1; // index of next prompt to inject - let reasoning = crate::config::get().api_reasoning.clone(); - - let max_turns = 50 * prompts.len(); - for turn in 0..max_turns { - log(&format!("\n=== TURN {} ({} messages) ===\n", turn, messages.len())); - - let mut last_err = None; - let mut msg_opt = None; - let mut usage_opt = None; - for attempt in 0..5 { - let sampling = crate::agent::api::SamplingParams { - temperature: temperature.unwrap_or(0.6), - top_p: 0.95, - top_k: 20, - }; - match client.chat_completion_stream_temp( - &messages, - tools, - &reasoning, - sampling, - Some(priority), - ).await { - Ok((msg, usage)) => { - msg_opt = Some(msg); - usage_opt = usage; - break; - } - Err(e) => { - let err_str = e.to_string(); - let is_transient = err_str.contains("IncompleteMessage") - || err_str.contains("connection closed") - || err_str.contains("connection reset") - || err_str.contains("timed out") - || err_str.contains("Connection refused"); - if is_transient && attempt < 4 { - log(&format!("transient error (attempt {}): {}, retrying...", - attempt + 1, err_str)); - tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; - last_err = Some(e); - continue; - } - let msg_bytes: usize = messages.iter() - .map(|m| m.content_text().len()) - .sum(); - return Err(format!( - "API error on turn {} (~{}KB payload, {} messages, {} attempts): {}", - turn, msg_bytes / 1024, messages.len(), attempt + 1, e)); - } - } - } - let msg = msg_opt.unwrap(); - if let Some(ref e) = last_err { - log(&format!("succeeded after retry (previous error: {})", e)); - } - - if let Some(u) = &usage_opt { - log(&format!("tokens: {} prompt + {} completion", - u.prompt_tokens, u.completion_tokens)); - } - - let has_content = msg.content.is_some(); - let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); - - if has_tools { - // Push the assistant message with tool calls. - // Sanitize arguments: vllm re-parses them as JSON when - // preprocessing the conversation, so invalid JSON from the - // model crashes the next request. - let mut sanitized = msg.clone(); - if let Some(ref mut calls) = sanitized.tool_calls { - for call in calls { - if serde_json::from_str::(&call.function.arguments).is_err() { - log(&format!("sanitizing malformed args for {}: {}", - call.function.name, &call.function.arguments)); - call.function.arguments = "{}".to_string(); - } - } - } - messages.push(sanitized); - - // Execute each tool call - for call in msg.tool_calls.as_ref().unwrap() { - log(&format!("\nTOOL CALL: {}({})", - call.function.name, - &call.function.arguments)); - - let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { - Ok(v) => v, - Err(_) => { - log(&format!("malformed tool call args: {}", &call.function.arguments)); - messages.push(Message::tool_result( - &call.id, - "Error: your tool call had malformed JSON arguments. Please retry with valid JSON.", - )); - continue; - } - }; - - let output = agent_tools::dispatch(&call.function.name, &args).await; - - if std::env::var("POC_AGENT_VERBOSE").is_ok() { - log(&format!("TOOL RESULT ({} chars):\n{}", output.len(), output)); - } else { - let preview: String = output.lines().next().unwrap_or("").chars().take(100).collect(); - log(&format!("Result: {}", preview)); - } - - messages.push(Message::tool_result(&call.id, &output)); - } - continue; - } - - // Text-only response — step complete - let text = msg.content_text().to_string(); - if text.is_empty() && !has_content { - log("empty response, retrying"); - messages.push(Message::user( - "[system] Your previous response was empty. Please respond with text or use a tool." - )); - continue; - } - - log(&format!("\n=== RESPONSE ===\n\n{}", text)); - - // If there are more prompts, check bail condition and inject the next one - if next_prompt_idx < prompts.len() { - // Run bail check before continuing to next step - if let Some(ref check) = bail_fn { - check(next_prompt_idx)?; - } - // Update provenance for the new phase - if let Some(phase) = phases.get(next_prompt_idx) { - *provenance.borrow_mut() = format!("agent:{}:{}", agent, phase); - } - messages.push(Message::assistant(&text)); - let next = &prompts[next_prompt_idx]; - next_prompt_idx += 1; - log(&format!("\n=== STEP {}/{} ===\n", next_prompt_idx, prompts.len())); - messages.push(Message::user(next)); - continue; - } - - return Ok(text); - } - - Err(format!("agent exceeded {} tool turns", max_turns)) -} - -/// Synchronous wrapper — runs the async function on a dedicated thread -/// with its own tokio runtime. Safe to call from any context. -pub fn call_api_with_tools_sync( - agent: &str, - prompts: &[String], - phases: &[String], - temperature: Option, - priority: i32, - tools: &[agent_tools::Tool], - bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, - log: &(dyn Fn(&str) + Sync), -) -> Result { - std::thread::scope(|s| { - s.spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|e| format!("tokio runtime: {}", e))?; - rt.block_on( - call_api_with_tools(agent, prompts, phases, temperature, priority, tools, bail_fn, log) - ) - }).join().unwrap() - }) -} diff --git a/src/subconscious/audit.rs b/src/subconscious/audit.rs index 66dfd98..d20e6d6 100644 --- a/src/subconscious/audit.rs +++ b/src/subconscious/audit.rs @@ -210,7 +210,7 @@ pub fn link_audit(store: &mut Store, apply: bool) -> Result // Run batches in parallel via rayon let batch_results: Vec<_> = batch_data.par_iter() .map(|(batch_idx, batch_infos, prompt)| { - let response = super::api::call_api_with_tools_sync( + let response = crate::agent::oneshot::call_api_with_tools_sync( "audit", &[prompt.clone()], &[], None, 10, &[], None, &|_| {}); let completed = done.fetch_add(1, Ordering::Relaxed) + 1; eprint!("\r Batches: {}/{} done", completed, total_batches); diff --git a/src/subconscious/digest.rs b/src/subconscious/digest.rs index db1553b..032e4c0 100644 --- a/src/subconscious/digest.rs +++ b/src/subconscious/digest.rs @@ -284,7 +284,7 @@ fn generate_digest( .filter(|t| def.tools.iter().any(|w| w == &t.name)) .collect() }; - let digest = super::api::call_api_with_tools_sync( + let digest = crate::agent::oneshot::call_api_with_tools_sync( &def.agent, &prompts, &phases, def.temperature, def.priority, &tools, None, &log)?; diff --git a/src/subconscious/mod.rs b/src/subconscious/mod.rs index addd15b..a1c1fb2 100644 --- a/src/subconscious/mod.rs +++ b/src/subconscious/mod.rs @@ -17,7 +17,6 @@ // // The session hook (context injection, agent orchestration) moved to claude/hook. -pub mod api; pub mod audit; pub mod consolidate; pub mod daemon;