From fb54488f3022c0bcbe8ee1b96240507a09bbc2d0 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sat, 4 Apr 2026 04:23:29 -0400 Subject: [PATCH] agent: don't hold agent lock across I/O MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent lock was held for the entire duration of turn() — including API streaming and tool dispatch awaits. This blocked the UI thread whenever it needed the lock (render tick, compaction check, etc.), causing 20+ second freezes. Fix: turn() takes Arc> and manages locking internally. Lock is held briefly for prepare/process phases, released during all I/O (streaming, tool awaits, sleep retries). Also: - check_compaction: spawns task instead of awaiting on event loop - start_memory_scoring: already spawned, no change needed - dispatch_tool_call_unlocked: drops lock before tool handle await - Subconscious screen: renders all agents from state dynamically (no more hardcoded SUBCONSCIOUS_AGENTS list) - Memory scoring shows n/m progress in snapshots Co-Authored-By: Proof of Concept --- src/agent/mod.rs | 422 ++++++++++++++++--------------- src/mind/mod.rs | 96 ++++--- src/subconscious/subconscious.rs | 7 +- src/user/mod.rs | 2 +- src/user/subconscious.rs | 43 ++-- 5 files changed, 301 insertions(+), 269 deletions(-) diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 1e19acf..dc0c1cd 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -19,6 +19,7 @@ pub mod parsing; pub mod tools; pub mod training; +use std::sync::Arc; use anyhow::Result; use tiktoken_rs::CoreBPE; @@ -60,6 +61,15 @@ struct DispatchState { dmn_pause: bool, } +impl DispatchState { + fn new() -> Self { + Self { + yield_requested: false, had_tool_calls: false, + tool_errors: 0, model_switch: None, dmn_pause: false, + } + } +} + pub struct Agent { client: ApiClient, tool_defs: Vec, @@ -206,46 +216,43 @@ impl Agent { /// Send a user message and run the agent loop until the model /// produces a text response (no more tool calls). Streams text /// and tool activity through the UI channel. + /// + /// Takes Arc> and manages locking internally so the + /// lock is never held across I/O (API streaming, tool dispatch). pub async fn turn( - &mut self, + agent: Arc>, user_input: &str, ui_tx: &UiSender, target: StreamTarget, ) -> Result { - // Run agent orchestration cycle (surface-observe, reflect, journal) - let cycle = self.run_agent_cycle(); + // --- Pre-loop setup (lock 1): agent cycle, memories, user input --- + let active_tools = { + let mut me = agent.lock().await; - // Surfaced memories — each as a separate Memory entry - for key in &cycle.surfaced_keys { - if let Some(rendered) = crate::cli::node::render_node( - &crate::store::Store::load().unwrap_or_default(), key, - ) { - let mut msg = Message::user(format!( - "\n--- {} (surfaced) ---\n{}\n", - key, rendered, - )); - msg.stamp(); - self.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); + let cycle = me.run_agent_cycle(); + for key in &cycle.surfaced_keys { + if let Some(rendered) = crate::cli::node::render_node( + &crate::store::Store::load().unwrap_or_default(), key, + ) { + let mut msg = Message::user(format!( + "\n--- {} (surfaced) ---\n{}\n", + key, rendered, + )); + msg.stamp(); + me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); + } + } + if let Some(ref reflection) = cycle.reflection { + me.push_message(Message::user(format!( + "\n--- subconscious reflection ---\n{}\n", + reflection.trim(), + ))); } - } - // Reflection — separate system reminder - if let Some(ref reflection) = cycle.reflection { - self.push_message(Message::user(format!( - "\n--- subconscious reflection ---\n{}\n", - reflection.trim(), - ))); - } - - // Inject completed background task results - // Collect completed background tool calls - { - let mut bg_ds = DispatchState { - yield_requested: false, had_tool_calls: false, - tool_errors: 0, model_switch: None, dmn_pause: false, - }; + // Collect completed background tool calls + let mut bg_ds = DispatchState::new(); let finished: Vec<_> = { - let mut tools = self.active_tools.lock().unwrap(); + let mut tools = me.active_tools.lock().unwrap(); let mut done = Vec::new(); let mut i = 0; while i < tools.len() { @@ -259,40 +266,40 @@ impl Agent { }; for entry in finished { if let Ok((call, output)) = entry.handle.await { - self.apply_tool_result(&call, output, ui_tx, &mut bg_ds); + me.apply_tool_result(&call, output, ui_tx, &mut bg_ds); } } - } - // User input — clean, just what was typed - self.push_message(Message::user(user_input)); - let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots())); + me.push_message(Message::user(user_input)); + let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots())); + + me.active_tools.clone() + }; + // --- Lock released --- let mut overflow_retries: u32 = 0; let mut empty_retries: u32 = 0; - let mut ds = DispatchState { - yield_requested: false, - had_tool_calls: false, - tool_errors: 0, - model_switch: None, - dmn_pause: false, - }; + let mut ds = DispatchState::new(); loop { let _ = ui_tx.send(UiMessage::Activity("thinking...".into())); - // Stream events from the API — we route each event to the - // appropriate UI pane rather than letting the API layer do it. - let api_messages = self.assemble_api_messages(); - let (mut rx, _stream_guard) = self.client.start_stream( - &api_messages, - Some(&self.tool_defs), - ui_tx, - &self.reasoning_effort, - None, - None, // priority: interactive - ); + // --- Lock 2: assemble messages, start stream --- + let (mut rx, _stream_guard) = { + let me = agent.lock().await; + let api_messages = me.assemble_api_messages(); + me.client.start_stream( + &api_messages, + Some(&me.tool_defs), + ui_tx, + &me.reasoning_effort, + None, + None, + ) + }; + // --- Lock released --- + // --- Stream loop (no lock) --- let mut content = String::new(); let mut tool_calls: Vec = Vec::new(); let mut usage = None; @@ -301,8 +308,6 @@ impl Agent { let mut tool_call_buf = String::new(); let mut stream_error = None; let mut first_content = true; - // Buffer for content not yet sent to UI — holds a tail - // that might be a partial tag. let mut display_buf = String::new(); while let Some(event) = rx.recv().await { @@ -316,7 +321,6 @@ impl Agent { if in_tool_call { tool_call_buf.push_str(&text); - // Check for closing tag — parse and fire immediately if let Some(end) = tool_call_buf.find("") { let body = &tool_call_buf[..end]; if let Some(call) = crate::agent::parsing::parse_tool_call_body(body) { @@ -336,8 +340,8 @@ impl Agent { let output = tools::dispatch(&call.function.name, &args).await; (call, output) }); - self.active_tools.lock().unwrap().push( - crate::user::ui_channel::ActiveToolCall { + active_tools.lock().unwrap().push( + tools::ActiveToolCall { id: call_id, name: call_name, detail: args_summary, @@ -347,20 +351,16 @@ impl Agent { } ); } - // Reset for potential next tool call let remaining = tool_call_buf[end + "".len()..].to_string(); tool_call_buf.clear(); in_tool_call = false; - // Any content after goes back to display if !remaining.trim().is_empty() { display_buf.push_str(&remaining); } } } else { display_buf.push_str(&text); - if let Some(pos) = display_buf.find("") { - // Flush content before the tag, suppress the rest. let before = &display_buf[..pos]; if !before.is_empty() { let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target)); @@ -368,10 +368,7 @@ impl Agent { display_buf.clear(); in_tool_call = true; } else { - // Flush display_buf except a tail that could be - // a partial "" (10 chars). let safe = display_buf.len().saturating_sub(10); - // Find a char boundary at or before safe let safe = display_buf.floor_char_boundary(safe); if safe > 0 { let flush = display_buf[..safe].to_string(); @@ -408,148 +405,162 @@ impl Agent { } } } + // --- Stream complete --- - // Handle stream errors with retry logic - if let Some(e) = stream_error { - let err = anyhow::anyhow!("{}", e); - if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { - overflow_retries += 1; - let _ = ui_tx.send(UiMessage::Info(format!( - "[context overflow — compacting and retrying ({}/2)]", - overflow_retries, - ))); - self.compact(); - continue; - } - if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { - empty_retries += 1; - let _ = ui_tx.send(UiMessage::Info(format!( - "[stream error: {} — retrying ({}/2)]", - e, empty_retries, - ))); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - continue; - } - let _ = ui_tx.send(UiMessage::Activity(String::new())); - return Err(err); - } - - if finish_reason.as_deref() == Some("error") { - let detail = if content.is_empty() { "no details".into() } else { content }; - let _ = ui_tx.send(UiMessage::Activity(String::new())); - return Err(anyhow::anyhow!("model stream error: {}", detail)); - } - - // Flush remaining display buffer (normal responses without tool calls). - if !in_tool_call && !display_buf.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target)); - } - if !content.is_empty() && !in_tool_call { - let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); - } - - let msg = api::build_response_message(content, tool_calls); - - if let Some(usage) = &usage { - self.last_prompt_tokens = usage.prompt_tokens; - - self.publish_context_state(); - let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo { - dmn_state: String::new(), // filled by main loop - dmn_turns: 0, - dmn_max_turns: 0, - prompt_tokens: usage.prompt_tokens, - completion_tokens: usage.completion_tokens, - model: self.client.model.clone(), - turn_tools: 0, // tracked by TUI from ToolCall messages - context_budget: self.budget().status_string(), - })); - } - - // Empty response — model returned finish=stop with no content - // or tool calls. Inject a nudge so the retry has different input. - let has_content = msg.content.is_some(); - let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty()); - if !has_content && !has_tools { - if empty_retries < 2 { - empty_retries += 1; - let _ = ui_tx.send(UiMessage::Debug(format!( - "empty response, injecting nudge and retrying ({}/2)", - empty_retries, - ))); - self.push_message(Message::user( - "[system] Your previous response was empty. \ - Please respond with text or use a tool." - )); - continue; - } - // After max retries, fall through — return the empty response - } else { - empty_retries = 0; - } - - // Collect non-background tool calls fired during streaming + // --- Lock 3: process results --- { + let mut me = agent.lock().await; + + // Handle stream errors with retry logic + if let Some(e) = stream_error { + let err = anyhow::anyhow!("{}", e); + if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { + overflow_retries += 1; + let _ = ui_tx.send(UiMessage::Info(format!( + "[context overflow — compacting and retrying ({}/2)]", + overflow_retries, + ))); + me.compact(); + continue; + } + if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { + empty_retries += 1; + let _ = ui_tx.send(UiMessage::Info(format!( + "[stream error: {} — retrying ({}/2)]", + e, empty_retries, + ))); + drop(me); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + continue; + } + let _ = ui_tx.send(UiMessage::Activity(String::new())); + return Err(err); + } + + if finish_reason.as_deref() == Some("error") { + let detail = if content.is_empty() { "no details".into() } else { content }; + let _ = ui_tx.send(UiMessage::Activity(String::new())); + return Err(anyhow::anyhow!("model stream error: {}", detail)); + } + + // Flush remaining display buffer + if !in_tool_call && !display_buf.is_empty() { + let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target)); + } + if !content.is_empty() && !in_tool_call { + let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); + } + + let msg = api::build_response_message(content, tool_calls); + + if let Some(usage) = &usage { + me.last_prompt_tokens = usage.prompt_tokens; + me.publish_context_state(); + let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo { + dmn_state: String::new(), + dmn_turns: 0, + dmn_max_turns: 0, + prompt_tokens: usage.prompt_tokens, + completion_tokens: usage.completion_tokens, + model: me.client.model.clone(), + turn_tools: 0, + context_budget: me.budget().status_string(), + })); + } + + // Empty response — nudge and retry + let has_content = msg.content.is_some(); + let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty()); + if !has_content && !has_tools { + if empty_retries < 2 { + empty_retries += 1; + let _ = ui_tx.send(UiMessage::Debug(format!( + "empty response, injecting nudge and retrying ({}/2)", + empty_retries, + ))); + me.push_message(Message::user( + "[system] Your previous response was empty. \ + Please respond with text or use a tool." + )); + continue; + } + } else { + empty_retries = 0; + } + + // Collect non-background tool calls fired during streaming let pending: Vec<_> = { - let mut tools = self.active_tools.lock().unwrap(); + let mut tools_guard = active_tools.lock().unwrap(); let mut non_bg = Vec::new(); let mut i = 0; - while i < tools.len() { - if !tools[i].background { - non_bg.push(tools.remove(i)); + while i < tools_guard.len() { + if !tools_guard[i].background { + non_bg.push(tools_guard.remove(i)); } else { i += 1; } } non_bg }; + if !pending.is_empty() { - self.push_message(msg.clone()); + me.push_message(msg.clone()); + // Drop lock before awaiting tool handles + drop(me); + let mut results = Vec::new(); for entry in pending { - if let Ok((call, output)) = entry.handle.await { - self.apply_tool_result(&call, output, ui_tx, &mut ds); + if let Ok(r) = entry.handle.await { + results.push(r); } } - self.publish_context_state(); - continue; - } - } - - // Tool calls (structured API path — not fired during stream). - if let Some(ref tool_calls) = msg.tool_calls { - if !tool_calls.is_empty() { - self.push_message(msg.clone()); - - for call in tool_calls { - self.dispatch_tool_call(call, None, ui_tx, &mut ds) - .await; + // Reacquire to apply results + let mut me = agent.lock().await; + for (call, output) in results { + me.apply_tool_result(&call, output, ui_tx, &mut ds); } + me.publish_context_state(); continue; } + + // Tool calls (structured API path) + if let Some(ref tool_calls) = msg.tool_calls { + if !tool_calls.is_empty() { + me.push_message(msg.clone()); + let calls: Vec = tool_calls.clone(); + // Drop lock before tool dispatch + drop(me); + for call in &calls { + Agent::dispatch_tool_call_unlocked( + &agent, &active_tools, call, ui_tx, &mut ds, + ).await; + } + continue; + } + } + + // Genuinely text-only response + let text = msg.content_text().to_string(); + let _ = ui_tx.send(UiMessage::Activity(String::new())); + me.push_message(msg); + + return Ok(TurnResult { + text, + yield_requested: ds.yield_requested, + had_tool_calls: ds.had_tool_calls, + tool_errors: ds.tool_errors, + model_switch: ds.model_switch, + dmn_pause: ds.dmn_pause, + }); } - - // Genuinely text-only response - let text = msg.content_text().to_string(); - let _ = ui_tx.send(UiMessage::Activity(String::new())); - self.push_message(msg); - - return Ok(TurnResult { - text, - yield_requested: ds.yield_requested, - had_tool_calls: ds.had_tool_calls, - tool_errors: ds.tool_errors, - model_switch: ds.model_switch, - dmn_pause: ds.dmn_pause, - }); } } - /// Dispatch a single tool call: send UI annotations, run the tool, - /// push results into the conversation, handle images. - async fn dispatch_tool_call( - &mut self, + /// Dispatch a tool call without holding the agent lock across I/O. + /// Used by `turn()` which manages its own locking. + async fn dispatch_tool_call_unlocked( + agent: &Arc>, + active_tools: &crate::user::ui_channel::SharedActiveTools, call: &ToolCall, - tag: Option<&str>, ui_tx: &UiSender, ds: &mut DispatchState, ) { @@ -557,38 +568,34 @@ impl Agent { serde_json::from_str(&call.function.arguments).unwrap_or_default(); let args_summary = summarize_args(&call.function.name, &args); - let label = match tag { - Some(t) => format!("calling: {} ({})", call.function.name, t), - None => format!("calling: {}", call.function.name), - }; - let _ = ui_tx.send(UiMessage::Activity(label)); + let _ = ui_tx.send(UiMessage::Activity(format!("calling: {}", call.function.name))); let _ = ui_tx.send(UiMessage::ToolCall { name: call.function.name.clone(), args_summary: args_summary.clone(), }); - // Handle working_stack — needs &mut self, can't be spawned + + // working_stack needs &mut Agent — brief lock if call.function.name == "working_stack" { - let result = tools::working_stack::handle(&args, &mut self.context.working_stack); + let mut me = agent.lock().await; + let result = tools::working_stack::handle(&args, &mut me.context.working_stack); let output = tools::ToolOutput::text(result.clone()); - self.apply_tool_result(call, output, ui_tx, ds); + me.apply_tool_result(call, output, ui_tx, ds); if !result.starts_with("Error:") { - self.refresh_context_state(); + me.refresh_context_state(); } return; } - // Spawn, push to active_tools, await handle - let call_id = call.id.clone(); - let call_name = call.function.name.clone(); - let call = call.clone(); + // Spawn tool, track it + let call_clone = call.clone(); let handle = tokio::spawn(async move { - let output = tools::dispatch(&call.function.name, &args).await; - (call, output) + let output = tools::dispatch(&call_clone.function.name, &args).await; + (call_clone, output) }); - self.active_tools.lock().unwrap().push( + active_tools.lock().unwrap().push( tools::ActiveToolCall { - id: call_id, - name: call_name, + id: call.id.clone(), + name: call.function.name.clone(), detail: args_summary, started: std::time::Instant::now(), background: false, @@ -596,14 +603,15 @@ impl Agent { } ); - // Wait for this non-background tool to complete + // Pop it back and await — no agent lock held let entry = { - let mut tools = self.active_tools.lock().unwrap(); - // It's the last one we pushed + let mut tools = active_tools.lock().unwrap(); tools.pop().unwrap() }; if let Ok((call, output)) = entry.handle.await { - self.apply_tool_result(&call, output, ui_tx, ds); + // Brief lock to apply result + let mut me = agent.lock().await; + me.apply_tool_result(&call, output, ui_tx, ds); } } diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 4d05886..a89d2f0 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -127,8 +127,7 @@ impl Session { let result_tx = self.turn_tx.clone(); self.turn_in_progress = true; self.turn_handle = Some(tokio::spawn(async move { - let mut agent = agent.lock().await; - let result = agent.turn(&input, &ui_tx, target).await; + let result = Agent::turn(agent, &input, &ui_tx, target).await; let _ = result_tx.send((result, target)).await; })); } @@ -209,40 +208,54 @@ impl Session { } self.update_status(); - self.check_compaction().await; - self.maybe_start_memory_scoring().await; + self.check_compaction(); + self.start_memory_scoring(); self.drain_pending(); } /// Spawn incremental memory scoring if not already running. - async fn maybe_start_memory_scoring(&mut self) { - { - let agent = self.agent.lock().await; - if agent.agent_cycles.memory_scoring_in_flight { - return; - } - } - - let (context, client, cursor) = { - let mut agent = self.agent.lock().await; - let cursor = agent.agent_cycles.memory_score_cursor; - agent.agent_cycles.memory_scoring_in_flight = true; - (agent.context.clone(), agent.client_clone(), cursor) - }; - + /// Non-blocking — all async work happens in the spawned task. + fn start_memory_scoring(&self) { let agent = self.agent.clone(); let ui_tx = self.ui_tx.clone(); tokio::spawn(async move { + // Check + snapshot under one brief lock + let (context, client, cursor) = { + let mut agent = agent.lock().await; + if agent.agent_cycles.memory_scoring_in_flight { + return; + } + let cursor = agent.agent_cycles.memory_score_cursor; + agent.agent_cycles.memory_scoring_in_flight = true; + // Count total unique memories + let mut seen = std::collections::HashSet::new(); + for entry in &agent.context.entries { + if let crate::agent::context::ConversationEntry::Memory { key, .. } = entry { + seen.insert(key.clone()); + } + } + agent.agent_cycles.memory_total = seen.len(); + let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots())); + (agent.context.clone(), agent.client_clone(), cursor) + }; + // Lock released — event loop is free let result = crate::agent::training::score_memories_incremental( &context, cursor, &client, &ui_tx, ).await; - let mut agent = agent.lock().await; - agent.agent_cycles.memory_scoring_in_flight = false; - match result { - Ok((new_cursor, scores)) => { + // Brief lock — just update fields, no heavy work + { + let mut agent = agent.lock().await; + agent.agent_cycles.memory_scoring_in_flight = false; + if let Ok((new_cursor, ref scores)) = result { agent.agent_cycles.memory_score_cursor = new_cursor; - agent.agent_cycles.memory_scores.extend(scores); + agent.agent_cycles.memory_scores.extend(scores.clone()); + } + } + // Snapshot and log outside the lock + match result { + Ok(_) => { + let agent = agent.lock().await; let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots())); } Err(e) => { @@ -255,23 +268,25 @@ impl Session { } /// Check if compaction is needed after a turn. - async fn check_compaction(&mut self) { - let mut agent_guard = self.agent.lock().await; - let tokens = agent_guard.last_prompt_tokens(); + fn check_compaction(&self) { let threshold = compaction_threshold(&self.config.app); - - if tokens > threshold { - let _ = self.ui_tx.send(UiMessage::Info(format!( - "[compaction: {}K > {}K threshold]", - tokens / 1000, - threshold / 1000, - ))); - agent_guard.compact(); - let _ = self.ui_tx.send(UiMessage::Info( - "[compacted — journal + recent messages]".into(), - )); - self.send_context_info(); - } + let agent = self.agent.clone(); + let ui_tx = self.ui_tx.clone(); + tokio::spawn(async move { + let mut agent_guard = agent.lock().await; + let tokens = agent_guard.last_prompt_tokens(); + if tokens > threshold { + let _ = ui_tx.send(UiMessage::Info(format!( + "[compaction: {}K > {}K threshold]", + tokens / 1000, + threshold / 1000, + ))); + agent_guard.compact(); + let _ = ui_tx.send(UiMessage::Info( + "[compacted — journal + recent messages]".into(), + )); + } + }); } /// Send any consolidated pending input as a single turn. @@ -791,6 +806,7 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let mut session = Session::new(agent, config, ui_tx.clone(), turn_tx); session.update_status(); + session.start_memory_scoring(); // also sends initial agent snapshots session.send_context_info(); // Start observation socket diff --git a/src/subconscious/subconscious.rs b/src/subconscious/subconscious.rs index c63f634..c28bb7b 100644 --- a/src/subconscious/subconscious.rs +++ b/src/subconscious/subconscious.rs @@ -110,6 +110,8 @@ pub struct AgentCycleState { pub memory_scoring_in_flight: bool, /// Latest per-memory scores from incremental scoring. pub memory_scores: Vec<(String, f64)>, + /// Total unique memories in the context (updated when scoring starts). + pub memory_total: usize, } const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; @@ -139,6 +141,7 @@ impl AgentCycleState { memory_score_cursor: 0, memory_scoring_in_flight: false, memory_scores: Vec::new(), + memory_total: 0, } } @@ -189,11 +192,11 @@ impl AgentCycleState { name: "memory-scoring".to_string(), pid: None, phase: if self.memory_scoring_in_flight { - Some(format!("scoring (cursor: {})", self.memory_score_cursor)) + Some(format!("scoring {}/{}", self.memory_scores.len(), self.memory_total)) } else if self.memory_scores.is_empty() { None } else { - Some(format!("{} memories scored", self.memory_scores.len())) + Some(format!("{}/{} scored", self.memory_scores.len(), self.memory_total)) }, log_path: None, }); diff --git a/src/user/mod.rs b/src/user/mod.rs index 701cfd7..fc2f6dd 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -433,7 +433,7 @@ impl App { Screen::Subconscious => { match key.code { KeyCode::Up => { self.agent_selected = self.agent_selected.saturating_sub(1); self.debug_scroll = 0; return; } - KeyCode::Down => { self.agent_selected = (self.agent_selected + 1).min(SUBCONSCIOUS_AGENTS.len() - 1); self.debug_scroll = 0; return; } + KeyCode::Down => { self.agent_selected = (self.agent_selected + 1).min(self.agent_state.len().saturating_sub(1)); self.debug_scroll = 0; return; } KeyCode::Enter | KeyCode::Right => { self.agent_log_view = true; self.debug_scroll = 0; return; } KeyCode::Left | KeyCode::Esc => { if self.agent_log_view { self.agent_log_view = false; self.debug_scroll = 0; } diff --git a/src/user/subconscious.rs b/src/user/subconscious.rs index 21aa293..a7c8578 100644 --- a/src/user/subconscious.rs +++ b/src/user/subconscious.rs @@ -11,7 +11,7 @@ use ratatui::{ Frame, }; -use super::{App, SUBCONSCIOUS_AGENTS, SCREEN_LEGEND}; +use super::{App, SCREEN_LEGEND}; impl App { pub(crate) fn draw_agents(&self, frame: &mut Frame, size: Rect) { @@ -24,7 +24,6 @@ impl App { let mut lines: Vec = Vec::new(); let section = Style::default().fg(Color::Yellow); - let _dim = Style::default().fg(Color::DarkGray); let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC); lines.push(Line::raw("")); @@ -32,29 +31,35 @@ impl App { lines.push(Line::styled(" (↑/↓ select, Enter/→ view log, Esc back)", hint)); lines.push(Line::raw("")); - for (i, &name) in SUBCONSCIOUS_AGENTS.iter().enumerate() { + for (i, agent) in self.agent_state.iter().enumerate() { let selected = i == self.agent_selected; let prefix = if selected { "▸ " } else { " " }; let bg = if selected { Style::default().bg(Color::DarkGray) } else { Style::default() }; - let agent = self.agent_state.iter().find(|a| a.name == name); - - match agent.and_then(|a| a.pid) { - Some(pid) => { - let phase = agent.and_then(|a| a.phase.as_deref()).unwrap_or("?"); - lines.push(Line::from(vec![ - Span::styled(format!("{}{:<20}", prefix, name), bg.fg(Color::Green)), + let status = match (&agent.pid, &agent.phase) { + (Some(pid), Some(phase)) => { + vec![ + Span::styled(format!("{}{:<20}", prefix, agent.name), bg.fg(Color::Green)), Span::styled("● ", bg.fg(Color::Green)), - Span::styled(format!("pid {} phase: {}", pid, phase), bg), - ])); + Span::styled(format!("pid {} {}", pid, phase), bg), + ] } - None => { - lines.push(Line::from(vec![ - Span::styled(format!("{}{:<20}", prefix, name), bg.fg(Color::Gray)), + (None, Some(phase)) => { + // No pid but has phase — async task (e.g. memory-scoring) + vec![ + Span::styled(format!("{}{:<20}", prefix, agent.name), bg.fg(Color::Cyan)), + Span::styled("◆ ", bg.fg(Color::Cyan)), + Span::styled(phase.clone(), bg), + ] + } + _ => { + vec![ + Span::styled(format!("{}{:<20}", prefix, agent.name), bg.fg(Color::Gray)), Span::styled("○ idle", bg.fg(Color::DarkGray)), - ])); + ] } - } + }; + lines.push(Line::from(status)); } let block = Block::default() @@ -70,8 +75,8 @@ impl App { } fn draw_agent_log(&self, frame: &mut Frame, size: Rect, _output_dir: &std::path::Path) { - let name = SUBCONSCIOUS_AGENTS.get(self.agent_selected).unwrap_or(&"?"); - let agent = self.agent_state.iter().find(|a| a.name == *name); + let agent = self.agent_state.get(self.agent_selected); + let name = agent.map(|a| a.name.as_str()).unwrap_or("?"); let mut lines: Vec = Vec::new(); let section = Style::default().fg(Color::Yellow); let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC);