From ddfdbe6cb1035ffb42fb0d6eddbeb4529fa67d34 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Thu, 9 Apr 2026 00:32:32 -0400 Subject: [PATCH] Move conversation_log from AgentState to ContextState The log records what goes into context, so it belongs under the context lock. push() now auto-logs conversation entries, eliminating all the manual lock-state-for-log, drop, lock-context-for-push dances. - ContextState: new conversation_log field, Clone impl drops it (forked contexts don't log) - push(): auto-logs Section::Conversation entries - push_node, apply_tool_results, collect_results: all simplified - collect_results: batch nodes under single context lock - Assistant response logged under context lock after parse completes Co-Authored-By: Proof of Concept --- src/agent/context.rs | 22 +++++- src/agent/mod.rs | 41 ++++------- src/mind/mod.rs | 6 +- src/mind/subconscious.rs | 152 +++++++++++++++++++-------------------- 4 files changed, 112 insertions(+), 109 deletions(-) diff --git a/src/agent/context.rs b/src/agent/context.rs index a571203..89232a9 100644 --- a/src/agent/context.rs +++ b/src/agent/context.rs @@ -99,12 +99,24 @@ pub enum AstNode { /// The context window: four sections as Vec. /// All mutation goes through ContextState methods to maintain the invariant /// that token_ids on every leaf matches its rendered text. -#[derive(Clone)] pub struct ContextState { system: Vec, identity: Vec, journal: Vec, conversation: Vec, + pub conversation_log: Option, +} + +impl Clone for ContextState { + fn clone(&self) -> Self { + Self { + system: self.system.clone(), + identity: self.identity.clone(), + journal: self.journal.clone(), + conversation: self.conversation.clone(), + conversation_log: None, // forked contexts don't log + } + } } /// Identifies a section for mutation methods. @@ -698,6 +710,7 @@ impl ContextState { identity: Vec::new(), journal: Vec::new(), conversation: Vec::new(), + conversation_log: None, } } @@ -753,6 +766,13 @@ impl ContextState { } pub fn push(&mut self, section: Section, node: AstNode) { + if section == Section::Conversation { + if let Some(ref log) = self.conversation_log { + if let Err(e) = log.append_node(&node) { + eprintln!("warning: log: {:#}", e); + } + } + } self.section_mut(section).push(node); } diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 4b00bc2..26b55de 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -151,7 +151,6 @@ pub struct AgentState { pub pending_model_switch: Option, pub pending_dmn_pause: bool, pub provenance: String, - pub conversation_log: Option, pub generation: u64, pub memory_scoring_in_flight: bool, pub active_tools: tools::ActiveTools, @@ -172,6 +171,7 @@ impl Agent { active_tools: tools::ActiveTools, ) -> Arc { let mut context = ContextState::new(); + context.conversation_log = conversation_log; context.push(Section::System, AstNode::system_msg(&system_prompt)); let tool_defs: Vec = tools::tools().iter() @@ -213,7 +213,6 @@ impl Agent { pending_model_switch: None, pending_dmn_pause: false, provenance: "manual".to_string(), - conversation_log, generation: 0, memory_scoring_in_flight: false, active_tools, @@ -249,7 +248,6 @@ impl Agent { pending_model_switch: None, pending_dmn_pause: false, provenance: st.provenance.clone(), - conversation_log: None, generation: 0, memory_scoring_in_flight: false, active_tools: tools::ActiveTools::new(), @@ -269,15 +267,8 @@ impl Agent { pub async fn push_node(&self, node: AstNode) { let node = node.with_timestamp(chrono::Utc::now()); - let st = self.state.lock().await; - if let Some(ref log) = st.conversation_log { - if let Err(e) = log.append_node(&node) { - eprintln!("warning: failed to log entry: {:#}", e); - } - } - st.changed.notify_one(); - drop(st); self.context.lock().await.push(Section::Conversation, node); + self.state.lock().await.changed.notify_one(); } /// Run the agent turn loop: assemble prompt, stream response, @@ -375,11 +366,13 @@ impl Agent { } Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)), Ok(Ok(())) => { - let node = agent.context.lock().await.conversation()[branch_idx].clone(); - let st = agent.state.lock().await; - if let Some(ref log) = st.conversation_log { - if let Err(e) = log.append_node(&node) { - eprintln!("warning: failed to log assistant response: {:#}", e); + // Assistant response was pushed to context by the parser; + // log it now that parsing is complete. + let ctx = agent.context.lock().await; + if let Some(ref log) = ctx.conversation_log { + let node = &ctx.conversation()[branch_idx]; + if let Err(e) = log.append_node(node) { + eprintln!("warning: log: {:#}", e); } } } @@ -469,19 +462,11 @@ impl Agent { nodes.push(Self::make_tool_result_node(call, output)); } - // Single lock: remove from active, log, push to context { let mut st = agent.state.lock().await; for (call, _) in &results { st.active_tools.remove(&call.id); } - for node in &nodes { - if let Some(ref log) = st.conversation_log { - if let Err(e) = log.append_node(node) { - eprintln!("warning: failed to log entry: {:#}", e); - } - } - } } { let mut ctx = agent.context.lock().await; @@ -494,8 +479,8 @@ impl Agent { async fn load_startup_journal(&self) { let oldest_msg_ts = { - let st = self.state.lock().await; - st.conversation_log.as_ref().and_then(|log| log.oldest_timestamp()) + let ctx = self.context.lock().await; + ctx.conversation_log.as_ref().and_then(|log| log.oldest_timestamp()) }; let store = match crate::store::Store::load() { @@ -574,8 +559,8 @@ impl Agent { pub async fn restore_from_log(&self) -> bool { let nodes = { - let st = self.state.lock().await; - match &st.conversation_log { + let ctx = self.context.lock().await; + match &ctx.conversation_log { Some(log) => match log.read_nodes(64 * 1024 * 1024) { Ok(nodes) if !nodes.is_empty() => nodes, _ => return false, diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 47cb052..138623d 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -383,10 +383,10 @@ impl Mind { { let mut ctx = self.agent.context.lock().await; ctx.clear(Section::Conversation); + ctx.conversation_log = new_log; } { let mut st = self.agent.state.lock().await; - st.conversation_log = new_log; st.generation += 1; st.last_prompt_tokens = 0; } @@ -488,9 +488,9 @@ impl Mind { let mut sub_handle: Option> = None; let mut unc_handle: Option> = None; loop { - let (timeout, turn_active, has_input) = { + let (timeout, has_input) = { let me = self.shared.lock().unwrap(); - (me.dmn.interval(), me.turn_active, me.has_pending_input()) + (me.dmn.interval(), me.has_pending_input()) }; let mut cmds = Vec::new(); diff --git a/src/mind/subconscious.rs b/src/mind/subconscious.rs index 48d1ce9..35189b6 100644 --- a/src/mind/subconscious.rs +++ b/src/mind/subconscious.rs @@ -273,7 +273,7 @@ impl State { use std::sync::Arc; use crate::agent::{Agent, oneshot::{AutoAgent, AutoStep}}; -use crate::agent::context::{Ast, AstNode, NodeBody}; +use crate::agent::context::{Ast, AstNode, NodeBody, Section}; use crate::subconscious::defs; /// Names and byte-interval triggers for the built-in subconscious agents. @@ -455,92 +455,90 @@ impl Subconscious { /// Collect results from finished agents, inject outputs into the /// conscious agent's context. + /// Reap finished agents and inject their outputs into the conscious context. pub async fn collect_results(&mut self, agent: &Arc) { - let finished: Vec<(usize, tokio::task::JoinHandle<(AutoAgent, Result)>)> = - self.agents.iter_mut().enumerate().filter_map(|(i, sub)| { - if sub.handle.as_ref().is_some_and(|h| h.is_finished()) { - sub.last_run = Some(Instant::now()); - Some((i, sub.handle.take().unwrap())) - } else { - None - } - }).collect(); - let had_finished = !finished.is_empty(); + let mut any_finished = false; + + for i in 0..self.agents.len() { + if !self.agents[i].handle.as_ref().is_some_and(|h| h.is_finished()) { + continue; + } + let handle = self.agents[i].handle.take().unwrap(); + self.agents[i].last_run = Some(Instant::now()); + any_finished = true; - for (idx, handle) in finished { let (auto_back, result) = handle.await.unwrap_or_else( |e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0), Err(format!("task panicked: {}", e)))); - self.agents[idx].auto = auto_back; + self.agents[i].auto = auto_back; match result { - Ok(_) => { - let name = self.agents[idx].name.clone(); - - // Check state for outputs (written by the output tool closure) - let has_outputs = self.state.contains_key("surface") - || self.state.contains_key("reflection") - || self.state.contains_key("thalamus"); - if has_outputs { - if let Some(surface_str) = self.state.get("surface").cloned() { - // Collect keys already in context to avoid duplicates - let existing: std::collections::HashSet = { - let ctx = agent.context.lock().await; - ctx.conversation().iter() - .filter_map(|n| n.leaf()) - .filter_map(|l| match l.body() { - NodeBody::Memory { key, .. } => Some(key.clone()), - _ => None, - }) - .collect() - }; - - let store = crate::store::Store::cached().await.ok(); - let store_guard = match &store { - Some(s) => Some(s.lock().await), - None => None, - }; - for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { - if existing.contains(key) { continue; } - let rendered = store_guard.as_ref() - .and_then(|s| crate::cli::node::render_node(s, key)); - if let Some(rendered) = rendered { - agent.push_node(AstNode::memory( - key, - format!("--- {} (surfaced) ---\n{}", key, rendered), - )).await; - } - } - } - - if let Some(reflection) = self.state.get("reflection").cloned() { - if !reflection.trim().is_empty() { - agent.push_node(AstNode::dmn(format!( - "--- subconscious reflection ---\n{}", - reflection.trim(), - ))).await; - } - } - - if let Some(nudge) = self.state.get("thalamus").cloned() { - let nudge = nudge.trim(); - if !nudge.is_empty() && nudge != "ok" { - agent.push_node(AstNode::dmn(format!( - "--- thalamus ---\n{}", - nudge, - ))).await; - } - } - } - - dbglog!("[subconscious] {} completed", name); - } - Err(e) => dbglog!("[subconscious] agent failed: {}", e), + Ok(_) => dbglog!("[subconscious] {} completed", self.agents[i].name), + Err(e) => dbglog!("[subconscious] {} failed: {}", self.agents[i].name, e), } } - if had_finished { - self.save_state(); + + if !any_finished { return; } + + // Collect all nodes to inject under a single context lock + let mut nodes: Vec = Vec::new(); + + if let Some(surface_str) = self.state.get("surface").cloned() { + let existing: std::collections::HashSet = { + let ctx = agent.context.lock().await; + ctx.conversation().iter() + .filter_map(|n| n.leaf()) + .filter_map(|l| match l.body() { + NodeBody::Memory { key, .. } => Some(key.clone()), + _ => None, + }) + .collect() + }; + + let store = crate::store::Store::cached().await.ok(); + let store_guard = match &store { + Some(s) => Some(s.lock().await), + None => None, + }; + for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { + if existing.contains(key) { continue; } + if let Some(rendered) = store_guard.as_ref() + .and_then(|s| crate::cli::node::render_node(s, key)) + { + nodes.push(AstNode::memory( + key, + format!("--- {} (surfaced) ---\n{}", key, rendered), + )); + } + } } + + if let Some(reflection) = self.state.get("reflection").cloned() { + if !reflection.trim().is_empty() { + nodes.push(AstNode::dmn(format!( + "--- subconscious reflection ---\n{}", + reflection.trim(), + ))); + } + } + + if let Some(nudge) = self.state.get("thalamus").cloned() { + let nudge = nudge.trim(); + if !nudge.is_empty() && nudge != "ok" { + nodes.push(AstNode::dmn(format!("--- thalamus ---\n{}", nudge))); + } + } + + if !nodes.is_empty() { + let mut ctx = agent.context.lock().await; + for node in nodes { + ctx.push(Section::Conversation, node); + } + drop(ctx); + agent.state.lock().await.changed.notify_one(); + } + + self.save_state(); } /// Trigger subconscious agents that are due to run.