diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index 6e93fac..2464878 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -242,208 +242,197 @@ fn job_rename_agent( /// /// This handles arbitrarily large nodes because the output of each phase 2 /// call is proportional to one child, not the whole parent. -fn job_split_agent( +/// Split a single node by key. Called as an independent task so multiple +/// splits can run in parallel. Each task loads the store fresh, checks the +/// node still exists and hasn't been split, does the LLM work, then saves. +fn job_split_one( ctx: &ExecutionContext, - batch_size: usize, + parent_key: String, ) -> Result<(), TaskError> { run_job(ctx, "c-split", || { - ctx.log_line("loading store"); + ctx.log_line(&format!("loading store for {}", parent_key)); let mut store = crate::store::Store::load()?; - let count = if batch_size == 0 { 1 } else { batch_size }; - let candidates = super::prompts::split_candidates(&store); - if candidates.is_empty() { - ctx.log_line("no nodes large enough to split"); + // Check node still exists and hasn't been deleted/split already + let content_len = match store.nodes.get(parent_key.as_str()) { + Some(n) if !n.deleted => n.content.len(), + _ => { + ctx.log_line(&format!("skip: {} no longer exists or deleted", parent_key)); + return Ok(()); + } + }; + + ctx.log_line(&format!("--- splitting: {} ({} chars)", parent_key, content_len)); + + // Phase 1: get split plan + let plan_prompt = super::prompts::split_plan_prompt(&store, &parent_key)?; + ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len())); + + let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?; + let plan = match super::llm::parse_json_response(&plan_response) { + Ok(v) => v, + Err(e) => { + ctx.log_line(&format!("phase 1 parse error: {}", e)); + return Ok(()); + } + }; + + let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or(""); + if action == "keep" { + let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or(""); + ctx.log_line(&format!("keep: {} ({})", parent_key, reason)); + return Ok(()); + } + if action != "split" { + ctx.log_line(&format!("unexpected action: {}", action)); return Ok(()); } - let mut total_split = 0; - let mut total_kept = 0; - - for parent_key in candidates.iter().take(count) { - ctx.log_line(&format!("--- splitting: {} ({} chars)", - parent_key, - store.nodes.get(parent_key).map(|n| n.content.len()).unwrap_or(0))); - - // Phase 1: get split plan - let plan_prompt = super::prompts::split_plan_prompt(&store, parent_key)?; - ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len())); - - let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?; - let plan = match super::llm::parse_json_response(&plan_response) { - Ok(v) => v, - Err(e) => { - ctx.log_line(&format!("phase 1 parse error: {}", e)); - continue; - } - }; - - let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or(""); - if action == "keep" { - let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or(""); - ctx.log_line(&format!("keep: {} ({})", parent_key, reason)); - total_kept += 1; - continue; - } - if action != "split" { - ctx.log_line(&format!("unexpected action: {}", action)); - continue; + let children_plan = match plan.get("children").and_then(|v| v.as_array()) { + Some(c) if c.len() >= 2 => c, + _ => { + ctx.log_line("plan has fewer than 2 children, skipping"); + return Ok(()); } + }; - let children_plan = match plan.get("children").and_then(|v| v.as_array()) { - Some(c) if c.len() >= 2 => c, - _ => { - ctx.log_line("plan has fewer than 2 children, skipping"); - continue; - } - }; - - ctx.log_line(&format!("phase 1: {} children planned", children_plan.len())); - for child in children_plan { - let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?"); - let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or(""); - ctx.log_line(&format!(" planned: {} — {}", key, desc)); - } - - // Phase 2: extract content for each child - let mut children: Vec<(String, String)> = Vec::new(); - - for child_plan in children_plan { - let child_key = match child_plan.get("key").and_then(|v| v.as_str()) { - Some(k) => k.to_string(), - None => continue, - }; - let child_desc = child_plan.get("description") - .and_then(|v| v.as_str()).unwrap_or(""); - let child_sections = child_plan.get("sections") - .and_then(|v| v.as_array()) - .map(|arr| arr.iter() - .filter_map(|v| v.as_str()) - .collect::>() - .join(", ")) - .unwrap_or_default(); - - ctx.log_line(&format!("phase 2: extracting {}", child_key)); - - let extract_prompt = super::prompts::split_extract_prompt( - &store, parent_key, &child_key, child_desc, &child_sections)?; - ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len())); - - let content = match super::llm::call_sonnet("split-extract", &extract_prompt) { - Ok(c) => c, - Err(e) => { - ctx.log_line(&format!(" extract error: {}", e)); - continue; - } - }; - - ctx.log_line(&format!(" extracted: {} chars", content.len())); - children.push((child_key, content)); - } - - if children.len() < 2 { - ctx.log_line(&format!("only {} children extracted, skipping", children.len())); - continue; - } - - // Collect parent's edges before modifications - let parent_edges: Vec<_> = store.relations.iter() - .filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key)) - .cloned() - .collect(); - - // Create child nodes - let mut child_uuids: Vec<([u8; 16], String)> = Vec::new(); - for (child_key, content) in &children { - if store.nodes.contains_key(child_key.as_str()) { - ctx.log_line(&format!(" skip: {} already exists", child_key)); - continue; - } - store.upsert_provenance(child_key, content, - crate::store::Provenance::AgentConsolidate)?; - let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid; - child_uuids.push((uuid, child_key.clone())); - ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len())); - } - - // Inherit edges: assign each parent edge to best-matching child - for edge in &parent_edges { - let neighbor_key = if edge.source_key == *parent_key { - &edge.target_key - } else { - &edge.source_key - }; - let neighbor_content = store.nodes.get(neighbor_key.as_str()) - .map(|n| n.content.as_str()) - .unwrap_or(""); - - let best = child_uuids.iter() - .enumerate() - .map(|(idx, _)| { - let sim = crate::similarity::cosine_similarity( - &children[idx].1, neighbor_content); - (idx, sim) - }) - .max_by(|a, b| a.1.total_cmp(&b.1)); - - if let Some((idx, _)) = best { - let (child_uuid, child_key) = &child_uuids[idx]; - let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) { - Some(n) => n.uuid, - None => continue, - }; - - let (su, tu, sk, tk) = if edge.source_key == *parent_key { - (*child_uuid, neighbor_uuid, child_key.as_str(), neighbor_key.as_str()) - } else { - (neighbor_uuid, *child_uuid, neighbor_key.as_str(), child_key.as_str()) - }; - - let rel = crate::store::new_relation( - su, tu, crate::store::RelationType::Auto, edge.strength, sk, tk, - ); - store.add_relation(rel).ok(); - } - } - - // Link siblings - for i in 0..child_uuids.len() { - for j in (i+1)..child_uuids.len() { - let rel = crate::store::new_relation( - child_uuids[i].0, child_uuids[j].0, - crate::store::RelationType::Auto, 0.5, - &child_uuids[i].1, &child_uuids[j].1, - ); - store.add_relation(rel).ok(); - } - } - - // Tombstone parent - if let Some(parent) = store.nodes.get_mut(parent_key) { - parent.deleted = true; - parent.version += 1; - let tombstone = parent.clone(); - store.append_nodes(std::slice::from_ref(&tombstone)).ok(); - } - store.nodes.remove(parent_key); - - ctx.log_line(&format!("split complete: {} → {} children", parent_key, child_uuids.len())); - total_split += 1; - - // Save after each split so we don't lose work - store.save()?; + ctx.log_line(&format!("phase 1: {} children planned", children_plan.len())); + for child in children_plan { + let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?"); + let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or(""); + ctx.log_line(&format!(" planned: {} — {}", key, desc)); } - // Store report - let ts = crate::store::format_datetime(crate::store::now_epoch()) - .replace([':', '-', 'T'], ""); - let report_key = format!("_consolidation-split-{}", ts); - let report = format!("Split agent: {} split, {} kept out of {} candidates", - total_split, total_kept, candidates.len().min(count)); - store.upsert_provenance(&report_key, &report, - crate::store::Provenance::AgentConsolidate).ok(); + // Phase 2: extract content for each child + let mut children: Vec<(String, String)> = Vec::new(); + // Collect neighbor assignments from plan: child_key -> [neighbor_keys] + let mut neighbor_map: HashMap> = HashMap::new(); - ctx.log_line(&format!("done: {} split, {} kept", total_split, total_kept)); + for child_plan in children_plan { + let child_key = match child_plan.get("key").and_then(|v| v.as_str()) { + Some(k) => k.to_string(), + None => continue, + }; + let child_desc = child_plan.get("description") + .and_then(|v| v.as_str()).unwrap_or(""); + let child_sections = child_plan.get("sections") + .and_then(|v| v.as_array()) + .map(|arr| arr.iter() + .filter_map(|v| v.as_str()) + .collect::>() + .join(", ")) + .unwrap_or_default(); + let child_neighbors: Vec = child_plan.get("neighbors") + .and_then(|v| v.as_array()) + .map(|arr| arr.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect()) + .unwrap_or_default(); + neighbor_map.insert(child_key.clone(), child_neighbors); + + ctx.log_line(&format!("phase 2: extracting {}", child_key)); + + let extract_prompt = super::prompts::split_extract_prompt( + &store, &parent_key, &child_key, child_desc, &child_sections)?; + ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len())); + + let content = match super::llm::call_sonnet("split-extract", &extract_prompt) { + Ok(c) => c, + Err(e) => { + ctx.log_line(&format!(" extract error: {}", e)); + continue; + } + }; + + ctx.log_line(&format!(" extracted: {} chars", content.len())); + children.push((child_key, content)); + } + + if children.len() < 2 { + ctx.log_line(&format!("only {} children extracted, skipping", children.len())); + return Ok(()); + } + + // Reload store before mutations — another split may have saved meanwhile + store = crate::store::Store::load()?; + + // Re-check parent still exists after reload + if !store.nodes.contains_key(parent_key.as_str()) || + store.nodes.get(parent_key.as_str()).map_or(true, |n| n.deleted) { + ctx.log_line(&format!("skip: {} was split by another task", parent_key)); + return Ok(()); + } + + // Collect parent's edges before modifications + let parent_edges: Vec<_> = store.relations.iter() + .filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key)) + .cloned() + .collect(); + + // Create child nodes + let mut child_uuids: Vec<([u8; 16], String)> = Vec::new(); + for (child_key, content) in &children { + if store.nodes.contains_key(child_key.as_str()) { + ctx.log_line(&format!(" skip: {} already exists", child_key)); + continue; + } + store.upsert_provenance(child_key, content, + crate::store::Provenance::AgentConsolidate)?; + let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid; + child_uuids.push((uuid, child_key.clone())); + ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len())); + } + + // Inherit edges using agent's neighbor assignments from the plan + for (child_uuid, child_key) in &child_uuids { + let neighbors = match neighbor_map.get(child_key) { + Some(n) => n, + None => continue, + }; + for neighbor_key in neighbors { + // Find the parent edge for this neighbor to inherit its strength + let parent_edge = parent_edges.iter().find(|r| { + r.source_key == *neighbor_key || r.target_key == *neighbor_key + }); + let strength = parent_edge.map(|e| e.strength).unwrap_or(0.3); + + let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) { + Some(n) => n.uuid, + None => continue, + }; + + let rel = crate::store::new_relation( + *child_uuid, neighbor_uuid, + crate::store::RelationType::Auto, strength, + child_key, neighbor_key, + ); + store.add_relation(rel).ok(); + } + } + + // Link siblings + for i in 0..child_uuids.len() { + for j in (i+1)..child_uuids.len() { + let rel = crate::store::new_relation( + child_uuids[i].0, child_uuids[j].0, + crate::store::RelationType::Auto, 0.5, + &child_uuids[i].1, &child_uuids[j].1, + ); + store.add_relation(rel).ok(); + } + } + + // Tombstone parent + if let Some(parent) = store.nodes.get_mut(parent_key.as_str()) { + parent.deleted = true; + parent.version += 1; + let tombstone = parent.clone(); + store.append_nodes(std::slice::from_ref(&tombstone)).ok(); + } + store.nodes.remove(parent_key.as_str()); + + ctx.log_line(&format!("split complete: {} → {} children", parent_key, child_uuids.len())); + store.save()?; Ok(()) }) } @@ -1359,6 +1348,32 @@ fn status_socket_loop( let is_rename = *agent_type == "rename"; let is_split = *agent_type == "split"; + + if is_split { + // Split: load candidates upfront, spawn independent + // parallel tasks — one per node, no dependencies. + let store = crate::store::Store::load().ok(); + let candidates = store.as_ref() + .map(|s| super::prompts::split_candidates(s)) + .unwrap_or_default(); + let to_split: Vec = candidates.into_iter() + .take(count) + .collect(); + for key in &to_split { + let key = key.clone(); + let task_name = format!("c-split-{}:{}", key, today); + choir.spawn(task_name) + .resource(llm) + .retries(1) + .init(move |ctx| { + job_split_one(ctx, key.clone()) + }) + .run(); + spawned += 1; + } + remaining = 0; + } + while remaining > 0 { let batch = remaining.min(batch_size); let agent = agent_type.to_string(); @@ -1369,8 +1384,6 @@ fn status_socket_loop( .init(move |ctx| { if is_rename { job_rename_agent(ctx, batch) - } else if is_split { - job_split_agent(ctx, batch) } else { job_consolidation_agent(ctx, &agent, batch) } diff --git a/poc-memory/src/agents/llm.rs b/poc-memory/src/agents/llm.rs index 8e0f41a..ab4ea55 100644 --- a/poc-memory/src/agents/llm.rs +++ b/poc-memory/src/agents/llm.rs @@ -50,7 +50,8 @@ fn call_model(agent: &str, model: &str, prompt: &str) -> Result .map_err(|e| format!("write temp prompt: {}", e))?; let mut cmd = Command::new("claude"); - cmd.args(["-p", "--model", model, "--tools", "", "--no-session-persistence"]) + cmd.args(["-p", "--model", model, "--tools", "", "--no-session-persistence", + "--strict-mcp-config"]) .stdin(fs::File::open(&tmp).map_err(|e| format!("open temp: {}", e))?) .env_remove("CLAUDECODE"); diff --git a/poc-memory/src/agents/prompts.rs b/poc-memory/src/agents/prompts.rs index affb70f..1d66938 100644 --- a/poc-memory/src/agents/prompts.rs +++ b/poc-memory/src/agents/prompts.rs @@ -317,7 +317,6 @@ pub fn split_candidates(store: &Store) -> Vec { .filter(|(key, node)| { !key.starts_with('_') && !node.deleted - && node.content.len() > 2000 && matches!(node.node_type, crate::store::NodeType::Semantic) }) .map(|(k, n)| (k.as_str(), n.content.len())) diff --git a/prompts/split-plan.md b/prompts/split-plan.md index 01610c3..a23f7d2 100644 --- a/prompts/split-plan.md +++ b/prompts/split-plan.md @@ -43,12 +43,14 @@ Output a JSON block describing the split plan: { "key": "new-key-1", "description": "Brief description of what this child covers", - "sections": ["Section Header 1", "Section Header 2"] + "sections": ["Section Header 1", "Section Header 2"], + "neighbors": ["neighbor-key-a", "neighbor-key-b"] }, { "key": "new-key-2", "description": "Brief description of what this child covers", - "sections": ["Section Header 3", "Another Section"] + "sections": ["Section Header 3", "Another Section"], + "neighbors": ["neighbor-key-c"] } ] } @@ -79,6 +81,14 @@ in each child. These don't need to be exact matches; they're hints that help the extractor know what to include. Content that spans topics or doesn't have a clear header can be mentioned in the description. +## Neighbor assignment + +The "neighbors" field assigns the parent's graph edges to each child. +Look at the neighbor list — each neighbor should go to whichever child +is most semantically related. A neighbor can appear in multiple children +if it's relevant to both. Every neighbor should be assigned to at least +one child so no graph connections are lost. + {{TOPOLOGY}} ## Node to review