consciousness/src/bin/consciousness.rs
ProofOfConcept a7f19cdc7e tui: event-driven rendering with dirty bit
Only redraw when something actually changed. The 50ms render
interval still ticks (for process count updates) but no longer
triggers draws. Dirty is set by key events, mouse events,
resize, UI messages, turn completions, and DMN ticks.

Saves bandwidth over SSH and reduces CPU usage when idle.

Co-Developed-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-04-03 18:51:22 -04:00

1200 lines
44 KiB
Rust

// poc-agent — Substrate-independent AI agent
//
// A minimal but complete agent framework designed for identity
// portability across LLM substrates. Loads the same CLAUDE.md,
// memory files, and configuration regardless of which model is
// running underneath.
//
// v0.3 — TUI. Split-pane terminal UI: autonomous output in top-left,
// conversation in bottom-left, tool activity on the right, status
// bar at the bottom. Uses ratatui + crossterm.
//
// Agent turns run in spawned tasks so the main loop stays responsive.
// The TUI re-renders at 20fps, showing streaming tokens and tool
// activity in real time.
//
// The event loop uses biased select! so priorities are deterministic:
// keyboard events > turn results > render ticks > DMN timer > UI messages.
// This ensures user input is never starved by background work.
//
// Named after its first resident: ProofOfConcept.
/// Write a debug line to /tmp/poc-debug.log. Used for diagnostics that
/// can't go to stderr (TUI owns the terminal).
use anyhow::Result;
use crossterm::event::{Event, EventStream, KeyEventKind};
use futures::StreamExt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex};
use clap::Parser;
use poc_memory::dbglog;
use poc_memory::user::*;
use poc_memory::user::runner::{Agent, TurnResult};
use poc_memory::user::api::ApiClient;
use poc_memory::user::tui::HotkeyAction;
use poc_memory::config::{self, AppConfig, SessionConfig};
use poc_memory::user::ui_channel::{ContextInfo, StatusInfo, StreamTarget, UiMessage};
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
fn compaction_threshold(app: &AppConfig) -> u32 {
(poc_memory::thought::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
}
#[tokio::main]
async fn main() {
let cli = cli::CliArgs::parse();
// Subcommands that don't launch the TUI
match &cli.command {
Some(cli::SubCmd::Read { follow, block }) => {
if let Err(e) = observe::cmd_read_inner(*follow, *block, cli.debug).await {
eprintln!("{:#}", e);
std::process::exit(1);
}
return;
}
Some(cli::SubCmd::Write { message }) => {
let msg = message.join(" ");
if msg.is_empty() {
eprintln!("Usage: consciousness write <message>");
std::process::exit(1);
}
if let Err(e) = observe::cmd_write(&msg, cli.debug).await {
eprintln!("{:#}", e);
std::process::exit(1);
}
return;
}
None => {}
}
// --show-config: print effective config and exit (before TUI init)
if cli.show_config {
match config::load_app(&cli) {
Ok((app, figment)) => {
config::show_config(&app, &figment);
}
Err(e) => {
eprintln!("Error loading config: {:#}", e);
std::process::exit(1);
}
}
return;
}
if let Err(e) = run(cli).await {
// If we crash, make sure terminal is restored
let _ = crossterm::terminal::disable_raw_mode();
let _ = crossterm::execute!(
std::io::stdout(),
crossterm::terminal::LeaveAlternateScreen
);
eprintln!("Error: {:#}", e);
std::process::exit(1);
}
}
/// Commands that are handled in the main loop, not sent to the agent.
enum Command {
Quit,
Handled,
None,
}
// --- Session: all mutable state for a running agent session ---
/// Collects the ~15 loose variables that previously lived in run()
/// into a coherent struct with methods. The event loop dispatches
/// to Session methods; Session manages turns, compaction, DMN state,
/// and slash commands.
struct Session {
agent: Arc<Mutex<Agent>>,
config: SessionConfig,
process_tracker: tools::ProcessTracker,
ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
// DMN state
dmn: dmn::State,
dmn_turns: u32,
max_dmn_turns: u32,
// Turn tracking
turn_in_progress: bool,
turn_handle: Option<tokio::task::JoinHandle<()>>,
/// User messages received while a turn is in progress.
/// Consolidated into one message (newline-separated) so the
/// model sees everything the user typed, not just the first line.
pending_input: Option<String>,
// Per-turn tracking for DMN context
last_user_input: Instant,
consecutive_errors: u32,
last_turn_had_tools: bool,
}
impl Session {
fn new(
agent: Arc<Mutex<Agent>>,
config: SessionConfig,
process_tracker: tools::ProcessTracker,
ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
) -> Self {
let max_dmn_turns = config.app.dmn.max_turns;
Self {
agent,
config,
process_tracker,
ui_tx,
turn_tx,
dmn: if dmn::is_off() {
dmn::State::Off
} else {
dmn::State::Resting { since: Instant::now() }
},
dmn_turns: 0,
max_dmn_turns,
turn_in_progress: false,
turn_handle: None,
pending_input: None,
last_user_input: Instant::now(),
consecutive_errors: 0,
last_turn_had_tools: false,
}
}
/// How long before the next DMN tick.
fn dmn_interval(&self) -> Duration {
self.dmn.interval()
}
/// Spawn an agent turn in a background task.
fn spawn_turn(&mut self, input: String, target: StreamTarget) {
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
let result_tx = self.turn_tx.clone();
self.turn_in_progress = true;
self.turn_handle = Some(tokio::spawn(async move {
let mut agent = agent.lock().await;
let result = agent.turn(&input, &ui_tx, target).await;
let _ = result_tx.send((result, target)).await;
}));
}
/// Submit user input — either queue it (if a turn is running) or
/// start a new turn immediately.
fn submit_input(&mut self, input: String) {
if self.turn_in_progress {
match &mut self.pending_input {
Some(existing) => {
existing.push('\n');
existing.push_str(&input);
}
None => self.pending_input = Some(input.clone()),
}
let _ = self.ui_tx.send(UiMessage::Info("(queued)".into()));
} else {
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
self.update_status();
self.spawn_turn(input, StreamTarget::Conversation);
}
}
/// Process a completed turn: update DMN state, check compaction,
/// drain any queued input.
async fn handle_turn_result(
&mut self,
result: Result<TurnResult>,
target: StreamTarget,
) {
self.turn_in_progress = false;
self.turn_handle = None;
match result {
Ok(turn_result) => {
if turn_result.tool_errors > 0 {
self.consecutive_errors += turn_result.tool_errors;
} else {
self.consecutive_errors = 0;
}
self.last_turn_had_tools = turn_result.had_tool_calls;
self.dmn = dmn::transition(
&self.dmn,
turn_result.yield_requested,
turn_result.had_tool_calls,
target == StreamTarget::Conversation,
);
if turn_result.dmn_pause {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN paused (agent requested). Ctrl+P or /wake to resume.".into(),
));
}
if let Some(model_name) = turn_result.model_switch {
self.switch_model(&model_name).await;
}
}
Err(e) => {
self.consecutive_errors += 1;
let msg = match target {
StreamTarget::Autonomous => {
UiMessage::DmnAnnotation(format!("[error: {:#}]", e))
}
StreamTarget::Conversation => {
UiMessage::Info(format!("Error: {:#}", e))
}
};
let _ = self.ui_tx.send(msg);
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
}
}
self.update_status();
self.check_compaction().await;
self.drain_pending();
}
/// Check if compaction is needed after a turn. Two thresholds:
/// - Soft (80%): nudge the model to journal before we compact
/// - Hard (90%): compact immediately, ready or not
async fn check_compaction(&mut self) {
let mut agent_guard = self.agent.lock().await;
let tokens = agent_guard.last_prompt_tokens();
let threshold = compaction_threshold(&self.config.app);
if tokens > threshold {
let _ = self.ui_tx.send(UiMessage::Info(format!(
"[compaction: {}K > {}K threshold]",
tokens / 1000,
threshold / 1000,
)));
agent_guard.compact();
let _ = self.ui_tx.send(UiMessage::Info(
"[compacted — journal + recent messages]".into(),
));
self.send_context_info();
}
}
/// Send any consolidated pending input as a single turn.
fn drain_pending(&mut self) {
if let Some(queued) = self.pending_input.take() {
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
let _ = self.ui_tx.send(UiMessage::UserInput(queued.clone()));
self.update_status();
self.spawn_turn(queued, StreamTarget::Conversation);
}
}
/// Fire a DMN tick: check max turns, generate prompt, spawn turn.
fn dmn_tick(&mut self) {
// Paused/Off state: no autonomous ticks at all.
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
return;
}
self.dmn_turns += 1;
if self.dmn_turns > self.max_dmn_turns {
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
"[dmn: {} consecutive turns, resting (limit: {})]",
self.dmn_turns - 1,
self.max_dmn_turns,
)));
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
self.dmn_turns = 0;
self.update_status();
return;
}
let dmn_ctx = dmn::DmnContext {
user_idle: self.last_user_input.elapsed(),
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
};
let prompt = self.dmn.prompt(&dmn_ctx);
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
"[dmn: {} ({}/{})]",
self.dmn.label(),
self.dmn_turns,
self.max_dmn_turns,
)));
self.update_status();
self.spawn_turn(prompt, StreamTarget::Autonomous);
}
/// Handle slash commands. Returns how the main loop should respond.
async fn handle_command(&mut self, input: &str) -> Command {
// Declarative command table — /help reads from this.
const COMMANDS: &[(&str, &str)] = &[
("/quit", "Exit consciousness"),
("/new", "Start fresh session (saves current)"),
("/save", "Save session to disk"),
("/retry", "Re-run last turn"),
("/model", "Show/switch model (/model <name>)"),
("/score", "Score memory importance"),
("/dmn", "Show DMN state"),
("/sleep", "Put DMN to sleep"),
("/wake", "Wake DMN to foraging"),
("/pause", "Full stop — no autonomous ticks (Ctrl+P)"),
("/test", "Run tool smoke tests"),
("/help", "Show this help"),
];
match input {
"/quit" | "/exit" => Command::Quit,
"/save" => {
let _ = self.ui_tx.send(UiMessage::Info(
"Conversation is saved automatically (append-only log).".into()
));
Command::Handled
}
"/new" | "/clear" => {
if self.turn_in_progress {
let _ = self
.ui_tx
.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
{
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
)
.ok();
let mut agent_guard = self.agent.lock().await;
let shared_ctx = agent_guard.shared_context.clone();
*agent_guard = Agent::new(
ApiClient::new(
&self.config.api_base,
&self.config.api_key,
&self.config.model,
),
self.config.system_prompt.clone(),
self.config.context_parts.clone(),
self.config.app.clone(),
self.config.prompt_file.clone(),
new_log,
shared_ctx,
);
}
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
let _ = self
.ui_tx
.send(UiMessage::Info("New session started.".into()));
Command::Handled
}
"/model" => {
if let Ok(agent) = self.agent.try_lock() {
let _ = self.ui_tx.send(UiMessage::Info(
format!("Current model: {}", agent.model()),
));
let names = self.config.app.model_names();
if !names.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info(
format!("Available: {}", names.join(", ")),
));
}
} else {
let _ = self.ui_tx.send(UiMessage::Info("(busy)".into()));
}
Command::Handled
}
"/score" => {
// Snapshot context+client while we have the lock,
// so the scoring task doesn't need to wait for turns.
let (context, client) = {
let mut agent = self.agent.lock().await;
if agent.scoring_in_flight {
let _ = self.ui_tx.send(UiMessage::Info(
"(scoring already in progress)".into()
));
return Command::Handled;
}
agent.scoring_in_flight = true;
(agent.context.clone(), agent.client_clone())
};
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
let _ = self.ui_tx.send(UiMessage::Debug("[score] task spawning".into()));
tokio::spawn(async move {
let _ = ui_tx.send(UiMessage::Debug("[score] task started, calling score_memories".into()));
let result = poc_memory::thought::training::score_memories(
&context, &client, &ui_tx,
).await;
let _ = ui_tx.send(UiMessage::Debug("[score] score_memories returned, acquiring lock".into()));
// Store results — brief lock, just setting fields
let mut agent = agent.lock().await;
let _ = ui_tx.send(UiMessage::Debug("[score] lock acquired, storing results".into()));
agent.scoring_in_flight = false;
match result {
Ok(scores) => {
agent.memory_scores = Some(scores);
}
Err(e) => {
let _ = ui_tx.send(UiMessage::Info(format!(
"[scoring failed: {:#}]", e,
)));
}
}
agent.publish_context_state();
});
Command::Handled
}
"/dmn" => {
let _ = self
.ui_tx
.send(UiMessage::Info(format!("DMN state: {:?}", self.dmn)));
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Next tick in: {:?}",
self.dmn.interval()
)));
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Consecutive DMN turns: {}/{}",
self.dmn_turns, self.max_dmn_turns,
)));
Command::Handled
}
"/sleep" => {
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN sleeping (heartbeat every 5 min). Type anything to wake."
.into(),
));
Command::Handled
}
"/wake" => {
let was_paused = matches!(self.dmn, dmn::State::Paused | dmn::State::Off);
if matches!(self.dmn, dmn::State::Off) {
dmn::set_off(false);
}
self.dmn = dmn::State::Foraging;
self.dmn_turns = 0;
let msg = if was_paused {
"DMN unpaused — entering foraging mode."
} else {
"DMN waking — entering foraging mode."
};
let _ = self.ui_tx.send(UiMessage::Info(msg.into()));
self.update_status();
Command::Handled
}
"/pause" => {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN paused — no autonomous ticks. Ctrl+P or /wake to resume.".into(),
));
self.update_status();
Command::Handled
}
"/test" => {
let _ = self
.ui_tx
.send(UiMessage::Info("Running tool smoke tests...".into()));
run_tool_tests(&self.ui_tx, &self.process_tracker).await;
Command::Handled
}
"/retry" => {
if self.turn_in_progress {
let _ = self
.ui_tx
.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
let mut agent_guard = self.agent.lock().await;
let entries = agent_guard.entries_mut();
let mut last_user_text = None;
while let Some(entry) = entries.last() {
if entry.message().role == poc_memory::user::types::Role::User {
last_user_text =
Some(entries.pop().unwrap().message().content_text().to_string());
break;
}
entries.pop();
}
drop(agent_guard);
match last_user_text {
Some(text) => {
let preview_len = text.len().min(60);
let _ = self.ui_tx.send(UiMessage::Info(format!(
"(retrying: {}...)",
&text[..preview_len]
)));
self.dmn_turns = 0;
self.dmn = dmn::State::Engaged;
self.spawn_turn(text, StreamTarget::Conversation);
}
None => {
let _ = self
.ui_tx
.send(UiMessage::Info("(nothing to retry)".into()));
}
}
Command::Handled
}
"/help" => {
for (name, desc) in COMMANDS {
let _ = self.ui_tx.send(UiMessage::Info(
format!(" {:12} {}", name, desc),
));
}
let _ = self.ui_tx.send(UiMessage::Info(String::new()));
let _ = self.ui_tx.send(UiMessage::Info(
"Keys: Tab=pane ^Up/Down=scroll PgUp/PgDn=scroll Mouse=click/scroll".into(),
));
let _ = self.ui_tx.send(UiMessage::Info(
" Alt+Enter=newline Esc=interrupt ^P=pause ^R=reasoning ^K=kill F10=context F2=agents".into(),
));
let _ = self.ui_tx.send(UiMessage::Info(
" Shift+click for native text selection (copy/paste)".into(),
));
Command::Handled
}
cmd if cmd.starts_with("/model ") => {
let name = cmd[7..].trim();
if name.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info("Usage: /model <name>".into()));
return Command::Handled;
}
self.switch_model(name).await;
Command::Handled
}
_ => Command::None,
}
}
/// Interrupt: kill processes, abort current turn, clear pending queue.
async fn interrupt(&mut self) {
let procs = self.process_tracker.list().await;
for p in &procs {
self.process_tracker.kill(p.pid).await;
}
// Only abort the turn if no processes are running — let SIGTERM'd
// processes exit normally so run_bash can unregister them.
if procs.is_empty() {
if let Some(handle) = self.turn_handle.take() {
handle.abort();
self.turn_in_progress = false;
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
self.update_status();
let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
}
}
self.pending_input = None;
let killed = procs.len();
if killed > 0 || self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info(format!(
"(interrupted — killed {} process(es), turn aborted)",
killed
)));
} else {
let _ = self
.ui_tx
.send(UiMessage::Info("(interrupted)".into()));
}
}
/// Cycle reasoning effort: none → low → high → none.
fn cycle_reasoning(&mut self, app: &mut tui::App) {
if let Ok(mut agent_guard) = self.agent.try_lock() {
let next = match agent_guard.reasoning_effort.as_str() {
"none" => "low",
"low" => "high",
_ => "none",
};
agent_guard.reasoning_effort = next.to_string();
app.reasoning_effort = next.to_string();
let label = match next {
"none" => "off (monologue hidden)",
"low" => "low (brief monologue)",
"high" => "high (full monologue)",
_ => next,
};
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Reasoning: {} — ^R to cycle",
label
)));
} else {
let _ = self.ui_tx.send(UiMessage::Info(
"(agent busy — reasoning change takes effect next turn)".into(),
));
}
}
/// Show and kill running processes (Ctrl+K).
async fn kill_processes(&mut self) {
let procs = self.process_tracker.list().await;
if procs.is_empty() {
let _ = self
.ui_tx
.send(UiMessage::Info("(no running processes)".into()));
} else {
for p in &procs {
let elapsed = p.started.elapsed();
let _ = self.ui_tx.send(UiMessage::Info(format!(
" killing pid {} ({:.0}s): {}",
p.pid,
elapsed.as_secs_f64(),
p.command
)));
self.process_tracker.kill(p.pid).await;
}
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Killed {} process(es)",
procs.len()
)));
}
}
/// Cycle DMN autonomy: foraging → resting → paused → off → foraging.
/// From any other state, cycles to the "next" step down.
fn cycle_autonomy(&mut self) {
let (new_state, label) = match &self.dmn {
dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => {
(dmn::State::Resting { since: Instant::now() }, "resting")
}
dmn::State::Resting { .. } => {
(dmn::State::Paused, "PAUSED")
}
dmn::State::Paused => {
dmn::set_off(true);
(dmn::State::Off, "OFF (persists across restarts)")
}
dmn::State::Off => {
dmn::set_off(false);
(dmn::State::Foraging, "foraging")
}
};
self.dmn = new_state;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
format!("DMN → {} (Ctrl+P to cycle)", label),
));
self.update_status();
}
/// Switch to a named model from the config registry.
async fn switch_model(&mut self, name: &str) {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info(
"(turn in progress, please wait)".into(),
));
return;
}
let resolved = match self.config.app.resolve_model(name) {
Ok(r) => r,
Err(e) => {
let _ = self.ui_tx.send(UiMessage::Info(format!("{}", e)));
return;
}
};
let new_client = ApiClient::new(
&resolved.api_base,
&resolved.api_key,
&resolved.model_id,
);
let prompt_changed = resolved.prompt_file != self.config.prompt_file;
let mut agent_guard = self.agent.lock().await;
agent_guard.swap_client(new_client);
self.config.model = resolved.model_id.clone();
self.config.api_base = resolved.api_base;
self.config.api_key = resolved.api_key;
if prompt_changed {
self.config.prompt_file = resolved.prompt_file.clone();
agent_guard.prompt_file = resolved.prompt_file.clone();
agent_guard.compact();
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Switched to {} ({}) — prompt: {}, recompacted",
name, resolved.model_id, resolved.prompt_file,
)));
} else {
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Switched to {} ({})",
name, resolved.model_id,
)));
}
drop(agent_guard);
self.update_status();
self.send_context_info();
}
/// Get context_groups from the unified config.
fn load_context_groups(&self) -> Vec<config::ContextGroup> {
config::get().context_groups.clone()
}
/// Send context loading info to the TUI debug screen.
fn send_context_info(&self) {
let context_groups = self.load_context_groups();
let (instruction_files, memory_files) = identity::context_file_info(
&self.config.prompt_file,
self.config.app.memory_project.as_deref(),
&context_groups,
);
let _ = self.ui_tx.send(UiMessage::ContextInfoUpdate(ContextInfo {
model: self.config.model.clone(),
available_models: self.config.app.model_names(),
prompt_file: self.config.prompt_file.clone(),
backend: self.config.app.backend.clone(),
instruction_files,
memory_files,
system_prompt_chars: self.config.system_prompt.len(),
context_message_chars: self.config.context_parts.iter().map(|(_, c)| c.len()).sum(),
}));
}
/// Send DMN status update to the TUI.
fn update_status(&self) {
let _ = self.ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: self.dmn.label().to_string(),
dmn_turns: self.dmn_turns,
dmn_max_turns: self.max_dmn_turns,
prompt_tokens: 0,
completion_tokens: 0,
model: String::new(),
turn_tools: 0,
context_budget: String::new(),
}));
}
/// Abort any running turn and save session. Called on exit.
async fn shutdown(&mut self) {
if let Some(handle) = self.turn_handle.take() {
handle.abort();
}
}
}
// --- Event loop ---
async fn run(cli: cli::CliArgs) -> Result<()> {
let (config, _figment) = config::load_session(&cli)?;
// Wire config.debug to the POC_DEBUG env var so all debug checks
// throughout the codebase (API, SSE reader, diagnostics) see it.
// Safety: called once at startup before any threads are spawned.
if config.app.debug {
unsafe { std::env::set_var("POC_DEBUG", "1") };
}
// Start channel daemons
let mut channel_supervisor = poc_memory::thalamus::supervisor::Supervisor::new();
channel_supervisor.load_config();
channel_supervisor.ensure_running();
// Create UI channel
let (ui_tx, mut ui_rx) = ui_channel::channel();
// Shared context state — agent writes, TUI reads for debug screen
let shared_context = ui_channel::shared_context_state();
// Initialize TUI
let mut terminal = tui::init_terminal()?;
let mut app = tui::App::new(config.model.clone(), shared_context.clone());
// Show startup info
let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into()));
let _ = ui_tx.send(UiMessage::Info(format!(
" model: {} (available: {})",
config.model,
config.app.model_names().join(", "),
)));
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
let _ = ui_tx.send(UiMessage::Info(format!(
" api: {} ({})",
config.api_base,
client.backend_label()
)));
let _ = ui_tx.send(UiMessage::Info(format!(
" context: {}K chars ({} config, {} memory files)",
config.context_parts.iter().map(|(_, c)| c.len()).sum::<usize>() / 1024,
config.config_file_count,
config.memory_file_count,
)));
let conversation_log_path = config.session_dir.join("conversation.jsonl");
let conversation_log = log::ConversationLog::new(conversation_log_path.clone())
.expect("failed to create conversation log");
let _ = ui_tx.send(UiMessage::Info(format!(
" log: {}",
conversation_log.path().display()
)));
let agent = Arc::new(Mutex::new(Agent::new(
client,
config.system_prompt.clone(),
config.context_parts.clone(),
config.app.clone(),
config.prompt_file.clone(),
Some(conversation_log),
shared_context,
)));
// Keep a reference to the process tracker outside the agent lock
// so Ctrl+K can kill processes even when the agent is busy.
let process_tracker = agent.lock().await.process_tracker.clone();
// Restore conversation from the append-only log
{
let mut agent_guard = agent.lock().await;
if agent_guard.restore_from_log() {
replay_session_to_ui(agent_guard.entries(), &ui_tx);
let _ = ui_tx.send(UiMessage::Info(
"--- restored from conversation log ---".into(),
));
}
}
// Send initial budget to status bar
{
let agent_guard = agent.lock().await;
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: "resting".to_string(),
dmn_turns: 0,
dmn_max_turns: 0,
prompt_tokens: 0,
completion_tokens: 0,
model: agent_guard.model().to_string(),
turn_tools: 0,
context_budget: agent_guard.budget().status_string(),
}));
}
// Channel for turn results from spawned tasks
let (turn_tx, mut turn_rx) =
mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
let mut session = Session::new(
agent,
config,
process_tracker,
ui_tx.clone(),
turn_tx,
);
session.update_status();
session.send_context_info();
// Start observation socket for external clients
let socket_path = session.config.session_dir.join("agent.sock");
let (observe_input_tx, mut observe_input_rx) = observe::input_channel();
observe::start(socket_path, ui_tx.subscribe(), observe_input_tx);
// Crossterm event stream
let mut reader = EventStream::new();
// Render timer — only draws when dirty
let mut render_interval = tokio::time::interval(Duration::from_millis(50));
render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut dirty = true; // draw first frame
// Hide terminal cursor — tui-textarea renders its own cursor as a styled cell
terminal.hide_cursor()?;
// Initial render
drain_ui_messages(&mut ui_rx, &mut app);
terminal.draw(|f| app.draw(f))?;
loop {
let timeout = session.dmn_interval();
tokio::select! {
biased;
// Keyboard events (highest priority)
maybe_event = reader.next() => {
match maybe_event {
Some(Ok(Event::Key(key))) => {
if key.kind != KeyEventKind::Press {
continue;
}
app.handle_key(key);
dirty = true;
}
Some(Ok(Event::Mouse(mouse))) => {
app.handle_mouse(mouse);
dirty = true;
}
Some(Ok(Event::Resize(w, h))) => {
app.handle_resize(w, h);
terminal.clear()?;
dirty = true;
}
Some(Err(_)) => break,
None => break,
_ => continue,
}
}
// Input from observation socket clients
Some(line) = observe_input_rx.recv() => {
app.submitted.push(line);
dirty = true;
}
// Turn completed in background task
Some((result, target)) = turn_rx.recv() => {
session.handle_turn_result(result, target).await;
dirty = true;
}
// Render tick — only redraws if dirty
_ = render_interval.tick() => {
app.running_processes = session.process_tracker.list().await.len() as u32;
}
// DMN timer (only when no turn is running)
_ = tokio::time::sleep(timeout), if !session.turn_in_progress => {
session.dmn_tick();
dirty = true;
}
// UI messages (lowest priority — processed in bulk during render)
Some(msg) = ui_rx.recv() => {
app.handle_ui_message(msg);
dirty = true;
}
}
// Process submitted input
let submitted: Vec<String> = app.submitted.drain(..).collect();
for input in submitted {
let input = input.trim().to_string();
if input.is_empty() {
continue;
}
match session.handle_command(&input).await {
Command::Quit => app.should_quit = true,
Command::Handled => {}
Command::None => session.submit_input(input),
}
}
// Process hotkey actions
let actions: Vec<HotkeyAction> = app.hotkey_actions.drain(..).collect();
for action in actions {
match action {
HotkeyAction::CycleReasoning => session.cycle_reasoning(&mut app),
HotkeyAction::KillProcess => session.kill_processes().await,
HotkeyAction::Interrupt => session.interrupt().await,
HotkeyAction::CycleAutonomy => session.cycle_autonomy(),
}
}
// Drain pending UI messages
if drain_ui_messages(&mut ui_rx, &mut app) {
dirty = true;
}
// Only redraw when something changed
if dirty {
terminal.draw(|f| app.draw(f))?;
dirty = false;
}
if app.should_quit {
break;
}
}
session.shutdown().await;
tui::restore_terminal(&mut terminal)?;
Ok(())
}
// --- Free functions ---
fn drain_ui_messages(rx: &mut ui_channel::UiReceiver, app: &mut tui::App) -> bool {
let mut any = false;
while let Ok(msg) = rx.try_recv() {
app.handle_ui_message(msg);
any = true;
}
any
}
async fn run_tool_tests(ui_tx: &ui_channel::UiSender, tracker: &tools::ProcessTracker) {
use serde_json::json;
let tests: Vec<(&str, serde_json::Value, bool)> = vec![
("read_file", json!({"file_path": "/etc/hostname"}), true),
(
"read_file",
json!({"file_path": "/nonexistent/path"}),
false,
),
(
"write_file",
json!({"file_path": "/tmp/poc-agent-test.txt", "content": "hello from poc-agent\n"}),
true,
),
(
"read_file",
json!({"file_path": "/tmp/poc-agent-test.txt"}),
true,
),
(
"edit_file",
json!({"file_path": "/tmp/poc-agent-test.txt", "old_string": "hello", "new_string": "goodbye"}),
true,
),
(
"read_file",
json!({"file_path": "/tmp/poc-agent-test.txt"}),
true,
),
(
"bash",
json!({"command": "echo 'tool test passed'"}),
true,
),
("bash", json!({"command": "sleep 5", "timeout_secs": 1}), false),
(
"grep",
json!({"pattern": "fn main", "path": "src/", "show_content": true}),
true,
),
("glob", json!({"pattern": "src/**/*.rs"}), true),
("yield_to_user", json!({"message": "test yield"}), true),
];
let mut pass = 0;
let mut fail = 0;
for (name, args, should_succeed) in &tests {
let output = tools::dispatch(name, args, tracker).await;
let is_error = output.text.starts_with("Error:");
let ok = if *should_succeed { !is_error } else { is_error };
if ok {
let _ = ui_tx.send(UiMessage::Info(format!(" PASS: {}", name)));
pass += 1;
} else {
let _ = ui_tx.send(UiMessage::Info(format!(
" FAIL: {}{}",
name,
&output.text[..output.text.len().min(100)]
)));
fail += 1;
}
}
let _ = std::fs::remove_file("/tmp/poc-agent-test.txt");
let _ = ui_tx.send(UiMessage::Info(format!(
" {} passed, {} failed",
pass, fail
)));
}
/// Replay a restored session into the TUI panes so the user can see
/// conversation history immediately on restart. Shows user input,
/// assistant responses, and brief tool call summaries. Skips the system
/// prompt, context message, DMN plumbing, and image injection messages.
fn replay_session_to_ui(entries: &[types::ConversationEntry], ui_tx: &ui_channel::UiSender) {
use poc_memory::user::ui_channel::StreamTarget;
dbglog!("[replay] replaying {} entries to UI", entries.len());
for (i, e) in entries.iter().enumerate() {
let m = e.message();
let preview: String = m.content_text().chars().take(60).collect();
dbglog!("[replay] [{}] {:?} mem={} tc={} tcid={:?} {:?}",
i, m.role, e.is_memory(), m.tool_calls.as_ref().map_or(0, |t| t.len()),
m.tool_call_id.as_deref(), preview);
}
let mut seen_first_user = false;
let mut target = StreamTarget::Conversation;
for entry in entries {
// Memory entries are in the context window but not the conversation display
if entry.is_memory() { continue; }
let msg = entry.message();
match msg.role {
types::Role::System => {}
types::Role::User => {
// Skip context message (always the first user message)
if !seen_first_user {
seen_first_user = true;
continue;
}
let text = msg.content_text();
// Skip synthetic messages (compaction, journal, image injection)
if text.starts_with("Your context was just compacted")
|| text.starts_with("Your context was just rebuilt")
|| text.starts_with("[Earlier in this conversation")
|| text.starts_with("Here is the image")
|| text.contains("[image aged out")
{
continue;
}
if text.starts_with("[dmn]") {
target = StreamTarget::Autonomous;
let first_line = text.lines().next().unwrap_or("[dmn]");
let _ = ui_tx.send(UiMessage::DmnAnnotation(first_line.to_string()));
} else {
target = StreamTarget::Conversation;
let _ = ui_tx.send(UiMessage::UserInput(text.to_string()));
}
}
types::Role::Assistant => {
if let Some(ref calls) = msg.tool_calls {
for call in calls {
let _ = ui_tx.send(UiMessage::ToolCall {
name: call.function.name.clone(),
args_summary: String::new(),
});
}
}
let text = msg.content_text();
if !text.is_empty() {
let _ = ui_tx
.send(UiMessage::TextDelta(format!("{}\n", text), target));
}
}
types::Role::Tool => {
let text = msg.content_text();
let preview: String =
text.lines().take(3).collect::<Vec<_>>().join("\n");
let truncated = if text.lines().count() > 3 {
format!("{}...", preview)
} else {
preview
};
let _ = ui_tx.send(UiMessage::ToolResult {
name: String::new(),
result: truncated,
});
}
}
}
}