// observe.rs — Shared observation socket + logfile // // Two mechanisms: // 1. Logfile (~/.consciousness/agent-sessions/observe.log) — append-only // plain text of the conversation. `poc-agent read` prints new // content since last read using a byte-offset cursor file. // 2. Unix socket — for live streaming (`poc-agent read -f`) and // sending input (`poc-agent write `). // // The logfile is the history. The socket is the live wire. use anyhow::{Context, Result}; use std::fs::{File, OpenOptions}; use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use crate::agent::context::ConversationEntry; pub struct ConversationLog { path: PathBuf, } impl ConversationLog { pub fn new(path: PathBuf) -> Result { // Ensure parent directory exists if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .with_context(|| format!("creating log dir {}", parent.display()))?; } Ok(Self { path }) } /// Append a conversation entry to the log. pub fn append(&self, entry: &ConversationEntry) -> Result<()> { let mut file = OpenOptions::new() .create(true) .append(true) .open(&self.path) .with_context(|| format!("opening log {}", self.path.display()))?; let line = serde_json::to_string(entry) .context("serializing entry for log")?; writeln!(file, "{}", line) .context("writing to conversation log")?; file.sync_all() .context("syncing conversation log")?; Ok(()) } /// Read the tail of the log (last `max_bytes` bytes). /// Seeks to `file_len - max_bytes`, skips the first partial line, /// then parses forward. For logs smaller than `max_bytes`, reads everything. pub fn read_tail(&self, max_bytes: u64) -> Result> { if !self.path.exists() { return Ok(Vec::new()); } let file = File::open(&self.path) .with_context(|| format!("opening log {}", self.path.display()))?; let file_len = file.metadata()?.len(); let mut reader = BufReader::new(file); if file_len > max_bytes { reader.seek(SeekFrom::Start(file_len - max_bytes))?; // Skip partial first line let mut discard = String::new(); reader.read_line(&mut discard)?; } let mut entries = Vec::new(); for line in reader.lines() { let line = line.context("reading log tail")?; let line = line.trim(); if line.is_empty() { continue; } // Try ConversationEntry first (new format), fall back to bare Message (old logs) if let Ok(entry) = serde_json::from_str::(line) { entries.push(entry); } } Ok(entries) } pub fn path(&self) -> &Path { &self.path } /// Get the timestamp of the oldest message in the log. pub fn oldest_timestamp(&self) -> Option> { let file = File::open(&self.path).ok()?; let reader = BufReader::new(file); for line in reader.lines().flatten() { let line = line.trim().to_string(); if line.is_empty() { continue; } if let Ok(entry) = serde_json::from_str::(&line) { if let Some(ts) = &entry.message().timestamp { if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) { return Some(dt.to_utc()); } // Try other formats if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") { return Some(dt.and_utc()); } } } } None } }