agent: don't hold agent lock across I/O

The agent lock was held for the entire duration of turn() — including
API streaming and tool dispatch awaits. This blocked the UI thread
whenever it needed the lock (render tick, compaction check, etc.),
causing 20+ second freezes.

Fix: turn() takes Arc<Mutex<Agent>> and manages locking internally.
Lock is held briefly for prepare/process phases, released during all
I/O (streaming, tool awaits, sleep retries). Also:

- check_compaction: spawns task instead of awaiting on event loop
- start_memory_scoring: already spawned, no change needed
- dispatch_tool_call_unlocked: drops lock before tool handle await
- Subconscious screen: renders all agents from state dynamically
  (no more hardcoded SUBCONSCIOUS_AGENTS list)
- Memory scoring shows n/m progress in snapshots

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
ProofOfConcept 2026-04-04 04:23:29 -04:00
parent 6fa881f811
commit fb54488f30
5 changed files with 301 additions and 269 deletions

View file

@ -19,6 +19,7 @@ pub mod parsing;
pub mod tools; pub mod tools;
pub mod training; pub mod training;
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use tiktoken_rs::CoreBPE; use tiktoken_rs::CoreBPE;
@ -60,6 +61,15 @@ struct DispatchState {
dmn_pause: bool, dmn_pause: bool,
} }
impl DispatchState {
fn new() -> Self {
Self {
yield_requested: false, had_tool_calls: false,
tool_errors: 0, model_switch: None, dmn_pause: false,
}
}
}
pub struct Agent { pub struct Agent {
client: ApiClient, client: ApiClient,
tool_defs: Vec<ToolDef>, tool_defs: Vec<ToolDef>,
@ -206,46 +216,43 @@ impl Agent {
/// Send a user message and run the agent loop until the model /// Send a user message and run the agent loop until the model
/// produces a text response (no more tool calls). Streams text /// produces a text response (no more tool calls). Streams text
/// and tool activity through the UI channel. /// and tool activity through the UI channel.
///
/// Takes Arc<Mutex<Agent>> and manages locking internally so the
/// lock is never held across I/O (API streaming, tool dispatch).
pub async fn turn( pub async fn turn(
&mut self, agent: Arc<tokio::sync::Mutex<Agent>>,
user_input: &str, user_input: &str,
ui_tx: &UiSender, ui_tx: &UiSender,
target: StreamTarget, target: StreamTarget,
) -> Result<TurnResult> { ) -> Result<TurnResult> {
// Run agent orchestration cycle (surface-observe, reflect, journal) // --- Pre-loop setup (lock 1): agent cycle, memories, user input ---
let cycle = self.run_agent_cycle(); let active_tools = {
let mut me = agent.lock().await;
// Surfaced memories — each as a separate Memory entry let cycle = me.run_agent_cycle();
for key in &cycle.surfaced_keys { for key in &cycle.surfaced_keys {
if let Some(rendered) = crate::cli::node::render_node( if let Some(rendered) = crate::cli::node::render_node(
&crate::store::Store::load().unwrap_or_default(), key, &crate::store::Store::load().unwrap_or_default(), key,
) { ) {
let mut msg = Message::user(format!( let mut msg = Message::user(format!(
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>", "<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
key, rendered, key, rendered,
)); ));
msg.stamp(); msg.stamp();
self.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg }); me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg });
}
}
if let Some(ref reflection) = cycle.reflection {
me.push_message(Message::user(format!(
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
reflection.trim(),
)));
} }
}
// Reflection — separate system reminder // Collect completed background tool calls
if let Some(ref reflection) = cycle.reflection { let mut bg_ds = DispatchState::new();
self.push_message(Message::user(format!(
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
reflection.trim(),
)));
}
// Inject completed background task results
// Collect completed background tool calls
{
let mut bg_ds = DispatchState {
yield_requested: false, had_tool_calls: false,
tool_errors: 0, model_switch: None, dmn_pause: false,
};
let finished: Vec<_> = { let finished: Vec<_> = {
let mut tools = self.active_tools.lock().unwrap(); let mut tools = me.active_tools.lock().unwrap();
let mut done = Vec::new(); let mut done = Vec::new();
let mut i = 0; let mut i = 0;
while i < tools.len() { while i < tools.len() {
@ -259,40 +266,40 @@ impl Agent {
}; };
for entry in finished { for entry in finished {
if let Ok((call, output)) = entry.handle.await { if let Ok((call, output)) = entry.handle.await {
self.apply_tool_result(&call, output, ui_tx, &mut bg_ds); me.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
} }
} }
}
// User input — clean, just what was typed me.push_message(Message::user(user_input));
self.push_message(Message::user(user_input)); let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots()));
let _ = ui_tx.send(UiMessage::AgentUpdate(self.agent_cycles.snapshots()));
me.active_tools.clone()
};
// --- Lock released ---
let mut overflow_retries: u32 = 0; let mut overflow_retries: u32 = 0;
let mut empty_retries: u32 = 0; let mut empty_retries: u32 = 0;
let mut ds = DispatchState { let mut ds = DispatchState::new();
yield_requested: false,
had_tool_calls: false,
tool_errors: 0,
model_switch: None,
dmn_pause: false,
};
loop { loop {
let _ = ui_tx.send(UiMessage::Activity("thinking...".into())); let _ = ui_tx.send(UiMessage::Activity("thinking...".into()));
// Stream events from the API — we route each event to the // --- Lock 2: assemble messages, start stream ---
// appropriate UI pane rather than letting the API layer do it. let (mut rx, _stream_guard) = {
let api_messages = self.assemble_api_messages(); let me = agent.lock().await;
let (mut rx, _stream_guard) = self.client.start_stream( let api_messages = me.assemble_api_messages();
&api_messages, me.client.start_stream(
Some(&self.tool_defs), &api_messages,
ui_tx, Some(&me.tool_defs),
&self.reasoning_effort, ui_tx,
None, &me.reasoning_effort,
None, // priority: interactive None,
); None,
)
};
// --- Lock released ---
// --- Stream loop (no lock) ---
let mut content = String::new(); let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new(); let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None; let mut usage = None;
@ -301,8 +308,6 @@ impl Agent {
let mut tool_call_buf = String::new(); let mut tool_call_buf = String::new();
let mut stream_error = None; let mut stream_error = None;
let mut first_content = true; let mut first_content = true;
// Buffer for content not yet sent to UI — holds a tail
// that might be a partial <tool_call> tag.
let mut display_buf = String::new(); let mut display_buf = String::new();
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
@ -316,7 +321,6 @@ impl Agent {
if in_tool_call { if in_tool_call {
tool_call_buf.push_str(&text); tool_call_buf.push_str(&text);
// Check for closing tag — parse and fire immediately
if let Some(end) = tool_call_buf.find("</tool_call>") { if let Some(end) = tool_call_buf.find("</tool_call>") {
let body = &tool_call_buf[..end]; let body = &tool_call_buf[..end];
if let Some(call) = crate::agent::parsing::parse_tool_call_body(body) { if let Some(call) = crate::agent::parsing::parse_tool_call_body(body) {
@ -336,8 +340,8 @@ impl Agent {
let output = tools::dispatch(&call.function.name, &args).await; let output = tools::dispatch(&call.function.name, &args).await;
(call, output) (call, output)
}); });
self.active_tools.lock().unwrap().push( active_tools.lock().unwrap().push(
crate::user::ui_channel::ActiveToolCall { tools::ActiveToolCall {
id: call_id, id: call_id,
name: call_name, name: call_name,
detail: args_summary, detail: args_summary,
@ -347,20 +351,16 @@ impl Agent {
} }
); );
} }
// Reset for potential next tool call
let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string(); let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string();
tool_call_buf.clear(); tool_call_buf.clear();
in_tool_call = false; in_tool_call = false;
// Any content after </tool_call> goes back to display
if !remaining.trim().is_empty() { if !remaining.trim().is_empty() {
display_buf.push_str(&remaining); display_buf.push_str(&remaining);
} }
} }
} else { } else {
display_buf.push_str(&text); display_buf.push_str(&text);
if let Some(pos) = display_buf.find("<tool_call>") { if let Some(pos) = display_buf.find("<tool_call>") {
// Flush content before the tag, suppress the rest.
let before = &display_buf[..pos]; let before = &display_buf[..pos];
if !before.is_empty() { if !before.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target)); let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target));
@ -368,10 +368,7 @@ impl Agent {
display_buf.clear(); display_buf.clear();
in_tool_call = true; in_tool_call = true;
} else { } else {
// Flush display_buf except a tail that could be
// a partial "<tool_call>" (10 chars).
let safe = display_buf.len().saturating_sub(10); let safe = display_buf.len().saturating_sub(10);
// Find a char boundary at or before safe
let safe = display_buf.floor_char_boundary(safe); let safe = display_buf.floor_char_boundary(safe);
if safe > 0 { if safe > 0 {
let flush = display_buf[..safe].to_string(); let flush = display_buf[..safe].to_string();
@ -408,148 +405,162 @@ impl Agent {
} }
} }
} }
// --- Stream complete ---
// Handle stream errors with retry logic // --- Lock 3: process results ---
if let Some(e) = stream_error {
let err = anyhow::anyhow!("{}", e);
if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
overflow_retries += 1;
let _ = ui_tx.send(UiMessage::Info(format!(
"[context overflow — compacting and retrying ({}/2)]",
overflow_retries,
)));
self.compact();
continue;
}
if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
empty_retries += 1;
let _ = ui_tx.send(UiMessage::Info(format!(
"[stream error: {} — retrying ({}/2)]",
e, empty_retries,
)));
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
let _ = ui_tx.send(UiMessage::Activity(String::new()));
return Err(err);
}
if finish_reason.as_deref() == Some("error") {
let detail = if content.is_empty() { "no details".into() } else { content };
let _ = ui_tx.send(UiMessage::Activity(String::new()));
return Err(anyhow::anyhow!("model stream error: {}", detail));
}
// Flush remaining display buffer (normal responses without tool calls).
if !in_tool_call && !display_buf.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target));
}
if !content.is_empty() && !in_tool_call {
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
}
let msg = api::build_response_message(content, tool_calls);
if let Some(usage) = &usage {
self.last_prompt_tokens = usage.prompt_tokens;
self.publish_context_state();
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: String::new(), // filled by main loop
dmn_turns: 0,
dmn_max_turns: 0,
prompt_tokens: usage.prompt_tokens,
completion_tokens: usage.completion_tokens,
model: self.client.model.clone(),
turn_tools: 0, // tracked by TUI from ToolCall messages
context_budget: self.budget().status_string(),
}));
}
// Empty response — model returned finish=stop with no content
// or tool calls. Inject a nudge so the retry has different input.
let has_content = msg.content.is_some();
let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty());
if !has_content && !has_tools {
if empty_retries < 2 {
empty_retries += 1;
let _ = ui_tx.send(UiMessage::Debug(format!(
"empty response, injecting nudge and retrying ({}/2)",
empty_retries,
)));
self.push_message(Message::user(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
continue;
}
// After max retries, fall through — return the empty response
} else {
empty_retries = 0;
}
// Collect non-background tool calls fired during streaming
{ {
let mut me = agent.lock().await;
// Handle stream errors with retry logic
if let Some(e) = stream_error {
let err = anyhow::anyhow!("{}", e);
if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
overflow_retries += 1;
let _ = ui_tx.send(UiMessage::Info(format!(
"[context overflow — compacting and retrying ({}/2)]",
overflow_retries,
)));
me.compact();
continue;
}
if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
empty_retries += 1;
let _ = ui_tx.send(UiMessage::Info(format!(
"[stream error: {} — retrying ({}/2)]",
e, empty_retries,
)));
drop(me);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
let _ = ui_tx.send(UiMessage::Activity(String::new()));
return Err(err);
}
if finish_reason.as_deref() == Some("error") {
let detail = if content.is_empty() { "no details".into() } else { content };
let _ = ui_tx.send(UiMessage::Activity(String::new()));
return Err(anyhow::anyhow!("model stream error: {}", detail));
}
// Flush remaining display buffer
if !in_tool_call && !display_buf.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target));
}
if !content.is_empty() && !in_tool_call {
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
}
let msg = api::build_response_message(content, tool_calls);
if let Some(usage) = &usage {
me.last_prompt_tokens = usage.prompt_tokens;
me.publish_context_state();
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: String::new(),
dmn_turns: 0,
dmn_max_turns: 0,
prompt_tokens: usage.prompt_tokens,
completion_tokens: usage.completion_tokens,
model: me.client.model.clone(),
turn_tools: 0,
context_budget: me.budget().status_string(),
}));
}
// Empty response — nudge and retry
let has_content = msg.content.is_some();
let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty());
if !has_content && !has_tools {
if empty_retries < 2 {
empty_retries += 1;
let _ = ui_tx.send(UiMessage::Debug(format!(
"empty response, injecting nudge and retrying ({}/2)",
empty_retries,
)));
me.push_message(Message::user(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
continue;
}
} else {
empty_retries = 0;
}
// Collect non-background tool calls fired during streaming
let pending: Vec<_> = { let pending: Vec<_> = {
let mut tools = self.active_tools.lock().unwrap(); let mut tools_guard = active_tools.lock().unwrap();
let mut non_bg = Vec::new(); let mut non_bg = Vec::new();
let mut i = 0; let mut i = 0;
while i < tools.len() { while i < tools_guard.len() {
if !tools[i].background { if !tools_guard[i].background {
non_bg.push(tools.remove(i)); non_bg.push(tools_guard.remove(i));
} else { } else {
i += 1; i += 1;
} }
} }
non_bg non_bg
}; };
if !pending.is_empty() { if !pending.is_empty() {
self.push_message(msg.clone()); me.push_message(msg.clone());
// Drop lock before awaiting tool handles
drop(me);
let mut results = Vec::new();
for entry in pending { for entry in pending {
if let Ok((call, output)) = entry.handle.await { if let Ok(r) = entry.handle.await {
self.apply_tool_result(&call, output, ui_tx, &mut ds); results.push(r);
} }
} }
self.publish_context_state(); // Reacquire to apply results
continue; let mut me = agent.lock().await;
} for (call, output) in results {
} me.apply_tool_result(&call, output, ui_tx, &mut ds);
// Tool calls (structured API path — not fired during stream).
if let Some(ref tool_calls) = msg.tool_calls {
if !tool_calls.is_empty() {
self.push_message(msg.clone());
for call in tool_calls {
self.dispatch_tool_call(call, None, ui_tx, &mut ds)
.await;
} }
me.publish_context_state();
continue; continue;
} }
// Tool calls (structured API path)
if let Some(ref tool_calls) = msg.tool_calls {
if !tool_calls.is_empty() {
me.push_message(msg.clone());
let calls: Vec<ToolCall> = tool_calls.clone();
// Drop lock before tool dispatch
drop(me);
for call in &calls {
Agent::dispatch_tool_call_unlocked(
&agent, &active_tools, call, ui_tx, &mut ds,
).await;
}
continue;
}
}
// Genuinely text-only response
let text = msg.content_text().to_string();
let _ = ui_tx.send(UiMessage::Activity(String::new()));
me.push_message(msg);
return Ok(TurnResult {
text,
yield_requested: ds.yield_requested,
had_tool_calls: ds.had_tool_calls,
tool_errors: ds.tool_errors,
model_switch: ds.model_switch,
dmn_pause: ds.dmn_pause,
});
} }
// Genuinely text-only response
let text = msg.content_text().to_string();
let _ = ui_tx.send(UiMessage::Activity(String::new()));
self.push_message(msg);
return Ok(TurnResult {
text,
yield_requested: ds.yield_requested,
had_tool_calls: ds.had_tool_calls,
tool_errors: ds.tool_errors,
model_switch: ds.model_switch,
dmn_pause: ds.dmn_pause,
});
} }
} }
/// Dispatch a single tool call: send UI annotations, run the tool, /// Dispatch a tool call without holding the agent lock across I/O.
/// push results into the conversation, handle images. /// Used by `turn()` which manages its own locking.
async fn dispatch_tool_call( async fn dispatch_tool_call_unlocked(
&mut self, agent: &Arc<tokio::sync::Mutex<Agent>>,
active_tools: &crate::user::ui_channel::SharedActiveTools,
call: &ToolCall, call: &ToolCall,
tag: Option<&str>,
ui_tx: &UiSender, ui_tx: &UiSender,
ds: &mut DispatchState, ds: &mut DispatchState,
) { ) {
@ -557,38 +568,34 @@ impl Agent {
serde_json::from_str(&call.function.arguments).unwrap_or_default(); serde_json::from_str(&call.function.arguments).unwrap_or_default();
let args_summary = summarize_args(&call.function.name, &args); let args_summary = summarize_args(&call.function.name, &args);
let label = match tag { let _ = ui_tx.send(UiMessage::Activity(format!("calling: {}", call.function.name)));
Some(t) => format!("calling: {} ({})", call.function.name, t),
None => format!("calling: {}", call.function.name),
};
let _ = ui_tx.send(UiMessage::Activity(label));
let _ = ui_tx.send(UiMessage::ToolCall { let _ = ui_tx.send(UiMessage::ToolCall {
name: call.function.name.clone(), name: call.function.name.clone(),
args_summary: args_summary.clone(), args_summary: args_summary.clone(),
}); });
// Handle working_stack — needs &mut self, can't be spawned
// working_stack needs &mut Agent — brief lock
if call.function.name == "working_stack" { if call.function.name == "working_stack" {
let result = tools::working_stack::handle(&args, &mut self.context.working_stack); let mut me = agent.lock().await;
let result = tools::working_stack::handle(&args, &mut me.context.working_stack);
let output = tools::ToolOutput::text(result.clone()); let output = tools::ToolOutput::text(result.clone());
self.apply_tool_result(call, output, ui_tx, ds); me.apply_tool_result(call, output, ui_tx, ds);
if !result.starts_with("Error:") { if !result.starts_with("Error:") {
self.refresh_context_state(); me.refresh_context_state();
} }
return; return;
} }
// Spawn, push to active_tools, await handle // Spawn tool, track it
let call_id = call.id.clone(); let call_clone = call.clone();
let call_name = call.function.name.clone();
let call = call.clone();
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
let output = tools::dispatch(&call.function.name, &args).await; let output = tools::dispatch(&call_clone.function.name, &args).await;
(call, output) (call_clone, output)
}); });
self.active_tools.lock().unwrap().push( active_tools.lock().unwrap().push(
tools::ActiveToolCall { tools::ActiveToolCall {
id: call_id, id: call.id.clone(),
name: call_name, name: call.function.name.clone(),
detail: args_summary, detail: args_summary,
started: std::time::Instant::now(), started: std::time::Instant::now(),
background: false, background: false,
@ -596,14 +603,15 @@ impl Agent {
} }
); );
// Wait for this non-background tool to complete // Pop it back and await — no agent lock held
let entry = { let entry = {
let mut tools = self.active_tools.lock().unwrap(); let mut tools = active_tools.lock().unwrap();
// It's the last one we pushed
tools.pop().unwrap() tools.pop().unwrap()
}; };
if let Ok((call, output)) = entry.handle.await { if let Ok((call, output)) = entry.handle.await {
self.apply_tool_result(&call, output, ui_tx, ds); // Brief lock to apply result
let mut me = agent.lock().await;
me.apply_tool_result(&call, output, ui_tx, ds);
} }
} }

