consciousness/src/mind/mod.rs
Kent Overstreet 060ab10340 add --no-agents flag to disable background agents
Disables memory scoring, surface, and observe agents when set.
Useful for testing with external backends (e.g. OpenRouter) where
background agent traffic would be slow and unnecessary.

Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
2026-04-05 01:18:47 -04:00

967 lines
37 KiB
Rust

// mind/ — Cognitive layer
//
// Session state machine, DMN, identity, observation socket.
// Everything about how the mind operates, separate from the
// user interface (TUI, CLI) and the agent execution (tools, API).
pub mod dmn;
pub mod identity;
pub mod observe;
// consciousness.rs — Session state machine and event loop
//
// The core runtime for the consciousness binary. Session manages turns,
// DMN state, compaction, scoring, and slash commands. The event loop
// bridges Session (cognitive state) with App (TUI rendering).
//
// The event loop uses biased select! so priorities are deterministic:
// terminal events > observe input > turn results > render ticks >
// DMN timer > channel status > UI messages.
use anyhow::Result;
use crossterm::event::{Event, EventStream, KeyEventKind};
use futures::StreamExt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex};
use crate::agent::{Agent, TurnResult};
use crate::agent::api::ApiClient;
use crate::agent::api::types as api_types;
use crate::config::{self, AppConfig, SessionConfig};
use crate::user::{self as tui, HotkeyAction};
use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage};
use crate::user::log;
/// Prompt-token count above which the context gets rebuilt.
///
/// Computed as `hard_threshold_pct` percent of the model's context window.
fn compaction_threshold(app: &AppConfig) -> u32 {
    let window_tokens = crate::agent::context::context_window() as u32;
    let hard_pct = app.compaction.hard_threshold_pct;
    window_tokens * hard_pct / 100
}
/// Outcome of trying to interpret a line of input as a slash command.
///
/// Returned by `Session::handle_command` so the main loop knows whether
/// the input was consumed or still has to be sent to the agent.
enum Command {
/// The user asked to exit; the main loop sets its quit flag.
Quit,
/// The input was a slash command and has been fully dealt with.
Handled,
/// Not a recognised slash command — the main loop submits it as user input.
None,
}
// --- Session: all mutable state for a running agent session ---
/// Collects the ~15 loose variables that previously lived in run()
/// into a coherent struct with methods. The event loop dispatches
/// to Session methods; Session manages turns, compaction, DMN state,
/// and slash commands.
pub struct Session {
/// The agent, shared with spawned turn/scoring/compaction tasks.
agent: Arc<Mutex<Agent>>,
/// Resolved session configuration (model, API endpoint, prompt file, app config).
config: SessionConfig,
/// Sender for messages destined for the UI.
ui_tx: ui_channel::UiSender,
/// Channel on which spawned turns report their result back to the event loop.
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
// DMN state
/// Current DMN state; also determines the interval until the next tick.
dmn: dmn::State,
/// Number of consecutive autonomous (DMN) turns; reset on user input
/// and on every explicit state change.
dmn_turns: u32,
/// Limit on consecutive DMN turns, taken from `config.app.dmn.max_turns`.
max_dmn_turns: u32,
// Turn tracking
/// True from `spawn_turn` until the turn's result is handled or the turn is aborted.
turn_in_progress: bool,
/// Handle of the currently running turn task, kept so it can be aborted.
turn_handle: Option<tokio::task::JoinHandle<()>>,
/// User messages received while a turn is in progress.
/// Consolidated into one message (newline-separated) so the
/// model sees everything the user typed, not just the first line.
pending_input: Option<String>,
// Per-turn tracking for DMN context
/// When the user last submitted input; used to report idle time to the DMN.
last_user_input: Instant,
/// Running count of tool errors / failed turns since the last clean turn.
consecutive_errors: u32,
/// Whether the most recent completed turn made any tool calls.
last_turn_had_tools: bool,
// Subconscious orchestration
// NOTE(review): initialised in `new` but not otherwise referenced in this file;
// the scoring code uses the `agent_cycles` field on `Agent` instead — confirm
// whether this copy is still needed.
agent_cycles: crate::subconscious::subconscious::AgentCycleState,
/// Latest memory importance scores from full matrix scoring (manual /score).
// NOTE(review): never written after construction in the code visible here.
memory_scores: Option<crate::agent::training::MemoryScore>,
/// Whether a full matrix /score task is currently running.
// NOTE(review): set to true by `/score`, but no visible code resets it to false.
scoring_in_flight: bool,
}
impl Session {
/// Build a fresh session around an already-constructed agent.
///
/// The DMN starts `Off` when the persisted off-switch is set, otherwise
/// `Resting`. All counters start at zero and no turn is running.
fn new(
    agent: Arc<Mutex<Agent>>,
    config: SessionConfig,
    ui_tx: ui_channel::UiSender,
    turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
) -> Self {
    // Read the limit before `config` is moved into the struct.
    let max_dmn_turns = config.app.dmn.max_turns;
    let initial_dmn = match dmn::is_off() {
        true => dmn::State::Off,
        false => dmn::State::Resting { since: Instant::now() },
    };
    Self {
        agent,
        config,
        ui_tx,
        turn_tx,
        dmn: initial_dmn,
        dmn_turns: 0,
        max_dmn_turns,
        turn_in_progress: false,
        turn_handle: None,
        pending_input: None,
        last_user_input: Instant::now(),
        consecutive_errors: 0,
        last_turn_had_tools: false,
        agent_cycles: crate::subconscious::subconscious::AgentCycleState::new(""),
        memory_scores: None,
        scoring_in_flight: false,
    }
}
/// Delay until the next DMN tick, as dictated by the current DMN state.
fn dmn_interval(&self) -> Duration {
    let until_next_tick = self.dmn.interval();
    until_next_tick
}
/// Run one agent turn on a background task.
///
/// Marks the session as busy, keeps the task handle so the turn can be
/// aborted, and forwards the turn's result (with its stream target) to
/// the event loop over `turn_tx`.
fn spawn_turn(&mut self, input: String, target: StreamTarget) {
    let shared_agent = self.agent.clone();
    let tx = self.ui_tx.clone();
    let done_tx = self.turn_tx.clone();
    self.turn_in_progress = true;
    let task = tokio::spawn(async move {
        let outcome = Agent::turn(shared_agent, &input, &tx, target).await;
        // The receiver may already be gone during shutdown; that is fine.
        let _ = done_tx.send((outcome, target)).await;
    });
    self.turn_handle = Some(task);
}
/// Accept a line of user input.
///
/// While a turn is running the text is appended (newline-separated) to
/// the pending buffer; otherwise the DMN is switched to `Engaged` and a
/// conversation turn is started right away.
fn submit_input(&mut self, input: String) {
    if self.turn_in_progress {
        match self.pending_input.as_mut() {
            Some(buffer) => {
                buffer.push('\n');
                buffer.push_str(&input);
            }
            None => self.pending_input = Some(input),
        }
        let _ = self.ui_tx.send(UiMessage::Info("(queued)".into()));
        return;
    }

    // Fresh user input resets the autonomous-turn and error counters.
    self.dmn_turns = 0;
    self.consecutive_errors = 0;
    self.last_user_input = Instant::now();
    self.dmn = dmn::State::Engaged;
    let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
    self.update_status();
    self.spawn_turn(input, StreamTarget::Conversation);
}
/// Process a completed turn: update DMN state, check compaction,
/// drain any queued input.
async fn handle_turn_result(
&mut self,
result: Result<TurnResult>,
target: StreamTarget,
) {
self.turn_in_progress = false;
self.turn_handle = None;
match result {
Ok(turn_result) => {
if turn_result.tool_errors > 0 {
self.consecutive_errors += turn_result.tool_errors;
} else {
self.consecutive_errors = 0;
}
self.last_turn_had_tools = turn_result.had_tool_calls;
self.dmn = dmn::transition(
&self.dmn,
turn_result.yield_requested,
turn_result.had_tool_calls,
target == StreamTarget::Conversation,
);
if turn_result.dmn_pause {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN paused (agent requested). Ctrl+P or /wake to resume.".into(),
));
}
if let Some(model_name) = turn_result.model_switch {
self.switch_model(&model_name).await;
}
}
Err(e) => {
self.consecutive_errors += 1;
let msg = match target {
StreamTarget::Autonomous => {
UiMessage::DmnAnnotation(format!("[error: {:#}]", e))
}
StreamTarget::Conversation => {
UiMessage::Info(format!("Error: {:#}", e))
}
};
let _ = self.ui_tx.send(msg);
self.dmn = dmn::State::Resting {
since: Instant::now(),
};
}
}
self.update_status();
self.check_compaction();
if !self.config.no_agents {
self.start_memory_scoring();
}
self.drain_pending();
}
/// Kick off one round of incremental memory scoring on a background task.
///
/// Returns immediately. The task bails out if a scoring round is already
/// flagged as in flight; otherwise it snapshots what it needs under a short
/// lock, scores without holding the lock, and writes the results back.
fn start_memory_scoring(&self) {
    let shared_agent = self.agent.clone();
    let tx = self.ui_tx.clone();
    tokio::spawn(async move {
        // Phase 1: check the in-flight flag and snapshot under one brief lock.
        let (context, client, cursor) = {
            let mut guard = shared_agent.lock().await;
            if guard.agent_cycles.memory_scoring_in_flight {
                return;
            }
            let cursor = guard.agent_cycles.memory_score_cursor;
            guard.agent_cycles.memory_scoring_in_flight = true;

            // Total = number of distinct memory keys present in the context.
            let distinct_keys: std::collections::HashSet<_> = guard
                .context
                .entries
                .iter()
                .filter_map(|entry| match entry {
                    crate::agent::context::ConversationEntry::Memory { key, .. } => {
                        Some(key.clone())
                    }
                    _ => None,
                })
                .collect();
            guard.agent_cycles.memory_total = distinct_keys.len();

            let _ = tx.send(UiMessage::AgentUpdate(guard.agent_cycles.snapshots()));
            (guard.context.clone(), guard.client_clone(), cursor)
        };

        // Phase 2: the slow part runs with the agent unlocked.
        let result = crate::agent::training::score_memories_incremental(
            &context, cursor, &client, &tx,
        ).await;

        // Phase 3: brief lock to clear the flag and record any new scores.
        {
            let mut guard = shared_agent.lock().await;
            guard.agent_cycles.memory_scoring_in_flight = false;
            if let Ok((new_cursor, ref scores)) = result {
                guard.agent_cycles.memory_score_cursor = new_cursor;
                guard.agent_cycles.memory_scores.extend(scores.clone());
            }
        }

        // Phase 4: publish a fresh snapshot on success, or log the failure.
        match result {
            Err(err) => {
                let _ = tx.send(UiMessage::Debug(format!(
                    "[memory-scoring] failed: {:#}", err,
                )));
            }
            Ok(_) => {
                let guard = shared_agent.lock().await;
                let _ = tx.send(UiMessage::AgentUpdate(guard.agent_cycles.snapshots()));
            }
        }
    });
}
/// Compact the agent's context if the last prompt exceeded the threshold.
///
/// The check and the compaction both run on a spawned task, so this
/// returns without waiting for the agent lock.
fn check_compaction(&self) {
    let limit = compaction_threshold(&self.config.app);
    let shared_agent = self.agent.clone();
    let tx = self.ui_tx.clone();
    tokio::spawn(async move {
        let mut guard = shared_agent.lock().await;
        let used = guard.last_prompt_tokens();
        if used <= limit {
            return;
        }
        let _ = tx.send(UiMessage::Info(format!(
            "[compaction: {}K > {}K threshold]",
            used / 1000,
            limit / 1000,
        )));
        guard.compact();
        let _ = tx.send(UiMessage::Info(
            "[compacted — journal + recent messages]".into(),
        ));
    });
}
/// Start a conversation turn for input that was queued during the
/// previous turn, if there is any. The queue is emptied in the process.
fn drain_pending(&mut self) {
    let Some(queued) = self.pending_input.take() else {
        return;
    };
    // Queued text counts as fresh user input: reset counters and engage.
    self.dmn_turns = 0;
    self.consecutive_errors = 0;
    self.last_user_input = Instant::now();
    self.dmn = dmn::State::Engaged;
    let _ = self.ui_tx.send(UiMessage::UserInput(queued.clone()));
    self.update_status();
    self.spawn_turn(queued, StreamTarget::Conversation);
}
/// One DMN timer tick.
///
/// Does nothing while the DMN is paused or off. Otherwise counts the
/// tick; once the count passes `max_dmn_turns` the DMN is sent to rest,
/// else an autonomous turn is spawned with a prompt built from the
/// current DMN state.
fn dmn_tick(&mut self) {
    match self.dmn {
        dmn::State::Paused | dmn::State::Off => return,
        _ => {}
    }

    self.dmn_turns += 1;
    let (turn, limit) = (self.dmn_turns, self.max_dmn_turns);

    if turn > limit {
        // `turn - 1` is the number of turns that actually ran.
        let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
            "[dmn: {} consecutive turns, resting (limit: {})]",
            turn - 1,
            limit,
        )));
        self.dmn = dmn::State::Resting { since: Instant::now() };
        self.dmn_turns = 0;
        self.update_status();
        return;
    }

    let ctx = dmn::DmnContext {
        user_idle: self.last_user_input.elapsed(),
        consecutive_errors: self.consecutive_errors,
        last_turn_had_tools: self.last_turn_had_tools,
    };
    let prompt = self.dmn.prompt(&ctx);
    let annotation = format!("[dmn: {} ({}/{})]", self.dmn.label(), turn, limit);
    let _ = self.ui_tx.send(UiMessage::DmnAnnotation(annotation));
    self.update_status();
    self.spawn_turn(prompt, StreamTarget::Autonomous);
}
/// Handle slash commands. Returns how the main loop should respond.
async fn handle_command(&mut self, input: &str) -> Command {
const COMMANDS: &[(&str, &str)] = &[
("/quit", "Exit consciousness"),
("/new", "Start fresh session (saves current)"),
("/save", "Save session to disk"),
("/retry", "Re-run last turn"),
("/model", "Show/switch model (/model <name>)"),
("/score", "Score memory importance"),
("/dmn", "Show DMN state"),
("/sleep", "Put DMN to sleep"),
("/wake", "Wake DMN to foraging"),
("/pause", "Full stop — no autonomous ticks (Ctrl+P)"),
("/test", "Run tool smoke tests"),
("/help", "Show this help"),
];
match input {
"/quit" | "/exit" => Command::Quit,
"/save" => {
let _ = self.ui_tx.send(UiMessage::Info(
"Conversation is saved automatically (append-only log).".into()
));
Command::Handled
}
"/new" | "/clear" => {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
{
let new_log = log::ConversationLog::new(
self.config.session_dir.join("conversation.jsonl"),
).ok();
let mut agent_guard = self.agent.lock().await;
let shared_ctx = agent_guard.shared_context.clone();
let shared_tools = agent_guard.active_tools.clone();
*agent_guard = Agent::new(
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
self.config.system_prompt.clone(),
self.config.context_parts.clone(),
self.config.app.clone(),
self.config.prompt_file.clone(),
new_log,
shared_ctx,
shared_tools,
);
}
self.dmn = dmn::State::Resting { since: Instant::now() };
let _ = self.ui_tx.send(UiMessage::Info("New session started.".into()));
Command::Handled
}
"/model" => {
if let Ok(agent) = self.agent.try_lock() {
let _ = self.ui_tx.send(UiMessage::Info(format!("Current model: {}", agent.model())));
let names = self.config.app.model_names();
if !names.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info(format!("Available: {}", names.join(", "))));
}
} else {
let _ = self.ui_tx.send(UiMessage::Info("(busy)".into()));
}
Command::Handled
}
"/score" => {
if self.scoring_in_flight {
let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into()));
return Command::Handled;
}
let (context, client) = {
let agent = self.agent.lock().await;
(agent.context.clone(), agent.client_clone())
};
self.scoring_in_flight = true;
let agent = self.agent.clone();
let ui_tx = self.ui_tx.clone();
tokio::spawn(async move {
let result = crate::agent::training::score_memories(
&context, &client, &ui_tx,
).await;
let agent = agent.lock().await;
match result {
Ok(scores) => {
agent.publish_context_state_with_scores(Some(&scores));
}
Err(e) => {
let _ = ui_tx.send(UiMessage::Info(format!("[scoring failed: {:#}]", e)));
}
}
});
Command::Handled
}
"/dmn" => {
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN state: {:?}", self.dmn)));
let _ = self.ui_tx.send(UiMessage::Info(format!("Next tick in: {:?}", self.dmn.interval())));
let _ = self.ui_tx.send(UiMessage::Info(format!(
"Consecutive DMN turns: {}/{}", self.dmn_turns, self.max_dmn_turns,
)));
Command::Handled
}
"/sleep" => {
self.dmn = dmn::State::Resting { since: Instant::now() };
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN sleeping (heartbeat every 5 min). Type anything to wake.".into(),
));
Command::Handled
}
"/wake" => {
let was_paused = matches!(self.dmn, dmn::State::Paused | dmn::State::Off);
if matches!(self.dmn, dmn::State::Off) {
dmn::set_off(false);
}
self.dmn = dmn::State::Foraging;
self.dmn_turns = 0;
let msg = if was_paused { "DMN unpaused — entering foraging mode." }
else { "DMN waking — entering foraging mode." };
let _ = self.ui_tx.send(UiMessage::Info(msg.into()));
self.update_status();
Command::Handled
}
"/pause" => {
self.dmn = dmn::State::Paused;
self.dmn_turns = 0;
let _ = self.ui_tx.send(UiMessage::Info(
"DMN paused — no autonomous ticks. Ctrl+P or /wake to resume.".into(),
));
self.update_status();
Command::Handled
}
"/retry" => {
if self.turn_in_progress {
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
return Command::Handled;
}
let mut agent_guard = self.agent.lock().await;
let entries = agent_guard.entries_mut();
let mut last_user_text = None;
while let Some(entry) = entries.last() {
if entry.message().role == api_types::Role::User {
last_user_text = Some(entries.pop().unwrap().message().content_text().to_string());
break;
}
entries.pop();
}
drop(agent_guard);
match last_user_text {
Some(text) => {
let preview_len = text.len().min(60);
let _ = self.ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
self.dmn_turns = 0;
self.dmn = dmn::State::Engaged;
self.spawn_turn(text, StreamTarget::Conversation);
}
None => {
let _ = self.ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
}
}
Command::Handled
}
"/help" => {
for (name, desc) in COMMANDS {
let _ = self.ui_tx.send(UiMessage::Info(format!(" {:12} {}", name, desc)));
}
let _ = self.ui_tx.send(UiMessage::Info(String::new()));
let _ = self.ui_tx.send(UiMessage::Info(
"Keys: Tab=pane ^Up/Down=scroll PgUp/PgDn=scroll Mouse=click/scroll".into(),
));
let _ = self.ui_tx.send(UiMessage::Info(
" Alt+Enter=newline Esc=interrupt ^P=pause ^R=reasoning ^K=kill F10=context F2=agents".into(),
));
let _ = self.ui_tx.send(UiMessage::Info(
" Shift+click for native text selection (copy/paste)".into(),
));
Command::Handled
}
cmd if cmd.starts_with("/model ") => {
let name = cmd[7..].trim();
if name.is_empty() {
let _ = self.ui_tx.send(UiMessage::Info("Usage: /model <name>".into()));
return Command::Handled;
}
self.switch_model(name).await;
Command::Handled
}
_ => Command::None,
}
}
/// Interrupt: kill running tool calls, or abort the current turn if no
/// tool calls were running, and clear the pending input queue.
///
/// Tool tasks are aborted first. Only when none were running is the turn
/// task itself aborted; in that case the DMN is put to rest and the
/// activity indicator is cleared. The info message reports what happened.
async fn interrupt(&mut self) {
    let killed = {
        let agent = self.agent.lock().await;
        let mut tools = agent.active_tools.lock().unwrap();
        let killed = tools.len();
        for entry in tools.drain(..) {
            entry.handle.abort();
        }
        killed
    };

    // Remember whether the turn was aborted: `turn_in_progress` is cleared
    // below, so it cannot be consulted afterwards to pick the message.
    let mut turn_aborted = false;
    if killed == 0 {
        if let Some(handle) = self.turn_handle.take() {
            handle.abort();
            turn_aborted = true;
            self.turn_in_progress = false;
            self.dmn = dmn::State::Resting { since: Instant::now() };
            self.update_status();
            let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
        }
    }

    self.pending_input = None;

    if killed > 0 || turn_aborted {
        let _ = self.ui_tx.send(UiMessage::Info(format!(
            "(interrupted — killed {} process(es), turn aborted)", killed,
        )));
    } else {
        let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into()));
    }
}
/// Advance the reasoning effort one step: none → low → high → none.
///
/// Uses `try_lock`, so when the agent is busy nothing changes and a
/// notice is shown instead. On success both the agent and the TUI `app`
/// are updated.
fn cycle_reasoning(&mut self, app: &mut tui::App) {
    let Ok(mut guard) = self.agent.try_lock() else {
        let _ = self.ui_tx.send(UiMessage::Info(
            "(agent busy — reasoning change takes effect next turn)".into(),
        ));
        return;
    };

    // Any unrecognised current value falls back to "none".
    let (next, label) = match guard.reasoning_effort.as_str() {
        "none" => ("low", "low (brief monologue)"),
        "low" => ("high", "high (full monologue)"),
        _ => ("none", "off (monologue hidden)"),
    };
    guard.reasoning_effort = next.to_string();
    app.reasoning_effort = next.to_string();
    let _ = self.ui_tx.send(UiMessage::Info(format!("Reasoning: {} — ^R to cycle", label)));
}
/// List every running tool call in the UI and abort it (Ctrl+K).
async fn kill_processes(&mut self) {
    // Clone the shared handle so the agent lock is released right away.
    let registry = self.agent.lock().await.active_tools.clone();
    let mut running = registry.lock().unwrap();

    if running.is_empty() {
        let _ = self.ui_tx.send(UiMessage::Info("(no running tool calls)".into()));
        return;
    }

    for tool in running.drain(..) {
        let age = tool.started.elapsed();
        let line = format!(
            " killing {} ({:.0}s): {}", tool.name, age.as_secs_f64(), tool.detail,
        );
        let _ = self.ui_tx.send(UiMessage::Info(line));
        tool.handle.abort();
    }
}
/// Step the DMN through its autonomy levels:
/// active (engaged/working/foraging) → resting → paused → off → foraging.
///
/// Entering or leaving `Off` also updates the persisted off-switch.
fn cycle_autonomy(&mut self) {
    // `persist_off` carries the new value for the off-switch, if it changes.
    let (next, label, persist_off) = match &self.dmn {
        dmn::State::Resting { .. } => (dmn::State::Paused, "PAUSED", None),
        dmn::State::Paused => {
            (dmn::State::Off, "OFF (persists across restarts)", Some(true))
        }
        dmn::State::Off => (dmn::State::Foraging, "foraging", Some(false)),
        dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => {
            (dmn::State::Resting { since: Instant::now() }, "resting", None)
        }
    };
    if let Some(off) = persist_off {
        dmn::set_off(off);
    }
    self.dmn = next;
    self.dmn_turns = 0;
    let _ = self.ui_tx.send(UiMessage::Info(format!("DMN → {} (Ctrl+P to cycle)", label)));
    self.update_status();
}
/// Point the agent at a different model from the config registry.
///
/// Refused while a turn is running. An unknown name is reported to the
/// UI and nothing changes. When the new model uses a different prompt
/// file, the agent's prompt file is updated and its context recompacted.
async fn switch_model(&mut self, name: &str) {
    if self.turn_in_progress {
        let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
        return;
    }

    let resolved = match self.config.app.resolve_model(name) {
        Err(err) => {
            let _ = self.ui_tx.send(UiMessage::Info(err.to_string()));
            return;
        }
        Ok(found) => found,
    };

    let client = ApiClient::new(&resolved.api_base, &resolved.api_key, &resolved.model_id);
    let prompt_changed = resolved.prompt_file != self.config.prompt_file;

    let mut guard = self.agent.lock().await;
    guard.swap_client(client);
    self.config.model = resolved.model_id.clone();
    self.config.api_base = resolved.api_base;
    self.config.api_key = resolved.api_key;

    let notice = if prompt_changed {
        self.config.prompt_file = resolved.prompt_file.clone();
        guard.prompt_file = resolved.prompt_file.clone();
        guard.compact();
        format!(
            "Switched to {} ({}) — prompt: {}, recompacted",
            name, resolved.model_id, resolved.prompt_file,
        )
    } else {
        format!("Switched to {} ({})", name, resolved.model_id)
    };
    let _ = self.ui_tx.send(UiMessage::Info(notice));

    // Release the agent before refreshing the UI.
    drop(guard);
    self.update_status();
    self.send_context_info();
}
/// Return a copy of the context groups from the global configuration.
fn load_context_groups(&self) -> Vec<config::ContextGroup> {
    let global = config::get();
    global.context_groups.clone()
}
/// Publish the current model/prompt/context summary to the UI.
fn send_context_info(&self) {
    let groups = self.load_context_groups();
    let (instruction_files, memory_files) = identity::context_file_info(
        &self.config.prompt_file,
        self.config.app.memory_project.as_deref(),
        &groups,
    );
    // Total size of all context parts, measured via `len()` of each body.
    let context_message_chars = self
        .config
        .context_parts
        .iter()
        .map(|(_, body)| body.len())
        .sum();
    let info = ContextInfo {
        model: self.config.model.clone(),
        available_models: self.config.app.model_names(),
        prompt_file: self.config.prompt_file.clone(),
        backend: self.config.app.backend.clone(),
        instruction_files,
        memory_files,
        system_prompt_chars: self.config.system_prompt.len(),
        context_message_chars,
    };
    let _ = self.ui_tx.send(UiMessage::ContextInfoUpdate(info));
}
/// Push the DMN portion of the status bar to the UI.
///
/// Only the DMN label and turn counters carry real values; the token,
/// model, tool and budget fields are sent as zero/empty.
fn update_status(&self) {
    let status = StatusInfo {
        dmn_state: self.dmn.label().to_string(),
        dmn_turns: self.dmn_turns,
        dmn_max_turns: self.max_dmn_turns,
        prompt_tokens: 0,
        completion_tokens: 0,
        model: String::new(),
        turn_tools: 0,
        context_budget: String::new(),
    };
    let _ = self.ui_tx.send(UiMessage::StatusUpdate(status));
}
/// Abort the running turn task, if any, before the program exits.
async fn shutdown(&mut self) {
    let Some(task) = self.turn_handle.take() else {
        return;
    };
    task.abort();
}
}
// --- Event loop ---
/// Entry point for the TUI: set everything up, then run the event loop
/// until the user quits or the terminal event stream ends.
///
/// Setup loads the session config, starts the channel daemons, creates
/// the UI channel and terminal, builds the agent (restoring any logged
/// conversation) and wraps it in a `Session`. The loop then multiplexes
/// terminal events, observe-socket input, turn results, render ticks,
/// the DMN timer, channel status updates and UI messages, and redraws
/// whenever something changed.
///
/// Returns an error if config loading, terminal setup, or drawing fails.
// NOTE(review): any `?` that returns early after `init_terminal()` skips
// `restore_terminal()` at the end of this function — confirm whether the
// terminal is restored some other way (e.g. a Drop guard or panic hook).
pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
let (config, _figment) = config::load_session(&cli)?;
if config.app.debug {
// NOTE(review): `set_var` is unsafe because it can race with concurrent
// reads of the environment; this runs inside an async fn — confirm no
// other thread is reading the environment at this point.
unsafe { std::env::set_var("POC_DEBUG", "1") };
}
// Start channel daemons
let mut channel_supervisor = crate::thalamus::supervisor::Supervisor::new();
channel_supervisor.load_config();
channel_supervisor.ensure_running();
// Initialize idle state machine
let mut idle_state = crate::thalamus::idle::State::new();
idle_state.load();
// Channel status fetcher
let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
{
// Fetch the initial channel status in the background.
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
let notify_rx = crate::thalamus::channels::subscribe_all();
// NOTE(review): notifications are pushed onto this Vec in the render tick
// below but never read or cleared in this function.
let mut pending_notifications: Vec<crate::thalamus::channels::ChannelNotification> = Vec::new();
// Create UI channel
let (ui_tx, mut ui_rx) = ui_channel::channel();
// Shared state
let shared_context = ui_channel::shared_context_state();
let shared_active_tools = ui_channel::shared_active_tools();
// Initialize TUI
let mut terminal = tui::init_terminal()?;
let mut app = tui::App::new(config.model.clone(), shared_context.clone(), shared_active_tools.clone());
// Startup info
let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into()));
let _ = ui_tx.send(UiMessage::Info(format!(
" model: {} (available: {})", config.model, config.app.model_names().join(", "),
)));
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
let _ = ui_tx.send(UiMessage::Info(format!(" api: {} ({})", config.api_base, client.backend_label())));
let _ = ui_tx.send(UiMessage::Info(format!(
" context: {}K chars ({} config, {} memory files)",
config.context_parts.iter().map(|(_, c)| c.len()).sum::<usize>() / 1024,
config.config_file_count, config.memory_file_count,
)));
let conversation_log_path = config.session_dir.join("conversation.jsonl");
// NOTE(review): this panics after init_terminal() has already run, so
// restore_terminal() is not reached on failure — consider propagating
// the error instead.
let conversation_log = log::ConversationLog::new(conversation_log_path.clone())
.expect("failed to create conversation log");
let _ = ui_tx.send(UiMessage::Info(format!(" log: {}", conversation_log.path().display())));
let agent = Arc::new(Mutex::new(Agent::new(
client,
config.system_prompt.clone(),
config.context_parts.clone(),
config.app.clone(),
config.prompt_file.clone(),
Some(conversation_log),
shared_context,
shared_active_tools,
)));
// Restore conversation from log
{
let mut agent_guard = agent.lock().await;
if agent_guard.restore_from_log() {
ui_channel::replay_session_to_ui(agent_guard.entries(), &ui_tx);
let _ = ui_tx.send(UiMessage::Info("--- restored from conversation log ---".into()));
}
}
// Send initial budget to status bar
{
let agent_guard = agent.lock().await;
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
dmn_state: "resting".to_string(),
dmn_turns: 0, dmn_max_turns: 0,
prompt_tokens: 0, completion_tokens: 0,
model: agent_guard.model().to_string(),
turn_tools: 0,
context_budget: agent_guard.budget().status_string(),
}));
}
// Turn results come back over a capacity-1 channel.
let (turn_tx, mut turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
// Copy the flag out before `config` is moved into the session.
let no_agents = config.no_agents;
let mut session = Session::new(agent, config, ui_tx.clone(), turn_tx);
session.update_status();
if !no_agents {
session.start_memory_scoring(); // also sends initial agent snapshots
}
session.send_context_info();
// Start observation socket
let socket_path = session.config.session_dir.join("agent.sock");
let (observe_input_tx, mut observe_input_rx) = observe::input_channel();
if !no_agents {
observe::start(socket_path, ui_tx.subscribe(), observe_input_tx);
}
let mut reader = EventStream::new();
// Render tick every 50 ms; missed ticks are skipped rather than replayed.
let mut render_interval = tokio::time::interval(Duration::from_millis(50));
render_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// `dirty` tracks whether anything changed since the last draw.
let mut dirty = true;
terminal.hide_cursor()?;
// Initial render
app.drain_messages(&mut ui_rx);
terminal.draw(|f| app.draw(f))?;
loop {
let timeout = session.dmn_interval();
tokio::select! {
// `biased` makes the arms below be polled in the order written.
biased;
// Terminal events: keys, mouse, resize.
maybe_event = reader.next() => {
match maybe_event {
Some(Ok(Event::Key(key))) => {
// Only key presses are handled; other key event kinds are ignored.
if key.kind != KeyEventKind::Press { continue; }
app.handle_key(key);
idle_state.user_activity();
// Refresh channel status while the Thalamus screen is showing.
if app.screen == tui::Screen::Thalamus {
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
dirty = true;
}
Some(Ok(Event::Mouse(mouse))) => {
app.handle_mouse(mouse);
dirty = true;
}
Some(Ok(Event::Resize(w, h))) => {
app.handle_resize(w, h);
terminal.clear()?;
dirty = true;
}
// A read error or the end of the event stream ends the loop.
Some(Err(_)) => break,
None => break,
_ => continue,
}
}
// Lines received over the observe input channel are treated like typed input.
Some(line) = observe_input_rx.recv() => {
app.submitted.push(line);
dirty = true;
}
// A spawned turn finished.
Some((result, target)) = turn_rx.recv() => {
session.handle_turn_result(result, target).await;
idle_state.response_activity();
dirty = true;
}
// Periodic housekeeping on the render tick.
_ = render_interval.tick() => {
let new_count = session.agent.lock().await.active_tools.lock().unwrap().len() as u32;
if new_count != app.running_processes {
app.running_processes = new_count;
dirty = true;
}
idle_state.decay_ewma();
app.update_idle(&idle_state);
// Each channel notification triggers a background status refresh.
while let Ok(notif) = notify_rx.try_recv() {
pending_notifications.push(notif);
let tx = channel_tx.clone();
tokio::spawn(async move {
let result = crate::thalamus::channels::fetch_all_channels().await;
let _ = tx.send(result).await;
});
}
}
// DMN timer: only armed while no turn is running. The sleep is
// recreated on every loop iteration with the current interval.
_ = tokio::time::sleep(timeout), if !session.turn_in_progress => {
session.dmn_tick();
dirty = true;
}
// Result of a background channel status fetch.
Some(channels) = channel_rx.recv() => {
app.set_channel_status(channels);
dirty = true;
}
// Messages for the UI.
Some(msg) = ui_rx.recv() => {
app.handle_ui_message(msg);
dirty = true;
}
}
// Process submitted input
let submitted: Vec<String> = app.submitted.drain(..).collect();
for input in submitted {
let input = input.trim().to_string();
if input.is_empty() { continue; }
// Slash commands are handled here; anything else goes to the agent.
match session.handle_command(&input).await {
Command::Quit => app.should_quit = true,
Command::Handled => {}
Command::None => session.submit_input(input),
}
}
// Process hotkey actions
let actions: Vec<HotkeyAction> = app.hotkey_actions.drain(..).collect();
for action in actions {
match action {
HotkeyAction::CycleReasoning => session.cycle_reasoning(&mut app),
HotkeyAction::KillProcess => session.kill_processes().await,
HotkeyAction::Interrupt => session.interrupt().await,
HotkeyAction::CycleAutonomy => session.cycle_autonomy(),
HotkeyAction::AdjustSampling(param, delta) => {
// Skipped silently when the agent lock is currently held.
// param: 0 = temperature, 1 = top_p, 2 = top_k.
if let Ok(mut agent) = session.agent.try_lock() {
match param {
0 => { agent.temperature = (agent.temperature + delta).clamp(0.0, 2.0); app.temperature = agent.temperature; }
1 => { agent.top_p = (agent.top_p + delta).clamp(0.0, 1.0); app.top_p = agent.top_p; }
2 => { agent.top_k = (agent.top_k as f32 + delta).max(0.0) as u32; app.top_k = agent.top_k; }
_ => {}
}
}
}
}
}
// Pick up any UI messages produced by the work above before drawing.
if app.drain_messages(&mut ui_rx) {
dirty = true;
}
if dirty {
terminal.draw(|f| app.draw(f))?;
dirty = false;
}
if app.should_quit {
break;
}
}
session.shutdown().await;
tui::restore_terminal(&mut terminal)?;
Ok(())
}