From d0f126b709a041ef62e9a372afc666e898bdc759 Mon Sep 17 00:00:00 2001
From: Kent Overstreet <kent.overstreet@linux.dev>
Date: Fri, 20 Mar 2026 12:45:24 -0400
Subject: [PATCH] agents: in-flight node exclusion prevents concurrent
 collisions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Track which nodes are being processed across all concurrent agents.
When an agent claims seeds, it adds them and their strongly-connected
neighbors (score = link_strength * node_weight > 0.15) to a shared
HashSet. Concurrent agents filter these out when running their query,
ensuring they work on distant parts of the graph.

This replaces the eager-visit approach with a proper scheduling
mechanism: the daemon serializes seed selection while parallelizing LLM
work. The in-flight set is released on completion (or error).

Previously: core-personality rewritten 12x, irc-regulars 10x, same node
superseded 12x — concurrent agents all selected the same high-degree
hub nodes. Now they'll spread across the graph.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 poc-memory/src/agents/daemon.rs    | 88 ++++++++++++++++++++++++++++--
 poc-memory/src/agents/defs.rs      | 20 +++++--
 poc-memory/src/agents/knowledge.rs | 36 +++++++++---
 poc-memory/src/agents/prompts.rs   |  2 +-
 4 files changed, 128 insertions(+), 18 deletions(-)

diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs
index 6aa71c9..5137fe4 100644
--- a/poc-memory/src/agents/daemon.rs
+++ b/poc-memory/src/agents/daemon.rs
@@ -80,24 +80,96 @@ fn job_targeted_agent(
 }
 
 /// Run a single consolidation agent (replay, linker, separator, transfer, health).
+/// Shared set of node keys currently being processed by agents.
+/// Prevents concurrent agents from working on overlapping graph regions.
+type InFlightNodes = Arc<Mutex<std::collections::HashSet<String>>>;
+
 fn job_consolidation_agent(
     ctx: &ExecutionContext,
     agent_type: &str,
     batch_size: usize,
+    in_flight: &InFlightNodes,
 ) -> Result<(), TaskError> {
     let agent = agent_type.to_string();
     let batch = batch_size;
     let job_name = format!("c-{}", agent);
     let job_name2 = job_name.clone();
+    let in_flight = Arc::clone(in_flight);
     run_job(ctx, &job_name, || {
         ctx.log_line("loading store");
         let mut store = crate::store::Store::load()?;
-        ctx.log_line(&format!("running agent: {} (batch={})", agent, batch));
+
+        // Claim seeds: lock in-flight set, run query excluding it,
+        // add selected seeds + strongly-connected neighbors, then unlock.
+        let mut claimed_keys: Vec<String>;
+        let graph = store.build_graph();
+        {
+            let mut locked = in_flight.lock().unwrap();
+            ctx.log_line(&format!("running agent: {} (batch={}, {} in-flight)",
+                agent, batch, locked.len()));
+
+            // Run the agent's query, filtering out in-flight nodes
+            let def = super::defs::get_def(&agent)
+                .ok_or_else(|| format!("no .agent file for {}", agent))?;
+            let query = &def.query;
+            let keys = if !query.is_empty() {
+                use crate::query::engine as search;
+                let mut stages = search::Stage::parse_pipeline(query)?;
+                let padded = batch + locked.len().min(100);
+                if !stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_)))) {
+                    stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
+                }
+                let results = search::run_query(&stages, vec![], &graph, &store, false, padded);
+                results.into_iter()
+                    .map(|(k, _)| k)
+                    .filter(|k| !locked.contains(k))
+                    .take(batch)
+                    .collect::<Vec<String>>()
+            } else {
+                vec![]
+            };
+            if keys.is_empty() {
+                return Err("query returned no results (after exclusion)".into());
+            }
+
+            // Claim seeds + strongly-connected neighbors.
+            // Only exclude neighbors with score > threshold to avoid
+            // blacking out the graph via high-degree hub nodes.
+            claimed_keys = Vec::with_capacity(batch * 20);
+            for key in &keys {
+                claimed_keys.push(key.clone());
+                locked.insert(key.clone());
+                for (nbr, strength) in graph.neighbors(key) {
+                    let weight = store.nodes.get(nbr.as_str())
+                        .map(|n| n.weight).unwrap_or(0.1);
+                    if strength * weight > 0.15 {
+                        claimed_keys.push(nbr.clone());
+                        locked.insert(nbr.clone());
+                    }
+                }
+            }
+        }
+        // in_flight lock released — run LLM without holding it
+
         let log = |msg: &str| {
             ctx.log_line(msg);
             log_event(&job_name2, "progress", msg);
         };
-        super::knowledge::run_and_apply_with_log(&mut store, &agent, batch, "consolidate", &log)?;
+        // Use run_one_agent_with_keys — we already selected seeds above,
+        // no need to re-run the query.
+        let result = super::knowledge::run_one_agent_with_keys(
+            &mut store, &agent, &claimed_keys, batch, "consolidate", &log, false,
+        ).map(|_| ());
+
+        // Release all claimed keys (seeds + neighbors)
+        {
+            let mut locked = in_flight.lock().unwrap();
+            for key in &claimed_keys {
+                locked.remove(key);
+            }
+        }
+
+        result?;
         ctx.log_line("done");
         Ok(())
     })
@@ -661,6 +733,10 @@ pub fn run_daemon() -> Result<(), String> {
     let graph_health: Arc>> = Arc::new(Mutex::new(None));
 
+    // Nodes currently being processed by agents — prevents concurrent
+    // agents from working on overlapping graph regions.
+    let in_flight: InFlightNodes = Arc::new(Mutex::new(std::collections::HashSet::new()));
+
     log_event("daemon", "started", &format!("pid {}", std::process::id()));
     eprintln!("poc-memory daemon started (pid {})", std::process::id());
 
@@ -932,6 +1008,7 @@ pub fn run_daemon() -> Result<(), String> {
     let llm_sched = Arc::clone(&llm);
     let last_daily_sched = Arc::clone(&last_daily);
     let graph_health_sched = Arc::clone(&graph_health);
+    let in_flight_sched = Arc::clone(&in_flight);
     let log_dir_sched = task_log_dir.clone();
 
     const CONSOLIDATION_INTERVAL: Duration = Duration::from_secs(6 * 3600); // 6 hours
@@ -990,13 +1067,14 @@ pub fn run_daemon() -> Result<(), String> {
             for (i, (agent_type, batch)) in runs.iter().enumerate() {
                 let agent = agent_type.to_string();
                 let b = *batch;
+                let in_flight_clone = Arc::clone(&in_flight_sched);
                 let task_name = format!("c-{}-{}:{}", agent, i, today);
                 let mut builder = choir_sched.spawn(task_name)
                     .resource(&llm_sched)
                     .log_dir(&log_dir_sched)
                     .retries(1)
                     .init(move |ctx| {
-                        job_consolidation_agent(ctx, &agent, b)
+                        job_consolidation_agent(ctx, &agent, b, &in_flight_clone)
                     });
                 if let Some(dep) = prev_by_type.get(*agent_type) {
                     builder.depend_on(dep);
@@ -1107,6 +1185,7 @@ pub fn run_daemon() -> Result<(), String> {
     let choir_rpc = Arc::clone(&choir);
     let llm_rpc = Arc::clone(&llm);
     let log_dir_rpc = task_log_dir.clone();
+    let in_flight_rpc = Arc::clone(&in_flight);
     daemon.add_rpc_handler(move |cmd, _ctx| {
         if !cmd.starts_with("run-agent ") { return None; }
         let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
@@ -1175,6 +1254,7 @@ pub fn run_daemon() -> Result<(), String> {
             while remaining > 0 {
                 let batch = remaining.min(batch_size);
                 let agent = agent_type.to_string();
+                let in_flight_clone = Arc::clone(&in_flight_rpc);
                 let task_name = format!("c-{}-rpc{}:{}", agent, ts, today);
                 let mut builder = choir_rpc.spawn(task_name)
                     .resource(&llm_rpc)
                         if is_rename {
                             job_rename_agent(ctx, batch)
                         } else {
-                            job_consolidation_agent(ctx, &agent, batch)
+                            job_consolidation_agent(ctx, &agent, batch, &in_flight_clone)
                         }
                     });
             if let Some(ref dep) = prev {
diff --git a/poc-memory/src/agents/defs.rs b/poc-memory/src/agents/defs.rs
index e5bd780..d94790e 100644
--- a/poc-memory/src/agents/defs.rs
+++ b/poc-memory/src/agents/defs.rs
@@ -404,10 +404,13 @@ pub fn resolve_placeholders(
 }
 
 /// Run a config-driven agent: query → resolve placeholders → prompt.
+/// `exclude` filters out nodes (and their neighborhoods) already being
+/// worked on by other agents, preventing concurrent collisions.
 pub fn run_agent(
     store: &Store,
     def: &AgentDef,
     count: usize,
+    exclude: &std::collections::HashSet<String>,
 ) -> Result {
     let graph = store.build_graph();
 
@@ -417,13 +420,20 @@
     let has_limit = stages.iter().any(|s| matches!(s, search::Stage::Transform(search::Transform::Limit(_))));
     if !has_limit {
-        stages.push(search::Stage::Transform(search::Transform::Limit(count)));
+        // Request extra results to compensate for exclusion filtering
+        let padded = count + exclude.len().min(100);
+        stages.push(search::Stage::Transform(search::Transform::Limit(padded)));
     }
-    let results = search::run_query(&stages, vec![], &graph, store, false, count);
-    if results.is_empty() {
-        return Err(format!("{}: query returned no results", def.agent));
+    let results = search::run_query(&stages, vec![], &graph, store, false, count + exclude.len().min(100));
+    let filtered: Vec<String> = results.into_iter()
+        .map(|(k, _)| k)
+        .filter(|k| !exclude.contains(k))
+        .take(count)
+        .collect();
+    if filtered.is_empty() {
+        return Err(format!("{}: query returned no results (after exclusion)", def.agent));
     }
-    results.into_iter().map(|(k, _)| k).collect::>()
+    filtered
 } else {
     vec![]
 };
diff --git a/poc-memory/src/agents/knowledge.rs b/poc-memory/src/agents/knowledge.rs
index 64aad7e..9687960 100644
--- a/poc-memory/src/agents/knowledge.rs
+++ b/poc-memory/src/agents/knowledge.rs
@@ -41,7 +41,20 @@ pub fn run_and_apply_with_log(
     llm_tag: &str,
     log: &dyn Fn(&str),
 ) -> Result<(), String> {
-    let result = run_one_agent(store, agent_name, batch_size, llm_tag, log, false)?;
+    run_and_apply_excluded(store, agent_name, batch_size, llm_tag, log, &Default::default())
+}
+
+/// Like run_and_apply_with_log but with an in-flight exclusion set.
+/// Returns the keys that were processed (for the daemon to track).
+pub fn run_and_apply_excluded(
+    store: &mut Store,
+    agent_name: &str,
+    batch_size: usize,
+    llm_tag: &str,
+    log: &dyn Fn(&str),
+    exclude: &std::collections::HashSet<String>,
+) -> Result<(), String> {
+    let result = run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, false, exclude)?;
 
     // Mark conversation segments as mined after successful processing
     if agent_name == "observation" {
@@ -88,18 +101,25 @@ pub fn run_one_agent(
     llm_tag: &str,
     log: &dyn Fn(&str),
     debug: bool,
+) -> Result {
+    run_one_agent_excluded(store, agent_name, batch_size, llm_tag, log, debug, &Default::default())
+}
+
+/// Like run_one_agent but excludes nodes currently being worked on by other agents.
+pub fn run_one_agent_excluded(
+    store: &mut Store,
+    agent_name: &str,
+    batch_size: usize,
+    llm_tag: &str,
+    log: &dyn Fn(&str),
+    debug: bool,
+    exclude: &std::collections::HashSet<String>,
 ) -> Result {
     let def = super::defs::get_def(agent_name)
         .ok_or_else(|| format!("no .agent file for {}", agent_name))?;
 
     log("building prompt");
-    let agent_batch = super::defs::run_agent(store, &def, batch_size)?;
-
-    // Eagerly record visits so concurrent agents pick different seeds.
-    // The not-visited query filter checks this timestamp.
-    if !agent_batch.node_keys.is_empty() {
-        store.record_agent_visits(&agent_batch.node_keys, agent_name).ok();
-    }
+    let agent_batch = super::defs::run_agent(store, &def, batch_size, exclude)?;
 
     run_one_agent_inner(store, agent_name, &def, agent_batch, llm_tag, log, debug)
 }
diff --git a/poc-memory/src/agents/prompts.rs b/poc-memory/src/agents/prompts.rs
index b57d66f..e2d6b26 100644
--- a/poc-memory/src/agents/prompts.rs
+++ b/poc-memory/src/agents/prompts.rs
@@ -428,5 +428,5 @@ pub fn consolidation_batch(store: &Store, count: usize, auto: bool) -> Result<()
 pub fn agent_prompt(store: &Store, agent: &str, count: usize) -> Result {
     let def = super::defs::get_def(agent)
         .ok_or_else(|| format!("Unknown agent: {}", agent))?;
-    super::defs::run_agent(store, &def, count)
+    super::defs::run_agent(store, &def, count, &Default::default())
 }