// agent.rs — Core agent loop // // The simplest possible implementation of the agent pattern: // send messages + tool definitions to the model, if it responds // with tool calls then dispatch them and loop, if it responds // with text then display it and wait for the next prompt. // // Uses streaming by default so text tokens appear as they're // generated. Tool calls are accumulated from stream deltas and // dispatched after the stream completes. // // The DMN (dmn.rs) is the outer loop that decides what prompts // to send here. This module just handles single turns: prompt // in, response out, tool calls dispatched. pub mod api; pub mod context; pub mod oneshot; pub mod tokenizer; pub mod tools; use std::sync::Arc; use anyhow::Result; use api::ApiClient; use context::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role}; use crate::mind::log::ConversationLog; // --- Activity tracking (RAII guards) --- pub struct ActivityEntry { pub id: u64, pub label: String, pub started: std::time::Instant, /// Auto-expires this long after creation (or completion). pub expires_at: std::time::Instant, } pub struct ActivityGuard { agent: Arc, id: u64, } const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5); impl Drop for ActivityGuard { fn drop(&mut self) { if let Ok(mut st) = self.agent.state.try_lock() { if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) { entry.label.push_str(" (complete)"); entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER; } } } } impl AgentState { pub fn push_activity(&mut self, label: impl Into) -> u64 { self.expire_activities(); let id = self.next_activity_id; self.next_activity_id += 1; self.activities.push(ActivityEntry { id, label: label.into(), started: std::time::Instant::now(), expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600), }); self.changed.notify_one(); id } pub fn notify(&mut self, label: impl Into) { self.expire_activities(); let id = self.next_activity_id; self.next_activity_id += 1; self.activities.push(ActivityEntry { id, label: label.into(), started: std::time::Instant::now(), expires_at: std::time::Instant::now() + ACTIVITY_LINGER, }); self.changed.notify_one(); } pub fn expire_activities(&mut self) { let now = std::time::Instant::now(); self.activities.retain(|a| a.expires_at > now); } } pub async fn start_activity(agent: &Arc, label: impl Into) -> ActivityGuard { let id = agent.state.lock().await.push_activity(label); ActivityGuard { agent: agent.clone(), id } } /// Result of a single agent turn. pub struct TurnResult { /// The text response (already sent through UI channel). #[allow(dead_code)] pub text: String, /// Whether the model called yield_to_user during this turn. pub yield_requested: bool, /// Whether any tools (other than yield_to_user) were called. pub had_tool_calls: bool, /// Number of tool calls that returned errors this turn. pub tool_errors: u32, /// Model name to switch to after this turn completes. pub model_switch: Option, /// Agent requested DMN pause (full stop on autonomous behavior). pub dmn_pause: bool, } /// Accumulated state across tool dispatches within a single turn. struct DispatchState { yield_requested: bool, had_tool_calls: bool, tool_errors: u32, model_switch: Option, dmn_pause: bool, } impl DispatchState { fn new() -> Self { Self { yield_requested: false, had_tool_calls: false, tool_errors: 0, model_switch: None, dmn_pause: false, } } } /// Immutable agent config — shared via Arc, no mutex needed. pub struct Agent { pub client: ApiClient, pub app_config: crate::config::AppConfig, pub prompt_file: String, pub session_id: String, pub context: tokio::sync::Mutex, pub state: tokio::sync::Mutex, } /// Mutable agent state — behind its own mutex. pub struct AgentState { pub tools: Vec, pub last_prompt_tokens: u32, pub reasoning_effort: String, pub temperature: f32, pub top_p: f32, pub top_k: u32, pub activities: Vec, next_activity_id: u64, pub pending_yield: bool, pub pending_model_switch: Option, pub pending_dmn_pause: bool, pub provenance: String, pub conversation_log: Option, pub generation: u64, pub memory_scoring_in_flight: bool, pub active_tools: tools::ActiveTools, pub changed: Arc, } impl Agent { pub async fn new( client: ApiClient, system_prompt: String, personality: Vec<(String, String)>, app_config: crate::config::AppConfig, prompt_file: String, conversation_log: Option, active_tools: tools::ActiveTools, ) -> Arc { let mut context = ContextState::new(); context.push(Section::System, AstNode::system_msg(&system_prompt)); let tool_defs: Vec = tools::tools().iter() .map(|t| t.to_json()).collect(); if !tool_defs.is_empty() { let tools_text = format!( "# Tools\n\nYou have access to the following functions:\n\n\n{}\n\n\n\ If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\ \n\n\ \nvalue_1\n\n\ \n\n\n\ IMPORTANT: Function calls MUST follow the specified format.", tool_defs.join("\n"), ); context.push(Section::System, AstNode::system_msg(&tools_text)); } for (name, content) in &personality { context.push(Section::Identity, AstNode::memory(name, content)); } let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); let agent = Arc::new(Self { client, app_config, prompt_file, session_id, context: tokio::sync::Mutex::new(context), state: tokio::sync::Mutex::new(AgentState { tools: tools::tools(), last_prompt_tokens: 0, reasoning_effort: "none".to_string(), temperature: 0.6, top_p: 0.95, top_k: 20, activities: Vec::new(), next_activity_id: 0, pending_yield: false, pending_model_switch: None, pending_dmn_pause: false, provenance: "manual".to_string(), conversation_log, generation: 0, memory_scoring_in_flight: false, active_tools, changed: Arc::new(tokio::sync::Notify::new()), }), }); agent.load_startup_journal().await; agent } /// Fork: clones context for KV cache prefix sharing. pub async fn fork(self: &Arc, tools: Vec) -> Arc { let ctx = self.context.lock().await.clone(); let st = self.state.lock().await; Arc::new(Self { client: self.client.clone(), app_config: self.app_config.clone(), prompt_file: self.prompt_file.clone(), session_id: self.session_id.clone(), context: tokio::sync::Mutex::new(ctx), state: tokio::sync::Mutex::new(AgentState { tools, last_prompt_tokens: 0, reasoning_effort: "none".to_string(), temperature: st.temperature, top_p: st.top_p, top_k: st.top_k, activities: Vec::new(), next_activity_id: 0, pending_yield: false, pending_model_switch: None, pending_dmn_pause: false, provenance: st.provenance.clone(), conversation_log: None, generation: 0, memory_scoring_in_flight: false, active_tools: tools::ActiveTools::new(), changed: Arc::new(tokio::sync::Notify::new()), }), }) } pub async fn assemble_prompt_tokens(&self) -> Vec { let ctx = self.context.lock().await; let mut tokens = ctx.token_ids(); tokens.push(tokenizer::IM_START); tokens.extend(tokenizer::encode("assistant\n")); tokens } pub async fn push_node(&self, node: AstNode) { let st = self.state.lock().await; if let Some(ref log) = st.conversation_log { if let Err(e) = log.append_node(&node) { eprintln!("warning: failed to log entry: {:#}", e); } } drop(st); self.context.lock().await.push(Section::Conversation, node); self.state.lock().await.changed.notify_one(); } /// Run the agent turn loop: assemble prompt, stream response, /// parse into AST, dispatch tool calls, repeat until text response. pub async fn turn( agent: Arc, ) -> Result { // Collect finished background tools { let finished = agent.state.lock().await.active_tools.take_finished(); if !finished.is_empty() { let mut bg_ds = DispatchState::new(); for entry in finished { if let Ok((call, output)) = entry.handle.await { Agent::apply_tool_result(&agent, &call, output, &mut bg_ds).await; } } } } let mut overflow_retries: u32 = 0; let mut empty_retries: u32 = 0; let mut ds = DispatchState::new(); loop { let _thinking = start_activity(&agent, "thinking...").await; let (rx, _stream_guard) = { let prompt_tokens = agent.assemble_prompt_tokens().await; let st = agent.state.lock().await; agent.client.stream_completion( &prompt_tokens, api::SamplingParams { temperature: st.temperature, top_p: st.top_p, top_k: st.top_k, }, None, ) }; let branch_idx = { let mut ctx = agent.context.lock().await; let idx = ctx.len(Section::Conversation); ctx.push(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); idx }; let parser = ResponseParser::new(branch_idx); let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone()); let mut pending_calls: Vec = Vec::new(); while let Some(call) = tool_rx.recv().await { let call_clone = call.clone(); let agent_handle = agent.clone(); let handle = tokio::spawn(async move { let args: serde_json::Value = serde_json::from_str(&call_clone.arguments).unwrap_or_default(); let output = tools::dispatch_with_agent( &call_clone.name, &args, Some(agent_handle), ).await; (call_clone, output) }); agent.state.lock().await.active_tools.push(tools::ActiveToolCall { id: call.id.clone(), name: call.name.clone(), detail: call.arguments.clone(), started: std::time::Instant::now(), background: false, handle, }); pending_calls.push(call); } // Check for stream/parse errors match parser_handle.await { Ok(Err(e)) => { if context::is_context_overflow(&e) && overflow_retries < 2 { overflow_retries += 1; agent.state.lock().await.notify( format!("context overflow — retrying ({}/2)", overflow_retries)); agent.compact().await; continue; } return Err(e); } Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)), Ok(Ok(())) => {} } // Empty response — nudge and retry let has_content = { let ctx = agent.context.lock().await; !ctx.conversation()[branch_idx].children().is_empty() }; if !has_content && pending_calls.is_empty() { if empty_retries < 2 { empty_retries += 1; agent.push_node(AstNode::user_msg( "[system] Your previous response was empty. \ Please respond with text or use a tool." )).await; continue; } } else { empty_retries = 0; } // Wait for tool calls to complete if !pending_calls.is_empty() { ds.had_tool_calls = true; let handles = agent.state.lock().await.active_tools.take_foreground(); for entry in handles { if let Ok((call, output)) = entry.handle.await { Agent::apply_tool_result(&agent, &call, output, &mut ds).await; } } continue; } // Text-only response — extract text and return let text = { let ctx = agent.context.lock().await; let children = ctx.conversation()[branch_idx].children(); children.iter() .filter_map(|c| c.leaf()) .filter(|l| matches!(l.body(), NodeBody::Content(_))) .map(|l| l.body().text()) .collect::>() .join("") }; let mut st = agent.state.lock().await; if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; } if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); } if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; } return Ok(TurnResult { text, yield_requested: ds.yield_requested, had_tool_calls: ds.had_tool_calls, tool_errors: ds.tool_errors, model_switch: ds.model_switch, dmn_pause: ds.dmn_pause, }); } } async fn apply_tool_result( agent: &Arc, call: &PendingToolCall, output: String, ds: &mut DispatchState, ) { ds.had_tool_calls = true; if output.starts_with("Error:") { ds.tool_errors += 1; } agent.state.lock().await.active_tools.remove(&call.id); if call.name == "memory_render" && !output.starts_with("Error:") { let args: serde_json::Value = serde_json::from_str(&call.arguments).unwrap_or_default(); if let Some(key) = args.get("key").and_then(|v| v.as_str()) { agent.push_node(AstNode::memory(key, &output)).await; return; } } agent.push_node(AstNode::tool_result(&output)).await; } async fn load_startup_journal(&self) { let oldest_msg_ts = { let st = self.state.lock().await; st.conversation_log.as_ref().and_then(|log| log.oldest_timestamp()) }; let store = match crate::store::Store::load() { Ok(s) => s, Err(_) => return, }; let mut journal_nodes: Vec<_> = store.nodes.values() .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .collect(); journal_nodes.sort_by_key(|n| n.created_at); let cutoff_idx = if let Some(cutoff) = oldest_msg_ts { let cutoff_ts = cutoff.timestamp(); let mut idx = journal_nodes.len(); for (i, node) in journal_nodes.iter().enumerate() { if node.created_at >= cutoff_ts { idx = i + 1; break; } } idx } else { journal_nodes.len() }; let journal_budget = context::context_window() * 15 / 100; let mut entries = Vec::new(); let mut total_tokens = 0; for node in journal_nodes[..cutoff_idx].iter().rev() { let ts = chrono::DateTime::from_timestamp(node.created_at, 0); let ast = AstNode::memory(&node.key, &node.content) .with_timestamp(ts.unwrap_or_else(chrono::Utc::now)); let tok = ast.tokens(); if total_tokens + tok > journal_budget && !entries.is_empty() { break; } total_tokens += tok; entries.push(ast); } entries.reverse(); if entries.is_empty() { return; } let mut ctx = self.context.lock().await; ctx.clear(Section::Journal); for entry in entries { ctx.push(Section::Journal, entry); } } pub async fn compact(&self) { match crate::config::reload_for_model(&self.app_config, &self.prompt_file) { Ok((system_prompt, personality)) => { let mut ctx = self.context.lock().await; ctx.clear(Section::System); ctx.push(Section::System, AstNode::system_msg(&system_prompt)); ctx.clear(Section::Identity); for (name, content) in &personality { ctx.push(Section::Identity, AstNode::memory(name, content)); } } Err(e) => { eprintln!("warning: failed to reload identity: {:#}", e); } } self.load_startup_journal().await; let mut st = self.state.lock().await; st.generation += 1; st.last_prompt_tokens = 0; } pub async fn restore_from_log(&self) -> bool { let nodes = { let st = self.state.lock().await; match &st.conversation_log { Some(log) => match log.read_nodes(64 * 1024 * 1024) { Ok(nodes) if !nodes.is_empty() => nodes, _ => return false, }, None => return false, } }; { let mut ctx = self.context.lock().await; ctx.clear(Section::Conversation); for node in nodes { ctx.push(Section::Conversation, node); } } self.compact().await; let mut st = self.state.lock().await; st.last_prompt_tokens = self.context.lock().await.tokens() as u32; true } pub fn model(&self) -> &str { &self.client.model } }