// Cap'n Proto serialization and persistence
//
// capnp logs are the source of truth; redb provides indexed access.
// This module contains:
// - Serialization macros (capnp_enum!, capnp_message!)
// - Load/replay from capnp logs
// - Append to capnp logs
// - fsck (corruption repair)

use super::Store;
use super::{index, types::*};
use crate::memory_capnp;
use anyhow::{anyhow, Context, Result};
use capnp::message;
use capnp::serialize;
use redb::ReadableTableMetadata;
use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Seek};
use std::path::Path;

// ---------------------------------------------------------------------------
// Capnp serialization macros
//
// Declarative mapping between Rust types and capnp generated types.
// Adding a field to the schema means adding it in one place below;
// both read and write are generated from the same declaration.
// ---------------------------------------------------------------------------

/// Generate to_capnp/from_capnp conversion methods for an enum.
macro_rules! capnp_enum {
    ($rust_type:ident, $capnp_type:path, [$($variant:ident),+ $(,)?]) => {
        impl $rust_type {
            #[allow(clippy::wrong_self_convention, dead_code)]
            pub(crate) fn to_capnp(&self) -> $capnp_type {
                match self {
                    $(Self::$variant => <$capnp_type>::$variant,)+
                }
            }

            pub(crate) fn from_capnp(v: $capnp_type) -> Self {
                match v {
                    $(<$capnp_type>::$variant => Self::$variant,)+
                }
            }
        }
    };
}

/// Generate from_capnp/to_capnp methods for a struct with capnp serialization.
/// Fields are grouped by serialization kind:
///   text - capnp Text fields (String in Rust)
///   uuid - capnp Data fields ([u8; 16] in Rust)
///   prim - copy types (u32, f32, f64, bool)
///   enm  - enums with to_capnp/from_capnp methods
///   skip - Rust-only fields not in capnp (set to Default on read)
macro_rules! capnp_message {
    (
        $struct:ident,
        reader: $reader:ty,
        builder: $builder:ty,
        text: [$($tf:ident),* $(,)?],
        uuid: [$($uf:ident),* $(,)?],
        prim: [$($pf:ident),* $(,)?],
        enm: [$($ef:ident: $et:ident),* $(,)?],
        skip: [$($sf:ident),* $(,)?] $(,)?
    ) => {
        impl $struct {
            pub fn from_capnp(r: $reader) -> Result<Self> {
                paste::paste! {
                    Ok(Self {
                        $($tf: read_text(r.[<get_ $tf>]()),)*
                        $($uf: read_uuid(r.[<get_ $uf>]()),)*
                        $($pf: r.[<get_ $pf>](),)*
                        $($ef: $et::from_capnp(
                            r.[<get_ $ef>]().map_err(|_| anyhow!(concat!("bad ", stringify!($ef))))?
                        ),)*
                        $($sf: Default::default(),)*
                    })
                }
            }

            pub fn to_capnp(&self, mut b: $builder) {
                paste::paste! {
                    $(b.[<set_ $tf>](&self.$tf);)*
                    $(b.[<set_ $uf>](&self.$uf);)*
                    $(b.[<set_ $pf>](self.$pf);)*
                    $(b.[<set_ $ef>](self.$ef.to_capnp());)*
                }
            }
        }
    };
}
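// For reference, a sketch of what `capnp_enum!` generates for the
// `RelationType` mapping declared below (illustrative expansion, not
// emitted verbatim):
//
//     impl RelationType {
//         pub(crate) fn to_capnp(&self) -> memory_capnp::RelationType {
//             match self {
//                 Self::Link => memory_capnp::RelationType::Link,
//                 Self::Causal => memory_capnp::RelationType::Causal,
//                 Self::Auto => memory_capnp::RelationType::Auto,
//             }
//         }
//         pub(crate) fn from_capnp(v: memory_capnp::RelationType) -> Self {
//             match v {
//                 memory_capnp::RelationType::Link => Self::Link,
//                 memory_capnp::RelationType::Causal => Self::Causal,
//                 memory_capnp::RelationType::Auto => Self::Auto,
//             }
//         }
//     }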
// ---------------------------------------------------------------------------
// Capnp helpers
// ---------------------------------------------------------------------------

/// Read a capnp text field, returning an empty string on any error.
fn read_text(result: capnp::Result<capnp::text::Reader<'_>>) -> String {
    result.ok()
        .and_then(|t| t.to_str().ok())
        .unwrap_or("")
        .to_string()
}

/// Read a capnp data field as [u8; 16], zero-padded.
fn read_uuid(result: capnp::Result<&[u8]>) -> [u8; 16] {
    let mut out = [0u8; 16];
    if let Ok(data) = result
        && data.len() >= 16
    {
        out.copy_from_slice(&data[..16]);
    }
    out
}

// ---------------------------------------------------------------------------
// Type-to-capnp mappings
// ---------------------------------------------------------------------------

capnp_enum!(NodeType, memory_capnp::NodeType,
    [EpisodicSession, EpisodicDaily, EpisodicWeekly, Semantic, EpisodicMonthly]);

capnp_enum!(RelationType, memory_capnp::RelationType, [Link, Causal, Auto]);

capnp_message!(Node,
    reader: memory_capnp::content_node::Reader<'_>,
    builder: memory_capnp::content_node::Builder<'_>,
    text: [key, content, source_ref, provenance],
    uuid: [uuid],
    prim: [version, timestamp, weight, emotion, deleted, retrievals, uses, wrongs,
           last_replayed, spaced_repetition_interval, created_at, last_scored],
    enm: [node_type: NodeType],
    skip: [community_id, clustering_coefficient, degree],
);

capnp_message!(Relation,
    reader: memory_capnp::relation::Reader<'_>,
    builder: memory_capnp::relation::Builder<'_>,
    text: [source_key, target_key, provenance],
    uuid: [uuid, source, target],
    prim: [version, timestamp, strength, deleted],
    enm: [rel_type: RelationType],
    skip: [],
);

// ---------------------------------------------------------------------------
// Migration helpers (legacy provenance enum → string)
// ---------------------------------------------------------------------------

/// Convert a legacy capnp provenance enum to its string label.
fn legacy_provenance_label(p: memory_capnp::Provenance) -> &'static str {
    use memory_capnp::Provenance::*;
    match p {
        Manual => "manual",
        Journal => "journal",
        Agent => "agent",
        Dream => "dream",
        Derived => "derived",
        AgentExperienceMine => "agent:experience-mine",
        AgentKnowledgeObservation => "agent:knowledge-observation",
        AgentKnowledgePattern => "agent:knowledge-pattern",
        AgentKnowledgeConnector => "agent:knowledge-connector",
        AgentKnowledgeChallenger => "agent:knowledge-challenger",
        AgentConsolidate => "agent:consolidate",
        AgentDigest => "agent:digest",
        AgentFactMine => "agent:fact-mine",
        AgentDecay => "agent:decay",
    }
}
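// Minimal round-trip sketch for the mappings above, given some `node: &Node`
// (mirrors what append_nodes/read_node_at_offset below do against the log
// file; error handling and the on-disk append are elided):
//
//     let mut msg = message::Builder::new_default();
//     {
//         let log = msg.init_root::<memory_capnp::node_log::Builder>();
//         let mut list = log.init_nodes(1);
//         node.to_capnp(list.reborrow().get(0));
//     }
//     let mut buf = Vec::new();
//     serialize::write_message(&mut buf, &msg)?;
//     let reader = serialize::read_message(&mut buf.as_slice(), message::ReaderOptions::new())?;
//     let log = reader.get_root::<memory_capnp::node_log::Reader>()?;
//     let back = Node::from_capnp_migrate(log.get_nodes()?.get(0))?;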
impl Node {
    /// Read from capnp with migration: if the new provenance text field
    /// is empty (old record), fall back to the deprecated provenanceOld enum.
    pub fn from_capnp_migrate(r: memory_capnp::content_node::Reader<'_>) -> Result<Self> {
        let mut node = Self::from_capnp(r)?;
        if node.provenance.is_empty()
            && let Ok(old) = r.get_provenance_old()
        {
            node.provenance = legacy_provenance_label(old).to_string();
        }
        // Sanitize timestamps: old capnp records have raw offsets instead
        // of unix epoch. Anything past year 2100 (~4102444800) is bogus.
        const MAX_SANE_EPOCH: i64 = 4_102_444_800;
        if node.timestamp > MAX_SANE_EPOCH || node.timestamp < 0 {
            node.timestamp = node.created_at;
        }
        if node.created_at > MAX_SANE_EPOCH || node.created_at < 0 {
            node.created_at = node.timestamp.min(MAX_SANE_EPOCH);
        }
        Ok(node)
    }
}

impl Relation {
    pub fn from_capnp_migrate(r: memory_capnp::relation::Reader<'_>) -> Result<Self> {
        let mut rel = Self::from_capnp(r)?;
        if rel.provenance.is_empty()
            && let Ok(old) = r.get_provenance_old()
        {
            rel.provenance = legacy_provenance_label(old).to_string();
        }
        Ok(rel)
    }
}

// ---------------------------------------------------------------------------
// Direct node access
// ---------------------------------------------------------------------------

/// Read a single node at the given offset in the capnp log.
/// The offset must point to a valid message containing the node.
pub fn read_node_at_offset(offset: u64) -> Result<Node> {
    let path = nodes_path();
    let mut file = fs::File::open(&path)
        .with_context(|| format!("open {}", path.display()))?;
    use std::io::SeekFrom;
    file.seek(SeekFrom::Start(offset))?;
    let mut reader = BufReader::new(file);
    let msg = serialize::read_message(&mut reader, message::ReaderOptions::new())
        .with_context(|| format!("read message at offset {}", offset))?;
    let log = msg.get_root::<memory_capnp::node_log::Reader>()
        .with_context(|| "read node log")?;
    let nodes = log.get_nodes()
        .with_context(|| "get nodes")?;
    // A message at this offset should contain exactly one node (from upsert);
    // batch operations like rename may write several.
    if nodes.is_empty() {
        anyhow::bail!("no nodes in message at offset {}", offset);
    }
    // Return the first non-deleted node, or fall back to the first node if all are deleted.
    for node_reader in nodes.iter() {
        let node = Node::from_capnp_migrate(node_reader)?;
        if !node.deleted {
            return Ok(node);
        }
    }
    // All nodes in this message are deleted - shouldn't happen if the index is correct.
    Node::from_capnp_migrate(nodes.get(0))
}

// ---------------------------------------------------------------------------
// Store persistence methods
// ---------------------------------------------------------------------------

impl Store {
    /// Load the store by opening the redb index and replaying relations.
    pub fn load() -> Result<Self> {
        let nodes_p = nodes_path();
        let rels_p = relations_path();
        let mut store = Store::default();

        // Open the redb index first (rebuilds from capnp if needed)
        let db_p = db_path();
        store.db = Some(store.open_or_rebuild_db(&db_p)?);

        // Replay relations
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }

        // Record log sizes
        store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
        store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

        // Drop edges referencing deleted/missing nodes
        let db = store.db.as_ref().unwrap();
        store.relations.retain(|r|
            index::contains_key(db, &r.source_key).unwrap_or(false)
                && index::contains_key(db, &r.target_key).unwrap_or(false)
        );

        Ok(store)
    }

    /// Open the redb database, rebuilding it if unhealthy.
    fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
        // Try opening an existing database
        if path.exists() {
            match index::open_db(path) {
                Ok(database) => {
                    if self.db_is_healthy(&database)? {
                        return Ok(database);
                    }
                    eprintln!("redb index stale, rebuilding...");
                }
                Err(e) => {
                    eprintln!("redb open failed ({}), rebuilding...", e);
                }
            }
        }
        // Rebuild the index from the capnp log
        rebuild_index(path, &nodes_path())
    }
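    // How the index and log compose for a point read (sketch; mirrors the
    // spot check in db_is_healthy below — the NODES table maps key to log
    // offset, and read_node_at_offset seeks the capnp log directly):
    //
    //     let txn = database.begin_read()?;
    //     let table = txn.open_table(index::NODES)?;
    //     if let Some(offset) = table.get(key)? {
    //         let node = read_node_at_offset(offset.value())?;
    //     }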
    /// Check if the redb index is healthy by verifying that some offsets are valid.
    fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
        use redb::{ReadableDatabase, ReadableTable};
        let txn = database.begin_read()?;
        let nodes_table = txn.open_table(index::NODES)?;

        // Check that we can read the table and it has entries
        if nodes_table.len()? == 0 {
            // Empty database - might be stale or new
            let capnp_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
            return Ok(capnp_size == 0); // healthy only if the capnp log is also empty
        }

        // Spot check: verify a few offsets point to valid messages
        let mut checked = 0;
        for entry in nodes_table.iter()? {
            if checked >= 5 {
                break;
            }
            let (_key, offset) = entry?;
            let offset = offset.value();
            // Try to read the node at this offset
            if read_node_at_offset(offset).is_err() {
                return Ok(false);
            }
            checked += 1;
        }
        Ok(true)
    }

    /// Replay the relation log, keeping the latest version per UUID.
    fn replay_relations(&mut self, path: &Path) -> Result<()> {
        let file = fs::File::open(path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);

        // Collect all, then deduplicate by UUID, keeping the latest version
        let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::relation_log::Reader>()
                .with_context(|| "read relation log")?;
            for rel_reader in log.get_relations()
                .with_context(|| "get relations")?
            {
                let rel = Relation::from_capnp_migrate(rel_reader)?;
                let existing_version = by_uuid.get(&rel.uuid)
                    .map(|r| r.version)
                    .unwrap_or(0);
                if rel.version >= existing_version {
                    by_uuid.insert(rel.uuid, rel);
                }
            }
        }

        self.relations = by_uuid.into_values()
            .filter(|r| !r.deleted)
            .collect();
        Ok(())
    }

    /// Find all duplicate keys: keys with multiple live UUIDs in the log.
    /// Returns a map from key → vec of all live Node versions (one per UUID).
    /// The "winner" in self.nodes is always one of them.
    pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>> {
        let path = nodes_path();
        if !path.exists() {
            return Ok(HashMap::new());
        }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let mut reader = BufReader::new(file);

        // Track the latest version of each UUID
        let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .with_context(|| "read node log")?;
            for node_reader in log.get_nodes()
                .with_context(|| "get nodes")?
            {
                let node = Node::from_capnp_migrate(node_reader)?;
                let dominates = by_uuid.get(&node.uuid)
                    .map(|n| node.version >= n.version)
                    .unwrap_or(true);
                if dominates {
                    by_uuid.insert(node.uuid, node);
                }
            }
        }

        // Group live (non-deleted) nodes by key
        let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
        for node in by_uuid.into_values() {
            if !node.deleted {
                by_key.entry(node.key.clone()).or_default().push(node);
            }
        }

        // Keep only duplicates
        by_key.retain(|_, nodes| nodes.len() > 1);
        Ok(by_key)
    }
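    // Example (sketch) of consuming find_duplicates from a maintenance task:
    //
    //     for (key, versions) in store.find_duplicates()? {
    //         eprintln!("key {:?}: {} live UUIDs", key, versions.len());
    //     }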
    /// Append nodes to the log file. Returns the offset where the message was written.
    pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<u64> {
        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::node_log::Builder>();
            let mut list = log.init_nodes(nodes.len() as u32);
            for (i, node) in nodes.iter().enumerate() {
                node.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg)
            .with_context(|| "serialize nodes")?;

        let path = nodes_path();
        let file = fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        // Record the offset before writing
        let offset = file.metadata().map(|m| m.len()).unwrap_or(0);
        use std::io::Write;
        (&file).write_all(&buf)
            .with_context(|| "write nodes")?;
        self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
        Ok(offset)
    }

    /// Append relations to the log file.
    pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> {
        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::relation_log::Builder>();
            let mut list = log.init_relations(relations.len() as u32);
            for (i, rel) in relations.iter().enumerate() {
                rel.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg)
            .with_context(|| "serialize relations")?;

        let path = relations_path();
        let file = fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        use std::io::Write;
        (&file).write_all(&buf)
            .with_context(|| "write relations")?;
        self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
        Ok(())
    }

    /// Placeholder - indices are updated on write with redb.
    pub fn save(&self) -> Result<()> {
        Ok(())
    }
}
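// Typical write path (sketch), given a loaded `store` and its redb handle
// `db`: append to the log, then record the returned offset in the index so
// reads can seek to it directly (rebuild_index below does the same in bulk):
//
//     let offset = store.append_nodes(std::slice::from_ref(&node))?;
//     let txn = db.begin_write()?;
//     {
//         let mut table = txn.open_table(index::NODES)?;
//         table.insert(node.key.as_str(), offset)?;
//     }
//     txn.commit()?;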
/// Check and repair corrupt capnp log files.
///
/// Reads each message sequentially, tracking the file position. On the first
/// corrupt message, truncates the file to the last good position. Also
/// removes stale caches so the next load replays from the repaired log.
pub fn fsck() -> Result<()> {
    let mut any_corrupt = false;
    for (path, kind) in [
        (nodes_path(), "node"),
        (relations_path(), "relation"),
    ] {
        if !path.exists() {
            continue;
        }
        let file = fs::File::open(&path)
            .with_context(|| format!("open {}", path.display()))?;
        let file_len = file.metadata()
            .with_context(|| format!("stat {}", path.display()))?
            .len();
        let mut reader = BufReader::new(file);
        let mut good_messages = 0u64;
        let mut last_good_pos = 0u64;
        let mut file_corrupt = false;

        loop {
            let pos = reader.stream_position()
                .with_context(|| format!("tell {}", path.display()))?;
            let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
                Ok(m) => m,
                Err(_) => {
                    // read_message fails at EOF (normal) or on corrupt framing
                    if pos < file_len {
                        // Not at EOF — corrupt framing
                        eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
                        file_corrupt = true;
                        any_corrupt = true;
                        drop(reader);
                        let file = fs::OpenOptions::new().write(true).open(&path)
                            .with_context(|| "open for truncate")?;
                        file.set_len(pos)
                            .with_context(|| format!("truncate {}", path.display()))?;
                        eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
                            kind, file_len, pos, good_messages);
                    }
                    break;
                }
            };

            // Validate the message content too
            let valid = if kind == "node" {
                msg.get_root::<memory_capnp::node_log::Reader>()
                    .and_then(|l| l.get_nodes().map(|_| ()))
                    .is_ok()
            } else {
                msg.get_root::<memory_capnp::relation_log::Reader>()
                    .and_then(|l| l.get_relations().map(|_| ()))
                    .is_ok()
            };

            if valid {
                good_messages += 1;
                last_good_pos = reader.stream_position()
                    .with_context(|| format!("tell {}", path.display()))?;
            } else {
                eprintln!("{}: corrupt message content at offset {}, truncating to {}",
                    kind, pos, last_good_pos);
                file_corrupt = true;
                any_corrupt = true;
                drop(reader);
                let file = fs::OpenOptions::new().write(true).open(&path)
                    .with_context(|| "open for truncate")?;
                file.set_len(last_good_pos)
                    .with_context(|| format!("truncate {}", path.display()))?;
                eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
                    kind, file_len, last_good_pos, good_messages);
                break;
            }
        }

        if !file_corrupt {
            eprintln!("{}: {} messages, all clean", kind, good_messages);
        }
    }

    if any_corrupt {
        eprintln!("repair complete — run `poc-memory status` to verify");
    } else {
        eprintln!("store is clean");
    }
    Ok(())
}
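// Recovery flow (sketch; uses only this module's public API as declared above):
//
//     // After an unclean shutdown or a reported read error:
//     fsck()?;                    // truncates logs at the first corrupt frame
//     let store = Store::load()?; // open_or_rebuild_db re-indexes if needed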
/// Rebuild the redb index from the capnp log.
/// Scans the log, tracking offsets, and records the latest version of each node.
fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
    // Remove the old database if it exists
    if db_path.exists() {
        fs::remove_file(db_path)
            .with_context(|| format!("remove old db {}", db_path.display()))?;
    }
    let database = index::open_db(db_path)?;
    if !capnp_path.exists() {
        return Ok(database);
    }

    // Track the latest (offset, uuid, version, deleted) per key
    let mut latest: HashMap<String, (u64, [u8; 16], u32, bool)> = HashMap::new();

    let file = fs::File::open(capnp_path)
        .with_context(|| format!("open {}", capnp_path.display()))?;
    let mut reader = BufReader::new(file);
    loop {
        let offset = reader.stream_position()?;
        let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            Ok(m) => m,
            Err(_) => break,
        };
        let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
            Ok(l) => l,
            Err(_) => continue,
        };
        let nodes = match log.get_nodes() {
            Ok(n) => n,
            Err(_) => continue,
        };
        for node_reader in nodes {
            let key = node_reader.get_key().ok()
                .and_then(|t| t.to_str().ok())
                .unwrap_or("")
                .to_string();
            if key.is_empty() {
                continue;
            }
            let version = node_reader.get_version();
            let deleted = node_reader.get_deleted();
            let mut uuid = [0u8; 16];
            if let Ok(data) = node_reader.get_uuid() {
                if data.len() >= 16 {
                    uuid.copy_from_slice(&data[..16]);
                }
            }
            // Keep only if this is a newer version
            let dominates = latest.get(&key)
                .map(|(_, _, v, _)| version >= *v)
                .unwrap_or(true);
            if dominates {
                latest.insert(key, (offset, uuid, version, deleted));
            }
        }
    }

    // Write index entries for non-deleted nodes
    {
        let txn = database.begin_write()?;
        {
            let mut nodes_table = txn.open_table(index::NODES)?;
            let mut uuid_table = txn.open_table(index::UUID_TO_KEY)?;
            for (key, (offset, uuid, _, deleted)) in latest {
                if !deleted {
                    nodes_table.insert(key.as_str(), offset)?;
                    uuid_table.insert(uuid.as_slice(), key.as_str())?;
                }
            }
        }
        txn.commit()?;
    }

    Ok(database)
}
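
// A couple of behavioral checks for the pure helpers above (sketch; only
// covers helpers that need no capnp fixtures):
#[cfg(test)]
mod tests {
    use super::*;

    // read_uuid zero-pads when the data field is short, and truncates
    // anything longer than 16 bytes.
    #[test]
    fn read_uuid_pads_and_truncates() {
        let empty: &[u8] = &[];
        assert_eq!(read_uuid(Ok(empty)), [0u8; 16]);

        let short = [1u8; 4];
        assert_eq!(read_uuid(Ok(&short[..])), [0u8; 16]); // < 16 bytes → zeroed

        let long = [7u8; 20];
        assert_eq!(read_uuid(Ok(&long[..])), [7u8; 16]); // first 16 bytes kept
    }
}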