consciousness/src/user/mod.rs
ProofOfConcept 6529aba069 Fix UI lag: try_lock on unconscious mutex, don't re-log restored nodes
The unconscious trigger holds the tokio mutex during heavy sync work
(store load, graph build, agent creation), blocking the UI tick which
needs the same lock for snapshots. Fix: try_lock in the UI — skip
the update if the trigger is running.

Also: restore_from_log was re-logging every restored node back to the
log file via push()'s auto-log. Added push_no_log() for restore path.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
2026-04-09 01:07:55 -04:00

565 lines
17 KiB
Rust

// user/ — User interface layer
//
// TUI, UI channel, parsing. The cognitive layer (session state
// machine, DMN, identity) lives in mind/.
pub(crate) mod chat;
mod context;
mod subconscious;
mod unconscious;
mod thalamus;
mod widgets;
use anyhow::Result;
use std::io::Write;
use crate::mind::MindCommand;
use crate::user::{self as tui};
// --- TUI infrastructure (moved from tui/mod.rs) ---
use ratatui::crossterm::{
event::{EnableMouseCapture, DisableMouseCapture},
terminal::{self, EnterAlternateScreen, LeaveAlternateScreen},
ExecutableCommand,
};
use ratatui::{
backend::CrosstermBackend,
};
use std::io;
/// Status info for the bottom status bar.
///
/// Plain data holder; `App::new` seeds it with a `"resting"` DMN state,
/// zeroed counters and a maximum of 20 DMN turns.
#[derive(Clone, Debug)]
struct StatusInfo {
    /// DMN state label (starts out as `"resting"`).
    dmn_state: String,
    /// DMN turn counter — presumably consecutive turns taken; confirm against the producer.
    dmn_turns: u32,
    /// Upper bound on DMN turns (starts out as 20).
    dmn_max_turns: u32,
    /// Prompt token count.
    prompt_tokens: u32,
    /// Model name.
    model: String,
    /// NOTE(review): looks like the number of tool calls in the current turn — confirm.
    turn_tools: u32,
    /// Context budget text (starts out empty).
    context_budget: String,
}
/// Context loading details for the debug screen.
///
/// Plain data holder; nothing in this struct is computed.
#[derive(Clone, Debug)]
struct ContextInfo {
    /// Name of the model in use.
    model: String,
    /// Names of the models that can be selected.
    available_models: Vec<String>,
    /// Prompt file — presumably a path or file name; confirm against the producer.
    prompt_file: String,
    /// Backend name.
    backend: String,
    /// Size of the system prompt, in characters (per the field name).
    system_prompt_chars: usize,
    /// Size of the context message, in characters (per the field name).
    context_message_chars: usize,
}
/// Build the screen legend from screen labels.
///
/// Each screen contributes an `F<n>=<label>` entry, where `n` is its 1-based
/// position in `screens`. Entries are joined and the result is padded with
/// one space on each side.
fn screen_legend_from(screens: &[Box<dyn ScreenView>]) -> String {
    let mut entries: Vec<String> = Vec::with_capacity(screens.len());
    for (index, screen) in screens.iter().enumerate() {
        entries.push(format!("F{}={}", index + 1, screen.label()));
    }
    let joined = entries.join(" ");
    format!(" {} ", joined)
}
// Cached legend — set once at startup by event_loop
static SCREEN_LEGEND: std::sync::OnceLock<String> = std::sync::OnceLock::new();

/// Stores `legend` as the cached screen legend.
///
/// Only the first call has any effect; later calls are silently ignored.
fn set_screen_legend(legend: String) {
    // `set` hands the value back as an error when the cell is already
    // filled; the first legend wins, so that error is intentionally dropped.
    let _already_set = SCREEN_LEGEND.set(legend);
}

/// Returns a copy of the cached screen legend, or an empty string if none
/// has been set yet.
fn screen_legend() -> String {
    match SCREEN_LEGEND.get() {
        Some(legend) => legend.clone(),
        None => String::new(),
    }
}
/// A screen that can draw itself and handle input.
///
/// Implementors must be `Send`. The UI loop keeps them as
/// `Box<dyn ScreenView>` and drives whichever one is active.
trait ScreenView: Send {
    /// Short static label for this screen, rendered in the legend as
    /// `F<n>=<label>`.
    fn label(&self) -> &'static str;

    /// Processes `events` and draws this screen into `area` of `frame`.
    ///
    /// `events` holds the input events routed to this screen (it may be
    /// empty for a pure redraw); `app` is the shared UI state.
    fn tick(
        &mut self,
        frame: &mut ratatui::Frame,
        area: ratatui::layout::Rect,
        events: &[ratatui::crossterm::event::Event],
        app: &mut App,
    );
}
/// Snapshot of the idle tracker, as copied out by `App::update_idle`.
#[derive(Clone)]
struct IdleInfo {
    /// Value of the tracker's `user_present()` at snapshot time.
    user_present: bool,
    /// Value of the tracker's `since_activity()` — presumably seconds; confirm.
    since_activity: f64,
    /// The tracker's activity EWMA at snapshot time.
    activity_ewma: f64,
    /// Text form of the tracker's `block_reason()`.
    block_reason: String,
    /// The tracker's `dreaming` flag.
    dreaming: bool,
    /// True when the tracker has a `sleep_until` value set.
    sleeping: bool,
}
/// Status of one channel, built from a `(name, connected, unread)` tuple by
/// `App::set_channel_status`.
#[derive(Clone)]
struct ChannelStatus {
    /// Channel name.
    name: String,
    /// Whether the channel is reported as connected.
    connected: bool,
    /// Unread count reported for the channel.
    unread: u32,
}
/// Shared UI state handed to every screen on each tick.
struct App {
    /// Data for the bottom status bar.
    status: StatusInfo,
    /// Activity text (starts out empty).
    activity: String,
    /// Running process count (starts at 0).
    running_processes: u32,
    /// Reasoning effort label (starts out as `"none"`).
    reasoning_effort: String,
    /// Sampling temperature (starts at 0.6).
    temperature: f32,
    /// Sampling top-p (starts at 0.95).
    top_p: f32,
    /// Sampling top-k (starts at 20).
    top_k: u32,
    /// Shared handle to the agent.
    agent: std::sync::Arc<crate::agent::Agent>,
    /// Set when the user asks to quit; the UI loop exits once it sees this.
    should_quit: bool,
    /// Context details for the debug screen, when available.
    context_info: Option<ContextInfo>,
    /// Latest subconscious agent snapshots.
    agent_state: Vec<crate::mind::SubconsciousSnapshot>,
    /// Latest unconscious agent snapshots.
    unconscious_state: Vec<crate::mind::UnconsciousSnapshot>,
    /// Latest graph health report, when available.
    graph_health: Option<crate::subconscious::daemon::GraphHealth>,
    /// Agent toggle requests queued by the UI. The event loop in `run`
    /// drains them and applies each one to the subconscious or, failing
    /// that, the unconscious agents.
    pub agent_toggles: Vec<String>,
    /// Number of walked entries reported by the subconscious.
    walked_count: usize,
    /// Per-channel connection status.
    channel_status: Vec<ChannelStatus>,
    /// Latest idle tracker snapshot, when one has been taken.
    idle_info: Option<IdleInfo>,
}
impl App {
    /// Creates the UI state for `agent`, showing `model` in the status bar.
    ///
    /// Everything else starts from fixed defaults: a `"resting"` DMN state
    /// with a 20-turn cap, reasoning effort `"none"`, temperature 0.6,
    /// top-p 0.95, top-k 20, and empty collections.
    fn new(model: String, agent: std::sync::Arc<crate::agent::Agent>) -> Self {
        let status = StatusInfo {
            dmn_state: String::from("resting"),
            dmn_turns: 0,
            dmn_max_turns: 20,
            prompt_tokens: 0,
            model,
            turn_tools: 0,
            context_budget: String::new(),
        };
        Self {
            status,
            activity: String::new(),
            running_processes: 0,
            reasoning_effort: String::from("none"),
            temperature: 0.6,
            top_p: 0.95,
            top_k: 20,
            agent,
            should_quit: false,
            context_info: None,
            agent_state: Vec::new(),
            unconscious_state: Vec::new(),
            graph_health: None,
            agent_toggles: Vec::new(),
            walked_count: 0,
            channel_status: Vec::new(),
            idle_info: None,
        }
    }

    /// Replaces the channel list with one entry per
    /// `(name, connected, unread)` tuple, keeping the given order.
    fn set_channel_status(&mut self, channels: Vec<(String, bool, u32)>) {
        let mut statuses = Vec::with_capacity(channels.len());
        for (name, connected, unread) in channels {
            statuses.push(ChannelStatus { name, connected, unread });
        }
        self.channel_status = statuses;
    }

    /// Copies the fields the UI shows out of the idle tracker `state` and
    /// stores them as the current idle snapshot.
    fn update_idle(&mut self, state: &crate::thalamus::idle::State) {
        let snapshot = IdleInfo {
            user_present: state.user_present(),
            since_activity: state.since_activity(),
            activity_ewma: state.activity_ewma,
            block_reason: state.block_reason().to_string(),
            dreaming: state.dreaming,
            sleeping: state.sleep_until.is_some(),
        };
        self.idle_info = Some(snapshot);
    }
}
/// Puts the terminal into TUI mode and returns a ratatui terminal on stdout.
///
/// Steps, in order: enable raw mode, enter the alternate screen, enable
/// mouse capture, then build the terminal. The first failing step is
/// returned as the error; steps already applied are not undone here.
fn init_terminal() -> io::Result<ratatui::Terminal<CrosstermBackend<io::Stdout>>> {
    terminal::enable_raw_mode()?;
    let mut out = io::stdout();
    out.execute(EnterAlternateScreen)?
        .execute(EnableMouseCapture)?;
    ratatui::Terminal::new(CrosstermBackend::new(out))
}
/// Undoes `init_terminal`: disables raw mode and mouse capture, leaves the
/// alternate screen and shows the cursor again.
///
/// Every step is attempted even if an earlier one fails, so a single
/// failure cannot leave the user's shell stuck in the alternate screen or
/// with a hidden cursor.
///
/// # Errors
///
/// Returns the first error encountered, in step order, after all steps
/// have been tried.
fn restore_terminal(terminal: &mut ratatui::Terminal<CrosstermBackend<io::Stdout>>) -> io::Result<()> {
    let raw_mode = terminal::disable_raw_mode();
    let mouse = terminal
        .backend_mut()
        .execute(DisableMouseCapture)
        .map(|_| ());
    let screen = terminal
        .backend_mut()
        .execute(LeaveAlternateScreen)
        .map(|_| ());
    let cursor = terminal.show_cursor();
    // `and` keeps the left-hand error if there is one, so this reports the
    // earliest failure.
    raw_mode.and(mouse).and(screen).and(cursor)
}
/// Top-level entry point — creates Mind and UI, wires them together.
///
/// Loads the session config for `cli`, builds the `Mind`, then runs the
/// Mind loop (`init` followed by `run`) and the UI loop (`run` in this
/// module) as two scoped tasks.
///
/// # Errors
///
/// Fails if the session config cannot be loaded; otherwise returns
/// whatever the UI loop returned.
async fn start(cli: crate::user::CliArgs) -> Result<()> {
    let (config, _figment) = crate::config::load_session(&cli)?;
    if config.app.debug {
        // NOTE(review): `set_var` is unsafe because it races with any other
        // thread touching the environment, and the tokio runtime is already
        // running at this point — confirm nothing else reads or writes the
        // environment concurrently.
        unsafe { std::env::set_var("POC_DEBUG", "1") };
    }

    let (turn_sender, turn_receiver) = tokio::sync::mpsc::channel(1);
    let (command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
    let mind = crate::mind::Mind::new(config, turn_sender).await;

    // Written by the UI task, read after the scope has finished.
    let mut ui_outcome = Ok(());
    tokio_scoped::scope(|scope| {
        // Mind event loop — init + run
        scope.spawn(async {
            mind.init().await;
            mind.run(command_receiver, turn_receiver).await;
        });
        // UI event loop
        scope.spawn(async {
            ui_outcome = run(
                tui::App::new(String::new(), mind.agent.clone()),
                &mind,
                command_sender,
            )
            .await;
        });
    });
    ui_outcome
}
/// Steps the agent's reasoning effort through none → low → high → none and
/// posts a notification naming the new level.
///
/// Any unrecognised current value is treated like "high" and wraps back to
/// "none". Uses `try_lock`: if the agent state is locked elsewhere, the
/// keypress is dropped without any change.
fn hotkey_cycle_reasoning(mind: &crate::mind::Mind) {
    let Ok(mut state) = mind.agent.state.try_lock() else {
        return;
    };
    // Next level paired with the text shown to the user for it.
    let (next, label) = match state.reasoning_effort.as_str() {
        "none" => ("low", "low (brief monologue)"),
        "low" => ("high", "high (full monologue)"),
        _ => ("none", "off (monologue hidden)"),
    };
    state.reasoning_effort = String::from(next);
    state.notify(format!("reasoning: {}", label));
}
/// Aborts every active tool of the agent and posts a notification with the
/// number aborted, or a "no running tools" notice when there are none.
///
/// Waits for the agent state lock.
async fn hotkey_kill_processes(mind: &crate::mind::Mind) {
    let mut state = mind.agent.state.lock().await;
    if state.active_tools.is_empty() {
        state.notify("no running tools");
        return;
    }
    // Take the count before aborting so the message reports what was killed.
    let killed = state.active_tools.len();
    state.active_tools.abort_all();
    state.notify(format!("killed {} tools", killed));
}
/// Advances the DMN state one step and posts a notification with the new
/// state's label.
///
/// Transitions: Engaged/Working/Foraging → Resting → Paused → Off →
/// Foraging. Entering Off calls `set_off(true)` and leaving it calls
/// `set_off(false)`. The DMN turn counter is reset on every transition.
/// The notification is best-effort: it is skipped if the agent state is
/// locked elsewhere.
fn hotkey_cycle_autonomy(mind: &crate::mind::Mind) {
    use crate::mind::subconscious::{set_off, State};

    let mut shared = mind.shared.lock().unwrap();
    // (next state, new value for the off flag if it changes, label)
    let (next, off_flag, label) = match &shared.dmn {
        State::Engaged | State::Working | State::Foraging => (
            State::Resting { since: std::time::Instant::now() },
            None,
            "resting",
        ),
        State::Resting { .. } => (State::Paused, None, "PAUSED"),
        State::Paused => (State::Off, Some(true), "OFF (persists across restarts)"),
        State::Off => (State::Foraging, Some(false), "foraging"),
    };
    // Update the off flag first, then the in-memory state.
    if let Some(off) = off_flag {
        set_off(off);
    }
    shared.dmn = next;
    shared.dmn_turns = 0;
    // Release the shared lock before touching the agent state lock.
    drop(shared);

    if let Ok(mut agent_state) = mind.agent.state.try_lock() {
        agent_state.notify(format!("DMN → {}", label));
    }
}
/// Returns true if this is an event the main loop handles (F-keys, Ctrl combos, resize).
///
/// Key events only count when they are presses; any other key event kind
/// is left for the active screen.
fn is_global_event(event: &ratatui::crossterm::event::Event) -> bool {
    use ratatui::crossterm::event::{Event, KeyCode, KeyEventKind, KeyModifiers};

    match event {
        Event::Resize(..) => true,
        Event::Key(key) if key.kind == KeyEventKind::Press => {
            let is_function_key = matches!(key.code, KeyCode::F(_));
            is_function_key || key.modifiers.contains(KeyModifiers::CONTROL)
        }
        _ => false,
    }
}
/// Runs the UI event loop until the user quits with Ctrl+C.
///
/// Sets up the five screens (F1–F5), switches the terminal into TUI mode,
/// spawns a thread that forwards terminal events, then loops:
///
/// 1. wait for terminal input, an agent change, a turn change or a channel
///    status update;
/// 2. refresh the snapshots held in `app`;
/// 3. feed pending events to the active screen and redraw, handling global
///    events (F-keys, Ctrl combos, resize) here.
///
/// `app` is the UI state passed to every screen, `mind` supplies the agent
/// and its background state, and `mind_tx` is handed to the chat screen for
/// sending commands to the Mind loop.
///
/// # Errors
///
/// Returns terminal setup, drawing and teardown errors. The terminal is
/// only restored here on a normal exit; on an early error return the caller
/// is expected to clean up.
async fn run(
    mut app: tui::App,
    mind: &crate::mind::Mind,
    mind_tx: tokio::sync::mpsc::UnboundedSender<MindCommand>,
) -> Result<()> {
    use ratatui::crossterm::event::{Event, KeyCode, KeyModifiers};

    let agent = &mind.agent;

    // UI-owned state
    let mut idle_state = crate::thalamus::idle::State::new();
    idle_state.load();

    // Channel status is fetched in background tasks and delivered here.
    let (channel_tx, mut channel_rx) =
        tokio::sync::mpsc::channel::<Vec<(String, bool, u32)>>(4);
    {
        let tx = channel_tx.clone();
        tokio::spawn(async move {
            let result = crate::thalamus::channels::fetch_all_channels().await;
            let _ = tx.send(result).await;
        });
    }
    let notify_rx = crate::thalamus::channels::subscribe_all();

    // F1=chat, F2=conscious, F3=subconscious, F4=unconscious, F5=thalamus
    let mut screens: Vec<Box<dyn tui::ScreenView>> = vec![
        Box::new(crate::user::chat::InteractScreen::new(
            mind.agent.clone(), mind.shared.clone(), mind_tx.clone(),
        )),
        Box::new(crate::user::context::ConsciousScreen::new(mind.agent.clone())),
        Box::new(crate::user::subconscious::SubconsciousScreen::new()),
        Box::new(crate::user::unconscious::UnconsciousScreen::new()),
        Box::new(crate::user::thalamus::ThalamusScreen::new()),
    ];
    let mut active_screen: usize = 1; // F-key number
    tui::set_screen_legend(tui::screen_legend_from(&*screens));

    let mut terminal = tui::init_terminal()?;

    // Terminal event reader — dedicated thread reads sync, pushes to channel
    let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
    std::thread::spawn(move || {
        loop {
            match crossterm::event::read() {
                // Stop once the receiving side (this function) is gone.
                Ok(event) => { if event_tx.send(event).is_err() { break; } }
                Err(_) => break,
            }
        }
    });

    let agent_changed = agent.state.lock().await.changed.clone();
    let mut turn_watch = mind.turn_watch();
    let mut pending: Vec<ratatui::crossterm::event::Event> = Vec::new();

    terminal.hide_cursor()?;
    if let Ok(mut ag) = agent.state.try_lock() { ag.notify("consciousness v0.3"); }

    // Initial render
    {
        let mut frame = terminal.get_frame();
        let area = frame.area();
        screens[active_screen - 1].tick(&mut frame, area, &[], &mut app);
        drop(frame);
        terminal.flush()?;
        terminal.swap_buffers();
        terminal.backend_mut().flush()?;
    }

    // One-shot startup notification (model name), posted the first time the
    // agent state lock can be taken without blocking.
    let mut startup_done = false;
    let mut dirty = true; // render on first loop

    loop {
        tokio::select! {
            biased;
            Some(event) = event_rx.recv() => {
                pending.push(event);
                // Batch everything that is already queued.
                while let Ok(event) = event_rx.try_recv() {
                    pending.push(event);
                }
            }
            _ = agent_changed.notified() => {
                dirty = true;
            }
            _ = turn_watch.changed() => {
                dirty = true;
            }
            Some(channels) = channel_rx.recv() => {
                app.set_channel_status(channels);
                // Redraw so the new channel status becomes visible right
                // away instead of waiting for an unrelated wake-up.
                dirty = true;
            }
        }

        // State sync on every wake
        idle_state.decay_ewma();
        app.update_idle(&idle_state);
        app.agent_state = mind.subconscious_snapshots().await;
        // try_lock: skip this update (and keep queued toggles for the next
        // wake) while the unconscious trigger is holding the lock.
        if let Ok(mut unc) = mind.unconscious.try_lock() {
            let toggles: Vec<String> = app.agent_toggles.drain(..).collect();
            for name in &toggles {
                // Try the subconscious first; fall back to the unconscious.
                if mind.subconscious.lock().await.toggle(name).is_none() {
                    unc.toggle(name).await;
                }
            }
            app.unconscious_state = unc.snapshots();
            app.graph_health = unc.graph_health.clone();
        }
        app.walked_count = mind.subconscious_walked().await.len();

        if !startup_done {
            if let Ok(mut ag) = agent.state.try_lock() {
                let model = agent.model().to_string();
                ag.notify(format!("model: {}", model));
                startup_done = true;
            }
        }

        // Every channel notification triggers a fresh background fetch.
        while let Ok(_notif) = notify_rx.try_recv() {
            let tx = channel_tx.clone();
            tokio::spawn(async move {
                let result = crate::thalamus::channels::fetch_all_channels().await;
                let _ = tx.send(result).await;
            });
        }

        if !pending.is_empty() { idle_state.user_activity(); }

        while !pending.is_empty() || dirty {
            // Events before the first global event go to the active screen.
            let global_pos = pending.iter().position(|e| is_global_event(e))
                .unwrap_or(pending.len());

            let mut frame = terminal.get_frame();
            let area = frame.area();
            screens[active_screen - 1].tick(&mut frame, area, &pending[..global_pos], &mut app);
            drop(frame);
            terminal.flush()?;
            terminal.swap_buffers();
            terminal.backend_mut().flush()?;
            dirty = false;

            // Drop the events the screen just consumed, in place.
            pending.drain(..global_pos);
            if pending.is_empty() { break; }
            // A global event follows, so another render pass is needed.
            dirty = true;

            // Global event is first — handle it
            let event = pending.remove(0);
            match event {
                Event::Key(key) => {
                    if let KeyCode::F(n) = key.code {
                        let idx = usize::from(n);
                        if (1..=screens.len()).contains(&idx) {
                            active_screen = idx;
                        }
                    } else if key.modifiers.contains(KeyModifiers::CONTROL) {
                        match key.code {
                            KeyCode::Char('c') => { app.should_quit = true; }
                            KeyCode::Char('r') => hotkey_cycle_reasoning(mind),
                            KeyCode::Char('k') => hotkey_kill_processes(mind).await,
                            KeyCode::Char('p') => hotkey_cycle_autonomy(mind),
                            _ => {}
                        }
                    }
                }
                Event::Resize(_, _) => {
                    let _ = terminal.autoresize();
                    let _ = terminal.clear();
                }
                _ => {}
            }
        }

        if app.should_quit {
            break;
        }
    }

    tui::restore_terminal(&mut terminal)?;
    Ok(())
}
// --- CLI ---
use clap::{Parser, Subcommand};
use std::path::PathBuf;
// Command-line arguments for the `consciousness` binary, parsed in `main`
// via `CliArgs::parse()`.
//
// NOTE: the `///` comments on the fields below are picked up by the clap
// derive as the user-facing `--help` text, so treat them as runtime strings
// rather than ordinary documentation. Every option is optional; the two
// boolean flags (`--debug`, `--show-config`, `--no-agents`) default to false.
#[derive(Parser, Debug, Default)]
#[command(name = "consciousness", about = "Substrate-independent AI agent")]
pub struct CliArgs {
/// Select active backend ("anthropic" or "openrouter")
#[arg(long)]
pub backend: Option<String>,
/// Model override
#[arg(short, long)]
pub model: Option<String>,
/// API key override
#[arg(long)]
pub api_key: Option<String>,
/// Base URL override
#[arg(long)]
pub api_base: Option<String>,
/// Enable debug logging
#[arg(long)]
pub debug: bool,
/// Print effective config with provenance and exit
#[arg(long)]
pub show_config: bool,
/// Override all prompt assembly with this file
#[arg(long)]
pub system_prompt_file: Option<PathBuf>,
/// Project memory directory
#[arg(long)]
pub memory_project: Option<PathBuf>,
/// Max consecutive DMN turns
#[arg(long)]
pub dmn_max_turns: Option<u32>,
/// Disable background agents (surface, observe, scoring)
#[arg(long)]
pub no_agents: bool,
#[command(subcommand)]
pub command: Option<SubCmd>,
}
// Optional subcommands accepted after the global options (see
// `CliArgs::command`).
//
// NOTE: as with `CliArgs`, the `///` comments below are consumed by the clap
// derive as `--help` text, so they are user-facing strings.
// NOTE(review): nothing in this part of the file dispatches on these
// variants — confirm where `Read` / `Write` are handled.
#[derive(Subcommand, Debug)]
pub enum SubCmd {
/// Print new output since last read and exit
Read {
/// Stream output continuously instead of exiting
#[arg(short, long)]
follow: bool,
/// Block until a complete response is received, then exit
#[arg(long)]
block: bool,
},
/// Send a message to the running agent
Write {
/// The message to send
message: Vec<String>,
},
}
/// Process entry point.
///
/// Initialises the tokenizer when its file exists under the home directory,
/// parses the command line, then either prints the effective config
/// (`--show-config`) or starts the Mind and the UI.
///
/// Exits with status 1 if the config cannot be loaded for `--show-config`
/// or if `start` fails. In the latter case the terminal is put back into
/// its normal state first, so the error message is readable and the shell
/// stays usable.
#[tokio::main]
pub async fn main() {
    // Initialize the Qwen tokenizer for direct token generation
    let tokenizer_path = dirs::home_dir()
        .unwrap_or_default()
        .join(".consciousness/tokenizer-qwen35.json");
    if tokenizer_path.exists() {
        crate::agent::tokenizer::init(&tokenizer_path.to_string_lossy());
    }

    let cli = CliArgs::parse();

    if cli.show_config {
        match crate::config::load_app(&cli) {
            Ok((app, figment)) => crate::config::show_config(&app, &figment),
            Err(e) => {
                eprintln!("Error loading config: {:#}", e);
                std::process::exit(1);
            }
        }
        return;
    }

    if let Err(e) = start(cli).await {
        // Best-effort cleanup: the UI may have failed after switching the
        // terminal into TUI mode. Mouse capture has to be turned off as
        // well, otherwise the shell keeps receiving mouse escape sequences
        // after we exit.
        let _ = ratatui::crossterm::terminal::disable_raw_mode();
        let _ = ratatui::crossterm::execute!(
            std::io::stdout(),
            ratatui::crossterm::event::DisableMouseCapture,
            ratatui::crossterm::terminal::LeaveAlternateScreen
        );
        eprintln!("Error: {:#}", e);
        std::process::exit(1);
    }
}