WIP: Wiring context_new into agent — turn loop, StreamToken, dead code removal

Work in progress. New turn loop uses ResponseParser + StreamToken.
Killed StreamEvent, append_streaming, finalize_streaming, streaming_index,
assemble_api_messages, working_stack. Many methods still reference old
types — fixing next.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-08 14:55:10 -04:00
parent 648356ae40
commit 9c79d7a037
4 changed files with 202 additions and 424 deletions

View file

@ -23,13 +23,11 @@ pub mod tools;
use std::sync::Arc;
use anyhow::Result;
use api::{ApiClient, ToolCall};
use api::{ContentPart, Message, MessageContent, Role};
use context::{ConversationEntry, ContextEntry, ContextState};
use api::ApiClient;
use context_new::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
use tools::{summarize_args, working_stack};
use crate::mind::log::ConversationLog;
use crate::agent::context::ContextSection;
// --- Activity tracking (RAII guards) ---
@ -163,7 +161,6 @@ pub struct Agent {
pub provenance: String,
/// Persistent conversation log — append-only record of all messages.
pub conversation_log: Option<ConversationLog>,
/// Mutable context state — personality, working stack, etc.
pub context: ContextState,
/// App config — used to reload identity on compaction and model switching.
pub app_config: crate::config::AppConfig,
@ -190,11 +187,9 @@ impl Agent {
conversation_log: Option<ConversationLog>,
active_tools: tools::SharedActiveTools,
) -> Self {
let mut system = ContextSection::new("System prompt");
system.push(ContextEntry::new(
ConversationEntry::System(Message::system(&system_prompt)), None));
let mut context = ContextState::new();
context.push(Section::System, AstNode::system_msg(&system_prompt));
// Tool definitions — part of the context, tokenized and scored
let tool_defs: Vec<String> = tools::tools().iter()
.map(|t| t.to_json()).collect();
if !tool_defs.is_empty() {
@ -207,22 +202,12 @@ impl Agent {
IMPORTANT: Function calls MUST follow the specified format.",
tool_defs.join("\n"),
);
system.push(ContextEntry::new(
ConversationEntry::System(Message::system(&tools_text)), None));
context.push(Section::System, AstNode::system_msg(&tools_text));
}
let mut identity = ContextSection::new("Identity");
for (_name, content) in &personality {
identity.push(ContextEntry::new(
ConversationEntry::Message(Message::user(content)), None));
for (name, content) in &personality {
context.push(Section::Identity, AstNode::memory(name, content));
}
let context = ContextState {
system,
identity,
journal: ContextSection::new("Journal"),
conversation: ContextSection::new("Conversation"),
working_stack: Vec::new(),
};
let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
let mut agent = Self {
client,
@ -250,7 +235,6 @@ impl Agent {
};
agent.load_startup_journal();
agent.load_working_stack();
agent
}
@ -287,165 +271,41 @@ impl Agent {
}
}
/// Assemble the full message list for the API call from typed sources.
/// System prompt + personality context + journal + conversation messages.
pub fn assemble_api_messages(&self) -> Vec<Message> {
let mut msgs = Vec::new();
// System section
for e in self.context.system.entries() {
msgs.push(e.entry.api_message().clone());
}
// Identity — render personality files + working stack into one user message
let ctx = self.context.render_context_message();
if !ctx.is_empty() {
msgs.push(Message::user(ctx));
}
// Journal — render into one user message
let jnl = self.context.render_journal();
if !jnl.is_empty() {
msgs.push(Message::user(jnl));
}
// Conversation entries
msgs.extend(self.context.conversation.entries().iter()
.filter(|e| !e.entry.is_log() && !e.entry.is_thinking())
.map(|e| e.entry.api_message().clone()));
msgs
}
/// Assemble the full prompt as token IDs for the completions API.
/// System section (includes tools), identity, journal, conversation,
/// then the assistant prompt suffix.
/// Assemble the full prompt as token IDs.
/// Context sections + assistant prompt suffix.
pub fn assemble_prompt_tokens(&self) -> Vec<u32> {
let mut tokens = Vec::new();
// System section — includes system prompt + tool definitions
for e in self.context.system.entries() {
tokens.extend(&e.token_ids);
}
// Identity — rendered as one user message
let ctx = self.context.render_context_message();
if !ctx.is_empty() {
tokens.extend(tokenizer::tokenize_entry("user", &ctx));
}
// Journal — rendered as one user message
let jnl = self.context.render_journal();
if !jnl.is_empty() {
tokens.extend(tokenizer::tokenize_entry("user", &jnl));
}
// Conversation entries — use cached token_ids
for e in self.context.conversation.entries() {
if e.entry.is_log() || e.entry.is_thinking() { continue; }
tokens.extend(&e.token_ids);
}
// Prompt the assistant to respond
let mut tokens = self.context.token_ids();
tokens.push(tokenizer::IM_START);
tokens.extend(tokenizer::encode("assistant\n"));
tokens
}
/// Run agent orchestration cycle, returning structured output.
/// Push a conversation message — stamped and logged.
pub fn push_message(&mut self, mut msg: Message) {
msg.stamp();
let entry = ConversationEntry::Message(msg);
self.push_entry(entry);
}
pub fn push_entry(&mut self, entry: ConversationEntry) {
/// Push a node into the conversation and log it.
pub fn push_node(&mut self, node: AstNode) {
if let Some(ref log) = self.conversation_log {
if let Err(e) = log.append(&entry) {
if let Err(e) = log.append_node(&node) {
eprintln!("warning: failed to log entry: {:#}", e);
}
}
self.context.conversation.push(ContextEntry::new(
entry, Some(chrono::Utc::now())));
self.context.push(Section::Conversation, node);
self.changed.notify_one();
}
/// Find the index of the in-progress streaming entry (unstamped assistant message).
fn streaming_index(&self) -> Option<usize> {
self.context.conversation.entries().iter().rposition(|ce| {
if ce.token_ids.is_empty() { return false; }
let m = ce.entry.message();
m.role == Role::Assistant && m.timestamp.is_none()
})
}
/// Append streaming text to the last entry (creating a partial
/// assistant entry if needed). Called by collect_stream per token batch.
fn append_streaming(&mut self, text: &str) {
if let Some(idx) = self.streaming_index() {
assert!(!self.context.conversation.entries()[idx].token_ids.is_empty(),
"streaming_index returned entry with empty token_ids at {}", idx);
let mut msg = self.context.conversation.entries()[idx].entry.message().clone();
msg.append_content(text);
self.context.conversation.set_message(idx, msg);
} else {
self.context.conversation.push(ContextEntry::new(
ConversationEntry::Message(Message {
role: Role::Assistant,
content: Some(MessageContent::Text(text.to_string())),
tool_calls: None,
tool_call_id: None,
name: None,
timestamp: None,
}),
None,
));
}
self.changed.notify_one();
}
/// Finalize the streaming entry with the complete response message.
/// Finds the unstamped assistant entry, replaces it via set() with proper token count.
fn finalize_streaming(&mut self, msg: Message) {
if let Some(i) = self.streaming_index() {
let mut stamped = msg.clone();
stamped.stamp();
self.context.conversation.set(i, ContextEntry::new(
ConversationEntry::Message(stamped),
Some(chrono::Utc::now()),
));
} else {
self.push_message(msg.clone());
}
// Log the finalized entry
if let Some(ref log) = self.conversation_log {
let entry = ConversationEntry::Message(msg);
if let Err(e) = log.append(&entry) {
eprintln!("warning: failed to log finalized entry: {:#}", e);
}
}
self.changed.notify_one();
}
/// Send a user message and run the agent loop until the model
/// produces a text response (no more tool calls). Streams text
/// and tool activity through the UI channel.
///
/// Takes Arc<Mutex<Agent>> and manages locking internally so the
/// lock is never held across I/O (API streaming, tool dispatch).
/// Run the agent turn loop: assemble prompt, stream response,
/// parse into AST, dispatch tool calls, repeat until text response.
pub async fn turn(
agent: Arc<tokio::sync::Mutex<Agent>>,
) -> Result<TurnResult> {
// --- Pre-loop setup (lock 1): collect finished tools ---
let active_tools = {
let mut finished = Vec::new();
let tools = {
let me = agent.lock().await;
let me = agent.lock().await;
me.active_tools.clone()
};
// Collect completed background tool handles — remove from active list
// but don't await yet (MutexGuard isn't Send).
let mut tools = me.active_tools.lock().unwrap();
// Collect finished background tools
{
let mut finished = Vec::new();
{
let mut tools = active_tools.lock().unwrap();
let mut i = 0;
while i < tools.len() {
if tools[i].handle.is_finished() {
@ -454,192 +314,181 @@ impl Agent {
i += 1;
}
}
me.active_tools.clone()
};
// Await finished handles without holding the agent lock
let mut bg_results = Vec::new();
for entry in finished {
if let Ok((call, output)) = entry.handle.await {
bg_results.push((call, output));
}
}
// Re-acquire to apply background tool results
if !bg_results.is_empty() {
if !finished.is_empty() {
let mut results = Vec::new();
for entry in finished {
if let Ok((call, output)) = entry.handle.await {
results.push((call, output));
}
}
let mut me = agent.lock().await;
let mut bg_ds = DispatchState::new();
for (call, output) in bg_results {
for (call, output) in results {
me.apply_tool_result(&call, output, &mut bg_ds);
}
}
tools
};
}
let mut overflow_retries: u32 = 0;
let mut empty_retries: u32 = 0;
let mut ds = DispatchState::new();
loop {
// --- Lock 2: assemble messages, start stream ---
let _thinking = start_activity(&agent, "thinking...").await;
// Assemble prompt and start stream (brief lock)
let (mut rx, _stream_guard) = {
let me = agent.lock().await;
let sampling = api::SamplingParams {
temperature: me.temperature,
top_p: me.top_p,
top_k: me.top_k,
};
if tokenizer::is_initialized() {
let prompt_tokens = me.assemble_prompt_tokens();
me.client.start_stream_completions(
&prompt_tokens,
sampling,
None,
)
} else {
let api_messages = me.assemble_api_messages();
me.client.start_stream(
&api_messages,
&me.tools,
&me.reasoning_effort,
sampling,
None,
)
}
let prompt_tokens = me.assemble_prompt_tokens();
me.client.stream_completion(
&prompt_tokens,
api::SamplingParams {
temperature: me.temperature,
top_p: me.top_p,
top_k: me.top_k,
},
None,
)
};
// --- Lock released ---
// --- Stream loop (no lock) ---
let sr = api::collect_stream(
&mut rx, &agent, &active_tools,
).await;
let api::StreamResult {
content, tool_calls, usage, finish_reason,
error: stream_error, display_buf, in_tool_call, reasoning,
} = sr;
// --- Stream complete ---
// Push thinking entry if model produced reasoning
if !reasoning.is_empty() {
// Create assistant branch and parser (brief lock)
let branch_idx = {
let mut me = agent.lock().await;
me.push_entry(context::ConversationEntry::Thinking(reasoning));
let idx = me.context.len(Section::Conversation);
me.context.push(Section::Conversation,
AstNode::branch(Role::Assistant, vec![]));
idx
};
let mut parser = ResponseParser::new(branch_idx);
let mut pending_calls: Vec<PendingToolCall> = Vec::new();
let mut had_content = false;
let mut stream_error: Option<String> = None;
// Stream loop — no lock held across I/O
while let Some(event) = rx.recv().await {
match event {
api::StreamToken::Token { text, id: _ } => {
had_content = true;
let mut me = agent.lock().await;
let calls = parser.feed(&text, &mut me.context);
for call in calls {
// Dispatch tool call immediately
let call_clone = call.clone();
let agent_handle = agent.clone();
let handle = tokio::spawn(async move {
let args: serde_json::Value =
serde_json::from_str(&call_clone.arguments).unwrap_or_default();
let output = tools::dispatch_with_agent(
&call_clone.name, &args, Some(agent_handle),
).await;
(call_clone, output)
});
active_tools.lock().unwrap().push(tools::ActiveToolCall {
id: call.id.clone(),
name: call.name.clone(),
detail: call.arguments.clone(),
started: std::time::Instant::now(),
background: false,
handle,
});
pending_calls.push(call);
}
}
api::StreamToken::Error(e) => {
stream_error = Some(e);
break;
}
api::StreamToken::Done { usage } => {
if let Some(u) = usage {
agent.lock().await.last_prompt_tokens = u.prompt_tokens;
}
break;
}
}
}
// --- Lock 3: process results ---
let (msg, pending) = {
// Flush parser remainder
{
let mut me = agent.lock().await;
parser.finish(&mut me.context);
}
// Handle stream errors with retry logic
if let Some(e) = stream_error {
let err = anyhow::anyhow!("{}", e);
if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
overflow_retries += 1;
me.notify(format!("context overflow — retrying ({}/2)", overflow_retries));
me.compact();
continue;
}
if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
empty_retries += 1;
me.notify(format!("stream error — retrying ({}/2)", empty_retries));
drop(me);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
return Err(err);
}
if finish_reason.as_deref() == Some("error") {
let detail = if content.is_empty() { "no details".into() } else { content };
return Err(anyhow::anyhow!("model stream error: {}", detail));
}
// Flush remaining display buffer to streaming entry
if !in_tool_call && !display_buf.is_empty() {
me.append_streaming(&display_buf);
}
let msg = api::build_response_message(content, tool_calls);
if let Some(usage) = &usage {
me.last_prompt_tokens = usage.prompt_tokens;
}
// Empty response — nudge and retry
let has_content = msg.content.is_some();
let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty());
if !has_content && !has_tools {
if empty_retries < 2 {
empty_retries += 1;
dbglog!(
"empty response, injecting nudge and retrying ({}/2)",
empty_retries,
);
me.push_message(Message::user(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
continue;
}
} else {
empty_retries = 0;
}
// Collect non-background tool calls fired during streaming
let mut tools_guard = active_tools.lock().unwrap();
let mut non_bg = Vec::new();
let mut i = 0;
while i < tools_guard.len() {
if !tools_guard[i].background {
non_bg.push(tools_guard.remove(i));
} else {
i += 1;
}
}
(msg, non_bg)
};
if !pending.is_empty() {
agent.lock().await.finalize_streaming(msg.clone());
// Drop lock before awaiting tool handles
let mut results = Vec::new();
for entry in pending {
if let Ok(r) = entry.handle.await {
results.push(r);
}
}
// Reacquire to apply results
// Handle errors
if let Some(e) = stream_error {
let err = anyhow::anyhow!("{}", e);
let mut me = agent.lock().await;
for (call, output) in results {
me.apply_tool_result(&call, output, &mut ds);
if context_new::is_context_overflow(&err) && overflow_retries < 2 {
overflow_retries += 1;
me.notify(format!("context overflow — retrying ({}/2)", overflow_retries));
me.compact();
continue;
}
if context_new::is_stream_error(&err) && empty_retries < 2 {
empty_retries += 1;
me.notify(format!("stream error — retrying ({}/2)", empty_retries));
drop(me);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
return Err(err);
}
// Empty response — nudge and retry
if !had_content && pending_calls.is_empty() {
if empty_retries < 2 {
empty_retries += 1;
let mut me = agent.lock().await;
me.push_node(AstNode::user_msg(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
continue;
}
} else {
empty_retries = 0;
}
// Wait for tool calls to complete
if !pending_calls.is_empty() {
ds.had_tool_calls = true;
// Collect non-background tool handles
let mut handles = Vec::new();
{
let mut tools_guard = active_tools.lock().unwrap();
let mut i = 0;
while i < tools_guard.len() {
if !tools_guard[i].background {
handles.push(tools_guard.remove(i));
} else {
i += 1;
}
}
}
for entry in handles {
if let Ok((call, output)) = entry.handle.await {
let mut me = agent.lock().await;
me.apply_tool_result(&call, output, &mut ds);
}
}
continue;
}
// Tool calls (structured API path)
if let Some(ref tool_calls) = msg.tool_calls {
if !tool_calls.is_empty() {
agent.lock().await.finalize_streaming(msg.clone());
let calls: Vec<ToolCall> = tool_calls.clone();
// Drop lock before tool dispatch
for call in &calls {
Agent::dispatch_tool_call_unlocked(
&agent, &active_tools, call, &mut ds,
).await;
}
continue;
}
}
// Text-only response — extract text and return
let text = {
let me = agent.lock().await;
let children = me.context.conversation()[branch_idx].children();
children.iter()
.filter_map(|c| c.leaf())
.filter(|l| matches!(l.body(), NodeBody::Content(_)))
.map(|l| l.body().text())
.collect::<Vec<_>>()
.join("")
};
// Genuinely text-only response
let text = msg.content_text().to_string();
let mut me = agent.lock().await;
me.finalize_streaming(msg);
// Drain pending control flags
if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); }
if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }