diff --git a/src/agent/mod.rs b/src/agent/mod.rs index cbc5642..85a53dc 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -305,10 +305,9 @@ impl Agent { results.push((call, output)); } } - let mut me = agent.lock().await; let mut bg_ds = DispatchState::new(); for (call, output) in results { - me.apply_tool_result(&call, output, &mut bg_ds); + Agent::apply_tool_result(&agent, &call, output, &mut bg_ds).await; } } } @@ -320,26 +319,24 @@ impl Agent { loop { let _thinking = start_activity(&agent, "thinking...").await; - // Assemble prompt and start stream (brief lock) let (mut rx, _stream_guard) = { - let me = agent.lock().await; - let prompt_tokens = me.assemble_prompt_tokens(); - me.client.stream_completion( + let prompt_tokens = agent.assemble_prompt_tokens().await; + let st = agent.state.lock().await; + agent.client.stream_completion( &prompt_tokens, api::SamplingParams { - temperature: me.temperature, - top_p: me.top_p, - top_k: me.top_k, + temperature: st.temperature, + top_p: st.top_p, + top_k: st.top_k, }, None, ) }; - // Create assistant branch and parser (brief lock) let branch_idx = { - let mut me = agent.lock().await; - let idx = me.context.len(Section::Conversation); - me.context.push(Section::Conversation, + let mut ctx = agent.context.lock().await; + let idx = ctx.len(Section::Conversation); + ctx.push(Section::Conversation, AstNode::branch(Role::Assistant, vec![])); idx }; @@ -353,10 +350,10 @@ impl Agent { match event { api::StreamToken::Token { text, id: _ } => { had_content = true; - let mut me = agent.lock().await; - let calls = parser.feed(&text, &mut me.context); + let mut ctx = agent.context.lock().await; + let calls = parser.feed(&text, &mut ctx); + drop(ctx); for call in calls { - // Dispatch tool call immediately let call_clone = call.clone(); let agent_handle = agent.clone(); let handle = tokio::spawn(async move { @@ -384,7 +381,7 @@ impl Agent { } api::StreamToken::Done { usage } => { if let Some(u) = usage { - agent.lock().await.last_prompt_tokens = u.prompt_tokens; + agent.state.lock().await.last_prompt_tokens = u.prompt_tokens; } break; } @@ -392,25 +389,20 @@ impl Agent { } // Flush parser remainder - { - let mut me = agent.lock().await; - parser.finish(&mut me.context); - } + parser.finish(&mut *agent.context.lock().await); // Handle errors if let Some(e) = stream_error { let err = anyhow::anyhow!("{}", e); - let mut me = agent.lock().await; if context::is_context_overflow(&err) && overflow_retries < 2 { overflow_retries += 1; - me.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); - me.compact(); + agent.state.lock().await.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); + agent.compact().await; continue; } if context::is_stream_error(&err) && empty_retries < 2 { empty_retries += 1; - me.notify(format!("stream error — retrying ({}/2)", empty_retries)); - drop(me); + agent.state.lock().await.notify(format!("stream error — retrying ({}/2)", empty_retries)); tokio::time::sleep(std::time::Duration::from_secs(2)).await; continue; } @@ -421,8 +413,7 @@ impl Agent { if !had_content && pending_calls.is_empty() { if empty_retries < 2 { empty_retries += 1; - let mut me = agent.lock().await; - me.push_node(AstNode::user_msg( + agent.push_node(AstNode::user_msg( "[system] Your previous response was empty. \ Please respond with text or use a tool." )); @@ -452,8 +443,7 @@ impl Agent { for entry in handles { if let Ok((call, output)) = entry.handle.await { - let mut me = agent.lock().await; - me.apply_tool_result(&call, output, &mut ds); + Agent::apply_tool_result(&agent, &call, output, &mut ds).await; } } continue; @@ -461,8 +451,8 @@ impl Agent { // Text-only response — extract text and return let text = { - let me = agent.lock().await; - let children = me.context.conversation()[branch_idx].children(); + let ctx = agent.context.lock().await; + let children = ctx.conversation()[branch_idx].children(); children.iter() .filter_map(|c| c.leaf()) .filter(|l| matches!(l.body(), NodeBody::Content(_))) @@ -471,10 +461,10 @@ impl Agent { .join("") }; - let mut me = agent.lock().await; - if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; } - if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); } - if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; } + let mut st = agent.state.lock().await; + if st.pending_yield { ds.yield_requested = true; st.pending_yield = false; } + if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); } + if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; } return Ok(TurnResult { text, @@ -487,56 +477,8 @@ impl Agent { } } - /// Dispatch a tool call without holding the agent lock across I/O. - async fn dispatch_tool_call_unlocked( - agent: &Arc>, - active_tools: &tools::SharedActiveTools, - call: &PendingToolCall, - ds: &mut DispatchState, - ) { - let args: serde_json::Value = match serde_json::from_str(&call.arguments) { - Ok(v) => v, - Err(e) => { - let err = format!("Error: malformed tool call arguments: {e}"); - let _act = start_activity(agent, format!("rejected: {} (bad args)", call.name)).await; - let mut me = agent.lock().await; - me.apply_tool_result(call, err, ds); - return; - } - }; - - let args_summary = summarize_args(&call.name, &args); - let _calling = start_activity(agent, format!("calling: {}", call.name)).await; - - let call_clone = call.clone(); - let agent_handle = agent.clone(); - let handle = tokio::spawn(async move { - let output = tools::dispatch_with_agent(&call_clone.name, &args, Some(agent_handle)).await; - (call_clone, output) - }); - active_tools.lock().unwrap().push( - tools::ActiveToolCall { - id: call.id.clone(), - name: call.name.clone(), - detail: args_summary, - started: std::time::Instant::now(), - background: false, - handle, - } - ); - - let entry = { - let mut tools = active_tools.lock().unwrap(); - tools.pop().unwrap() - }; - if let Ok((call, output)) = entry.handle.await { - let mut me = agent.lock().await; - me.apply_tool_result(&call, output, ds); - } - } - - fn apply_tool_result( - &mut self, + async fn apply_tool_result( + agent: &Arc, call: &PendingToolCall, output: String, ds: &mut DispatchState, @@ -546,35 +488,31 @@ impl Agent { ds.tool_errors += 1; } - self.active_tools.lock().unwrap().retain(|t| t.id != call.id); + agent.state.lock().await.active_tools.lock().unwrap().retain(|t| t.id != call.id); - // Tag memory_render results as Memory nodes for context deduplication if call.name == "memory_render" && !output.starts_with("Error:") { let args: serde_json::Value = serde_json::from_str(&call.arguments).unwrap_or_default(); if let Some(key) = args.get("key").and_then(|v| v.as_str()) { - self.push_node(AstNode::memory(key, &output)); + agent.push_node(AstNode::memory(key, &output)).await; return; } } - self.push_node(AstNode::tool_result(&output)); + agent.push_node(AstNode::tool_result(&output)).await; } - pub fn conversation_from(&self, from: usize) -> &[AstNode] { - let conv = self.context.conversation(); - if from < conv.len() { &conv[from..] } else { &[] } - } + async fn load_startup_journal(&self) { + let oldest_msg_ts = { + let st = self.state.lock().await; + st.conversation_log.as_ref().and_then(|log| log.oldest_timestamp()) + }; - fn load_startup_journal(&mut self) { let store = match crate::store::Store::load() { Ok(s) => s, Err(_) => return, }; - let oldest_msg_ts = self.conversation_log.as_ref() - .and_then(|log| log.oldest_timestamp()); - let mut journal_nodes: Vec<_> = store.nodes.values() .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession) .collect(); @@ -613,25 +551,22 @@ impl Agent { if entries.is_empty() { return; } - self.context.clear(Section::Journal); + let mut ctx = self.context.lock().await; + ctx.clear(Section::Journal); for entry in entries { - self.context.push(Section::Journal, entry); + ctx.push(Section::Journal, entry); } } - pub fn last_prompt_tokens(&self) -> u32 { - self.last_prompt_tokens - } - - /// Rebuild the context window: reload identity, trim, reload journal. - pub fn compact(&mut self) { + pub async fn compact(&self) { match crate::config::reload_for_model(&self.app_config, &self.prompt_file) { Ok((system_prompt, personality)) => { - self.context.clear(Section::System); - self.context.push(Section::System, AstNode::system_msg(&system_prompt)); - self.context.clear(Section::Identity); + let mut ctx = self.context.lock().await; + ctx.clear(Section::System); + ctx.push(Section::System, AstNode::system_msg(&system_prompt)); + ctx.clear(Section::Identity); for (name, content) in &personality { - self.context.push(Section::Identity, AstNode::memory(name, content)); + ctx.push(Section::Identity, AstNode::memory(name, content)); } } Err(e) => { @@ -639,46 +574,39 @@ impl Agent { } } - self.load_startup_journal(); + self.load_startup_journal().await; - // TODO: trim_entries — dedup memories, evict to budget - self.generation += 1; - self.last_prompt_tokens = 0; + let mut st = self.state.lock().await; + st.generation += 1; + st.last_prompt_tokens = 0; } - /// Restore from the conversation log. - /// Returns true if the log had content to restore. - pub fn restore_from_log(&mut self) -> bool { - let nodes = match &self.conversation_log { - Some(log) => match log.read_nodes(64 * 1024 * 1024) { - Ok(nodes) if !nodes.is_empty() => nodes, - _ => return false, - }, - None => return false, + pub async fn restore_from_log(&self) -> bool { + let nodes = { + let st = self.state.lock().await; + match &st.conversation_log { + Some(log) => match log.read_nodes(64 * 1024 * 1024) { + Ok(nodes) if !nodes.is_empty() => nodes, + _ => return false, + }, + None => return false, + } }; - self.context.clear(Section::Conversation); - for node in nodes { - self.context.push(Section::Conversation, node); + { + let mut ctx = self.context.lock().await; + ctx.clear(Section::Conversation); + for node in nodes { + ctx.push(Section::Conversation, node); + } } - self.compact(); - self.last_prompt_tokens = self.context.tokens() as u32; + self.compact().await; + let mut st = self.state.lock().await; + st.last_prompt_tokens = self.context.lock().await.tokens() as u32; true } - pub fn swap_client(&mut self, new_client: ApiClient) { - self.client = new_client; - } - pub fn model(&self) -> &str { &self.client.model } - - pub fn conversation(&self) -> &[AstNode] { - self.context.conversation() - } - - pub fn client_clone(&self) -> ApiClient { - self.client.clone() - } } diff --git a/src/agent/tools/mod.rs b/src/agent/tools/mod.rs index 6518eb4..acb8ae8 100644 --- a/src/agent/tools/mod.rs +++ b/src/agent/tools/mod.rs @@ -29,7 +29,7 @@ fn default_timeout() -> u64 { 120 } /// Async tool handler function. /// Agent is None when called from contexts without an agent (MCP server, subconscious). pub type ToolHandler = fn( - Option>>, + Option>, serde_json::Value, ) -> Pin> + Send>>; @@ -100,11 +100,10 @@ pub async fn dispatch( pub async fn dispatch_with_agent( name: &str, args: &serde_json::Value, - agent: Option>>, + agent: Option>, ) -> String { - // Look up in agent's tools if available, otherwise global let tool = if let Some(ref a) = agent { - let guard = a.lock().await; + let guard = a.state.lock().await; guard.tools.iter().find(|t| t.name == name).copied() } else { None