agents: in-flight node exclusion prevents concurrent collisions

Track which nodes are being processed across all concurrent agents.
When an agent claims seeds, it adds them — and any strongly-connected
neighbors whose score (link_strength * node_weight) exceeds 0.15 — to a
shared HashSet. Concurrent agents filter these keys out when running
their queries, so they end up working on distant parts of the graph.

This replaces the eager-visit approach with a proper scheduling
mechanism: the daemon serializes seed selection while parallelizing
LLM work. The in-flight set is released on completion (or error).

Previously: core-personality rewritten 12x, irc-regulars 10x, same
node superseded 12x — concurrent agents all selected the same
high-degree hub nodes. Now they'll spread across the graph.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kent Overstreet 2026-03-20 12:45:24 -04:00
parent 3fc108a251
commit d0f126b709
4 changed files with 128 additions and 18 deletions

View file

@ -80,24 +80,96 @@ fn job_targeted_agent(
} }
/// Run a single consolidation agent (replay, linker, separator, transfer, health). /// Run a single consolidation agent (replay, linker, separator, transfer, health).
/// Shared set of node keys currently being processed by agents.
/// Prevents concurrent agents from working on overlapping graph regions.
///
/// Cloned (`Arc::clone`) into each spawned consolidation task. On claim,
/// an agent inserts its selected seeds plus their strongly-connected
/// neighbors; all claimed keys are removed again when the agent finishes,
/// whether it succeeded or errored.
type InFlightNodes = Arc<Mutex<std::collections::HashSet<String>>>;
fn job_consolidation_agent( fn job_consolidation_agent(
ctx: &ExecutionContext, ctx: &ExecutionContext,
agent_type: &str, agent_type: &str,
batch_size: usize, batch_size: usize,
in_flight: &InFlightNodes,
) -> Result<(), TaskError> { ) -> Result<(), TaskError> {
let agent = agent_type.to_string(); let agent = agent_type.to_string();
let batch = batch_size; let batch = batch_size;
let job_name = format!("c-{}", agent); let job_name = format!("c-{}", agent);
let job_name2 = job_name.clone(); let job_name2 = job_name.clone();
let in_flight = Arc::clone(in_flight);
run_job(ctx, &job_name, || { run_job(ctx, &job_name, || {
ctx.log_line("loading store"); ctx.log_line("loading store");
let mut store = crate::store::Store::load()?; let mut store = crate::store::Store::load()?;
ctx.log_line(&format!("running agent: {} (batch={})", agent, batch));
// Claim seeds: lock in-flight set, run query excluding it,
// add selected seeds + strongly-connected neighbors, then unlock.
let mut claimed_keys: Vec<String>;
let graph = store.build_graph();
{
let mut locked = in_flight.lock().unwrap();
ctx.log_line(&format!("running agent: {} (batch={}, {} in-flight)",
agent, batch, locked.len()));
// Run the agent's query, filtering out in-flight nodes
let def = super::defs::get_def(&agent)
.ok_or_else(|| format!("no .agent file for {}", agent))?;
let query = &def.query;
let keys = if !query.is_empty() {
use crate::query::engine as search;
let mut stages = search::Stage::parse_pipeline(query)?;
let padded = batch + locked.len().min(100);
if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) {
stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
}
let results = search::run_query(&stages, vec![], &graph, &store, false, padded);
results.into_iter()
.map(|(k, _)| k)
.filter(|k| !locked.contains(k))
.take(batch)
.collect::<Vec<_>>()
} else {
vec![]
};
if keys.is_empty() {
return Err("query returned no results (after exclusion)".into());
}
// Claim seeds + strongly-connected neighbors.
// Only exclude neighbors with score > threshold to avoid
// blacking out the graph via high-degree hub nodes.
claimed_keys = Vec::with_capacity(batch * 20);
for key in &keys {
claimed_keys.push(key.clone());
locked.insert(key.clone());
for (nbr, strength) in graph.neighbors(key) {
let weight = store.nodes.get(nbr.as_str())
.map(|n| n.weight).unwrap_or(0.1);
if strength * weight > 0.15 {
claimed_keys.push(nbr.clone());
locked.insert(nbr.clone());
}
}
}
}
// in_flight lock released — run LLM without holding it
let log = |msg: &str| { let log = |msg: &str| {
ctx.log_line(msg); ctx.log_line(msg);
log_event(&job_name2, "progress", msg); log_event(&job_name2, "progress", msg);
}; };
super::knowledge::run_and_apply_with_log(&mut store, &agent, batch, "consolidate", &log)?; // Use run_one_agent_with_keys — we already selected seeds above,
// no need to re-run the query.
let result = super::knowledge::run_one_agent_with_keys(
&mut store, &agent, &claimed_keys, batch, "consolidate", &log, false,
).map(|_| ());
// Release all claimed keys (seeds + neighbors)
{
let mut locked = in_flight.lock().unwrap();
for key in &claimed_keys {
locked.remove(key);
}
}
result?;
ctx.log_line("done"); ctx.log_line("done");
Ok(()) Ok(())
}) })
@ -661,6 +733,10 @@ pub fn run_daemon() -> Result<(), String> {
let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None)); let graph_health: Arc<Mutex<Option<GraphHealth>>> = Arc::new(Mutex::new(None));
// Nodes currently being processed by agents — prevents concurrent
// agents from working on overlapping graph regions.
let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new()));
log_event("daemon", "started", &format!("pid {}", std::process::id())); log_event("daemon", "started", &format!("pid {}", std::process::id()));
eprintln!("poc-memory daemon started (pid {})", std::process::id()); eprintln!("poc-memory daemon started (pid {})", std::process::id());
@ -932,6 +1008,7 @@ pub fn run_daemon() -> Result<(), String> {
let llm_sched = Arc::clone(&llm); let llm_sched = Arc::clone(&llm);
let last_daily_sched = Arc::clone(&last_daily); let last_daily_sched = Arc::clone(&last_daily);
let graph_health_sched = Arc::clone(&graph_health); let graph_health_sched = Arc::clone(&graph_health);
let in_flight_sched = Arc::clone(&in_flight);
let log_dir_sched = task_log_dir.clone(); let log_dir_sched = task_log_dir.clone();
const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
@ -990,13 +1067,14 @@ pub fn run_daemon() -> Result<(), String> {
for (i, (agent_type, batch)) in runs.iter().enumerate() { for (i, (agent_type, batch)) in runs.iter().enumerate() {
let agent = agent_type.to_string(); let agent = agent_type.to_string();
let b = *batch; let b = *batch;
let in_flight_clone = Arc::clone(&in_flight_sched);
let task_name = format!("c-{}-{}:{}", agent, i, today); let task_name = format!("c-{}-{}:{}", agent, i, today);
let mut builder = choir_sched.spawn(task_name) let mut builder = choir_sched.spawn(task_name)
.resource(&llm_sched) .resource(&llm_sched)
.log_dir(&log_dir_sched) .log_dir(&log_dir_sched)
.retries(1) .retries(1)
.init(move |ctx| { .init(move |ctx| {
job_consolidation_agent(ctx, &agent, b) job_consolidation_agent(ctx, &agent, b, &in_flight_clone)
}); });
if let Some(dep) = prev_by_type.get(*agent_type) { if let Some(dep) = prev_by_type.get(*agent_type) {
builder.depend_on(dep); builder.depend_on(dep);
@ -1107,6 +1185,7 @@ pub fn run_daemon() -> Result<(), String> {
let choir_rpc = Arc::clone(&choir); let choir_rpc = Arc::clone(&choir);
let llm_rpc = Arc::clone(&llm); let llm_rpc = Arc::clone(&llm);
let log_dir_rpc = task_log_dir.clone(); let log_dir_rpc = task_log_dir.clone();
let in_flight_rpc = Arc::clone(&in_flight);
daemon.add_rpc_handler(move |cmd, _ctx| { daemon.add_rpc_handler(move |cmd, _ctx| {
if !cmd.starts_with("run-agent ") { return None; } if !cmd.starts_with("run-agent ") { return None; }
let parts: Vec<&str> = cmd.splitn(4, ' ').collect(); let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
@ -1175,6 +1254,7 @@ pub fn run_daemon() -> Result<(), String> {
while remaining > 0 { while remaining > 0 {
let batch = remaining.min(batch_size); let batch = remaining.min(batch_size);
let agent = agent_type.to_string(); let agent = agent_type.to_string();
let in_flight_clone = Arc::clone(&in_flight_rpc);
let task_name = format!("c-{}-rpc{}:{}", agent, ts, today); let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
let mut builder = choir_rpc.spawn(task_name) let mut builder = choir_rpc.spawn(task_name)
.resource(&llm_rpc) .resource(&llm_rpc)
@ -1184,7 +1264,7 @@ pub fn run_daemon() -> Result<(), String> {
if is_rename { if is_rename {
job_rename_agent(ctx, batch) job_rename_agent(ctx, batch)
} else { } else {
job_consolidation_agent(ctx, &agent, batch) job_consolidation_agent(ctx, &agent, batch, &in_flight_clone)
} }
}); });
if let Some(ref dep) = prev { if let Some(ref dep) = prev {

View file

@ -404,10 +404,13 @@ pub fn resolve_placeholders(
} }
/// Run a config-driven agent: query → resolve placeholders → prompt. /// Run a config-driven agent: query → resolve placeholders → prompt.
/// `exclude` filters out nodes (and their neighborhoods) already being
/// worked on by other agents, preventing concurrent collisions.
pub fn run_agent( pub fn run_agent(
store: &Store, store: &Store,
def: &AgentDef, def: &AgentDef,
count: usize, count: usize,
exclude: &std::collections::HashSet<String>,
) -> Result<super::prompts::AgentBatch, String> { ) -> Result<super::prompts::AgentBatch, String> {
let graph = store.build_graph(); let graph = store.build_graph();
@ -417,13 +420,20 @@ pub fn run_agent(
let has_limit = stages.iter().any(|s| let has_limit = stages.iter().any(|s|
matches!(s, search::Stage::Transform(search::Transform::Limit(_)))); matches!(s, search::Stage::Transform(search::Transform::Limit(_))));
if !has_limit { if !has_limit {
stages.push(search::Stage::Transform(search::Transform::Limit(count))); // Request extra results to compensate for exclusion filtering
let padded = count + exclude.len().min(100);
stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
} }
let results = search::run_query(&stages, vec![], &graph, store, false, count); let results = search::run_query(&stages, vec![], &graph, store, false, count + exclude.len().min(100));
if results.is_empty() { let filtered: Vec<String> = results.into_iter()
return Err(format!("{}: query returned no results", def.agent)); .map(|(k, _)| k)
.filter(|k| !exclude.contains(k))
.take(count)
.collect();
if filtered.is_empty() {
return Err(format!("{}: query returned no results (after exclusion)", def.agent));
} }
results.into_iter().map(|(k, _)| k).collect::<Vec<_>>() filtered
} else { } else {
vec![] vec![]
}; };

View file

@ -41,7 +41,20 @@ pub fn run_and_apply_with_log(
llm_tag: &str, llm_tag: &str,
log: &dyn Fn(&str), log: &dyn Fn(&str),
) -> Result<(), String> { ) -> Result<(), String> {
let result = run_one_agent(store, agent_name, batch_size, llm_tag, log, false)?; run_and_apply_excluded(store, agent_name, batch_size, llm_tag, log, &Default::default())
}
/// Like run_and_apply_with_log but with an in-flight exclusion set.
/// Returns the keys that were processed (for the daemon to track).
pub fn run_and_apply_excluded(
store: &mut Store,
agent_name: &str,
batch_size: usize,
llm_tag: &str,
log: &dyn Fn(&str),
exclude: &std::collections::HashSet<String>,
) -> Result<(), String> {
let result = run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, false, exclude)?;
// Mark conversation segments as mined after successful processing // Mark conversation segments as mined after successful processing
if agent_name == "observation" { if agent_name == "observation" {
@ -88,18 +101,25 @@ pub fn run_one_agent(
llm_tag: &str, llm_tag: &str,
log: &dyn Fn(&str), log: &dyn Fn(&str),
debug: bool, debug: bool,
) -> Result<AgentResult, String> {
run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, debug, &Default::default())
}
/// Like run_one_agent but excludes nodes currently being worked on by other agents.
pub fn run_one_agent_excluded(
store: &mut Store,
agent_name: &str,
batch_size: usize,
llm_tag: &str,
log: &dyn Fn(&str),
debug: bool,
exclude: &std::collections::HashSet<String>,
) -> Result<AgentResult, String> { ) -> Result<AgentResult, String> {
let def = super::defs::get_def(agent_name) let def = super::defs::get_def(agent_name)
.ok_or_else(|| format!("no .agent file for {}", agent_name))?; .ok_or_else(|| format!("no .agent file for {}", agent_name))?;
log("building prompt"); log("building prompt");
let agent_batch = super::defs::run_agent(store, &def, batch_size)?; let agent_batch = super::defs::run_agent(store, &def, batch_size, exclude)?;
// Eagerly record visits so concurrent agents pick different seeds.
// The not-visited query filter checks this timestamp.
if !agent_batch.node_keys.is_empty() {
store.record_agent_visits(&agent_batch.node_keys, agent_name).ok();
}
run_one_agent_inner(store, agent_name, &def, agent_batch, llm_tag, log, debug) run_one_agent_inner(store, agent_name, &def, agent_batch, llm_tag, log, debug)
} }

View file

@ -428,5 +428,5 @@ pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<()
pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<AgentBatch, String> { pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result<AgentBatch, String> {
let def = super::defs::get_def(agent) let def = super::defs::get_def(agent)
.ok_or_else(|| format!("Unknown agent: {}", agent))?; .ok_or_else(|| format!("Unknown agent: {}", agent))?;
super::defs::run_agent(store, &def, count) super::defs::run_agent(store, &def, count, &Default::default())
} }