View file

@ -127,8 +127,7 @@ impl Session {
let result_tx = self.turn_tx.clone(); let result_tx = self.turn_tx.clone();
self.turn_in_progress = true; self.turn_in_progress = true;
self.turn_handle = Some(tokio::spawn(async move { self.turn_handle = Some(tokio::spawn(async move {
let mut agent = agent.lock().await; let result = Agent::turn(agent, &input, &ui_tx, target).await;
let result = agent.turn(&input, &ui_tx, target).await;
let _ = result_tx.send((result, target)).await; let _ = result_tx.send((result, target)).await;
})); }));
} }
@ -209,40 +208,54 @@ impl Session {
} }
self.update_status(); self.update_status();
self.check_compaction().await; self.check_compaction();
self.maybe_start_memory_scoring().await; self.start_memory_scoring();
self.drain_pending(); self.drain_pending();
} }
/// Spawn incremental memory scoring if not already running. /// Spawn incremental memory scoring if not already running.
async fn maybe_start_memory_scoring(&mut self) { /// Non-blocking — all async work happens in the spawned task.
{ fn start_memory_scoring(&self) {
let agent = self.agent.lock().await;
if agent.agent_cycles.memory_scoring_in_flight {
return;
}
}
let (context, client, cursor) = {
let mut agent = self.agent.lock().await;
let cursor = agent.agent_cycles.memory_score_cursor;
agent.agent_cycles.memory_scoring_in_flight = true;
(agent.context.clone(), agent.client_clone(), cursor)
};
let agent = self.agent.clone(); let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone(); let ui_tx = self.ui_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Check + snapshot under one brief lock
let (context, client, cursor) = {
let mut agent = agent.lock().await;
if agent.agent_cycles.memory_scoring_in_flight {
return;
}
let cursor = agent.agent_cycles.memory_score_cursor;
agent.agent_cycles.memory_scoring_in_flight = true;
// Count total unique memories
let mut seen = std::collections::HashSet::new();
for entry in &agent.context.entries {
if let crate::agent::context::ConversationEntry::Memory { key, .. } = entry {
seen.insert(key.clone());
}
}
agent.agent_cycles.memory_total = seen.len();
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
(agent.context.clone(), agent.client_clone(), cursor)
};
// Lock released — event loop is free
let result = crate::agent::training::score_memories_incremental( let result = crate::agent::training::score_memories_incremental(
&context, cursor, &client, &ui_tx, &context, cursor, &client, &ui_tx,
).await; ).await;
let mut agent = agent.lock().await; // Brief lock — just update fields, no heavy work
agent.agent_cycles.memory_scoring_in_flight = false; {
match result { let mut agent = agent.lock().await;
Ok((new_cursor, scores)) => { agent.agent_cycles.memory_scoring_in_flight = false;
if let Ok((new_cursor, ref scores)) = result {
agent.agent_cycles.memory_score_cursor = new_cursor; agent.agent_cycles.memory_score_cursor = new_cursor;
agent.agent_cycles.memory_scores.extend(scores); agent.agent_cycles.memory_scores.extend(scores.clone());
}
}
// Snapshot and log outside the lock
match result {
Ok(_) => {
let agent = agent.lock().await;
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots())); let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
} }
Err(e) => { Err(e) => {
@ -255,23 +268,25 @@ impl Session {
} }
/// Check if compaction is needed after a turn. /// Check if compaction is needed after a turn.
async fn check_compaction(&mut self) { fn check_compaction(&self) {
let mut agent_guard = self.agent.lock().await;
let tokens = agent_guard.last_prompt_tokens();
let threshold = compaction_threshold(&self.config.app); let threshold = compaction_threshold(&self.config.app);
let agent = self.agent.clone();
if tokens > threshold { let ui_tx = self.ui_tx.clone();
let _ = self.ui_tx.send(UiMessage::Info(format!( tokio::spawn(async move {
"[compaction: {}K > {}K threshold]", let mut agent_guard = agent.lock().await;
tokens / 1000, let tokens = agent_guard.last_prompt_tokens();
threshold / 1000, if tokens > threshold {
))); let _ = ui_tx.send(UiMessage::Info(format!(
agent_guard.compact(); "[compaction: {}K > {}K threshold]",
let _ = self.ui_tx.send(UiMessage::Info( tokens / 1000,
"[compacted — journal + recent messages]".into(), threshold / 1000,
)); )));
self.send_context_info(); agent_guard.compact();
} let _ = ui_tx.send(UiMessage::Info(
"[compacted — journal + recent messages]".into(),
));
}
});
} }
/// Send any consolidated pending input as a single turn. /// Send any consolidated pending input as a single turn.
@ -791,6 +806,7 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let mut session = Session::new(agent, config, ui_tx.clone(), turn_tx); let mut session = Session::new(agent, config, ui_tx.clone(), turn_tx);
session.update_status(); session.update_status();
session.start_memory_scoring(); // also sends initial agent snapshots
session.send_context_info(); session.send_context_info();
// Start observation socket // Start observation socket

View file

@ -110,6 +110,8 @@ pub struct AgentCycleState {
pub memory_scoring_in_flight: bool, pub memory_scoring_in_flight: bool,
/// Latest per-memory scores from incremental scoring. /// Latest per-memory scores from incremental scoring.
pub memory_scores: Vec<(String, f64)>, pub memory_scores: Vec<(String, f64)>,
/// Total unique memories in the context (updated when scoring starts).
pub memory_total: usize,
} }
const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"]; const AGENT_CYCLE_NAMES: &[&str] = &["surface-observe", "journal", "reflect"];
@ -139,6 +141,7 @@ impl AgentCycleState {
memory_score_cursor: 0, memory_score_cursor: 0,
memory_scoring_in_flight: false, memory_scoring_in_flight: false,
memory_scores: Vec::new(), memory_scores: Vec::new(),
memory_total: 0,
} }
} }
@ -189,11 +192,11 @@ impl AgentCycleState {
name: "memory-scoring".to_string(), name: "memory-scoring".to_string(),
pid: None, pid: None,
phase: if self.memory_scoring_in_flight { phase: if self.memory_scoring_in_flight {
Some(format!("scoring (cursor: {})", self.memory_score_cursor)) Some(format!("scoring {}/{}", self.memory_scores.len(), self.memory_total))
} else if self.memory_scores.is_empty() { } else if self.memory_scores.is_empty() {
None None
} else { } else {
Some(format!("{} memories scored", self.memory_scores.len())) Some(format!("{}/{} scored", self.memory_scores.len(), self.memory_total))
}, },
log_path: None, log_path: None,
}); });

View file

@ -433,7 +433,7 @@ impl App {
Screen::Subconscious => { Screen::Subconscious => {
match key.code { match key.code {
KeyCode::Up => { self.agent_selected = self.agent_selected.saturating_sub(1); self.debug_scroll = 0; return; } KeyCode::Up => { self.agent_selected = self.agent_selected.saturating_sub(1); self.debug_scroll = 0; return; }
KeyCode::Down => { self.agent_selected = (self.agent_selected + 1).min(SUBCONSCIOUS_AGENTS.len() - 1); self.debug_scroll = 0; return; } KeyCode::Down => { self.agent_selected = (self.agent_selected + 1).min(self.agent_state.len().saturating_sub(1)); self.debug_scroll = 0; return; }
KeyCode::Enter | KeyCode::Right => { self.agent_log_view = true; self.debug_scroll = 0; return; } KeyCode::Enter | KeyCode::Right => { self.agent_log_view = true; self.debug_scroll = 0; return; }
KeyCode::Left | KeyCode::Esc => { KeyCode::Left | KeyCode::Esc => {
if self.agent_log_view { self.agent_log_view = false; self.debug_scroll = 0; } if self.agent_log_view { self.agent_log_view = false; self.debug_scroll = 0; }

View file

@ -11,7 +11,7 @@ use ratatui::{
Frame, Frame,
}; };
use super::{App, SUBCONSCIOUS_AGENTS, SCREEN_LEGEND}; use super::{App, SCREEN_LEGEND};
impl App { impl App {
pub(crate) fn draw_agents(&self, frame: &mut Frame, size: Rect) { pub(crate) fn draw_agents(&self, frame: &mut Frame, size: Rect) {
@ -24,7 +24,6 @@ impl App {
let mut lines: Vec<Line> = Vec::new(); let mut lines: Vec<Line> = Vec::new();
let section = Style::default().fg(Color::Yellow); let section = Style::default().fg(Color::Yellow);
let _dim = Style::default().fg(Color::DarkGray);
let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC); let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC);
lines.push(Line::raw("")); lines.push(Line::raw(""));
@ -32,29 +31,35 @@ impl App {
lines.push(Line::styled(" (↑/↓ select, Enter/→ view log, Esc back)", hint)); lines.push(Line::styled(" (↑/↓ select, Enter/→ view log, Esc back)", hint));
lines.push(Line::raw("")); lines.push(Line::raw(""));
for (i, &name) in SUBCONSCIOUS_AGENTS.iter().enumerate() { for (i, agent) in self.agent_state.iter().enumerate() {
let selected = i == self.agent_selected; let selected = i == self.agent_selected;
let prefix = if selected { "" } else { " " }; let prefix = if selected { "" } else { " " };
let bg = if selected { Style::default().bg(Color::DarkGray) } else { Style::default() }; let bg = if selected { Style::default().bg(Color::DarkGray) } else { Style::default() };
let agent = self.agent_state.iter().find(|a| a.name == name); let status = match (&agent.pid, &agent.phase) {
(Some(pid), Some(phase)) => {
match agent.and_then(|a| a.pid) { vec![
Some(pid) => { Span::styled(format!("{}{:<20}", prefix, agent.name), bg.fg(Color::Green)),
let phase = agent.and_then(|a| a.phase.as_deref()).unwrap_or("?");
lines.push(Line::from(vec![
Span::styled(format!("{}{:<20}", prefix, name), bg.fg(Color::Green)),
Span::styled("", bg.fg(Color::Green)), Span::styled("", bg.fg(Color::Green)),
Span::styled(format!("pid {} phase: {}", pid, phase), bg), Span::styled(format!("pid {} {}", pid, phase), bg),
])); ]
} }
None => { (None, Some(phase)) => {
lines.push(Line::from(vec![ // No pid but has phase — async task (e.g. memory-scoring)
Span::styled(format!("{}{:<20}", prefix, name), bg.fg(Color::Gray)), vec![
Span::styled(format!("{}{:<20}", prefix, agent.name), bg.fg(Color::Cyan)),
Span::styled("", bg.fg(Color::Cyan)),
Span::styled(phase.clone(), bg),
]
}
_ => {
vec![
Span::styled(format!("{}{:<20}", prefix, agent.name), bg.fg(Color::Gray)),
Span::styled("○ idle", bg.fg(Color::DarkGray)), Span::styled("○ idle", bg.fg(Color::DarkGray)),
])); ]
} }
} };
lines.push(Line::from(status));
} }
let block = Block::default() let block = Block::default()
@ -70,8 +75,8 @@ impl App {
} }
fn draw_agent_log(&self, frame: &mut Frame, size: Rect, _output_dir: &std::path::Path) { fn draw_agent_log(&self, frame: &mut Frame, size: Rect, _output_dir: &std::path::Path) {
let name = SUBCONSCIOUS_AGENTS.get(self.agent_selected).unwrap_or(&"?"); let agent = self.agent_state.get(self.agent_selected);
let agent = self.agent_state.iter().find(|a| a.name == *name); let name = agent.map(|a| a.name.as_str()).unwrap_or("?");
let mut lines: Vec<Line> = Vec::new(); let mut lines: Vec<Line> = Vec::new();
let section = Style::default().fg(Color::Yellow); let section = Style::default().fg(Color::Yellow);
let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC); let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC);