From 2caccf875d519613dd30b4279a670ab9ae4a1098 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Mon, 13 Apr 2026 18:30:58 -0400 Subject: [PATCH] Replace rkyv/bincode caching with redb indices Remove three-tier loading (rkyv snapshot, bincode cache, capnp replay) in favor of direct capnp log replay + redb for indexed access. - Remove all rkyv derives from types (Node, Relation, enums, etc.) - Remove Snapshot struct, RKYV_MAGIC, CACHE_MAGIC constants - Remove load_snapshot_mmap(), save(), save_snapshot() - Remove MmapView, AnyView from view.rs (keep StoreView trait) - Simplify Store::load() to just replay capnp logs - Add db.rs with redb schema: nodes, uuid_to_key, visits, transcript_progress - Simplify cmd_fsck to just check capnp integrity + graph health capnp logs remain source of truth; redb indices will be rebuilt on demand. Co-Authored-By: Proof of Concept --- Cargo.lock | 165 +---------------------- Cargo.toml | 4 - src/cli/admin.rs | 42 +----- src/hippocampus/store/db.rs | 171 +++++++++++++++++++++++ src/hippocampus/store/mod.rs | 18 +-- src/hippocampus/store/persist.rs | 224 +------------------------------ src/hippocampus/store/types.rs | 65 ++------- src/hippocampus/store/view.rs | 148 +------------------- 8 files changed, 201 insertions(+), 636 deletions(-) create mode 100644 src/hippocampus/store/db.rs diff --git a/Cargo.lock b/Cargo.lock index 3ca2a0b..2a18e6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,17 +8,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" -[[package]] -name = "ahash" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" -dependencies = [ - "getrandom 0.2.17", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.12" @@ -285,18 +274,6 @@ version = "2.11.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" -[[package]] -name = "bitvec" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" -dependencies = [ - "funty", - "radium", - "tap", - "wyz", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -322,28 +299,6 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" -[[package]] -name = "bytecheck" -version = "0.6.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" -dependencies = [ - "bytecheck_derive", - "ptr_meta", - "simdutf8", -] - -[[package]] -name = "bytecheck_derive" -version = "0.6.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "bytemuck" version = "1.25.0" @@ -521,7 +476,6 @@ dependencies = [ "ast-grep-core", "ast-grep-language", "base64 0.22.1", - "bincode", "bytes", "capnp", "capnp-rpc", @@ -546,10 +500,8 @@ dependencies = [ "paste", "peg", "ratatui", - "rayon", "redb", "regex", - "rkyv", "rusqlite", "rustls", "rustls-native-certs", @@ -1143,12 +1095,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" -[[package]] -name = "funty" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" - [[package]] name = "futures" version = "0.3.32" @@ -1311,15 +1257,6 @@ dependencies = [ "regex-syntax", ] 
-[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -dependencies = [ - "ahash 0.7.8", -] - [[package]] name = "hashbrown" version = "0.15.5" @@ -2227,26 +2164,6 @@ dependencies = [ "yansi", ] -[[package]] -name = "ptr_meta" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" -dependencies = [ - "ptr_meta_derive", -] - -[[package]] -name = "ptr_meta_derive" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "pulldown-cmark" version = "0.13.3" @@ -2296,12 +2213,6 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" -[[package]] -name = "radium" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" - [[package]] name = "rand" version = "0.8.5" @@ -2520,15 +2431,6 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" -[[package]] -name = "rend" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" -dependencies = [ - "bytecheck", -] - [[package]] name = "ring" version = "0.17.14" @@ -2543,35 +2445,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rkyv" -version = "0.7.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" -dependencies = [ - "bitvec", - "bytecheck", - "bytes", - "hashbrown 0.12.3", - "ptr_meta", - "rend", - "rkyv_derive", - "seahash", - "tinyvec", - "uuid", -] - -[[package]] -name = "rkyv_derive" -version = "0.7.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "rusqlite" version = "0.37.0" @@ -2693,12 +2566,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "seahash" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" - [[package]] name = "security-framework" version = "3.7.0" @@ -2978,12 +2845,6 @@ dependencies = [ "yaml-rust", ] -[[package]] -name = "tap" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" - [[package]] name = "terminfo" version = "0.9.0" @@ -3131,28 +2992,13 @@ dependencies = [ "time-core", ] -[[package]] -name = "tinyvec" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokenizers" version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a620b996116a59e184c2fa2dfd8251ea34a36d0a514758c6f966386bd2e03476" dependencies = [ - "ahash 
0.8.12", + "ahash", "aho-corasick", "compact_str", "dary_heap", @@ -4155,15 +4001,6 @@ dependencies = [ "wasmparser", ] -[[package]] -name = "wyz" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" -dependencies = [ - "tap", -] - [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index a39c60f..d188e67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,6 @@ tui-markdown = { git = "https://github.com/koverstreet/tui-markdown", subdirecto tui-textarea = { version = "0.10.2", package = "tui-textarea-2" } uuid = { version = "1", features = ["v4"] } -bincode = "1" regex = "1" glob = "0.3" chrono = { version = "0.4", features = ["serde"] } @@ -51,9 +50,6 @@ ast-grep-language = { version = "0.42", features = ["builtin-parser"] } walkdir = "2" redb = "4" -rkyv = { version = "0.7", features = ["validation", "std"] } - -rayon = "1" tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["compat"] } diff --git a/src/cli/admin.rs b/src/cli/admin.rs index 1387323..3a7bbe5 100644 --- a/src/cli/admin.rs +++ b/src/cli/admin.rs @@ -60,43 +60,12 @@ pub async fn cmd_init() -> Result<()> { } pub async fn cmd_fsck() -> Result<()> { + // Check/repair capnp log integrity first + store::fsck()?; + let arc = memory::access_local()?; let mut store = arc.lock().await; - // Check cache vs log consistency - let log_store = store::Store::load_from_logs()?; - let mut cache_issues = 0; - - // Nodes in logs but missing from cache - for key in log_store.nodes.keys() { - if !store.nodes.contains_key(key) { - eprintln!("CACHE MISSING: '{}' exists in capnp log but not in cache", key); - cache_issues += 1; - } - } - // Nodes in cache but not in logs (phantom nodes) - for key in store.nodes.keys() { - if !log_store.nodes.contains_key(key) { - eprintln!("CACHE PHANTOM: '{}' exists in cache but not in capnp log", key); - 
cache_issues += 1; - } - } - // Version mismatches - for (key, log_node) in &log_store.nodes { - if let Some(cache_node) = store.nodes.get(key) - && cache_node.version != log_node.version { - eprintln!("CACHE STALE: '{}' cache v{} vs log v{}", - key, cache_node.version, log_node.version); - cache_issues += 1; - } - } - - if cache_issues > 0 { - eprintln!("{} cache inconsistencies found — rebuilding from logs", cache_issues); - *store = log_store; - store.save().context("rebuild save")?; - } - // Check node-key consistency let mut issues = 0; for (key, node) in &store.nodes { @@ -141,13 +110,12 @@ pub async fn cmd_fsck() -> Result<()> { r.version = t.version; } } - store.save()?; eprintln!("Pruned {} orphan edges", count); } let g = store.build_graph(); - println!("fsck: {} nodes, {} edges, {} issues, {} dangling, {} cache", - store.nodes.len(), g.edge_count(), issues, dangling, cache_issues); + println!("fsck: {} nodes, {} edges, {} issues, {} dangling", + store.nodes.len(), g.edge_count(), issues, dangling); Ok(()) } diff --git a/src/hippocampus/store/db.rs b/src/hippocampus/store/db.rs new file mode 100644 index 0000000..98a8aa9 --- /dev/null +++ b/src/hippocampus/store/db.rs @@ -0,0 +1,171 @@ +// redb index tables +// +// capnp logs are source of truth; redb provides indexed access. +// Tables: +// nodes: key → Node (JSON serialized) +// uuid_to_key: [u8;16] → key +// visits: (node_key, agent) → timestamp +// transcript_progress: (transcript_id, segment_idx, agent) → timestamp +// +// Relations stay in-memory for now (frequently iterated in full). 
+ +use super::types::*; +use anyhow::{Context, Result}; +use redb::{Database, ReadableDatabase, TableDefinition}; +use std::path::Path; + +// Table definitions +const NODES: TableDefinition<&str, &[u8]> = TableDefinition::new("nodes"); +const UUID_TO_KEY: TableDefinition<&[u8], &str> = TableDefinition::new("uuid_to_key"); +const VISITS: TableDefinition<(&str, &str), i64> = TableDefinition::new("visits"); +const TRANSCRIPT_PROGRESS: TableDefinition<(&str, u32, &str), i64> = + TableDefinition::new("transcript_progress"); + +/// Open or create the redb database, ensuring all tables exist. +pub fn open_db(path: &Path) -> Result<Database> { + let db = Database::create(path) + .with_context(|| format!("create redb {}", path.display()))?; + + // Ensure tables exist by opening a write transaction + let txn = db.begin_write()?; + { + let _ = txn.open_table(NODES)?; + let _ = txn.open_table(UUID_TO_KEY)?; + let _ = txn.open_table(VISITS)?; + let _ = txn.open_table(TRANSCRIPT_PROGRESS)?; + } + txn.commit()?; + + Ok(db) +} + +/// Rebuild redb indices from in-memory Store (loaded from capnp logs). +/// Deletes and recreates the database for a clean rebuild.
+pub fn rebuild_from_store(path: &Path, store: &Store) -> Result<Database> { + // Remove old database if it exists + if path.exists() { + std::fs::remove_file(path) + .with_context(|| format!("remove old db {}", path.display()))?; + } + + let db = open_db(path)?; + let txn = db.begin_write()?; + + { + let mut nodes_table = txn.open_table(NODES)?; + let mut uuid_table = txn.open_table(UUID_TO_KEY)?; + + for (key, node) in &store.nodes { + let json = serde_json::to_vec(node) + .with_context(|| format!("serialize node {}", key))?; + nodes_table.insert(key.as_str(), json.as_slice())?; + uuid_table.insert(node.uuid.as_slice(), key.as_str())?; + } + } + + { + let mut visits_table = txn.open_table(VISITS)?; + + for (node_key, agents) in &store.visits { + for (agent, &timestamp) in agents { + visits_table.insert((node_key.as_str(), agent.as_str()), timestamp)?; + } + } + } + + { + let mut tp_table = txn.open_table(TRANSCRIPT_PROGRESS)?; + + for ((transcript_id, segment_idx), agents) in &store.transcript_progress { + for agent in agents { + tp_table.insert( + (transcript_id.as_str(), *segment_idx, agent.as_str()), + now_epoch(), + )?; + } + } + } + + txn.commit()?; + Ok(db) +} + +/// Get a node by key from redb. +pub fn get_node(db: &Database, key: &str) -> Result<Option<Node>> { + let txn = db.begin_read()?; + let table = txn.open_table(NODES)?; + + match table.get(key)? { + Some(data) => { + let node: Node = serde_json::from_slice(data.value()) + .with_context(|| format!("deserialize node {}", key))?; + Ok(Some(node)) + } + None => Ok(None), + } +} + +/// Get key by uuid from redb. +pub fn get_key_by_uuid(db: &Database, uuid: &[u8; 16]) -> Result<Option<String>> { + let txn = db.begin_read()?; + let table = txn.open_table(UUID_TO_KEY)?; + + match table.get(uuid.as_slice())? { + Some(key) => Ok(Some(key.value().to_string())), + None => Ok(None), + } +} + +/// Insert or update a node in redb.
+pub fn upsert_node(db: &Database, node: &Node) -> Result<()> { + let txn = db.begin_write()?; + { + let mut nodes_table = txn.open_table(NODES)?; + let mut uuid_table = txn.open_table(UUID_TO_KEY)?; + + let json = serde_json::to_vec(node) + .with_context(|| format!("serialize node {}", node.key))?; + + nodes_table.insert(node.key.as_str(), json.as_slice())?; + uuid_table.insert(node.uuid.as_slice(), node.key.as_str())?; + } + txn.commit()?; + Ok(()) +} + +/// Delete a node from redb indices (removes both the key and uuid entries). +pub fn delete_node(db: &Database, key: &str, uuid: &[u8; 16]) -> Result<()> { + let txn = db.begin_write()?; + { + let mut nodes_table = txn.open_table(NODES)?; + let mut uuid_table = txn.open_table(UUID_TO_KEY)?; + + nodes_table.remove(key)?; + uuid_table.remove(uuid.as_slice())?; + } + txn.commit()?; + Ok(()) +} + +/// Record a visit in redb. +pub fn record_visit(db: &Database, node_key: &str, agent: &str, timestamp: i64) -> Result<()> { + let txn = db.begin_write()?; + { + let mut table = txn.open_table(VISITS)?; + table.insert((node_key, agent), timestamp)?; + } + txn.commit()?; + Ok(()) +} + +/// Get last visit timestamp for a node/agent pair. +pub fn get_last_visit(db: &Database, node_key: &str, agent: &str) -> Result<i64> { + let txn = db.begin_read()?; + let table = txn.open_table(VISITS)?; + + match table.get((node_key, agent))?
{ + Some(ts) => Ok(ts.value()), + None => Ok(0), + } +} + diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index 5bf03a2..0e86b9c 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -1,21 +1,16 @@ -// Append-only Cap'n Proto storage + derived KV cache +// Append-only Cap'n Proto storage + redb indices // -// Two log files are source of truth: +// capnp logs are the source of truth: // nodes.capnp - ContentNode messages // relations.capnp - Relation messages // -// The Store struct is the derived cache: latest version per UUID, -// rebuilt from logs when stale. Three-tier load strategy: -// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize -// 2. bincode cache (state.bin) — ~10ms -// 3. capnp log replay — ~40ms -// Staleness: log file sizes embedded in cache headers. +// redb provides indexed access; Store struct holds in-memory state. // // Module layout: // types.rs — Node, Relation, enums, capnp macros, path helpers // parse.rs — markdown → MemoryUnit parsing -// view.rs — zero-copy read-only access (StoreView, MmapView) -// persist.rs — load, save, replay, append, snapshot (all disk IO) +// view.rs — StoreView trait for read-only access +// persist.rs — load, replay, append (capnp IO) // ops.rs — mutations (upsert, delete, decay, cap_degree, etc.) 
// mod.rs — re-exports, key resolution, ingestion, rendering @@ -24,6 +19,7 @@ mod parse; mod view; mod persist; mod ops; +pub mod db; // Re-export everything callers need pub use types::{ @@ -34,7 +30,7 @@ pub use types::{ new_node, new_relation, }; pub use parse::{MemoryUnit, parse_units}; -pub use view::{StoreView, AnyView}; +pub use view::StoreView; pub use persist::fsck; pub use ops::current_provenance; diff --git a/src/hippocampus/store/persist.rs b/src/hippocampus/store/persist.rs index 946ff99..62a1740 100644 --- a/src/hippocampus/store/persist.rs +++ b/src/hippocampus/store/persist.rs @@ -1,11 +1,6 @@ -// Persistence layer: load, save, replay, append, snapshot +// Persistence layer: load, replay, append // -// Three-tier loading strategy: -// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize -// 2. bincode cache (state.bin) — ~10ms -// 3. capnp log replay — ~40ms -// -// Logs are append-only; cache staleness uses log file sizes, not mtimes. +// capnp logs are the source of truth; redb provides indexed access. use super::types::*; @@ -21,62 +16,11 @@ use std::io::{BufReader, Seek}; use std::path::Path; impl Store { - /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs. - /// - /// Staleness check uses log file sizes (not mtimes). Since logs are - /// append-only, any write grows the file, invalidating the cache. - /// This avoids the mtime race that caused data loss with concurrent - /// writers (dream loop, link audit, journal enrichment). + /// Load store by replaying capnp logs. pub fn load() -> Result { - // 1. 
Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy) - match Self::load_snapshot_mmap() { - Ok(Some(mut store)) => { - // rkyv snapshot doesn't include visits — replay from log - let visits_p = visits_path(); - if visits_p.exists() { - store.replay_visits(&visits_p).ok(); - } - let tp_p = transcript_progress_path(); - if tp_p.exists() { - store.replay_transcript_progress(&tp_p).ok(); - } - return Ok(store); - }, - Ok(None) => {}, - Err(e) => eprintln!("rkyv snapshot: {}", e), - } - - // 2. Try bincode state.bin cache (~10ms) let nodes_p = nodes_path(); let rels_p = relations_path(); - let state_p = state_path(); - let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); - - if let Ok(data) = fs::read(&state_p) - && data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC { - let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap()); - let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap()); - - if cached_nodes == nodes_size && cached_rels == rels_size - && let Ok(mut store) = bincode::deserialize::(&data[CACHE_HEADER_LEN..]) { - // Rebuild uuid_to_key (skipped by serde) - for (key, node) in &store.nodes { - store.uuid_to_key.insert(node.uuid, key.clone()); - } - store.loaded_nodes_size = nodes_size; - store.loaded_rels_size = rels_size; - // Bootstrap: write rkyv snapshot if missing - if !snapshot_path().exists() - && let Err(e) = store.save_snapshot(cached_nodes, cached_rels) { - eprintln!("rkyv bootstrap: {}", e); - } - return Ok(store); - } - } - - // Stale or no cache — rebuild from capnp logs let mut store = Store::default(); if nodes_p.exists() { @@ -94,7 +38,7 @@ impl Store { store.replay_transcript_progress(&tp_p)?; } - // Record log sizes after replay — this is the state we reflect + // Record log sizes after replay store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); store.loaded_rels_size = 
fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0); @@ -104,31 +48,6 @@ impl Store { store.nodes.contains_key(&r.target_key) ); - store.save()?; - Ok(store) - } - - /// Load store directly from capnp logs, bypassing all caches. - /// Used by fsck to verify cache consistency. - pub fn load_from_logs() -> Result { - let nodes_p = nodes_path(); - let rels_p = relations_path(); - - let mut store = Store::default(); - if nodes_p.exists() { - store.replay_nodes(&nodes_p)?; - } - if rels_p.exists() { - store.replay_relations(&rels_p)?; - } - let visits_p = visits_path(); - if visits_p.exists() { - store.replay_visits(&visits_p)?; - } - let tp_p = transcript_progress_path(); - if tp_p.exists() { - store.replay_transcript_progress(&tp_p)?; - } Ok(store) } @@ -588,135 +507,10 @@ impl Store { .unwrap_or(0) } - /// Save the derived cache with log size header for staleness detection. - /// Uses atomic write (tmp + rename) to prevent partial reads. + /// Placeholder - indices will be updated on write with redb. pub fn save(&self) -> Result<()> { - let _lock = StoreLock::acquire()?; - - let path = state_path(); - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).ok(); - } - - // Use log sizes from load time, not current filesystem sizes. - // If another writer appended since we loaded, our recorded size - // will be smaller than the actual log → next reader detects stale - // cache and replays the (correct, append-only) log. 
- let nodes_size = self.loaded_nodes_size; - let rels_size = self.loaded_rels_size; - - let bincode_data = bincode::serialize(self) - .with_context(|| format!("bincode serialize"))?; - - let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len()); - data.extend_from_slice(&CACHE_MAGIC); - data.extend_from_slice(&nodes_size.to_le_bytes()); - data.extend_from_slice(&rels_size.to_le_bytes()); - data.extend_from_slice(&bincode_data); - - // Atomic write: tmp file + rename - let tmp_path = path.with_extension("bin.tmp"); - fs::write(&tmp_path, &data) - .with_context(|| format!("write {}", tmp_path.display()))?; - fs::rename(&tmp_path, &path) - .with_context(|| format!("rename {} → {}", tmp_path.display(), path.display()))?; - - // Also write rkyv snapshot (mmap-friendly) - if let Err(e) = self.save_snapshot(nodes_size, rels_size) { - eprintln!("rkyv snapshot save: {}", e); - } - Ok(()) } - - /// Serialize store as rkyv snapshot with staleness header. - /// Assumes StoreLock is already held by caller. 
- fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<()> { - let snap = Snapshot { - nodes: self.nodes.clone(), - relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(), - gaps: self.gaps.clone(), - params: self.params, - }; - - let rkyv_data = rkyv::to_bytes::<_, 256>(&snap) - .with_context(|| format!("rkyv serialize"))?; - - let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len()); - data.extend_from_slice(&RKYV_MAGIC); - data.extend_from_slice(&1u32.to_le_bytes()); // format version - data.extend_from_slice(&nodes_size.to_le_bytes()); - data.extend_from_slice(&rels_size.to_le_bytes()); - data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes()); - data.extend_from_slice(&rkyv_data); - - let path = snapshot_path(); - let tmp_path = path.with_extension("rkyv.tmp"); - fs::write(&tmp_path, &data) - .with_context(|| format!("write {}", tmp_path.display()))?; - fs::rename(&tmp_path, &path) - .with_context(|| format!("rename"))?; - - Ok(()) - } - - /// Try loading store from mmap'd rkyv snapshot. - /// Returns None if snapshot is missing or stale (log sizes don't match). 
- fn load_snapshot_mmap() -> Result> { - let path = snapshot_path(); - if !path.exists() { return Ok(None); } - - let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); - - let file = fs::File::open(&path) - .with_context(|| format!("open {}", path.display()))?; - - let mmap = unsafe { memmap2::Mmap::map(&file) } - .with_context(|| format!("mmap {}", path.display()))?; - - if mmap.len() < RKYV_HEADER_LEN { return Ok(None); } - if mmap[..4] != RKYV_MAGIC { return Ok(None); } - - // [4..8] = version, skip for now - let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap()); - let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap()); - let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize; - - if cached_nodes != nodes_size || cached_rels != rels_size { - return Ok(None); // stale - } - if mmap.len() < RKYV_HEADER_LEN + data_len { - return Ok(None); // truncated - } - - let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len]; - - // SAFETY: we wrote this file ourselves via save_snapshot(). - // Skip full validation (check_archived_root) — the staleness header - // already confirms this snapshot matches the current log state. - let archived = unsafe { rkyv::archived_root::(rkyv_data) }; - - let snap: Snapshot = > - ::deserialize(archived, &mut rkyv::Infallible).unwrap(); - - let mut store = Store { - nodes: snap.nodes, - relations: snap.relations, - gaps: snap.gaps, - params: snap.params, - ..Default::default() - }; - - // Rebuild uuid_to_key (not serialized) - for (key, node) in &store.nodes { - store.uuid_to_key.insert(node.uuid, key.clone()); - } - store.loaded_nodes_size = nodes_size; - store.loaded_rels_size = rels_size; - - Ok(Some(store)) - } } /// Check and repair corrupt capnp log files. 
@@ -802,14 +596,6 @@ pub fn fsck() -> Result<()> { } if any_corrupt { - // Nuke caches so next load replays from the repaired logs - for p in [state_path(), snapshot_path()] { - if p.exists() { - fs::remove_file(&p) - .with_context(|| format!("remove {}", p.display()))?; - eprintln!("removed stale cache: {}", p.display()); - } - } eprintln!("repair complete — run `poc-memory status` to verify"); } else { eprintln!("store is clean"); diff --git a/src/hippocampus/store/types.rs b/src/hippocampus/store/types.rs index 0cc7aad..0619295 100644 --- a/src/hippocampus/store/types.rs +++ b/src/hippocampus/store/types.rs @@ -93,8 +93,7 @@ pub fn memory_dir() -> PathBuf { pub fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") } pub(crate) fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") } -pub(crate) fn state_path() -> PathBuf { memory_dir().join("state.bin") } -pub(crate) fn snapshot_path() -> PathBuf { memory_dir().join("snapshot.rkyv") } +pub(crate) fn db_path() -> PathBuf { memory_dir().join("index.redb") } fn lock_path() -> PathBuf { memory_dir().join(".store.lock") } /// RAII file lock using flock(2). Dropped when scope exits. 
@@ -184,8 +183,7 @@ pub fn today() -> String { } // In-memory node representation -#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Node { pub uuid: [u8; 16], pub version: u32, @@ -228,8 +226,7 @@ pub struct Node { pub degree: Option, } -#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Relation { pub uuid: [u8; 16], pub version: u32, @@ -244,8 +241,7 @@ pub struct Relation { pub target_key: String, } -#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] pub enum NodeType { EpisodicSession, EpisodicDaily, @@ -254,8 +250,7 @@ pub enum NodeType { EpisodicMonthly, } -#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] pub enum Provenance { Manual, Journal, @@ -319,8 +314,7 @@ impl Provenance { } } -#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] pub enum RelationType { Link, Causal, @@ -395,8 +389,7 @@ impl Relation { } } -#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct RetrievalEvent { pub query: String, pub timestamp: String, @@ -404,8 +397,7 @@ pub struct RetrievalEvent { pub used: Option>, } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] 
-#[archive(check_bytes)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct Params { pub default_weight: f64, pub decay_factor: f64, @@ -431,8 +423,7 @@ impl Default for Params { } // Gap record — something we looked for but didn't find -#[derive(Clone, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct GapRecord { pub description: String, pub timestamp: String, @@ -442,57 +433,23 @@ pub struct GapRecord { pub(super) type VisitIndex = HashMap>; // The full in-memory store -#[derive(Default, Serialize, Deserialize)] +#[derive(Default)] pub struct Store { pub nodes: HashMap, // key → latest node - #[serde(skip)] pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes) pub relations: Vec, // all active relations pub retrieval_log: Vec, pub gaps: Vec, pub params: Params, /// Agent visit tracking: node_key → (agent_type → last_visit_epoch) - #[serde(default)] pub visits: VisitIndex, /// Transcript mining progress: (transcript_id, segment_index) → set of agents that processed it - #[serde(default)] pub transcript_progress: HashMap<(String, u32), HashSet>, - /// Log sizes at load time — used by save() to write correct staleness header. - /// If another writer appended since we loaded, our cache will be marked stale - /// (recorded size < actual size), forcing the next reader to replay the log. - #[serde(skip)] + /// Log sizes at load time — used for staleness detection. pub(crate) loaded_nodes_size: u64, - #[serde(skip)] pub(crate) loaded_rels_size: u64, } -/// Snapshot for mmap: full store state minus retrieval_log (which -/// is append-only in retrieval.log). rkyv zero-copy serialization -/// lets us mmap this and access archived data without deserialization. 
-#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] -#[archive(check_bytes)] -pub(crate) struct Snapshot { - pub(crate) nodes: HashMap, - pub(crate) relations: Vec, - pub(crate) gaps: Vec, - pub(crate) params: Params, -} - -// rkyv snapshot header: 32 bytes (multiple of 16 for alignment after mmap) -// [0..4] magic "RKV\x01" -// [4..8] format version (u32 LE) -// [8..16] nodes.capnp file size (u64 LE) — staleness check -// [16..24] relations.capnp file size (u64 LE) -// [24..32] rkyv data length (u64 LE) -pub(crate) const RKYV_MAGIC: [u8; 4] = *b"RKV\x01"; -pub(crate) const RKYV_HEADER_LEN: usize = 32; - -// state.bin header: magic + log file sizes for staleness detection. -// File sizes are race-free for append-only logs (they only grow), -// unlike mtimes which race with concurrent writers. -pub(crate) const CACHE_MAGIC: [u8; 4] = *b"POC\x01"; -pub(crate) const CACHE_HEADER_LEN: usize = 4 + 8 + 8; // magic + nodes_size + rels_size - // Cap'n Proto serialization helpers /// Read a capnp text field, returning empty string on any error diff --git a/src/hippocampus/store/view.rs b/src/hippocampus/store/view.rs index 80d7bd8..738c078 100644 --- a/src/hippocampus/store/view.rs +++ b/src/hippocampus/store/view.rs @@ -1,18 +1,9 @@ -// Read-only access abstractions for the memory store -// -// StoreView: trait abstracting over owned Store and zero-copy MmapView. -// MmapView: mmap'd rkyv snapshot for sub-millisecond read-only access. -// AnyView: enum dispatch selecting fastest available view at runtime. +// Read-only access abstraction for the memory store use super::types::*; -use std::fs; - // --------------------------------------------------------------------------- // StoreView: read-only access trait for search and graph code. -// -// Abstracts over owned Store and zero-copy MmapView so the same -// spreading-activation and graph code works with either. 
// --------------------------------------------------------------------------- pub trait StoreView { @@ -67,140 +58,3 @@ impl StoreView for Store { self.params } } - -// --------------------------------------------------------------------------- -// MmapView: zero-copy store access via mmap'd rkyv snapshot. -// -// Holds the mmap alive; all string reads go directly into the mapped -// pages without allocation. Falls back to None if snapshot is stale. -// --------------------------------------------------------------------------- - -pub struct MmapView { - mmap: memmap2::Mmap, - _file: fs::File, - data_offset: usize, - data_len: usize, -} - -impl MmapView { - /// Try to open a fresh rkyv snapshot. Returns None if missing or stale. - pub fn open() -> Option { - let path = snapshot_path(); - let file = fs::File::open(&path).ok()?; - let mmap = unsafe { memmap2::Mmap::map(&file) }.ok()?; - - if mmap.len() < RKYV_HEADER_LEN { return None; } - if mmap[..4] != RKYV_MAGIC { return None; } - - let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0); - let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0); - - let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap()); - let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap()); - let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize; - - if cached_nodes != nodes_size || cached_rels != rels_size { return None; } - if mmap.len() < RKYV_HEADER_LEN + data_len { return None; } - - Some(MmapView { mmap, _file: file, data_offset: RKYV_HEADER_LEN, data_len }) - } - - fn snapshot(&self) -> &ArchivedSnapshot { - let data = &self.mmap[self.data_offset..self.data_offset + self.data_len]; - unsafe { rkyv::archived_root::(data) } - } -} - -impl StoreView for MmapView { - fn for_each_node(&self, mut f: F) { - let snap = self.snapshot(); - for (key, node) in snap.nodes.iter() { - f(key, &node.content, node.weight); - } - } - - fn 
for_each_node_meta(&self, mut f: F) { - let snap = self.snapshot(); - for (key, node) in snap.nodes.iter() { - let nt = match node.node_type { - ArchivedNodeType::EpisodicSession => NodeType::EpisodicSession, - ArchivedNodeType::EpisodicDaily => NodeType::EpisodicDaily, - ArchivedNodeType::EpisodicWeekly => NodeType::EpisodicWeekly, - ArchivedNodeType::EpisodicMonthly => NodeType::EpisodicMonthly, - ArchivedNodeType::Semantic => NodeType::Semantic, - }; - f(key, nt, node.timestamp); - } - } - - fn for_each_relation(&self, mut f: F) { - let snap = self.snapshot(); - for rel in snap.relations.iter() { - if rel.deleted { continue; } - let rt = match rel.rel_type { - ArchivedRelationType::Link => RelationType::Link, - ArchivedRelationType::Causal => RelationType::Causal, - ArchivedRelationType::Auto => RelationType::Auto, - }; - f(&rel.source_key, &rel.target_key, rel.strength, rt); - } - } - - fn node_weight(&self, key: &str) -> f64 { - let snap = self.snapshot(); - snap.nodes.get(key) - .map(|n| n.weight as f64) - .unwrap_or(snap.params.default_weight) - } - - fn node_content(&self, key: &str) -> Option<&str> { - let snap = self.snapshot(); - snap.nodes.get(key).map(|n| &*n.content) - } - - fn params(&self) -> Params { - let p = &self.snapshot().params; - Params { - default_weight: p.default_weight, - decay_factor: p.decay_factor, - use_boost: p.use_boost, - prune_threshold: p.prune_threshold, - edge_decay: p.edge_decay, - max_hops: p.max_hops, - min_activation: p.min_activation, - } - } -} - -// --------------------------------------------------------------------------- -// AnyView: enum dispatch for read-only access. -// -// MmapView when the snapshot is fresh, owned Store as fallback. -// The match on each call is a single predicted branch — zero overhead. 
-// --------------------------------------------------------------------------- - -pub enum AnyView { - Mmap(MmapView), - Owned(Store), -} - -impl StoreView for AnyView { - fn for_each_node(&self, f: F) { - match self { AnyView::Mmap(v) => v.for_each_node(f), AnyView::Owned(s) => s.for_each_node(f) } - } - fn for_each_node_meta(&self, f: F) { - match self { AnyView::Mmap(v) => v.for_each_node_meta(f), AnyView::Owned(s) => s.for_each_node_meta(f) } - } - fn for_each_relation(&self, f: F) { - match self { AnyView::Mmap(v) => v.for_each_relation(f), AnyView::Owned(s) => s.for_each_relation(f) } - } - fn node_weight(&self, key: &str) -> f64 { - match self { AnyView::Mmap(v) => v.node_weight(key), AnyView::Owned(s) => s.node_weight(key) } - } - fn node_content(&self, key: &str) -> Option<&str> { - match self { AnyView::Mmap(v) => v.node_content(key), AnyView::Owned(s) => s.node_content(key) } - } - fn params(&self) -> Params { - match self { AnyView::Mmap(v) => v.params(), AnyView::Owned(s) => s.params() } - } -}