store: always replay from capnp log, remove stale cache optimization
The mtime-based cache (state.bin) was causing data loss under concurrent writes. Multiple processes (dream-loop journal writes, link-audit agents, journal-enrichment agents) would each:

1. Load state.bin (stale — missing other processes' recent writes)
2. Make their own changes
3. Save state.bin, overwriting entries from other processes

This caused 48 nodes to be lost from tonight's dream session: the entries were present in the append-only capnp log but invisible to the index, because a later writer's state.bin overwrote the version that contained them.

Fix: always replay from the capnp log (the source of truth). Cost: ~10ms extra at 2K nodes (36ms vs 26ms). The cache saved 10ms but introduced a correctness bug that lost real data. The append-only log design was correct — the cache layer violated its invariant by allowing stale reads to silently discard writes.
This commit is contained in:
parent
d8de2f33f4
commit
c7e7cfb7af
1 changed file with 7 additions and 39 deletions
|
|
@ -283,47 +283,15 @@ pub struct Store {
|
||||||
impl Store {
|
impl Store {
|
||||||
/// Load store: try state.json cache first, rebuild from capnp logs if stale
|
/// Load store: try state.json cache first, rebuild from capnp logs if stale
|
||||||
pub fn load() -> Result<Store, String> {
|
pub fn load() -> Result<Store, String> {
|
||||||
let state = state_path();
|
|
||||||
let nodes_p = nodes_path();
|
let nodes_p = nodes_path();
|
||||||
let rels_p = relations_path();
|
let rels_p = relations_path();
|
||||||
|
|
||||||
// Check if cache is up to date
|
// Always rebuild from capnp logs (source of truth).
|
||||||
let cache_fresh = state.exists() && {
|
// The mtime-based cache was causing data loss: concurrent
|
||||||
let cache_mtime = fs::metadata(&state).ok()
|
// writers (dream loop, link audit, journal enrichment) would
|
||||||
.and_then(|m| m.modified().ok())
|
// load stale state.bin, make changes, and save — overwriting
|
||||||
.unwrap_or(UNIX_EPOCH);
|
// entries from other processes. Replaying from the append-only
|
||||||
let nodes_mtime = fs::metadata(&nodes_p).ok()
|
// log costs ~10ms extra at 2K nodes and is always correct.
|
||||||
.and_then(|m| m.modified().ok())
|
|
||||||
.unwrap_or(UNIX_EPOCH);
|
|
||||||
let rels_mtime = fs::metadata(&rels_p).ok()
|
|
||||||
.and_then(|m| m.modified().ok())
|
|
||||||
.unwrap_or(UNIX_EPOCH);
|
|
||||||
cache_mtime >= nodes_mtime && cache_mtime >= rels_mtime
|
|
||||||
};
|
|
||||||
|
|
||||||
if cache_fresh {
|
|
||||||
let data = fs::read(&state)
|
|
||||||
.map_err(|e| format!("read state.bin: {}", e))?;
|
|
||||||
let mut store: Store = bincode::deserialize(&data)
|
|
||||||
.map_err(|e| format!("parse state.bin: {}", e))?;
|
|
||||||
store.rebuild_uuid_index();
|
|
||||||
return Ok(store);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try legacy JSON cache for migration
|
|
||||||
let json_state = state_json_path();
|
|
||||||
if json_state.exists() {
|
|
||||||
let data = fs::read_to_string(&json_state)
|
|
||||||
.map_err(|e| format!("read state.json: {}", e))?;
|
|
||||||
if let Ok(mut store) = serde_json::from_str::<Store>(&data) {
|
|
||||||
store.rebuild_uuid_index();
|
|
||||||
// Migrate to bincode
|
|
||||||
store.save()?;
|
|
||||||
return Ok(store);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rebuild from capnp logs
|
|
||||||
let mut store = Store::default();
|
let mut store = Store::default();
|
||||||
|
|
||||||
if nodes_p.exists() {
|
if nodes_p.exists() {
|
||||||
|
|
@ -339,7 +307,7 @@ impl Store {
|
||||||
store.nodes.contains_key(&r.target_key)
|
store.nodes.contains_key(&r.target_key)
|
||||||
);
|
);
|
||||||
|
|
||||||
// Save cache
|
// Save cache (still useful for tools that read state.bin directly)
|
||||||
store.save()?;
|
store.save()?;
|
||||||
Ok(store)
|
Ok(store)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue