agent visits: track when agents successfully process nodes
New append-only visits.capnp log records which agent processed which
node and when. Only recorded on successful completion — transient
errors don't mark nodes as "seen."
Schema: AgentVisit{nodeUuid, nodeKey, agent, timestamp, outcome}
Storage: append_visits(), replay_visits(), in-memory VisitIndex
Recording: daemon records visits after successful LLM call
API: agent_prompt() returns AgentBatch{prompt, node_keys} so callers
know which nodes to mark as visited.
Groundwork for using visit recency in agent node selection — agents
will deprioritize recently-visited nodes.
This commit is contained in:
parent
9f14a29181
commit
0e1e5a1981
6 changed files with 237 additions and 34 deletions
|
|
@ -98,8 +98,8 @@ pub fn consolidate_full_with_progress(
|
|||
*store = Store::load()?;
|
||||
}
|
||||
|
||||
let prompt = match super::prompts::agent_prompt(store, agent_type, *count) {
|
||||
Ok(p) => p,
|
||||
let agent_batch = match super::prompts::agent_prompt(store, agent_type, *count) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
let msg = format!(" ERROR building prompt: {}", e);
|
||||
log_line(&mut log_buf, &msg);
|
||||
|
|
@ -109,10 +109,10 @@ pub fn consolidate_full_with_progress(
|
|||
}
|
||||
};
|
||||
|
||||
log_line(&mut log_buf, &format!(" Prompt: {} chars (~{} tokens)",
|
||||
prompt.len(), prompt.len() / 4));
|
||||
log_line(&mut log_buf, &format!(" Prompt: {} chars (~{} tokens), {} nodes",
|
||||
agent_batch.prompt.len(), agent_batch.prompt.len() / 4, agent_batch.node_keys.len()));
|
||||
|
||||
let response = match call_sonnet("consolidate", &prompt) {
|
||||
let response = match call_sonnet("consolidate", &agent_batch.prompt) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let msg = format!(" ERROR from Sonnet: {}", e);
|
||||
|
|
@ -131,6 +131,13 @@ pub fn consolidate_full_with_progress(
|
|||
store::Provenance::AgentConsolidate).ok();
|
||||
reports.push(report_key.clone());
|
||||
|
||||
// Record visits for successfully processed nodes
|
||||
if !agent_batch.node_keys.is_empty() {
|
||||
if let Err(e) = store.record_agent_visits(&agent_batch.node_keys, agent_type) {
|
||||
log_line(&mut log_buf, &format!(" Visit recording: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
let msg = format!(" Done: {} lines → {}", response.lines().count(), report_key);
|
||||
log_line(&mut log_buf, &msg);
|
||||
on_progress(&msg);
|
||||
|
|
|
|||
|
|
@ -137,10 +137,11 @@ fn job_consolidation_agent(
|
|||
};
|
||||
ctx.log_line(&format!("building prompt: {}", label));
|
||||
|
||||
let prompt = super::prompts::agent_prompt(&store, &agent, batch)?;
|
||||
ctx.log_line(&format!("prompt: {} chars, calling Sonnet", prompt.len()));
|
||||
let agent_batch = super::prompts::agent_prompt(&store, &agent, batch)?;
|
||||
ctx.log_line(&format!("prompt: {} chars ({} nodes), calling Sonnet",
|
||||
agent_batch.prompt.len(), agent_batch.node_keys.len()));
|
||||
|
||||
let response = super::llm::call_sonnet("consolidate", &prompt)?;
|
||||
let response = super::llm::call_sonnet("consolidate", &agent_batch.prompt)?;
|
||||
|
||||
let ts = crate::store::format_datetime(crate::store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
|
|
@ -148,6 +149,13 @@ fn job_consolidation_agent(
|
|||
store.upsert_provenance(&report_key, &response,
|
||||
crate::store::Provenance::AgentConsolidate).ok();
|
||||
|
||||
// Record visits for successfully processed nodes
|
||||
if !agent_batch.node_keys.is_empty() {
|
||||
if let Err(e) = store.record_agent_visits(&agent_batch.node_keys, &agent) {
|
||||
ctx.log_line(&format!("visit recording: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
ctx.log_line(&format!("done: {} lines → {}", response.lines().count(), report_key));
|
||||
Ok(())
|
||||
})
|
||||
|
|
@ -165,14 +173,16 @@ fn job_rename_agent(
|
|||
let batch = if batch_size == 0 { 10 } else { batch_size };
|
||||
ctx.log_line(&format!("building prompt: rename (batch={})", batch));
|
||||
|
||||
let prompt = super::prompts::agent_prompt(&store, "rename", batch)?;
|
||||
ctx.log_line(&format!("prompt: {} chars, calling Sonnet", prompt.len()));
|
||||
let agent_batch = super::prompts::agent_prompt(&store, "rename", batch)?;
|
||||
ctx.log_line(&format!("prompt: {} chars ({} nodes), calling Sonnet",
|
||||
agent_batch.prompt.len(), agent_batch.node_keys.len()));
|
||||
|
||||
let response = super::llm::call_sonnet("consolidate", &prompt)?;
|
||||
let response = super::llm::call_sonnet("consolidate", &agent_batch.prompt)?;
|
||||
|
||||
// Parse RENAME actions directly from response
|
||||
let mut applied = 0;
|
||||
let mut skipped = 0;
|
||||
let mut successfully_renamed: Vec<String> = Vec::new();
|
||||
for line in response.lines() {
|
||||
let trimmed = line.trim();
|
||||
if !trimmed.starts_with("RENAME ") { continue; }
|
||||
|
|
@ -208,6 +218,7 @@ fn job_rename_agent(
|
|||
match store.rename_node(&resolved, new_key) {
|
||||
Ok(()) => {
|
||||
ctx.log_line(&format!("renamed: {} → {}", resolved, new_key));
|
||||
successfully_renamed.push(new_key.to_string());
|
||||
applied += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
@ -221,6 +232,13 @@ fn job_rename_agent(
|
|||
store.save()?;
|
||||
}
|
||||
|
||||
// Record visits for successfully renamed nodes
|
||||
if !successfully_renamed.is_empty() {
|
||||
if let Err(e) = store.record_agent_visits(&successfully_renamed, "rename") {
|
||||
ctx.log_line(&format!("visit recording: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
// Also store the report for auditing
|
||||
let ts = crate::store::format_datetime(crate::store::now_epoch())
|
||||
.replace([':', '-', 'T'], "");
|
||||
|
|
|
|||
|
|
@ -11,6 +11,14 @@ use crate::neuro::{
|
|||
replay_queue, replay_queue_with_graph, detect_interference,
|
||||
};
|
||||
|
||||
/// Result of building an agent prompt — includes both the prompt text
/// and the keys of nodes selected for processing, so the caller can
/// record visits after successful completion.
#[derive(Debug, Clone)]
pub struct AgentBatch {
    /// Fully rendered prompt text to send to the LLM.
    pub prompt: String,
    /// Keys of the nodes included in the prompt (empty for agents that
    /// analyze the whole graph rather than specific nodes).
    pub node_keys: Vec<String>,
}
|
||||
|
||||
/// Load a prompt template, replacing {{PLACEHOLDER}} with data
|
||||
pub fn load_prompt(name: &str, replacements: &[(&str, &str)]) -> Result<String, String> {
|
||||
let path = crate::config::get().prompts_dir.join(format!("{}.md", name));
|
||||
|
|
@ -260,28 +268,23 @@ fn format_pairs_section(
|
|||
out
|
||||
}
|
||||
|
||||
/// Format rename candidates: nodes with auto-generated or opaque keys
|
||||
fn format_rename_candidates(store: &Store, count: usize) -> String {
|
||||
/// Format rename candidates, returning both keys and formatted section
|
||||
fn format_rename_candidates_with_keys(store: &Store, count: usize) -> (Vec<String>, String) {
|
||||
let mut candidates: Vec<(&str, &crate::store::Node)> = store.nodes.iter()
|
||||
.filter(|(key, _)| {
|
||||
// Only rename nodes with long auto-generated keys
|
||||
if key.len() < 60 { return false; }
|
||||
|
||||
// Journal entries with auto-slugs
|
||||
if key.starts_with("journal#j-") { return true; }
|
||||
|
||||
// Mined transcripts with UUIDs
|
||||
if key.starts_with("_mined-transcripts#f-") { return true; }
|
||||
|
||||
false
|
||||
})
|
||||
.map(|(k, n)| (k.as_str(), n))
|
||||
.collect();
|
||||
|
||||
// Sort by timestamp (newest first) so we rename recent stuff first
|
||||
candidates.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
|
||||
candidates.truncate(count);
|
||||
|
||||
let keys: Vec<String> = candidates.iter().map(|(k, _)| k.to_string()).collect();
|
||||
|
||||
let mut out = String::new();
|
||||
out.push_str(&format!("## Nodes to rename ({} of {} candidates)\n\n",
|
||||
candidates.len(),
|
||||
|
|
@ -308,7 +311,7 @@ fn format_rename_candidates(store: &Store, count: usize) -> String {
|
|||
|
||||
out.push_str("---\n\n");
|
||||
}
|
||||
out
|
||||
(keys, out)
|
||||
}
|
||||
|
||||
/// Get split candidates sorted by size (largest first)
|
||||
|
|
@ -437,8 +440,10 @@ pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<()
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate a specific agent prompt with filled-in data
|
||||
pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String, String> {
|
||||
/// Generate a specific agent prompt with filled-in data.
|
||||
/// Returns an AgentBatch with the prompt text and the keys of nodes
|
||||
/// selected for processing (for visit tracking on success).
|
||||
pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<AgentBatch, String> {
|
||||
let graph = store.build_graph();
|
||||
let topology = format_topology_header(&graph);
|
||||
|
||||
|
|
@ -447,8 +452,10 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String,
|
|||
match agent {
|
||||
"replay" => {
|
||||
let items = replay_queue_with_graph(store, count, &graph, emb.as_ref());
|
||||
let keys: Vec<String> = items.iter().map(|i| i.key.clone()).collect();
|
||||
let nodes_section = format_nodes_section(store, &items, &graph);
|
||||
load_prompt("replay", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])
|
||||
let prompt = load_prompt("replay", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: keys })
|
||||
}
|
||||
"linker" => {
|
||||
// Filter to episodic entries
|
||||
|
|
@ -459,14 +466,21 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String,
|
|||
.unwrap_or(false)
|
||||
});
|
||||
items.truncate(count);
|
||||
let keys: Vec<String> = items.iter().map(|i| i.key.clone()).collect();
|
||||
let nodes_section = format_nodes_section(store, &items, &graph);
|
||||
load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])
|
||||
let prompt = load_prompt("linker", &[("{{TOPOLOGY}}", &topology), ("{{NODES}}", &nodes_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: keys })
|
||||
}
|
||||
"separator" => {
|
||||
let mut pairs = detect_interference(store, &graph, 0.5);
|
||||
pairs.truncate(count);
|
||||
// Both nodes in each pair count as visited
|
||||
let keys: Vec<String> = pairs.iter()
|
||||
.flat_map(|(a, b, _)| vec![a.clone(), b.clone()])
|
||||
.collect();
|
||||
let pairs_section = format_pairs_section(&pairs, store, &graph);
|
||||
load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)])
|
||||
let prompt = load_prompt("separator", &[("{{TOPOLOGY}}", &topology), ("{{PAIRS}}", &pairs_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: keys })
|
||||
}
|
||||
"transfer" => {
|
||||
// Recent episodic entries
|
||||
|
|
@ -493,15 +507,19 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String,
|
|||
})
|
||||
.collect();
|
||||
let episodes_section = format_nodes_section(store, &items, &graph);
|
||||
load_prompt("transfer", &[("{{TOPOLOGY}}", &topology), ("{{EPISODES}}", &episodes_section)])
|
||||
let prompt = load_prompt("transfer", &[("{{TOPOLOGY}}", &topology), ("{{EPISODES}}", &episodes_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: episode_keys })
|
||||
}
|
||||
"health" => {
|
||||
// Health agent analyzes the whole graph, no specific nodes
|
||||
let health_section = format_health_section(store, &graph);
|
||||
load_prompt("health", &[("{{TOPOLOGY}}", &topology), ("{{HEALTH}}", &health_section)])
|
||||
let prompt = load_prompt("health", &[("{{TOPOLOGY}}", &topology), ("{{HEALTH}}", &health_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: vec![] })
|
||||
}
|
||||
"rename" => {
|
||||
let nodes_section = format_rename_candidates(store, count);
|
||||
load_prompt("rename", &[("{{NODES}}", &nodes_section)])
|
||||
let (keys, nodes_section) = format_rename_candidates_with_keys(store, count);
|
||||
let prompt = load_prompt("rename", &[("{{NODES}}", &nodes_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: keys })
|
||||
}
|
||||
"split" => {
|
||||
// Phase 1: plan prompt for the largest candidate
|
||||
|
|
@ -509,9 +527,10 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String,
|
|||
if candidates.is_empty() {
|
||||
return Err("No nodes large enough to split".to_string());
|
||||
}
|
||||
let key = &candidates[0];
|
||||
let node_section = format_split_plan_node(store, &graph, key);
|
||||
load_prompt("split-plan", &[("{{TOPOLOGY}}", &topology), ("{{NODE}}", &node_section)])
|
||||
let key = candidates[0].clone();
|
||||
let node_section = format_split_plan_node(store, &graph, &key);
|
||||
let prompt = load_prompt("split-plan", &[("{{TOPOLOGY}}", &topology), ("{{NODE}}", &node_section)])?;
|
||||
Ok(AgentBatch { prompt, node_keys: vec![key] })
|
||||
}
|
||||
_ => Err(format!("Unknown agent: {}. Use: replay, linker, separator, transfer, health, rename, split", agent)),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,14 @@ impl Store {
|
|||
pub fn load() -> Result<Store, String> {
|
||||
// 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
|
||||
match Self::load_snapshot_mmap() {
|
||||
Ok(Some(store)) => return Ok(store),
|
||||
Ok(Some(mut store)) => {
|
||||
// rkyv snapshot doesn't include visits — replay from log
|
||||
let visits_p = visits_path();
|
||||
if visits_p.exists() {
|
||||
store.replay_visits(&visits_p).ok();
|
||||
}
|
||||
return Ok(store);
|
||||
},
|
||||
Ok(None) => {},
|
||||
Err(e) => eprintln!("rkyv snapshot: {}", e),
|
||||
}
|
||||
|
|
@ -76,6 +83,10 @@ impl Store {
|
|||
if rels_p.exists() {
|
||||
store.replay_relations(&rels_p)?;
|
||||
}
|
||||
let visits_p = visits_path();
|
||||
if visits_p.exists() {
|
||||
store.replay_visits(&visits_p)?;
|
||||
}
|
||||
|
||||
// Record log sizes after replay — this is the state we reflect
|
||||
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
|
||||
|
|
@ -318,6 +329,100 @@ impl Store {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Append agent visit records to the visits log.
|
||||
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<(), String> {
|
||||
if visits.is_empty() { return Ok(()); }
|
||||
|
||||
let mut msg = message::Builder::new_default();
|
||||
{
|
||||
let log = msg.init_root::<memory_capnp::agent_visit_log::Builder>();
|
||||
let mut list = log.init_visits(visits.len() as u32);
|
||||
for (i, visit) in visits.iter().enumerate() {
|
||||
visit.to_capnp(list.reborrow().get(i as u32));
|
||||
}
|
||||
}
|
||||
let mut buf = Vec::new();
|
||||
serialize::write_message(&mut buf, &msg)
|
||||
.map_err(|e| format!("serialize visits: {}", e))?;
|
||||
|
||||
let path = visits_path();
|
||||
let file = fs::OpenOptions::new()
|
||||
.create(true).append(true).open(&path)
|
||||
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
||||
use std::io::Write;
|
||||
(&file).write_all(&buf)
|
||||
.map_err(|e| format!("write visits: {}", e))?;
|
||||
|
||||
// Update in-memory index
|
||||
for v in visits {
|
||||
self.visits
|
||||
.entry(v.node_key.clone())
|
||||
.or_default()
|
||||
.insert(v.agent.clone(), v.timestamp);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Replay visits log to rebuild in-memory index.
|
||||
fn replay_visits(&mut self, path: &Path) -> Result<(), String> {
|
||||
let file = fs::File::open(path)
|
||||
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
||||
let mut reader = BufReader::new(file);
|
||||
|
||||
while reader.stream_position().map_err(|e| e.to_string())?
|
||||
< fs::metadata(path).map_err(|e| e.to_string())?.len()
|
||||
{
|
||||
let msg = match serialize::read_message(&mut reader, Default::default()) {
|
||||
Ok(m) => m,
|
||||
Err(_) => break,
|
||||
};
|
||||
let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>()
|
||||
.map_err(|e| format!("read visit log: {}", e))?;
|
||||
|
||||
for visit in log.get_visits().map_err(|e| e.to_string())? {
|
||||
let key = visit.get_node_key().ok()
|
||||
.and_then(|t| t.to_str().ok())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let agent = visit.get_agent().ok()
|
||||
.and_then(|t| t.to_str().ok())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let ts = visit.get_timestamp();
|
||||
|
||||
if !key.is_empty() && !agent.is_empty() {
|
||||
let entry = self.visits.entry(key).or_default();
|
||||
// Keep latest timestamp per agent
|
||||
let existing = entry.entry(agent).or_insert(0);
|
||||
if ts > *existing {
|
||||
*existing = ts;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Record visits for a batch of node keys from a successful agent run.
|
||||
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
|
||||
let visits: Vec<AgentVisit> = node_keys.iter()
|
||||
.filter_map(|key| {
|
||||
let node = self.nodes.get(key)?;
|
||||
Some(new_visit(node.uuid, key, agent, "processed"))
|
||||
})
|
||||
.collect();
|
||||
self.append_visits(&visits)
|
||||
}
|
||||
|
||||
/// Get the last time an agent visited a node. Returns 0 if never visited.
|
||||
pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 {
|
||||
self.visits.get(node_key)
|
||||
.and_then(|agents| agents.get(agent))
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Save the derived cache with log size header for staleness detection.
|
||||
/// Uses atomic write (tmp + rename) to prevent partial reads.
|
||||
pub fn save(&self) -> Result<(), String> {
|
||||
|
|
|
|||
|
|
@ -394,6 +394,9 @@ pub struct GapRecord {
|
|||
pub timestamp: String,
|
||||
}
|
||||
|
||||
/// Per-node agent visit index: node_key → (agent_type → last_visit_timestamp).
/// Rebuilt from the visits log on load; updated in place on append.
pub type VisitIndex = HashMap<String, HashMap<String, i64>>;
|
||||
|
||||
// The full in-memory store
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct Store {
|
||||
|
|
@ -404,6 +407,9 @@ pub struct Store {
|
|||
pub retrieval_log: Vec<RetrievalEvent>,
|
||||
pub gaps: Vec<GapRecord>,
|
||||
pub params: Params,
|
||||
/// Agent visit tracking: node_key → (agent_type → last_visit_epoch)
|
||||
#[serde(default)]
|
||||
pub visits: VisitIndex,
|
||||
/// Log sizes at load time — used by save() to write correct staleness header.
|
||||
/// If another writer appended since we loaded, our cache will be marked stale
|
||||
/// (recorded size < actual size), forcing the next reader to replay the log.
|
||||
|
|
@ -490,6 +496,38 @@ pub fn new_node(key: &str, content: &str) -> Node {
|
|||
}
|
||||
}
|
||||
|
||||
/// Agent visit record — tracks when an agent successfully processed a node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AgentVisit {
    /// UUID of the visited node.
    pub node_uuid: [u8; 16],
    /// Node key at the time of the visit.
    pub node_key: String,
    /// Agent type that processed the node (e.g. "rename").
    pub agent: String,
    /// Visit time (epoch timestamp from `now_epoch()`).
    pub timestamp: i64,
    /// How the visit ended ("processed" for successful runs).
    pub outcome: String,
}
||||
|
||||
// Capnp (de)serialization glue for AgentVisit: maps the text, uuid and
// primitive fields onto the memory_capnp::agent_visit schema.
capnp_message!(AgentVisit,
    reader: memory_capnp::agent_visit::Reader<'_>,
    builder: memory_capnp::agent_visit::Builder<'_>,
    text: [node_key, agent, outcome],
    uuid: [node_uuid],
    prim: [timestamp],
    enm: [],
    skip: [],
);
|
||||
|
||||
pub fn new_visit(node_uuid: [u8; 16], node_key: &str, agent: &str, outcome: &str) -> AgentVisit {
|
||||
AgentVisit {
|
||||
node_uuid,
|
||||
node_key: node_key.to_string(),
|
||||
agent: agent.to_string(),
|
||||
timestamp: now_epoch(),
|
||||
outcome: outcome.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Path of the append-only agent-visits log.
pub(crate) fn visits_path() -> PathBuf {
    memory_dir().join("visits.capnp")
}
|
||||
|
||||
/// Create a new relation
|
||||
pub fn new_relation(
|
||||
source_uuid: [u8; 16],
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue