unify conversation persistence to append-only jsonl
Log ConversationEntry (with Memory/Message typing) instead of raw Message. restore_from_log reads typed entries directly, preserving Memory vs Message distinction across restarts. Remove current.json snapshot and save_session — the append-only log is the single source of truth. Remove dead read_all and message_count methods. Add push_entry for logging typed entries. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
1f7b585d41
commit
a21cf31ad2
3 changed files with 47 additions and 125 deletions
|
|
@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions};
|
||||||
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
|
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
use crate::agent::types::Message;
|
use crate::agent::types::ConversationEntry;
|
||||||
|
|
||||||
pub struct ConversationLog {
|
pub struct ConversationLog {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
|
@ -30,16 +30,16 @@ impl ConversationLog {
|
||||||
Ok(Self { path })
|
Ok(Self { path })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a single message to the log.
|
/// Append a conversation entry to the log.
|
||||||
pub fn append(&self, msg: &Message) -> Result<()> {
|
pub fn append(&self, entry: &ConversationEntry) -> Result<()> {
|
||||||
let mut file = OpenOptions::new()
|
let mut file = OpenOptions::new()
|
||||||
.create(true)
|
.create(true)
|
||||||
.append(true)
|
.append(true)
|
||||||
.open(&self.path)
|
.open(&self.path)
|
||||||
.with_context(|| format!("opening log {}", self.path.display()))?;
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
||||||
|
|
||||||
let line = serde_json::to_string(msg)
|
let line = serde_json::to_string(entry)
|
||||||
.context("serializing message for log")?;
|
.context("serializing entry for log")?;
|
||||||
writeln!(file, "{}", line)
|
writeln!(file, "{}", line)
|
||||||
.context("writing to conversation log")?;
|
.context("writing to conversation log")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -48,7 +48,7 @@ impl ConversationLog {
|
||||||
/// Read the tail of the log (last `max_bytes` bytes).
|
/// Read the tail of the log (last `max_bytes` bytes).
|
||||||
/// Seeks to `file_len - max_bytes`, skips the first partial line,
|
/// Seeks to `file_len - max_bytes`, skips the first partial line,
|
||||||
/// then parses forward. For logs smaller than `max_bytes`, reads everything.
|
/// then parses forward. For logs smaller than `max_bytes`, reads everything.
|
||||||
pub fn read_tail(&self, max_bytes: u64) -> Result<Vec<Message>> {
|
pub fn read_tail(&self, max_bytes: u64) -> Result<Vec<ConversationEntry>> {
|
||||||
if !self.path.exists() {
|
if !self.path.exists() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
|
@ -64,62 +64,19 @@ impl ConversationLog {
|
||||||
reader.read_line(&mut discard)?;
|
reader.read_line(&mut discard)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut messages = Vec::new();
|
let mut entries = Vec::new();
|
||||||
for line in reader.lines() {
|
for line in reader.lines() {
|
||||||
let line = line.context("reading log tail")?;
|
let line = line.context("reading log tail")?;
|
||||||
let line = line.trim();
|
let line = line.trim();
|
||||||
if line.is_empty() {
|
if line.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
match serde_json::from_str::<Message>(line) {
|
// Try ConversationEntry first (new format), fall back to bare Message (old logs)
|
||||||
Ok(msg) => messages.push(msg),
|
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(line) {
|
||||||
Err(_) => {} // skip corrupt/partial lines
|
entries.push(entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(messages)
|
Ok(entries)
|
||||||
}
|
|
||||||
|
|
||||||
/// Count messages in the log without loading content.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn message_count(&self) -> Result<usize> {
|
|
||||||
if !self.path.exists() {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
let file = File::open(&self.path)
|
|
||||||
.with_context(|| format!("opening log {}", self.path.display()))?;
|
|
||||||
let reader = BufReader::new(file);
|
|
||||||
Ok(reader.lines()
|
|
||||||
.filter(|l| l.as_ref().map_or(false, |s| !s.trim().is_empty()))
|
|
||||||
.count())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all messages from the log. Returns empty vec if log doesn't exist.
|
|
||||||
/// NOTE: Don't use this in hot paths — use read_tail() instead.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn read_all(&self) -> Result<Vec<Message>> {
|
|
||||||
if !self.path.exists() {
|
|
||||||
return Ok(Vec::new());
|
|
||||||
}
|
|
||||||
let file = File::open(&self.path)
|
|
||||||
.with_context(|| format!("opening log {}", self.path.display()))?;
|
|
||||||
let reader = BufReader::new(file);
|
|
||||||
let mut messages = Vec::new();
|
|
||||||
|
|
||||||
for (i, line) in reader.lines().enumerate() {
|
|
||||||
let line = line.with_context(|| format!("reading log line {}", i))?;
|
|
||||||
let line = line.trim();
|
|
||||||
if line.is_empty() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
match serde_json::from_str::<Message>(line) {
|
|
||||||
Ok(msg) => messages.push(msg),
|
|
||||||
Err(e) => {
|
|
||||||
// Log corruption — skip bad lines rather than failing
|
|
||||||
eprintln!("warning: skipping corrupt log line {}: {}", i, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(messages)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn path(&self) -> &Path {
|
pub fn path(&self) -> &Path {
|
||||||
|
|
@ -133,8 +90,8 @@ impl ConversationLog {
|
||||||
for line in reader.lines().flatten() {
|
for line in reader.lines().flatten() {
|
||||||
let line = line.trim().to_string();
|
let line = line.trim().to_string();
|
||||||
if line.is_empty() { continue; }
|
if line.is_empty() { continue; }
|
||||||
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
if let Ok(entry) = serde_json::from_str::<ConversationEntry>(&line) {
|
||||||
if let Some(ts) = &msg.timestamp {
|
if let Some(ts) = &entry.message().timestamp {
|
||||||
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) {
|
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) {
|
||||||
return Some(dt.to_utc());
|
return Some(dt.to_utc());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -168,12 +168,17 @@ impl Agent {
|
||||||
/// Push a conversation message — stamped and logged.
|
/// Push a conversation message — stamped and logged.
|
||||||
fn push_message(&mut self, mut msg: Message) {
|
fn push_message(&mut self, mut msg: Message) {
|
||||||
msg.stamp();
|
msg.stamp();
|
||||||
|
let entry = ConversationEntry::Message(msg);
|
||||||
|
self.push_entry(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push_entry(&mut self, entry: ConversationEntry) {
|
||||||
if let Some(ref log) = self.conversation_log {
|
if let Some(ref log) = self.conversation_log {
|
||||||
if let Err(e) = log.append(&msg) {
|
if let Err(e) = log.append(&entry) {
|
||||||
eprintln!("warning: failed to log message: {:#}", e);
|
eprintln!("warning: failed to log entry: {:#}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.context.entries.push(ConversationEntry::Message(msg));
|
self.context.entries.push(entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a context-only message (system prompt, identity context,
|
/// Push a context-only message (system prompt, identity context,
|
||||||
|
|
@ -1000,11 +1005,11 @@ impl Agent {
|
||||||
self.context.system_prompt = system_prompt;
|
self.context.system_prompt = system_prompt;
|
||||||
self.context.personality = personality;
|
self.context.personality = personality;
|
||||||
|
|
||||||
let all_messages = match &self.conversation_log {
|
let entries = match &self.conversation_log {
|
||||||
Some(log) => match log.read_tail(512 * 1024) {
|
Some(log) => match log.read_tail(512 * 1024) {
|
||||||
Ok(msgs) if !msgs.is_empty() => {
|
Ok(entries) if !entries.is_empty() => {
|
||||||
dbglog!("[restore] read {} messages from log tail", msgs.len());
|
dbglog!("[restore] read {} entries from log tail", entries.len());
|
||||||
msgs
|
entries
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
dbglog!("[restore] log exists but is empty");
|
dbglog!("[restore] log exists but is empty");
|
||||||
|
|
@ -1021,29 +1026,31 @@ impl Agent {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Filter out system/context messages — we only want the
|
// Filter out system messages, keep everything else (including Memory entries)
|
||||||
// actual conversation (user prompts, assistant responses,
|
let entries: Vec<ConversationEntry> = entries
|
||||||
// tool calls/results)
|
|
||||||
let conversation: Vec<Message> = all_messages
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|m| m.role != Role::System)
|
.filter(|e| e.message().role != Role::System)
|
||||||
.collect();
|
.collect();
|
||||||
dbglog!("[restore] {} messages after filtering system", conversation.len());
|
|
||||||
|
|
||||||
let messages = crate::agent::context::trim_conversation(
|
// Trim to fit context budget
|
||||||
|
let n = entries.len();
|
||||||
|
let conversation: Vec<Message> = entries.iter()
|
||||||
|
.map(|e| e.api_message().clone()).collect();
|
||||||
|
let trimmed = crate::agent::context::trim_conversation(
|
||||||
&self.context,
|
&self.context,
|
||||||
&conversation,
|
&conversation,
|
||||||
&self.client.model,
|
&self.client.model,
|
||||||
&self.tokenizer,
|
&self.tokenizer,
|
||||||
);
|
);
|
||||||
dbglog!("[restore] journal preserved: {} entries",
|
// Keep only the entries that survived trimming (by count from the end)
|
||||||
self.context.journal.len());
|
let keep = trimmed.len();
|
||||||
// Don't overwrite journal — already loaded from memory graph
|
self.context.entries = entries.into_iter()
|
||||||
self.context.entries = messages.into_iter()
|
.skip(n.saturating_sub(keep))
|
||||||
.map(ConversationEntry::Message).collect();
|
.collect();
|
||||||
dbglog!("[restore] built context window: {} entries", self.context.entries.len());
|
|
||||||
self.last_prompt_tokens = 0;
|
|
||||||
|
|
||||||
|
dbglog!("[restore] {} entries, journal: {} entries",
|
||||||
|
self.context.entries.len(), self.context.journal.len());
|
||||||
|
self.last_prompt_tokens = 0;
|
||||||
self.publish_context_state();
|
self.publish_context_state();
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
@ -1068,10 +1075,6 @@ impl Agent {
|
||||||
&mut self.context.entries
|
&mut self.context.entries
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Restore from saved conversation entries.
|
|
||||||
pub fn restore(&mut self, entries: Vec<ConversationEntry>) {
|
|
||||||
self.context.entries = entries;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Context window building, token counting, and error classification
|
// Context window building, token counting, and error classification
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use crossterm::event::{Event, EventStream, KeyEventKind};
|
use crossterm::event::{Event, EventStream, KeyEventKind};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::sync::{mpsc, Mutex};
|
use tokio::sync::{mpsc, Mutex};
|
||||||
|
|
@ -124,8 +123,6 @@ struct Session {
|
||||||
process_tracker: tools::ProcessTracker,
|
process_tracker: tools::ProcessTracker,
|
||||||
ui_tx: ui_channel::UiSender,
|
ui_tx: ui_channel::UiSender,
|
||||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||||
session_file: PathBuf,
|
|
||||||
|
|
||||||
// DMN state
|
// DMN state
|
||||||
dmn: dmn::State,
|
dmn: dmn::State,
|
||||||
dmn_turns: u32,
|
dmn_turns: u32,
|
||||||
|
|
@ -153,7 +150,6 @@ impl Session {
|
||||||
process_tracker: tools::ProcessTracker,
|
process_tracker: tools::ProcessTracker,
|
||||||
ui_tx: ui_channel::UiSender,
|
ui_tx: ui_channel::UiSender,
|
||||||
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
turn_tx: mpsc::Sender<(Result<TurnResult>, StreamTarget)>,
|
||||||
session_file: PathBuf,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let max_dmn_turns = config.app.dmn.max_turns;
|
let max_dmn_turns = config.app.dmn.max_turns;
|
||||||
|
|
||||||
|
|
@ -163,7 +159,6 @@ impl Session {
|
||||||
process_tracker,
|
process_tracker,
|
||||||
ui_tx,
|
ui_tx,
|
||||||
turn_tx,
|
turn_tx,
|
||||||
session_file,
|
|
||||||
dmn: if dmn::is_off() {
|
dmn: if dmn::is_off() {
|
||||||
dmn::State::Off
|
dmn::State::Off
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -321,8 +316,6 @@ impl Session {
|
||||||
.to_string(),
|
.to_string(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = save_session(&agent_guard, &self.session_file);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send any consolidated pending input as a single turn.
|
/// Send any consolidated pending input as a single turn.
|
||||||
|
|
@ -398,14 +391,9 @@ impl Session {
|
||||||
match input {
|
match input {
|
||||||
"/quit" | "/exit" => Command::Quit,
|
"/quit" | "/exit" => Command::Quit,
|
||||||
"/save" => {
|
"/save" => {
|
||||||
if let Ok(agent) = self.agent.try_lock() {
|
let _ = self.ui_tx.send(UiMessage::Info(
|
||||||
let _ = save_session(&agent, &self.session_file);
|
"Conversation is saved automatically (append-only log).".into()
|
||||||
let _ = self.ui_tx.send(UiMessage::Info("Session saved.".into()));
|
));
|
||||||
} else {
|
|
||||||
let _ = self
|
|
||||||
.ui_tx
|
|
||||||
.send(UiMessage::Info("(busy — will save after turn)".into()));
|
|
||||||
}
|
|
||||||
Command::Handled
|
Command::Handled
|
||||||
}
|
}
|
||||||
"/new" | "/clear" => {
|
"/new" | "/clear" => {
|
||||||
|
|
@ -415,10 +403,6 @@ impl Session {
|
||||||
.send(UiMessage::Info("(turn in progress, please wait)".into()));
|
.send(UiMessage::Info("(turn in progress, please wait)".into()));
|
||||||
return Command::Handled;
|
return Command::Handled;
|
||||||
}
|
}
|
||||||
{
|
|
||||||
let agent_guard = self.agent.lock().await;
|
|
||||||
let _ = save_session(&agent_guard, &self.session_file);
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
let new_log = log::ConversationLog::new(
|
let new_log = log::ConversationLog::new(
|
||||||
self.config.session_dir.join("conversation.jsonl"),
|
self.config.session_dir.join("conversation.jsonl"),
|
||||||
|
|
@ -516,7 +500,6 @@ impl Session {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = save_session(&agent_guard, &self.session_file);
|
|
||||||
self.dmn = dmn::State::Resting {
|
self.dmn = dmn::State::Resting {
|
||||||
since: Instant::now(),
|
since: Instant::now(),
|
||||||
};
|
};
|
||||||
|
|
@ -861,8 +844,6 @@ impl Session {
|
||||||
if let Some(handle) = self.turn_handle.take() {
|
if let Some(handle) = self.turn_handle.take() {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
let agent = self.agent.lock().await;
|
|
||||||
let _ = save_session(&agent, &self.session_file);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -927,29 +908,17 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
|
||||||
// so Ctrl+K can kill processes even when the agent is busy.
|
// so Ctrl+K can kill processes even when the agent is busy.
|
||||||
let process_tracker = agent.lock().await.process_tracker.clone();
|
let process_tracker = agent.lock().await.process_tracker.clone();
|
||||||
|
|
||||||
// Try to restore from conversation log (primary) or session file (fallback)
|
// Restore conversation from the append-only log
|
||||||
let session_file = config.session_dir.join("current.json");
|
|
||||||
{
|
{
|
||||||
let mut agent_guard = agent.lock().await;
|
let mut agent_guard = agent.lock().await;
|
||||||
let restored = agent_guard.restore_from_log(
|
if agent_guard.restore_from_log(
|
||||||
config.system_prompt.clone(),
|
config.system_prompt.clone(),
|
||||||
config.context_parts.clone(),
|
config.context_parts.clone(),
|
||||||
);
|
) {
|
||||||
if restored {
|
|
||||||
replay_session_to_ui(agent_guard.entries(), &ui_tx);
|
replay_session_to_ui(agent_guard.entries(), &ui_tx);
|
||||||
let _ = ui_tx.send(UiMessage::Info(
|
let _ = ui_tx.send(UiMessage::Info(
|
||||||
"--- restored from conversation log ---".into(),
|
"--- restored from conversation log ---".into(),
|
||||||
));
|
));
|
||||||
} else if session_file.exists() {
|
|
||||||
if let Ok(data) = std::fs::read_to_string(&session_file) {
|
|
||||||
if let Ok(messages) = serde_json::from_str(&data) {
|
|
||||||
agent_guard.restore(messages);
|
|
||||||
replay_session_to_ui(agent_guard.entries(), &ui_tx);
|
|
||||||
let _ = ui_tx.send(UiMessage::Info(
|
|
||||||
"--- restored from session file ---".into(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -978,7 +947,6 @@ async fn run(cli: cli::CliArgs) -> Result<()> {
|
||||||
process_tracker,
|
process_tracker,
|
||||||
ui_tx.clone(),
|
ui_tx.clone(),
|
||||||
turn_tx,
|
turn_tx,
|
||||||
session_file,
|
|
||||||
);
|
);
|
||||||
session.update_status();
|
session.update_status();
|
||||||
session.send_context_info();
|
session.send_context_info();
|
||||||
|
|
@ -1103,12 +1071,6 @@ fn drain_ui_messages(rx: &mut ui_channel::UiReceiver, app: &mut tui::App) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_session(agent: &Agent, path: &PathBuf) -> Result<()> {
|
|
||||||
let data = serde_json::to_string_pretty(agent.entries())?;
|
|
||||||
std::fs::write(path, data)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_tool_tests(ui_tx: &ui_channel::UiSender, tracker: &tools::ProcessTracker) {
|
async fn run_tool_tests(ui_tx: &ui_channel::UiSender, tracker: &tools::ProcessTracker) {
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue