agent run: queue targeted runs to daemon, one task per node

--target and --query now queue individual daemon tasks instead of
running sequentially in the CLI. Each node gets its own choir task
with LLM resource locking. Falls back to local execution if daemon
isn't running.

RPC extended: "run-agent linker 1 target:KEY" spawns a targeted task.
This commit is contained in:
ProofOfConcept 2026-03-17 01:24:54 -04:00
parent 83a027d8be
commit 7fc1270d6f
2 changed files with 69 additions and 5 deletions

View file

@ -41,6 +41,27 @@ fn run_job(ctx: &ExecutionContext, name: &str, f: impl FnOnce() -> Result<(), St
// experience_mine and fact_mine removed — observation.agent handles all transcript mining // experience_mine and fact_mine removed — observation.agent handles all transcript mining
/// Run an agent targeted at a specific node key.
///
/// Wraps one invocation of `run_one_agent_with_keys` in a named daemon
/// job (`c-<agent>-target`) so execution, per-line logging, and error
/// propagation all go through the standard `run_job` machinery.
fn job_targeted_agent(
    ctx: &ExecutionContext,
    agent_type: &str,
    target_key: &str,
) -> Result<(), TaskError> {
    // Own the string data up front so the job closure captures owned
    // values rather than borrowing the `&str` arguments.
    let agent_name = agent_type.to_string();
    let node_key = target_key.to_string();
    let label = format!("c-{}-target", agent_name);
    run_job(ctx, &label, || {
        let mut store = crate::store::Store::load()?;
        ctx.log_line(&format!("targeting: {}", node_key));
        let log = |msg: &str| { ctx.log_line(msg); };
        // NOTE(review): the literal 5 presumably mirrors `batch_size` in the
        // RPC handler; "daemon" tags the run source, `false` disables debug.
        super::knowledge::run_one_agent_with_keys(
            &mut store, &agent_name, &[node_key.clone()], 5, "daemon", &log, false,
        )?;
        ctx.log_line("done");
        Ok(())
    })
}
/// Run a single consolidation agent (replay, linker, separator, transfer, health). /// Run a single consolidation agent (replay, linker, separator, transfer, health).
fn job_consolidation_agent( fn job_consolidation_agent(
ctx: &ExecutionContext, ctx: &ExecutionContext,
@ -1048,11 +1069,15 @@ pub fn run_daemon() -> Result<(), String> {
let llm_rpc = Arc::clone(&llm); let llm_rpc = Arc::clone(&llm);
daemon.add_rpc_handler(move |cmd, _ctx| { daemon.add_rpc_handler(move |cmd, _ctx| {
if !cmd.starts_with("run-agent ") { return None; } if !cmd.starts_with("run-agent ") { return None; }
let parts: Vec<&str> = cmd.splitn(3, ' ').collect(); let parts: Vec<&str> = cmd.splitn(4, ' ').collect();
let agent_type = parts.get(1).unwrap_or(&"replay"); let agent_type = parts.get(1).unwrap_or(&"replay");
let count: usize = parts.get(2) let count: usize = parts.get(2)
.and_then(|s| s.parse().ok()) .and_then(|s| s.parse().ok())
.unwrap_or(1); .unwrap_or(1);
// Optional target key: "run-agent linker 1 target:KEY"
let target_key: Option<String> = parts.get(3)
.and_then(|s| s.strip_prefix("target:"))
.map(|s| s.to_string());
let batch_size = 5; let batch_size = 5;
let today = chrono::Local::now().format("%Y-%m-%d"); let today = chrono::Local::now().format("%Y-%m-%d");
let ts = chrono::Local::now().format("%H%M%S"); let ts = chrono::Local::now().format("%H%M%S");
@ -1063,6 +1088,22 @@ pub fn run_daemon() -> Result<(), String> {
let is_rename = *agent_type == "rename"; let is_rename = *agent_type == "rename";
let is_split = *agent_type == "split"; let is_split = *agent_type == "split";
// Targeted run: one task for a specific node
if let Some(ref key) = target_key {
let agent = agent_type.to_string();
let key = key.clone();
let task_name = format!("c-{}-{}:{}", agent, key.chars().take(30).collect::<String>(), today);
choir_rpc.spawn(task_name)
.resource(&llm_rpc)
.retries(1)
.init(move |ctx| {
job_targeted_agent(ctx, &agent, &key)
})
.run();
spawned = 1;
remaining = 0;
}
if is_split { if is_split {
let store = crate::store::Store::load().ok(); let store = crate::store::Store::load().ok();
let candidates = store.as_ref() let candidates = store.as_ref()
@ -1128,6 +1169,10 @@ pub fn run_daemon() -> Result<(), String> {
std::process::exit(0) std::process::exit(0)
} }
/// Crate-visible wrapper around the private `send_rpc`, letting other
/// modules (e.g. the CLI targeted-run path) send a command to the daemon
/// socket. Returns `None` when no response is obtained — callers use
/// this to detect that the daemon is not running and fall back locally.
pub fn send_rpc_pub(cmd: &str) -> Option<String> {
    send_rpc(cmd)
}
fn send_rpc(cmd: &str) -> Option<String> { fn send_rpc(cmd: &str) -> Option<String> {
jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd) jobkit_daemon::socket::send_rpc(&crate::config::get().data_dir, cmd)
} }

View file

@ -28,10 +28,29 @@ pub fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option
}; };
if !resolved_targets.is_empty() { if !resolved_targets.is_empty() {
// Run agent once with all targets as seeds // Queue one daemon task per target node
crate::agents::knowledge::run_one_agent_with_keys( let mut queued = 0;
&mut store, agent, &resolved_targets, count, "test", &log, debug, for key in &resolved_targets {
)?; let cmd = format!("run-agent {} 1 target:{}", agent, key);
match crate::agents::daemon::send_rpc_pub(&cmd) {
Some(_) => queued += 1,
None => {
eprintln!("Daemon not running — falling back to local execution");
// Local fallback: run sequentially
for (i, key) in resolved_targets.iter().enumerate() {
eprintln!("[{}] [{}/{}] {}", agent, i + 1, resolved_targets.len(), key);
if i > 0 { store = store::Store::load()?; }
if let Err(e) = crate::agents::knowledge::run_one_agent_with_keys(
&mut store, agent, &[key.clone()], count, "test", &log, debug,
) {
eprintln!("[{}] ERROR on {}: {}", agent, key, e);
}
}
return Ok(());
}
}
}
eprintln!("[{}] queued {} tasks to daemon", agent, queued);
} else if debug { } else if debug {
crate::agents::knowledge::run_one_agent( crate::agents::knowledge::run_one_agent(
&mut store, agent, count, "test", &log, true, &mut store, agent, count, "test", &log, true,