WIP: Agent/AgentState split — core methods migrated

turn(), push_node(), assemble_prompt_tokens(), compact(),
restore_from_log(), load_startup_journal(), apply_tool_result()
all use separate context/state locks. ToolHandler signature
updated to Arc<Agent>.

Remaining: tool handlers, control.rs, memory.rs, digest.rs,
and all outer callers (mind, user, learn, oneshot, dmn).

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-08 15:39:03 -04:00
parent 7fe4584ba0
commit e73135a8d0
2 changed files with 71 additions and 144 deletions

View file

@ -305,10 +305,9 @@ impl Agent {
results.push((call, output));
}
}
let mut me = agent.lock().await;
let mut bg_ds = DispatchState::new();
for (call, output) in results {
me.apply_tool_result(&call, output, &mut bg_ds);
Agent::apply_tool_result(&agent, &call, output, &mut bg_ds).await;
}
}
}
@ -320,26 +319,24 @@ impl Agent {
loop {
let _thinking = start_activity(&agent, "thinking...").await;
// Assemble prompt and start stream (brief lock)
let (mut rx, _stream_guard) = {
let me = agent.lock().await;
let prompt_tokens = me.assemble_prompt_tokens();
me.client.stream_completion(
let prompt_tokens = agent.assemble_prompt_tokens().await;
let st = agent.state.lock().await;
agent.client.stream_completion(
&prompt_tokens,
api::SamplingParams {
temperature: me.temperature,
top_p: me.top_p,
top_k: me.top_k,
temperature: st.temperature,
top_p: st.top_p,
top_k: st.top_k,
},
None,
)
};
// Create assistant branch and parser (brief lock)
let branch_idx = {
let mut me = agent.lock().await;
let idx = me.context.len(Section::Conversation);
me.context.push(Section::Conversation,
let mut ctx = agent.context.lock().await;
let idx = ctx.len(Section::Conversation);
ctx.push(Section::Conversation,
AstNode::branch(Role::Assistant, vec![]));
idx
};
@ -353,10 +350,10 @@ impl Agent {
match event {
api::StreamToken::Token { text, id: _ } => {
had_content = true;
let mut me = agent.lock().await;
let calls = parser.feed(&text, &mut me.context);
let mut ctx = agent.context.lock().await;
let calls = parser.feed(&text, &mut ctx);
drop(ctx);
for call in calls {
// Dispatch tool call immediately
let call_clone = call.clone();
let agent_handle = agent.clone();
let handle = tokio::spawn(async move {
@ -384,7 +381,7 @@ impl Agent {
}
api::StreamToken::Done { usage } => {
if let Some(u) = usage {
agent.lock().await.last_prompt_tokens = u.prompt_tokens;
agent.state.lock().await.last_prompt_tokens = u.prompt_tokens;
}
break;
}
@ -392,25 +389,20 @@ impl Agent {
}
// Flush parser remainder
{
let mut me = agent.lock().await;
parser.finish(&mut me.context);
}
parser.finish(&mut *agent.context.lock().await);
// Handle errors
if let Some(e) = stream_error {
let err = anyhow::anyhow!("{}", e);
let mut me = agent.lock().await;
if context::is_context_overflow(&err) && overflow_retries < 2 {
overflow_retries += 1;
me.notify(format!("context overflow — retrying ({}/2)", overflow_retries));
me.compact();
agent.state.lock().await.notify(format!("context overflow — retrying ({}/2)", overflow_retries));
agent.compact().await;
continue;
}
if context::is_stream_error(&err) && empty_retries < 2 {
empty_retries += 1;
me.notify(format!("stream error — retrying ({}/2)", empty_retries));
drop(me);
agent.state.lock().await.notify(format!("stream error — retrying ({}/2)", empty_retries));
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
@ -421,8 +413,7 @@ impl Agent {
if !had_content && pending_calls.is_empty() {
if empty_retries < 2 {
empty_retries += 1;
let mut me = agent.lock().await;
me.push_node(AstNode::user_msg(
agent.push_node(AstNode::user_msg(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
@ -452,8 +443,7 @@ impl Agent {
for entry in handles {
if let Ok((call, output)) = entry.handle.await {
let mut me = agent.lock().await;
me.apply_tool_result(&call, output, &mut ds);
Agent::apply_tool_result(&agent, &call, output, &mut ds).await;
}
}
continue;
@ -461,8 +451,8 @@ impl Agent {
// Text-only response — extract text and return
let text = {
let me = agent.lock().await;
let children = me.context.conversation()[branch_idx].children();
let ctx = agent.context.lock().await;
let children = ctx.conversation()[branch_idx].children();
children.iter()
.filter_map(|c| c.leaf())
.filter(|l| matches!(l.body(), NodeBody::Content(_)))
@ -471,10 +461,10 @@ impl Agent {
.join("")
};
let mut me = agent.lock().await;
if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); }
if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }
let mut st = agent.state.lock().await;
if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; }
if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
return Ok(TurnResult {
text,
@ -487,56 +477,8 @@ impl Agent {
}
}
/// Dispatch a tool call without holding the agent lock across I/O.
async fn dispatch_tool_call_unlocked(
agent: &Arc<tokio::sync::Mutex<Agent>>,
active_tools: &tools::SharedActiveTools,
call: &PendingToolCall,
ds: &mut DispatchState,
) {
let args: serde_json::Value = match serde_json::from_str(&call.arguments) {
Ok(v) => v,
Err(e) => {
let err = format!("Error: malformed tool call arguments: {e}");
let _act = start_activity(agent, format!("rejected: {} (bad args)", call.name)).await;
let mut me = agent.lock().await;
me.apply_tool_result(call, err, ds);
return;
}
};
let args_summary = summarize_args(&call.name, &args);
let _calling = start_activity(agent, format!("calling: {}", call.name)).await;
let call_clone = call.clone();
let agent_handle = agent.clone();
let handle = tokio::spawn(async move {
let output = tools::dispatch_with_agent(&call_clone.name, &args, Some(agent_handle)).await;
(call_clone, output)
});
active_tools.lock().unwrap().push(
tools::ActiveToolCall {
id: call.id.clone(),
name: call.name.clone(),
detail: args_summary,
started: std::time::Instant::now(),
background: false,
handle,
}
);
let entry = {
let mut tools = active_tools.lock().unwrap();
tools.pop().unwrap()
};
if let Ok((call, output)) = entry.handle.await {
let mut me = agent.lock().await;
me.apply_tool_result(&call, output, ds);
}
}
fn apply_tool_result(
&mut self,
async fn apply_tool_result(
agent: &Arc<Agent>,
call: &PendingToolCall,
output: String,
ds: &mut DispatchState,
@ -546,35 +488,31 @@ impl Agent {
ds.tool_errors += 1;
}
self.active_tools.lock().unwrap().retain(|t| t.id != call.id);
agent.state.lock().await.active_tools.lock().unwrap().retain(|t| t.id != call.id);
// Tag memory_render results as Memory nodes for context deduplication
if call.name == "memory_render" && !output.starts_with("Error:") {
let args: serde_json::Value =
serde_json::from_str(&call.arguments).unwrap_or_default();
if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
self.push_node(AstNode::memory(key, &output));
agent.push_node(AstNode::memory(key, &output)).await;
return;
}
}
self.push_node(AstNode::tool_result(&output));
agent.push_node(AstNode::tool_result(&output)).await;
}
pub fn conversation_from(&self, from: usize) -> &[AstNode] {
let conv = self.context.conversation();
if from < conv.len() { &conv[from..] } else { &[] }
}
async fn load_startup_journal(&self) {
let oldest_msg_ts = {
let st = self.state.lock().await;
st.conversation_log.as_ref().and_then(|log| log.oldest_timestamp())
};
fn load_startup_journal(&mut self) {
let store = match crate::store::Store::load() {
Ok(s) => s,
Err(_) => return,
};
let oldest_msg_ts = self.conversation_log.as_ref()
.and_then(|log| log.oldest_timestamp());
let mut journal_nodes: Vec<_> = store.nodes.values()
.filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
.collect();
@ -613,25 +551,22 @@ impl Agent {
if entries.is_empty() { return; }
self.context.clear(Section::Journal);
let mut ctx = self.context.lock().await;
ctx.clear(Section::Journal);
for entry in entries {
self.context.push(Section::Journal, entry);
ctx.push(Section::Journal, entry);
}
}
pub fn last_prompt_tokens(&self) -> u32 {
self.last_prompt_tokens
}
/// Rebuild the context window: reload identity, trim, reload journal.
pub fn compact(&mut self) {
pub async fn compact(&self) {
match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
Ok((system_prompt, personality)) => {
self.context.clear(Section::System);
self.context.push(Section::System, AstNode::system_msg(&system_prompt));
self.context.clear(Section::Identity);
let mut ctx = self.context.lock().await;
ctx.clear(Section::System);
ctx.push(Section::System, AstNode::system_msg(&system_prompt));
ctx.clear(Section::Identity);
for (name, content) in &personality {
self.context.push(Section::Identity, AstNode::memory(name, content));
ctx.push(Section::Identity, AstNode::memory(name, content));
}
}
Err(e) => {
@ -639,46 +574,39 @@ impl Agent {
}
}
self.load_startup_journal();
self.load_startup_journal().await;
// TODO: trim_entries — dedup memories, evict to budget
self.generation += 1;
self.last_prompt_tokens = 0;
let mut st = self.state.lock().await;
st.generation += 1;
st.last_prompt_tokens = 0;
}
/// Restore from the conversation log.
/// Returns true if the log had content to restore.
pub fn restore_from_log(&mut self) -> bool {
let nodes = match &self.conversation_log {
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
Ok(nodes) if !nodes.is_empty() => nodes,
_ => return false,
},
None => return false,
pub async fn restore_from_log(&self) -> bool {
let nodes = {
let st = self.state.lock().await;
match &st.conversation_log {
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
Ok(nodes) if !nodes.is_empty() => nodes,
_ => return false,
},
None => return false,
}
};
self.context.clear(Section::Conversation);
for node in nodes {
self.context.push(Section::Conversation, node);
{
let mut ctx = self.context.lock().await;
ctx.clear(Section::Conversation);
for node in nodes {
ctx.push(Section::Conversation, node);
}
}
self.compact();
self.last_prompt_tokens = self.context.tokens() as u32;
self.compact().await;
let mut st = self.state.lock().await;
st.last_prompt_tokens = self.context.lock().await.tokens() as u32;
true
}
pub fn swap_client(&mut self, new_client: ApiClient) {
self.client = new_client;
}
pub fn model(&self) -> &str {
&self.client.model
}
pub fn conversation(&self) -> &[AstNode] {
self.context.conversation()
}
pub fn client_clone(&self) -> ApiClient {
self.client.clone()
}
}