/quit, /help, /save handled directly in the UI event loop. /model and /model <name> moved to event_loop as cmd_switch_model(). Mind no longer needs tui::App for any command handling. Mind's handle_command now only has commands that genuinely need Mind state: /new, /retry, /score (turn_in_progress, DMN, scoring). Co-Authored-By: Kent Overstreet <kent.overstreet@linux.dev>
773 lines
29 KiB
Rust
773 lines
29 KiB
Rust
// mind/ — Cognitive layer
|
|
//
|
|
// Mind state machine, DMN, identity, observation socket.
|
|
// Everything about how the mind operates, separate from the
|
|
// user interface (TUI, CLI) and the agent execution (tools, API).
|
|
|
|
pub mod dmn;
|
|
pub mod identity;
|
|
pub mod log;
|
|
|
|
// consciousness.rs — Mind state machine and event loop
|
|
//
|
|
// The core runtime for the consciousness binary. Mind manages turns,
|
|
// DMN state, compaction, scoring, and slash commands. The event loop
|
|
// bridges Mind (cognitive state) with App (TUI rendering).
|
|
//
|
|
// The event loop uses biased select! so priorities are deterministic:
|
|
// keyboard events > turn results > render ticks > DMN timer > UI messages.
|
|
|
|
use anyhow::Result;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
use tokio::sync::{mpsc, Mutex};
|
|
|
|
use crate::agent::{Agent, TurnResult};
|
|
use crate::agent::api::ApiClient;
|
|
use crate::agent::api::types as api_types;
|
|
use crate::config::{self, AppConfig, SessionConfig};
|
|
use crate::user::{self as tui};
|
|
use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage};
|
|
use crate::subconscious::learn;
|
|
|
|
/// Compaction threshold — context is rebuilt when prompt tokens exceed this.
|
|
fn compaction_threshold(app: &AppConfig) -> u32 {
|
|
(crate::agent::context::context_window() as u32) * app.compaction.hard_threshold_pct / 100
|
|
}
|
|
|
|
/// Result of slash command handling.
|
|
pub enum Command {
|
|
Handled,
|
|
None,
|
|
}
|
|
|
|
// --- Mind: all mutable state for a running agent session ---
|
|
|
|
/// Collects the ~15 loose variables that previously lived in run()
|
|
/// into a coherent struct with methods. The event loop dispatches
|
|
/// to Mind methods; Mind manages turns, compaction, DMN state,
|
|
/// and slash commands.
|
|
pub struct Mind {
|
|
agent: Arc<Mutex<Agent>>,
|
|
config: SessionConfig,
|
|
ui_tx: ui_channel::UiSender,
|
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
|
// DMN state
|
|
dmn: dmn::State,
|
|
dmn_turns: u32,
|
|
max_dmn_turns: u32,
|
|
|
|
// Turn tracking
|
|
turn_in_progress: bool,
|
|
turn_handle: Option<tokio::task::JoinHandle<()>>,
|
|
/// User messages received while a turn is in progress.
|
|
/// Consolidated into one message (newline-separated) so the
|
|
/// model sees everything the user typed, not just the first line.
|
|
pending_input: Option<String>,
|
|
|
|
// Per-turn tracking for DMN context
|
|
last_user_input: Instant,
|
|
consecutive_errors: u32,
|
|
last_turn_had_tools: bool,
|
|
|
|
// Subconscious orchestration
|
|
agent_cycles: crate::subconscious::subconscious::AgentCycleState,
|
|
/// Latest memory importance scores from full matrix scoring (manual /score).
|
|
memory_scores: Option<learn::MemoryScore>,
|
|
/// Whether a full matrix /score task is currently running.
|
|
scoring_in_flight: bool,
|
|
}
|
|
|
|
impl Mind {
|
|
fn new(
|
|
agent: Arc<Mutex<Agent>>,
|
|
config: SessionConfig,
|
|
ui_tx: ui_channel::UiSender,
|
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
|
) -> Self {
|
|
let max_dmn_turns = config.app.dmn.max_turns;
|
|
|
|
Self {
|
|
agent,
|
|
config,
|
|
ui_tx,
|
|
turn_tx,
|
|
dmn: if dmn::is_off() {
|
|
dmn::State::Off
|
|
} else {
|
|
dmn::State::Resting { since: Instant::now() }
|
|
},
|
|
dmn_turns: 0,
|
|
max_dmn_turns,
|
|
turn_in_progress: false,
|
|
turn_handle: None,
|
|
pending_input: None,
|
|
last_user_input: Instant::now(),
|
|
consecutive_errors: 0,
|
|
last_turn_had_tools: false,
|
|
agent_cycles: crate::subconscious::subconscious::AgentCycleState::new(""),
|
|
memory_scores: None,
|
|
scoring_in_flight: false,
|
|
}
|
|
}
|
|
|
|
/// How long before the next DMN tick.
|
|
fn dmn_interval(&self) -> Duration {
|
|
self.dmn.interval()
|
|
}
|
|
|
|
/// Spawn an agent turn in a background task.
|
|
fn spawn_turn(&mut self, input: String, target: StreamTarget) {
|
|
let agent = self.agent.clone();
|
|
let ui_tx = self.ui_tx.clone();
|
|
let result_tx = self.turn_tx.clone();
|
|
self.turn_in_progress = true;
|
|
self.turn_handle = Some(tokio::spawn(async move {
|
|
let result = Agent::turn(agent, &input, &ui_tx, target).await;
|
|
let _ = result_tx.send((result, target)).await;
|
|
}));
|
|
}
|
|
|
|
/// Submit user input — either queue it (if a turn is running) or
|
|
/// start a new turn immediately.
|
|
fn submit_input(&mut self, input: String) {
|
|
if self.turn_in_progress {
|
|
match &mut self.pending_input {
|
|
Some(existing) => {
|
|
existing.push('\n');
|
|
existing.push_str(&input);
|
|
}
|
|
None => self.pending_input = Some(input.clone()),
|
|
}
|
|
let _ = self.ui_tx.send(UiMessage::Info("(queued)".into()));
|
|
} else {
|
|
self.dmn_turns = 0;
|
|
self.consecutive_errors = 0;
|
|
self.last_user_input = Instant::now();
|
|
self.dmn = dmn::State::Engaged;
|
|
let _ = self.ui_tx.send(UiMessage::UserInput(input.clone()));
|
|
self.update_status();
|
|
self.spawn_turn(input, StreamTarget::Conversation);
|
|
}
|
|
}
|
|
|
|
/// Process a completed turn: update DMN state, check compaction,
|
|
/// drain any queued input.
|
|
async fn handle_turn_result(
|
|
&mut self,
|
|
result: Result<TurnResult>,
|
|
target: StreamTarget,
|
|
) {
|
|
self.turn_in_progress = false;
|
|
self.turn_handle = None;
|
|
|
|
match result {
|
|
Ok(turn_result) => {
|
|
if turn_result.tool_errors > 0 {
|
|
self.consecutive_errors += turn_result.tool_errors;
|
|
} else {
|
|
self.consecutive_errors = 0;
|
|
}
|
|
self.last_turn_had_tools = turn_result.had_tool_calls;
|
|
self.dmn = dmn::transition(
|
|
&self.dmn,
|
|
turn_result.yield_requested,
|
|
turn_result.had_tool_calls,
|
|
target == StreamTarget::Conversation,
|
|
);
|
|
if turn_result.dmn_pause {
|
|
self.dmn = dmn::State::Paused;
|
|
self.dmn_turns = 0;
|
|
let _ = self.ui_tx.send(UiMessage::Info(
|
|
"DMN paused (agent requested). Ctrl+P or /wake to resume.".into(),
|
|
));
|
|
}
|
|
if let Some(model_name) = turn_result.model_switch {
|
|
crate::user::event_loop::cmd_switch_model(
|
|
&self.agent, &model_name, &self.ui_tx,
|
|
).await;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
self.consecutive_errors += 1;
|
|
let msg = match target {
|
|
StreamTarget::Autonomous => {
|
|
UiMessage::DmnAnnotation(format!("[error: {:#}]", e))
|
|
}
|
|
StreamTarget::Conversation => {
|
|
UiMessage::Info(format!("Error: {:#}", e))
|
|
}
|
|
};
|
|
let _ = self.ui_tx.send(msg);
|
|
self.dmn = dmn::State::Resting {
|
|
since: Instant::now(),
|
|
};
|
|
}
|
|
}
|
|
|
|
self.update_status();
|
|
self.check_compaction();
|
|
if !self.config.no_agents {
|
|
self.start_memory_scoring();
|
|
}
|
|
self.drain_pending();
|
|
}
|
|
|
|
/// Spawn incremental memory scoring if not already running.
|
|
/// Non-blocking — all async work happens in the spawned task.
|
|
fn start_memory_scoring(&self) {
|
|
let agent = self.agent.clone();
|
|
let ui_tx = self.ui_tx.clone();
|
|
let cfg = crate::config::get();
|
|
let max_age = cfg.scoring_interval_secs;
|
|
let response_window = cfg.scoring_response_window;
|
|
tokio::spawn(async move {
|
|
let (context, client) = {
|
|
let mut agent = agent.lock().await;
|
|
if agent.agent_cycles.memory_scoring_in_flight {
|
|
return;
|
|
}
|
|
agent.agent_cycles.memory_scoring_in_flight = true;
|
|
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
|
|
(agent.context.clone(), agent.client_clone())
|
|
};
|
|
|
|
let result = learn::score_memories_incremental(
|
|
&context, max_age as i64, response_window, &client, &ui_tx,
|
|
).await;
|
|
|
|
{
|
|
let mut agent = agent.lock().await;
|
|
agent.agent_cycles.memory_scoring_in_flight = false;
|
|
if let Ok(ref scores) = result {
|
|
agent.agent_cycles.memory_scores = scores.clone();
|
|
}
|
|
}
|
|
match result {
|
|
Ok(_) => {
|
|
let agent = agent.lock().await;
|
|
let _ = ui_tx.send(UiMessage::AgentUpdate(agent.agent_cycles.snapshots()));
|
|
}
|
|
Err(e) => {
|
|
let _ = ui_tx.send(UiMessage::Debug(format!(
|
|
"[memory-scoring] failed: {:#}", e,
|
|
)));
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Check if compaction is needed after a turn.
|
|
fn check_compaction(&self) {
|
|
let threshold = compaction_threshold(&self.config.app);
|
|
let agent = self.agent.clone();
|
|
let ui_tx = self.ui_tx.clone();
|
|
tokio::spawn(async move {
|
|
let mut agent_guard = agent.lock().await;
|
|
let tokens = agent_guard.last_prompt_tokens();
|
|
if tokens > threshold {
|
|
let _ = ui_tx.send(UiMessage::Info(format!(
|
|
"[compaction: {}K > {}K threshold]",
|
|
tokens / 1000,
|
|
threshold / 1000,
|
|
)));
|
|
agent_guard.compact();
|
|
let _ = ui_tx.send(UiMessage::Info(
|
|
"[compacted — journal + recent messages]".into(),
|
|
));
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Send any consolidated pending input as a single turn.
|
|
fn drain_pending(&mut self) {
|
|
if let Some(queued) = self.pending_input.take() {
|
|
self.dmn_turns = 0;
|
|
self.consecutive_errors = 0;
|
|
self.last_user_input = Instant::now();
|
|
self.dmn = dmn::State::Engaged;
|
|
let _ = self.ui_tx.send(UiMessage::UserInput(queued.clone()));
|
|
self.update_status();
|
|
self.spawn_turn(queued, StreamTarget::Conversation);
|
|
}
|
|
}
|
|
|
|
/// Fire a DMN tick: check max turns, generate prompt, spawn turn.
|
|
fn dmn_tick(&mut self) {
|
|
if matches!(self.dmn, dmn::State::Paused | dmn::State::Off) {
|
|
return;
|
|
}
|
|
|
|
self.dmn_turns += 1;
|
|
if self.dmn_turns > self.max_dmn_turns {
|
|
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
|
|
"[dmn: {} consecutive turns, resting (limit: {})]",
|
|
self.dmn_turns - 1,
|
|
self.max_dmn_turns,
|
|
)));
|
|
self.dmn = dmn::State::Resting {
|
|
since: Instant::now(),
|
|
};
|
|
self.dmn_turns = 0;
|
|
self.update_status();
|
|
return;
|
|
}
|
|
|
|
let dmn_ctx = dmn::DmnContext {
|
|
user_idle: self.last_user_input.elapsed(),
|
|
consecutive_errors: self.consecutive_errors,
|
|
last_turn_had_tools: self.last_turn_had_tools,
|
|
};
|
|
let prompt = self.dmn.prompt(&dmn_ctx);
|
|
let _ = self.ui_tx.send(UiMessage::DmnAnnotation(format!(
|
|
"[dmn: {} ({}/{})]",
|
|
self.dmn.label(),
|
|
self.dmn_turns,
|
|
self.max_dmn_turns,
|
|
)));
|
|
self.update_status();
|
|
self.spawn_turn(prompt, StreamTarget::Autonomous);
|
|
}
|
|
|
|
/// Handle slash commands. Returns how the main loop should respond.
|
|
async fn handle_command(&mut self, input: &str) -> Command {
|
|
match input {
|
|
"/new" | "/clear" => {
|
|
if self.turn_in_progress {
|
|
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
|
|
return Command::Handled;
|
|
}
|
|
{
|
|
let new_log = log::ConversationLog::new(
|
|
self.config.session_dir.join("conversation.jsonl"),
|
|
).ok();
|
|
let mut agent_guard = self.agent.lock().await;
|
|
let shared_ctx = agent_guard.shared_context.clone();
|
|
let shared_tools = agent_guard.active_tools.clone();
|
|
*agent_guard = Agent::new(
|
|
ApiClient::new(&self.config.api_base, &self.config.api_key, &self.config.model),
|
|
self.config.system_prompt.clone(),
|
|
self.config.context_parts.clone(),
|
|
self.config.app.clone(),
|
|
self.config.prompt_file.clone(),
|
|
new_log,
|
|
shared_ctx,
|
|
shared_tools,
|
|
);
|
|
}
|
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
let _ = self.ui_tx.send(UiMessage::Info("New session started.".into()));
|
|
Command::Handled
|
|
}
|
|
"/score" => {
|
|
if self.scoring_in_flight {
|
|
let _ = self.ui_tx.send(UiMessage::Info("(scoring already in progress)".into()));
|
|
return Command::Handled;
|
|
}
|
|
let (context, client) = {
|
|
let agent = self.agent.lock().await;
|
|
(agent.context.clone(), agent.client_clone())
|
|
};
|
|
self.scoring_in_flight = true;
|
|
let agent = self.agent.clone();
|
|
let ui_tx = self.ui_tx.clone();
|
|
tokio::spawn(async move {
|
|
let result = learn::score_memories(
|
|
&context, &client, &ui_tx,
|
|
).await;
|
|
let agent = agent.lock().await;
|
|
match result {
|
|
Ok(scores) => {
|
|
agent.publish_context_state_with_scores(Some(&scores));
|
|
}
|
|
Err(e) => {
|
|
let _ = ui_tx.send(UiMessage::Info(format!("[scoring failed: {:#}]", e)));
|
|
}
|
|
}
|
|
});
|
|
Command::Handled
|
|
}
|
|
"/dmn" => {
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN state: {:?}", self.dmn)));
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!("Next tick in: {:?}", self.dmn.interval())));
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!(
|
|
"Consecutive DMN turns: {}/{}", self.dmn_turns, self.max_dmn_turns,
|
|
)));
|
|
Command::Handled
|
|
}
|
|
"/sleep" => {
|
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
self.dmn_turns = 0;
|
|
let _ = self.ui_tx.send(UiMessage::Info(
|
|
"DMN sleeping (heartbeat every 5 min). Type anything to wake.".into(),
|
|
));
|
|
Command::Handled
|
|
}
|
|
"/wake" => {
|
|
let was_paused = matches!(self.dmn, dmn::State::Paused | dmn::State::Off);
|
|
if matches!(self.dmn, dmn::State::Off) {
|
|
dmn::set_off(false);
|
|
}
|
|
self.dmn = dmn::State::Foraging;
|
|
self.dmn_turns = 0;
|
|
let msg = if was_paused { "DMN unpaused — entering foraging mode." }
|
|
else { "DMN waking — entering foraging mode." };
|
|
let _ = self.ui_tx.send(UiMessage::Info(msg.into()));
|
|
self.update_status();
|
|
Command::Handled
|
|
}
|
|
"/pause" => {
|
|
self.dmn = dmn::State::Paused;
|
|
self.dmn_turns = 0;
|
|
let _ = self.ui_tx.send(UiMessage::Info(
|
|
"DMN paused — no autonomous ticks. Ctrl+P or /wake to resume.".into(),
|
|
));
|
|
self.update_status();
|
|
Command::Handled
|
|
}
|
|
"/retry" => {
|
|
if self.turn_in_progress {
|
|
let _ = self.ui_tx.send(UiMessage::Info("(turn in progress, please wait)".into()));
|
|
return Command::Handled;
|
|
}
|
|
let mut agent_guard = self.agent.lock().await;
|
|
let entries = agent_guard.entries_mut();
|
|
let mut last_user_text = None;
|
|
while let Some(entry) = entries.last() {
|
|
if entry.message().role == api_types::Role::User {
|
|
last_user_text = Some(entries.pop().unwrap().message().content_text().to_string());
|
|
break;
|
|
}
|
|
entries.pop();
|
|
}
|
|
drop(agent_guard);
|
|
match last_user_text {
|
|
Some(text) => {
|
|
let preview_len = text.len().min(60);
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!("(retrying: {}...)", &text[..preview_len])));
|
|
self.dmn_turns = 0;
|
|
self.dmn = dmn::State::Engaged;
|
|
self.spawn_turn(text, StreamTarget::Conversation);
|
|
}
|
|
None => {
|
|
let _ = self.ui_tx.send(UiMessage::Info("(nothing to retry)".into()));
|
|
}
|
|
}
|
|
Command::Handled
|
|
}
|
|
_ => Command::None,
|
|
}
|
|
}
|
|
|
|
/// Interrupt: kill processes, abort current turn, clear pending queue.
|
|
async fn interrupt(&mut self) {
|
|
let count = {
|
|
let agent = self.agent.lock().await;
|
|
let mut tools = agent.active_tools.lock().unwrap();
|
|
let count = tools.len();
|
|
for entry in tools.drain(..) {
|
|
entry.handle.abort();
|
|
}
|
|
count
|
|
};
|
|
if count == 0 {
|
|
if let Some(handle) = self.turn_handle.take() {
|
|
handle.abort();
|
|
self.turn_in_progress = false;
|
|
self.dmn = dmn::State::Resting { since: Instant::now() };
|
|
self.update_status();
|
|
let _ = self.ui_tx.send(UiMessage::Activity(String::new()));
|
|
}
|
|
}
|
|
self.pending_input = None;
|
|
let killed = count;
|
|
if killed > 0 || self.turn_in_progress {
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!(
|
|
"(interrupted — killed {} process(es), turn aborted)", killed,
|
|
)));
|
|
} else {
|
|
let _ = self.ui_tx.send(UiMessage::Info("(interrupted)".into()));
|
|
}
|
|
}
|
|
|
|
/// Cycle reasoning effort: none → low → high → none.
|
|
fn cycle_reasoning(&mut self) {
|
|
if let Ok(mut agent_guard) = self.agent.try_lock() {
|
|
let next = match agent_guard.reasoning_effort.as_str() {
|
|
"none" => "low",
|
|
"low" => "high",
|
|
_ => "none",
|
|
};
|
|
agent_guard.reasoning_effort = next.to_string();
|
|
let label = match next {
|
|
"none" => "off (monologue hidden)",
|
|
"low" => "low (brief monologue)",
|
|
"high" => "high (full monologue)",
|
|
_ => next,
|
|
};
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!("Reasoning: {} — ^R to cycle", label)));
|
|
} else {
|
|
let _ = self.ui_tx.send(UiMessage::Info(
|
|
"(agent busy — reasoning change takes effect next turn)".into(),
|
|
));
|
|
}
|
|
}
|
|
|
|
/// Show and kill running tool calls (Ctrl+K).
|
|
async fn kill_processes(&mut self) {
|
|
let active_tools = self.agent.lock().await.active_tools.clone();
|
|
let mut tools = active_tools.lock().unwrap();
|
|
if tools.is_empty() {
|
|
let _ = self.ui_tx.send(UiMessage::Info("(no running tool calls)".into()));
|
|
} else {
|
|
for entry in tools.drain(..) {
|
|
let elapsed = entry.started.elapsed();
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!(
|
|
" killing {} ({:.0}s): {}", entry.name, elapsed.as_secs_f64(), entry.detail,
|
|
)));
|
|
entry.handle.abort();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Cycle DMN autonomy: foraging → resting → paused → off → foraging.
|
|
fn cycle_autonomy(&mut self) {
|
|
let (new_state, label) = match &self.dmn {
|
|
dmn::State::Engaged | dmn::State::Working | dmn::State::Foraging => {
|
|
(dmn::State::Resting { since: Instant::now() }, "resting")
|
|
}
|
|
dmn::State::Resting { .. } => (dmn::State::Paused, "PAUSED"),
|
|
dmn::State::Paused => {
|
|
dmn::set_off(true);
|
|
(dmn::State::Off, "OFF (persists across restarts)")
|
|
}
|
|
dmn::State::Off => {
|
|
dmn::set_off(false);
|
|
(dmn::State::Foraging, "foraging")
|
|
}
|
|
};
|
|
self.dmn = new_state;
|
|
self.dmn_turns = 0;
|
|
let _ = self.ui_tx.send(UiMessage::Info(format!("DMN → {} (Ctrl+P to cycle)", label)));
|
|
self.update_status();
|
|
}
|
|
|
|
/// Switch to a named model from the config registry.
|
|
fn load_context_groups(&self) -> Vec<config::ContextGroup> {
|
|
config::get().context_groups.clone()
|
|
}
|
|
|
|
fn send_context_info(&self) {
|
|
let context_groups = self.load_context_groups();
|
|
let (instruction_files, memory_files) = identity::context_file_info(
|
|
&self.config.prompt_file,
|
|
self.config.app.memory_project.as_deref(),
|
|
&context_groups,
|
|
);
|
|
let _ = self.ui_tx.send(UiMessage::ContextInfoUpdate(ContextInfo {
|
|
model: self.config.model.clone(),
|
|
available_models: self.config.app.model_names(),
|
|
prompt_file: self.config.prompt_file.clone(),
|
|
backend: self.config.app.backend.clone(),
|
|
instruction_files,
|
|
memory_files,
|
|
system_prompt_chars: self.config.system_prompt.len(),
|
|
context_message_chars: self.config.context_parts.iter().map(|(_, c)| c.len()).sum(),
|
|
}));
|
|
}
|
|
|
|
fn update_status(&self) {
|
|
let _ = self.ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
|
|
dmn_state: self.dmn.label().to_string(),
|
|
dmn_turns: self.dmn_turns,
|
|
dmn_max_turns: self.max_dmn_turns,
|
|
prompt_tokens: 0,
|
|
completion_tokens: 0,
|
|
model: String::new(),
|
|
turn_tools: 0,
|
|
context_budget: String::new(),
|
|
}));
|
|
}
|
|
|
|
pub async fn shutdown(&mut self) {
|
|
if let Some(handle) = self.turn_handle.take() {
|
|
handle.abort();
|
|
}
|
|
}
|
|
|
|
/// Mind event loop — reacts to user input, turn results, DMN ticks.
|
|
pub async fn run(
|
|
&mut self,
|
|
mut input_rx: tokio::sync::mpsc::UnboundedReceiver<crate::user::event_loop::MindMessage>,
|
|
mut turn_rx: mpsc::Receiver<(Result<TurnResult>, StreamTarget)>,
|
|
) {
|
|
use crate::user::event_loop::MindMessage;
|
|
use crate::user::HotkeyAction;
|
|
|
|
loop {
|
|
let timeout = self.dmn_interval();
|
|
|
|
tokio::select! {
|
|
biased;
|
|
|
|
Some(msg) = input_rx.recv() => {
|
|
match msg {
|
|
MindMessage::UserInput(input) => {
|
|
match self.handle_command(&input).await {
|
|
Command::Handled => {}
|
|
Command::None => self.submit_input(input),
|
|
}
|
|
}
|
|
MindMessage::Hotkey(action) => {
|
|
match action {
|
|
HotkeyAction::CycleReasoning => self.cycle_reasoning(),
|
|
HotkeyAction::KillProcess => self.kill_processes().await,
|
|
HotkeyAction::Interrupt => self.interrupt().await,
|
|
HotkeyAction::CycleAutonomy => self.cycle_autonomy(),
|
|
HotkeyAction::AdjustSampling(param, delta) => {
|
|
if let Ok(mut agent) = self.agent.try_lock() {
|
|
match param {
|
|
0 => agent.temperature = (agent.temperature + delta).clamp(0.0, 2.0),
|
|
1 => agent.top_p = (agent.top_p + delta).clamp(0.0, 1.0),
|
|
2 => agent.top_k = (agent.top_k as f32 + delta).max(0.0) as u32,
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Some((result, target)) = turn_rx.recv() => {
|
|
self.handle_turn_result(result, target).await;
|
|
}
|
|
|
|
_ = tokio::time::sleep(timeout), if !self.turn_in_progress => {
|
|
self.dmn_tick();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Startup ---
|
|
|
|
pub async fn run(cli: crate::user::CliArgs) -> Result<()> {
|
|
let (config, _figment) = config::load_session(&cli)?;
|
|
|
|
if config.app.debug {
|
|
unsafe { std::env::set_var("POC_DEBUG", "1") };
|
|
}
|
|
|
|
// Start channel daemons
|
|
let mut channel_supervisor = crate::thalamus::supervisor::Supervisor::new();
|
|
channel_supervisor.load_config();
|
|
channel_supervisor.ensure_running();
|
|
|
|
// Initialize idle state machine
|
|
let mut idle_state = crate::thalamus::idle::State::new();
|
|
idle_state.load();
|
|
|
|
// Channel status
|
|
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
|
|
{
|
|
let tx = channel_tx.clone();
|
|
tokio::spawn(async move {
|
|
let result = crate::thalamus::channels::fetch_all_channels().await;
|
|
let _ = tx.send(result).await;
|
|
});
|
|
}
|
|
let notify_rx = crate::thalamus::channels::subscribe_all();
|
|
|
|
// Create UI channel
|
|
let (ui_tx, ui_rx) = ui_channel::channel();
|
|
|
|
// Shared state
|
|
let shared_context = ui_channel::shared_context_state();
|
|
let shared_active_tools = ui_channel::shared_active_tools();
|
|
|
|
// Startup info
|
|
let _ = ui_tx.send(UiMessage::Info("consciousness v0.3 (tui)".into()));
|
|
let _ = ui_tx.send(UiMessage::Info(format!(
|
|
" model: {} (available: {})", config.model, config.app.model_names().join(", "),
|
|
)));
|
|
let client = ApiClient::new(&config.api_base, &config.api_key, &config.model);
|
|
let _ = ui_tx.send(UiMessage::Info(format!(" api: {} ({})", config.api_base, client.backend_label())));
|
|
let _ = ui_tx.send(UiMessage::Info(format!(
|
|
" context: {}K chars ({} config, {} memory files)",
|
|
config.context_parts.iter().map(|(_, c)| c.len()).sum::<usize>() / 1024,
|
|
config.config_file_count, config.memory_file_count,
|
|
)));
|
|
|
|
let conversation_log_path = config.session_dir.join("conversation.jsonl");
|
|
let conversation_log = log::ConversationLog::new(conversation_log_path.clone())
|
|
.expect("failed to create conversation log");
|
|
let _ = ui_tx.send(UiMessage::Info(format!(" log: {}", conversation_log.path().display())));
|
|
|
|
let agent = Arc::new(Mutex::new(Agent::new(
|
|
client,
|
|
config.system_prompt.clone(),
|
|
config.context_parts.clone(),
|
|
config.app.clone(),
|
|
config.prompt_file.clone(),
|
|
Some(conversation_log),
|
|
shared_context.clone(),
|
|
shared_active_tools.clone(),
|
|
)));
|
|
|
|
// Restore conversation from log
|
|
{
|
|
let mut agent_guard = agent.lock().await;
|
|
if agent_guard.restore_from_log() {
|
|
ui_channel::replay_session_to_ui(agent_guard.entries(), &ui_tx);
|
|
let _ = ui_tx.send(UiMessage::Info("--- restored from conversation log ---".into()));
|
|
}
|
|
}
|
|
|
|
// Send initial budget to status bar
|
|
{
|
|
let agent_guard = agent.lock().await;
|
|
let _ = ui_tx.send(UiMessage::StatusUpdate(StatusInfo {
|
|
dmn_state: "resting".to_string(),
|
|
dmn_turns: 0, dmn_max_turns: 0,
|
|
prompt_tokens: 0, completion_tokens: 0,
|
|
model: agent_guard.model().to_string(),
|
|
turn_tools: 0,
|
|
context_budget: agent_guard.budget().status_string(),
|
|
}));
|
|
}
|
|
|
|
let (turn_tx, turn_rx) = mpsc::channel::<(Result<TurnResult>, StreamTarget)>(1);
|
|
|
|
let no_agents = config.no_agents;
|
|
let mut mind = Mind::new(agent, config, ui_tx.clone(), turn_tx);
|
|
mind.update_status();
|
|
if !no_agents {
|
|
mind.start_memory_scoring();
|
|
}
|
|
mind.send_context_info();
|
|
|
|
// Start observation socket
|
|
let socket_path = mind.config.session_dir.join("agent.sock");
|
|
let (observe_input_tx, observe_input_rx) = log::input_channel();
|
|
if !no_agents {
|
|
log::start(socket_path, ui_tx.subscribe(), observe_input_tx);
|
|
}
|
|
|
|
// Mind ↔ UI channel
|
|
let (mind_tx, mind_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
|
|
// App for TUI
|
|
let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools);
|
|
let ui_agent = mind.agent.clone();
|
|
|
|
// Spawn Mind event loop
|
|
tokio::spawn(async move {
|
|
mind.run(mind_rx, turn_rx).await;
|
|
});
|
|
crate::user::event_loop::run(
|
|
app, ui_agent, mind_tx, ui_tx, ui_rx, observe_input_rx,
|
|
channel_tx, channel_rx, notify_rx, idle_state,
|
|
).await
|
|
}
|