split agent: two-phase node decomposition for memory consolidation
Phase 1 sends a large node with its neighbor communities to the LLM
and gets back a JSON split plan (child keys, descriptions, section
hints). Phase 2 then fires one extraction call per child — each call
gets the full parent content and extracts/reorganizes just its
portion. (In this implementation the per-child calls run sequentially.)
This handles arbitrarily large nodes because output is always
proportional to one child, not the whole parent. Tested on the kent
node (19K chars → 3 children totaling 20K chars with clean topic
separation).
New files:
prompts/split-plan.md — phase 1 planning prompt
prompts/split-extract.md — phase 2 extraction prompt
prompts/split.md — original single-phase (kept for reference)
Modified:
agents/prompts.rs — split_candidates(), split_plan_prompt(),
split_extract_prompt(), agent_prompt "split" arm
agents/daemon.rs — job_split_agent() two-phase implementation,
RPC dispatch for "split" agent type
tui.rs — added "split" to AGENT_TYPES
This commit is contained in:
parent
4c973183c4
commit
ca62692a28
6 changed files with 515 additions and 2 deletions
|
|
@ -233,6 +233,221 @@ fn job_rename_agent(
|
|||
})
|
||||
}
|
||||
|
||||
/// Run the split agent: two-phase decomposition of large nodes.
///
/// Phase 1: Send node + neighbors to LLM, get back a JSON split plan
/// (child keys, descriptions, section hints).
/// Phase 2: For each child, send parent content + child description to LLM,
/// get back the extracted/reorganized content for that child.
///
/// This handles arbitrarily large nodes because the output of each phase 2
/// call is proportional to one child, not the whole parent.
///
/// Error policy (as written): a prompt-build failure or a phase-1 LLM call
/// failure propagates via `?` and aborts the whole batch; a phase-1 parse
/// error or a phase-2 LLM failure only skips the current candidate/child.
/// The store is saved after every successful split so partial progress
/// survives a later failure.
fn job_split_agent(
    ctx: &ExecutionContext,
    batch_size: usize,
) -> Result<(), TaskError> {
    run_job(ctx, "c-split", || {
        ctx.log_line("loading store");
        let mut store = crate::store::Store::load()?;

        // batch_size == 0 is treated as "do one", not "do none".
        let count = if batch_size == 0 { 1 } else { batch_size };
        let candidates = super::prompts::split_candidates(&store);
        if candidates.is_empty() {
            ctx.log_line("no nodes large enough to split");
            return Ok(());
        }

        let mut total_split = 0;
        let mut total_kept = 0;

        // Candidates arrive largest-first; process at most `count` of them.
        for parent_key in candidates.iter().take(count) {
            ctx.log_line(&format!("--- splitting: {} ({} chars)",
                parent_key,
                store.nodes.get(parent_key).map(|n| n.content.len()).unwrap_or(0)));

            // Phase 1: get split plan
            // NOTE: `?` here means a prompt-build or LLM-transport error
            // aborts the entire batch, not just this candidate.
            let plan_prompt = super::prompts::split_plan_prompt(&store, parent_key)?;
            ctx.log_line(&format!("phase 1: plan prompt {} chars", plan_prompt.len()));

            let plan_response = super::llm::call_sonnet("split-plan", &plan_prompt)?;
            let plan = match super::llm::parse_json_response(&plan_response) {
                Ok(v) => v,
                Err(e) => {
                    // Malformed JSON from the model: skip this candidate only.
                    ctx.log_line(&format!("phase 1 parse error: {}", e));
                    continue;
                }
            };

            // The plan must say either "keep" (node is fine as-is) or "split".
            let action = plan.get("action").and_then(|v| v.as_str()).unwrap_or("");
            if action == "keep" {
                let reason = plan.get("reason").and_then(|v| v.as_str()).unwrap_or("");
                ctx.log_line(&format!("keep: {} ({})", parent_key, reason));
                total_kept += 1;
                continue;
            }
            if action != "split" {
                ctx.log_line(&format!("unexpected action: {}", action));
                continue;
            }

            // A split into fewer than 2 pieces is meaningless; require >= 2
            // planned children before spending phase-2 calls.
            let children_plan = match plan.get("children").and_then(|v| v.as_array()) {
                Some(c) if c.len() >= 2 => c,
                _ => {
                    ctx.log_line("plan has fewer than 2 children, skipping");
                    continue;
                }
            };

            ctx.log_line(&format!("phase 1: {} children planned", children_plan.len()));
            for child in children_plan {
                let key = child.get("key").and_then(|v| v.as_str()).unwrap_or("?");
                let desc = child.get("description").and_then(|v| v.as_str()).unwrap_or("");
                ctx.log_line(&format!(" planned: {} — {}", key, desc));
            }

            // Phase 2: extract content for each child
            // Children are extracted one at a time, in plan order
            // (sequential LLM calls, not parallel).
            let mut children: Vec<(String, String)> = Vec::new();

            for child_plan in children_plan {
                // A child without a "key" cannot be materialized; drop it.
                let child_key = match child_plan.get("key").and_then(|v| v.as_str()) {
                    Some(k) => k.to_string(),
                    None => continue,
                };
                let child_desc = child_plan.get("description")
                    .and_then(|v| v.as_str()).unwrap_or("");
                // "sections" is an optional hint list; flatten to one
                // comma-separated string for the prompt template.
                let child_sections = child_plan.get("sections")
                    .and_then(|v| v.as_array())
                    .map(|arr| arr.iter()
                        .filter_map(|v| v.as_str())
                        .collect::<Vec<_>>()
                        .join(", "))
                    .unwrap_or_default();

                ctx.log_line(&format!("phase 2: extracting {}", child_key));

                let extract_prompt = super::prompts::split_extract_prompt(
                    &store, parent_key, &child_key, child_desc, &child_sections)?;
                ctx.log_line(&format!(" extract prompt: {} chars", extract_prompt.len()));

                // Unlike phase 1, an LLM failure here only drops this child.
                let content = match super::llm::call_sonnet("split-extract", &extract_prompt) {
                    Ok(c) => c,
                    Err(e) => {
                        ctx.log_line(&format!(" extract error: {}", e));
                        continue;
                    }
                };

                ctx.log_line(&format!(" extracted: {} chars", content.len()));
                children.push((child_key, content));
            }

            // Extraction may have dropped children; re-check the >= 2 invariant
            // before mutating the store.
            if children.len() < 2 {
                ctx.log_line(&format!("only {} children extracted, skipping", children.len()));
                continue;
            }

            // Collect parent's edges before modifications
            let parent_edges: Vec<_> = store.relations.iter()
                .filter(|r| !r.deleted && (r.source_key == *parent_key || r.target_key == *parent_key))
                .cloned()
                .collect();

            // Create child nodes
            let mut child_uuids: Vec<([u8; 16], String)> = Vec::new();
            for (child_key, content) in &children {
                // Never overwrite an existing node with extracted content.
                if store.nodes.contains_key(child_key.as_str()) {
                    ctx.log_line(&format!(" skip: {} already exists", child_key));
                    continue;
                }
                store.upsert_provenance(child_key, content,
                    crate::store::Provenance::AgentConsolidate)?;
                // unwrap is safe: upsert_provenance just inserted this key.
                let uuid = store.nodes.get(child_key.as_str()).unwrap().uuid;
                child_uuids.push((uuid, child_key.clone()));
                ctx.log_line(&format!(" created: {} ({} chars)", child_key, content.len()));
            }
            // NOTE(review): if most planned keys already existed, child_uuids
            // can hold fewer than 2 (even 0) entries, yet the parent is still
            // tombstoned below — confirm this partial-split outcome is intended.

            // Inherit edges: assign each parent edge to best-matching child
            for edge in &parent_edges {
                // The "neighbor" is whichever endpoint is not the parent.
                let neighbor_key = if edge.source_key == *parent_key {
                    &edge.target_key
                } else {
                    &edge.source_key
                };
                let neighbor_content = store.nodes.get(neighbor_key.as_str())
                    .map(|n| n.content.as_str())
                    .unwrap_or("");

                // Pick the child whose extracted content is most similar to
                // the neighbor's content (text cosine similarity).
                let best = child_uuids.iter()
                    .enumerate()
                    .map(|(idx, _)| {
                        let sim = crate::similarity::cosine_similarity(
                            &children[idx].1, neighbor_content);
                        (idx, sim)
                    })
                    .max_by(|a, b| a.1.total_cmp(&b.1));

                if let Some((idx, _)) = best {
                    let (child_uuid, child_key) = &child_uuids[idx];
                    let neighbor_uuid = match store.nodes.get(neighbor_key.as_str()) {
                        Some(n) => n.uuid,
                        None => continue,
                    };

                    // Preserve the original edge direction relative to the parent.
                    let (su, tu, sk, tk) = if edge.source_key == *parent_key {
                        (*child_uuid, neighbor_uuid, child_key.as_str(), neighbor_key.as_str())
                    } else {
                        (neighbor_uuid, *child_uuid, neighbor_key.as_str(), child_key.as_str())
                    };

                    // Inherited edge keeps the parent edge's strength.
                    let rel = crate::store::new_relation(
                        su, tu, crate::store::RelationType::Auto, edge.strength, sk, tk,
                    );
                    store.add_relation(rel).ok();
                }
            }

            // Link siblings
            // Fully connect the new children with a fixed 0.5 strength.
            for i in 0..child_uuids.len() {
                for j in (i+1)..child_uuids.len() {
                    let rel = crate::store::new_relation(
                        child_uuids[i].0, child_uuids[j].0,
                        crate::store::RelationType::Auto, 0.5,
                        &child_uuids[i].1, &child_uuids[j].1,
                    );
                    store.add_relation(rel).ok();
                }
            }

            // Tombstone parent
            // Append a deleted copy to the node log, then drop it from the
            // in-memory map. The parent's ORIGINAL relations are left in
            // `store.relations` — presumably readers filter edges whose
            // endpoint node is gone/deleted; verify that assumption.
            if let Some(parent) = store.nodes.get_mut(parent_key) {
                parent.deleted = true;
                parent.version += 1;
                let tombstone = parent.clone();
                store.append_nodes(std::slice::from_ref(&tombstone)).ok();
            }
            store.nodes.remove(parent_key);

            ctx.log_line(&format!("split complete: {} → {} children", parent_key, child_uuids.len()));
            total_split += 1;

            // Save after each split so we don't lose work
            store.save()?;
        }

        // Store report
        // Timestamp is compacted (separators stripped) to keep the key short.
        let ts = crate::store::format_datetime(crate::store::now_epoch())
            .replace([':', '-', 'T'], "");
        let report_key = format!("_consolidation-split-{}", ts);
        let report = format!("Split agent: {} split, {} kept out of {} candidates",
            total_split, total_kept, candidates.len().min(count));
        store.upsert_provenance(&report_key, &report,
            crate::store::Provenance::AgentConsolidate).ok();

        ctx.log_line(&format!("done: {} split, {} kept", total_split, total_kept));
        Ok(())
    })
}
|
||||
|
||||
/// Apply consolidation actions from recent reports.
|
||||
fn job_consolidation_apply(ctx: &ExecutionContext) -> Result<(), TaskError> {
|
||||
run_job(ctx, "c-apply", || {
|
||||
|
|
@ -1143,6 +1358,7 @@ fn status_socket_loop(
|
|||
let mut remaining = count;
|
||||
|
||||
let is_rename = *agent_type == "rename";
|
||||
let is_split = *agent_type == "split";
|
||||
while remaining > 0 {
|
||||
let batch = remaining.min(batch_size);
|
||||
let agent = agent_type.to_string();
|
||||
|
|
@ -1153,6 +1369,8 @@ fn status_socket_loop(
|
|||
.init(move |ctx| {
|
||||
if is_rename {
|
||||
job_rename_agent(ctx, batch)
|
||||
} else if is_split {
|
||||
job_split_agent(ctx, batch)
|
||||
} else {
|
||||
job_consolidation_agent(ctx, &agent, batch)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -311,6 +311,85 @@ fn format_rename_candidates(store: &Store, count: usize) -> String {
|
|||
out
|
||||
}
|
||||
|
||||
/// Get split candidates sorted by size (largest first)
|
||||
pub fn split_candidates(store: &Store) -> Vec<String> {
|
||||
let mut candidates: Vec<(&str, usize)> = store.nodes.iter()
|
||||
.filter(|(key, node)| {
|
||||
!key.starts_with('_')
|
||||
&& !node.deleted
|
||||
&& node.content.len() > 2000
|
||||
})
|
||||
.map(|(k, n)| (k.as_str(), n.content.len()))
|
||||
.collect();
|
||||
candidates.sort_by(|a, b| b.1.cmp(&a.1));
|
||||
candidates.into_iter().map(|(k, _)| k.to_string()).collect()
|
||||
}
|
||||
|
||||
/// Format a single node for split-plan prompt (phase 1)
|
||||
fn format_split_plan_node(store: &Store, graph: &Graph, key: &str) -> String {
|
||||
let communities = graph.communities();
|
||||
let node = match store.nodes.get(key) {
|
||||
Some(n) => n,
|
||||
None => return format!("Node '{}' not found\n", key),
|
||||
};
|
||||
|
||||
let mut out = String::new();
|
||||
out.push_str(&format!("### {} ({} chars)\n", key, node.content.len()));
|
||||
|
||||
// Show neighbors grouped by community
|
||||
let neighbors = graph.neighbors(key);
|
||||
if !neighbors.is_empty() {
|
||||
let mut by_community: std::collections::BTreeMap<String, Vec<(&str, f32)>> =
|
||||
std::collections::BTreeMap::new();
|
||||
for (nkey, strength) in &neighbors {
|
||||
let comm = communities.get(nkey.as_str())
|
||||
.map(|c| format!("c{}", c))
|
||||
.unwrap_or_else(|| "unclustered".into());
|
||||
by_community.entry(comm)
|
||||
.or_default()
|
||||
.push((nkey.as_str(), *strength));
|
||||
}
|
||||
|
||||
out.push_str("\nNeighbors by community:\n");
|
||||
for (comm, members) in &by_community {
|
||||
out.push_str(&format!(" {} ({}):", comm, members.len()));
|
||||
for (nkey, strength) in members.iter().take(5) {
|
||||
out.push_str(&format!(" {}({:.2})", nkey, strength));
|
||||
}
|
||||
if members.len() > 5 {
|
||||
out.push_str(&format!(" +{} more", members.len() - 5));
|
||||
}
|
||||
out.push('\n');
|
||||
}
|
||||
}
|
||||
|
||||
// Full content
|
||||
out.push_str(&format!("\nContent:\n{}\n\n", node.content));
|
||||
out.push_str("---\n\n");
|
||||
out
|
||||
}
|
||||
|
||||
/// Build split-plan prompt for a single node (phase 1)
|
||||
pub fn split_plan_prompt(store: &Store, key: &str) -> Result<String, String> {
|
||||
let graph = store.build_graph();
|
||||
let topology = format_topology_header(&graph);
|
||||
let node_section = format_split_plan_node(store, &graph, key);
|
||||
load_prompt("split-plan", &[("{{TOPOLOGY}}", &topology), ("{{NODE}}", &node_section)])
|
||||
}
|
||||
|
||||
/// Build split-extract prompt for one child (phase 2)
|
||||
pub fn split_extract_prompt(store: &Store, parent_key: &str, child_key: &str, child_desc: &str, child_sections: &str) -> Result<String, String> {
|
||||
let parent_content = store.nodes.get(parent_key)
|
||||
.map(|n| n.content.as_str())
|
||||
.ok_or_else(|| format!("No node '{}'", parent_key))?;
|
||||
load_prompt("split-extract", &[
|
||||
("{{CHILD_KEY}}", child_key),
|
||||
("{{CHILD_DESC}}", child_desc),
|
||||
("{{CHILD_SECTIONS}}", child_sections),
|
||||
("{{PARENT_CONTENT}}", parent_content),
|
||||
])
|
||||
}
|
||||
|
||||
/// Run agent consolidation on top-priority nodes
|
||||
pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<(), String> {
|
||||
let graph = store.build_graph();
|
||||
|
|
@ -424,6 +503,16 @@ pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<String,
|
|||
let nodes_section = format_rename_candidates(store, count);
|
||||
load_prompt("rename", &[("{{NODES}}", &nodes_section)])
|
||||
}
|
||||
_ => Err(format!("Unknown agent: {}. Use: replay, linker, separator, transfer, health, rename", agent)),
|
||||
"split" => {
|
||||
// Phase 1: plan prompt for the largest candidate
|
||||
let candidates = split_candidates(store);
|
||||
if candidates.is_empty() {
|
||||
return Err("No nodes large enough to split".to_string());
|
||||
}
|
||||
let key = &candidates[0];
|
||||
let node_section = format_split_plan_node(store, &graph, key);
|
||||
load_prompt("split-plan", &[("{{TOPOLOGY}}", &topology), ("{{NODE}}", &node_section)])
|
||||
}
|
||||
_ => Err(format!("Unknown agent: {}. Use: replay, linker, separator, transfer, health, rename, split", agent)),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ const POLL_INTERVAL: Duration = Duration::from_secs(2);
|
|||
// Agent types we know about, in display order.
// (The scraped diff carried both the pre- and post-change second row;
// this is the post-change version, which includes "split".)
const AGENT_TYPES: &[&str] = &[
    "health", "replay", "linker", "separator", "transfer",
    "apply", "orphans", "cap", "digest", "digest-links", "knowledge", "rename", "split",
];
|
||||
|
||||
fn status_sock_path() -> PathBuf {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue