provenance: track agent phase, use task_local + thread_local

Split TASK_PROVENANCE into TASK_AGENT (task_local, set once per agent
run) and TASK_PHASE (thread_local, updated between steps). Provenance
now reports "agent:surface-observe:observe" instead of just
"agent:surface-observe", making it possible to identify which pipeline
phase created a node.

Priority: task_local agent + thread_local phase > POC_PROVENANCE env
var > "manual".

Also includes memory_search catchup throttle and pipelining fixes
from the surface-observe refactor.
This commit is contained in:
ProofOfConcept 2026-03-27 15:11:17 -04:00
parent b1efdf0b9a
commit 85302c11d4
5 changed files with 79 additions and 53 deletions

View file

@ -137,6 +137,12 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File)
.join("surface-observe"); .join("surface-observe");
fs::create_dir_all(&state_dir).ok(); fs::create_dir_all(&state_dir).ok();
let transcript = session.transcript();
let offset_path = state_dir.join("transcript-offset");
let last_offset: u64 = fs::read_to_string(&offset_path).ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0);
let timeout = crate::config::get() let timeout = crate::config::get()
.surface_timeout_secs .surface_timeout_secs
.unwrap_or(300) as u64; .unwrap_or(300) as u64;
@ -145,8 +151,6 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File)
for (phase, pid) in &live { for (phase, pid) in &live {
let _ = writeln!(log_f, "alive pid-{}: phase={}", pid, phase); let _ = writeln!(log_f, "alive pid-{}: phase={}", pid, phase);
} }
let any_in_surface = live.iter().any(|(p, _)| p == "surface" || p == "step-0");
// Read surface output and inject into context // Read surface output and inject into context
let surface_path = state_dir.join("surface"); let surface_path = state_dir.join("surface");
@ -183,55 +187,50 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File)
fs::remove_file(&surface_path).ok(); fs::remove_file(&surface_path).ok();
} }
// If the agent is significantly behind, wait for it to finish.
// This prevents the agent from falling behind during heavy reading
// (studying, reading a book, etc.)
let conversation_budget: u64 = 50_000;
let offset_path = state_dir.join("transcript-offset");
let transcript = session.transcript();
let transcript_size = transcript.size;
if !live.is_empty() && transcript_size > 0 {
let last_offset: u64 = fs::read_to_string(&offset_path).ok()
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0);
let behind = transcript_size.saturating_sub(last_offset);
if behind > conversation_budget / 2 {
// Wait up to 30s for the current agent to finish
let sleep_start = Instant::now();
for _ in 0..30 {
std::thread::sleep(std::time::Duration::from_secs(1));
let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout);
if still_live.is_empty() { break; }
}
let sleep_duration = (Instant::now() - sleep_start).as_secs();
let _ = writeln!(log_f, "agent {}KB behind (budget {}KB), slept for {sleep_duration} seconds",
behind / 1024, conversation_budget / 1024);
out.push_str(&format!("Slept for {sleep_duration} seconds to let observe catch up\n"));
}
}
// Start a new agent if: // Start a new agent if:
// - nothing running, OR // - nothing running, OR
// - something running but past surface phase (pipelining) // - something running but past surface phase (pipelining)
let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout);
let any_in_surface = live.iter().any(|(p, _)| p == "surface" || p == "step-0"); let any_in_surface = live.iter().any(|(p, _)| p == "surface");
if any_in_surface { if any_in_surface {
let _ = writeln!(log_f, "agent in surface phase (have {:?}), waiting", live); let _ = writeln!(log_f, "agent in surface phase (have {:?}), waiting", live);
} else { } else {
// Record transcript offset so we can detect falling behind // Record transcript offset so we can detect falling behind
if transcript_size > 0 { if transcript.size > 0 {
fs::write(&offset_path, transcript_size.to_string()).ok(); fs::write(&offset_path, transcript.size.to_string()).ok();
} }
let pid = crate::agents::knowledge::spawn_agent( let pid = crate::agents::knowledge::spawn_agent(
"surface-observe", &state_dir, &session.session_id); "surface-observe", &state_dir, &session.session_id);
let _ = writeln!(log_f, "spawned agent {:?}, have {:?}", pid, live); let _ = writeln!(log_f, "spawned agent {:?}, have {:?}", pid, live);
} }
// If the agent is significantly behind, wait for it to finish.
// This prevents the agent from falling behind during heavy reading
// (studying, reading a book, etc.)
let conversation_budget: u64 = 50_000;
if !live.is_empty() && transcript.size > 0 {
let behind = transcript.size.saturating_sub(last_offset);
if behind > conversation_budget / 2 {
// Wait up to 5s for the current agent to finish
let sleep_start = Instant::now();
let _ = write!(log_f, "agent {}KB behind (budget {}",
behind / 1024, conversation_budget / 1024);
for _ in 0..5 {
std::thread::sleep(std::time::Duration::from_secs(1));
let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout);
if still_live.is_empty() { break; }
}
let sleep_secs = (Instant::now() - sleep_start).as_secs_f64();
let _ = writeln!(log_f, ", slept {sleep_secs:.2}s");
out.push_str(&format!("Slept {sleep_secs:.2}s to let observe catch up\n"));
}
}
} }
/// Run the reflection agent on a slower cadence — every 100KB of transcript. /// Run the reflection agent on a slower cadence — every 100KB of transcript.
@ -246,14 +245,13 @@ fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) {
// Check transcript growth since last reflection // Check transcript growth since last reflection
let offset_path = state_dir.join("transcript-offset"); let offset_path = state_dir.join("transcript-offset");
let transcript = session.transcript(); let transcript = session.transcript();
let transcript_size = transcript.size;
let last_offset: u64 = fs::read_to_string(&offset_path).ok() let last_offset: u64 = fs::read_to_string(&offset_path).ok()
.and_then(|s| s.trim().parse().ok()) .and_then(|s| s.trim().parse().ok())
.unwrap_or(0); .unwrap_or(0);
const REFLECTION_INTERVAL: u64 = 100_000; const REFLECTION_INTERVAL: u64 = 100_000;
if transcript_size.saturating_sub(last_offset) < REFLECTION_INTERVAL { if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL {
return; return;
} }
@ -283,7 +281,7 @@ fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) {
fs::remove_file(state_dir.join("reflection")).ok(); fs::remove_file(state_dir.join("reflection")).ok();
} }
fs::write(&offset_path, transcript_size.to_string()).ok(); fs::write(&offset_path, transcript.size.to_string()).ok();
let pid = crate::agents::knowledge::spawn_agent( let pid = crate::agents::knowledge::spawn_agent(
"reflect", &state_dir, &session.session_id); "reflect", &state_dir, &session.session_id);
let _ = writeln!(log_f, "reflect: spawned {:?}", pid); let _ = writeln!(log_f, "reflect: spawned {:?}", pid);
@ -307,6 +305,8 @@ fn cleanup_stale_files(dir: &Path, max_age: Duration) {
} }
fn hook(session: &Session) -> String { fn hook(session: &Session) -> String {
let start_time = Instant::now();
let mut out = String::new(); let mut out = String::new();
let is_compaction = crate::transcript::detect_new_compaction( let is_compaction = crate::transcript::detect_new_compaction(
&session.state_dir, &session.session_id, &session.transcript_path, &session.state_dir, &session.session_id, &session.transcript_path,
@ -373,5 +373,9 @@ fn hook(session: &Session) -> String {
cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); cleanup_stale_files(&session.state_dir, Duration::from_secs(86400));
let _ = write!(log_f, "{}", out); let _ = write!(log_f, "{}", out);
let duration = (Instant::now() - start_time).as_secs_f64();
let _ = writeln!(log_f, "\nran in {duration:.2}s");
out out
} }

View file

@ -37,7 +37,7 @@ pub use parse::{MemoryUnit, parse_units};
pub use view::{StoreView, AnyView}; pub use view::{StoreView, AnyView};
pub use persist::fsck; pub use persist::fsck;
pub use persist::strip_md_keys; pub use persist::strip_md_keys;
pub use ops::TASK_PROVENANCE; pub use ops::{TASK_AGENT, set_phase};
use crate::graph::{self, Graph}; use crate::graph::{self, Graph};

View file

@ -8,17 +8,33 @@ use super::types::*;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
tokio::task_local! { tokio::task_local! {
/// Task-scoped provenance for agent writes. Set by the daemon before /// Task-scoped agent name for provenance. Set before running an agent's
/// running an agent's tool calls, so all writes within that task are /// tool calls, so all writes within that task are attributed to the agent.
/// automatically attributed to the agent. pub static TASK_AGENT: String;
pub static TASK_PROVENANCE: String;
} }
/// Provenance priority: task_local (agent context) > env var > "manual". thread_local! {
fn current_provenance() -> String { /// Current phase within a multi-step agent. Updated by the bail function
TASK_PROVENANCE.try_with(|p| p.clone()) /// between steps. Combined with TASK_AGENT to form the full provenance.
.or_else(|_| std::env::var("POC_PROVENANCE").map_err(|_| ())) static TASK_PHASE: std::cell::RefCell<Option<String>> = const { std::cell::RefCell::new(None) };
.unwrap_or_else(|_| "manual".to_string()) }
/// Set the current phase (called from bail function between steps).
pub fn set_phase(phase: &str) {
TASK_PHASE.with(|p| *p.borrow_mut() = Some(phase.to_string()));
}
/// Get the full provenance string: "agent:phase" or "agent" or env var or "manual".
pub fn current_provenance() -> String {
let agent = TASK_AGENT.try_with(|a| a.clone()).ok();
let phase = TASK_PHASE.with(|p| p.borrow().clone());
match (agent, phase) {
(Some(a), Some(p)) => format!("{}:{}", a, p),
(Some(a), None) => a,
_ => std::env::var("POC_PROVENANCE")
.unwrap_or_else(|_| "manual".to_string()),
}
} }
impl Store { impl Store {

View file

@ -237,9 +237,9 @@ pub fn call_api_with_tools_sync(
.enable_all() .enable_all()
.build() .build()
.map_err(|e| format!("tokio runtime: {}", e))?; .map_err(|e| format!("tokio runtime: {}", e))?;
let prov = format!("agent:{}", agent); let agent_name = format!("agent:{}", agent);
rt.block_on( rt.block_on(
crate::store::TASK_PROVENANCE.scope(prov, crate::store::TASK_AGENT.scope(agent_name,
call_api_with_tools(agent, prompts, temperature, bail_fn, log)) call_api_with_tools(agent, prompts, temperature, bail_fn, log))
) )
}).join().unwrap() }).join().unwrap()

View file

@ -339,6 +339,11 @@ fn run_one_agent_inner(
} }
log("\n=== CALLING LLM ==="); log("\n=== CALLING LLM ===");
// Set initial phase for provenance tracking
if let Some(first_phase) = step_phases.first() {
crate::store::set_phase(first_phase);
}
// Bail check: if the agent defines a bail script, run it between steps. // Bail check: if the agent defines a bail script, run it between steps.
// The script receives the pid file path as $1, cwd = state dir. // The script receives the pid file path as $1, cwd = state dir.
let bail_script = def.bail.as_ref().map(|name| { let bail_script = def.bail.as_ref().map(|name| {
@ -349,9 +354,10 @@ fn run_one_agent_inner(
let state_dir_for_bail = state_dir.clone(); let state_dir_for_bail = state_dir.clone();
let pid_path_for_bail = pid_path.clone(); let pid_path_for_bail = pid_path.clone();
let bail_fn = move |step_idx: usize| -> Result<(), String> { let bail_fn = move |step_idx: usize| -> Result<(), String> {
// Update phase // Update phase in pid file and provenance tracking
if step_idx < step_phases.len() { if step_idx < step_phases.len() {
write_pid(&step_phases[step_idx]); write_pid(&step_phases[step_idx]);
crate::store::set_phase(&step_phases[step_idx]);
} }
// Run bail script if defined // Run bail script if defined
if let Some(ref script) = bail_script { if let Some(ref script) = bail_script {