From 85302c11d4d013541efd3e08e71c21db0ff32a19 Mon Sep 17 00:00:00 2001 From: ProofOfConcept Date: Fri, 27 Mar 2026 15:11:17 -0400 Subject: [PATCH] provenance: track agent phase, use task_local + thread_local Split TASK_PROVENANCE into TASK_AGENT (task_local, set once per agent run) and TASK_PHASE (thread_local, updated between steps). Provenance now reports "agent:surface-observe:observe" instead of just "agent:surface-observe", making it possible to identify which pipeline phase created a node. Priority: task_local agent + thread_local phase > POC_PROVENANCE env var > "manual". Also includes memory_search catchup throttle and pipelining fixes from the surface-observe refactor. --- src/hippocampus/memory_search.rs | 84 +++++++++++++++++--------------- src/hippocampus/store/mod.rs | 2 +- src/hippocampus/store/ops.rs | 34 +++++++++---- src/subconscious/api.rs | 4 +- src/subconscious/knowledge.rs | 8 ++- 5 files changed, 79 insertions(+), 53 deletions(-) diff --git a/src/hippocampus/memory_search.rs b/src/hippocampus/memory_search.rs index ce71326..672556d 100644 --- a/src/hippocampus/memory_search.rs +++ b/src/hippocampus/memory_search.rs @@ -137,6 +137,12 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) .join("surface-observe"); fs::create_dir_all(&state_dir).ok(); + let transcript = session.transcript(); + let offset_path = state_dir.join("transcript-offset"); + let last_offset: u64 = fs::read_to_string(&offset_path).ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + let timeout = crate::config::get() .surface_timeout_secs .unwrap_or(300) as u64; @@ -145,8 +151,6 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) for (phase, pid) in &live { let _ = writeln!(log_f, "alive pid-{}: phase={}", pid, phase); } - let any_in_surface = live.iter().any(|(p, _)| p == "surface" || p == "step-0"); - // Read surface output and inject into context let surface_path = state_dir.join("surface"); @@ -183,55 +187,50 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) fs::remove_file(&surface_path).ok(); } - // If the agent is significantly behind, wait for it to finish. - // This prevents the agent from falling behind during heavy reading - // (studying, reading a book, etc.) - let conversation_budget: u64 = 50_000; - let offset_path = state_dir.join("transcript-offset"); - let transcript = session.transcript(); - let transcript_size = transcript.size; - - if !live.is_empty() && transcript_size > 0 { - let last_offset: u64 = fs::read_to_string(&offset_path).ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - let behind = transcript_size.saturating_sub(last_offset); - - if behind > conversation_budget / 2 { - // Wait up to 30s for the current agent to finish - let sleep_start = Instant::now(); - - for _ in 0..30 { - std::thread::sleep(std::time::Duration::from_secs(1)); - let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); - if still_live.is_empty() { break; } - } - - let sleep_duration = (Instant::now() - sleep_start).as_secs(); - - let _ = writeln!(log_f, "agent {}KB behind (budget {}KB), slept for {sleep_duration} seconds", - behind / 1024, conversation_budget / 1024); - out.push_str(&format!("Slept for {sleep_duration} seconds to let observe catch up\n")); - } - } - // Start a new agent if: // - nothing running, OR // - something running but past surface phase (pipelining) let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); - let any_in_surface = live.iter().any(|(p, _)| p == "surface" || p == "step-0"); + let any_in_surface = live.iter().any(|(p, _)| p == "surface"); if any_in_surface { let _ = writeln!(log_f, "agent in surface phase (have {:?}), waiting", live); } else { // Record transcript offset so we can detect falling behind - if transcript_size > 0 { - fs::write(&offset_path, transcript_size.to_string()).ok(); + if transcript.size > 0 { + fs::write(&offset_path, transcript.size.to_string()).ok(); } let pid = crate::agents::knowledge::spawn_agent( "surface-observe", &state_dir, &session.session_id); let _ = writeln!(log_f, "spawned agent {:?}, have {:?}", pid, live); } + + // If the agent is significantly behind, wait for it to finish. + // This prevents the agent from falling behind during heavy reading + // (studying, reading a book, etc.) + let conversation_budget: u64 = 50_000; + + if !live.is_empty() && transcript.size > 0 { + let behind = transcript.size.saturating_sub(last_offset); + + if behind > conversation_budget / 2 { + // Wait up to 5s for the current agent to finish + let sleep_start = Instant::now(); + let _ = write!(log_f, "agent {}KB behind (budget {}", + behind / 1024, conversation_budget / 1024); + + for _ in 0..5 { + std::thread::sleep(std::time::Duration::from_secs(1)); + let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); + if still_live.is_empty() { break; } + } + + let sleep_secs = (Instant::now() - sleep_start).as_secs_f64(); + + let _ = writeln!(log_f, ", slept {sleep_secs:.2}s"); + out.push_str(&format!("Slept {sleep_secs:.2}s to let observe catch up\n")); + } + } } /// Run the reflection agent on a slower cadence — every 100KB of transcript. @@ -246,14 +245,13 @@ fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) { // Check transcript growth since last reflection let offset_path = state_dir.join("transcript-offset"); let transcript = session.transcript(); - let transcript_size = transcript.size; let last_offset: u64 = fs::read_to_string(&offset_path).ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0); const REFLECTION_INTERVAL: u64 = 100_000; - if transcript_size.saturating_sub(last_offset) < REFLECTION_INTERVAL { + if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL { return; } @@ -283,7 +281,7 @@ fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) { fs::remove_file(state_dir.join("reflection")).ok(); } - fs::write(&offset_path, transcript_size.to_string()).ok(); + fs::write(&offset_path, transcript.size.to_string()).ok(); let pid = crate::agents::knowledge::spawn_agent( "reflect", &state_dir, &session.session_id); let _ = writeln!(log_f, "reflect: spawned {:?}", pid); @@ -307,6 +305,8 @@ fn cleanup_stale_files(dir: &Path, max_age: Duration) { } fn hook(session: &Session) -> String { + let start_time = Instant::now(); + let mut out = String::new(); let is_compaction = crate::transcript::detect_new_compaction( &session.state_dir, &session.session_id, &session.transcript_path, @@ -373,5 +373,9 @@ fn hook(session: &Session) -> String { cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); let _ = write!(log_f, "{}", out); + + let duration = (Instant::now() - start_time).as_secs_f64(); + let _ = writeln!(log_f, "\nran in {duration:.2}s"); + out } diff --git a/src/hippocampus/store/mod.rs b/src/hippocampus/store/mod.rs index 9085e1d..0862b3f 100644 --- a/src/hippocampus/store/mod.rs +++ b/src/hippocampus/store/mod.rs @@ -37,7 +37,7 @@ pub use parse::{MemoryUnit, parse_units}; pub use view::{StoreView, AnyView}; pub use persist::fsck; pub use persist::strip_md_keys; -pub use ops::TASK_PROVENANCE; +pub use ops::{TASK_AGENT, set_phase}; use crate::graph::{self, Graph}; diff --git a/src/hippocampus/store/ops.rs b/src/hippocampus/store/ops.rs index 463ec2e..0844933 100644 --- a/src/hippocampus/store/ops.rs +++ b/src/hippocampus/store/ops.rs @@ -8,17 +8,33 @@ use super::types::*; use std::collections::{HashMap, HashSet}; tokio::task_local! { - /// Task-scoped provenance for agent writes. Set by the daemon before - /// running an agent's tool calls, so all writes within that task are - /// automatically attributed to the agent. - pub static TASK_PROVENANCE: String; + /// Task-scoped agent name for provenance. Set before running an agent's + /// tool calls, so all writes within that task are attributed to the agent. + pub static TASK_AGENT: String; } -/// Provenance priority: task_local (agent context) > env var > "manual". -fn current_provenance() -> String { - TASK_PROVENANCE.try_with(|p| p.clone()) - .or_else(|_| std::env::var("POC_PROVENANCE").map_err(|_| ())) - .unwrap_or_else(|_| "manual".to_string()) +thread_local! { + /// Current phase within a multi-step agent. Updated by the bail function + /// between steps. Combined with TASK_AGENT to form the full provenance. + static TASK_PHASE: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; +} + +/// Set the current phase (called from bail function between steps). +pub fn set_phase(phase: &str) { + TASK_PHASE.with(|p| *p.borrow_mut() = Some(phase.to_string())); +} + +/// Get the full provenance string: "agent:phase" or "agent" or env var or "manual". +pub fn current_provenance() -> String { + let agent = TASK_AGENT.try_with(|a| a.clone()).ok(); + let phase = TASK_PHASE.with(|p| p.borrow().clone()); + + match (agent, phase) { + (Some(a), Some(p)) => format!("{}:{}", a, p), + (Some(a), None) => a, + _ => std::env::var("POC_PROVENANCE") + .unwrap_or_else(|_| "manual".to_string()), + } } impl Store { diff --git a/src/subconscious/api.rs b/src/subconscious/api.rs index d8810f6..b5c225d 100644 --- a/src/subconscious/api.rs +++ b/src/subconscious/api.rs @@ -237,9 +237,9 @@ pub fn call_api_with_tools_sync( .enable_all() .build() .map_err(|e| format!("tokio runtime: {}", e))?; - let prov = format!("agent:{}", agent); + let agent_name = format!("agent:{}", agent); rt.block_on( - crate::store::TASK_PROVENANCE.scope(prov, + crate::store::TASK_AGENT.scope(agent_name, call_api_with_tools(agent, prompts, temperature, bail_fn, log)) ) }).join().unwrap() diff --git a/src/subconscious/knowledge.rs b/src/subconscious/knowledge.rs index 19985a9..9af4ea7 100644 --- a/src/subconscious/knowledge.rs +++ b/src/subconscious/knowledge.rs @@ -339,6 +339,11 @@ fn run_one_agent_inner( } log("\n=== CALLING LLM ==="); + // Set initial phase for provenance tracking + if let Some(first_phase) = step_phases.first() { + crate::store::set_phase(first_phase); + } + // Bail check: if the agent defines a bail script, run it between steps. // The script receives the pid file path as $1, cwd = state dir. let bail_script = def.bail.as_ref().map(|name| { @@ -349,9 +354,10 @@ fn run_one_agent_inner( let state_dir_for_bail = state_dir.clone(); let pid_path_for_bail = pid_path.clone(); let bail_fn = move |step_idx: usize| -> Result<(), String> { - // Update phase + // Update phase in pid file and provenance tracking if step_idx < step_phases.len() { write_pid(&step_phases[step_idx]); + crate::store::set_phase(&step_phases[step_idx]); } // Run bail script if defined if let Some(ref script) = bail_script {