Convert store and CLI to anyhow::Result for cleaner error handling

Replace Result<_, String> with anyhow::Result throughout:
- hippocampus/store module (persist, ops, types, view, mod)
- CLI modules (admin, agent, graph, journal, node)
- Run trait in main.rs

Use .context() and .with_context() instead of .map_err(|e| format!(...))
patterns, and bail!() for early error returns.

Add access_local() helper in hippocampus/mod.rs that returns
Result<Arc<Mutex<Store>>> for direct local store access.

Fix store access patterns to properly lock Arc<Mutex<Store>> before
accessing fields in mind/unconscious.rs, mind/mod.rs, subconscious/learn.rs,
and hippocampus/memory.rs.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
Kent Overstreet 2026-04-13 18:05:04 -04:00
parent 5db00e083f
commit b8db8754be
17 changed files with 282 additions and 295 deletions

View file

@ -19,7 +19,8 @@ pub struct MemoryNode {
impl MemoryNode {
/// Load a node from the store by key.
pub fn load(key: &str) -> Option<Self> {
let store = Store::load().ok()?;
let arc = super::access_local().ok()?;
let store = arc.try_lock().ok()?;
Self::from_store(&store, key)
}

View file

@ -22,7 +22,7 @@ pub mod transcript;
use std::cell::RefCell;
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use anyhow::{Context, Result};
use anyhow::Result;
use crate::hippocampus::store::Store;
pub use local::{LinkInfo, JournalEntry};
@ -79,6 +79,15 @@ pub fn access() -> StoreAccess {
}
}
/// Get local store access. Returns error if only RPC available.
pub fn access_local() -> Result<Arc<crate::Mutex<Store>>> {
match access() {
StoreAccess::Daemon(arc) => Ok(arc),
StoreAccess::Client => anyhow::bail!("direct store access not available via RPC"),
StoreAccess::None(err) => anyhow::bail!("{}", err),
}
}
pub fn socket_path() -> PathBuf {
dirs::home_dir()
.unwrap_or_default()
@ -150,24 +159,6 @@ pub fn memory_rpc(tool_name: &str, args: serde_json::Value) -> Result<String> {
})
}
// ── Helpers ────────────────────────────────────────────────────
fn get_str<'a>(args: &'a serde_json::Value, name: &'a str) -> Result<&'a str> {
args.get(name).and_then(|v| v.as_str()).context(format!("{} is required", name))
}
fn get_f64(args: &serde_json::Value, name: &str) -> Result<f64> {
args.get(name).and_then(|v| v.as_f64()).context(format!("{} is required", name))
}
/// Get provenance from agent state, or "manual".
async fn get_provenance(agent: &Option<std::sync::Arc<crate::agent::Agent>>) -> String {
match agent {
Some(a) => a.state.lock().await.provenance.clone(),
None => "manual".to_string(),
}
}
// ── Macro for generating tool wrappers ─────────────────────────
//
// memory_tool!(name, mut, arg1: [str], arg2: [Option<bool>])

View file

@ -40,6 +40,7 @@ pub use ops::current_provenance;
use crate::graph::{self, Graph};
use anyhow::{bail, Context, Result};
use std::fs;
use std::io::Write as IoWrite;
use std::path::Path;
@ -62,7 +63,7 @@ impl Store {
graph::build_graph(self)
}
pub fn resolve_key(&self, target: &str) -> Result<String, String> {
pub fn resolve_key(&self, target: &str) -> Result<String> {
// Strip .md suffix if present — keys no longer use it
let bare = strip_md_suffix(target);
@ -75,13 +76,13 @@ impl Store {
.cloned().collect();
match matches.len() {
0 => Err(format!("No entry for '{}'. Run 'init'?", target)),
0 => bail!("No entry for '{}'. Run 'init'?", target),
1 => Ok(matches[0].clone()),
n if n <= 10 => {
let list = matches.join("\n ");
Err(format!("Ambiguous '{}'. Matches:\n {}", target, list))
bail!("Ambiguous '{}'. Matches:\n {}", target, list)
}
n => Err(format!("Too many matches for '{}' ({}). Be more specific.", target, n)),
n => bail!("Too many matches for '{}' ({}). Be more specific.", target, n),
}
}
@ -103,7 +104,7 @@ impl Store {
}
/// Scan markdown files and index all memory units
pub fn init_from_markdown(&mut self) -> Result<usize, String> {
pub fn init_from_markdown(&mut self) -> Result<usize> {
let dir = memory_dir();
let mut count = 0;
if dir.exists() {
@ -128,10 +129,10 @@ impl Store {
&mut self,
dir: &Path,
edge_set: &mut std::collections::HashSet<([u8; 16], [u8; 16])>,
) -> Result<usize, String> {
) -> Result<usize> {
let mut count = 0;
let entries = fs::read_dir(dir)
.map_err(|e| format!("read dir {}: {}", dir.display(), e))?;
.with_context(|| format!("read dir {}", dir.display()))?;
for entry in entries.flatten() {
let path = entry.path();
@ -144,7 +145,7 @@ impl Store {
let filename = path.file_name().unwrap().to_string_lossy().to_string();
let content = fs::read_to_string(&path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
.with_context(|| format!("read {}", path.display()))?;
let units = parse_units(&filename, &content);
let (new_count, _) = self.ingest_units(&units, &filename)?;
@ -192,7 +193,7 @@ impl Store {
/// Process parsed memory units: diff against existing nodes, persist changes.
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
fn ingest_units(&mut self, units: &[MemoryUnit], filename: &str) -> Result<(usize, usize), String> {
fn ingest_units(&mut self, units: &[MemoryUnit], filename: &str) -> Result<(usize, usize)> {
let _lock = types::StoreLock::acquire()?;
self.refresh_nodes()?;
@ -239,10 +240,10 @@ impl Store {
}
/// Import a markdown file into the store, parsing it into nodes.
pub fn import_file(&mut self, path: &Path) -> Result<(usize, usize), String> {
pub fn import_file(&mut self, path: &Path) -> Result<(usize, usize)> {
let filename = path.file_name().unwrap().to_string_lossy().to_string();
let content = fs::read_to_string(path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
.with_context(|| format!("read {}", path.display()))?;
let units = parse_units(&filename, &content);
self.ingest_units(&units, &filename)
}

View file

@ -4,6 +4,7 @@
use super::types::*;
use anyhow::{anyhow, bail, Result};
use std::collections::{HashMap, HashSet};
/// Fallback provenance for non-tool-dispatch paths (CLI, digest, etc.).
@ -16,7 +17,7 @@ pub fn current_provenance() -> String {
impl Store {
/// Add or update a node (appends to log + updates cache).
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> {
pub fn upsert_node(&mut self, mut node: Node) -> Result<()> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
@ -31,7 +32,7 @@ impl Store {
}
/// Add a relation (appends to log + updates cache)
pub fn add_relation(&mut self, rel: Relation) -> Result<(), String> {
pub fn add_relation(&mut self, rel: Relation) -> Result<()> {
self.append_relations(std::slice::from_ref(&rel))?;
self.relations.push(rel);
Ok(())
@ -53,14 +54,14 @@ impl Store {
///
/// Provenance is determined by the POC_PROVENANCE env var if set,
/// otherwise defaults to Manual.
pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str, String> {
pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str> {
let prov = current_provenance();
self.upsert_provenance(key, content, &prov)
}
/// Upsert with explicit provenance (for agent-created nodes).
/// Holds StoreLock across refresh + check + write to prevent duplicate UUIDs.
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str, String> {
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
@ -88,14 +89,14 @@ impl Store {
/// Soft-delete a node (appends deleted version, removes from cache).
/// Holds StoreLock across refresh + write to see concurrent creates.
pub fn delete_node(&mut self, key: &str) -> Result<(), String> {
pub fn delete_node(&mut self, key: &str) -> Result<()> {
let _lock = StoreLock::acquire()?;
self.refresh_nodes()?;
let prov = current_provenance();
let node = self.nodes.get(key)
.ok_or_else(|| format!("No node '{}'", key))?;
.ok_or_else(|| anyhow!("No node '{}'", key))?;
let mut deleted = node.clone();
deleted.deleted = true;
deleted.version += 1;
@ -114,7 +115,7 @@ impl Store {
///
/// Appends: (new_key, v+1) + (old_key, deleted, v+1) + updated relations.
/// Holds StoreLock across refresh + write to prevent races.
pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<(), String> {
pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<()> {
if old_key == new_key {
return Ok(());
}
@ -123,10 +124,10 @@ impl Store {
self.refresh_nodes()?;
if self.nodes.contains_key(new_key) {
return Err(format!("Key '{}' already exists", new_key));
bail!("Key '{}' already exists", new_key);
}
let node = self.nodes.get(old_key)
.ok_or_else(|| format!("No node '{}'", old_key))?
.ok_or_else(|| anyhow!("No node '{}'", old_key))?
.clone();
let prov = current_provenance();
@ -179,7 +180,7 @@ impl Store {
}
/// Cap node degree by soft-deleting edges from mega-hubs.
pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize), String> {
pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize)> {
let mut node_degree: HashMap<String, usize> = HashMap::new();
for rel in &self.relations {
if rel.deleted { continue; }
@ -258,10 +259,10 @@ impl Store {
}
/// Set a node's weight directly. Returns (old, new).
pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32), String> {
pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32)> {
let weight = weight.clamp(0.01, 1.0);
let node = self.nodes.get_mut(key)
.ok_or_else(|| format!("node not found: {}", key))?;
.ok_or_else(|| anyhow!("node not found: {}", key))?;
let old = node.weight;
node.weight = weight;
Ok((old, weight))
@ -272,9 +273,9 @@ impl Store {
/// scores (alpha=0.5) but decays slowly on low scores (alpha=0.1).
/// This keeps memories surfaced even if they're only useful 1 in 4 times.
/// Returns (old_weight, new_weight).
pub fn score_weight(&mut self, key: &str, score: f64) -> Result<(f32, f32), String> {
pub fn score_weight(&mut self, key: &str, score: f64) -> Result<(f32, f32)> {
let node = self.nodes.get_mut(key)
.ok_or_else(|| format!("node not found: {}", key))?;
.ok_or_else(|| anyhow!("node not found: {}", key))?;
let old = node.weight;
let alpha = if score > old as f64 { 0.5 } else { 0.1 };
let new = (alpha * score + (1.0 - alpha) * old as f64) as f32;
@ -285,7 +286,7 @@ impl Store {
/// Set the strength of a link between two nodes. Deduplicates if
/// multiple links exist. Returns the old strength, or error if no link.
pub fn set_link_strength(&mut self, source: &str, target: &str, strength: f32) -> Result<f32, String> {
pub fn set_link_strength(&mut self, source: &str, target: &str, strength: f32) -> Result<f32> {
let strength = strength.clamp(0.01, 1.0);
let mut old = 0.0f32;
let mut found = false;
@ -322,22 +323,22 @@ impl Store {
/// Add a link between two nodes with Jaccard-based initial strength.
/// Returns the strength, or a message if the link already exists.
pub fn add_link(&mut self, source: &str, target: &str, provenance: &str) -> Result<f32, String> {
pub fn add_link(&mut self, source: &str, target: &str, provenance: &str) -> Result<f32> {
// Check for existing
let exists = self.relations.iter().any(|r|
!r.deleted &&
((r.source_key == source && r.target_key == target) ||
(r.source_key == target && r.target_key == source)));
if exists {
return Err(format!("link already exists: {}{}", source, target));
bail!("link already exists: {} ↔ {}", source, target);
}
let source_uuid = self.nodes.get(source)
.map(|n| n.uuid)
.ok_or_else(|| format!("source not found: {}", source))?;
.ok_or_else(|| anyhow!("source not found: {}", source))?;
let target_uuid = self.nodes.get(target)
.map(|n| n.uuid)
.ok_or_else(|| format!("target not found: {}", target))?;
.ok_or_else(|| anyhow!("target not found: {}", target))?;
let graph = self.build_graph();
let jaccard = graph.jaccard(source, target);

View file

@ -11,6 +11,7 @@ use super::types::*;
use crate::memory_capnp;
use anyhow::{Context, Result};
use capnp::message;
use capnp::serialize;
@ -27,10 +28,10 @@ static CACHED_STORE: tokio::sync::OnceCell<Arc<crate::Mutex<Store>>> =
impl Store {
/// Get or create the process-global cached store.
/// Reloads from disk if log files have changed since last load.
pub async fn cached() -> Result<Arc<crate::Mutex<Store>>, String> {
pub async fn cached() -> Result<Arc<crate::Mutex<Store>>> {
let store = CACHED_STORE.get_or_try_init(|| async {
let s = Store::load()?;
Ok::<_, String>(Arc::new(crate::Mutex::new(s)))
Ok::<_, anyhow::Error>(Arc::new(crate::Mutex::new(s)))
}).await?;
{
let mut guard = store.lock().await;
@ -54,7 +55,7 @@ impl Store {
/// append-only, any write grows the file, invalidating the cache.
/// This avoids the mtime race that caused data loss with concurrent
/// writers (dream loop, link audit, journal enrichment).
pub fn load() -> Result<Store, String> {
pub fn load() -> Result<Store> {
// 1. Try rkyv mmap snapshot (~4ms with deserialize, <1ms zero-copy)
match Self::load_snapshot_mmap() {
Ok(Some(mut store)) => {
@ -137,7 +138,7 @@ impl Store {
/// Load store directly from capnp logs, bypassing all caches.
/// Used by fsck to verify cache consistency.
pub fn load_from_logs() -> Result<Store, String> {
pub fn load_from_logs() -> Result<Store> {
let nodes_p = nodes_path();
let rels_p = relations_path();
@ -161,9 +162,9 @@ impl Store {
/// Replay node log, keeping latest version per UUID.
/// Tracks all UUIDs seen per key to detect duplicates.
fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
fn replay_nodes(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
// Track all non-deleted UUIDs per key to detect duplicates
@ -171,9 +172,9 @@ impl Store {
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
.with_context(|| format!("read node log"))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
.with_context(|| format!("get nodes"))? {
let node = Node::from_capnp_migrate(node_reader)?;
let existing_version = self.nodes.get(&node.key)
.map(|n| n.version)
@ -208,9 +209,9 @@ impl Store {
}
/// Replay relation log, keeping latest version per UUID
fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
fn replay_relations(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
// Collect all, then deduplicate by UUID keeping latest version
@ -218,9 +219,9 @@ impl Store {
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
.map_err(|e| format!("read relation log: {}", e))?;
.with_context(|| format!("read relation log"))?;
for rel_reader in log.get_relations()
.map_err(|e| format!("get relations: {}", e))? {
.with_context(|| format!("get relations"))? {
let rel = Relation::from_capnp_migrate(rel_reader)?;
let existing_version = by_uuid.get(&rel.uuid)
.map(|r| r.version)
@ -240,12 +241,12 @@ impl Store {
/// Find all duplicate keys: keys with multiple live UUIDs in the log.
/// Returns a map from key → vec of all live Node versions (one per UUID).
/// The "winner" in self.nodes is always one of them.
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>, String> {
pub fn find_duplicates(&self) -> Result<HashMap<String, Vec<Node>>> {
let path = nodes_path();
if !path.exists() { return Ok(HashMap::new()); }
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
// Track latest version of each UUID
@ -253,9 +254,9 @@ impl Store {
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
.with_context(|| format!("read node log"))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
.with_context(|| format!("get nodes"))? {
let node = Node::from_capnp_migrate(node_reader)?;
let dominated = by_uuid.get(&node.uuid)
.map(|n| node.version >= n.version)
@ -282,13 +283,13 @@ impl Store {
/// Append nodes to the log file.
/// Serializes to a Vec first, then does a single write() syscall
/// so the append is atomic with O_APPEND even without flock.
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<(), String> {
pub fn append_nodes(&mut self, nodes: &[Node]) -> Result<()> {
let _lock = StoreLock::acquire()?;
self.append_nodes_unlocked(nodes)
}
/// Append nodes without acquiring the lock. Caller must hold StoreLock.
pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<(), String> {
pub(crate) fn append_nodes_unlocked(&mut self, nodes: &[Node]) -> Result<()> {
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
@ -299,15 +300,15 @@ impl Store {
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize nodes: {}", e))?;
.with_context(|| format!("serialize nodes"))?;
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write nodes: {}", e))?;
.with_context(|| format!("write nodes"))?;
self.loaded_nodes_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
@ -315,7 +316,7 @@ impl Store {
/// Replay only new entries appended to the node log since we last loaded.
/// Call under StoreLock to catch writes from concurrent processes.
pub(crate) fn refresh_nodes(&mut self) -> Result<(), String> {
pub(crate) fn refresh_nodes(&mut self) -> Result<()> {
let path = nodes_path();
let current_size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
if current_size <= self.loaded_nodes_size {
@ -323,16 +324,16 @@ impl Store {
}
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
reader.seek(std::io::SeekFrom::Start(self.loaded_nodes_size))
.map_err(|e| format!("seek nodes log: {}", e))?;
.with_context(|| format!("seek nodes log"))?;
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log delta: {}", e))?;
.with_context(|| format!("read node log delta"))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes delta: {}", e))? {
.with_context(|| format!("get nodes delta"))? {
let node = Node::from_capnp_migrate(node_reader)?;
let dominated = self.nodes.get(&node.key)
.map(|n| node.version >= n.version)
@ -355,13 +356,13 @@ impl Store {
/// Append relations to the log file.
/// Single write() syscall for atomic O_APPEND.
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<(), String> {
pub fn append_relations(&mut self, relations: &[Relation]) -> Result<()> {
let _lock = StoreLock::acquire()?;
self.append_relations_unlocked(relations)
}
/// Append relations without acquiring the lock. Caller must hold StoreLock.
pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<(), String> {
pub(crate) fn append_relations_unlocked(&mut self, relations: &[Relation]) -> Result<()> {
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
@ -372,22 +373,22 @@ impl Store {
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize relations: {}", e))?;
.with_context(|| format!("serialize relations"))?;
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write relations: {}", e))?;
.with_context(|| format!("write relations"))?;
self.loaded_rels_size = file.metadata().map(|m| m.len()).unwrap_or(0);
Ok(())
}
/// Append agent visit records to the visits log.
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<(), String> {
pub fn append_visits(&mut self, visits: &[AgentVisit]) -> Result<()> {
if visits.is_empty() { return Ok(()); }
let mut msg = message::Builder::new_default();
@ -400,15 +401,15 @@ impl Store {
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize visits: {}", e))?;
.with_context(|| format!("serialize visits"))?;
let path = visits_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write visits: {}", e))?;
.with_context(|| format!("write visits"))?;
// Update in-memory index
for v in visits {
@ -422,22 +423,22 @@ impl Store {
}
/// Replay visits log to rebuild in-memory index.
fn replay_visits(&mut self, path: &Path) -> Result<(), String> {
fn replay_visits(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
while reader.stream_position().map_err(|e| e.to_string())?
< fs::metadata(path).map_err(|e| e.to_string())?.len()
while reader.stream_position()?
< fs::metadata(path)?.len()
{
let msg = match serialize::read_message(&mut reader, Default::default()) {
Ok(m) => m,
Err(_) => break,
};
let log = msg.get_root::<memory_capnp::agent_visit_log::Reader>()
.map_err(|e| format!("read visit log: {}", e))?;
.with_context(|| format!("read visit log"))?;
for visit in log.get_visits().map_err(|e| e.to_string())? {
for visit in log.get_visits()? {
let key = visit.get_node_key().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
@ -462,7 +463,7 @@ impl Store {
}
/// Append transcript segment progress records.
pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<(), String> {
pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<()> {
if segments.is_empty() { return Ok(()); }
let mut msg = message::Builder::new_default();
@ -475,15 +476,15 @@ impl Store {
}
let mut buf = Vec::new();
serialize::write_message(&mut buf, &msg)
.map_err(|e| format!("serialize transcript progress: {}", e))?;
.with_context(|| format!("serialize transcript progress"))?;
let path = transcript_progress_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
use std::io::Write;
(&file).write_all(&buf)
.map_err(|e| format!("write transcript progress: {}", e))?;
.with_context(|| format!("write transcript progress"))?;
// Update in-memory index
for seg in segments {
@ -497,22 +498,22 @@ impl Store {
}
/// Replay transcript progress log to rebuild in-memory index.
fn replay_transcript_progress(&mut self, path: &Path) -> Result<(), String> {
fn replay_transcript_progress(&mut self, path: &Path) -> Result<()> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(file);
while reader.stream_position().map_err(|e| e.to_string())?
< fs::metadata(path).map_err(|e| e.to_string())?.len()
while reader.stream_position()?
< fs::metadata(path)?.len()
{
let msg = match serialize::read_message(&mut reader, Default::default()) {
Ok(m) => m,
Err(_) => break,
};
let log = msg.get_root::<memory_capnp::transcript_progress_log::Reader>()
.map_err(|e| format!("read transcript progress: {}", e))?;
.with_context(|| format!("read transcript progress"))?;
for seg in log.get_segments().map_err(|e| e.to_string())? {
for seg in log.get_segments()? {
let id = seg.get_transcript_id().ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
@ -538,7 +539,7 @@ impl Store {
/// Reads _observed-transcripts-f-*, _mined-transcripts#f-*, and _facts-* keys,
/// extracts transcript_id and segment_index, writes to transcript-progress.capnp,
/// then deletes the stub nodes.
pub fn migrate_transcript_progress(&mut self) -> Result<usize, String> {
pub fn migrate_transcript_progress(&mut self) -> Result<usize> {
let mut segments = Vec::new();
for key in self.nodes.keys() {
@ -597,7 +598,7 @@ impl Store {
}
/// Record visits for a batch of node keys from a successful agent run.
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<()> {
let visits: Vec<AgentVisit> = node_keys.iter()
.filter_map(|key| {
let node = self.nodes.get(key)?;
@ -617,7 +618,7 @@ impl Store {
/// Save the derived cache with log size header for staleness detection.
/// Uses atomic write (tmp + rename) to prevent partial reads.
pub fn save(&self) -> Result<(), String> {
pub fn save(&self) -> Result<()> {
let _lock = StoreLock::acquire()?;
let path = state_path();
@ -633,7 +634,7 @@ impl Store {
let rels_size = self.loaded_rels_size;
let bincode_data = bincode::serialize(self)
.map_err(|e| format!("bincode serialize: {}", e))?;
.with_context(|| format!("bincode serialize"))?;
let mut data = Vec::with_capacity(CACHE_HEADER_LEN + bincode_data.len());
data.extend_from_slice(&CACHE_MAGIC);
@ -644,9 +645,9 @@ impl Store {
// Atomic write: tmp file + rename
let tmp_path = path.with_extension("bin.tmp");
fs::write(&tmp_path, &data)
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
.with_context(|| format!("write {}", tmp_path.display()))?;
fs::rename(&tmp_path, &path)
.map_err(|e| format!("rename {}{}: {}", tmp_path.display(), path.display(), e))?;
.with_context(|| format!("rename {}{}", tmp_path.display(), path.display()))?;
// Also write rkyv snapshot (mmap-friendly)
if let Err(e) = self.save_snapshot(nodes_size, rels_size) {
@ -658,7 +659,7 @@ impl Store {
/// Serialize store as rkyv snapshot with staleness header.
/// Assumes StoreLock is already held by caller.
fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<(), String> {
fn save_snapshot(&self, nodes_size: u64, rels_size: u64) -> Result<()> {
let snap = Snapshot {
nodes: self.nodes.clone(),
relations: self.relations.iter().filter(|r| !r.deleted).cloned().collect(),
@ -667,7 +668,7 @@ impl Store {
};
let rkyv_data = rkyv::to_bytes::<_, 256>(&snap)
.map_err(|e| format!("rkyv serialize: {}", e))?;
.with_context(|| format!("rkyv serialize"))?;
let mut data = Vec::with_capacity(RKYV_HEADER_LEN + rkyv_data.len());
data.extend_from_slice(&RKYV_MAGIC);
@ -680,16 +681,16 @@ impl Store {
let path = snapshot_path();
let tmp_path = path.with_extension("rkyv.tmp");
fs::write(&tmp_path, &data)
.map_err(|e| format!("write {}: {}", tmp_path.display(), e))?;
.with_context(|| format!("write {}", tmp_path.display()))?;
fs::rename(&tmp_path, &path)
.map_err(|e| format!("rename: {}", e))?;
.with_context(|| format!("rename"))?;
Ok(())
}
/// Try loading store from mmap'd rkyv snapshot.
/// Returns None if snapshot is missing or stale (log sizes don't match).
fn load_snapshot_mmap() -> Result<Option<Store>, String> {
fn load_snapshot_mmap() -> Result<Option<Store>> {
let path = snapshot_path();
if !path.exists() { return Ok(None); }
@ -697,10 +698,10 @@ impl Store {
let rels_size = fs::metadata(relations_path()).map(|m| m.len()).unwrap_or(0);
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let mmap = unsafe { memmap2::Mmap::map(&file) }
.map_err(|e| format!("mmap {}: {}", path.display(), e))?;
.with_context(|| format!("mmap {}", path.display()))?;
if mmap.len() < RKYV_HEADER_LEN { return Ok(None); }
if mmap[..4] != RKYV_MAGIC { return Ok(None); }
@ -751,7 +752,7 @@ impl Store {
/// Reads each message sequentially, tracking file position. On the first
/// corrupt message, truncates the file to the last good position. Also
/// removes stale caches so the next load replays from the repaired log.
pub fn fsck() -> Result<(), String> {
pub fn fsck() -> Result<()> {
let mut any_corrupt = false;
for (path, kind) in [
@ -761,9 +762,9 @@ pub fn fsck() -> Result<(), String> {
if !path.exists() { continue; }
let file = fs::File::open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
.with_context(|| format!("open {}", path.display()))?;
let file_len = file.metadata()
.map_err(|e| format!("stat {}: {}", path.display(), e))?.len();
.with_context(|| format!("stat {}", path.display()))?.len();
let mut reader = BufReader::new(file);
let mut good_messages = 0u64;
@ -771,7 +772,7 @@ pub fn fsck() -> Result<(), String> {
loop {
let pos = reader.stream_position()
.map_err(|e| format!("tell {}: {}", path.display(), e))?;
.with_context(|| format!("tell {}", path.display()))?;
let msg = match serialize::read_message(&mut reader, message::ReaderOptions::new()) {
Ok(m) => m,
@ -783,9 +784,9 @@ pub fn fsck() -> Result<(), String> {
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.map_err(|e| format!("open for truncate: {}", e))?;
.with_context(|| format!("open for truncate"))?;
file.set_len(pos)
.map_err(|e| format!("truncate {}: {}", path.display(), e))?;
.with_context(|| format!("truncate {}", path.display()))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, pos, good_messages);
}
@ -807,16 +808,16 @@ pub fn fsck() -> Result<(), String> {
if valid {
good_messages += 1;
last_good_pos = reader.stream_position()
.map_err(|e| format!("tell {}: {}", path.display(), e))?;
.with_context(|| format!("tell {}", path.display()))?;
} else {
eprintln!("{}: corrupt message content at offset {}, truncating to {}",
kind, pos, last_good_pos);
any_corrupt = true;
drop(reader);
let file = fs::OpenOptions::new().write(true).open(&path)
.map_err(|e| format!("open for truncate: {}", e))?;
.with_context(|| format!("open for truncate"))?;
file.set_len(last_good_pos)
.map_err(|e| format!("truncate {}: {}", path.display(), e))?;
.with_context(|| format!("truncate {}", path.display()))?;
eprintln!("{}: truncated from {} to {} bytes ({} good messages)",
kind, file_len, last_good_pos, good_messages);
break;
@ -833,7 +834,7 @@ pub fn fsck() -> Result<(), String> {
for p in [state_path(), snapshot_path()] {
if p.exists() {
fs::remove_file(&p)
.map_err(|e| format!("remove {}: {}", p.display(), e))?;
.with_context(|| format!("remove {}", p.display()))?;
eprintln!("removed stale cache: {}", p.display());
}
}

View file

@ -5,6 +5,7 @@
use crate::memory_capnp;
use anyhow::{anyhow, bail, Context, Result};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -60,14 +61,14 @@ macro_rules! capnp_message {
skip: [$($sf:ident),* $(,)?] $(,)?
) => {
impl $struct {
pub fn from_capnp(r: $reader) -> Result<Self, String> {
pub fn from_capnp(r: $reader) -> Result<Self> {
paste::paste! {
Ok(Self {
$($tf: read_text(r.[<get_ $tf>]()),)*
$($uf: read_uuid(r.[<get_ $uf>]()),)*
$($pf: r.[<get_ $pf>](),)*
$($ef: $et::from_capnp(
r.[<get_ $ef>]().map_err(|_| concat!("bad ", stringify!($ef)))?
r.[<get_ $ef>]().map_err(|_| anyhow!(concat!("bad ", stringify!($ef))))?
),)*
$($sf: Default::default(),)*
})
@ -102,16 +103,16 @@ pub(crate) struct StoreLock {
}
impl StoreLock {
pub(crate) fn acquire() -> Result<Self, String> {
pub(crate) fn acquire() -> Result<Self> {
let path = lock_path();
let file = fs::OpenOptions::new()
.create(true).truncate(false).write(true).open(&path)
.map_err(|e| format!("open lock {}: {}", path.display(), e))?;
.with_context(|| format!("open lock {}", path.display()))?;
// Blocking exclusive lock
let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
if ret != 0 {
return Err(format!("flock: {}", std::io::Error::last_os_error()));
bail!("flock: {}", std::io::Error::last_os_error());
}
Ok(StoreLock { _file: file })
}
@ -354,7 +355,7 @@ capnp_message!(Node,
impl Node {
/// Read from capnp with migration: if the new provenance text field
/// is empty (old record), fall back to the deprecated provenanceOld enum.
pub fn from_capnp_migrate(r: memory_capnp::content_node::Reader<'_>) -> Result<Self, String> {
pub fn from_capnp_migrate(r: memory_capnp::content_node::Reader<'_>) -> Result<Self> {
let mut node = Self::from_capnp(r)?;
if node.provenance.is_empty()
&& let Ok(old) = r.get_provenance_old() {
@ -384,7 +385,7 @@ capnp_message!(Relation,
);
impl Relation {
pub fn from_capnp_migrate(r: memory_capnp::relation::Reader<'_>) -> Result<Self, String> {
pub fn from_capnp_migrate(r: memory_capnp::relation::Reader<'_>) -> Result<Self> {
let mut rel = Self::from_capnp(r)?;
if rel.provenance.is_empty()
&& let Ok(old) = r.get_provenance_old() {

View file

@ -6,6 +6,7 @@
use super::types::*;
use anyhow::Result;
use std::fs;
// ---------------------------------------------------------------------------
@ -186,7 +187,7 @@ pub enum AnyView {
impl AnyView {
/// Load the fastest available view: mmap snapshot or owned store.
pub fn load() -> Result<Self, String> {
pub fn load() -> Result<Self> {
if let Some(mv) = MmapView::open() {
Ok(AnyView::Mmap(mv))
} else {