consciousness/src/hippocampus/store/ops.rs

404 lines
16 KiB
Rust
Raw Normal View History

// Mutation operations on the store
//
// CRUD (upsert, delete), maintenance (decay, cap_degree), and graph metrics.
use super::{capnp, index, types::*, Store};
use anyhow::{anyhow, bail, Result};
use std::collections::{HashMap, HashSet};
/// Provenance label for paths that do not go through tool dispatch
/// (CLI, digest, etc.).
///
/// Reads the `POC_PROVENANCE` environment variable; when it cannot be read
/// the label is `"manual"`. Tool dispatch passes provenance directly through
/// thought::dispatch instead of calling this.
pub fn current_provenance() -> String {
    match std::env::var("POC_PROVENANCE") {
        Ok(label) => label,
        Err(_) => String::from("manual"),
    }
}
impl Store {
/// Add or update a node (appends to log + updates index).
///
/// When a node with the same key already exists, the incoming node takes
/// over its UUID and gets `existing.version + 1`, so the new record
/// supersedes the old one rather than creating a second identity.
///
/// # Errors
///
/// Propagates lookup, log-append and index errors.
pub fn upsert_node(&mut self, mut node: Node) -> Result<()> {
    if let Some(existing) = self.get_node(&node.key)? {
        node.uuid = existing.uuid;
        node.version = existing.version + 1;
    }
    // Borrow the node as a one-element slice instead of cloning it just to
    // build a temporary array.
    let offset = self.append_nodes(std::slice::from_ref(&node))?;
    if let Some(ref database) = self.db {
        index::index_node(database, &node.key, offset, &node.uuid)?;
    }
    Ok(())
}
/// Record a new relation: append it to the log, index it when a database
/// is attached, then keep it in the in-memory relation list.
///
/// # Errors
///
/// Propagates log-append and index errors; on error the relation is not
/// added to the in-memory list.
pub fn add_relation(&mut self, rel: Relation) -> Result<()> {
    self.append_relations(std::slice::from_ref(&rel))?;
    if let Some(database) = self.db.as_ref() {
        let kind = rel.rel_type as u8;
        index::index_relation(database, &rel.source, &rel.target, rel.strength, kind)?;
    }
    self.relations.push(rel);
    Ok(())
}
/// List the most recent live nodes carrying the given provenance.
///
/// Returns up to `limit` `(key, timestamp)` pairs, newest first. Deleted
/// nodes are excluded. This is best-effort: a missing database, a failed
/// key listing, or an unreadable node yields fewer (possibly zero) results
/// rather than an error.
pub fn recent_by_provenance(&self, provenance: &str, limit: usize) -> Vec<(String, i64)> {
    let (db, keys) = match self.db.as_ref() {
        Some(db) => match index::all_keys(db) {
            Ok(keys) => (db, keys),
            Err(_) => return Vec::new(),
        },
        None => return Vec::new(),
    };

    let mut hits: Vec<(String, i64)> = Vec::new();
    for key in keys.iter() {
        // Keys without an offset, or whose record cannot be read, are skipped.
        let offset = match index::get_offset(db, key) {
            Ok(Some(offset)) => offset,
            _ => continue,
        };
        let node = match capnp::read_node_at_offset(offset) {
            Ok(node) => node,
            Err(_) => continue,
        };
        if node.deleted || node.provenance != provenance {
            continue;
        }
        hits.push((key.clone(), node.timestamp));
    }

    // Stable sort, descending by timestamp.
    hits.sort_by_key(|entry| std::cmp::Reverse(entry.1));
    hits.truncate(limit);
    hits
}
/// Create the node if it is missing, or update it when its content differs.
///
/// Returns `"created"`, `"updated"`, or `"unchanged"`.
///
/// The provenance comes from `current_provenance()`: the `POC_PROVENANCE`
/// environment variable when readable, `"manual"` otherwise.
pub fn upsert(&mut self, key: &str, content: &str) -> Result<&'static str> {
    self.upsert_provenance(key, content, &current_provenance())
}
/// Upsert with an explicit provenance label (for agent-created nodes).
///
/// Returns `"unchanged"` when the stored content already equals `content`
/// (nothing is written), `"updated"` when an existing node received new
/// content, and `"created"` when a fresh node was made.
///
/// # Errors
///
/// Propagates lookup, log-append and index errors.
pub fn upsert_provenance(&mut self, key: &str, content: &str, provenance: &str) -> Result<&'static str> {
    let (node, outcome) = match self.get_node(key)? {
        Some(existing) if existing.content == content => return Ok("unchanged"),
        Some(mut existing) => {
            // New version of an existing node: same UUID, bumped version.
            existing.content = content.to_string();
            existing.provenance = provenance.to_string();
            existing.timestamp = now_epoch();
            existing.version += 1;
            (existing, "updated")
        }
        None => {
            let mut fresh = new_node(key, content);
            fresh.provenance = provenance.to_string();
            (fresh, "created")
        }
    };

    let offset = self.append_nodes(std::slice::from_ref(&node))?;
    if let Some(ref database) = self.db {
        index::index_node(database, &node.key, offset, &node.uuid)?;
    }
    Ok(outcome)
}
/// Soft-delete a node (appends deleted version, removes from index).
pub fn delete_node(&mut self, key: &str) -> Result<()> {
let prov = current_provenance();
let node = self.get_node(key)?
.ok_or_else(|| anyhow!("No node '{}'", key))?;
let uuid = node.uuid;
let mut deleted = node;
deleted.deleted = true;
deleted.version += 1;
deleted.provenance = prov;
deleted.timestamp = now_epoch();
self.append_nodes(std::slice::from_ref(&deleted))?;
if let Some(ref database) = self.db {
index::remove_node(database, key, &uuid)?;
}
Ok(())
}
/// Move a node to a new key and refresh the key strings stored on its edges.
///
/// Edges reference nodes by UUID, so the graph itself is untouched; only the
/// human-readable source/target key strings on relations are rewritten.
/// Renaming a key to itself is a no-op.
///
/// # Errors
///
/// Fails when `new_key` is already taken or `old_key` does not exist, and
/// propagates log-append and index errors.
pub fn rename_node(&mut self, old_key: &str, new_key: &str) -> Result<()> {
    if old_key == new_key {
        return Ok(());
    }
    if self.contains_key(new_key)? {
        bail!("Key '{}' already exists", new_key);
    }
    let original = match self.get_node(old_key)? {
        Some(found) => found,
        None => bail!("No node '{}'", old_key),
    };
    let provenance = current_provenance();
    // Both records written below are copies of `original`, so they share this UUID.
    let subject = original.uuid;

    // Next version of the node, living under the new key.
    let mut moved = original.clone();
    moved.key = new_key.to_string();
    moved.version += 1;
    moved.provenance = provenance.clone();
    moved.timestamp = now_epoch();

    // Tombstone that retires the old key.
    let mut retired = original;
    retired.deleted = true;
    retired.version += 1;
    retired.provenance = provenance;
    retired.timestamp = now_epoch();

    let offset = self.append_nodes(&[moved, retired])?;

    // Without an index there is nothing further to update.
    let database = match self.db.as_ref() {
        Some(database) => database,
        None => return Ok(()),
    };
    index::remove_node(database, old_key, &subject)?;
    index::index_node(database, new_key, offset, &subject)?;

    // Every edge touching the renamed node needs its key strings rewritten.
    let edges = index::edges_for_node(database, &subject)?;

    // uuid → key lookup for the far endpoints; keys whose UUID lookup fails
    // are left out.
    let keys = index::all_keys(database)?;
    let mut uuid_to_key: HashMap<[u8; 16], String> = keys
        .iter()
        .filter_map(|k| match index::get_uuid_for_key(database, k) {
            Ok(Some(u)) => Some((u, k.clone())),
            _ => None,
        })
        .collect();
    uuid_to_key.insert(subject, new_key.to_string());

    let rewritten: Vec<_> = edges
        .into_iter()
        .map(|(peer, strength, kind, outgoing)| {
            // Unknown far endpoints get an empty key string.
            let peer_key = uuid_to_key.get(&peer).cloned().unwrap_or_default();
            let mut rel = if outgoing {
                new_relation(subject, peer, RelationType::from_u8(kind), strength,
                             new_key, &peer_key)
            } else {
                new_relation(peer, subject, RelationType::from_u8(kind), strength,
                             &peer_key, new_key)
            };
            rel.version = 2; // indicate update
            rel
        })
        .collect();

    if !rewritten.is_empty() {
        self.append_relations(&rewritten)?;
    }
    Ok(())
}
/// Cap node degree by soft-deleting edges from mega-hubs.
///
/// For every node with more than `max_degree` indexed edges, the excess is
/// marked for removal: `Auto` edges first (weakest strength first), then the
/// remaining edges, preferring those whose far endpoint has the highest
/// degree. Each marked edge is removed from the index and a tombstone
/// relation (`deleted = true`, `version = 2`) is appended to the log.
///
/// Returns `(hubs_capped, pruned_count)`: the number of nodes that had edges
/// marked, and the number of edges actually removed.
///
/// # Errors
///
/// Fails with "store not loaded" when no database is attached, and
/// propagates index and log-append errors.
///
/// NOTE(review): `self.relations` is not modified here — confirm whether the
/// in-memory list needs pruning as well.
pub fn cap_degree(&mut self, max_degree: usize) -> Result<(usize, usize)> {
let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
let keys = index::all_keys(db)?;
// Build uuid ↔ key maps (keys whose UUID lookup fails are left out of both)
let mut uuid_to_key: HashMap<[u8; 16], String> = HashMap::new();
let mut key_to_uuid: HashMap<String, [u8; 16]> = HashMap::new();
for key in &keys {
if let Ok(Some(uuid)) = index::get_uuid_for_key(db, key) {
uuid_to_key.insert(uuid, key.clone());
key_to_uuid.insert(key.clone(), uuid);
}
}
// Count degrees per node: every edge index::edges_for_node reports for it
let mut node_degree: HashMap<String, usize> = HashMap::new();
for key in &keys {
let uuid = match key_to_uuid.get(key) {
Some(u) => u,
None => continue,
};
let edges = index::edges_for_node(db, uuid)?;
node_degree.insert(key.clone(), edges.len());
}
// Pass 1: decide which (source, target) edges to drop. Nothing is mutated yet.
let mut to_delete: HashSet<([u8; 16], [u8; 16])> = HashSet::new();
let mut hubs_capped = 0;
for key in &keys {
let uuid = match key_to_uuid.get(key) {
Some(u) => *u,
None => continue,
};
let edges = index::edges_for_node(db, &uuid)?;
if edges.len() <= max_degree { continue; }
// Separate auto vs manual edges: (source, target, sort_key)
// sort_key is the edge strength for auto edges and the far endpoint's
// degree for all other edges.
let mut auto_edges: Vec<([u8; 16], [u8; 16], f32)> = Vec::new();
let mut link_edges: Vec<([u8; 16], [u8; 16], usize)> = Vec::new();
for (other_uuid, strength, rel_type, is_outgoing) in &edges {
// Reconstruct the edge's (source, target) order from the outgoing flag
let (src, tgt) = if *is_outgoing { (uuid, *other_uuid) } else { (*other_uuid, uuid) };
// Already marked while processing an earlier node: not counted again
if to_delete.contains(&(src, tgt)) { continue; }
// Edges whose far endpoint has no known key are ignored entirely
let other_key = match uuid_to_key.get(other_uuid) {
Some(k) => k,
None => continue,
};
if *rel_type == RelationType::Auto as u8 {
auto_edges.push((src, tgt, *strength));
} else {
let other_deg = node_degree.get(other_key).copied().unwrap_or(0);
link_edges.push((src, tgt, other_deg));
}
}
// Degree after discounting edges already marked and edges to unknown nodes
let active_count = auto_edges.len() + link_edges.len();
if active_count <= max_degree { continue; }
let excess = active_count - max_degree;
// Prune weakest auto edges first
auto_edges.sort_by(|a, b| a.2.total_cmp(&b.2));
for (src, tgt, _) in auto_edges.iter().take(excess) {
to_delete.insert((*src, *tgt));
}
// Then prune links to highest-degree nodes
// (only needed when there were fewer auto edges than the excess)
let remaining = excess.saturating_sub(auto_edges.len());
if remaining > 0 {
link_edges.sort_by(|a, b| b.2.cmp(&a.2));
for (src, tgt, _) in link_edges.iter().take(remaining) {
to_delete.insert((*src, *tgt));
}
}
hubs_capped += 1;
}
// Pass 2: collect edge info for deletion. The strength and type are looked
// up again from the source node's outgoing edges; a marked pair with no
// matching outgoing edge is silently dropped.
let mut to_remove: Vec<([u8; 16], [u8; 16], f32, u8, String, String)> = Vec::new();
for (source_uuid, target_uuid) in &to_delete {
let edges = index::edges_for_node(db, source_uuid)?;
if let Some((_, strength, rel_type, _)) = edges.iter()
.find(|(other, _, _, out)| *other == *target_uuid && *out)
{
let source_key = uuid_to_key.get(source_uuid).cloned().unwrap_or_default();
let target_key = uuid_to_key.get(target_uuid).cloned().unwrap_or_default();
to_remove.push((*source_uuid, *target_uuid, *strength, *rel_type, source_key, target_key));
}
}
// Pass 3: now mutate: remove from index and persist tombstones
let pruned_count = to_remove.len();
for (source_uuid, target_uuid, strength, rel_type, source_key, target_key) in to_remove {
// Re-borrow the database per iteration so the borrow ends before the
// mutable append_relations call below.
if let Some(db) = &self.db {
index::remove_relation(db, &source_uuid, &target_uuid, strength, rel_type)?;
}
let mut rel = new_relation(source_uuid, target_uuid,
RelationType::from_u8(rel_type), strength,
&source_key, &target_key);
rel.deleted = true;
rel.version = 2;
self.append_relations(std::slice::from_ref(&rel))?;
}
Ok((hubs_capped, pruned_count))
}
/// Set a node's weight directly. Returns (old, new).
pub fn set_weight(&mut self, key: &str, weight: f32) -> Result<(f32, f32)> {
let weight = weight.clamp(0.01, 1.0);
let mut node = self.get_node(key)?
.ok_or_else(|| anyhow!("node not found: {}", key))?;
let old = node.weight;
if (old - weight).abs() < 0.001 {
return Ok((old, weight)); // unchanged
}
node.weight = weight;
node.version += 1;
node.timestamp = now_epoch();
let offset = self.append_nodes(std::slice::from_ref(&node))?;
if let Some(ref database) = self.db {
index::index_node(database, key, offset, &node.uuid)?;
}
Ok((old, weight))
}
/// Set the strength of the link between two nodes, creating the link if it
/// does not exist yet.
///
/// `strength` is clamped to `0.01..=1.0`. Returns the previous strength, or
/// `0.0` when a new link had to be created.
///
/// An existing edge is matched in either direction (preferring one stored as
/// `source → target`), and is updated in the direction it is actually
/// stored, so a `target → source` edge is not duplicated or flipped.
///
/// A newly created link is written once, with the requested strength and
/// provenance `"link_set"`, through `add_relation` — so the log, the index
/// and the in-memory relation list all agree on the value.
///
/// # Errors
///
/// Fails when either node is unknown or when no database is attached
/// ("store not loaded"); propagates index and log-append errors.
pub fn set_link_strength(&mut self, source: &str, target: &str, strength: f32) -> Result<f32> {
    let strength = strength.clamp(0.01, 1.0);
    let source_uuid = self.get_node(source)?
        .map(|n| n.uuid)
        .ok_or_else(|| anyhow!("source not found: {}", source))?;
    let target_uuid = self.get_node(target)?
        .map(|n| n.uuid)
        .ok_or_else(|| anyhow!("target not found: {}", target))?;

    // Find existing edge via index
    let db = self.db.as_ref().ok_or_else(|| anyhow!("store not loaded"))?;
    let edges = index::edges_for_node(db, &source_uuid)?;
    let existing = edges.iter()
        .find(|(other, _, _, outgoing)| *other == target_uuid && *outgoing)
        .or_else(|| edges.iter().find(|(other, _, _, _)| *other == target_uuid));

    if let Some(&(_, old, rel_type, is_outgoing)) = existing {
        // Address the edge by its stored direction; the index entry and the
        // log record are both keyed on (source, target) order.
        let (src_uuid, tgt_uuid, src_key, tgt_key) = if is_outgoing {
            (source_uuid, target_uuid, source, target)
        } else {
            (target_uuid, source_uuid, target, source)
        };
        // Remove old edge from index, add updated one
        index::remove_relation(db, &src_uuid, &tgt_uuid, old, rel_type)?;
        index::index_relation(db, &src_uuid, &tgt_uuid, strength, rel_type)?;
        // Append updated relation to log
        let mut rel = new_relation(src_uuid, tgt_uuid,
            RelationType::from_u8(rel_type), strength, src_key, tgt_key);
        rel.version = 2; // indicate update
        self.append_relations(std::slice::from_ref(&rel))?;
        Ok(old)
    } else {
        // No edge yet: create it directly at the requested strength instead
        // of creating it at a derived strength and patching the index after.
        let mut rel = new_relation(source_uuid, target_uuid,
            RelationType::Link, strength, source, target);
        rel.provenance = "link_set".to_string();
        self.add_relation(rel)?;
        Ok(0.0)
    }
}
/// Link two nodes, deriving the initial strength from their Jaccard
/// similarity in the current graph.
///
/// The strength is `jaccard * 3.0`, clamped to `0.1..=1.0`, and is returned
/// on success. The relation is stored with the given `provenance`.
///
/// # Errors
///
/// Fails when either node is unknown, or with "link already exists" when
/// the index already holds an edge between the two nodes (in either
/// direction); propagates index and log-append errors.
pub fn add_link(&mut self, source: &str, target: &str, provenance: &str) -> Result<f32> {
    let source_uuid = match self.get_node(source)? {
        Some(node) => node.uuid,
        None => bail!("source not found: {}", source),
    };
    let target_uuid = match self.get_node(target)? {
        Some(node) => node.uuid,
        None => bail!("target not found: {}", target),
    };

    // Duplicate check goes through the index; without one it is skipped.
    if let Some(db) = self.db.as_ref() {
        let edges = index::edges_for_node(db, &source_uuid)?;
        if edges.iter().any(|edge| edge.0 == target_uuid) {
            bail!("link already exists: {} ↔ {}", source, target);
        }
    }

    let jaccard = self.build_graph().jaccard(source, target);
    let strength = (jaccard * 3.0).clamp(0.1, 1.0) as f32;

    let mut rel = new_relation(
        source_uuid, target_uuid,
        RelationType::Link, strength,
        source, target,
    );
    rel.provenance = provenance.to_owned();
    self.add_relation(rel)?;
    Ok(strength)
}
}