Kill TextDelta, Info — UiMessage is dead. RAII ActivityGuards replace all status feedback
Streaming text now goes directly to agent entries via append_streaming(). sync_from_agent diffs the growing entry on each tick. The streaming entry is popped when the response completes, and build_response_message pushes the final version.

All status feedback now uses RAII ActivityGuards:
- push_activity() for long-running work (thinking, streaming, scoring)
- notify() for instant feedback (compacted, DMN state changes, commands)
- Guards auto-remove on Drop, appending "(complete)" and lingering for 5s
- expire_activities() cleans up timed-out notifications on the render tick

The UiMessage enum is reduced to a single Info variant with zero sends. The channel infrastructure remains for now (Mind/Agent still take UiSender in their signatures); removing it is mechanical cleanup for a follow-up.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
e7914e3d58
commit
cfddb55ed9
9 changed files with 201 additions and 186 deletions
|
|
@ -20,7 +20,7 @@ use tokio::sync::mpsc;
|
|||
|
||||
use crate::agent::tools::{self as agent_tools, summarize_args, ActiveToolCall};
|
||||
pub use types::ToolCall;
|
||||
use crate::user::ui_channel::{UiMessage, UiSender, StreamTarget};
|
||||
use crate::user::ui_channel::UiSender;
|
||||
|
||||
/// A JoinHandle that aborts its task when dropped.
|
||||
pub struct AbortOnDrop(tokio::task::JoinHandle<()>);
|
||||
|
|
@ -130,7 +130,7 @@ impl ApiClient {
|
|||
&reasoning_effort, sampling, priority,
|
||||
).await;
|
||||
if let Err(e) = result {
|
||||
let _ = tx.send(StreamEvent::Error(e.to_string();
|
||||
let _ = tx.send(StreamEvent::Error(e.to_string()));
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -207,7 +207,6 @@ pub(crate) async fn send_and_check(
|
|||
body: &impl serde::Serialize,
|
||||
auth_header: (&str, &str),
|
||||
extra_headers: &[(&str, &str)],
|
||||
ui_tx: &UiSender,
|
||||
debug_label: &str,
|
||||
request_json: Option<&str>,
|
||||
) -> Result<reqwest::Response> {
|
||||
|
|
@ -619,8 +618,6 @@ pub struct StreamResult {
|
|||
/// - UI forwarding (text deltas, reasoning, tool call notifications)
|
||||
pub async fn collect_stream(
|
||||
rx: &mut mpsc::UnboundedReceiver<StreamEvent>,
|
||||
ui_tx: &UiSender,
|
||||
target: StreamTarget,
|
||||
agent: &std::sync::Arc<tokio::sync::Mutex<super::Agent>>,
|
||||
active_tools: &crate::user::ui_channel::SharedActiveTools,
|
||||
) -> StreamResult {
|
||||
|
|
@ -633,12 +630,13 @@ pub async fn collect_stream(
|
|||
let mut error = None;
|
||||
let mut first_content = true;
|
||||
let mut display_buf = String::new();
|
||||
let mut _streaming_guard: Option<super::ActivityGuard> = None;
|
||||
|
||||
while let Some(event) = rx.recv().await {
|
||||
match event {
|
||||
StreamEvent::Content(text) => {
|
||||
if first_content {
|
||||
if let Ok(mut ag) = agent.try_lock() { ag.activity = "streaming...".into(); }
|
||||
_streaming_guard = Some(super::start_activity(agent, "streaming...").await);
|
||||
first_content = false;
|
||||
}
|
||||
content.push_str(&text);
|
||||
|
|
@ -683,7 +681,7 @@ pub async fn collect_stream(
|
|||
if let Some(pos) = display_buf.find("<tool_call>") {
|
||||
let before = &display_buf[..pos];
|
||||
if !before.is_empty() {
|
||||
let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target));
|
||||
if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(before); }
|
||||
}
|
||||
display_buf.clear();
|
||||
in_tool_call = true;
|
||||
|
|
@ -693,7 +691,7 @@ pub async fn collect_stream(
|
|||
if safe > 0 {
|
||||
let flush = display_buf[..safe].to_string();
|
||||
display_buf = display_buf[safe..].to_string();
|
||||
let _ = ui_tx.send(UiMessage::TextDelta(flush, target));
|
||||
if let Ok(mut ag) = agent.try_lock() { ag.append_streaming(&flush); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use reqwest::Client;
|
|||
use tokio::sync::mpsc;
|
||||
|
||||
use super::types::*;
|
||||
use crate::user::ui_channel::{UiMessage, UiSender};
|
||||
use crate::user::ui_channel::UiSender;
|
||||
use super::StreamEvent;
|
||||
|
||||
/// Stream SSE events from an OpenAI-compatible endpoint, sending
|
||||
|
|
@ -66,7 +66,6 @@ pub(super) async fn stream_events(
|
|||
&request,
|
||||
("Authorization", &format!("Bearer {}", api_key)),
|
||||
&[],
|
||||
ui_tx,
|
||||
&debug_label,
|
||||
request_json.as_deref(),
|
||||
)
|
||||
|
|
@ -105,7 +104,7 @@ pub(super) async fn stream_events(
|
|||
};
|
||||
|
||||
if let Some(ref u) = chunk.usage {
|
||||
let _ = tx.send(StreamEvent::Usage(u.clone();
|
||||
let _ = tx.send(StreamEvent::Usage(u.clone()));
|
||||
usage = chunk.usage;
|
||||
}
|
||||
|
||||
|
|
@ -126,7 +125,7 @@ pub(super) async fn stream_events(
|
|||
reasoning_chars += r.len();
|
||||
has_reasoning = true;
|
||||
if !r.is_empty() {
|
||||
let _ = tx.send(StreamEvent::Reasoning(r.clone();
|
||||
let _ = tx.send(StreamEvent::Reasoning(r.clone()));
|
||||
}
|
||||
}
|
||||
if let Some(ref r) = choice.delta.reasoning_details {
|
||||
|
|
@ -143,7 +142,7 @@ pub(super) async fn stream_events(
|
|||
first_content_at = Some(reader.stream_start.elapsed());
|
||||
}
|
||||
content_len += text_delta.len();
|
||||
let _ = tx.send(StreamEvent::Content(text_delta.clone();
|
||||
let _ = tx.send(StreamEvent::Content(text_delta.clone()));
|
||||
}
|
||||
|
||||
if let Some(ref tc_deltas) = choice.delta.tool_calls {
|
||||
|
|
|
|||
|
|
@ -202,6 +202,15 @@ impl Message {
|
|||
self.content.as_ref().map_or("", |c| c.as_text())
|
||||
}
|
||||
|
||||
/// Append text to existing content (for streaming).
|
||||
pub fn append_content(&mut self, text: &str) {
|
||||
match self.content {
|
||||
Some(MessageContent::Text(ref mut s)) => s.push_str(text),
|
||||
None => self.content = Some(MessageContent::Text(text.to_string())),
|
||||
_ => {} // Parts — don't append to multimodal
|
||||
}
|
||||
}
|
||||
|
||||
pub fn role_str(&self) -> &str {
|
||||
match self.role {
|
||||
Role::System => "system",
|
||||
|
|
|
|||
143
src/agent/mod.rs
143
src/agent/mod.rs
|
|
@ -28,9 +28,82 @@ use context::{ConversationEntry, ContextState, ContextBudget};
|
|||
use tools::{summarize_args, working_stack};
|
||||
|
||||
use crate::mind::log::ConversationLog;
|
||||
use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, UiMessage, UiSender};
|
||||
use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, UiSender};
|
||||
use crate::subconscious::learn;
|
||||
|
||||
// --- Activity tracking (RAII guards) ---
|
||||
|
||||
/// One line of status feedback tracked in the agent's activity list.
#[derive(Debug, Clone)]
pub struct ActivityEntry {
    /// Identifier used to find this entry again when its guard is dropped.
    pub id: u64,
    /// Text describing the activity.
    pub label: String,
    /// When the entry was created.
    pub started: std::time::Instant,
    /// Auto-expires this long after creation (or completion).
    pub expires_at: std::time::Instant,
}
|
||||
|
||||
/// RAII guard — marks the activity "(complete)" on drop, starts expiry timer.
|
||||
pub struct ActivityGuard {
|
||||
agent: Arc<tokio::sync::Mutex<Agent>>,
|
||||
id: u64,
|
||||
}
|
||||
|
||||
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);
|
||||
|
||||
impl Drop for ActivityGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut ag) = self.agent.try_lock() {
|
||||
if let Some(entry) = ag.activities.iter_mut().find(|a| a.id == self.id) {
|
||||
entry.label.push_str(" (complete)");
|
||||
entry.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Agent {
|
||||
/// Register an activity, returns its ID. Caller creates the guard.
|
||||
pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
|
||||
self.expire_activities();
|
||||
let id = self.next_activity_id;
|
||||
self.next_activity_id += 1;
|
||||
self.activities.push(ActivityEntry {
|
||||
id, label: label.into(),
|
||||
started: std::time::Instant::now(),
|
||||
expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600),
|
||||
});
|
||||
id
|
||||
}
|
||||
|
||||
/// Push a notification — auto-expires after 5 seconds.
|
||||
pub fn notify(&mut self, label: impl Into<String>) {
|
||||
self.expire_activities();
|
||||
let id = self.next_activity_id;
|
||||
self.next_activity_id += 1;
|
||||
self.activities.push(ActivityEntry {
|
||||
id, label: label.into(),
|
||||
started: std::time::Instant::now(),
|
||||
expires_at: std::time::Instant::now() + ACTIVITY_LINGER,
|
||||
});
|
||||
}
|
||||
|
||||
/// Remove expired activities.
|
||||
pub fn expire_activities(&mut self) {
|
||||
let now = std::time::Instant::now();
|
||||
self.activities.retain(|a| a.expires_at > now);
|
||||
}
|
||||
}
|
||||
|
||||
/// Create an activity guard from outside the lock.
|
||||
pub fn activity_guard(agent: &Arc<tokio::sync::Mutex<Agent>>, id: u64) -> ActivityGuard {
|
||||
ActivityGuard { agent: agent.clone(), id }
|
||||
}
|
||||
|
||||
/// Convenience: lock, push activity, unlock, return guard.
|
||||
pub async fn start_activity(agent: &Arc<tokio::sync::Mutex<Agent>>, label: impl Into<String>) -> ActivityGuard {
|
||||
let id = agent.lock().await.push_activity(label);
|
||||
ActivityGuard { agent: agent.clone(), id }
|
||||
}
|
||||
|
||||
/// Result of a single agent turn.
|
||||
pub struct TurnResult {
|
||||
/// The text response (already sent through UI channel).
|
||||
|
|
@ -77,8 +150,9 @@ pub struct Agent {
|
|||
pub temperature: f32,
|
||||
pub top_p: f32,
|
||||
pub top_k: u32,
|
||||
/// Live activity indicator — read by UI on render tick.
|
||||
pub activity: String,
|
||||
/// Active activities — RAII guards auto-remove on drop.
|
||||
pub activities: Vec<ActivityEntry>,
|
||||
next_activity_id: u64,
|
||||
/// Control tool flags — set by tool handlers, consumed by turn loop.
|
||||
pub pending_yield: bool,
|
||||
pub pending_model_switch: Option<String>,
|
||||
|
|
@ -147,7 +221,8 @@ impl Agent {
|
|||
temperature: 0.6,
|
||||
top_p: 0.95,
|
||||
top_k: 20,
|
||||
activity: String::new(),
|
||||
activities: Vec::new(),
|
||||
next_activity_id: 0,
|
||||
pending_yield: false,
|
||||
pending_model_switch: None,
|
||||
pending_dmn_pause: false,
|
||||
|
|
@ -182,7 +257,7 @@ impl Agent {
|
|||
if !jnl.is_empty() {
|
||||
msgs.push(Message::user(jnl));
|
||||
}
|
||||
msgs.extend(self.context.entries.iter().map(|e| e.api_message().clone();
|
||||
msgs.extend(self.context.entries.iter().map(|e| e.api_message().clone()));
|
||||
msgs
|
||||
}
|
||||
|
||||
|
|
@ -218,9 +293,22 @@ impl Agent {
|
|||
self.context.entries.push(entry);
|
||||
}
|
||||
|
||||
/// Push a context-only message (system prompt, identity context,
|
||||
/// journal summaries). Not logged — these are reconstructed on
|
||||
/// every startup/compaction.
|
||||
/// Append streaming text to the last entry (creating a partial
|
||||
/// assistant entry if needed). Called by collect_stream per token batch.
|
||||
pub fn append_streaming(&mut self, text: &str) {
|
||||
if let Some(entry) = self.context.entries.last_mut() {
|
||||
let msg = entry.message_mut();
|
||||
if msg.role == Role::Assistant {
|
||||
msg.append_content(text);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// No assistant entry yet — push a new partial one
|
||||
self.context.entries.push(ConversationEntry::Message(
|
||||
Message::assistant(text),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn budget(&self) -> ContextBudget {
|
||||
let count_str = |s: &str| self.tokenizer.encode_with_special_tokens(s).len();
|
||||
let count_msg = |m: &Message| crate::agent::context::msg_token_count(&self.tokenizer, m);
|
||||
|
|
@ -263,7 +351,7 @@ impl Agent {
|
|||
me.push_message(Message::user(format!(
|
||||
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
|
||||
reflection.trim(),
|
||||
);
|
||||
)));
|
||||
}
|
||||
|
||||
// Collect completed background tool handles — remove from active list
|
||||
|
|
@ -308,9 +396,9 @@ impl Agent {
|
|||
|
||||
loop {
|
||||
// --- Lock 2: assemble messages, start stream ---
|
||||
let _thinking = start_activity(&agent, "thinking...").await;
|
||||
let (mut rx, _stream_guard) = {
|
||||
let mut me = agent.lock().await;
|
||||
me.activity = "thinking...".into();
|
||||
let me = agent.lock().await;
|
||||
let api_messages = me.assemble_api_messages();
|
||||
let sampling = api::SamplingParams {
|
||||
temperature: me.temperature,
|
||||
|
|
@ -330,7 +418,7 @@ impl Agent {
|
|||
|
||||
// --- Stream loop (no lock) ---
|
||||
let sr = api::collect_stream(
|
||||
&mut rx, ui_tx, target, &agent, &active_tools,
|
||||
&mut rx, &agent, &active_tools,
|
||||
).await;
|
||||
let api::StreamResult {
|
||||
content, tool_calls, usage, finish_reason,
|
||||
|
|
@ -347,39 +435,37 @@ impl Agent {
|
|||
let err = anyhow::anyhow!("{}", e);
|
||||
if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
|
||||
overflow_retries += 1;
|
||||
let _ = ui_tx.send(UiMessage::Info(format!(
|
||||
"[context overflow — compacting and retrying ({}/2)]",
|
||||
overflow_retries,
|
||||
);
|
||||
me.notify(format!("context overflow — retrying ({}/2)", overflow_retries));
|
||||
me.compact();
|
||||
continue;
|
||||
}
|
||||
if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
|
||||
empty_retries += 1;
|
||||
let _ = ui_tx.send(UiMessage::Info(format!(
|
||||
"[stream error: {} — retrying ({}/2)]",
|
||||
e, empty_retries,
|
||||
);
|
||||
me.notify(format!("stream error — retrying ({}/2)", empty_retries));
|
||||
drop(me);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||
continue;
|
||||
}
|
||||
me.activity.clear();
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
if finish_reason.as_deref() == Some("error") {
|
||||
let detail = if content.is_empty() { "no details".into() } else { content };
|
||||
me.activity.clear();
|
||||
return Err(anyhow::anyhow!("model stream error: {}", detail));
|
||||
}
|
||||
|
||||
// Flush remaining display buffer
|
||||
// Flush remaining display buffer to streaming entry
|
||||
if !in_tool_call && !display_buf.is_empty() {
|
||||
let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target));
|
||||
me.append_streaming(&display_buf);
|
||||
}
|
||||
if !content.is_empty() && !in_tool_call {
|
||||
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
|
||||
|
||||
// Pop the streaming entry — the proper entry gets pushed below
|
||||
// via build_response_message which handles tool calls, leaked
|
||||
// tool calls, etc. sync_from_agent handles the swap.
|
||||
if let Some(entry) = me.context.entries.last() {
|
||||
if entry.message().role == Role::Assistant && entry.message().timestamp.is_none() {
|
||||
me.context.entries.pop();
|
||||
}
|
||||
}
|
||||
|
||||
let msg = api::build_response_message(content, tool_calls);
|
||||
|
|
@ -460,7 +546,6 @@ impl Agent {
|
|||
// Genuinely text-only response
|
||||
let text = msg.content_text().to_string();
|
||||
let mut me = agent.lock().await;
|
||||
me.activity.clear();
|
||||
me.push_message(msg);
|
||||
|
||||
// Drain pending control flags
|
||||
|
|
@ -492,15 +577,15 @@ impl Agent {
|
|||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
let err = format!("Error: malformed tool call arguments: {e}");
|
||||
let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await;
|
||||
let mut me = agent.lock().await;
|
||||
me.activity = format!("rejected: {} (bad args)", call.function.name);
|
||||
me.apply_tool_result(call, err, ui_tx, ds);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let args_summary = summarize_args(&call.function.name, &args);
|
||||
agent.lock().await.activity = format!("calling: {}", call.function.name);
|
||||
let _calling = start_activity(agent, format!("calling: {}", call.function.name)).await;
|
||||
|
||||
// Spawn tool, track it
|
||||
let call_clone = call.clone();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue