diff --git a/src/agent/mod.rs b/src/agent/mod.rs index cae5939..2e9242e 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -577,7 +577,6 @@ impl Agent { self.push_node(AstNode::tool_result(&output)); } - pub fn conversation_from(&self, from: usize) -> &[AstNode] { let conv = self.context.conversation(); if from < conv.len() { &conv[from..] } else { &[] } @@ -698,7 +697,4 @@ impl Agent { pub fn client_clone(&self) -> ApiClient { self.client.clone() } - - - } diff --git a/src/agent/oneshot.rs b/src/agent/oneshot.rs index 2332d27..44a109f 100644 --- a/src/agent/oneshot.rs +++ b/src/agent/oneshot.rs @@ -14,7 +14,8 @@ use std::fs; use std::path::PathBuf; use std::sync::OnceLock; -use super::api::{ApiClient, Message, Usage}; +use super::api::{ApiClient, Usage}; +use super::context::{AstNode, Role}; use super::tools::{self as agent_tools}; use super::Agent; @@ -61,43 +62,13 @@ pub struct AutoAgent { pub turn: usize, } -/// Per-run conversation backend — created fresh by run() or run_forked(). -enum Backend { - Standalone { client: ApiClient, messages: Vec }, - Forked(std::sync::Arc>), -} +/// Per-run conversation backend — wraps a forked agent. +struct Backend(std::sync::Arc>); impl Backend { - async fn client(&self) -> ApiClient { - match self { - Backend::Standalone { client, .. } => client.clone(), - Backend::Forked(agent) => agent.lock().await.client_clone(), - } + async fn push_node(&mut self, node: AstNode) { + self.0.lock().await.push_node(node); } - - async fn messages(&self) -> Vec { - match self { - Backend::Standalone { messages, .. } => messages.clone(), - Backend::Forked(agent) => agent.lock().await.assemble_api_messages(), - } - } - - async fn push_message(&mut self, msg: Message) { - match self { - Backend::Standalone { messages, .. } => messages.push(msg), - Backend::Forked(agent) => agent.lock().await.push_message(msg), - } - } - - async fn push_raw(&mut self, msg: Message) { - match self { - Backend::Standalone { messages, .. } => messages.push(msg), - Backend::Forked(agent) => { - agent.lock().await.push_message(msg); - } - } - } - } /// Resolve {{placeholder}} templates in subconscious agent prompts. @@ -166,17 +137,12 @@ impl AutoAgent { } } - /// Run standalone — creates a fresh message list from the global - /// API client. Used by oneshot CLI agents. + /// Run standalone — TODO: needs rewrite to use completions API pub async fn run( &mut self, - bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, + _bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>, ) -> Result { - let client = get_client()?.clone(); - let mut backend = Backend::Standalone { - client, messages: Vec::new(), - }; - self.run_with_backend(&mut backend, bail_fn).await + Err("standalone agent run not yet migrated to completions API".to_string()) } /// Run forked using a shared agent Arc. The UI can lock the same @@ -192,7 +158,7 @@ impl AutoAgent { phase: s.phase.clone(), }).collect(); let orig_steps = std::mem::replace(&mut self.steps, resolved_steps); - let mut backend = Backend::Forked(agent.clone()); + let mut backend = Backend(agent.clone()); let result = self.run_with_backend(&mut backend, None).await; self.steps = orig_steps; result @@ -211,43 +177,27 @@ impl AutoAgent { let mut next_step = 0; if next_step < self.steps.len() { - backend.push_message( - Message::user(&self.steps[next_step].prompt)).await; + backend.push_node( + AstNode::user_msg(&self.steps[next_step].prompt)).await; next_step += 1; } - let reasoning = crate::config::get().api_reasoning.clone(); let max_turns = 50 * self.steps.len().max(1); for _ in 0..max_turns { self.turn += 1; - let messages = backend.messages().await; - let client = backend.client().await; - dbglog!("[auto] {} turn {} ({} messages)", - self.name, self.turn, messages.len()); + let result = Agent::turn(backend.0.clone()).await + .map_err(|e| format!("{}: {}", self.name, e))?; - let (msg, usage_opt) = Self::api_call_with_retry( - &self.name, &client, &self.tools, &messages, - &reasoning, self.sampling, self.priority).await?; - - if let Some(u) = &usage_opt { - dbglog!("[auto] {} tokens: {} prompt + {} completion", - self.name, u.prompt_tokens, u.completion_tokens); - } - - let has_content = msg.content.is_some(); - let has_tools = msg.tool_calls.as_ref().is_some_and(|tc| !tc.is_empty()); - - if has_tools { - self.dispatch_tools(backend, &msg).await; + if result.had_tool_calls { continue; } - let text = msg.content_text().to_string(); - if text.is_empty() && !has_content { + let text = result.text; + if text.is_empty() { dbglog!("[auto] {} empty response, retrying", self.name); - backend.push_message(Message::user( + backend.push_node(AstNode::user_msg( "[system] Your previous response was empty. \ Please respond with text or use a tool." )).await; @@ -257,15 +207,13 @@ impl AutoAgent { dbglog!("[auto] {} response: {}", self.name, &text[..text.len().min(200)]); - backend.push_message(Message::assistant(&text)).await; - if next_step < self.steps.len() { if let Some(ref check) = bail_fn { check(next_step)?; } self.current_phase = self.steps[next_step].phase.clone(); - backend.push_message( - Message::user(&self.steps[next_step].prompt)).await; + backend.push_node( + AstNode::user_msg(&self.steps[next_step].prompt)).await; next_step += 1; dbglog!("[auto] {} step {}/{}", self.name, next_step, self.steps.len()); @@ -278,102 +226,6 @@ impl AutoAgent { Err(format!("{}: exceeded {} tool turns", self.name, max_turns)) } - async fn api_call_with_retry( - name: &str, - client: &ApiClient, - tools: &[agent_tools::Tool], - messages: &[Message], - reasoning: &str, - sampling: super::api::SamplingParams, - priority: i32, - ) -> Result<(Message, Option), String> { - let mut last_err = None; - for attempt in 0..5 { - match client.chat_completion_stream_temp( - messages, tools, reasoning, sampling, Some(priority), - ).await { - Ok((msg, usage)) => { - if let Some(ref e) = last_err { - dbglog!("[auto] {} succeeded after retry (previous: {})", - name, e); - } - return Ok((msg, usage)); - } - Err(e) => { - let err_str = e.to_string(); - let is_transient = err_str.contains("IncompleteMessage") - || err_str.contains("connection closed") - || err_str.contains("connection reset") - || err_str.contains("timed out") - || err_str.contains("Connection refused"); - if is_transient && attempt < 4 { - dbglog!("[auto] {} transient error (attempt {}): {}, retrying", - name, attempt + 1, err_str); - tokio::time::sleep(std::time::Duration::from_secs(2 << attempt)).await; - last_err = Some(e); - continue; - } - let msg_bytes: usize = messages.iter() - .map(|m| m.content_text().len()).sum(); - return Err(format!( - "{}: API error (~{}KB, {} messages, {} attempts): {}", - name, msg_bytes / 1024, - messages.len(), attempt + 1, e)); - } - } - } - Err(format!("{}: all retry attempts exhausted", name)) - } - - async fn dispatch_tools(&mut self, backend: &mut Backend, msg: &Message) { - let mut sanitized = msg.clone(); - if let Some(ref mut calls) = sanitized.tool_calls { - for call in calls { - if serde_json::from_str::(&call.function.arguments).is_err() { - dbglog!("[auto] {} sanitizing malformed args for {}: {}", - self.name, call.function.name, &call.function.arguments); - call.function.arguments = "{}".to_string(); - } - } - } - backend.push_raw(sanitized).await; - - for call in msg.tool_calls.as_ref().unwrap() { - dbglog!("[auto] {} tool: {}({})", - self.name, call.function.name, &call.function.arguments); - - let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { - Ok(v) => v, - Err(_) => { - backend.push_raw(Message::tool_result( - &call.id, - "Error: your tool call had malformed JSON arguments. \ - Please retry with valid JSON.", - )).await; - continue; - } - }; - - // Intercept output() — store in-memory instead of filesystem - let output = if call.function.name == "output" { - let key = args["key"].as_str().unwrap_or(""); - let value = args["value"].as_str().unwrap_or(""); - if !key.is_empty() { - self.outputs.insert(key.to_string(), value.to_string()); - } - format!("{}: {}", key, value) - } else { - let agent = match &*backend { - Backend::Forked(a) => Some(a.clone()), - _ => None, - }; - agent_tools::dispatch_with_agent(&call.function.name, &args, agent).await - }; - - dbglog!("[auto] {} result: {} chars", self.name, output.len()); - backend.push_raw(Message::tool_result(&call.id, &output)).await; - } - } } // --------------------------------------------------------------------------- diff --git a/src/subconscious/learn.rs b/src/subconscious/learn.rs index 1415aed..48d6278 100644 --- a/src/subconscious/learn.rs +++ b/src/subconscious/learn.rs @@ -15,8 +15,7 @@ // hasn't internalized. 2 API calls. use crate::agent::api::ApiClient; -use crate::agent::api::*; -use crate::agent::context::{ConversationEntry, ContextEntry, ContextState}; +use crate::agent::context::{AstNode, Ast, NodeBody, ContextState, Role}; const SCORE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); @@ -30,39 +29,71 @@ enum Filter<'a> { SkipAllMemories, } +fn is_memory(node: &AstNode) -> bool { + matches!(node, AstNode::Leaf(leaf) if matches!(leaf.body(), NodeBody::Memory { .. })) +} + +fn memory_key(node: &AstNode) -> Option<&str> { + match node { + AstNode::Leaf(leaf) => match leaf.body() { + NodeBody::Memory { key, .. } => Some(key), + _ => None, + }, + _ => None, + } +} + +fn is_assistant(node: &AstNode) -> bool { + matches!(node, AstNode::Branch { role: Role::Assistant, .. }) +} + +/// Push an AstNode as one or more JSON messages for the scoring API. +fn push_api_message(node: &AstNode, msgs: &mut Vec) { + match node { + AstNode::Branch { role, children } => { + let content: String = children.iter().map(|c| c.render()).collect(); + msgs.push(serde_json::json!({ + "role": role.as_str(), + "content": content, + })); + } + AstNode::Leaf(leaf) => { + let role = match leaf.body() { + NodeBody::ToolResult(_) => "tool", + _ => "user", + }; + msgs.push(serde_json::json!({ + "role": role, + "content": leaf.body().text(), + })); + } + } +} + /// Build the messages array for a scoring call. /// -/// Always includes system prompt + context message as prefix, then -/// entries from `range` filtered by `filter`. +/// Always includes system prompt as prefix, then entries from `range` +/// filtered by `filter`. fn build_messages( context: &ContextState, range: std::ops::Range, filter: Filter, ) -> Vec { let mut msgs = Vec::new(); - for e in context.system().entries() { - msgs.push(serde_json::json!({"role": "system", "content": e.entry.message().content_text()})); + for node in context.system() { + push_api_message(node, &mut msgs); } - let ctx = context.render_context_message(); - if !ctx.is_empty() { - msgs.push(serde_json::json!({"role": "user", "content": ctx})); - } - let entries = context.conversation().entries(); + let entries = context.conversation(); for i in range { - let ce = &entries[i]; - let entry = &ce.entry; + let node = &entries[i]; let skip = match &filter { Filter::None => false, Filter::SkipIndex(idx) => i == *idx, - Filter::SkipKey(key) => matches!(entry, ConversationEntry::Memory { key: k, .. } if k == *key), - Filter::SkipAllMemories => entry.is_memory(), + Filter::SkipKey(key) => memory_key(node) == Some(*key), + Filter::SkipAllMemories => is_memory(node), }; if skip { continue; } - let m = entry.api_message(); - msgs.push(serde_json::json!({ - "role": m.role_str(), - "content": m.content_text(), - })); + push_api_message(node, &mut msgs); } msgs } @@ -178,16 +209,13 @@ pub async fn score_memories( context: &ContextState, client: &ApiClient, ) -> anyhow::Result { - let mut memory_keys: Vec = context.conversation.entries().iter() - .filter_map(|ce| match &ce.entry { - ConversationEntry::Memory { key, .. } => Some(key.clone()), - _ => None, - }) + let mut memory_keys: Vec = context.conversation().iter() + .filter_map(|node| memory_key(node).map(String::from)) .collect(); memory_keys.dedup(); - let response_indices: Vec = context.conversation.entries().iter().enumerate() - .filter(|(_, ce)| ce.entry.message().role == Role::Assistant) + let response_indices: Vec = context.conversation().iter().enumerate() + .filter(|(_, node)| is_assistant(node)) .map(|(i, _)| i) .collect(); @@ -201,7 +229,7 @@ pub async fn score_memories( let http = http_client(); - let range = 0..context.conversation.entries().len(); + let range = 0..context.conversation().len(); let baseline = call_score(&http, client, &build_messages(context, range.clone(), Filter::None)).await?; @@ -245,10 +273,10 @@ pub async fn score_memories( /// Find the entry index after `start` that contains the Nth assistant response. /// Returns (end_index, true) if N responses were found, (entries.len(), false) if not. -fn nth_response_end(entries: &[ContextEntry], start: usize, n: usize) -> (usize, bool) { +fn nth_response_end(entries: &[AstNode], start: usize, n: usize) -> (usize, bool) { let mut count = 0; for i in start..entries.len() { - if entries[i].entry.message().role == Role::Assistant { + if is_assistant(&entries[i]) { count += 1; if count >= n { return (i + 1, true); } } @@ -270,17 +298,15 @@ pub async fn score_memory( ) -> anyhow::Result { const RESPONSE_WINDOW: usize = 50; - let entries = context.conversation.entries(); - let first_pos = match entries.iter().position(|ce| { - matches!(&ce.entry, ConversationEntry::Memory { key: k, .. } if k == key) - }) { + let entries = context.conversation(); + let first_pos = match entries.iter().position(|node| memory_key(node) == Some(key)) { Some(p) => p, None => return Ok(0.0), }; let (end, _) = nth_response_end(entries, first_pos, RESPONSE_WINDOW); let range = first_pos..end; - if !entries[range.clone()].iter().any(|ce| ce.entry.message().role == Role::Assistant) { + if !entries[range.clone()].iter().any(|node| is_assistant(node)) { return Ok(0.0); } @@ -319,14 +345,14 @@ where let store = crate::hippocampus::store::Store::load().unwrap_or_default(); - for (i, ce) in context.conversation.entries().iter().enumerate() { - if let ConversationEntry::Memory { key, .. } = &ce.entry { - if !seen.insert(key.clone()) { continue; } - let last_scored = store.nodes.get(key.as_str()) + for (i, node) in context.conversation().iter().enumerate() { + if let Some(key) = memory_key(node) { + if !seen.insert(key.to_owned()) { continue; } + let last_scored = store.nodes.get(key) .map(|n| n.last_scored) .unwrap_or(0); if now - last_scored >= max_age_secs { - candidates.push((i, key.clone(), last_scored)); + candidates.push((i, key.to_owned(), last_scored)); } } } @@ -337,11 +363,11 @@ where let http = http_client(); let mut scored = 0; - let total_tokens = context.conversation.tokens(); + let entries = context.conversation(); + let total_tokens: usize = entries.iter().map(|n| n.tokens()).sum(); let token_cutoff = total_tokens * 60 / 100; // Precompute cumulative token position for each entry - let entries = context.conversation.entries(); let mut cumulative: Vec = Vec::with_capacity(entries.len()); let mut running = 0; for e in entries { @@ -355,9 +381,9 @@ where if cumulative.get(*pos).copied().unwrap_or(total_tokens) > token_cutoff { continue; } - let (end, _) = nth_response_end(context.conversation.entries(), *pos, response_window); + let (end, _) = nth_response_end(context.conversation(), *pos, response_window); let range = *pos..end; - if !context.conversation.entries()[range.clone()].iter().any(|ce| ce.entry.message().role == Role::Assistant) { + if !context.conversation()[range.clone()].iter().any(|node| is_assistant(node)) { continue; } @@ -397,10 +423,11 @@ pub async fn score_finetune( count: usize, client: &ApiClient, ) -> anyhow::Result> { - let range = context.conversation.entries().len().saturating_sub(count)..context.conversation.entries().len(); + let entries = context.conversation(); + let range = entries.len().saturating_sub(count)..entries.len(); let response_positions: Vec = range.clone() - .filter(|&i| context.conversation.entries()[i].entry.message().role == Role::Assistant) + .filter(|&i| is_assistant(&entries[i])) .collect(); if response_positions.is_empty() { return Ok(Vec::new()); diff --git a/src/user/chat.rs b/src/user/chat.rs index 40b4d29..4ce5cf2 100644 --- a/src/user/chat.rs +++ b/src/user/chat.rs @@ -13,7 +13,7 @@ use ratatui::{ }; use super::{App, ScreenView, screen_legend}; -use crate::agent::context::{AstNode, NodeBody, Role}; +use crate::agent::context::{AstNode, NodeBody, Role, Ast}; use crate::mind::MindCommand; // --- Slash command table --- @@ -149,6 +149,7 @@ enum Marker { Assistant, } +#[derive(PartialEq)] enum PaneTarget { Conversation, ConversationAssistant, @@ -836,7 +837,7 @@ impl ScreenView for InteractScreen { agent.expire_activities(); app.status.prompt_tokens = agent.last_prompt_tokens(); app.status.model = agent.model().to_string(); - app.status.context_budget = agent.context.format_budget(); + app.status.context_budget = format!("{} tokens", agent.context.tokens()); app.activity = agent.activities.last() .map(|a| a.label.clone()) .unwrap_or_default(); diff --git a/src/user/subconscious.rs b/src/user/subconscious.rs index 661c82f..c0f1789 100644 --- a/src/user/subconscious.rs +++ b/src/user/subconscious.rs @@ -152,9 +152,8 @@ impl SubconsciousScreen { snap.forked_agent.as_ref() .and_then(|agent| agent.try_lock().ok()) .map(|ag| { - let conv = ag.context.conversation.clone(); - // Only show entries from fork point onward - let mut view = section_to_view(&conv); + let conv = ag.context.conversation(); + let mut view = section_to_view("Conversation", conv); let fork = snap.fork_point.min(view.children.len()); view.children = view.children.split_off(fork); vec![view] @@ -179,7 +178,7 @@ impl SubconsciousScreen { .unwrap_or_else(|| "—".to_string()); let entries = snap.forked_agent.as_ref() .and_then(|a| a.try_lock().ok()) - .map(|ag| ag.context.conversation.len().saturating_sub(snap.fork_point)) + .map(|ag| ag.context.conversation().len().saturating_sub(snap.fork_point)) .unwrap_or(0); ListItem::from(Line::from(vec![ Span::styled(&snap.name, Style::default().fg(Color::Gray)),