// observe.rs — Shared observation socket + logfile // // Two mechanisms: // 1. Logfile (~/.consciousness/agent-sessions/observe.log) — append-only // plain text of the conversation. `poc-agent read` prints new // content since last read using a byte-offset cursor file. // 2. Unix socket — for live streaming (`poc-agent read -f`) and // sending input (`poc-agent write `). // // The logfile is the history. The socket is the live wire. use std::path::PathBuf; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{broadcast, Mutex}; use crate::agent::ui_channel::UiMessage; fn format_message(msg: &UiMessage) -> Option { match msg { UiMessage::TextDelta(text, _) => { let t = text.trim_end(); if t.is_empty() { None } else { Some(t.to_string()) } } UiMessage::UserInput(text) => Some(format!("\n> {}", text)), UiMessage::ToolCall { name, args_summary } => { if args_summary.is_empty() { Some(format!("[{}]", name)) } else { Some(format!("[{}: {}]", name, args_summary)) } } UiMessage::ToolResult { name, result } => { let preview: String = result.lines().take(3).collect::>().join("\n"); if name.is_empty() { Some(format!(" → {}", preview)) } else { Some(format!(" → {}: {}", name, preview)) } } UiMessage::DmnAnnotation(text) => Some(text.clone()), UiMessage::Info(text) if !text.is_empty() => Some(text.clone()), UiMessage::Reasoning(text) => { let t = text.trim(); if t.is_empty() { None } else { Some(format!("(thinking: {})", t)) } } _ => None, } } pub type InputSender = tokio::sync::mpsc::UnboundedSender; pub type InputReceiver = tokio::sync::mpsc::UnboundedReceiver; pub fn input_channel() -> (InputSender, InputReceiver) { tokio::sync::mpsc::unbounded_channel() } fn session_dir() -> PathBuf { dirs::home_dir().unwrap_or_default().join(".consciousness/agent-sessions") } fn socket_path() -> PathBuf { session_dir().join("agent.sock") } fn log_path() -> PathBuf { let dir = dirs::home_dir().unwrap_or_default().join(".consciousness/logs"); let _ = std::fs::create_dir_all(&dir); dir.join("observe.log") } fn cursor_path() -> PathBuf { session_dir().join("read-cursor") } // --- Client commands --- /// Print new output since last read. With -f, stream live. With block, wait for one response. pub async fn cmd_read_inner(follow: bool, block: bool, debug: bool) -> anyhow::Result<()> { use std::io::{Read, Seek, SeekFrom, Write}; let log = log_path(); let cursor = cursor_path(); if debug { eprintln!("log: {}", log.display()); } let offset: u64 = std::fs::read_to_string(&cursor) .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0); if let Ok(mut f) = std::fs::File::open(&log) { let len = f.metadata()?.len(); if offset < len { f.seek(SeekFrom::Start(offset))?; let mut buf = String::new(); f.read_to_string(&mut buf)?; print!("{}", buf); let _ = std::io::stdout().flush(); } else if !follow && !block { println!("(nothing new)"); } let _ = std::fs::write(&cursor, len.to_string()); } else if !follow && !block { println!("(no log yet — is poc-agent running?)"); return Ok(()); } if !follow && !block { return Ok(()); } // -f or --block: connect to socket for live output let sock = socket_path(); let stream = UnixStream::connect(&sock).await .map_err(|e| anyhow::anyhow!( "can't connect for live streaming — is poc-agent running? ({})", e ))?; let (reader, _) = stream.into_split(); let mut reader = BufReader::new(reader); let mut line = String::new(); loop { line.clear(); match reader.read_line(&mut line).await { Ok(0) => break, Ok(_) => { print!("{}", line); let _ = std::io::stdout().lock().flush(); // In blocking mode, stop when we see a new user input // Format: "> X: " where X is a speaker (P, K, etc.) if block && line.trim_start().starts_with("> ") { let after_gt = line.trim_start().strip_prefix("> ").unwrap_or(""); if after_gt.contains(':') { break; } } } Err(_) => break, } } Ok(()) } /// Send a message to the running agent. pub async fn cmd_write(message: &str, debug: bool) -> anyhow::Result<()> { let sock = socket_path(); if debug { eprintln!("connecting to {}", sock.display()); } let stream = UnixStream::connect(&sock).await .map_err(|e| anyhow::anyhow!( "can't connect — is poc-agent running? ({})", e ))?; let (_, mut writer) = stream.into_split(); writer.write_all(message.as_bytes()).await?; writer.write_all(b"\n").await?; writer.shutdown().await?; Ok(()) } // --- Server --- /// Start the observation socket + logfile writer. pub fn start( socket_path_override: PathBuf, mut ui_rx: broadcast::Receiver, input_tx: InputSender, ) { let _ = std::fs::remove_file(&socket_path_override); let listener = UnixListener::bind(&socket_path_override) .expect("failed to bind observation socket"); // Open logfile let logfile = Arc::new(Mutex::new( std::fs::OpenOptions::new() .create(true) .append(true) .open(log_path()) .expect("failed to open observe log"), )); let (line_tx, _) = broadcast::channel::(256); let line_tx2 = line_tx.clone(); // Receive UiMessages → write to logfile + broadcast to socket clients. // TextDelta and Reasoning tokens are buffered and flushed on turn // boundaries so the log reads as complete messages, not token fragments. tokio::spawn(async move { let mut text_buf = String::new(); let mut reasoning_buf = String::new(); loop { match ui_rx.recv().await { Ok(msg) => { // Buffer streaming tokens match &msg { UiMessage::TextDelta(text, _) => { text_buf.push_str(text); continue; } UiMessage::Reasoning(text) => { reasoning_buf.push_str(text); continue; } _ => {} } // Flush reasoning buffer as one line if !reasoning_buf.is_empty() { let thinking = format!("(thinking: {})", reasoning_buf.trim()); use std::io::Write; let mut f = logfile.lock().await; let _ = writeln!(f, "{}", thinking); let _ = f.flush(); let _ = line_tx2.send(thinking); reasoning_buf.clear(); } // Flush text buffer if !text_buf.is_empty() { use std::io::Write; let mut f = logfile.lock().await; let _ = writeln!(f, "{}", text_buf); let _ = f.flush(); let _ = line_tx2.send(std::mem::take(&mut text_buf)); } // Write the non-streaming message if let Some(line) = format_message(&msg) { use std::io::Write; let mut f = logfile.lock().await; let _ = writeln!(f, "{}", line); let _ = f.flush(); let _ = line_tx2.send(line); } } Err(broadcast::error::RecvError::Lagged(_)) => {} Err(broadcast::error::RecvError::Closed) => { use std::io::Write; if !reasoning_buf.is_empty() { let thinking = format!("(thinking: {})", reasoning_buf.trim()); let mut f = logfile.lock().await; let _ = writeln!(f, "{}", thinking); let _ = f.flush(); let _ = line_tx2.send(thinking); } if !text_buf.is_empty() { let mut f = logfile.lock().await; let _ = writeln!(f, "{}", text_buf); let _ = f.flush(); let _ = line_tx2.send(text_buf); } break; } } } }); // Accept socket connections (live streaming + input) tokio::spawn(async move { loop { match listener.accept().await { Ok((stream, _)) => { let mut line_rx = line_tx.subscribe(); let input_tx = input_tx.clone(); tokio::spawn(async move { let (reader, mut writer) = stream.into_split(); let mut reader = BufReader::new(reader); let mut input_buf = String::new(); loop { tokio::select! { biased; result = reader.read_line(&mut input_buf) => { match result { Ok(0) | Err(_) => break, Ok(_) => { let line = input_buf.trim().to_string(); if !line.is_empty() { let _ = input_tx.send(line); } input_buf.clear(); } } } result = line_rx.recv() => { match result { Ok(line) => { let data = format!("{}\n", line); if writer.write_all(data.as_bytes()).await.is_err() { break; } let _ = writer.flush().await; } Err(broadcast::error::RecvError::Lagged(_)) => { let _ = writer.write_all( b"[some output was dropped]\n" ).await; } Err(broadcast::error::RecvError::Closed) => break, } } } } }); } Err(_) => break, } } }); }