diff --git a/poc-memory/src/main.rs b/poc-memory/src/main.rs
index 690336d..0494b46 100644
--- a/poc-memory/src/main.rs
+++ b/poc-memory/src/main.rs
@@ -86,6 +86,12 @@ enum Command {
     Health,
     /// Run consistency checks and repair
    Fsck,
+    /// Find and merge duplicate nodes (same key, multiple UUIDs)
+    Dedup {
+        /// Apply the merge (default: dry run)
+        #[arg(long)]
+        apply: bool,
+    },
     /// Summary of memory state
     Status,
     /// Show graph structure overview
@@ -487,6 +493,7 @@ fn main() {
         Command::Migrate => cmd_migrate(),
         Command::Health => cmd_health(),
         Command::Fsck => cmd_fsck(),
+        Command::Dedup { apply } => cmd_dedup(apply),
         Command::Status => cmd_status(),
         Command::Graph => cmd_graph(),
         Command::Used { key } => cmd_used(&key),
@@ -796,6 +803,182 @@ fn cmd_fsck() -> Result<(), String> {
     Ok(())
 }
 
+fn cmd_dedup(apply: bool) -> Result<(), String> {
+    use std::collections::{HashMap, HashSet};
+
+    let mut store = store::Store::load()?;
+    let duplicates = store.find_duplicates()?;
+
+    if duplicates.is_empty() {
+        println!("No duplicate keys found.");
+        return Ok(());
+    }
+
+    // Count edges per UUID
+    let mut edges_by_uuid: HashMap<[u8; 16], usize> = HashMap::new();
+    for rel in &store.relations {
+        if rel.deleted { continue; }
+        *edges_by_uuid.entry(rel.source).or_default() += 1;
+        *edges_by_uuid.entry(rel.target).or_default() += 1;
+    }
+
+    let mut identical_groups = Vec::new();
+    let mut diverged_groups = Vec::new();
+
+    for (key, mut nodes) in duplicates {
+        // Sort by version descending so highest-version is first
+        nodes.sort_by(|a, b| b.version.cmp(&a.version));
+
+        // Check if all copies have identical content
+        let all_same = nodes.windows(2).all(|w| w[0].content == w[1].content);
+
+        let info: Vec<_> = nodes.iter().map(|n| {
+            let edge_count = edges_by_uuid.get(&n.uuid).copied().unwrap_or(0);
+            (n.clone(), edge_count)
+        }).collect();
+
+        if all_same {
+            identical_groups.push((key, info));
+        } else {
+            diverged_groups.push((key, info));
+        }
+    }
+
+    // Report
+    println!("=== Duplicate key report ===\n");
+    println!("{} identical groups, {} diverged groups\n",
+        identical_groups.len(), diverged_groups.len());
+
+    if !identical_groups.is_empty() {
+        println!("── Identical (safe to auto-merge) ──");
+        for (key, copies) in &identical_groups {
+            let total_edges: usize = copies.iter().map(|c| c.1).sum();
+            println!("  {} ({} copies, {} total edges)", key, copies.len(), total_edges);
+            for (node, edges) in copies {
+                let uuid_hex = node.uuid.iter().map(|b| format!("{:02x}", b)).collect::<String>();
+                println!("    v{} uuid={}.. edges={}", node.version, &uuid_hex[..8], edges);
+            }
+        }
+        println!();
+    }
+
+    if !diverged_groups.is_empty() {
+        println!("── Diverged (need review) ──");
+        for (key, copies) in &diverged_groups {
+            let total_edges: usize = copies.iter().map(|c| c.1).sum();
+            println!("  {} ({} copies, {} total edges)", key, copies.len(), total_edges);
+            for (node, edges) in copies {
+                let uuid_hex = node.uuid.iter().map(|b| format!("{:02x}", b)).collect::<String>();
+                let preview: String = node.content.chars().take(80).collect();
+                println!("    v{} uuid={}.. edges={} | {}{}",
+                    node.version, &uuid_hex[..8], edges, preview,
+                    if node.content.len() > 80 { "..." } else { "" });
+            }
+        }
+        println!();
+    }
+
+    if !apply {
+        let total_dupes: usize = identical_groups.iter().chain(diverged_groups.iter())
+            .map(|(_, copies)| copies.len() - 1)
+            .sum();
+        println!("Dry run: {} duplicate nodes would be merged. Use --apply to execute.", total_dupes);
+        return Ok(());
+    }
+
+    // Merge all groups: identical + diverged
+    // For diverged: keep the copy with most edges (it's the one that got
+    // woven into the graph — the version that lived). Fall back to highest version.
+    let all_groups: Vec<_> = identical_groups.into_iter()
+        .chain(diverged_groups.into_iter())
+        .collect();
+
+    let mut merged = 0usize;
+    let mut edges_redirected = 0usize;
+    let mut edges_deduped = 0usize;
+
+    for (_key, mut copies) in all_groups {
+        // Pick survivor: most edges first, then highest version
+        copies.sort_by(|a, b| b.1.cmp(&a.1).then(b.0.version.cmp(&a.0.version)));
+
+        let survivor_uuid = copies[0].0.uuid;
+        let doomed_uuids: Vec<[u8; 16]> = copies[1..].iter().map(|c| c.0.uuid).collect();
+
+        // Redirect edges from doomed UUIDs to survivor
+        let mut updated_rels = Vec::new();
+        for rel in &mut store.relations {
+            if rel.deleted { continue; }
+            let mut changed = false;
+            if doomed_uuids.contains(&rel.source) {
+                rel.source = survivor_uuid;
+                changed = true;
+            }
+            if doomed_uuids.contains(&rel.target) {
+                rel.target = survivor_uuid;
+                changed = true;
+            }
+            if changed {
+                rel.version += 1;
+                updated_rels.push(rel.clone());
+                edges_redirected += 1;
+            }
+        }
+
+        // Dedup edges: same (source, target, rel_type) → keep highest strength
+        let mut seen: HashSet<([u8; 16], [u8; 16], String)> = HashSet::new();
+        let mut to_tombstone_rels = Vec::new();
+        // Sort by strength descending so we keep the strongest
+        let mut rels_with_idx: Vec<(usize, &store::Relation)> = store.relations.iter()
+            .enumerate()
+            .filter(|(_, r)| !r.deleted && (r.source == survivor_uuid || r.target == survivor_uuid))
+            .collect();
+        rels_with_idx.sort_by(|a, b| b.1.strength.total_cmp(&a.1.strength));
+
+        for (idx, rel) in &rels_with_idx {
+            let edge_key = (rel.source, rel.target, format!("{:?}", rel.rel_type));
+            if !seen.insert(edge_key) {
+                to_tombstone_rels.push(*idx);
+                edges_deduped += 1;
+            }
+        }
+
+        for &idx in &to_tombstone_rels {
+            store.relations[idx].deleted = true;
+            store.relations[idx].version += 1;
+            updated_rels.push(store.relations[idx].clone());
+        }
+
+        // Tombstone doomed nodes
+        let mut tombstones = Vec::new();
+        for (doomed_node, _) in &copies[1..] {
+            let mut t = doomed_node.clone();
+            t.deleted = true;
+            t.version += 1;
+            tombstones.push(t);
+        }
+
+        store.append_nodes(&tombstones)?;
+        if !updated_rels.is_empty() {
+            store.append_relations(&updated_rels)?;
+        }
+
+        for uuid in &doomed_uuids {
+            store.uuid_to_key.remove(uuid);
+        }
+
+        merged += doomed_uuids.len();
+    }
+
+    // Remove tombstoned relations from cache
+    store.relations.retain(|r| !r.deleted);
+    store.save()?;
+
+    println!("Merged {} duplicates, redirected {} edges, deduped {} duplicate edges",
+        merged, edges_redirected, edges_deduped);
+
+    Ok(())
+}
+
 fn cmd_health() -> Result<(), String> {
     let store = store::Store::load()?;
     let g = store.build_graph();
@@ -907,8 +1090,8 @@ fn cmd_consolidate_batch(count: usize, auto: bool, agent: Option<String>) -> Res
     let store = store::Store::load()?;
 
     if let Some(agent_name) = agent {
-        let prompt = agents::prompts::agent_prompt(&store, &agent_name, count)?;
-        println!("{}", prompt);
+        let batch = agents::prompts::agent_prompt(&store, &agent_name, count)?;
+        println!("{}", batch.prompt);
         Ok(())
     } else {
         agents::prompts::consolidation_batch(&store, count, auto)
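
Example session with the new subcommand. Illustrative only: the binary name (poc-memory), keys, UUID prefixes, and counts are invented sample data; the line format follows the println! calls in the diff above.

    $ poc-memory dedup
    === Duplicate key report ===

    1 identical groups, 1 diverged groups

    ── Identical (safe to auto-merge) ──
      project/build-notes (2 copies, 5 total edges)
        v3 uuid=9f2ab114.. edges=4
        v3 uuid=0c77d9e2.. edges=1

    ── Diverged (need review) ──
      project/roadmap (2 copies, 7 total edges)
        v5 uuid=4be01a3c.. edges=6 | Q3 priorities: ship the consolidation agent
        v2 uuid=d113f08a.. edges=1 | roadmap draft

    Dry run: 2 duplicate nodes would be merged. Use --apply to execute.

    $ poc-memory dedup --apply
    (same report, then:)
    Merged 2 duplicates, redirected 2 edges, deduped 1 duplicate edges

With --apply, the survivor in each group is the copy with the most edges, falling back to the highest version, so in the diverged group above uuid=4be01a3c.. would be kept and the single edge on uuid=d113f08a.. redirected to it.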