diff --git a/poc-memory/schema/memory.capnp b/poc-memory/schema/memory.capnp index 4111769..bf7df39 100644 --- a/poc-memory/schema/memory.capnp +++ b/poc-memory/schema/memory.capnp @@ -101,3 +101,19 @@ struct NodeLog { struct RelationLog { relations @0 :List(Relation); } + +# Agent visit tracking — separate append-only log. +# Records when an agent successfully processed a node. +# Used to deprioritize recently-visited nodes in agent selection. + +struct AgentVisit { + nodeUuid @0 :Data; # 16 bytes — which node + nodeKey @1 :Text; # human-readable key (for debugging) + agent @2 :Text; # agent type: "linker", "rename", "replay", etc. + timestamp @3 :Int64; # unix epoch seconds + outcome @4 :Text; # "processed", "skipped", "modified" — optional detail +} + +struct AgentVisitLog { + visits @0 :List(AgentVisit); +} diff --git a/poc-memory/src/agents/consolidate.rs b/poc-memory/src/agents/consolidate.rs index 20487a5..e24d4cb 100644 --- a/poc-memory/src/agents/consolidate.rs +++ b/poc-memory/src/agents/consolidate.rs @@ -98,8 +98,8 @@ pub fn consolidate_full_with_progress( *store = Store::load()?; } - let prompt = match super::prompts::agent_prompt(store, agent_type, *count) { - Ok(p) => p, + let agent_batch = match super::prompts::agent_prompt(store, agent_type, *count) { + Ok(b) => b, Err(e) => { let msg = format!(" ERROR building prompt: {}", e); log_line(&mut log_buf, &msg); @@ -109,10 +109,10 @@ pub fn consolidate_full_with_progress( } }; - log_line(&mut log_buf, &format!(" Prompt: {} chars (~{} tokens)", - prompt.len(), prompt.len() / 4)); + log_line(&mut log_buf, &format!(" Prompt: {} chars (~{} tokens), {} nodes", + agent_batch.prompt.len(), agent_batch.prompt.len() / 4, agent_batch.node_keys.len())); - let response = match call_sonnet("consolidate", &prompt) { + let response = match call_sonnet("consolidate", &agent_batch.prompt) { Ok(r) => r, Err(e) => { let msg = format!(" ERROR from Sonnet: {}", e); @@ -131,6 +131,13 @@ pub fn consolidate_full_with_progress( 
store::Provenance::AgentConsolidate).ok(); reports.push(report_key.clone()); + // Record visits for successfully processed nodes + if !agent_batch.node_keys.is_empty() { + if let Err(e) = store.record_agent_visits(&agent_batch.node_keys, agent_type) { + log_line(&mut log_buf, &format!(" Visit recording: {}", e)); + } + } + let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key); log_line(&mut log_buf, &msg); on_progress(&msg); diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index 2464878..bce2407 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -137,10 +137,11 @@ fn job_consolidation_agent( }; ctx.log_line(&format!("building prompt: {}", label)); - let prompt = super::prompts::agent_prompt(&store, &agent, batch)?; - ctx.log_line(&format!("prompt: {} chars, calling Sonnet", prompt.len())); + let agent_batch = super::prompts::agent_prompt(&store, &agent, batch)?; + ctx.log_line(&format!("prompt: {} chars ({} nodes), calling Sonnet", + agent_batch.prompt.len(), agent_batch.node_keys.len())); - let response = super::llm::call_sonnet("consolidate", &prompt)?; + let response = super::llm::call_sonnet("consolidate", &agent_batch.prompt)?; let ts = crate::store::format_datetime(crate::store::now_epoch()) .replace([':', '-', 'T'], ""); @@ -148,6 +149,13 @@ fn job_consolidation_agent( store.upsert_provenance(&report_key, &response, crate::store::Provenance::AgentConsolidate).ok(); + // Record visits for successfully processed nodes + if !agent_batch.node_keys.is_empty() { + if let Err(e) = store.record_agent_visits(&agent_batch.node_keys, &agent) { + ctx.log_line(&format!("visit recording: {}", e)); + } + } + ctx.log_line(&format!("done: {} lines → {}", response.lines().count(), report_key)); Ok(()) }) @@ -165,14 +173,16 @@ fn job_rename_agent( let batch = if batch_size == 0 { 10 } else { batch_size }; ctx.log_line(&format!("building prompt: rename (batch={})", batch)); - let prompt = 
super::prompts::agent_prompt(&store, "rename", batch)?; - ctx.log_line(&format!("prompt: {} chars, calling Sonnet", prompt.len())); + let agent_batch = super::prompts::agent_prompt(&store, "rename", batch)?; + ctx.log_line(&format!("prompt: {} chars ({} nodes), calling Sonnet", + agent_batch.prompt.len(), agent_batch.node_keys.len())); - let response = super::llm::call_sonnet("consolidate", &prompt)?; + let response = super::llm::call_sonnet("consolidate", &agent_batch.prompt)?; // Parse RENAME actions directly from response let mut applied = 0; let mut skipped = 0; + let mut successfully_renamed: Vec<String> = Vec::new(); for line in response.lines() { let trimmed = line.trim(); if !trimmed.starts_with("RENAME ") { continue; } @@ -208,6 +218,7 @@ fn job_rename_agent( match store.rename_node(&resolved, new_key) { Ok(()) => { ctx.log_line(&format!("renamed: {} → {}", resolved, new_key)); + successfully_renamed.push(new_key.to_string()); applied += 1; } Err(e) => { @@ -221,6 +232,13 @@ fn job_rename_agent( store.save()?; } + // Record visits for successfully renamed nodes + if !successfully_renamed.is_empty() { + if let Err(e) = store.record_agent_visits(&successfully_renamed, "rename") { + ctx.log_line(&format!("visit recording: {}", e)); + } + } + // Also store the report for auditing let ts = crate::store::format_datetime(crate::store::now_epoch()) .replace([':', '-', 'T'], ""); diff --git a/poc-memory/src/agents/prompts.rs b/poc-memory/src/agents/prompts.rs index 1d66938..1cc15a4 100644 --- a/poc-memory/src/agents/prompts.rs +++ b/poc-memory/src/agents/prompts.rs @@ -11,6 +11,14 @@ use crate::neuro::{ replay_queue, replay_queue_with_graph, detect_interference, }; +/// Result of building an agent prompt — includes both the prompt text +/// and the keys of nodes selected for processing, so the caller can +/// record visits after successful completion.
+pub struct AgentBatch { + pub prompt: String, + pub node_keys: Vec<String>, +} + /// Load a prompt template, replacing {{PLACEHOLDER}} with data pub fn load_prompt(name: &str, replacements: &[(&str, &str)]) -> Result<String, String> { let path = crate::config::get().prompts_dir.join(format!("{}.md", name)); @@ -260,28 +268,23 @@ fn format_pairs_section( out } -/// Format rename candidates: nodes with auto-generated or opaque keys -fn format_rename_candidates(store: &Store, count: usize) -> String { +/// Format rename candidates, returning both keys and formatted section +fn format_rename_candidates_with_keys(store: &Store, count: usize) -> (Vec<String>, String) { let mut candidates: Vec<(&str, &crate::store::Node)> = store.nodes.iter() .filter(|(key, _)| { - // Only rename nodes with long auto-generated keys if key.len() < 60 { return false; } - - // Journal entries with auto-slugs if key.starts_with("journal#j-") { return true; } - - // Mined transcripts with UUIDs if key.starts_with("_mined-transcripts#f-") { return true; } - false }) .map(|(k, n)| (k.as_str(), n)) .collect(); - // Sort by timestamp (newest first) so we rename recent stuff first candidates.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp)); candidates.truncate(count); + let keys: Vec<String> = candidates.iter().map(|(k, _)| k.to_string()).collect(); + let mut out = String::new(); out.push_str(&format!("## Nodes to rename ({} of {} candidates)\n\n", candidates.len(), @@ -308,7 +311,7 @@ fn format_rename_candidates(store: &Store, count: usize) -> String { out.push_str("---\n\n"); } - out + (keys, out) } /// Get split candidates sorted by size (largest first) @@ -437,8 +440,10 @@ pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<() Ok(()) } -/// Generate a specific agent prompt with filled-in data -pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> { +/// Generate a specific agent prompt with filled-in data.
+/// Returns an AgentBatch with the prompt text and the keys of nodes +/// selected for processing (for visit tracking on success). +pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<AgentBatch, String> { let graph = store.build_graph(); let topology = format_topology_header(&graph); @@ -447,8 +452,10 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> { let items = replay_queue_with_graph(store, count, &graph, emb.as_ref()); + let keys: Vec<String> = items.iter().map(|i| i.key.clone()).collect(); let nodes_section = format_nodes_section(store, &items, &graph); - load_prompt("replay", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)]) + let prompt = load_prompt("replay", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])?; + Ok(AgentBatch { prompt, node_keys: keys }) } "linker" => { // Filter to episodic entries @@ -459,14 +466,21 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> { + let keys: Vec<String> = items.iter().map(|i| i.key.clone()).collect(); let nodes_section = format_nodes_section(store, &items, &graph); - load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)]) + let prompt = load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])?; + Ok(AgentBatch { prompt, node_keys: keys }) } "separator" => { let mut pairs = detect_interference(store, &graph, 0.5); pairs.truncate(count); + // Both nodes in each pair count as visited + let keys: Vec<String> = pairs.iter() + .flat_map(|(a, b, _)| vec![a.clone(), b.clone()]) + .collect(); let pairs_section = format_pairs_section(&pairs, store, &graph); - load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)]) + let prompt = load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)])?; + Ok(AgentBatch { prompt, node_keys: keys }) } "transfer" => { // Recent episodic entries @@ -493,15 +507,19 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> { + Ok(AgentBatch { prompt, node_keys: keys }) } "health" => { + //
Health agent analyzes the whole graph, no specific nodes let health_section = format_health_section(store, &graph); - load_prompt("health", &[("{{TOPOLOGY}}", &topology), ("{{HEALTH}}", &health_section)]) + let prompt = load_prompt("health", &[("{{TOPOLOGY}}", &topology), ("{{HEALTH}}", &health_section)])?; + Ok(AgentBatch { prompt, node_keys: vec![] }) } "rename" => { - let nodes_section = format_rename_candidates(store, count); - load_prompt("rename", &[("{{NODES}}", &nodes_section)]) + let (keys, nodes_section) = format_rename_candidates_with_keys(store, count); + let prompt = load_prompt("rename", &[("{{NODES}}", &nodes_section)])?; + Ok(AgentBatch { prompt, node_keys: keys }) } "split" => { // Phase 1: plan prompt for the largest candidate @@ -509,9 +527,10 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> _ => Err(format!("Unknown agent: {}. Use: replay, linker, separator, transfer, health, rename, split", agent)), } diff --git a/poc-memory/src/store/persist.rs b/poc-memory/src/store/persist.rs index 46185c8..d93d283 100644 --- a/poc-memory/src/store/persist.rs +++ b/poc-memory/src/store/persist.rs @@ -29,7 +29,14 @@ impl Store { pub fn load() -> Result<Self, String> { // 1.
Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy) match Self::load_snapshot_mmap() { - Ok(Some(store)) => return Ok(store), + Ok(Some(mut store)) => { + // rkyv snapshot doesn't include visits — replay from log + let visits_p = visits_path(); + if visits_p.exists() { + store.replay_visits(&visits_p).ok(); + } + return Ok(store); + }, Ok(None) => {}, Err(e) => eprintln!("rkyv snapshot: {}", e), } @@ -76,6 +83,10 @@ impl Store { if rels_p.exists() { store.replay_relations(&rels_p)?; } + let visits_p = visits_path(); + if visits_p.exists() { + store.replay_visits(&visits_p)?; + } // Record log sizes after replay — this is the state we reflect store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); @@ -318,6 +329,100 @@ impl Store { Ok(()) } + /// Append agent visit records to the visits log. + pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<(), String> { + if visits.is_empty() { return Ok(()); } + + let mut msg = message::Builder::new_default(); + { + let log = msg.init_root::<memory_capnp::agent_visit_log::Builder>(); + let mut list = log.init_visits(visits.len() as u32); + for (i, visit) in visits.iter().enumerate() { + visit.to_capnp(list.reborrow().get(i as u32)); + } + } + let mut buf = Vec::new(); + serialize::write_message(&mut buf, &msg) + .map_err(|e| format!("serialize visits: {}", e))?; + + let path = visits_path(); + let file = fs::OpenOptions::new() + .create(true).append(true).open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + use std::io::Write; + (&file).write_all(&buf) + .map_err(|e| format!("write visits: {}", e))?; + + // Update in-memory index + for v in visits { + self.visits + .entry(v.node_key.clone()) + .or_default() + .insert(v.agent.clone(), v.timestamp); + } + + Ok(()) + } + + /// Replay visits log to rebuild in-memory index.
+ fn replay_visits(&mut self, path: &Path) -> Result<(), String> { + let file = fs::File::open(path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + let mut reader = BufReader::new(file); + + while reader.stream_position().map_err(|e| e.to_string())? + < fs::metadata(path).map_err(|e| e.to_string())?.len() + { + let msg = match serialize::read_message(&mut reader, Default::default()) { + Ok(m) => m, + Err(_) => break, + }; + let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>() + .map_err(|e| format!("read visit log: {}", e))?; + + for visit in log.get_visits().map_err(|e| e.to_string())? { + let key = visit.get_node_key().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("") + .to_string(); + let agent = visit.get_agent().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("") + .to_string(); + let ts = visit.get_timestamp(); + + if !key.is_empty() && !agent.is_empty() { + let entry = self.visits.entry(key).or_default(); + // Keep latest timestamp per agent + let existing = entry.entry(agent).or_insert(0); + if ts > *existing { + *existing = ts; + } + } + } + } + Ok(()) + } + + /// Record visits for a batch of node keys from a successful agent run. + pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> { + let visits: Vec<AgentVisit> = node_keys.iter() + .filter_map(|key| { + let node = self.nodes.get(key)?; + Some(new_visit(node.uuid, key, agent, "processed")) + }) + .collect(); + self.append_visits(&visits) + } + + /// Get the last time an agent visited a node. Returns 0 if never visited. + pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 { + self.visits.get(node_key) + .and_then(|agents| agents.get(agent)) + .copied() + .unwrap_or(0) + } + /// Save the derived cache with log size header for staleness detection. /// Uses atomic write (tmp + rename) to prevent partial reads.
pub fn save(&self) -> Result<(), String> { diff --git a/poc-memory/src/store/types.rs b/poc-memory/src/store/types.rs index 6510d5e..754abdf 100644 --- a/poc-memory/src/store/types.rs +++ b/poc-memory/src/store/types.rs @@ -394,6 +394,9 @@ pub struct GapRecord { pub timestamp: String, } +/// Per-node agent visit index: node_key → (agent_type → last_visit_timestamp) +pub type VisitIndex = HashMap<String, HashMap<String, i64>>; + // The full in-memory store #[derive(Default, Serialize, Deserialize)] pub struct Store { @@ -404,6 +407,9 @@ pub struct Store { pub retrieval_log: Vec, pub gaps: Vec<GapRecord>, pub params: Params, + /// Agent visit tracking: node_key → (agent_type → last_visit_epoch) + #[serde(default)] + pub visits: VisitIndex, /// Log sizes at load time — used by save() to write correct staleness header. /// If another writer appended since we loaded, our cache will be marked stale /// (recorded size < actual size), forcing the next reader to replay the log. @@ -490,6 +496,38 @@ pub fn new_node(key: &str, content: &str) -> Node { } } +/// Agent visit record — tracks when an agent successfully processed a node +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AgentVisit { + pub node_uuid: [u8; 16], + pub node_key: String, + pub agent: String, + pub timestamp: i64, + pub outcome: String, +} + +capnp_message!(AgentVisit, + reader: memory_capnp::agent_visit::Reader<'_>, + builder: memory_capnp::agent_visit::Builder<'_>, + text: [node_key, agent, outcome], + uuid: [node_uuid], + prim: [timestamp], + enm: [], + skip: [], +); + +pub fn new_visit(node_uuid: [u8; 16], node_key: &str, agent: &str, outcome: &str) -> AgentVisit { + AgentVisit { + node_uuid, + node_key: node_key.to_string(), + agent: agent.to_string(), + timestamp: now_epoch(), + outcome: outcome.to_string(), + } +} + +pub(crate) fn visits_path() -> PathBuf { memory_dir().join("visits.capnp") } + /// Create a new relation pub fn new_relation( source_uuid: [u8; 16],