Delete ui_channel.rs — relocate types, remove all UiMessage/UiSender plumbing

Types relocated:
- StreamTarget → mind/mod.rs (Mind decides Conversation vs Autonomous)
- SharedActiveTools + shared_active_tools() → agent/tools/mod.rs
- ContextSection + SharedContextState → agent/context.rs (already there)
- StatusInfo + ContextInfo → user/mod.rs (UI display state)

Removed UiSender from: Agent::turn, Mind, learn.rs, all function signatures.
The entire message-passing layer is gone. All state flows through
Agent fields (activities, entries, streaming) read by the UI via try_lock.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
ProofOfConcept 2026-04-05 22:34:48 -04:00
parent cfddb55ed9
commit f390fa1617
11 changed files with 72 additions and 165 deletions

View file

@ -20,7 +20,6 @@ use tokio::sync::mpsc;
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
pub use types::ToolCall;
use crate::user::ui_channel::UiSender;
/// A JoinHandle that aborts its task when dropped.
pub struct AbortOnDrop(tokio::task::JoinHandle<()>);
@ -107,7 +106,6 @@ impl ApiClient {
&self,
messages: &[Message],
tools: &[agent_tools::Tool],
ui_tx: &UiSender,
reasoning_effort: &str,
sampling: SamplingParams,
priority: Option<i32>,
@ -119,14 +117,13 @@ impl ApiClient {
let messages = messages.to_vec();
let tools_json = tools_to_json_str(tools);
let tools_value: serde_json::Value = serde_json::from_str(&tools_json).unwrap_or_default();
let ui_tx = ui_tx.clone();
let reasoning_effort = reasoning_effort.to_string();
let base_url = self.base_url.clone();
let handle = tokio::spawn(async move {
let result = openai::stream_events(
&client, &base_url, &api_key, &model,
&messages, &tools_value, &tx, &ui_tx,
&messages, &tools_value, &tx,
&reasoning_effort, sampling, priority,
).await;
if let Err(e) = result {
@ -141,13 +138,12 @@ impl ApiClient {
&self,
messages: &[Message],
tools: &[agent_tools::Tool],
ui_tx: &UiSender,
reasoning_effort: &str,
sampling: SamplingParams,
priority: Option<i32>,
) -> Result<(Message, Option<Usage>)> {
// Use the event stream and accumulate into a message.
let (mut rx, _handle) = self.start_stream(messages, tools, ui_tx, reasoning_effort, sampling, priority);
let (mut rx, _handle) = self.start_stream(messages, tools, reasoning_effort, sampling, priority);
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None;
@ -319,14 +315,13 @@ pub(crate) struct SseReader {
pub sse_lines_parsed: u64,
pub sse_parse_errors: u64,
debug: bool,
ui_tx: UiSender,
done: bool,
/// Serialized request payload — saved to disk on errors for replay debugging.
pub(crate) request_json: Option<String>,
}
impl SseReader {
pub(crate) fn new(ui_tx: &UiSender) -> Self {
pub(crate) fn new() -> Self {
Self {
line_buf: String::new(),
chunk_timeout: Duration::from_secs(crate::config::get().api_stream_timeout_secs),
@ -335,7 +330,6 @@ impl SseReader {
sse_lines_parsed: 0,
sse_parse_errors: 0,
debug: std::env::var("POC_DEBUG").is_ok(),
ui_tx: ui_tx.clone(),
done: false,
request_json: None,
}
@ -516,7 +510,6 @@ pub fn build_response_message(
/// Log stream diagnostics. Shared by both backends.
pub(crate) fn log_diagnostics(
ui_tx: &UiSender,
content_len: usize,
tool_count: usize,
reasoning_chars: usize,
@ -619,7 +612,7 @@ pub struct StreamResult {
pub async fn collect_stream(
rx: &mut mpsc::UnboundedReceiver<StreamEvent>,
agent: &std::sync::Arc<tokio::sync::Mutex<super::Agent>>,
active_tools: &crate::user::ui_channel::SharedActiveTools,
active_tools: &crate::agent::tools::SharedActiveTools,
) -> StreamResult {
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();

View file

@ -9,7 +9,6 @@ use reqwest::Client;
use tokio::sync::mpsc;
use super::types::*;
use crate::user::ui_channel::UiSender;
use super::StreamEvent;
/// Stream SSE events from an OpenAI-compatible endpoint, sending
@ -23,7 +22,6 @@ pub(super) async fn stream_events(
messages: &[Message],
tools_json: &serde_json::Value,
tx: &mpsc::UnboundedSender<StreamEvent>,
ui_tx: &UiSender,
reasoning_effort: &str,
sampling: super::SamplingParams,
priority: Option<i32>,
@ -71,7 +69,7 @@ pub(super) async fn stream_events(
)
.await?;
let mut reader = super::SseReader::new(ui_tx);
let mut reader = super::SseReader::new();
reader.request_json = request_json;
let mut content_len: usize = 0;
@ -167,7 +165,6 @@ pub(super) async fn stream_events(
let total_elapsed = reader.stream_start.elapsed();
super::log_diagnostics(
ui_tx,
content_len,
tool_call_count,
reasoning_chars,

View file

@ -28,7 +28,8 @@ use context::{ConversationEntry, ContextState, ContextBudget};
use tools::{summarize_args, working_stack};
use crate::mind::log::ConversationLog;
use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, UiSender};
use crate::agent::context::{ContextSection, SharedContextState};
use crate::mind::StreamTarget;
use crate::subconscious::learn;
// --- Activity tracking (RAII guards) ---
@ -177,7 +178,7 @@ pub struct Agent {
/// TODO: move to Session — it's session-level, not agent-level.
pub agent_cycles: crate::subconscious::subconscious::AgentCycleState,
/// Shared active tools — Agent writes, TUI reads.
pub active_tools: crate::user::ui_channel::SharedActiveTools,
pub active_tools: crate::agent::tools::SharedActiveTools,
}
fn render_journal(entries: &[context::JournalEntry]) -> String {
@ -199,7 +200,7 @@ impl Agent {
prompt_file: String,
conversation_log: Option<ConversationLog>,
shared_context: SharedContextState,
active_tools: crate::user::ui_channel::SharedActiveTools,
active_tools: tools::SharedActiveTools,
) -> Self {
let tokenizer = tiktoken_rs::cl100k_base()
.expect("failed to load cl100k_base tokenizer");
@ -325,7 +326,6 @@ impl Agent {
pub async fn turn(
agent: Arc<tokio::sync::Mutex<Agent>>,
user_input: &str,
ui_tx: &UiSender,
target: StreamTarget,
) -> Result<TurnResult> {
// --- Pre-loop setup (lock 1): agent cycle, memories, user input ---
@ -382,7 +382,7 @@ impl Agent {
let mut me = agent.lock().await;
let mut bg_ds = DispatchState::new();
for (call, output) in bg_results {
me.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
me.apply_tool_result(&call, output, &mut bg_ds);
}
me.push_message(Message::user(user_input));
}
@ -408,7 +408,6 @@ impl Agent {
me.client.start_stream(
&api_messages,
&me.tools,
ui_tx,
&me.reasoning_effort,
sampling,
None,
@ -522,7 +521,7 @@ impl Agent {
// Reacquire to apply results
let mut me = agent.lock().await;
for (call, output) in results {
me.apply_tool_result(&call, output, ui_tx, &mut ds);
me.apply_tool_result(&call, output, &mut ds);
}
me.publish_context_state();
continue;
@ -536,7 +535,7 @@ impl Agent {
// Drop lock before tool dispatch
for call in &calls {
Agent::dispatch_tool_call_unlocked(
&agent, &active_tools, call, ui_tx, &mut ds,
&agent, &active_tools, call, &mut ds,
).await;
}
continue;
@ -568,9 +567,8 @@ impl Agent {
/// Used by `turn()` which manages its own locking.
async fn dispatch_tool_call_unlocked(
agent: &Arc<tokio::sync::Mutex<Agent>>,
active_tools: &crate::user::ui_channel::SharedActiveTools,
active_tools: &crate::agent::tools::SharedActiveTools,
call: &ToolCall,
ui_tx: &UiSender,
ds: &mut DispatchState,
) {
let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
@ -579,7 +577,7 @@ impl Agent {
let err = format!("Error: malformed tool call arguments: {e}");
let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await;
let mut me = agent.lock().await;
me.apply_tool_result(call, err, ui_tx, ds);
me.apply_tool_result(call, err, ds);
return;
}
};
@ -613,7 +611,7 @@ impl Agent {
if let Ok((call, output)) = entry.handle.await {
// Brief lock to apply result
let mut me = agent.lock().await;
me.apply_tool_result(&call, output, ui_tx, ds);
me.apply_tool_result(&call, output, ds);
}
}
@ -622,7 +620,6 @@ impl Agent {
&mut self,
call: &ToolCall,
output: String,
ui_tx: &UiSender,
ds: &mut DispatchState,
) {
let args: serde_json::Value =

View file

@ -22,6 +22,7 @@ pub mod working_stack;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
fn default_timeout() -> u64 { 120 }
@ -70,6 +71,13 @@ pub struct ActiveToolCall {
pub handle: tokio::task::JoinHandle<(ToolCall, String)>,
}
/// Shared active tool calls — agent spawns, TUI reads metadata / aborts.
pub type SharedActiveTools = Arc<std::sync::Mutex<Vec<ActiveToolCall>>>;
pub fn shared_active_tools() -> SharedActiveTools {
Arc::new(std::sync::Mutex::new(Vec::new()))
}
/// Truncate output if it exceeds max length, appending a truncation notice.
pub fn truncate_output(mut s: String, max: usize) -> String {
if s.len() > max {