store: remove visit tracking infrastructure

Remove AgentVisit, TranscriptSegment, and all related visit tracking code.
Provenance is what we've been using to track agent interaction with nodes.

Also removes dead fields from Node (state_tag, created).

-349 lines.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-13 18:57:12 -04:00
parent 7d49f29fde
commit a1accc7cd4
8 changed files with 7 additions and 349 deletions

View file

@ -7,7 +7,7 @@
// Also contains the legacy run_one_agent() pipeline and process
// management for spawned agent subprocesses.
use crate::store::{self, Store};
use crate::store;
use crate::subconscious::{defs, prompts};
use std::collections::HashMap;
@ -382,7 +382,6 @@ pub struct AgentResult {
/// Run an agent. If keys are provided, use them directly (bypassing the
/// agent's query). Otherwise, run the query to select target nodes.
pub async fn run_one_agent(
store: &mut Store,
agent_name: &str,
count: usize,
keys: Option<&[String]>,
@ -413,11 +412,7 @@ pub async fn run_one_agent(
phase: step.phase.clone(),
});
}
let batch = prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys };
if !batch.node_keys.is_empty() {
store.record_agent_visits(&batch.node_keys, agent_name).ok();
}
batch
prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }
} else {
let effective_count = def.count.unwrap_or(count);
defs::run_agent(&def, effective_count, &Default::default()).await?

View file

@ -39,23 +39,18 @@ pub async fn cmd_run_agent(agent: &str, count: usize, target: &[String], query:
vec![] // use agent's built-in query
};
let arc = memory::access_local()?;
if !resolved_targets.is_empty() {
for (i, key) in resolved_targets.iter().enumerate() {
println!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key);
let mut store = arc.lock().await;
if let Err(e) = crate::agent::oneshot::run_one_agent(
&mut store, agent, count, Some(&[key.clone()]),
agent, count, Some(&[key.clone()]),
).await {
println!("[{}] ERROR on {}: {}", agent, key, e);
}
}
} else {
// Local execution (--local, --debug, dry-run, or daemon unavailable)
let mut store = arc.lock().await;
crate::agent::oneshot::run_one_agent(
&mut store, agent, count, None,
agent, count, None,
).await.map_err(|e| anyhow::anyhow!("{}", e))?;
}
Ok(())

View file

@ -148,8 +148,6 @@ pub enum Filter {
Age(Cmp), // vs now - timestamp (seconds)
ContentLen(Cmp),
Provenance(String),
NotVisited { agent: String, duration: i64 }, // seconds
Visited { agent: String },
Negated(Box<Filter>),
}
@ -185,8 +183,6 @@ pub enum ScoreField {
Weight,
ContentLen,
Priority,
/// Time since last visit by named agent. 1.0 = never visited, decays toward 0.
Recency(String),
}
/// Numeric comparison operator.
@ -243,17 +239,6 @@ fn score_field(
// Priority is already roughly 0-1 from the scoring function
p.min(1.0)
}
ScoreField::Recency(agent) => {
let last = store.last_visited(key, agent);
if last == 0 {
1.0 // never visited = highest recency score
} else {
let age = (crate::store::now_epoch() - last) as f64;
// Sigmoid decay: 1.0 at 7+ days, ~0.5 at 1 day, ~0.1 at 1 hour
let hours = age / 3600.0;
1.0 - (-0.03 * hours).exp()
}
}
}
}
@ -306,8 +291,6 @@ impl fmt::Display for Filter {
Filter::Age(c) => write!(f, "age:{}", c),
Filter::ContentLen(c) => write!(f, "content-len:{}", c),
Filter::Provenance(p) => write!(f, "provenance:{}", p),
Filter::NotVisited { agent, duration } => write!(f, "not-visited:{},{}s", agent, duration),
Filter::Visited { agent } => write!(f, "visited:{}", agent),
Filter::Negated(inner) => write!(f, "!{}", inner),
}
}
@ -441,13 +424,6 @@ pub fn eval_filter(filt: &Filter, key: &str, store: &Store, now: i64) -> bool {
}
Filter::ContentLen(cmp) => cmp.matches(node.content.len() as f64),
Filter::Provenance(p) => node.provenance == *p,
Filter::NotVisited { agent, duration } => {
let last = store.last_visited(key, agent);
last == 0 || (now - last) > *duration
}
Filter::Visited { agent } => {
store.last_visited(key, agent) > 0
}
Filter::Negated(inner) => !eval_filter(inner, key, store, now),
}
}

View file

@ -100,8 +100,6 @@ peg::parser! {
/ "key:" g:glob_pattern() { Stage::Filter(Filter::KeyGlob(g)) }
/ "!key:" g:glob_pattern() { Stage::Filter(Filter::Negated(Box::new(Filter::KeyGlob(g)))) }
/ "provenance:" p:ident() { Stage::Filter(Filter::Provenance(p)) }
/ "not-visited:" a:ident() "," d:integer() { Stage::Filter(Filter::NotVisited { agent: a, duration: d as i64 }) }
/ "visited:" a:ident() { Stage::Filter(Filter::Visited { agent: a }) }
/ "all" { Stage::Generator(Generator::All) }
// Graph algorithms
/ "spread" { Stage::Algorithm(AlgoStage { algo: Algorithm::Spread, params: std::collections::HashMap::new() }) }
@ -123,8 +121,7 @@ peg::parser! {
/ f:field() { make_sort_field(&f, false) }
rule score_term() -> (ScoreField, f64)
= "recency(" a:ident() ")" "*" w:number() { (ScoreField::Recency(a), w) }
/ f:score_field_name() "*" w:number() { (f, w) }
= f:score_field_name() "*" w:number() { (f, w) }
rule score_field_name() -> ScoreField
= "isolation" { ScoreField::Isolation }

View file

@ -4,8 +4,6 @@
// Tables:
// nodes: key → Node (JSON serialized)
// uuid_to_key: [u8;16] → key
// visits: (node_key, agent) → timestamp
// transcript_progress: (transcript_id, segment_idx, agent) → timestamp
//
// Relations stay in-memory for now (frequently iterated in full).
@ -17,9 +15,6 @@ use std::path::Path;
// Table definitions
pub const NODES: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes");
pub const UUID_TO_KEY: TableDefinition<&[u8], &str> = TableDefinition::new("uuid_to_key");
pub const VISITS: TableDefinition<(&str, &str), i64> = TableDefinition::new("visits");
pub const TRANSCRIPT_PROGRESS: TableDefinition<(&str, u32, &str), i64> =
TableDefinition::new("transcript_progress");
/// Open or create the redb database, ensuring all tables exist.
pub fn open_db(path: &Path) -> Result<Database> {
@ -31,8 +26,6 @@ pub fn open_db(path: &Path) -> Result<Database> {
{
let _ = txn.open_table(NODES)?;
let _ = txn.open_table(UUID_TO_KEY)?;
let _ = txn.open_table(VISITS)?;
let _ = txn.open_table(TRANSCRIPT_PROGRESS)?;
}
txn.commit()?;
@ -63,29 +56,6 @@ pub fn rebuild_from_store(path: &Path, store: &Store) -> Result<Database> {
}
}
{
let mut visits_table = txn.open_table(VISITS)?;
for (node_key, agents) in &store.visits {
for (agent, &timestamp) in agents {
visits_table.insert((node_key.as_str(), agent.as_str()), timestamp)?;
}
}
}
{
let mut tp_table = txn.open_table(TRANSCRIPT_PROGRESS)?;
for ((transcript_id, segment_idx), agents) in &store.transcript_progress {
for agent in agents {
tp_table.insert(
(transcript_id.as_str(), *segment_idx, agent.as_str()),
now_epoch(),
)?;
}
}
}
txn.commit()?;
Ok(db)
}
@ -147,25 +117,3 @@ pub fn delete_node(db: &Database, key: &str, uuid: &[u8; 16]) -> Result<()> {
Ok(())
}
/// Record a visit in redb.
pub fn record_visit(db: &Database, node_key: &str, agent: &str, timestamp: i64) -> Result<()> {
let txn = db.begin_write()?;
{
let mut table = txn.open_table(VISITS)?;
table.insert((node_key, agent), timestamp)?;
}
txn.commit()?;
Ok(())
}
/// Get last visit timestamp for a node/agent pair.
pub fn get_last_visit(db: &Database, node_key: &str, agent: &str) -> Result<i64> {
let txn = db.begin_read()?;
let table = txn.open_table(VISITS)?;
match table.get((node_key, agent))? {
Some(ts) => Ok(ts.value()),
None => Ok(0),
}
}

View file

@ -30,14 +30,6 @@ impl Store {
if rels_p.exists() {
store.replay_relations(&rels_p)?;
}
let visits_p = visits_path();
if visits_p.exists() {
store.replay_visits(&visits_p)?;
}
let tp_p = transcript_progress_path();
if tp_p.exists() {
store.replay_transcript_progress(&tp_p)?;
}
// Record log sizes after replay
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
@ -330,173 +322,6 @@ impl Store {
Ok(())
}
/// Append agent visit records to the visits log.
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<()> {
if visits.is_empty() { return Ok(()); }
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::agent_visit_log::Builder>();
let mut list = log.init_visits(visits.len() as u32);
for (i, visit) in visits.iter().enumerate() {
visit.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.with_context(|| format!("serialize visits"))?;
let path = visits_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.with_context(|| format!("write visits"))?;
// Update in-memory index
for v in visits {
self.visits
.entry(v.node_key.clone())
.or_default()
.insert(v.agent.clone(), v.timestamp);
}
Ok(())
}
/// Replay visits log to rebuild in-memory index.
fn replay_visits(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
while reader.stream_position()?
< fs::metadata(path)?.len()
{
let msg = match serialize::read_message(&mut reader, Default::default()) {
Ok(m) => m,
Err(_) => break,
};
let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>()
.with_context(|| format!("read visit log"))?;
for visit in log.get_visits()? {
let key = visit.get_node_key().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let agent = visit.get_agent().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let ts = visit.get_timestamp();
if !key.is_empty() && !agent.is_empty() {
let entry = self.visits.entry(key).or_default();
// Keep latest timestamp per agent
let existing = entry.entry(agent).or_insert(0);
if ts > *existing {
*existing = ts;
}
}
}
}
Ok(())
}
/// Append transcript segment progress records.
pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<()> {
if segments.is_empty() { return Ok(()); }
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::transcript_progress_log::Builder>();
let mut list = log.init_segments(segments.len() as u32);
for (i, seg) in segments.iter().enumerate() {
seg.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.with_context(|| format!("serialize transcript progress"))?;
let path = transcript_progress_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.with_context(|| format!("write transcript progress"))?;
// Update in-memory index
for seg in segments {
self.transcript_progress
.entry((seg.transcript_id.clone(), seg.segment_index))
.or_default()
.insert(seg.agent.clone());
}
Ok(())
}
/// Replay transcript progress log to rebuild in-memory index.
fn replay_transcript_progress(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
while reader.stream_position()?
< fs::metadata(path)?.len()
{
let msg = match serialize::read_message(&mut reader, Default::default()) {
Ok(m) => m,
Err(_) => break,
};
let log = msg.get_root::<memory_capnp::transcript_progress_log::Reader>()
.with_context(|| format!("read transcript progress"))?;
for seg in log.get_segments()? {
let id = seg.get_transcript_id().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let agent = seg.get_agent().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let idx = seg.get_segment_index();
if !id.is_empty() && !agent.is_empty() {
self.transcript_progress
.entry((id, idx))
.or_default()
.insert(agent);
}
}
}
Ok(())
}
/// Record visits for a batch of node keys from a successful agent run.
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<()> {
let visits: Vec<AgentVisit> = node_keys.iter()
.filter_map(|key| {
let node = self.nodes.get(key)?;
Some(new_visit(node.uuid, key, agent, "processed"))
})
.collect();
self.append_visits(&visits)
}
/// Get the last time an agent visited a node. Returns 0 if never visited.
pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 {
self.visits.get(node_key)
.and_then(|agents| agents.get(agent))
.copied()
.unwrap_or(0)
}
/// Placeholder - indices will be updated on write with redb.
pub fn save(&self) -> Result<()> {
Ok(())

View file

@ -9,7 +9,7 @@ use anyhow::{anyhow, bail, Context, Result};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fs;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
@ -196,11 +196,9 @@ pub struct Node {
pub emotion: f32,
pub deleted: bool,
pub source_ref: String,
pub created: String,
pub retrievals: u32,
pub uses: u32,
pub wrongs: u32,
pub state_tag: String,
pub last_replayed: i64,
pub spaced_repetition_interval: u32,
@ -262,7 +260,7 @@ capnp_enum!(RelationType, memory_capnp::RelationType,
capnp_message!(Node,
reader: memory_capnp::content_node::Reader<'_>,
builder: memory_capnp::content_node::Builder<'_>,
text: [key, content, source_ref, created, state_tag, provenance],
text: [key, content, source_ref, provenance],
uuid: [uuid],
prim: [version, timestamp, weight, emotion, deleted,
retrievals, uses, wrongs, last_replayed,
@ -335,18 +333,11 @@ impl Relation {
}
}
/// Per-node agent visit index: node_key → (agent_type → last_visit_timestamp)
pub(super) type VisitIndex = HashMap<String, HashMap<String, i64>>;
// The full in-memory store
pub struct Store {
pub nodes: HashMap<String, Node>, // key → latest node
pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes)
pub relations: Vec<Relation>, // all active relations
/// Agent visit tracking: node_key → (agent_type → last_visit_epoch)
pub visits: VisitIndex,
/// Transcript mining progress: (transcript_id, segment_index) → set of agents that processed it
pub transcript_progress: HashMap<(String, u32), HashSet<String>>,
/// Log sizes at load time — used for staleness detection.
pub(crate) loaded_nodes_size: u64,
pub(crate) loaded_rels_size: u64,
@ -360,8 +351,6 @@ impl Default for Store {
nodes: HashMap::new(),
uuid_to_key: HashMap::new(),
relations: Vec::new(),
visits: HashMap::new(),
transcript_progress: HashMap::new(),
loaded_nodes_size: 0,
loaded_rels_size: 0,
db: None,
@ -403,11 +392,9 @@ pub fn new_node(key: &str, content: &str) -> Node {
emotion: 0.0,
deleted: false,
source_ref: String::new(),
created: today(),
retrievals: 0,
uses: 0,
wrongs: 0,
state_tag: String::new(),
last_replayed: 0,
spaced_repetition_interval: 1,
created_at: now_epoch(),
@ -418,59 +405,6 @@ pub fn new_node(key: &str, content: &str) -> Node {
}
}
/// Agent visit record — tracks when an agent successfully processed a node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AgentVisit {
pub node_uuid: [u8; 16],
pub node_key: String,
pub agent: String,
pub timestamp: i64,
pub outcome: String,
}
capnp_message!(AgentVisit,
reader: memory_capnp::agent_visit::Reader<'_>,
builder: memory_capnp::agent_visit::Builder<'_>,
text: [node_key, agent, outcome],
uuid: [node_uuid],
prim: [timestamp],
enm: [],
skip: [],
);
pub(super) fn new_visit(node_uuid: [u8; 16], node_key: &str, agent: &str, outcome: &str) -> AgentVisit {
AgentVisit {
node_uuid,
node_key: node_key.to_string(),
agent: agent.to_string(),
timestamp: now_epoch(),
outcome: outcome.to_string(),
}
}
pub(crate) fn visits_path() -> PathBuf { memory_dir().join("visits.capnp") }
/// Transcript mining progress — tracks which segments have been processed
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TranscriptSegment {
pub transcript_id: String,
pub segment_index: u32,
pub agent: String,
pub timestamp: i64,
}
capnp_message!(TranscriptSegment,
reader: memory_capnp::transcript_segment::Reader<'_>,
builder: memory_capnp::transcript_segment::Builder<'_>,
text: [transcript_id, agent],
uuid: [],
prim: [segment_index, timestamp],
enm: [],
skip: [],
);
pub(crate) fn transcript_progress_path() -> PathBuf { memory_dir().join("transcript-progress.capnp") }
/// Create a new relation.
/// Provenance is set from POC_PROVENANCE env var if present, else "manual".
pub fn new_relation(

View file

@ -12,7 +12,6 @@ use futures::FutureExt;
use crate::agent::oneshot::{AutoAgent, AutoStep, RunStats};
use crate::agent::tools;
use crate::subconscious::defs;
use crate::hippocampus::access_local;
fn config_path() -> std::path::PathBuf {
dirs::home_dir().unwrap_or_default()
@ -254,12 +253,6 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResul
None => return Err(auto),
};
// Run query and resolve placeholders
let store_arc = match access_local() {
Ok(s) => s,
Err(_) => return Err(auto),
};
let exclude: std::collections::HashSet<String> = std::collections::HashSet::new();
let batch = match defs::run_agent(
&def, def.count.unwrap_or(5), &exclude,
@ -271,11 +264,6 @@ pub async fn prepare_spawn(name: &str, mut auto: AutoAgent) -> Result<SpawnResul
}
};
if !batch.node_keys.is_empty() {
let mut store = store_arc.lock().await;
store.record_agent_visits(&batch.node_keys, name).ok();
}
let orig_steps = std::mem::replace(&mut auto.steps,
batch.steps.iter().map(|s| AutoStep {
prompt: s.prompt.clone(),