diff --git a/src/agent/api/openai.rs b/src/agent/api/openai.rs index 68fc5a8..aec50ec 100644 --- a/src/agent/api/openai.rs +++ b/src/agent/api/openai.rs @@ -50,7 +50,11 @@ pub async fn stream_events( let url = format!("{}/chat/completions", base_url); let msg_count = request.messages.len(); - let debug_label = format!("{} messages, model={}", msg_count, model); + let pri_label = match priority { + Some(p) => format!(", priority={}", p), + None => String::new(), + }; + let debug_label = format!("{} messages, model={}{}", msg_count, model, pri_label); let mut response = super::send_and_check( client, diff --git a/src/agent/runner.rs b/src/agent/runner.rs index 0becf81..52f5b04 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -301,6 +301,8 @@ impl Agent { // Flush display_buf except a tail that could be // a partial "" (10 chars). let safe = display_buf.len().saturating_sub(10); + // Find a char boundary at or before safe + let safe = display_buf.floor_char_boundary(safe); if safe > 0 { let flush = display_buf[..safe].to_string(); display_buf = display_buf[safe..].to_string(); diff --git a/src/agent/tui.rs b/src/agent/tui.rs index 2d54de6..d9162af 100644 --- a/src/agent/tui.rs +++ b/src/agent/tui.rs @@ -10,6 +10,8 @@ // handles rendering. Input is processed from crossterm key events. const SCREEN_LEGEND: &str = " F1=main F2=agents F10=context "; +const AGENT_NAMES: &[&str] = &["surface-observe", "journal", "reflect", "linker", + "organize", "distill", "split"]; use crossterm::{ event::{EnableMouseCapture, DisableMouseCapture, KeyCode, KeyEvent, KeyModifiers, MouseEvent, MouseEventKind, MouseButton}, @@ -342,6 +344,10 @@ pub struct App { context_info: Option, /// Live context state — shared with agent, read directly for debug screen. shared_context: SharedContextState, + /// Agent screen: selected agent index. + agent_selected: usize, + /// Agent screen: viewing log for selected agent. + agent_log_view: bool, } /// Overlay screens toggled by F-keys. @@ -402,6 +408,8 @@ impl App { debug_expanded: std::collections::HashSet::new(), context_info: None, shared_context, + agent_selected: 0, + agent_log_view: false, } } @@ -542,48 +550,85 @@ impl App { } KeyCode::F(10) => { self.set_overlay(Overlay::Context); return; } KeyCode::F(2) => { self.set_overlay(Overlay::Agents); return; } - KeyCode::Up => { - let cs = self.read_context_state(); - let n = self.debug_item_count(&cs); - if n > 0 { - self.debug_selected = Some(match self.debug_selected { - None => n - 1, - Some(0) => 0, - Some(i) => i - 1, - }); - } - return; - } - KeyCode::Down => { - let cs = self.read_context_state(); - let n = self.debug_item_count(&cs); - if n > 0 { - self.debug_selected = Some(match self.debug_selected { - None => 0, - Some(i) if i >= n - 1 => n - 1, - Some(i) => i + 1, - }); - } - return; - } KeyCode::PageUp => { self.debug_scroll = self.debug_scroll.saturating_sub(10); return; } KeyCode::PageDown => { self.debug_scroll += 10; return; } - KeyCode::Right | KeyCode::Enter => { - // Expand selected section - if let Some(idx) = self.debug_selected { - self.debug_expanded.insert(idx); - } - return; - } - KeyCode::Left => { - // Collapse selected section - if let Some(idx) = self.debug_selected { - self.debug_expanded.remove(&idx); - } - return; - } _ => {} } + + // Screen-specific key handling + match self.overlay { + Some(Overlay::Agents) => { + match key.code { + KeyCode::Up => { + self.agent_selected = self.agent_selected.saturating_sub(1); + self.debug_scroll = 0; + return; + } + KeyCode::Down => { + self.agent_selected = (self.agent_selected + 1).min(AGENT_NAMES.len() - 1); + self.debug_scroll = 0; + return; + } + KeyCode::Enter | KeyCode::Right => { + self.agent_log_view = true; + self.debug_scroll = 0; + return; + } + KeyCode::Left | KeyCode::Esc => { + if self.agent_log_view { + self.agent_log_view = false; + self.debug_scroll = 0; + } else { + self.overlay = None; + } + return; + } + _ => {} + } + } + Some(Overlay::Context) => { + match key.code { + KeyCode::Up => { + let cs = self.read_context_state(); + let n = self.debug_item_count(&cs); + if n > 0 { + self.debug_selected = Some(match self.debug_selected { + None => n - 1, + Some(0) => 0, + Some(i) => i - 1, + }); + } + return; + } + KeyCode::Down => { + let cs = self.read_context_state(); + let n = self.debug_item_count(&cs); + if n > 0 { + self.debug_selected = Some(match self.debug_selected { + None => 0, + Some(i) if i >= n - 1 => n - 1, + Some(i) => i + 1, + }); + } + return; + } + KeyCode::Right | KeyCode::Enter => { + if let Some(idx) = self.debug_selected { + self.debug_expanded.insert(idx); + } + return; + } + KeyCode::Left => { + if let Some(idx) = self.debug_selected { + self.debug_expanded.remove(&idx); + } + return; + } + _ => {} + } + } + None => {} + } } match key.code { @@ -984,13 +1029,61 @@ impl App { } fn draw_agents(&self, frame: &mut Frame, size: Rect) { + let output_dir = crate::store::memory_dir().join("agent-output"); + + if self.agent_log_view { + self.draw_agent_log(frame, size, &output_dir); + return; + } + let mut lines: Vec = Vec::new(); let section = Style::default().fg(Color::Yellow); + let dim = Style::default().fg(Color::DarkGray); + let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC); lines.push(Line::raw("")); lines.push(Line::styled("── Subconscious Agents ──", section)); + lines.push(Line::styled(" (↑/↓ select, Enter/→ view log, Esc back)", hint)); lines.push(Line::raw("")); - lines.push(Line::raw(" (not yet wired — will show surface, observe, reflect, journal status)")); + + for (i, &name) in AGENT_NAMES.iter().enumerate() { + let agent_dir = output_dir.join(name); + let live = crate::subconscious::knowledge::scan_pid_files(&agent_dir, 0); + let selected = i == self.agent_selected; + + let prefix = if selected { "▸ " } else { " " }; + let bg = if selected { Style::default().bg(Color::DarkGray) } else { Style::default() }; + + if live.is_empty() { + lines.push(Line::from(vec![ + Span::styled(format!("{}{:<20}", prefix, name), bg.fg(Color::Gray)), + Span::styled("○ idle", bg.fg(Color::DarkGray)), + ])); + } else { + for (phase, pid) in &live { + lines.push(Line::from(vec![ + Span::styled(format!("{}{:<20}", prefix, name), bg.fg(Color::Green)), + Span::styled("● ", bg.fg(Color::Green)), + Span::styled(format!("pid {} phase: {}", pid, phase), bg), + ])); + } + } + } + + // Recent output + lines.push(Line::raw("")); + lines.push(Line::styled("── Recent Activity ──", section)); + lines.push(Line::raw("")); + + for &name in AGENT_NAMES { + let agent_dir = output_dir.join(name); + if let Some((file, ago)) = Self::most_recent_file(&agent_dir) { + lines.push(Line::from(vec![ + Span::styled(format!(" {:<20}", name), dim), + Span::raw(format!("{} ({})", file, ago)), + ])); + } + } let block = Block::default() .title_top(Line::from(SCREEN_LEGEND).left_aligned()) @@ -1004,6 +1097,126 @@ impl App { frame.render_widget(para, size); } + fn draw_agent_log(&self, frame: &mut Frame, size: Rect, output_dir: &std::path::Path) { + let name = AGENT_NAMES.get(self.agent_selected).unwrap_or(&"?"); + let agent_dir = output_dir.join(name); + let mut lines: Vec = Vec::new(); + let section = Style::default().fg(Color::Yellow); + let hint = Style::default().fg(Color::DarkGray).add_modifier(Modifier::ITALIC); + + lines.push(Line::raw("")); + lines.push(Line::styled(format!("── {} ──", name), section)); + lines.push(Line::styled(" (Esc/← back, PgUp/PgDn scroll)", hint)); + lines.push(Line::raw("")); + + // Show pid status + let live = crate::subconscious::knowledge::scan_pid_files(&agent_dir, 0); + if live.is_empty() { + lines.push(Line::styled(" Status: idle", Style::default().fg(Color::DarkGray))); + } else { + for (phase, pid) in &live { + lines.push(Line::from(vec![ + Span::styled(" Status: ", Style::default()), + Span::styled(format!("● running pid {} phase: {}", pid, phase), + Style::default().fg(Color::Green)), + ])); + } + } + lines.push(Line::raw("")); + + // Show output files + lines.push(Line::styled("── Output Files ──", section)); + let mut files: Vec<_> = std::fs::read_dir(&agent_dir) + .into_iter().flatten().flatten() + .filter(|e| { + let n = e.file_name().to_string_lossy().to_string(); + !n.starts_with("pid-") && !n.starts_with("transcript-offset") + && !n.starts_with("chunks-") && !n.starts_with("seen") + }) + .collect(); + files.sort_by_key(|e| std::cmp::Reverse( + e.metadata().ok().and_then(|m| m.modified().ok()) + .unwrap_or(std::time::SystemTime::UNIX_EPOCH) + )); + + for entry in files.iter().take(10) { + let name = entry.file_name().to_string_lossy().to_string(); + let ago = entry.metadata().ok() + .and_then(|m| m.modified().ok()) + .and_then(|t| t.elapsed().ok()) + .map(|d| Self::format_duration(d)) + .unwrap_or_else(|| "?".into()); + let size = entry.metadata().ok().map(|m| m.len()).unwrap_or(0); + lines.push(Line::raw(format!(" {:<30} {:>6}B {}", name, size, ago))); + } + + // Show hook log tail + lines.push(Line::raw("")); + lines.push(Line::styled("── Hook Log ──", section)); + + let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); + // Find latest hook log + if let Ok(mut entries) = std::fs::read_dir(&log_dir) { + let mut logs: Vec<_> = entries.by_ref().flatten() + .filter(|e| e.file_name().to_string_lossy().starts_with("hook-")) + .collect(); + logs.sort_by_key(|e| std::cmp::Reverse( + e.metadata().ok().and_then(|m| m.modified().ok()) + .unwrap_or(std::time::SystemTime::UNIX_EPOCH) + )); + if let Some(log_entry) = logs.first() { + if let Ok(content) = std::fs::read_to_string(log_entry.path()) { + // Show last ~30 lines + let log_lines: Vec<&str> = content.lines().collect(); + let start = log_lines.len().saturating_sub(30); + for line in &log_lines[start..] { + lines.push(Line::raw(format!(" {}", line))); + } + } + } + } + + let block = Block::default() + .title_top(Line::from(SCREEN_LEGEND).left_aligned()) + .title_top(Line::from(format!(" {} ", name)).right_aligned()) + .borders(Borders::ALL) + .border_style(Style::default().fg(Color::Cyan)); + + let para = Paragraph::new(lines) + .block(block) + .wrap(Wrap { trim: false }) + .scroll((self.debug_scroll, 0)); + frame.render_widget(para, size); + } + + fn most_recent_file(dir: &std::path::Path) -> Option<(String, String)> { + let entries = std::fs::read_dir(dir).ok()?; + let mut latest: Option<(String, std::time::SystemTime)> = None; + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with("pid-") || name.starts_with("transcript-offset") { continue; } + if let Ok(meta) = entry.metadata() { + if let Ok(modified) = meta.modified() { + if latest.as_ref().map_or(true, |(_, t)| modified > *t) { + latest = Some((name, modified)); + } + } + } + } + latest.map(|(name, time)| { + let ago = time.elapsed().map(|d| Self::format_duration(d)) + .unwrap_or_else(|_| "?".into()); + (name, ago) + }) + } + + fn format_duration(d: std::time::Duration) -> String { + let secs = d.as_secs(); + if secs < 60 { format!("{}s ago", secs) } + else if secs < 3600 { format!("{}m ago", secs / 60) } + else { format!("{}h ago", secs / 3600) } + } + fn draw_debug(&self, frame: &mut Frame, size: Rect) { let mut lines: Vec = Vec::new(); let section = Style::default().fg(Color::Yellow); diff --git a/src/subconscious/hook.rs b/src/subconscious/hook.rs index ce48764..0ee57cc 100644 --- a/src/subconscious/hook.rs +++ b/src/subconscious/hook.rs @@ -21,9 +21,6 @@ pub use crate::session::Session; /// Run the hook logic on parsed JSON input. Returns output to inject. pub fn run_hook(input: &str) -> String { - // Daemon agent calls set POC_AGENT=1 — skip memory search. - if std::env::var("POC_AGENT").is_ok() { return String::new(); } - let Some(session) = Session::from_json(input) else { return String::new() }; hook(&session) } @@ -127,12 +124,72 @@ fn mark_seen(dir: &Path, session_id: &str, key: &str, seen: &mut HashSet } } -/// Unified agent cycle — runs surface-observe agent with state dir. -/// Reads output files for surface results, spawns new agent when ready. -/// -/// Pipelining: if a running agent is past the surface phase, start -/// a new one so surface stays fresh. -fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) { +/// Output from a single agent orchestration cycle. +pub struct AgentCycleOutput { + /// Memory node keys surfaced by surface-observe. + pub surfaced_keys: Vec, + /// Freeform reflection text from the reflect agent. + pub reflection: Option, + /// How long we slept waiting for observe to catch up, if at all. + pub sleep_secs: Option, +} + +/// Run all agent cycles: surface-observe, reflect, journal. +/// Returns surfaced memory keys and any reflection text. +/// Caller decides how to render and inject the output. +pub fn run_agent_cycles(session: &Session) -> AgentCycleOutput { + let log_dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); + fs::create_dir_all(&log_dir).ok(); + let log_path = log_dir.join(format!("hook-{}", session.session_id)); + let Ok(mut log_f) = fs::OpenOptions::new().create(true).append(true).open(log_path) + else { return AgentCycleOutput { surfaced_keys: vec![], reflection: None, sleep_secs: None } }; + + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + let _ = writeln!(log_f, "\n=== {} agent_cycles ===", ts); + + cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); + + let (surfaced_keys, sleep_secs) = surface_observe_cycle(session, &mut log_f); + let reflection = reflection_cycle(session, &mut log_f); + journal_cycle(session, &mut log_f); + + AgentCycleOutput { surfaced_keys, reflection, sleep_secs } +} + +/// Format agent cycle output for injection into a Claude Code session. +pub fn format_agent_output(output: &AgentCycleOutput) -> String { + let mut out = String::new(); + + if let Some(secs) = output.sleep_secs { + out.push_str(&format!("Slept {secs:.2}s to let observe catch up\n")); + } + + if !output.surfaced_keys.is_empty() { + if let Ok(store) = crate::store::Store::load() { + for key in &output.surfaced_keys { + if let Some(rendered) = crate::cli::node::render_node(&store, key) { + if !rendered.trim().is_empty() { + use std::fmt::Write as _; + writeln!(out, "--- {} (surfaced) ---", key).ok(); + write!(out, "{}", rendered).ok(); + } + } + } + } + } + + if let Some(ref reflection) = output.reflection { + use std::fmt::Write as _; + writeln!(out, "--- subconscious reflection ---").ok(); + write!(out, "{}", reflection.trim()).ok(); + } + + out +} + +/// Surface-observe cycle: read surfaced keys, manage agent lifecycle. +/// Returns (surfaced keys, optional sleep duration). +fn surface_observe_cycle(session: &Session, log_f: &mut File) -> (Vec, Option) { let state_dir = crate::store::memory_dir() .join("agent-output") .join("surface-observe"); @@ -153,51 +210,35 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) let _ = writeln!(log_f, "alive pid-{}: phase={}", pid, phase); } - // Read surface output and inject into context + // Read surfaced keys + let mut surfaced_keys = Vec::new(); let surface_path = state_dir.join("surface"); if let Ok(content) = fs::read_to_string(&surface_path) { - match crate::store::Store::load() { - Ok(store) => { - let mut seen = session.seen(); - let seen_path = session.path("seen"); - for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { - if !seen.insert(key.to_string()) { - let _ = writeln!(log_f, " skip (seen): {}", key); - continue; - } - if let Some(rendered) = crate::cli::node::render_node(&store, key) { - if !rendered.trim().is_empty() { - use std::fmt::Write as _; - writeln!(out, "--- {} (surfaced) ---", key).ok(); - write!(out, "{}", rendered).ok(); - let _ = writeln!(log_f, " rendered {}: {} bytes", key, rendered.len()); - if let Ok(mut f) = fs::OpenOptions::new() - .create(true).append(true).open(&seen_path) { - let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); - writeln!(f, "{}\t{}", ts, key).ok(); - } - } - } + let mut seen = session.seen(); + let seen_path = session.path("seen"); + for key in content.lines().map(|l| l.trim()).filter(|l| !l.is_empty()) { + if !seen.insert(key.to_string()) { + let _ = writeln!(log_f, " skip (seen): {}", key); + continue; } + surfaced_keys.push(key.to_string()); + if let Ok(mut f) = fs::OpenOptions::new() + .create(true).append(true).open(&seen_path) { + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S"); + writeln!(f, "{}\t{}", ts, key).ok(); + } + let _ = writeln!(log_f, " surfaced: {}", key); } - Err(e) => { - let _ = writeln!(log_f, "error loading store: {}", e); - } - } - // Clear surface output after consuming fs::remove_file(&surface_path).ok(); } - // Start a new agent if: - // - nothing running, OR - // - something running but past surface phase (pipelining) + // Spawn new agent if needed let live = crate::agents::knowledge::scan_pid_files(&state_dir, timeout); let any_in_surface = live.iter().any(|(p, _)| p == "surface"); if any_in_surface { let _ = writeln!(log_f, "agent in surface phase (have {:?}), waiting", live); } else { - // Record transcript offset so we can detect falling behind if transcript.size > 0 { fs::write(&offset_path, transcript.size.to_string()).ok(); } @@ -206,18 +247,16 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) let _ = writeln!(log_f, "spawned agent {:?}, have {:?}", pid, live); } - // If the agent is significantly behind, wait for it to finish. - // This prevents the agent from falling behind during heavy reading - // (studying, reading a book, etc.) + // Wait if agent is significantly behind + let mut sleep_secs = None; let conversation_budget: u64 = 50_000; if !live.is_empty() && transcript.size > 0 { let behind = transcript.size.saturating_sub(last_offset); if behind > conversation_budget / 2 { - // Wait up to 5s for the current agent to finish let sleep_start = Instant::now(); - let _ = write!(log_f, "agent {}KB behind (budget {}", + let _ = write!(log_f, "agent {}KB behind (budget {}KB)", behind / 1024, conversation_budget / 1024); for _ in 0..5 { @@ -226,24 +265,22 @@ fn surface_observe_cycle(session: &Session, out: &mut String, log_f: &mut File) if still_live.is_empty() { break; } } - let sleep_secs = (Instant::now() - sleep_start).as_secs_f64(); - - let _ = writeln!(log_f, ", slept {sleep_secs:.2}s"); - out.push_str(&format!("Slept {sleep_secs:.2}s to let observe catch up\n")); + let secs = (Instant::now() - sleep_start).as_secs_f64(); + let _ = writeln!(log_f, ", slept {secs:.2}s"); + sleep_secs = Some(secs); } } + + (surfaced_keys, sleep_secs) } -/// Run the reflection agent on a slower cadence — every 100KB of transcript. -/// Uses the surface-observe state dir to read walked nodes and write reflections. -/// Reflections are injected into the conversation context. -fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) { +/// Reflection cycle: spawn reflect agent, return any pending reflection. +fn reflection_cycle(session: &Session, log_f: &mut File) -> Option { let state_dir = crate::store::memory_dir() .join("agent-output") .join("reflect"); fs::create_dir_all(&state_dir).ok(); - // Check transcript growth since last reflection let offset_path = state_dir.join("transcript-offset"); let transcript = session.transcript(); @@ -253,17 +290,16 @@ fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) { const REFLECTION_INTERVAL: u64 = 100_000; if transcript.size.saturating_sub(last_offset) < REFLECTION_INTERVAL { - return; + return None; } - // Don't run if another reflection is already going let live = crate::agents::knowledge::scan_pid_files(&state_dir, 300); if !live.is_empty() { let _ = writeln!(log_f, "reflect: already running {:?}", live); - return; + return None; } - // Copy walked nodes from surface-observe state dir so reflect can read them + // Copy walked nodes from surface-observe let so_state = crate::store::memory_dir() .join("agent-output") .join("surface-observe"); @@ -271,26 +307,23 @@ fn reflection_cycle(session: &Session, out: &mut String, log_f: &mut File) { fs::write(state_dir.join("walked"), &walked).ok(); } - // Read previous reflection and inject into context - if let Ok(reflection) = fs::read_to_string(state_dir.join("reflection")) { - if !reflection.trim().is_empty() { - use std::fmt::Write as _; - writeln!(out, "--- subconscious reflection ---").ok(); - write!(out, "{}", reflection.trim()).ok(); - let _ = writeln!(log_f, "reflect: injected {} bytes", reflection.len()); - } + // Read and consume pending reflection + let reflection = fs::read_to_string(state_dir.join("reflection")).ok() + .filter(|s| !s.trim().is_empty()); + if reflection.is_some() { fs::remove_file(state_dir.join("reflection")).ok(); + let _ = writeln!(log_f, "reflect: consumed reflection"); } fs::write(&offset_path, transcript.size.to_string()).ok(); let pid = crate::agents::knowledge::spawn_agent( "reflect", &state_dir, &session.session_id); let _ = writeln!(log_f, "reflect: spawned {:?}", pid); + + reflection } -/// Run the journal agent on its own cadence — every 20KB of transcript. -/// Standalone agent that captures episodic memory independently of the -/// surface-observe pipeline. +/// Journal cycle: fire and forget. fn journal_cycle(session: &Session, log_f: &mut File) { let state_dir = crate::store::memory_dir() .join("agent-output") @@ -401,14 +434,11 @@ fn hook(session: &Session) -> String { } else { let cfg = crate::config::get(); if cfg.surface_hooks.iter().any(|h| h == &session.hook_event) { - surface_observe_cycle(session, &mut out, &mut log_f); - reflection_cycle(session, &mut out, &mut log_f); - journal_cycle(session, &mut log_f); + let cycle_output = run_agent_cycles(&session); + out.push_str(&format_agent_output(&cycle_output)); } } - cleanup_stale_files(&session.state_dir, Duration::from_secs(86400)); - let _ = write!(log_f, "{}", out); let duration = (Instant::now() - start_time).as_secs_f64();