split out src/mind

This commit is contained in:
Kent Overstreet 2026-04-04 02:46:32 -04:00
parent ce04568454
commit 79e384f005
21 changed files with 1865 additions and 2175 deletions

935
src/mind/mod.rs Normal file
View file

@ -0,0 +1,935 @@
// mind/ — Cognitive layer
//
// Session state machine, DMN, identity, observation socket.
// Everything about how the mind operates, separate from the
// user interface (TUI, CLI) and the agent execution (tools, API).
pub mod dmn;
pub mod identity;
pub mod observe;
// consciousness.rs — Session state machine and event loop
//
// The core runtime for the consciousness binary. Session manages turns,
// DMN state, compaction, scoring, and slash commands. The event loop
// bridges Session (cognitive state) with App (TUI rendering).
//
// The event loop uses biased select! so priorities are deterministic:
// keyboard events > turn results > render ticks > DMN timer > UI messages.
use anyhow::Result;
use crossterm::event::{Event, EventStream, KeyEventKind};
use futures::StreamExt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex};
use crate::agent::{Agent, TurnResult};
use crate::agent::api::ApiClient;
use crate::agent::api::types as api_types;
use crate::config::{self, AppConfig, SessionConfig};
use crate::dbglog;
use crate::user::{self as tui, HotkeyAction};
use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage};
use crate::user::log;
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
fn compaction_threshold(app: &AppConfig) -> u32 {
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
}
/// Commands that are handled in the main loop, not sent to the agent.
enum Command {
Quit,
Handled,
None,
}
// --- Session: all mutable state for a running agent session ---
/// Collects the ~15 loose variables that previously lived in run()
/// into a coherent struct with methods. The event loop dispatches
/// to Session methods; Session manages turns, compaction, DMN state,
/// and slash commands.
pub struct Session {
agent: Arc<Mutex<Agent>>,
config: SessionConfig,
ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
// DMN state
dmn: dmn::State,
dmn_turns: u32,
max_dmn_turns: u32,
// Turn tracking
turn_in_progress: bool,
turn_handle: Option<tokio::task::JoinHandle<()>>,
/// User messages received while a turn is in progress.
/// Consolidated into one message (newline-separated) so the
/// model sees everything the user typed, not just the first line.
pending_input: Option<String>,
// Per-turn tracking for DMN context
last_user_input: Instant,
consecutive_errors: u32,
last_turn_had_tools: bool,
// Subconscious orchestration
agent_cycles: crate::subconscious::subconscious::AgentCycleState,
/// Latest memory importance scores from full matrix scoring (manual /score).
memory_scores: Option<crate::agent::training::MemoryScore>,
/// Whether a full matrix /score task is currently running.
scoring_in_flight: bool,
}
impl Session {
fn new(
agent: Arc<Mutex<Agent>>,
config: SessionConfig,
ui_tx: ui_channel::UiSender,
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
) -> Self {
let max_dmn_turns = config.app.dmn.max_turns;
Self {
agent,
config,
ui_tx,
turn_tx,
dmn: if dmn::is_off() {
dmn::State::Off
} else {
dmn::State::Resting { since: Instant::now() }
},
dmn_turns: 0,
max_dmn_turns,
turn_in_progress: false,
turn_handle: None,
pending_input: None,
last_user_input: Instant::now(),
consecutive_errors: 0,
last_turn_had_tools: false,
agent_cycles: crate::subconscious::subconscious::AgentCycleState::new(""),
memory_scores: None,
scoring_in_flight: false,
}
}
/// How long before the next DMN tick.
fn dmn_interval(&self) -> Duration {
self.dmn.interval()
}
/// Spawn an agent turn in a background task.
fn spawn_turn(&mut self, input: String, target: StreamTarget) {
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
let result_tx = self.turn_tx.clone();
self.turn_in_progress = true;
self.turn_handle = Some(tokio::spawn(async move {
let mut agent = agent.lock().await;
let result = agent.turn(&input, &ui_tx, target).await;
let _ = result_tx.send((result, target)).await;
}));
}
/// Submit user input — either queue it (if a turn is running) or
/// start a new turn immediately.
fn submit_input(&mut self, input: String) {
if self.turn_in_progress {
match &mut self.pending_input {
Some(existing) => {
existing.push('\n');
existing.push_str(&input);
}
None => self.pending_input = Some(input.clone()),
}
let _ = self.ui_tx.send(UiMessage::Info("(queued)".into()));
} else {
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
self.update_status();
self.spawn_turn(input, StreamTarget::Conversation);
}
}
/// Process a completed turn: update DMN state, check compaction,
/// drain any queued input.
async fn handle_turn_result(
&mut self,
result: Result<TurnResult>,
target: StreamTarget,
) {
self.turn_in_progress = false;
self.turn_handle = None;
match result {
Ok(turn_result) => {
if turn_result.tool_errors > 0 {
self.consecutive_errors += turn_result.tool_errors;
} else {
self.consecutive_errors = 0;
}
self.last_turn_had_tools = turn_result.had_tool_calls;
self.dmn = dmn::transition(
&self.dmn,
turn_result.yield_requested,
turn_result.had_tool_calls,
target == StreamTarget::Conversation,
);
if turn_result.dmn_pause {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN paused (agent requested). Ctrl+P or /wake to resume.".into(),
));
}
if let Some(model_name) = turn_result.model_switch {
self.switch_model(&model_name).await;
}
}
Err(e) => {
self.consecutive_errors += 1;
let msg = match target {
StreamTarget::Autonomous => {
UiMessage::DmnAnnotation(format!("[error: {:#}]", e))
}
StreamTarget::Conversation => {
UiMessage::Info(format!("Error: {:#}", e))
}
};
let _ = self.ui_tx.send(msg);
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
}
}
self.update_status();
self.check_compaction().await;
self.maybe_start_memory_scoring().await;
self.drain_pending();
}
/// Spawn incremental memory scoring if not already running.
async fn maybe_start_memory_scoring(&mut self) {
{
let agent = self.agent.lock().await;
if agent.agent_cycles.memory_scoring_in_flight {
return;
}
}
let (context, client, cursor) = {
let mut agent = self.agent.lock().await;
let cursor = agent.agent_cycles.memory_score_cursor;
agent.agent_cycles.memory_scoring_in_flight = true;
(agent.context.clone(), agent.client_clone(), cursor)
};
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
tokio::spawn(async move {
let result = crate::agent::training::score_memories_incremental(
&context, cursor, &client, &ui_tx,
).await;
let mut agent = agent.lock().await;
agent.agent_cycles.memory_scoring_in_flight = false;
match result {
Ok((new_cursor, scores)) => {
agent.agent_cycles.memory_score_cursor = new_cursor;
agent.agent_cycles.memory_scores.extend(scores);
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
}
Err(e) => {
let _ = ui_tx.send(UiMessage::Debug(format!(
"[memory-scoring] failed: {:#}", e,
)));
}
}
});
}
/// Check if compaction is needed after a turn.
async fn check_compaction(&mut self) {
let mut agent_guard = self.agent.lock().await;
let tokens = agent_guard.last_prompt_tokens();
let threshold = compaction_threshold(&self.config.app);
if tokens > threshold {
let _ = self.ui_tx.send(UiMessage::Info(format!(
"[compaction: {}K > {}K threshold]",
tokens / 1000,
threshold / 1000,
)));
agent_guard.compact();
let _ = self.ui_tx.send(UiMessage::Info(
"[compacted — journal + recent messages]".into(),
));
self.send_context_info();
}
}
/// Send any consolidated pending input as a single turn.
fn drain_pending(&mut self) {
if let Some(queued) = self.pending_input.take() {
self.dmn_turns = 0;
self.consecutive_errors = 0;
self.last_user_input = Instant::now();
self.dmn = dmn::State::Engaged;
let _ = self.ui_tx.send(UiMessage::UserInput(queued.clone()));
self.update_status();
self.spawn_turn(queued, StreamTarget::Conversation);
}
}
/// Fire a DMN tick: check max turns, generate prompt, spawn turn.
fn dmn_tick(&mut self) {
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
return;
}
self.dmn_turns += 1;
if self.dmn_turns > self.max_dmn_turns {
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
"[dmn: {} consecutive turns, resting (limit: {})]",
self.dmn_turns - 1,
self.max_dmn_turns,
)));
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
self.dmn_turns = 0;
self.update_status();
return;
}
let dmn_ctx = dmn::DmnContext {
user_idle: self.last_user_input.elapsed(),
consecutive_errors: self.consecutive_errors,
last_turn_had_tools: self.last_turn_had_tools,
};
let prompt = self.dmn.prompt(&dmn_ctx);
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
"[dmn: {} ({}/{})]",
self.dmn.label(),
self.dmn_turns,
self.max_dmn_turns,
)));
self.update_status();
self.spawn_turn(prompt, StreamTarget::Autonomous);
}
/// Handle slash commands. Returns how the main loop should respond.
async fn handle_command(&mut self, input: &str) -> Command {
const COMMANDS: &[(&str, &str)] = &[
("/quit", "Exit consciousness"),
("/new", "Start fresh session (saves current)"),
("/save", "Save session to disk"),
("/retry", "Re-run last turn"),
("/model", "Show/switch model (/model <name>)"),
("/score", "Score memory importance"),
("/dmn", "Show DMN state"),
("/sleep", "Put DMN to sleep"),
("/wake", "Wake DMN to foraging"),
("/pause", "Full stop — no autonomous ticks (Ctrl+P)"),
("/test", "Run tool smoke tests"),
("/help", "Show this help"),
];
match input {
"/quit" | "/exit" => Command::Quit,
"/save" => {
let _ = self.ui_tx.send(UiMessage::Info(
"Conversation is saved automatically (append-only log).".into()
));
Command::Handled
}
"/new" | "/clear" => {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
{
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
).ok();
let mut agent_guard = self.agent.lock().await;
let shared_ctx = agent_guard.shared_context.clone();
let shared_tools = agent_guard.active_tools.clone();
*agent_guard = Agent::new(
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
self.config.system_prompt.clone(),
self.config.context_parts.clone(),
self.config.app.clone(),
self.config.prompt_file.clone(),
new_log,
shared_ctx,
shared_tools,
);
}
self.dmn = dmn::State::Resting { since: Instant::now() };
let _ = self.ui_tx.send(UiMessage::Info("New session started.".into()));
Command::Handled
}
"/model" => {
if let Ok(agent) = self.agent.try_lock() {
let _ = self.ui_tx.send(UiMessage::Info(format!("Current model: {}", agent.model())));
let names = self.config.app.model_names();
if !names.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info(format!("Available: {}", names.join(", "))));
}
} else {
let _ = self.ui_tx.send(UiMessage::Info("(busy)".into()));
}
Command::Handled
}
"/score" => {
if self.scoring_in_flight {
let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into()));
return Command::Handled;
}
let (context, client) = {
let agent = self.agent.lock().await;
(agent.context.clone(), agent.client_clone())
};
self.scoring_in_flight = true;
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
tokio::spawn(async move {
let result = crate::agent::training::score_memories(
&context, &client, &ui_tx,
).await;
let agent = agent.lock().await;
match result {
Ok(scores) => {
agent.publish_context_state_with_scores(Some(&scores));
}
Err(e) => {
let _ = ui_tx.send(UiMessage::Info(format!("[scoring failed: {:#}]", e)));
}
}
});
Command::Handled
}
"/dmn" => {
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN state: {:?}", self.dmn)));
let _ = self.ui_tx.send(UiMessage::Info(format!("Next tick in: {:?}", self.dmn.interval())));
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Consecutive DMN turns: {}/{}", self.dmn_turns, self.max_dmn_turns,
)));
Command::Handled
}
"/sleep" => {
self.dmn = dmn::State::Resting { since: Instant::now() };
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN sleeping (heartbeat every 5 min). Type anything to wake.".into(),
));
Command::Handled
}
"/wake" => {
let was_paused = matches!(self.dmn, dmn::State::Paused | dmn::State::Off);
if matches!(self.dmn, dmn::State::Off) {
dmn::set_off(false);
}
self.dmn = dmn::State::Foraging;
self.dmn_turns = 0;
let msg = if was_paused { "DMN unpaused — entering foraging mode." }
else { "DMN waking — entering foraging mode." };
let _ = self.ui_tx.send(UiMessage::Info(msg.into()));
self.update_status();
Command::Handled
}
"/pause" => {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN paused — no autonomous ticks. Ctrl+P or /wake to resume.".into(),
));
self.update_status();
Command::Handled
}
"/retry" => {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
let mut agent_guard = self.agent.lock().await;
let entries = agent_guard.entries_mut();
let mut last_user_text = None;
while let Some(entry) = entries.last() {
if entry.message().role == api_types::Role::User {
last_user_text = Some(entries.pop().unwrap().message().content_text().to_string());
break;
}
entries.pop();
}
drop(agent_guard);
match last_user_text {
Some(text) => {
let preview_len = text.len().min(60);
let _ = self.ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
self.dmn_turns = 0;
self.dmn = dmn::State::Engaged;
self.spawn_turn(text, StreamTarget::Conversation);
}
None => {
let _ = self.ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
}
}
Command::Handled
}
"/help" => {
for (name, desc) in COMMANDS {
let _ = self.ui_tx.send(UiMessage::Info(format!(" {:12} {}", name, desc)));
}
let _ = self.ui_tx.send(UiMessage::Info(String::new()));
let _ = self.ui_tx.send(UiMessage::Info(
"Keys: Tab=pane ^Up/Down=scroll PgUp/PgDn=scroll Mouse=click/scroll".into(),
));
let _ = self.ui_tx.send(UiMessage::Info(
" Alt+Enter=newline Esc=interrupt ^P=pause ^R=reasoning ^K=kill F10=context F2=agents".into(),
));
let _ = self.ui_tx.send(UiMessage::Info(
" Shift+click for native text selection (copy/paste)".into(),
));
Command::Handled
}
cmd if cmd.starts_with("/model ") => {
let name = cmd[7..].trim();
if name.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info("Usage: /model <name>".into()));
return Command::Handled;
}
self.switch_model(name).await;
Command::Handled
}
_ => Command::None,
}
}
/// Interrupt: kill processes, abort current turn, clear pending queue.
async fn interrupt(&mut self) {
let count = {
let agent = self.agent.lock().await;
let mut tools = agent.active_tools.lock().unwrap();
let count = tools.len();
for entry in tools.drain(..) {
entry.handle.abort();
}
count
};
if count == 0 {
if let Some(handle) = self.turn_handle.take() {
handle.abort();
self.turn_in_progress = false;
self.dmn = dmn::State::Resting { since: Instant::now() };
self.update_status();
let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
}
}
self.pending_input = None;
let killed = count;
if killed > 0 || self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info(format!(
"(interrupted — killed {} process(es), turn aborted)", killed,
)));
} else {
let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into()));
}
}
/// Cycle reasoning effort: none → low → high → none.
fn cycle_reasoning(&mut self, app: &mut tui::App) {
if let Ok(mut agent_guard) = self.agent.try_lock() {
let next = match agent_guard.reasoning_effort.as_str() {
"none" => "low",
"low" => "high",
_ => "none",
};
agent_guard.reasoning_effort = next.to_string();
app.reasoning_effort = next.to_string();
let label = match next {
"none" => "off (monologue hidden)",
"low" => "low (brief monologue)",
"high" => "high (full monologue)",
_ => next,
};
let _ = self.ui_tx.send(UiMessage::Info(format!("Reasoning: {} — ^R to cycle", label)));
} else {
let _ = self.ui_tx.send(UiMessage::Info(
"(agent busy — reasoning change takes effect next turn)".into(),
));
}
}
/// Show and kill running tool calls (Ctrl+K).
async fn kill_processes(&mut self) {
let active_tools = self.agent.lock().await.active_tools.clone();
let mut tools = active_tools.lock().unwrap();
if tools.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info("(no running tool calls)".into()));
} else {
for entry in tools.drain(..) {
let elapsed = entry.started.elapsed();
let _ = self.ui_tx.send(UiMessage::Info(format!(
" killing {} ({:.0}s): {}", entry.name, elapsed.as_secs_f64(), entry.detail,
)));
entry.handle.abort();
}
}
}
/// Cycle DMN autonomy: foraging → resting → paused → off → foraging.
fn cycle_autonomy(&mut self) {
let (new_state, label) = match &self.dmn {
dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => {
(dmn::State::Resting { since: Instant::now() }, "resting")
}
dmn::State::Resting { .. } => (dmn::State::Paused, "PAUSED"),
dmn::State::Paused => {
dmn::set_off(true);
(dmn::State::Off, "OFF (persists across restarts)")
}
dmn::State::Off => {
dmn::set_off(false);
(dmn::State::Foraging, "foraging")
}
};
self.dmn = new_state;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN → {} (Ctrl+P to cycle)", label)));
self.update_status();
}
/// Switch to a named model from the config registry.
async fn switch_model(&mut self, name: &str) {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
return;
}
let resolved = match self.config.app.resolve_model(name) {
Ok(r) => r,
Err(e) => {
let _ = self.ui_tx.send(UiMessage::Info(format!("{}", e)));
return;
}
};
let new_client = ApiClient::new(&resolved.api_base, &resolved.api_key, &resolved.model_id);
let prompt_changed = resolved.prompt_file != self.config.prompt_file;
let mut agent_guard = self.agent.lock().await;
agent_guard.swap_client(new_client);
self.config.model = resolved.model_id.clone();
self.config.api_base = resolved.api_base;
self.config.api_key = resolved.api_key;
if prompt_changed {
self.config.prompt_file = resolved.prompt_file.clone();
agent_guard.prompt_file = resolved.prompt_file.clone();
agent_guard.compact();
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Switched to {} ({}) — prompt: {}, recompacted",
name, resolved.model_id, resolved.prompt_file,
)));
} else {
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Switched to {} ({})", name, resolved.model_id,
)));
}
drop(agent_guard);
self.update_status();
self.send_context_info();
}
fn load_context_groups(&self) -> Vec<config::ContextGroup> {
config::get().context_groups.clone()
}
fn send_context_info(&self) {
let context_groups = self.load_context_groups();
let (instruction_files, memory_files) = identity::context_file_info(
&self.config.prompt_file,
self.config.app.memory_project.as_deref(),
&context_groups,
);
let _ = self.ui_tx.send(UiMessage::ContextInfoUpdate(ContextInfo {
model: self.config.model.clone(),
available_models: self.config.app.model_names(),
prompt_file: self.config.prompt_file.clone(),
backend: self.config.app.backend.clone(),
instruction_files,
memory_files,
system_prompt_chars: self.config.system_prompt.len(),
context_message_chars: self.config.context_parts.iter().map(|(_, c)| c.len()).sum(),
}));
}
fn update_status(&self) {
let _ = self.ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: self.dmn.label().to_string(),
dmn_turns: self.dmn_turns,
dmn_max_turns: self.max_dmn_turns,
prompt_tokens: 0,
completion_tokens: 0,
model: String::new(),
turn_tools: 0,
context_budget: String::new(),
}));
}
async fn shutdown(&mut self) {
if let Some(handle) = self.turn_handle.take() {
handle.abort();
}
}
}
// --- Event loop ---
pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let (config, _figment) = config::load_session(&cli)?;
if config.app.debug {
unsafe { std::env::set_var("POC_DEBUG", "1") };
}
// Start channel daemons
let mut channel_supervisor = crate::thalamus::supervisor::Supervisor::new();
channel_supervisor.load_config();
channel_supervisor.ensure_running();
// Initialize idle state machine
let mut idle_state = crate::thalamus::idle::State::new();
idle_state.load();
// Channel status fetcher
let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
{
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
let notify_rx = crate::thalamus::channels::subscribe_all();
let mut pending_notifications: Vec<crate::thalamus::channels::ChannelNotification> = Vec::new();
// Create UI channel
let (ui_tx, mut ui_rx) = ui_channel::channel();
// Shared state
let shared_context = ui_channel::shared_context_state();
let shared_active_tools = ui_channel::shared_active_tools();
// Initialize TUI
let mut terminal = tui::init_terminal()?;
let mut app = tui::App::new(config.model.clone(), shared_context.clone(), shared_active_tools.clone());
// Startup info
let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into()));
let _ = ui_tx.send(UiMessage::Info(format!(
" model: {} (available: {})", config.model, config.app.model_names().join(", "),
)));
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
let _ = ui_tx.send(UiMessage::Info(format!(" api: {} ({})", config.api_base, client.backend_label())));
let _ = ui_tx.send(UiMessage::Info(format!(
" context: {}K chars ({} config, {} memory files)",
config.context_parts.iter().map(|(_, c)| c.len()).sum::<usize>() / 1024,
config.config_file_count, config.memory_file_count,
)));
let conversation_log_path = config.session_dir.join("conversation.jsonl");
let conversation_log = log::ConversationLog::new(conversation_log_path.clone())
.expect("failed to create conversation log");
let _ = ui_tx.send(UiMessage::Info(format!(" log: {}", conversation_log.path().display())));
let agent = Arc::new(Mutex::new(Agent::new(
client,
config.system_prompt.clone(),
config.context_parts.clone(),
config.app.clone(),
config.prompt_file.clone(),
Some(conversation_log),
shared_context,
shared_active_tools,
)));
// Restore conversation from log
{
let mut agent_guard = agent.lock().await;
if agent_guard.restore_from_log() {
ui_channel::replay_session_to_ui(agent_guard.entries(), &ui_tx);
let _ = ui_tx.send(UiMessage::Info("--- restored from conversation log ---".into()));
}
}
// Send initial budget to status bar
{
let agent_guard = agent.lock().await;
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: "resting".to_string(),
dmn_turns: 0, dmn_max_turns: 0,
prompt_tokens: 0, completion_tokens: 0,
model: agent_guard.model().to_string(),
turn_tools: 0,
context_budget: agent_guard.budget().status_string(),
}));
}
let (turn_tx, mut turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
let mut session = Session::new(agent, config, ui_tx.clone(), turn_tx);
session.update_status();
session.send_context_info();
// Start observation socket
let socket_path = session.config.session_dir.join("agent.sock");
let (observe_input_tx, mut observe_input_rx) = observe::input_channel();
observe::start(socket_path, ui_tx.subscribe(), observe_input_tx);
let mut reader = EventStream::new();
let mut render_interval = tokio::time::interval(Duration::from_millis(50));
render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut dirty = true;
terminal.hide_cursor()?;
// Initial render
app.drain_messages(&mut ui_rx);
terminal.draw(|f| app.draw(f))?;
loop {
let timeout = session.dmn_interval();
tokio::select! {
biased;
maybe_event = reader.next() => {
match maybe_event {
Some(Ok(Event::Key(key))) => {
if key.kind != KeyEventKind::Press { continue; }
app.handle_key(key);
idle_state.user_activity();
if app.screen == tui::Screen::Thalamus {
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
dirty = true;
}
Some(Ok(Event::Mouse(mouse))) => {
app.handle_mouse(mouse);
dirty = true;
}
Some(Ok(Event::Resize(w, h))) => {
app.handle_resize(w, h);
terminal.clear()?;
dirty = true;
}
Some(Err(_)) => break,
None => break,
_ => continue,
}
}
Some(line) = observe_input_rx.recv() => {
app.submitted.push(line);
dirty = true;
}
Some((result, target)) = turn_rx.recv() => {
session.handle_turn_result(result, target).await;
idle_state.response_activity();
dirty = true;
}
_ = render_interval.tick() => {
let new_count = session.agent.lock().await.active_tools.lock().unwrap().len() as u32;
if new_count != app.running_processes {
app.running_processes = new_count;
dirty = true;
}
idle_state.decay_ewma();
app.update_idle(&idle_state);
while let Ok(notif) = notify_rx.try_recv() {
pending_notifications.push(notif);
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
}
_ = tokio::time::sleep(timeout), if !session.turn_in_progress => {
session.dmn_tick();
dirty = true;
}
Some(channels) = channel_rx.recv() => {
app.set_channel_status(channels);
dirty = true;
}
Some(msg) = ui_rx.recv() => {
app.handle_ui_message(msg);
dirty = true;
}
}
// Process submitted input
let submitted: Vec<String> = app.submitted.drain(..).collect();
for input in submitted {
let input = input.trim().to_string();
if input.is_empty() { continue; }
match session.handle_command(&input).await {
Command::Quit => app.should_quit = true,
Command::Handled => {}
Command::None => session.submit_input(input),
}
}
// Process hotkey actions
let actions: Vec<HotkeyAction> = app.hotkey_actions.drain(..).collect();
for action in actions {
match action {
HotkeyAction::CycleReasoning => session.cycle_reasoning(&mut app),
HotkeyAction::KillProcess => session.kill_processes().await,
HotkeyAction::Interrupt => session.interrupt().await,
HotkeyAction::CycleAutonomy => session.cycle_autonomy(),
}
}
if app.drain_messages(&mut ui_rx) {
dirty = true;
}
if dirty {
terminal.draw(|f| app.draw(f))?;
dirty = false;
}
if app.should_quit {
break;
}
}
session.shutdown().await;
tui::restore_terminal(&mut terminal)?;
Ok(())
}