Fast startup: mmap backward scan instead of reading full log
Uses JsonlBackwardIter (SIMD memrchr3) to scan the conversation log newest-first, without reading or parsing the whole file. The scan stops as soon as the conversation budget is full, and only the kept nodes get retokenized and pushed into context. For example, with an 18 MB log, only the ~50 nodes that fit in the budget are tokenized.

Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
7da3efc5df
commit
949dacd861
2 changed files with 35 additions and 32 deletions
|
|
@ -568,18 +568,17 @@ impl Agent {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn restore_from_log(&self) -> bool {
|
pub async fn restore_from_log(&self) -> bool {
|
||||||
let all_nodes = {
|
let tail = {
|
||||||
let ctx = self.context.lock().await;
|
let ctx = self.context.lock().await;
|
||||||
match &ctx.conversation_log {
|
match &ctx.conversation_log {
|
||||||
Some(log) => match log.read_nodes(64 * 1024 * 1024) {
|
Some(log) => match log.read_tail() {
|
||||||
Ok(nodes) if !nodes.is_empty() => nodes,
|
Ok(t) => t,
|
||||||
_ => return false,
|
Err(_) => return false,
|
||||||
},
|
},
|
||||||
None => return false,
|
None => return false,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Walk backwards from the tail, retokenize, stop at budget
|
|
||||||
let budget = context::context_budget_tokens();
|
let budget = context::context_budget_tokens();
|
||||||
let fixed = {
|
let fixed = {
|
||||||
let ctx = self.context.lock().await;
|
let ctx = self.context.lock().await;
|
||||||
|
|
@ -588,9 +587,10 @@ impl Agent {
|
||||||
};
|
};
|
||||||
let conv_budget = budget.saturating_sub(fixed);
|
let conv_budget = budget.saturating_sub(fixed);
|
||||||
|
|
||||||
|
// Walk backwards (newest first), retokenize, stop at budget
|
||||||
let mut kept = Vec::new();
|
let mut kept = Vec::new();
|
||||||
let mut total = 0;
|
let mut total = 0;
|
||||||
for node in all_nodes.into_iter().rev() {
|
for node in tail.iter() {
|
||||||
let node = node.retokenize();
|
let node = node.retokenize();
|
||||||
let tok = node.tokens();
|
let tok = node.tokens();
|
||||||
if total + tok > conv_budget && !kept.is_empty() { break; }
|
if total + tok > conv_budget && !kept.is_empty() { break; }
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,10 @@
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use std::fs::{File, OpenOptions};
|
use std::fs::{File, OpenOptions};
|
||||||
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
|
use std::io::Write;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use crate::agent::context::AstNode;
|
use crate::agent::context::AstNode;
|
||||||
|
use crate::hippocampus::transcript::JsonlBackwardIter;
|
||||||
|
use memmap2::Mmap;
|
||||||
|
|
||||||
pub struct ConversationLog {
|
pub struct ConversationLog {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
|
@ -33,32 +35,19 @@ impl ConversationLog {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_nodes(&self, max_bytes: u64) -> Result<Vec<AstNode>> {
|
/// Read nodes from the tail of the log, newest first.
|
||||||
|
/// Caller decides when to stop (budget, count, etc).
|
||||||
|
pub fn read_tail(&self) -> Result<TailNodes> {
|
||||||
if !self.path.exists() {
|
if !self.path.exists() {
|
||||||
return Ok(Vec::new());
|
anyhow::bail!("log does not exist");
|
||||||
}
|
}
|
||||||
let file = File::open(&self.path)
|
let file = File::open(&self.path)
|
||||||
.with_context(|| format!("opening log {}", self.path.display()))?;
|
.with_context(|| format!("opening log {}", self.path.display()))?;
|
||||||
let file_len = file.metadata()?.len();
|
if file.metadata()?.len() == 0 {
|
||||||
let mut reader = BufReader::new(file);
|
anyhow::bail!("log is empty");
|
||||||
|
|
||||||
if file_len > max_bytes {
|
|
||||||
reader.seek(SeekFrom::Start(file_len - max_bytes))?;
|
|
||||||
let mut discard = String::new();
|
|
||||||
reader.read_line(&mut discard)?;
|
|
||||||
}
|
}
|
||||||
|
let mmap = unsafe { Mmap::map(&file)? };
|
||||||
let mut nodes = Vec::new();
|
Ok(TailNodes { _file: file, mmap })
|
||||||
for line in reader.lines() {
|
|
||||||
let line = line.context("reading log tail")?;
|
|
||||||
let line = line.trim();
|
|
||||||
if line.is_empty() { continue; }
|
|
||||||
if let Ok(node) = serde_json::from_str::<AstNode>(line) {
|
|
||||||
nodes.push(node);
|
|
||||||
}
|
|
||||||
// Old format entries silently skipped — journal has the context
|
|
||||||
}
|
|
||||||
Ok(nodes)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn path(&self) -> &Path {
|
pub fn path(&self) -> &Path {
|
||||||
|
|
@ -66,12 +55,13 @@ impl ConversationLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
|
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
|
||||||
|
// Read forward from the start to find first timestamp
|
||||||
let file = File::open(&self.path).ok()?;
|
let file = File::open(&self.path).ok()?;
|
||||||
let reader = BufReader::new(file);
|
let mmap = unsafe { Mmap::map(&file).ok()? };
|
||||||
for line in reader.lines().flatten() {
|
// Find first { ... } and parse
|
||||||
let line = line.trim().to_string();
|
for line in mmap.split(|&b| b == b'\n') {
|
||||||
if line.is_empty() { continue; }
|
if line.is_empty() { continue; }
|
||||||
if let Ok(node) = serde_json::from_str::<AstNode>(&line) {
|
if let Ok(node) = serde_json::from_slice::<AstNode>(line) {
|
||||||
if let Some(leaf) = node.leaf() {
|
if let Some(leaf) = node.leaf() {
|
||||||
if let Some(ts) = leaf.timestamp() {
|
if let Some(ts) = leaf.timestamp() {
|
||||||
return Some(ts);
|
return Some(ts);
|
||||||
|
|
@ -82,3 +72,16 @@ impl ConversationLog {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Iterates over conversation log nodes newest-first, using mmap + backward scan.
|
||||||
|
pub struct TailNodes {
|
||||||
|
_file: File,
|
||||||
|
mmap: Mmap,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TailNodes {
|
||||||
|
pub fn iter(&self) -> impl Iterator<Item = AstNode> + '_ {
|
||||||
|
JsonlBackwardIter::new(&self.mmap)
|
||||||
|
.filter_map(|bytes| serde_json::from_slice::<AstNode>(bytes).ok())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue