// agent.rs — Core agent loop // // The simplest possible implementation of the agent pattern: // send messages + tool definitions to the model, if it responds // with tool calls then dispatch them and loop, if it responds // with text then display it and wait for the next prompt. // // Uses streaming by default so text tokens appear as they're // generated. Tool calls are accumulated from stream deltas and // dispatched after the stream completes. // // The DMN (dmn.rs) is the outer loop that decides what prompts // to send here. This module just handles single turns: prompt // in, response out, tool calls dispatched. pub mod api; pub mod context; pub mod oneshot; pub mod tools; use std::sync::Arc; use anyhow::Result; use tiktoken_rs::CoreBPE; use api::{ApiClient, ToolCall}; use api::{ContentPart, Message, MessageContent, Role}; use context::{ConversationEntry, ContextState}; use tools::{summarize_args, working_stack}; use crate::mind::log::ConversationLog; use crate::agent::context::ContextSection; // --- Activity tracking (RAII guards) --- pub struct ActivityEntry { pub id: u64, pub label: String, pub started: std::time::Instant, /// Auto-expires this long after creation (or completion). pub expires_at: std::time::Instant, } /// RAII guard — marks the activity "(complete)" on drop, starts expiry timer. pub struct ActivityGuard { agent: Arc>, id: u64, } const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5); impl Drop for ActivityGuard { fn drop(&mut self) { if let Ok(mut ag) = self.agent.try_lock() { if let Some(entry) = ag.activities.iter_mut().find(|a| a.id == self.id) { entry.label.push_str(" (complete)"); entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER; } } } } impl Agent { /// Register an activity, returns its ID. Caller creates the guard. 
pub fn push_activity(&mut self, label: impl Into) -> u64 { self.expire_activities(); let id = self.next_activity_id; self.next_activity_id += 1; self.activities.push(ActivityEntry { id, label: label.into(), started: std::time::Instant::now(), expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600), }); self.changed.notify_one(); id } /// Push a notification — auto-expires after 5 seconds. pub fn notify(&mut self, label: impl Into) { self.expire_activities(); let id = self.next_activity_id; self.next_activity_id += 1; self.activities.push(ActivityEntry { id, label: label.into(), started: std::time::Instant::now(), expires_at: std::time::Instant::now() + ACTIVITY_LINGER, }); self.changed.notify_one(); } /// Remove expired activities. pub fn expire_activities(&mut self) { let now = std::time::Instant::now(); self.activities.retain(|a| a.expires_at > now); } } /// Create an activity guard from outside the lock. pub fn activity_guard(agent: &Arc>, id: u64) -> ActivityGuard { ActivityGuard { agent: agent.clone(), id } } /// Convenience: lock, push activity, unlock, return guard. pub async fn start_activity(agent: &Arc>, label: impl Into) -> ActivityGuard { let id = agent.lock().await.push_activity(label); ActivityGuard { agent: agent.clone(), id } } /// Result of a single agent turn. pub struct TurnResult { /// The text response (already sent through UI channel). #[allow(dead_code)] pub text: String, /// Whether the model called yield_to_user during this turn. pub yield_requested: bool, /// Whether any tools (other than yield_to_user) were called. pub had_tool_calls: bool, /// Number of tool calls that returned errors this turn. pub tool_errors: u32, /// Model name to switch to after this turn completes. pub model_switch: Option, /// Agent requested DMN pause (full stop on autonomous behavior). pub dmn_pause: bool, } /// Accumulated state across tool dispatches within a single turn. 
// Per-turn scratch state: flags raised while dispatching tool calls,
// folded into the final TurnResult.
struct DispatchState {
    yield_requested: bool,
    had_tool_calls: bool,
    tool_errors: u32,
    model_switch: Option,
    dmn_pause: bool,
}

impl DispatchState {
    // All-false/empty starting state for a fresh turn.
    fn new() -> Self {
        Self {
            yield_requested: false,
            had_tool_calls: false,
            tool_errors: 0,
            model_switch: None,
            dmn_pause: false,
        }
    }
}

pub struct Agent {
    client: ApiClient,
    // Tool definitions advertised to the model on every request.
    // NOTE(review): element type stripped in this copy (`Vec,`).
    tools: Vec,
    /// Last known prompt token count from the API (tracks context size).
    last_prompt_tokens: u32,
    /// Current reasoning effort level ("none", "low", "high").
    pub reasoning_effort: String,
    /// Sampling parameters — adjustable at runtime from the thalamus screen.
    pub temperature: f32,
    pub top_p: f32,
    pub top_k: u32,
    /// Active activities — ActivityGuard marks entries "(complete)" on drop;
    /// expire_activities() prunes them once expires_at passes.
    pub activities: Vec,
    next_activity_id: u64,
    /// Control tool flags — set by tool handlers, consumed by turn loop.
    pub pending_yield: bool,
    pub pending_model_switch: Option,
    pub pending_dmn_pause: bool,
    /// Provenance tag for memory operations — identifies who made the change.
    pub provenance: String,
    /// Persistent conversation log — append-only record of all messages.
    pub conversation_log: Option,
    /// BPE tokenizer for token counting (cl100k_base — close enough
    /// for Claude and Qwen budget allocation, ~85-90% count accuracy).
    tokenizer: CoreBPE,
    /// Mutable context state — personality, working stack, etc.
    pub context: ContextState,
    /// App config — used to reload identity on compaction and model switching.
    pub app_config: crate::config::AppConfig,
    pub prompt_file: String,
    /// Stable session ID for memory-search dedup across turns.
    pub session_id: String,
    /// Incremented on compaction — UI uses this to detect resets.
    pub generation: u64,
    /// Whether incremental memory scoring is currently running.
    pub memory_scoring_in_flight: bool,
    /// Shared active tools — Agent writes, TUI reads.
    pub active_tools: tools::SharedActiveTools,
    /// Fires when agent state changes — UI wakes on this instead of polling.
pub changed: Arc,
}

impl Agent {
    /// Construct an agent with fresh runtime state, then load the startup
    /// journal and working stack from disk.
    pub fn new(
        client: ApiClient,
        system_prompt: String,
        personality: Vec<(String, String)>,
        app_config: crate::config::AppConfig,
        prompt_file: String,
        conversation_log: Option,
        active_tools: tools::SharedActiveTools,
    ) -> Self {
        // Tokenizer load failure is unrecoverable — the agent cannot do
        // budget accounting without it.
        let tokenizer = tiktoken_rs::cl100k_base()
            .expect("failed to load cl100k_base tokenizer");
        let context = ContextState {
            system_prompt: system_prompt.clone(),
            personality,
            journal: Vec::new(),
            working_stack: Vec::new(),
            entries: Vec::new(),
        };
        // Session ID is timestamp-based, so it is stable for this process
        // but unique across restarts.
        let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
        let mut agent = Self {
            client,
            tools: tools::tools(),
            last_prompt_tokens: 0,
            reasoning_effort: "none".to_string(),
            temperature: 0.6,
            top_p: 0.95,
            top_k: 20,
            activities: Vec::new(),
            next_activity_id: 0,
            pending_yield: false,
            pending_model_switch: None,
            pending_dmn_pause: false,
            provenance: "manual".to_string(),
            conversation_log,
            tokenizer,
            context,
            app_config,
            prompt_file,
            session_id,
            generation: 0,
            memory_scoring_in_flight: false,
            active_tools,
            changed: Arc::new(tokio::sync::Notify::new()),
        };
        agent.load_startup_journal();
        agent.load_working_stack();
        agent
    }

    /// Create a lightweight agent forked from this one's context.
    ///
    /// The forked agent shares the same conversation prefix (system prompt,
    /// personality, journal, entries) for KV cache sharing. The caller
    /// appends the subconscious prompt as a user message and runs the turn.
pub fn fork(&self, tools: Vec) -> Self {
    // Each fork gets its own tokenizer instance (CoreBPE is not shared).
    let tokenizer = tiktoken_rs::cl100k_base()
        .expect("failed to load cl100k_base tokenizer");
    Self {
        client: self.client.clone(),
        // Caller supplies a (usually reduced) tool set for the fork.
        tools,
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        temperature: self.temperature,
        top_p: self.top_p,
        top_k: self.top_k,
        activities: Vec::new(),
        next_activity_id: 0,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        provenance: self.provenance.clone(),
        // Forks do not persist — no conversation log.
        conversation_log: None,
        tokenizer,
        // Shared prefix: full context snapshot is cloned for KV cache reuse.
        context: self.context.clone(),
        app_config: self.app_config.clone(),
        prompt_file: self.prompt_file.clone(),
        session_id: self.session_id.clone(),
        generation: 0,
        memory_scoring_in_flight: false,
        // Fresh active-tools list — the fork must not mix with the parent's.
        active_tools: tools::shared_active_tools(),
        changed: Arc::new(tokio::sync::Notify::new()),
    }
}

/// Assemble the full message list for the API call from typed sources.
/// System prompt + personality context + journal + conversation messages.
pub fn assemble_api_messages(&self) -> Vec {
    let mut msgs = Vec::new();
    msgs.push(Message::system(&self.context.system_prompt));
    // Identity/working-stack context — only sent when non-empty.
    let ctx = self.context.render_context_message();
    if !ctx.is_empty() {
        msgs.push(Message::user(ctx));
    }
    // Journal — only sent when non-empty.
    let jnl = context::render_journal(&self.context.journal);
    if !jnl.is_empty() {
        msgs.push(Message::user(jnl));
    }
    // Conversation entries, minus log-only entries that are never sent.
    msgs.extend(self.context.entries.iter()
        .filter(|e| !e.is_log())
        .map(|e| e.api_message().clone()));
    msgs
}

// NOTE(review): orphaned rustdoc line removed here ("Run agent
// orchestration cycle, returning structured output.") — it had no
// following item and would have attached to push_message.
/// Push a conversation message — stamped and logged.
pub fn push_message(&mut self, mut msg: Message) { msg.stamp(); let entry = ConversationEntry::Message(msg); self.push_entry(entry); } pub fn push_entry(&mut self, entry: ConversationEntry) { if let Some(ref log) = self.conversation_log { if let Err(e) = log.append(&entry) { eprintln!("warning: failed to log entry: {:#}", e); } } self.context.entries.push(entry); self.changed.notify_one(); } fn streaming_entry(&mut self) -> Option<&mut Message> { for entry in self.context.entries.iter_mut().rev() { let m = entry.message_mut(); if m.role == Role::Assistant { return if m.timestamp.is_none() { Some(m) } else { None } } } None } /// Append streaming text to the last entry (creating a partial /// assistant entry if needed). Called by collect_stream per token batch. fn append_streaming(&mut self, text: &str) { if let Some(m) = self.streaming_entry() { m.append_content(text); } else { // No streaming entry — create without timestamp so finalize can find it self.context.entries.push(ConversationEntry::Message(Message { role: Role::Assistant, content: Some(MessageContent::Text(text.to_string())), tool_calls: None, tool_call_id: None, name: None, timestamp: None, })); } self.changed.notify_one(); } /// Finalize the streaming entry with the complete response message. /// Finds the unstamped assistant entry, updates it in place, and logs it. fn finalize_streaming(&mut self, msg: Message) { if let Some(m) = self.streaming_entry() { *m = msg.clone(); m.stamp(); } else { // No streaming entry found — push as new (this logs via push_message) self.push_message(msg.clone()); } // Log the finalized entry if let Some(ref log) = self.conversation_log { let entry = ConversationEntry::Message(msg); if let Err(e) = log.append(&entry) { eprintln!("warning: failed to log finalized entry: {:#}", e); } } self.changed.notify_one(); } /// Send a user message and run the agent loop until the model /// produces a text response (no more tool calls). 
Streams text /// and tool activity through the UI channel. /// /// Takes Arc> and manages locking internally so the /// lock is never held across I/O (API streaming, tool dispatch). pub async fn turn( agent: Arc>, ) -> Result { // --- Pre-loop setup (lock 1): collect finished tools --- let active_tools = { let mut finished = Vec::new(); let tools = { let me = agent.lock().await; // Collect completed background tool handles — remove from active list // but don't await yet (MutexGuard isn't Send). let mut tools = me.active_tools.lock().unwrap(); let mut i = 0; while i < tools.len() { if tools[i].handle.is_finished() { finished.push(tools.remove(i)); } else { i += 1; } } me.active_tools.clone() }; // Await finished handles without holding the agent lock let mut bg_results = Vec::new(); for entry in finished { if let Ok((call, output)) = entry.handle.await { bg_results.push((call, output)); } } // Re-acquire to apply background tool results if !bg_results.is_empty() { let mut me = agent.lock().await; let mut bg_ds = DispatchState::new(); for (call, output) in bg_results { me.apply_tool_result(&call, output, &mut bg_ds); } } tools }; let mut overflow_retries: u32 = 0; let mut empty_retries: u32 = 0; let mut ds = DispatchState::new(); loop { // --- Lock 2: assemble messages, start stream --- let _thinking = start_activity(&agent, "thinking...").await; let (mut rx, _stream_guard) = { let me = agent.lock().await; let api_messages = me.assemble_api_messages(); let sampling = api::SamplingParams { temperature: me.temperature, top_p: me.top_p, top_k: me.top_k, }; me.client.start_stream( &api_messages, &me.tools, &me.reasoning_effort, sampling, None, ) }; // --- Lock released --- // --- Stream loop (no lock) --- let sr = api::collect_stream( &mut rx, &agent, &active_tools, ).await; let api::StreamResult { content, tool_calls, usage, finish_reason, error: stream_error, display_buf, in_tool_call, } = sr; // --- Stream complete --- // --- Lock 3: process results --- let (msg, 
pending) = { let mut me = agent.lock().await; // Handle stream errors with retry logic if let Some(e) = stream_error { let err = anyhow::anyhow!("{}", e); if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { overflow_retries += 1; me.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); me.compact(); continue; } if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { empty_retries += 1; me.notify(format!("stream error — retrying ({}/2)", empty_retries)); drop(me); tokio::time::sleep(std::time::Duration::from_secs(2)).await; continue; } return Err(err); } if finish_reason.as_deref() == Some("error") { let detail = if content.is_empty() { "no details".into() } else { content }; return Err(anyhow::anyhow!("model stream error: {}", detail)); } // Flush remaining display buffer to streaming entry if !in_tool_call && !display_buf.is_empty() { me.append_streaming(&display_buf); } let msg = api::build_response_message(content, tool_calls); if let Some(usage) = &usage { me.last_prompt_tokens = usage.prompt_tokens; } // Empty response — nudge and retry let has_content = msg.content.is_some(); let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty()); if !has_content && !has_tools { if empty_retries < 2 { empty_retries += 1; dbglog!( "empty response, injecting nudge and retrying ({}/2)", empty_retries, ); me.push_message(Message::user( "[system] Your previous response was empty. \ Please respond with text or use a tool." 
)); continue; } } else { empty_retries = 0; } // Collect non-background tool calls fired during streaming let mut tools_guard = active_tools.lock().unwrap(); let mut non_bg = Vec::new(); let mut i = 0; while i < tools_guard.len() { if !tools_guard[i].background { non_bg.push(tools_guard.remove(i)); } else { i += 1; } } (msg, non_bg) }; if !pending.is_empty() { agent.lock().await.finalize_streaming(msg.clone()); // Drop lock before awaiting tool handles let mut results = Vec::new(); for entry in pending { if let Ok(r) = entry.handle.await { results.push(r); } } // Reacquire to apply results let mut me = agent.lock().await; for (call, output) in results { me.apply_tool_result(&call, output, &mut ds); } continue; } // Tool calls (structured API path) if let Some(ref tool_calls) = msg.tool_calls { if !tool_calls.is_empty() { agent.lock().await.finalize_streaming(msg.clone()); let calls: Vec = tool_calls.clone(); // Drop lock before tool dispatch for call in &calls { Agent::dispatch_tool_call_unlocked( &agent, &active_tools, call, &mut ds, ).await; } continue; } } // Genuinely text-only response let text = msg.content_text().to_string(); let mut me = agent.lock().await; me.finalize_streaming(msg); // Drain pending control flags if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); } if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; } return Ok(TurnResult { text, yield_requested: ds.yield_requested, had_tool_calls: ds.had_tool_calls, tool_errors: ds.tool_errors, model_switch: ds.model_switch, dmn_pause: ds.dmn_pause, }); } } /// Dispatch a tool call without holding the agent lock across I/O. /// Used by `turn()` which manages its own locking. 
async fn dispatch_tool_call_unlocked( agent: &Arc>, active_tools: &tools::SharedActiveTools, call: &ToolCall, ds: &mut DispatchState, ) { let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { Ok(v) => v, Err(e) => { let err = format!("Error: malformed tool call arguments: {e}"); let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await; let mut me = agent.lock().await; me.apply_tool_result(call, err, ds); return; } }; let args_summary = summarize_args(&call.function.name, &args); let _calling = start_activity(agent, format!("calling: {}", call.function.name)).await; // Spawn tool, track it let call_clone = call.clone(); let agent_handle = agent.clone(); let handle = tokio::spawn(async move { let output = tools::dispatch_with_agent(&call_clone.function.name, &args, Some(agent_handle)).await; (call_clone, output) }); active_tools.lock().unwrap().push( tools::ActiveToolCall { id: call.id.clone(), name: call.function.name.clone(), detail: args_summary, started: std::time::Instant::now(), background: false, handle, } ); // Pop it back and await — no agent lock held let entry = { let mut tools = active_tools.lock().unwrap(); tools.pop().unwrap() }; if let Ok((call, output)) = entry.handle.await { // Brief lock to apply result let mut me = agent.lock().await; me.apply_tool_result(&call, output, ds); } } /// Apply a completed tool result to conversation state. 
fn apply_tool_result(
    &mut self,
    call: &ToolCall,
    output: String,
    ds: &mut DispatchState,
) {
    // Re-parse args (best-effort; default Value on failure) for the
    // memory_render key lookup below.
    let args: serde_json::Value = serde_json::from_str(&call.function.arguments).unwrap_or_default();
    ds.had_tool_calls = true;
    // Tool errors are signalled by convention via an "Error:" prefix.
    if output.starts_with("Error:") {
        ds.tool_errors += 1;
    }
    // The tool is done — drop it from the shared active list.
    self.active_tools.lock().unwrap().retain(|t| t.id != call.id);
    // Tag memory_render results for context deduplication
    if call.function.name == "memory_render" && !output.starts_with("Error:") {
        if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
            let mut msg = Message::tool_result(&call.id, &output);
            msg.stamp();
            self.push_entry(ConversationEntry::Memory { key: key.to_string(), message: msg, score: None });
            return;
        }
    }
    // Default path: plain tool-result message.
    self.push_message(Message::tool_result(&call.id, &output));
}

/// Token budget by category — cheap, no formatting. Used for compaction decisions.
pub fn context_budget(&self) -> context::ContextBudget {
    let count = |m: &Message| context::msg_token_count(&self.tokenizer, m);
    // System/identity/journal are counted as the exact messages that
    // assemble_api_messages would send.
    let system = count(&Message::system(&self.context.system_prompt));
    let identity = count(&Message::user(&self.context.render_context_message()));
    let journal_rendered = context::render_journal(&self.context.journal);
    let journal = if journal_rendered.is_empty() { 0 } else { count(&Message::user(&journal_rendered)) };
    // Memory entries are counted separately from plain conversation.
    let memory: usize = self.context.entries.iter()
        .filter(|e| e.is_memory())
        .map(|e| count(e.message()))
        .sum();
    let conversation: usize = self.context.entries.iter()
        .filter(|e| !e.is_memory() && !e.is_log())
        .map(|e| count(e.api_message()))
        .sum();
    context::ContextBudget { system, identity, journal, memory, conversation }
}

/// Build context state summary for the debug screen.
pub fn context_state_summary(&self) -> Vec {
    let count_msg = |m: &Message| context::msg_token_count(&self.tokenizer, m);
    let mut sections = Vec::new();
    // System prompt — counted as the actual message sent
    let system_msg = Message::system(&self.context.system_prompt);
    sections.push(ContextSection {
        name: "System prompt".into(),
        tokens: count_msg(&system_msg),
        content: self.context.system_prompt.clone(),
        children: Vec::new(),
    });
    // Context message (personality + working stack) — counted as the
    // single user message that assemble_api_messages sends
    let context_rendered = self.context.render_context_message();
    let context_msg = Message::user(&context_rendered);
    sections.push(ContextSection {
        name: format!("Identity ({} files + stack)", self.context.personality.len()),
        tokens: count_msg(&context_msg),
        content: context_rendered,
        children: Vec::new(),
    });
    // Journal — counted as the single rendered message sent
    // (section is pushed even when empty; tokens forced to 0 then)
    let journal_rendered = context::render_journal(&self.context.journal);
    let journal_msg = Message::user(&journal_rendered);
    sections.push(ContextSection {
        name: format!("Journal ({} entries)", self.context.journal.len()),
        tokens: if journal_rendered.is_empty() { 0 } else { count_msg(&journal_msg) },
        content: journal_rendered,
        children: Vec::new(),
    });
    // Memory nodes — extracted from Memory entries in the conversation
    let memory_entries: Vec<&ConversationEntry> = self.context.entries.iter()
        .filter(|e| e.is_memory())
        .collect();
    if !memory_entries.is_empty() {
        let node_children: Vec = memory_entries.iter()
            .map(|entry| {
                let (key, score) = match entry {
                    ConversationEntry::Memory { key, score, .. } => (key.as_str(), *score),
                    // Filter above guarantees only Memory entries reach here.
                    _ => unreachable!(),
                };
                let label = match score {
                    Some(s) => format!("{} (score:{:.1})", key, s),
                    None => key.to_string(),
                };
                ContextSection {
                    name: label,
                    tokens: count_msg(entry.message()),
                    content: String::new(),
                    children: Vec::new(),
                }
            })
            .collect();
        let node_tokens: usize = node_children.iter().map(|c| c.tokens).sum();
        sections.push(ContextSection {
            name: format!("Memory nodes ({} loaded)", memory_entries.len()),
            tokens: node_tokens,
            content: String::new(),
            children: node_children,
        });
    }
    // Conversation — memories excluded (counted in their own section above)
    let conv_children = self.entry_sections(&count_msg, 0, false);
    let conv_tokens: usize = conv_children.iter().map(|c| c.tokens).sum();
    sections.push(ContextSection {
        name: format!("Conversation ({} messages)", conv_children.len()),
        tokens: conv_tokens,
        content: String::new(),
        children: conv_children,
    });
    sections
}

/// Build ContextSection nodes for conversation entries starting at `from`.
/// When `include_memories` is false, memory entries are excluded (they get
/// their own section in context_state_summary to avoid double-counting).
fn entry_sections(
    &self,
    count_msg: &dyn Fn(&Message) -> usize,
    from: usize,
    include_memories: bool,
) -> Vec {
    let cfg = crate::config::get();
    self.context.entries.iter().enumerate()
        .skip(from)
        .filter(|(_, e)| !e.is_log() && (include_memories || !e.is_memory()))
        .map(|(i, entry)| {
            let m = entry.message();
            let text = m.content.as_ref()
                .map(|c| c.as_text().to_string())
                .unwrap_or_default();
            // Memory entries get a fixed "mem" role and a key-based label;
            // everything else is labelled by tool calls or a text preview.
            let (role_name, label) = if let ConversationEntry::Memory { key, score, .. } = entry {
                let label = match score {
                    Some(s) => format!("[memory: {} score:{:.1}]", key, s),
                    None => format!("[memory: {}]", key),
                };
                ("mem".to_string(), label)
            } else {
                let tool_info = m.tool_calls.as_ref().map(|tc| {
                    tc.iter()
                        .map(|c| c.function.name.clone())
                        .collect::>()
                        .join(", ")
                });
                let label = match &tool_info {
                    Some(tools) => format!("[tool_call: {}]", tools),
                    None => {
                        // NOTE(review): preview is truncated by chars but the
                        // "..." check uses byte length — multi-byte text under
                        // 60 chars can still get an ellipsis.
                        let preview: String = text.chars().take(60).collect();
                        let preview = preview.replace('\n', " ");
                        if text.len() > 60 { format!("{}...", preview) } else { preview }
                    }
                };
                let role_name = match m.role {
                    Role::Assistant => cfg.assistant_name.clone(),
                    Role::User => cfg.user_name.clone(),
                    Role::Tool => "tool".to_string(),
                    Role::System => "system".to_string(),
                };
                (role_name, label)
            };
            ContextSection {
                name: format!("[{}] {}: {}", i, role_name, label),
                tokens: count_msg(entry.api_message()),
                content: text,
                children: Vec::new(),
            }
        })
        .collect()
}

/// Context sections for entries from `from` onward — used by the
/// subconscious debug screen to show forked agent conversations.
pub fn conversation_sections_from(&self, from: usize) -> Vec {
    let count_msg = |m: &Message| context::msg_token_count(&self.tokenizer, m);
    self.entry_sections(&count_msg, from, true)
}

/// Load recent journal entries at startup for orientation.
/// Uses the same budget logic as compaction but with empty conversation.
// NOTE(review): doc previously claimed this parses the journal file tail
// (last 64KB); the implementation reads EpisodicSession nodes from the
// memory store instead — doc updated to match.
/// Reads EpisodicSession nodes from the memory store, newest-first,
/// within a 15% context-window budget.
fn load_startup_journal(&mut self) {
    // No store — nothing to load; silently skip.
    let store = match crate::store::Store::load() {
        Ok(s) => s,
        Err(_) => return,
    };
    // Find oldest message timestamp in conversation log
    let oldest_msg_ts = self.conversation_log.as_ref()
        .and_then(|log| log.oldest_timestamp());
    // Get journal entries from the memory graph
    let mut journal_nodes: Vec<_> = store.nodes.values()
        .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
        .collect();
    // NOTE(review): ad-hoc debug logging to a fixed /tmp path — consider
    // removing or routing through dbglog! before release.
    let mut dbg = std::fs::OpenOptions::new().create(true).append(true)
        .open("/tmp/poc-journal-debug.log").ok();
    macro_rules! dbg_log {
        ($($arg:tt)*) => {
            if let Some(ref mut f) = dbg {
                use std::io::Write;
                let _ = writeln!(f, $($arg)*);
            }
        }
    }
    dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts);
    journal_nodes.sort_by_key(|n| n.created_at);
    if let Some(first) = journal_nodes.first() {
        dbg_log!("[journal] first created_at={}", first.created_at);
    }
    if let Some(last) = journal_nodes.last() {
        dbg_log!("[journal] last created_at={}", last.created_at);
    }
    // Find the cutoff index — entries older than conversation, plus one overlap
    let cutoff_idx = if let Some(cutoff) = oldest_msg_ts {
        let cutoff_ts = cutoff.timestamp();
        dbg_log!("[journal] cutoff timestamp={}", cutoff_ts);
        let mut idx = journal_nodes.len();
        for (i, node) in journal_nodes.iter().enumerate() {
            // First node at/after the cutoff: include it (the +1 overlap).
            if node.created_at >= cutoff_ts {
                idx = i + 1;
                break;
            }
        }
        idx
    } else {
        journal_nodes.len()
    };
    dbg_log!("[journal] cutoff_idx={}", cutoff_idx);
    // Walk backwards from cutoff, accumulating entries within 15% of context
    let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len();
    let context_window = crate::agent::context::context_window();
    let journal_budget = context_window * 15 / 100;
    dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window);
    let mut entries = Vec::new();
    let mut total_tokens = 0;
    for node in journal_nodes[..cutoff_idx].iter().rev() {
        let tokens = count(&node.content);
        // Always keep at least one entry, even if it alone busts the budget.
        if total_tokens + tokens > journal_budget && !entries.is_empty() {
            break;
        }
        entries.push(context::JournalEntry {
            timestamp: chrono::DateTime::from_timestamp(node.created_at, 0)
                .unwrap_or_default(),
            content: node.content.clone(),
        });
        total_tokens += tokens;
    }
    // Collected newest-first — restore chronological order.
    entries.reverse();
    dbg_log!("[journal] loaded {} entries, {} tokens", entries.len(), total_tokens);
    if entries.is_empty() {
        dbg_log!("[journal] no entries!");
        return;
    }
    self.context.journal = entries;
    dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len());
}

