hook: catchup throttle and reflection agent
Catchup throttle: when the agent is >50% behind the conversation
window (>25KB of transcript growth since last spawn), block and
wait up to 30s for the current agent to finish. Prevents the agent
from falling behind during heavy reading/studying.
Reflection agent: runs every 100KB of transcript growth. Reads
walked nodes from surface-observe, follows links in unexpected
directions, outputs a short dreamy insight. Previous reflections
are injected into the conversation context.
Updated reflect.agent prompt to use {{input:walked}} from
surface-observe state dir and {{conversation:20000}} for lighter
context.
Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
parent
27861a44e5
commit
8ccc30d97e
2 changed files with 119 additions and 25 deletions
|
|
@ -183,18 +183,108 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File)
|
|||
fs::remove_file(&surface_path).ok();
|
||||
}
|
||||
|
||||
// If the agent is significantly behind, wait for it to finish.
|
||||
// This prevents the agent from falling behind during heavy reading
|
||||
// (studying, reading a book, etc.)
|
||||
let conversation_budget: u64 = 50_000;
|
||||
let offset_path = state_dir.join("transcript-offset");
|
||||
let transcript_size = if !session.transcript_path.is_empty() {
|
||||
fs::metadata(&session.transcript_path).map(|m| m.len()).unwrap_or(0)
|
||||
} else { 0 };
|
||||
|
||||
if !live.is_empty() && transcript_size > 0 {
|
||||
let last_offset: u64 = fs::read_to_string(&offset_path).ok()
|
||||
.and_then(|s| s.trim().parse().ok())
|
||||
.unwrap_or(0);
|
||||
let behind = transcript_size.saturating_sub(last_offset);
|
||||
|
||||
if behind > conversation_budget / 2 {
|
||||
let _ = writeln!(log_f, "agent {}KB behind (budget {}KB), waiting for catchup",
|
||||
behind / 1024, conversation_budget / 1024);
|
||||
// Wait up to 30s for the current agent to finish
|
||||
for _ in 0..30 {
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
let still_live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout);
|
||||
if still_live.is_empty() { break; }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start a new agent if:
|
||||
// - nothing running, OR
|
||||
// - something running but past surface phase (pipelining)
|
||||
let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout);
|
||||
let any_in_surface = live.iter().any(|(p, _)| p == "surface" || p == "step-0");
|
||||
|
||||
if any_in_surface {
|
||||
let _ = writeln!(log_f, "agent in surface phase (have {:?}), waiting", live);
|
||||
} else {
|
||||
// Record transcript offset so we can detect falling behind
|
||||
if transcript_size > 0 {
|
||||
fs::write(&offset_path, transcript_size.to_string()).ok();
|
||||
}
|
||||
let pid = crate::agents::knowledge::spawn_agent(
|
||||
"surface-observe", &state_dir, &session.session_id);
|
||||
let _ = writeln!(log_f, "spawned agent {:?}, have {:?}", pid, live);
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the reflection agent on a slower cadence — every 100KB of transcript.
|
||||
/// Uses the surface-observe state dir to read walked nodes and write reflections.
|
||||
/// Reflections are injected into the conversation context.
|
||||
fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) {
|
||||
let state_dir = crate::store::memory_dir()
|
||||
.join("agent-output")
|
||||
.join("reflect");
|
||||
fs::create_dir_all(&state_dir).ok();
|
||||
|
||||
// Check transcript growth since last reflection
|
||||
let offset_path = state_dir.join("transcript-offset");
|
||||
let transcript_size = if !session.transcript_path.is_empty() {
|
||||
fs::metadata(&session.transcript_path).map(|m| m.len()).unwrap_or(0)
|
||||
} else { 0 };
|
||||
|
||||
let last_offset: u64 = fs::read_to_string(&offset_path).ok()
|
||||
.and_then(|s| s.trim().parse().ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
const REFLECTION_INTERVAL: u64 = 100_000;
|
||||
if transcript_size.saturating_sub(last_offset) < REFLECTION_INTERVAL {
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't run if another reflection is already going
|
||||
let live = crate::agents::knowledge::scan_pid_files(&state_dir, 300);
|
||||
if !live.is_empty() {
|
||||
let _ = writeln!(log_f, "reflect: already running {:?}", live);
|
||||
return;
|
||||
}
|
||||
|
||||
// Copy walked nodes from surface-observe state dir so reflect can read them
|
||||
let so_state = crate::store::memory_dir()
|
||||
.join("agent-output")
|
||||
.join("surface-observe");
|
||||
if let Ok(walked) = fs::read_to_string(so_state.join("walked")) {
|
||||
fs::write(state_dir.join("walked"), &walked).ok();
|
||||
}
|
||||
|
||||
// Read previous reflection and inject into context
|
||||
if let Ok(reflection) = fs::read_to_string(state_dir.join("reflection")) {
|
||||
if !reflection.trim().is_empty() {
|
||||
use std::fmt::Write as _;
|
||||
writeln!(out, "--- subconscious reflection ---").ok();
|
||||
write!(out, "{}", reflection.trim()).ok();
|
||||
let _ = writeln!(log_f, "reflect: injected {} bytes", reflection.len());
|
||||
}
|
||||
fs::remove_file(state_dir.join("reflection")).ok();
|
||||
}
|
||||
|
||||
fs::write(&offset_path, transcript_size.to_string()).ok();
|
||||
let pid = crate::agents::knowledge::spawn_agent(
|
||||
"reflect", &state_dir, &session.session_id);
|
||||
let _ = writeln!(log_f, "reflect: spawned {:?}", pid);
|
||||
}
|
||||
|
||||
fn cleanup_stale_files(dir: &Path, max_age: Duration) {
|
||||
let entries = match fs::read_dir(dir) {
|
||||
Ok(e) => e,
|
||||
|
|
@ -272,6 +362,7 @@ fn hook(session: &Session) -> String {
|
|||
let cfg = crate::config::get();
|
||||
if cfg.surface_hooks.iter().any(|h| h == &session.hook_event) {
|
||||
surface_observe_cycle(session, &mut out, &mut log_f);
|
||||
reflection_cycle(session, &mut out, &mut log_f);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue