From a21cf31ad28b9a69cb9f5c1f0727f7779354b212 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Thu, 2 Apr 2026 14:31:19 -0400 Subject: [PATCH] unify conversation persistence to append-only jsonl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Log ConversationEntry (with Memory/Message typing) instead of raw Message. restore_from_log reads typed entries directly, preserving Memory vs Message distinction across restarts. Remove current.json snapshot and save_session — the append-only log is the single source of truth. Remove dead read_all and message_count methods. Add push_entry for logging typed entries. Co-Authored-By: Proof of Concept --- src/agent/log.rs | 69 +++++++++----------------------------------- src/agent/runner.rs | 53 ++++++++++++++++++---------------- src/bin/poc-agent.rs | 50 ++++---------------------------- 3 files changed, 47 insertions(+), 125 deletions(-) diff --git a/src/agent/log.rs b/src/agent/log.rs index 7353d14..1a1052f 100644 --- a/src/agent/log.rs +++ b/src/agent/log.rs @@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions}; use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; -use crate::agent::types::Message; +use crate::agent::types::ConversationEntry; pub struct ConversationLog { path: PathBuf, @@ -30,16 +30,16 @@ impl ConversationLog { Ok(Self { path }) } - /// Append a single message to the log. - pub fn append(&self, msg: &Message) -> Result<()> { + /// Append a conversation entry to the log. + pub fn append(&self, entry: &ConversationEntry) -> Result<()> { let mut file = OpenOptions::new() .create(true) .append(true) .open(&self.path) .with_context(|| format!("opening log {}", self.path.display()))?; - let line = serde_json::to_string(msg) - .context("serializing message for log")?; + let line = serde_json::to_string(entry) + .context("serializing entry for log")?; writeln!(file, "{}", line) .context("writing to conversation log")?; Ok(()) @@ -48,7 +48,7 @@ impl ConversationLog { /// Read the tail of the log (last `max_bytes` bytes). /// Seeks to `file_len - max_bytes`, skips the first partial line, /// then parses forward. For logs smaller than `max_bytes`, reads everything. - pub fn read_tail(&self, max_bytes: u64) -> Result> { + pub fn read_tail(&self, max_bytes: u64) -> Result> { if !self.path.exists() { return Ok(Vec::new()); } @@ -64,62 +64,19 @@ impl ConversationLog { reader.read_line(&mut discard)?; } - let mut messages = Vec::new(); + let mut entries = Vec::new(); for line in reader.lines() { let line = line.context("reading log tail")?; let line = line.trim(); if line.is_empty() { continue; } - match serde_json::from_str::(line) { - Ok(msg) => messages.push(msg), - Err(_) => {} // skip corrupt/partial lines + // Try ConversationEntry first (new format), fall back to bare Message (old logs) + if let Ok(entry) = serde_json::from_str::(line) { + entries.push(entry); } } - Ok(messages) - } - - /// Count messages in the log without loading content. - #[allow(dead_code)] - pub fn message_count(&self) -> Result { - if !self.path.exists() { - return Ok(0); - } - let file = File::open(&self.path) - .with_context(|| format!("opening log {}", self.path.display()))?; - let reader = BufReader::new(file); - Ok(reader.lines() - .filter(|l| l.as_ref().map_or(false, |s| !s.trim().is_empty())) - .count()) - } - - /// Read all messages from the log. Returns empty vec if log doesn't exist. - /// NOTE: Don't use this in hot paths — use read_tail() instead. - #[allow(dead_code)] - pub fn read_all(&self) -> Result> { - if !self.path.exists() { - return Ok(Vec::new()); - } - let file = File::open(&self.path) - .with_context(|| format!("opening log {}", self.path.display()))?; - let reader = BufReader::new(file); - let mut messages = Vec::new(); - - for (i, line) in reader.lines().enumerate() { - let line = line.with_context(|| format!("reading log line {}", i))?; - let line = line.trim(); - if line.is_empty() { - continue; - } - match serde_json::from_str::(line) { - Ok(msg) => messages.push(msg), - Err(e) => { - // Log corruption — skip bad lines rather than failing - eprintln!("warning: skipping corrupt log line {}: {}", i, e); - } - } - } - Ok(messages) + Ok(entries) } pub fn path(&self) -> &Path { @@ -133,8 +90,8 @@ impl ConversationLog { for line in reader.lines().flatten() { let line = line.trim().to_string(); if line.is_empty() { continue; } - if let Ok(msg) = serde_json::from_str::(&line) { - if let Some(ts) = &msg.timestamp { + if let Ok(entry) = serde_json::from_str::(&line) { + if let Some(ts) = &entry.message().timestamp { if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) { return Some(dt.to_utc()); } diff --git a/src/agent/runner.rs b/src/agent/runner.rs index c145bd2..a93dae2 100644 --- a/src/agent/runner.rs +++ b/src/agent/runner.rs @@ -168,12 +168,17 @@ impl Agent { /// Push a conversation message — stamped and logged. fn push_message(&mut self, mut msg: Message) { msg.stamp(); + let entry = ConversationEntry::Message(msg); + self.push_entry(entry); + } + + fn push_entry(&mut self, entry: ConversationEntry) { if let Some(ref log) = self.conversation_log { - if let Err(e) = log.append(&msg) { - eprintln!("warning: failed to log message: {:#}", e); + if let Err(e) = log.append(&entry) { + eprintln!("warning: failed to log entry: {:#}", e); } } - self.context.entries.push(ConversationEntry::Message(msg)); + self.context.entries.push(entry); } /// Push a context-only message (system prompt, identity context, @@ -1000,11 +1005,11 @@ impl Agent { self.context.system_prompt = system_prompt; self.context.personality = personality; - let all_messages = match &self.conversation_log { + let entries = match &self.conversation_log { Some(log) => match log.read_tail(512 * 1024) { - Ok(msgs) if !msgs.is_empty() => { - dbglog!("[restore] read {} messages from log tail", msgs.len()); - msgs + Ok(entries) if !entries.is_empty() => { + dbglog!("[restore] read {} entries from log tail", entries.len()); + entries } Ok(_) => { dbglog!("[restore] log exists but is empty"); @@ -1021,29 +1026,31 @@ impl Agent { } }; - // Filter out system/context messages — we only want the - // actual conversation (user prompts, assistant responses, - // tool calls/results) - let conversation: Vec = all_messages + // Filter out system messages, keep everything else (including Memory entries) + let entries: Vec = entries .into_iter() - .filter(|m| m.role != Role::System) + .filter(|e| e.message().role != Role::System) .collect(); - dbglog!("[restore] {} messages after filtering system", conversation.len()); - let messages = crate::agent::context::trim_conversation( + // Trim to fit context budget + let n = entries.len(); + let conversation: Vec = entries.iter() + .map(|e| e.api_message().clone()).collect(); + let trimmed = crate::agent::context::trim_conversation( &self.context, &conversation, &self.client.model, &self.tokenizer, ); - dbglog!("[restore] journal preserved: {} entries", - self.context.journal.len()); - // Don't overwrite journal — already loaded from memory graph - self.context.entries = messages.into_iter() - .map(ConversationEntry::Message).collect(); - dbglog!("[restore] built context window: {} entries", self.context.entries.len()); + // Keep only the entries that survived trimming (by count from the end) + let keep = trimmed.len(); + self.context.entries = entries.into_iter() + .skip(n.saturating_sub(keep)) + .collect(); + + dbglog!("[restore] {} entries, journal: {} entries", + self.context.entries.len(), self.context.journal.len()); self.last_prompt_tokens = 0; - self.publish_context_state(); true } @@ -1068,10 +1075,6 @@ impl Agent { &mut self.context.entries } - /// Restore from saved conversation entries. - pub fn restore(&mut self, entries: Vec) { - self.context.entries = entries; - } } // Context window building, token counting, and error classification diff --git a/src/bin/poc-agent.rs b/src/bin/poc-agent.rs index eda6961..bbf390d 100644 --- a/src/bin/poc-agent.rs +++ b/src/bin/poc-agent.rs @@ -24,7 +24,6 @@ use anyhow::Result; use crossterm::event::{Event, EventStream, KeyEventKind}; use futures::StreamExt; -use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, Mutex}; @@ -124,8 +123,6 @@ struct Session { process_tracker: tools::ProcessTracker, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, - session_file: PathBuf, - // DMN state dmn: dmn::State, dmn_turns: u32, @@ -153,7 +150,6 @@ impl Session { process_tracker: tools::ProcessTracker, ui_tx: ui_channel::UiSender, turn_tx: mpsc::Sender<(Result, StreamTarget)>, - session_file: PathBuf, ) -> Self { let max_dmn_turns = config.app.dmn.max_turns; @@ -163,7 +159,6 @@ impl Session { process_tracker, ui_tx, turn_tx, - session_file, dmn: if dmn::is_off() { dmn::State::Off } else { @@ -321,8 +316,6 @@ impl Session { .to_string(), ); } - - let _ = save_session(&agent_guard, &self.session_file); } /// Send any consolidated pending input as a single turn. @@ -398,14 +391,9 @@ impl Session { match input { "/quit" | "/exit" => Command::Quit, "/save" => { - if let Ok(agent) = self.agent.try_lock() { - let _ = save_session(&agent, &self.session_file); - let _ = self.ui_tx.send(UiMessage::Info("Session saved.".into())); - } else { - let _ = self - .ui_tx - .send(UiMessage::Info("(busy — will save after turn)".into())); - } + let _ = self.ui_tx.send(UiMessage::Info( + "Conversation is saved automatically (append-only log).".into() + )); Command::Handled } "/new" | "/clear" => { @@ -415,10 +403,6 @@ impl Session { .send(UiMessage::Info("(turn in progress, please wait)".into())); return Command::Handled; } - { - let agent_guard = self.agent.lock().await; - let _ = save_session(&agent_guard, &self.session_file); - } { let new_log = log::ConversationLog::new( self.config.session_dir.join("conversation.jsonl"), @@ -516,7 +500,6 @@ impl Session { ))); } } - let _ = save_session(&agent_guard, &self.session_file); self.dmn = dmn::State::Resting { since: Instant::now(), }; @@ -861,8 +844,6 @@ impl Session { if let Some(handle) = self.turn_handle.take() { handle.abort(); } - let agent = self.agent.lock().await; - let _ = save_session(&agent, &self.session_file); } } @@ -927,29 +908,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> { // so Ctrl+K can kill processes even when the agent is busy. let process_tracker = agent.lock().await.process_tracker.clone(); - // Try to restore from conversation log (primary) or session file (fallback) - let session_file = config.session_dir.join("current.json"); + // Restore conversation from the append-only log { let mut agent_guard = agent.lock().await; - let restored = agent_guard.restore_from_log( + if agent_guard.restore_from_log( config.system_prompt.clone(), config.context_parts.clone(), - ); - if restored { + ) { replay_session_to_ui(agent_guard.entries(), &ui_tx); let _ = ui_tx.send(UiMessage::Info( "--- restored from conversation log ---".into(), )); - } else if session_file.exists() { - if let Ok(data) = std::fs::read_to_string(&session_file) { - if let Ok(messages) = serde_json::from_str(&data) { - agent_guard.restore(messages); - replay_session_to_ui(agent_guard.entries(), &ui_tx); - let _ = ui_tx.send(UiMessage::Info( - "--- restored from session file ---".into(), - )); - } - } } } @@ -978,7 +947,6 @@ async fn run(cli: cli::CliArgs) -> Result<()> { process_tracker, ui_tx.clone(), turn_tx, - session_file, ); session.update_status(); session.send_context_info(); @@ -1103,12 +1071,6 @@ fn drain_ui_messages(rx: &mut ui_channel::UiReceiver, app: &mut tui::App) { } } -fn save_session(agent: &Agent, path: &PathBuf) -> Result<()> { - let data = serde_json::to_string_pretty(agent.entries())?; - std::fs::write(path, data)?; - Ok(()) -} - async fn run_tool_tests(ui_tx: &ui_channel::UiSender, tracker: &tools::ProcessTracker) { use serde_json::json;