From 63910e987cff21793ee1b198bf6f08ab3d0f905b Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Sun, 8 Mar 2026 18:31:19 -0400 Subject: [PATCH] fsck: add store integrity check and repair command Reads each capnp log message sequentially, validates framing and content. On first corrupt message, truncates to last good position and removes stale caches so next load replays from repaired log. Wired up as `poc-memory fsck`. --- src/main.rs | 66 ++++++++++++++++++++++------ src/store/mod.rs | 1 + src/store/persist.rs | 101 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 153 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index ddf4a92..eff4ef0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,6 +63,7 @@ fn main() { "init" => cmd_init(), "migrate" => cmd_migrate(), "health" => cmd_health(), + "fsck" => cmd_fsck(), "status" => cmd_status(), "graph" => cmd_graph(), "used" => cmd_used(&args[2..]), @@ -192,6 +193,8 @@ Commands: journal-ts-migrate Populate created_at for nodes missing it load-context Output session-start context from the store render KEY Output a node's content to stdout + history [--full] KEY Show all stored versions of a node + --full shows complete content for every version write KEY Upsert node content from stdin import FILE [FILE...] 
Import markdown file(s) into the store export [FILE|--all] Export store nodes to markdown file(s) @@ -409,6 +412,10 @@ fn cmd_migrate() -> Result<(), String> { migrate::migrate() } +fn cmd_fsck() -> Result<(), String> { + store::fsck() +} + fn cmd_health() -> Result<(), String> { let store = store::Store::load()?; let g = store.build_graph(); @@ -1549,10 +1556,33 @@ fn cmd_render(args: &[String]) -> Result<(), String> { } fn cmd_history(args: &[String]) -> Result<(), String> { - if args.is_empty() { - return Err("Usage: poc-memory history KEY".into()); + use clap::Parser; + + /// Show all stored versions of a memory node + #[derive(Parser)] + #[command(name = "poc-memory history")] + struct HistoryArgs { + /// Show full content for every version (not just preview) + #[arg(long)] + full: bool, + /// Node key to look up + #[arg(required = true)] + key: Vec<String>, } - let key = args.join(" "); + + let parsed = match HistoryArgs::try_parse_from( + std::iter::once("history".to_string()).chain(args.iter().cloned()) + ) { + Ok(p) => p, + Err(e) => { + // Let clap print its own help/error formatting directly + e.print().ok(); + std::process::exit(if e.use_stderr() { 1 } else { 0 }); + } + }; + + let full = parsed.full; + let key = parsed.key.join(" "); // Replay the node log, collecting all versions of this key let path = store::nodes_path(); @@ -1590,18 +1620,26 @@ fn cmd_history(args: &[String]) -> Result<(), String> { format!("(raw:{})", node.timestamp) }; let content_len = node.content.len(); - let preview: String = node.content.chars().take(120).collect(); - let preview = preview.replace('\n', "\\n"); - eprintln!(" v{:<3} {} {:24} w={:.3} {}b", - node.version, ts, node.provenance.label(), node.weight, content_len); - eprintln!(" {}", preview); + if full { + eprintln!("=== v{} {} {} w={:.3} {}b ===", + node.version, ts, node.provenance.label(), node.weight, content_len); + eprintln!("{}", node.content); + } else { + let preview: String = 
node.content.chars().take(120).collect(); + let preview = preview.replace('\n', "\\n"); + eprintln!(" v{:<3} {} {:24} w={:.3} {}b", + node.version, ts, node.provenance.label(), node.weight, content_len); + eprintln!(" {}", preview); + } } - // Show latest full content - if let Some(latest) = versions.last() { - eprintln!("\n--- Latest content (v{}, {}) ---", - latest.version, latest.provenance.label()); - print!("{}", latest.content); + if !full { + // Show latest full content + if let Some(latest) = versions.last() { + eprintln!("\n--- Latest content (v{}, {}) ---", + latest.version, latest.provenance.label()); + print!("{}", latest.content); + } } Ok(()) @@ -2118,7 +2156,7 @@ fn cmd_fact_mine_store(args: &[String]) -> Result<(), String> { if !path.exists() { return Err(format!("File not found: {}", args[0])); } - let count = fact_mine::mine_and_store(path)?; + let count = fact_mine::mine_and_store(path, None)?; eprintln!("Stored {} facts", count); Ok(()) } diff --git a/src/store/mod.rs b/src/store/mod.rs index 16b176c..1e0de5f 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -29,6 +29,7 @@ mod ops; pub use types::*; pub use parse::{MemoryUnit, parse_units}; pub use view::{StoreView, AnyView}; +pub use persist::fsck; use crate::graph::{self, Graph}; diff --git a/src/store/persist.rs b/src/store/persist.rs index cac15dd..5450642 100644 --- a/src/store/persist.rs +++ b/src/store/persist.rs @@ -16,7 +16,7 @@ use capnp::serialize; use std::collections::HashMap; use std::fs; -use std::io::{BufReader, BufWriter, Write as IoWrite}; +use std::io::{BufReader, BufWriter, Seek, Write as IoWrite}; use std::path::Path; impl Store { @@ -339,3 +339,102 @@ impl Store { Ok(Some(store)) } } + +/// Check and repair corrupt capnp log files. +/// +/// Reads each message sequentially, tracking file position. On the first +/// corrupt message, truncates the file to the last good position. Also +/// removes stale caches so the next load replays from the repaired log. 
+pub fn fsck() -> Result<(), String> { + let mut any_corrupt = false; + + for (path, kind) in [ + (nodes_path(), "node"), + (relations_path(), "relation"), + ] { + if !path.exists() { continue; } + + let file = fs::File::open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + let file_len = file.metadata() + .map_err(|e| format!("stat {}: {}", path.display(), e))?.len(); + let mut reader = BufReader::new(file); + + let mut good_messages = 0u64; + let mut last_good_pos = 0u64; + + loop { + let pos = reader.stream_position() + .map_err(|e| format!("tell {}: {}", path.display(), e))?; + + let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) { + Ok(m) => m, + Err(_) => { + // read_message fails at EOF (normal) or on corrupt framing + if pos < file_len { + // Not at EOF — corrupt framing + eprintln!("{}: corrupt message at offset {}, truncating", kind, pos); + any_corrupt = true; + drop(reader); + let file = fs::OpenOptions::new().write(true).open(&path) + .map_err(|e| format!("open for truncate: {}", e))?; + file.set_len(pos) + .map_err(|e| format!("truncate {}: {}", path.display(), e))?; + eprintln!("{}: truncated from {} to {} bytes ({} good messages)", + kind, file_len, pos, good_messages); + } + break; + } + }; + + // Validate the message content too + let valid = if kind == "node" { + msg.get_root::<node_log::Reader>() + .and_then(|l| l.get_nodes().map(|_| ())) + .is_ok() + } else { + msg.get_root::<relation_log::Reader>() + .and_then(|l| l.get_relations().map(|_| ())) + .is_ok() + }; + + if valid { + good_messages += 1; + last_good_pos = reader.stream_position() + .map_err(|e| format!("tell {}: {}", path.display(), e))?; + } else { + eprintln!("{}: corrupt message content at offset {}, truncating to {}", + kind, pos, last_good_pos); + any_corrupt = true; + drop(reader); + let file = fs::OpenOptions::new().write(true).open(&path) + .map_err(|e| format!("open for truncate: {}", e))?; + file.set_len(last_good_pos) + .map_err(|e| format!("truncate {}: {}", 
path.display(), e))?; + eprintln!("{}: truncated from {} to {} bytes ({} good messages)", + kind, file_len, last_good_pos, good_messages); + break; + } + } + + if !any_corrupt { + eprintln!("{}: {} messages, all clean", kind, good_messages); + } + } + + if any_corrupt { + // Nuke caches so next load replays from the repaired logs + for p in [state_path(), snapshot_path()] { + if p.exists() { + fs::remove_file(&p) + .map_err(|e| format!("remove {}: {}", p.display(), e))?; + eprintln!("removed stale cache: {}", p.display()); + } + } + eprintln!("repair complete — run `poc-memory status` to verify"); + } else { + eprintln!("store is clean"); + } + + Ok(()) +}