Move conversation_log from AgentState to ContextState
The log records what goes into context, so it belongs under the context lock. push() now auto-logs conversation entries, eliminating all the manual lock-state-for-log, drop, lock-context-for-push dances. - ContextState: new conversation_log field, Clone impl drops it (forked contexts don't log) - push(): auto-logs Section::Conversation entries - push_node, apply_tool_results, collect_results: all simplified - collect_results: batch nodes under single context lock - Assistant response logged under context lock after parse completes Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
d82a2ae90d
commit
ddfdbe6cb1
4 changed files with 112 additions and 109 deletions
|
|
@ -99,12 +99,24 @@ pub enum AstNode {
|
||||||
/// The context window: four sections as Vec<AstNode>.
|
/// The context window: four sections as Vec<AstNode>.
|
||||||
/// All mutation goes through ContextState methods to maintain the invariant
|
/// All mutation goes through ContextState methods to maintain the invariant
|
||||||
/// that token_ids on every leaf matches its rendered text.
|
/// that token_ids on every leaf matches its rendered text.
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ContextState {
|
pub struct ContextState {
|
||||||
system: Vec<AstNode>,
|
system: Vec<AstNode>,
|
||||||
identity: Vec<AstNode>,
|
identity: Vec<AstNode>,
|
||||||
journal: Vec<AstNode>,
|
journal: Vec<AstNode>,
|
||||||
conversation: Vec<AstNode>,
|
conversation: Vec<AstNode>,
|
||||||
|
pub conversation_log: Option<crate::mind::log::ConversationLog>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for ContextState {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
system: self.system.clone(),
|
||||||
|
identity: self.identity.clone(),
|
||||||
|
journal: self.journal.clone(),
|
||||||
|
conversation: self.conversation.clone(),
|
||||||
|
conversation_log: None, // forked contexts don't log
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Identifies a section for mutation methods.
|
/// Identifies a section for mutation methods.
|
||||||
|
|
@ -698,6 +710,7 @@ impl ContextState {
|
||||||
identity: Vec::new(),
|
identity: Vec::new(),
|
||||||
journal: Vec::new(),
|
journal: Vec::new(),
|
||||||
conversation: Vec::new(),
|
conversation: Vec::new(),
|
||||||
|
conversation_log: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -753,6 +766,13 @@ impl ContextState {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push(&mut self, section: Section, node: AstNode) {
|
pub fn push(&mut self, section: Section, node: AstNode) {
|
||||||
|
if section == Section::Conversation {
|
||||||
|
if let Some(ref log) = self.conversation_log {
|
||||||
|
if let Err(e) = log.append_node(&node) {
|
||||||
|
eprintln!("warning: log: {:#}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
self.section_mut(section).push(node);
|
self.section_mut(section).push(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,6 @@ pub struct AgentState {
|
||||||
pub pending_model_switch: Option<String>,
|
pub pending_model_switch: Option<String>,
|
||||||
pub pending_dmn_pause: bool,
|
pub pending_dmn_pause: bool,
|
||||||
pub provenance: String,
|
pub provenance: String,
|
||||||
pub conversation_log: Option<ConversationLog>,
|
|
||||||
pub generation: u64,
|
pub generation: u64,
|
||||||
pub memory_scoring_in_flight: bool,
|
pub memory_scoring_in_flight: bool,
|
||||||
pub active_tools: tools::ActiveTools,
|
pub active_tools: tools::ActiveTools,
|
||||||
|
|
@ -172,6 +171,7 @@ impl Agent {
|
||||||
active_tools: tools::ActiveTools,
|
active_tools: tools::ActiveTools,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let mut context = ContextState::new();
|
let mut context = ContextState::new();
|
||||||
|
context.conversation_log = conversation_log;
|
||||||
context.push(Section::System, AstNode::system_msg(&system_prompt));
|
context.push(Section::System, AstNode::system_msg(&system_prompt));
|
||||||
|
|
||||||
let tool_defs: Vec<String> = tools::tools().iter()
|
let tool_defs: Vec<String> = tools::tools().iter()
|
||||||
|
|
@ -213,7 +213,6 @@ impl Agent {
|
||||||
pending_model_switch: None,
|
pending_model_switch: None,
|
||||||
pending_dmn_pause: false,
|
pending_dmn_pause: false,
|
||||||
provenance: "manual".to_string(),
|
provenance: "manual".to_string(),
|
||||||
conversation_log,
|
|
||||||
generation: 0,
|
generation: 0,
|
||||||
memory_scoring_in_flight: false,
|
memory_scoring_in_flight: false,
|
||||||
active_tools,
|
active_tools,
|
||||||
|
|
@ -249,7 +248,6 @@ impl Agent {
|
||||||
pending_model_switch: None,
|
pending_model_switch: None,
|
||||||
pending_dmn_pause: false,
|
pending_dmn_pause: false,
|
||||||
provenance: st.provenance.clone(),
|
provenance: st.provenance.clone(),
|
||||||
conversation_log: None,
|
|
||||||
generation: 0,
|
generation: 0,
|
||||||
memory_scoring_in_flight: false,
|
memory_scoring_in_flight: false,
|
||||||
active_tools: tools::ActiveTools::new(),
|
active_tools: tools::ActiveTools::new(),
|
||||||
|
|
@ -269,15 +267,8 @@ impl Agent {
|
||||||
|
|
||||||
pub async fn push_node(&self, node: AstNode) {
|
pub async fn push_node(&self, node: AstNode) {
|
||||||
let node = node.with_timestamp(chrono::Utc::now());
|
let node = node.with_timestamp(chrono::Utc::now());
|
||||||
let st = self.state.lock().await;
|
|
||||||
if let Some(ref log) = st.conversation_log {
|
|
||||||
if let Err(e) = log.append_node(&node) {
|
|
||||||
eprintln!("warning: failed to log entry: {:#}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
st.changed.notify_one();
|
|
||||||
drop(st);
|
|
||||||
self.context.lock().await.push(Section::Conversation, node);
|
self.context.lock().await.push(Section::Conversation, node);
|
||||||
|
self.state.lock().await.changed.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the agent turn loop: assemble prompt, stream response,
|
/// Run the agent turn loop: assemble prompt, stream response,
|
||||||
|
|
@ -375,11 +366,13 @@ impl Agent {
|
||||||
}
|
}
|
||||||
Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
|
Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
let node = agent.context.lock().await.conversation()[branch_idx].clone();
|
// Assistant response was pushed to context by the parser;
|
||||||
let st = agent.state.lock().await;
|
// log it now that parsing is complete.
|
||||||
if let Some(ref log) = st.conversation_log {
|
let ctx = agent.context.lock().await;
|
||||||
if let Err(e) = log.append_node(&node) {
|
if let Some(ref log) = ctx.conversation_log {
|
||||||
eprintln!("warning: failed to log assistant response: {:#}", e);
|
let node = &ctx.conversation()[branch_idx];
|
||||||
|
if let Err(e) = log.append_node(node) {
|
||||||
|
eprintln!("warning: log: {:#}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -469,19 +462,11 @@ impl Agent {
|
||||||
nodes.push(Self::make_tool_result_node(call, output));
|
nodes.push(Self::make_tool_result_node(call, output));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Single lock: remove from active, log, push to context
|
|
||||||
{
|
{
|
||||||
let mut st = agent.state.lock().await;
|
let mut st = agent.state.lock().await;
|
||||||
for (call, _) in &results {
|
for (call, _) in &results {
|
||||||
st.active_tools.remove(&call.id);
|
st.active_tools.remove(&call.id);
|
||||||
}
|
}
|
||||||
for node in &nodes {
|
|
||||||
if let Some(ref log) = st.conversation_log {
|
|
||||||
if let Err(e) = log.append_node(node) {
|
|
||||||
eprintln!("warning: failed to log entry: {:#}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let mut ctx = agent.context.lock().await;
|
let mut ctx = agent.context.lock().await;
|
||||||
|
|
@ -494,8 +479,8 @@ impl Agent {
|
||||||
|
|
||||||
async fn load_startup_journal(&self) {
|
async fn load_startup_journal(&self) {
|
||||||
let oldest_msg_ts = {
|
let oldest_msg_ts = {
|
||||||
let st = self.state.lock().await;
|
let ctx = self.context.lock().await;
|
||||||
st.conversation_log.as_ref().and_then(|log| log.oldest_timestamp())
|
ctx.conversation_log.as_ref().and_then(|log| log.oldest_timestamp())
|
||||||
};
|
};
|
||||||
|
|
||||||
let store = match crate::store::Store::load() {
|
let store = match crate::store::Store::load() {
|
||||||
|
|
@ -574,8 +559,8 @@ impl Agent {
|
||||||
|
|
||||||
pub async fn restore_from_log(&self) -> bool {
|
pub async fn restore_from_log(&self) -> bool {
|
||||||
let nodes = {
|
let nodes = {
|
||||||
let st = self.state.lock().await;
|
let ctx = self.context.lock().await;
|
||||||
match &st.conversation_log {
|
match &ctx.conversation_log {
|
||||||
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
|
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
|
||||||
Ok(nodes) if !nodes.is_empty() => nodes,
|
Ok(nodes) if !nodes.is_empty() => nodes,
|
||||||
_ => return false,
|
_ => return false,
|
||||||
|
|
|
||||||
|
|
@ -383,10 +383,10 @@ impl Mind {
|
||||||
{
|
{
|
||||||
let mut ctx = self.agent.context.lock().await;
|
let mut ctx = self.agent.context.lock().await;
|
||||||
ctx.clear(Section::Conversation);
|
ctx.clear(Section::Conversation);
|
||||||
|
ctx.conversation_log = new_log;
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let mut st = self.agent.state.lock().await;
|
let mut st = self.agent.state.lock().await;
|
||||||
st.conversation_log = new_log;
|
|
||||||
st.generation += 1;
|
st.generation += 1;
|
||||||
st.last_prompt_tokens = 0;
|
st.last_prompt_tokens = 0;
|
||||||
}
|
}
|
||||||
|
|
@ -488,9 +488,9 @@ impl Mind {
|
||||||
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
|
let mut sub_handle: Option<tokio::task::JoinHandle<()>> = None;
|
||||||
let mut unc_handle: Option<tokio::task::JoinHandle<()>> = None;
|
let mut unc_handle: Option<tokio::task::JoinHandle<()>> = None;
|
||||||
loop {
|
loop {
|
||||||
let (timeout, turn_active, has_input) = {
|
let (timeout, has_input) = {
|
||||||
let me = self.shared.lock().unwrap();
|
let me = self.shared.lock().unwrap();
|
||||||
(me.dmn.interval(), me.turn_active, me.has_pending_input())
|
(me.dmn.interval(), me.has_pending_input())
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut cmds = Vec::new();
|
let mut cmds = Vec::new();
|
||||||
|
|
|
||||||
|
|
@ -273,7 +273,7 @@ impl State {
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use crate::agent::{Agent, oneshot::{AutoAgent, AutoStep}};
|
use crate::agent::{Agent, oneshot::{AutoAgent, AutoStep}};
|
||||||
use crate::agent::context::{Ast, AstNode, NodeBody};
|
use crate::agent::context::{Ast, AstNode, NodeBody, Section};
|
||||||
use crate::subconscious::defs;
|
use crate::subconscious::defs;
|
||||||
|
|
||||||
/// Names and byte-interval triggers for the built-in subconscious agents.
|
/// Names and byte-interval triggers for the built-in subconscious agents.
|
||||||
|
|
@ -455,35 +455,35 @@ impl Subconscious {
|
||||||
|
|
||||||
/// Collect results from finished agents, inject outputs into the
|
/// Collect results from finished agents, inject outputs into the
|
||||||
/// conscious agent's context.
|
/// conscious agent's context.
|
||||||
|
/// Reap finished agents and inject their outputs into the conscious context.
|
||||||
pub async fn collect_results(&mut self, agent: &Arc<Agent>) {
|
pub async fn collect_results(&mut self, agent: &Arc<Agent>) {
|
||||||
let finished: Vec<(usize, tokio::task::JoinHandle<(AutoAgent, Result<String, String>)>)> =
|
let mut any_finished = false;
|
||||||
self.agents.iter_mut().enumerate().filter_map(|(i, sub)| {
|
|
||||||
if sub.handle.as_ref().is_some_and(|h| h.is_finished()) {
|
for i in 0..self.agents.len() {
|
||||||
sub.last_run = Some(Instant::now());
|
if !self.agents[i].handle.as_ref().is_some_and(|h| h.is_finished()) {
|
||||||
Some((i, sub.handle.take().unwrap()))
|
continue;
|
||||||
} else {
|
}
|
||||||
None
|
let handle = self.agents[i].handle.take().unwrap();
|
||||||
}
|
self.agents[i].last_run = Some(Instant::now());
|
||||||
}).collect();
|
any_finished = true;
|
||||||
let had_finished = !finished.is_empty();
|
|
||||||
|
|
||||||
for (idx, handle) in finished {
|
|
||||||
let (auto_back, result) = handle.await.unwrap_or_else(
|
let (auto_back, result) = handle.await.unwrap_or_else(
|
||||||
|e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0),
|
|e| (AutoAgent::new(String::new(), vec![], vec![], 0.0, 0),
|
||||||
Err(format!("task panicked: {}", e))));
|
Err(format!("task panicked: {}", e))));
|
||||||
self.agents[idx].auto = auto_back;
|
self.agents[i].auto = auto_back;
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => {
|
Ok(_) => dbglog!("[subconscious] {} completed", self.agents[i].name),
|
||||||
let name = self.agents[idx].name.clone();
|
Err(e) => dbglog!("[subconscious] {} failed: {}", self.agents[i].name, e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !any_finished { return; }
|
||||||
|
|
||||||
|
// Collect all nodes to inject under a single context lock
|
||||||
|
let mut nodes: Vec<AstNode> = Vec::new();
|
||||||
|
|
||||||
// Check state for outputs (written by the output tool closure)
|
|
||||||
let has_outputs = self.state.contains_key("surface")
|
|
||||||
|| self.state.contains_key("reflection")
|
|
||||||
|| self.state.contains_key("thalamus");
|
|
||||||
if has_outputs {
|
|
||||||
if let Some(surface_str) = self.state.get("surface").cloned() {
|
if let Some(surface_str) = self.state.get("surface").cloned() {
|
||||||
// Collect keys already in context to avoid duplicates
|
|
||||||
let existing: std::collections::HashSet<String> = {
|
let existing: std::collections::HashSet<String> = {
|
||||||
let ctx = agent.context.lock().await;
|
let ctx = agent.context.lock().await;
|
||||||
ctx.conversation().iter()
|
ctx.conversation().iter()
|
||||||
|
|
@ -502,46 +502,44 @@ impl Subconscious {
|
||||||
};
|
};
|
||||||
for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) {
|
for key in surface_str.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) {
|
||||||
if existing.contains(key) { continue; }
|
if existing.contains(key) { continue; }
|
||||||
let rendered = store_guard.as_ref()
|
if let Some(rendered) = store_guard.as_ref()
|
||||||
.and_then(|s| crate::cli::node::render_node(s, key));
|
.and_then(|s| crate::cli::node::render_node(s, key))
|
||||||
if let Some(rendered) = rendered {
|
{
|
||||||
agent.push_node(AstNode::memory(
|
nodes.push(AstNode::memory(
|
||||||
key,
|
key,
|
||||||
format!("--- {} (surfaced) ---\n{}", key, rendered),
|
format!("--- {} (surfaced) ---\n{}", key, rendered),
|
||||||
)).await;
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(reflection) = self.state.get("reflection").cloned() {
|
if let Some(reflection) = self.state.get("reflection").cloned() {
|
||||||
if !reflection.trim().is_empty() {
|
if !reflection.trim().is_empty() {
|
||||||
agent.push_node(AstNode::dmn(format!(
|
nodes.push(AstNode::dmn(format!(
|
||||||
"--- subconscious reflection ---\n{}",
|
"--- subconscious reflection ---\n{}",
|
||||||
reflection.trim(),
|
reflection.trim(),
|
||||||
))).await;
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(nudge) = self.state.get("thalamus").cloned() {
|
if let Some(nudge) = self.state.get("thalamus").cloned() {
|
||||||
let nudge = nudge.trim();
|
let nudge = nudge.trim();
|
||||||
if !nudge.is_empty() && nudge != "ok" {
|
if !nudge.is_empty() && nudge != "ok" {
|
||||||
agent.push_node(AstNode::dmn(format!(
|
nodes.push(AstNode::dmn(format!("--- thalamus ---\n{}", nudge)));
|
||||||
"--- thalamus ---\n{}",
|
|
||||||
nudge,
|
|
||||||
))).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dbglog!("[subconscious] {} completed", name);
|
if !nodes.is_empty() {
|
||||||
|
let mut ctx = agent.context.lock().await;
|
||||||
|
for node in nodes {
|
||||||
|
ctx.push(Section::Conversation, node);
|
||||||
}
|
}
|
||||||
Err(e) => dbglog!("[subconscious] agent failed: {}", e),
|
drop(ctx);
|
||||||
|
agent.state.lock().await.changed.notify_one();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if had_finished {
|
|
||||||
self.save_state();
|
self.save_state();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// Trigger subconscious agents that are due to run.
|
/// Trigger subconscious agents that are due to run.
|
||||||
pub async fn trigger(&mut self, agent: &Arc<Agent>) {
|
pub async fn trigger(&mut self, agent: &Arc<Agent>) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue