diff --git a/src/agent/context_new.rs b/src/agent/context_new.rs index 656daf7..e0d05f9 100644 --- a/src/agent/context_new.rs +++ b/src/agent/context_new.rs @@ -80,6 +80,7 @@ pub enum AstNode { /// The context window: four sections as Vec. /// All mutation goes through ContextState methods to maintain the invariant /// that token_ids on every leaf matches its rendered text. +#[derive(Clone)] pub struct ContextState { system: Vec, identity: Vec, @@ -699,11 +700,14 @@ impl ContextState { } } - /// Remove a node at `index` from `section`. pub fn del(&mut self, section: Section, index: usize) -> AstNode { self.section_mut(section).remove(index) } + pub fn clear(&mut self, section: Section) { + self.section_mut(section).clear(); + } + /// Push a child node into a branch at `index` in `section`. pub fn push_child(&mut self, section: Section, index: usize, child: AstNode) { let node = &mut self.section_mut(section)[index]; diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 6bf2eee..b42678a 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -25,7 +25,7 @@ use anyhow::Result; use api::ApiClient; use context_new::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role}; -use tools::{summarize_args, working_stack}; +use tools::summarize_args; use crate::mind::log::ConversationLog; @@ -505,38 +505,36 @@ impl Agent { } /// Dispatch a tool call without holding the agent lock across I/O. - /// Used by `turn()` which manages its own locking. async fn dispatch_tool_call_unlocked( agent: &Arc>, active_tools: &tools::SharedActiveTools, - call: &ToolCall, + call: &PendingToolCall, ds: &mut DispatchState, ) { - let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) { + let args: serde_json::Value = match serde_json::from_str(&call.arguments) { Ok(v) => v, Err(e) => { let err = format!("Error: malformed tool call arguments: {e}"); - let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await; + let _act = start_activity(agent, format!("rejected: {} (bad args)", call.name)).await; let mut me = agent.lock().await; me.apply_tool_result(call, err, ds); return; } }; - let args_summary = summarize_args(&call.function.name, &args); - let _calling = start_activity(agent, format!("calling: {}", call.function.name)).await; + let args_summary = summarize_args(&call.name, &args); + let _calling = start_activity(agent, format!("calling: {}", call.name)).await; - // Spawn tool, track it let call_clone = call.clone(); let agent_handle = agent.clone(); let handle = tokio::spawn(async move { - let output = tools::dispatch_with_agent(&call_clone.function.name, &args, Some(agent_handle)).await; + let output = tools::dispatch_with_agent(&call_clone.name, &args, Some(agent_handle)).await; (call_clone, output) }); active_tools.lock().unwrap().push( tools::ActiveToolCall { id: call.id.clone(), - name: call.function.name.clone(), + name: call.name.clone(), detail: args_summary, started: std::time::Instant::now(), background: false, @@ -544,28 +542,22 @@ impl Agent { } ); - // Pop it back and await — no agent lock held let entry = { let mut tools = active_tools.lock().unwrap(); tools.pop().unwrap() }; if let Ok((call, output)) = entry.handle.await { - // Brief lock to apply result let mut me = agent.lock().await; me.apply_tool_result(&call, output, ds); } } - /// Apply a completed tool result to conversation state. fn apply_tool_result( &mut self, - call: &ToolCall, + call: &PendingToolCall, output: String, ds: &mut DispatchState, ) { - let args: serde_json::Value = - serde_json::from_str(&call.function.arguments).unwrap_or_default(); - ds.had_tool_calls = true; if output.starts_with("Error:") { ds.tool_errors += 1; @@ -573,70 +565,41 @@ impl Agent { self.active_tools.lock().unwrap().retain(|t| t.id != call.id); - // Tag memory_render results for context deduplication - if call.function.name == "memory_render" && !output.starts_with("Error:") { + // Tag memory_render results as Memory nodes for context deduplication + if call.name == "memory_render" && !output.starts_with("Error:") { + let args: serde_json::Value = + serde_json::from_str(&call.arguments).unwrap_or_default(); if let Some(key) = args.get("key").and_then(|v| v.as_str()) { - let mut msg = Message::tool_result(&call.id, &output); - msg.stamp(); - self.push_entry(ConversationEntry::Memory { key: key.to_string(), message: msg, score: None }); + self.push_node(AstNode::memory(key, &output)); return; } } - self.push_message(Message::tool_result(&call.id, &output)); + self.push_node(AstNode::tool_result(&output)); } - /// Context state sections — just returns references to the live data. - pub fn context_sections(&self) -> [&ContextSection; 4] { - self.context.sections() + pub fn conversation_from(&self, from: usize) -> &[AstNode] { + let conv = self.context.conversation(); + if from < conv.len() { &conv[from..] } else { &[] } } - /// Conversation entries from `from` onward — used by the - /// subconscious debug screen to show forked agent conversations. - pub fn conversation_entries_from(&self, from: usize) -> &[ContextEntry] { - let entries = self.context.conversation.entries(); - if from < entries.len() { &entries[from..] } else { &[] } - } - - /// Load recent journal entries at startup for orientation. - /// Uses the same budget logic as compaction but with empty conversation. - /// Only parses the tail of the journal file (last 64KB) for speed. fn load_startup_journal(&mut self) { let store = match crate::store::Store::load() { Ok(s) => s, Err(_) => return, }; - // Find oldest message timestamp in conversation log let oldest_msg_ts = self.conversation_log.as_ref() .and_then(|log| log.oldest_timestamp()); - // Get journal entries from the memory graph let mut journal_nodes: Vec<_> = store.nodes.values() .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .collect(); - let mut dbg = std::fs::OpenOptions::new().create(true).append(true) - .open("/tmp/poc-journal-debug.log").ok(); - macro_rules! dbg_log { - ($($arg:tt)*) => { - if let Some(ref mut f) = dbg { use std::io::Write; let _ = writeln!(f, $($arg)*); } - } - } - dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts); - journal_nodes.sort_by_key(|n| n.created_at); - if let Some(first) = journal_nodes.first() { - dbg_log!("[journal] first created_at={}", first.created_at); - } - if let Some(last) = journal_nodes.last() { - dbg_log!("[journal] last created_at={}", last.created_at); - } - // Find the cutoff index — entries older than conversation, plus one overlap let cutoff_idx = if let Some(cutoff) = oldest_msg_ts { let cutoff_ts = cutoff.timestamp(); - dbg_log!("[journal] cutoff timestamp={}", cutoff_ts); let mut idx = journal_nodes.len(); for (i, node) in journal_nodes.iter().enumerate() { if node.created_at >= cutoff_ts { @@ -648,117 +611,45 @@ impl Agent { } else { journal_nodes.len() }; - dbg_log!("[journal] cutoff_idx={}", cutoff_idx); - // Walk backwards from cutoff, accumulating entries within 15% of context - let context_window = crate::agent::context::context_window(); - let journal_budget = context_window * 15 / 100; - dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window); - - let mut journal_entries = Vec::new(); + let journal_budget = context_new::context_window() * 15 / 100; + let mut entries = Vec::new(); let mut total_tokens = 0; for node in journal_nodes[..cutoff_idx].iter().rev() { - let msg = Message::user(&node.content); - let ce = ContextEntry::new( - ConversationEntry::Message(msg), - chrono::DateTime::from_timestamp(node.created_at, 0), - ); - if total_tokens + ce.tokens() > journal_budget && !journal_entries.is_empty() { + let ts = chrono::DateTime::from_timestamp(node.created_at, 0); + let ast = AstNode::memory(&node.key, &node.content) + .with_timestamp(ts.unwrap_or_else(chrono::Utc::now)); + let tok = ast.tokens(); + if total_tokens + tok > journal_budget && !entries.is_empty() { break; } - total_tokens += ce.tokens(); - journal_entries.push(ce); + total_tokens += tok; + entries.push(ast); } - journal_entries.reverse(); - dbg_log!("[journal] loaded {} entries, {} tokens", journal_entries.len(), total_tokens); + entries.reverse(); - if journal_entries.is_empty() { - dbg_log!("[journal] no entries!"); - return; - } + if entries.is_empty() { return; } - self.context.journal.clear(); - for entry in journal_entries { - self.context.journal.push(entry); - } - dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len()); - } - - /// Called after any change to context state (working stack, etc). - /// Load working stack from disk. - fn load_working_stack(&mut self) { - if let Ok(data) = std::fs::read_to_string(working_stack::file_path()) { - if let Ok(stack) = serde_json::from_str::>(&data) { - self.context.working_stack = stack; - } + self.context.clear(Section::Journal); + for entry in entries { + self.context.push(Section::Journal, entry); } } - /// Replace base64 image data in older messages with text placeholders. - /// Keeps the 2 most recent images live (enough for motion/comparison). - /// The tool result message before each image records what was loaded. - pub fn age_out_images(&mut self) { - // Find image entries newest-first, skip 1 (caller is about to add another) - let to_age: Vec = self.context.conversation.entries().iter().enumerate() - .rev() - .filter(|(_, ce)| { - if let Some(MessageContent::Parts(parts)) = &ce.entry.message().content { - parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. })) - } else { false } - }) - .map(|(i, _)| i) - .skip(1) // keep 1 existing + 1 about to be added = 2 live - .collect(); - - for i in to_age { - // Build replacement entry with image data stripped - let old = &self.context.conversation.entries()[i]; - let msg = old.entry.message(); - if let Some(MessageContent::Parts(parts)) = &msg.content { - let mut replacement = String::new(); - for part in parts { - match part { - ContentPart::Text { text } => { - if !replacement.is_empty() { replacement.push('\n'); } - replacement.push_str(text); - } - ContentPart::ImageUrl { .. } => { - if !replacement.is_empty() { replacement.push('\n'); } - replacement.push_str("[image aged out]"); - } - } - } - let mut new_msg = msg.clone(); - new_msg.content = Some(MessageContent::Text(replacement)); - self.context.conversation.set(i, ContextEntry::new( - ConversationEntry::Message(new_msg), - old.timestamp, - )); - } - } - self.generation += 1; - } - - /// Strip ephemeral tool calls from the conversation history. - /// - /// Last prompt token count reported by the API. pub fn last_prompt_tokens(&self) -> u32 { self.last_prompt_tokens } - /// Rebuild the context window: reload identity, dedup, trim, reload journal. + /// Rebuild the context window: reload identity, trim, reload journal. pub fn compact(&mut self) { - // Reload identity from config match crate::config::reload_for_model(&self.app_config, &self.prompt_file) { Ok((system_prompt, personality)) => { - self.context.system.clear(); - self.context.system.push(ContextEntry::new( - ConversationEntry::System(Message::system(&system_prompt)), None)); - self.context.identity.clear(); - for (_name, content) in &personality { - self.context.identity.push(ContextEntry::new( - ConversationEntry::Message(Message::user(content)), None)); + self.context.clear(Section::System); + self.context.push(Section::System, AstNode::system_msg(&system_prompt)); + self.context.clear(Section::Identity); + for (name, content) in &personality { + self.context.push(Section::Identity, AstNode::memory(name, content)); } } Err(e) => { @@ -766,88 +657,45 @@ impl Agent { } } - let before = self.context.conversation.len(); - let before_mem = self.context.conversation.entries().iter() - .filter(|e| e.entry.is_memory()).count(); - let before_conv = before - before_mem; - - // Age out images before trimming — they're huge in the request payload - self.age_out_images(); - - // Load journal BEFORE trimming so trim accounts for journal cost self.load_startup_journal(); - // Dedup memory, trim to budget - let fixed = self.context.system.tokens() + self.context.identity.tokens() - + self.context.journal.tokens(); - self.context.conversation.trim(fixed); - - let after = self.context.conversation.len(); - let after_mem = self.context.conversation.entries().iter() - .filter(|e| e.entry.is_memory()).count(); - let after_conv = after - after_mem; - - dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})", - before, after, before_mem, after_mem, before_conv, after_conv); + // TODO: trim_entries — dedup memories, evict to budget self.generation += 1; self.last_prompt_tokens = 0; - - dbglog!("[compact] budget: {}", self.context.format_budget()); } - /// Restore from the conversation log. Builds the context window - /// the same way compact() does — journal summaries for old messages, - /// raw recent messages. This is the unified startup path. + /// Restore from the conversation log. /// Returns true if the log had content to restore. pub fn restore_from_log(&mut self) -> bool { - let entries = match &self.conversation_log { - Some(log) => match log.read_tail(64 * 1024 * 1024) { - Ok(entries) if !entries.is_empty() => entries, + let nodes = match &self.conversation_log { + Some(log) => match log.read_nodes(64 * 1024 * 1024) { + Ok(nodes) if !nodes.is_empty() => nodes, _ => return false, }, None => return false, }; - // Load extra — compact() will dedup, trim, reload identity + journal - let all: Vec = entries.into_iter() - .filter(|e| !e.is_log() && !e.is_thinking() && e.message().role != Role::System) - .map(|e| { - let timestamp = if e.is_log() || e.is_thinking() { None } else { - e.message().timestamp.as_ref().and_then(|ts| { - chrono::DateTime::parse_from_rfc3339(ts).ok() - .map(|dt| dt.with_timezone(&chrono::Utc)) - }) - }; - ContextEntry::new(e, timestamp) - }) - .collect(); - let mem_count = all.iter().filter(|e| e.entry.is_memory()).count(); - let conv_count = all.len() - mem_count; - dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})", - all.len(), mem_count, conv_count); - self.context.conversation.set_entries(all); + self.context.clear(Section::Conversation); + for node in nodes { + self.context.push(Section::Conversation, node); + } self.compact(); - // Estimate prompt tokens so status bar isn't 0 on startup - self.last_prompt_tokens = self.context.total_tokens() as u32; + self.last_prompt_tokens = self.context.tokens() as u32; true } - /// Replace the API client (for model switching). pub fn swap_client(&mut self, new_client: ApiClient) { self.client = new_client; } - /// Get the model identifier. pub fn model(&self) -> &str { &self.client.model } - /// Get the conversation entries. - pub fn entries(&self) -> &[ContextEntry] { - self.context.conversation.entries() + pub fn conversation(&self) -> &[AstNode] { + self.context.conversation() } - /// Mutable access to conversation entries (for /retry). pub fn client_clone(&self) -> ApiClient { self.client.clone() } diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index 5a66d62..eef1eb4 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -68,7 +68,7 @@ pub struct ActiveToolCall { pub detail: String, pub started: Instant, pub background: bool, - pub handle: tokio::task::JoinHandle<(ToolCall, String)>, + pub handle: tokio::task::JoinHandle<(super::context_new::PendingToolCall, String)>, } /// Shared active tool calls — agent spawns, TUI reads metadata / aborts. diff --git a/src/mind/log.rs b/src/mind/log.rs index e7b6e71..2228456 100644 --- a/src/mind/log.rs +++ b/src/mind/log.rs @@ -1,19 +1,8 @@ -// observe.rs — Shared observation socket + logfile -// -// Two mechanisms: -// 1. Logfile (~/.consciousness/agent-sessions/observe.log) — append-only -// plain text of the conversation. `poc-agent read` prints new -// content since last read using a byte-offset cursor file. -// 2. Unix socket — for live streaming (`poc-agent read -f`) and -// sending input (`poc-agent write `). -// -// The logfile is the history. The socket is the live wire. - use anyhow::{Context, Result}; use std::fs::{File, OpenOptions}; use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; -use crate::agent::context::ConversationEntry; +use crate::agent::context_new::AstNode; pub struct ConversationLog { path: PathBuf, @@ -21,7 +10,6 @@ pub struct ConversationLog { impl ConversationLog { pub fn new(path: PathBuf) -> Result { - // Ensure parent directory exists if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("creating log dir {}", parent.display()))?; @@ -29,16 +17,15 @@ impl ConversationLog { Ok(Self { path }) } - /// Append a conversation entry to the log. - pub fn append(&self, entry: &ConversationEntry) -> Result<()> { + pub fn append_node(&self, node: &AstNode) -> Result<()> { let mut file = OpenOptions::new() .create(true) .append(true) .open(&self.path) .with_context(|| format!("opening log {}", self.path.display()))?; - let line = serde_json::to_string(entry) - .context("serializing entry for log")?; + let line = serde_json::to_string(node) + .context("serializing node for log")?; writeln!(file, "{}", line) .context("writing to conversation log")?; file.sync_all() @@ -46,10 +33,7 @@ impl ConversationLog { Ok(()) } - /// Read the tail of the log (last `max_bytes` bytes). - /// Seeks to `file_len - max_bytes`, skips the first partial line, - /// then parses forward. For logs smaller than `max_bytes`, reads everything. - pub fn read_tail(&self, max_bytes: u64) -> Result> { + pub fn read_nodes(&self, max_bytes: u64) -> Result> { if !self.path.exists() { return Ok(Vec::new()); } @@ -60,45 +44,36 @@ impl ConversationLog { if file_len > max_bytes { reader.seek(SeekFrom::Start(file_len - max_bytes))?; - // Skip partial first line let mut discard = String::new(); reader.read_line(&mut discard)?; } - let mut entries = Vec::new(); + let mut nodes = Vec::new(); for line in reader.lines() { let line = line.context("reading log tail")?; let line = line.trim(); - if line.is_empty() { - continue; - } - // Try ConversationEntry first (new format), fall back to bare Message (old logs) - if let Ok(entry) = serde_json::from_str::(line) { - entries.push(entry); + if line.is_empty() { continue; } + if let Ok(node) = serde_json::from_str::(line) { + nodes.push(node); } } - Ok(entries) + Ok(nodes) } pub fn path(&self) -> &Path { &self.path } - /// Get the timestamp of the oldest message in the log. pub fn oldest_timestamp(&self) -> Option> { let file = File::open(&self.path).ok()?; let reader = BufReader::new(file); for line in reader.lines().flatten() { let line = line.trim().to_string(); if line.is_empty() { continue; } - if let Ok(entry) = serde_json::from_str::(&line) { - if let Some(ts) = &entry.message().timestamp { - if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) { - return Some(dt.to_utc()); - } - // Try other formats - if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") { - return Some(dt.and_utc()); + if let Ok(node) = serde_json::from_str::(&line) { + if let Some(leaf) = node.leaf() { + if let Some(ts) = leaf.timestamp() { + return Some(ts); } } }