diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 4cef3ec..2e64c7f 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -1,121 +1,1101 @@ -// agent — core agent infrastructure +// agent.rs — Core agent loop // -// Tool dispatch, memory operations, file operations, context -// management, and the agent runner loop. Used by both the -// interactive consciousness binary and subconscious agents. +// The simplest possible implementation of the agent pattern: +// send messages + tool definitions to the model, if it responds +// with tool calls then dispatch them and loop, if it responds +// with text then display it and wait for the next prompt. +// +// Uses streaming by default so text tokens appear as they're +// generated. Tool calls are accumulated from stream deltas and +// dispatched after the stream completes. +// +// The DMN (dmn.rs) is the outer loop that decides what prompts +// to send here. This module just handles single turns: prompt +// in, response out, tool calls dispatched. pub mod context; -pub mod runner; pub mod tools; pub mod training; -pub use tools::bash::ProcessTracker; +use anyhow::Result; +use tiktoken_rs::CoreBPE; -// Re-export ToolDef from agent::types for convenience — -// tools define their schemas using this type. -pub use crate::user::types::ToolDef; +use crate::user::api::ApiClient; +use crate::agent::context as journal; +use crate::user::log::ConversationLog; +use crate::user::api::StreamEvent; +use crate::agent::tools::{ProcessTracker, ToolCall, ToolDef, FunctionCall, summarize_args}; +use crate::user::types::*; +use crate::user::ui_channel::{ContextSection, SharedContextState, StatusInfo, StreamTarget, UiMessage, UiSender}; -/// Result of dispatching a tool call. -pub struct ToolOutput { +/// Result of a single agent turn. +pub struct TurnResult { + /// The text response (already sent through UI channel). 
+ #[allow(dead_code)] pub text: String, - pub is_yield: bool, - /// Base64 data URIs for images to attach to the next message. - pub images: Vec, - /// Model name to switch to (deferred to session level). + /// Whether the model called yield_to_user during this turn. + pub yield_requested: bool, + /// Whether any tools (other than yield_to_user) were called. + pub had_tool_calls: bool, + /// Number of tool calls that returned errors this turn. + pub tool_errors: u32, + /// Model name to switch to after this turn completes. pub model_switch: Option, - /// Agent requested DMN pause (deferred to session level). + /// Agent requested DMN pause (full stop on autonomous behavior). pub dmn_pause: bool, } -impl ToolOutput { - pub fn error(e: impl std::fmt::Display) -> Self { - Self { - text: format!("Error: {}", e), - is_yield: false, - images: Vec::new(), +/// Accumulated state across tool dispatches within a single turn. +struct DispatchState { + yield_requested: bool, + had_tool_calls: bool, + tool_errors: u32, + model_switch: Option, + dmn_pause: bool, +} + +pub struct Agent { + client: ApiClient, + tool_defs: Vec, + /// Last known prompt token count from the API (tracks context size). + last_prompt_tokens: u32, + /// Shared process tracker for bash tool — lets TUI show/kill running commands. + pub process_tracker: ProcessTracker, + /// Current reasoning effort level ("none", "low", "high"). + pub reasoning_effort: String, + /// Persistent conversation log — append-only record of all messages. + conversation_log: Option, + /// BPE tokenizer for token counting (cl100k_base — close enough + /// for Claude and Qwen budget allocation, ~85-90% count accuracy). + tokenizer: CoreBPE, + /// Mutable context state — personality, working stack, etc. + pub context: ContextState, + /// Shared live context summary — TUI reads this directly for debug screen. + pub shared_context: SharedContextState, + /// App config — used to reload identity on compaction. 
+ app_config: crate::config::AppConfig, + pub prompt_file: String, + /// Stable session ID for memory-search dedup across turns. + session_id: String, + /// Agent orchestration state (surface-observe, journal, reflect). + pub agent_cycles: crate::subconscious::subconscious::AgentCycleState, + /// Latest memory importance scores from training scorer. + pub memory_scores: Option, + /// Whether a /score task is currently running. + pub scoring_in_flight: bool, + /// Shared active tools — Agent writes, TUI reads. + pub active_tools: crate::user::ui_channel::SharedActiveTools, + /// Background tool calls that outlive the current turn. + background_tasks: futures::stream::FuturesUnordered< + std::pin::Pin + Send>> + >, +} + +fn render_journal(entries: &[journal::JournalEntry]) -> String { + if entries.is_empty() { return String::new(); } + let mut text = String::from("[Earlier — from your journal]\n\n"); + for entry in entries { + use std::fmt::Write; + writeln!(text, "## {}\n{}\n", entry.timestamp.format("%Y-%m-%dT%H:%M"), entry.content).ok(); + } + text +} + +impl Agent { + pub fn new( + client: ApiClient, + system_prompt: String, + personality: Vec<(String, String)>, + app_config: crate::config::AppConfig, + prompt_file: String, + conversation_log: Option, + shared_context: SharedContextState, + active_tools: crate::user::ui_channel::SharedActiveTools, + ) -> Self { + let tool_defs = tools::definitions(); + let tokenizer = tiktoken_rs::cl100k_base() + .expect("failed to load cl100k_base tokenizer"); + + let context = ContextState { + system_prompt: system_prompt.clone(), + personality, + journal: Vec::new(), + working_stack: Vec::new(), + entries: Vec::new(), + }; + let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); + let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&session_id); + let mut agent = Self { + client, + tool_defs, + last_prompt_tokens: 0, + process_tracker: ProcessTracker::new(), + 
reasoning_effort: "none".to_string(), + conversation_log, + tokenizer, + context, + shared_context, + app_config, + prompt_file, + session_id, + agent_cycles, + memory_scores: None, + scoring_in_flight: false, + active_tools, + background_tasks: futures::stream::FuturesUnordered::new(), + }; + + agent.load_startup_journal(); + agent.load_working_stack(); + agent.publish_context_state(); + agent + } + + /// Assemble the full message list for the API call from typed sources. + /// System prompt + personality context + journal + conversation messages. + fn assemble_api_messages(&self) -> Vec { + let mut msgs = Vec::new(); + msgs.push(Message::system(&self.context.system_prompt)); + let ctx = self.context.render_context_message(); + if !ctx.is_empty() { + msgs.push(Message::user(ctx)); + } + let jnl = render_journal(&self.context.journal); + if !jnl.is_empty() { + msgs.push(Message::user(jnl)); + } + msgs.extend(self.context.entries.iter().map(|e| e.api_message().clone())); + msgs + } + + /// Run agent orchestration cycle, returning structured output. + fn run_agent_cycle(&mut self) -> crate::subconscious::subconscious::AgentCycleOutput { + let transcript_path = self.conversation_log.as_ref() + .map(|l| l.path().to_string_lossy().to_string()) + .unwrap_or_default(); + + let session = crate::session::HookSession::from_fields( + self.session_id.clone(), + transcript_path, + "UserPromptSubmit".into(), + ); + + self.agent_cycles.trigger(&session); + std::mem::take(&mut self.agent_cycles.last_output) + } + + /// Push a conversation message — stamped and logged. 
+ fn push_message(&mut self, mut msg: Message) { + msg.stamp(); + let entry = ConversationEntry::Message(msg); + self.push_entry(entry); + } + + fn push_entry(&mut self, entry: ConversationEntry) { + if let Some(ref log) = self.conversation_log { + if let Err(e) = log.append(&entry) { + eprintln!("warning: failed to log entry: {:#}", e); + } + } + self.context.entries.push(entry); + } + + /// Push a context-only message (system prompt, identity context, + /// journal summaries). Not logged — these are reconstructed on + /// every startup/compaction. + pub fn budget(&self) -> ContextBudget { + let count_str = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); + let count_msg = |m: &Message| crate::agent::context::msg_token_count(&self.tokenizer, m); + let window = crate::agent::context::context_window(); + self.context.budget(&count_str, &count_msg, window) + } + + /// Send a user message and run the agent loop until the model + /// produces a text response (no more tool calls). Streams text + /// and tool activity through the UI channel. 
+ pub async fn turn( + &mut self, + user_input: &str, + ui_tx: &UiSender, + target: StreamTarget, + ) -> Result { + // Run agent orchestration cycle (surface-observe, reflect, journal) + let cycle = self.run_agent_cycle(); + + // Surfaced memories — each as a separate Memory entry + for key in &cycle.surfaced_keys { + if let Some(rendered) = crate::cli::node::render_node( + &crate::store::Store::load().unwrap_or_default(), key, + ) { + let mut msg = Message::user(format!( + "\n--- {} (surfaced) ---\n{}\n", + key, rendered, + )); + msg.stamp(); + self.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); + } + } + + // Reflection — separate system reminder + if let Some(ref reflection) = cycle.reflection { + self.push_message(Message::user(format!( + "\n--- subconscious reflection ---\n{}\n", + reflection.trim(), + ))); + } + + // Inject completed background task results + { + use futures::{StreamExt, FutureExt}; + let mut bg_ds = DispatchState { + yield_requested: false, had_tool_calls: false, + tool_errors: 0, model_switch: None, dmn_pause: false, + }; + while let Some(Some((call, output))) = + std::pin::Pin::new(&mut self.background_tasks).next().now_or_never() + { + // Show result in TUI and inject into conversation + self.apply_tool_result(&call, output, ui_tx, &mut bg_ds); + } + } + + // User input — clean, just what was typed + self.push_message(Message::user(user_input)); + let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots())); + + let mut overflow_retries: u32 = 0; + let mut empty_retries: u32 = 0; + let mut ds = DispatchState { + yield_requested: false, + had_tool_calls: false, + tool_errors: 0, model_switch: None, dmn_pause: false, + }; + + loop { + let _ = ui_tx.send(UiMessage::Activity("thinking...".into())); + + // Stream events from the API — we route each event to the + // appropriate UI pane rather than letting the API layer do it. 
+ let api_messages = self.assemble_api_messages(); + let (mut rx, _stream_guard) = self.client.start_stream( + &api_messages, + Some(&self.tool_defs), + ui_tx, + &self.reasoning_effort, + None, + None, // priority: interactive + ); + + let mut content = String::new(); + let mut tool_calls: Vec = Vec::new(); + let mut usage = None; + let mut finish_reason = None; + let mut in_tool_call = false; + let mut tool_call_buf = String::new(); + let mut stream_error = None; + let mut first_content = true; + // Tool calls fired during streaming (XML path) + let mut inflight: futures::stream::FuturesOrdered< + std::pin::Pin + Send>> + > = futures::stream::FuturesOrdered::new(); + // Buffer for content not yet sent to UI — holds a tail + // that might be a partial tag. + let mut display_buf = String::new(); + + while let Some(event) = rx.recv().await { + match event { + StreamEvent::Content(text) => { + if first_content { + let _ = ui_tx.send(UiMessage::Activity("streaming...".into())); + first_content = false; + } + content.push_str(&text); + + if in_tool_call { + tool_call_buf.push_str(&text); + // Check for closing tag — parse and fire immediately + if let Some(end) = tool_call_buf.find("") { + let body = &tool_call_buf[..end]; + if let Some(call) = crate::user::parsing::parse_tool_call_body(body) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); + let args_summary = summarize_args(&call.function.name, &args); + let _ = ui_tx.send(UiMessage::ToolCall { + name: call.function.name.clone(), + args_summary: args_summary.clone(), + }); + self.active_tools.write().unwrap().push( + crate::user::ui_channel::ActiveTool { + id: call.id.clone(), + name: call.function.name.clone(), + detail: args_summary, + started: std::time::Instant::now(), + } + ); + let tracker = self.process_tracker.clone(); + let is_background = args.get("run_in_background") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let future = Box::pin(async move { 
+ let output = tools::dispatch(&call.function.name, &args, &tracker).await; + (call, output) + }); + if is_background { + self.background_tasks.push(future); + } else { + inflight.push_back(future); + } + } + // Reset for potential next tool call + let remaining = tool_call_buf[end + "".len()..].to_string(); + tool_call_buf.clear(); + in_tool_call = false; + // Any content after goes back to display + if !remaining.trim().is_empty() { + display_buf.push_str(&remaining); + } + } + } else { + display_buf.push_str(&text); + + if let Some(pos) = display_buf.find("") { + // Flush content before the tag, suppress the rest. + let before = &display_buf[..pos]; + if !before.is_empty() { + let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target)); + } + display_buf.clear(); + in_tool_call = true; + } else { + // Flush display_buf except a tail that could be + // a partial "" (10 chars). + let safe = display_buf.len().saturating_sub(10); + // Find a char boundary at or before safe + let safe = display_buf.floor_char_boundary(safe); + if safe > 0 { + let flush = display_buf[..safe].to_string(); + display_buf = display_buf[safe..].to_string(); + let _ = ui_tx.send(UiMessage::TextDelta(flush, target)); + } + } + } + } + StreamEvent::Reasoning(text) => { + let _ = ui_tx.send(UiMessage::Reasoning(text)); + } + StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { + while tool_calls.len() <= index { + tool_calls.push(ToolCall { + id: String::new(), + call_type: "function".to_string(), + function: FunctionCall { name: String::new(), arguments: String::new() }, + }); + } + if let Some(id) = id { tool_calls[index].id = id; } + if let Some(ct) = call_type { tool_calls[index].call_type = ct; } + if let Some(n) = name { tool_calls[index].function.name = n; } + if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } + } + StreamEvent::Usage(u) => usage = Some(u), + StreamEvent::Finished { reason, .. 
} => { + finish_reason = Some(reason); + break; + } + StreamEvent::Error(e) => { + stream_error = Some(e); + break; + } + } + } + + // Handle stream errors with retry logic + if let Some(e) = stream_error { + let err = anyhow::anyhow!("{}", e); + if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { + overflow_retries += 1; + let _ = ui_tx.send(UiMessage::Info(format!( + "[context overflow — compacting and retrying ({}/2)]", + overflow_retries, + ))); + self.compact(); + continue; + } + if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { + empty_retries += 1; + let _ = ui_tx.send(UiMessage::Info(format!( + "[stream error: {} — retrying ({}/2)]", + e, empty_retries, + ))); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + continue; + } + let _ = ui_tx.send(UiMessage::Activity(String::new())); + return Err(err); + } + + if finish_reason.as_deref() == Some("error") { + let detail = if content.is_empty() { "no details".into() } else { content }; + let _ = ui_tx.send(UiMessage::Activity(String::new())); + return Err(anyhow::anyhow!("model stream error: {}", detail)); + } + + // Flush remaining display buffer (normal responses without tool calls). 
+ if !in_tool_call && !display_buf.is_empty() { + let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target)); + } + if !content.is_empty() && !in_tool_call { + let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); + } + + let msg = crate::user::api::build_response_message(content, tool_calls); + + + + if let Some(usage) = &usage { + self.last_prompt_tokens = usage.prompt_tokens; + + self.publish_context_state(); + let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo { + dmn_state: String::new(), // filled by main loop + dmn_turns: 0, + dmn_max_turns: 0, + prompt_tokens: usage.prompt_tokens, + completion_tokens: usage.completion_tokens, + model: self.client.model.clone(), + turn_tools: 0, // tracked by TUI from ToolCall messages + context_budget: self.budget().status_string(), + })); + } + + // Empty response — model returned finish=stop with no content + // or tool calls. Inject a nudge so the retry has different input. + let has_content = msg.content.is_some(); + let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty()); + if !has_content && !has_tools { + if empty_retries < 2 { + empty_retries += 1; + let _ = ui_tx.send(UiMessage::Debug(format!( + "empty response, injecting nudge and retrying ({}/2)", + empty_retries, + ))); + self.push_message(Message::user( + "[system] Your previous response was empty. \ + Please respond with text or use a tool." + )); + continue; + } + // After max retries, fall through — return the empty response + } else { + empty_retries = 0; + } + + // Collect tool calls that were fired during streaming + if !inflight.is_empty() { + use futures::StreamExt; + self.push_message(msg.clone()); + while let Some((call, output)) = inflight.next().await { + self.apply_tool_result(&call, output, ui_tx, &mut ds); + } + self.publish_context_state(); + continue; + } + + // Tool calls (structured API path — not fired during stream). 
+ if let Some(ref tool_calls) = msg.tool_calls { + if !tool_calls.is_empty() { + self.push_message(msg.clone()); + + for call in tool_calls { + self.dispatch_tool_call(call, None, ui_tx, &mut ds) + .await; + } + continue; + } + } + + // Genuinely text-only response + let text = msg.content_text().to_string(); + let _ = ui_tx.send(UiMessage::Activity(String::new())); + self.push_message(msg); + + return Ok(TurnResult { + text, + yield_requested: ds.yield_requested, + had_tool_calls: ds.had_tool_calls, + tool_errors: ds.tool_errors, + model_switch: ds.model_switch, + dmn_pause: ds.dmn_pause, + }); } } - pub fn text(s: String) -> Self { - Self { - text: s, - is_yield: false, - images: Vec::new(), - model_switch: None, - dmn_pause: false, - } - } -} + /// Dispatch a single tool call: send UI annotations, run the tool, + /// push results into the conversation, handle images. + async fn dispatch_tool_call( + &mut self, + call: &ToolCall, + tag: Option<&str>, + ui_tx: &UiSender, + ds: &mut DispatchState, + ) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); -/// Truncate output if it exceeds max length, appending a truncation notice. -pub fn truncate_output(mut s: String, max: usize) -> String { - if s.len() > max { - s.truncate(max); - s.push_str("\n... (output truncated)"); - } - s -} - -/// Dispatch a shared tool call. Handles file operations, bash, -/// and memory/journal tools. Returns None for unknown tools -/// (caller should check agent-specific tools). 
-pub async fn dispatch( - name: &str, - args: &serde_json::Value, - tracker: &ProcessTracker, - provenance: Option<&str>, -) -> Option { - // Memory and journal tools - if name.starts_with("memory_") || name.starts_with("journal_") || name == "output" { - let result = tools::memory::dispatch(name, args, provenance); - return Some(match result { - Ok(s) => ToolOutput::text(s), - Err(e) => ToolOutput::error(e), + let args_summary = summarize_args(&call.function.name, &args); + let label = match tag { + Some(t) => format!("calling: {} ({})", call.function.name, t), + None => format!("calling: {}", call.function.name), + }; + let _ = ui_tx.send(UiMessage::Activity(label)); + let _ = ui_tx.send(UiMessage::ToolCall { + name: call.function.name.clone(), + args_summary: args_summary.clone(), }); + self.active_tools.write().unwrap().push( + crate::user::ui_channel::ActiveTool { + id: call.id.clone(), + name: call.function.name.clone(), + detail: args_summary, + started: std::time::Instant::now(), + } + ); + + // Handle working_stack tool — needs &mut self for context state + if call.function.name == "working_stack" { + let result = tools::working_stack::handle(&args, &mut self.context.working_stack); + let output = tools::ToolOutput { + text: result.clone(), + is_yield: false, + images: Vec::new(), + model_switch: None, + dmn_pause: false, + }; + let _ = ui_tx.send(UiMessage::ToolResult { + name: call.function.name.clone(), + result: output.text.clone(), + }); + self.active_tools.write().unwrap().retain(|t| t.id != call.id); + self.push_message(Message::tool_result(&call.id, &output.text)); + ds.had_tool_calls = true; + + // Re-render the context message so the model sees the updated stack + if !result.starts_with("Error:") { + self.refresh_context_state(); + } + return; + } + + // Dispatch through unified path + let output = + tools::dispatch(&call.function.name, &args, &self.process_tracker).await; + + self.apply_tool_result(call, output, ui_tx, ds); } - // File and 
execution tools - let result = match name { - "read_file" => tools::read::read_file(args), - "write_file" => tools::write::write_file(args), - "edit_file" => tools::edit::edit_file(args), - "bash" => tools::bash::run_bash(args, tracker).await, - "grep" => tools::grep::grep(args), - "glob" => tools::glob::glob_search(args), - _ => return None, - }; + /// Apply a completed tool result to conversation state. + fn apply_tool_result( + &mut self, + call: &ToolCall, + output: tools::ToolOutput, + ui_tx: &UiSender, + ds: &mut DispatchState, + ) { + let args: serde_json::Value = + serde_json::from_str(&call.function.arguments).unwrap_or_default(); - Some(match result { - Ok(s) => ToolOutput::text(s), - Err(e) => ToolOutput::error(e), - }) -} + if output.is_yield { + ds.yield_requested = true; + } else { + ds.had_tool_calls = true; + } + if output.model_switch.is_some() { + ds.model_switch = output.model_switch.clone(); + } + if output.dmn_pause { + ds.dmn_pause = true; + } + if output.text.starts_with("Error:") { + ds.tool_errors += 1; + } -/// Return all shared tool definitions. -pub fn definitions() -> Vec { - vec![ - tools::read::definition(), - tools::write::definition(), - tools::edit::definition(), - tools::bash::definition(), - tools::grep::definition(), - tools::glob::definition(), - ] -} + let _ = ui_tx.send(UiMessage::ToolResult { + name: call.function.name.clone(), + result: output.text.clone(), + }); + self.active_tools.write().unwrap().retain(|t| t.id != call.id); -/// Return all shared + memory tool definitions. 
-pub fn all_definitions() -> Vec { - let mut defs = definitions(); - defs.extend(tools::memory::definitions()); - defs -} + // Tag memory_render results for context deduplication + if call.function.name == "memory_render" && !output.text.starts_with("Error:") { + if let Some(key) = args.get("key").and_then(|v| v.as_str()) { + let mut msg = Message::tool_result(&call.id, &output.text); + msg.stamp(); + self.push_entry(ConversationEntry::Memory { key: key.to_string(), message: msg }); + self.publish_context_state(); + return; + } + } + + self.push_message(Message::tool_result(&call.id, &output.text)); + + if !output.images.is_empty() { + self.age_out_images(); + self.push_message(Message::user_with_images( + "Here is the image you requested:", + &output.images, + )); + } + } + + /// Build context state summary for the debug screen. + pub fn context_state_summary(&self) -> Vec { + let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); + + let mut sections = Vec::new(); + + // System prompt + sections.push(ContextSection { + name: "System prompt".into(), + tokens: count(&self.context.system_prompt), + content: self.context.system_prompt.clone(), + children: Vec::new(), + }); + + // Personality — parent with file children + let personality_children: Vec = self.context.personality.iter() + .map(|(name, content)| ContextSection { + name: name.clone(), + tokens: count(content), + content: content.clone(), + children: Vec::new(), + }) + .collect(); + let personality_tokens: usize = personality_children.iter().map(|c| c.tokens).sum(); + sections.push(ContextSection { + name: format!("Personality ({} files)", personality_children.len()), + tokens: personality_tokens, + content: String::new(), + children: personality_children, + }); + + // Journal + { + let journal_children: Vec = self.context.journal.iter() + .map(|entry| { + let preview: String = entry.content.lines() + .find(|l| !l.trim().is_empty()) + .unwrap_or("").chars().take(60).collect(); + 
ContextSection { + name: format!("{}: {}", entry.timestamp.format("%Y-%m-%dT%H:%M"), preview), + tokens: count(&entry.content), + content: entry.content.clone(), + children: Vec::new(), + } + }) + .collect(); + let journal_tokens: usize = journal_children.iter().map(|c| c.tokens).sum(); + sections.push(ContextSection { + name: format!("Journal ({} entries)", journal_children.len()), + tokens: journal_tokens, + content: String::new(), + children: journal_children, + }); + } + + // Working stack — instructions + items as children + let instructions = std::fs::read_to_string(working_stack_instructions_path()) + .unwrap_or_default(); + let mut stack_children = vec![ContextSection { + name: "Instructions".into(), + tokens: count(&instructions), + content: instructions, + children: Vec::new(), + }]; + for (i, item) in self.context.working_stack.iter().enumerate() { + let marker = if i == self.context.working_stack.len() - 1 { "→" } else { " " }; + stack_children.push(ContextSection { + name: format!("{} [{}] {}", marker, i, item), + tokens: count(item), + content: String::new(), + children: Vec::new(), + }); + } + let stack_tokens: usize = stack_children.iter().map(|c| c.tokens).sum(); + sections.push(ContextSection { + name: format!("Working stack ({} items)", self.context.working_stack.len()), + tokens: stack_tokens, + content: String::new(), + children: stack_children, + }); + + // Memory nodes — extracted from Memory entries in the conversation + let memory_entries: Vec<&ConversationEntry> = self.context.entries.iter() + .filter(|e| e.is_memory()) + .collect(); + if !memory_entries.is_empty() { + let node_children: Vec = memory_entries.iter() + .map(|entry| { + let key = match entry { + ConversationEntry::Memory { key, .. 
} => key.as_str(), + _ => unreachable!(), + }; + let text = entry.message().content_text(); + let score = self.memory_scores.as_ref() + .and_then(|s| s.memory_weights.iter() + .find(|(k, _)| k == key) + .map(|(_, v)| *v)); + let label = match score { + Some(v) => format!("{} (importance: {:.1})", key, v), + None => key.to_string(), + }; + ContextSection { + name: label, + tokens: count(text), + content: String::new(), + children: Vec::new(), + } + }) + .collect(); + let node_tokens: usize = node_children.iter().map(|c| c.tokens).sum(); + sections.push(ContextSection { + name: format!("Memory nodes ({} loaded)", memory_entries.len()), + tokens: node_tokens, + content: String::new(), + children: node_children, + }); + } + + // Conversation — each message as a child + let conv_messages = &self.context.entries; + let conv_children: Vec = conv_messages.iter().enumerate() + .map(|(i, entry)| { + let m = entry.message(); + let text = m.content.as_ref() + .map(|c| c.as_text().to_string()) + .unwrap_or_default(); + let tool_info = m.tool_calls.as_ref().map(|tc| { + tc.iter() + .map(|c| c.function.name.clone()) + .collect::>() + .join(", ") + }); + let label = if entry.is_memory() { + if let ConversationEntry::Memory { key, .. 
} = entry { + format!("[memory: {}]", key) + } else { unreachable!() } + } else { + match &tool_info { + Some(tools) => format!("[tool_call: {}]", tools), + None => { + let preview: String = text.chars().take(60).collect(); + let preview = preview.replace('\n', " "); + if text.len() > 60 { format!("{}...", preview) } else { preview } + } + } + }; + let tokens = count(&text); + let cfg = crate::config::get(); + let role_name = if entry.is_memory() { "mem".to_string() } else { + match m.role { + Role::Assistant => cfg.assistant_name.clone(), + Role::User => cfg.user_name.clone(), + Role::Tool => "tool".to_string(), + Role::System => "system".to_string(), + } + }; + // Show which memories were important for this response + let children = if m.role == Role::Assistant { + self.memory_scores.as_ref() + .map(|s| s.important_memories_for_entry(i)) + .unwrap_or_default() + .into_iter() + .map(|(key, score)| ContextSection { + name: format!("← {} ({:.1})", key, score), + tokens: 0, + content: String::new(), + children: Vec::new(), + }) + .collect() + } else { + Vec::new() + }; + ContextSection { + name: format!("[{}] {}: {}", i, role_name, label), + tokens, + content: text, + children, + } + }) + .collect(); + let conv_tokens: usize = conv_children.iter().map(|c| c.tokens).sum(); + sections.push(ContextSection { + name: format!("Conversation ({} messages)", conv_children.len()), + tokens: conv_tokens, + content: String::new(), + children: conv_children, + }); + + sections + } + + /// Load recent journal entries at startup for orientation. + /// Uses the same budget logic as compaction but with empty conversation. + /// Only parses the tail of the journal file (last 64KB) for speed. 
+ fn load_startup_journal(&mut self) { + let store = match crate::store::Store::load() { + Ok(s) => s, + Err(_) => return, + }; + + // Find oldest message timestamp in conversation log + let oldest_msg_ts = self.conversation_log.as_ref() + .and_then(|log| log.oldest_timestamp()); + + // Get journal entries from the memory graph + let mut journal_nodes: Vec<_> = store.nodes.values() + .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) + .collect(); + let mut dbg = std::fs::OpenOptions::new().create(true).append(true) + .open("/tmp/poc-journal-debug.log").ok(); + macro_rules! dbg_log { + ($($arg:tt)*) => { + if let Some(ref mut f) = dbg { use std::io::Write; let _ = writeln!(f, $($arg)*); } + } + } + dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts); + + journal_nodes.sort_by_key(|n| n.created_at); + if let Some(first) = journal_nodes.first() { + dbg_log!("[journal] first created_at={}", first.created_at); + } + if let Some(last) = journal_nodes.last() { + dbg_log!("[journal] last created_at={}", last.created_at); + } + + // Find the cutoff index — entries older than conversation, plus one overlap + let cutoff_idx = if let Some(cutoff) = oldest_msg_ts { + let cutoff_ts = cutoff.timestamp(); + dbg_log!("[journal] cutoff timestamp={}", cutoff_ts); + let mut idx = journal_nodes.len(); + for (i, node) in journal_nodes.iter().enumerate() { + if node.created_at >= cutoff_ts { + idx = i + 1; + break; + } + } + idx + } else { + journal_nodes.len() + }; + dbg_log!("[journal] cutoff_idx={}", cutoff_idx); + + // Walk backwards from cutoff, accumulating entries within 15% of context + let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); + let context_window = crate::agent::context::context_window(); + let journal_budget = context_window * 15 / 100; + dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window); + + let mut entries = Vec::new(); + let mut total_tokens = 0; + + for node in 
journal_nodes[..cutoff_idx].iter().rev() { + let tokens = count(&node.content); + if total_tokens + tokens > journal_budget && !entries.is_empty() { + break; + } + entries.push(journal::JournalEntry { + timestamp: chrono::DateTime::from_timestamp(node.created_at, 0) + .unwrap_or_default(), + content: node.content.clone(), + }); + total_tokens += tokens; + } + entries.reverse(); + dbg_log!("[journal] loaded {} entries, {} tokens", entries.len(), total_tokens); + + if entries.is_empty() { + dbg_log!("[journal] no entries!"); + return; + } + + self.context.journal = entries; + dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len()); + } + + /// Called after any change to context state (working stack, etc). + fn refresh_context_state(&mut self) { + + self.publish_context_state(); + self.save_working_stack(); + } + + /// Persist working stack to disk. + fn save_working_stack(&self) { + if let Ok(json) = serde_json::to_string(&self.context.working_stack) { + let _ = std::fs::write(working_stack_file_path(), json); + } + } + + /// Load working stack from disk. + fn load_working_stack(&mut self) { + if let Ok(data) = std::fs::read_to_string(working_stack_file_path()) { + if let Ok(stack) = serde_json::from_str::>(&data) { + self.context.working_stack = stack; + } + } + } + + /// Push the current context summary to the shared state for the TUI to read. + pub fn publish_context_state(&self) { + let summary = self.context_state_summary(); + if let Ok(mut dbg) = std::fs::OpenOptions::new().create(true).append(true) + .open("/tmp/poc-journal-debug.log") { + use std::io::Write; + for s in &summary { + let _ = writeln!(dbg, "[publish] {} ({} tokens, {} children)", s.name, s.tokens, s.children.len()); + } + } + if let Ok(mut state) = self.shared_context.write() { + *state = summary; + } + } + + /// Replace base64 image data in older messages with text placeholders. + /// Keeps the 2 most recent images live (enough for motion/comparison). 
+ /// The tool result message before each image records what was loaded.
+ fn age_out_images(&mut self) {
+ // Find image entries newest-first, skip 1 (caller is about to add another)
+ let to_age: Vec<usize> = self.context.entries.iter().enumerate()
+ .rev()
+ .filter(|(_, e)| {
+ if let Some(MessageContent::Parts(parts)) = &e.message().content {
+ parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. }))
+ } else { false }
+ })
+ .map(|(i, _)| i)
+ .skip(1) // keep 1 existing + 1 about to be added = 2 live
+ .collect();
+
+ for i in to_age {
+ let msg = self.context.entries[i].message_mut();
+ if let Some(MessageContent::Parts(parts)) = &msg.content {
+ let mut replacement = String::new();
+ for part in parts {
+ match part {
+ ContentPart::Text { text } => {
+ if !replacement.is_empty() { replacement.push('\n'); }
+ replacement.push_str(text);
+ }
+ ContentPart::ImageUrl { .. } => {
+ if !replacement.is_empty() { replacement.push('\n'); }
+ replacement.push_str("[image aged out]");
+ }
+ }
+ }
+ msg.content = Some(MessageContent::Text(replacement));
+ }
+ }
+ }
+
+ /// Last prompt token count reported by the API.
+ ///
+ /// Updated from stream usage events; tracks live context size.
+ pub fn last_prompt_tokens(&self) -> u32 {
+ self.last_prompt_tokens
+ }
+
+ /// Rebuild the context window: reload identity, dedup, trim, reload journal. 
+ pub fn compact(&mut self) { + // Reload identity from config + match crate::config::reload_for_model(&self.app_config, &self.prompt_file) { + Ok((system_prompt, personality)) => { + self.context.system_prompt = system_prompt; + self.context.personality = personality; + } + Err(e) => { + eprintln!("warning: failed to reload identity: {:#}", e); + } + } + + let before = self.context.entries.len(); + let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); + let before_conv = before - before_mem; + + // Dedup memory, trim to budget, reload journal + let entries = self.context.entries.clone(); + self.context.entries = crate::agent::context::trim_entries( + &self.context, + &entries, + &self.tokenizer, + ); + + let after = self.context.entries.len(); + let after_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); + let after_conv = after - after_mem; + + dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})", + before, after, before_mem, after_mem, before_conv, after_conv); + + let budget = self.budget(); + dbglog!("[compact] budget: {}", budget.status_string()); + + self.load_startup_journal(); + self.last_prompt_tokens = 0; + self.publish_context_state(); + } + + /// Restore from the conversation log. Builds the context window + /// the same way compact() does — journal summaries for old messages, + /// raw recent messages. This is the unified startup path. + /// Returns true if the log had content to restore. 
+ pub fn restore_from_log(&mut self) -> bool {
+ let entries = match &self.conversation_log {
+ Some(log) => match log.read_tail(64 * 1024 * 1024) {
+ Ok(entries) if !entries.is_empty() => entries,
+ _ => return false,
+ },
+ None => return false,
+ };
+
+ // Load extra — compact() will dedup, trim, reload identity + journal
+ let all: Vec<_> = entries.into_iter()
+ .filter(|e| e.message().role != Role::System)
+ .collect();
+ let mem_count = all.iter().filter(|e| e.is_memory()).count();
+ let conv_count = all.len() - mem_count;
+ dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})",
+ all.len(), mem_count, conv_count);
+ self.context.entries = all;
+ self.compact();
+ // Estimate prompt tokens from budget so status bar isn't 0 on startup
+ let b = self.budget();
+ self.last_prompt_tokens = b.used() as u32;
+ true
+ }
+
+ /// Replace the API client (for model switching).
+ pub fn swap_client(&mut self, new_client: ApiClient) {
+ self.client = new_client;
+ }
+
+ /// Get the model identifier.
+ pub fn model(&self) -> &str {
+ &self.client.model
+ }
+
+ /// Get the conversation entries for persistence.
+ pub fn entries(&self) -> &[ConversationEntry] {
+ &self.context.entries
+ }
+
+ /// Clone the API client (for use outside the agent's borrow).
+ pub fn client_clone(&self) -> ApiClient {
+ self.client.clone()
+ }
+
+ /// Mutable access to conversation entries (for /retry).
+ pub fn entries_mut(&mut self) -> &mut Vec<ConversationEntry> {
+ &mut self.context.entries
+ }

