From 120ffabfaa566c43232d6f336e249930c125488f Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 5 Apr 2026 05:34:59 -0400 Subject: [PATCH] Kill socket, read/write subcommands Redundant with channels Signed-off-by: Kent Overstreet --- src/mind/log.rs | 304 ------------------------------------------------ src/mind/mod.rs | 8 -- src/user/mod.rs | 23 ---- 3 files changed, 335 deletions(-) diff --git a/src/mind/log.rs b/src/mind/log.rs index de6639a..1de9593 100644 --- a/src/mind/log.rs +++ b/src/mind/log.rs @@ -13,12 +13,6 @@ use anyhow::{Context, Result}; use std::fs::{File, OpenOptions}; use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader}; -use tokio::net::{UnixListener, UnixStream}; -use tokio::sync::{broadcast, Mutex}; - -use crate::user::ui_channel::UiMessage; use crate::agent::context::ConversationEntry; pub struct ConversationLog { @@ -110,301 +104,3 @@ impl ConversationLog { None } } - -fn format_message(msg: &UiMessage) -> Option { - match msg { - UiMessage::TextDelta(text, _) => { - let t = text.trim_end(); - if t.is_empty() { None } else { Some(t.to_string()) } - } - UiMessage::UserInput(text) => Some(format!("\n> {}", text)), - UiMessage::ToolCall { name, args_summary } => { - if args_summary.is_empty() { - Some(format!("[{}]", name)) - } else { - Some(format!("[{}: {}]", name, args_summary)) - } - } - UiMessage::ToolResult { name, result } => { - let preview: String = result.lines().take(3).collect::>().join("\n"); - if name.is_empty() { - Some(format!(" → {}", preview)) - } else { - Some(format!(" → {}: {}", name, preview)) - } - } - UiMessage::DmnAnnotation(text) => Some(text.clone()), - UiMessage::Info(text) if !text.is_empty() => Some(text.clone()), - UiMessage::Reasoning(text) => { - let t = text.trim(); - if t.is_empty() { None } else { Some(format!("(thinking: {})", t)) } - } - _ => None, - } -} - -pub type InputSender = tokio::sync::mpsc::UnboundedSender; -pub type InputReceiver = tokio::sync::mpsc::UnboundedReceiver; - -pub fn input_channel() -> (InputSender, InputReceiver) { - tokio::sync::mpsc::unbounded_channel() -} - -fn session_dir() -> PathBuf { - dirs::home_dir().unwrap_or_default().join(".consciousness/agent-sessions") -} - -fn socket_path() -> PathBuf { session_dir().join("agent.sock") } -fn log_path() -> PathBuf { - let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); - let _ = std::fs::create_dir_all(&dir); - dir.join("observe.log") -} -fn cursor_path() -> PathBuf { session_dir().join("read-cursor") } - -// --- Client commands --- - -/// Print new output since last read. With -f, stream live. With block, wait for one response. -pub async fn cmd_read_inner(follow: bool, block: bool, debug: bool) -> anyhow::Result<()> { - use std::io::{Read, Seek, SeekFrom, Write}; - - let log = log_path(); - let cursor = cursor_path(); - - if debug { - eprintln!("log: {}", log.display()); - } - - let offset: u64 = std::fs::read_to_string(&cursor) - .ok() - .and_then(|s| s.trim().parse().ok()) - .unwrap_or(0); - - if let Ok(mut f) = std::fs::File::open(&log) { - let len = f.metadata()?.len(); - if offset < len { - f.seek(SeekFrom::Start(offset))?; - let mut buf = String::new(); - f.read_to_string(&mut buf)?; - print!("{}", buf); - let _ = std::io::stdout().flush(); - } else if !follow && !block { - println!("(nothing new)"); - } - let _ = std::fs::write(&cursor, len.to_string()); - } else if !follow && !block { - println!("(no log yet — is consciousness running?)"); - return Ok(()); - } - - if !follow && !block { - return Ok(()); - } - - // -f or --block: connect to socket for live output - let sock = socket_path(); - let stream = UnixStream::connect(&sock).await - .map_err(|e| anyhow::anyhow!( - "can't connect for live streaming — is consciousness running? ({})", e - ))?; - - let (reader, _) = stream.into_split(); - let mut reader = TokioBufReader::new(reader); - let mut line = String::new(); - - loop { - line.clear(); - match reader.read_line(&mut line).await { - Ok(0) => break, - Ok(_) => { - print!("{}", line); - let _ = std::io::stdout().lock().flush(); - - // In blocking mode, stop when we see a new user input - // Format: "> X: " where X is a speaker (P, K, etc.) - if block && line.trim_start().starts_with("> ") { - let after_gt = line.trim_start().strip_prefix("> ").unwrap_or(""); - if after_gt.contains(':') { - break; - } - } - } - Err(_) => break, - } - } - Ok(()) -} - -/// Send a message to the running agent. -pub async fn cmd_write(message: &str, debug: bool) -> anyhow::Result<()> { - let sock = socket_path(); - if debug { - eprintln!("connecting to {}", sock.display()); - } - let stream = UnixStream::connect(&sock).await - .map_err(|e| anyhow::anyhow!( - "can't connect — is consciousness running? ({})", e - ))?; - - let (_, mut writer) = stream.into_split(); - writer.write_all(message.as_bytes()).await?; - writer.write_all(b"\n").await?; - writer.shutdown().await?; - Ok(()) -} - -// --- Server --- - -/// Start the observation socket + logfile writer. -pub fn start( - socket_path_override: PathBuf, - mut ui_rx: broadcast::Receiver, - input_tx: InputSender, -) { - let _ = std::fs::remove_file(&socket_path_override); - - let listener = UnixListener::bind(&socket_path_override) - .expect("failed to bind observation socket"); - - // Open logfile - let logfile = Arc::new(Mutex::new( - std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(log_path()) - .expect("failed to open observe log"), - )); - - let (line_tx, _) = broadcast::channel::(256); - let line_tx2 = line_tx.clone(); - - // Receive UiMessages → write to logfile + broadcast to socket clients. - // TextDelta and Reasoning tokens are buffered and flushed on turn - // boundaries so the log reads as complete messages, not token fragments. - tokio::spawn(async move { - let mut text_buf = String::new(); - let mut reasoning_buf = String::new(); - - loop { - match ui_rx.recv().await { - Ok(msg) => { - // Buffer streaming tokens - match &msg { - UiMessage::TextDelta(text, _) => { - text_buf.push_str(text); - continue; - } - UiMessage::Reasoning(text) => { - reasoning_buf.push_str(text); - continue; - } - _ => {} - } - - // Flush reasoning buffer as one line - if !reasoning_buf.is_empty() { - let thinking = format!("(thinking: {})", reasoning_buf.trim()); - use std::io::Write; - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", thinking); - let _ = f.flush(); - let _ = line_tx2.send(thinking); - reasoning_buf.clear(); - } - - // Flush text buffer - if !text_buf.is_empty() { - use std::io::Write; - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", text_buf); - let _ = f.flush(); - let _ = line_tx2.send(std::mem::take(&mut text_buf)); - } - - // Write the non-streaming message - if let Some(line) = format_message(&msg) { - use std::io::Write; - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", line); - let _ = f.flush(); - let _ = line_tx2.send(line); - } - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => { - use std::io::Write; - if !reasoning_buf.is_empty() { - let thinking = format!("(thinking: {})", reasoning_buf.trim()); - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", thinking); - let _ = f.flush(); - let _ = line_tx2.send(thinking); - } - if !text_buf.is_empty() { - let mut f = logfile.lock().await; - let _ = writeln!(f, "{}", text_buf); - let _ = f.flush(); - let _ = line_tx2.send(text_buf); - } - break; - } - } - } - }); - - // Accept socket connections (live streaming + input) - tokio::spawn(async move { - loop { - match listener.accept().await { - Ok((stream, _)) => { - let mut line_rx = line_tx.subscribe(); - let input_tx = input_tx.clone(); - - tokio::spawn(async move { - let (reader, mut writer) = stream.into_split(); - let mut reader = TokioBufReader::new(reader); - let mut input_buf = String::new(); - - loop { - tokio::select! { - biased; - - result = reader.read_line(&mut input_buf) => { - match result { - Ok(0) | Err(_) => break, - Ok(_) => { - let line = input_buf.trim().to_string(); - if !line.is_empty() { - let _ = input_tx.send(line); - } - input_buf.clear(); - } - } - } - - result = line_rx.recv() => { - match result { - Ok(line) => { - let data = format!("{}\n", line); - if writer.write_all(data.as_bytes()).await.is_err() { - break; - } - let _ = writer.flush().await; - } - Err(broadcast::error::RecvError::Lagged(_)) => { - let _ = writer.write_all( - b"[some output was dropped]\n" - ).await; - } - Err(broadcast::error::RecvError::Closed) => break, - } - } - } - } - }); - } - Err(_) => break, - } - } - }); -} diff --git a/src/mind/mod.rs b/src/mind/mod.rs index 8e11856..751cc05 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -244,14 +244,6 @@ impl Mind { let mut ag = self.agent.lock().await; ag.restore_from_log(); drop(ag); - - // Start observation socket - if !self.config.no_agents { - let socket_path = self.config.session_dir.join("agent.sock"); - let (observe_input_tx, _observe_input_rx) = log::input_channel(); - log::start(socket_path, self.ui_tx.subscribe(), observe_input_tx); - self.start_memory_scoring(); - } } pub fn turn_watch(&self) -> tokio::sync::watch::Receiver { diff --git a/src/user/mod.rs b/src/user/mod.rs index 46b0a16..d41de6c 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -723,29 +723,6 @@ pub enum SubCmd { pub async fn main() { let cli = CliArgs::parse(); - match &cli.command { - Some(SubCmd::Read { follow, block }) => { - if let Err(e) = crate::mind::log::cmd_read_inner(*follow, *block, cli.debug).await { - eprintln!("{:#}", e); - std::process::exit(1); - } - return; - } - Some(SubCmd::Write { message }) => { - let msg = message.join(" "); - if msg.is_empty() { - eprintln!("Usage: consciousness write "); - std::process::exit(1); - } - if let Err(e) = crate::mind::log::cmd_write(&msg, cli.debug).await { - eprintln!("{:#}", e); - std::process::exit(1); - } - return; - } - None => {} - } - if cli.show_config { match crate::config::load_app(&cli) { Ok((app, figment)) => crate::config::show_config(&app, &figment),