restructure: hippocampus/ for memory, subconscious/ for agents

hippocampus/ — memory storage, retrieval, and consolidation:
  store, graph, query, similarity, spectral, neuro, counters,
  config, transcript, memory_search, lookups, cursor, migrate

subconscious/ — autonomous agents that process without being asked:
  reflect, surface, consolidate, digest, audit, etc.

All existing crate::X paths preserved via re-exports in lib.rs.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
ProofOfConcept 2026-03-25 01:04:13 -04:00 committed by Kent Overstreet
parent cfed85bd20
commit d5c0e86700
39 changed files with 87 additions and 32 deletions

View file

@ -0,0 +1,348 @@
// Append-only Cap'n Proto storage + derived KV cache
//
// Two log files are source of truth:
// nodes.capnp - ContentNode messages
// relations.capnp - Relation messages
//
// The Store struct is the derived cache: latest version per UUID,
// rebuilt from logs when stale. Three-tier load strategy:
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
// 2. bincode cache (state.bin) — ~10ms
// 3. capnp log replay — ~40ms
// Staleness: log file sizes embedded in cache headers.
//
// Module layout:
// types.rs — Node, Relation, enums, capnp macros, path helpers
// parse.rs — markdown → MemoryUnit parsing
// view.rs — zero-copy read-only access (StoreView, MmapView)
// persist.rs — load, save, replay, append, snapshot (all disk IO)
// ops.rs — mutations (upsert, delete, decay, cap_degree, etc.)
// mod.rs — re-exports, key resolution, ingestion, rendering
mod types;
mod parse;
mod view;
mod persist;
mod ops;
// Re-export everything callers need
pub use types::{
memory_dir, nodes_path,
now_epoch, epoch_to_local, format_date, format_datetime, format_datetime_space, compact_timestamp, today,
Node, Relation, NodeType, Provenance, RelationType,
RetrievalEvent, Params, GapRecord, Store,
new_node, new_relation,
};
pub use parse::{MemoryUnit, parse_units};
pub use view::{StoreView, AnyView};
pub use persist::fsck;
pub use persist::strip_md_keys;
pub use ops::TASK_PROVENANCE;
use crate::graph::{self, Graph};
use std::fs;
use std::io::Write as IoWrite;
use std::path::Path;
use parse::classify_filename;
/// Strip .md suffix from a key, handling both bare keys and section keys.
/// "journal.md#j-2026" → "journal#j-2026", "identity.md" → "identity", "identity" → "identity"
pub fn strip_md_suffix(key: &str) -> String {
if let Some((file, section)) = key.split_once('#') {
let bare = file.strip_suffix(".md").unwrap_or(file);
format!("{}#{}", bare, section)
} else {
key.strip_suffix(".md").unwrap_or(key).to_string()
}
}
impl Store {
pub fn build_graph(&self) -> Graph {
graph::build_graph(self)
}
pub fn resolve_key(&self, target: &str) -> Result<String, String> {
// Strip .md suffix if present — keys no longer use it
let bare = strip_md_suffix(target);
if self.nodes.contains_key(&bare) {
return Ok(bare);
}
let matches: Vec<_> = self.nodes.keys()
.filter(|k| k.to_lowercase().contains(&target.to_lowercase()))
.cloned().collect();
match matches.len() {
0 => Err(format!("No entry for '{}'. Run 'init'?", target)),
1 => Ok(matches[0].clone()),
n if n <= 10 => {
let list = matches.join("\n ");
Err(format!("Ambiguous '{}'. Matches:\n {}", target, list))
}
n => Err(format!("Too many matches for '{}' ({}). Be more specific.", target, n)),
}
}
/// Resolve a link target to (key, uuid).
fn resolve_node_uuid(&self, target: &str) -> Option<(String, [u8; 16])> {
let bare = strip_md_suffix(target);
let n = self.nodes.get(&bare)?;
Some((bare, n.uuid))
}
/// Append retrieval event to retrieval.log without needing a Store instance.
pub fn log_retrieval_static(query: &str, results: &[String]) {
let path = memory_dir().join("retrieval.log");
let line = format!("[{}] q=\"{}\" hits={}\n", today(), query, results.len());
if let Ok(mut f) = fs::OpenOptions::new()
.create(true).append(true).open(&path) {
let _ = f.write_all(line.as_bytes());
}
}
/// Scan markdown files and index all memory units
pub fn init_from_markdown(&mut self) -> Result<usize, String> {
let dir = memory_dir();
let mut count = 0;
if dir.exists() {
// Build edge set for O(1) dedup during ingestion
let mut edge_set = self.build_edge_set();
count = self.scan_dir_for_init(&dir, &mut edge_set)?;
}
Ok(count)
}
/// Build a HashSet of existing (source, target) UUID pairs for O(1) dedup.
fn build_edge_set(&self) -> std::collections::HashSet<([u8; 16], [u8; 16])> {
let mut set = std::collections::HashSet::with_capacity(self.relations.len() * 2);
for r in &self.relations {
set.insert((r.source, r.target));
set.insert((r.target, r.source));
}
set
}
fn scan_dir_for_init(
&mut self,
dir: &Path,
edge_set: &mut std::collections::HashSet<([u8; 16], [u8; 16])>,
) -> Result<usize, String> {
let mut count = 0;
let entries = fs::read_dir(dir)
.map_err(|e| format!("read dir {}: {}", dir.display(), e))?;
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
count += self.scan_dir_for_init(&path, edge_set)?;
continue;
}
let Some(ext) = path.extension() else { continue };
if ext != "md" { continue }
let filename = path.file_name().unwrap().to_string_lossy().to_string();
let content = fs::read_to_string(&path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
let units = parse_units(&filename, &content);
let (new_count, _) = self.ingest_units(&units, &filename)?;
count += new_count;
// Create relations from links
let mut new_relations = Vec::new();
for unit in &units {
let source_uuid = match self.nodes.get(&unit.key) {
Some(n) => n.uuid,
None => continue,
};
for link in unit.marker_links.iter().chain(unit.md_links.iter()) {
let Some((key, uuid)) = self.resolve_node_uuid(link) else { continue };
if !edge_set.contains(&(source_uuid, uuid)) {
edge_set.insert((source_uuid, uuid));
edge_set.insert((uuid, source_uuid));
new_relations.push(new_relation(
source_uuid, uuid, RelationType::Link, 1.0,
&unit.key, &key,
));
}
}
for cause in &unit.causes {
let Some((key, uuid)) = self.resolve_node_uuid(cause) else { continue };
if !edge_set.contains(&(uuid, source_uuid)) {
edge_set.insert((uuid, source_uuid));
new_relations.push(new_relation(
uuid, source_uuid, RelationType::Causal, 1.0,
&key, &unit.key,
));
}
}
}
if !new_relations.is_empty() {
self.append_relations(&new_relations)?;
self.relations.extend(new_relations);
}
}
Ok(count)
}
/// Process parsed memory units: diff against existing nodes, persist changes.
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
fn ingest_units(&mut self, units: &[MemoryUnit], filename: &str) -> Result<(usize, usize), String> {
let _lock = types::StoreLock::acquire()?;
self.refresh_nodes()?;
let node_type = classify_filename(filename);
let mut new_nodes = Vec::new();
let mut updated_nodes = Vec::new();
for (pos, unit) in units.iter().enumerate() {
if let Some(existing) = self.nodes.get(&unit.key) {
if existing.content != unit.content || existing.position != pos as u32 {
let mut node = existing.clone();
node.content = unit.content.clone();
node.position = pos as u32;
node.version += 1;
if let Some(ref s) = unit.state { node.state_tag = s.clone(); }
if let Some(ref s) = unit.source_ref { node.source_ref = s.clone(); }
updated_nodes.push(node);
}
} else {
let mut node = new_node(&unit.key, &unit.content);
node.node_type = node_type;
node.position = pos as u32;
if let Some(ref s) = unit.state { node.state_tag = s.clone(); }
if let Some(ref s) = unit.source_ref { node.source_ref = s.clone(); }
new_nodes.push(node);
}
}
if !new_nodes.is_empty() {
self.append_nodes_unlocked(&new_nodes)?;
for node in &new_nodes {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node.clone());
}
}
if !updated_nodes.is_empty() {
self.append_nodes_unlocked(&updated_nodes)?;
for node in &updated_nodes {
self.nodes.insert(node.key.clone(), node.clone());
}
}
Ok((new_nodes.len(), updated_nodes.len()))
}
/// Import a markdown file into the store, parsing it into nodes.
pub fn import_file(&mut self, path: &Path) -> Result<(usize, usize), String> {
let filename = path.file_name().unwrap().to_string_lossy().to_string();
let content = fs::read_to_string(path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
let units = parse_units(&filename, &content);
self.ingest_units(&units, &filename)
}
/// Gather all sections for a file key, sorted by position.
pub fn file_sections(&self, file_key: &str) -> Option<Vec<&Node>> {
let prefix = format!("{}#", file_key);
let mut sections: Vec<_> = self.nodes.values()
.filter(|n| n.key == file_key || n.key.starts_with(&prefix))
.collect();
if sections.is_empty() {
return None;
}
sections.sort_by_key(|n| n.position);
Some(sections)
}
/// Render a file key as plain content (no mem markers).
pub fn render_file(&self, file_key: &str) -> Option<String> {
let sections = self.file_sections(file_key)?;
let mut output = String::new();
for node in &sections {
output.push_str(&node.content);
if !node.content.ends_with('\n') {
output.push('\n');
}
output.push('\n');
}
Some(output.trim_end().to_string())
}
/// Render a file key back to markdown with reconstituted mem markers.
pub fn export_to_markdown(&self, file_key: &str) -> Option<String> {
let sections = self.file_sections(file_key)?;
let mut output = String::new();
for node in &sections {
if node.key.contains('#') {
let section_id = node.key.rsplit_once('#').map_or("", |(_, s)| s);
let links: Vec<_> = self.relations.iter()
.filter(|r| r.source_key == node.key && !r.deleted
&& r.rel_type != RelationType::Causal)
.map(|r| r.target_key.clone())
.collect();
let causes: Vec<_> = self.relations.iter()
.filter(|r| r.target_key == node.key && !r.deleted
&& r.rel_type == RelationType::Causal)
.map(|r| r.source_key.clone())
.collect();
let mut marker_parts = vec![format!("id={}", section_id)];
if !links.is_empty() {
marker_parts.push(format!("links={}", links.join(",")));
}
if !causes.is_empty() {
marker_parts.push(format!("causes={}", causes.join(",")));
}
output.push_str(&format!("<!-- mem: {} -->\n", marker_parts.join(" ")));
}
output.push_str(&node.content);
if !node.content.ends_with('\n') {
output.push('\n');
}
output.push('\n');
}
Some(output.trim_end().to_string())
}
/// Find the episodic node that best matches the given entry text.
pub fn find_journal_node(&self, entry_text: &str) -> Option<String> {
if entry_text.is_empty() {
return None;
}
let words: Vec<&str> = entry_text.split_whitespace()
.filter(|w| w.len() > 5)
.take(5)
.collect();
let mut best_key = None;
let mut best_score = 0;
for (key, node) in &self.nodes {
if node.node_type != NodeType::EpisodicSession {
continue;
}
let content_lower = node.content.to_lowercase();
let score: usize = words.iter()
.filter(|w| content_lower.contains(&w.to_lowercase()))
.count();
if score > best_score {
best_score = score;
best_key = Some(key.clone());
}
}
best_key
}
}

View file

@ -0,0 +1,328 @@
// Mutation operations on the store
//
// CRUD (upsert, delete, modify), feedback tracking (mark_used, mark_wrong),
// maintenance (decay, fix_categories, cap_degree), and graph metrics.
use super::types::*;
use std::collections::{HashMap, HashSet};
tokio::task_local! {
/// Task-scoped provenance for agent writes. Set by the daemon before
/// running an agent's tool calls, so all writes within that task are
/// automatically attributed to the agent.
pub static TASK_PROVENANCE: String;
}
/// Provenance priority: task_local (agent context) > env var > "manual".
fn current_provenance() -> String {
TASK_PROVENANCE.try_with(|p| p.clone())
.or_else(|_| std::env::var("POC_PROVENANCE").map_err(|_| ()))
.unwrap_or_else(|_| "manual".to_string())
}
impl Store {
/// Add or update a node (appends to log + updates cache).
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
if let Some(existing) = self.nodes.get(&node.key) {
node.uuid = existing.uuid;
node.version = existing.version + 1;
}
self.append_nodes_unlocked(&[node.clone()])?;
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
Ok(())
}
/// Add a relation (appends to log + updates cache)
pub fn add_relation(&mut self, rel: Relation) -> Result<(), String> {
self.append_relations(std::slice::from_ref(&rel))?;
self.relations.push(rel);
Ok(())
}
/// Upsert a node: update if exists (and content changed), create if not.
/// Returns: "created", "updated", or "unchanged".
///
/// Provenance is determined by the POC_PROVENANCE env var if set,
/// otherwise defaults to Manual.
pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str, String> {
let prov = current_provenance();
self.upsert_provenance(key, content, &prov)
}
/// Upsert with explicit provenance (for agent-created nodes).
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str, String> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
if let Some(existing) = self.nodes.get(key) {
if existing.content == content {
return Ok("unchanged");
}
let mut node = existing.clone();
node.content = content.to_string();
node.provenance = provenance.to_string();
node.timestamp = now_epoch();
node.version += 1;
self.append_nodes_unlocked(std::slice::from_ref(&node))?;
self.nodes.insert(key.to_string(), node);
Ok("updated")
} else {
let mut node = new_node(key, content);
node.provenance = provenance.to_string();
self.append_nodes_unlocked(std::slice::from_ref(&node))?;
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(key.to_string(), node);
Ok("created")
}
}
/// Soft-delete a node (appends deleted version, removes from cache).
/// Holds StoreLock across refresh + write to see concurrent creates.
pub fn delete_node(&mut self, key: &str) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
let prov = current_provenance();
let node = self.nodes.get(key)
.ok_or_else(|| format!("No node '{}'", key))?;
let mut deleted = node.clone();
deleted.deleted = true;
deleted.version += 1;
deleted.provenance = prov;
deleted.timestamp = now_epoch();
self.append_nodes_unlocked(std::slice::from_ref(&deleted))?;
self.nodes.remove(key);
Ok(())
}
/// Rename a node: change its key, update debug strings on all edges.
///
/// Graph edges (source/target UUIDs) are unaffected — they're already
/// UUID-based. We update the human-readable source_key/target_key strings
/// on relations, and created_at is preserved untouched.
///
/// Appends: (new_key, v+1) + (old_key, deleted, v+1) + updated relations.
/// Holds StoreLock across refresh + write to prevent races.
pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<(), String> {
if old_key == new_key {
return Ok(());
}
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
if self.nodes.contains_key(new_key) {
return Err(format!("Key '{}' already exists", new_key));
}
let node = self.nodes.get(old_key)
.ok_or_else(|| format!("No node '{}'", old_key))?
.clone();
let prov = current_provenance();
// New version under the new key
let mut renamed = node.clone();
renamed.key = new_key.to_string();
renamed.version += 1;
renamed.provenance = prov.clone();
renamed.timestamp = now_epoch();
// Deletion record for the old key (same UUID, independent version counter)
let mut tombstone = node.clone();
tombstone.deleted = true;
tombstone.version += 1;
tombstone.provenance = prov;
tombstone.timestamp = now_epoch();
// Collect affected relations and update their debug key strings
let updated_rels: Vec<_> = self.relations.iter()
.filter(|r| r.source_key == old_key || r.target_key == old_key)
.map(|r| {
let mut r = r.clone();
r.version += 1;
if r.source_key == old_key { r.source_key = new_key.to_string(); }
if r.target_key == old_key { r.target_key = new_key.to_string(); }
r
})
.collect();
// Persist under single lock
self.append_nodes_unlocked(&[renamed.clone(), tombstone])?;
if !updated_rels.is_empty() {
self.append_relations_unlocked(&updated_rels)?;
}
// Update in-memory cache
self.nodes.remove(old_key);
self.uuid_to_key.insert(renamed.uuid, new_key.to_string());
self.nodes.insert(new_key.to_string(), renamed);
for updated in &updated_rels {
if let Some(r) = self.relations.iter_mut().find(|r| r.uuid == updated.uuid) {
r.source_key = updated.source_key.clone();
r.target_key = updated.target_key.clone();
r.version = updated.version;
}
}
Ok(())
}
/// Modify a node in-place, bump version, and persist to capnp log.
fn modify_node(&mut self, key: &str, f: impl FnOnce(&mut Node)) -> Result<(), String> {
let node = self.nodes.get_mut(key)
.ok_or_else(|| format!("No node '{}'", key))?;
f(node);
node.version += 1;
let node = node.clone();
self.append_nodes(&[node])
}
pub fn mark_used(&mut self, key: &str) {
let boost = self.params.use_boost as f32;
let _ = self.modify_node(key, |n| {
n.uses += 1;
n.weight = (n.weight + boost).min(1.0);
if n.spaced_repetition_interval < 30 {
n.spaced_repetition_interval = match n.spaced_repetition_interval {
1 => 3, 3 => 7, 7 => 14, 14 => 30, _ => 30,
};
}
n.last_replayed = now_epoch();
});
}
pub fn mark_wrong(&mut self, key: &str, _ctx: Option<&str>) {
let _ = self.modify_node(key, |n| {
n.wrongs += 1;
n.weight = (n.weight - 0.1).max(0.0);
n.spaced_repetition_interval = 1;
});
}
/// Adjust edge strength between two nodes by a delta.
/// Clamps to [0.05, 0.95]. Returns (old_strength, new_strength, edges_modified).
pub fn adjust_edge_strength(&mut self, key_a: &str, key_b: &str, delta: f32) -> (f32, f32, usize) {
let mut old = 0.0f32;
let mut new = 0.0f32;
let mut count = 0;
for rel in &mut self.relations {
if rel.deleted { continue; }
if (rel.source_key == key_a && rel.target_key == key_b)
|| (rel.source_key == key_b && rel.target_key == key_a)
{
old = rel.strength;
rel.strength = (rel.strength + delta).clamp(0.05, 0.95);
new = rel.strength;
rel.version += 1;
count += 1;
}
}
(old, new, count)
}
pub fn record_gap(&mut self, desc: &str) {
self.gaps.push(GapRecord {
description: desc.to_string(),
timestamp: today(),
});
}
/// Cap node degree by soft-deleting edges from mega-hubs.
pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize), String> {
let mut node_degree: HashMap<String, usize> = HashMap::new();
for rel in &self.relations {
if rel.deleted { continue; }
*node_degree.entry(rel.source_key.clone()).or_default() += 1;
*node_degree.entry(rel.target_key.clone()).or_default() += 1;
}
let mut node_edges: HashMap<String, Vec<usize>> = HashMap::new();
for (i, rel) in self.relations.iter().enumerate() {
if rel.deleted { continue; }
node_edges.entry(rel.source_key.clone()).or_default().push(i);
node_edges.entry(rel.target_key.clone()).or_default().push(i);
}
let mut to_delete: HashSet<usize> = HashSet::new();
let mut hubs_capped = 0;
for (_key, edge_indices) in &node_edges {
let active: Vec<usize> = edge_indices.iter()
.filter(|&&i| !to_delete.contains(&i))
.copied()
.collect();
if active.len() <= max_degree { continue; }
let mut auto_indices: Vec<(usize, f32)> = Vec::new();
let mut link_indices: Vec<(usize, usize)> = Vec::new();
for &i in &active {
let rel = &self.relations[i];
if rel.rel_type == RelationType::Auto {
auto_indices.push((i, rel.strength));
} else {
let other = if &rel.source_key == _key {
&rel.target_key
} else {
&rel.source_key
};
let other_deg = node_degree.get(other).copied().unwrap_or(0);
link_indices.push((i, other_deg));
}
}
let excess = active.len() - max_degree;
auto_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
let auto_prune = excess.min(auto_indices.len());
for &(i, _) in auto_indices.iter().take(auto_prune) {
to_delete.insert(i);
}
let remaining_excess = excess.saturating_sub(auto_prune);
if remaining_excess > 0 {
link_indices.sort_by(|a, b| b.1.cmp(&a.1));
let link_prune = remaining_excess.min(link_indices.len());
for &(i, _) in link_indices.iter().take(link_prune) {
to_delete.insert(i);
}
}
hubs_capped += 1;
}
let mut pruned_rels = Vec::new();
for &i in &to_delete {
self.relations[i].deleted = true;
self.relations[i].version += 1;
pruned_rels.push(self.relations[i].clone());
}
if !pruned_rels.is_empty() {
self.append_relations(&pruned_rels)?;
}
self.relations.retain(|r| !r.deleted);
Ok((hubs_capped, to_delete.len()))
}
/// Update graph-derived fields on all nodes
pub fn update_graph_metrics(&mut self) {
let g = self.build_graph();
let communities = g.communities();
for (key, node) in &mut self.nodes {
node.community_id = communities.get(key).copied();
node.clustering_coefficient = Some(g.clustering_coefficient(key));
node.degree = Some(g.degree(key) as u32);
}
}
}

View file

@ -0,0 +1,173 @@
// Markdown parsing for memory files
//
// Splits markdown files into MemoryUnit structs based on `<!-- mem: ... -->`
// markers. Each marker starts a new section; content before the first marker
// becomes the file-level unit. Links and causal edges are extracted from
// both marker attributes and inline markdown links.
use super::NodeType;
use regex::Regex;
use std::collections::HashMap;
use std::path::Path;
use std::sync::OnceLock;
pub struct MemoryUnit {
pub key: String,
pub content: String,
pub marker_links: Vec<String>,
pub md_links: Vec<String>,
pub causes: Vec<String>,
pub state: Option<String>,
pub source_ref: Option<String>,
}
pub fn classify_filename(filename: &str) -> NodeType {
let bare = filename.strip_suffix(".md").unwrap_or(filename);
if bare.starts_with("daily-") { NodeType::EpisodicDaily }
else if bare.starts_with("weekly-") { NodeType::EpisodicWeekly }
else if bare.starts_with("monthly-") { NodeType::EpisodicMonthly }
else if bare == "journal" { NodeType::EpisodicSession }
else { NodeType::Semantic }
}
pub fn parse_units(raw_filename: &str, content: &str) -> Vec<MemoryUnit> {
let filename = raw_filename.strip_suffix(".md").unwrap_or(raw_filename);
static MARKER_RE: OnceLock<Regex> = OnceLock::new();
static SOURCE_RE: OnceLock<Regex> = OnceLock::new();
static MD_LINK_RE: OnceLock<Regex> = OnceLock::new();
let marker_re = MARKER_RE.get_or_init(||
Regex::new(r"<!--\s*mem:\s*((?:id|links|tags|causes|state)\s*=\s*[^\s].*?)-->").unwrap());
let source_re = SOURCE_RE.get_or_init(||
Regex::new(r"<!--\s*source:\s*(.+?)\s*-->").unwrap());
let md_link_re = MD_LINK_RE.get_or_init(||
Regex::new(r"\[[^\]]*\]\(([^):]+(?:#[^)]*)?)\)").unwrap());
let markers: Vec<_> = marker_re.captures_iter(content)
.map(|cap| {
let full_match = cap.get(0).unwrap();
let attrs_str = &cap[1];
(full_match.start(), full_match.end(), parse_marker_attrs(attrs_str))
})
.collect();
let find_source = |text: &str| -> Option<String> {
source_re.captures(text).map(|c| c[1].trim().to_string())
};
if markers.is_empty() {
let source_ref = find_source(content);
let md_links = extract_md_links(content, md_link_re, filename);
return vec![MemoryUnit {
key: filename.to_string(),
content: content.to_string(),
marker_links: Vec::new(),
md_links,
causes: Vec::new(),
state: None,
source_ref,
}];
}
let mut units = Vec::new();
let first_start = markers[0].0;
let pre_content = content[..first_start].trim();
if !pre_content.is_empty() {
let source_ref = find_source(pre_content);
let md_links = extract_md_links(pre_content, md_link_re, filename);
units.push(MemoryUnit {
key: filename.to_string(),
content: pre_content.to_string(),
marker_links: Vec::new(),
md_links,
causes: Vec::new(),
state: None,
source_ref,
});
}
for (i, (_, end, attrs)) in markers.iter().enumerate() {
let unit_end = if i + 1 < markers.len() {
markers[i + 1].0
} else {
content.len()
};
let unit_content = content[*end..unit_end].trim();
let id = attrs.get("id").cloned().unwrap_or_default();
let key = if id.is_empty() {
format!("{}#unnamed-{}", filename, i)
} else {
format!("{}#{}", filename, id)
};
let marker_links = attrs.get("links")
.map(|l| l.split(',').map(|s| normalize_link(s.trim(), filename)).collect())
.unwrap_or_default();
let causes = attrs.get("causes")
.map(|l| l.split(',').map(|s| normalize_link(s.trim(), filename)).collect())
.unwrap_or_default();
let state = attrs.get("state").cloned();
let source_ref = find_source(unit_content);
let md_links = extract_md_links(unit_content, md_link_re, filename);
units.push(MemoryUnit {
key,
content: unit_content.to_string(),
marker_links,
md_links,
causes,
state,
source_ref,
});
}
units
}
fn parse_marker_attrs(attrs_str: &str) -> HashMap<String, String> {
static ATTR_RE: OnceLock<Regex> = OnceLock::new();
let attr_re = ATTR_RE.get_or_init(|| Regex::new(r"(\w+)\s*=\s*(\S+)").unwrap());
let mut attrs = HashMap::new();
for cap in attr_re.captures_iter(attrs_str) {
attrs.insert(cap[1].to_string(), cap[2].to_string());
}
attrs
}
fn extract_md_links(content: &str, re: &Regex, source_file: &str) -> Vec<String> {
re.captures_iter(content)
.map(|cap| normalize_link(&cap[1], source_file))
.filter(|link| !link.starts_with(source_file) || link.contains('#'))
.collect()
}
pub fn normalize_link(target: &str, source_file: &str) -> String {
let source_bare = source_file.strip_suffix(".md").unwrap_or(source_file);
if target.starts_with('#') {
return format!("{}{}", source_bare, target);
}
let (path_part, fragment) = if let Some(hash_pos) = target.find('#') {
(&target[..hash_pos], Some(&target[hash_pos..]))
} else {
(target, None)
};
let basename = Path::new(path_part)
.file_name()
.map(|f| f.to_string_lossy().to_string())
.unwrap_or_else(|| path_part.to_string());
let bare = basename.strip_suffix(".md").unwrap_or(&basename);
match fragment {
Some(frag) => format!("{}{}", bare, frag),
None => bare.to_string(),
}
}

View file

@ -0,0 +1,939 @@
// Persistence layer: load, save, replay, append, snapshot
//
// Three-tier loading strategy:
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
// 2. bincode cache (state.bin) — ~10ms
// 3. capnp log replay — ~40ms
//
// Logs are append-only; cache staleness uses log file sizes, not mtimes.
use super::types::*;
use crate::memory_capnp;
use capnp::message;
use capnp::serialize;
use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Seek};
use std::path::Path;
impl Store {
/// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
///
/// Staleness check uses log file sizes (not mtimes). Since logs are
/// append-only, any write grows the file, invalidating the cache.
/// This avoids the mtime race that caused data loss with concurrent
/// writers (dream loop, link audit, journal enrichment).
pub fn load() -> Result<Store, String> {
// 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
match Self::load_snapshot_mmap() {
Ok(Some(mut store)) => {
// rkyv snapshot doesn't include visits — replay from log
let visits_p = visits_path();
if visits_p.exists() {
store.replay_visits(&visits_p).ok();
}
let tp_p = transcript_progress_path();
if tp_p.exists() {
store.replay_transcript_progress(&tp_p).ok();
}
return Ok(store);
},
Ok(None) => {},
Err(e) => eprintln!("rkyv snapshot: {}", e),
}
// 2. Try bincode state.bin cache (~10ms)
let nodes_p = nodes_path();
let rels_p = relations_path();
let state_p = state_path();
let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
if let Ok(data) = fs::read(&state_p)
&& data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());
if cached_nodes == nodes_size && cached_rels == rels_size
&& let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
// Rebuild uuid_to_key (skipped by serde)
for (key, node) in &store.nodes {
store.uuid_to_key.insert(node.uuid, key.clone());
}
store.loaded_nodes_size = nodes_size;
store.loaded_rels_size = rels_size;
// Bootstrap: write rkyv snapshot if missing
if !snapshot_path().exists()
&& let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
eprintln!("rkyv bootstrap: {}", e);
}
return Ok(store);
}
}
// Stale or no cache — rebuild from capnp logs
let mut store = Store::default();
if nodes_p.exists() {
store.replay_nodes(&nodes_p)?;
}
if rels_p.exists() {
store.replay_relations(&rels_p)?;
}
let visits_p = visits_path();
if visits_p.exists() {
store.replay_visits(&visits_p)?;
}
let tp_p = transcript_progress_path();
if tp_p.exists() {
store.replay_transcript_progress(&tp_p)?;
}
// Record log sizes after replay — this is the state we reflect
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
// Drop edges referencing deleted/missing nodes
store.relations.retain(|r|
store.nodes.contains_key(&r.source_key) &&
store.nodes.contains_key(&r.target_key)
);
store.save()?;
Ok(store)
}
/// Load store directly from capnp logs, bypassing all caches.
/// Used by fsck to verify cache consistency.
pub fn load_from_logs() -> Result<Store, String> {
let nodes_p = nodes_path();
let rels_p = relations_path();
let mut store = Store::default();
if nodes_p.exists() {
store.replay_nodes(&nodes_p)?;
}
if rels_p.exists() {
store.replay_relations(&rels_p)?;
}
let visits_p = visits_path();
if visits_p.exists() {
store.replay_visits(&visits_p)?;
}
let tp_p = transcript_progress_path();
if tp_p.exists() {
store.replay_transcript_progress(&tp_p)?;
}
Ok(store)
}
/// Replay node log, keeping latest version per UUID.
/// Tracks all UUIDs seen per key to detect duplicates.
fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
// Track all non-deleted UUIDs per key to detect duplicates
let mut key_uuids: HashMap<String, Vec<[u8; 16]>> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
let node = Node::from_capnp_migrate(node_reader)?;
let existing_version = self.nodes.get(&node.key)
.map(|n| n.version)
.unwrap_or(0);
if node.version >= existing_version {
if node.deleted {
self.nodes.remove(&node.key);
self.uuid_to_key.remove(&node.uuid);
if let Some(uuids) = key_uuids.get_mut(&node.key) {
uuids.retain(|u| *u != node.uuid);
}
} else {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node.clone());
let uuids = key_uuids.entry(node.key).or_default();
if !uuids.contains(&node.uuid) {
uuids.push(node.uuid);
}
}
}
}
}
// Report duplicate keys
for (key, uuids) in &key_uuids {
if uuids.len() > 1 {
eprintln!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len());
}
}
Ok(())
}
/// Replay relation log, keeping latest version per UUID
fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
// Collect all, then deduplicate by UUID keeping latest version
let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
.map_err(|e| format!("read relation log: {}", e))?;
for rel_reader in log.get_relations()
.map_err(|e| format!("get relations: {}", e))? {
let rel = Relation::from_capnp_migrate(rel_reader)?;
let existing_version = by_uuid.get(&rel.uuid)
.map(|r| r.version)
.unwrap_or(0);
if rel.version >= existing_version {
by_uuid.insert(rel.uuid, rel);
}
}
}
self.relations = by_uuid.into_values()
.filter(|r| !r.deleted)
.collect();
Ok(())
}
/// Find all duplicate keys: keys with multiple live UUIDs in the log.
/// Returns a map from key → vec of all live Node versions (one per UUID).
/// The "winner" in self.nodes is always one of them.
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>, String> {
let path = nodes_path();
if !path.exists() { return Ok(HashMap::new()); }
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
// Track latest version of each UUID
let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
let node = Node::from_capnp_migrate(node_reader)?;
let dominated = by_uuid.get(&node.uuid)
.map(|n| node.version >= n.version)
.unwrap_or(true);
if dominated {
by_uuid.insert(node.uuid, node);
}
}
}
// Group live (non-deleted) nodes by key
let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
for node in by_uuid.into_values() {
if !node.deleted {
by_key.entry(node.key.clone()).or_default().push(node);
}
}
// Keep only duplicates
by_key.retain(|_, nodes| nodes.len() > 1);
Ok(by_key)
}
/// Append nodes to the log file.
/// Serializes to a Vec first, then does a single write() syscall
/// so the append is atomic with O_APPEND even without flock.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
self.append_nodes_unlocked(nodes)
}
/// Append nodes without acquiring the lock. Caller must hold StoreLock.
pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<(), String> {
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
let mut list = log.init_nodes(nodes.len() as u32);
for (i, node) in nodes.iter().enumerate() {
node.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize nodes: {}", e))?;
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write nodes: {}", e))?;
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Replay only new entries appended to the node log since we last loaded.
/// Call under StoreLock to catch writes from concurrent processes.
pub(crate) fn refresh_nodes(&mut self) -> Result<(), String> {
let path = nodes_path();
let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
if current_size <= self.loaded_nodes_size {
return Ok(()); // no new data
}
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size))
.map_err(|e| format!("seek nodes log: {}", e))?;
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log delta: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes delta: {}", e))? {
let node = Node::from_capnp_migrate(node_reader)?;
let dominated = self.nodes.get(&node.key)
.map(|n| node.version >= n.version)
.unwrap_or(true);
if dominated {
if node.deleted {
self.nodes.remove(&node.key);
self.uuid_to_key.remove(&node.uuid);
} else {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
}
}
}
}
self.loaded_nodes_size = current_size;
Ok(())
}
/// Append relations to the log file.
/// Single write() syscall for atomic O_APPEND.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
self.append_relations_unlocked(relations)
}
/// Append relations without acquiring the lock. Caller must hold StoreLock.
pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<(), String> {
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
let mut list = log.init_relations(relations.len() as u32);
for (i, rel) in relations.iter().enumerate() {
rel.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize relations: {}", e))?;
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write relations: {}", e))?;
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Append agent visit records to the visits log.
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<(), String> {
if visits.is_empty() { return Ok(()); }
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::agent_visit_log::Builder>();
let mut list = log.init_visits(visits.len() as u32);
for (i, visit) in visits.iter().enumerate() {
visit.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize visits: {}", e))?;
let path = visits_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write visits: {}", e))?;
// Update in-memory index
for v in visits {
self.visits
.entry(v.node_key.clone())
.or_default()
.insert(v.agent.clone(), v.timestamp);
}
Ok(())
}
/// Replay visits log to rebuild in-memory index.
fn replay_visits(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
while reader.stream_position().map_err(|e| e.to_string())?
< fs::metadata(path).map_err(|e| e.to_string())?.len()
{
let msg = match serialize::read_message(&mut reader, Default::default()) {
Ok(m) => m,
Err(_) => break,
};
let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>()
.map_err(|e| format!("read visit log: {}", e))?;
for visit in log.get_visits().map_err(|e| e.to_string())? {
let key = visit.get_node_key().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let agent = visit.get_agent().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let ts = visit.get_timestamp();
if !key.is_empty() && !agent.is_empty() {
let entry = self.visits.entry(key).or_default();
// Keep latest timestamp per agent
let existing = entry.entry(agent).or_insert(0);
if ts > *existing {
*existing = ts;
}
}
}
}
Ok(())
}
/// Append transcript segment progress records.
pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<(), String> {
if segments.is_empty() { return Ok(()); }
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::transcript_progress_log::Builder>();
let mut list = log.init_segments(segments.len() as u32);
for (i, seg) in segments.iter().enumerate() {
seg.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize transcript progress: {}", e))?;
let path = transcript_progress_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write transcript progress: {}", e))?;
// Update in-memory index
for seg in segments {
self.transcript_progress
.entry((seg.transcript_id.clone(), seg.segment_index))
.or_default()
.insert(seg.agent.clone());
}
Ok(())
}
/// Replay transcript progress log to rebuild in-memory index.
fn replay_transcript_progress(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
while reader.stream_position().map_err(|e| e.to_string())?
< fs::metadata(path).map_err(|e| e.to_string())?.len()
{
let msg = match serialize::read_message(&mut reader, Default::default()) {
Ok(m) => m,
Err(_) => break,
};
let log = msg.get_root::<memory_capnp::transcript_progress_log::Reader>()
.map_err(|e| format!("read transcript progress: {}", e))?;
for seg in log.get_segments().map_err(|e| e.to_string())? {
let id = seg.get_transcript_id().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let agent = seg.get_agent().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
let idx = seg.get_segment_index();
if !id.is_empty() && !agent.is_empty() {
self.transcript_progress
.entry((id, idx))
.or_default()
.insert(agent);
}
}
}
Ok(())
}
/// Check if a transcript segment has been processed by a given agent.
pub fn is_segment_mined(&self, transcript_id: &str, segment_index: u32, agent: &str) -> bool {
self.transcript_progress
.get(&(transcript_id.to_string(), segment_index))
.is_some_and(|agents| agents.contains(agent))
}
/// Mark a transcript segment as successfully processed.
pub fn mark_segment_mined(&mut self, transcript_id: &str, segment_index: u32, agent: &str) -> Result<(), String> {
let seg = new_transcript_segment(transcript_id, segment_index, agent);
self.append_transcript_progress(&[seg])
}
/// Migrate old stub-node transcript markers into the new progress log.
/// Reads _observed-transcripts-f-*, _mined-transcripts#f-*, and _facts-* keys,
/// extracts transcript_id and segment_index, writes to transcript-progress.capnp,
/// then deletes the stub nodes.
pub fn migrate_transcript_progress(&mut self) -> Result<usize, String> {
let mut segments = Vec::new();
for key in self.nodes.keys() {
// _observed-transcripts-f-{UUID}.{segment}
if let Some(rest) = key.strip_prefix("_observed-transcripts-f-") {
if let Some((uuid, seg_str)) = rest.rsplit_once('.')
&& let Ok(seg) = seg_str.parse::<u32>() {
segments.push(new_transcript_segment(uuid, seg, "observation"));
}
}
// _mined-transcripts#f-{UUID}.{segment}
else if let Some(rest) = key.strip_prefix("_mined-transcripts#f-") {
if let Some((uuid, seg_str)) = rest.rsplit_once('.')
&& let Ok(seg) = seg_str.parse::<u32>() {
segments.push(new_transcript_segment(uuid, seg, "experience"));
}
}
// _mined-transcripts-f-{UUID}.{segment}
else if let Some(rest) = key.strip_prefix("_mined-transcripts-f-") {
if let Some((uuid, seg_str)) = rest.rsplit_once('.')
&& let Ok(seg) = seg_str.parse::<u32>() {
segments.push(new_transcript_segment(uuid, seg, "experience"));
}
}
// _facts-{UUID} (whole-file, segment 0)
else if let Some(uuid) = key.strip_prefix("_facts-") {
if !uuid.contains('-') || uuid.len() < 30 { continue; } // skip non-UUID
segments.push(new_transcript_segment(uuid, 0, "fact"));
}
}
let count = segments.len();
if count > 0 {
self.append_transcript_progress(&segments)?;
}
// Soft-delete the old stub nodes
let keys_to_delete: Vec<String> = self.nodes.keys()
.filter(|k| k.starts_with("_observed-transcripts-")
|| k.starts_with("_mined-transcripts")
|| (k.starts_with("_facts-") && !k.contains("fact_mine")))
.cloned()
.collect();
for key in &keys_to_delete {
if let Some(node) = self.nodes.get_mut(key) {
node.deleted = true;
}
}
if !keys_to_delete.is_empty() {
self.save()?;
}
Ok(count)
}
/// Record visits for a batch of node keys from a successful agent run.
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
let visits: Vec<AgentVisit> = node_keys.iter()
.filter_map(|key| {
let node = self.nodes.get(key)?;
Some(new_visit(node.uuid, key, agent, "processed"))
})
.collect();
self.append_visits(&visits)
}
/// Get the last time an agent visited a node. Returns 0 if never visited.
pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 {
self.visits.get(node_key)
.and_then(|agents| agents.get(agent))
.copied()
.unwrap_or(0)
}
/// Save the derived cache with log size header for staleness detection.
/// Uses atomic write (tmp + rename) to prevent partial reads.
pub fn save(&self) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = state_path();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).ok();
}
// Use log sizes from load time, not current filesystem sizes.
// If another writer appended since we loaded, our recorded size
// will be smaller than the actual log → next reader detects stale
// cache and replays the (correct, append-only) log.
let nodes_size = self.loaded_nodes_size;
let rels_size = self.loaded_rels_size;
let bincode_data = bincode::serialize(self)
.map_err(|e| format!("bincode serialize: {}", e))?;
let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len());
data.extend_from_slice(&CACHE_MAGIC);
data.extend_from_slice(&nodes_size.to_le_bytes());
data.extend_from_slice(&rels_size.to_le_bytes());
data.extend_from_slice(&bincode_data);
// Atomic write: tmp file + rename
let tmp_path = path.with_extension("bin.tmp");
fs::write(&tmp_path, &data)
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
fs::rename(&tmp_path, &path)
.map_err(|e| format!("rename {}{}: {}", tmp_path.display(), path.display(), e))?;
// Also write rkyv snapshot (mmap-friendly)
if let Err(e) = self.save_snapshot(nodes_size, rels_size) {
eprintln!("rkyv snapshot save: {}", e);
}
Ok(())
}
/// Serialize store as rkyv snapshot with staleness header.
/// Assumes StoreLock is already held by caller.
fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> {
let snap = Snapshot {
nodes: self.nodes.clone(),
relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(),
gaps: self.gaps.clone(),
params: self.params,
};
let rkyv_data = rkyv::to_bytes::<_, 256>(&snap)
.map_err(|e| format!("rkyv serialize: {}", e))?;
let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len());
data.extend_from_slice(&RKYV_MAGIC);
data.extend_from_slice(&1u32.to_le_bytes()); // format version
data.extend_from_slice(&nodes_size.to_le_bytes());
data.extend_from_slice(&rels_size.to_le_bytes());
data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes());
data.extend_from_slice(&rkyv_data);
let path = snapshot_path();
let tmp_path = path.with_extension("rkyv.tmp");
fs::write(&tmp_path, &data)
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
fs::rename(&tmp_path, &path)
.map_err(|e| format!("rename: {}", e))?;
Ok(())
}
/// Try loading store from mmap'd rkyv snapshot.
/// Returns None if snapshot is missing or stale (log sizes don't match).
fn load_snapshot_mmap() -> Result<Option<Store>, String> {
let path = snapshot_path();
if !path.exists() { return Ok(None); }
let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mmap = unsafe { memmap2::Mmap::map(&file) }
.map_err(|e| format!("mmap {}: {}", path.display(), e))?;
if mmap.len() < RKYV_HEADER_LEN { return Ok(None); }
if mmap[..4] != RKYV_MAGIC { return Ok(None); }
// [4..8] = version, skip for now
let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap());
let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap());
let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize;
if cached_nodes != nodes_size || cached_rels != rels_size {
return Ok(None); // stale
}
if mmap.len() < RKYV_HEADER_LEN + data_len {
return Ok(None); // truncated
}
let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len];
// SAFETY: we wrote this file ourselves via save_snapshot().
// Skip full validation (check_archived_root) — the staleness header
// already confirms this snapshot matches the current log state.
let archived = unsafe { rkyv::archived_root::<Snapshot>(rkyv_data) };
let snap: Snapshot = <ArchivedSnapshot as rkyv::Deserialize<Snapshot, rkyv::Infallible>>
::deserialize(archived, &mut rkyv::Infallible).unwrap();
let mut store = Store {
nodes: snap.nodes,
relations: snap.relations,
gaps: snap.gaps,
params: snap.params,
..Default::default()
};
// Rebuild uuid_to_key (not serialized)
for (key, node) in &store.nodes {
store.uuid_to_key.insert(node.uuid, key.clone());
}
store.loaded_nodes_size = nodes_size;
store.loaded_rels_size = rels_size;
Ok(Some(store))
}
}
/// Strip .md suffix from all node keys and relation key strings.
/// Merges duplicates (bare key + .md key) by keeping the latest version.
pub fn strip_md_keys() -> Result<(), String> {
use super::strip_md_suffix;
let mut store = Store::load()?;
let mut renamed_nodes = 0usize;
let mut renamed_rels = 0usize;
let mut merged = 0usize;
// Collect keys that need renaming
let old_keys: Vec<String> = store.nodes.keys()
.filter(|k| k.ends_with(".md") || k.contains(".md#"))
.cloned()
.collect();
for old_key in &old_keys {
let new_key = strip_md_suffix(old_key);
if new_key == *old_key { continue; }
let mut node = store.nodes.remove(old_key).unwrap();
store.uuid_to_key.remove(&node.uuid);
if let Some(existing) = store.nodes.get(&new_key) {
// Merge: keep whichever has the higher version
if existing.version >= node.version {
eprintln!(" merge {}{} (keeping existing v{})",
old_key, new_key, existing.version);
merged += 1;
continue;
}
eprintln!(" merge {}{} (replacing v{} with v{})",
old_key, new_key, existing.version, node.version);
merged += 1;
}
node.key = new_key.clone();
node.version += 1;
store.uuid_to_key.insert(node.uuid, new_key.clone());
store.nodes.insert(new_key, node);
renamed_nodes += 1;
}
// Fix relation key strings
for rel in &mut store.relations {
let new_source = strip_md_suffix(&rel.source_key);
let new_target = strip_md_suffix(&rel.target_key);
if new_source != rel.source_key || new_target != rel.target_key {
rel.source_key = new_source;
rel.target_key = new_target;
rel.version += 1;
renamed_rels += 1;
}
}
if renamed_nodes == 0 && renamed_rels == 0 && merged == 0 {
eprintln!("No .md suffixes found — store is clean");
return Ok(());
}
eprintln!("Renamed {} nodes, {} relations, merged {} duplicates",
renamed_nodes, renamed_rels, merged);
// Append migrated nodes/relations to the log (preserving history)
let changed_nodes: Vec<_> = old_keys.iter()
.filter_map(|old_key| {
let new_key = strip_md_suffix(old_key);
store.nodes.get(&new_key).cloned()
})
.collect();
if !changed_nodes.is_empty() {
store.append_nodes(&changed_nodes)?;
}
// Invalidate caches so next load replays from logs
for p in [state_path(), snapshot_path()] {
if p.exists() {
fs::remove_file(&p).ok();
}
}
eprintln!("Migration complete (appended to existing logs)");
Ok(())
}
// DO NOT USE. This function destroyed the append-only log history on
// 2026-03-14 when strip_md_keys() called it. It:
//
// 1. Truncates nodes.capnp via File::create() — all historical
// versions of every node are permanently lost
// 2. Writes only from the in-memory store — so any node missing
// due to a loading bug is also permanently lost
// 3. Makes no backup of the old log before overwriting
// 4. Filters out deleted relations, destroying deletion history
//
// The correct approach for migrations is to APPEND new versions
// (with updated keys) and delete markers (for old keys) to the
// existing log, preserving all history.
//
// This function is kept (dead) so the comment survives as a warning.
// If you need log compaction in the future, design it properly:
// back up first, preserve history, and never write from a potentially
// incomplete in-memory snapshot.
#[allow(dead_code)]
fn _rewrite_store_disabled(_store: &Store) -> Result<(), String> {
panic!("rewrite_store is disabled — see comment above");
}
/// Check and repair corrupt capnp log files.
///
/// Reads each message sequentially, tracking file position. On the first
/// corrupt message, truncates the file to the last good position. Also
/// removes stale caches so the next load replays from the repaired log.
pub fn fsck() -> Result<(), String> {
let mut any_corrupt = false;
for (path, kind) in [
(nodes_path(), "node"),
(relations_path(), "relation"),
] {
if !path.exists() { continue; }
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let file_len = file.metadata()
.map_err(|e| format!("stat {}: {}", path.display(), e))?.len();
let mut reader = BufReader::new(file);
let mut good_messages = 0u64;
let mut last_good_pos = 0u64;
loop {
let pos = reader.stream_position()
.map_err(|e| format!("tell {}: {}", path.display(), e))?;
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
Ok(m) => m,
Err(_) => {
// read_message fails at EOF (normal) or on corrupt framing
if pos < file_len {
// Not at EOF — corrupt framing
eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.map_err(|e| format!("open for truncate: {}", e))?;
file.set_len(pos)
.map_err(|e| format!("truncate {}: {}", path.display(), e))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, pos, good_messages);
}
break;
}
};
// Validate the message content too
let valid = if kind == "node" {
msg.get_root::<memory_capnp::node_log::Reader>()
.and_then(|l| l.get_nodes().map(|_| ()))
.is_ok()
} else {
msg.get_root::<memory_capnp::relation_log::Reader>()
.and_then(|l| l.get_relations().map(|_| ()))
.is_ok()
};
if valid {
good_messages += 1;
last_good_pos = reader.stream_position()
.map_err(|e| format!("tell {}: {}", path.display(), e))?;
} else {
eprintln!("{}: corrupt message content at offset {}, truncating to {}",
kind, pos, last_good_pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.map_err(|e| format!("open for truncate: {}", e))?;
file.set_len(last_good_pos)
.map_err(|e| format!("truncate {}: {}", path.display(), e))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, last_good_pos, good_messages);
break;
}
}
if !any_corrupt {
eprintln!("{}: {} messages, all clean", kind, good_messages);
}
}
if any_corrupt {
// Nuke caches so next load replays from the repaired logs
for p in [state_path(), snapshot_path()] {
if p.exists() {
fs::remove_file(&p)
.map_err(|e| format!("remove {}: {}", p.display(), e))?;
eprintln!("removed stale cache: {}", p.display());
}
}
eprintln!("repair complete — run `poc-memory status` to verify");
} else {
eprintln!("store is clean");
}
Ok(())
}

View file

@ -0,0 +1,628 @@
// Core types for the memory store
//
// Node, Relation, enums, Params, and supporting types. Also contains
// the capnp serialization macros that generate bidirectional conversion.
use crate::memory_capnp;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
// ---------------------------------------------------------------------------
// Capnp serialization macros
//
// Declarative mapping between Rust types and capnp generated types.
// Adding a field to the schema means adding it in one place below;
// both read and write are generated from the same declaration.
// ---------------------------------------------------------------------------
/// Generate to_capnp/from_capnp conversion methods for an enum.
macro_rules! capnp_enum {
($rust_type:ident, $capnp_type:path, [$($variant:ident),+ $(,)?]) => {
impl $rust_type {
#[allow(clippy::wrong_self_convention, dead_code)]
pub(crate) fn to_capnp(&self) -> $capnp_type {
match self {
$(Self::$variant => <$capnp_type>::$variant,)+
}
}
pub(crate) fn from_capnp(v: $capnp_type) -> Self {
match v {
$(<$capnp_type>::$variant => Self::$variant,)+
}
}
}
};
}
/// Generate from_capnp/to_capnp methods for a struct with capnp serialization.
/// Fields are grouped by serialization kind:
/// text - capnp Text fields (String in Rust)
/// uuid - capnp Data fields ([u8; 16] in Rust)
/// prim - copy types (u32, f32, f64, bool)
/// enm - enums with to_capnp/from_capnp methods
/// skip - Rust-only fields not in capnp (set to Default on read)
macro_rules! capnp_message {
(
$struct:ident,
reader: $reader:ty,
builder: $builder:ty,
text: [$($tf:ident),* $(,)?],
uuid: [$($uf:ident),* $(,)?],
prim: [$($pf:ident),* $(,)?],
enm: [$($ef:ident: $et:ident),* $(,)?],
skip: [$($sf:ident),* $(,)?] $(,)?
) => {
impl $struct {
pub fn from_capnp(r: $reader) -> Result<Self, String> {
paste::paste! {
Ok(Self {
$($tf: read_text(r.[<get_ $tf>]()),)*
$($uf: read_uuid(r.[<get_ $uf>]()),)*
$($pf: r.[<get_ $pf>](),)*
$($ef: $et::from_capnp(
r.[<get_ $ef>]().map_err(|_| concat!("bad ", stringify!($ef)))?
),)*
$($sf: Default::default(),)*
})
}
}
pub fn to_capnp(&self, mut b: $builder) {
paste::paste! {
$(b.[<set_ $tf>](&self.$tf);)*
$(b.[<set_ $uf>](&self.$uf);)*
$(b.[<set_ $pf>](self.$pf);)*
$(b.[<set_ $ef>](self.$ef.to_capnp());)*
}
}
}
};
}
pub fn memory_dir() -> PathBuf {
crate::config::get().data_dir.clone()
}
pub fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") }
pub(crate) fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") }
pub(crate) fn state_path() -> PathBuf { memory_dir().join("state.bin") }
pub(crate) fn snapshot_path() -> PathBuf { memory_dir().join("snapshot.rkyv") }
fn lock_path() -> PathBuf { memory_dir().join(".store.lock") }
/// RAII file lock using flock(2). Dropped when scope exits.
pub(crate) struct StoreLock {
_file: fs::File,
}
impl StoreLock {
pub(crate) fn acquire() -> Result<Self, String> {
let path = lock_path();
let file = fs::OpenOptions::new()
.create(true).truncate(false).write(true).open(&path)
.map_err(|e| format!("open lock {}: {}", path.display(), e))?;
// Blocking exclusive lock
let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
if ret != 0 {
return Err(format!("flock: {}", std::io::Error::last_os_error()));
}
Ok(StoreLock { _file: file })
}
// Lock released automatically when _file is dropped (flock semantics)
}
pub fn now_epoch() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64
}
/// Convert epoch seconds to broken-down local time components.
/// Returns (year, month, day, hour, minute, second).
pub fn epoch_to_local(epoch: i64) -> (i32, u32, u32, u32, u32, u32) {
use chrono::{Datelike, Local, TimeZone, Timelike};
let dt = match Local.timestamp_opt(epoch, 0) {
chrono::LocalResult::Single(dt) => dt,
chrono::LocalResult::Ambiguous(dt, _) => dt,
chrono::LocalResult::None => {
// DST gap or invalid — try shifting, then fall back to UTC
Local.timestamp_opt(epoch + 3600, 0)
.earliest()
.or_else(|| chrono::Utc.timestamp_opt(epoch, 0).earliest()
.map(|dt| dt.with_timezone(&Local)))
.unwrap_or_else(|| {
// Completely invalid timestamp — use epoch 0
chrono::Utc.timestamp_opt(0, 0).unwrap().with_timezone(&Local)
})
}
};
(
dt.year(),
dt.month(),
dt.day(),
dt.hour(),
dt.minute(),
dt.second(),
)
}
/// Format epoch as "YYYY-MM-DD"
pub fn format_date(epoch: i64) -> String {
let (y, m, d, _, _, _) = epoch_to_local(epoch);
format!("{:04}-{:02}-{:02}", y, m, d)
}
/// Format epoch as "YYYY-MM-DDTHH:MM"
pub fn format_datetime(epoch: i64) -> String {
let (y, m, d, h, min, _) = epoch_to_local(epoch);
format!("{:04}-{:02}-{:02}T{:02}:{:02}", y, m, d, h, min)
}
/// Format epoch as "YYYY-MM-DD HH:MM"
pub fn format_datetime_space(epoch: i64) -> String {
let (y, m, d, h, min, _) = epoch_to_local(epoch);
format!("{:04}-{:02}-{:02} {:02}:{:02}", y, m, d, h, min)
}
/// Compact timestamp for use in keys: "YYYYMMDDTHHMMss"
pub fn compact_timestamp() -> String {
let (y, m, d, h, min, s) = epoch_to_local(now_epoch());
format!("{:04}{:02}{:02}T{:02}{:02}{:02}", y, m, d, h, min, s)
}
pub fn today() -> String {
format_date(now_epoch())
}
// In-memory node representation
#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub struct Node {
pub uuid: [u8; 16],
pub version: u32,
pub timestamp: i64,
pub node_type: NodeType,
pub provenance: String,
pub key: String,
pub content: String,
pub weight: f32,
pub emotion: f32,
pub deleted: bool,
pub source_ref: String,
pub created: String,
pub retrievals: u32,
pub uses: u32,
pub wrongs: u32,
pub state_tag: String,
pub last_replayed: i64,
pub spaced_repetition_interval: u32,
// Position within file (section index, for export ordering)
#[serde(default)]
pub position: u32,
// Stable creation timestamp (unix epoch seconds). Set once at creation;
// never updated on rename or content update. Zero for legacy nodes.
#[serde(default)]
pub created_at: i64,
// Derived fields (not in capnp, computed from graph)
#[serde(default)]
pub community_id: Option<u32>,
#[serde(default)]
pub clustering_coefficient: Option<f32>,
#[serde(default)]
pub degree: Option<u32>,
}
#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub struct Relation {
pub uuid: [u8; 16],
pub version: u32,
pub timestamp: i64,
pub source: [u8; 16],
pub target: [u8; 16],
pub rel_type: RelationType,
pub strength: f32,
pub provenance: String,
pub deleted: bool,
pub source_key: String,
pub target_key: String,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub enum NodeType {
EpisodicSession,
EpisodicDaily,
EpisodicWeekly,
Semantic,
EpisodicMonthly,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub enum Provenance {
Manual,
Journal,
Agent, // legacy catch-all, prefer specific variants below
Dream,
Derived,
AgentExperienceMine,
AgentKnowledgeObservation,
AgentKnowledgePattern,
AgentKnowledgeConnector,
AgentKnowledgeChallenger,
AgentConsolidate,
AgentDigest,
AgentFactMine,
AgentDecay,
}
impl Provenance {
/// Parse from POC_PROVENANCE env var. Returns None if unset.
pub fn from_env() -> Option<Self> {
std::env::var("POC_PROVENANCE").ok().and_then(|s| Self::from_label(&s))
}
pub fn from_label(s: &str) -> Option<Self> {
Some(match s {
"manual" => Self::Manual,
"journal" => Self::Journal,
"agent" => Self::Agent,
"dream" => Self::Dream,
"derived" => Self::Derived,
"agent:experience-mine" => Self::AgentExperienceMine,
"agent:knowledge-observation"=> Self::AgentKnowledgeObservation,
"agent:knowledge-pattern" => Self::AgentKnowledgePattern,
"agent:knowledge-connector" => Self::AgentKnowledgeConnector,
"agent:knowledge-challenger" => Self::AgentKnowledgeChallenger,
"agent:consolidate" => Self::AgentConsolidate,
"agent:digest" => Self::AgentDigest,
"agent:fact-mine" => Self::AgentFactMine,
"agent:decay" => Self::AgentDecay,
_ => return None,
})
}
pub fn label(&self) -> &'static str {
match self {
Self::Manual => "manual",
Self::Journal => "journal",
Self::Agent => "agent",
Self::Dream => "dream",
Self::Derived => "derived",
Self::AgentExperienceMine => "agent:experience-mine",
Self::AgentKnowledgeObservation => "agent:knowledge-observation",
Self::AgentKnowledgePattern => "agent:knowledge-pattern",
Self::AgentKnowledgeConnector => "agent:knowledge-connector",
Self::AgentKnowledgeChallenger => "agent:knowledge-challenger",
Self::AgentConsolidate => "agent:consolidate",
Self::AgentDigest => "agent:digest",
Self::AgentFactMine => "agent:fact-mine",
Self::AgentDecay => "agent:decay",
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub enum RelationType {
Link,
Causal,
Auto,
}
capnp_enum!(NodeType, memory_capnp::NodeType,
[EpisodicSession, EpisodicDaily, EpisodicWeekly, Semantic, EpisodicMonthly]);
capnp_enum!(Provenance, memory_capnp::Provenance,
[Manual, Journal, Agent, Dream, Derived,
AgentExperienceMine, AgentKnowledgeObservation, AgentKnowledgePattern,
AgentKnowledgeConnector, AgentKnowledgeChallenger, AgentConsolidate,
AgentDigest, AgentFactMine, AgentDecay]);
capnp_enum!(RelationType, memory_capnp::RelationType,
[Link, Causal, Auto]);
capnp_message!(Node,
reader: memory_capnp::content_node::Reader<'_>,
builder: memory_capnp::content_node::Builder<'_>,
text: [key, content, source_ref, created, state_tag, provenance],
uuid: [uuid],
prim: [version, timestamp, weight, emotion, deleted,
retrievals, uses, wrongs, last_replayed,
spaced_repetition_interval, position, created_at],
enm: [node_type: NodeType],
skip: [community_id, clustering_coefficient, degree],
);
impl Node {
/// Read from capnp with migration: if the new provenance text field
/// is empty (old record), fall back to the deprecated provenanceOld enum.
pub fn from_capnp_migrate(r: memory_capnp::content_node::Reader<'_>) -> Result<Self, String> {
let mut node = Self::from_capnp(r)?;
if node.provenance.is_empty()
&& let Ok(old) = r.get_provenance_old() {
node.provenance = Provenance::from_capnp(old).label().to_string();
}
// Sanitize timestamps: old capnp records have raw offsets instead
// of unix epoch. Anything past year 2100 (~4102444800) is bogus.
const MAX_SANE_EPOCH: i64 = 4_102_444_800;
if node.timestamp > MAX_SANE_EPOCH || node.timestamp < 0 {
node.timestamp = node.created_at;
}
if node.created_at > MAX_SANE_EPOCH || node.created_at < 0 {
node.created_at = node.timestamp.min(MAX_SANE_EPOCH);
}
Ok(node)
}
}
capnp_message!(Relation,
reader: memory_capnp::relation::Reader<'_>,
builder: memory_capnp::relation::Builder<'_>,
text: [source_key, target_key, provenance],
uuid: [uuid, source, target],
prim: [version, timestamp, strength, deleted],
enm: [rel_type: RelationType],
skip: [],
);
impl Relation {
pub fn from_capnp_migrate(r: memory_capnp::relation::Reader<'_>) -> Result<Self, String> {
let mut rel = Self::from_capnp(r)?;
if rel.provenance.is_empty()
&& let Ok(old) = r.get_provenance_old() {
rel.provenance = Provenance::from_capnp(old).label().to_string();
}
Ok(rel)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub struct RetrievalEvent {
pub query: String,
pub timestamp: String,
pub results: Vec<String>,
pub used: Option<Vec<String>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub struct Params {
pub default_weight: f64,
pub decay_factor: f64,
pub use_boost: f64,
pub prune_threshold: f64,
pub edge_decay: f64,
pub max_hops: u32,
pub min_activation: f64,
}
impl Default for Params {
fn default() -> Self {
Params {
default_weight: 0.7,
decay_factor: 0.95,
use_boost: 0.15,
prune_threshold: 0.1,
edge_decay: 0.3,
max_hops: 3,
min_activation: 0.05,
}
}
}
// Gap record — something we looked for but didn't find
#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub struct GapRecord {
pub description: String,
pub timestamp: String,
}
/// Per-node agent visit index: node_key → (agent_type → last_visit_timestamp)
pub type VisitIndex = HashMap<String, HashMap<String, i64>>;
// The full in-memory store
#[derive(Default, Serialize, Deserialize)]
pub struct Store {
pub nodes: HashMap<String, Node>, // key → latest node
#[serde(skip)]
pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes)
pub relations: Vec<Relation>, // all active relations
pub retrieval_log: Vec<RetrievalEvent>,
pub gaps: Vec<GapRecord>,
pub params: Params,
/// Agent visit tracking: node_key → (agent_type → last_visit_epoch)
#[serde(default)]
pub visits: VisitIndex,
/// Transcript mining progress: (transcript_id, segment_index) → set of agents that processed it
#[serde(default)]
pub transcript_progress: HashMap<(String, u32), HashSet<String>>,
/// Log sizes at load time — used by save() to write correct staleness header.
/// If another writer appended since we loaded, our cache will be marked stale
/// (recorded size < actual size), forcing the next reader to replay the log.
#[serde(skip)]
pub(crate) loaded_nodes_size: u64,
#[serde(skip)]
pub(crate) loaded_rels_size: u64,
}
/// Snapshot for mmap: full store state minus retrieval_log (which
/// is append-only in retrieval.log). rkyv zero-copy serialization
/// lets us mmap this and access archived data without deserialization.
#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
#[archive(check_bytes)]
pub(crate) struct Snapshot {
pub(crate) nodes: HashMap<String, Node>,
pub(crate) relations: Vec<Relation>,
pub(crate) gaps: Vec<GapRecord>,
pub(crate) params: Params,
}
// rkyv snapshot header: 32 bytes (multiple of 16 for alignment after mmap)
// [0..4] magic "RKV\x01"
// [4..8] format version (u32 LE)
// [8..16] nodes.capnp file size (u64 LE) — staleness check
// [16..24] relations.capnp file size (u64 LE)
// [24..32] rkyv data length (u64 LE)
pub(crate) const RKYV_MAGIC: [u8; 4] = *b"RKV\x01";
pub(crate) const RKYV_HEADER_LEN: usize = 32;
// state.bin header: magic + log file sizes for staleness detection.
// File sizes are race-free for append-only logs (they only grow),
// unlike mtimes which race with concurrent writers.
pub(crate) const CACHE_MAGIC: [u8; 4] = *b"POC\x01";
pub(crate) const CACHE_HEADER_LEN: usize = 4 + 8 + 8; // magic + nodes_size + rels_size
// Cap'n Proto serialization helpers
/// Read a capnp text field, returning empty string on any error
pub(crate) fn read_text(result: capnp::Result<capnp::text::Reader>) -> String {
result.ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string()
}
/// Read a capnp data field as [u8; 16], zero-padded
pub(crate) fn read_uuid(result: capnp::Result<&[u8]>) -> [u8; 16] {
let mut out = [0u8; 16];
if let Ok(data) = result
&& data.len() >= 16 {
out.copy_from_slice(&data[..16]);
}
out
}
/// Create a new node with defaults
pub fn new_node(key: &str, content: &str) -> Node {
Node {
uuid: *Uuid::new_v4().as_bytes(),
version: 1,
timestamp: now_epoch(),
node_type: NodeType::Semantic,
provenance: "manual".to_string(),
key: key.to_string(),
content: content.to_string(),
weight: 0.7,
emotion: 0.0,
deleted: false,
source_ref: String::new(),
created: today(),
retrievals: 0,
uses: 0,
wrongs: 0,
state_tag: String::new(),
last_replayed: 0,
spaced_repetition_interval: 1,
position: 0,
created_at: now_epoch(),
community_id: None,
clustering_coefficient: None,
degree: None,
}
}
/// Agent visit record — tracks when an agent successfully processed a node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AgentVisit {
pub node_uuid: [u8; 16],
pub node_key: String,
pub agent: String,
pub timestamp: i64,
pub outcome: String,
}
capnp_message!(AgentVisit,
reader: memory_capnp::agent_visit::Reader<'_>,
builder: memory_capnp::agent_visit::Builder<'_>,
text: [node_key, agent, outcome],
uuid: [node_uuid],
prim: [timestamp],
enm: [],
skip: [],
);
pub fn new_visit(node_uuid: [u8; 16], node_key: &str, agent: &str, outcome: &str) -> AgentVisit {
AgentVisit {
node_uuid,
node_key: node_key.to_string(),
agent: agent.to_string(),
timestamp: now_epoch(),
outcome: outcome.to_string(),
}
}
pub(crate) fn visits_path() -> PathBuf { memory_dir().join("visits.capnp") }
/// Transcript mining progress — tracks which segments have been processed
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TranscriptSegment {
pub transcript_id: String,
pub segment_index: u32,
pub agent: String,
pub timestamp: i64,
}
capnp_message!(TranscriptSegment,
reader: memory_capnp::transcript_segment::Reader<'_>,
builder: memory_capnp::transcript_segment::Builder<'_>,
text: [transcript_id, agent],
uuid: [],
prim: [segment_index, timestamp],
enm: [],
skip: [],
);
pub fn new_transcript_segment(transcript_id: &str, segment_index: u32, agent: &str) -> TranscriptSegment {
TranscriptSegment {
transcript_id: transcript_id.to_string(),
segment_index,
agent: agent.to_string(),
timestamp: now_epoch(),
}
}
pub(crate) fn transcript_progress_path() -> PathBuf { memory_dir().join("transcript-progress.capnp") }
/// Create a new relation.
/// Provenance is set from POC_PROVENANCE env var if present, else "manual".
pub fn new_relation(
source_uuid: [u8; 16],
target_uuid: [u8; 16],
rel_type: RelationType,
strength: f32,
source_key: &str,
target_key: &str,
) -> Relation {
// Use raw env var for provenance — agent names are dynamic
let provenance = std::env::var("POC_PROVENANCE")
.unwrap_or_else(|_| "manual".to_string());
Relation {
uuid: *Uuid::new_v4().as_bytes(),
version: 1,
timestamp: now_epoch(),
source: source_uuid,
target: target_uuid,
rel_type,
strength,
provenance,
deleted: false,
source_key: source_key.to_string(),
target_key: target_key.to_string(),
}
}

View file

@ -0,0 +1,217 @@
// Read-only access abstractions for the memory store
//
// StoreView: trait abstracting over owned Store and zero-copy MmapView.
// MmapView: mmap'd rkyv snapshot for sub-millisecond read-only access.
// AnyView: enum dispatch selecting fastest available view at runtime.
use super::types::*;
use std::fs;
// ---------------------------------------------------------------------------
// StoreView: read-only access trait for search and graph code.
//
// Abstracts over owned Store and zero-copy MmapView so the same
// spreading-activation and graph code works with either.
// ---------------------------------------------------------------------------
pub trait StoreView {
/// Iterate all nodes. Callback receives (key, content, weight).
fn for_each_node<F: FnMut(&str, &str, f32)>(&self, f: F);
/// Iterate all nodes with metadata. Callback receives (key, node_type, timestamp).
fn for_each_node_meta<F: FnMut(&str, NodeType, i64)>(&self, f: F);
/// Iterate all relations. Callback receives (source_key, target_key, strength, rel_type).
fn for_each_relation<F: FnMut(&str, &str, f32, RelationType)>(&self, f: F);
/// Node weight by key, or the default weight if missing.
fn node_weight(&self, key: &str) -> f64;
/// Node content by key.
fn node_content(&self, key: &str) -> Option<&str>;
/// Search/graph parameters.
fn params(&self) -> Params;
}
impl StoreView for Store {
fn for_each_node<F: FnMut(&str, &str, f32)>(&self, mut f: F) {
for (key, node) in &self.nodes {
f(key, &node.content, node.weight);
}
}
fn for_each_node_meta<F: FnMut(&str, NodeType, i64)>(&self, mut f: F) {
for (key, node) in &self.nodes {
f(key, node.node_type, node.timestamp);
}
}
fn for_each_relation<F: FnMut(&str, &str, f32, RelationType)>(&self, mut f: F) {
for rel in &self.relations {
if rel.deleted { continue; }
f(&rel.source_key, &rel.target_key, rel.strength, rel.rel_type);
}
}
fn node_weight(&self, key: &str) -> f64 {
self.nodes.get(key).map(|n| n.weight as f64).unwrap_or(self.params.default_weight)
}
fn node_content(&self, key: &str) -> Option<&str> {
self.nodes.get(key).map(|n| n.content.as_str())
}
fn params(&self) -> Params {
self.params
}
}
// ---------------------------------------------------------------------------
// MmapView: zero-copy store access via mmap'd rkyv snapshot.
//
// Holds the mmap alive; all string reads go directly into the mapped
// pages without allocation. Falls back to None if snapshot is stale.
// ---------------------------------------------------------------------------
pub struct MmapView {
mmap: memmap2::Mmap,
_file: fs::File,
data_offset: usize,
data_len: usize,
}
impl MmapView {
/// Try to open a fresh rkyv snapshot. Returns None if missing or stale.
pub fn open() -> Option<Self> {
let path = snapshot_path();
let file = fs::File::open(&path).ok()?;
let mmap = unsafe { memmap2::Mmap::map(&file) }.ok()?;
if mmap.len() < RKYV_HEADER_LEN { return None; }
if mmap[..4] != RKYV_MAGIC { return None; }
let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);
let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap());
let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap());
let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize;
if cached_nodes != nodes_size || cached_rels != rels_size { return None; }
if mmap.len() < RKYV_HEADER_LEN + data_len { return None; }
Some(MmapView { mmap, _file: file, data_offset: RKYV_HEADER_LEN, data_len })
}
fn snapshot(&self) -> &ArchivedSnapshot {
let data = &self.mmap[self.data_offset..self.data_offset + self.data_len];
unsafe { rkyv::archived_root::<Snapshot>(data) }
}
}
impl StoreView for MmapView {
fn for_each_node<F: FnMut(&str, &str, f32)>(&self, mut f: F) {
let snap = self.snapshot();
for (key, node) in snap.nodes.iter() {
f(key, &node.content, node.weight);
}
}
fn for_each_node_meta<F: FnMut(&str, NodeType, i64)>(&self, mut f: F) {
let snap = self.snapshot();
for (key, node) in snap.nodes.iter() {
let nt = match node.node_type {
ArchivedNodeType::EpisodicSession => NodeType::EpisodicSession,
ArchivedNodeType::EpisodicDaily => NodeType::EpisodicDaily,
ArchivedNodeType::EpisodicWeekly => NodeType::EpisodicWeekly,
ArchivedNodeType::EpisodicMonthly => NodeType::EpisodicMonthly,
ArchivedNodeType::Semantic => NodeType::Semantic,
};
f(key, nt, node.timestamp);
}
}
fn for_each_relation<F: FnMut(&str, &str, f32, RelationType)>(&self, mut f: F) {
let snap = self.snapshot();
for rel in snap.relations.iter() {
if rel.deleted { continue; }
let rt = match rel.rel_type {
ArchivedRelationType::Link => RelationType::Link,
ArchivedRelationType::Causal => RelationType::Causal,
ArchivedRelationType::Auto => RelationType::Auto,
};
f(&rel.source_key, &rel.target_key, rel.strength, rt);
}
}
fn node_weight(&self, key: &str) -> f64 {
let snap = self.snapshot();
snap.nodes.get(key)
.map(|n| n.weight as f64)
.unwrap_or(snap.params.default_weight)
}
fn node_content(&self, key: &str) -> Option<&str> {
let snap = self.snapshot();
snap.nodes.get(key).map(|n| &*n.content)
}
fn params(&self) -> Params {
let p = &self.snapshot().params;
Params {
default_weight: p.default_weight,
decay_factor: p.decay_factor,
use_boost: p.use_boost,
prune_threshold: p.prune_threshold,
edge_decay: p.edge_decay,
max_hops: p.max_hops,
min_activation: p.min_activation,
}
}
}
// ---------------------------------------------------------------------------
// AnyView: enum dispatch for read-only access.
//
// MmapView when the snapshot is fresh, owned Store as fallback.
// The match on each call is a single predicted branch — zero overhead.
// ---------------------------------------------------------------------------
pub enum AnyView {
Mmap(MmapView),
Owned(Store),
}
impl AnyView {
/// Load the fastest available view: mmap snapshot or owned store.
pub fn load() -> Result<Self, String> {
if let Some(mv) = MmapView::open() {
Ok(AnyView::Mmap(mv))
} else {
Ok(AnyView::Owned(Store::load()?))
}
}
}
impl StoreView for AnyView {
fn for_each_node<F: FnMut(&str, &str, f32)>(&self, f: F) {
match self { AnyView::Mmap(v) => v.for_each_node(f), AnyView::Owned(s) => s.for_each_node(f) }
}
fn for_each_node_meta<F: FnMut(&str, NodeType, i64)>(&self, f: F) {
match self { AnyView::Mmap(v) => v.for_each_node_meta(f), AnyView::Owned(s) => s.for_each_node_meta(f) }
}
fn for_each_relation<F: FnMut(&str, &str, f32, RelationType)>(&self, f: F) {
match self { AnyView::Mmap(v) => v.for_each_relation(f), AnyView::Owned(s) => s.for_each_relation(f) }
}
fn node_weight(&self, key: &str) -> f64 {
match self { AnyView::Mmap(v) => v.node_weight(key), AnyView::Owned(s) => s.node_weight(key) }
}
fn node_content(&self, key: &str) -> Option<&str> {
match self { AnyView::Mmap(v) => v.node_content(key), AnyView::Owned(s) => s.node_content(key) }
}
fn params(&self) -> Params {
match self { AnyView::Mmap(v) => v.params(), AnyView::Owned(s) => s.params() }
}
}