From 7fc1270d6f49d584127c5a32a4585a8b665b393e Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Tue, 17 Mar 2026 01:24:54 -0400 Subject: [PATCH] agent run: queue targeted runs to daemon, one task per node --target and --query now queue individual daemon tasks instead of running sequentially in the CLI. Each node gets its own choir task with LLM resource locking. Falls back to local execution if daemon isn't running. RPC extended: "run-agent linker 1 target:KEY" spawns a targeted task. --- poc-memory/src/agents/daemon.rs | 47 ++++++++++++++++++++++++++++++++- poc-memory/src/cli/agent.rs | 27 ++++++++++++++++--- 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/poc-memory/src/agents/daemon.rs b/poc-memory/src/agents/daemon.rs index e097c47..e05850b 100644 --- a/poc-memory/src/agents/daemon.rs +++ b/poc-memory/src/agents/daemon.rs @@ -41,6 +41,27 @@ fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), St // experience_mine and fact_mine removed — observation.agent handles all transcript mining +/// Run an agent targeted at a specific node key. +fn job_targeted_agent( + ctx: &ExecutionContext, + agent_type: &str, + target_key: &str, +) -> Result<(), TaskError> { + let agent = agent_type.to_string(); + let key = target_key.to_string(); + let job_name = format!("c-{}-target", agent); + run_job(ctx, &job_name, || { + let mut store = crate::store::Store::load()?; + ctx.log_line(&format!("targeting: {}", key)); + let log = |msg: &str| { ctx.log_line(msg); }; + super::knowledge::run_one_agent_with_keys( + &mut store, &agent, &[key.clone()], 5, "daemon", &log, false, + )?; + ctx.log_line("done"); + Ok(()) + }) +} + /// Run a single consolidation agent (replay, linker, separator, transfer, health). fn job_consolidation_agent( ctx: &ExecutionContext, @@ -1048,11 +1069,15 @@ pub fn run_daemon() -> Result<(), String> { let llm_rpc = Arc::clone(&llm); daemon.add_rpc_handler(move |cmd, _ctx| { if !cmd.starts_with("run-agent ") { return None; } - let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); + let parts: Vec<&str> = cmd.splitn(4, ' ').collect(); let agent_type = parts.get(1).unwrap_or(&"replay"); let count: usize = parts.get(2) .and_then(|s| s.parse().ok()) .unwrap_or(1); + // Optional target key: "run-agent linker 1 target:KEY" + let target_key: Option = parts.get(3) + .and_then(|s| s.strip_prefix("target:")) + .map(|s| s.to_string()); let batch_size = 5; let today = chrono::Local::now().format("%Y-%m-%d"); let ts = chrono::Local::now().format("%H%M%S"); @@ -1063,6 +1088,22 @@ pub fn run_daemon() -> Result<(), String> { let is_rename = *agent_type == "rename"; let is_split = *agent_type == "split"; + // Targeted run: one task for a specific node + if let Some(ref key) = target_key { + let agent = agent_type.to_string(); + let key = key.clone(); + let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::(), today); + choir_rpc.spawn(task_name) + .resource(&llm_rpc) + .retries(1) + .init(move |ctx| { + job_targeted_agent(ctx, &agent, &key) + }) + .run(); + spawned = 1; + remaining = 0; + } + if is_split { let store = crate::store::Store::load().ok(); let candidates = store.as_ref() @@ -1128,6 +1169,10 @@ pub fn run_daemon() -> Result<(), String> { std::process::exit(0) } +pub fn send_rpc_pub(cmd: &str) -> Option { + send_rpc(cmd) +} + fn send_rpc(cmd: &str) -> Option { jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) } diff --git a/poc-memory/src/cli/agent.rs b/poc-memory/src/cli/agent.rs index b2adc9e..38e5d5e 100644 --- a/poc-memory/src/cli/agent.rs +++ b/poc-memory/src/cli/agent.rs @@ -28,10 +28,29 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option }; if !resolved_targets.is_empty() { - // Run agent once with all targets as seeds - crate::agents::knowledge::run_one_agent_with_keys( - &mut store, agent, &resolved_targets, count, "test", &log, debug, - )?; + // Queue one daemon task per target node + let mut queued = 0; + for key in &resolved_targets { + let cmd = format!("run-agent {} 1 target:{}", agent, key); + match crate::agents::daemon::send_rpc_pub(&cmd) { + Some(_) => queued += 1, + None => { + eprintln!("Daemon not running — falling back to local execution"); + // Local fallback: run sequentially + for (i, key) in resolved_targets.iter().enumerate() { + eprintln!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key); + if i > 0 { store = store::Store::load()?; } + if let Err(e) = crate::agents::knowledge::run_one_agent_with_keys( + &mut store, agent, &[key.clone()], count, "test", &log, debug, + ) { + eprintln!("[{}] ERROR on {}: {}", agent, key, e); + } + } + return Ok(()); + } + } + } + eprintln!("[{}] queued {} tasks to daemon", agent, queued); } else if debug { crate::agents::knowledge::run_one_agent( &mut store, agent, count, "test", &log, true,