store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
// Persistence layer: load, save, replay, append, snapshot
|
|
|
|
|
//
|
|
|
|
|
// Three-tier loading strategy:
|
|
|
|
|
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
|
|
|
|
|
// 2. bincode cache (state.bin) — ~10ms
|
|
|
|
|
// 3. capnp log replay — ~40ms
|
|
|
|
|
//
|
|
|
|
|
// Logs are append-only; cache staleness uses log file sizes, not mtimes.
|
|
|
|
|
|
|
|
|
|
use super::types::*;
|
|
|
|
|
|
|
|
|
|
use crate::memory_capnp;
|
|
|
|
|
|
|
|
|
|
use capnp::message;
|
|
|
|
|
use capnp::serialize;
|
|
|
|
|
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::fs;
|
2026-03-08 20:51:56 -04:00
|
|
|
use std::io::{BufReader, BufWriter, Seek};
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
use std::path::Path;
|
|
|
|
|
|
|
|
|
|
impl Store {
|
|
|
|
|
/// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
///
/// Staleness check uses log file sizes (not mtimes). Since logs are
/// append-only, any write grows the file, invalidating the cache.
/// This avoids the mtime race that caused data loss with concurrent
/// writers (dream loop, link audit, journal enrichment).
pub fn load() -> Result<Store, String> {
    // 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
    match Self::load_snapshot_mmap() {
        Ok(Some(mut store)) => {
            // rkyv snapshot doesn't include visits — replay from log
            let visits_p = visits_path();
            if visits_p.exists() {
                // Best-effort: a corrupt visits log must not block startup.
                store.replay_visits(&visits_p).ok();
            }
            return Ok(store);
        },
        // Snapshot missing or stale — fall through to the next tier.
        Ok(None) => {},
        Err(e) => eprintln!("rkyv snapshot: {}", e),
    }

    // 2. Try bincode state.bin cache (~10ms)
    let nodes_p = nodes_path();
    let rels_p = relations_path();
    let state_p = state_path();

    // Current on-disk log sizes; compared against the sizes recorded in
    // the cache header to detect appends made after the cache was written.
    let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
    let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

    if let Ok(data) = fs::read(&state_p) {
        if data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
            // Header layout: [0..4] magic, [4..12] nodes log size,
            // [12..20] relations log size (little-endian u64s).
            let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
            let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());

            if cached_nodes == nodes_size && cached_rels == rels_size {
                if let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
                    // Rebuild uuid_to_key (skipped by serde)
                    for (key, node) in &store.nodes {
                        store.uuid_to_key.insert(node.uuid, key.clone());
                    }
                    store.loaded_nodes_size = nodes_size;
                    store.loaded_rels_size = rels_size;
                    // Bootstrap: write rkyv snapshot if missing
                    if !snapshot_path().exists() {
                        if let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
                            // Non-fatal: we already have a usable store.
                            eprintln!("rkyv bootstrap: {}", e);
                        }
                    }
                    return Ok(store);
                }
            }
        }
    }

    // 3. Stale or no cache — rebuild from capnp logs
    let mut store = Store::default();

    if nodes_p.exists() {
        store.replay_nodes(&nodes_p)?;
    }
    if rels_p.exists() {
        store.replay_relations(&rels_p)?;
    }
    let visits_p = visits_path();
    if visits_p.exists() {
        store.replay_visits(&visits_p)?;
    }

    // Record log sizes after replay — this is the state we reflect
    store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
    store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

    // Drop edges referencing deleted/missing nodes
    store.relations.retain(|r|
        store.nodes.contains_key(&r.source_key) &&
        store.nodes.contains_key(&r.target_key)
    );

    // Persist the rebuilt state so the next load hits a warm cache.
    store.save()?;
    Ok(store)
}
|
|
|
|
|
|
2026-03-08 19:45:18 -04:00
|
|
|
/// Replay node log, keeping latest version per UUID.
/// Tracks all UUIDs seen per key to detect duplicates.
///
/// Entries are applied in log order; an entry wins only when its version
/// is >= the version currently held for the same key, so later appends
/// with equal versions override earlier ones (last-writer-wins).
fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
    let file = fs::File::open(path)
        .map_err(|e| format!("open {}: {}", path.display(), e))?;
    let mut reader = BufReader::new(file);

    // Track all non-deleted UUIDs per key to detect duplicates
    let mut key_uuids: HashMap<String, Vec<[u8; 16]>> = HashMap::new();

    // read_message returns Err at end of stream, terminating the loop.
    while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
        let log = msg.get_root::<memory_capnp::node_log::Reader>()
            .map_err(|e| format!("read node log: {}", e))?;
        for node_reader in log.get_nodes()
            .map_err(|e| format!("get nodes: {}", e))? {
            let node = Node::from_capnp(node_reader)?;
            // A missing key counts as version 0, so any entry wins.
            let existing_version = self.nodes.get(&node.key)
                .map(|n| n.version)
                .unwrap_or(0);
            if node.version >= existing_version {
                if node.deleted {
                    // Tombstone: drop the node and forget its UUID from
                    // both the live index and duplicate tracking.
                    self.nodes.remove(&node.key);
                    self.uuid_to_key.remove(&node.uuid);
                    if let Some(uuids) = key_uuids.get_mut(&node.key) {
                        uuids.retain(|u| *u != node.uuid);
                    }
                } else {
                    self.uuid_to_key.insert(node.uuid, node.key.clone());
                    self.nodes.insert(node.key.clone(), node.clone());
                    // Remember every distinct live UUID for this key;
                    // more than one means duplicate nodes exist.
                    let uuids = key_uuids.entry(node.key).or_default();
                    if !uuids.contains(&node.uuid) {
                        uuids.push(node.uuid);
                    }
                }
            }
        }
    }

    // Report duplicate keys
    for (key, uuids) in &key_uuids {
        if uuids.len() > 1 {
            eprintln!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len());
        }
    }

    Ok(())
}
|
|
|
|
|
|
|
|
|
|
/// Replay relation log, keeping latest version per UUID
|
|
|
|
|
fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
|
|
|
|
|
let file = fs::File::open(path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
let mut reader = BufReader::new(file);
|
|
|
|
|
|
|
|
|
|
// Collect all, then deduplicate by UUID keeping latest version
|
|
|
|
|
let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
|
|
|
|
|
|
|
|
|
|
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
|
|
|
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
|
|
|
|
|
.map_err(|e| format!("read relation log: {}", e))?;
|
|
|
|
|
for rel_reader in log.get_relations()
|
|
|
|
|
.map_err(|e| format!("get relations: {}", e))? {
|
|
|
|
|
let rel = Relation::from_capnp(rel_reader)?;
|
|
|
|
|
let existing_version = by_uuid.get(&rel.uuid)
|
|
|
|
|
.map(|r| r.version)
|
|
|
|
|
.unwrap_or(0);
|
|
|
|
|
if rel.version >= existing_version {
|
|
|
|
|
by_uuid.insert(rel.uuid, rel);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.relations = by_uuid.into_values()
|
|
|
|
|
.filter(|r| !r.deleted)
|
|
|
|
|
.collect();
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 14:30:21 -04:00
|
|
|
/// Find all duplicate keys: keys with multiple live UUIDs in the log.
|
|
|
|
|
/// Returns a map from key → vec of all live Node versions (one per UUID).
|
|
|
|
|
/// The "winner" in self.nodes is always one of them.
|
|
|
|
|
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>, String> {
|
|
|
|
|
let path = nodes_path();
|
|
|
|
|
if !path.exists() { return Ok(HashMap::new()); }
|
|
|
|
|
|
|
|
|
|
let file = fs::File::open(&path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
let mut reader = BufReader::new(file);
|
|
|
|
|
|
|
|
|
|
// Track latest version of each UUID
|
|
|
|
|
let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
|
|
|
|
|
|
|
|
|
|
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
|
|
|
let log = msg.get_root::<memory_capnp::node_log::Reader>()
|
|
|
|
|
.map_err(|e| format!("read node log: {}", e))?;
|
|
|
|
|
for node_reader in log.get_nodes()
|
|
|
|
|
.map_err(|e| format!("get nodes: {}", e))? {
|
|
|
|
|
let node = Node::from_capnp(node_reader)?;
|
|
|
|
|
let dominated = by_uuid.get(&node.uuid)
|
|
|
|
|
.map(|n| node.version >= n.version)
|
|
|
|
|
.unwrap_or(true);
|
|
|
|
|
if dominated {
|
|
|
|
|
by_uuid.insert(node.uuid, node);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Group live (non-deleted) nodes by key
|
|
|
|
|
let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
|
|
|
|
|
for node in by_uuid.into_values() {
|
|
|
|
|
if !node.deleted {
|
|
|
|
|
by_key.entry(node.key.clone()).or_default().push(node);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Keep only duplicates
|
|
|
|
|
by_key.retain(|_, nodes| nodes.len() > 1);
|
|
|
|
|
Ok(by_key)
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-06 21:38:26 -05:00
|
|
|
/// Append nodes to the log file.
/// Serializes to a Vec first, then does a single write() syscall
/// so the append is atomic with O_APPEND even without flock.
///
/// Acquires the StoreLock for the duration of the append; callers that
/// already hold the lock should use `append_nodes_unlocked` instead.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
    let _lock = StoreLock::acquire()?;
    self.append_nodes_unlocked(nodes)
}
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
|
2026-03-10 14:30:21 -04:00
|
|
|
/// Append nodes without acquiring the lock. Caller must hold StoreLock.
|
|
|
|
|
pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<(), String> {
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
let mut msg = message::Builder::new_default();
|
|
|
|
|
{
|
|
|
|
|
let log = msg.init_root::<memory_capnp::node_log::Builder>();
|
|
|
|
|
let mut list = log.init_nodes(nodes.len() as u32);
|
|
|
|
|
for (i, node) in nodes.iter().enumerate() {
|
|
|
|
|
node.to_capnp(list.reborrow().get(i as u32));
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-03-06 21:38:26 -05:00
|
|
|
let mut buf = Vec::new();
|
|
|
|
|
serialize::write_message(&mut buf, &msg)
|
|
|
|
|
.map_err(|e| format!("serialize nodes: {}", e))?;
|
|
|
|
|
|
|
|
|
|
let path = nodes_path();
|
|
|
|
|
let file = fs::OpenOptions::new()
|
|
|
|
|
.create(true).append(true).open(&path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
use std::io::Write;
|
|
|
|
|
(&file).write_all(&buf)
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
.map_err(|e| format!("write nodes: {}", e))?;
|
2026-03-06 21:38:26 -05:00
|
|
|
|
|
|
|
|
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 14:30:21 -04:00
|
|
|
/// Replay only new entries appended to the node log since we last loaded.
|
|
|
|
|
/// Call under StoreLock to catch writes from concurrent processes.
|
|
|
|
|
pub(crate) fn refresh_nodes(&mut self) -> Result<(), String> {
|
|
|
|
|
let path = nodes_path();
|
|
|
|
|
let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
|
|
|
|
|
if current_size <= self.loaded_nodes_size {
|
|
|
|
|
return Ok(()); // no new data
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let file = fs::File::open(&path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
let mut reader = BufReader::new(file);
|
|
|
|
|
reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size))
|
|
|
|
|
.map_err(|e| format!("seek nodes log: {}", e))?;
|
|
|
|
|
|
|
|
|
|
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
|
|
|
let log = msg.get_root::<memory_capnp::node_log::Reader>()
|
|
|
|
|
.map_err(|e| format!("read node log delta: {}", e))?;
|
|
|
|
|
for node_reader in log.get_nodes()
|
|
|
|
|
.map_err(|e| format!("get nodes delta: {}", e))? {
|
|
|
|
|
let node = Node::from_capnp(node_reader)?;
|
|
|
|
|
let dominated = self.nodes.get(&node.key)
|
|
|
|
|
.map(|n| node.version >= n.version)
|
|
|
|
|
.unwrap_or(true);
|
|
|
|
|
if dominated {
|
|
|
|
|
if node.deleted {
|
|
|
|
|
self.nodes.remove(&node.key);
|
|
|
|
|
self.uuid_to_key.remove(&node.uuid);
|
|
|
|
|
} else {
|
|
|
|
|
self.uuid_to_key.insert(node.uuid, node.key.clone());
|
|
|
|
|
self.nodes.insert(node.key.clone(), node);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.loaded_nodes_size = current_size;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-06 21:38:26 -05:00
|
|
|
/// Append relations to the log file.
/// Single write() syscall for atomic O_APPEND.
///
/// Acquires the StoreLock for the duration of the append; callers that
/// already hold the lock should use `append_relations_unlocked` instead.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
    let _lock = StoreLock::acquire()?;
    self.append_relations_unlocked(relations)
}
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
|
2026-03-10 14:30:21 -04:00
|
|
|
/// Append relations without acquiring the lock. Caller must hold StoreLock.
|
|
|
|
|
pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<(), String> {
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
let mut msg = message::Builder::new_default();
|
|
|
|
|
{
|
|
|
|
|
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
|
|
|
|
|
let mut list = log.init_relations(relations.len() as u32);
|
|
|
|
|
for (i, rel) in relations.iter().enumerate() {
|
|
|
|
|
rel.to_capnp(list.reborrow().get(i as u32));
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-03-06 21:38:26 -05:00
|
|
|
let mut buf = Vec::new();
|
|
|
|
|
serialize::write_message(&mut buf, &msg)
|
|
|
|
|
.map_err(|e| format!("serialize relations: {}", e))?;
|
|
|
|
|
|
|
|
|
|
let path = relations_path();
|
|
|
|
|
let file = fs::OpenOptions::new()
|
|
|
|
|
.create(true).append(true).open(&path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
use std::io::Write;
|
|
|
|
|
(&file).write_all(&buf)
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
.map_err(|e| format!("write relations: {}", e))?;
|
2026-03-06 21:38:26 -05:00
|
|
|
|
|
|
|
|
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 14:30:53 -04:00
|
|
|
/// Append agent visit records to the visits log.
///
/// No-op for an empty batch. The batch is serialized into one buffer and
/// written with a single write_all on an O_APPEND handle, then folded
/// into the in-memory visits index (unconditionally overwriting the
/// previous timestamp for each node/agent pair).
///
/// NOTE(review): unlike append_nodes/append_relations, this does not
/// acquire StoreLock — presumably relying on the single atomic O_APPEND
/// write being safe without it; confirm that is intentional.
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<(), String> {
    if visits.is_empty() { return Ok(()); }

    let mut msg = message::Builder::new_default();
    {
        let log = msg.init_root::<memory_capnp::agent_visit_log::Builder>();
        let mut list = log.init_visits(visits.len() as u32);
        for (i, visit) in visits.iter().enumerate() {
            visit.to_capnp(list.reborrow().get(i as u32));
        }
    }
    let mut buf = Vec::new();
    serialize::write_message(&mut buf, &msg)
        .map_err(|e| format!("serialize visits: {}", e))?;

    let path = visits_path();
    let file = fs::OpenOptions::new()
        .create(true).append(true).open(&path)
        .map_err(|e| format!("open {}: {}", path.display(), e))?;
    use std::io::Write;
    (&file).write_all(&buf)
        .map_err(|e| format!("write visits: {}", e))?;

    // Update in-memory index
    for v in visits {
        self.visits
            .entry(v.node_key.clone())
            .or_default()
            .insert(v.agent.clone(), v.timestamp);
    }

    Ok(())
}
|
|
|
|
|
|
|
|
|
|
/// Replay visits log to rebuild in-memory index.
|
|
|
|
|
fn replay_visits(&mut self, path: &Path) -> Result<(), String> {
|
|
|
|
|
let file = fs::File::open(path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
let mut reader = BufReader::new(file);
|
|
|
|
|
|
|
|
|
|
while reader.stream_position().map_err(|e| e.to_string())?
|
|
|
|
|
< fs::metadata(path).map_err(|e| e.to_string())?.len()
|
|
|
|
|
{
|
|
|
|
|
let msg = match serialize::read_message(&mut reader, Default::default()) {
|
|
|
|
|
Ok(m) => m,
|
|
|
|
|
Err(_) => break,
|
|
|
|
|
};
|
|
|
|
|
let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>()
|
|
|
|
|
.map_err(|e| format!("read visit log: {}", e))?;
|
|
|
|
|
|
|
|
|
|
for visit in log.get_visits().map_err(|e| e.to_string())? {
|
|
|
|
|
let key = visit.get_node_key().ok()
|
|
|
|
|
.and_then(|t| t.to_str().ok())
|
|
|
|
|
.unwrap_or("")
|
|
|
|
|
.to_string();
|
|
|
|
|
let agent = visit.get_agent().ok()
|
|
|
|
|
.and_then(|t| t.to_str().ok())
|
|
|
|
|
.unwrap_or("")
|
|
|
|
|
.to_string();
|
|
|
|
|
let ts = visit.get_timestamp();
|
|
|
|
|
|
|
|
|
|
if !key.is_empty() && !agent.is_empty() {
|
|
|
|
|
let entry = self.visits.entry(key).or_default();
|
|
|
|
|
// Keep latest timestamp per agent
|
|
|
|
|
let existing = entry.entry(agent).or_insert(0);
|
|
|
|
|
if ts > *existing {
|
|
|
|
|
*existing = ts;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Record visits for a batch of node keys from a successful agent run.
|
|
|
|
|
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
|
|
|
|
|
let visits: Vec<AgentVisit> = node_keys.iter()
|
|
|
|
|
.filter_map(|key| {
|
|
|
|
|
let node = self.nodes.get(key)?;
|
|
|
|
|
Some(new_visit(node.uuid, key, agent, "processed"))
|
|
|
|
|
})
|
|
|
|
|
.collect();
|
|
|
|
|
self.append_visits(&visits)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get the last time an agent visited a node. Returns 0 if never visited.
|
|
|
|
|
pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 {
|
|
|
|
|
self.visits.get(node_key)
|
|
|
|
|
.and_then(|agents| agents.get(agent))
|
|
|
|
|
.copied()
|
|
|
|
|
.unwrap_or(0)
|
|
|
|
|
}
|
|
|
|
|
|
store: split mod.rs into persist.rs and ops.rs
mod.rs was 937 lines with all Store methods in one block.
Split into three files by responsibility:
- persist.rs (318 lines): load, save, replay, append, snapshot
— all disk IO and cache management
- ops.rs (300 lines): upsert, delete, modify, mark_used/wrong,
decay, fix_categories, cap_degree — all mutations
- mod.rs (356 lines): re-exports, key resolution, ingestion,
rendering, search — read-only operations
No behavioral changes; cargo check + full smoke test pass.
2026-03-03 16:40:32 -05:00
|
|
|
/// Save the derived cache with log size header for staleness detection.
/// Uses atomic write (tmp + rename) to prevent partial reads.
pub fn save(&self) -> Result<(), String> {
    let _lock = StoreLock::acquire()?;

    let path = state_path();
    if let Some(parent) = path.parent() {
        // Best-effort: a failure surfaces below when writing the file.
        fs::create_dir_all(parent).ok();
    }

    // Use log sizes from load time, not current filesystem sizes.
    // If another writer appended since we loaded, our recorded size
    // will be smaller than the actual log → next reader detects stale
    // cache and replays the (correct, append-only) log.
    let nodes_size = self.loaded_nodes_size;
    let rels_size = self.loaded_rels_size;

    let bincode_data = bincode::serialize(self)
        .map_err(|e| format!("bincode serialize: {}", e))?;

    // Header layout: 4-byte magic, then the two log sizes as LE u64.
    let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len());
    data.extend_from_slice(&CACHE_MAGIC);
    data.extend_from_slice(&nodes_size.to_le_bytes());
    data.extend_from_slice(&rels_size.to_le_bytes());
    data.extend_from_slice(&bincode_data);

    // Atomic write: tmp file + rename
    let tmp_path = path.with_extension("bin.tmp");
    fs::write(&tmp_path, &data)
        .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
    fs::rename(&tmp_path, &path)
        .map_err(|e| format!("rename {} → {}: {}", tmp_path.display(), path.display(), e))?;

    // Also write rkyv snapshot (mmap-friendly)
    if let Err(e) = self.save_snapshot(nodes_size, rels_size) {
        // Non-fatal: the bincode cache above is already durable.
        eprintln!("rkyv snapshot save: {}", e);
    }

    Ok(())
}
|
|
|
|
|
|
|
|
|
|
/// Serialize store as rkyv snapshot with staleness header.
|
|
|
|
|
/// Assumes StoreLock is already held by caller.
|
|
|
|
|
fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> {
|
|
|
|
|
let snap = Snapshot {
|
|
|
|
|
nodes: self.nodes.clone(),
|
|
|
|
|
relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(),
|
|
|
|
|
gaps: self.gaps.clone(),
|
|
|
|
|
params: self.params,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let rkyv_data = rkyv::to_bytes::<_, 256>(&snap)
|
|
|
|
|
.map_err(|e| format!("rkyv serialize: {}", e))?;
|
|
|
|
|
|
|
|
|
|
let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len());
|
|
|
|
|
data.extend_from_slice(&RKYV_MAGIC);
|
|
|
|
|
data.extend_from_slice(&1u32.to_le_bytes()); // format version
|
|
|
|
|
data.extend_from_slice(&nodes_size.to_le_bytes());
|
|
|
|
|
data.extend_from_slice(&rels_size.to_le_bytes());
|
|
|
|
|
data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes());
|
|
|
|
|
data.extend_from_slice(&rkyv_data);
|
|
|
|
|
|
|
|
|
|
let path = snapshot_path();
|
|
|
|
|
let tmp_path = path.with_extension("rkyv.tmp");
|
|
|
|
|
fs::write(&tmp_path, &data)
|
|
|
|
|
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
|
|
|
|
|
fs::rename(&tmp_path, &path)
|
|
|
|
|
.map_err(|e| format!("rename: {}", e))?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Try loading store from mmap'd rkyv snapshot.
/// Returns None if snapshot is missing or stale (log sizes don't match).
///
/// Returns Ok(None) (not Err) for every recoverable condition — missing
/// file, bad magic, stale sizes, truncated payload — so the caller can
/// fall through to the next loading tier (bincode cache / log replay).
fn load_snapshot_mmap() -> Result<Option<Store>, String> {
    let path = snapshot_path();
    if !path.exists() { return Ok(None); }

    // Current on-disk log sizes; compared against the sizes recorded in
    // the snapshot header to decide staleness. A missing log reads as 0.
    let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
    let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);

    let file = fs::File::open(&path)
        .map_err(|e| format!("open {}: {}", path.display(), e))?;

    // SAFETY(review): mmap is unsafe because the file could be mutated by
    // another process while mapped; save_snapshot writes via tmp+rename,
    // so in-place mutation is not expected — TODO confirm no other writer.
    let mmap = unsafe { memmap2::Mmap::map(&file) }
        .map_err(|e| format!("mmap {}: {}", path.display(), e))?;

    // Header sanity: must hold the full fixed-size header and start with magic.
    if mmap.len() < RKYV_HEADER_LEN { return Ok(None); }
    if mmap[..4] != RKYV_MAGIC { return Ok(None); }

    // Header layout (written by save_snapshot):
    //   [0..4]   magic
    //   [4..8]   format version (u32 le) — skip for now
    //   [8..16]  nodes log size at snapshot time (u64 le)
    //   [16..24] relations log size at snapshot time (u64 le)
    //   [24..32] rkyv payload length (u64 le)
    let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap());
    let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap());
    let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize;

    if cached_nodes != nodes_size || cached_rels != rels_size {
        return Ok(None); // stale
    }
    if mmap.len() < RKYV_HEADER_LEN + data_len {
        return Ok(None); // truncated
    }

    let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len];

    // SAFETY: we wrote this file ourselves via save_snapshot().
    // Skip full validation (check_archived_root) — the staleness header
    // already confirms this snapshot matches the current log state.
    let archived = unsafe { rkyv::archived_root::<Snapshot>(rkyv_data) };

    // Infallible deserialization: archived → owned Snapshot. unwrap() is
    // safe because rkyv::Infallible has no error values.
    let snap: Snapshot = <ArchivedSnapshot as rkyv::Deserialize<Snapshot, rkyv::Infallible>>
        ::deserialize(archived, &mut rkyv::Infallible).unwrap();

    // Non-snapshotted fields (indices, load-time bookkeeping) come from
    // Default and are rebuilt below.
    let mut store = Store {
        nodes: snap.nodes,
        relations: snap.relations,
        gaps: snap.gaps,
        params: snap.params,
        ..Default::default()
    };

    // Rebuild uuid_to_key (not serialized)
    for (key, node) in &store.nodes {
        store.uuid_to_key.insert(node.uuid, key.clone());
    }

    // Record the log sizes this load corresponds to; save() writes these
    // back into the cache headers for the next staleness check.
    store.loaded_nodes_size = nodes_size;
    store.loaded_rels_size = rels_size;

    Ok(Some(store))
}
|
|
|
|
|
}
|
2026-03-08 18:31:19 -04:00
|
|
|
|
2026-03-08 19:41:26 -04:00
|
|
|
/// Strip .md suffix from all node keys and relation key strings.
|
|
|
|
|
/// Merges duplicates (bare key + .md key) by keeping the latest version.
|
|
|
|
|
pub fn strip_md_keys() -> Result<(), String> {
|
|
|
|
|
use super::strip_md_suffix;
|
|
|
|
|
|
|
|
|
|
let mut store = Store::load()?;
|
|
|
|
|
let mut renamed_nodes = 0usize;
|
|
|
|
|
let mut renamed_rels = 0usize;
|
|
|
|
|
let mut merged = 0usize;
|
|
|
|
|
|
|
|
|
|
// Collect keys that need renaming
|
|
|
|
|
let old_keys: Vec<String> = store.nodes.keys()
|
|
|
|
|
.filter(|k| k.ends_with(".md") || k.contains(".md#"))
|
|
|
|
|
.cloned()
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
for old_key in &old_keys {
|
|
|
|
|
let new_key = strip_md_suffix(old_key);
|
|
|
|
|
if new_key == *old_key { continue; }
|
|
|
|
|
|
|
|
|
|
let mut node = store.nodes.remove(old_key).unwrap();
|
|
|
|
|
store.uuid_to_key.remove(&node.uuid);
|
|
|
|
|
|
|
|
|
|
if let Some(existing) = store.nodes.get(&new_key) {
|
|
|
|
|
// Merge: keep whichever has the higher version
|
|
|
|
|
if existing.version >= node.version {
|
|
|
|
|
eprintln!(" merge {} → {} (keeping existing v{})",
|
|
|
|
|
old_key, new_key, existing.version);
|
|
|
|
|
merged += 1;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
eprintln!(" merge {} → {} (replacing v{} with v{})",
|
|
|
|
|
old_key, new_key, existing.version, node.version);
|
|
|
|
|
merged += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
node.key = new_key.clone();
|
|
|
|
|
node.version += 1;
|
|
|
|
|
store.uuid_to_key.insert(node.uuid, new_key.clone());
|
|
|
|
|
store.nodes.insert(new_key, node);
|
|
|
|
|
renamed_nodes += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Fix relation key strings
|
|
|
|
|
for rel in &mut store.relations {
|
|
|
|
|
let new_source = strip_md_suffix(&rel.source_key);
|
|
|
|
|
let new_target = strip_md_suffix(&rel.target_key);
|
|
|
|
|
if new_source != rel.source_key || new_target != rel.target_key {
|
|
|
|
|
rel.source_key = new_source;
|
|
|
|
|
rel.target_key = new_target;
|
|
|
|
|
rel.version += 1;
|
|
|
|
|
renamed_rels += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if renamed_nodes == 0 && renamed_rels == 0 && merged == 0 {
|
|
|
|
|
eprintln!("No .md suffixes found — store is clean");
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eprintln!("Renamed {} nodes, {} relations, merged {} duplicates",
|
|
|
|
|
renamed_nodes, renamed_rels, merged);
|
|
|
|
|
|
|
|
|
|
// Write fresh logs from the migrated state
|
|
|
|
|
rewrite_store(&store)?;
|
|
|
|
|
|
|
|
|
|
eprintln!("Store rewritten successfully");
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Rewrite the entire store from scratch (fresh logs + caches).
|
|
|
|
|
/// Used after migrations that change keys across all nodes/relations.
|
|
|
|
|
fn rewrite_store(store: &Store) -> Result<(), String> {
|
|
|
|
|
let _lock = StoreLock::acquire()?;
|
|
|
|
|
|
|
|
|
|
// Write fresh node log
|
|
|
|
|
let nodes: Vec<_> = store.nodes.values().cloned().collect();
|
|
|
|
|
let nodes_path = nodes_path();
|
|
|
|
|
{
|
|
|
|
|
let file = fs::File::create(&nodes_path)
|
|
|
|
|
.map_err(|e| format!("create {}: {}", nodes_path.display(), e))?;
|
|
|
|
|
let mut writer = BufWriter::new(file);
|
|
|
|
|
|
|
|
|
|
// Write in chunks to keep message sizes reasonable
|
|
|
|
|
for chunk in nodes.chunks(100) {
|
|
|
|
|
let mut msg = message::Builder::new_default();
|
|
|
|
|
{
|
|
|
|
|
let log = msg.init_root::<memory_capnp::node_log::Builder>();
|
|
|
|
|
let mut list = log.init_nodes(chunk.len() as u32);
|
|
|
|
|
for (i, node) in chunk.iter().enumerate() {
|
|
|
|
|
node.to_capnp(list.reborrow().get(i as u32));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
serialize::write_message(&mut writer, &msg)
|
|
|
|
|
.map_err(|e| format!("write nodes: {}", e))?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write fresh relation log
|
|
|
|
|
let rels_path = relations_path();
|
|
|
|
|
{
|
|
|
|
|
let file = fs::File::create(&rels_path)
|
|
|
|
|
.map_err(|e| format!("create {}: {}", rels_path.display(), e))?;
|
|
|
|
|
let mut writer = BufWriter::new(file);
|
|
|
|
|
|
|
|
|
|
let rels: Vec<_> = store.relations.iter().filter(|r| !r.deleted).cloned().collect();
|
|
|
|
|
if !rels.is_empty() {
|
|
|
|
|
for chunk in rels.chunks(100) {
|
|
|
|
|
let mut msg = message::Builder::new_default();
|
|
|
|
|
{
|
|
|
|
|
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
|
|
|
|
|
let mut list = log.init_relations(chunk.len() as u32);
|
|
|
|
|
for (i, rel) in chunk.iter().enumerate() {
|
|
|
|
|
rel.to_capnp(list.reborrow().get(i as u32));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
serialize::write_message(&mut writer, &msg)
|
|
|
|
|
.map_err(|e| format!("write relations: {}", e))?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Nuke caches so next load rebuilds from fresh logs
|
|
|
|
|
for p in [state_path(), snapshot_path()] {
|
|
|
|
|
if p.exists() {
|
|
|
|
|
fs::remove_file(&p).ok();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-08 18:31:19 -04:00
|
|
|
/// Check and repair corrupt capnp log files.
|
|
|
|
|
///
|
|
|
|
|
/// Reads each message sequentially, tracking file position. On the first
|
|
|
|
|
/// corrupt message, truncates the file to the last good position. Also
|
|
|
|
|
/// removes stale caches so the next load replays from the repaired log.
|
|
|
|
|
pub fn fsck() -> Result<(), String> {
|
|
|
|
|
let mut any_corrupt = false;
|
|
|
|
|
|
|
|
|
|
for (path, kind) in [
|
|
|
|
|
(nodes_path(), "node"),
|
|
|
|
|
(relations_path(), "relation"),
|
|
|
|
|
] {
|
|
|
|
|
if !path.exists() { continue; }
|
|
|
|
|
|
|
|
|
|
let file = fs::File::open(&path)
|
|
|
|
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
|
|
|
|
let file_len = file.metadata()
|
|
|
|
|
.map_err(|e| format!("stat {}: {}", path.display(), e))?.len();
|
|
|
|
|
let mut reader = BufReader::new(file);
|
|
|
|
|
|
|
|
|
|
let mut good_messages = 0u64;
|
|
|
|
|
let mut last_good_pos = 0u64;
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
let pos = reader.stream_position()
|
|
|
|
|
.map_err(|e| format!("tell {}: {}", path.display(), e))?;
|
|
|
|
|
|
|
|
|
|
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
|
|
|
|
|
Ok(m) => m,
|
|
|
|
|
Err(_) => {
|
|
|
|
|
// read_message fails at EOF (normal) or on corrupt framing
|
|
|
|
|
if pos < file_len {
|
|
|
|
|
// Not at EOF — corrupt framing
|
|
|
|
|
eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
|
|
|
|
|
any_corrupt = true;
|
|
|
|
|
drop(reader);
|
|
|
|
|
let file = fs::OpenOptions::new().write(true).open(&path)
|
|
|
|
|
.map_err(|e| format!("open for truncate: {}", e))?;
|
|
|
|
|
file.set_len(pos)
|
|
|
|
|
.map_err(|e| format!("truncate {}: {}", path.display(), e))?;
|
|
|
|
|
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
|
|
|
|
|
kind, file_len, pos, good_messages);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Validate the message content too
|
|
|
|
|
let valid = if kind == "node" {
|
|
|
|
|
msg.get_root::<memory_capnp::node_log::Reader>()
|
|
|
|
|
.and_then(|l| l.get_nodes().map(|_| ()))
|
|
|
|
|
.is_ok()
|
|
|
|
|
} else {
|
|
|
|
|
msg.get_root::<memory_capnp::relation_log::Reader>()
|
|
|
|
|
.and_then(|l| l.get_relations().map(|_| ()))
|
|
|
|
|
.is_ok()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if valid {
|
|
|
|
|
good_messages += 1;
|
|
|
|
|
last_good_pos = reader.stream_position()
|
|
|
|
|
.map_err(|e| format!("tell {}: {}", path.display(), e))?;
|
|
|
|
|
} else {
|
|
|
|
|
eprintln!("{}: corrupt message content at offset {}, truncating to {}",
|
|
|
|
|
kind, pos, last_good_pos);
|
|
|
|
|
any_corrupt = true;
|
|
|
|
|
drop(reader);
|
|
|
|
|
let file = fs::OpenOptions::new().write(true).open(&path)
|
|
|
|
|
.map_err(|e| format!("open for truncate: {}", e))?;
|
|
|
|
|
file.set_len(last_good_pos)
|
|
|
|
|
.map_err(|e| format!("truncate {}: {}", path.display(), e))?;
|
|
|
|
|
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
|
|
|
|
|
kind, file_len, last_good_pos, good_messages);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !any_corrupt {
|
|
|
|
|
eprintln!("{}: {} messages, all clean", kind, good_messages);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if any_corrupt {
|
|
|
|
|
// Nuke caches so next load replays from the repaired logs
|
|
|
|
|
for p in [state_path(), snapshot_path()] {
|
|
|
|
|
if p.exists() {
|
|
|
|
|
fs::remove_file(&p)
|
|
|
|
|
.map_err(|e| format!("remove {}: {}", p.display(), e))?;
|
|
|
|
|
eprintln!("removed stale cache: {}", p.display());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
eprintln!("repair complete — run `poc-memory status` to verify");
|
|
|
|
|
} else {
|
|
|
|
|
eprintln!("store is clean");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|