From cfddb55ed95487e11f6d9c2cc9bf66024047517a Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sun, 5 Apr 2026 22:18:07 -0400 Subject: [PATCH] =?UTF-8?q?Kill=20TextDelta,=20Info=20=E2=80=94=20UiMessag?= =?UTF-8?q?e=20is=20dead.=20RAII=20ActivityGuards=20replace=20all=20status?= =?UTF-8?q?=20feedback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streaming text now goes directly to agent entries via append_streaming(). sync_from_agent diffs the growing entry each tick. The streaming entry is popped when the response completes; build_response_message pushes the final version. All status feedback uses RAII ActivityGuards: - push_activity() for long-running work (thinking, streaming, scoring) - notify() for instant feedback (compacted, DMN state changes, commands) - Guards auto-remove on Drop, appending "(complete)" and lingering 5s - expire_activities() cleans up timed-out notifications on render tick UiMessage enum reduced to a single Info variant with zero sends. The channel infrastructure remains for now (Mind/Agent still take UiSender in signatures) — mechanical cleanup for a follow-up. Co-Authored-By: Proof of Concept --- src/agent/api/mod.rs | 14 ++-- src/agent/api/openai.rs | 9 ++- src/agent/api/types.rs | 9 +++ src/agent/mod.rs | 143 ++++++++++++++++++++++++++++++-------- src/mind/mod.rs | 3 +- src/subconscious/learn.rs | 6 +- src/user/chat.rs | 120 +++++++++++--------------------- src/user/mod.rs | 80 ++++++--------------- src/user/ui_channel.rs | 3 - 9 files changed, 201 insertions(+), 186 deletions(-) diff --git a/src/agent/api/mod.rs b/src/agent/api/mod.rs index 33c8526..39f20e9 100644 --- a/src/agent/api/mod.rs +++ b/src/agent/api/mod.rs @@ -20,7 +20,7 @@ use tokio::sync::mpsc; use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall}; pub use types::ToolCall; -use crate::user::ui_channel::{UiMessage, UiSender, StreamTarget}; +use crate::user::ui_channel::UiSender; /// A JoinHandle that aborts its task when dropped. pub struct AbortOnDrop(tokio::task::JoinHandle<()>); @@ -130,7 +130,7 @@ impl ApiClient { &reasoning_effort, sampling, priority, ).await; if let Err(e) = result { - let _ = tx.send(StreamEvent::Error(e.to_string(); + let _ = tx.send(StreamEvent::Error(e.to_string())); } }); @@ -207,7 +207,6 @@ pub(crate) async fn send_and_check( body: &impl serde::Serialize, auth_header: (&str, &str), extra_headers: &[(&str, &str)], - ui_tx: &UiSender, debug_label: &str, request_json: Option<&str>, ) -> Result { @@ -619,8 +618,6 @@ pub struct StreamResult { /// - UI forwarding (text deltas, reasoning, tool call notifications) pub async fn collect_stream( rx: &mut mpsc::UnboundedReceiver, - ui_tx: &UiSender, - target: StreamTarget, agent: &std::sync::Arc>, active_tools: &crate::user::ui_channel::SharedActiveTools, ) -> StreamResult { @@ -633,12 +630,13 @@ pub async fn collect_stream( let mut error = None; let mut first_content = true; let mut display_buf = String::new(); + let mut _streaming_guard: Option = None; while let Some(event) = rx.recv().await { match event { StreamEvent::Content(text) => { if first_content { - if let Ok(mut ag) = agent.try_lock() { ag.activity = "streaming...".into(); } + _streaming_guard = Some(super::start_activity(agent, "streaming...").await); first_content = false; } content.push_str(&text); @@ -683,7 +681,7 @@ pub async fn collect_stream( if let Some(pos) = display_buf.find("") { let before = &display_buf[..pos]; if !before.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target)); + if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(before); } } display_buf.clear(); in_tool_call = true; @@ -693,7 +691,7 @@ pub async fn collect_stream( if safe > 0 { let flush = display_buf[..safe].to_string(); display_buf = display_buf[safe..].to_string(); - let _ = ui_tx.send(UiMessage::TextDelta(flush, target)); + if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(&flush); } } } } diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs index df59d36..d10bb93 100644 --- a/src/agent/api/openai.rs +++ b/src/agent/api/openai.rs @@ -9,7 +9,7 @@ use reqwest::Client; use tokio::sync::mpsc; use super::types::*; -use crate::user::ui_channel::{UiMessage, UiSender}; +use crate::user::ui_channel::UiSender; use super::StreamEvent; /// Stream SSE events from an OpenAI-compatible endpoint, sending @@ -66,7 +66,6 @@ pub(super) async fn stream_events( &request, ("Authorization", &format!("Bearer {}", api_key)), &[], - ui_tx, &debug_label, request_json.as_deref(), ) @@ -105,7 +104,7 @@ pub(super) async fn stream_events( }; if let Some(ref u) = chunk.usage { - let _ = tx.send(StreamEvent::Usage(u.clone(); + let _ = tx.send(StreamEvent::Usage(u.clone())); usage = chunk.usage; } @@ -126,7 +125,7 @@ pub(super) async fn stream_events( reasoning_chars += r.len(); has_reasoning = true; if !r.is_empty() { - let _ = tx.send(StreamEvent::Reasoning(r.clone(); + let _ = tx.send(StreamEvent::Reasoning(r.clone())); } } if let Some(ref r) = choice.delta.reasoning_details { @@ -143,7 +142,7 @@ pub(super) async fn stream_events( first_content_at = Some(reader.stream_start.elapsed()); } content_len += text_delta.len(); - let _ = tx.send(StreamEvent::Content(text_delta.clone(); + let _ = tx.send(StreamEvent::Content(text_delta.clone())); } if let Some(ref tc_deltas) = choice.delta.tool_calls { diff --git a/src/agent/api/types.rs b/src/agent/api/types.rs index 6954c4a..a875b54 100644 --- a/src/agent/api/types.rs +++ b/src/agent/api/types.rs @@ -202,6 +202,15 @@ impl Message { self.content.as_ref().map_or("", |c| c.as_text()) } + /// Append text to existing content (for streaming). + pub fn append_content(&mut self, text: &str) { + match self.content { + Some(MessageContent::Text(ref mut s)) => s.push_str(text), + None => self.content = Some(MessageContent::Text(text.to_string())), + _ => {} // Parts — don't append to multimodal + } + } + pub fn role_str(&self) -> &str { match self.role { Role::System => "system", diff --git a/src/agent/mod.rs b/src/agent/mod.rs index e56652c..1758d0b 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -28,9 +28,82 @@ use context::{ConversationEntry, ContextState, ContextBudget}; use tools::{summarize_args, working_stack}; use crate::mind::log::ConversationLog; -use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, UiMessage, UiSender}; +use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, UiSender}; use crate::subconscious::learn; +// --- Activity tracking (RAII guards) --- + +pub struct ActivityEntry { + pub id: u64, + pub label: String, + pub started: std::time::Instant, + /// Auto-expires this long after creation (or completion). + pub expires_at: std::time::Instant, +} + +/// RAII guard — marks the activity "(complete)" on drop, starts expiry timer. +pub struct ActivityGuard { + agent: Arc>, + id: u64, +} + +const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5); + +impl Drop for ActivityGuard { + fn drop(&mut self) { + if let Ok(mut ag) = self.agent.try_lock() { + if let Some(entry) = ag.activities.iter_mut().find(|a| a.id == self.id) { + entry.label.push_str(" (complete)"); + entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER; + } + } + } +} + +impl Agent { + /// Register an activity, returns its ID. Caller creates the guard. + pub fn push_activity(&mut self, label: impl Into) -> u64 { + self.expire_activities(); + let id = self.next_activity_id; + self.next_activity_id += 1; + self.activities.push(ActivityEntry { + id, label: label.into(), + started: std::time::Instant::now(), + expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600), + }); + id + } + + /// Push a notification — auto-expires after 5 seconds. + pub fn notify(&mut self, label: impl Into) { + self.expire_activities(); + let id = self.next_activity_id; + self.next_activity_id += 1; + self.activities.push(ActivityEntry { + id, label: label.into(), + started: std::time::Instant::now(), + expires_at: std::time::Instant::now() + ACTIVITY_LINGER, + }); + } + + /// Remove expired activities. + pub fn expire_activities(&mut self) { + let now = std::time::Instant::now(); + self.activities.retain(|a| a.expires_at > now); + } +} + +/// Create an activity guard from outside the lock. +pub fn activity_guard(agent: &Arc>, id: u64) -> ActivityGuard { + ActivityGuard { agent: agent.clone(), id } +} + +/// Convenience: lock, push activity, unlock, return guard. +pub async fn start_activity(agent: &Arc>, label: impl Into) -> ActivityGuard { + let id = agent.lock().await.push_activity(label); + ActivityGuard { agent: agent.clone(), id } +} + /// Result of a single agent turn. pub struct TurnResult { /// The text response (already sent through UI channel). @@ -77,8 +150,9 @@ pub struct Agent { pub temperature: f32, pub top_p: f32, pub top_k: u32, - /// Live activity indicator — read by UI on render tick. - pub activity: String, + /// Active activities — RAII guards auto-remove on drop. + pub activities: Vec, + next_activity_id: u64, /// Control tool flags — set by tool handlers, consumed by turn loop. pub pending_yield: bool, pub pending_model_switch: Option, @@ -147,7 +221,8 @@ impl Agent { temperature: 0.6, top_p: 0.95, top_k: 20, - activity: String::new(), + activities: Vec::new(), + next_activity_id: 0, pending_yield: false, pending_model_switch: None, pending_dmn_pause: false, @@ -182,7 +257,7 @@ impl Agent { if !jnl.is_empty() { msgs.push(Message::user(jnl)); } - msgs.extend(self.context.entries.iter().map(|e| e.api_message().clone(); + msgs.extend(self.context.entries.iter().map(|e| e.api_message().clone())); msgs } @@ -218,9 +293,22 @@ impl Agent { self.context.entries.push(entry); } - /// Push a context-only message (system prompt, identity context, - /// journal summaries). Not logged — these are reconstructed on - /// every startup/compaction. + /// Append streaming text to the last entry (creating a partial + /// assistant entry if needed). Called by collect_stream per token batch. + pub fn append_streaming(&mut self, text: &str) { + if let Some(entry) = self.context.entries.last_mut() { + let msg = entry.message_mut(); + if msg.role == Role::Assistant { + msg.append_content(text); + return; + } + } + // No assistant entry yet — push a new partial one + self.context.entries.push(ConversationEntry::Message( + Message::assistant(text), + )); + } + pub fn budget(&self) -> ContextBudget { let count_str = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); let count_msg = |m: &Message| crate::agent::context::msg_token_count(&self.tokenizer, m); @@ -263,7 +351,7 @@ impl Agent { me.push_message(Message::user(format!( "\n--- subconscious reflection ---\n{}\n", reflection.trim(), - ); + ))); } // Collect completed background tool handles — remove from active list @@ -308,9 +396,9 @@ impl Agent { loop { // --- Lock 2: assemble messages, start stream --- + let _thinking = start_activity(&agent, "thinking...").await; let (mut rx, _stream_guard) = { - let mut me = agent.lock().await; - me.activity = "thinking...".into(); + let me = agent.lock().await; let api_messages = me.assemble_api_messages(); let sampling = api::SamplingParams { temperature: me.temperature, @@ -330,7 +418,7 @@ impl Agent { // --- Stream loop (no lock) --- let sr = api::collect_stream( - &mut rx, ui_tx, target, &agent, &active_tools, + &mut rx, &agent, &active_tools, ).await; let api::StreamResult { content, tool_calls, usage, finish_reason, @@ -347,39 +435,37 @@ impl Agent { let err = anyhow::anyhow!("{}", e); if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 { overflow_retries += 1; - let _ = ui_tx.send(UiMessage::Info(format!( - "[context overflow — compacting and retrying ({}/2)]", - overflow_retries, - ); + me.notify(format!("context overflow — retrying ({}/2)", overflow_retries)); me.compact(); continue; } if crate::agent::context::is_stream_error(&err) && empty_retries < 2 { empty_retries += 1; - let _ = ui_tx.send(UiMessage::Info(format!( - "[stream error: {} — retrying ({}/2)]", - e, empty_retries, - ); + me.notify(format!("stream error — retrying ({}/2)", empty_retries)); drop(me); tokio::time::sleep(std::time::Duration::from_secs(2)).await; continue; } - me.activity.clear(); return Err(err); } if finish_reason.as_deref() == Some("error") { let detail = if content.is_empty() { "no details".into() } else { content }; - me.activity.clear(); return Err(anyhow::anyhow!("model stream error: {}", detail)); } - // Flush remaining display buffer + // Flush remaining display buffer to streaming entry if !in_tool_call && !display_buf.is_empty() { - let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target)); + me.append_streaming(&display_buf); } - if !content.is_empty() && !in_tool_call { - let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target)); + + // Pop the streaming entry — the proper entry gets pushed below + // via build_response_message which handles tool calls, leaked + // tool calls, etc. sync_from_agent handles the swap. + if let Some(entry) = me.context.entries.last() { + if entry.message().role == Role::Assistant && entry.message().timestamp.is_none() { + me.context.entries.pop(); + } } let msg = api::build_response_message(content, tool_calls); @@ -460,7 +546,6 @@ impl Agent { // Genuinely text-only response let text = msg.content_text().to_string(); let mut me = agent.lock().await; - me.activity.clear(); me.push_message(msg); // Drain pending control flags @@ -492,15 +577,15 @@ impl Agent { Ok(v) => v, Err(e) => { let err = format!("Error: malformed tool call arguments: {e}"); + let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await; let mut me = agent.lock().await; - me.activity = format!("rejected: {} (bad args)", call.function.name); me.apply_tool_result(call, err, ui_tx, ds); return; } }; let args_summary = summarize_args(&call.function.name, &args); - agent.lock().await.activity = format!("calling: {}", call.function.name); + let _calling = start_activity(agent, format!("calling: {}", call.function.name)).await; // Spawn tool, track it let call_clone = call.clone(); diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 8b90618..f46d4ec 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -259,6 +259,7 @@ impl Mind { let mut ag = self.agent.lock().await; if ag.last_prompt_tokens() > threshold { ag.compact(); + ag.notify("compacted"); } } MindCommand::Score => { @@ -381,7 +382,7 @@ impl Mind { let _ = self.turn_watch.send(false); if let Some(name) = model_switch { - crate::user::chat::cmd_switch_model(&self.agent, &name, &self.ui_tx).await; + crate::user::chat::cmd_switch_model(&self.agent, &name).await; } // Post-turn maintenance diff --git a/src/subconscious/learn.rs b/src/subconscious/learn.rs index a5a4c3c..a23007d 100644 --- a/src/subconscious/learn.rs +++ b/src/subconscious/learn.rs @@ -17,7 +17,7 @@ use crate::agent::api::ApiClient; use crate::agent::api::types::*; use crate::agent::context::{ConversationEntry, ContextState}; -use crate::user::ui_channel::{UiMessage, UiSender}; +use crate::user::ui_channel::UiSender; const SCORE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120); @@ -353,7 +353,7 @@ pub async fn score_memories_incremental( continue; } - if let Ok(mut ag) = agent.try_lock() { ag.activity = format!("scoring memory: {}...", key); } + let _scoring = crate::agent::start_activity(agent, format!("scoring: {}", key)).await; match score_divergence(&http, client, context, range, Filter::SkipKey(key)).await { Ok((divs, _)) => { let n_responses = divs.len(); @@ -361,7 +361,6 @@ pub async fn score_memories_incremental( dbglog!( "[scoring] {} max:{:.3} ({} responses)", key, max_div, n_responses, ); - // TODO: update graph weight once normalization is figured out results.push((key.clone(), max_div)); } Err(e) => { @@ -372,7 +371,6 @@ pub async fn score_memories_incremental( } } - if let Ok(mut ag) = agent.try_lock() { ag.activity.clear(); } Ok(results) } diff --git a/src/user/chat.rs b/src/user/chat.rs index d6eccdc..37432eb 100644 --- a/src/user/chat.rs +++ b/src/user/chat.rs @@ -16,7 +16,7 @@ use super::{ App, HotkeyAction, ScreenAction, ScreenView, screen_legend, }; -use crate::user::ui_channel::{UiMessage, StreamTarget}; +use crate::user::ui_channel::StreamTarget; use crate::mind::MindCommand; // --- Slash command table --- @@ -35,13 +35,15 @@ fn commands() -> Vec { vec![ SlashCommand { name: "/new", help: "Start fresh session (saves current)", handler: |s, _| { let _ = s.mind_tx.send(MindCommand::NewSession); } }, SlashCommand { name: "/save", help: "Save session to disk", - handler: |s, _| { let _ = s.ui_tx.send(UiMessage::Info("Conversation is saved automatically.".into())); } }, + handler: |s, _| { + if let Ok(mut ag) = s.agent.try_lock() { ag.notify("saved"); } + } }, SlashCommand { name: "/retry", help: "Re-run last turn", handler: |s, _| { let agent = s.agent.clone(); let mind_tx = s.mind_tx.clone(); - let ui_tx = s.ui_tx.clone(); tokio::spawn(async move { + let _act = crate::agent::start_activity(&agent, "retrying...").await; let mut ag = agent.lock().await; let entries = ag.entries_mut(); let mut last_user_text = None; @@ -53,36 +55,29 @@ fn commands() -> Vec { vec![ entries.pop(); } drop(ag); - match last_user_text { - Some(text) => { - let preview = &text[..text.len().min(60)]; - let _ = ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", preview))); - let _ = mind_tx.send(MindCommand::Turn(text, StreamTarget::Conversation)); - } - None => { - let _ = ui_tx.send(UiMessage::Info("(nothing to retry)".into())); - } + if let Some(text) = last_user_text { + let _ = mind_tx.send(MindCommand::Turn(text, StreamTarget::Conversation)); } }); } }, SlashCommand { name: "/model", help: "Show/switch model (/model )", handler: |s, arg| { if arg.is_empty() { - if let Ok(ag) = s.agent.try_lock() { - let _ = s.ui_tx.send(UiMessage::Info(format!("Current model: {}", ag.model()))); + if let Ok(mut ag) = s.agent.try_lock() { let names = ag.app_config.model_names(); - if !names.is_empty() { - let _ = s.ui_tx.send(UiMessage::Info(format!("Available: {}", names.join(", ")))); - } - } else { - let _ = s.ui_tx.send(UiMessage::Info("(busy)".into())); + let label = if names.is_empty() { + format!("model: {}", ag.model()) + } else { + format!("model: {} ({})", ag.model(), names.join(", ")) + }; + ag.notify(label); } } else { let agent = s.agent.clone(); - let ui_tx = s.ui_tx.clone(); let name = arg.to_string(); tokio::spawn(async move { - cmd_switch_model(&agent, &name, &ui_tx).await; + let _act = crate::agent::start_activity(&agent, format!("switching to {}...", name)).await; + cmd_switch_model(&agent, &name).await; }); } } }, @@ -91,16 +86,16 @@ fn commands() -> Vec { vec![ SlashCommand { name: "/dmn", help: "Show DMN state", handler: |s, _| { let st = s.shared_mind.lock().unwrap(); - let _ = s.ui_tx.send(UiMessage::Info(format!( - "DMN: {:?} ({}/{})", st.dmn, st.dmn_turns, st.max_dmn_turns, - ))); + if let Ok(mut ag) = s.agent.try_lock() { + ag.notify(format!("DMN: {:?} ({}/{})", st.dmn, st.dmn_turns, st.max_dmn_turns)); + } } }, SlashCommand { name: "/sleep", help: "Put DMN to sleep", handler: |s, _| { let mut st = s.shared_mind.lock().unwrap(); st.dmn = crate::mind::dmn::State::Resting { since: std::time::Instant::now() }; st.dmn_turns = 0; - let _ = s.ui_tx.send(UiMessage::Info("DMN sleeping.".into())); + if let Ok(mut ag) = s.agent.try_lock() { ag.notify("DMN sleeping"); } } }, SlashCommand { name: "/wake", help: "Wake DMN to foraging", handler: |s, _| { @@ -108,17 +103,17 @@ fn commands() -> Vec { vec![ if matches!(st.dmn, crate::mind::dmn::State::Off) { crate::mind::dmn::set_off(false); } st.dmn = crate::mind::dmn::State::Foraging; st.dmn_turns = 0; - let _ = s.ui_tx.send(UiMessage::Info("DMN foraging.".into())); + if let Ok(mut ag) = s.agent.try_lock() { ag.notify("DMN foraging"); } } }, SlashCommand { name: "/pause", help: "Full stop — no autonomous ticks (Ctrl+P)", handler: |s, _| { let mut st = s.shared_mind.lock().unwrap(); st.dmn = crate::mind::dmn::State::Paused; st.dmn_turns = 0; - let _ = s.ui_tx.send(UiMessage::Info("DMN paused.".into())); + if let Ok(mut ag) = s.agent.try_lock() { ag.notify("DMN paused"); } } }, SlashCommand { name: "/help", help: "Show this help", - handler: |s, _| { send_help(&s.ui_tx); } }, + handler: |s, _| { notify_help(&s.agent); } }, ]} fn dispatch_command(input: &str) -> Option { @@ -130,14 +125,13 @@ fn dispatch_command(input: &str) -> Option { pub async fn cmd_switch_model( agent: &std::sync::Arc>, name: &str, - ui_tx: &crate::user::ui_channel::UiSender, ) { let resolved = { let ag = agent.lock().await; match ag.app_config.resolve_model(name) { Ok(r) => r, Err(e) => { - let _ = ui_tx.send(UiMessage::Info(format!("{}", e))); + agent.lock().await.notify(format!("model error: {}", e)); return; } } @@ -154,28 +148,21 @@ pub async fn cmd_switch_model( if prompt_changed { ag.prompt_file = resolved.prompt_file.clone(); ag.compact(); - let _ = ui_tx.send(UiMessage::Info(format!( - "Switched to {} ({}) — prompt: {}, recompacted", - name, resolved.model_id, resolved.prompt_file, - ))); + ag.notify(format!("switched to {} (recompacted)", resolved.model_id)); } else { - let _ = ui_tx.send(UiMessage::Info(format!( - "Switched to {} ({})", name, resolved.model_id, - ))); + ag.notify(format!("switched to {}", resolved.model_id)); } } -pub(crate) fn send_help(ui_tx: &crate::user::ui_channel::UiSender) { - for cmd in &commands() { - let _ = ui_tx.send(UiMessage::Info(format!(" {:12} {}", cmd.name, cmd.help))); +fn notify_help(agent: &std::sync::Arc>) { + if let Ok(mut ag) = agent.try_lock() { + let mut help = String::new(); + for cmd in &commands() { + help.push_str(&format!("{:12} {}\n", cmd.name, cmd.help)); + } + help.push_str("Keys: Tab ^Up/Down PgUp/Down Mouse Esc ^P ^R ^K"); + ag.notify(help); } - let _ = ui_tx.send(UiMessage::Info(String::new())); - let _ = ui_tx.send(UiMessage::Info( - "Keys: Tab=pane ^Up/Down=scroll PgUp/PgDn=scroll Mouse=click/scroll".into(), - )); - let _ = ui_tx.send(UiMessage::Info( - " Alt+Enter=newline Esc=interrupt ^P=pause ^R=reasoning ^K=kill".into(), - )); } /// Turn marker for the conversation pane gutter. @@ -410,7 +397,6 @@ pub(crate) struct InteractScreen { agent: std::sync::Arc>, shared_mind: std::sync::Arc, mind_tx: tokio::sync::mpsc::UnboundedSender, - ui_tx: crate::user::ui_channel::UiSender, } impl InteractScreen { @@ -418,7 +404,6 @@ impl InteractScreen { agent: std::sync::Arc>, shared_mind: std::sync::Arc, mind_tx: tokio::sync::mpsc::UnboundedSender, - ui_tx: crate::user::ui_channel::UiSender, ) -> Self { Self { autonomous: PaneState::new(true), @@ -439,7 +424,6 @@ impl InteractScreen { agent, shared_mind, mind_tx, - ui_tx, } } @@ -576,7 +560,9 @@ impl InteractScreen { if let Some(cmd) = dispatch_command(input) { (cmd.handler)(self, &input[cmd.name.len()..].trim_start()); } else { - let _ = self.ui_tx.send(UiMessage::Info(format!("Unknown command: {}", input.split_whitespace().next().unwrap_or(input)))); + if let Ok(mut ag) = self.agent.try_lock() { + ag.notify(format!("unknown: {}", input.split_whitespace().next().unwrap_or(input))); + } } return; } @@ -585,29 +571,6 @@ impl InteractScreen { self.shared_mind.lock().unwrap().input.push(input.to_string()); } - /// Process a UiMessage — update pane state. - pub fn handle_ui_message(&mut self, msg: &UiMessage, app: &mut App) { - match msg { - UiMessage::TextDelta(text, target) => match target { - StreamTarget::Conversation => { - if self.needs_assistant_marker { - self.conversation.pending_marker = Marker::Assistant; - self.needs_assistant_marker = false; - } - self.conversation.current_color = Color::Reset; - self.conversation.append_text(text); - } - StreamTarget::Autonomous => { - self.autonomous.current_color = Color::Reset; - self.autonomous.append_text(text); - } - }, - UiMessage::Info(text) => { - self.conversation.push_line(text.clone(), Color::Cyan); - } - _ => {} - } - } fn scroll_active_up(&mut self, n: u16) { match self.active_pane { @@ -886,13 +849,14 @@ impl ScreenView for InteractScreen { self.sync_from_agent(); // Read status from agent + mind state - if let Ok(agent) = self.agent.try_lock() { + if let Ok(mut agent) = self.agent.try_lock() { + agent.expire_activities(); app.status.prompt_tokens = agent.last_prompt_tokens(); app.status.model = agent.model().to_string(); app.status.context_budget = agent.budget().status_string(); - if !agent.activity.is_empty() { - app.activity = agent.activity.clone(); - } + app.activity = agent.activities.last() + .map(|a| a.label.clone()) + .unwrap_or_default(); } { let mind = self.shared_mind.lock().unwrap(); diff --git a/src/user/mod.rs b/src/user/mod.rs index 8989755..f9baeee 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -17,7 +17,6 @@ use std::time::Duration; use crate::mind::MindCommand; use crate::user::{self as tui}; -use crate::user::ui_channel::UiMessage; // --- TUI infrastructure (moved from tui/mod.rs) --- @@ -211,14 +210,14 @@ pub async fn start(cli: crate::user::CliArgs) -> Result<()> { s.spawn(async { result = run( tui::App::new(String::new(), shared_context, shared_active_tools), - &mind, mind_tx, ui_tx, ui_rx, + &mind, mind_tx, ).await; }); }); result } -fn hotkey_cycle_reasoning(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) { +fn hotkey_cycle_reasoning(mind: &crate::mind::Mind) { if let Ok(mut ag) = mind.agent.try_lock() { let next = match ag.reasoning_effort.as_str() { "none" => "low", @@ -232,31 +231,26 @@ fn hotkey_cycle_reasoning(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender "high" => "high (full monologue)", _ => next, }; - let _ = ui_tx.send(UiMessage::Info(format!("Reasoning: {} — ^R to cycle", label))); - } else { - let _ = ui_tx.send(UiMessage::Info( - "(agent busy — reasoning change takes effect next turn)".into(), - )); + ag.notify(format!("reasoning: {}", label)); } } -async fn hotkey_kill_processes(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) { - let active_tools = mind.agent.lock().await.active_tools.clone(); +async fn hotkey_kill_processes(mind: &crate::mind::Mind) { + let mut ag = mind.agent.lock().await; + let active_tools = ag.active_tools.clone(); let mut tools = active_tools.lock().unwrap(); if tools.is_empty() { - let _ = ui_tx.send(UiMessage::Info("(no running tool calls)".into())); + ag.notify("no running tools"); } else { + let count = tools.len(); for entry in tools.drain(..) { - let elapsed = entry.started.elapsed(); - let _ = ui_tx.send(UiMessage::Info(format!( - " killing {} ({:.0}s): {}", entry.name, elapsed.as_secs_f64(), entry.detail, - ))); entry.handle.abort(); } + ag.notify(format!("killed {} tools", count)); } } -fn hotkey_cycle_autonomy(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) { +fn hotkey_cycle_autonomy(mind: &crate::mind::Mind) { let mut s = mind.shared.lock().unwrap(); let label = match &s.dmn { crate::mind::dmn::State::Engaged | crate::mind::dmn::State::Working | crate::mind::dmn::State::Foraging => { @@ -280,7 +274,9 @@ fn hotkey_cycle_autonomy(mind: &crate::mind::Mind, ui_tx: &ui_channel::UiSender) }; s.dmn_turns = 0; drop(s); - let _ = ui_tx.send(UiMessage::Info(format!("DMN → {} (Ctrl+P to cycle)", label))); + if let Ok(mut ag) = mind.agent.try_lock() { + ag.notify(format!("DMN → {}", label)); + } } fn hotkey_adjust_sampling(mind: &crate::mind::Mind, param: usize, delta: f32) { @@ -294,24 +290,10 @@ fn hotkey_adjust_sampling(mind: &crate::mind::Mind, param: usize, delta: f32) { } } -pub fn send_context_info(config: &crate::config::SessionConfig, ui_tx: &ui_channel::UiSender) { - let context_groups = crate::config::get().context_groups.clone(); - let (instruction_files, memory_files) = crate::mind::identity::context_file_info( - &config.prompt_file, - config.app.memory_project.as_deref(), - &context_groups, - ); - let _ = ui_tx.send(UiMessage::Info(format!( - " context: {}K chars ({} config, {} memory files)", - config.context_parts.iter().map(|(_, c)| c.len()).sum::() / 1024, - instruction_files.len(), memory_files.len(), - ))); -} fn diff_mind_state( cur: &crate::mind::MindState, prev: &crate::mind::MindState, - ui_tx: &ui_channel::UiSender, dirty: &mut bool, ) { if cur.dmn.label() != prev.dmn.label() || cur.dmn_turns != prev.dmn_turns { @@ -325,15 +307,9 @@ fn diff_mind_state( *dirty = true; } if cur.scoring_in_flight != prev.scoring_in_flight { - if !cur.scoring_in_flight && prev.scoring_in_flight { - let _ = ui_tx.send(UiMessage::Info("[scoring complete]".into())); - } *dirty = true; } if cur.compaction_in_flight != prev.compaction_in_flight { - if !cur.compaction_in_flight && prev.compaction_in_flight { - let _ = ui_tx.send(UiMessage::Info("[compacted]".into())); - } *dirty = true; } } @@ -342,8 +318,6 @@ pub async fn run( mut app: tui::App, mind: &crate::mind::Mind, mind_tx: tokio::sync::mpsc::UnboundedSender, - ui_tx: ui_channel::UiSender, - mut ui_rx: ui_channel::UiReceiver, ) -> Result<()> { let agent = &mind.agent; let shared_mind = &mind.shared; @@ -362,9 +336,8 @@ pub async fn run( } let notify_rx = crate::thalamus::channels::subscribe_all(); - // InteractScreen held separately for UiMessage routing let mut interact = crate::user::chat::InteractScreen::new( - mind.agent.clone(), mind.shared.clone(), mind_tx.clone(), ui_tx.clone(), + mind.agent.clone(), mind.shared.clone(), mind_tx.clone(), ); // Overlay screens: F2=conscious, F3=subconscious, F4=unconscious, F5=thalamus let mut screens: Vec> = vec![ @@ -387,7 +360,7 @@ pub async fn run( terminal.hide_cursor()?; - let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into())); + if let Ok(mut ag) = agent.try_lock() { ag.notify("consciousness v0.3"); } // Initial render terminal.draw(|f| { @@ -453,9 +426,9 @@ pub async fn run( // One-time: mark startup done after Mind init if !startup_done { - if let Ok(ag) = agent.try_lock() { - // sync_from_agent handles conversation replay - let _ = ui_tx.send(UiMessage::Info(format!(" model: {}", ag.model()))); + if let Ok(mut ag) = agent.try_lock() { + let model = ag.model().to_string(); + ag.notify(format!("model: {}", model)); startup_done = true; dirty = true; } @@ -464,7 +437,7 @@ pub async fn run( // Diff MindState — generate UI messages from changes { let cur = shared_mind.lock().unwrap(); - diff_mind_state(&cur, &prev_mind, &ui_tx, &mut dirty); + diff_mind_state(&cur, &prev_mind, &mut dirty); prev_mind = cur.clone(); } @@ -482,27 +455,18 @@ pub async fn run( dirty = true; } - Some(msg) = ui_rx.recv() => { - interact.handle_ui_message(&msg, &mut app); - dirty = true; - } } // Handle hotkey actions let actions: Vec = app.hotkey_actions.drain(..).collect(); for action in actions { match action { - HotkeyAction::CycleReasoning => hotkey_cycle_reasoning(mind, &ui_tx), - HotkeyAction::KillProcess => hotkey_kill_processes(mind, &ui_tx).await, + HotkeyAction::CycleReasoning => hotkey_cycle_reasoning(mind), + HotkeyAction::KillProcess => hotkey_kill_processes(mind).await, HotkeyAction::Interrupt => { let _ = mind_tx.send(MindCommand::Interrupt); } - HotkeyAction::CycleAutonomy => hotkey_cycle_autonomy(mind, &ui_tx), + HotkeyAction::CycleAutonomy => hotkey_cycle_autonomy(mind), HotkeyAction::AdjustSampling(param, delta) => hotkey_adjust_sampling(mind, param, delta), } - } - - // Drain UiMessages to interact screen - while let Ok(msg) = ui_rx.try_recv() { - interact.handle_ui_message(&msg, &mut app); dirty = true; } diff --git a/src/user/ui_channel.rs b/src/user/ui_channel.rs index eccf607..6a21549 100644 --- a/src/user/ui_channel.rs +++ b/src/user/ui_channel.rs @@ -71,9 +71,6 @@ pub struct ContextInfo { #[derive(Debug, Clone)] #[allow(dead_code)] pub enum UiMessage { - /// Streaming text delta — routed to conversation or autonomous pane. - TextDelta(String, StreamTarget), - /// Informational message — goes to conversation pane (command output, etc). Info(String),