WIP: Agent core migrated to AST types
agent/mod.rs fully uses AstNode/ContextState/PendingToolCall. Killed: push_message, push_entry, append_streaming, finalize_streaming, streaming_index, assemble_api_messages, age_out_images, working_stack, context_sections, entries. ConversationLog rewritten for AstNode. Remaining: api dead code (chat path), mind/, user/, oneshot, learn. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
9c79d7a037
commit
a68377907a
4 changed files with 70 additions and 243 deletions
|
|
@ -80,6 +80,7 @@ pub enum AstNode {
|
||||||
/// The context window: four sections as Vec<AstNode>.
|
/// The context window: four sections as Vec<AstNode>.
|
||||||
/// All mutation goes through ContextState methods to maintain the invariant
|
/// All mutation goes through ContextState methods to maintain the invariant
|
||||||
/// that token_ids on every leaf matches its rendered text.
|
/// that token_ids on every leaf matches its rendered text.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct ContextState {
|
pub struct ContextState {
|
||||||
system: Vec<AstNode>,
|
system: Vec<AstNode>,
|
||||||
identity: Vec<AstNode>,
|
identity: Vec<AstNode>,
|
||||||
|
|
@ -699,11 +700,14 @@ impl ContextState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a node at `index` from `section`.
|
|
||||||
pub fn del(&mut self, section: Section, index: usize) -> AstNode {
|
pub fn del(&mut self, section: Section, index: usize) -> AstNode {
|
||||||
self.section_mut(section).remove(index)
|
self.section_mut(section).remove(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn clear(&mut self, section: Section) {
|
||||||
|
self.section_mut(section).clear();
|
||||||
|
}
|
||||||
|
|
||||||
/// Push a child node into a branch at `index` in `section`.
|
/// Push a child node into a branch at `index` in `section`.
|
||||||
pub fn push_child(&mut self, section: Section, index: usize, child: AstNode) {
|
pub fn push_child(&mut self, section: Section, index: usize, child: AstNode) {
|
||||||
let node = &mut self.section_mut(section)[index];
|
let node = &mut self.section_mut(section)[index];
|
||||||
|
|
|
||||||
252
src/agent/mod.rs
252
src/agent/mod.rs
|
|
@ -25,7 +25,7 @@ use anyhow::Result;
|
||||||
|
|
||||||
use api::ApiClient;
|
use api::ApiClient;
|
||||||
use context_new::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
|
use context_new::{AstNode, NodeBody, ContextState, Section, Ast, PendingToolCall, ResponseParser, Role};
|
||||||
use tools::{summarize_args, working_stack};
|
use tools::summarize_args;
|
||||||
|
|
||||||
use crate::mind::log::ConversationLog;
|
use crate::mind::log::ConversationLog;
|
||||||
|
|
||||||
|
|
@ -505,38 +505,36 @@ impl Agent {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Dispatch a tool call without holding the agent lock across I/O.
|
/// Dispatch a tool call without holding the agent lock across I/O.
|
||||||
/// Used by `turn()` which manages its own locking.
|
|
||||||
async fn dispatch_tool_call_unlocked(
|
async fn dispatch_tool_call_unlocked(
|
||||||
agent: &Arc<tokio::sync::Mutex<Agent>>,
|
agent: &Arc<tokio::sync::Mutex<Agent>>,
|
||||||
active_tools: &tools::SharedActiveTools,
|
active_tools: &tools::SharedActiveTools,
|
||||||
call: &ToolCall,
|
call: &PendingToolCall,
|
||||||
ds: &mut DispatchState,
|
ds: &mut DispatchState,
|
||||||
) {
|
) {
|
||||||
let args: serde_json::Value = match serde_json::from_str(&call.function.arguments) {
|
let args: serde_json::Value = match serde_json::from_str(&call.arguments) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let err = format!("Error: malformed tool call arguments: {e}");
|
let err = format!("Error: malformed tool call arguments: {e}");
|
||||||
let _act = start_activity(agent, format!("rejected: {} (bad args)", call.function.name)).await;
|
let _act = start_activity(agent, format!("rejected: {} (bad args)", call.name)).await;
|
||||||
let mut me = agent.lock().await;
|
let mut me = agent.lock().await;
|
||||||
me.apply_tool_result(call, err, ds);
|
me.apply_tool_result(call, err, ds);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let args_summary = summarize_args(&call.function.name, &args);
|
let args_summary = summarize_args(&call.name, &args);
|
||||||
let _calling = start_activity(agent, format!("calling: {}", call.function.name)).await;
|
let _calling = start_activity(agent, format!("calling: {}", call.name)).await;
|
||||||
|
|
||||||
// Spawn tool, track it
|
|
||||||
let call_clone = call.clone();
|
let call_clone = call.clone();
|
||||||
let agent_handle = agent.clone();
|
let agent_handle = agent.clone();
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
let output = tools::dispatch_with_agent(&call_clone.function.name, &args, Some(agent_handle)).await;
|
let output = tools::dispatch_with_agent(&call_clone.name, &args, Some(agent_handle)).await;
|
||||||
(call_clone, output)
|
(call_clone, output)
|
||||||
});
|
});
|
||||||
active_tools.lock().unwrap().push(
|
active_tools.lock().unwrap().push(
|
||||||
tools::ActiveToolCall {
|
tools::ActiveToolCall {
|
||||||
id: call.id.clone(),
|
id: call.id.clone(),
|
||||||
name: call.function.name.clone(),
|
name: call.name.clone(),
|
||||||
detail: args_summary,
|
detail: args_summary,
|
||||||
started: std::time::Instant::now(),
|
started: std::time::Instant::now(),
|
||||||
background: false,
|
background: false,
|
||||||
|
|
@ -544,28 +542,22 @@ impl Agent {
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// Pop it back and await — no agent lock held
|
|
||||||
let entry = {
|
let entry = {
|
||||||
let mut tools = active_tools.lock().unwrap();
|
let mut tools = active_tools.lock().unwrap();
|
||||||
tools.pop().unwrap()
|
tools.pop().unwrap()
|
||||||
};
|
};
|
||||||
if let Ok((call, output)) = entry.handle.await {
|
if let Ok((call, output)) = entry.handle.await {
|
||||||
// Brief lock to apply result
|
|
||||||
let mut me = agent.lock().await;
|
let mut me = agent.lock().await;
|
||||||
me.apply_tool_result(&call, output, ds);
|
me.apply_tool_result(&call, output, ds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply a completed tool result to conversation state.
|
|
||||||
fn apply_tool_result(
|
fn apply_tool_result(
|
||||||
&mut self,
|
&mut self,
|
||||||
call: &ToolCall,
|
call: &PendingToolCall,
|
||||||
output: String,
|
output: String,
|
||||||
ds: &mut DispatchState,
|
ds: &mut DispatchState,
|
||||||
) {
|
) {
|
||||||
let args: serde_json::Value =
|
|
||||||
serde_json::from_str(&call.function.arguments).unwrap_or_default();
|
|
||||||
|
|
||||||
ds.had_tool_calls = true;
|
ds.had_tool_calls = true;
|
||||||
if output.starts_with("Error:") {
|
if output.starts_with("Error:") {
|
||||||
ds.tool_errors += 1;
|
ds.tool_errors += 1;
|
||||||
|
|
@ -573,70 +565,41 @@ impl Agent {
|
||||||
|
|
||||||
self.active_tools.lock().unwrap().retain(|t| t.id != call.id);
|
self.active_tools.lock().unwrap().retain(|t| t.id != call.id);
|
||||||
|
|
||||||
// Tag memory_render results for context deduplication
|
// Tag memory_render results as Memory nodes for context deduplication
|
||||||
if call.function.name == "memory_render" && !output.starts_with("Error:") {
|
if call.name == "memory_render" && !output.starts_with("Error:") {
|
||||||
|
let args: serde_json::Value =
|
||||||
|
serde_json::from_str(&call.arguments).unwrap_or_default();
|
||||||
if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
|
if let Some(key) = args.get("key").and_then(|v| v.as_str()) {
|
||||||
let mut msg = Message::tool_result(&call.id, &output);
|
self.push_node(AstNode::memory(key, &output));
|
||||||
msg.stamp();
|
|
||||||
self.push_entry(ConversationEntry::Memory { key: key.to_string(), message: msg, score: None });
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.push_message(Message::tool_result(&call.id, &output));
|
self.push_node(AstNode::tool_result(&output));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Context state sections — just returns references to the live data.
|
pub fn conversation_from(&self, from: usize) -> &[AstNode] {
|
||||||
pub fn context_sections(&self) -> [&ContextSection; 4] {
|
let conv = self.context.conversation();
|
||||||
self.context.sections()
|
if from < conv.len() { &conv[from..] } else { &[] }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Conversation entries from `from` onward — used by the
|
|
||||||
/// subconscious debug screen to show forked agent conversations.
|
|
||||||
pub fn conversation_entries_from(&self, from: usize) -> &[ContextEntry] {
|
|
||||||
let entries = self.context.conversation.entries();
|
|
||||||
if from < entries.len() { &entries[from..] } else { &[] }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Load recent journal entries at startup for orientation.
|
|
||||||
/// Uses the same budget logic as compaction but with empty conversation.
|
|
||||||
/// Only parses the tail of the journal file (last 64KB) for speed.
|
|
||||||
fn load_startup_journal(&mut self) {
|
fn load_startup_journal(&mut self) {
|
||||||
let store = match crate::store::Store::load() {
|
let store = match crate::store::Store::load() {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Find oldest message timestamp in conversation log
|
|
||||||
let oldest_msg_ts = self.conversation_log.as_ref()
|
let oldest_msg_ts = self.conversation_log.as_ref()
|
||||||
.and_then(|log| log.oldest_timestamp());
|
.and_then(|log| log.oldest_timestamp());
|
||||||
|
|
||||||
// Get journal entries from the memory graph
|
|
||||||
let mut journal_nodes: Vec<_> = store.nodes.values()
|
let mut journal_nodes: Vec<_> = store.nodes.values()
|
||||||
.filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
|
.filter(|n| n.node_type == crate::store::NodeType::EpisodicSession)
|
||||||
.collect();
|
.collect();
|
||||||
let mut dbg = std::fs::OpenOptions::new().create(true).append(true)
|
|
||||||
.open("/tmp/poc-journal-debug.log").ok();
|
|
||||||
macro_rules! dbg_log {
|
|
||||||
($($arg:tt)*) => {
|
|
||||||
if let Some(ref mut f) = dbg { use std::io::Write; let _ = writeln!(f, $($arg)*); }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dbg_log!("[journal] {} nodes, oldest_msg={:?}", journal_nodes.len(), oldest_msg_ts);
|
|
||||||
|
|
||||||
journal_nodes.sort_by_key(|n| n.created_at);
|
journal_nodes.sort_by_key(|n| n.created_at);
|
||||||
if let Some(first) = journal_nodes.first() {
|
|
||||||
dbg_log!("[journal] first created_at={}", first.created_at);
|
|
||||||
}
|
|
||||||
if let Some(last) = journal_nodes.last() {
|
|
||||||
dbg_log!("[journal] last created_at={}", last.created_at);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the cutoff index — entries older than conversation, plus one overlap
|
|
||||||
let cutoff_idx = if let Some(cutoff) = oldest_msg_ts {
|
let cutoff_idx = if let Some(cutoff) = oldest_msg_ts {
|
||||||
let cutoff_ts = cutoff.timestamp();
|
let cutoff_ts = cutoff.timestamp();
|
||||||
dbg_log!("[journal] cutoff timestamp={}", cutoff_ts);
|
|
||||||
let mut idx = journal_nodes.len();
|
let mut idx = journal_nodes.len();
|
||||||
for (i, node) in journal_nodes.iter().enumerate() {
|
for (i, node) in journal_nodes.iter().enumerate() {
|
||||||
if node.created_at >= cutoff_ts {
|
if node.created_at >= cutoff_ts {
|
||||||
|
|
@ -648,117 +611,45 @@ impl Agent {
|
||||||
} else {
|
} else {
|
||||||
journal_nodes.len()
|
journal_nodes.len()
|
||||||
};
|
};
|
||||||
dbg_log!("[journal] cutoff_idx={}", cutoff_idx);
|
|
||||||
|
|
||||||
// Walk backwards from cutoff, accumulating entries within 15% of context
|
let journal_budget = context_new::context_window() * 15 / 100;
|
||||||
let context_window = crate::agent::context::context_window();
|
let mut entries = Vec::new();
|
||||||
let journal_budget = context_window * 15 / 100;
|
|
||||||
dbg_log!("[journal] budget={} tokens ({}*15%)", journal_budget, context_window);
|
|
||||||
|
|
||||||
let mut journal_entries = Vec::new();
|
|
||||||
let mut total_tokens = 0;
|
let mut total_tokens = 0;
|
||||||
|
|
||||||
for node in journal_nodes[..cutoff_idx].iter().rev() {
|
for node in journal_nodes[..cutoff_idx].iter().rev() {
|
||||||
let msg = Message::user(&node.content);
|
let ts = chrono::DateTime::from_timestamp(node.created_at, 0);
|
||||||
let ce = ContextEntry::new(
|
let ast = AstNode::memory(&node.key, &node.content)
|
||||||
ConversationEntry::Message(msg),
|
.with_timestamp(ts.unwrap_or_else(chrono::Utc::now));
|
||||||
chrono::DateTime::from_timestamp(node.created_at, 0),
|
let tok = ast.tokens();
|
||||||
);
|
if total_tokens + tok > journal_budget && !entries.is_empty() {
|
||||||
if total_tokens + ce.tokens() > journal_budget && !journal_entries.is_empty() {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
total_tokens += ce.tokens();
|
total_tokens += tok;
|
||||||
journal_entries.push(ce);
|
entries.push(ast);
|
||||||
}
|
}
|
||||||
journal_entries.reverse();
|
entries.reverse();
|
||||||
dbg_log!("[journal] loaded {} entries, {} tokens", journal_entries.len(), total_tokens);
|
|
||||||
|
|
||||||
if journal_entries.is_empty() {
|
if entries.is_empty() { return; }
|
||||||
dbg_log!("[journal] no entries!");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.context.journal.clear();
|
self.context.clear(Section::Journal);
|
||||||
for entry in journal_entries {
|
for entry in entries {
|
||||||
self.context.journal.push(entry);
|
self.context.push(Section::Journal, entry);
|
||||||
}
|
|
||||||
dbg_log!("[journal] context.journal now has {} entries", self.context.journal.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Called after any change to context state (working stack, etc).
|
|
||||||
/// Load working stack from disk.
|
|
||||||
fn load_working_stack(&mut self) {
|
|
||||||
if let Ok(data) = std::fs::read_to_string(working_stack::file_path()) {
|
|
||||||
if let Ok(stack) = serde_json::from_str::<Vec<String>>(&data) {
|
|
||||||
self.context.working_stack = stack;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replace base64 image data in older messages with text placeholders.
|
|
||||||
/// Keeps the 2 most recent images live (enough for motion/comparison).
|
|
||||||
/// The tool result message before each image records what was loaded.
|
|
||||||
pub fn age_out_images(&mut self) {
|
|
||||||
// Find image entries newest-first, skip 1 (caller is about to add another)
|
|
||||||
let to_age: Vec<usize> = self.context.conversation.entries().iter().enumerate()
|
|
||||||
.rev()
|
|
||||||
.filter(|(_, ce)| {
|
|
||||||
if let Some(MessageContent::Parts(parts)) = &ce.entry.message().content {
|
|
||||||
parts.iter().any(|p| matches!(p, ContentPart::ImageUrl { .. }))
|
|
||||||
} else { false }
|
|
||||||
})
|
|
||||||
.map(|(i, _)| i)
|
|
||||||
.skip(1) // keep 1 existing + 1 about to be added = 2 live
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
for i in to_age {
|
|
||||||
// Build replacement entry with image data stripped
|
|
||||||
let old = &self.context.conversation.entries()[i];
|
|
||||||
let msg = old.entry.message();
|
|
||||||
if let Some(MessageContent::Parts(parts)) = &msg.content {
|
|
||||||
let mut replacement = String::new();
|
|
||||||
for part in parts {
|
|
||||||
match part {
|
|
||||||
ContentPart::Text { text } => {
|
|
||||||
if !replacement.is_empty() { replacement.push('\n'); }
|
|
||||||
replacement.push_str(text);
|
|
||||||
}
|
|
||||||
ContentPart::ImageUrl { .. } => {
|
|
||||||
if !replacement.is_empty() { replacement.push('\n'); }
|
|
||||||
replacement.push_str("[image aged out]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut new_msg = msg.clone();
|
|
||||||
new_msg.content = Some(MessageContent::Text(replacement));
|
|
||||||
self.context.conversation.set(i, ContextEntry::new(
|
|
||||||
ConversationEntry::Message(new_msg),
|
|
||||||
old.timestamp,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.generation += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Strip ephemeral tool calls from the conversation history.
|
|
||||||
///
|
|
||||||
/// Last prompt token count reported by the API.
|
|
||||||
pub fn last_prompt_tokens(&self) -> u32 {
|
pub fn last_prompt_tokens(&self) -> u32 {
|
||||||
self.last_prompt_tokens
|
self.last_prompt_tokens
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rebuild the context window: reload identity, dedup, trim, reload journal.
|
/// Rebuild the context window: reload identity, trim, reload journal.
|
||||||
pub fn compact(&mut self) {
|
pub fn compact(&mut self) {
|
||||||
// Reload identity from config
|
|
||||||
match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
|
match crate::config::reload_for_model(&self.app_config, &self.prompt_file) {
|
||||||
Ok((system_prompt, personality)) => {
|
Ok((system_prompt, personality)) => {
|
||||||
self.context.system.clear();
|
self.context.clear(Section::System);
|
||||||
self.context.system.push(ContextEntry::new(
|
self.context.push(Section::System, AstNode::system_msg(&system_prompt));
|
||||||
ConversationEntry::System(Message::system(&system_prompt)), None));
|
self.context.clear(Section::Identity);
|
||||||
self.context.identity.clear();
|
for (name, content) in &personality {
|
||||||
for (_name, content) in &personality {
|
self.context.push(Section::Identity, AstNode::memory(name, content));
|
||||||
self.context.identity.push(ContextEntry::new(
|
|
||||||
ConversationEntry::Message(Message::user(content)), None));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -766,88 +657,45 @@ impl Agent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let before = self.context.conversation.len();
|
|
||||||
let before_mem = self.context.conversation.entries().iter()
|
|
||||||
.filter(|e| e.entry.is_memory()).count();
|
|
||||||
let before_conv = before - before_mem;
|
|
||||||
|
|
||||||
// Age out images before trimming — they're huge in the request payload
|
|
||||||
self.age_out_images();
|
|
||||||
|
|
||||||
// Load journal BEFORE trimming so trim accounts for journal cost
|
|
||||||
self.load_startup_journal();
|
self.load_startup_journal();
|
||||||
|
|
||||||
// Dedup memory, trim to budget
|
// TODO: trim_entries — dedup memories, evict to budget
|
||||||
let fixed = self.context.system.tokens() + self.context.identity.tokens()
|
|
||||||
+ self.context.journal.tokens();
|
|
||||||
self.context.conversation.trim(fixed);
|
|
||||||
|
|
||||||
let after = self.context.conversation.len();
|
|
||||||
let after_mem = self.context.conversation.entries().iter()
|
|
||||||
.filter(|e| e.entry.is_memory()).count();
|
|
||||||
let after_conv = after - after_mem;
|
|
||||||
|
|
||||||
dbglog!("[compact] entries: {} → {} (mem: {} → {}, conv: {} → {})",
|
|
||||||
before, after, before_mem, after_mem, before_conv, after_conv);
|
|
||||||
self.generation += 1;
|
self.generation += 1;
|
||||||
self.last_prompt_tokens = 0;
|
self.last_prompt_tokens = 0;
|
||||||
|
|
||||||
dbglog!("[compact] budget: {}", self.context.format_budget());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Restore from the conversation log. Builds the context window
|
/// Restore from the conversation log.
|
||||||
/// the same way compact() does — journal summaries for old messages,
|
|
||||||
/// raw recent messages. This is the unified startup path.
|
|
||||||
/// Returns true if the log had content to restore.
|
/// Returns true if the log had content to restore.
|
||||||
pub fn restore_from_log(&mut self) -> bool {
|
pub fn restore_from_log(&mut self) -> bool {
|
||||||
let entries = match &self.conversation_log {
|
let nodes = match &self.conversation_log {
|
||||||
Some(log) => match log.read_tail(64 * 1024 * 1024) {
|
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
|
||||||
Ok(entries) if !entries.is_empty() => entries,
|
Ok(nodes) if !nodes.is_empty() => nodes,
|
||||||
_ => return false,
|
_ => return false,
|
||||||
},
|
},
|
||||||
None => return false,
|
None => return false,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Load extra — compact() will dedup, trim, reload identity + journal
|
self.context.clear(Section::Conversation);
|
||||||
let all: Vec<ContextEntry> = entries.into_iter()
|
for node in nodes {
|
||||||
.filter(|e| !e.is_log() && !e.is_thinking() && e.message().role != Role::System)
|
self.context.push(Section::Conversation, node);
|
||||||
.map(|e| {
|
}
|
||||||
let timestamp = if e.is_log() || e.is_thinking() { None } else {
|
|
||||||
e.message().timestamp.as_ref().and_then(|ts| {
|
|
||||||
chrono::DateTime::parse_from_rfc3339(ts).ok()
|
|
||||||
.map(|dt| dt.with_timezone(&chrono::Utc))
|
|
||||||
})
|
|
||||||
};
|
|
||||||
ContextEntry::new(e, timestamp)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
let mem_count = all.iter().filter(|e| e.entry.is_memory()).count();
|
|
||||||
let conv_count = all.len() - mem_count;
|
|
||||||
dbglog!("[restore] loaded {} entries from log (mem: {}, conv: {})",
|
|
||||||
all.len(), mem_count, conv_count);
|
|
||||||
self.context.conversation.set_entries(all);
|
|
||||||
self.compact();
|
self.compact();
|
||||||
// Estimate prompt tokens so status bar isn't 0 on startup
|
self.last_prompt_tokens = self.context.tokens() as u32;
|
||||||
self.last_prompt_tokens = self.context.total_tokens() as u32;
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replace the API client (for model switching).
|
|
||||||
pub fn swap_client(&mut self, new_client: ApiClient) {
|
pub fn swap_client(&mut self, new_client: ApiClient) {
|
||||||
self.client = new_client;
|
self.client = new_client;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the model identifier.
|
|
||||||
pub fn model(&self) -> &str {
|
pub fn model(&self) -> &str {
|
||||||
&self.client.model
|
&self.client.model
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the conversation entries.
|
pub fn conversation(&self) -> &[AstNode] {
|
||||||
pub fn entries(&self) -> &[ContextEntry] {
|
self.context.conversation()
|
||||||
self.context.conversation.entries()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mutable access to conversation entries (for /retry).
|
|
||||||
pub fn client_clone(&self) -> ApiClient {
|
pub fn client_clone(&self) -> ApiClient {
|
||||||
self.client.clone()
|
self.client.clone()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ pub struct ActiveToolCall {
|
||||||
pub detail: String,
|
pub detail: String,
|
||||||
pub started: Instant,
|
pub started: Instant,
|
||||||
pub background: bool,
|
pub background: bool,
|
||||||
pub handle: tokio::task::JoinHandle<(ToolCall, String)>,
|
pub handle: tokio::task::JoinHandle<(super::context_new::PendingToolCall, String)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shared active tool calls — agent spawns, TUI reads metadata / aborts.
|
/// Shared active tool calls — agent spawns, TUI reads metadata / aborts.
|
||||||
|
|
|
||||||
|
|
@ -1,19 +1,8 @@
|
||||||
// observe.rs — Shared observation socket + logfile
|
|
||||||
//
|
|
||||||
// Two mechanisms:
|
|
||||||
// 1. Logfile (~/.consciousness/agent-sessions/observe.log) — append-only
|
|
||||||
// plain text of the conversation. `poc-agent read` prints new
|
|
||||||
// content since last read using a byte-offset cursor file.
|
|
||||||
// 2. Unix socket — for live streaming (`poc-agent read -f`) and
|
|
||||||
// sending input (`poc-agent write <msg>`).
|
|
||||||
//
|
|
||||||
// The logfile is the history. The socket is the live wire.
|
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use std::fs::{File, OpenOptions};
|
use std::fs::{File, OpenOptions};
|
||||||
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
|
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use crate::agent::context::ConversationEntry;
|
use crate::agent::context_new::AstNode;
|
||||||
|
|
||||||
pub struct ConversationLog {
|
pub struct ConversationLog {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
|
@ -21,7 +10,6 @@ pub struct ConversationLog {
|
||||||
|
|
||||||
impl ConversationLog {
|
impl ConversationLog {
|
||||||
pub fn new(path: PathBuf) -> Result<Self> {
|
pub fn new(path: PathBuf) -> Result<Self> {
|
||||||
// Ensure parent directory exists
|
|
||||||
if let Some(parent) = path.parent() {
|
if let Some(parent) = path.parent() {
|
||||||
std::fs::create_dir_all(parent)
|
std::fs::create_dir_all(parent)
|
||||||
.with_context(|| format!("creating log dir {}", parent.display()))?;
|
.with_context(|| format!("creating log dir {}", parent.display()))?;
|
||||||
|
|
@ -29,16 +17,15 @@ impl ConversationLog {
|
||||||
Ok(Self { path })
|
Ok(Self { path })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a conversation entry to the log.
|
pub fn append_node(&self, node: &AstNode) -> Result<()> {
|
||||||
pub fn append(&self, entry: &ConversationEntry) -> Result<()> {
|
|
||||||
let mut file = OpenOptions::new()
|
let mut file = OpenOptions::new()
|
||||||
.create(true)
|
.create(true)
|
||||||
.append(true)
|
.append(true)
|
||||||
.open(&self.path)
|
.open(&self.path)
|
||||||
.with_context(|| format!("opening log {}", self.path.display()))?;
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
||||||
|
|
||||||
let line = serde_json::to_string(entry)
|
let line = serde_json::to_string(node)
|
||||||
.context("serializing entry for log")?;
|
.context("serializing node for log")?;
|
||||||
writeln!(file, "{}", line)
|
writeln!(file, "{}", line)
|
||||||
.context("writing to conversation log")?;
|
.context("writing to conversation log")?;
|
||||||
file.sync_all()
|
file.sync_all()
|
||||||
|
|
@ -46,10 +33,7 @@ impl ConversationLog {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read the tail of the log (last `max_bytes` bytes).
|
pub fn read_nodes(&self, max_bytes: u64) -> Result<Vec<AstNode>> {
|
||||||
/// Seeks to `file_len - max_bytes`, skips the first partial line,
|
|
||||||
/// then parses forward. For logs smaller than `max_bytes`, reads everything.
|
|
||||||
pub fn read_tail(&self, max_bytes: u64) -> Result<Vec<ConversationEntry>> {
|
|
||||||
if !self.path.exists() {
|
if !self.path.exists() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
|
@ -60,45 +44,36 @@ impl ConversationLog {
|
||||||
|
|
||||||
if file_len > max_bytes {
|
if file_len > max_bytes {
|
||||||
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
|
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
|
||||||
// Skip partial first line
|
|
||||||
let mut discard = String::new();
|
let mut discard = String::new();
|
||||||
reader.read_line(&mut discard)?;
|
reader.read_line(&mut discard)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut entries = Vec::new();
|
let mut nodes = Vec::new();
|
||||||
for line in reader.lines() {
|
for line in reader.lines() {
|
||||||
let line = line.context("reading log tail")?;
|
let line = line.context("reading log tail")?;
|
||||||
let line = line.trim();
|
let line = line.trim();
|
||||||
if line.is_empty() {
|
if line.is_empty() { continue; }
|
||||||
continue;
|
if let Ok(node) = serde_json::from_str::<AstNode>(line) {
|
||||||
}
|
nodes.push(node);
|
||||||
// Try ConversationEntry first (new format), fall back to bare Message (old logs)
|
|
||||||
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(line) {
|
|
||||||
entries.push(entry);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(entries)
|
Ok(nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn path(&self) -> &Path {
|
pub fn path(&self) -> &Path {
|
||||||
&self.path
|
&self.path
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the timestamp of the oldest message in the log.
|
|
||||||
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
|
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
|
||||||
let file = File::open(&self.path).ok()?;
|
let file = File::open(&self.path).ok()?;
|
||||||
let reader = BufReader::new(file);
|
let reader = BufReader::new(file);
|
||||||
for line in reader.lines().flatten() {
|
for line in reader.lines().flatten() {
|
||||||
let line = line.trim().to_string();
|
let line = line.trim().to_string();
|
||||||
if line.is_empty() { continue; }
|
if line.is_empty() { continue; }
|
||||||
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(&line) {
|
if let Ok(node) = serde_json::from_str::<AstNode>(&line) {
|
||||||
if let Some(ts) = &entry.message().timestamp {
|
if let Some(leaf) = node.leaf() {
|
||||||
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) {
|
if let Some(ts) = leaf.timestamp() {
|
||||||
return Some(dt.to_utc());
|
return Some(ts);
|
||||||
}
|
|
||||||
// Try other formats
|
|
||||||
if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") {
|
|
||||||
return Some(dt.and_utc());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue