consciousness/src/hippocampus/store/capnp.rs

649 lines
23 KiB
Rust
Raw Normal View History

// Cap'n Proto serialization and persistence
//
// capnp logs are the source of truth; redb provides indexed access.
// This module contains:
// - Serialization macros (capnp_enum!, capnp_message!)
// - Load/replay from capnp logs
// - Append to capnp logs
// - fsck (corruption repair)
use super::{index, types::*};
use redb::ReadableTableMetadata;
use crate::memory_capnp;
use super::Store;
use anyhow::{anyhow, Context, Result};
use capnp::message;
use capnp::serialize;
use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Seek};
use std::path::Path;
// ---------------------------------------------------------------------------
// Capnp serialization macros
//
// Declarative mapping between Rust types and capnp generated types.
// Adding a field to the schema means adding it in one place below;
// both read and write are generated from the same declaration.
// ---------------------------------------------------------------------------
/// Generate to_capnp/from_capnp conversion methods for an enum.
macro_rules! capnp_enum {
($rust_type:ident, $capnp_type:path, [$($variant:ident),+ $(,)?]) => {
impl $rust_type {
#[allow(clippy::wrong_self_convention, dead_code)]
pub(crate) fn to_capnp(&self) -> $capnp_type {
match self {
$(Self::$variant => <$capnp_type>::$variant,)+
}
}
pub(crate) fn from_capnp(v: $capnp_type) -> Self {
match v {
$(<$capnp_type>::$variant => Self::$variant,)+
}
}
}
};
}
/// Generate from_capnp/to_capnp methods for a struct with capnp serialization.
/// Fields are grouped by serialization kind:
/// text - capnp Text fields (String in Rust)
/// uuid - capnp Data fields ([u8; 16] in Rust)
/// prim - copy types (u32, f32, f64, bool)
/// enm - enums with to_capnp/from_capnp methods
/// skip - Rust-only fields not in capnp (set to Default on read)
macro_rules! capnp_message {
(
$struct:ident,
reader: $reader:ty,
builder: $builder:ty,
text: [$($tf:ident),* $(,)?],
uuid: [$($uf:ident),* $(,)?],
prim: [$($pf:ident),* $(,)?],
enm: [$($ef:ident: $et:ident),* $(,)?],
skip: [$($sf:ident),* $(,)?] $(,)?
) => {
impl $struct {
pub fn from_capnp(r: $reader) -> Result<Self> {
paste::paste! {
Ok(Self {
$($tf: read_text(r.[<get_ $tf>]()),)*
$($uf: read_uuid(r.[<get_ $uf>]()),)*
$($pf: r.[<get_ $pf>](),)*
$($ef: $et::from_capnp(
r.[<get_ $ef>]().map_err(|_| anyhow!(concat!("bad ", stringify!($ef))))?
),)*
$($sf: Default::default(),)*
})
}
}
pub fn to_capnp(&self, mut b: $builder) {
paste::paste! {
$(b.[<set_ $tf>](&self.$tf);)*
$(b.[<set_ $uf>](&self.$uf);)*
$(b.[<set_ $pf>](self.$pf);)*
$(b.[<set_ $ef>](self.$ef.to_capnp());)*
}
}
}
};
}
// ---------------------------------------------------------------------------
// Capnp helpers
// ---------------------------------------------------------------------------
/// Read a capnp text field, returning empty string on any error
fn read_text(result: capnp::Result<capnp::text::Reader>) -> String {
result.ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string()
}
/// Read a capnp data field as [u8; 16], zero-padded
fn read_uuid(result: capnp::Result<&[u8]>) -> [u8; 16] {
let mut out = [0u8; 16];
if let Ok(data) = result
&& data.len() >= 16 {
out.copy_from_slice(&data[..16]);
}
out
}
// ---------------------------------------------------------------------------
// Type-to-capnp mappings
// ---------------------------------------------------------------------------
capnp_enum!(NodeType, memory_capnp::NodeType,
[EpisodicSession, EpisodicDaily, EpisodicWeekly, Semantic, EpisodicMonthly]);
capnp_enum!(RelationType, memory_capnp::RelationType,
[Link, Causal, Auto]);
capnp_message!(Node,
reader: memory_capnp::content_node::Reader<'_>,
builder: memory_capnp::content_node::Builder<'_>,
text: [key, content, source_ref, provenance],
uuid: [uuid],
prim: [version, timestamp, weight, emotion, deleted,
retrievals, uses, wrongs, last_replayed,
spaced_repetition_interval, created_at, last_scored],
enm: [node_type: NodeType],
skip: [community_id, clustering_coefficient, degree],
);
capnp_message!(Relation,
reader: memory_capnp::relation::Reader<'_>,
builder: memory_capnp::relation::Builder<'_>,
text: [source_key, target_key, provenance],
uuid: [uuid, source, target],
prim: [version, timestamp, strength, deleted],
enm: [rel_type: RelationType],
skip: [],
);
// ---------------------------------------------------------------------------
// Migration helpers (legacy provenance enum → string)
// ---------------------------------------------------------------------------
/// Convert legacy capnp provenance enum to string label.
fn legacy_provenance_label(p: memory_capnp::Provenance) -> &'static str {
use memory_capnp::Provenance::*;
match p {
Manual => "manual",
Journal => "journal",
Agent => "agent",
Dream => "dream",
Derived => "derived",
AgentExperienceMine => "agent:experience-mine",
AgentKnowledgeObservation => "agent:knowledge-observation",
AgentKnowledgePattern => "agent:knowledge-pattern",
AgentKnowledgeConnector => "agent:knowledge-connector",
AgentKnowledgeChallenger => "agent:knowledge-challenger",
AgentConsolidate => "agent:consolidate",
AgentDigest => "agent:digest",
AgentFactMine => "agent:fact-mine",
AgentDecay => "agent:decay",
}
}
impl Node {
/// Read from capnp with migration: if the new provenance text field
/// is empty (old record), fall back to the deprecated provenanceOld enum.
pub fn from_capnp_migrate(r: memory_capnp::content_node::Reader<'_>) -> Result<Self> {
let mut node = Self::from_capnp(r)?;
if node.provenance.is_empty()
&& let Ok(old) = r.get_provenance_old() {
node.provenance = legacy_provenance_label(old).to_string();
}
// Sanitize timestamps: old capnp records have raw offsets instead
// of unix epoch. Anything past year 2100 (~4102444800) is bogus.
const MAX_SANE_EPOCH: i64 = 4_102_444_800;
if node.timestamp > MAX_SANE_EPOCH || node.timestamp < 0 {
node.timestamp = node.created_at;
}
if node.created_at > MAX_SANE_EPOCH || node.created_at < 0 {
node.created_at = node.timestamp.min(MAX_SANE_EPOCH);
}
Ok(node)
}
}
impl Relation {
pub fn from_capnp_migrate(r: memory_capnp::relation::Reader<'_>) -> Result<Self> {
let mut rel = Self::from_capnp(r)?;
if rel.provenance.is_empty()
&& let Ok(old) = r.get_provenance_old() {
rel.provenance = legacy_provenance_label(old).to_string();
}
Ok(rel)
}
}
// ---------------------------------------------------------------------------
// Store persistence methods
// ---------------------------------------------------------------------------
impl Store {
/// Load store by replaying capnp logs, then open/verify redb indices.
pub fn load() -> Result<Store> {
let nodes_p = nodes_path();
let rels_p = relations_path();
let mut store = Store::default();
if nodes_p.exists() {
store.replay_nodes(&nodes_p)?;
}
if rels_p.exists() {
store.replay_relations(&rels_p)?;
}
// Record log sizes after replay
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);
// Drop edges referencing deleted/missing nodes
store.relations.retain(|r|
store.nodes.contains_key(&r.source_key) &&
store.nodes.contains_key(&r.target_key)
);
// Open redb and verify/rebuild indices
let db_p = db_path();
store.db = Some(store.open_or_rebuild_db(&db_p)?);
Ok(store)
}
/// Open redb database, rebuilding if unhealthy.
fn open_or_rebuild_db(&self, path: &Path) -> Result<redb::Database> {
// Try opening existing database
if path.exists() {
match index::open_db(path) {
Ok(database) => {
if self.db_is_healthy(&database)? {
return Ok(database);
}
eprintln!("redb index stale, rebuilding...");
}
Err(e) => {
eprintln!("redb open failed ({}), rebuilding...", e);
}
}
}
// Rebuild index from capnp log
rebuild_index(path, &nodes_path())
}
/// Check if redb indices match in-memory state.
fn db_is_healthy(&self, database: &redb::Database) -> Result<bool> {
use redb::ReadableDatabase;
let txn = database.begin_read()?;
// Quick check: node count should match
let nodes_table = txn.open_table(index::NODES)?;
let db_count = nodes_table.len()?;
if db_count != self.nodes.len() as u64 {
return Ok(false);
}
// Spot check: verify a few random nodes exist with matching keys
// (full verification would be too slow)
for (i, key) in self.nodes.keys().enumerate() {
if i >= 10 { break; } // check first 10
if nodes_table.get(key.as_str())?.is_none() {
return Ok(false);
}
}
Ok(true)
}
/// Replay node log, keeping latest version per UUID.
/// Tracks all UUIDs seen per key to detect duplicates.
fn replay_nodes(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
// Track all non-deleted UUIDs per key to detect duplicates
let mut key_uuids: HashMap<String, Vec<[u8; 16]>> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.with_context(|| format!("read node log"))?;
for node_reader in log.get_nodes()
.with_context(|| format!("get nodes"))? {
let node = Node::from_capnp_migrate(node_reader)?;
let existing_version = self.nodes.get(&node.key)
.map(|n| n.version)
.unwrap_or(0);
if node.version >= existing_version {
if node.deleted {
self.nodes.remove(&node.key);
self.uuid_to_key.remove(&node.uuid);
if let Some(uuids) = key_uuids.get_mut(&node.key) {
uuids.retain(|u| *u != node.uuid);
}
} else {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node.clone());
let uuids = key_uuids.entry(node.key).or_default();
if !uuids.contains(&node.uuid) {
uuids.push(node.uuid);
}
}
}
}
}
// Report duplicate keys
for (key, uuids) in &key_uuids {
if uuids.len() > 1 {
dbglog!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len());
}
}
Ok(())
}
/// Replay relation log, keeping latest version per UUID
fn replay_relations(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
// Collect all, then deduplicate by UUID keeping latest version
let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
.with_context(|| format!("read relation log"))?;
for rel_reader in log.get_relations()
.with_context(|| format!("get relations"))? {
let rel = Relation::from_capnp_migrate(rel_reader)?;
let existing_version = by_uuid.get(&rel.uuid)
.map(|r| r.version)
.unwrap_or(0);
if rel.version >= existing_version {
by_uuid.insert(rel.uuid, rel);
}
}
}
self.relations = by_uuid.into_values()
.filter(|r| !r.deleted)
.collect();
Ok(())
}
/// Find all duplicate keys: keys with multiple live UUIDs in the log.
/// Returns a map from key → vec of all live Node versions (one per UUID).
/// The "winner" in self.nodes is always one of them.
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>> {
let path = nodes_path();
if !path.exists() { return Ok(HashMap::new()); }
let file = fs::File::open(&path)
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
// Track latest version of each UUID
let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.with_context(|| format!("read node log"))?;
for node_reader in log.get_nodes()
.with_context(|| format!("get nodes"))? {
let node = Node::from_capnp_migrate(node_reader)?;
let dominated = by_uuid.get(&node.uuid)
.map(|n| node.version >= n.version)
.unwrap_or(true);
if dominated {
by_uuid.insert(node.uuid, node);
}
}
}
// Group live (non-deleted) nodes by key
let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
for node in by_uuid.into_values() {
if !node.deleted {
by_key.entry(node.key.clone()).or_default().push(node);
}
}
// Keep only duplicates
by_key.retain(|_, nodes| nodes.len() > 1);
Ok(by_key)
}
/// Append nodes to the log file. Returns the offset where the message was written.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<u64> {
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
let mut list = log.init_nodes(nodes.len() as u32);
for (i, node) in nodes.iter().enumerate() {
node.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.with_context(|| format!("serialize nodes"))?;
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.with_context(|| format!("open {}", path.display()))?;
// Get offset before writing
let offset = file.metadata().map(|m| m.len()).unwrap_or(0);
use std::io::Write;
(&file).write_all(&buf)
.with_context(|| format!("write nodes"))?;
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(offset)
}
/// Append relations to the log file.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> {
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
let mut list = log.init_relations(relations.len() as u32);
for (i, rel) in relations.iter().enumerate() {
rel.to_capnp(list.reborrow().get(i as u32));
}
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.with_context(|| format!("serialize relations"))?;
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.with_context(|| format!("write relations"))?;
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Placeholder - indices will be updated on write with redb.
pub fn save(&self) -> Result<()> {
Ok(())
}
}
/// Check and repair corrupt capnp log files.
///
/// Reads each message sequentially, tracking file position. On the first
/// corrupt message, truncates the file to the last good position. Also
/// removes stale caches so the next load replays from the repaired log.
pub fn fsck() -> Result<()> {
let mut any_corrupt = false;
for (path, kind) in [
(nodes_path(), "node"),
(relations_path(), "relation"),
] {
if !path.exists() { continue; }
let file = fs::File::open(&path)
.with_context(|| format!("open {}", path.display()))?;
let file_len = file.metadata()
.with_context(|| format!("stat {}", path.display()))?.len();
let mut reader = BufReader::new(file);
let mut good_messages = 0u64;
let mut last_good_pos = 0u64;
loop {
let pos = reader.stream_position()
.with_context(|| format!("tell {}", path.display()))?;
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
Ok(m) => m,
Err(_) => {
// read_message fails at EOF (normal) or on corrupt framing
if pos < file_len {
// Not at EOF — corrupt framing
eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.with_context(|| format!("open for truncate"))?;
file.set_len(pos)
.with_context(|| format!("truncate {}", path.display()))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, pos, good_messages);
}
break;
}
};
// Validate the message content too
let valid = if kind == "node" {
msg.get_root::<memory_capnp::node_log::Reader>()
.and_then(|l| l.get_nodes().map(|_| ()))
.is_ok()
} else {
msg.get_root::<memory_capnp::relation_log::Reader>()
.and_then(|l| l.get_relations().map(|_| ()))
.is_ok()
};
if valid {
good_messages += 1;
last_good_pos = reader.stream_position()
.with_context(|| format!("tell {}", path.display()))?;
} else {
eprintln!("{}: corrupt message content at offset {}, truncating to {}",
kind, pos, last_good_pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.with_context(|| format!("open for truncate"))?;
file.set_len(last_good_pos)
.with_context(|| format!("truncate {}", path.display()))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, last_good_pos, good_messages);
break;
}
}
if !any_corrupt {
eprintln!("{}: {} messages, all clean", kind, good_messages);
}
}
if any_corrupt {
eprintln!("repair complete — run `poc-memory status` to verify");
} else {
eprintln!("store is clean");
}
Ok(())
}
/// Rebuild redb index from capnp log.
/// Scans the log, tracking offsets, and records latest version of each node.
fn rebuild_index(db_path: &Path, capnp_path: &Path) -> Result<redb::Database> {
// Remove old database if it exists
if db_path.exists() {
fs::remove_file(db_path)
.with_context(|| format!("remove old db {}", db_path.display()))?;
}
let database = index::open_db(db_path)?;
if !capnp_path.exists() {
return Ok(database);
}
// Track latest (offset, uuid, version, deleted) per key
let mut latest: HashMap<String, (u64, [u8; 16], u32, bool)> = HashMap::new();
let file = fs::File::open(capnp_path)
.with_context(|| format!("open {}", capnp_path.display()))?;
let mut reader = BufReader::new(file);
loop {
let offset = reader.stream_position()?;
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
Ok(m) => m,
Err(_) => break,
};
let log = match msg.get_root::<memory_capnp::node_log::Reader>() {
Ok(l) => l,
Err(_) => continue,
};
let nodes = match log.get_nodes() {
Ok(n) => n,
Err(_) => continue,
};
for node_reader in nodes {
let key = node_reader.get_key().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string();
if key.is_empty() { continue; }
let version = node_reader.get_version();
let deleted = node_reader.get_deleted();
let mut uuid = [0u8; 16];
if let Ok(data) = node_reader.get_uuid() {
if data.len() >= 16 {
uuid.copy_from_slice(&data[..16]);
}
}
// Keep if newer version
let dominated = latest.get(&key)
.map(|(_, _, v, _)| version >= *v)
.unwrap_or(true);
if dominated {
latest.insert(key, (offset, uuid, version, deleted));
}
}
}
// Write index entries for non-deleted nodes
{
let txn = database.begin_write()?;
{
let mut nodes_table = txn.open_table(index::NODES)?;
let mut uuid_table = txn.open_table(index::UUID_TO_KEY)?;
for (key, (offset, uuid, _, deleted)) in latest {
if !deleted {
nodes_table.insert(key.as_str(), offset)?;
uuid_table.insert(uuid.as_slice(), key.as_str())?;
}
}
}
txn.commit()?;
}
Ok(database)
}