// agent.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.

pub mod api;
pub mod context;
pub mod oneshot;
pub mod tokenizer;
pub mod tools;

use std::sync::Arc;

use anyhow::Result;

use api::ApiClient;
use context::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
use tools::summarize_args;

use crate::mind::log::ConversationLog;

// --- Activity tracking (RAII guards) ---

/// One entry in the UI-visible activity list ("thinking...", "calling: foo", ...).
pub struct ActivityEntry {
    pub id: u64,
    pub label: String,
    pub started: std::time::Instant,
    /// Auto-expires this long after creation (or completion).
    pub expires_at: std::time::Instant,
}

/// RAII guard — marks the activity "(complete)" on drop, starts expiry timer.
pub struct ActivityGuard {
    agent: Arc<tokio::sync::Mutex<Agent>>,
    id: u64,
}

/// How long a completed activity (or a notification) lingers before expiry.
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);

impl Drop for ActivityGuard {
    fn drop(&mut self) {
        // Drop can run outside an async context, so we can only try_lock.
        // If the agent is busy we skip the cosmetic "(complete)" relabel;
        // the entry still expires via its timestamp.
        if let Ok(mut ag) = self.agent.try_lock() {
            if let Some(entry) = ag.activities.iter_mut().find(|a| a.id == self.id) {
                entry.label.push_str(" (complete)");
                entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
            }
        }
    }
}

impl Agent {
    /// Register an activity, returns its ID. Caller creates the guard.
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label: label.into(),
            started: std::time::Instant::now(),
            // Effectively "no expiry" (1 hour) until the guard drops and
            // shortens it to ACTIVITY_LINGER.
            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600),
        });
        self.changed.notify_one();
        id
    }

    /// Push a notification — auto-expires after 5 seconds.
    pub fn notify(&mut self, label: impl Into<String>) {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label: label.into(),
            started: std::time::Instant::now(),
            expires_at: std::time::Instant::now() + ACTIVITY_LINGER,
        });
        self.changed.notify_one();
    }

    /// Remove expired activities.
    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }
}

/// Create an activity guard from outside the lock.
pub fn activity_guard(agent: &Arc<tokio::sync::Mutex<Agent>>, id: u64) -> ActivityGuard {
    ActivityGuard { agent: agent.clone(), id }
}

/// Convenience: lock, push activity, unlock, return guard.
pub async fn start_activity(
    agent: &Arc<tokio::sync::Mutex<Agent>>,
    label: impl Into<String>,
) -> ActivityGuard {
    let id = agent.lock().await.push_activity(label);
    ActivityGuard { agent: agent.clone(), id }
}

/// Result of a single agent turn.
pub struct TurnResult {
    /// The text response (already sent through UI channel).
    #[allow(dead_code)]
    pub text: String,
    /// Whether the model called yield_to_user during this turn.
    pub yield_requested: bool,
    /// Whether any tools (other than yield_to_user) were called.
    pub had_tool_calls: bool,
    /// Number of tool calls that returned errors this turn.
    pub tool_errors: u32,
    /// Model name to switch to after this turn completes.
    pub model_switch: Option<String>,
    /// Agent requested DMN pause (full stop on autonomous behavior).
    pub dmn_pause: bool,
}

/// Accumulated state across tool dispatches within a single turn.
struct DispatchState { yield_requested: bool, had_tool_calls: bool, tool_errors: u32, model_switch: Option, dmn_pause: bool, } impl DispatchState { fn new() -> Self { Self { yield_requested: false, had_tool_calls: false, tool_errors: 0, model_switch: None, dmn_pause: false, } } } pub struct Agent { client: ApiClient, tools: Vec, /// Last known prompt token count from the API (tracks context size). last_prompt_tokens: u32, /// Current reasoning effort level ("none", "low", "high"). pub reasoning_effort: String, /// Sampling parameters — adjustable at runtime from the thalamus screen. pub temperature: f32, pub top_p: f32, pub top_k: u32, /// Active activities — RAII guards auto-remove on drop. pub activities: Vec, next_activity_id: u64, /// Control tool flags — set by tool handlers, consumed by turn loop. pub pending_yield: bool, pub pending_model_switch: Option, pub pending_dmn_pause: bool, /// Provenance tag for memory operations — identifies who made the change. pub provenance: String, /// Persistent conversation log — append-only record of all messages. pub conversation_log: Option, pub context: ContextState, /// App config — used to reload identity on compaction and model switching. pub app_config: crate::config::AppConfig, pub prompt_file: String, /// Stable session ID for memory-search dedup across turns. pub session_id: String, /// Incremented on compaction — UI uses this to detect resets. pub generation: u64, /// Whether incremental memory scoring is currently running. pub memory_scoring_in_flight: bool, /// Shared active tools — Agent writes, TUI reads. pub active_tools: tools::SharedActiveTools, /// Fires when agent state changes — UI wakes on this instead of polling. 
pub changed: Arc, } impl Agent { pub fn new( client: ApiClient, system_prompt: String, personality: Vec<(String, String)>, app_config: crate::config::AppConfig, prompt_file: String, conversation_log: Option, active_tools: tools::SharedActiveTools, ) -> Self { let mut context = ContextState::new(); context.push(Section::System, AstNode::system_msg(&system_prompt)); let tool_defs: Vec = tools::tools().iter() .map(|t| t.to_json()).collect(); if !tool_defs.is_empty() { let tools_text = format!( "# Tools\n\nYou have access to the following functions:\n\n\n{}\n\n\n\ If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\ \n\n\ \nvalue_1\n\n\ \n\n\n\ IMPORTANT: Function calls MUST follow the specified format.", tool_defs.join("\n"), ); context.push(Section::System, AstNode::system_msg(&tools_text)); } for (name, content) in &personality { context.push(Section::Identity, AstNode::memory(name, content)); } let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); let mut agent = Self { client, tools: tools::tools(), last_prompt_tokens: 0, reasoning_effort: "none".to_string(), temperature: 0.6, top_p: 0.95, top_k: 20, activities: Vec::new(), next_activity_id: 0, pending_yield: false, pending_model_switch: None, pending_dmn_pause: false, provenance: "manual".to_string(), conversation_log, context, app_config, prompt_file, session_id, generation: 0, memory_scoring_in_flight: false, active_tools, changed: Arc::new(tokio::sync::Notify::new()), }; agent.load_startup_journal(); agent } /// Create a lightweight agent forked from this one's context. /// /// The forked agent shares the same conversation prefix (system prompt, /// personality, journal, entries) for KV cache sharing. The caller /// appends the subconscious prompt as a user message and runs the turn. 
pub fn fork(&self, tools: Vec) -> Self { Self { client: self.client.clone(), tools, last_prompt_tokens: 0, reasoning_effort: "none".to_string(), temperature: self.temperature, top_p: self.top_p, top_k: self.top_k, activities: Vec::new(), next_activity_id: 0, pending_yield: false, pending_model_switch: None, pending_dmn_pause: false, provenance: self.provenance.clone(), conversation_log: None, context: self.context.clone(), app_config: self.app_config.clone(), prompt_file: self.prompt_file.clone(), session_id: self.session_id.clone(), generation: 0, memory_scoring_in_flight: false, active_tools: tools::shared_active_tools(), changed: Arc::new(tokio::sync::Notify::new()), } } /// Assemble the full prompt as token IDs. /// Context sections + assistant prompt suffix. pub fn assemble_prompt_tokens(&self) -> Vec { let mut tokens = self.context.token_ids(); tokens.push(tokenizer::IM_START); tokens.extend(tokenizer::encode("assistant\n")); tokens } /// Push a node into the conversation and log it. pub fn push_node(&mut self, node: AstNode) { if let Some(ref log) = self.conversation_log { if let Err(e) = log.append_node(&node) { eprintln!("warning: failed to log entry: {:#}", e); } } self.context.push(Section::Conversation, node); self.changed.notify_one(); } /// Run the agent turn loop: assemble prompt, stream response, /// parse into AST, dispatch tool calls, repeat until text response. 
pub async fn turn( agent: Arc>, ) -> Result { let active_tools = { let me = agent.lock().await; me.active_tools.clone() }; // Collect finished background tools { let mut finished = Vec::new(); { let mut tools = active_tools.lock().unwrap(); let mut i = 0; while i < tools.len() { if tools[i].handle.is_finished() { finished.push(tools.remove(i)); } else { i += 1; } } } if !finished.is_empty() { let mut results = Vec::new(); for entry in finished { if let Ok((call, output)) = entry.handle.await { results.push((call, output)); } } let mut me = agent.lock().await; let mut bg_ds = DispatchState::new(); for (call, output) in results { me.apply_tool_result(&call, output, &mut bg_ds); } } } let mut overflow_retries: u32 = 0; let mut empty_retries: u32 = 0; let mut ds = DispatchState::new(); loop { let _thinking = start_activity(&agent, "thinking...").await; // Assemble prompt and start stream (brief lock) let (mut rx, _stream_guard) = { let me = agent.lock().await; let prompt_tokens = me.assemble_prompt_tokens(); me.client.stream_completion( &prompt_tokens, api::SamplingParams { temperature: me.temperature, top_p: me.top_p, top_k: me.top_k, }, None, ) }; // Create assistant branch and parser (brief lock) let branch_idx = { let mut me = agent.lock().await; let idx = me.context.len(Section::Conversation); me.context.push(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); idx }; let mut parser = ResponseParser::new(branch_idx); let mut pending_calls: Vec = Vec::new(); let mut had_content = false; let mut stream_error: Option = None; // Stream loop — no lock held across I/O while let Some(event) = rx.recv().await { match event { api::StreamToken::Token { text, id: _ } => { had_content = true; let mut me = agent.lock().await; let calls = parser.feed(&text, &mut me.context); for call in calls { // Dispatch tool call immediately let call_clone = call.clone(); let agent_handle = agent.clone(); let handle = tokio::spawn(async move { let args: serde_json::Value = 
serde_json::from_str(&call_clone.arguments).unwrap_or_default(); let output = tools::dispatch_with_agent( &call_clone.name, &args, Some(agent_handle), ).await; (call_clone, output) }); active_tools.lock().unwrap().push(tools::ActiveToolCall { id: call.id.clone(), name: call.name.clone(), detail: call.arguments.clone(), started: std::time::Instant::now(), background: false, handle, }); pending_calls.push(call); } } api::StreamToken::Error(e) => { stream_error = Some(e); break; } api::StreamToken::Done { usage } => { if let Some(u) = usage { agent.lock().await.last_prompt_tokens = u.prompt_tokens; } break; } } } // Flush parser remainder { let mut me = agent.lock().await; parser.finish(&mut me.context); } // Handle errors if let Some(e) = stream_error { let err = anyhow::anyhow!("{}", e); let mut me = agent.lock().await; if context::is_context_overflow(&err) && overflow_retries < 2 { overflow_retries += 1; me.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); me.compact(); continue; } if context::is_stream_error(&err) && empty_retries < 2 { empty_retries += 1; me.notify(format!("stream error — retrying ({}/2)", empty_retries)); drop(me); tokio::time::sleep(std::time::Duration::from_secs(2)).await; continue; } return Err(err); } // Empty response — nudge and retry if !had_content && pending_calls.is_empty() { if empty_retries < 2 { empty_retries += 1; let mut me = agent.lock().await; me.push_node(AstNode::user_msg( "[system] Your previous response was empty. \ Please respond with text or use a tool." 
)); continue; } } else { empty_retries = 0; } // Wait for tool calls to complete if !pending_calls.is_empty() { ds.had_tool_calls = true; // Collect non-background tool handles let mut handles = Vec::new(); { let mut tools_guard = active_tools.lock().unwrap(); let mut i = 0; while i < tools_guard.len() { if !tools_guard[i].background { handles.push(tools_guard.remove(i)); } else { i += 1; } } } for entry in handles { if let Ok((call, output)) = entry.handle.await { let mut me = agent.lock().await; me.apply_tool_result(&call, output, &mut ds); } } continue; } // Text-only response — extract text and return let text = { let me = agent.lock().await; let children = me.context.conversation()[branch_idx].children(); children.iter() .filter_map(|c| c.leaf()) .filter(|l| matches!(l.body(), NodeBody::Content(_))) .map(|l| l.body().text()) .collect::>() .join("") }; let mut me = agent.lock().await; if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); } if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; } return Ok(TurnResult { text, yield_requested: ds.yield_requested, had_tool_calls: ds.had_tool_calls, tool_errors: ds.tool_errors, model_switch: ds.model_switch, dmn_pause: ds.dmn_pause, }); } } /// Dispatch a tool call without holding the agent lock across I/O. 
async fn dispatch_tool_call_unlocked( agent: &Arc>, active_tools: &tools::SharedActiveTools, call: &PendingToolCall, ds: &mut DispatchState, ) { let args: serde_json::Value = match serde_json::from_str(&call.arguments) { Ok(v) => v, Err(e) => { let err = format!("Error: malformed tool call arguments: {e}"); let _act = start_activity(agent, format!("rejected: {} (bad args)", call.name)).await; let mut me = agent.lock().await; me.apply_tool_result(call, err, ds); return; } }; let args_summary = summarize_args(&call.name, &args); let _calling = start_activity(agent, format!("calling: {}", call.name)).await; let call_clone = call.clone(); let agent_handle = agent.clone(); let handle = tokio::spawn(async move { let output = tools::dispatch_with_agent(&call_clone.name, &args, Some(agent_handle)).await; (call_clone, output) }); active_tools.lock().unwrap().push( tools::ActiveToolCall { id: call.id.clone(), name: call.name.clone(), detail: args_summary, started: std::time::Instant::now(), background: false, handle, } ); let entry = { let mut tools = active_tools.lock().unwrap(); tools.pop().unwrap() }; if let Ok((call, output)) = entry.handle.await { let mut me = agent.lock().await; me.apply_tool_result(&call, output, ds); } } fn apply_tool_result( &mut self, call: &PendingToolCall, output: String, ds: &mut DispatchState, ) { ds.had_tool_calls = true; if output.starts_with("Error:") { ds.tool_errors += 1; } self.active_tools.lock().unwrap().retain(|t| t.id != call.id); // Tag memory_render results as Memory nodes for context deduplication if call.name == "memory_render" && !output.starts_with("Error:") { let args: serde_json::Value = serde_json::from_str(&call.arguments).unwrap_or_default(); if let Some(key) = args.get("key").and_then(|v| v.as_str()) { self.push_node(AstNode::memory(key, &output)); return; } } self.push_node(AstNode::tool_result(&output)); } pub fn conversation_from(&self, from: usize) -> &[AstNode] { let conv = self.context.conversation(); if from < 
conv.len() { &conv[from..] } else { &[] } } fn load_startup_journal(&mut self) { let store = match crate::store::Store::load() { Ok(s) => s, Err(_) => return, }; let oldest_msg_ts = self.conversation_log.as_ref() .and_then(|log| log.oldest_timestamp()); let mut journal_nodes: Vec<_> = store.nodes.values() .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .collect(); journal_nodes.sort_by_key(|n| n.created_at); let cutoff_idx = if let Some(cutoff) = oldest_msg_ts { let cutoff_ts = cutoff.timestamp(); let mut idx = journal_nodes.len(); for (i, node) in journal_nodes.iter().enumerate() { if node.created_at >= cutoff_ts { idx = i + 1; break; } } idx } else { journal_nodes.len() }; let journal_budget = context::context_window() * 15 / 100; let mut entries = Vec::new(); let mut total_tokens = 0; for node in journal_nodes[..cutoff_idx].iter().rev() { let ts = chrono::DateTime::from_timestamp(node.created_at, 0); let ast = AstNode::memory(&node.key, &node.content) .with_timestamp(ts.unwrap_or_else(chrono::Utc::now)); let tok = ast.tokens(); if total_tokens + tok > journal_budget && !entries.is_empty() { break; } total_tokens += tok; entries.push(ast); } entries.reverse(); if entries.is_empty() { return; } self.context.clear(Section::Journal); for entry in entries { self.context.push(Section::Journal, entry); } } pub fn last_prompt_tokens(&self) -> u32 { self.last_prompt_tokens } /// Rebuild the context window: reload identity, trim, reload journal. 
pub fn compact(&mut self) { match crate::config::reload_for_model(&self.app_config, &self.prompt_file) { Ok((system_prompt, personality)) => { self.context.clear(Section::System); self.context.push(Section::System, AstNode::system_msg(&system_prompt)); self.context.clear(Section::Identity); for (name, content) in &personality { self.context.push(Section::Identity, AstNode::memory(name, content)); } } Err(e) => { eprintln!("warning: failed to reload identity: {:#}", e); } } self.load_startup_journal(); // TODO: trim_entries — dedup memories, evict to budget self.generation += 1; self.last_prompt_tokens = 0; } /// Restore from the conversation log. /// Returns true if the log had content to restore. pub fn restore_from_log(&mut self) -> bool { let nodes = match &self.conversation_log { Some(log) => match log.read_nodes(64 * 1024 * 1024) { Ok(nodes) if !nodes.is_empty() => nodes, _ => return false, }, None => return false, }; self.context.clear(Section::Conversation); for node in nodes { self.context.push(Section::Conversation, node); } self.compact(); self.last_prompt_tokens = self.context.tokens() as u32; true } pub fn swap_client(&mut self, new_client: ApiClient) { self.client = new_client; } pub fn model(&self) -> &str { &self.client.model } pub fn conversation(&self) -> &[AstNode] { self.context.conversation() } pub fn client_clone(&self) -> ApiClient { self.client.clone() } }