forked from kent/consciousness
Replace flat-file journal parser with direct store query for EpisodicSession nodes. Filter journal entries to only those older than the oldest conversation message (plus one overlap entry to avoid gaps). Falls back to 20 recent entries when no conversation exists yet. Fixes: poc-agent context window showing 0 journal entries. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
150 lines
5.3 KiB
Rust
150 lines
5.3 KiB
Rust
// log.rs — Persistent conversation log
//
// Append-only JSONL file that records every message in the conversation.
// This is the permanent record — never truncated, never compacted.
// The in-memory message array is a view into this log; compaction
// builds that view by mixing raw recent messages with journal
// summaries of older ones.
//
// Each line is one JSON-serialized Message, including its timestamp.
// The log survives session restarts, compactions, and crashes; readers
// skip any line that does not parse (e.g. one cut short by a crash).

use anyhow::{Context, Result};
|
|
use std::fs::{File, OpenOptions};
|
|
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
|
|
use std::path::{Path, PathBuf};
|
|
|
|
use crate::agent::types::Message;
|
|
|
|
/// Handle to an append-only JSONL conversation log on disk.
///
/// Holds only the file path; each operation opens the file afresh, so the
/// handle itself keeps no file descriptor open.
#[derive(Debug, Clone)]
pub struct ConversationLog {
    /// Location of the JSONL log file.
    path: PathBuf,
}
|
|
|
|
impl ConversationLog {
|
|
pub fn new(path: PathBuf) -> Result<Self> {
|
|
// Ensure parent directory exists
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent)
|
|
.with_context(|| format!("creating log dir {}", parent.display()))?;
|
|
}
|
|
Ok(Self { path })
|
|
}
|
|
|
|
/// Append a single message to the log.
|
|
pub fn append(&self, msg: &Message) -> Result<()> {
|
|
let mut file = OpenOptions::new()
|
|
.create(true)
|
|
.append(true)
|
|
.open(&self.path)
|
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
|
|
|
let line = serde_json::to_string(msg)
|
|
.context("serializing message for log")?;
|
|
writeln!(file, "{}", line)
|
|
.context("writing to conversation log")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Read the tail of the log (last `max_bytes` bytes).
|
|
/// Seeks to `file_len - max_bytes`, skips the first partial line,
|
|
/// then parses forward. For logs smaller than `max_bytes`, reads everything.
|
|
pub fn read_tail(&self, max_bytes: u64) -> Result<Vec<Message>> {
|
|
if !self.path.exists() {
|
|
return Ok(Vec::new());
|
|
}
|
|
let file = File::open(&self.path)
|
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
|
let file_len = file.metadata()?.len();
|
|
let mut reader = BufReader::new(file);
|
|
|
|
if file_len > max_bytes {
|
|
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
|
|
// Skip partial first line
|
|
let mut discard = String::new();
|
|
reader.read_line(&mut discard)?;
|
|
}
|
|
|
|
let mut messages = Vec::new();
|
|
for line in reader.lines() {
|
|
let line = line.context("reading log tail")?;
|
|
let line = line.trim();
|
|
if line.is_empty() {
|
|
continue;
|
|
}
|
|
match serde_json::from_str::<Message>(line) {
|
|
Ok(msg) => messages.push(msg),
|
|
Err(_) => {} // skip corrupt/partial lines
|
|
}
|
|
}
|
|
Ok(messages)
|
|
}
|
|
|
|
/// Count messages in the log without loading content.
|
|
#[allow(dead_code)]
|
|
pub fn message_count(&self) -> Result<usize> {
|
|
if !self.path.exists() {
|
|
return Ok(0);
|
|
}
|
|
let file = File::open(&self.path)
|
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
|
let reader = BufReader::new(file);
|
|
Ok(reader.lines()
|
|
.filter(|l| l.as_ref().map_or(false, |s| !s.trim().is_empty()))
|
|
.count())
|
|
}
|
|
|
|
/// Read all messages from the log. Returns empty vec if log doesn't exist.
|
|
/// NOTE: Don't use this in hot paths — use read_tail() instead.
|
|
#[allow(dead_code)]
|
|
pub fn read_all(&self) -> Result<Vec<Message>> {
|
|
if !self.path.exists() {
|
|
return Ok(Vec::new());
|
|
}
|
|
let file = File::open(&self.path)
|
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
|
let reader = BufReader::new(file);
|
|
let mut messages = Vec::new();
|
|
|
|
for (i, line) in reader.lines().enumerate() {
|
|
let line = line.with_context(|| format!("reading log line {}", i))?;
|
|
let line = line.trim();
|
|
if line.is_empty() {
|
|
continue;
|
|
}
|
|
match serde_json::from_str::<Message>(line) {
|
|
Ok(msg) => messages.push(msg),
|
|
Err(e) => {
|
|
// Log corruption — skip bad lines rather than failing
|
|
eprintln!("warning: skipping corrupt log line {}: {}", i, e);
|
|
}
|
|
}
|
|
}
|
|
Ok(messages)
|
|
}
|
|
|
|
pub fn path(&self) -> &Path {
|
|
&self.path
|
|
}
|
|
|
|
/// Get the timestamp of the oldest message in the log.
|
|
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
|
|
let file = File::open(&self.path).ok()?;
|
|
let reader = BufReader::new(file);
|
|
for line in reader.lines().flatten() {
|
|
let line = line.trim().to_string();
|
|
if line.is_empty() { continue; }
|
|
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
|
if let Some(ts) = &msg.timestamp {
|
|
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) {
|
|
return Some(dt.to_utc());
|
|
}
|
|
// Try other formats
|
|
if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") {
|
|
return Some(dt.and_utc());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
None
|
|
}
|
|
}
|