// consciousness/src/agent/mod.rs
// agent.rs — Core agent loop
//
// The simplest possible implementation of the agent pattern:
// send messages + tool definitions to the model, if it responds
// with tool calls then dispatch them and loop, if it responds
// with text then display it and wait for the next prompt.
//
// Uses streaming by default so text tokens appear as they're
// generated. Tool calls are accumulated from stream deltas and
// dispatched after the stream completes.
//
// The DMN (dmn.rs) is the outer loop that decides what prompts
// to send here. This module just handles single turns: prompt
// in, response out, tool calls dispatched.
pub mod api;
pub mod context;
pub mod oneshot;
pub mod tokenizer;
pub mod tools;
use std::sync::Arc;
use anyhow::Result;
use api::{ApiClient, ToolCall};
use api::{ContentPart, Message, MessageContent, Role};
use context::{ConversationEntry, ContextEntry, ContextState};
use tools::{summarize_args, working_stack};
use crate::mind::log::ConversationLog;
use crate::agent::context::ContextSection;
// --- Activity tracking (RAII guards) ---
/// A single visible activity (spinner line) shown to the user.
pub struct ActivityEntry {
    /// Monotonic ID assigned by `Agent::push_activity` / `notify`.
    pub id: u64,
    /// Human-readable description; "(complete)" is appended on guard drop.
    pub label: String,
    /// When the activity began — used by the UI to show elapsed time.
    pub started: std::time::Instant,
    /// Auto-expires this long after creation (or completion).
    pub expires_at: std::time::Instant,
}
/// RAII guard — marks the activity "(complete)" on drop, starts expiry timer.
pub struct ActivityGuard {
    // Shared handle back to the agent so Drop can find the entry by id.
    agent: Arc<tokio::sync::Mutex<Agent>>,
    // ID returned by `Agent::push_activity` for the entry this guard owns.
    id: u64,
}
/// How long completed activities and notifications linger before expiring.
const ACTIVITY_LINGER: std::time::Duration = std::time::Duration::from_secs(5);
impl Drop for ActivityGuard {
    /// Best-effort completion marker: if the agent mutex is free, tag the
    /// tracked activity "(complete)" and arm the short linger timer. When
    /// the lock is contended (try_lock fails) the update is skipped —
    /// the hour-long safety-net expiry set at creation still applies.
    fn drop(&mut self) {
        let attempt = self.agent.try_lock();
        if let Ok(mut state) = attempt {
            let target = state.activities.iter_mut().find(|a| a.id == self.id);
            if let Some(activity) = target {
                activity.label.push_str(" (complete)");
                activity.expires_at = std::time::Instant::now() + ACTIVITY_LINGER;
            }
        }
    }
}
impl Agent {
    /// Register a long-lived activity, returning its ID. Caller creates the
    /// `ActivityGuard` that re-arms a short expiry when the work finishes.
    pub fn push_activity(&mut self, label: impl Into<String>) -> u64 {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label: label.into(),
            started: std::time::Instant::now(),
            // Safety-net expiry (1h) in case the guard is leaked and never
            // re-arms the short linger timer on completion.
            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(3600),
        });
        // Wake the UI so the new activity is drawn immediately.
        self.changed.notify_one();
        id
    }

    /// Push a notification — auto-expires after `ACTIVITY_LINGER` (5s).
    pub fn notify(&mut self, label: impl Into<String>) {
        self.expire_activities();
        let id = self.next_activity_id;
        self.next_activity_id += 1;
        self.activities.push(ActivityEntry {
            id,
            label: label.into(),
            started: std::time::Instant::now(),
            expires_at: std::time::Instant::now() + ACTIVITY_LINGER,
        });
        self.changed.notify_one();
    }

    /// Remove activities whose expiry instant has passed.
    pub fn expire_activities(&mut self) {
        let now = std::time::Instant::now();
        self.activities.retain(|a| a.expires_at > now);
    }
}
/// Create an activity guard from outside the lock.
pub fn activity_guard(agent: &Arc<tokio::sync::Mutex<Agent>>, id: u64) -> ActivityGuard {
    let agent = Arc::clone(agent);
    ActivityGuard { agent, id }
}
/// Convenience: lock, push activity, unlock, return guard.
pub async fn start_activity(agent: &Arc<tokio::sync::Mutex<Agent>>, label: impl Into<String>) -> ActivityGuard {
    // Hold the lock only long enough to register the activity.
    let id = {
        let mut me = agent.lock().await;
        me.push_activity(label)
    };
    ActivityGuard { agent: Arc::clone(agent), id }
}
/// Result of a single agent turn.
pub struct TurnResult {
    /// The text response (already sent through UI channel).
    #[allow(dead_code)]
    pub text: String,
    /// Whether the model called yield_to_user during this turn.
    pub yield_requested: bool,
    /// Whether any tools (other than yield_to_user) were called.
    pub had_tool_calls: bool,
    /// Number of tool calls that returned errors this turn.
    pub tool_errors: u32,
    /// Model name to switch to after this turn completes.
    pub model_switch: Option<String>,
    /// Agent requested DMN pause (full stop on autonomous behavior).
    pub dmn_pause: bool,
}
/// Accumulated state across tool dispatches within a single turn.
/// Drained into `TurnResult` when the turn produces a text-only response.
struct DispatchState {
    // Set when the yield_to_user control flag fires.
    yield_requested: bool,
    // True once any tool result has been applied this turn.
    had_tool_calls: bool,
    // Count of tool outputs that started with "Error:".
    tool_errors: u32,
    // Model name requested via the model-switch control flag, if any.
    model_switch: Option<String>,
    // Set when the agent requests a DMN pause.
    dmn_pause: bool,
}
impl DispatchState {
fn new() -> Self {
Self {
yield_requested: false, had_tool_calls: false,
tool_errors: 0, model_switch: None, dmn_pause: false,
}
}
}
pub struct Agent {
client: ApiClient,
tools: Vec<tools::Tool>,
/// Last known prompt token count from the API (tracks context size).
last_prompt_tokens: u32,
/// Current reasoning effort level ("none", "low", "high").
pub reasoning_effort: String,
/// Sampling parameters — adjustable at runtime from the thalamus screen.
pub temperature: f32,
pub top_p: f32,
pub top_k: u32,
/// Active activities — RAII guards auto-remove on drop.
pub activities: Vec<ActivityEntry>,
next_activity_id: u64,
/// Control tool flags — set by tool handlers, consumed by turn loop.
pub pending_yield: bool,
pub pending_model_switch: Option<String>,
pub pending_dmn_pause: bool,
/// Provenance tag for memory operations — identifies who made the change.
pub provenance: String,
/// Persistent conversation log — append-only record of all messages.
pub conversation_log: Option<ConversationLog>,
/// Mutable context state — personality, working stack, etc.
pub context: ContextState,
/// App config — used to reload identity on compaction and model switching.
pub app_config: crate::config::AppConfig,
pub prompt_file: String,
/// Stable session ID for memory-search dedup across turns.
2026-04-04 02:46:32 -04:00
pub session_id: String,
/// Incremented on compaction — UI uses this to detect resets.
pub generation: u64,
/// Whether incremental memory scoring is currently running.
pub memory_scoring_in_flight: bool,
/// Shared active tools — Agent writes, TUI reads.
pub active_tools: tools::SharedActiveTools,
2026-04-05 23:04:10 -04:00
/// Fires when agent state changes — UI wakes on this instead of polling.
pub changed: Arc<tokio::sync::Notify>,
}
impl Agent {
pub fn new(
client: ApiClient,
system_prompt: String,
personality: Vec<(String, String)>,
app_config: crate::config::AppConfig,
prompt_file: String,
conversation_log: Option<ConversationLog>,
active_tools: tools::SharedActiveTools,
) -> Self {
let mut system = ContextSection::new("System prompt");
system.push(ContextEntry::new(
ConversationEntry::System(Message::system(&system_prompt)), None));
// Tool definitions — part of the context, tokenized and scored
let tool_defs: Vec<String> = tools::tools().iter()
.map(|t| t.to_json()).collect();
if !tool_defs.is_empty() {
let tools_text = format!(
"# Tools\n\nYou have access to the following functions:\n\n<tools>\n{}\n</tools>\n\n\
If you choose to call a function ONLY reply in the following format with NO suffix:\n\n\
<tool_call>\n<function=example_function_name>\n\
<parameter=example_parameter_1>\nvalue_1\n</parameter>\n\
</function>\n</tool_call>\n\n\
IMPORTANT: Function calls MUST follow the specified format.",
tool_defs.join("\n"),
);
system.push(ContextEntry::new(
ConversationEntry::System(Message::system(&tools_text)), None));
}
let mut identity = ContextSection::new("Identity");
for (_name, content) in &personality {
identity.push(ContextEntry::new(
ConversationEntry::Message(Message::user(content)), None));
}
let context = ContextState {
system,
identity,
journal: ContextSection::new("Journal"),
conversation: ContextSection::new("Conversation"),
working_stack: Vec::new(),
};
let session_id = format!("consciousness-{}", chrono::Utc::now().format("%Y%m%d-%H%M%S"));
let mut agent = Self {
client,
tools: tools::tools(),
last_prompt_tokens: 0,
reasoning_effort: "none".to_string(),
temperature: 0.6,
top_p: 0.95,
top_k: 20,
activities: Vec::new(),
next_activity_id: 0,
pending_yield: false,
pending_model_switch: None,
pending_dmn_pause: false,
provenance: "manual".to_string(),
conversation_log,
context,
app_config,
prompt_file,
session_id,
generation: 0,
memory_scoring_in_flight: false,
active_tools,
2026-04-05 23:04:10 -04:00
changed: Arc::new(tokio::sync::Notify::new()),
};
agent.load_startup_journal();
agent.load_working_stack();
agent
}
/// Create a lightweight agent forked from this one's context.
///
/// The forked agent shares the same conversation prefix (system prompt,
/// personality, journal, entries) for KV cache sharing. The caller
/// appends the subconscious prompt as a user message and runs the turn.
pub fn fork(&self, tools: Vec<tools::Tool>) -> Self {
    Self {
        // Inherited from the parent: client, sampling, context, config.
        client: self.client.clone(),
        temperature: self.temperature,
        top_p: self.top_p,
        top_k: self.top_k,
        provenance: self.provenance.clone(),
        context: self.context.clone(),
        app_config: self.app_config.clone(),
        prompt_file: self.prompt_file.clone(),
        session_id: self.session_id.clone(),
        // Fork-specific: caller-supplied tool set, fresh bookkeeping,
        // no persistent log, its own active-tools list and notifier.
        tools,
        last_prompt_tokens: 0,
        reasoning_effort: "none".to_string(),
        activities: Vec::new(),
        next_activity_id: 0,
        pending_yield: false,
        pending_model_switch: None,
        pending_dmn_pause: false,
        conversation_log: None,
        generation: 0,
        memory_scoring_in_flight: false,
        active_tools: tools::shared_active_tools(),
        changed: Arc::new(tokio::sync::Notify::new()),
    }
}
/// Assemble the full message list for the API call from typed sources.
/// System prompt + personality context + journal + conversation messages.
pub fn assemble_api_messages(&self) -> Vec<Message> {
    // System section first, verbatim.
    let mut out: Vec<Message> = self.context.system.entries().iter()
        .map(|e| e.entry.api_message().clone())
        .collect();
    // Identity — personality files + working stack collapse to one user message.
    let identity = self.context.render_context_message();
    if !identity.is_empty() {
        out.push(Message::user(identity));
    }
    // Journal — likewise rendered as a single user message.
    let journal = self.context.render_journal();
    if !journal.is_empty() {
        out.push(Message::user(journal));
    }
    // Conversation entries, skipping log/thinking entries which are
    // display-only and never sent to the model.
    for entry in self.context.conversation.entries() {
        if entry.entry.is_log() || entry.entry.is_thinking() {
            continue;
        }
        out.push(entry.entry.api_message().clone());
    }
    out
}
/// Assemble the full prompt as token IDs for the completions API.
/// System section (includes tools), identity, journal, conversation,
/// then the assistant prompt suffix.
pub fn assemble_prompt_tokens(&self) -> Vec<u32> {
    let mut ids: Vec<u32> = Vec::new();
    // System section — system prompt + tool definitions, from cached ids.
    for entry in self.context.system.entries() {
        ids.extend(&entry.token_ids);
    }
    // Identity — rendered as one user message, tokenized on the fly.
    let identity = self.context.render_context_message();
    if !identity.is_empty() {
        ids.extend(tokenizer::tokenize_entry("user", &identity));
    }
    // Journal — rendered as one user message.
    let journal = self.context.render_journal();
    if !journal.is_empty() {
        ids.extend(tokenizer::tokenize_entry("user", &journal));
    }
    // Conversation — cached token ids; log/thinking entries never go to the model.
    for entry in self.context.conversation.entries() {
        if entry.entry.is_log() || entry.entry.is_thinking() {
            continue;
        }
        ids.extend(&entry.token_ids);
    }
    // Open the assistant turn so the model continues from here.
    ids.push(tokenizer::IM_START);
    ids.extend(tokenizer::encode("assistant\n"));
    ids
}
/// Push a conversation message — stamped and logged.
pub fn push_message(&mut self, mut msg: Message) {
    // stamp() sets the timestamp; push_entry handles logging + UI notify.
    msg.stamp();
    let entry = ConversationEntry::Message(msg);
    self.push_entry(entry);
}
/// Append a conversation entry: persist to the log, add to the live
/// context, and wake the UI.
pub fn push_entry(&mut self, entry: ConversationEntry) {
    // Best-effort persistence — a log write failure must not drop the
    // entry from the live conversation.
    if let Some(ref log) = self.conversation_log {
        if let Err(e) = log.append(&entry) {
            eprintln!("warning: failed to log entry: {:#}", e);
        }
    }
    self.context.conversation.push(ContextEntry::new(
        entry, Some(chrono::Utc::now())));
    self.changed.notify_one();
}
/// Find the index of the in-progress streaming entry (unstamped assistant message).
/// Searches from the end since the streaming entry, if any, is the newest.
fn streaming_index(&self) -> Option<usize> {
    let entries = self.context.conversation.entries();
    entries.iter().enumerate().rev()
        .find(|(_, ce)| {
            let msg = ce.entry.message();
            matches!(msg.role, Role::Assistant) && msg.timestamp.is_none()
        })
        .map(|(idx, _)| idx)
}
/// Append streaming text to the in-progress assistant entry, creating a
/// partial entry if none exists yet. Called by collect_stream per token batch.
fn append_streaming(&mut self, text: &str) {
    if let Some(idx) = self.streaming_index() {
        // Extend the existing partial entry in place.
        let mut msg = self.context.conversation.entries()[idx].entry.message().clone();
        msg.append_content(text);
        self.context.conversation.set_message(idx, msg);
    } else {
        // First batch of this response: open an unstamped assistant entry
        // (timestamp: None is what marks it as "still streaming").
        self.context.conversation.push(ContextEntry::new(
            ConversationEntry::Message(Message {
                role: Role::Assistant,
                content: Some(MessageContent::Text(text.to_string())),
                tool_calls: None,
                tool_call_id: None,
                name: None,
                timestamp: None,
            }),
            None,
        ));
    }
    self.changed.notify_one();
}
/// Finalize the streaming entry with the complete response message.
///
/// Replaces the unstamped assistant entry (if any) with the stamped final
/// message via set(), then appends it to the persistent log. If no
/// streaming entry exists (e.g. the response arrived without streamed
/// tokens), falls back to push_message(), which stamps AND logs the entry
/// itself — so this path must not log again (the original code did,
/// recording the message twice in the log).
fn finalize_streaming(&mut self, msg: Message) {
    if let Some(i) = self.streaming_index() {
        let mut stamped = msg.clone();
        stamped.stamp();
        self.context.conversation.set(i, ContextEntry::new(
            ConversationEntry::Message(stamped),
            Some(chrono::Utc::now()),
        ));
        // Log the finalized entry — only on this path; push_message()
        // below already appends to the log.
        if let Some(ref log) = self.conversation_log {
            let entry = ConversationEntry::Message(msg);
            if let Err(e) = log.append(&entry) {
                eprintln!("warning: failed to log finalized entry: {:#}", e);
            }
        }
    } else {
        self.push_message(msg);
    }
    self.changed.notify_one();
}
/// Send a user message and run the agent loop until the model
/// produces a text response (no more tool calls). Streams text
/// and tool activity through the UI channel.
///
/// Takes Arc<Mutex<Agent>> and manages locking internally so the
/// lock is never held across I/O (API streaming, tool dispatch).
pub async fn turn(
    agent: Arc<tokio::sync::Mutex<Agent>>,
) -> Result<TurnResult> {
    // --- Pre-loop setup (lock 1): collect finished tools ---
    let active_tools = {
        let mut finished = Vec::new();
        let tools = {
            let me = agent.lock().await;
            // Collect completed background tool handles — remove from active list
            // but don't await yet (MutexGuard isn't Send).
            let mut tools = me.active_tools.lock().unwrap();
            let mut i = 0;
            while i < tools.len() {
                if tools[i].handle.is_finished() {
                    finished.push(tools.remove(i));
                } else {
                    i += 1;
                }
            }
            // Block value: clone of the shared handle (Arc clone, cheap).
            me.active_tools.clone()
        };
        // Await finished handles without holding the agent lock
        // NOTE(review): a Join error (panicked/cancelled task) silently
        // drops that tool's result here.
        let mut bg_results = Vec::new();
        for entry in finished {
            if let Ok((call, output)) = entry.handle.await {
                bg_results.push((call, output));
            }
        }
        // Re-acquire to apply background tool results
        if !bg_results.is_empty() {
            let mut me = agent.lock().await;
            // Throwaway DispatchState: background results shouldn't set
            // this turn's yield/model-switch flags.
            let mut bg_ds = DispatchState::new();
            for (call, output) in bg_results {
                me.apply_tool_result(&call, output, &mut bg_ds);
            }
        }
        tools
    };
    let mut overflow_retries: u32 = 0;
    let mut empty_retries: u32 = 0;
    let mut ds = DispatchState::new();
    loop {
        // --- Lock 2: assemble messages, start stream ---
        let _thinking = start_activity(&agent, "thinking...").await;
        let (mut rx, _stream_guard) = {
            let me = agent.lock().await;
            let sampling = api::SamplingParams {
                temperature: me.temperature,
                top_p: me.top_p,
                top_k: me.top_k,
            };
            // Raw completions path when a tokenizer is loaded (exact token
            // control); otherwise the chat-messages API.
            if tokenizer::is_initialized() {
                let prompt_tokens = me.assemble_prompt_tokens();
                me.client.start_stream_completions(
                    &prompt_tokens,
                    sampling,
                    None,
                )
            } else {
                let api_messages = me.assemble_api_messages();
                me.client.start_stream(
                    &api_messages,
                    &me.tools,
                    &me.reasoning_effort,
                    sampling,
                    None,
                )
            }
        };
        // --- Lock released ---
        // --- Stream loop (no lock) ---
        let sr = api::collect_stream(
            &mut rx, &agent, &active_tools,
        ).await;
        let api::StreamResult {
            content, tool_calls, usage, finish_reason,
            error: stream_error, display_buf, in_tool_call, reasoning,
        } = sr;
        // --- Stream complete ---
        // Push thinking entry if model produced reasoning
        if !reasoning.is_empty() {
            let mut me = agent.lock().await;
            me.push_entry(context::ConversationEntry::Thinking(reasoning));
        }
        // --- Lock 3: process results ---
        let (msg, pending) = {
            let mut me = agent.lock().await;
            // Handle stream errors with retry logic
            if let Some(e) = stream_error {
                let err = anyhow::anyhow!("{}", e);
                // Context overflow: compact and retry immediately (max 2).
                if crate::agent::context::is_context_overflow(&err) && overflow_retries < 2 {
                    overflow_retries += 1;
                    me.notify(format!("context overflow — retrying ({}/2)", overflow_retries));
                    me.compact();
                    continue;
                }
                // Transient stream error: drop the lock, back off 2s, retry
                // (shares the empty_retries budget).
                if crate::agent::context::is_stream_error(&err) && empty_retries < 2 {
                    empty_retries += 1;
                    me.notify(format!("stream error — retrying ({}/2)", empty_retries));
                    drop(me);
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                    continue;
                }
                return Err(err);
            }
            if finish_reason.as_deref() == Some("error") {
                let detail = if content.is_empty() { "no details".into() } else { content };
                return Err(anyhow::anyhow!("model stream error: {}", detail));
            }
            // Flush remaining display buffer to streaming entry
            if !in_tool_call && !display_buf.is_empty() {
                me.append_streaming(&display_buf);
            }
            let msg = api::build_response_message(content, tool_calls);
            if let Some(usage) = &usage {
                me.last_prompt_tokens = usage.prompt_tokens;
            }
            // Empty response — nudge and retry
            let has_content = msg.content.is_some();
            let has_tools = msg.tool_calls.as_ref().map_or(false, |tc| !tc.is_empty());
            if !has_content && !has_tools {
                if empty_retries < 2 {
                    empty_retries += 1;
                    dbglog!(
                        "empty response, injecting nudge and retrying ({}/2)",
                        empty_retries,
                    );
                    me.push_message(Message::user(
                        "[system] Your previous response was empty. \
                        Please respond with text or use a tool."
                    ));
                    continue;
                }
                // Retries exhausted: fall through and return the empty
                // response as a normal (empty-text) turn result.
            } else {
                empty_retries = 0;
            }
            // Collect non-background tool calls fired during streaming
            let mut tools_guard = active_tools.lock().unwrap();
            let mut non_bg = Vec::new();
            let mut i = 0;
            while i < tools_guard.len() {
                if !tools_guard[i].background {
                    non_bg.push(tools_guard.remove(i));
                } else {
                    i += 1;
                }
            }
            // Both guards (agent mutex + active_tools) drop here, before
            // any handle is awaited below.
            (msg, non_bg)
        };
        if !pending.is_empty() {
            agent.lock().await.finalize_streaming(msg.clone());
            // Drop lock before awaiting tool handles
            let mut results = Vec::new();
            for entry in pending {
                if let Ok(r) = entry.handle.await {
                    results.push(r);
                }
            }
            // Reacquire to apply results
            let mut me = agent.lock().await;
            for (call, output) in results {
                me.apply_tool_result(&call, output, &mut ds);
            }
            continue;
        }
        // Tool calls (structured API path)
        if let Some(ref tool_calls) = msg.tool_calls {
            if !tool_calls.is_empty() {
                agent.lock().await.finalize_streaming(msg.clone());
                let calls: Vec<ToolCall> = tool_calls.clone();
                // Drop lock before tool dispatch
                for call in &calls {
                    Agent::dispatch_tool_call_unlocked(
                        &agent, &active_tools, call, &mut ds,
                    ).await;
                }
                continue;
            }
        }
        // Genuinely text-only response
        let text = msg.content_text().to_string();
        let mut me = agent.lock().await;
        me.finalize_streaming(msg);
        // Drain pending control flags
        if me.pending_yield { ds.yield_requested = true; me.pending_yield = false; }
        if me.pending_model_switch.is_some() { ds.model_switch = me.pending_model_switch.take(); }
        if me.pending_dmn_pause { ds.dmn_pause = true; me.pending_dmn_pause = false; }
        return Ok(TurnResult {
            text,
            yield_requested: ds.yield_requested,
            had_tool_calls: ds.had_tool_calls,
            tool_errors: ds.tool_errors,
            model_switch: ds.model_switch,
            dmn_pause: ds.dmn_pause,
        });
    }
}
/// Dispatch a tool call without holding the agent lock across I/O.
/// Used by `turn()` which manages its own locking.
async fn dispatch_tool_call_unlocked(
    agent: &Arc<tokio::sync::Mutex<Agent>>,
    active_tools: &tools::SharedActiveTools,
    call: &ToolCall,
    ds: &mut DispatchState,
) {
    // Parse the JSON arguments up front; malformed args become a tool
    // error fed back to the model instead of aborting the turn.
    let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
        Ok(v) => v,
        Err(e) => {
            let err = format!("Error: malformed tool call arguments: {e}");
            let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await;
            let mut me = agent.lock().await;
            me.apply_tool_result(call, err, ds);
            return;
        }
    };
    let args_summary = summarize_args(&call.function.name, &args);
    let _calling = start_activity(agent, format!("calling: {}", call.function.name)).await;
    // Spawn tool, track it — registering in active_tools makes the call
    // visible to the TUI while it runs.
    let call_clone = call.clone();
    let agent_handle = agent.clone();
    let handle = tokio::spawn(async move {
        let output = tools::dispatch_with_agent(&call_clone.function.name, &args, Some(agent_handle)).await;
        (call_clone, output)
    });
    active_tools.lock().unwrap().push(
        tools::ActiveToolCall {
            id: call.id.clone(),
            name: call.function.name.clone(),
            detail: args_summary,
            started: std::time::Instant::now(),
            background: false,
            handle,
        }
    );
    // Pop it back and await — no agent lock held.
    // NOTE(review): pop() assumes this entry is still the last one; a
    // concurrent push between the two lock scopes would grab the wrong
    // entry — confirm callers serialize dispatch.
    let entry = {
        let mut tools = active_tools.lock().unwrap();
        tools.pop().unwrap()
    };
    // Join errors (panicked/cancelled task) are silently dropped.
    if let Ok((call, output)) = entry.handle.await {
        // Brief lock to apply result
        let mut me = agent.lock().await;
        me.apply_tool_result(&call, output, ds);
    }
}
/// Apply a completed tool result to conversation state: update the
/// dispatch flags, drop the call from the active list, and push the
/// result message (tagged for dedup if it came from memory_render).
fn apply_tool_result(
    &mut self,
    call: &ToolCall,
    output: String,
    ds: &mut DispatchState,
) {
    let args: serde_json::Value =
        serde_json::from_str(&call.function.arguments).unwrap_or_default();
    ds.had_tool_calls = true;
    let is_error = output.starts_with("Error:");
    if is_error {
        ds.tool_errors += 1;
    }
    // This call is finished — remove it from the UI's active list.
    self.active_tools.lock().unwrap().retain(|t| t.id != call.id);
    // Successful memory_render results carry their key so later context
    // assembly can dedup repeated renders of the same memory.
    if call.function.name == "memory_render" && !is_error {
        if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
            let mut msg = Message::tool_result(&call.id, &output);
            msg.stamp();
            self.push_entry(ConversationEntry::Memory {
                key: key.to_string(),
                message: msg,
                score: None,
            });
            return;
        }
    }
    self.push_message(Message::tool_result(&call.id, &output));
}
/// Context state sections — just returns references to the live data.
/// Ordering is whatever ContextState::sections() yields.
pub fn context_sections(&self) -> [&ContextSection; 4] {
    self.context.sections()
}
/// Conversation entries from `from` onward — used by the
/// subconscious debug screen to show forked agent conversations.
/// Out-of-range `from` yields an empty slice rather than panicking.
pub fn conversation_entries_from(&self, from: usize) -> &[ContextEntry] {
    self.context.conversation.entries().get(from..).unwrap_or(&[])
}
/// Load recent journal entries at startup for orientation.
/// Uses the same budget logic as compaction but with empty conversation.
/// Only parses the tail of the journal file (last 64KB) for speed.
fn load_startup_journal(&mut self) {
    // No store on disk — nothing to load, silently skip.
    let store = match crate::store::Store::load() {
        Ok(s) => s,
        Err(_) => return,
    };
    // Find oldest message timestamp in conversation log
    let oldest_msg_ts = self.conversation_log.as_ref()
        .and_then(|log| log.oldest_timestamp());
    // Get journal entries from the memory graph
    let mut journal_nodes: Vec<_> = store.nodes.values()
        .filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
        .collect();
    // Best-effort local debug log — appends to a hard-coded /tmp file.
    // NOTE(review): consider routing this through dbglog! instead.
    let mut dbg = std::fs::OpenOptions::new().create(true).append(true)
        .open("/tmp/poc-journal-debug.log").ok();
    macro_rules! dbg_log {
        ($($arg:tt)*) => {
            if let Some(ref mut f) = dbg { use std::io::Write; let _ = writeln!(f, $($arg)*); }
        }
    }
    dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts);
    // Chronological order, oldest first.
    journal_nodes.sort_by_key(|n| n.created_at);
    if let Some(first) = journal_nodes.first() {
        dbg_log!("[journal] first created_at={}", first.created_at);
    }
    if let Some(last) = journal_nodes.last() {
        dbg_log!("[journal] last created_at={}", last.created_at);
    }
    // Find the cutoff index — entries older than conversation, plus one overlap
    let cutoff_idx = if let Some(cutoff) = oldest_msg_ts {
        let cutoff_ts = cutoff.timestamp();
        dbg_log!("[journal] cutoff timestamp={}", cutoff_ts);
        let mut idx = journal_nodes.len();
        for (i, node) in journal_nodes.iter().enumerate() {
            // i + 1 keeps one entry that overlaps the live conversation,
            // for continuity.
            if node.created_at >= cutoff_ts {
                idx = i + 1;
                break;
            }
        }
        idx
    } else {
        journal_nodes.len()
    };
    dbg_log!("[journal] cutoff_idx={}", cutoff_idx);
    // Walk backwards from cutoff, accumulating entries within 15% of context
    let context_window = crate::agent::context::context_window();
    let journal_budget = context_window * 15 / 100;
    dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window);
    let mut journal_entries = Vec::new();
    let mut total_tokens = 0;
    for node in journal_nodes[..cutoff_idx].iter().rev() {
        let msg = Message::user(&node.content);
        let ce = ContextEntry::new(
            ConversationEntry::Message(msg),
            chrono::DateTime::from_timestamp(node.created_at, 0),
        );
        // Always admit at least one entry, even if it alone exceeds budget.
        if total_tokens + ce.tokens() > journal_budget && !journal_entries.is_empty() {
            break;
        }
        total_tokens += ce.tokens();
        journal_entries.push(ce);
    }
    // Collected newest-first above; restore chronological order.
    journal_entries.reverse();
    dbg_log!("[journal] loaded {} entries, {} tokens", journal_entries.len(), total_tokens);
    if journal_entries.is_empty() {
        dbg_log!("[journal] no entries!");
        return;
    }
    // Replace the journal section wholesale.
    self.context.journal.clear();
    for entry in journal_entries {
        self.context.journal.push(entry);
    }
    dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len());
}
/// Load the working stack from its on-disk JSON file.
/// Missing or unparseable file leaves the current stack untouched.
fn load_working_stack(&mut self) {
    let raw = match std::fs::read_to_string(working_stack::file_path()) {
        Ok(s) => s,
        Err(_) => return,
    };
    if let Ok(stack) = serde_json::from_str::<Vec<String>>(&raw) {
        self.context.working_stack = stack;
    }
}
/// Replace base64 image data in older messages with text placeholders.
/// Keeps the 2 most recent images live (enough for motion/comparison).
/// The tool result message before each image records what was loaded.
pub fn age_out_images(&mut self) {
    // Find image entries newest-first, skip 1 (caller is about to add another)
    let to_age: Vec<usize> = self.context.conversation.entries().iter().enumerate()
        .rev()
        .filter(|(_, ce)| {
            // An "image entry" is any message whose multi-part content
            // contains at least one ImageUrl part.
            if let Some(MessageContent::Parts(parts)) = &ce.entry.message().content {
                parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. }))
            } else { false }
        })
        .map(|(i, _)| i)
        .skip(1) // keep 1 existing + 1 about to be added = 2 live
        .collect();
    for i in to_age {
        // Build replacement entry with image data stripped
        let old = &self.context.conversation.entries()[i];
        let msg = old.entry.message();
        if let Some(MessageContent::Parts(parts)) = &msg.content {
            // Flatten the parts into plain text, one line per part,
            // substituting a marker for each stripped image.
            let mut replacement = String::new();
            for part in parts {
                match part {
                    ContentPart::Text { text } => {
                        if !replacement.is_empty() { replacement.push('\n'); }
                        replacement.push_str(text);
                    }
                    ContentPart::ImageUrl { .. } => {
                        if !replacement.is_empty() { replacement.push('\n'); }
                        replacement.push_str("[image aged out]");
                    }
                }
            }
            // Preserve the original timestamp; only the content changes.
            let mut new_msg = msg.clone();
            new_msg.content = Some(MessageContent::Text(replacement));
            self.context.conversation.set(i, ContextEntry::new(
                ConversationEntry::Message(new_msg),
                old.timestamp,
            ));
        }
    }
    // Entries changed under the UI — bump generation to force a redraw.
    self.generation += 1;
}
/// Last prompt token count reported by the API.
pub fn last_prompt_tokens(&self) -> u32 {
    self.last_prompt_tokens
}
/// Rebuild the context window: reload identity, dedup, trim, reload journal.
pub fn compact(&mut self) {
    // Reload identity from config — picks up on-disk prompt/personality
    // edits and the currently selected model's prompt file.
    match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
        Ok((system_prompt, personality)) => {
            self.context.system.clear();
            self.context.system.push(ContextEntry::new(
                ConversationEntry::System(Message::system(&system_prompt)), None));
            self.context.identity.clear();
            for (_name, content) in &personality {
                self.context.identity.push(ContextEntry::new(
                    ConversationEntry::Message(Message::user(content)), None));
            }
        }
        Err(e) => {
            // Reload failure keeps the previous identity sections — a
            // broken config file shouldn't wipe the system prompt.
            eprintln!("warning: failed to reload identity: {:#}", e);
        }
    }
    // Before/after entry counts for the debug log below.
    let before = self.context.conversation.len();
    let before_mem = self.context.conversation.entries().iter()
        .filter(|e| e.entry.is_memory()).count();
    let before_conv = before - before_mem;
    // Age out images before trimming — they're huge in the request payload
    self.age_out_images();
    // Load journal BEFORE trimming so trim accounts for journal cost
    self.load_startup_journal();
    // Dedup memory, trim to budget. `fixed` is the token cost of the
    // non-conversation sections; trim() gets to spend the remainder.
    // NOTE(review): dedup is presumably inside trim() — confirm in
    // ContextSection::trim.
    let fixed = self.context.system.tokens() + self.context.identity.tokens()
        + self.context.journal.tokens();
    self.context.conversation.trim(fixed);
    let after = self.context.conversation.len();
    let after_mem = self.context.conversation.entries().iter()
        .filter(|e| e.entry.is_memory()).count();
    let after_conv = after - after_mem;
    dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})",
        before, after, before_mem, after_mem, before_conv, after_conv);
    // Bump generation so the UI detects the reset; clear the stale token
    // count until the next API response reports a fresh one.
    self.generation += 1;
    self.last_prompt_tokens = 0;
    dbglog!("[compact] budget: {}", self.context.format_budget());
}
/// Restore from the conversation log. Builds the context window
/// the same way compact() does — journal summaries for old messages,
/// raw recent messages. This is the unified startup path.
/// Returns true if the log had content to restore.
pub fn restore_from_log(&mut self) -> bool {
    // Read up to the last 64 MiB of the log; no log or empty log means
    // there is nothing to restore.
    let entries = match &self.conversation_log {
        Some(log) => match log.read_tail(64 * 1024 * 1024) {
            Ok(entries) if !entries.is_empty() => entries,
            _ => return false,
        },
        None => return false,
    };
    // Load extra — compact() will dedup, trim, reload identity + journal.
    // Log/thinking/system entries are dropped; the filter guarantees every
    // surviving entry is a message, so message() below is safe.
    let all: Vec<ContextEntry> = entries.into_iter()
        .filter(|e| !e.is_log() && !e.is_thinking() && e.message().role != Role::System)
        .map(|e| {
            let timestamp = e.message().timestamp.as_ref().and_then(|ts| {
                chrono::DateTime::parse_from_rfc3339(ts).ok()
                    .map(|dt| dt.with_timezone(&chrono::Utc))
            });
            ContextEntry::new(e, timestamp)
        })
        .collect();
    let mem_count = all.iter().filter(|e| e.entry.is_memory()).count();
    let conv_count = all.len() - mem_count;
    dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})",
        all.len(), mem_count, conv_count);
    self.context.conversation.set_entries(all);
    self.compact();
    // Estimate prompt tokens so status bar isn't 0 on startup
    self.last_prompt_tokens = self.context.total_tokens() as u32;
    true
}
/// Replace the API client (for model switching).
pub fn swap_client(&mut self, new_client: ApiClient) {
    self.client = new_client;
}
/// Get the model identifier of the current API client.
pub fn model(&self) -> &str {
    &self.client.model
}
/// Get the conversation entries.
pub fn entries(&self) -> &[ContextEntry] {
    self.context.conversation.entries()
}
/// Clone of the API client — lets callers issue requests (e.g. a /retry
/// re-send) without borrowing the agent.
pub fn client_clone(&self) -> ApiClient {
    self.client.clone()
}
}