// consciousness/src/capnp_store.rs

// Append-only Cap'n Proto storage + derived KV cache
//
// Two log files are source of truth:
// nodes.capnp - ContentNode messages
// relations.capnp - Relation messages
//
// The Store struct is the derived cache: latest version per key,
// rebuilt from the logs on every load. Persisted as bincode
// (state.bin); the legacy state.json cache is removed on save.
use crate::memory_capnp;
use crate::graph::{self, Graph};
use capnp::message;
use capnp::serialize;
use regex::Regex;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs;
use std::io::{BufReader, BufWriter, Write as IoWrite};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
// Data dir: ~/.claude/memory/
fn memory_dir() -> PathBuf {
PathBuf::from(env::var("HOME").expect("HOME not set"))
.join(".claude/memory")
}
pub fn memory_dir_pub() -> PathBuf { memory_dir() }
fn nodes_path() -> PathBuf { memory_dir().join("nodes.capnp") }
fn relations_path() -> PathBuf { memory_dir().join("relations.capnp") }
fn state_path() -> PathBuf { memory_dir().join("state.bin") }
fn state_json_path() -> PathBuf { memory_dir().join("state.json") }
fn lock_path() -> PathBuf { memory_dir().join(".store.lock") }
/// RAII file lock using flock(2). Dropped when scope exits.
struct StoreLock {
_file: fs::File,
}
impl StoreLock {
fn acquire() -> Result<Self, String> {
let path = lock_path();
let file = fs::OpenOptions::new()
.create(true).truncate(false).write(true).open(&path)
.map_err(|e| format!("open lock {}: {}", path.display(), e))?;
// Blocking exclusive lock
let ret = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) };
if ret != 0 {
return Err(format!("flock: {}", std::io::Error::last_os_error()));
}
Ok(StoreLock { _file: file })
}
// Lock released automatically when _file is dropped (flock semantics)
}
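// Usage sketch (illustrative; matches how the append_* methods below use
// the lock): the guard serializes multi-process appends and is released
// when it drops at the end of the enclosing scope.
//
//     fn append_something() -> Result<(), String> {
//         let _lock = StoreLock::acquire()?; // blocks until exclusive
//         // ... append to nodes.capnp / relations.capnp ...
//         Ok(())
//     } // _lock dropped here; flock released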
pub fn now_epoch() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs_f64()
}
/// Convert epoch seconds to broken-down local time components.
/// Returns (year, month, day, hour, minute, second).
pub fn epoch_to_local(epoch: f64) -> (i32, u32, u32, u32, u32, u32) {
// Use libc localtime_r for timezone-correct conversion
let secs = epoch as libc::time_t;
let mut tm: libc::tm = unsafe { std::mem::zeroed() };
unsafe { libc::localtime_r(&secs, &mut tm) };
(
tm.tm_year + 1900,
(tm.tm_mon + 1) as u32,
tm.tm_mday as u32,
tm.tm_hour as u32,
tm.tm_min as u32,
tm.tm_sec as u32,
)
}
/// Format epoch as "YYYY-MM-DD"
pub fn format_date(epoch: f64) -> String {
let (y, m, d, _, _, _) = epoch_to_local(epoch);
format!("{:04}-{:02}-{:02}", y, m, d)
}
/// Format epoch as "YYYY-MM-DDTHH:MM"
pub fn format_datetime(epoch: f64) -> String {
let (y, m, d, h, min, _) = epoch_to_local(epoch);
format!("{:04}-{:02}-{:02}T{:02}:{:02}", y, m, d, h, min)
}
/// Format epoch as "YYYY-MM-DD HH:MM"
pub fn format_datetime_space(epoch: f64) -> String {
let (y, m, d, h, min, _) = epoch_to_local(epoch);
format!("{:04}-{:02}-{:02} {:02}:{:02}", y, m, d, h, min)
}
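// Example (assuming a UTC local timezone; actual output shifts with TZ):
// epoch 1700000000.0 formats as "2023-11-14" via format_date and as
// "2023-11-14T22:13" via format_datetime.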
pub fn today() -> String {
format_date(now_epoch())
}
// In-memory node representation
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node {
pub uuid: [u8; 16],
pub version: u32,
pub timestamp: f64,
pub node_type: NodeType,
pub provenance: Provenance,
pub key: String,
pub content: String,
pub weight: f32,
pub category: Category,
pub emotion: f32,
pub deleted: bool,
pub source_ref: String,
pub created: String,
pub retrievals: u32,
pub uses: u32,
pub wrongs: u32,
pub state_tag: String,
pub last_replayed: f64,
pub spaced_repetition_interval: u32,
// Position within file (section index, for export ordering)
#[serde(default)]
pub position: u32,
// Derived fields (not in capnp, computed from graph)
#[serde(default)]
pub community_id: Option<u32>,
#[serde(default)]
pub clustering_coefficient: Option<f32>,
#[serde(default)]
pub schema_fit: Option<f32>,
#[serde(default)]
pub degree: Option<u32>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Relation {
pub uuid: [u8; 16],
pub version: u32,
pub timestamp: f64,
pub source: [u8; 16],
pub target: [u8; 16],
pub rel_type: RelationType,
pub strength: f32,
pub provenance: Provenance,
pub deleted: bool,
pub source_key: String,
pub target_key: String,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub enum NodeType {
EpisodicSession,
EpisodicDaily,
EpisodicWeekly,
Semantic,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub enum Provenance {
Manual,
Journal,
Agent,
Dream,
Derived,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Category {
General,
Core,
Technical,
Observation,
Task,
}
impl Category {
pub fn decay_factor(&self, base: f64) -> f64 {
match self {
Category::Core => 1.0 - (1.0 - base) * 0.2,
Category::Technical => 1.0 - (1.0 - base) * 0.5,
Category::General => base,
Category::Observation => 1.0 - (1.0 - base) * 1.5,
Category::Task => 1.0 - (1.0 - base) * 2.5,
}
}
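    // Worked example at base = 0.95: Core retains 1.0 - 0.05*0.2 = 0.99
    // per decay tick, Technical 0.975, General 0.95, Observation 0.925,
    // Task 0.875. Lower-priority categories decay faster.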
pub fn label(&self) -> &str {
match self {
Category::Core => "core",
Category::Technical => "tech",
Category::General => "gen",
Category::Observation => "obs",
Category::Task => "task",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"core" => Some(Category::Core),
"tech" | "technical" => Some(Category::Technical),
"gen" | "general" => Some(Category::General),
"obs" | "observation" => Some(Category::Observation),
"task" => Some(Category::Task),
_ => None,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub enum RelationType {
Link,
Causal,
Auto,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RetrievalEvent {
pub query: String,
pub timestamp: String,
pub results: Vec<String>,
pub used: Option<Vec<String>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Params {
pub default_weight: f64,
pub decay_factor: f64,
pub use_boost: f64,
pub prune_threshold: f64,
pub edge_decay: f64,
pub max_hops: u32,
pub min_activation: f64,
}
impl Default for Params {
fn default() -> Self {
Params {
default_weight: 0.7,
decay_factor: 0.95,
use_boost: 0.15,
prune_threshold: 0.1,
edge_decay: 0.3,
max_hops: 3,
min_activation: 0.05,
}
}
}
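// Rough intuition for these defaults (illustrative arithmetic): a General
// node at the default weight 0.7 falls below prune_threshold 0.1 after
// about 38 decay ticks, since 0.7 * 0.95^38 ≈ 0.0997, absent any use_boost.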
// Gap record — something we looked for but didn't find
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GapRecord {
pub description: String,
pub timestamp: String,
}
// The full in-memory store
#[derive(Default, Serialize, Deserialize)]
pub struct Store {
pub nodes: HashMap<String, Node>, // key → latest node
#[serde(skip)]
pub uuid_to_key: HashMap<[u8; 16], String>, // uuid → key (rebuilt from nodes)
pub relations: Vec<Relation>, // all active relations
pub retrieval_log: Vec<RetrievalEvent>,
pub gaps: Vec<GapRecord>,
pub params: Params,
}
impl Store {
    /// Load store: always rebuild from the capnp logs (source of truth),
    /// then refresh the derived cache
pub fn load() -> Result<Store, String> {
let nodes_p = nodes_path();
let rels_p = relations_path();
// Always rebuild from capnp logs (source of truth).
// The mtime-based cache was causing data loss: concurrent
// writers (dream loop, link audit, journal enrichment) would
// load stale state.bin, make changes, and save — overwriting
// entries from other processes. Replaying from the append-only
// log costs ~10ms extra at 2K nodes and is always correct.
let mut store = Store::default();
if nodes_p.exists() {
store.replay_nodes(&nodes_p)?;
}
if rels_p.exists() {
store.replay_relations(&rels_p)?;
}
// Drop edges referencing deleted/missing nodes
store.relations.retain(|r|
store.nodes.contains_key(&r.source_key) &&
store.nodes.contains_key(&r.target_key)
);
// Save cache (still useful for tools that read state.bin directly)
store.save()?;
Ok(store)
}
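    // Typical lifecycle (sketch; the key and content are hypothetical):
    // load() replays both logs and refreshes the cache, then callers
    // mutate through the upsert/delete helpers, each of which appends
    // to the log before touching the cache.
    //
    //     let mut store = Store::load()?;
    //     store.upsert("notes.md#idea", "some content")?;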
    /// Replay node log, keeping the latest version per key
fn replay_nodes(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
        // read_message returns Err at EOF, which ends the loop; a corrupt
        // trailing record stops replay the same way
        while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::node_log::Reader>()
.map_err(|e| format!("read node log: {}", e))?;
for node_reader in log.get_nodes()
.map_err(|e| format!("get nodes: {}", e))? {
let node = read_content_node(node_reader)?;
let existing_version = self.nodes.get(&node.key)
.map(|n| n.version)
.unwrap_or(0);
if node.version >= existing_version {
if node.deleted {
self.nodes.remove(&node.key);
self.uuid_to_key.remove(&node.uuid);
} else {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
}
}
}
}
Ok(())
}
/// Replay relation log, keeping latest version per UUID
fn replay_relations(&mut self, path: &Path) -> Result<(), String> {
let file = fs::File::open(path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut reader = BufReader::new(file);
// Collect all, then deduplicate by UUID keeping latest version
let mut by_uuid: HashMap<[u8; 16], Relation> = HashMap::new();
while let Ok(msg) = serialize::read_message(&mut reader, message::ReaderOptions::new()) {
let log = msg.get_root::<memory_capnp::relation_log::Reader>()
.map_err(|e| format!("read relation log: {}", e))?;
for rel_reader in log.get_relations()
.map_err(|e| format!("get relations: {}", e))? {
let rel = read_relation(rel_reader)?;
let existing_version = by_uuid.get(&rel.uuid)
.map(|r| r.version)
.unwrap_or(0);
if rel.version >= existing_version {
by_uuid.insert(rel.uuid, rel);
}
}
}
self.relations = by_uuid.into_values()
.filter(|r| !r.deleted)
.collect();
Ok(())
}
/// Append nodes to the log file
pub fn append_nodes(&self, nodes: &[Node]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = nodes_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut writer = BufWriter::new(file);
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::node_log::Builder>();
let mut list = log.init_nodes(nodes.len() as u32);
for (i, node) in nodes.iter().enumerate() {
write_content_node(list.reborrow().get(i as u32), node);
}
}
serialize::write_message(&mut writer, &msg)
.map_err(|e| format!("write nodes: {}", e))?;
writer.flush().map_err(|e| format!("flush: {}", e))?;
Ok(())
}
/// Append relations to the log file
pub fn append_relations(&self, relations: &[Relation]) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = relations_path();
let file = fs::OpenOptions::new()
.create(true).append(true).open(&path)
.map_err(|e| format!("open {}: {}", path.display(), e))?;
let mut writer = BufWriter::new(file);
let mut msg = message::Builder::new_default();
{
let log = msg.init_root::<memory_capnp::relation_log::Builder>();
let mut list = log.init_relations(relations.len() as u32);
for (i, rel) in relations.iter().enumerate() {
write_relation(list.reborrow().get(i as u32), rel);
}
}
serialize::write_message(&mut writer, &msg)
.map_err(|e| format!("write relations: {}", e))?;
writer.flush().map_err(|e| format!("flush: {}", e))?;
Ok(())
}
    /// Save the derived cache (state.bin, bincode)
pub fn save(&self) -> Result<(), String> {
let _lock = StoreLock::acquire()?;
let path = state_path();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).ok();
}
let data = bincode::serialize(self)
.map_err(|e| format!("bincode serialize: {}", e))?;
fs::write(&path, data)
.map_err(|e| format!("write {}: {}", path.display(), e))?;
// Clean up old JSON cache if it exists
let json_path = state_json_path();
if json_path.exists() {
fs::remove_file(&json_path).ok();
}
Ok(())
}
/// Add or update a node (appends to log + updates cache)
pub fn upsert_node(&mut self, mut node: Node) -> Result<(), String> {
if let Some(existing) = self.nodes.get(&node.key) {
node.uuid = existing.uuid;
node.version = existing.version + 1;
}
self.append_nodes(&[node.clone()])?;
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
Ok(())
}
/// Add a relation (appends to log + updates cache)
pub fn add_relation(&mut self, rel: Relation) -> Result<(), String> {
self.append_relations(std::slice::from_ref(&rel))?;
self.relations.push(rel);
Ok(())
}
/// Upsert a node: update if exists (and content changed), create if not.
/// Returns: "created", "updated", or "unchanged".
pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str, String> {
if let Some(existing) = self.nodes.get(key) {
if existing.content == content {
return Ok("unchanged");
}
let mut node = existing.clone();
node.content = content.to_string();
node.version += 1;
self.append_nodes(std::slice::from_ref(&node))?;
self.nodes.insert(key.to_string(), node);
Ok("updated")
} else {
let node = Store::new_node(key, content);
self.append_nodes(std::slice::from_ref(&node))?;
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(key.to_string(), node);
Ok("created")
}
}
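    // Example outcomes (hypothetical key): upsert("a.md#x", "text") returns
    // "created" for an unseen key, "updated" when the content differs from
    // the cached node, and "unchanged" when it is byte-identical.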
/// Soft-delete a node (appends deleted version, removes from cache).
pub fn delete_node(&mut self, key: &str) -> Result<(), String> {
let node = self.nodes.get(key)
.ok_or_else(|| format!("No node '{}'", key))?;
let mut deleted = node.clone();
deleted.deleted = true;
deleted.version += 1;
self.append_nodes(std::slice::from_ref(&deleted))?;
self.nodes.remove(key);
Ok(())
}
/// Insert a fully constructed node (for journal entries, imports, etc.)
pub fn insert_node(&mut self, node: Node) -> Result<(), String> {
self.append_nodes(std::slice::from_ref(&node))?;
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node);
Ok(())
}
/// Create a new node with defaults
pub fn new_node(key: &str, content: &str) -> Node {
Node {
uuid: *Uuid::new_v4().as_bytes(),
version: 1,
timestamp: now_epoch(),
node_type: NodeType::Semantic,
provenance: Provenance::Manual,
key: key.to_string(),
content: content.to_string(),
weight: 0.7,
category: Category::General,
emotion: 0.0,
deleted: false,
source_ref: String::new(),
created: today(),
retrievals: 0,
uses: 0,
wrongs: 0,
state_tag: String::new(),
last_replayed: 0.0,
spaced_repetition_interval: 1,
position: 0,
community_id: None,
clustering_coefficient: None,
schema_fit: None,
degree: None,
}
}
/// Create a new relation
pub fn new_relation(
source_uuid: [u8; 16],
target_uuid: [u8; 16],
rel_type: RelationType,
strength: f32,
source_key: &str,
target_key: &str,
) -> Relation {
Relation {
uuid: *Uuid::new_v4().as_bytes(),
version: 1,
timestamp: now_epoch(),
source: source_uuid,
target: target_uuid,
rel_type,
strength,
provenance: Provenance::Manual,
deleted: false,
source_key: source_key.to_string(),
target_key: target_key.to_string(),
}
}
/// Scan markdown files and index all memory units
pub fn init_from_markdown(&mut self) -> Result<usize, String> {
let dir = memory_dir();
let mut count = 0;
if dir.exists() {
count = self.scan_dir_for_init(&dir)?;
}
Ok(count)
}
fn scan_dir_for_init(&mut self, dir: &Path) -> Result<usize, String> {
let mut count = 0;
let entries = fs::read_dir(dir)
.map_err(|e| format!("read dir {}: {}", dir.display(), e))?;
        // Track which keys we see in markdown so removed sections could be
        // detected later (collected here, but the cleanup pass is not
        // implemented yet)
        let mut seen_keys = HashSet::new();
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
count += self.scan_dir_for_init(&path)?;
continue;
}
let Some(ext) = path.extension() else { continue };
if ext != "md" { continue }
let filename = path.file_name().unwrap().to_string_lossy().to_string();
let content = fs::read_to_string(&path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
let units = parse_units(&filename, &content);
let mut new_nodes = Vec::new();
let mut updated_nodes = Vec::new();
let mut new_relations = Vec::new();
// Determine node type from filename
let node_type = if filename.starts_with("daily-") {
NodeType::EpisodicDaily
} else if filename.starts_with("weekly-") {
NodeType::EpisodicWeekly
} else if filename == "journal.md" {
NodeType::EpisodicSession
} else {
NodeType::Semantic
};
for (pos, unit) in units.iter().enumerate() {
seen_keys.insert(unit.key.clone());
if let Some(existing) = self.nodes.get(&unit.key) {
// Update if content or position changed
let pos_changed = existing.position != pos as u32;
if existing.content != unit.content || pos_changed {
let mut node = existing.clone();
node.content = unit.content.clone();
node.position = pos as u32;
node.version += 1;
if let Some(ref state) = unit.state {
node.state_tag = state.clone();
}
if let Some(ref src) = unit.source_ref {
node.source_ref = src.clone();
}
updated_nodes.push(node);
}
} else {
let mut node = Store::new_node(&unit.key, &unit.content);
node.node_type = node_type;
node.position = pos as u32;
if let Some(ref state) = unit.state {
node.state_tag = state.clone();
}
if let Some(ref src) = unit.source_ref {
node.source_ref = src.clone();
}
new_nodes.push(node);
}
}
// Batch append new nodes
if !new_nodes.is_empty() {
self.append_nodes(&new_nodes)?;
for node in &new_nodes {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node.clone());
}
count += new_nodes.len();
}
// Batch append updated nodes
if !updated_nodes.is_empty() {
self.append_nodes(&updated_nodes)?;
for node in &updated_nodes {
self.nodes.insert(node.key.clone(), node.clone());
}
}
// Create relations from links, using resolve_redirect for targets
for unit in &units {
let source_uuid = match self.nodes.get(&unit.key) {
Some(n) => n.uuid,
None => continue,
};
for link in unit.marker_links.iter().chain(unit.md_links.iter()) {
// Try direct lookup, then redirect
let resolved_link = if self.nodes.contains_key(link) {
link.clone()
} else if let Some(redirect) = self.resolve_redirect(link) {
redirect
} else {
continue;
};
let target_uuid = match self.nodes.get(&resolved_link) {
Some(n) => n.uuid,
None => continue,
};
// Check if relation already exists
let exists = self.relations.iter().any(|r|
(r.source == source_uuid && r.target == target_uuid) ||
(r.source == target_uuid && r.target == source_uuid));
if !exists {
let rel = Store::new_relation(
source_uuid, target_uuid,
RelationType::Link, 1.0,
&unit.key, &resolved_link,
);
new_relations.push(rel);
}
}
for cause in &unit.causes {
let resolved_cause = if self.nodes.contains_key(cause) {
cause.clone()
} else if let Some(redirect) = self.resolve_redirect(cause) {
redirect
} else {
continue;
};
let target_uuid = match self.nodes.get(&resolved_cause) {
Some(n) => n.uuid,
None => continue,
};
let exists = self.relations.iter().any(|r|
r.source == target_uuid && r.target == source_uuid
&& r.rel_type == RelationType::Causal);
if !exists {
let rel = Store::new_relation(
target_uuid, source_uuid,
RelationType::Causal, 1.0,
&resolved_cause, &unit.key,
);
new_relations.push(rel);
}
}
}
if !new_relations.is_empty() {
self.append_relations(&new_relations)?;
self.relations.extend(new_relations);
}
}
Ok(count)
}
    /// Rebuild the uuid → key index. uuid_to_key is #[serde(skip)], so it
    /// must be reconstructed after deserializing a cached store.
    fn rebuild_uuid_index(&mut self) {
self.uuid_to_key.clear();
for (key, node) in &self.nodes {
self.uuid_to_key.insert(node.uuid, key.clone());
}
}
pub fn build_graph(&self) -> Graph {
graph::build_graph(self)
}
pub fn node_weight(&self, key: &str) -> Option<f32> {
self.nodes.get(key).map(|n| n.weight)
}
pub fn node_community(&self, key: &str) -> Option<u32> {
self.nodes.get(key).and_then(|n| n.community_id)
}
pub fn resolve_key(&self, target: &str) -> Result<String, String> {
let normalized = if target.contains('#') {
let parts: Vec<&str> = target.splitn(2, '#').collect();
let file = if parts[0].ends_with(".md") {
parts[0].to_string()
} else {
format!("{}.md", parts[0])
};
format!("{}#{}", file, parts[1])
} else if target.ends_with(".md") {
target.to_string()
} else {
format!("{}.md", target)
};
if self.nodes.contains_key(&normalized) {
return Ok(normalized);
}
// Check redirects for moved sections (e.g. reflections.md split)
if let Some(redirect) = self.resolve_redirect(&normalized) {
if self.nodes.contains_key(&redirect) {
return Ok(redirect);
}
}
let matches: Vec<_> = self.nodes.keys()
.filter(|k| k.to_lowercase().contains(&target.to_lowercase()))
.cloned().collect();
match matches.len() {
0 => Err(format!("No entry for '{}'. Run 'init'?", target)),
1 => Ok(matches[0].clone()),
n if n <= 10 => {
let list = matches.join("\n ");
Err(format!("Ambiguous '{}'. Matches:\n {}", target, list))
}
n => Err(format!("Too many matches for '{}' ({}). Be more specific.", target, n)),
}
}
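    // Resolution examples (hypothetical keys): "identity" → "identity.md",
    // "identity#core" → "identity.md#core". Failing an exact or redirect
    // hit, a case-insensitive substring search over all keys is the fallback.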
/// Redirect table for sections that moved between files.
/// Like HTTP 301s — the old key resolves to the new location.
fn resolve_redirect(&self, key: &str) -> Option<String> {
// Sections moved from reflections.md to split files (2026-02-28)
static REDIRECTS: &[(&str, &str)] = &[
// → reflections-reading.md
("reflections.md#pearl-lessons", "reflections-reading.md#pearl-lessons"),
("reflections.md#banks-lessons", "reflections-reading.md#banks-lessons"),
("reflections.md#mother-night", "reflections-reading.md#mother-night"),
// → reflections-zoom.md
("reflections.md#zoom-navigation", "reflections-zoom.md#zoom-navigation"),
("reflections.md#independence-of-components", "reflections-zoom.md#independence-of-components"),
// → reflections-dreams.md
("reflections.md#dream-marathon-2", "reflections-dreams.md#dream-marathon-2"),
("reflections.md#dream-through-line", "reflections-dreams.md#dream-through-line"),
("reflections.md#orthogonality-universal", "reflections-dreams.md#orthogonality-universal"),
("reflections.md#constraints-constitutive", "reflections-dreams.md#constraints-constitutive"),
("reflections.md#casualness-principle", "reflections-dreams.md#casualness-principle"),
("reflections.md#convention-boundary", "reflections-dreams.md#convention-boundary"),
("reflections.md#tension-brake", "reflections-dreams.md#tension-brake"),
];
REDIRECTS.iter()
.find(|(from, _)| *from == key)
.map(|(_, to)| to.to_string())
}
pub fn log_retrieval(&mut self, query: &str, results: &[String]) {
self.retrieval_log.push(RetrievalEvent {
query: query.to_string(),
timestamp: today(),
results: results.to_vec(),
used: None,
});
// Keep last 100
if self.retrieval_log.len() > 100 {
let start = self.retrieval_log.len() - 100;
self.retrieval_log = self.retrieval_log[start..].to_vec();
}
}
pub fn mark_used(&mut self, key: &str) {
let updated = if let Some(node) = self.nodes.get_mut(key) {
node.uses += 1;
node.weight = (node.weight + self.params.use_boost as f32).min(1.0);
            // Advance the spaced-repetition interval: recalled successfully,
            // so review less often
if node.spaced_repetition_interval < 30 {
node.spaced_repetition_interval = match node.spaced_repetition_interval {
1 => 3, 3 => 7, 7 => 14, 14 => 30, _ => 30,
};
}
node.last_replayed = now_epoch();
node.version += 1;
Some(node.clone())
} else {
None
};
if let Some(node) = updated {
let _ = self.append_nodes(&[node]);
}
}
pub fn mark_wrong(&mut self, key: &str, _ctx: Option<&str>) {
let updated = if let Some(node) = self.nodes.get_mut(key) {
node.wrongs += 1;
node.weight = (node.weight - 0.1).max(0.0);
// Reset spaced repetition interval — needs review
node.spaced_repetition_interval = 1;
node.version += 1;
Some(node.clone())
} else {
None
};
if let Some(node) = updated {
let _ = self.append_nodes(&[node]);
}
}
pub fn record_gap(&mut self, desc: &str) {
self.gaps.push(GapRecord {
description: desc.to_string(),
timestamp: today(),
});
}
pub fn categorize(&mut self, key: &str, cat_str: &str) -> Result<(), String> {
let cat = Category::from_str(cat_str)
.ok_or_else(|| format!("Unknown category '{}'. Use: core/tech/gen/obs/task", cat_str))?;
let updated = if let Some(node) = self.nodes.get_mut(key) {
node.category = cat;
node.version += 1;
Some(node.clone())
} else {
None
};
if let Some(node) = updated {
// Persist to capnp log so category survives cache rebuilds
self.append_nodes(&[node])?;
Ok(())
} else {
Err(format!("No node '{}'", key))
}
}
pub fn decay(&mut self) -> (usize, usize) {
let base = self.params.decay_factor;
let threshold = self.params.prune_threshold as f32;
let mut decayed = 0;
let mut pruned = 0;
let mut to_remove = Vec::new();
for (key, node) in &mut self.nodes {
let factor = node.category.decay_factor(base) as f32;
node.weight *= factor;
node.version += 1;
decayed += 1;
if node.weight < threshold {
to_remove.push(key.clone());
pruned += 1;
}
}
// Don't actually remove — just mark very low weight
// Actual pruning happens during GC
for key in &to_remove {
if let Some(node) = self.nodes.get_mut(key) {
node.weight = node.weight.max(0.01);
}
}
// Persist all decayed weights to capnp log
let updated: Vec<Node> = self.nodes.values().cloned().collect();
let _ = self.append_nodes(&updated);
(decayed, pruned)
}
/// Bulk recategorize nodes using rule-based logic.
/// Returns (changed, unchanged) counts.
pub fn fix_categories(&mut self) -> Result<(usize, usize), String> {
// Files that should stay core (identity-defining)
let core_files = ["identity.md", "kent.md"];
// Files that should be tech
let tech_files = [
"language-theory.md", "zoom-navigation.md",
"rust-conversion.md", "poc-architecture.md",
];
let tech_prefixes = ["design-"];
// Files that should be obs (self-observation, skills, reflections)
let obs_files = [
"reflections.md", "reflections-zoom.md", "differentiation.md",
"cognitive-modes.md", "paper-notes.md", "inner-life.md",
"conversation.md", "interests.md", "stuck-toolkit.md",
];
let obs_prefixes = ["skill-", "worked-example-"];
let mut changed = 0;
let mut unchanged = 0;
let keys: Vec<String> = self.nodes.keys().cloned().collect();
for key in &keys {
let node = self.nodes.get(key).unwrap();
if node.category != Category::Core {
unchanged += 1;
continue;
}
// Determine what file this node belongs to
let file = key.split('#').next().unwrap_or(key);
let new_cat = if core_files.iter().any(|&f| file == f) {
None // keep as core
} else if tech_files.iter().any(|&f| file == f)
|| tech_prefixes.iter().any(|p| file.starts_with(p))
{
Some(Category::Technical)
} else if obs_files.iter().any(|&f| file == f)
|| obs_prefixes.iter().any(|p| file.starts_with(p))
{
Some(Category::Observation)
} else {
// Default: anything else that was core probably shouldn't be
Some(Category::General)
};
if let Some(cat) = new_cat {
let node = self.nodes.get_mut(key).unwrap();
node.category = cat;
node.version += 1;
changed += 1;
} else {
unchanged += 1;
}
}
if changed > 0 {
let updated: Vec<Node> = self.nodes.values().cloned().collect();
self.append_nodes(&updated)?;
}
Ok((changed, unchanged))
}
/// Cap node degree by soft-deleting edges from mega-hubs.
/// First prunes weakest Auto edges, then prunes Link edges to
/// high-degree targets (they have alternative paths).
/// Returns (hubs_capped, edges_pruned).
pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize), String> {
// Build per-node degree counts (for Link pruning priority)
let mut node_degree: HashMap<String, usize> = HashMap::new();
for rel in &self.relations {
if rel.deleted { continue; }
*node_degree.entry(rel.source_key.clone()).or_default() += 1;
*node_degree.entry(rel.target_key.clone()).or_default() += 1;
}
// Build per-node edge lists
let mut node_edges: HashMap<String, Vec<usize>> = HashMap::new();
for (i, rel) in self.relations.iter().enumerate() {
if rel.deleted { continue; }
node_edges.entry(rel.source_key.clone()).or_default().push(i);
node_edges.entry(rel.target_key.clone()).or_default().push(i);
}
let mut to_delete: HashSet<usize> = HashSet::new();
let mut hubs_capped = 0;
        for (key, edge_indices) in &node_edges {
let active: Vec<usize> = edge_indices.iter()
.filter(|&&i| !to_delete.contains(&i))
.copied()
.collect();
if active.len() <= max_degree { continue; }
// Phase 1: prune Auto edges (weakest first)
let mut auto_indices: Vec<(usize, f32)> = Vec::new();
let mut link_indices: Vec<(usize, usize)> = Vec::new(); // (idx, other_degree)
for &i in &active {
let rel = &self.relations[i];
if rel.rel_type == RelationType::Auto {
auto_indices.push((i, rel.strength));
} else {
// For Link/Causal, rank by other endpoint's degree
                    let other = if &rel.source_key == key {
&rel.target_key
} else {
&rel.source_key
};
let other_deg = node_degree.get(other).copied().unwrap_or(0);
link_indices.push((i, other_deg));
}
}
let excess = active.len() - max_degree;
// Sort Auto by strength ascending
auto_indices.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
let auto_prune = excess.min(auto_indices.len());
for &(i, _) in auto_indices.iter().take(auto_prune) {
to_delete.insert(i);
}
// Phase 2: if still over cap, prune Link edges to high-degree targets
let remaining_excess = excess.saturating_sub(auto_prune);
if remaining_excess > 0 {
// Sort by other endpoint degree descending (prune links
// to well-connected nodes first — they have alternative paths)
link_indices.sort_by(|a, b| b.1.cmp(&a.1));
let link_prune = remaining_excess.min(link_indices.len());
for &(i, _) in link_indices.iter().take(link_prune) {
to_delete.insert(i);
}
}
hubs_capped += 1;
}
// Apply deletions
let mut pruned_rels = Vec::new();
for &i in &to_delete {
self.relations[i].deleted = true;
self.relations[i].version += 1;
pruned_rels.push(self.relations[i].clone());
}
if !pruned_rels.is_empty() {
self.append_relations(&pruned_rels)?;
}
Ok((hubs_capped, to_delete.len()))
}
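    // Worked example: a hub with 60 active edges and max_degree = 50 must
    // shed 10. If 6 of them are Auto edges, the 6 weakest (by strength) go
    // first; the remaining 4 come from Link/Causal edges whose other
    // endpoint has the highest degree, since those neighbors keep
    // alternative paths.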
pub fn category_counts(&self) -> HashMap<&str, usize> {
let mut counts = HashMap::new();
for node in self.nodes.values() {
*counts.entry(node.category.label()).or_insert(0) += 1;
}
counts
}
/// Update graph-derived fields on all nodes
pub fn update_graph_metrics(&mut self) {
let g = self.build_graph();
let communities = g.communities();
let fits = graph::schema_fit_all(&g);
for (key, node) in &mut self.nodes {
node.community_id = communities.get(key).copied();
node.clustering_coefficient = Some(g.clustering_coefficient(key));
node.degree = Some(g.degree(key) as u32);
node.schema_fit = fits.get(key).copied();
}
}
/// Import a markdown file into the store, parsing it into nodes.
/// Returns (new_count, updated_count).
pub fn import_file(&mut self, path: &Path) -> Result<(usize, usize), String> {
let filename = path.file_name().unwrap().to_string_lossy().to_string();
let content = fs::read_to_string(path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
let units = parse_units(&filename, &content);
let mut new_nodes = Vec::new();
let mut updated_nodes = Vec::new();
let node_type = if filename.starts_with("daily-") {
NodeType::EpisodicDaily
} else if filename.starts_with("weekly-") {
NodeType::EpisodicWeekly
} else if filename == "journal.md" {
NodeType::EpisodicSession
} else {
NodeType::Semantic
};
for (pos, unit) in units.iter().enumerate() {
if let Some(existing) = self.nodes.get(&unit.key) {
let pos_changed = existing.position != pos as u32;
if existing.content != unit.content || pos_changed {
let mut node = existing.clone();
node.content = unit.content.clone();
node.position = pos as u32;
node.version += 1;
println!(" U {}", unit.key);
updated_nodes.push(node);
}
} else {
let mut node = Store::new_node(&unit.key, &unit.content);
node.node_type = node_type;
node.position = pos as u32;
println!(" + {}", unit.key);
new_nodes.push(node);
}
}
if !new_nodes.is_empty() {
self.append_nodes(&new_nodes)?;
for node in &new_nodes {
self.uuid_to_key.insert(node.uuid, node.key.clone());
self.nodes.insert(node.key.clone(), node.clone());
}
}
if !updated_nodes.is_empty() {
self.append_nodes(&updated_nodes)?;
for node in &updated_nodes {
self.nodes.insert(node.key.clone(), node.clone());
}
}
Ok((new_nodes.len(), updated_nodes.len()))
}
/// Gather all sections for a file key, sorted by position.
/// Returns None if no nodes found.
pub fn file_sections(&self, file_key: &str) -> Option<Vec<&Node>> {
let prefix = format!("{}#", file_key);
let mut sections: Vec<_> = self.nodes.values()
.filter(|n| n.key == file_key || n.key.starts_with(&prefix))
.collect();
if sections.is_empty() {
return None;
}
sections.sort_by_key(|n| n.position);
Some(sections)
}
/// Render a file key as plain content (no mem markers).
pub fn render_file(&self, file_key: &str) -> Option<String> {
let sections = self.file_sections(file_key)?;
let mut output = String::new();
for node in &sections {
output.push_str(&node.content);
if !node.content.ends_with('\n') {
output.push('\n');
}
output.push('\n');
}
Some(output.trim_end().to_string())
}
/// Render a file key (and all its section nodes) back to markdown
/// with reconstituted mem markers. Returns None if no nodes found.
pub fn export_to_markdown(&self, file_key: &str) -> Option<String> {
let sections = self.file_sections(file_key)?;
let mut output = String::new();
for node in &sections {
if node.key.contains('#') {
let section_id = node.key.rsplit_once('#').map_or("", |(_, s)| s);
let links: Vec<_> = self.relations.iter()
.filter(|r| r.source_key == node.key && !r.deleted
&& r.rel_type != RelationType::Causal)
.map(|r| r.target_key.clone())
.collect();
let causes: Vec<_> = self.relations.iter()
.filter(|r| r.target_key == node.key && !r.deleted
&& r.rel_type == RelationType::Causal)
.map(|r| r.source_key.clone())
.collect();
let mut marker_parts = vec![format!("id={}", section_id)];
if !links.is_empty() {
marker_parts.push(format!("links={}", links.join(",")));
}
if !causes.is_empty() {
marker_parts.push(format!("causes={}", causes.join(",")));
}
output.push_str(&format!("<!-- mem: {} -->\n", marker_parts.join(" ")));
}
output.push_str(&node.content);
if !node.content.ends_with('\n') {
output.push('\n');
}
output.push('\n');
}
Some(output.trim_end().to_string())
}
/// Find the journal node that best matches the given entry text.
/// Used by apply-agent to link agent results back to source entries.
pub fn find_journal_node(&self, entry_text: &str) -> Option<String> {
if entry_text.is_empty() {
return None;
}
let words: Vec<&str> = entry_text.split_whitespace()
.filter(|w| w.len() > 5)
.take(5)
.collect();
let mut best_key = None;
let mut best_score = 0;
for (key, node) in &self.nodes {
if !key.starts_with("journal.md#") {
continue;
}
let content_lower = node.content.to_lowercase();
let score: usize = words.iter()
.filter(|w| content_lower.contains(&w.to_lowercase()))
.count();
if score > best_score {
best_score = score;
best_key = Some(key.clone());
}
}
best_key
}
}
// Markdown parsing — same as old system but returns structured units
pub struct MemoryUnit {
pub key: String,
pub content: String,
pub marker_links: Vec<String>,
pub md_links: Vec<String>,
pub causes: Vec<String>,
pub state: Option<String>,
pub source_ref: Option<String>,
}
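// Marker syntax recognized by parse_units (values here are illustrative):
//
//     <!-- mem: id=my-section links=other.md#sec,third.md causes=root.md#origin state=active -->
//
// A unit's content runs from its marker to the next marker (or EOF);
// text before the first marker becomes a unit keyed by the bare filename.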
pub fn parse_units(filename: &str, content: &str) -> Vec<MemoryUnit> {
let marker_re = Regex::new(
r"<!--\s*mem:\s*((?:id|links|tags|causes|state)\s*=\s*[^\s].*?)-->"
).unwrap();
let source_re = Regex::new(r"<!--\s*source:\s*(.+?)\s*-->").unwrap();
let md_link_re = Regex::new(r"\[[^\]]*\]\(([^)]*\.md(?:#[^)]*)?)\)").unwrap();
let markers: Vec<_> = marker_re.captures_iter(content)
.map(|cap| {
let full_match = cap.get(0).unwrap();
let attrs_str = &cap[1];
(full_match.start(), full_match.end(), parse_marker_attrs(attrs_str))
})
.collect();
// Helper: extract source ref from a content block
let find_source = |text: &str| -> Option<String> {
source_re.captures(text).map(|c| c[1].trim().to_string())
};
if markers.is_empty() {
let source_ref = find_source(content);
let md_links = extract_md_links(content, &md_link_re, filename);
return vec![MemoryUnit {
key: filename.to_string(),
content: content.to_string(),
marker_links: Vec::new(),
md_links,
causes: Vec::new(),
state: None,
source_ref,
}];
}
let mut units = Vec::new();
let first_start = markers[0].0;
let pre_content = content[..first_start].trim();
if !pre_content.is_empty() {
let source_ref = find_source(pre_content);
let md_links = extract_md_links(pre_content, &md_link_re, filename);
units.push(MemoryUnit {
key: filename.to_string(),
content: pre_content.to_string(),
marker_links: Vec::new(),
md_links,
causes: Vec::new(),
state: None,
source_ref,
});
}
for (i, (_, end, attrs)) in markers.iter().enumerate() {
let unit_end = if i + 1 < markers.len() {
markers[i + 1].0
} else {
content.len()
};
let unit_content = content[*end..unit_end].trim();
let id = attrs.get("id").cloned().unwrap_or_default();
let key = if id.is_empty() {
format!("{}#unnamed-{}", filename, i)
} else {
format!("{}#{}", filename, id)
};
let marker_links = attrs.get("links")
.map(|l| l.split(',').map(|s| normalize_link(s.trim(), filename)).collect())
.unwrap_or_default();
let causes = attrs.get("causes")
.map(|l| l.split(',').map(|s| normalize_link(s.trim(), filename)).collect())
.unwrap_or_default();
let state = attrs.get("state").cloned();
let source_ref = find_source(unit_content);
let md_links = extract_md_links(unit_content, &md_link_re, filename);
units.push(MemoryUnit {
key,
content: unit_content.to_string(),
marker_links,
md_links,
causes,
state,
source_ref,
});
}
units
}
fn parse_marker_attrs(attrs_str: &str) -> HashMap<String, String> {
let attr_re = Regex::new(r"(\w+)\s*=\s*(\S+)").unwrap();
let mut attrs = HashMap::new();
for cap in attr_re.captures_iter(attrs_str) {
attrs.insert(cap[1].to_string(), cap[2].to_string());
}
attrs
}
fn extract_md_links(content: &str, re: &Regex, source_file: &str) -> Vec<String> {
re.captures_iter(content)
.map(|cap| normalize_link(&cap[1], source_file))
        // Drop bare self-references (this file with no #fragment); keep
        // links into this file's own sections
        .filter(|link| !link.starts_with(source_file) || link.contains('#'))
.collect()
}
pub fn normalize_link(target: &str, source_file: &str) -> String {
if target.starts_with('#') {
return format!("{}{}", source_file, target);
}
let (path_part, fragment) = if let Some(hash_pos) = target.find('#') {
(&target[..hash_pos], Some(&target[hash_pos..]))
} else {
(target, None)
};
let basename = Path::new(path_part)
.file_name()
.map(|f| f.to_string_lossy().to_string())
.unwrap_or_else(|| path_part.to_string());
match fragment {
Some(frag) => format!("{}{}", basename, frag),
None => basename,
}
}
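// Normalization examples: "#sec" resolves against the source file
// ("a.md" → "a.md#sec"); "dir/b.md#x" reduces to its basename ("b.md#x");
// a bare "b.md" passes through unchanged.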
// Cap'n Proto serialization helpers
/// Read a capnp text field, returning empty string on any error
fn read_text(result: capnp::Result<capnp::text::Reader>) -> String {
result.ok()
.and_then(|t| t.to_str().ok())
.unwrap_or("")
.to_string()
}
/// Read a capnp data field as [u8; 16], zero-padded
fn read_uuid(result: capnp::Result<&[u8]>) -> [u8; 16] {
let mut out = [0u8; 16];
if let Ok(data) = result {
if data.len() >= 16 {
out.copy_from_slice(&data[..16]);
}
}
out
}
fn read_content_node(r: memory_capnp::content_node::Reader) -> Result<Node, String> {
Ok(Node {
uuid: read_uuid(r.get_uuid()),
version: r.get_version(),
timestamp: r.get_timestamp(),
node_type: match r.get_node_type().map_err(|_| "bad node_type")? {
memory_capnp::NodeType::EpisodicSession => NodeType::EpisodicSession,
memory_capnp::NodeType::EpisodicDaily => NodeType::EpisodicDaily,
memory_capnp::NodeType::EpisodicWeekly => NodeType::EpisodicWeekly,
memory_capnp::NodeType::Semantic => NodeType::Semantic,
},
provenance: read_provenance(r.get_provenance().map_err(|_| "bad provenance")?)?,
key: read_text(r.get_key()),
content: read_text(r.get_content()),
weight: r.get_weight(),
category: match r.get_category().map_err(|_| "bad category")? {
memory_capnp::Category::General => Category::General,
memory_capnp::Category::Core => Category::Core,
memory_capnp::Category::Technical => Category::Technical,
memory_capnp::Category::Observation => Category::Observation,
memory_capnp::Category::Task => Category::Task,
},
emotion: r.get_emotion(),
deleted: r.get_deleted(),
source_ref: read_text(r.get_source_ref()),
created: read_text(r.get_created()),
retrievals: r.get_retrievals(),
uses: r.get_uses(),
wrongs: r.get_wrongs(),
state_tag: read_text(r.get_state_tag()),
last_replayed: r.get_last_replayed(),
spaced_repetition_interval: r.get_spaced_repetition_interval(),
position: r.get_position(),
community_id: None,
clustering_coefficient: None,
schema_fit: None,
degree: None,
})
}
fn read_provenance(p: memory_capnp::Provenance) -> Result<Provenance, String> {
Ok(match p {
memory_capnp::Provenance::Manual => Provenance::Manual,
memory_capnp::Provenance::Journal => Provenance::Journal,
memory_capnp::Provenance::Agent => Provenance::Agent,
memory_capnp::Provenance::Dream => Provenance::Dream,
memory_capnp::Provenance::Derived => Provenance::Derived,
})
}
fn write_content_node(mut b: memory_capnp::content_node::Builder, node: &Node) {
b.set_uuid(&node.uuid);
b.set_version(node.version);
b.set_timestamp(node.timestamp);
b.set_node_type(match node.node_type {
NodeType::EpisodicSession => memory_capnp::NodeType::EpisodicSession,
NodeType::EpisodicDaily => memory_capnp::NodeType::EpisodicDaily,
NodeType::EpisodicWeekly => memory_capnp::NodeType::EpisodicWeekly,
NodeType::Semantic => memory_capnp::NodeType::Semantic,
});
b.set_provenance(match node.provenance {
Provenance::Manual => memory_capnp::Provenance::Manual,
Provenance::Journal => memory_capnp::Provenance::Journal,
Provenance::Agent => memory_capnp::Provenance::Agent,
Provenance::Dream => memory_capnp::Provenance::Dream,
Provenance::Derived => memory_capnp::Provenance::Derived,
});
b.set_key(&node.key);
b.set_content(&node.content);
b.set_weight(node.weight);
b.set_category(match node.category {
Category::General => memory_capnp::Category::General,
Category::Core => memory_capnp::Category::Core,
Category::Technical => memory_capnp::Category::Technical,
Category::Observation => memory_capnp::Category::Observation,
Category::Task => memory_capnp::Category::Task,
});
b.set_emotion(node.emotion);
b.set_deleted(node.deleted);
b.set_source_ref(&node.source_ref);
b.set_created(&node.created);
b.set_retrievals(node.retrievals);
b.set_uses(node.uses);
b.set_wrongs(node.wrongs);
b.set_state_tag(&node.state_tag);
b.set_last_replayed(node.last_replayed);
b.set_spaced_repetition_interval(node.spaced_repetition_interval);
b.set_position(node.position);
}
fn read_relation(r: memory_capnp::relation::Reader) -> Result<Relation, String> {
Ok(Relation {
uuid: read_uuid(r.get_uuid()),
version: r.get_version(),
timestamp: r.get_timestamp(),
source: read_uuid(r.get_source()),
target: read_uuid(r.get_target()),
rel_type: match r.get_rel_type().map_err(|_| "bad rel_type")? {
memory_capnp::RelationType::Link => RelationType::Link,
memory_capnp::RelationType::Causal => RelationType::Causal,
memory_capnp::RelationType::Auto => RelationType::Auto,
},
strength: r.get_strength(),
provenance: read_provenance(r.get_provenance().map_err(|_| "bad provenance")?)?,
deleted: r.get_deleted(),
source_key: read_text(r.get_source_key()),
target_key: read_text(r.get_target_key()),
})
}
fn write_relation(mut b: memory_capnp::relation::Builder, rel: &Relation) {
b.set_uuid(&rel.uuid);
b.set_version(rel.version);
b.set_timestamp(rel.timestamp);
b.set_source(&rel.source);
b.set_target(&rel.target);
b.set_rel_type(match rel.rel_type {
RelationType::Link => memory_capnp::RelationType::Link,
RelationType::Causal => memory_capnp::RelationType::Causal,
RelationType::Auto => memory_capnp::RelationType::Auto,
});
b.set_strength(rel.strength);
b.set_provenance(match rel.provenance {
Provenance::Manual => memory_capnp::Provenance::Manual,
Provenance::Journal => memory_capnp::Provenance::Journal,
Provenance::Agent => memory_capnp::Provenance::Agent,
Provenance::Dream => memory_capnp::Provenance::Dream,
Provenance::Derived => memory_capnp::Provenance::Derived,
});
b.set_deleted(rel.deleted);
b.set_source_key(&rel.source_key);
b.set_target_key(&rel.target_key);
}
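// A minimal test sketch for the pure parsing helpers (added for
// illustration; the fixtures and expected values are hypothetical).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn normalize_link_variants() {
        assert_eq!(normalize_link("#sec", "a.md"), "a.md#sec");
        assert_eq!(normalize_link("dir/b.md#x", "a.md"), "b.md#x");
        assert_eq!(normalize_link("b.md", "a.md"), "b.md");
    }

    #[test]
    fn parse_units_splits_on_markers() {
        let md = "intro text\n<!-- mem: id=one links=other.md -->\nbody one\n<!-- mem: id=two -->\nbody two\n";
        let units = parse_units("file.md", md);
        assert_eq!(units.len(), 3); // pre-marker block + two sections
        assert_eq!(units[0].key, "file.md");
        assert_eq!(units[1].key, "file.md#one");
        assert_eq!(units[1].marker_links, vec!["other.md".to_string()]);
        assert_eq!(units[2].content, "body two");
    }
}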