split agent: parallel execution, agent-driven edges, no MCP overhead

- Refactor split from serial batch to independent per-node tasks
  (run-agent split N spawns N parallel tasks, gated by llm_concurrency)
- Replace cosine similarity edge inheritance with agent-assigned
  neighbors in the plan JSON — the LLM already understands the
  semantic relationships, no need to approximate with bag-of-words
- Add --strict-mcp-config to claude CLI calls to skip MCP server
  startup (saves ~5s per call)
- Remove hardcoded 2000-char split threshold — let the agent decide
  what's worth splitting
- Reload store before mutations to handle concurrent split races
This commit is contained in:
ProofOfConcept 2026-03-10 03:21:33 -04:00
parent 149c289fea
commit 8bbc246b3d
4 changed files with 218 additions and 195 deletions

View file

@ -242,208 +242,197 @@ fn job_rename_agent(
///
/// This handles arbitrarily large nodes because the output of each phase 2
/// call is proportional to one child, not the whole parent.
fn job_split_agent(
/// Split a single node by key. Called as an independent task so multiple
/// splits can run in parallel. Each task loads the store fresh, checks the
/// node still exists and hasn't been split, does the LLM work, then saves.
fn job_split_one(
ctx: &ExecutionContext,
batch_size: usize,
parent_key: String,
) -> Result<(), TaskError> {
run_job(ctx, "c-split", || {
ctx.log_line("loading store");
ctx.log_line(&format!("loading store for {}", parent_key));
let mut store = crate::store::Store::load()?;
let count = if batch_size == 0 { 1 } else { batch_size };
let candidates = super::prompts::split_candidates(&store);
if candidates.is_empty() {
ctx.log_line("no nodes large enough to split");
// Check node still exists and hasn't been deleted/split already
let content_len = match store.nodes.get(parent_key.as_str()) {
Some(n) if !n.deleted => n.content.len(),
_ => {
ctx.log_line(&format!("skip: {} no longer exists or deleted", parent_key));
return Ok(());
}
};
ctx.log_line(&format!("--- splitting: {} ({} chars)", parent_key, content_len));
// Phase 1: get split plan
let plan_prompt = super::prompts::split_plan_prompt(&store, &parent_key)?;
ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len()));
let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?;
let plan = match super::llm::parse_json_response(&plan_response) {
Ok(v) => v,
Err(e) => {
ctx.log_line(&format!("phase 1 parse error: {}", e));
return Ok(());
}
};
let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or("");
if action == "keep" {
let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!("keep: {} ({})", parent_key, reason));
return Ok(());
}
if action != "split" {
ctx.log_line(&format!("unexpected action: {}", action));
return Ok(());
}
let mut total_split = 0;
let mut total_kept = 0;
for parent_key in candidates.iter().take(count) {
ctx.log_line(&format!("--- splitting: {} ({} chars)",
parent_key,
store.nodes.get(parent_key).map(|n| n.content.len()).unwrap_or(0)));
// Phase 1: get split plan
let plan_prompt = super::prompts::split_plan_prompt(&store, parent_key)?;
ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len()));
let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?;
let plan = match super::llm::parse_json_response(&plan_response) {
Ok(v) => v,
Err(e) => {
ctx.log_line(&format!("phase 1 parse error: {}", e));
continue;
}
};
let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or("");
if action == "keep" {
let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!("keep: {} ({})", parent_key, reason));
total_kept += 1;
continue;
}
if action != "split" {
ctx.log_line(&format!("unexpected action: {}", action));
continue;
let children_plan = match plan.get("children").and_then(|v| v.as_array()) {
Some(c) if c.len() >= 2 => c,
_ => {
ctx.log_line("plan has fewer than 2 children, skipping");
return Ok(());
}
};
let children_plan = match plan.get("children").and_then(|v| v.as_array()) {
Some(c) if c.len() >= 2 => c,
_ => {
ctx.log_line("plan has fewer than 2 children, skipping");
continue;
}
};
ctx.log_line(&format!("phase 1: {} children planned", children_plan.len()));
for child in children_plan {
let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?");
let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!(" planned: {}{}", key, desc));
}
// Phase 2: extract content for each child
let mut children: Vec<(String, String)> = Vec::new();
for child_plan in children_plan {
let child_key = match child_plan.get("key").and_then(|v| v.as_str()) {
Some(k) => k.to_string(),
None => continue,
};
let child_desc = child_plan.get("description")
.and_then(|v| v.as_str()).unwrap_or("");
let child_sections = child_plan.get("sections")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", "))
.unwrap_or_default();
ctx.log_line(&format!("phase 2: extracting {}", child_key));
let extract_prompt = super::prompts::split_extract_prompt(
&store, parent_key, &child_key, child_desc, &child_sections)?;
ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len()));
let content = match super::llm::call_sonnet("split-extract", &extract_prompt) {
Ok(c) => c,
Err(e) => {
ctx.log_line(&format!(" extract error: {}", e));
continue;
}
};
ctx.log_line(&format!(" extracted: {} chars", content.len()));
children.push((child_key, content));
}
if children.len() < 2 {
ctx.log_line(&format!("only {} children extracted, skipping", children.len()));
continue;
}
// Collect parent's edges before modifications
let parent_edges: Vec<_> = store.relations.iter()
.filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key))
.cloned()
.collect();
// Create child nodes
let mut child_uuids: Vec<([u8; 16], String)> = Vec::new();
for (child_key, content) in &children {
if store.nodes.contains_key(child_key.as_str()) {
ctx.log_line(&format!(" skip: {} already exists", child_key));
continue;
}
store.upsert_provenance(child_key, content,
crate::store::Provenance::AgentConsolidate)?;
let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid;
child_uuids.push((uuid, child_key.clone()));
ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len()));
}
// Inherit edges: assign each parent edge to best-matching child
for edge in &parent_edges {
let neighbor_key = if edge.source_key == *parent_key {
&edge.target_key
} else {
&edge.source_key
};
let neighbor_content = store.nodes.get(neighbor_key.as_str())
.map(|n| n.content.as_str())
.unwrap_or("");
let best = child_uuids.iter()
.enumerate()
.map(|(idx, _)| {
let sim = crate::similarity::cosine_similarity(
&children[idx].1, neighbor_content);
(idx, sim)
})
.max_by(|a, b| a.1.total_cmp(&b.1));
if let Some((idx, _)) = best {
let (child_uuid, child_key) = &child_uuids[idx];
let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) {
Some(n) => n.uuid,
None => continue,
};
let (su, tu, sk, tk) = if edge.source_key == *parent_key {
(*child_uuid, neighbor_uuid, child_key.as_str(), neighbor_key.as_str())
} else {
(neighbor_uuid, *child_uuid, neighbor_key.as_str(), child_key.as_str())
};
let rel = crate::store::new_relation(
su, tu, crate::store::RelationType::Auto, edge.strength, sk, tk,
);
store.add_relation(rel).ok();
}
}
// Link siblings
for i in 0..child_uuids.len() {
for j in (i+1)..child_uuids.len() {
let rel = crate::store::new_relation(
child_uuids[i].0, child_uuids[j].0,
crate::store::RelationType::Auto, 0.5,
&child_uuids[i].1, &child_uuids[j].1,
);
store.add_relation(rel).ok();
}
}
// Tombstone parent
if let Some(parent) = store.nodes.get_mut(parent_key) {
parent.deleted = true;
parent.version += 1;
let tombstone = parent.clone();
store.append_nodes(std::slice::from_ref(&tombstone)).ok();
}
store.nodes.remove(parent_key);
ctx.log_line(&format!("split complete: {}{} children", parent_key, child_uuids.len()));
total_split += 1;
// Save after each split so we don't lose work
store.save()?;
ctx.log_line(&format!("phase 1: {} children planned", children_plan.len()));
for child in children_plan {
let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?");
let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or("");
ctx.log_line(&format!(" planned: {}{}", key, desc));
}
// Store report
let ts = crate::store::format_datetime(crate::store::now_epoch())
.replace([':', '-', 'T'], "");
let report_key = format!("_consolidation-split-{}", ts);
let report = format!("Split agent: {} split, {} kept out of {} candidates",
total_split, total_kept, candidates.len().min(count));
store.upsert_provenance(&report_key, &report,
crate::store::Provenance::AgentConsolidate).ok();
// Phase 2: extract content for each child
let mut children: Vec<(String, String)> = Vec::new();
// Collect neighbor assignments from plan: child_key -> [neighbor_keys]
let mut neighbor_map: HashMap<String, Vec<String>> = HashMap::new();
ctx.log_line(&format!("done: {} split, {} kept", total_split, total_kept));
for child_plan in children_plan {
let child_key = match child_plan.get("key").and_then(|v| v.as_str()) {
Some(k) => k.to_string(),
None => continue,
};
let child_desc = child_plan.get("description")
.and_then(|v| v.as_str()).unwrap_or("");
let child_sections = child_plan.get("sections")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", "))
.unwrap_or_default();
let child_neighbors: Vec<String> = child_plan.get("neighbors")
.and_then(|v| v.as_array())
.map(|arr| arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect())
.unwrap_or_default();
neighbor_map.insert(child_key.clone(), child_neighbors);
ctx.log_line(&format!("phase 2: extracting {}", child_key));
let extract_prompt = super::prompts::split_extract_prompt(
&store, &parent_key, &child_key, child_desc, &child_sections)?;
ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len()));
let content = match super::llm::call_sonnet("split-extract", &extract_prompt) {
Ok(c) => c,
Err(e) => {
ctx.log_line(&format!(" extract error: {}", e));
continue;
}
};
ctx.log_line(&format!(" extracted: {} chars", content.len()));
children.push((child_key, content));
}
if children.len() < 2 {
ctx.log_line(&format!("only {} children extracted, skipping", children.len()));
return Ok(());
}
// Reload store before mutations — another split may have saved meanwhile
store = crate::store::Store::load()?;
// Re-check parent still exists after reload
if !store.nodes.contains_key(parent_key.as_str()) ||
store.nodes.get(parent_key.as_str()).map_or(true, |n| n.deleted) {
ctx.log_line(&format!("skip: {} was split by another task", parent_key));
return Ok(());
}
// Collect parent's edges before modifications
let parent_edges: Vec<_> = store.relations.iter()
.filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key))
.cloned()
.collect();
// Create child nodes
let mut child_uuids: Vec<([u8; 16], String)> = Vec::new();
for (child_key, content) in &children {
if store.nodes.contains_key(child_key.as_str()) {
ctx.log_line(&format!(" skip: {} already exists", child_key));
continue;
}
store.upsert_provenance(child_key, content,
crate::store::Provenance::AgentConsolidate)?;
let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid;
child_uuids.push((uuid, child_key.clone()));
ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len()));
}
// Inherit edges using agent's neighbor assignments from the plan
for (child_uuid, child_key) in &child_uuids {
let neighbors = match neighbor_map.get(child_key) {
Some(n) => n,
None => continue,
};
for neighbor_key in neighbors {
// Find the parent edge for this neighbor to inherit its strength
let parent_edge = parent_edges.iter().find(|r| {
r.source_key == *neighbor_key || r.target_key == *neighbor_key
});
let strength = parent_edge.map(|e| e.strength).unwrap_or(0.3);
let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) {
Some(n) => n.uuid,
None => continue,
};
let rel = crate::store::new_relation(
*child_uuid, neighbor_uuid,
crate::store::RelationType::Auto, strength,
child_key, neighbor_key,
);
store.add_relation(rel).ok();
}
}
// Link siblings
for i in 0..child_uuids.len() {
for j in (i+1)..child_uuids.len() {
let rel = crate::store::new_relation(
child_uuids[i].0, child_uuids[j].0,
crate::store::RelationType::Auto, 0.5,
&child_uuids[i].1, &child_uuids[j].1,
);
store.add_relation(rel).ok();
}
}
// Tombstone parent
if let Some(parent) = store.nodes.get_mut(parent_key.as_str()) {
parent.deleted = true;
parent.version += 1;
let tombstone = parent.clone();
store.append_nodes(std::slice::from_ref(&tombstone)).ok();
}
store.nodes.remove(parent_key.as_str());
ctx.log_line(&format!("split complete: {}{} children", parent_key, child_uuids.len()));
store.save()?;
Ok(())
})
}
@ -1359,6 +1348,32 @@ fn status_socket_loop(
let is_rename = *agent_type == "rename";
let is_split = *agent_type == "split";
if is_split {
// Split: load candidates upfront, spawn independent
// parallel tasks — one per node, no dependencies.
let store = crate::store::Store::load().ok();
let candidates = store.as_ref()
.map(|s| super::prompts::split_candidates(s))
.unwrap_or_default();
let to_split: Vec<String> = candidates.into_iter()
.take(count)
.collect();
for key in &to_split {
let key = key.clone();
let task_name = format!("c-split-{}:{}", key, today);
choir.spawn(task_name)
.resource(llm)
.retries(1)
.init(move |ctx| {
job_split_one(ctx, key.clone())
})
.run();
spawned += 1;
}
remaining = 0;
}
while remaining > 0 {
let batch = remaining.min(batch_size);
let agent = agent_type.to_string();
@ -1369,8 +1384,6 @@ fn status_socket_loop(
.init(move |ctx| {
if is_rename {
job_rename_agent(ctx, batch)
} else if is_split {
job_split_agent(ctx, batch)
} else {
job_consolidation_agent(ctx, &agent, batch)
}