consciousness/src/agent/mod.rs
ProofOfConcept e9b26f5d45 tools: modernize working_stack, remove special-case dispatch
working_stack now uses the Tool format with an Agent handle —
it locks the agent and modifies the stack directly. The special-case
interception in the turn loop is removed. All tools go through
the unified registry dispatch.

Also passes agent handle to all spawned tool tasks so any tool
that needs Agent access can use it.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-04 18:19:21 -04:00

1122 lines
46 KiB
Rust

// agent/mod.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.
pub mod api;
pub mod context;
pub mod parsing;
pub mod tools;
pub mod training;
use std::sync::Arc;
use anyhow::Result;
use tiktoken_rs::CoreBPE;
use api::{ApiClient, StreamEvent};
use context as journal;
use tools::{ToolCall, ToolDef, FunctionCall, summarize_args};
use crate::user::log::ConversationLog;
use crate::agent::api::types::*;
use crate::agent::context::{
ConversationEntry, ContextState, ContextBudget,
working_stack_instructions_path, working_stack_file_path,
};
use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, StatusInfo, UiMessage, UiSender};
/// Result of a single agent turn.
///
/// Returned by `Agent::turn` once the model produces a text-only response.
/// The flags are accumulated over every tool dispatch that happened during
/// the turn.
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// The text response (already sent through UI channel).
    #[allow(dead_code)]
    pub text: String,
    /// Whether the model called yield_to_user during this turn.
    pub yield_requested: bool,
    /// Whether any tools (other than yield_to_user) were called.
    pub had_tool_calls: bool,
    /// Number of tool calls that returned errors this turn.
    pub tool_errors: u32,
    /// Model name to switch to after this turn completes.
    pub model_switch: Option<String>,
    /// Agent requested DMN pause (full stop on autonomous behavior).
    pub dmn_pause: bool,
}
/// Accumulated state across tool dispatches within a single turn.
///
/// Every field starts at its zero value (`false`, `0`, `None`), so the
/// derived `Default` is exactly the initial state.
#[derive(Debug, Default)]
struct DispatchState {
    yield_requested: bool,
    had_tool_calls: bool,
    tool_errors: u32,
    model_switch: Option<String>,
    dmn_pause: bool,
}

impl DispatchState {
    /// Fresh state with no flags set and no errors counted.
    fn new() -> Self {
        Self::default()
    }
}
/// A single conversational agent: API client, tool definitions, sampling
/// settings and the mutable conversation context.
///
/// `turn()` takes the agent behind an `Arc<tokio::sync::Mutex<_>>` and locks
/// it in short sections; spawned tool tasks are handed a clone of that handle.
pub struct Agent {
/// API client used to start the streaming completion for each turn.
client: ApiClient,
/// Tool definitions (from `tools::definitions()`) sent with every request.
tool_defs: Vec<ToolDef>,
/// Last known prompt token count from the API (tracks context size).
last_prompt_tokens: u32,
/// Current reasoning effort level ("none", "low", "high").
pub reasoning_effort: String,
/// Sampling parameters — adjustable at runtime from the thalamus screen.
pub temperature: f32,
pub top_p: f32,
pub top_k: u32,
/// Control tool flags — set by tool handlers, consumed by turn loop.
pub pending_yield: bool,
pub pending_model_switch: Option<String>,
pub pending_dmn_pause: bool,
/// Persistent conversation log — append-only record of all messages.
conversation_log: Option<ConversationLog>,
/// BPE tokenizer for token counting (cl100k_base — close enough
/// for Claude and Qwen budget allocation, ~85-90% count accuracy).
tokenizer: CoreBPE,
/// Mutable context state — personality, working stack, etc.
pub context: ContextState,
/// Shared live context summary — TUI reads this directly for debug screen.
pub shared_context: SharedContextState,
/// App config — used to reload identity on compaction.
app_config: crate::config::AppConfig,
/// Prompt file identifier; passed to `config::reload_for_model` together
/// with `app_config` when `compact()` reloads the identity.
pub prompt_file: String,
/// Stable session ID for memory-search dedup across turns.
pub session_id: String,
/// Agent orchestration state (surface-observe, journal, reflect).
/// TODO: move to Session — it's session-level, not agent-level.
pub agent_cycles: crate::subconscious::subconscious::AgentCycleState,
/// Shared active tools — Agent writes, TUI reads.
pub active_tools: crate::user::ui_channel::SharedActiveTools,
}
/// Render journal entries as one text block for the model.
///
/// Returns an empty string when there are no entries; otherwise a header
/// line followed by one `## <timestamp>` section per entry.
fn render_journal(entries: &[journal::JournalEntry]) -> String {
    use std::fmt::Write;

    let mut rendered = String::new();
    if !entries.is_empty() {
        rendered.push_str("[Earlier — from your journal]\n\n");
        for item in entries {
            let stamp = item.timestamp.format("%Y-%m-%dT%H:%M");
            // The formatting result is deliberately ignored, as before.
            let _ = writeln!(rendered, "## {}\n{}\n", stamp, item.content);
        }
    }
    rendered
}
impl Agent {
/// Construct an agent and load its startup state.
///
/// Builds the initial `ContextState` from the given system prompt and
/// personality, derives a timestamp-based session id, then loads the
/// startup journal and the persisted working stack and publishes the
/// context summary for the TUI.
///
/// # Panics
/// Panics if the `cl100k_base` tokenizer cannot be loaded.
pub fn new(
    client: ApiClient,
    system_prompt: String,
    personality: Vec<(String, String)>,
    app_config: crate::config::AppConfig,
    prompt_file: String,
    conversation_log: Option<ConversationLog>,
    shared_context: SharedContextState,
    active_tools: crate::user::ui_channel::SharedActiveTools,
) -> Self {
    let tool_defs = tools::definitions();
    let tokenizer = tiktoken_rs::cl100k_base()
        .expect("failed to load cl100k_base tokenizer");
    let context = ContextState {
        // `system_prompt` is owned and not used again, so move it in
        // instead of cloning.
        system_prompt,
        personality,
        journal: Vec::new(),
        working_stack: Vec::new(),
        entries: Vec::new(),
    };
    let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
    let agent_cycles = crate::subconscious::subconscious::AgentCycleState::new(&session_id);
    let mut agent = Self {
        client,
        tool_defs,
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        temperature: 0.6,
        top_p: 0.95,
        top_k: 20,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        conversation_log,
        tokenizer,
        context,
        shared_context,
        app_config,
        prompt_file,
        session_id,
        agent_cycles,
        active_tools,
    };
    agent.load_startup_journal();
    agent.load_working_stack();
    agent.publish_context_state();
    agent
}
/// Build the message list sent to the API.
///
/// Order: system prompt, rendered context message (if non-empty), rendered
/// journal (if non-empty), then every conversation entry's API message.
fn assemble_api_messages(&self) -> Vec<Message> {
    let state = &self.context;
    let mut assembled = vec![Message::system(&state.system_prompt)];

    let identity = state.render_context_message();
    if !identity.is_empty() {
        assembled.push(Message::user(identity));
    }

    let journal_text = render_journal(&state.journal);
    if !journal_text.is_empty() {
        assembled.push(Message::user(journal_text));
    }

    for entry in &state.entries {
        assembled.push(entry.api_message().clone());
    }
    assembled
}
/// Trigger one agent orchestration cycle and take its output.
///
/// The hook session is built from this agent's session id and the
/// conversation log path (empty when there is no log). The cycle's
/// `last_output` is moved out, leaving a default value behind.
fn run_agent_cycle(&mut self) -> crate::subconscious::subconscious::AgentCycleOutput {
    let transcript_path = match self.conversation_log.as_ref() {
        Some(log) => log.path().to_string_lossy().to_string(),
        None => String::new(),
    };
    let session = crate::session::HookSession::from_fields(
        self.session_id.clone(),
        transcript_path,
        "UserPromptSubmit".into(),
    );
    self.agent_cycles.trigger(&session);
    std::mem::take(&mut self.agent_cycles.last_output)
}
/// Stamp a message, wrap it as a conversation entry and record it
/// (log + in-memory context) via `push_entry`.
fn push_message(&mut self, mut msg: Message) {
    msg.stamp();
    self.push_entry(ConversationEntry::Message(msg));
}
fn push_entry(&mut self, entry: ConversationEntry) {
if let Some(ref log) = self.conversation_log {
if let Err(e) = log.append(&entry) {
eprintln!("warning: failed to log entry: {:#}", e);
}
}
self.context.entries.push(entry);
}
/// Compute the current context budget.
///
/// Token counts come from this agent's tokenizer; the window size comes
/// from `context::context_window()`.
pub fn budget(&self) -> ContextBudget {
    let tokenizer = &self.tokenizer;
    let str_tokens = |s: &str| tokenizer.encode_with_special_tokens(s).len();
    let msg_tokens = |m: &Message| crate::agent::context::msg_token_count(tokenizer, m);
    let window = crate::agent::context::context_window();
    self.context.budget(&str_tokens, &msg_tokens, window)
}
/// Send a user message and run the agent loop until the model
/// produces a text response (no more tool calls). Streams text
/// and tool activity through the UI channel.
///
/// Takes Arc<Mutex<Agent>> and manages locking internally so the
/// lock is never held across I/O (API streaming, tool dispatch).
///
/// Each loop iteration is one model request: assemble messages, stream the
/// response, then either dispatch tools and loop again, or return the text.
///
/// # Errors
/// Returns an error when the stream reports an error that is not retried
/// (or retries are exhausted), or when the stream finishes with reason
/// `"error"`.
pub async fn turn(
agent: Arc<tokio::sync::Mutex<Agent>>,
user_input: &str,
ui_tx: &UiSender,
target: StreamTarget,
) -> Result<TurnResult> {
// --- Pre-loop setup (lock 1): agent cycle, memories, user input ---
let active_tools = {
let mut me = agent.lock().await;
let cycle = me.run_agent_cycle();
// Inject each surfaced memory node as a Memory entry.
// NOTE(review): the store is loaded once per surfaced key, while the
// agent lock is held — confirm this is cheap enough.
for key in &cycle.surfaced_keys {
if let Some(rendered) = crate::cli::node::render_node(
&crate::store::Store::load().unwrap_or_default(), key,
) {
let mut msg = Message::user(format!(
"<system-reminder>\n--- {} (surfaced) ---\n{}\n</system-reminder>",
key, rendered,
));
msg.stamp();
me.push_entry(ConversationEntry::Memory { key: key.clone(), message: msg });
}
}
if let Some(ref reflection) = cycle.reflection {
me.push_message(Message::user(format!(
"<system-reminder>\n--- subconscious reflection ---\n{}\n</system-reminder>",
reflection.trim(),
)));
}
// Collect completed background tool calls
// NOTE(review): results are accumulated into `bg_ds`, which is never
// merged into the turn's `ds`, so these calls do not affect
// `had_tool_calls` / `tool_errors` in the returned TurnResult.
let mut bg_ds = DispatchState::new();
let finished: Vec<_> = {
let mut tools = me.active_tools.lock().unwrap();
let mut done = Vec::new();
let mut i = 0;
while i < tools.len() {
if tools[i].handle.is_finished() {
done.push(tools.remove(i));
} else {
i += 1;
}
}
done
};
// These handles already reported `is_finished()`, so awaiting them here
// (with the agent lock held) should resolve without waiting.
// A JoinError is ignored and the entry is dropped.
for entry in finished {
if let Ok((call, output)) = entry.handle.await {
me.apply_tool_result(&call, output, ui_tx, &mut bg_ds);
}
}
me.push_message(Message::user(user_input));
let _ = ui_tx.send(UiMessage::AgentUpdate(me.agent_cycles.snapshots()));
me.active_tools.clone()
};
// --- Lock released ---
// Retry counters persist across loop iterations. `empty_retries` is shared
// by the stream-error retry path and the empty-response nudge path.
let mut overflow_retries: u32 = 0;
let mut empty_retries: u32 = 0;
let mut ds = DispatchState::new();
loop {
let _ = ui_tx.send(UiMessage::Activity("thinking...".into()));
// --- Lock 2: assemble messages, start stream ---
// `_stream_guard` stays alive until the end of this loop iteration.
let (mut rx, _stream_guard) = {
let me = agent.lock().await;
let api_messages = me.assemble_api_messages();
let sampling = api::SamplingParams {
temperature: me.temperature,
top_p: me.top_p,
top_k: me.top_k,
};
me.client.start_stream(
&api_messages,
Some(&me.tool_defs),
ui_tx,
&me.reasoning_effort,
sampling,
None,
)
};
// --- Lock released ---
// --- Stream loop (no lock) ---
// `content` is the full raw text; `display_buf` is text not yet shown;
// `tool_call_buf` is the body of an in-progress text-mode <tool_call>.
let mut content = String::new();
let mut tool_calls: Vec<ToolCall> = Vec::new();
let mut usage = None;
let mut finish_reason = None;
let mut in_tool_call = false;
let mut tool_call_buf = String::new();
let mut stream_error = None;
let mut first_content = true;
let mut display_buf = String::new();
while let Some(event) = rx.recv().await {
match event {
StreamEvent::Content(text) => {
if first_content {
let _ = ui_tx.send(UiMessage::Activity("streaming...".into()));
first_content = false;
}
content.push_str(&text);
if in_tool_call {
// Inside <tool_call>…: buffer until the closing tag shows up.
tool_call_buf.push_str(&text);
if let Some(end) = tool_call_buf.find("</tool_call>") {
let body = &tool_call_buf[..end];
if let Some(call) = crate::agent::parsing::parse_tool_call_body(body) {
let args: serde_json::Value =
serde_json::from_str(&call.function.arguments).unwrap_or_default();
let args_summary = summarize_args(&call.function.name, &args);
let _ = ui_tx.send(UiMessage::ToolCall {
name: call.function.name.clone(),
args_summary: args_summary.clone(),
});
let is_background = args.get("run_in_background")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let call_id = call.id.clone();
let call_name = call.function.name.clone();
let agent_handle = agent.clone();
// Start the tool right away, while the stream is still running.
let handle = tokio::spawn(async move {
let output = tools::dispatch_with_agent(&call.function.name, &args, Some(agent_handle)).await;
(call, output)
});
active_tools.lock().unwrap().push(
tools::ActiveToolCall {
id: call_id,
name: call_name,
detail: args_summary,
started: std::time::Instant::now(),
background: is_background,
handle,
}
);
}
// Text after the closing tag goes back to the display buffer
// (dropped when it is only whitespace).
let remaining = tool_call_buf[end + "</tool_call>".len()..].to_string();
tool_call_buf.clear();
in_tool_call = false;
if !remaining.trim().is_empty() {
display_buf.push_str(&remaining);
}
}
} else {
display_buf.push_str(&text);
if let Some(pos) = display_buf.find("<tool_call>") {
let before = &display_buf[..pos];
if !before.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(before.to_string(), target));
}
// NOTE(review): anything that followed "<tool_call>" in this
// buffer is discarded here rather than moved into
// `tool_call_buf` — confirm deltas never carry the tag and
// the start of the body together.
display_buf.clear();
in_tool_call = true;
} else {
// Hold back the last 10 bytes: "<tool_call>" is 11 bytes, so a
// partial tag at the end of the buffer is never flushed.
let safe = display_buf.len().saturating_sub(10);
let safe = display_buf.floor_char_boundary(safe);
if safe > 0 {
let flush = display_buf[..safe].to_string();
display_buf = display_buf[safe..].to_string();
let _ = ui_tx.send(UiMessage::TextDelta(flush, target));
}
}
}
}
StreamEvent::Reasoning(text) => {
let _ = ui_tx.send(UiMessage::Reasoning(text));
}
StreamEvent::ToolCallDelta { index, id, call_type, name, arguments } => {
// Structured tool calls arrive as deltas keyed by index; grow the
// list as needed, then merge the fields that are present.
while tool_calls.len() <= index {
tool_calls.push(ToolCall {
id: String::new(),
call_type: "function".to_string(),
function: FunctionCall { name: String::new(), arguments: String::new() },
});
}
if let Some(id) = id { tool_calls[index].id = id; }
if let Some(ct) = call_type { tool_calls[index].call_type = ct; }
if let Some(n) = name { tool_calls[index].function.name = n; }
if let Some(a) = arguments { tool_calls[index].function.arguments.push_str(&a); }
}
StreamEvent::Usage(u) => usage = Some(u),
StreamEvent::Finished { reason, .. } => {
finish_reason = Some(reason);
break;
}
StreamEvent::Error(e) => {
stream_error = Some(e);
break;
}
}
}
// --- Stream complete ---
// --- Lock 3: process results ---
{
let mut me = agent.lock().await;
// Handle stream errors with retry logic
if let Some(e) = stream_error {
let err = anyhow::anyhow!("{}", e);
if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
overflow_retries += 1;
let _ = ui_tx.send(UiMessage::Info(format!(
"[context overflow — compacting and retrying ({}/2)]",
overflow_retries,
)));
me.compact();
continue;
}
if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
empty_retries += 1;
let _ = ui_tx.send(UiMessage::Info(format!(
"[stream error: {} — retrying ({}/2)]",
e, empty_retries,
)));
// Release the agent lock before sleeping.
drop(me);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
let _ = ui_tx.send(UiMessage::Activity(String::new()));
return Err(err);
}
if finish_reason.as_deref() == Some("error") {
let detail = if content.is_empty() { "no details".into() } else { content };
let _ = ui_tx.send(UiMessage::Activity(String::new()));
return Err(anyhow::anyhow!("model stream error: {}", detail));
}
// Flush remaining display buffer
if !in_tool_call && !display_buf.is_empty() {
let _ = ui_tx.send(UiMessage::TextDelta(display_buf, target));
}
if !content.is_empty() && !in_tool_call {
let _ = ui_tx.send(UiMessage::TextDelta("\n".to_string(), target));
}
let msg = api::build_response_message(content, tool_calls);
if let Some(usage) = &usage {
me.last_prompt_tokens = usage.prompt_tokens;
me.publish_context_state();
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: String::new(),
dmn_turns: 0,
dmn_max_turns: 0,
prompt_tokens: usage.prompt_tokens,
completion_tokens: usage.completion_tokens,
model: me.client.model.clone(),
turn_tools: 0,
context_budget: me.budget().status_string(),
}));
}
// Empty response — nudge and retry
// After two nudges an empty response falls through to the text-only path.
let has_content = msg.content.is_some();
let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty());
if !has_content && !has_tools {
if empty_retries < 2 {
empty_retries += 1;
let _ = ui_tx.send(UiMessage::Debug(format!(
"empty response, injecting nudge and retrying ({}/2)",
empty_retries,
)));
me.push_message(Message::user(
"[system] Your previous response was empty. \
Please respond with text or use a tool."
));
continue;
}
} else {
empty_retries = 0;
}
// Collect non-background tool calls fired during streaming
// They are removed from the shared list here; background ones stay and
// are picked up by the pre-loop collection of a later turn.
let pending: Vec<_> = {
let mut tools_guard = active_tools.lock().unwrap();
let mut non_bg = Vec::new();
let mut i = 0;
while i < tools_guard.len() {
if !tools_guard[i].background {
non_bg.push(tools_guard.remove(i));
} else {
i += 1;
}
}
non_bg
};
if !pending.is_empty() {
me.push_message(msg.clone());
// Drop lock before awaiting tool handles
drop(me);
// NOTE(review): a JoinError (panicked/cancelled task) is skipped, so
// that call gets no tool-result message in the conversation.
let mut results = Vec::new();
for entry in pending {
if let Ok(r) = entry.handle.await {
results.push(r);
}
}
// Reacquire to apply results
let mut me = agent.lock().await;
for (call, output) in results {
me.apply_tool_result(&call, output, ui_tx, &mut ds);
}
me.publish_context_state();
continue;
}
// Tool calls (structured API path)
if let Some(ref tool_calls) = msg.tool_calls {
if !tool_calls.is_empty() {
me.push_message(msg.clone());
let calls: Vec<ToolCall> = tool_calls.clone();
// Drop lock before tool dispatch
drop(me);
// Calls are dispatched one at a time, each awaited before the next.
for call in &calls {
Agent::dispatch_tool_call_unlocked(
&agent, &active_tools, call, ui_tx, &mut ds,
).await;
}
continue;
}
}
// Genuinely text-only response
let text = msg.content_text().to_string();
let _ = ui_tx.send(UiMessage::Activity(String::new()));
me.push_message(msg);
// Drain pending control flags
if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); }
if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }
return Ok(TurnResult {
text,
yield_requested: ds.yield_requested,
had_tool_calls: ds.had_tool_calls,
tool_errors: ds.tool_errors,
model_switch: ds.model_switch,
dmn_pause: ds.dmn_pause,
});
}
}
}
/// Dispatch a tool call without holding the agent lock across I/O.
/// Used by `turn()` which manages its own locking.
///
/// The tool runs in a spawned task that receives a clone of the agent
/// handle. Once the task ends, the agent is locked briefly to record the
/// result.
///
/// If the task fails to join (it panicked or was cancelled), an `Error:`
/// result is recorded instead of dropping the call silently. That way the
/// assistant's tool call always has a matching tool-result message and the
/// failure is counted in `ds.tool_errors`.
async fn dispatch_tool_call_unlocked(
    agent: &Arc<tokio::sync::Mutex<Agent>>,
    active_tools: &crate::user::ui_channel::SharedActiveTools,
    call: &ToolCall,
    ui_tx: &UiSender,
    ds: &mut DispatchState,
) {
    let args: serde_json::Value =
        serde_json::from_str(&call.function.arguments).unwrap_or_default();
    let args_summary = summarize_args(&call.function.name, &args);
    let _ = ui_tx.send(UiMessage::Activity(format!("calling: {}", call.function.name)));
    let _ = ui_tx.send(UiMessage::ToolCall {
        name: call.function.name.clone(),
        args_summary: args_summary.clone(),
    });

    // Spawn tool, track it
    let call_clone = call.clone();
    let agent_handle = agent.clone();
    let handle = tokio::spawn(async move {
        let output = tools::dispatch_with_agent(&call_clone.function.name, &args, Some(agent_handle)).await;
        (call_clone, output)
    });
    active_tools.lock().unwrap().push(
        tools::ActiveToolCall {
            id: call.id.clone(),
            name: call.function.name.clone(),
            detail: args_summary,
            started: std::time::Instant::now(),
            background: false,
            handle,
        }
    );

    // Pop it back and await — no agent lock held
    let entry = {
        let mut tools = active_tools.lock().unwrap();
        tools.pop().expect("tool entry was pushed just above")
    };

    let (finished_call, output) = match entry.handle.await {
        Ok(done) => done,
        // The task owned its copy of the call; fall back to the caller's.
        Err(join_err) => (
            call.clone(),
            format!("Error: tool task failed: {}", join_err),
        ),
    };

    // Brief lock to apply result
    let mut me = agent.lock().await;
    me.apply_tool_result(&finished_call, output, ui_tx, ds);
}
/// Record a finished tool call.
///
/// Updates the dispatch counters (an output starting with `Error:` counts
/// as a tool error), reports the result to the UI, removes the call from
/// the active-tools list, and appends the tool result to the conversation.
/// A successful `memory_render` with a string `key` argument is stored as a
/// `Memory` entry instead of a plain message, and the context summary is
/// republished.
fn apply_tool_result(
    &mut self,
    call: &ToolCall,
    output: String,
    ui_tx: &UiSender,
    ds: &mut DispatchState,
) {
    let failed = output.starts_with("Error:");

    ds.had_tool_calls = true;
    if failed {
        ds.tool_errors += 1;
    }

    let _ = ui_tx.send(UiMessage::ToolResult {
        name: call.function.name.clone(),
        result: output.clone(),
    });
    self.active_tools.lock().unwrap().retain(|t| t.id != call.id);

    // Tag memory_render results for context deduplication
    if !failed && call.function.name == "memory_render" {
        let parsed: serde_json::Value =
            serde_json::from_str(&call.function.arguments).unwrap_or_default();
        if let Some(key) = parsed.get("key").and_then(|v| v.as_str()) {
            let mut tagged = Message::tool_result(&call.id, &output);
            tagged.stamp();
            self.push_entry(ConversationEntry::Memory { key: key.to_string(), message: tagged });
            self.publish_context_state();
            return;
        }
    }

    self.push_message(Message::tool_result(&call.id, &output));
}
/// Build context state summary for the debug screen.
///
/// Produces one top-level section each for the system prompt, personality
/// files, journal, working stack, memory nodes (only when at least one is
/// loaded) and the conversation. Token counts come from this agent's
/// tokenizer.
///
/// `memory_scores`, when given, adds importance labels to memory nodes and
/// lists the important memories under each assistant message.
pub fn context_state_summary(&self, memory_scores: Option<&crate::agent::training::MemoryScore>) -> Vec<ContextSection> {
    let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len();
    let mut sections = Vec::new();

    // System prompt
    sections.push(ContextSection {
        name: "System prompt".into(),
        tokens: count(&self.context.system_prompt),
        content: self.context.system_prompt.clone(),
        children: Vec::new(),
    });

    // Personality — parent with file children
    let personality_children: Vec<ContextSection> = self.context.personality.iter()
        .map(|(name, content)| ContextSection {
            name: name.clone(),
            tokens: count(content),
            content: content.clone(),
            children: Vec::new(),
        })
        .collect();
    let personality_tokens: usize = personality_children.iter().map(|c| c.tokens).sum();
    sections.push(ContextSection {
        name: format!("Personality ({} files)", personality_children.len()),
        tokens: personality_tokens,
        content: String::new(),
        children: personality_children,
    });

    // Journal
    {
        let journal_children: Vec<ContextSection> = self.context.journal.iter()
            .map(|entry| {
                // Label each entry with its first non-blank line.
                let preview: String = entry.content.lines()
                    .find(|l| !l.trim().is_empty())
                    .unwrap_or("").chars().take(60).collect();
                ContextSection {
                    name: format!("{}: {}", entry.timestamp.format("%Y-%m-%dT%H:%M"), preview),
                    tokens: count(&entry.content),
                    content: entry.content.clone(),
                    children: Vec::new(),
                }
            })
            .collect();
        let journal_tokens: usize = journal_children.iter().map(|c| c.tokens).sum();
        sections.push(ContextSection {
            name: format!("Journal ({} entries)", journal_children.len()),
            tokens: journal_tokens,
            content: String::new(),
            children: journal_children,
        });
    }

    // Working stack — instructions + items as children
    let instructions = std::fs::read_to_string(working_stack_instructions_path())
        .unwrap_or_default();
    let mut stack_children = vec![ContextSection {
        name: "Instructions".into(),
        tokens: count(&instructions),
        content: instructions,
        children: Vec::new(),
    }];
    for (i, item) in self.context.working_stack.iter().enumerate() {
        // The loop body only runs for a non-empty stack, so `len() - 1`
        // cannot underflow.
        let marker = if i == self.context.working_stack.len() - 1 { "" } else { " " };
        stack_children.push(ContextSection {
            name: format!("{} [{}] {}", marker, i, item),
            tokens: count(item),
            content: String::new(),
            children: Vec::new(),
        });
    }
    let stack_tokens: usize = stack_children.iter().map(|c| c.tokens).sum();
    sections.push(ContextSection {
        name: format!("Working stack ({} items)", self.context.working_stack.len()),
        tokens: stack_tokens,
        content: String::new(),
        children: stack_children,
    });

    // Memory nodes — extracted from Memory entries in the conversation
    let memory_entries: Vec<&ConversationEntry> = self.context.entries.iter()
        .filter(|e| e.is_memory())
        .collect();
    if !memory_entries.is_empty() {
        let node_children: Vec<ContextSection> = memory_entries.iter()
            .map(|entry| {
                let key = match entry {
                    ConversationEntry::Memory { key, .. } => key.as_str(),
                    _ => unreachable!(),
                };
                let text = entry.message().content_text();
                let score = memory_scores
                    .and_then(|s| s.memory_weights.iter()
                        .find(|(k, _)| k == key)
                        .map(|(_, v)| *v));
                let label = match score {
                    Some(v) => format!("{} (importance: {:.1})", key, v),
                    None => key.to_string(),
                };
                ContextSection {
                    name: label,
                    tokens: count(text),
                    content: String::new(),
                    children: Vec::new(),
                }
            })
            .collect();
        let node_tokens: usize = node_children.iter().map(|c| c.tokens).sum();
        sections.push(ContextSection {
            name: format!("Memory nodes ({} loaded)", memory_entries.len()),
            tokens: node_tokens,
            content: String::new(),
            children: node_children,
        });
    }

    // Conversation — each message as a child
    let conv_messages = &self.context.entries;
    let conv_children: Vec<ContextSection> = conv_messages.iter().enumerate()
        .map(|(i, entry)| {
            let m = entry.message();
            let text = m.content.as_ref()
                .map(|c| c.as_text().to_string())
                .unwrap_or_default();
            let tool_info = m.tool_calls.as_ref().map(|tc| {
                tc.iter()
                    .map(|c| c.function.name.clone())
                    .collect::<Vec<_>>()
                    .join(", ")
            });
            let label = if entry.is_memory() {
                if let ConversationEntry::Memory { key, .. } = entry {
                    format!("[memory: {}]", key)
                } else { unreachable!() }
            } else {
                match &tool_info {
                    Some(tools) => format!("[tool_call: {}]", tools),
                    None => {
                        let preview: String = text.chars().take(60).collect();
                        let preview = preview.replace('\n', " ");
                        // The preview is cut at 60 *characters*, so decide
                        // on the ellipsis in characters too. Comparing the
                        // byte length marked short non-ASCII text as
                        // truncated.
                        if text.chars().nth(60).is_some() { format!("{}...", preview) } else { preview }
                    }
                }
            };
            let tokens = count(&text);
            let cfg = crate::config::get();
            let role_name = if entry.is_memory() { "mem".to_string() } else {
                match m.role {
                    Role::Assistant => cfg.assistant_name.clone(),
                    Role::User => cfg.user_name.clone(),
                    Role::Tool => "tool".to_string(),
                    Role::System => "system".to_string(),
                }
            };
            // Show which memories were important for this response
            let children = if m.role == Role::Assistant {
                memory_scores
                    .map(|s| s.important_memories_for_entry(i))
                    .unwrap_or_default()
                    .into_iter()
                    .map(|(key, score)| ContextSection {
                        name: format!("← {} ({:.1})", key, score),
                        tokens: 0,
                        content: String::new(),
                        children: Vec::new(),
                    })
                    .collect()
            } else {
                Vec::new()
            };
            ContextSection {
                name: format!("[{}] {}: {}", i, role_name, label),
                tokens,
                content: text,
                children,
            }
        })
        .collect();
    let conv_tokens: usize = conv_children.iter().map(|c| c.tokens).sum();
    sections.push(ContextSection {
        name: format!("Conversation ({} messages)", conv_children.len()),
        tokens: conv_tokens,
        content: String::new(),
        children: conv_children,
    });
    sections
}
/// Load recent journal entries into `context.journal` for orientation.
///
/// Journal entries are the `EpisodicSession` nodes of the memory store,
/// ordered by creation time. Entries are taken newest-first, starting at
/// the first node created at or after the oldest logged conversation
/// message (or at the newest node when there is no such message), until
/// 15% of the context window is used. At least one entry is always taken
/// when any is available.
///
/// Does nothing if the store cannot be loaded. If no entry is selected the
/// existing journal is left untouched. Progress is appended to
/// `/tmp/poc-journal-debug.log` on a best-effort basis.
fn load_startup_journal(&mut self) {
    let Ok(store) = crate::store::Store::load() else {
        return;
    };

    // Oldest message timestamp in the conversation log, if any.
    let oldest_msg_ts = self.conversation_log.as_ref()
        .and_then(|log| log.oldest_timestamp());

    // Journal entries live in the memory graph as episodic-session nodes.
    let mut journal_nodes: Vec<_> = store.nodes.values()
        .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
        .collect();

    let mut dbg = std::fs::OpenOptions::new().create(true).append(true)
        .open("/tmp/poc-journal-debug.log").ok();
    macro_rules! dbg_log {
        ($($arg:tt)*) => {
            if let Some(ref mut f) = dbg { use std::io::Write; let _ = writeln!(f, $($arg)*); }
        }
    }

    dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts);
    journal_nodes.sort_by_key(|n| n.created_at);
    if let Some(first) = journal_nodes.first() {
        dbg_log!("[journal] first created_at={}", first.created_at);
    }
    if let Some(last) = journal_nodes.last() {
        dbg_log!("[journal] last created_at={}", last.created_at);
    }

    // Cutoff: everything older than the conversation, plus one overlapping entry.
    let cutoff_idx = match oldest_msg_ts {
        Some(cutoff) => {
            let cutoff_ts = cutoff.timestamp();
            dbg_log!("[journal] cutoff timestamp={}", cutoff_ts);
            journal_nodes.iter()
                .position(|node| node.created_at >= cutoff_ts)
                .map_or(journal_nodes.len(), |i| i + 1)
        }
        None => journal_nodes.len(),
    };
    dbg_log!("[journal] cutoff_idx={}", cutoff_idx);

    // Walk backwards from the cutoff, staying within 15% of the context window.
    let context_window = crate::agent::context::context_window();
    let journal_budget = context_window * 15 / 100;
    dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window);

    let mut selected = Vec::new();
    let mut total_tokens = 0;
    for node in journal_nodes[..cutoff_idx].iter().rev() {
        let cost = self.tokenizer.encode_with_special_tokens(&node.content).len();
        let over_budget = total_tokens + cost > journal_budget;
        // The first entry is accepted even if it alone exceeds the budget.
        if over_budget && !selected.is_empty() {
            break;
        }
        let timestamp = chrono::DateTime::from_timestamp(node.created_at, 0)
            .unwrap_or_default();
        selected.push(journal::JournalEntry {
            timestamp,
            content: node.content.clone(),
        });
        total_tokens += cost;
    }
    // Collected newest-first; store oldest-first.
    selected.reverse();
    dbg_log!("[journal] loaded {} entries, {} tokens", selected.len(), total_tokens);

    if selected.is_empty() {
        dbg_log!("[journal] no entries!");
        return;
    }
    self.context.journal = selected;
    dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len());
}
/// Called after any change to context state (working stack, etc).
/// Republishes the context summary for the TUI, then persists the
/// working stack to disk.
fn refresh_context_state(&mut self) {
self.publish_context_state();
self.save_working_stack();
}
/// Persist working stack to disk.
///
/// The stack is written as JSON to `working_stack_file_path()`. Saving is
/// best-effort: a failure is reported on stderr (as other persistence
/// warnings in this file are) and otherwise ignored, so it never
/// interrupts the agent.
fn save_working_stack(&self) {
    let json = match serde_json::to_string(&self.context.working_stack) {
        Ok(json) => json,
        Err(e) => {
            eprintln!("warning: failed to serialize working stack: {}", e);
            return;
        }
    };
    if let Err(e) = std::fs::write(working_stack_file_path(), json) {
        eprintln!("warning: failed to save working stack: {}", e);
    }
}
/// Restore the working stack from `working_stack_file_path()`.
///
/// The current stack is kept unchanged when the file cannot be read or
/// does not contain a JSON array of strings.
fn load_working_stack(&mut self) {
    let restored = std::fs::read_to_string(working_stack_file_path())
        .ok()
        .and_then(|data| serde_json::from_str::<Vec<String>>(&data).ok());
    if let Some(stack) = restored {
        self.context.working_stack = stack;
    }
}
/// Push the current context summary to the shared state for the TUI to read.
/// Same as `publish_context_state_with_scores(None)`, i.e. without
/// memory-importance scores.
pub fn publish_context_state(&self) {
self.publish_context_state_with_scores(None);
}
/// Build the context summary (optionally annotated with memory scores) and
/// store it in the shared state read by the TUI.
///
/// One line per top-level section is also appended to
/// `/tmp/poc-journal-debug.log`. Failures to open or write that file are
/// ignored, and the shared state is only updated when its lock can be
/// acquired.
pub fn publish_context_state_with_scores(&self, memory_scores: Option<&crate::agent::training::MemoryScore>) {
    use std::io::Write;

    let summary = self.context_state_summary(memory_scores);

    let debug_log = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open("/tmp/poc-journal-debug.log");
    if let Ok(mut out) = debug_log {
        for section in summary.iter() {
            let _ = writeln!(
                out,
                "[publish] {} ({} tokens, {} children)",
                section.name, section.tokens, section.children.len(),
            );
        }
    }

    match self.shared_context.write() {
        Ok(mut shared) => *shared = summary,
        Err(_) => {}
    }
}
/// Replace base64 image data in older messages with text placeholders.
/// Keeps the 2 most recent images live (enough for motion/comparison).
/// The tool result message before each image records what was loaded.
fn age_out_images(&mut self) {
// Find image entries newest-first, skip 1 (caller is about to add another)
let to_age: Vec<usize> = self.context.entries.iter().enumerate()
.rev()
.filter(|(_, e)| {
if let Some(MessageContent::Parts(parts)) = &e.message().content {
parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. }))
} else { false }
})
.map(|(i, _)| i)
.skip(1) // keep 1 existing + 1 about to be added = 2 live
.collect();
for i in to_age {
let msg = self.context.entries[i].message_mut();
if let Some(MessageContent::Parts(parts)) = &msg.content {
let mut replacement = String::new();
for part in parts {
match part {
ContentPart::Text { text } => {
if !replacement.is_empty() { replacement.push('\n'); }
replacement.push_str(text);
}
ContentPart::ImageUrl { .. } => {
if !replacement.is_empty() { replacement.push('\n'); }
replacement.push_str("[image aged out]");
}
}
}
msg.content = Some(MessageContent::Text(replacement));
}
}
}
/// Last prompt token count reported by the API.
pub fn last_prompt_tokens(&self) -> u32 {
self.last_prompt_tokens
}
/// Rebuild the context window: reload identity, dedup, trim, reload journal.
///
/// Steps, in order:
/// 1. Reload the system prompt and personality from config. On failure a
///    warning is printed and the current identity is kept.
/// 2. Replace the conversation entries with the result of `trim_entries`.
/// 3. Reload the startup journal.
/// 4. Reset `last_prompt_tokens` to 0 and republish the context summary.
pub fn compact(&mut self) {
    // Reload identity from config
    match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
        Ok((system_prompt, personality)) => {
            self.context.system_prompt = system_prompt;
            self.context.personality = personality;
        }
        Err(e) => {
            eprintln!("warning: failed to reload identity: {:#}", e);
        }
    }

    let before = self.context.entries.len();
    let before_mem = self.context.entries.iter().filter(|e| e.is_memory()).count();
    let before_conv = before - before_mem;

    // Dedup memory, trim to budget, reload journal.
    // The entries are only read here, so borrow them in place instead of
    // deep-cloning the whole conversation first.
    let trimmed = crate::agent::context::trim_entries(
        &self.context,
        &self.context.entries,
        &self.tokenizer,
    );
    self.context.entries = trimmed;

    let after = self.context.entries.len();
    let after_mem = self.context.entries.iter().filter(|e| e.is_memory()).count();
    let after_conv = after - after_mem;
    dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})",
        before, after, before_mem, after_mem, before_conv, after_conv);
    let budget = self.budget();
    dbglog!("[compact] budget: {}", budget.status_string());

    self.load_startup_journal();
    self.last_prompt_tokens = 0;
    self.publish_context_state();
}
/// Rebuild the context from the conversation log — the unified startup path.
///
/// Reads the tail of the log, drops system-role entries, installs the rest
/// as the conversation and runs `compact()` to trim it and reload identity
/// and journal. Finally `last_prompt_tokens` is estimated from the budget
/// so the status bar is not 0 on startup.
///
/// Returns `true` if the log had content to restore. Returns `false` when
/// there is no log, it cannot be read, or it is empty.
pub fn restore_from_log(&mut self) -> bool {
    let Some(log) = self.conversation_log.as_ref() else {
        return false;
    };
    let entries = match log.read_tail(64 * 1024 * 1024) {
        Ok(found) if !found.is_empty() => found,
        _ => return false,
    };

    // Load extra — compact() will dedup, trim, reload identity + journal
    let restored = entries
        .into_iter()
        .filter(|e| e.message().role != Role::System)
        .collect::<Vec<_>>();
    let mem_count = restored.iter().filter(|e| e.is_memory()).count();
    let conv_count = restored.len() - mem_count;
    dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})",
        restored.len(), mem_count, conv_count);

    self.context.entries = restored;
    self.compact();

    // compact() zeroes the counter; replace it with a budget-based estimate.
    let estimate = self.budget();
    self.last_prompt_tokens = estimate.used() as u32;
    true
}
/// Replace the API client (for model switching).
/// The previous client is dropped; no other agent state is changed.
pub fn swap_client(&mut self, new_client: ApiClient) {
self.client = new_client;
}
/// Get the model identifier of the current API client.
pub fn model(&self) -> &str {
&self.client.model
}
/// Get the conversation entries for persistence.
/// This is the in-memory conversation held in `context.entries`.
pub fn entries(&self) -> &[ConversationEntry] {
&self.context.entries
}
/// Get a clone of the current API client.
pub fn client_clone(&self) -> ApiClient {
self.client.clone()
}
/// Mutable access to conversation entries (for /retry).
pub fn entries_mut(&mut self) -> &mut Vec<ConversationEntry> {
&mut self.context.entries
}
}