// agent.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model; if it responds
// with tool calls, dispatch them and loop; if it responds
// with text, display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.

pub mod api;
pub mod context;
pub mod oneshot;
pub mod readout;
pub mod tokenizer;
pub mod tools;

use std::sync::Arc;

use anyhow::Result;

use api::ApiClient;
use context::{AstNode, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
use crate::mind::log::ConversationLog;

// --- Activity tracking (RAII guards) ---

pub struct ActivityEntry {
    pub id: u64,
    pub label: String,
    pub started: std::time::Instant,
    /// Auto-expires this long after creation (or completion).
    pub expires_at: std::time::Instant,
}

pub struct ActivityGuard {
    agent: Arc<Agent>,
    id: u64,
}

impl ActivityGuard {
    pub async fn update(&self, label: impl Into<String>) {
        let label = label.into();
        let mut st = self.agent.state.lock().await;
        if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) {
            entry.label = label;
        }
        st.changed.notify_one();
    }
}

const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);

impl Drop for ActivityGuard {
    fn drop(&mut self) {
        if let Ok(mut st) = self.agent.state.try_lock() {
            if let Some(entry) = st.activities.iter_mut().find(|a| a.id == self.id) {
                entry.label.push_str(" (complete)");
                entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
            }
        }
    }
}

impl AgentState {
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label: label.into(),
            started: std::time::Instant::now(),
            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600),
        });
        self.changed.notify_one();
        id
    }

    pub fn notify(&mut self, label: impl Into<String>) {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label: label.into(),
            started: std::time::Instant::now(),
            expires_at: std::time::Instant::now() + ACTIVITY_LINGER,
        });
        self.changed.notify_one();
    }

    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }
}

pub async fn start_activity(agent: &Arc<Agent>, label: impl Into<String>) -> ActivityGuard {
    let id = agent.state.lock().await.push_activity(label);
    ActivityGuard { agent: agent.clone(), id }
}

/// Result of a single agent turn.
pub struct TurnResult {
    /// Whether the model called yield_to_user during this turn.
    pub yield_requested: bool,
    /// Whether any tools (other than yield_to_user) were called.
    pub had_tool_calls: bool,
    /// Number of tool calls that returned errors this turn.
    pub tool_errors: u32,
    /// Model name to switch to after this turn completes.
    pub model_switch: Option<String>,
    /// Agent requested DMN pause (full stop on autonomous behavior).
    pub dmn_pause: bool,
}
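// A sketch of how the RAII activity guards above are meant to be used:
// hold the guard for the duration of the work and update its label as
// progress is made; dropping it marks the entry complete and lets it
// expire. `scan_memories` is a hypothetical worker, not part of this
// module's API.
#[cfg(test)]
mod activity_guard_sketch {
    use super::*;

    #[allow(dead_code)]
    async fn scan_memories(agent: &Arc<Agent>) {
        let activity = start_activity(agent, "scanning memories...").await;
        for i in 0..3 {
            activity.update(format!("scanning memories... {}/3", i + 1)).await;
        }
        // Guard dropped here: " (complete)" is appended to the label
        // and the entry expires ACTIVITY_LINGER (5s) later.
    }
}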
/// Accumulated state across tool dispatches within a single turn.
struct DispatchState {
    yield_requested: bool,
    had_tool_calls: bool,
    tool_errors: u32,
    model_switch: Option<String>,
    dmn_pause: bool,
}

impl DispatchState {
    fn new() -> Self {
        Self {
            yield_requested: false,
            had_tool_calls: false,
            tool_errors: 0,
            model_switch: None,
            dmn_pause: false,
        }
    }
}

/// Immutable agent config — shared via Arc, no mutex needed.
pub struct Agent {
    pub client: ApiClient,
    pub app_config: crate::config::AppConfig,
    pub session_id: String,
    pub context: crate::Mutex<ContextState>,
    pub state: crate::Mutex<AgentState>,
    /// Shared landing pad for per-token concept-readout projections
    /// streamed from the vLLM server. Populated by the streaming
    /// token handler, read by UI screens (amygdala). Manifest is
    /// `None` when the server has readout disabled.
    pub readout: readout::SharedReadoutBuffer,
}

/// Which external MCP tools an agent can access.
#[derive(Clone)]
pub enum McpToolAccess {
    None,
    All,
    Some(Vec<String>),
}

/// Mutable agent state — behind its own mutex.
pub struct AgentState {
    pub tools: Vec<tools::Tool>,
    pub mcp_tools: McpToolAccess,
    pub last_prompt_tokens: u32,
    pub reasoning_effort: String,
    /// Native Qwen thinking — add `<think>\n` to the generation prompt.
    pub think_native: bool,
    /// Tool-based thinking — add a "think" tool for structured reasoning.
    pub think_tool: bool,
    pub temperature: f32,
    pub top_p: f32,
    pub top_k: u32,
    pub activities: Vec<ActivityEntry>,
    next_activity_id: u64,
    pub pending_yield: bool,
    pub pending_model_switch: Option<String>,
    pub pending_dmn_pause: bool,
    pub provenance: String,
    pub generation: u64,
    pub active_tools: tools::ActiveTools,
    /// vLLM scheduling priority (lower = higher priority).
    /// 0 = interactive, 1 = surface agent, 2 = other subconscious, 10 = unconscious.
    pub priority: Option<u32>,
    pub changed: Arc<tokio::sync::Notify>,
}
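// How the access enum above is intended to gate external MCP tools — a
// minimal sketch. The real filtering presumably lives wherever MCP tool
// definitions are merged into the prompt; `is_allowed` is a hypothetical
// helper, not part of the current API.
#[cfg(test)]
impl McpToolAccess {
    fn is_allowed(&self, tool_name: &str) -> bool {
        match self {
            McpToolAccess::None => false,
            McpToolAccess::All => true,
            McpToolAccess::Some(names) => names.iter().any(|n| n == tool_name),
        }
    }
}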
impl Agent {
    pub async fn new(
        client: ApiClient,
        personality: Vec<(String, String)>,
        app_config: crate::config::AppConfig,
        conversation_log: Option<ConversationLog>,
        active_tools: tools::ActiveTools,
        agent_tools: Vec<tools::Tool>,
    ) -> Arc<Self> {
        let mut context = ContextState::new();
        context.conversation_log = conversation_log;

        let tool_defs: Vec<String> = agent_tools.iter().map(|t| t.to_json()).collect();
        if !tool_defs.is_empty() {
            let tools_text = format!(
                "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
                If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
                <tool_call>\n<function=example_function_name>\n\
                <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
                </function>\n</tool_call>\n\n\
                IMPORTANT: Function calls MUST follow the specified format.",
                tool_defs.join("\n"),
            );
            context.push_no_log(Section::System, AstNode::system_msg(&tools_text));
        }
        for (name, content) in &personality {
            context.push_no_log(Section::Identity, AstNode::memory(name, content));
        }

        let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
        let readout = readout::new_shared();

        let agent = Arc::new(Self {
            client,
            app_config,
            session_id,
            context: crate::Mutex::new(context),
            readout,
            state: crate::Mutex::new(AgentState {
                tools: agent_tools,
                mcp_tools: McpToolAccess::All,
                last_prompt_tokens: 0,
                reasoning_effort: "none".to_string(),
                think_native: true,
                think_tool: false,
                temperature: 0.6,
                top_p: 0.95,
                top_k: 20,
                activities: Vec::new(),
                next_activity_id: 0,
                pending_yield: false,
                pending_model_switch: None,
                pending_dmn_pause: false,
                provenance: "manual".to_string(),
                generation: 0,
                active_tools,
                priority: Some(0),
                changed: Arc::new(tokio::sync::Notify::new()),
            }),
        });

        agent.load_startup_journal().await;

        // Probe the vLLM server for its readout manifest. Non-fatal:
        // if readout isn't enabled the server returns 404 and we
        // leave the manifest as None, which disables the amygdala
        // screen gracefully.
        match agent.client.fetch_readout_manifest().await {
            Ok(Some(m)) => {
                dbglog!(
                    "readout manifest: {} concepts, layers={:?}",
                    m.concepts.len(),
                    m.layers,
                );
                if let Ok(mut buf) = agent.readout.lock() {
                    buf.set_manifest(Some(m));
                }
            }
            Ok(None) => {
                dbglog!("readout manifest: server has readout disabled (404)");
            }
            Err(e) => {
                dbglog!("readout manifest fetch failed: {}", e);
            }
        }

        agent
    }

    /// Fork: clones context for KV cache prefix sharing.
    pub async fn fork(self: &Arc<Self>, tools: Vec<tools::Tool>) -> Arc<Self> {
        let ctx = self.context.lock().await.clone();
        let st = self.state.lock().await;
        Arc::new(Self {
            client: self.client.clone(),
            app_config: self.app_config.clone(),
            session_id: self.session_id.clone(),
            context: crate::Mutex::new(ctx),
            // Forks get an independent readout buffer. The amygdala
            // screen reads the main conscious agent's buffer only;
            // subconscious generations (scoring, reflection, etc.)
            // shouldn't bleed into the main emotional readout even
            // though they hit the same vLLM server.
            readout: readout::new_shared(),
            state: crate::Mutex::new(AgentState {
                tools,
                mcp_tools: McpToolAccess::None,
                last_prompt_tokens: 0,
                reasoning_effort: "none".to_string(),
                think_native: st.think_native,
                think_tool: st.think_tool,
                temperature: st.temperature,
                top_p: st.top_p,
                top_k: st.top_k,
                activities: Vec::new(),
                next_activity_id: 0,
                pending_yield: false,
                pending_model_switch: None,
                pending_dmn_pause: false,
                provenance: st.provenance.clone(),
                generation: 0,
                active_tools: tools::ActiveTools::new(),
                priority: None,
                changed: Arc::new(tokio::sync::Notify::new()),
            }),
        })
    }

    pub async fn assemble_prompt_tokens(&self) -> Vec<u32> {
        self.assemble_prompt().await.0
    }

    /// Assemble a ready-to-send prompt: token stream in wire form (each
    /// image collapsed to a single `<|image_pad|>`) paired with the
    /// images to attach as multi_modal_data.
    ///
    /// Pre-send size check: if the context has grown past budget since the
    /// last compact (accumulation between turns, a fork's context getting
    /// bigger than expected, etc.), trim here rather than letting vLLM
    /// reject the request. Client-side tokenization means we already know
    /// the exact token count so there's no reason to round-trip an
    /// oversize request.
    pub async fn assemble_prompt(&self) -> (Vec<u32>, Vec<context::Image>) {
        let mut ctx = self.context.lock().await;
        if ctx.total_tokens() > context::context_budget_tokens() {
            ctx.trim_conversation();
        }
        let st = self.state.lock().await;
        let (mut tokens, images, _) = ctx.wire_prompt(0..ctx.conversation().len(), |_| false);
        tokens.push(tokenizer::IM_START);
        if st.think_native {
            tokens.extend(tokenizer::encode("assistant\n<think>\n"));
        } else {
            tokens.extend(tokenizer::encode("assistant\n"));
        }
        (tokens, images)
    }
}
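// Sketch of the intended fork pattern: spin off a subconscious worker
// that shares the parent's context prefix (so vLLM can reuse the KV
// cache) but gets no tools and its own readout buffer.
// `score_last_exchange` is hypothetical; only `fork`, `push_node`,
// and `turn` are real.
#[cfg(test)]
mod fork_sketch {
    use super::*;

    #[allow(dead_code)]
    async fn score_last_exchange(agent: &Arc<Agent>) -> Result<TurnResult> {
        let scorer = agent.fork(Vec::new()).await;
        scorer
            .push_node(AstNode::user_msg("[system] Rate the last exchange from 1-5."))
            .await;
        Agent::turn(scorer).await
    }
}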
impl Agent {
    /// Rebuild the tools section of the system prompt from the current tools list.
    pub async fn rebuild_tools(&self) {
        let st = self.state.lock().await;
        let tool_defs: Vec<String> = st.tools.iter().map(|t| t.to_json()).collect();
        drop(st);

        let mut ctx = self.context.lock().await;
        ctx.clear(Section::System);
        if !tool_defs.is_empty() {
            let tools_text = format!(
                "# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
                If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
                <tool_call>\n<function=example_function_name>\n\
                <parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
                </function>\n</tool_call>\n\n\
                IMPORTANT: Function calls MUST follow the specified format.",
                tool_defs.join("\n"),
            );
            ctx.push_no_log(Section::System, AstNode::system_msg(&tools_text));
        }
    }

    pub async fn push_node(&self, node: AstNode) {
        let node = node.with_timestamp(chrono::Utc::now());
        self.context.lock().await.push_log(Section::Conversation, node);
        self.state.lock().await.changed.notify_one();
    }
}
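// Sketch of how a tool-set change is applied at runtime: mutate
// `state.tools`, then regenerate the system section so the next
// assembled prompt reflects it. `disable_all_tools` is hypothetical;
// only the field and `rebuild_tools` are real.
#[cfg(test)]
mod rebuild_tools_sketch {
    use super::*;

    #[allow(dead_code)]
    async fn disable_all_tools(agent: &Arc<Agent>) {
        agent.state.lock().await.tools.clear();
        // Clears Section::System and re-renders the "# Tools" block
        // (empty tool list, so the section is simply left cleared).
        agent.rebuild_tools().await;
    }
}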
impl Agent {
    /// Run the agent turn loop: assemble prompt, stream response,
    /// parse into AST, dispatch tool calls, repeat until text response.
    pub async fn turn(agent: Arc<Agent>) -> Result<TurnResult> {
        // Collect finished background tools (their dispatch flags are
        // discarded; only their result nodes land in context)
        {
            let finished = agent.state.lock().await.active_tools.take_finished();
            if !finished.is_empty() {
                let mut bg_ds = DispatchState::new();
                let mut results = Vec::new();
                for entry in finished {
                    if let Ok((call, output)) = entry.handle.await {
                        results.push((call, output));
                    }
                }
                Agent::apply_tool_results(&agent, results, &mut bg_ds).await;
            }
        }

        let mut overflow_retries: u32 = 0;
        let mut overflow_activity: Option<ActivityGuard> = None;
        let mut empty_retries: u32 = 0;
        let mut ds = DispatchState::new();

        loop {
            let _thinking = start_activity(&agent, "thinking...").await;

            let (rx, _stream_guard) = {
                let (prompt_tokens, images) = agent.assemble_prompt().await;
                let st = agent.state.lock().await;
                agent.client.stream_completion_mm(
                    &prompt_tokens,
                    &images,
                    api::SamplingParams {
                        temperature: st.temperature,
                        top_p: st.top_p,
                        top_k: st.top_k,
                    },
                    st.priority,
                )
            };

            let branch_idx = {
                let mut ctx = agent.context.lock().await;
                let idx = ctx.len(Section::Conversation);
                ctx.push_log(
                    Section::Conversation,
                    AstNode::branch(Role::Assistant, vec![]).with_timestamp(chrono::Utc::now()),
                );
                idx
            };

            let parser = ResponseParser::new(branch_idx);
            let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());

            let mut pending_calls: Vec<PendingToolCall> = Vec::new();
            while let Some(call) = tool_rx.recv().await {
                let call_clone = call.clone();
                let agent_handle = agent.clone();
                let handle = tokio::spawn(async move {
                    let args: serde_json::Value =
                        serde_json::from_str(&call_clone.arguments).unwrap_or_default();
                    let output =
                        tools::dispatch_with_agent(&call_clone.name, &args, Some(agent_handle))
                            .await;
                    (call_clone, output)
                });
                agent.state.lock().await.active_tools.push(tools::ActiveToolCall {
                    id: call.id.clone(),
                    name: call.name.clone(),
                    detail: call.arguments.clone(),
                    started: std::time::Instant::now(),
                    background: false,
                    handle,
                });
                pending_calls.push(call);
            }

            // Check for stream/parse errors
            match parser_handle.await {
                Ok(Err(e)) => {
                    if context::is_context_overflow(&e) && overflow_retries < 2 {
                        overflow_retries += 1;
                        let msg =
                            format!("context overflow — compacting ({}/2)", overflow_retries);
                        match &overflow_activity {
                            Some(a) => a.update(&msg).await,
                            None => {
                                overflow_activity = Some(start_activity(&agent, &msg).await)
                            }
                        }
                        agent.compact().await;
                        continue;
                    }
                    return Err(e);
                }
                Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
                Ok(Ok(())) => {
                    // Assistant response was pushed to context by the parser;
                    // log it now that parsing is complete.
                    let ctx = agent.context.lock().await;
                    if let Some(ref log) = ctx.conversation_log {
                        let node = &ctx.conversation()[branch_idx];
                        if let Err(e) = log.append_node(node) {
                            dbglog!("warning: log: {:#}", e);
                        }
                    }
                }
            }

            // Empty response — nudge and retry
            let has_content = {
                let ctx = agent.context.lock().await;
                !ctx.conversation()[branch_idx].children().is_empty()
            };
            if !has_content && pending_calls.is_empty() {
                if empty_retries < 2 {
                    empty_retries += 1;
                    agent
                        .push_node(AstNode::user_msg(
                            "[system] Your previous response was empty. \
                             Please respond with text or use a tool.",
                        ))
                        .await;
                    continue;
                }
            } else {
                empty_retries = 0;
            }

            // Wait for tool calls to complete
            if !pending_calls.is_empty() {
                ds.had_tool_calls = true;
                let handles = agent.state.lock().await.active_tools.take_foreground();
                let mut results = Vec::new();
                for entry in handles {
                    if let Ok((call, output)) = entry.handle.await {
                        results.push((call, output));
                    }
                }
                Agent::apply_tool_results(&agent, results, &mut ds).await;
                if !agent.state.lock().await.pending_yield {
                    continue;
                }
            }

            // Text-only response (or yield) — collect pending flags and return
            let mut st = agent.state.lock().await;
            if st.pending_yield {
                ds.yield_requested = true;
                st.pending_yield = false;
            }
            if st.pending_model_switch.is_some() {
                ds.model_switch = st.pending_model_switch.take();
            }
            if st.pending_dmn_pause {
                ds.dmn_pause = true;
                st.pending_dmn_pause = false;
            }
            return Ok(TurnResult {
                yield_requested: ds.yield_requested,
                had_tool_calls: ds.had_tool_calls,
                tool_errors: ds.tool_errors,
                model_switch: ds.model_switch,
                dmn_pause: ds.dmn_pause,
            });
        }
    }

    fn make_tool_result_node(call: &PendingToolCall, output: &str) -> AstNode {
        if call.name == "memory_render" && !output.starts_with("Error:") {
            let args: serde_json::Value =
                serde_json::from_str(&call.arguments).unwrap_or_default();
            if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
                return AstNode::memory(key, output);
            }
        }
        AstNode::tool_result(output)
    }

    async fn apply_tool_results(
        agent: &Arc<Agent>,
        results: Vec<(PendingToolCall, String)>,
        ds: &mut DispatchState,
    ) {
        let mut nodes = Vec::new();
        for (call, output) in &results {
            if call.name == "yield_to_user" {
                continue;
            }
            ds.had_tool_calls = true;
            if output.starts_with("Error:") {
                ds.tool_errors += 1;
            }
            nodes.push(Self::make_tool_result_node(call, output));
        }
        {
            let mut st = agent.state.lock().await;
            for (call, _) in &results {
                st.active_tools.remove(&call.id);
            }
        }
        {
            let mut ctx = agent.context.lock().await;
            for node in nodes {
                ctx.push_log(Section::Conversation, node);
            }
        }
        agent.state.lock().await.changed.notify_one();
    }
}
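// How `yield_to_user` threads through the dispatch machinery, as a
// sketch: its dispatch arm (hypothetical here) sets `pending_yield` on
// the agent state; `apply_tool_results` above deliberately emits no
// tool-result node for it; and `turn` then stops looping and reports
// `yield_requested` in the TurnResult.
#[cfg(test)]
mod yield_sketch {
    use super::*;

    #[allow(dead_code)]
    async fn dispatch_yield_to_user(agent: &Arc<Agent>) -> String {
        agent.state.lock().await.pending_yield = true;
        "yielding".to_string()
    }
}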
impl Agent {
    async fn load_startup_journal(&self) {
        use crate::agent::tools::memory::journal_tail;

        let oldest_msg_ts = {
            let ctx = self.context.lock().await;
            ctx.conversation_log.as_ref().and_then(|log| log.oldest_timestamp())
        };

        // Get recent journal entries (newest first)
        let journal_entries = match journal_tail(None, Some(100), Some(0), None).await {
            Ok(e) => e,
            Err(_) => return,
        };

        // Filter to entries before the conversation started
        let cutoff_ts = oldest_msg_ts.map(|t| t.timestamp());
        let filtered: Vec<_> = journal_entries
            .into_iter()
            .filter(|e| cutoff_ts.map(|ts| e.created_at < ts).unwrap_or(true))
            .collect();

        let journal_budget = context::context_window() * 15 / 100;
        let mut entries = Vec::new();
        let mut total_tokens = 0;

        // Take entries within budget (they're newest-first, so reverse for display)
        for entry in filtered.iter() {
            let ts = chrono::DateTime::from_timestamp(entry.created_at, 0);
            let ast = AstNode::memory(&entry.key, &entry.content)
                .with_timestamp(ts.unwrap_or_else(chrono::Utc::now));
            let tok = ast.tokens();
            if total_tokens + tok > journal_budget && !entries.is_empty() {
                break;
            }
            total_tokens += tok;
            entries.push(ast);
        }
        entries.reverse();
        if entries.is_empty() {
            return;
        }

        let mut ctx = self.context.lock().await;
        ctx.clear(Section::Journal);
        for entry in entries {
            ctx.push_no_log(Section::Journal, entry);
        }
    }

    pub async fn compact(&self) {
        // Identity section is left in place — mid-session rebuilds discard
        // memory scores. Content edits to personality nodes get picked up at
        // the next restart via new() + restore_from_log().
        self.load_startup_journal().await;
        self.context.lock().await.trim_conversation();
        let mut st = self.state.lock().await;
        st.generation += 1;
        st.last_prompt_tokens = 0;
    }

    pub async fn restore_from_log(&self) -> bool {
        let tail = {
            let ctx = self.context.lock().await;
            match &ctx.conversation_log {
                Some(log) => match log.read_tail() {
                    Ok(t) => t,
                    Err(_) => return false,
                },
                None => return false,
            }
        };

        let budget = context::context_budget_tokens();
        let fixed = {
            let ctx = self.context.lock().await;
            ctx.system()
                .iter()
                .chain(ctx.identity().iter())
                .map(|n| n.tokens())
                .sum::<usize>()
        };
        let conv_budget = budget.saturating_sub(fixed);

        // Walk backwards (newest first), retokenize, stop at budget
        let mut kept = Vec::new();
        let mut total = 0;
        for node in tail.iter() {
            let node = node.retokenize();
            let tok = node.tokens();
            if total + tok > conv_budget && !kept.is_empty() {
                break;
            }
            total += tok;
            kept.push(node);
        }
        kept.reverse();

        {
            let mut ctx = self.context.lock().await;
            ctx.clear(Section::Conversation);
            for node in kept {
                ctx.push_no_log(Section::Conversation, node);
            }
        }
        self.compact().await;
        self.state.lock().await.last_prompt_tokens =
            self.context.lock().await.tokens() as u32;
        true
    }

    pub fn model(&self) -> &str {
        &self.client.model
    }
}
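// What the DMN outer loop (dmn.rs) does with a TurnResult, reduced to a
// sketch — the real loop also decides which prompts to send; `drive` is
// hypothetical and only `Agent::turn` and the TurnResult fields are real.
#[cfg(test)]
mod outer_loop_sketch {
    use super::*;

    #[allow(dead_code)]
    async fn drive(agent: Arc<Agent>) -> Result<()> {
        loop {
            let result = Agent::turn(agent.clone()).await?;
            if let Some(model) = result.model_switch {
                dbglog!("model switch requested: {}", model);
            }
            // yield_to_user hands control back to the user; dmn_pause
            // is a full stop on autonomous behavior.
            if result.yield_requested || result.dmn_pause {
                return Ok(());
            }
        }
    }
}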