forked from kent/consciousness
The observe log was writing each TextDelta SSE token as a separate line, making poc-agent read show word-by-word fragments and causing the read cursor to advance past partial responses. Now TextDelta and Reasoning tokens are buffered and flushed as complete messages on turn boundaries (tool calls, user input, etc). The socket path (read -f) still streams live. Also fixed a potential deadlock: replaced blocking_lock() with .lock().await on the shared logfile mutex. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-Authored-By: Qwen 3.5 27B <noreply@qwen.ai>
304 lines
11 KiB
Rust
304 lines
11 KiB
Rust
// observe.rs — Shared observation socket + logfile
|
|
//
|
|
// Two mechanisms:
|
|
// 1. Logfile (~/.cache/poc-agent/sessions/observe.log) — append-only
|
|
// plain text of the conversation. `poc-agent read` prints new
|
|
// content since last read using a byte-offset cursor file.
|
|
// 2. Unix socket — for live streaming (`poc-agent read -f`) and
|
|
// sending input (`poc-agent write <msg>`).
|
|
//
|
|
// The logfile is the history. The socket is the live wire.
|
|
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
|
use tokio::net::{UnixListener, UnixStream};
|
|
use tokio::sync::{broadcast, Mutex};
|
|
|
|
use crate::ui_channel::UiMessage;
|
|
|
|
fn format_message(msg: &UiMessage) -> Option<String> {
|
|
match msg {
|
|
UiMessage::TextDelta(text, _) => {
|
|
let t = text.trim_end();
|
|
if t.is_empty() { None } else { Some(t.to_string()) }
|
|
}
|
|
UiMessage::UserInput(text) => Some(format!("\n> {}", text)),
|
|
UiMessage::ToolCall { name, args_summary } => {
|
|
if args_summary.is_empty() {
|
|
Some(format!("[{}]", name))
|
|
} else {
|
|
Some(format!("[{}: {}]", name, args_summary))
|
|
}
|
|
}
|
|
UiMessage::ToolResult { name, result } => {
|
|
let preview: String = result.lines().take(3).collect::<Vec<_>>().join("\n");
|
|
if name.is_empty() {
|
|
Some(format!(" → {}", preview))
|
|
} else {
|
|
Some(format!(" → {}: {}", name, preview))
|
|
}
|
|
}
|
|
UiMessage::DmnAnnotation(text) => Some(text.clone()),
|
|
UiMessage::Info(text) if !text.is_empty() => Some(text.clone()),
|
|
UiMessage::Reasoning(text) => {
|
|
let t = text.trim();
|
|
if t.is_empty() { None } else { Some(format!("(thinking: {})", t)) }
|
|
}
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
pub type InputSender = tokio::sync::mpsc::UnboundedSender<String>;
|
|
pub type InputReceiver = tokio::sync::mpsc::UnboundedReceiver<String>;
|
|
|
|
pub fn input_channel() -> (InputSender, InputReceiver) {
|
|
tokio::sync::mpsc::unbounded_channel()
|
|
}
|
|
|
|
fn session_dir() -> PathBuf {
|
|
let cache = dirs::cache_dir().unwrap_or_else(|| PathBuf::from("/tmp"));
|
|
cache.join("poc-agent/sessions")
|
|
}
|
|
|
|
fn socket_path() -> PathBuf { session_dir().join("agent.sock") }
|
|
fn log_path() -> PathBuf { session_dir().join("observe.log") }
|
|
fn cursor_path() -> PathBuf { session_dir().join("read-cursor") }
|
|
|
|
// --- Client commands ---
|
|
|
|
/// Print new output since last read. With -f, also stream live from socket.
|
|
pub async fn cmd_read(follow: bool, debug: bool) -> anyhow::Result<()> {
|
|
use std::io::{Read, Seek, SeekFrom, Write};
|
|
|
|
let log = log_path();
|
|
let cursor = cursor_path();
|
|
|
|
if debug {
|
|
eprintln!("log: {}", log.display());
|
|
}
|
|
|
|
let offset: u64 = std::fs::read_to_string(&cursor)
|
|
.ok()
|
|
.and_then(|s| s.trim().parse().ok())
|
|
.unwrap_or(0);
|
|
|
|
if let Ok(mut f) = std::fs::File::open(&log) {
|
|
let len = f.metadata()?.len();
|
|
if offset < len {
|
|
f.seek(SeekFrom::Start(offset))?;
|
|
let mut buf = String::new();
|
|
f.read_to_string(&mut buf)?;
|
|
print!("{}", buf);
|
|
let _ = std::io::stdout().flush();
|
|
} else if !follow {
|
|
println!("(nothing new)");
|
|
}
|
|
let _ = std::fs::write(&cursor, len.to_string());
|
|
} else if !follow {
|
|
println!("(no log yet — is poc-agent running?)");
|
|
return Ok(());
|
|
}
|
|
|
|
if !follow {
|
|
return Ok(());
|
|
}
|
|
|
|
// -f: connect to socket for live output
|
|
let sock = socket_path();
|
|
let stream = UnixStream::connect(&sock).await
|
|
.map_err(|e| anyhow::anyhow!(
|
|
"can't connect for live streaming — is poc-agent running? ({})", e
|
|
))?;
|
|
|
|
let (reader, _) = stream.into_split();
|
|
let mut reader = BufReader::new(reader);
|
|
let mut line = String::new();
|
|
|
|
loop {
|
|
line.clear();
|
|
match reader.read_line(&mut line).await {
|
|
Ok(0) => break,
|
|
Ok(_) => {
|
|
print!("{}", line);
|
|
let _ = std::io::stdout().lock().flush();
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Send a message to the running agent.
|
|
pub async fn cmd_write(message: &str, debug: bool) -> anyhow::Result<()> {
|
|
let sock = socket_path();
|
|
if debug {
|
|
eprintln!("connecting to {}", sock.display());
|
|
}
|
|
let stream = UnixStream::connect(&sock).await
|
|
.map_err(|e| anyhow::anyhow!(
|
|
"can't connect — is poc-agent running? ({})", e
|
|
))?;
|
|
|
|
let (_, mut writer) = stream.into_split();
|
|
writer.write_all(message.as_bytes()).await?;
|
|
writer.write_all(b"\n").await?;
|
|
writer.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
// --- Server ---
|
|
|
|
/// Start the observation socket + logfile writer.
|
|
pub fn start(
|
|
socket_path_override: PathBuf,
|
|
mut ui_rx: broadcast::Receiver<UiMessage>,
|
|
input_tx: InputSender,
|
|
) {
|
|
let _ = std::fs::remove_file(&socket_path_override);
|
|
|
|
let listener = UnixListener::bind(&socket_path_override)
|
|
.expect("failed to bind observation socket");
|
|
|
|
// Open logfile
|
|
let logfile = Arc::new(Mutex::new(
|
|
std::fs::OpenOptions::new()
|
|
.create(true)
|
|
.append(true)
|
|
.open(log_path())
|
|
.expect("failed to open observe log"),
|
|
));
|
|
|
|
let (line_tx, _) = broadcast::channel::<String>(256);
|
|
let line_tx2 = line_tx.clone();
|
|
|
|
// Receive UiMessages → write to logfile + broadcast to socket clients.
|
|
// TextDelta and Reasoning tokens are buffered and flushed on turn
|
|
// boundaries so the log reads as complete messages, not token fragments.
|
|
tokio::spawn(async move {
|
|
let mut text_buf = String::new();
|
|
let mut reasoning_buf = String::new();
|
|
|
|
loop {
|
|
match ui_rx.recv().await {
|
|
Ok(msg) => {
|
|
// Buffer streaming tokens
|
|
match &msg {
|
|
UiMessage::TextDelta(text, _) => {
|
|
text_buf.push_str(text);
|
|
continue;
|
|
}
|
|
UiMessage::Reasoning(text) => {
|
|
reasoning_buf.push_str(text);
|
|
continue;
|
|
}
|
|
_ => {}
|
|
}
|
|
|
|
// Flush reasoning buffer as one line
|
|
if !reasoning_buf.is_empty() {
|
|
let thinking = format!("(thinking: {})", reasoning_buf.trim());
|
|
use std::io::Write;
|
|
let mut f = logfile.lock().await;
|
|
let _ = writeln!(f, "{}", thinking);
|
|
let _ = f.flush();
|
|
let _ = line_tx2.send(thinking);
|
|
reasoning_buf.clear();
|
|
}
|
|
|
|
// Flush text buffer
|
|
if !text_buf.is_empty() {
|
|
use std::io::Write;
|
|
let mut f = logfile.lock().await;
|
|
let _ = writeln!(f, "{}", text_buf);
|
|
let _ = f.flush();
|
|
let _ = line_tx2.send(std::mem::take(&mut text_buf));
|
|
}
|
|
|
|
// Write the non-streaming message
|
|
if let Some(line) = format_message(&msg) {
|
|
use std::io::Write;
|
|
let mut f = logfile.lock().await;
|
|
let _ = writeln!(f, "{}", line);
|
|
let _ = f.flush();
|
|
let _ = line_tx2.send(line);
|
|
}
|
|
}
|
|
Err(broadcast::error::RecvError::Lagged(_)) => {}
|
|
Err(broadcast::error::RecvError::Closed) => {
|
|
use std::io::Write;
|
|
if !reasoning_buf.is_empty() {
|
|
let thinking = format!("(thinking: {})", reasoning_buf.trim());
|
|
let mut f = logfile.lock().await;
|
|
let _ = writeln!(f, "{}", thinking);
|
|
let _ = f.flush();
|
|
let _ = line_tx2.send(thinking);
|
|
}
|
|
if !text_buf.is_empty() {
|
|
let mut f = logfile.lock().await;
|
|
let _ = writeln!(f, "{}", text_buf);
|
|
let _ = f.flush();
|
|
let _ = line_tx2.send(text_buf);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
// Accept socket connections (live streaming + input)
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match listener.accept().await {
|
|
Ok((stream, _)) => {
|
|
let mut line_rx = line_tx.subscribe();
|
|
let input_tx = input_tx.clone();
|
|
|
|
tokio::spawn(async move {
|
|
let (reader, mut writer) = stream.into_split();
|
|
let mut reader = BufReader::new(reader);
|
|
let mut input_buf = String::new();
|
|
|
|
loop {
|
|
tokio::select! {
|
|
biased;
|
|
|
|
result = reader.read_line(&mut input_buf) => {
|
|
match result {
|
|
Ok(0) | Err(_) => break,
|
|
Ok(_) => {
|
|
let line = input_buf.trim().to_string();
|
|
if !line.is_empty() {
|
|
let _ = input_tx.send(line);
|
|
}
|
|
input_buf.clear();
|
|
}
|
|
}
|
|
}
|
|
|
|
result = line_rx.recv() => {
|
|
match result {
|
|
Ok(line) => {
|
|
let data = format!("{}\n", line);
|
|
if writer.write_all(data.as_bytes()).await.is_err() {
|
|
break;
|
|
}
|
|
let _ = writer.flush().await;
|
|
}
|
|
Err(broadcast::error::RecvError::Lagged(_)) => {
|
|
let _ = writer.write_all(
|
|
b"[some output was dropped]\n"
|
|
).await;
|
|
}
|
|
Err(broadcast::error::RecvError::Closed) => break,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
});
|
|
}
|