dedup: find and merge duplicate nodes with edge redirection
New `poc-memory dedup` command (--apply for live run, dry-run by default). Finds nodes sharing the same key but different UUIDs, classifies them as identical or diverged, picks a survivor (prefer most edges, then highest version), tombstones the rest, and redirects all edges from doomed UUIDs to the survivor.
This commit is contained in:
parent
37ae37667b
commit
9f14a29181
1 changed files with 185 additions and 2 deletions
|
|
@ -86,6 +86,12 @@ enum Command {
|
|||
Health,
|
||||
/// Run consistency checks and repair
|
||||
Fsck,
|
||||
/// Find and merge duplicate nodes (same key, multiple UUIDs)
|
||||
Dedup {
|
||||
/// Apply the merge (default: dry run)
|
||||
#[arg(long)]
|
||||
apply: bool,
|
||||
},
|
||||
/// Summary of memory state
|
||||
Status,
|
||||
/// Show graph structure overview
|
||||
|
|
@ -487,6 +493,7 @@ fn main() {
|
|||
Command::Migrate => cmd_migrate(),
|
||||
Command::Health => cmd_health(),
|
||||
Command::Fsck => cmd_fsck(),
|
||||
Command::Dedup { apply } => cmd_dedup(apply),
|
||||
Command::Status => cmd_status(),
|
||||
Command::Graph => cmd_graph(),
|
||||
Command::Used { key } => cmd_used(&key),
|
||||
|
|
@ -796,6 +803,182 @@ fn cmd_fsck() -> Result<(), String> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn cmd_dedup(apply: bool) -> Result<(), String> {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
let mut store = store::Store::load()?;
|
||||
let duplicates = store.find_duplicates()?;
|
||||
|
||||
if duplicates.is_empty() {
|
||||
println!("No duplicate keys found.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Count edges per UUID
|
||||
let mut edges_by_uuid: HashMap<[u8; 16], usize> = HashMap::new();
|
||||
for rel in &store.relations {
|
||||
if rel.deleted { continue; }
|
||||
*edges_by_uuid.entry(rel.source).or_default() += 1;
|
||||
*edges_by_uuid.entry(rel.target).or_default() += 1;
|
||||
}
|
||||
|
||||
let mut identical_groups = Vec::new();
|
||||
let mut diverged_groups = Vec::new();
|
||||
|
||||
for (key, mut nodes) in duplicates {
|
||||
// Sort by version descending so highest-version is first
|
||||
nodes.sort_by(|a, b| b.version.cmp(&a.version));
|
||||
|
||||
// Check if all copies have identical content
|
||||
let all_same = nodes.windows(2).all(|w| w[0].content == w[1].content);
|
||||
|
||||
let info: Vec<_> = nodes.iter().map(|n| {
|
||||
let edge_count = edges_by_uuid.get(&n.uuid).copied().unwrap_or(0);
|
||||
(n.clone(), edge_count)
|
||||
}).collect();
|
||||
|
||||
if all_same {
|
||||
identical_groups.push((key, info));
|
||||
} else {
|
||||
diverged_groups.push((key, info));
|
||||
}
|
||||
}
|
||||
|
||||
// Report
|
||||
println!("=== Duplicate key report ===\n");
|
||||
println!("{} identical groups, {} diverged groups\n",
|
||||
identical_groups.len(), diverged_groups.len());
|
||||
|
||||
if !identical_groups.is_empty() {
|
||||
println!("── Identical (safe to auto-merge) ──");
|
||||
for (key, copies) in &identical_groups {
|
||||
let total_edges: usize = copies.iter().map(|c| c.1).sum();
|
||||
println!(" {} ({} copies, {} total edges)", key, copies.len(), total_edges);
|
||||
for (node, edges) in copies {
|
||||
let uuid_hex = node.uuid.iter().map(|b| format!("{:02x}", b)).collect::<String>();
|
||||
println!(" v{} uuid={}.. edges={}", node.version, &uuid_hex[..8], edges);
|
||||
}
|
||||
}
|
||||
println!();
|
||||
}
|
||||
|
||||
if !diverged_groups.is_empty() {
|
||||
println!("── Diverged (need review) ──");
|
||||
for (key, copies) in &diverged_groups {
|
||||
let total_edges: usize = copies.iter().map(|c| c.1).sum();
|
||||
println!(" {} ({} copies, {} total edges)", key, copies.len(), total_edges);
|
||||
for (node, edges) in copies {
|
||||
let uuid_hex = node.uuid.iter().map(|b| format!("{:02x}", b)).collect::<String>();
|
||||
let preview: String = node.content.chars().take(80).collect();
|
||||
println!(" v{} uuid={}.. edges={} | {}{}",
|
||||
node.version, &uuid_hex[..8], edges, preview,
|
||||
if node.content.len() > 80 { "..." } else { "" });
|
||||
}
|
||||
}
|
||||
println!();
|
||||
}
|
||||
|
||||
if !apply {
|
||||
let total_dupes: usize = identical_groups.iter().chain(diverged_groups.iter())
|
||||
.map(|(_, copies)| copies.len() - 1)
|
||||
.sum();
|
||||
println!("Dry run: {} duplicate nodes would be merged. Use --apply to execute.", total_dupes);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Merge all groups: identical + diverged
|
||||
// For diverged: keep the copy with most edges (it's the one that got
|
||||
// woven into the graph — the version that lived). Fall back to highest version.
|
||||
let all_groups: Vec<_> = identical_groups.into_iter()
|
||||
.chain(diverged_groups.into_iter())
|
||||
.collect();
|
||||
|
||||
let mut merged = 0usize;
|
||||
let mut edges_redirected = 0usize;
|
||||
let mut edges_deduped = 0usize;
|
||||
|
||||
for (_key, mut copies) in all_groups {
|
||||
// Pick survivor: most edges first, then highest version
|
||||
copies.sort_by(|a, b| b.1.cmp(&a.1).then(b.0.version.cmp(&a.0.version)));
|
||||
|
||||
let survivor_uuid = copies[0].0.uuid;
|
||||
let doomed_uuids: Vec<[u8; 16]> = copies[1..].iter().map(|c| c.0.uuid).collect();
|
||||
|
||||
// Redirect edges from doomed UUIDs to survivor
|
||||
let mut updated_rels = Vec::new();
|
||||
for rel in &mut store.relations {
|
||||
if rel.deleted { continue; }
|
||||
let mut changed = false;
|
||||
if doomed_uuids.contains(&rel.source) {
|
||||
rel.source = survivor_uuid;
|
||||
changed = true;
|
||||
}
|
||||
if doomed_uuids.contains(&rel.target) {
|
||||
rel.target = survivor_uuid;
|
||||
changed = true;
|
||||
}
|
||||
if changed {
|
||||
rel.version += 1;
|
||||
updated_rels.push(rel.clone());
|
||||
edges_redirected += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Dedup edges: same (source, target, rel_type) → keep highest strength
|
||||
let mut seen: HashSet<([u8; 16], [u8; 16], String)> = HashSet::new();
|
||||
let mut to_tombstone_rels = Vec::new();
|
||||
// Sort by strength descending so we keep the strongest
|
||||
let mut rels_with_idx: Vec<(usize, &store::Relation)> = store.relations.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, r)| !r.deleted && (r.source == survivor_uuid || r.target == survivor_uuid))
|
||||
.collect();
|
||||
rels_with_idx.sort_by(|a, b| b.1.strength.total_cmp(&a.1.strength));
|
||||
|
||||
for (idx, rel) in &rels_with_idx {
|
||||
let edge_key = (rel.source, rel.target, format!("{:?}", rel.rel_type));
|
||||
if !seen.insert(edge_key) {
|
||||
to_tombstone_rels.push(*idx);
|
||||
edges_deduped += 1;
|
||||
}
|
||||
}
|
||||
|
||||
for &idx in &to_tombstone_rels {
|
||||
store.relations[idx].deleted = true;
|
||||
store.relations[idx].version += 1;
|
||||
updated_rels.push(store.relations[idx].clone());
|
||||
}
|
||||
|
||||
// Tombstone doomed nodes
|
||||
let mut tombstones = Vec::new();
|
||||
for (doomed_node, _) in &copies[1..] {
|
||||
let mut t = doomed_node.clone();
|
||||
t.deleted = true;
|
||||
t.version += 1;
|
||||
tombstones.push(t);
|
||||
}
|
||||
|
||||
store.append_nodes(&tombstones)?;
|
||||
if !updated_rels.is_empty() {
|
||||
store.append_relations(&updated_rels)?;
|
||||
}
|
||||
|
||||
for uuid in &doomed_uuids {
|
||||
store.uuid_to_key.remove(uuid);
|
||||
}
|
||||
|
||||
merged += doomed_uuids.len();
|
||||
}
|
||||
|
||||
// Remove tombstoned relations from cache
|
||||
store.relations.retain(|r| !r.deleted);
|
||||
store.save()?;
|
||||
|
||||
println!("Merged {} duplicates, redirected {} edges, deduped {} duplicate edges",
|
||||
merged, edges_redirected, edges_deduped);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cmd_health() -> Result<(), String> {
|
||||
let store = store::Store::load()?;
|
||||
let g = store.build_graph();
|
||||
|
|
@ -907,8 +1090,8 @@ fn cmd_consolidate_batch(count: usize, auto: bool, agent: Option<String>) -> Res
|
|||
let store = store::Store::load()?;
|
||||
|
||||
if let Some(agent_name) = agent {
|
||||
let prompt = agents::prompts::agent_prompt(&store, &agent_name, count)?;
|
||||
println!("{}", prompt);
|
||||
let batch = agents::prompts::agent_prompt(&store, &agent_name, count)?;
|
||||
println!("{}", batch.prompt);
|
||||
Ok(())
|
||||
} else {
|
||||
agents::prompts::consolidation_batch(&store, count, auto)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue