Duplicate key warnings fire on every store load and were writing to stderr, corrupting the TUI display. Log write warnings and MCP server failures are similarly routine. Route these to dbglog. Serious errors (rkyv snapshot failures, store corruption) remain on stderr — those are real problems the user needs to see. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
// Persistence layer: load, save, replay, append, snapshot
//
// Three-tier loading strategy:
// 1. rkyv mmap snapshot (snapshot.rkyv) — ~4ms deserialize
// 2. bincode cache (state.bin) — ~10ms
// 3. capnp log replay — ~40ms
//
// Logs are append-only; cache staleness uses log file sizes, not mtimes.
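//
// On-disk cache header layouts, as implied by the parsing and writing code
// below (the magic values and header-length constants live in super::types):
//
//   state.bin:     [magic: 4 bytes][nodes log size: u64 LE][rels log size: u64 LE][bincode payload]
//   snapshot.rkyv: [magic: 4 bytes][format version: u32 LE][nodes log size: u64 LE]
//                  [rels log size: u64 LE][payload length: u64 LE][rkyv payload]
//
// Both caches embed the log sizes they were built from; if either size no
// longer matches the node or relation log on disk, the cache is stale.
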
use super::types::*;

use crate::memory_capnp;

use capnp::message;
use capnp::serialize;

use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Seek};
use std::path::Path;
use std::sync::Arc;

/// Process-global cached store. Reloads only when log files change.
static CACHED_STORE: tokio::sync::OnceCell<Arc<tokio::sync::Mutex<Store>>> =
    tokio::sync::OnceCell::const_new();

impl Store {
    /// Get or create the process-global cached store.
    /// Reloads from disk if log files have changed since last load.
    pub async fn cached() -> Result<Arc<tokio::sync::Mutex<Store>>, String> {
        let store = CACHED_STORE.get_or_try_init(|| async {
            let s = Store::load()?;
            Ok::<_, String>(Arc::new(tokio::sync::Mutex::new(s)))
        }).await?;
        {
            let mut guard = store.lock().await;
            if guard.is_stale() {
                *guard = Store::load()?;
            }
        }
        Ok(store.clone())
    }

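    // Usage sketch (hypothetical caller; the real call sites live elsewhere):
    //
    //     let store = Store::cached().await?;
    //     let guard = store.lock().await;
    //     let live_nodes = guard.nodes.len();
    //
    // Long-lived processes (the TUI, the MCP server) stay current because each
    // call re-checks is_stale() and reloads when the logs have grown.
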
    /// Check if the on-disk logs have grown since we loaded.
    pub fn is_stale(&self) -> bool {
        let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);
        nodes_size != self.loaded_nodes_size || rels_size != self.loaded_rels_size
    }

    /// Load store from state.bin cache if fresh, otherwise rebuild from capnp logs.
    ///
    /// Staleness check uses log file sizes (not mtimes). Since logs are
    /// append-only, any write grows the file, invalidating the cache.
    /// This avoids the mtime race that caused data loss with concurrent
    /// writers (dream loop, link audit, journal enrichment).
    pub fn load() -> Result<Store, String> {
        // 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
        match Self::load_snapshot_mmap() {
            Ok(Some(mut store)) => {
                // rkyv snapshot doesn't include visits — replay from log
                let visits_p = visits_path();
                if visits_p.exists() {
                    store.replay_visits(&visits_p).ok();
                }
                let tp_p = transcript_progress_path();
                if tp_p.exists() {
                    store.replay_transcript_progress(&tp_p).ok();
                }
                return Ok(store);
            },
            Ok(None) => {},
            Err(e) => eprintln!("rkyv snapshot: {}", e),
        }

        // 2. Try bincode state.bin cache (~10ms)
        let nodes_p = nodes_path();
        let rels_p = relations_path();
        let state_p = state_path();

        let nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

        if let Ok(data) = fs::read(&state_p)
            && data.len() >= CACHE_HEADER_LEN && data[..4] == CACHE_MAGIC {
            let cached_nodes = u64::from_le_bytes(data[4..12].try_into().unwrap());
            let cached_rels = u64::from_le_bytes(data[12..20].try_into().unwrap());

            if cached_nodes == nodes_size && cached_rels == rels_size
                && let Ok(mut store) = bincode::deserialize::<Store>(&data[CACHE_HEADER_LEN..]) {
                // Rebuild uuid_to_key (skipped by serde)
                for (key, node) in &store.nodes {
                    store.uuid_to_key.insert(node.uuid, key.clone());
                }
                store.loaded_nodes_size = nodes_size;
                store.loaded_rels_size = rels_size;
                // Bootstrap: write rkyv snapshot if missing
                if !snapshot_path().exists()
                    && let Err(e) = store.save_snapshot(cached_nodes, cached_rels) {
                    eprintln!("rkyv bootstrap: {}", e);
                }
                return Ok(store);
            }
        }

        // Stale or no cache — rebuild from capnp logs
        let mut store = Store::default();

        if nodes_p.exists() {
            store.replay_nodes(&nodes_p)?;
        }
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }
        let visits_p = visits_path();
        if visits_p.exists() {
            store.replay_visits(&visits_p)?;
        }
        let tp_p = transcript_progress_path();
        if tp_p.exists() {
            store.replay_transcript_progress(&tp_p)?;
        }

        // Record log sizes after replay — this is the state we reflect
        store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
        store.loaded_rels_size = fs::metadata(&rels_p).map(|m| m.len()).unwrap_or(0);

        // Drop edges referencing deleted/missing nodes
        store.relations.retain(|r|
            store.nodes.contains_key(&r.source_key) &&
            store.nodes.contains_key(&r.target_key)
        );

        store.save()?;
        Ok(store)
    }

    /// Load store directly from capnp logs, bypassing all caches.
    /// Used by fsck to verify cache consistency.
    pub fn load_from_logs() -> Result<Store, String> {
        let nodes_p = nodes_path();
        let rels_p = relations_path();

        let mut store = Store::default();
        if nodes_p.exists() {
            store.replay_nodes(&nodes_p)?;
        }
        if rels_p.exists() {
            store.replay_relations(&rels_p)?;
        }
        let visits_p = visits_path();
        if visits_p.exists() {
            store.replay_visits(&visits_p)?;
        }
        let tp_p = transcript_progress_path();
        if tp_p.exists() {
            store.replay_transcript_progress(&tp_p)?;
        }
        Ok(store)
    }

    /// Replay node log, keeping latest version per UUID.
    /// Tracks all UUIDs seen per key to detect duplicates.
    fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        // Track all non-deleted UUIDs per key to detect duplicates
        let mut key_uuids: HashMap<String, Vec<[u8; 16]>> = HashMap::new();

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .map_err(|e| format!("read node log: {}", e))?;
            for node_reader in log.get_nodes()
                .map_err(|e| format!("get nodes: {}", e))? {
                let node = Node::from_capnp_migrate(node_reader)?;
                let existing_version = self.nodes.get(&node.key)
                    .map(|n| n.version)
                    .unwrap_or(0);
                if node.version >= existing_version {
                    if node.deleted {
                        self.nodes.remove(&node.key);
                        self.uuid_to_key.remove(&node.uuid);
                        if let Some(uuids) = key_uuids.get_mut(&node.key) {
                            uuids.retain(|u| *u != node.uuid);
                        }
                    } else {
                        self.uuid_to_key.insert(node.uuid, node.key.clone());
                        self.nodes.insert(node.key.clone(), node.clone());
                        let uuids = key_uuids.entry(node.key).or_default();
                        if !uuids.contains(&node.uuid) {
                            uuids.push(node.uuid);
                        }
                    }
                }
            }
        }

        // Report duplicate keys
        for (key, uuids) in &key_uuids {
            if uuids.len() > 1 {
                dbglog!("WARNING: key '{}' has {} UUIDs (duplicate nodes)", key, uuids.len());
            }
        }

        Ok(())
    }

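    // Replay semantics, with made-up values: if the node log holds entries for
    // key "foo" at version 3, then version 5, then version 5 marked deleted,
    // replay in file order keeps v5 and finally drops "foo". Ties go to the
    // later entry (node.version >= existing_version), so re-appending a node at
    // its current version is enough to update or delete it.
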
    /// Replay relation log, keeping latest version per UUID.
    fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        // Collect all, then deduplicate by UUID keeping latest version
        let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::relation_log::Reader>()
                .map_err(|e| format!("read relation log: {}", e))?;
            for rel_reader in log.get_relations()
                .map_err(|e| format!("get relations: {}", e))? {
                let rel = Relation::from_capnp_migrate(rel_reader)?;
                let existing_version = by_uuid.get(&rel.uuid)
                    .map(|r| r.version)
                    .unwrap_or(0);
                if rel.version >= existing_version {
                    by_uuid.insert(rel.uuid, rel);
                }
            }
        }

        self.relations = by_uuid.into_values()
            .filter(|r| !r.deleted)
            .collect();
        Ok(())
    }

    /// Find all duplicate keys: keys with multiple live UUIDs in the log.
    /// Returns a map from key → vec of all live Node versions (one per UUID).
    /// The "winner" in self.nodes is always one of them.
    pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>, String> {
        let path = nodes_path();
        if !path.exists() { return Ok(HashMap::new()); }

        let file = fs::File::open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        // Track latest version of each UUID
        let mut by_uuid: HashMap<[u8; 16], Node> = HashMap::new();

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .map_err(|e| format!("read node log: {}", e))?;
            for node_reader in log.get_nodes()
                .map_err(|e| format!("get nodes: {}", e))? {
                let node = Node::from_capnp_migrate(node_reader)?;
                let dominated = by_uuid.get(&node.uuid)
                    .map(|n| node.version >= n.version)
                    .unwrap_or(true);
                if dominated {
                    by_uuid.insert(node.uuid, node);
                }
            }
        }

        // Group live (non-deleted) nodes by key
        let mut by_key: HashMap<String, Vec<Node>> = HashMap::new();
        for node in by_uuid.into_values() {
            if !node.deleted {
                by_key.entry(node.key.clone()).or_default().push(node);
            }
        }

        // Keep only duplicates
        by_key.retain(|_, nodes| nodes.len() > 1);
        Ok(by_key)
    }

    /// Append nodes to the log file.
    /// Serializes to a Vec first, then does a single write() syscall
    /// so the append is atomic with O_APPEND even without flock.
    pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;
        self.append_nodes_unlocked(nodes)
    }

    /// Append nodes without acquiring the lock. Caller must hold StoreLock.
    pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<(), String> {
        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::node_log::Builder>();
            let mut list = log.init_nodes(nodes.len() as u32);
            for (i, node) in nodes.iter().enumerate() {
                node.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg)
            .map_err(|e| format!("serialize nodes: {}", e))?;

        let path = nodes_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        use std::io::Write;
        (&file).write_all(&buf)
            .map_err(|e| format!("write nodes: {}", e))?;

        self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
        Ok(())
    }

    /// Replay only new entries appended to the node log since we last loaded.
    /// Call under StoreLock to catch writes from concurrent processes.
    pub(crate) fn refresh_nodes(&mut self) -> Result<(), String> {
        let path = nodes_path();
        let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
        if current_size <= self.loaded_nodes_size {
            return Ok(()); // no new data
        }

        let file = fs::File::open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);
        reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size))
            .map_err(|e| format!("seek nodes log: {}", e))?;

        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
            let log = msg.get_root::<memory_capnp::node_log::Reader>()
                .map_err(|e| format!("read node log delta: {}", e))?;
            for node_reader in log.get_nodes()
                .map_err(|e| format!("get nodes delta: {}", e))? {
                let node = Node::from_capnp_migrate(node_reader)?;
                let dominated = self.nodes.get(&node.key)
                    .map(|n| node.version >= n.version)
                    .unwrap_or(true);
                if dominated {
                    if node.deleted {
                        self.nodes.remove(&node.key);
                        self.uuid_to_key.remove(&node.uuid);
                    } else {
                        self.uuid_to_key.insert(node.uuid, node.key.clone());
                        self.nodes.insert(node.key.clone(), node);
                    }
                }
            }
        }

        self.loaded_nodes_size = current_size;
        Ok(())
    }

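    // Typical write path (a sketch; the real call sites and variable names
    // live elsewhere in the crate):
    //
    //     let _lock = StoreLock::acquire()?;
    //     store.refresh_nodes()?;                   // merge appends from other processes
    //     store.append_nodes_unlocked(&new_nodes)?; // then append ours
    //
    // Refreshing before appending keeps the in-memory view consistent with
    // whatever concurrent writers added while the lock was not held.
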
    /// Append relations to the log file.
    /// Single write() syscall for atomic O_APPEND.
    pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;
        self.append_relations_unlocked(relations)
    }

    /// Append relations without acquiring the lock. Caller must hold StoreLock.
    pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<(), String> {
        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::relation_log::Builder>();
            let mut list = log.init_relations(relations.len() as u32);
            for (i, rel) in relations.iter().enumerate() {
                rel.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg)
            .map_err(|e| format!("serialize relations: {}", e))?;

        let path = relations_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        use std::io::Write;
        (&file).write_all(&buf)
            .map_err(|e| format!("write relations: {}", e))?;

        self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
        Ok(())
    }

    /// Append agent visit records to the visits log.
    pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<(), String> {
        if visits.is_empty() { return Ok(()); }

        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::agent_visit_log::Builder>();
            let mut list = log.init_visits(visits.len() as u32);
            for (i, visit) in visits.iter().enumerate() {
                visit.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg)
            .map_err(|e| format!("serialize visits: {}", e))?;

        let path = visits_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        use std::io::Write;
        (&file).write_all(&buf)
            .map_err(|e| format!("write visits: {}", e))?;

        // Update in-memory index
        for v in visits {
            self.visits
                .entry(v.node_key.clone())
                .or_default()
                .insert(v.agent.clone(), v.timestamp);
        }

        Ok(())
    }

    /// Replay visits log to rebuild in-memory index.
    fn replay_visits(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        while reader.stream_position().map_err(|e| e.to_string())?
            < fs::metadata(path).map_err(|e| e.to_string())?.len()
        {
            let msg = match serialize::read_message(&mut reader, Default::default()) {
                Ok(m) => m,
                Err(_) => break,
            };
            let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>()
                .map_err(|e| format!("read visit log: {}", e))?;

            for visit in log.get_visits().map_err(|e| e.to_string())? {
                let key = visit.get_node_key().ok()
                    .and_then(|t| t.to_str().ok())
                    .unwrap_or("")
                    .to_string();
                let agent = visit.get_agent().ok()
                    .and_then(|t| t.to_str().ok())
                    .unwrap_or("")
                    .to_string();
                let ts = visit.get_timestamp();

                if !key.is_empty() && !agent.is_empty() {
                    let entry = self.visits.entry(key).or_default();
                    // Keep latest timestamp per agent
                    let existing = entry.entry(agent).or_insert(0);
                    if ts > *existing {
                        *existing = ts;
                    }
                }
            }
        }
        Ok(())
    }

    /// Append transcript segment progress records.
    pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<(), String> {
        if segments.is_empty() { return Ok(()); }

        let mut msg = message::Builder::new_default();
        {
            let log = msg.init_root::<memory_capnp::transcript_progress_log::Builder>();
            let mut list = log.init_segments(segments.len() as u32);
            for (i, seg) in segments.iter().enumerate() {
                seg.to_capnp(list.reborrow().get(i as u32));
            }
        }
        let mut buf = Vec::new();
        serialize::write_message(&mut buf, &msg)
            .map_err(|e| format!("serialize transcript progress: {}", e))?;

        let path = transcript_progress_path();
        let file = fs::OpenOptions::new()
            .create(true).append(true).open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        use std::io::Write;
        (&file).write_all(&buf)
            .map_err(|e| format!("write transcript progress: {}", e))?;

        // Update in-memory index
        for seg in segments {
            self.transcript_progress
                .entry((seg.transcript_id.clone(), seg.segment_index))
                .or_default()
                .insert(seg.agent.clone());
        }

        Ok(())
    }

    /// Replay transcript progress log to rebuild in-memory index.
    fn replay_transcript_progress(&mut self, path: &Path) -> Result<(), String> {
        let file = fs::File::open(path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let mut reader = BufReader::new(file);

        while reader.stream_position().map_err(|e| e.to_string())?
            < fs::metadata(path).map_err(|e| e.to_string())?.len()
        {
            let msg = match serialize::read_message(&mut reader, Default::default()) {
                Ok(m) => m,
                Err(_) => break,
            };
            let log = msg.get_root::<memory_capnp::transcript_progress_log::Reader>()
                .map_err(|e| format!("read transcript progress: {}", e))?;

            for seg in log.get_segments().map_err(|e| e.to_string())? {
                let id = seg.get_transcript_id().ok()
                    .and_then(|t| t.to_str().ok())
                    .unwrap_or("")
                    .to_string();
                let agent = seg.get_agent().ok()
                    .and_then(|t| t.to_str().ok())
                    .unwrap_or("")
                    .to_string();
                let idx = seg.get_segment_index();

                if !id.is_empty() && !agent.is_empty() {
                    self.transcript_progress
                        .entry((id, idx))
                        .or_default()
                        .insert(agent);
                }
            }
        }
        Ok(())
    }

    /// Migrate old stub-node transcript markers into the new progress log.
    /// Reads _observed-transcripts-f-*, _mined-transcripts#f-*, and _facts-* keys,
    /// extracts transcript_id and segment_index, writes to transcript-progress.capnp,
    /// then deletes the stub nodes.
    pub fn migrate_transcript_progress(&mut self) -> Result<usize, String> {
        let mut segments = Vec::new();

        for key in self.nodes.keys() {
            // _observed-transcripts-f-{UUID}.{segment}
            if let Some(rest) = key.strip_prefix("_observed-transcripts-f-") {
                if let Some((uuid, seg_str)) = rest.rsplit_once('.')
                    && let Ok(seg) = seg_str.parse::<u32>() {
                    segments.push(new_transcript_segment(uuid, seg, "observation"));
                }
            }
            // _mined-transcripts#f-{UUID}.{segment}
            else if let Some(rest) = key.strip_prefix("_mined-transcripts#f-") {
                if let Some((uuid, seg_str)) = rest.rsplit_once('.')
                    && let Ok(seg) = seg_str.parse::<u32>() {
                    segments.push(new_transcript_segment(uuid, seg, "experience"));
                }
            }
            // _mined-transcripts-f-{UUID}.{segment}
            else if let Some(rest) = key.strip_prefix("_mined-transcripts-f-") {
                if let Some((uuid, seg_str)) = rest.rsplit_once('.')
                    && let Ok(seg) = seg_str.parse::<u32>() {
                    segments.push(new_transcript_segment(uuid, seg, "experience"));
                }
            }
            // _facts-{UUID} (whole-file, segment 0)
            else if let Some(uuid) = key.strip_prefix("_facts-") {
                if !uuid.contains('-') || uuid.len() < 30 { continue; } // skip non-UUID
                segments.push(new_transcript_segment(uuid, 0, "fact"));
            }
        }

        let count = segments.len();
        if count > 0 {
            self.append_transcript_progress(&segments)?;
        }

        // Soft-delete the old stub nodes
        let keys_to_delete: Vec<String> = self.nodes.keys()
            .filter(|k| k.starts_with("_observed-transcripts-")
                || k.starts_with("_mined-transcripts")
                || (k.starts_with("_facts-") && !k.contains("fact_mine")))
            .cloned()
            .collect();

        for key in &keys_to_delete {
            if let Some(node) = self.nodes.get_mut(key) {
                node.deleted = true;
            }
        }

        if !keys_to_delete.is_empty() {
            self.save()?;
        }

        Ok(count)
    }

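    // Key-parsing example (made-up UUID, for illustration only): the key
    //
    //     "_observed-transcripts-f-1234abcd-0000-4000-8000-000000000000.17"
    //
    // strips to "1234abcd-0000-4000-8000-000000000000.17", and rsplit_once('.')
    // splits it into the transcript UUID and segment index 17, which is then
    // recorded with the "observation" marker. Because only the final ".N"
    // suffix is taken as the segment index, dots earlier in a key cannot
    // shift the split.
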
    /// Record visits for a batch of node keys from a successful agent run.
    pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
        let visits: Vec<AgentVisit> = node_keys.iter()
            .filter_map(|key| {
                let node = self.nodes.get(key)?;
                Some(new_visit(node.uuid, key, agent, "processed"))
            })
            .collect();
        self.append_visits(&visits)
    }

    /// Get the last time an agent visited a node. Returns 0 if never visited.
    pub fn last_visited(&self, node_key: &str, agent: &str) -> i64 {
        self.visits.get(node_key)
            .and_then(|agents| agents.get(agent))
            .copied()
            .unwrap_or(0)
    }

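    // Usage sketch (the agent name here is hypothetical; real agent identifiers
    // are chosen by callers such as the dream loop or link audit):
    //
    //     if store.last_visited(&key, "link_audit") < cutoff_ts {
    //         store.record_agent_visits(&[key.clone()], "link_audit")?;
    //     }
    //
    // Returning 0 for never-visited nodes lets callers use a single "< cutoff"
    // comparison instead of a separate Option check.
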
    /// Save the derived cache with log size header for staleness detection.
    /// Uses atomic write (tmp + rename) to prevent partial reads.
    pub fn save(&self) -> Result<(), String> {
        let _lock = StoreLock::acquire()?;

        let path = state_path();
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).ok();
        }

        // Use log sizes from load time, not current filesystem sizes.
        // If another writer appended since we loaded, our recorded size
        // will be smaller than the actual log → next reader detects stale
        // cache and replays the (correct, append-only) log.
        let nodes_size = self.loaded_nodes_size;
        let rels_size = self.loaded_rels_size;

        let bincode_data = bincode::serialize(self)
            .map_err(|e| format!("bincode serialize: {}", e))?;

        let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len());
        data.extend_from_slice(&CACHE_MAGIC);
        data.extend_from_slice(&nodes_size.to_le_bytes());
        data.extend_from_slice(&rels_size.to_le_bytes());
        data.extend_from_slice(&bincode_data);

        // Atomic write: tmp file + rename
        let tmp_path = path.with_extension("bin.tmp");
        fs::write(&tmp_path, &data)
            .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
        fs::rename(&tmp_path, &path)
            .map_err(|e| format!("rename {} → {}: {}", tmp_path.display(), path.display(), e))?;

        // Also write rkyv snapshot (mmap-friendly)
        if let Err(e) = self.save_snapshot(nodes_size, rels_size) {
            eprintln!("rkyv snapshot save: {}", e);
        }

        Ok(())
    }

    /// Serialize store as rkyv snapshot with staleness header.
    /// Assumes StoreLock is already held by caller.
    fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> {
        let snap = Snapshot {
            nodes: self.nodes.clone(),
            relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(),
            gaps: self.gaps.clone(),
            params: self.params,
        };

        let rkyv_data = rkyv::to_bytes::<_, 256>(&snap)
            .map_err(|e| format!("rkyv serialize: {}", e))?;

        let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len());
        data.extend_from_slice(&RKYV_MAGIC);
        data.extend_from_slice(&1u32.to_le_bytes()); // format version
        data.extend_from_slice(&nodes_size.to_le_bytes());
        data.extend_from_slice(&rels_size.to_le_bytes());
        data.extend_from_slice(&(rkyv_data.len() as u64).to_le_bytes());
        data.extend_from_slice(&rkyv_data);

        let path = snapshot_path();
        let tmp_path = path.with_extension("rkyv.tmp");
        fs::write(&tmp_path, &data)
            .map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
        fs::rename(&tmp_path, &path)
            .map_err(|e| format!("rename: {}", e))?;

        Ok(())
    }

    /// Try loading store from mmap'd rkyv snapshot.
    /// Returns None if snapshot is missing or stale (log sizes don't match).
    fn load_snapshot_mmap() -> Result<Option<Store>, String> {
        let path = snapshot_path();
        if !path.exists() { return Ok(None); }

        let nodes_size = fs::metadata(nodes_path()).map(|m| m.len()).unwrap_or(0);
        let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);

        let file = fs::File::open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;

        let mmap = unsafe { memmap2::Mmap::map(&file) }
            .map_err(|e| format!("mmap {}: {}", path.display(), e))?;

        if mmap.len() < RKYV_HEADER_LEN { return Ok(None); }
        if mmap[..4] != RKYV_MAGIC { return Ok(None); }

        // [4..8] = version, skip for now
        let cached_nodes = u64::from_le_bytes(mmap[8..16].try_into().unwrap());
        let cached_rels = u64::from_le_bytes(mmap[16..24].try_into().unwrap());
        let data_len = u64::from_le_bytes(mmap[24..32].try_into().unwrap()) as usize;

        if cached_nodes != nodes_size || cached_rels != rels_size {
            return Ok(None); // stale
        }
        if mmap.len() < RKYV_HEADER_LEN + data_len {
            return Ok(None); // truncated
        }

        let rkyv_data = &mmap[RKYV_HEADER_LEN..RKYV_HEADER_LEN + data_len];

        // SAFETY: we wrote this file ourselves via save_snapshot().
        // Skip full validation (check_archived_root) — the staleness header
        // already confirms this snapshot matches the current log state.
        let archived = unsafe { rkyv::archived_root::<Snapshot>(rkyv_data) };

        let snap: Snapshot = <ArchivedSnapshot as rkyv::Deserialize<Snapshot, rkyv::Infallible>>
            ::deserialize(archived, &mut rkyv::Infallible).unwrap();

        let mut store = Store {
            nodes: snap.nodes,
            relations: snap.relations,
            gaps: snap.gaps,
            params: snap.params,
            ..Default::default()
        };

        // Rebuild uuid_to_key (not serialized)
        for (key, node) in &store.nodes {
            store.uuid_to_key.insert(node.uuid, key.clone());
        }
        store.loaded_nodes_size = nodes_size;
        store.loaded_rels_size = rels_size;

        Ok(Some(store))
    }
}

/// Check and repair corrupt capnp log files.
///
/// Reads each message sequentially, tracking file position. On the first
/// corrupt message, truncates the file to the last good position. Also
/// removes stale caches so the next load replays from the repaired log.
pub fn fsck() -> Result<(), String> {
    let mut any_corrupt = false;

    for (path, kind) in [
        (nodes_path(), "node"),
        (relations_path(), "relation"),
    ] {
        if !path.exists() { continue; }

        let file = fs::File::open(&path)
            .map_err(|e| format!("open {}: {}", path.display(), e))?;
        let file_len = file.metadata()
            .map_err(|e| format!("stat {}: {}", path.display(), e))?.len();
        let mut reader = BufReader::new(file);

        let mut good_messages = 0u64;
        let mut last_good_pos = 0u64;

        loop {
            let pos = reader.stream_position()
                .map_err(|e| format!("tell {}: {}", path.display(), e))?;

            let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
                Ok(m) => m,
                Err(_) => {
                    // read_message fails at EOF (normal) or on corrupt framing
                    if pos < file_len {
                        // Not at EOF — corrupt framing
                        eprintln!("{}: corrupt message at offset {}, truncating", kind, pos);
                        any_corrupt = true;
                        drop(reader);
                        let file = fs::OpenOptions::new().write(true).open(&path)
                            .map_err(|e| format!("open for truncate: {}", e))?;
                        file.set_len(pos)
                            .map_err(|e| format!("truncate {}: {}", path.display(), e))?;
                        eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
                            kind, file_len, pos, good_messages);
                    }
                    break;
                }
            };

            // Validate the message content too
            let valid = if kind == "node" {
                msg.get_root::<memory_capnp::node_log::Reader>()
                    .and_then(|l| l.get_nodes().map(|_| ()))
                    .is_ok()
            } else {
                msg.get_root::<memory_capnp::relation_log::Reader>()
                    .and_then(|l| l.get_relations().map(|_| ()))
                    .is_ok()
            };

            if valid {
                good_messages += 1;
                last_good_pos = reader.stream_position()
                    .map_err(|e| format!("tell {}: {}", path.display(), e))?;
            } else {
                eprintln!("{}: corrupt message content at offset {}, truncating to {}",
                    kind, pos, last_good_pos);
                any_corrupt = true;
                drop(reader);
                let file = fs::OpenOptions::new().write(true).open(&path)
                    .map_err(|e| format!("open for truncate: {}", e))?;
                file.set_len(last_good_pos)
                    .map_err(|e| format!("truncate {}: {}", path.display(), e))?;
                eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
                    kind, file_len, last_good_pos, good_messages);
                break;
            }
        }

        if !any_corrupt {
            eprintln!("{}: {} messages, all clean", kind, good_messages);
        }
    }

    if any_corrupt {
        // Nuke caches so next load replays from the repaired logs
        for p in [state_path(), snapshot_path()] {
            if p.exists() {
                fs::remove_file(&p)
                    .map_err(|e| format!("remove {}: {}", p.display(), e))?;
                eprintln!("removed stale cache: {}", p.display());
            }
        }
        eprintln!("repair complete — run `poc-memory status` to verify");
    } else {
        eprintln!("store is clean");
    }

    Ok(())
}
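
// Recovery flow after a crash mid-append, summarizing the behaviour above:
// fsck() truncates the node and relation logs back to the last message that
// both framed and parsed correctly, then deletes state.bin and snapshot.rkyv.
// The next Store::load() finds no usable cache, replays the repaired logs,
// and rewrites both caches via save(). The CLI entry point is defined
// elsewhere; the repair message above points users at `poc-memory status`.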