-/// Return memory + journal tool definitions.
-/// Used by the journal agent only. 
-pub fn memory_and_journal_definitions() -> Vec { - let mut defs = tools::memory::definitions(); - defs.extend(tools::memory::journal_definitions()); - defs } diff --git a/src/agent/runner.rs b/src/agent/runner.rs deleted file mode 100644 index 3292d3c..0000000 --- a/src/agent/runner.rs +++ /dev/null @@ -1,1162 +0,0 @@ -// agent.rs — Core agent loop -// -// The simplest possible implementation of the agent pattern: -// send messages + tool definitions to the model, if it responds -// with tool calls then dispatch them and loop, if it responds -// with text then display it and wait for the next prompt. -// -// Uses streaming by default so text tokens appear as they're -// generated. Tool calls are accumulated from stream deltas and -// dispatched after the stream completes. -// -// The DMN (dmn.rs) is the outer loop that decides what prompts -// to send here. This module just handles single turns: prompt -// in, response out, tool calls dispatched. - -use anyhow::Result; -use tiktoken_rs::CoreBPE; - -use crate::user::api::ApiClient; -use crate::agent::context as journal; -use crate::user::log::ConversationLog; -use crate::user::api::StreamEvent; -use crate::agent::tools; -use crate::agent::tools::ProcessTracker; -use crate::user::types::*; -use crate::user::ui_channel::{ContextSection, SharedContextState, StatusInfo, StreamTarget, UiMessage, UiSender}; - -/// Result of a single agent turn. -pub struct TurnResult { - /// The text response (already sent through UI channel). - #[allow(dead_code)] - pub text: String, - /// Whether the model called yield_to_user during this turn. - pub yield_requested: bool, - /// Whether any tools (other than yield_to_user) were called. - pub had_tool_calls: bool, - /// Number of tool calls that returned errors this turn. - pub tool_errors: u32, - /// Model name to switch to after this turn completes. - pub model_switch: Option, - /// Agent requested DMN pause (full stop on autonomous behavior). 
- pub dmn_pause: bool, -} - -/// Accumulated state across tool dispatches within a single turn. -struct DispatchState { - yield_requested: bool, - had_tool_calls: bool, - tool_errors: u32, - model_switch: Option, - dmn_pause: bool, -} - -pub struct Agent { - client: ApiClient, - tool_defs: Vec, - /// Last known prompt token count from the API (tracks context size). - last_prompt_tokens: u32, - /// Shared process tracker for bash tool — lets TUI show/kill running commands. - pub process_tracker: ProcessTracker, - /// Current reasoning effort level ("none", "low", "high"). - pub reasoning_effort: String, - /// Persistent conversation log — append-only record of all messages. - conversation_log: Option, - /// BPE tokenizer for token counting (cl100k_base — close enough - /// for Claude and Qwen budget allocation, ~85-90% count accuracy). - tokenizer: CoreBPE, - /// Mutable context state — personality, working stack, etc. - pub context: ContextState, - /// Shared live context summary — TUI reads this directly for debug screen. - pub shared_context: SharedContextState, - /// App config — used to reload identity on compaction. - app_config: crate::config::AppConfig, - pub prompt_file: String, - /// Stable session ID for memory-search dedup across turns. - session_id: String, - /// Agent orchestration state (surface-observe, journal, reflect). - pub agent_cycles: crate::subconscious::subconscious::AgentCycleState, - /// Latest memory importance scores from training scorer. - pub memory_scores: Option, - /// Whether a /score task is currently running. - pub scoring_in_flight: bool, - /// Shared active tools — Agent writes, TUI reads. - pub active_tools: crate::user::ui_channel::SharedActiveTools, - /// Background tool calls that outlive the current turn. 
- background_tasks: futures::stream::FuturesUnordered< - std::pin::Pin + Send>> - >, -} - -fn render_journal(entries: &[journal::JournalEntry]) -> String { - if entries.is_empty() { return String::new(); } - let mut text = String::from("[Earlier — from your journal]\n\n"); - for entry in entries { - use std::fmt::Write; - writeln!(text, "## {}\n{}\n", entry.timestamp.format("%Y-%m-%dT%H:%M"), entry.content).ok(); - } - text -} - -impl Agent { - pub fn new( - client: ApiClient, - system_prompt: String, - personality: Vec<(String, String)>, - app_config: crate::config::AppConfig, - prompt_file: String, - conversation_log: Option, - shared_context: SharedContextState, - active_tools: crate::user::ui_channel::SharedActiveTools, - ) -> Self { - let tool_defs = tools::definitions(); - let tokenizer = tiktoken_rs::cl100k_base() - .expect("failed to load cl100k_base tokenizer"); - - let context = ContextState { - system_prompt: system_prompt.clone(), - personality, - journal: Vec::new(), - working_stack: Vec::new(), - entries: Vec::new(), - }; - let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S")); - let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&session_id); - let mut agent = Self { - client, - tool_defs, - last_prompt_tokens: 0, - process_tracker: ProcessTracker::new(), - reasoning_effort: "none".to_string(), - conversation_log, - tokenizer, - context, - shared_context, - app_config, - prompt_file, - session_id, - agent_cycles, - memory_scores: None, - scoring_in_flight: false, - active_tools, - background_tasks: futures::stream::FuturesUnordered::new(), - }; - - agent.load_startup_journal(); - agent.load_working_stack(); - agent.publish_context_state(); - agent - } - - /// Assemble the full message list for the API call from typed sources. - /// System prompt + personality context + journal + conversation messages. 
- fn assemble_api_messages(&self) -> Vec { - let mut msgs = Vec::new(); - msgs.push(Message::system(&self.context.system_prompt)); - let ctx = self.context.render_context_message(); - if !ctx.is_empty() { - msgs.push(Message::user(ctx)); - } - let jnl = render_journal(&self.context.journal); - if !jnl.is_empty() { - msgs.push(Message::user(jnl)); - } - msgs.extend(self.context.entries.iter().map(|e| e.api_message().clone())); - msgs - } - - /// Run agent orchestration cycle, returning structured output. - fn run_agent_cycle(&mut self) -> crate::subconscious::subconscious::AgentCycleOutput { - let transcript_path = self.conversation_log.as_ref() - .map(|l| l.path().to_string_lossy().to_string()) - .unwrap_or_default(); - - let session = crate::session::HookSession::from_fields( - self.session_id.clone(), - transcript_path, - "UserPromptSubmit".into(), - ); - - self.agent_cycles.trigger(&session); - std::mem::take(&mut self.agent_cycles.last_output) - } - - /// Push a conversation message — stamped and logged. - fn push_message(&mut self, mut msg: Message) { - msg.stamp(); - let entry = ConversationEntry::Message(msg); - self.push_entry(entry); - } - - fn push_entry(&mut self, entry: ConversationEntry) { - if let Some(ref log) = self.conversation_log { - if let Err(e) = log.append(&entry) { - eprintln!("warning: failed to log entry: {:#}", e); - } - } - self.context.entries.push(entry); - } - - /// Push a context-only message (system prompt, identity context, - /// journal summaries). Not logged — these are reconstructed on - /// every startup/compaction. 
- pub fn budget(&self) -> ContextBudget { - let count_str = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); - let count_msg = |m: &Message| crate::agent::context::msg_token_count(&self.tokenizer, m); - let window = crate::agent::context::context_window(); - self.context.budget(&count_str, &count_msg, window) - } - - /// Send a user message and run the agent loop until the model - /// produces a text response (no more tool calls). Streams text - /// and tool activity through the UI channel. - pub async fn turn( - &mut self, - user_input: &str, - ui_tx: &UiSender, - target: StreamTarget, - ) -> Result { - // Run agent orchestration cycle (surface-observe, reflect, journal) - let cycle = self.run_agent_cycle(); - - // Surfaced memories — each as a separate Memory entry - for key in &cycle.surfaced_keys { - if let Some(rendered) = crate::cli::node::render_node( - &crate::store::Store::load().unwrap_or_default(), key, - ) { - let mut msg = Message::user(format!( - "\n--- {} (surfaced) ---\n{}\n", - key, rendered, - )); - msg.stamp(); - self.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); - } - } - - // Reflection — separate system reminder - if let Some(ref reflection) = cycle.reflection { - self.push_message(Message::user(format!( - "\n--- subconscious reflection ---\n{}\n", - reflection.trim(), - ))); - } - - // Inject completed background task results - { - use futures::{StreamExt, FutureExt}; - let mut bg_ds = DispatchState { - yield_requested: false, had_tool_calls: false, - tool_errors: 0, model_switch: None, dmn_pause: false, - }; - while let Some(Some((call, output))) = - std::pin::Pin::new(&mut self.background_tasks).next().now_or_never() - { - // Show result in TUI and inject into conversation - self.apply_tool_result(&call, output, ui_tx, &mut bg_ds); - } - } - - // User input — clean, just what was typed - self.push_message(Message::user(user_input)); - let _ = 
ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots())); - - let mut overflow_retries: u32 = 0; - let mut empty_retries: u32 = 0; - let mut ds = DispatchState { - yield_requested: false, - had_tool_calls: false, - tool_errors: 0, - model_switch: None, - dmn_pause: false, - }; - - loop { - let _ = ui_tx.send(UiMessage::Activity("thinking...".into())); - - // Stream events from the API — we route each event to the - // appropriate UI pane rather than letting the API layer do it. - let api_messages = self.assemble_api_messages(); - let (mut rx, _stream_guard) = self.client.start_stream( - &api_messages, - Some(&self.tool_defs), - ui_tx, - &self.reasoning_effort, - None, - None, // priority: interactive - ); - - let mut content = String::new(); - let mut tool_calls: Vec = Vec::new(); - let mut usage = None; - let mut finish_reason = None; - let mut in_tool_call = false; - let mut tool_call_buf = String::new(); - let mut stream_error = None; - let mut first_content = true; - // Tool calls fired during streaming (XML path) - let mut inflight: futures::stream::FuturesOrdered< - std::pin::Pin + Send>> - > = futures::stream::FuturesOrdered::new(); - // Buffer for content not yet sent to UI — holds a tail - // that might be a partial tag. 
- let mut display_buf = String::new(); - - while let Some(event) = rx.recv().await { - match event { - StreamEvent::Content(text) => { - if first_content { - let _ = ui_tx.send(UiMessage::Activity("streaming...".into())); - first_content = false; - } - content.push_str(&text); - - if in_tool_call { - tool_call_buf.push_str(&text); - // Check for closing tag — parse and fire immediately - if let Some(end) = tool_call_buf.find("") { - let body = &tool_call_buf[..end]; - if let Some(call) = crate::user::parsing::parse_tool_call_body(body) { - let args: serde_json::Value = - serde_json::from_str(&call.function.arguments).unwrap_or_default(); - let args_summary = summarize_args(&call.function.name, &args); - let _ = ui_tx.send(UiMessage::ToolCall { - name: call.function.name.clone(), - args_summary: args_summary.clone(), - }); - self.active_tools.write().unwrap().push( - crate::user::ui_channel::ActiveTool { - id: call.id.clone(), - name: call.function.name.clone(), - detail: args_summary, - started: std::time::Instant::now(), - } - ); - let tracker = self.process_tracker.clone(); - let is_background = args.get("run_in_background") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - let future = Box::pin(async move { - let output = tools::dispatch(&call.function.name, &args, &tracker).await; - (call, output) - }); - if is_background { - self.background_tasks.push(future); - } else { - inflight.push_back(future); - } - } - // Reset for potential next tool call - let remaining = tool_call_buf[end + "".len()..].to_string(); - tool_call_buf.clear(); - in_tool_call = false; - // Any content after goes back to display - if !remaining.trim().is_empty() { - display_buf.push_str(&remaining); - } - } - } else { - display_buf.push_str(&text); - - if let Some(pos) = display_buf.find("") { - // Flush content before the tag, suppress the rest. 
- let before = &display_buf[..pos]; - if !before.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target)); - } - display_buf.clear(); - in_tool_call = true; - } else { - // Flush display_buf except a tail that could be - // a partial "" (10 chars). - let safe = display_buf.len().saturating_sub(10); - // Find a char boundary at or before safe - let safe = display_buf.floor_char_boundary(safe); - if safe > 0 { - let flush = display_buf[..safe].to_string(); - display_buf = display_buf[safe..].to_string(); - let _ = ui_tx.send(UiMessage::TextDelta(flush, target)); - } - } - } - } - StreamEvent::Reasoning(text) => { - let _ = ui_tx.send(UiMessage::Reasoning(text)); - } - StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => { - while tool_calls.len() <= index { - tool_calls.push(ToolCall { - id: String::new(), - call_type: "function".to_string(), - function: FunctionCall { name: String::new(), arguments: String::new() }, - }); - } - if let Some(id) = id { tool_calls[index].id = id; } - if let Some(ct) = call_type { tool_calls[index].call_type = ct; } - if let Some(n) = name { tool_calls[index].function.name = n; } - if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); } - } - StreamEvent::Usage(u) => usage = Some(u), - StreamEvent::Finished { reason, .. 
} => { - finish_reason = Some(reason); - break; - } - StreamEvent::Error(e) => { - stream_error = Some(e); - break; - } - } - } - - // Handle stream errors with retry logic - if let Some(e) = stream_error { - let err = anyhow::anyhow!("{}", e); - if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { - overflow_retries += 1; - let _ = ui_tx.send(UiMessage::Info(format!( - "[context overflow — compacting and retrying ({}/2)]", - overflow_retries, - ))); - self.compact(); - continue; - } - if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { - empty_retries += 1; - let _ = ui_tx.send(UiMessage::Info(format!( - "[stream error: {} — retrying ({}/2)]", - e, empty_retries, - ))); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - continue; - } - let _ = ui_tx.send(UiMessage::Activity(String::new())); - return Err(err); - } - - if finish_reason.as_deref() == Some("error") { - let detail = if content.is_empty() { "no details".into() } else { content }; - let _ = ui_tx.send(UiMessage::Activity(String::new())); - return Err(anyhow::anyhow!("model stream error: {}", detail)); - } - - // Flush remaining display buffer (normal responses without tool calls). 
- if !in_tool_call && !display_buf.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target)); - } - if !content.is_empty() && !in_tool_call { - let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); - } - - let msg = crate::user::api::build_response_message(content, tool_calls); - - - - if let Some(usage) = &usage { - self.last_prompt_tokens = usage.prompt_tokens; - - self.publish_context_state(); - let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo { - dmn_state: String::new(), // filled by main loop - dmn_turns: 0, - dmn_max_turns: 0, - prompt_tokens: usage.prompt_tokens, - completion_tokens: usage.completion_tokens, - model: self.client.model.clone(), - turn_tools: 0, // tracked by TUI from ToolCall messages - context_budget: self.budget().status_string(), - })); - } - - // Empty response — model returned finish=stop with no content - // or tool calls. Inject a nudge so the retry has different input. - let has_content = msg.content.is_some(); - let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty()); - if !has_content && !has_tools { - if empty_retries < 2 { - empty_retries += 1; - let _ = ui_tx.send(UiMessage::Debug(format!( - "empty response, injecting nudge and retrying ({}/2)", - empty_retries, - ))); - self.push_message(Message::user( - "[system] Your previous response was empty. \ - Please respond with text or use a tool." - )); - continue; - } - // After max retries, fall through — return the empty response - } else { - empty_retries = 0; - } - - // Collect tool calls that were fired during streaming - if !inflight.is_empty() { - use futures::StreamExt; - self.push_message(msg.clone()); - while let Some((call, output)) = inflight.next().await { - self.apply_tool_result(&call, output, ui_tx, &mut ds); - } - self.publish_context_state(); - continue; - } - - // Tool calls (structured API path — not fired during stream). 
- if let Some(ref tool_calls) = msg.tool_calls { - if !tool_calls.is_empty() { - self.push_message(msg.clone()); - - for call in tool_calls { - self.dispatch_tool_call(call, None, ui_tx, &mut ds) - .await; - } - continue; - } - } - - // Genuinely text-only response - let text = msg.content_text().to_string(); - let _ = ui_tx.send(UiMessage::Activity(String::new())); - self.push_message(msg); - - return Ok(TurnResult { - text, - yield_requested: ds.yield_requested, - had_tool_calls: ds.had_tool_calls, - tool_errors: ds.tool_errors, - model_switch: ds.model_switch, - dmn_pause: ds.dmn_pause, - }); - } - } - - /// Dispatch a single tool call: send UI annotations, run the tool, - /// push results into the conversation, handle images. - async fn dispatch_tool_call( - &mut self, - call: &ToolCall, - tag: Option<&str>, - ui_tx: &UiSender, - ds: &mut DispatchState, - ) { - let args: serde_json::Value = - serde_json::from_str(&call.function.arguments).unwrap_or_default(); - - let args_summary = summarize_args(&call.function.name, &args); - let label = match tag { - Some(t) => format!("calling: {} ({})", call.function.name, t), - None => format!("calling: {}", call.function.name), - }; - let _ = ui_tx.send(UiMessage::Activity(label)); - let _ = ui_tx.send(UiMessage::ToolCall { - name: call.function.name.clone(), - args_summary: args_summary.clone(), - }); - self.active_tools.write().unwrap().push( - crate::user::ui_channel::ActiveTool { - id: call.id.clone(), - name: call.function.name.clone(), - detail: args_summary, - started: std::time::Instant::now(), - } - ); - - // Handle working_stack tool — needs &mut self for context state - if call.function.name == "working_stack" { - let result = tools::working_stack::handle(&args, &mut self.context.working_stack); - let output = tools::ToolOutput { - text: result.clone(), - is_yield: false, - images: Vec::new(), - model_switch: None, - dmn_pause: false, - }; - let _ = ui_tx.send(UiMessage::ToolResult { - name: 
call.function.name.clone(), - result: output.text.clone(), - }); - self.active_tools.write().unwrap().retain(|t| t.id != call.id); - self.push_message(Message::tool_result(&call.id, &output.text)); - ds.had_tool_calls = true; - - // Re-render the context message so the model sees the updated stack - if !result.starts_with("Error:") { - self.refresh_context_state(); - } - return; - } - - // Dispatch through unified path - let output = - tools::dispatch(&call.function.name, &args, &self.process_tracker).await; - - self.apply_tool_result(call, output, ui_tx, ds); - } - - /// Apply a completed tool result to conversation state. - fn apply_tool_result( - &mut self, - call: &ToolCall, - output: tools::ToolOutput, - ui_tx: &UiSender, - ds: &mut DispatchState, - ) { - let args: serde_json::Value = - serde_json::from_str(&call.function.arguments).unwrap_or_default(); - - if output.is_yield { - ds.yield_requested = true; - } else { - ds.had_tool_calls = true; - } - if output.model_switch.is_some() { - ds.model_switch = output.model_switch.clone(); - } - if output.dmn_pause { - ds.dmn_pause = true; - } - if output.text.starts_with("Error:") { - ds.tool_errors += 1; - } - - let _ = ui_tx.send(UiMessage::ToolResult { - name: call.function.name.clone(), - result: output.text.clone(), - }); - self.active_tools.write().unwrap().retain(|t| t.id != call.id); - - // Tag memory_render results for context deduplication - if call.function.name == "memory_render" && !output.text.starts_with("Error:") { - if let Some(key) = args.get("key").and_then(|v| v.as_str()) { - let mut msg = Message::tool_result(&call.id, &output.text); - msg.stamp(); - self.push_entry(ConversationEntry::Memory { key: key.to_string(), message: msg }); - self.publish_context_state(); - return; - } - } - - self.push_message(Message::tool_result(&call.id, &output.text)); - - if !output.images.is_empty() { - self.age_out_images(); - self.push_message(Message::user_with_images( - "Here is the image you requested:", - 
&output.images, - )); - } - } - - /// Build context state summary for the debug screen. - pub fn context_state_summary(&self) -> Vec { - let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); - - let mut sections = Vec::new(); - - // System prompt - sections.push(ContextSection { - name: "System prompt".into(), - tokens: count(&self.context.system_prompt), - content: self.context.system_prompt.clone(), - children: Vec::new(), - }); - - // Personality — parent with file children - let personality_children: Vec = self.context.personality.iter() - .map(|(name, content)| ContextSection { - name: name.clone(), - tokens: count(content), - content: content.clone(), - children: Vec::new(), - }) - .collect(); - let personality_tokens: usize = personality_children.iter().map(|c| c.tokens).sum(); - sections.push(ContextSection { - name: format!("Personality ({} files)", personality_children.len()), - tokens: personality_tokens, - content: String::new(), - children: personality_children, - }); - - // Journal - { - let journal_children: Vec = self.context.journal.iter() - .map(|entry| { - let preview: String = entry.content.lines() - .find(|l| !l.trim().is_empty()) - .unwrap_or("").chars().take(60).collect(); - ContextSection { - name: format!("{}: {}", entry.timestamp.format("%Y-%m-%dT%H:%M"), preview), - tokens: count(&entry.content), - content: entry.content.clone(), - children: Vec::new(), - } - }) - .collect(); - let journal_tokens: usize = journal_children.iter().map(|c| c.tokens).sum(); - sections.push(ContextSection { - name: format!("Journal ({} entries)", journal_children.len()), - tokens: journal_tokens, - content: String::new(), - children: journal_children, - }); - } - - // Working stack — instructions + items as children - let instructions = std::fs::read_to_string(working_stack_instructions_path()) - .unwrap_or_default(); - let mut stack_children = vec![ContextSection { - name: "Instructions".into(), - tokens: count(&instructions), - content: 
instructions, - children: Vec::new(), - }]; - for (i, item) in self.context.working_stack.iter().enumerate() { - let marker = if i == self.context.working_stack.len() - 1 { "→" } else { " " }; - stack_children.push(ContextSection { - name: format!("{} [{}] {}", marker, i, item), - tokens: count(item), - content: String::new(), - children: Vec::new(), - }); - } - let stack_tokens: usize = stack_children.iter().map(|c| c.tokens).sum(); - sections.push(ContextSection { - name: format!("Working stack ({} items)", self.context.working_stack.len()), - tokens: stack_tokens, - content: String::new(), - children: stack_children, - }); - - // Memory nodes — extracted from Memory entries in the conversation - let memory_entries: Vec<&ConversationEntry> = self.context.entries.iter() - .filter(|e| e.is_memory()) - .collect(); - if !memory_entries.is_empty() { - let node_children: Vec = memory_entries.iter() - .map(|entry| { - let key = match entry { - ConversationEntry::Memory { key, .. } => key.as_str(), - _ => unreachable!(), - }; - let text = entry.message().content_text(); - let score = self.memory_scores.as_ref() - .and_then(|s| s.memory_weights.iter() - .find(|(k, _)| k == key) - .map(|(_, v)| *v)); - let label = match score { - Some(v) => format!("{} (importance: {:.1})", key, v), - None => key.to_string(), - }; - ContextSection { - name: label, - tokens: count(text), - content: String::new(), - children: Vec::new(), - } - }) - .collect(); - let node_tokens: usize = node_children.iter().map(|c| c.tokens).sum(); - sections.push(ContextSection { - name: format!("Memory nodes ({} loaded)", memory_entries.len()), - tokens: node_tokens, - content: String::new(), - children: node_children, - }); - } - - // Conversation — each message as a child - let conv_messages = &self.context.entries; - let conv_children: Vec = conv_messages.iter().enumerate() - .map(|(i, entry)| { - let m = entry.message(); - let text = m.content.as_ref() - .map(|c| c.as_text().to_string()) - 
.unwrap_or_default(); - let tool_info = m.tool_calls.as_ref().map(|tc| { - tc.iter() - .map(|c| c.function.name.clone()) - .collect::>() - .join(", ") - }); - let label = if entry.is_memory() { - if let ConversationEntry::Memory { key, .. } = entry { - format!("[memory: {}]", key) - } else { unreachable!() } - } else { - match &tool_info { - Some(tools) => format!("[tool_call: {}]", tools), - None => { - let preview: String = text.chars().take(60).collect(); - let preview = preview.replace('\n', " "); - if text.len() > 60 { format!("{}...", preview) } else { preview } - } - } - }; - let tokens = count(&text); - let cfg = crate::config::get(); - let role_name = if entry.is_memory() { "mem".to_string() } else { - match m.role { - Role::Assistant => cfg.assistant_name.clone(), - Role::User => cfg.user_name.clone(), - Role::Tool => "tool".to_string(), - Role::System => "system".to_string(), - } - }; - // Show which memories were important for this response - let children = if m.role == Role::Assistant { - self.memory_scores.as_ref() - .map(|s| s.important_memories_for_entry(i)) - .unwrap_or_default() - .into_iter() - .map(|(key, score)| ContextSection { - name: format!("← {} ({:.1})", key, score), - tokens: 0, - content: String::new(), - children: Vec::new(), - }) - .collect() - } else { - Vec::new() - }; - ContextSection { - name: format!("[{}] {}: {}", i, role_name, label), - tokens, - content: text, - children, - } - }) - .collect(); - let conv_tokens: usize = conv_children.iter().map(|c| c.tokens).sum(); - sections.push(ContextSection { - name: format!("Conversation ({} messages)", conv_children.len()), - tokens: conv_tokens, - content: String::new(), - children: conv_children, - }); - - sections - } - - /// Load recent journal entries at startup for orientation. - /// Uses the same budget logic as compaction but with empty conversation. - /// Only parses the tail of the journal file (last 64KB) for speed. 
- fn load_startup_journal(&mut self) { - let store = match crate::store::Store::load() { - Ok(s) => s, - Err(_) => return, - }; - - // Find oldest message timestamp in conversation log - let oldest_msg_ts = self.conversation_log.as_ref() - .and_then(|log| log.oldest_timestamp()); - - // Get journal entries from the memory graph - let mut journal_nodes: Vec<_> = store.nodes.values() - .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) - .collect(); - let mut dbg = std::fs::OpenOptions::new().create(true).append(true) - .open("/tmp/poc-journal-debug.log").ok(); - macro_rules! dbg_log { - ($($arg:tt)*) => { - if let Some(ref mut f) = dbg { use std::io::Write; let _ = writeln!(f, $($arg)*); } - } - } - dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts); - - journal_nodes.sort_by_key(|n| n.created_at); - if let Some(first) = journal_nodes.first() { - dbg_log!("[journal] first created_at={}", first.created_at); - } - if let Some(last) = journal_nodes.last() { - dbg_log!("[journal] last created_at={}", last.created_at); - } - - // Find the cutoff index — entries older than conversation, plus one overlap - let cutoff_idx = if let Some(cutoff) = oldest_msg_ts { - let cutoff_ts = cutoff.timestamp(); - dbg_log!("[journal] cutoff timestamp={}", cutoff_ts); - let mut idx = journal_nodes.len(); - for (i, node) in journal_nodes.iter().enumerate() { - if node.created_at >= cutoff_ts { - idx = i + 1; - break; - } - } - idx - } else { - journal_nodes.len() - }; - dbg_log!("[journal] cutoff_idx={}", cutoff_idx); - - // Walk backwards from cutoff, accumulating entries within 15% of context - let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); - let context_window = crate::agent::context::context_window(); - let journal_budget = context_window * 15 / 100; - dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window); - - let mut entries = Vec::new(); - let mut total_tokens = 0; - - for node in 
journal_nodes[..cutoff_idx].iter().rev() { - let tokens = count(&node.content); - if total_tokens + tokens > journal_budget && !entries.is_empty() { - break; - } - entries.push(journal::JournalEntry { - timestamp: chrono::DateTime::from_timestamp(node.created_at, 0) - .unwrap_or_default(), - content: node.content.clone(), - }); - total_tokens += tokens; - } - entries.reverse(); - dbg_log!("[journal] loaded {} entries, {} tokens", entries.len(), total_tokens); - - if entries.is_empty() { - dbg_log!("[journal] no entries!"); - return; - } - - self.context.journal = entries; - dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len()); - } - - /// Called after any change to context state (working stack, etc). - fn refresh_context_state(&mut self) { - - self.publish_context_state(); - self.save_working_stack(); - } - - /// Persist working stack to disk. - fn save_working_stack(&self) { - if let Ok(json) = serde_json::to_string(&self.context.working_stack) { - let _ = std::fs::write(working_stack_file_path(), json); - } - } - - /// Load working stack from disk. - fn load_working_stack(&mut self) { - if let Ok(data) = std::fs::read_to_string(working_stack_file_path()) { - if let Ok(stack) = serde_json::from_str::>(&data) { - self.context.working_stack = stack; - } - } - } - - /// Push the current context summary to the shared state for the TUI to read. - pub fn publish_context_state(&self) { - let summary = self.context_state_summary(); - if let Ok(mut dbg) = std::fs::OpenOptions::new().create(true).append(true) - .open("/tmp/poc-journal-debug.log") { - use std::io::Write; - for s in &summary { - let _ = writeln!(dbg, "[publish] {} ({} tokens, {} children)", s.name, s.tokens, s.children.len()); - } - } - if let Ok(mut state) = self.shared_context.write() { - *state = summary; - } - } - - /// Replace base64 image data in older messages with text placeholders. - /// Keeps the 2 most recent images live (enough for motion/comparison). 
- /// The tool result message before each image records what was loaded. - fn age_out_images(&mut self) { - // Find image entries newest-first, skip 1 (caller is about to add another) - let to_age: Vec = self.context.entries.iter().enumerate() - .rev() - .filter(|(_, e)| { - if let Some(MessageContent::Parts(parts)) = &e.message().content { - parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. })) - } else { false } - }) - .map(|(i, _)| i) - .skip(1) // keep 1 existing + 1 about to be added = 2 live - .collect(); - - for i in to_age { - let msg = self.context.entries[i].message_mut(); - if let Some(MessageContent::Parts(parts)) = &msg.content { - let mut replacement = String::new(); - for part in parts { - match part { - ContentPart::Text { text } => { - if !replacement.is_empty() { replacement.push('\n'); } - replacement.push_str(text); - } - ContentPart::ImageUrl { .. } => { - if !replacement.is_empty() { replacement.push('\n'); } - replacement.push_str("[image aged out]"); - } - } - } - msg.content = Some(MessageContent::Text(replacement)); - } - } - } - - /// Strip ephemeral tool calls from the conversation history. - /// - /// Last prompt token count reported by the API. - pub fn last_prompt_tokens(&self) -> u32 { - self.last_prompt_tokens - } - - /// Rebuild the context window: reload identity, dedup, trim, reload journal. 
- pub fn compact(&mut self) { - // Reload identity from config - match crate::config::reload_for_model(&self.app_config, &self.prompt_file) { - Ok((system_prompt, personality)) => { - self.context.system_prompt = system_prompt; - self.context.personality = personality; - } - Err(e) => { - eprintln!("warning: failed to reload identity: {:#}", e); - } - } - - let before = self.context.entries.len(); - let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); - let before_conv = before - before_mem; - - // Dedup memory, trim to budget, reload journal - let entries = self.context.entries.clone(); - self.context.entries = crate::agent::context::trim_entries( - &self.context, - &entries, - &self.tokenizer, - ); - - let after = self.context.entries.len(); - let after_mem = self.context.entries.iter().filter(|e| e.is_memory()).count(); - let after_conv = after - after_mem; - - dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})", - before, after, before_mem, after_mem, before_conv, after_conv); - - let budget = self.budget(); - dbglog!("[compact] budget: {}", budget.status_string()); - - self.load_startup_journal(); - self.last_prompt_tokens = 0; - self.publish_context_state(); - } - - /// Restore from the conversation log. Builds the context window - /// the same way compact() does — journal summaries for old messages, - /// raw recent messages. This is the unified startup path. - /// Returns true if the log had content to restore. 
- pub fn restore_from_log(&mut self) -> bool { - let entries = match &self.conversation_log { - Some(log) => match log.read_tail(64 * 1024 * 1024) { - Ok(entries) if !entries.is_empty() => entries, - _ => return false, - }, - None => return false, - }; - - // Load extra — compact() will dedup, trim, reload identity + journal - let all: Vec<_> = entries.into_iter() - .filter(|e| e.message().role != Role::System) - .collect(); - let mem_count = all.iter().filter(|e| e.is_memory()).count(); - let conv_count = all.len() - mem_count; - dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})", - all.len(), mem_count, conv_count); - self.context.entries = all; - self.compact(); - // Estimate prompt tokens from budget so status bar isn't 0 on startup - let b = self.budget(); - self.last_prompt_tokens = b.used() as u32; - true - } - - /// Replace the API client (for model switching). - pub fn swap_client(&mut self, new_client: ApiClient) { - self.client = new_client; - } - - /// Get the model identifier. - pub fn model(&self) -> &str { - &self.client.model - } - - /// Get the conversation entries for persistence. - pub fn entries(&self) -> &[ConversationEntry] { - &self.context.entries - } - - /// Mutable access to conversation entries (for /retry). - pub fn client_clone(&self) -> ApiClient { - self.client.clone() - } - - pub fn entries_mut(&mut self) -> &mut Vec { - &mut self.context.entries - } - -} - -// Context window building, token counting, and error classification -// live in context.rs - - -/// Create a short summary of tool args for the tools pane header. 
-fn summarize_args(tool_name: &str, args: &serde_json::Value) -> String { - match tool_name { - "read_file" | "write_file" | "edit_file" => args["file_path"] - .as_str() - .unwrap_or("") - .to_string(), - "bash" => { - let cmd = args["command"].as_str().unwrap_or(""); - if cmd.len() > 60 { - let end = cmd.char_indices() - .map(|(i, _)| i) - .take_while(|&i| i <= 60) - .last() - .unwrap_or(0); - format!("{}...", &cmd[..end]) - } else { - cmd.to_string() - } - } - "grep" => { - let pattern = args["pattern"].as_str().unwrap_or(""); - let path = args["path"].as_str().unwrap_or("."); - format!("{} in {}", pattern, path) - } - "glob" => args["pattern"] - .as_str() - .unwrap_or("") - .to_string(), - "view_image" => { - if let Some(pane) = args["pane_id"].as_str() { - format!("pane {}", pane) - } else { - args["file_path"].as_str().unwrap_or("").to_string() - } - } - "journal" => { - let entry = args["entry"].as_str().unwrap_or(""); - if entry.len() > 60 { - format!("{}...", &entry[..60]) - } else { - entry.to_string() - } - } - "yield_to_user" => args["message"] - .as_str() - .unwrap_or("") - .to_string(), - "switch_model" => args["model"] - .as_str() - .unwrap_or("") - .to_string(), - "pause" => String::new(), - _ => String::new(), - } -} - -// Parsing functions (parse_leaked_tool_calls, strip_leaked_artifacts) -// and their tests live in parsing.rs diff --git a/src/agent/tools/bash.rs b/src/agent/tools/bash.rs index fa75044..8482335 100644 --- a/src/agent/tools/bash.rs +++ b/src/agent/tools/bash.rs @@ -10,12 +10,9 @@ use anyhow::{Context, Result}; use serde::Deserialize; use serde_json::json; use std::process::Stdio; -use std::sync::Arc; -use std::time::Instant; use tokio::io::AsyncReadExt; -use tokio::sync::Mutex; -use super::ToolDef; +use super::{ToolDef, ProcessTracker, default_timeout}; #[derive(Deserialize)] struct Args { @@ -24,63 +21,6 @@ struct Args { timeout_secs: u64, } -fn default_timeout() -> u64 { 120 } - -/// Info about a running child process, visible to 
the TUI. -#[derive(Debug, Clone)] -pub struct ProcessInfo { - pub pid: u32, - pub command: String, - pub started: Instant, -} - -/// Shared tracker for running child processes. Allows the TUI to -/// display what's running and kill processes by PID. -#[derive(Debug, Clone, Default)] -pub struct ProcessTracker { - inner: Arc>>, -} - -impl ProcessTracker { - pub fn new() -> Self { - Self::default() - } - - async fn register(&self, pid: u32, command: &str) { - self.inner.lock().await.push(ProcessInfo { - pid, - command: if command.len() > 120 { - format!("{}...", &command[..120]) - } else { - command.to_string() - }, - started: Instant::now(), - }); - } - - async fn unregister(&self, pid: u32) { - self.inner.lock().await.retain(|p| p.pid != pid); - } - - /// Snapshot of currently running processes. - pub async fn list(&self) -> Vec { - self.inner.lock().await.clone() - } - - /// Kill a process by PID. Returns true if the signal was sent. - pub async fn kill(&self, pid: u32) -> bool { - // SIGTERM the process group (negative PID kills the group) - let ret = unsafe { libc::kill(-(pid as i32), libc::SIGTERM) }; - if ret != 0 { - // Try just the process - unsafe { libc::kill(pid as i32, libc::SIGTERM) }; - } - // Don't unregister — let the normal exit path do that - // so the tool result says "killed by user" - true - } -} - pub fn definition() -> ToolDef { ToolDef::new( "bash", diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index ad1955c..0392113 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -18,8 +18,161 @@ mod control; mod vision; pub mod working_stack; -// Re-export -pub use crate::agent::{ToolDef, ToolOutput, ProcessTracker, truncate_output}; +use serde::{Serialize, Deserialize}; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::Mutex; + +fn default_timeout() -> u64 { 120 } + +/// Function call within a tool call — name + JSON arguments. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FunctionCall { + pub name: String, + pub arguments: String, +} + +/// Function definition for tool schema. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FunctionDef { + pub name: String, + pub description: String, + pub parameters: serde_json::Value, +} + +/// Partial function call within a streaming delta. +#[derive(Debug, Deserialize)] +pub struct FunctionCallDelta { + pub name: Option, + pub arguments: Option, +} + +/// Tool definition sent to the model. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ToolDef { + #[serde(rename = "type")] + pub tool_type: String, + pub function: FunctionDef, +} + +/// A tool call requested by the model. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ToolCall { + pub id: String, + #[serde(rename = "type")] + pub call_type: String, + pub function: FunctionCall, +} + +/// A partial tool call within a streaming delta. The first chunk for a +/// given tool call carries the id and function name; subsequent chunks +/// carry argument fragments. +#[derive(Debug, Deserialize)] +pub struct ToolCallDelta { + pub index: usize, + pub id: Option, + #[serde(rename = "type")] + pub call_type: Option, + pub function: Option, +} + +/// Result of dispatching a tool call. +pub struct ToolOutput { + pub text: String, + pub is_yield: bool, + /// Base64 data URIs for images to attach to the next message. + pub images: Vec, + /// Model name to switch to (deferred to session level). + pub model_switch: Option, + /// Agent requested DMN pause (deferred to session level). 
+ pub dmn_pause: bool, +} + +impl ToolOutput { + pub fn error(e: impl std::fmt::Display) -> Self { + Self { + text: format!("Error: {}", e), + is_yield: false, + images: Vec::new(), + model_switch: None, + dmn_pause: false, + } + } + + pub fn text(s: String) -> Self { + Self { + text: s, + is_yield: false, + images: Vec::new(), + model_switch: None, + dmn_pause: false, + } + } +} + +/// Info about a running child process, visible to the TUI. +#[derive(Debug, Clone)] +pub struct ProcessInfo { + pub pid: u32, + pub command: String, + pub started: Instant, +} + +/// Shared tracker for running child processes. Allows the TUI to +/// display what's running and kill processes by PID. +#[derive(Debug, Clone, Default)] +pub struct ProcessTracker { + inner: Arc>>, +} + +impl ProcessTracker { + pub fn new() -> Self { + Self::default() + } + + async fn register(&self, pid: u32, command: &str) { + self.inner.lock().await.push(ProcessInfo { + pid, + command: if command.len() > 120 { + format!("{}...", &command[..120]) + } else { + command.to_string() + }, + started: Instant::now(), + }); + } + + async fn unregister(&self, pid: u32) { + self.inner.lock().await.retain(|p| p.pid != pid); + } + + /// Snapshot of currently running processes. + pub async fn list(&self) -> Vec { + self.inner.lock().await.clone() + } + + /// Kill a process by PID. Returns true if the signal was sent. + pub async fn kill(&self, pid: u32) -> bool { + // SIGTERM the process group (negative PID kills the group) + let ret = unsafe { libc::kill(-(pid as i32), libc::SIGTERM) }; + if ret != 0 { + // Try just the process + unsafe { libc::kill(pid as i32, libc::SIGTERM) }; + } + // Don't unregister — let the normal exit path do that + // so the tool result says "killed by user" + true + } +} + +/// Truncate output if it exceeds max length, appending a truncation notice. +pub fn truncate_output(mut s: String, max: usize) -> String { + if s.len() > max { + s.truncate(max); + s.push_str("\n... 
(output truncated)"); + } + s +} /// Dispatch a tool call by name. /// @@ -28,12 +181,14 @@ pub use crate::agent::{ToolDef, ToolOutput, ProcessTracker, truncate_output}; /// /// Note: working_stack is handled in runner.rs before reaching this /// function (it needs mutable context access). +/// Dispatch a tool call by name. Handles all tools: +/// agent-specific (control, vision), memory/journal, file/bash. pub async fn dispatch( name: &str, args: &serde_json::Value, tracker: &ProcessTracker, ) -> ToolOutput { - // Agent-specific tools that return Result directly + // Agent-specific tools let rich_result = match name { "pause" => Some(control::pause(args)), "switch_model" => Some(control::switch_model(args)), @@ -45,21 +200,125 @@ pub async fn dispatch( return result.unwrap_or_else(ToolOutput::error); } - // Delegate to shared thought layer (poc-agent uses default provenance) - if let Some(output) = crate::agent::dispatch(name, args, tracker, None).await { + if let Some(output) = dispatch_shared(name, args, tracker, None).await { return output; } ToolOutput::error(format!("Unknown tool: {}", name)) } -/// Return all tool definitions (agent-specific + shared). +/// Dispatch shared tools (memory, file, bash). Used by both the +/// interactive agent and subconscious agents. Provenance tracks +/// which agent made the call for memory attribution. 
+pub async fn dispatch_shared( + name: &str, + args: &serde_json::Value, + tracker: &ProcessTracker, + provenance: Option<&str>, +) -> Option { + // Memory and journal tools + if name.starts_with("memory_") || name.starts_with("journal_") || name == "output" { + let result = memory::dispatch(name, args, provenance); + return Some(match result { + Ok(s) => ToolOutput::text(s), + Err(e) => ToolOutput::error(e), + }); + } + + // File and execution tools + let result = match name { + "read_file" => read::read_file(args), + "write_file" => write::write_file(args), + "edit_file" => edit::edit_file(args), + "bash" => bash::run_bash(args, tracker).await, + "grep" => grep::grep(args), + "glob" => glob::glob_search(args), + _ => return None, + }; + + Some(match result { + Ok(s) => ToolOutput::text(s), + Err(e) => ToolOutput::error(e), + }) +} + +/// Return all tool definitions (agent-specific + shared + memory). pub fn definitions() -> Vec { let mut defs = vec![ vision::definition(), working_stack::definition(), + read::definition(), + write::definition(), + edit::definition(), + bash::definition(), + grep::definition(), + glob::definition(), ]; defs.extend(control::definitions()); - defs.extend(crate::agent::all_definitions()); + defs.extend(memory::definitions()); defs } + +/// Return memory + journal tool definitions only. +pub fn memory_and_journal_definitions() -> Vec { + let mut defs = memory::definitions(); + defs.extend(memory::journal_definitions()); + defs +} + +/// Create a short summary of tool args for the tools pane header. 
+pub fn summarize_args(tool_name: &str, args: &serde_json::Value) -> String { + match tool_name { + "read_file" | "write_file" | "edit_file" => args["file_path"] + .as_str() + .unwrap_or("") + .to_string(), + "bash" => { + let cmd = args["command"].as_str().unwrap_or(""); + if cmd.len() > 60 { + let end = cmd.char_indices() + .map(|(i, _)| i) + .take_while(|&i| i <= 60) + .last() + .unwrap_or(0); + format!("{}...", &cmd[..end]) + } else { + cmd.to_string() + } + } + "grep" => { + let pattern = args["pattern"].as_str().unwrap_or(""); + let path = args["path"].as_str().unwrap_or("."); + format!("{} in {}", pattern, path) + } + "glob" => args["pattern"] + .as_str() + .unwrap_or("") + .to_string(), + "view_image" => { + if let Some(pane) = args["pane_id"].as_str() { + format!("pane {}", pane) + } else { + args["file_path"].as_str().unwrap_or("").to_string() + } + } + "journal" => { + let entry = args["entry"].as_str().unwrap_or(""); + if entry.len() > 60 { + format!("{}...", &entry[..60]) + } else { + entry.to_string() + } + } + "yield_to_user" => args["message"] + .as_str() + .unwrap_or("") + .to_string(), + "switch_model" => args["model"] + .as_str() + .unwrap_or("") + .to_string(), + "pause" => String::new(), + _ => String::new(), + } +} diff --git a/src/bin/consciousness.rs b/src/bin/consciousness.rs index b2be592..f7f053c 100644 --- a/src/bin/consciousness.rs +++ b/src/bin/consciousness.rs @@ -1,3 +1,4 @@ +#![warn(unreachable_pub)] // poc-agent — Substrate-independent AI agent // // A minimal but complete agent framework designed for identity @@ -32,7 +33,7 @@ use clap::Parser; use poc_memory::dbglog; use poc_memory::user::*; -use poc_memory::agent::{tools, runner::{Agent, TurnResult}}; +use poc_memory::agent::{tools, Agent, TurnResult}; use poc_memory::user::api::ApiClient; use poc_memory::user::tui::HotkeyAction; use poc_memory::config::{self, AppConfig, SessionConfig}; diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs index d6f1363..56c8b1d 100644 
--- a/src/subconscious/api.rs +++ b/src/subconscious/api.rs @@ -9,7 +9,7 @@ use crate::user::api::ApiClient; use crate::user::types::*; -use crate::agent::{self, ProcessTracker}; +use crate::agent::tools::{self as agent_tools, ProcessTracker, ToolOutput}; use std::sync::OnceLock; @@ -46,7 +46,7 @@ pub async fn call_api_with_tools( let (ui_tx, mut ui_rx) = crate::user::ui_channel::channel(); // All available native tools for subconscious agents - let all_tools = agent::memory_and_journal_definitions(); + let all_tools = agent_tools::memory_and_journal_definitions(); // If agent header specifies a tools whitelist, filter to only those let tool_defs: Vec<_> = if tools.is_empty() { all_tools @@ -175,9 +175,9 @@ pub async fn call_api_with_tools( }; let prov = provenance.borrow().clone(); - let output = match agent::dispatch(&call.function.name, &args, &tracker, Some(&prov)).await { + let output = match agent_tools::dispatch_shared(&call.function.name, &args, &tracker, Some(&prov)).await { Some(out) => out, - None => agent::ToolOutput::error(format!("Unknown tool: {}", call.function.name)), + None => ToolOutput::error(format!("Unknown tool: {}", call.function.name)), }; if std::env::var("POC_AGENT_VERBOSE").is_ok() { diff --git a/src/subconscious/knowledge.rs b/src/subconscious/knowledge.rs index bdf8e6a..e89c637 100644 --- a/src/subconscious/knowledge.rs +++ b/src/subconscious/knowledge.rs @@ -295,7 +295,7 @@ fn run_one_agent_inner( _llm_tag: &str, log: &(dyn Fn(&str) + Sync), ) -> Result { - let all_tools = crate::agent::memory_and_journal_definitions(); + let all_tools = crate::agent::tools::memory_and_journal_definitions(); let effective_tools: Vec = if def.tools.is_empty() { all_tools.iter().map(|t| t.function.name.clone()).collect() } else { diff --git a/src/user/tui/mod.rs b/src/user/tui/mod.rs index 0273177..b67b78b 100644 --- a/src/user/tui/mod.rs +++ b/src/user/tui/mod.rs @@ -307,10 +307,6 @@ pub(crate) fn parse_markdown(md: &str) -> Vec> { .collect() } -/// A 
tool call currently in flight — shown above the status bar. -// ActiveTool moved to ui_channel — shared between Agent and TUI -pub(crate) use crate::user::ui_channel::ActiveTool; - /// Main TUI application state. pub struct App { pub(crate) autonomous: PaneState, diff --git a/src/user/types.rs b/src/user/types.rs index e284cb2..77cf257 100644 --- a/src/user/types.rs +++ b/src/user/types.rs @@ -9,6 +9,12 @@ use chrono::Utc; use serde::{Deserialize, Serialize}; +// Re-export tool types that moved to agent::tools +pub use crate::agent::tools::{ + ToolDef, ToolCall, ToolCallDelta, ToolOutput, + FunctionCall, FunctionDef, FunctionCallDelta, +}; + /// Message content — either plain text or an array of content parts /// (for multimodal messages with images). Serializes as a JSON string /// for text-only, or a JSON array for multimodal. @@ -79,35 +85,7 @@ pub enum Role { Tool, } -/// A tool call requested by the model. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ToolCall { - pub id: String, - #[serde(rename = "type")] - pub call_type: String, - pub function: FunctionCall, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct FunctionCall { - pub name: String, - pub arguments: String, // JSON string -} - -/// Tool definition sent to the model. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ToolDef { - #[serde(rename = "type")] - pub tool_type: String, - pub function: FunctionDef, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct FunctionDef { - pub name: String, - pub description: String, - pub parameters: serde_json::Value, -} +// FunctionCall, FunctionDef moved to agent::tools /// Chat completion request. #[derive(Debug, Serialize)] @@ -202,23 +180,7 @@ pub struct Delta { pub tool_calls: Option>, } -/// A partial tool call within a streaming delta. The first chunk for a -/// given tool call carries the id and function name; subsequent chunks -/// carry argument fragments. 
-#[derive(Debug, Deserialize)] -pub struct ToolCallDelta { - pub index: usize, - pub id: Option, - #[serde(rename = "type")] - pub call_type: Option, - pub function: Option, -} - -#[derive(Debug, Deserialize)] -pub struct FunctionCallDelta { - pub name: Option, - pub arguments: Option, -} +// FunctionCallDelta moved to agent::tools // --- Convenience constructors ---