// Persistence layer: load, replay, append // // capnp logs are the source of truth; redb provides indexed access. use super::types::*; use crate::memory_capnp; use anyhow::{Context, Result}; use capnp::message; use capnp::serialize; use std::collections::HashMap; use std::fs; use std::io::{BufReader, Seek}; use std::path::Path; impl Store { /// Load store by replaying capnp logs. pub fn load() -> Result { let nodes_p = nodes_path(); let rels_p = relations_path(); let mut store = Store::default(); if nodes_p.exists() { store.replay_nodes(&nodes_p)?; } if rels_p.exists() { store.replay_relations(&rels_p)?; } let visits_p = visits_path(); if visits_p.exists() { store.replay_visits(&visits_p)?; } let tp_p = transcript_progress_path(); if tp_p.exists() { store.replay_transcript_progress(&tp_p)?; } // Record log sizes after replay store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); // Drop edges referencing deleted/missing nodes store.relations.retain(|r| store.nodes.contains_key(&r.source_key) && store.nodes.contains_key(&r.target_key) ); Ok(store) } /// Replay node log, keeping latest version per UUID. /// Tracks all UUIDs seen per key to detect duplicates. fn replay_nodes(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); // Track all non-deleted UUIDs per key to detect duplicates let mut key_uuids: HashMap> = HashMap::new(); while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read node log"))?; for node_reader in log.get_nodes() .with_context(|| format!("get nodes"))? 
{ let node = Node::from_capnp_migrate(node_reader)?; let existing_version = self.nodes.get(&node.key) .map(|n| n.version) .unwrap_or(0); if node.version >= existing_version { if node.deleted { self.nodes.remove(&node.key); self.uuid_to_key.remove(&node.uuid); if let Some(uuids) = key_uuids.get_mut(&node.key) { uuids.retain(|u| *u != node.uuid); } } else { self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node.clone()); let uuids = key_uuids.entry(node.key).or_default(); if !uuids.contains(&node.uuid) { uuids.push(node.uuid); } } } } } // Report duplicate keys for (key, uuids) in &key_uuids { if uuids.len() > 1 { dbglog!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len()); } } Ok(()) } /// Replay relation log, keeping latest version per UUID fn replay_relations(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); // Collect all, then deduplicate by UUID keeping latest version let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new(); while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read relation log"))?; for rel_reader in log.get_relations() .with_context(|| format!("get relations"))? { let rel = Relation::from_capnp_migrate(rel_reader)?; let existing_version = by_uuid.get(&rel.uuid) .map(|r| r.version) .unwrap_or(0); if rel.version >= existing_version { by_uuid.insert(rel.uuid, rel); } } } self.relations = by_uuid.into_values() .filter(|r| !r.deleted) .collect(); Ok(()) } /// Find all duplicate keys: keys with multiple live UUIDs in the log. /// Returns a map from key → vec of all live Node versions (one per UUID). /// The "winner" in self.nodes is always one of them. 
pub fn find_duplicates(&self) -> Result>> { let path = nodes_path(); if !path.exists() { return Ok(HashMap::new()); } let file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); // Track latest version of each UUID let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new(); while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read node log"))?; for node_reader in log.get_nodes() .with_context(|| format!("get nodes"))? { let node = Node::from_capnp_migrate(node_reader)?; let dominated = by_uuid.get(&node.uuid) .map(|n| node.version >= n.version) .unwrap_or(true); if dominated { by_uuid.insert(node.uuid, node); } } } // Group live (non-deleted) nodes by key let mut by_key: HashMap> = HashMap::new(); for node in by_uuid.into_values() { if !node.deleted { by_key.entry(node.key.clone()).or_default().push(node); } } // Keep only duplicates by_key.retain(|_, nodes| nodes.len() > 1); Ok(by_key) } /// Append nodes to the log file. /// Serializes to a Vec first, then does a single write() syscall /// so the append is atomic with O_APPEND even without flock. pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<()> { let _lock = StoreLock::acquire()?; self.append_nodes_unlocked(nodes) } /// Append nodes without acquiring the lock. Caller must hold StoreLock. 
pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<()> { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_nodes(nodes.len() as u32); for (i, node) in nodes.iter().enumerate() { node.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .with_context(|| format!("serialize nodes"))?; let path = nodes_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write nodes"))?; self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } /// Replay only new entries appended to the node log since we last loaded. /// Call under StoreLock to catch writes from concurrent processes. pub(crate) fn refresh_nodes(&mut self) -> Result<()> { let path = nodes_path(); let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0); if current_size <= self.loaded_nodes_size { return Ok(()); // no new data } let file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size)) .with_context(|| format!("seek nodes log"))?; while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) { let log = msg.get_root::() .with_context(|| format!("read node log delta"))?; for node_reader in log.get_nodes() .with_context(|| format!("get nodes delta"))? 
{ let node = Node::from_capnp_migrate(node_reader)?; let dominated = self.nodes.get(&node.key) .map(|n| node.version >= n.version) .unwrap_or(true); if dominated { if node.deleted { self.nodes.remove(&node.key); self.uuid_to_key.remove(&node.uuid); } else { self.uuid_to_key.insert(node.uuid, node.key.clone()); self.nodes.insert(node.key.clone(), node); } } } } self.loaded_nodes_size = current_size; Ok(()) } /// Append relations to the log file. /// Single write() syscall for atomic O_APPEND. pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> { let _lock = StoreLock::acquire()?; self.append_relations_unlocked(relations) } /// Append relations without acquiring the lock. Caller must hold StoreLock. pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<()> { let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_relations(relations.len() as u32); for (i, rel) in relations.iter().enumerate() { rel.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .with_context(|| format!("serialize relations"))?; let path = relations_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write relations"))?; self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0); Ok(()) } /// Append agent visit records to the visits log. 
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<()> { if visits.is_empty() { return Ok(()); } let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_visits(visits.len() as u32); for (i, visit) in visits.iter().enumerate() { visit.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .with_context(|| format!("serialize visits"))?; let path = visits_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write visits"))?; // Update in-memory index for v in visits { self.visits .entry(v.node_key.clone()) .or_default() .insert(v.agent.clone(), v.timestamp); } Ok(()) } /// Replay visits log to rebuild in-memory index. fn replay_visits(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); while reader.stream_position()? < fs::metadata(path)?.len() { let msg = match serialize::read_message(&mut reader, Default::default()) { Ok(m) => m, Err(_) => break, }; let log = msg.get_root::() .with_context(|| format!("read visit log"))?; for visit in log.get_visits()? { let key = visit.get_node_key().ok() .and_then(|t| t.to_str().ok()) .unwrap_or("") .to_string(); let agent = visit.get_agent().ok() .and_then(|t| t.to_str().ok()) .unwrap_or("") .to_string(); let ts = visit.get_timestamp(); if !key.is_empty() && !agent.is_empty() { let entry = self.visits.entry(key).or_default(); // Keep latest timestamp per agent let existing = entry.entry(agent).or_insert(0); if ts > *existing { *existing = ts; } } } } Ok(()) } /// Append transcript segment progress records. 
pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<()> { if segments.is_empty() { return Ok(()); } let mut msg = message::Builder::new_default(); { let log = msg.init_root::(); let mut list = log.init_segments(segments.len() as u32); for (i, seg) in segments.iter().enumerate() { seg.to_capnp(list.reborrow().get(i as u32)); } } let mut buf = Vec::new(); serialize::write_message(&mut buf, &msg) .with_context(|| format!("serialize transcript progress"))?; let path = transcript_progress_path(); let file = fs::OpenOptions::new() .create(true).append(true).open(&path) .with_context(|| format!("open {}", path.display()))?; use std::io::Write; (&file).write_all(&buf) .with_context(|| format!("write transcript progress"))?; // Update in-memory index for seg in segments { self.transcript_progress .entry((seg.transcript_id.clone(), seg.segment_index)) .or_default() .insert(seg.agent.clone()); } Ok(()) } /// Replay transcript progress log to rebuild in-memory index. fn replay_transcript_progress(&mut self, path: &Path) -> Result<()> { let file = fs::File::open(path) .with_context(|| format!("open {}", path.display()))?; let mut reader = BufReader::new(file); while reader.stream_position()? < fs::metadata(path)?.len() { let msg = match serialize::read_message(&mut reader, Default::default()) { Ok(m) => m, Err(_) => break, }; let log = msg.get_root::() .with_context(|| format!("read transcript progress"))?; for seg in log.get_segments()? { let id = seg.get_transcript_id().ok() .and_then(|t| t.to_str().ok()) .unwrap_or("") .to_string(); let agent = seg.get_agent().ok() .and_then(|t| t.to_str().ok()) .unwrap_or("") .to_string(); let idx = seg.get_segment_index(); if !id.is_empty() && !agent.is_empty() { self.transcript_progress .entry((id, idx)) .or_default() .insert(agent); } } } Ok(()) } /// Migrate old stub-node transcript markers into the new progress log. 
/// Reads _observed-transcripts-f-*, _mined-transcripts#f-*, and _facts-* keys,
    /// extracts transcript_id and segment_index, writes to transcript-progress.capnp,
    /// then deletes the stub nodes.
    ///
    /// Returns the number of progress records that were produced.
    pub fn migrate_transcript_progress(&mut self) -> Result<usize> {
        // Key prefixes of the form `{prefix}{UUID}.{segment}`, paired with
        // the label passed as the last argument of `new_transcript_segment`.
        const SEGMENTED_PREFIXES: [(&str, &str); 3] = [
            ("_observed-transcripts-f-", "observation"),
            ("_mined-transcripts#f-", "experience"),
            ("_mined-transcripts-f-", "experience"),
        ];

        let mut segments = Vec::new();

        for key in self.nodes.keys() {
            let segmented = SEGMENTED_PREFIXES
                .iter()
                .find_map(|&(prefix, label)| key.strip_prefix(prefix).map(|rest| (rest, label)));

            if let Some((rest, label)) = segmented {
                // Split at the last '.', and only accept a numeric segment.
                // NOTE(review): the integer type of the parse was missing
                // from the text under review; it is left to inference from
                // `new_transcript_segment`'s parameter — confirm.
                if let Some((uuid, seg_str)) = rest.rsplit_once('.')
                    && let Ok(seg) = seg_str.parse()
                {
                    segments.push(new_transcript_segment(uuid, seg, label));
                }
            } else if let Some(uuid) = key.strip_prefix("_facts-") {
                // _facts-{UUID} marks a whole file (segment 0); anything
                // without a '-' or shorter than 30 bytes is not a UUID.
                if uuid.contains('-') && uuid.len() >= 30 {
                    segments.push(new_transcript_segment(uuid, 0, "fact"));
                }
            }
        }

        let count = segments.len();
        if count > 0 {
            self.append_transcript_progress(&segments)?;
        }

        // Soft-delete the old stub nodes.
        // NOTE(review): `save()` in this file returns Ok(()) without writing
        // anything, so the `deleted` flags set here only exist in memory —
        // confirm whether these deletions are meant to reach the node log.
        let mut flagged = 0usize;
        for (key, node) in self.nodes.iter_mut() {
            let is_stub = key.starts_with("_observed-transcripts-")
                || key.starts_with("_mined-transcripts")
                || (key.starts_with("_facts-") && !key.contains("fact_mine"));
            if is_stub {
                node.deleted = true;
                flagged += 1;
            }
        }

        if flagged > 0 {
            self.save()?;
        }

        Ok(count)
    }

    /// Record visits for a batch of node keys from a successful agent run.
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<()> { let visits: Vec = node_keys.iter() .filter_map(|key| { let node = self.nodes.get(key)?; Some(new_visit(node.uuid, key, agent, "processed")) }) .collect(); self.append_visits(&visits) } /// Get the last time an agent visited a node. Returns 0 if never visited. pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 { self.visits.get(node_key) .and_then(|agents| agents.get(agent)) .copied() .unwrap_or(0) } /// Placeholder - indices will be updated on write with redb. pub fn save(&self) -> Result<()> { Ok(()) } } /// Check and repair corrupt capnp log files. /// /// Reads each message sequentially, tracking file position. On the first /// corrupt message, truncates the file to the last good position. Also /// removes stale caches so the next load replays from the repaired log. pub fn fsck() -> Result<()> { let mut any_corrupt = false; for (path, kind) in [ (nodes_path(), "node"), (relations_path(), "relation"), ] { if !path.exists() { continue; } let file = fs::File::open(&path) .with_context(|| format!("open {}", path.display()))?; let file_len = file.metadata() .with_context(|| format!("stat {}", path.display()))?.len(); let mut reader = BufReader::new(file); let mut good_messages = 0u64; let mut last_good_pos = 0u64; loop { let pos = reader.stream_position() .with_context(|| format!("tell {}", path.display()))?; let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) { Ok(m) => m, Err(_) => { // read_message fails at EOF (normal) or on corrupt framing if pos < file_len { // Not at EOF — corrupt framing eprintln!("{}: corrupt message at offset {}, truncating", kind, pos); any_corrupt = true; drop(reader); let file = fs::OpenOptions::new().write(true).open(&path) .with_context(|| format!("open for truncate"))?; file.set_len(pos) .with_context(|| format!("truncate {}", path.display()))?; eprintln!("{}: truncated from {} to {} bytes ({} good 
messages)", kind, file_len, pos, good_messages); } break; } }; // Validate the message content too let valid = if kind == "node" { msg.get_root::() .and_then(|l| l.get_nodes().map(|_| ())) .is_ok() } else { msg.get_root::() .and_then(|l| l.get_relations().map(|_| ())) .is_ok() }; if valid { good_messages += 1; last_good_pos = reader.stream_position() .with_context(|| format!("tell {}", path.display()))?; } else { eprintln!("{}: corrupt message content at offset {}, truncating to {}", kind, pos, last_good_pos); any_corrupt = true; drop(reader); let file = fs::OpenOptions::new().write(true).open(&path) .with_context(|| format!("open for truncate"))?; file.set_len(last_good_pos) .with_context(|| format!("truncate {}", path.display()))?; eprintln!("{}: truncated from {} to {} bytes ({} good messages)", kind, file_len, last_good_pos, good_messages); break; } } if !any_corrupt { eprintln!("{}: {} messages, all clean", kind, good_messages); } } if any_corrupt { eprintln!("repair complete — run `poc-memory status` to verify"); } else { eprintln!("store is clean"); } Ok(()) }