// NOTE(review): orphaned doc line removed here ("Called after any change
// to context state (working stack, etc)") — it had no matching method.
/// Load working stack from disk.
fn load_working_stack(&mut self) {
    // Best-effort: missing or unparseable file leaves the stack empty.
    if let Ok(data) = std::fs::read_to_string(working_stack::file_path()) {
        if let Ok(stack) = serde_json::from_str::>(&data) {
            self.context.working_stack = stack;
        }
    }
}

/// Replace base64 image data in older messages with text placeholders.
/// Keeps the 2 most recent images live (enough for motion/comparison).
/// The tool result message before each image records what was loaded.
pub fn age_out_images(&mut self) {
    // Find image entries newest-first, skip 1 (caller is about to add another)
    let to_age: Vec = self.context.entries.iter().enumerate()
        .rev()
        .filter(|(_, e)| {
            if let Some(MessageContent::Parts(parts)) = &e.message().content {
                parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. }))
            } else {
                false
            }
        })
        .map(|(i, _)| i)
        .skip(1) // keep 1 existing + 1 about to be added = 2 live
        .collect();
    for i in to_age {
        let msg = self.context.entries[i].message_mut();
        if let Some(MessageContent::Parts(parts)) = &msg.content {
            // Rebuild the message as plain text: keep text parts, replace
            // each image part with a placeholder line.
            let mut replacement = String::new();
            for part in parts {
                match part {
                    ContentPart::Text { text } => {
                        if !replacement.is_empty() { replacement.push('\n'); }
                        replacement.push_str(text);
                    }
                    ContentPart::ImageUrl { .. } => {
                        if !replacement.is_empty() { replacement.push('\n'); }
                        replacement.push_str("[image aged out]");
                    }
                }
            }
            msg.content = Some(MessageContent::Text(replacement));
        }
    }
    // Content changed — signal the UI to re-render.
    self.generation += 1;
}

// NOTE(review): orphaned doc line removed here ("Strip ephemeral tool
// calls from the conversation history") — the method it described is gone.
/// Last prompt token count reported by the API.
pub fn last_prompt_tokens(&self) -> u32 {
    self.last_prompt_tokens
}

/// Rebuild the context window: reload identity, dedup, trim, reload journal.
pub fn compact(&mut self) {
    // Reload identity from config
    match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
        Ok((system_prompt, personality)) => {
            self.context.system_prompt = system_prompt;
            self.context.personality = personality;
        }
        Err(e) => {
            eprintln!("warning: failed to reload identity: {:#}", e);
        }
    }
    let before = self.context.entries.len();
    let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count();
    let before_conv = before - before_mem;
    // Age out images before trimming — they're huge in the request payload
    self.age_out_images();
    // Load journal BEFORE trimming so trim accounts for journal cost
    self.load_startup_journal();
    // Dedup memory, trim to budget
    let budget = self.context_budget();
    let entries = self.context.entries.clone();
    self.context.entries = crate::agent::context::trim_entries(
        &entries,
        &self.tokenizer,
        &budget,
    );
    let after = self.context.entries.len();
    let after_mem = self.context.entries.iter().filter(|e| e.is_memory()).count();
    let after_conv = after - after_mem;
    dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})",
        before, after, before_mem, after_mem, before_conv, after_conv);
    self.generation += 1;
    // Stale now — next API response repopulates it.
    self.last_prompt_tokens = 0;
    let budget = self.context_budget();
    dbglog!("[compact] budget: {}", budget.format());
}

/// Restore from the conversation log. Builds the context window
/// the same way compact() does — journal summaries for old messages,
/// raw recent messages. This is the unified startup path.
/// Returns true if the log had content to restore. pub fn restore_from_log(&mut self) -> bool { let entries = match &self.conversation_log { Some(log) => match log.read_tail(64 * 1024 * 1024) { Ok(entries) if !entries.is_empty() => entries, _ => return false, }, None => return false, }; // Load extra — compact() will dedup, trim, reload identity + journal let all: Vec<_> = entries.into_iter() .filter(|e| !e.is_log() && e.message().role != Role::System) .collect(); let mem_count = all.iter().filter(|e| e.is_memory()).count(); let conv_count = all.len() - mem_count; dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})", all.len(), mem_count, conv_count); self.context.entries = all; self.compact(); // Estimate prompt tokens so status bar isn't 0 on startup self.last_prompt_tokens = self.context_budget().total() as u32; true } /// Replace the API client (for model switching). pub fn swap_client(&mut self, new_client: ApiClient) { self.client = new_client; } /// Get the model identifier. pub fn model(&self) -> &str { &self.client.model } /// Get the conversation entries for persistence. pub fn entries(&self) -> &[ConversationEntry] { &self.context.entries } /// Mutable access to conversation entries (for /retry). pub fn client_clone(&self) -> ApiClient { self.client.clone() } }