forked from kent/consciousness
Improve hook agent diagnostics
This commit is contained in:
parent
e6f9f062f0
commit
b4a8f08b9b
8 changed files with 307 additions and 36 deletions
|
|
@ -755,13 +755,28 @@ impl ResponseParser {
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
let mut parser = self;
|
let mut parser = self;
|
||||||
let agent_name = agent.state.lock().await.provenance.clone();
|
let agent_name = agent.state.lock().await.provenance.clone();
|
||||||
|
eprintln!(
|
||||||
|
"[agent:{agent_name}] parser task start branch_idx={} in_think={}",
|
||||||
|
parser.branch_idx, parser.in_think,
|
||||||
|
);
|
||||||
let log_path = format!("/tmp/poc-{}.log", agent_name);
|
let log_path = format!("/tmp/poc-{}.log", agent_name);
|
||||||
let mut log_file = std::fs::OpenOptions::new()
|
let mut log_file = std::fs::OpenOptions::new()
|
||||||
.create(true).append(true).open(&log_path).ok();
|
.create(true).append(true).open(&log_path).ok();
|
||||||
let mut full_text = String::new();
|
let mut full_text = String::new();
|
||||||
|
let mut token_count: usize = 0;
|
||||||
while let Some(event) = stream.recv().await {
|
while let Some(event) = stream.recv().await {
|
||||||
match event {
|
match event {
|
||||||
super::api::StreamToken::Token { id, readout } => {
|
super::api::StreamToken::Token { id, readout } => {
|
||||||
|
token_count += 1;
|
||||||
|
if token_count == 1 {
|
||||||
|
eprintln!("[agent:{agent_name}] parser first token id={}", id);
|
||||||
|
} else if token_count % 256 == 0 {
|
||||||
|
eprintln!(
|
||||||
|
"[agent:{agent_name}] parser token_count={} chars={}",
|
||||||
|
token_count,
|
||||||
|
full_text.len(),
|
||||||
|
);
|
||||||
|
}
|
||||||
if let Some(r) = readout {
|
if let Some(r) = readout {
|
||||||
if let Ok(mut buf) = agent.readout.lock() {
|
if let Ok(mut buf) = agent.readout.lock() {
|
||||||
buf.push(id, r);
|
buf.push(id, r);
|
||||||
|
|
@ -786,6 +801,12 @@ impl ResponseParser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
super::api::StreamToken::Done { usage } => {
|
super::api::StreamToken::Done { usage } => {
|
||||||
|
eprintln!(
|
||||||
|
"[agent:{agent_name}] parser done token_count={} chars={} usage={:?}",
|
||||||
|
token_count,
|
||||||
|
full_text.len(),
|
||||||
|
usage,
|
||||||
|
);
|
||||||
if let Some(ref mut f) = log_file {
|
if let Some(ref mut f) = log_file {
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
let ctx = agent.context.lock().await;
|
let ctx = agent.context.lock().await;
|
||||||
|
|
@ -813,11 +834,20 @@ impl ResponseParser {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
super::api::StreamToken::Error(e) => {
|
super::api::StreamToken::Error(e) => {
|
||||||
|
eprintln!("[agent:{agent_name}] parser stream error: {}", e);
|
||||||
return Err(anyhow::anyhow!("{}", e));
|
return Err(anyhow::anyhow!("{}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
eprintln!(
|
||||||
|
"[agent:{agent_name}] parser stream closed without done token_count={} chars={}",
|
||||||
|
token_count,
|
||||||
|
full_text.len(),
|
||||||
|
);
|
||||||
|
Err(anyhow::anyhow!(
|
||||||
|
"stream closed without Done event after {} tokens",
|
||||||
|
token_count,
|
||||||
|
))
|
||||||
});
|
});
|
||||||
(rx, handle)
|
(rx, handle)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,11 @@ use context::{AstNode, ContextState, Section, Ast, PendingToolCall, ResponsePars
|
||||||
|
|
||||||
use crate::mind::log::ConversationLog;
|
use crate::mind::log::ConversationLog;
|
||||||
|
|
||||||
|
async fn agent_trace(agent: &Arc<Agent>, msg: String) {
|
||||||
|
let provenance = agent.state.lock().await.provenance.clone();
|
||||||
|
eprintln!("[agent:{provenance}] {msg}");
|
||||||
|
}
|
||||||
|
|
||||||
// --- Activity tracking (RAII guards) ---
|
// --- Activity tracking (RAII guards) ---
|
||||||
|
|
||||||
pub struct ActivityEntry {
|
pub struct ActivityEntry {
|
||||||
|
|
@ -392,10 +397,16 @@ impl Agent {
|
||||||
pub async fn turn(
|
pub async fn turn(
|
||||||
agent: Arc<Agent>,
|
agent: Arc<Agent>,
|
||||||
) -> Result<TurnResult> {
|
) -> Result<TurnResult> {
|
||||||
|
agent_trace(&agent, format!("turn start")).await;
|
||||||
|
|
||||||
// Collect finished background tools
|
// Collect finished background tools
|
||||||
{
|
{
|
||||||
let finished = agent.state.lock().await.active_tools.take_finished();
|
let finished = agent.state.lock().await.active_tools.take_finished();
|
||||||
if !finished.is_empty() {
|
if !finished.is_empty() {
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"collecting {} finished background tools",
|
||||||
|
finished.len(),
|
||||||
|
)).await;
|
||||||
let mut bg_ds = DispatchState::new();
|
let mut bg_ds = DispatchState::new();
|
||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
for entry in finished {
|
for entry in finished {
|
||||||
|
|
@ -414,25 +425,50 @@ impl Agent {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let _thinking = start_activity(&agent, "thinking...").await;
|
let _thinking = start_activity(&agent, "thinking...").await;
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"turn loop overflow_retries={} empty_retries={}",
|
||||||
|
overflow_retries, empty_retries,
|
||||||
|
)).await;
|
||||||
|
|
||||||
let (rx, _stream_guard) = {
|
let (rx, _stream_guard) = {
|
||||||
|
agent_trace(&agent, format!("assembling prompt")).await;
|
||||||
let (chunks, images, match_upto) = agent.assemble_prompt().await;
|
let (chunks, images, match_upto) = agent.assemble_prompt().await;
|
||||||
|
let chunk_tokens: usize = chunks.iter().map(|c| match c {
|
||||||
|
context::WireChunk::Tokens(t) => t.len(),
|
||||||
|
}).sum();
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"prompt assembled chunks={} tokens={} images={} match_upto={}",
|
||||||
|
chunks.len(), chunk_tokens, images.len(), match_upto,
|
||||||
|
)).await;
|
||||||
let st = agent.state.lock().await;
|
let st = agent.state.lock().await;
|
||||||
let readout_shape = agent.readout.lock().ok().and_then(|buf| {
|
let readout_shape = agent.readout.lock().ok().and_then(|buf| {
|
||||||
buf.manifest.as_ref().map(|m| {
|
buf.manifest.as_ref().map(|m| {
|
||||||
(m.layers.len() as u32, m.concepts.len() as u32)
|
(m.layers.len() as u32, m.concepts.len() as u32)
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
let sampling = st.sampling;
|
||||||
|
let priority = st.priority;
|
||||||
|
drop(st);
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"starting stream max_tokens={} temperature={} top_p={} top_k={} priority={:?} readout_shape={:?}",
|
||||||
|
sampling.max_tokens,
|
||||||
|
sampling.temperature,
|
||||||
|
sampling.top_p,
|
||||||
|
sampling.top_k,
|
||||||
|
priority,
|
||||||
|
readout_shape,
|
||||||
|
)).await;
|
||||||
agent.client.stream_session_mm(
|
agent.client.stream_session_mm(
|
||||||
agent.grpc_session.clone(),
|
agent.grpc_session.clone(),
|
||||||
chunks,
|
chunks,
|
||||||
images,
|
images,
|
||||||
match_upto,
|
match_upto,
|
||||||
st.sampling,
|
sampling,
|
||||||
st.priority,
|
priority,
|
||||||
readout_shape,
|
readout_shape,
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
agent_trace(&agent, format!("stream task spawned")).await;
|
||||||
|
|
||||||
let branch_idx = {
|
let branch_idx = {
|
||||||
let mut ctx = agent.context.lock().await;
|
let mut ctx = agent.context.lock().await;
|
||||||
|
|
@ -446,9 +482,38 @@ impl Agent {
|
||||||
let think_native = agent.state.lock().await.think_native;
|
let think_native = agent.state.lock().await.think_native;
|
||||||
let parser = ResponseParser::new(branch_idx, think_native);
|
let parser = ResponseParser::new(branch_idx, think_native);
|
||||||
let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());
|
let (mut tool_rx, parser_handle) = parser.run(rx, agent.clone());
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"parser started branch_idx={} think_native={}",
|
||||||
|
branch_idx, think_native,
|
||||||
|
)).await;
|
||||||
|
|
||||||
let mut pending_calls: Vec<PendingToolCall> = Vec::new();
|
let mut pending_calls: Vec<PendingToolCall> = Vec::new();
|
||||||
while let Some(call) = tool_rx.recv().await {
|
loop {
|
||||||
|
let call = match tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(15),
|
||||||
|
tool_rx.recv(),
|
||||||
|
).await {
|
||||||
|
Ok(Some(call)) => call,
|
||||||
|
Ok(None) => {
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"tool channel closed pending_calls={}",
|
||||||
|
pending_calls.len(),
|
||||||
|
)).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"waiting for parser/tool events pending_calls={}",
|
||||||
|
pending_calls.len(),
|
||||||
|
)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"tool call received id={} name={} args_len={}",
|
||||||
|
call.id, call.name, call.arguments.len(),
|
||||||
|
)).await;
|
||||||
let call_clone = call.clone();
|
let call_clone = call.clone();
|
||||||
let agent_handle = agent.clone();
|
let agent_handle = agent.clone();
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
|
|
@ -471,8 +536,10 @@ impl Agent {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for stream/parse errors
|
// Check for stream/parse errors
|
||||||
|
agent_trace(&agent, format!("awaiting parser task")).await;
|
||||||
match parser_handle.await {
|
match parser_handle.await {
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
|
agent_trace(&agent, format!("parser returned error: {:#}", e)).await;
|
||||||
if context::is_context_overflow(&e) && overflow_retries < 2 {
|
if context::is_context_overflow(&e) && overflow_retries < 2 {
|
||||||
overflow_retries += 1;
|
overflow_retries += 1;
|
||||||
let msg = format!("context overflow — compacting ({}/2)", overflow_retries);
|
let msg = format!("context overflow — compacting ({}/2)", overflow_retries);
|
||||||
|
|
@ -486,8 +553,12 @@ impl Agent {
|
||||||
}
|
}
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
Err(e) => return Err(anyhow::anyhow!("parser task panicked: {}", e)),
|
Err(e) => {
|
||||||
|
agent_trace(&agent, format!("parser task panicked: {}", e)).await;
|
||||||
|
return Err(anyhow::anyhow!("parser task panicked: {}", e));
|
||||||
|
}
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
|
agent_trace(&agent, format!("parser completed")).await;
|
||||||
// Assistant response was pushed to context by the parser;
|
// Assistant response was pushed to context by the parser;
|
||||||
// log it now that parsing is complete.
|
// log it now that parsing is complete.
|
||||||
let ctx = agent.context.lock().await;
|
let ctx = agent.context.lock().await;
|
||||||
|
|
@ -508,6 +579,10 @@ impl Agent {
|
||||||
if !has_content && pending_calls.is_empty() {
|
if !has_content && pending_calls.is_empty() {
|
||||||
if empty_retries < 2 {
|
if empty_retries < 2 {
|
||||||
empty_retries += 1;
|
empty_retries += 1;
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"empty response retry {}/2",
|
||||||
|
empty_retries,
|
||||||
|
)).await;
|
||||||
agent.push_node(AstNode::user_msg(
|
agent.push_node(AstNode::user_msg(
|
||||||
"[system] Your previous response was empty. \
|
"[system] Your previous response was empty. \
|
||||||
Please respond with text or use a tool."
|
Please respond with text or use a tool."
|
||||||
|
|
@ -521,6 +596,10 @@ impl Agent {
|
||||||
// Wait for tool calls to complete
|
// Wait for tool calls to complete
|
||||||
if !pending_calls.is_empty() {
|
if !pending_calls.is_empty() {
|
||||||
ds.had_tool_calls = true;
|
ds.had_tool_calls = true;
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"waiting for {} foreground tools",
|
||||||
|
pending_calls.len(),
|
||||||
|
)).await;
|
||||||
|
|
||||||
let handles = agent.state.lock().await.active_tools.take_foreground();
|
let handles = agent.state.lock().await.active_tools.take_foreground();
|
||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
|
|
@ -541,6 +620,16 @@ impl Agent {
|
||||||
if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
|
if st.pending_model_switch.is_some() { ds.model_switch = st.pending_model_switch.take(); }
|
||||||
if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
|
if st.pending_dmn_pause { ds.dmn_pause = true; st.pending_dmn_pause = false; }
|
||||||
|
|
||||||
|
drop(st);
|
||||||
|
agent_trace(&agent, format!(
|
||||||
|
"turn complete yield={} tool_calls={} tool_errors={} model_switch={:?} dmn_pause={}",
|
||||||
|
ds.yield_requested,
|
||||||
|
ds.had_tool_calls,
|
||||||
|
ds.tool_errors,
|
||||||
|
ds.model_switch,
|
||||||
|
ds.dmn_pause,
|
||||||
|
)).await;
|
||||||
|
|
||||||
return Ok(TurnResult {
|
return Ok(TurnResult {
|
||||||
yield_requested: ds.yield_requested,
|
yield_requested: ds.yield_requested,
|
||||||
had_tool_calls: ds.had_tool_calls,
|
had_tool_calls: ds.had_tool_calls,
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,9 @@ use crate::subconscious::{defs, prompts};
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::io::Write as _;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use super::context::AstNode;
|
use super::context::AstNode;
|
||||||
use super::tools::{self as agent_tools};
|
use super::tools::{self as agent_tools};
|
||||||
|
|
@ -106,6 +108,10 @@ pub async fn save_agent_log(name: &str, agent: &std::sync::Arc<Agent>) -> RunSta
|
||||||
stats
|
stats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn log_agent_event(agent: &str, msg: std::fmt::Arguments) {
|
||||||
|
eprintln!("[agent:{agent}] {msg}");
|
||||||
|
}
|
||||||
|
|
||||||
fn compute_run_stats(conversation: &[super::context::AstNode]) -> RunStats {
|
fn compute_run_stats(conversation: &[super::context::AstNode]) -> RunStats {
|
||||||
use super::context::{AstNode, NodeBody};
|
use super::context::{AstNode, NodeBody};
|
||||||
|
|
||||||
|
|
@ -345,20 +351,44 @@ impl AutoAgent {
|
||||||
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
bail_fn: Option<&(dyn Fn(usize) -> Result<(), String> + Sync)>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
dbglog!("[auto] {} starting, {} steps", self.name, self.steps.len());
|
dbglog!("[auto] {} starting, {} steps", self.name, self.steps.len());
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"starting run steps={} temperature={} priority={}",
|
||||||
|
self.steps.len(), self.temperature, self.priority));
|
||||||
|
let run_start = Instant::now();
|
||||||
|
|
||||||
for (i, step) in self.steps.iter().enumerate() {
|
for (i, step) in self.steps.iter().enumerate() {
|
||||||
self.turn = i + 1;
|
self.turn = i + 1;
|
||||||
self.current_phase = step.phase.clone();
|
self.current_phase = step.phase.clone();
|
||||||
|
let step_start = Instant::now();
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"step {}/{} phase={} prompt_bytes={}",
|
||||||
|
i + 1, self.steps.len(), step.phase, step.prompt.len()));
|
||||||
|
|
||||||
if let Some(ref check) = bail_fn {
|
if let Some(ref check) = bail_fn {
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"step {}/{} phase={} bail check", i + 1, self.steps.len(), step.phase));
|
||||||
check(i)?;
|
check(i)?;
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"step {}/{} phase={} bail ok", i + 1, self.steps.len(), step.phase));
|
||||||
}
|
}
|
||||||
|
|
||||||
backend.push_node(AstNode::system_msg(&step.prompt)).await;
|
backend.push_node(AstNode::system_msg(&step.prompt)).await;
|
||||||
Agent::turn(backend.0.clone()).await
|
Agent::turn(backend.0.clone()).await
|
||||||
.map_err(|e| format!("{}: {}", self.name, e))?;
|
.map_err(|e| {
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"step {}/{} phase={} failed after {:.2}s: {}",
|
||||||
|
i + 1, self.steps.len(), step.phase,
|
||||||
|
step_start.elapsed().as_secs_f64(), e));
|
||||||
|
format!("{}: {}", self.name, e)
|
||||||
|
})?;
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"step {}/{} phase={} done in {:.2}s",
|
||||||
|
i + 1, self.steps.len(), step.phase,
|
||||||
|
step_start.elapsed().as_secs_f64()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log_agent_event(&self.name, format_args!(
|
||||||
|
"run completed in {:.2}s", run_start.elapsed().as_secs_f64()));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -382,8 +412,29 @@ pub async fn run_one_agent(
|
||||||
count: usize,
|
count: usize,
|
||||||
keys: Option<&[String]>,
|
keys: Option<&[String]>,
|
||||||
) -> Result<AgentResult, String> {
|
) -> Result<AgentResult, String> {
|
||||||
|
let run_start = Instant::now();
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"run_one_agent start pid={} count={} explicit_keys={}",
|
||||||
|
std::process::id(), count, keys.map(|k| k.len()).unwrap_or(0)));
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"env POC_SESSION_ID={:?} POC_TRANSCRIPT_PATH={:?} POC_AGENT_OUTPUT_DIR={:?}",
|
||||||
|
std::env::var("POC_SESSION_ID").ok(),
|
||||||
|
std::env::var("POC_TRANSCRIPT_PATH").ok(),
|
||||||
|
std::env::var("POC_AGENT_OUTPUT_DIR").ok()));
|
||||||
|
if let Some(session) = crate::session::HookSession::from_env() {
|
||||||
|
let transcript = session.transcript();
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"session={} transcript={} size={} exists={}",
|
||||||
|
session.session_id, transcript.path, transcript.size, transcript.exists()));
|
||||||
|
} else {
|
||||||
|
log_agent_event(agent_name, format_args!("no hook session in environment"));
|
||||||
|
}
|
||||||
|
|
||||||
let def = defs::get_def(agent_name)
|
let def = defs::get_def(agent_name)
|
||||||
.ok_or_else(|| format!("no .agent file for {}", agent_name))?;
|
.ok_or_else(|| format!("no .agent file for {}", agent_name))?;
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"definition loaded steps={} tools={:?} count={:?} priority={} bail={:?}",
|
||||||
|
def.steps.len(), def.tools, def.count, def.priority, def.bail));
|
||||||
|
|
||||||
// State dir for agent output files
|
// State dir for agent output files
|
||||||
let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
|
let state_dir = std::env::var("POC_AGENT_OUTPUT_DIR")
|
||||||
|
|
@ -392,6 +443,7 @@ pub async fn run_one_agent(
|
||||||
fs::create_dir_all(&state_dir)
|
fs::create_dir_all(&state_dir)
|
||||||
.map_err(|e| format!("create state dir: {}", e))?;
|
.map_err(|e| format!("create state dir: {}", e))?;
|
||||||
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); }
|
unsafe { std::env::set_var("POC_AGENT_OUTPUT_DIR", &state_dir); }
|
||||||
|
log_agent_event(agent_name, format_args!("state_dir={}", state_dir.display()));
|
||||||
|
|
||||||
// Build prompt batch — either from explicit keys or the agent's query
|
// Build prompt batch — either from explicit keys or the agent's query
|
||||||
let agent_batch = if let Some(keys) = keys {
|
let agent_batch = if let Some(keys) = keys {
|
||||||
|
|
@ -411,6 +463,8 @@ pub async fn run_one_agent(
|
||||||
prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }
|
prompts::AgentBatch { steps: resolved_steps, node_keys: all_keys }
|
||||||
} else {
|
} else {
|
||||||
let effective_count = def.count.unwrap_or(count);
|
let effective_count = def.count.unwrap_or(count);
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"resolving default prompt placeholders effective_count={}", effective_count));
|
||||||
defs::run_agent(&def, effective_count, &Default::default()).await?
|
defs::run_agent(&def, effective_count, &Default::default()).await?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -463,6 +517,14 @@ pub async fn run_one_agent(
|
||||||
})),
|
})),
|
||||||
});
|
});
|
||||||
let n_steps = agent_batch.steps.len();
|
let n_steps = agent_batch.steps.len();
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"prompt batch ready steps={} node_keys={}",
|
||||||
|
n_steps, agent_batch.node_keys.len()));
|
||||||
|
for (i, step) in agent_batch.steps.iter().enumerate() {
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"prompt step {}/{} phase={} bytes={}",
|
||||||
|
i + 1, n_steps, step.phase, step.prompt.len()));
|
||||||
|
}
|
||||||
|
|
||||||
// Guard: reject oversized first prompt
|
// Guard: reject oversized first prompt
|
||||||
let max_prompt_bytes = 800_000;
|
let max_prompt_bytes = 800_000;
|
||||||
|
|
@ -485,6 +547,9 @@ pub async fn run_one_agent(
|
||||||
let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect();
|
let phases: Vec<&str> = agent_batch.steps.iter().map(|s| s.phase.as_str()).collect();
|
||||||
dbglog!("[{}] {} step(s) {:?}, {}KB initial, {} nodes",
|
dbglog!("[{}] {} step(s) {:?}, {}KB initial, {} nodes",
|
||||||
agent_name, n_steps, phases, first_len / 1024, agent_batch.node_keys.len());
|
agent_name, n_steps, phases, first_len / 1024, agent_batch.node_keys.len());
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"tools enabled: {}",
|
||||||
|
effective_tools.iter().map(|t| t.name).collect::<Vec<_>>().join(", ")));
|
||||||
|
|
||||||
let prompts: Vec<String> = agent_batch.steps.iter()
|
let prompts: Vec<String> = agent_batch.steps.iter()
|
||||||
.map(|s| s.prompt.clone()).collect();
|
.map(|s| s.prompt.clone()).collect();
|
||||||
|
|
@ -497,18 +562,25 @@ pub async fn run_one_agent(
|
||||||
let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name));
|
let bail_script = def.bail.as_ref().map(|name| defs::agents_dir().join(name));
|
||||||
let state_dir_for_bail = state_dir.clone();
|
let state_dir_for_bail = state_dir.clone();
|
||||||
let our_pid = std::process::id();
|
let our_pid = std::process::id();
|
||||||
let our_pid_file = format!("pid-{}", our_pid);
|
let our_pid_file = std::env::var("POC_AGENT_PID_FILE")
|
||||||
|
.unwrap_or_else(|_| format!("pid-{}", our_pid));
|
||||||
let step_phases_for_bail = step_phases.clone();
|
let step_phases_for_bail = step_phases.clone();
|
||||||
let bail_fn = move |step_idx: usize| -> Result<(), String> {
|
let bail_fn = move |step_idx: usize| -> Result<(), String> {
|
||||||
if let Some(ref script) = bail_script {
|
if let Some(ref script) = bail_script {
|
||||||
let phase = step_phases_for_bail.get(step_idx)
|
let phase = step_phases_for_bail.get(step_idx)
|
||||||
.map(String::as_str).unwrap_or("");
|
.map(String::as_str).unwrap_or("");
|
||||||
|
eprintln!(
|
||||||
|
"[agent:bail] script={} state_dir={} pid_file={} phase={}",
|
||||||
|
script.display(), state_dir_for_bail.display(), our_pid_file, phase);
|
||||||
let status = std::process::Command::new(script)
|
let status = std::process::Command::new(script)
|
||||||
.arg(&our_pid_file)
|
.arg(&our_pid_file)
|
||||||
.arg(phase)
|
.arg(phase)
|
||||||
.current_dir(&state_dir_for_bail)
|
.current_dir(&state_dir_for_bail)
|
||||||
.status()
|
.status()
|
||||||
.map_err(|e| format!("bail script {:?} failed: {}", script, e))?;
|
.map_err(|e| format!("bail script {:?} failed: {}", script, e))?;
|
||||||
|
eprintln!(
|
||||||
|
"[agent:bail] script={} phase={} status={}",
|
||||||
|
script.display(), phase, status);
|
||||||
if !status.success() {
|
if !status.success() {
|
||||||
return Err(format!("bailed at step {}: {:?} exited {}",
|
return Err(format!("bailed at step {}: {:?} exited {}",
|
||||||
step_idx + 1, script.file_name().unwrap_or_default(),
|
step_idx + 1, script.file_name().unwrap_or_default(),
|
||||||
|
|
@ -521,6 +593,8 @@ pub async fn run_one_agent(
|
||||||
call_api_with_tools_sync(
|
call_api_with_tools_sync(
|
||||||
agent_name, &prompts, &step_phases, def.temperature, def.priority,
|
agent_name, &prompts, &step_phases, def.temperature, def.priority,
|
||||||
&effective_tools, Some(&bail_fn))?;
|
&effective_tools, Some(&bail_fn))?;
|
||||||
|
log_agent_event(agent_name, format_args!(
|
||||||
|
"run_one_agent completed in {:.2}s", run_start.elapsed().as_secs_f64()));
|
||||||
|
|
||||||
Ok(AgentResult {
|
Ok(AgentResult {
|
||||||
node_keys: agent_batch.node_keys,
|
node_keys: agent_batch.node_keys,
|
||||||
|
|
@ -598,6 +672,15 @@ pub fn spawn_agent(
|
||||||
agent_name: &str,
|
agent_name: &str,
|
||||||
state_dir: &std::path::Path,
|
state_dir: &std::path::Path,
|
||||||
session_id: &str,
|
session_id: &str,
|
||||||
|
) -> Option<SpawnResult> {
|
||||||
|
spawn_agent_with_transcript(agent_name, state_dir, session_id, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn spawn_agent_with_transcript(
|
||||||
|
agent_name: &str,
|
||||||
|
state_dir: &std::path::Path,
|
||||||
|
session_id: &str,
|
||||||
|
transcript_path: Option<&str>,
|
||||||
) -> Option<SpawnResult> {
|
) -> Option<SpawnResult> {
|
||||||
let def = defs::get_def(agent_name)?;
|
let def = defs::get_def(agent_name)?;
|
||||||
let first_phase = def.steps.first()
|
let first_phase = def.steps.first()
|
||||||
|
|
@ -608,17 +691,41 @@ pub fn spawn_agent(
|
||||||
.join(format!(".consciousness/logs/{}", agent_name));
|
.join(format!(".consciousness/logs/{}", agent_name));
|
||||||
fs::create_dir_all(&log_dir).ok();
|
fs::create_dir_all(&log_dir).ok();
|
||||||
let log_path = log_dir.join(format!("{}.log", store::compact_timestamp()));
|
let log_path = log_dir.join(format!("{}.log", store::compact_timestamp()));
|
||||||
let agent_log = fs::File::create(&log_path)
|
let mut agent_log = fs::File::create(&log_path)
|
||||||
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
|
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
|
||||||
|
|
||||||
let child = std::process::Command::new("poc-memory")
|
let mut cmd = std::process::Command::new("bash");
|
||||||
.args(["agent", "run", agent_name, "--count", "1", "--local",
|
cmd.args([
|
||||||
"--state-dir", &state_dir.to_string_lossy()])
|
"-lc",
|
||||||
.env("POC_SESSION_ID", session_id)
|
r#"
|
||||||
.stdout(agent_log.try_clone().unwrap_or_else(|_| fs::File::create("/dev/null").unwrap()))
|
set +e
|
||||||
.stderr(agent_log)
|
export POC_AGENT_PID_FILE="pid-$$"
|
||||||
.spawn()
|
"$@"
|
||||||
.ok()?;
|
status=$?
|
||||||
|
printf '=== agent process exit status: %s at %s ===\n' "$status" "$(date --iso-8601=seconds)"
|
||||||
|
exit "$status"
|
||||||
|
"#,
|
||||||
|
"poc-memory-agent-wrapper",
|
||||||
|
"poc-memory", "agent", "run", agent_name, "--count", "1", "--local",
|
||||||
|
"--state-dir", &state_dir.to_string_lossy(),
|
||||||
|
]).env("POC_SESSION_ID", session_id);
|
||||||
|
if let Some(path) = transcript_path.filter(|p| !p.is_empty()) {
|
||||||
|
cmd.env("POC_TRANSCRIPT_PATH", path);
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = writeln!(agent_log, "=== spawn {} ===", chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"));
|
||||||
|
let _ = writeln!(agent_log, "agent={agent_name}");
|
||||||
|
let _ = writeln!(agent_log, "state_dir={}", state_dir.display());
|
||||||
|
let _ = writeln!(agent_log, "session_id={session_id}");
|
||||||
|
let _ = writeln!(agent_log, "transcript_path={}", transcript_path.unwrap_or(""));
|
||||||
|
let _ = writeln!(agent_log, "first_phase={first_phase}");
|
||||||
|
let _ = writeln!(agent_log, "command=poc-memory agent run {agent_name} --count 1 --local --state-dir {}", state_dir.display());
|
||||||
|
let _ = agent_log.flush();
|
||||||
|
|
||||||
|
let child_stdout = agent_log.try_clone()
|
||||||
|
.unwrap_or_else(|_| fs::File::create("/dev/null").unwrap());
|
||||||
|
let child_stderr = agent_log;
|
||||||
|
let child = cmd.stdout(child_stdout).stderr(child_stderr).spawn().ok()?;
|
||||||
|
|
||||||
let pid = child.id();
|
let pid = child.id();
|
||||||
let pid_path = state_dir.join(format!("pid-{}", pid));
|
let pid_path = state_dir.join(format!("pid-{}", pid));
|
||||||
|
|
|
||||||
|
|
@ -33,16 +33,17 @@ fn get() -> Option<&'static Tokenizer> {
|
||||||
TOKENIZER.get()
|
TOKENIZER.get()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn expect_tokenizer() -> &'static Tokenizer {
|
||||||
|
get().expect("tokenizer not initialized; expected ~/.consciousness/tokenizer-qwen35.json")
|
||||||
|
}
|
||||||
|
|
||||||
/// Tokenize a raw string, returning token IDs.
|
/// Tokenize a raw string, returning token IDs.
|
||||||
/// Returns empty vec if the tokenizer is not initialized.
|
|
||||||
pub fn encode(text: &str) -> Vec<u32> {
|
pub fn encode(text: &str) -> Vec<u32> {
|
||||||
match get() {
|
expect_tokenizer()
|
||||||
Some(t) => t.encode(text, false)
|
.encode(text, false)
|
||||||
.unwrap_or_else(|e| panic!("tokenization failed: {}", e))
|
.unwrap_or_else(|e| panic!("tokenization failed: {}", e))
|
||||||
.get_ids()
|
.get_ids()
|
||||||
.to_vec(),
|
.to_vec()
|
||||||
None => vec![],
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tokenize a chat entry with template wrapping:
|
/// Tokenize a chat entry with template wrapping:
|
||||||
|
|
@ -66,15 +67,12 @@ pub fn count(text: &str) -> usize {
|
||||||
|
|
||||||
/// Decode token IDs back to text.
|
/// Decode token IDs back to text.
|
||||||
pub fn decode(ids: &[u32]) -> String {
|
pub fn decode(ids: &[u32]) -> String {
|
||||||
match get() {
|
expect_tokenizer()
|
||||||
Some(t) => t.decode(ids, true)
|
.decode(ids, true)
|
||||||
.unwrap_or_else(|e| panic!("detokenization failed: {}", e)),
|
.unwrap_or_else(|e| panic!("detokenization failed: {}", e))
|
||||||
None => String::new(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if the tokenizer is initialized.
|
/// Check if the tokenizer is initialized.
|
||||||
pub fn is_initialized() -> bool {
|
pub fn is_initialized() -> bool {
|
||||||
TOKENIZER.get().is_some()
|
TOKENIZER.get().is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -209,7 +209,24 @@ memory_tool!(graph_trace, ref, key: [str]);
|
||||||
|
|
||||||
// ── Definitions ────────────────────────────────────────────────
|
// ── Definitions ────────────────────────────────────────────────
|
||||||
|
|
||||||
pub fn memory_tools() -> [super::Tool; 20] {
|
async fn jsonargs_memory_new(agent: &Option<std::sync::Arc<crate::agent::Agent>>, args: &serde_json::Value) -> Result<String> {
|
||||||
|
jsonargs_memory_write(agent, args).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn jsonargs_memory_link(agent: &Option<std::sync::Arc<crate::agent::Agent>>, args: &serde_json::Value) -> Result<String> {
|
||||||
|
let source = get_str(args, "source")?;
|
||||||
|
let target = get_str(args, "target")?;
|
||||||
|
if args.get("strength").and_then(|v| v.as_f64()).is_some() {
|
||||||
|
jsonargs_memory_link_set(agent, args).await
|
||||||
|
} else {
|
||||||
|
jsonargs_memory_link_add(agent, &serde_json::json!({
|
||||||
|
"source": source,
|
||||||
|
"target": target,
|
||||||
|
})).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn memory_tools() -> [super::Tool; 22] {
|
||||||
use super::Tool;
|
use super::Tool;
|
||||||
macro_rules! tool {
|
macro_rules! tool {
|
||||||
($name:ident, $desc:expr, $params:expr) => {
|
($name:ident, $desc:expr, $params:expr) => {
|
||||||
|
|
@ -234,6 +251,11 @@ pub fn memory_tools() -> [super::Tool; 20] {
|
||||||
"properties": { "key": {"type": "string"}, "content": {"type": "string"} },
|
"properties": { "key": {"type": "string"}, "content": {"type": "string"} },
|
||||||
"required": ["key", "content"]
|
"required": ["key", "content"]
|
||||||
}"#),
|
}"#),
|
||||||
|
tool!(memory_new, "Create or update a memory node. Alias for memory_write.", r#"{
|
||||||
|
"type": "object",
|
||||||
|
"properties": { "key": {"type": "string"}, "content": {"type": "string"} },
|
||||||
|
"required": ["key", "content"]
|
||||||
|
}"#),
|
||||||
tool!(memory_search, "Search via spreading activation from seed keys.", r#"{
|
tool!(memory_search, "Search via spreading activation from seed keys.", r#"{
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": {
|
"properties": {
|
||||||
|
|
@ -264,6 +286,16 @@ pub fn memory_tools() -> [super::Tool; 20] {
|
||||||
"properties": { "source": {"type": "string"}, "target": {"type": "string"} },
|
"properties": { "source": {"type": "string"}, "target": {"type": "string"} },
|
||||||
"required": ["source", "target"]
|
"required": ["source", "target"]
|
||||||
}"#),
|
}"#),
|
||||||
|
tool!(memory_link, "Add or update a link between two memory nodes. Alias for memory_link_add/memory_link_set.", r#"{
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"source": {"type": "string"},
|
||||||
|
"target": {"type": "string"},
|
||||||
|
"strength": {"type": "number", "description": "Optional; 0.01 to 1.0"},
|
||||||
|
"label": {"type": "string", "description": "Accepted for compatibility; currently ignored"}
|
||||||
|
},
|
||||||
|
"required": ["source", "target"]
|
||||||
|
}"#),
|
||||||
tool!(memory_delete, "Soft-delete a node.", r#"{
|
tool!(memory_delete, "Soft-delete a node.", r#"{
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"properties": { "key": {"type": "string"} },
|
"properties": { "key": {"type": "string"} },
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,13 @@
|
||||||
|
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
use crate::hippocampus as memory;
|
use crate::hippocampus as memory;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
pub async fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<()> {
|
pub async fn cmd_run_agent(agent: &str, count: usize, target: &[String], query: Option<&str>, dry_run: bool, _local: bool, state_dir: Option<&str>) -> Result<()> {
|
||||||
|
let start = Instant::now();
|
||||||
|
eprintln!(
|
||||||
|
"[agent-cli] start agent={} count={} targets={} query={:?} dry_run={} local={} state_dir={:?} pid={}",
|
||||||
|
agent, count, target.len(), query, dry_run, _local, state_dir, std::process::id());
|
||||||
// Mark as agent so tool calls (e.g. poc-memory render) don't
|
// Mark as agent so tool calls (e.g. poc-memory render) don't
|
||||||
// pollute the user's seen set as a side effect
|
// pollute the user's seen set as a side effect
|
||||||
// SAFETY: single-threaded at this point (CLI startup, before any agent work)
|
// SAFETY: single-threaded at this point (CLI startup, before any agent work)
|
||||||
|
|
@ -45,14 +50,19 @@ pub async fn cmd_run_agent(agent: &str, count: usize, target: &[String], query:
|
||||||
if let Err(e) = crate::agent::oneshot::run_one_agent(
|
if let Err(e) = crate::agent::oneshot::run_one_agent(
|
||||||
agent, count, Some(&[key.clone()]),
|
agent, count, Some(&[key.clone()]),
|
||||||
).await {
|
).await {
|
||||||
|
eprintln!("[agent-cli] ERROR agent={} target={} error={}", agent, key, e);
|
||||||
println!("[{}] ERROR on {}: {}", agent, key, e);
|
println!("[{}] ERROR on {}: {}", agent, key, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
crate::agent::oneshot::run_one_agent(
|
if let Err(e) = crate::agent::oneshot::run_one_agent(
|
||||||
agent, count, None,
|
agent, count, None,
|
||||||
).await.map_err(|e| anyhow::anyhow!("{}", e))?;
|
).await {
|
||||||
|
eprintln!("[agent-cli] ERROR agent={} error={}", agent, e);
|
||||||
|
return Err(anyhow::anyhow!("{}", e));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
eprintln!("[agent-cli] done agent={} elapsed={:.2}s",
|
||||||
|
agent, start.elapsed().as_secs_f64());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,12 @@ impl HookSession {
|
||||||
|
|
||||||
/// Load from POC_SESSION_ID environment variable
|
/// Load from POC_SESSION_ID environment variable
|
||||||
pub fn from_env() -> Option<Self> {
|
pub fn from_env() -> Option<Self> {
|
||||||
Self::from_id(std::env::var("POC_SESSION_ID").ok()?)
|
let session_id = std::env::var("POC_SESSION_ID").ok()?;
|
||||||
|
let mut session = Self::from_id(session_id)?;
|
||||||
|
if let Ok(path) = std::env::var("POC_TRANSCRIPT_PATH") {
|
||||||
|
session.transcript_path = path;
|
||||||
|
}
|
||||||
|
Some(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the seen set for this session
|
/// Get the seen set for this session
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
#!/bin/bash
|
#!/usr/bin/env bash
|
||||||
# Bail if another agent is in the same phase-group as us.
|
# Bail if another agent is in the same phase-group as us.
|
||||||
#
|
#
|
||||||
# $1 = our pid file name (e.g. "pid-12345")
|
# $1 = our pid file name (e.g. "pid-12345")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue