diff --git a/src/mind/mod.rs b/src/mind/mod.rs index cb70928..06de469 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -192,13 +192,23 @@ impl Mind { Self { agent, shared, config, ui_tx, turn_tx, turn_handle: None, turn_watch } } - /// Initialize — restore conversation from log, start background agents. + /// Initialize — restore log, start daemons and background agents. pub async fn init(&mut self) { + // Restore conversation let mut ag = self.agent.lock().await; ag.restore_from_log(); drop(ag); + // Start channel daemons + let mut sup = crate::thalamus::supervisor::Supervisor::new(); + sup.load_config(); + sup.ensure_running(); + + // Start observation socket if !self.config.no_agents { + let socket_path = self.config.session_dir.join("agent.sock"); + let (observe_input_tx, _observe_input_rx) = log::input_channel(); + log::start(socket_path, self.ui_tx.subscribe(), observe_input_tx); self.start_memory_scoring(); } } @@ -365,26 +375,6 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { unsafe { std::env::set_var("POC_DEBUG", "1") }; } - // Start channel daemons - let mut channel_supervisor = crate::thalamus::supervisor::Supervisor::new(); - channel_supervisor.load_config(); - channel_supervisor.ensure_running(); - - // Initialize idle state machine - let mut idle_state = crate::thalamus::idle::State::new(); - idle_state.load(); - - // Channel status - let (channel_tx, channel_rx) = tokio::sync::mpsc::channel::>(4); - { - let tx = channel_tx.clone(); - tokio::spawn(async move { - let result = crate::thalamus::channels::fetch_all_channels().await; - let _ = tx.send(result).await; - }); - } - let notify_rx = crate::thalamus::channels::subscribe_all(); - // Create UI channel let (ui_tx, ui_rx) = ui_channel::channel(); @@ -394,9 +384,9 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { let client = ApiClient::new(&config.api_base, &config.api_key, &config.model); - let conversation_log_path = config.session_dir.join("conversation.jsonl"); - let conversation_log = log::ConversationLog::new(conversation_log_path.clone()) - .expect("failed to create conversation log"); + let conversation_log = log::ConversationLog::new( + config.session_dir.join("conversation.jsonl"), + ).ok(); let agent = Arc::new(Mutex::new(Agent::new( client, @@ -404,30 +394,18 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { config.context_parts.clone(), config.app.clone(), config.prompt_file.clone(), - Some(conversation_log), + conversation_log, shared_context.clone(), shared_active_tools.clone(), ))); let (turn_tx, turn_rx) = mpsc::channel::<(Result, StreamTarget)>(1); + let (mind_tx, mind_rx) = tokio::sync::mpsc::unbounded_channel(); - let no_agents = config.no_agents; let shared_mind = shared_mind_state(config.app.dmn.max_turns); let mut mind = Mind::new(agent, shared_mind.clone(), config, ui_tx.clone(), turn_tx); mind.init().await; - // Start observation socket - let socket_path = mind.config.session_dir.join("agent.sock"); - let (observe_input_tx, observe_input_rx) = log::input_channel(); - if !no_agents { - log::start(socket_path, ui_tx.subscribe(), observe_input_tx); - } - - // Mind ↔ UI channel - let (mind_tx, mind_rx) = tokio::sync::mpsc::unbounded_channel(); - - // App for TUI - let app = tui::App::new(mind.config.model.clone(), shared_context, shared_active_tools); let ui_agent = mind.agent.clone(); let turn_watch = mind.turn_watch(); @@ -435,8 +413,10 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { tokio::spawn(async move { mind.run(mind_rx, turn_rx).await; }); + + // Run UI event loop crate::user::event_loop::run( - app, ui_agent, shared_mind, turn_watch, mind_tx, ui_tx, ui_rx, observe_input_rx, - channel_tx, channel_rx, notify_rx, idle_state, + tui::App::new(String::new(), shared_context, shared_active_tools), + ui_agent, shared_mind, turn_watch, mind_tx, ui_tx, ui_rx, ).await } diff --git a/src/user/event_loop.rs b/src/user/event_loop.rs index 6b592d8..aaaa56e 100644 --- a/src/user/event_loop.rs +++ b/src/user/event_loop.rs @@ -185,12 +185,20 @@ pub async fn run( mind_tx: tokio::sync::mpsc::UnboundedSender, ui_tx: ui_channel::UiSender, mut ui_rx: ui_channel::UiReceiver, - mut observe_input_rx: tokio::sync::mpsc::UnboundedReceiver, - channel_tx: tokio::sync::mpsc::Sender>, - mut channel_rx: tokio::sync::mpsc::Receiver>, - notify_rx: std::sync::mpsc::Receiver, - mut idle_state: crate::thalamus::idle::State, ) -> Result<()> { + // UI-owned state + let mut idle_state = crate::thalamus::idle::State::new(); + idle_state.load(); + + let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::>(4); + { + let tx = channel_tx.clone(); + tokio::spawn(async move { + let result = crate::thalamus::channels::fetch_all_channels().await; + let _ = tx.send(result).await; + }); + } + let notify_rx = crate::thalamus::channels::subscribe_all(); let mut terminal = tui::init_terminal()?; let mut reader = EventStream::new(); @@ -251,10 +259,6 @@ pub async fn run( } } - Some(line) = observe_input_rx.recv() => { - app.submitted.push(line); - dirty = true; - } _ = render_interval.tick() => { idle_state.decay_ewma();