consciousness/src/user/ui_channel.rs

235 lines
8.3 KiB
Rust
Raw Normal View History

// ui_channel.rs — Output routing for TUI panes
//
// All output from the agent (streaming text, tool calls, status updates)
// goes through a UiMessage enum sent over an mpsc channel. The TUI
// receives these messages and routes them to the appropriate pane.
//
// This replaces direct stdout/stderr printing throughout the codebase.
// The agent and API client never touch the terminal directly — they
// just send messages that the TUI renders where appropriate.
//
// The channel also fans out to a broadcast channel so the observation
// socket (observe.rs) can subscribe without touching the main path.
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
// Re-export context types that moved to agent::context
pub use crate::agent::context::{ContextSection, SharedContextState, shared_context_state};
// ActiveToolCall lives in agent::tools — re-export for TUI access
pub use crate::agent::tools::ActiveToolCall;
/// Shared active tool calls — agent spawns, TUI reads metadata / aborts.
pub type SharedActiveTools = Arc<std::sync::Mutex<Vec<ActiveToolCall>>>;
pub fn shared_active_tools() -> SharedActiveTools {
Arc::new(std::sync::Mutex::new(Vec::new()))
}
/// Which pane streaming text should go to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTarget {
/// User-initiated turn — text goes to conversation pane.
Conversation,
/// DMN-initiated turn — text goes to autonomous pane.
Autonomous,
}
/// Status info for the bottom status bar.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct StatusInfo {
pub dmn_state: String,
pub dmn_turns: u32,
pub dmn_max_turns: u32,
pub prompt_tokens: u32,
pub completion_tokens: u32,
pub model: String,
/// Number of tool calls dispatched in the current turn.
pub turn_tools: u32,
/// Context window budget breakdown (e.g. "id:8% mem:25% jnl:30% conv:37%").
pub context_budget: String,
}
/// Context loading details for the debug screen.
#[derive(Debug, Clone)]
pub struct ContextInfo {
pub model: String,
pub available_models: Vec<String>,
pub prompt_file: String,
pub backend: String,
#[allow(dead_code)]
pub instruction_files: Vec<(String, usize)>,
#[allow(dead_code)]
pub memory_files: Vec<(String, usize)>,
pub system_prompt_chars: usize,
pub context_message_chars: usize,
}
/// Messages sent from agent/API to the TUI for rendering.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum UiMessage {
/// Streaming text delta — routed to conversation or autonomous pane
/// based on the current StreamTarget.
TextDelta(String, StreamTarget),
/// User's input echoed to conversation pane.
UserInput(String),
/// Tool call header: [tool_name] with args summary.
ToolCall {
name: String,
args_summary: String,
},
/// Full tool result — goes to tools pane.
ToolResult {
name: String,
result: String,
},
/// DMN state annotation: [dmn: foraging (3/20)].
DmnAnnotation(String),
/// Status bar update.
StatusUpdate(StatusInfo),
/// Live activity indicator for the status bar — shows what the
/// agent is doing right now ("thinking...", "calling: bash", etc).
/// Empty string clears the indicator.
Activity(String),
/// Reasoning/thinking tokens from the model (internal monologue).
/// Routed to the autonomous pane so the user can peek at what
/// the model is thinking about during long tool chains.
Reasoning(String),
/// A tool call started — shown as a live overlay above the status bar.
ToolStarted { id: String, name: String, detail: String },
/// A tool call finished — removes it from the live overlay.
ToolFinished { id: String },
/// Debug message (only shown when POC_DEBUG is set).
Debug(String),
/// Informational message — goes to conversation pane (command output, etc).
Info(String),
/// Context loading details — stored for the debug screen (Ctrl+D).
ContextInfoUpdate(ContextInfo),
/// Agent cycle state update — refreshes the F2 agents screen.
AgentUpdate(Vec<crate::subconscious::subconscious::AgentSnapshot>),
}
/// Sender that fans out to both the TUI (mpsc) and observers (broadcast).
#[derive(Clone)]
pub struct UiSender {
tui: mpsc::UnboundedSender<UiMessage>,
observe: broadcast::Sender<UiMessage>,
}
impl UiSender {
pub fn send(&self, msg: UiMessage) -> Result<(), mpsc::error::SendError<UiMessage>> {
// Broadcast to observers (ignore errors — no subscribers is fine)
let _ = self.observe.send(msg.clone());
self.tui.send(msg)
}
/// Subscribe to the broadcast side (for the observation socket).
pub fn subscribe(&self) -> broadcast::Receiver<UiMessage> {
self.observe.subscribe()
}
}
/// Convenience type for the receiving half.
pub type UiReceiver = mpsc::UnboundedReceiver<UiMessage>;
/// Create a new UI channel pair.
pub fn channel() -> (UiSender, UiReceiver) {
let (tui_tx, tui_rx) = mpsc::unbounded_channel();
let (observe_tx, _) = broadcast::channel(1024);
(UiSender { tui: tui_tx, observe: observe_tx }, tui_rx)
}
2026-04-04 02:46:32 -04:00
/// Replay a restored session into the TUI panes so the user can see
/// conversation history immediately on restart. Shows user input,
/// assistant responses, and brief tool call summaries. Skips the system
/// prompt, context message, DMN plumbing, and image injection messages.
pub fn replay_session_to_ui(entries: &[crate::agent::context::ConversationEntry], ui_tx: &UiSender) {
use crate::agent::api::types::Role;
crate::dbglog!("[replay] replaying {} entries to UI", entries.len());
for (i, e) in entries.iter().enumerate() {
let m = e.message();
let preview: String = m.content_text().chars().take(60).collect();
crate::dbglog!("[replay] [{}] {:?} mem={} tc={} tcid={:?} {:?}",
i, m.role, e.is_memory(), m.tool_calls.as_ref().map_or(0, |t| t.len()),
m.tool_call_id.as_deref(), preview);
}
let mut seen_first_user = false;
let mut target = StreamTarget::Conversation;
for entry in entries {
if entry.is_memory() { continue; }
let msg = entry.message();
match msg.role {
Role::System => {}
Role::User => {
if !seen_first_user {
seen_first_user = true;
continue;
}
let text = msg.content_text();
if text.starts_with("Your context was just compacted")
|| text.starts_with("Your context was just rebuilt")
|| text.starts_with("[Earlier in this conversation")
|| text.starts_with("Here is the image")
|| text.contains("[image aged out")
{
continue;
}
if text.starts_with("[dmn]") {
target = StreamTarget::Autonomous;
let first_line = text.lines().next().unwrap_or("[dmn]");
let _ = ui_tx.send(UiMessage::DmnAnnotation(first_line.to_string()));
} else {
target = StreamTarget::Conversation;
let _ = ui_tx.send(UiMessage::UserInput(text.to_string()));
}
}
Role::Assistant => {
if let Some(ref calls) = msg.tool_calls {
for call in calls {
let _ = ui_tx.send(UiMessage::ToolCall {
name: call.function.name.clone(),
args_summary: String::new(),
});
}
}
let text = msg.content_text();
if !text.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(format!("{}\n", text), target));
}
}
Role::Tool => {
let text = msg.content_text();
let preview: String = text.lines().take(3).collect::<Vec<_>>().join("\n");
let truncated = if text.lines().count() > 3 {
format!("{}...", preview)
} else {
preview
};
let _ = ui_tx.send(UiMessage::ToolResult {
name: String::new(),
result: truncated,
});
}
}
}
}