consciousness/src/mind/observe.rs

317 lines
12 KiB
Rust
Raw Normal View History

// observe.rs — Shared observation socket + logfile
//
// Two mechanisms:
// 1. Logfile (~/.consciousness/logs/observe.log) — append-only
// plain text of the conversation. `poc-agent read` prints new
// content since last read using a byte-offset cursor file.
// 2. Unix socket — for live streaming (`poc-agent read -f`) and
// sending input (`poc-agent write <msg>`).
//
// The logfile is the history. The socket is the live wire.
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{broadcast, Mutex};
use crate::user::ui_channel::UiMessage;
/// Render a UI message as the text that goes into the observation log.
///
/// Returns `None` for messages that produce no log output: text or reasoning
/// that is empty after trimming, empty `Info` messages, and every variant not
/// listed here.
fn format_message(msg: &UiMessage) -> Option<String> {
    let rendered = match msg {
        UiMessage::TextDelta(text, _) => {
            // Only trailing whitespace is stripped; leading whitespace is kept.
            let trimmed = text.trim_end();
            if trimmed.is_empty() {
                return None;
            }
            trimmed.to_string()
        }
        // User input starts on a fresh line and is marked with "> ".
        UiMessage::UserInput(text) => format!("\n> {}", text),
        UiMessage::ToolCall { name, args_summary } if args_summary.is_empty() => {
            format!("[{}]", name)
        }
        UiMessage::ToolCall { name, args_summary } => {
            format!("[{}: {}]", name, args_summary)
        }
        UiMessage::ToolResult { name, result } => {
            // Keep at most the first three lines of the result.
            let preview = result.lines().take(3).collect::<Vec<_>>().join("\n");
            if name.is_empty() {
                preview
            } else {
                format!("{}: {}", name, preview)
            }
        }
        UiMessage::DmnAnnotation(text) => text.clone(),
        UiMessage::Info(text) if !text.is_empty() => text.clone(),
        UiMessage::Reasoning(text) => {
            let trimmed = text.trim();
            if trimmed.is_empty() {
                return None;
            }
            format!("(thinking: {})", trimmed)
        }
        _ => return None,
    };
    Some(rendered)
}
/// Sending half of the channel that carries input lines to the agent.
pub type InputSender = tokio::sync::mpsc::UnboundedSender<String>;

/// Receiving half of the channel that carries input lines to the agent.
pub type InputReceiver = tokio::sync::mpsc::UnboundedReceiver<String>;

/// Create a fresh unbounded input channel, returned as `(sender, receiver)`.
pub fn input_channel() -> (InputSender, InputReceiver) {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<String>();
    (sender, receiver)
}
/// Directory for per-session files: `.consciousness/agent-sessions` under the
/// home directory.
///
/// If the home directory cannot be determined, an empty base path is used, so
/// the result is relative.
fn session_dir() -> PathBuf {
    let home = dirs::home_dir().unwrap_or_default();
    home.join(".consciousness/agent-sessions")
}
/// Path of the Unix socket the client commands connect to: `agent.sock`
/// inside the session directory.
fn socket_path() -> PathBuf {
    let mut path = session_dir();
    path.push("agent.sock");
    path
}
/// Path of the observation logfile: `.consciousness/logs/observe.log` under
/// the home directory.
///
/// Also tries to create the containing directory; a failure to do so is
/// ignored here and surfaces later when the file is opened.
fn log_path() -> PathBuf {
    let mut path = dirs::home_dir().unwrap_or_default();
    path.push(".consciousness/logs");
    // Best effort: callers deal with a missing directory when opening the file.
    let _ = std::fs::create_dir_all(&path);
    path.push("observe.log");
    path
}
/// Path of the file that stores the byte offset up to which the log has
/// already been printed: `read-cursor` inside the session directory.
fn cursor_path() -> PathBuf {
    let mut path = session_dir();
    path.push("read-cursor");
    path
}
// --- Client commands ---
/// Print new output since the last read, then optionally stream live output.
///
/// The function first prints whatever was appended to the logfile after the
/// byte offset stored in the cursor file, and advances the cursor past the
/// bytes it printed.
///
/// * `follow` — after catching up, connect to the socket and print lines until
///   the connection ends.
/// * `block` — like `follow`, but stop after printing the first line that
///   looks like a user-input line (`> <speaker>: …`).
/// * `debug` — print the logfile path to stderr.
///
/// # Errors
///
/// Returns an error if the logfile's metadata cannot be read, if seeking or
/// reading the logfile fails, or — when streaming — if the socket connection
/// cannot be established.
pub async fn cmd_read_inner(follow: bool, block: bool, debug: bool) -> anyhow::Result<()> {
    use std::io::{Read, Seek, SeekFrom, Write};

    let log = log_path();
    let cursor = cursor_path();
    if debug {
        eprintln!("log: {}", log.display());
    }
    let live = follow || block;

    // A missing or unparsable cursor file means "start from the beginning".
    let offset: u64 = std::fs::read_to_string(&cursor)
        .ok()
        .and_then(|s| s.trim().parse().ok())
        .unwrap_or(0);

    if let Ok(mut f) = std::fs::File::open(&log) {
        let len = f.metadata()?.len();
        let new_offset = if offset < len {
            f.seek(SeekFrom::Start(offset))?;
            let mut buf = String::new();
            let bytes_read = f.read_to_string(&mut buf)?;
            print!("{}", buf);
            let _ = std::io::stdout().flush();
            // Advance by what was actually read, not by the length sampled
            // earlier: the log may have grown in between, and those bytes
            // would otherwise be printed again by the next read.
            // (usize → u64 never truncates.)
            offset + bytes_read as u64
        } else {
            if !live {
                println!("(nothing new)");
            }
            len
        };
        // Best effort: failing to persist the cursor only causes a re-print.
        let _ = std::fs::write(&cursor, new_offset.to_string());
    } else if !live {
        println!("(no log yet — is consciousness running?)");
        return Ok(());
    }

    if !live {
        return Ok(());
    }

    // -f or --block: connect to socket for live output
    let sock = socket_path();
    let stream = UnixStream::connect(&sock).await
        .map_err(|e| anyhow::anyhow!(
            "can't connect for live streaming — is consciousness running? ({})", e
        ))?;
    // Read from the unsplit stream. Splitting and dropping the owned write
    // half would shut down our write direction, which the peer observes as
    // end-of-input on this connection.
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    loop {
        line.clear();
        match reader.read_line(&mut line).await {
            Ok(0) | Err(_) => break,
            Ok(_) => {
                print!("{}", line);
                let _ = std::io::stdout().lock().flush();
                // In blocking mode, stop when we see a new user input
                // Format: "> X: " where X is a speaker (P, K, etc.)
                if block {
                    if let Some(rest) = line.trim_start().strip_prefix("> ") {
                        if rest.contains(':') {
                            break;
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
/// Send a message to the running agent.
///
/// Connects to the agent socket, writes `message` followed by a newline, and
/// then shuts down the write side of the connection. With `debug` set, the
/// socket path is printed to stderr before connecting.
///
/// # Errors
///
/// Returns an error if the connection cannot be established or if writing or
/// shutting down fails.
pub async fn cmd_write(message: &str, debug: bool) -> anyhow::Result<()> {
    let path = socket_path();
    if debug {
        eprintln!("connecting to {}", path.display());
    }

    let connection = match UnixStream::connect(&path).await {
        Ok(conn) => conn,
        Err(e) => {
            return Err(anyhow::anyhow!(
                "can't connect — is consciousness running? ({})", e
            ));
        }
    };

    // Only the write half is used by this command.
    let (_, mut tx) = connection.into_split();
    tx.write_all(message.as_bytes()).await?;
    tx.write_all(b"\n").await?;
    tx.shutdown().await?;
    Ok(())
}
// --- Server ---
/// Start the observation socket + logfile writer.
pub fn start(
socket_path_override: PathBuf,
mut ui_rx: broadcast::Receiver<UiMessage>,
input_tx: InputSender,
) {
let _ = std::fs::remove_file(&socket_path_override);
let listener = UnixListener::bind(&socket_path_override)
.expect("failed to bind observation socket");
// Open logfile
let logfile = Arc::new(Mutex::new(
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_path())
.expect("failed to open observe log"),
));
let (line_tx, _) = broadcast::channel::<String>(256);
let line_tx2 = line_tx.clone();
// Receive UiMessages → write to logfile + broadcast to socket clients.
// TextDelta and Reasoning tokens are buffered and flushed on turn
// boundaries so the log reads as complete messages, not token fragments.
tokio::spawn(async move {
let mut text_buf = String::new();
let mut reasoning_buf = String::new();
loop {
match ui_rx.recv().await {
Ok(msg) => {
// Buffer streaming tokens
match &msg {
UiMessage::TextDelta(text, _) => {
text_buf.push_str(text);
continue;
}
UiMessage::Reasoning(text) => {
reasoning_buf.push_str(text);
continue;
}
_ => {}
}
// Flush reasoning buffer as one line
if !reasoning_buf.is_empty() {
let thinking = format!("(thinking: {})", reasoning_buf.trim());
use std::io::Write;
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", thinking);
let _ = f.flush();
let _ = line_tx2.send(thinking);
reasoning_buf.clear();
}
// Flush text buffer
if !text_buf.is_empty() {
use std::io::Write;
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", text_buf);
let _ = f.flush();
let _ = line_tx2.send(std::mem::take(&mut text_buf));
}
// Write the non-streaming message
if let Some(line) = format_message(&msg) {
use std::io::Write;
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", line);
let _ = f.flush();
let _ = line_tx2.send(line);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
use std::io::Write;
if !reasoning_buf.is_empty() {
let thinking = format!("(thinking: {})", reasoning_buf.trim());
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", thinking);
let _ = f.flush();
let _ = line_tx2.send(thinking);
}
if !text_buf.is_empty() {
let mut f = logfile.lock().await;
let _ = writeln!(f, "{}", text_buf);
let _ = f.flush();
let _ = line_tx2.send(text_buf);
}
break;
}
}
}
});
// Accept socket connections (live streaming + input)
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let mut line_rx = line_tx.subscribe();
let input_tx = input_tx.clone();
tokio::spawn(async move {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut input_buf = String::new();
loop {
tokio::select! {
biased;
result = reader.read_line(&mut input_buf) => {
match result {
Ok(0) | Err(_) => break,
Ok(_) => {
let line = input_buf.trim().to_string();
if !line.is_empty() {
let _ = input_tx.send(line);
}
input_buf.clear();
}
}
}
result = line_rx.recv() => {
match result {
Ok(line) => {
let data = format!("{}\n", line);
if writer.write_all(data.as_bytes()).await.is_err() {
break;
}
let _ = writer.flush().await;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
let _ = writer.write_all(
b"[some output was dropped]\n"
).await;
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
});
}
Err(_) => break,
}
}
});
}