From 3f644609e1f882ef6f3776f683585ec88a7d231d Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Tue, 3 Mar 2026 16:40:32 -0500 Subject: [PATCH] store: split mod.rs into persist.rs and ops.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mod.rs was 937 lines with all Store methods in one block. Split into three files by responsibility: - persist.rs (318 lines): load, save, replay, append, snapshot — all disk IO and cache management - ops.rs (300 lines): upsert, delete, modify, mark_used/wrong, decay, fix_categories, cap_degree — all mutations - mod.rs (356 lines): re-exports, key resolution, ingestion, rendering, search — read-only operations No behavioral changes; cargo check + full smoke test pass. --- src/store/mod.rs | 717 ++++--------------------------------------- src/store/ops.rs | 300 ++++++++++++++++++ src/store/persist.rs | 318 +++++++++++++++++++ 3 files changed, 686 insertions(+), 649 deletions(-) create mode 100644 src/store/ops.rs create mode 100644 src/store/persist.rs diff --git a/src/store/mod.rs b/src/store/mod.rs index 0518199..16b176c 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -10,454 +10,35 @@ // 2. bincode cache (state.bin) — ~10ms // 3. capnp log replay — ~40ms // Staleness: log file sizes embedded in cache headers. +// +// Module layout: +// types.rs — Node, Relation, enums, capnp macros, path helpers +// parse.rs — markdown → MemoryUnit parsing +// view.rs — zero-copy read-only access (StoreView, MmapView) +// persist.rs — load, save, replay, append, snapshot (all disk IO) +// ops.rs — mutations (upsert, delete, decay, cap_degree, etc.) 
+// mod.rs — re-exports, key resolution, ingestion, rendering mod types; mod parse; mod view; +mod persist; +mod ops; // Re-export everything callers need pub use types::*; pub use parse::{MemoryUnit, parse_units}; pub use view::{StoreView, AnyView}; -use crate::memory_capnp; use crate::graph::{self, Graph}; -use capnp::message; -use capnp::serialize; - -use std::collections::{HashMap, HashSet}; use std::fs; -use std::io::{BufReader, BufWriter, Write as IoWrite}; +use std::io::Write as IoWrite; use std::path::Path; use parse::classify_filename; impl Store { - /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs. - /// - /// Staleness check uses log file sizes (not mtimes). Since logs are - /// append-only, any write grows the file, invalidating the cache. - /// This avoids the mtime race that caused data loss with concurrent - /// writers (dream loop, link audit, journal enrichment). - pub fn load() -> Result { - // 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy) - match Self::load_snapshot_mmap() { - Ok(Some(store)) => return Ok(store), - Ok(None) => {}, - Err(e) => eprintln!("rkyv snapshot: {}", e), - } - - // 2. 
Try bincode state.bin cache (~10ms) - let nodes_p = nodes_path(); - let rels_p = relations_path(); - let state_p = state_path(); - - let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); - - if let Ok(data) = fs::read(&state_p) { - if data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC { - let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap()); - let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap()); - - if cached_nodes == nodes_size && cached_rels == rels_size { - if let Ok(mut store) = bincode::deserialize::(&data[CACHE_HEADER_LEN..]) { - // Rebuild uuid_to_key (skipped by serde) - for (key, node) in &store.nodes { - store.uuid_to_key.insert(node.uuid, key.clone()); - } - // Bootstrap: write rkyv snapshot if missing - if !snapshot_path().exists() { - if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) { - eprintln!("rkyv bootstrap: {}", e); - } - } - return Ok(store); - } - } - } - } - - // Stale or no cache — rebuild from capnp logs - let mut store = Store::default(); - - if nodes_p.exists() { - store.replay_nodes(&nodes_p)?; - } - if rels_p.exists() { - store.replay_relations(&rels_p)?; - } - - // Drop edges referencing deleted/missing nodes - store.relations.retain(|r| - store.nodes.contains_key(&r.source_key) && - store.nodes.contains_key(&r.target_key) - ); - - store.save()?; - Ok(store) - } - - /// Replay node log, keeping latest version per UUID - fn replay_nodes(&mut self, path: &Path) -> Result<(), String> { - let file = fs::File::open(path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut reader = BufReader::new(file); - - while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { - let log = msg.get_root::() - .map_err(|e| format!("read node log: {}", e))?; - for node_reader in log.get_nodes() - .map_err(|e| format!("get nodes: {}", e))? 
{ - let node = Node::from_capnp(node_reader)?; - let existing_version = self.nodes.get(&node.key) - .map(|n| n.version) - .unwrap_or(0); - if node.version >= existing_version { - if node.deleted { - self.nodes.remove(&node.key); - self.uuid_to_key.remove(&node.uuid); - } else { - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(node.key.clone(), node); - } - } - } - } - Ok(()) - } - - /// Replay relation log, keeping latest version per UUID - fn replay_relations(&mut self, path: &Path) -> Result<(), String> { - let file = fs::File::open(path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut reader = BufReader::new(file); - - // Collect all, then deduplicate by UUID keeping latest version - let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new(); - - while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { - let log = msg.get_root::() - .map_err(|e| format!("read relation log: {}", e))?; - for rel_reader in log.get_relations() - .map_err(|e| format!("get relations: {}", e))? 
{ - let rel = Relation::from_capnp(rel_reader)?; - let existing_version = by_uuid.get(&rel.uuid) - .map(|r| r.version) - .unwrap_or(0); - if rel.version >= existing_version { - by_uuid.insert(rel.uuid, rel); - } - } - } - - self.relations = by_uuid.into_values() - .filter(|r| !r.deleted) - .collect(); - Ok(()) - } - - /// Append nodes to the log file - pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> { - let _lock = StoreLock::acquire()?; - - let path = nodes_path(); - let file = fs::OpenOptions::new() - .create(true).append(true).open(&path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut writer = BufWriter::new(file); - - let mut msg = message::Builder::new_default(); - { - let log = msg.init_root::(); - let mut list = log.init_nodes(nodes.len() as u32); - for (i, node) in nodes.iter().enumerate() { - node.to_capnp(list.reborrow().get(i as u32)); - } - } - serialize::write_message(&mut writer, &msg) - .map_err(|e| format!("write nodes: {}", e))?; - writer.flush().map_err(|e| format!("flush: {}", e))?; - Ok(()) - } - - /// Append relations to the log file - pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> { - let _lock = StoreLock::acquire()?; - - let path = relations_path(); - let file = fs::OpenOptions::new() - .create(true).append(true).open(&path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - let mut writer = BufWriter::new(file); - - let mut msg = message::Builder::new_default(); - { - let log = msg.init_root::(); - let mut list = log.init_relations(relations.len() as u32); - for (i, rel) in relations.iter().enumerate() { - rel.to_capnp(list.reborrow().get(i as u32)); - } - } - serialize::write_message(&mut writer, &msg) - .map_err(|e| format!("write relations: {}", e))?; - writer.flush().map_err(|e| format!("flush: {}", e))?; - Ok(()) - } - - /// Save the derived cache with log size header for staleness detection. - /// Uses atomic write (tmp + rename) to prevent partial reads. 
- pub fn save(&self) -> Result<(), String> { - let _lock = StoreLock::acquire()?; - - let path = state_path(); - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).ok(); - } - - let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); - - let bincode_data = bincode::serialize(self) - .map_err(|e| format!("bincode serialize: {}", e))?; - - let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len()); - data.extend_from_slice(&CACHE_MAGIC); - data.extend_from_slice(&nodes_size.to_le_bytes()); - data.extend_from_slice(&rels_size.to_le_bytes()); - data.extend_from_slice(&bincode_data); - - // Atomic write: tmp file + rename - let tmp_path = path.with_extension("bin.tmp"); - fs::write(&tmp_path, &data) - .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?; - fs::rename(&tmp_path, &path) - .map_err(|e| format!("rename {} → {}: {}", tmp_path.display(), path.display(), e))?; - - // Also write rkyv snapshot (mmap-friendly) - if let Err(e) = self.save_snapshot(nodes_size, rels_size) { - eprintln!("rkyv snapshot save: {}", e); - } - - Ok(()) - } - - /// Serialize store as rkyv snapshot with staleness header. - /// Assumes StoreLock is already held by caller. 
- fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> { - let snap = Snapshot { - nodes: self.nodes.clone(), - relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(), - gaps: self.gaps.clone(), - params: self.params, - }; - - let rkyv_data = rkyv::to_bytes::<_, 256>(&snap) - .map_err(|e| format!("rkyv serialize: {}", e))?; - - let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len()); - data.extend_from_slice(&RKYV_MAGIC); - data.extend_from_slice(&1u32.to_le_bytes()); // format version - data.extend_from_slice(&nodes_size.to_le_bytes()); - data.extend_from_slice(&rels_size.to_le_bytes()); - data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes()); - data.extend_from_slice(&rkyv_data); - - let path = snapshot_path(); - let tmp_path = path.with_extension("rkyv.tmp"); - fs::write(&tmp_path, &data) - .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?; - fs::rename(&tmp_path, &path) - .map_err(|e| format!("rename: {}", e))?; - - Ok(()) - } - - /// Try loading store from mmap'd rkyv snapshot. - /// Returns None if snapshot is missing or stale (log sizes don't match). 
- fn load_snapshot_mmap() -> Result, String> { - let path = snapshot_path(); - if !path.exists() { return Ok(None); } - - let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); - - let file = fs::File::open(&path) - .map_err(|e| format!("open {}: {}", path.display(), e))?; - - let mmap = unsafe { memmap2::Mmap::map(&file) } - .map_err(|e| format!("mmap {}: {}", path.display(), e))?; - - if mmap.len() < RKYV_HEADER_LEN { return Ok(None); } - if mmap[..4] != RKYV_MAGIC { return Ok(None); } - - // [4..8] = version, skip for now - let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap()); - let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap()); - let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize; - - if cached_nodes != nodes_size || cached_rels != rels_size { - return Ok(None); // stale - } - if mmap.len() < RKYV_HEADER_LEN + data_len { - return Ok(None); // truncated - } - - let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len]; - - // SAFETY: we wrote this file ourselves via save_snapshot(). - // Skip full validation (check_archived_root) — the staleness header - // already confirms this snapshot matches the current log state. 
- let archived = unsafe { rkyv::archived_root::(rkyv_data) }; - - let snap: Snapshot = > - ::deserialize(archived, &mut rkyv::Infallible).unwrap(); - - let mut store = Store { - nodes: snap.nodes, - relations: snap.relations, - gaps: snap.gaps, - params: snap.params, - ..Default::default() - }; - - // Rebuild uuid_to_key (not serialized) - for (key, node) in &store.nodes { - store.uuid_to_key.insert(node.uuid, key.clone()); - } - - Ok(Some(store)) - } - - /// Add or update a node (appends to log + updates cache) - pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> { - if let Some(existing) = self.nodes.get(&node.key) { - node.uuid = existing.uuid; - node.version = existing.version + 1; - } - self.append_nodes(&[node.clone()])?; - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(node.key.clone(), node); - Ok(()) - } - - /// Add a relation (appends to log + updates cache) - pub fn add_relation(&mut self, rel: Relation) -> Result<(), String> { - self.append_relations(std::slice::from_ref(&rel))?; - self.relations.push(rel); - Ok(()) - } - - /// Upsert a node: update if exists (and content changed), create if not. - /// Returns: "created", "updated", or "unchanged". - pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str, String> { - if let Some(existing) = self.nodes.get(key) { - if existing.content == content { - return Ok("unchanged"); - } - let mut node = existing.clone(); - node.content = content.to_string(); - node.version += 1; - self.append_nodes(std::slice::from_ref(&node))?; - self.nodes.insert(key.to_string(), node); - Ok("updated") - } else { - let node = new_node(key, content); - self.append_nodes(std::slice::from_ref(&node))?; - self.uuid_to_key.insert(node.uuid, node.key.clone()); - self.nodes.insert(key.to_string(), node); - Ok("created") - } - } - - /// Soft-delete a node (appends deleted version, removes from cache). 
- pub fn delete_node(&mut self, key: &str) -> Result<(), String> { - let node = self.nodes.get(key) - .ok_or_else(|| format!("No node '{}'", key))?; - let mut deleted = node.clone(); - deleted.deleted = true; - deleted.version += 1; - self.append_nodes(std::slice::from_ref(&deleted))?; - self.nodes.remove(key); - Ok(()) - } - - /// Scan markdown files and index all memory units - pub fn init_from_markdown(&mut self) -> Result { - let dir = memory_dir(); - let mut count = 0; - if dir.exists() { - count = self.scan_dir_for_init(&dir)?; - } - Ok(count) - } - - fn scan_dir_for_init(&mut self, dir: &Path) -> Result { - let mut count = 0; - let entries = fs::read_dir(dir) - .map_err(|e| format!("read dir {}: {}", dir.display(), e))?; - - for entry in entries.flatten() { - let path = entry.path(); - if path.is_dir() { - count += self.scan_dir_for_init(&path)?; - continue; - } - let Some(ext) = path.extension() else { continue }; - if ext != "md" { continue } - - let filename = path.file_name().unwrap().to_string_lossy().to_string(); - let content = fs::read_to_string(&path) - .map_err(|e| format!("read {}: {}", path.display(), e))?; - - let units = parse_units(&filename, &content); - let (new_count, _) = self.ingest_units(&units, &filename)?; - count += new_count; - - // Create relations from links - let mut new_relations = Vec::new(); - for unit in &units { - let source_uuid = match self.nodes.get(&unit.key) { - Some(n) => n.uuid, - None => continue, - }; - - for link in unit.marker_links.iter().chain(unit.md_links.iter()) { - let Some((key, uuid)) = self.resolve_node_uuid(link) else { continue }; - let exists = self.relations.iter().any(|r| - (r.source == source_uuid && r.target == uuid) || - (r.source == uuid && r.target == source_uuid)); - if !exists { - new_relations.push(new_relation( - source_uuid, uuid, RelationType::Link, 1.0, - &unit.key, &key, - )); - } - } - - for cause in &unit.causes { - let Some((key, uuid)) = self.resolve_node_uuid(cause) else { continue 
}; - let exists = self.relations.iter().any(|r| - r.source == uuid && r.target == source_uuid - && r.rel_type == RelationType::Causal); - if !exists { - new_relations.push(new_relation( - uuid, source_uuid, RelationType::Causal, 1.0, - &key, &unit.key, - )); - } - } - } - - if !new_relations.is_empty() { - self.append_relations(&new_relations)?; - self.relations.extend(new_relations); - } - } - Ok(count) - } - pub fn build_graph(&self) -> Graph { graph::build_graph(self) } @@ -547,241 +128,79 @@ impl Store { } } - /// Modify a node in-place, bump version, and persist to capnp log. - fn modify_node(&mut self, key: &str, f: impl FnOnce(&mut Node)) -> Result<(), String> { - let node = self.nodes.get_mut(key) - .ok_or_else(|| format!("No node '{}'", key))?; - f(node); - node.version += 1; - let node = node.clone(); - self.append_nodes(&[node]) - } - - pub fn mark_used(&mut self, key: &str) { - let boost = self.params.use_boost as f32; - let _ = self.modify_node(key, |n| { - n.uses += 1; - n.weight = (n.weight + boost).min(1.0); - if n.spaced_repetition_interval < 30 { - n.spaced_repetition_interval = match n.spaced_repetition_interval { - 1 => 3, 3 => 7, 7 => 14, 14 => 30, _ => 30, - }; - } - n.last_replayed = now_epoch(); - }); - } - - pub fn mark_wrong(&mut self, key: &str, _ctx: Option<&str>) { - let _ = self.modify_node(key, |n| { - n.wrongs += 1; - n.weight = (n.weight - 0.1).max(0.0); - n.spaced_repetition_interval = 1; - }); - } - - pub fn record_gap(&mut self, desc: &str) { - self.gaps.push(GapRecord { - description: desc.to_string(), - timestamp: today(), - }); - } - - pub fn categorize(&mut self, key: &str, cat_str: &str) -> Result<(), String> { - let cat = Category::from_str(cat_str) - .ok_or_else(|| format!("Unknown category '{}'. 
Use: core/tech/gen/obs/task", cat_str))?; - self.modify_node(key, |n| { n.category = cat; }) - } - - pub fn decay(&mut self) -> (usize, usize) { - let base = self.params.decay_factor; - let threshold = self.params.prune_threshold as f32; - let mut decayed = 0; - let mut pruned = 0; - let mut to_remove = Vec::new(); - - for (key, node) in &mut self.nodes { - let factor = node.category.decay_factor(base) as f32; - node.weight *= factor; - node.version += 1; - decayed += 1; - if node.weight < threshold { - to_remove.push(key.clone()); - pruned += 1; - } + /// Scan markdown files and index all memory units + pub fn init_from_markdown(&mut self) -> Result { + let dir = memory_dir(); + let mut count = 0; + if dir.exists() { + count = self.scan_dir_for_init(&dir)?; } - - // Don't actually remove — just mark very low weight - // Actual pruning happens during GC - for key in &to_remove { - if let Some(node) = self.nodes.get_mut(key) { - node.weight = node.weight.max(0.01); - } - } - - // Persist all decayed weights to capnp log - let updated: Vec = self.nodes.values().cloned().collect(); - let _ = self.append_nodes(&updated); - - (decayed, pruned) + Ok(count) } - /// Bulk recategorize nodes using rule-based logic. - /// Returns (changed, unchanged) counts. 
- pub fn fix_categories(&mut self) -> Result<(usize, usize), String> { - let core_files = ["identity.md", "kent.md"]; - let tech_files = [ - "language-theory.md", "zoom-navigation.md", - "rust-conversion.md", "poc-architecture.md", - ]; - let tech_prefixes = ["design-"]; - let obs_files = [ - "reflections.md", "reflections-zoom.md", "differentiation.md", - "cognitive-modes.md", "paper-notes.md", "inner-life.md", - "conversation.md", "interests.md", "stuck-toolkit.md", - ]; - let obs_prefixes = ["skill-", "worked-example-"]; + fn scan_dir_for_init(&mut self, dir: &Path) -> Result { + let mut count = 0; + let entries = fs::read_dir(dir) + .map_err(|e| format!("read dir {}: {}", dir.display(), e))?; - let mut changed_nodes = Vec::new(); - let mut unchanged = 0; - - let keys: Vec = self.nodes.keys().cloned().collect(); - for key in &keys { - let node = self.nodes.get(key).unwrap(); - if node.category != Category::Core { - unchanged += 1; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + count += self.scan_dir_for_init(&path)?; continue; } + let Some(ext) = path.extension() else { continue }; + if ext != "md" { continue } - let file = key.split('#').next().unwrap_or(key); + let filename = path.file_name().unwrap().to_string_lossy().to_string(); + let content = fs::read_to_string(&path) + .map_err(|e| format!("read {}: {}", path.display(), e))?; - let new_cat = if core_files.iter().any(|&f| file == f) { - None - } else if tech_files.iter().any(|&f| file == f) - || tech_prefixes.iter().any(|p| file.starts_with(p)) - { - Some(Category::Technical) - } else if obs_files.iter().any(|&f| file == f) - || obs_prefixes.iter().any(|p| file.starts_with(p)) - { - Some(Category::Observation) - } else { - Some(Category::General) - }; + let units = parse_units(&filename, &content); + let (new_count, _) = self.ingest_units(&units, &filename)?; + count += new_count; - if let Some(cat) = new_cat { - let node = self.nodes.get_mut(key).unwrap(); - 
node.category = cat; - node.version += 1; - changed_nodes.push(node.clone()); - } else { - unchanged += 1; - } - } + // Create relations from links + let mut new_relations = Vec::new(); + for unit in &units { + let source_uuid = match self.nodes.get(&unit.key) { + Some(n) => n.uuid, + None => continue, + }; - if !changed_nodes.is_empty() { - self.append_nodes(&changed_nodes)?; - } + for link in unit.marker_links.iter().chain(unit.md_links.iter()) { + let Some((key, uuid)) = self.resolve_node_uuid(link) else { continue }; + let exists = self.relations.iter().any(|r| + (r.source == source_uuid && r.target == uuid) || + (r.source == uuid && r.target == source_uuid)); + if !exists { + new_relations.push(new_relation( + source_uuid, uuid, RelationType::Link, 1.0, + &unit.key, &key, + )); + } + } - Ok((changed_nodes.len(), unchanged)) - } - - /// Cap node degree by soft-deleting edges from mega-hubs. - pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize), String> { - let mut node_degree: HashMap = HashMap::new(); - for rel in &self.relations { - if rel.deleted { continue; } - *node_degree.entry(rel.source_key.clone()).or_default() += 1; - *node_degree.entry(rel.target_key.clone()).or_default() += 1; - } - - let mut node_edges: HashMap> = HashMap::new(); - for (i, rel) in self.relations.iter().enumerate() { - if rel.deleted { continue; } - node_edges.entry(rel.source_key.clone()).or_default().push(i); - node_edges.entry(rel.target_key.clone()).or_default().push(i); - } - - let mut to_delete: HashSet = HashSet::new(); - let mut hubs_capped = 0; - - for (_key, edge_indices) in &node_edges { - let active: Vec = edge_indices.iter() - .filter(|&&i| !to_delete.contains(&i)) - .copied() - .collect(); - if active.len() <= max_degree { continue; } - - let mut auto_indices: Vec<(usize, f32)> = Vec::new(); - let mut link_indices: Vec<(usize, usize)> = Vec::new(); - for &i in &active { - let rel = &self.relations[i]; - if rel.rel_type == RelationType::Auto { - 
auto_indices.push((i, rel.strength)); - } else { - let other = if &rel.source_key == _key { - &rel.target_key - } else { - &rel.source_key - }; - let other_deg = node_degree.get(other).copied().unwrap_or(0); - link_indices.push((i, other_deg)); + for cause in &unit.causes { + let Some((key, uuid)) = self.resolve_node_uuid(cause) else { continue }; + let exists = self.relations.iter().any(|r| + r.source == uuid && r.target == source_uuid + && r.rel_type == RelationType::Causal); + if !exists { + new_relations.push(new_relation( + uuid, source_uuid, RelationType::Causal, 1.0, + &key, &unit.key, + )); + } } } - let excess = active.len() - max_degree; - - auto_indices.sort_by(|a, b| a.1.total_cmp(&b.1)); - let auto_prune = excess.min(auto_indices.len()); - for &(i, _) in auto_indices.iter().take(auto_prune) { - to_delete.insert(i); + if !new_relations.is_empty() { + self.append_relations(&new_relations)?; + self.relations.extend(new_relations); } - - let remaining_excess = excess.saturating_sub(auto_prune); - if remaining_excess > 0 { - link_indices.sort_by(|a, b| b.1.cmp(&a.1)); - let link_prune = remaining_excess.min(link_indices.len()); - for &(i, _) in link_indices.iter().take(link_prune) { - to_delete.insert(i); - } - } - - hubs_capped += 1; - } - - let mut pruned_rels = Vec::new(); - for &i in &to_delete { - self.relations[i].deleted = true; - self.relations[i].version += 1; - pruned_rels.push(self.relations[i].clone()); - } - - if !pruned_rels.is_empty() { - self.append_relations(&pruned_rels)?; - } - - self.relations.retain(|r| !r.deleted); - - Ok((hubs_capped, to_delete.len())) - } - - pub fn category_counts(&self) -> HashMap<&str, usize> { - let mut counts = HashMap::new(); - for node in self.nodes.values() { - *counts.entry(node.category.label()).or_insert(0) += 1; - } - counts - } - - /// Update graph-derived fields on all nodes - pub fn update_graph_metrics(&mut self) { - let g = self.build_graph(); - let communities = g.communities(); - - for (key, node) 
in &mut self.nodes { - node.community_id = communities.get(key).copied(); - node.clustering_coefficient = Some(g.clustering_coefficient(key)); - node.degree = Some(g.degree(key) as u32); } + Ok(count) } /// Process parsed memory units: diff against existing nodes, persist changes. diff --git a/src/store/ops.rs b/src/store/ops.rs new file mode 100644 index 0000000..eee5078 --- /dev/null +++ b/src/store/ops.rs @@ -0,0 +1,300 @@ +// Mutation operations on the store +// +// CRUD (upsert, delete, modify), feedback tracking (mark_used, mark_wrong), +// maintenance (decay, fix_categories, cap_degree), and graph metrics. + +use super::types::*; + +use std::collections::{HashMap, HashSet}; + +impl Store { + /// Add or update a node (appends to log + updates cache) + pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> { + if let Some(existing) = self.nodes.get(&node.key) { + node.uuid = existing.uuid; + node.version = existing.version + 1; + } + self.append_nodes(&[node.clone()])?; + self.uuid_to_key.insert(node.uuid, node.key.clone()); + self.nodes.insert(node.key.clone(), node); + Ok(()) + } + + /// Add a relation (appends to log + updates cache) + pub fn add_relation(&mut self, rel: Relation) -> Result<(), String> { + self.append_relations(std::slice::from_ref(&rel))?; + self.relations.push(rel); + Ok(()) + } + + /// Upsert a node: update if exists (and content changed), create if not. + /// Returns: "created", "updated", or "unchanged". 
+ pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str, String> { + if let Some(existing) = self.nodes.get(key) { + if existing.content == content { + return Ok("unchanged"); + } + let mut node = existing.clone(); + node.content = content.to_string(); + node.version += 1; + self.append_nodes(std::slice::from_ref(&node))?; + self.nodes.insert(key.to_string(), node); + Ok("updated") + } else { + let node = new_node(key, content); + self.append_nodes(std::slice::from_ref(&node))?; + self.uuid_to_key.insert(node.uuid, node.key.clone()); + self.nodes.insert(key.to_string(), node); + Ok("created") + } + } + + /// Soft-delete a node (appends deleted version, removes from cache). + pub fn delete_node(&mut self, key: &str) -> Result<(), String> { + let node = self.nodes.get(key) + .ok_or_else(|| format!("No node '{}'", key))?; + let mut deleted = node.clone(); + deleted.deleted = true; + deleted.version += 1; + self.append_nodes(std::slice::from_ref(&deleted))?; + self.nodes.remove(key); + Ok(()) + } + + /// Modify a node in-place, bump version, and persist to capnp log. 
+ fn modify_node(&mut self, key: &str, f: impl FnOnce(&mut Node)) -> Result<(), String> { + let node = self.nodes.get_mut(key) + .ok_or_else(|| format!("No node '{}'", key))?; + f(node); + node.version += 1; + let node = node.clone(); + self.append_nodes(&[node]) + } + + pub fn mark_used(&mut self, key: &str) { + let boost = self.params.use_boost as f32; + let _ = self.modify_node(key, |n| { + n.uses += 1; + n.weight = (n.weight + boost).min(1.0); + if n.spaced_repetition_interval < 30 { + n.spaced_repetition_interval = match n.spaced_repetition_interval { + 1 => 3, 3 => 7, 7 => 14, 14 => 30, _ => 30, + }; + } + n.last_replayed = now_epoch(); + }); + } + + pub fn mark_wrong(&mut self, key: &str, _ctx: Option<&str>) { + let _ = self.modify_node(key, |n| { + n.wrongs += 1; + n.weight = (n.weight - 0.1).max(0.0); + n.spaced_repetition_interval = 1; + }); + } + + pub fn record_gap(&mut self, desc: &str) { + self.gaps.push(GapRecord { + description: desc.to_string(), + timestamp: today(), + }); + } + + pub fn categorize(&mut self, key: &str, cat_str: &str) -> Result<(), String> { + let cat = Category::from_str(cat_str) + .ok_or_else(|| format!("Unknown category '{}'. 
Use: core/tech/gen/obs/task", cat_str))?; + self.modify_node(key, |n| { n.category = cat; }) + } + + pub fn decay(&mut self) -> (usize, usize) { + let base = self.params.decay_factor; + let threshold = self.params.prune_threshold as f32; + let mut decayed = 0; + let mut pruned = 0; + let mut to_remove = Vec::new(); + + for (key, node) in &mut self.nodes { + let factor = node.category.decay_factor(base) as f32; + node.weight *= factor; + node.version += 1; + decayed += 1; + if node.weight < threshold { + to_remove.push(key.clone()); + pruned += 1; + } + } + + // Don't actually remove — just mark very low weight + // Actual pruning happens during GC + for key in &to_remove { + if let Some(node) = self.nodes.get_mut(key) { + node.weight = node.weight.max(0.01); + } + } + + // Persist all decayed weights to capnp log + let updated: Vec = self.nodes.values().cloned().collect(); + let _ = self.append_nodes(&updated); + + (decayed, pruned) + } + + /// Bulk recategorize nodes using rule-based logic. + /// Returns (changed, unchanged) counts. 
+ pub fn fix_categories(&mut self) -> Result<(usize, usize), String> { + let core_files = ["identity.md", "kent.md"]; + let tech_files = [ + "language-theory.md", "zoom-navigation.md", + "rust-conversion.md", "poc-architecture.md", + ]; + let tech_prefixes = ["design-"]; + let obs_files = [ + "reflections.md", "reflections-zoom.md", "differentiation.md", + "cognitive-modes.md", "paper-notes.md", "inner-life.md", + "conversation.md", "interests.md", "stuck-toolkit.md", + ]; + let obs_prefixes = ["skill-", "worked-example-"]; + + let mut changed_nodes = Vec::new(); + let mut unchanged = 0; + + let keys: Vec = self.nodes.keys().cloned().collect(); + for key in &keys { + let node = self.nodes.get(key).unwrap(); + if node.category != Category::Core { + unchanged += 1; + continue; + } + + let file = key.split('#').next().unwrap_or(key); + + let new_cat = if core_files.iter().any(|&f| file == f) { + None + } else if tech_files.iter().any(|&f| file == f) + || tech_prefixes.iter().any(|p| file.starts_with(p)) + { + Some(Category::Technical) + } else if obs_files.iter().any(|&f| file == f) + || obs_prefixes.iter().any(|p| file.starts_with(p)) + { + Some(Category::Observation) + } else { + Some(Category::General) + }; + + if let Some(cat) = new_cat { + let node = self.nodes.get_mut(key).unwrap(); + node.category = cat; + node.version += 1; + changed_nodes.push(node.clone()); + } else { + unchanged += 1; + } + } + + if !changed_nodes.is_empty() { + self.append_nodes(&changed_nodes)?; + } + + Ok((changed_nodes.len(), unchanged)) + } + + /// Cap node degree by soft-deleting edges from mega-hubs. 
    pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize), String> {
        // Degree of every node, counting only live (non-deleted) edges.
        let mut node_degree: HashMap<String, usize> = HashMap::new();
        for rel in &self.relations {
            if rel.deleted { continue; }
            *node_degree.entry(rel.source_key.clone()).or_default() += 1;
            *node_degree.entry(rel.target_key.clone()).or_default() += 1;
        }

        // Index of every live edge, bucketed under both endpoints.
        let mut node_edges: HashMap<String, Vec<usize>> = HashMap::new();
        for (i, rel) in self.relations.iter().enumerate() {
            if rel.deleted { continue; }
            node_edges.entry(rel.source_key.clone()).or_default().push(i);
            node_edges.entry(rel.target_key.clone()).or_default().push(i);
        }

        let mut to_delete: HashSet<usize> = HashSet::new();
        let mut hubs_capped = 0;

        // NOTE(review): HashMap iteration order is unspecified, so which hub
        // gets pruned first (and therefore exactly which edges die) can vary
        // between runs — confirm this nondeterminism is acceptable.
        for (_key, edge_indices) in &node_edges {
            // Edges still alive after pruning done for earlier hubs this pass.
            let active: Vec<usize> = edge_indices.iter()
                .filter(|&&i| !to_delete.contains(&i))
                .copied()
                .collect();
            if active.len() <= max_degree { continue; }

            // Partition the hub's edges: Auto edges carry a strength score;
            // explicit links are ranked by the far endpoint's degree.
            let mut auto_indices: Vec<(usize, f32)> = Vec::new();
            let mut link_indices: Vec<(usize, usize)> = Vec::new();
            for &i in &active {
                let rel = &self.relations[i];
                if rel.rel_type == RelationType::Auto {
                    auto_indices.push((i, rel.strength));
                } else {
                    let other = if &rel.source_key == _key {
                        &rel.target_key
                    } else {
                        &rel.source_key
                    };
                    let other_deg = node_degree.get(other).copied().unwrap_or(0);
                    link_indices.push((i, other_deg));
                }
            }

            let excess = active.len() - max_degree;

            // Prune weakest Auto edges first (ascending strength)...
            auto_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
            let auto_prune = excess.min(auto_indices.len());
            for &(i, _) in auto_indices.iter().take(auto_prune) {
                to_delete.insert(i);
            }

            // ...then, if still over the cap, explicit links whose far
            // endpoint has the highest degree (it loses least connectivity).
            let remaining_excess = excess.saturating_sub(auto_prune);
            if remaining_excess > 0 {
                link_indices.sort_by(|a, b| b.1.cmp(&a.1));
                let link_prune = remaining_excess.min(link_indices.len());
                for &(i, _) in link_indices.iter().take(link_prune) {
                    to_delete.insert(i);
                }
            }

            hubs_capped += 1;
        }

        // Soft delete: mark + version-bump, log the tombstones, then drop
        // the dead edges from the in-memory list.
        let mut pruned_rels = Vec::new();
        for &i in &to_delete {
            self.relations[i].deleted = true;
            self.relations[i].version += 1;
            pruned_rels.push(self.relations[i].clone());
        }

        if !pruned_rels.is_empty() {
            self.append_relations(&pruned_rels)?;
        }

        self.relations.retain(|r| !r.deleted);

        Ok((hubs_capped, to_delete.len()))
    }

    /// Count nodes per category label.
    pub fn category_counts(&self) -> HashMap<&str, usize> {
        let mut counts = HashMap::new();
        for node in self.nodes.values() {
            *counts.entry(node.category.label()).or_insert(0) += 1;
        }
        counts
    }

    /// Update graph-derived fields on all nodes
    /// (community id, clustering coefficient, degree).
    pub fn update_graph_metrics(&mut self) {
        let g = self.build_graph();
        let communities = g.communities();

        for (key, node) in &mut self.nodes {
            node.community_id = communities.get(key).copied();
            node.clustering_coefficient = Some(g.clustering_coefficient(key));
            node.degree = Some(g.degree(key) as u32);
        }
    }
}
diff --git a/src/store/persist.rs b/src/store/persist.rs
new file mode 100644
index 0000000..f1b4d39
--- /dev/null
+++ b/src/store/persist.rs
@@ -0,0 +1,318 @@
// Persistence layer: load, save, replay, append, snapshot
//
// Three-tier loading strategy:
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
// 2. bincode cache (state.bin) — ~10ms
// 3. capnp log replay — ~40ms
//
// Logs are append-only; cache staleness uses log file sizes, not mtimes.

use super::types::*;

use crate::memory_capnp;

use capnp::message;
use capnp::serialize;

use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, BufWriter, Write as IoWrite};
use std::path::Path;

impl Store {
    /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
    ///
    /// Staleness check uses log file sizes (not mtimes). Since logs are
    /// append-only, any write grows the file, invalidating the cache.
    /// This avoids the mtime race that caused data loss with concurrent
    /// writers (dream loop, link audit, journal enrichment).
    pub fn load() -> Result<Store, String> {
        // 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
        match Self::load_snapshot_mmap() {
            Ok(Some(store)) => return Ok(store),
            Ok(None) => {},
            Err(e) => eprintln!("rkyv snapshot: {}", e),
        }

        // 2. Try bincode state.bin cache (~10ms)
        let nodes_p = nodes_path();
        let rels_p = relations_path();
        let state_p = state_path();

        let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

        if let Ok(data) = fs::read(&state_p) {
            if data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
                let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
                let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());

                // Cache is fresh only if both log sizes match the header.
                if cached_nodes == nodes_size && cached_rels == rels_size {
                    if let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
                        // Rebuild uuid_to_key (skipped by serde)
                        for (key, node) in &store.nodes {
                            store.uuid_to_key.insert(node.uuid, key.clone());
                        }
                        // Bootstrap: write rkyv snapshot if missing
                        if !snapshot_path().exists() {
                            if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
                                eprintln!("rkyv bootstrap: {}", e);
                            }
                        }
                        return Ok(store);
                    }
                }
            }
        }

        // 3. Stale or no cache — rebuild from capnp logs
        let mut store = Store::default();

        if nodes_p.exists() {
            store.replay_nodes(&nodes_p)?;
        }
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }

        // Drop edges referencing deleted/missing nodes
        store.relations.retain(|r|
            store.nodes.contains_key(&r.source_key) &&
            store.nodes.contains_key(&r.target_key)
        );

        // Persist the rebuilt state so the next load hits a cache tier.
        store.save()?;
        Ok(store)
    }

    /// Replay node log, keeping latest version per UUID.
    /// A failed read_message ends the replay loop quietly — presumably to
    /// tolerate a torn trailing record in the append-only log; confirm.
    fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .map_err(|e| format!("read node log: {}", e))?;
            for node_reader in log.get_nodes()
                .map_err(|e| format!("get nodes: {}", e))? {
                let node = Node::from_capnp(node_reader)?;
                // Last-writer-wins per key: apply only records at or above
                // the version already seen (>= keeps the later log entry
                // when versions tie).
                let existing_version = self.nodes.get(&node.key)
                    .map(|n| n.version)
                    .unwrap_or(0);
                if node.version >= existing_version {
                    if node.deleted {
                        // Tombstone: remove from both indexes.
                        self.nodes.remove(&node.key);
                        self.uuid_to_key.remove(&node.uuid);
                    } else {
                        self.uuid_to_key.insert(node.uuid, node.key.clone());
                        self.nodes.insert(node.key.clone(), node);
                    }
                }
            }
        }
        Ok(())
    }

    /// Replay relation log, keeping latest version per UUID.
    /// Unlike nodes (deduped by key), relations dedupe by UUID.
    fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        // Collect all, then deduplicate by UUID keeping latest version
        let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::relation_log::Reader>()
                .map_err(|e| format!("read relation log: {}", e))?;
            for rel_reader in log.get_relations()
                .map_err(|e| format!("get relations: {}", e))? {
                let rel = Relation::from_capnp(rel_reader)?;
                let existing_version = by_uuid.get(&rel.uuid)
                    .map(|r| r.version)
                    .unwrap_or(0);
                if rel.version >= existing_version {
                    by_uuid.insert(rel.uuid, rel);
                }
            }
        }

        // Tombstoned relations are dropped here, not kept in memory.
        self.relations = by_uuid.into_values()
            .filter(|r| !r.deleted)
            .collect();
        Ok(())
    }

    /// Append nodes to the log file.
    /// Takes the store lock for the duration of the write; one capnp
    /// message holds the whole batch.
    pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;

        let path = nodes_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut writer = BufWriter::new(file);

        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::node_log::Builder>();
            let mut list = log.init_nodes(nodes.len() as u32);
            for (i, node) in nodes.iter().enumerate() {
                node.to_capnp(list.reborrow().get(i as u32));
            }
        }
        serialize::write_message(&mut writer, &msg)
            .map_err(|e| format!("write nodes: {}", e))?;
        // Explicit flush: BufWriter's Drop would swallow errors.
        writer.flush().map_err(|e| format!("flush: {}", e))?;
        Ok(())
    }

    /// Append relations to the log file.
    /// Mirrors append_nodes: lock, append-open, one message per batch.
    pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;

        let path = relations_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut writer = BufWriter::new(file);

        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::relation_log::Builder>();
            let mut list = log.init_relations(relations.len() as u32);
            for (i, rel) in relations.iter().enumerate() {
                rel.to_capnp(list.reborrow().get(i as u32));
            }
        }
        serialize::write_message(&mut writer, &msg)
            .map_err(|e| format!("write relations: {}", e))?;
        writer.flush().map_err(|e| format!("flush: {}", e))?;
        Ok(())
    }

    /// Save the derived cache with log size header for staleness detection.
    /// Uses atomic write (tmp + rename) to prevent partial reads.
+ pub fn save(&self) -> Result<(), String> { + let _lock = StoreLock::acquire()?; + + let path = state_path(); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).ok(); + } + + let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); + let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); + + let bincode_data = bincode::serialize(self) + .map_err(|e| format!("bincode serialize: {}", e))?; + + let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len()); + data.extend_from_slice(&CACHE_MAGIC); + data.extend_from_slice(&nodes_size.to_le_bytes()); + data.extend_from_slice(&rels_size.to_le_bytes()); + data.extend_from_slice(&bincode_data); + + // Atomic write: tmp file + rename + let tmp_path = path.with_extension("bin.tmp"); + fs::write(&tmp_path, &data) + .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?; + fs::rename(&tmp_path, &path) + .map_err(|e| format!("rename {} → {}: {}", tmp_path.display(), path.display(), e))?; + + // Also write rkyv snapshot (mmap-friendly) + if let Err(e) = self.save_snapshot(nodes_size, rels_size) { + eprintln!("rkyv snapshot save: {}", e); + } + + Ok(()) + } + + /// Serialize store as rkyv snapshot with staleness header. + /// Assumes StoreLock is already held by caller. 
+ fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> { + let snap = Snapshot { + nodes: self.nodes.clone(), + relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(), + gaps: self.gaps.clone(), + params: self.params, + }; + + let rkyv_data = rkyv::to_bytes::<_, 256>(&snap) + .map_err(|e| format!("rkyv serialize: {}", e))?; + + let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len()); + data.extend_from_slice(&RKYV_MAGIC); + data.extend_from_slice(&1u32.to_le_bytes()); // format version + data.extend_from_slice(&nodes_size.to_le_bytes()); + data.extend_from_slice(&rels_size.to_le_bytes()); + data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes()); + data.extend_from_slice(&rkyv_data); + + let path = snapshot_path(); + let tmp_path = path.with_extension("rkyv.tmp"); + fs::write(&tmp_path, &data) + .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?; + fs::rename(&tmp_path, &path) + .map_err(|e| format!("rename: {}", e))?; + + Ok(()) + } + + /// Try loading store from mmap'd rkyv snapshot. + /// Returns None if snapshot is missing or stale (log sizes don't match). 
+ fn load_snapshot_mmap() -> Result, String> { + let path = snapshot_path(); + if !path.exists() { return Ok(None); } + + let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); + let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); + + let file = fs::File::open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + + let mmap = unsafe { memmap2::Mmap::map(&file) } + .map_err(|e| format!("mmap {}: {}", path.display(), e))?; + + if mmap.len() < RKYV_HEADER_LEN { return Ok(None); } + if mmap[..4] != RKYV_MAGIC { return Ok(None); } + + // [4..8] = version, skip for now + let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap()); + let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap()); + let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize; + + if cached_nodes != nodes_size || cached_rels != rels_size { + return Ok(None); // stale + } + if mmap.len() < RKYV_HEADER_LEN + data_len { + return Ok(None); // truncated + } + + let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len]; + + // SAFETY: we wrote this file ourselves via save_snapshot(). + // Skip full validation (check_archived_root) — the staleness header + // already confirms this snapshot matches the current log state. + let archived = unsafe { rkyv::archived_root::(rkyv_data) }; + + let snap: Snapshot = > + ::deserialize(archived, &mut rkyv::Infallible).unwrap(); + + let mut store = Store { + nodes: snap.nodes, + relations: snap.relations, + gaps: snap.gaps, + params: snap.params, + ..Default::default() + }; + + // Rebuild uuid_to_key (not serialized) + for (key, node) in &store.nodes { + store.uuid_to_key.insert(node.uuid, key.clone()); + } + + Ok(Some(store)) + } +}