transcript progress: capnp append-only log replaces stub nodes
Add TranscriptSegment capnp schema and append-only log for tracking which transcript segments have been mined by which agents. Replaces the old approach of creating stub nodes (_observed-transcripts, _mined-transcripts, _facts-) in the main graph store. - New schema: TranscriptSegment and TranscriptProgressLog - Store methods: append_transcript_progress, replay, is_segment_mined, mark_segment_mined - Migration command: admin migrate-transcript-progress (migrated 1771 markers, soft-deleted old stub nodes) - Progress log replayed on all Store::load paths Also: revert extractor.agent to graph-only (no CONVERSATIONS), update memory-instructions-core with refine-over-create principle. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
1500a2b635
commit
835b392b7a
4 changed files with 221 additions and 1 deletions
|
|
@ -122,3 +122,18 @@ struct AgentVisit {
|
||||||
struct AgentVisitLog {
|
struct AgentVisitLog {
|
||||||
visits @0 :List(AgentVisit);
|
visits @0 :List(AgentVisit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Transcript mining progress — separate append-only log.
# Tracks which segments of which transcripts have been processed,
# by which agent, so we never re-mine the same content.

# One record per completed (transcript, segment, agent) mining pass.
struct TranscriptSegment {
  transcriptId @0 :Text;    # session UUID (filename stem)
  segmentIndex @1 :UInt32;  # compaction segment index within transcript
  agent @2 :Text;           # "observation", "experience", "fact"
  timestamp @3 :Int64;      # unix epoch seconds when mining completed
}
|
||||||
|
|
||||||
|
# Root container for one appended batch of segment records. The on-disk
# log is a sequence of these framed messages, replayed in order on load.
struct TranscriptProgressLog {
  segments @0 :List(TranscriptSegment);
}
|
||||||
|
|
|
||||||
|
|
@ -643,6 +643,9 @@ enum AdminCmd {
|
||||||
/// Date (default: today)
|
/// Date (default: today)
|
||||||
date: Option<String>,
|
date: Option<String>,
|
||||||
},
|
},
|
||||||
|
/// Migrate transcript stub nodes to progress log
|
||||||
|
#[command(name = "migrate-transcript-progress")]
|
||||||
|
MigrateTranscriptProgress,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
|
|
@ -817,6 +820,12 @@ fn main() {
|
||||||
AdminCmd::Params => cli::misc::cmd_params(),
|
AdminCmd::Params => cli::misc::cmd_params(),
|
||||||
AdminCmd::LookupBump { keys } => cli::node::cmd_lookup_bump(&keys),
|
AdminCmd::LookupBump { keys } => cli::node::cmd_lookup_bump(&keys),
|
||||||
AdminCmd::Lookups { date } => cli::node::cmd_lookups(date.as_deref()),
|
AdminCmd::Lookups { date } => cli::node::cmd_lookups(date.as_deref()),
|
||||||
|
AdminCmd::MigrateTranscriptProgress => (|| -> Result<(), String> {
|
||||||
|
let mut store = store::Store::load()?;
|
||||||
|
let count = store.migrate_transcript_progress()?;
|
||||||
|
println!("Migrated {} transcript segment markers", count);
|
||||||
|
Ok(())
|
||||||
|
})()
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,10 @@ impl Store {
|
||||||
if visits_p.exists() {
|
if visits_p.exists() {
|
||||||
store.replay_visits(&visits_p).ok();
|
store.replay_visits(&visits_p).ok();
|
||||||
}
|
}
|
||||||
|
let tp_p = transcript_progress_path();
|
||||||
|
if tp_p.exists() {
|
||||||
|
store.replay_transcript_progress(&tp_p).ok();
|
||||||
|
}
|
||||||
return Ok(store);
|
return Ok(store);
|
||||||
},
|
},
|
||||||
Ok(None) => {},
|
Ok(None) => {},
|
||||||
|
|
@ -87,6 +91,10 @@ impl Store {
|
||||||
if visits_p.exists() {
|
if visits_p.exists() {
|
||||||
store.replay_visits(&visits_p)?;
|
store.replay_visits(&visits_p)?;
|
||||||
}
|
}
|
||||||
|
let tp_p = transcript_progress_path();
|
||||||
|
if tp_p.exists() {
|
||||||
|
store.replay_transcript_progress(&tp_p)?;
|
||||||
|
}
|
||||||
|
|
||||||
// Record log sizes after replay — this is the state we reflect
|
// Record log sizes after replay — this is the state we reflect
|
||||||
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
|
store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0);
|
||||||
|
|
@ -119,6 +127,10 @@ impl Store {
|
||||||
if visits_p.exists() {
|
if visits_p.exists() {
|
||||||
store.replay_visits(&visits_p)?;
|
store.replay_visits(&visits_p)?;
|
||||||
}
|
}
|
||||||
|
let tp_p = transcript_progress_path();
|
||||||
|
if tp_p.exists() {
|
||||||
|
store.replay_transcript_progress(&tp_p)?;
|
||||||
|
}
|
||||||
Ok(store)
|
Ok(store)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -424,6 +436,157 @@ impl Store {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Append transcript segment progress records.
|
||||||
|
pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<(), String> {
|
||||||
|
if segments.is_empty() { return Ok(()); }
|
||||||
|
|
||||||
|
let mut msg = message::Builder::new_default();
|
||||||
|
{
|
||||||
|
let log = msg.init_root::<memory_capnp::transcript_progress_log::Builder>();
|
||||||
|
let mut list = log.init_segments(segments.len() as u32);
|
||||||
|
for (i, seg) in segments.iter().enumerate() {
|
||||||
|
seg.to_capnp(list.reborrow().get(i as u32));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
serialize::write_message(&mut buf, &msg)
|
||||||
|
.map_err(|e| format!("serialize transcript progress: {}", e))?;
|
||||||
|
|
||||||
|
let path = transcript_progress_path();
|
||||||
|
let file = fs::OpenOptions::new()
|
||||||
|
.create(true).append(true).open(&path)
|
||||||
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
||||||
|
use std::io::Write;
|
||||||
|
(&file).write_all(&buf)
|
||||||
|
.map_err(|e| format!("write transcript progress: {}", e))?;
|
||||||
|
|
||||||
|
// Update in-memory index
|
||||||
|
for seg in segments {
|
||||||
|
self.transcript_progress
|
||||||
|
.entry((seg.transcript_id.clone(), seg.segment_index))
|
||||||
|
.or_default()
|
||||||
|
.insert(seg.agent.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replay transcript progress log to rebuild in-memory index.
|
||||||
|
fn replay_transcript_progress(&mut self, path: &Path) -> Result<(), String> {
|
||||||
|
let file = fs::File::open(path)
|
||||||
|
.map_err(|e| format!("open {}: {}", path.display(), e))?;
|
||||||
|
let mut reader = BufReader::new(file);
|
||||||
|
|
||||||
|
while reader.stream_position().map_err(|e| e.to_string())?
|
||||||
|
< fs::metadata(path).map_err(|e| e.to_string())?.len()
|
||||||
|
{
|
||||||
|
let msg = match serialize::read_message(&mut reader, Default::default()) {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(_) => break,
|
||||||
|
};
|
||||||
|
let log = msg.get_root::<memory_capnp::transcript_progress_log::Reader>()
|
||||||
|
.map_err(|e| format!("read transcript progress: {}", e))?;
|
||||||
|
|
||||||
|
for seg in log.get_segments().map_err(|e| e.to_string())? {
|
||||||
|
let id = seg.get_transcript_id().ok()
|
||||||
|
.and_then(|t| t.to_str().ok())
|
||||||
|
.unwrap_or("")
|
||||||
|
.to_string();
|
||||||
|
let agent = seg.get_agent().ok()
|
||||||
|
.and_then(|t| t.to_str().ok())
|
||||||
|
.unwrap_or("")
|
||||||
|
.to_string();
|
||||||
|
let idx = seg.get_segment_index();
|
||||||
|
|
||||||
|
if !id.is_empty() && !agent.is_empty() {
|
||||||
|
self.transcript_progress
|
||||||
|
.entry((id, idx))
|
||||||
|
.or_default()
|
||||||
|
.insert(agent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if a transcript segment has been processed by a given agent.
|
||||||
|
pub fn is_segment_mined(&self, transcript_id: &str, segment_index: u32, agent: &str) -> bool {
|
||||||
|
self.transcript_progress
|
||||||
|
.get(&(transcript_id.to_string(), segment_index))
|
||||||
|
.map_or(false, |agents| agents.contains(agent))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a transcript segment as successfully processed.
|
||||||
|
pub fn mark_segment_mined(&mut self, transcript_id: &str, segment_index: u32, agent: &str) -> Result<(), String> {
|
||||||
|
let seg = new_transcript_segment(transcript_id, segment_index, agent);
|
||||||
|
self.append_transcript_progress(&[seg])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Migrate old stub-node transcript markers into the new progress log.
|
||||||
|
/// Reads _observed-transcripts-f-*, _mined-transcripts#f-*, and _facts-* keys,
|
||||||
|
/// extracts transcript_id and segment_index, writes to transcript-progress.capnp,
|
||||||
|
/// then deletes the stub nodes.
|
||||||
|
pub fn migrate_transcript_progress(&mut self) -> Result<usize, String> {
|
||||||
|
let mut segments = Vec::new();
|
||||||
|
|
||||||
|
for (key, _node) in &self.nodes {
|
||||||
|
// _observed-transcripts-f-{UUID}.{segment}
|
||||||
|
if let Some(rest) = key.strip_prefix("_observed-transcripts-f-") {
|
||||||
|
if let Some((uuid, seg_str)) = rest.rsplit_once('.') {
|
||||||
|
if let Ok(seg) = seg_str.parse::<u32>() {
|
||||||
|
segments.push(new_transcript_segment(uuid, seg, "observation"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// _mined-transcripts#f-{UUID}.{segment}
|
||||||
|
else if let Some(rest) = key.strip_prefix("_mined-transcripts#f-") {
|
||||||
|
if let Some((uuid, seg_str)) = rest.rsplit_once('.') {
|
||||||
|
if let Ok(seg) = seg_str.parse::<u32>() {
|
||||||
|
segments.push(new_transcript_segment(uuid, seg, "experience"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// _mined-transcripts-f-{UUID}.{segment}
|
||||||
|
else if let Some(rest) = key.strip_prefix("_mined-transcripts-f-") {
|
||||||
|
if let Some((uuid, seg_str)) = rest.rsplit_once('.') {
|
||||||
|
if let Ok(seg) = seg_str.parse::<u32>() {
|
||||||
|
segments.push(new_transcript_segment(uuid, seg, "experience"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// _facts-{UUID} (whole-file, segment 0)
|
||||||
|
else if let Some(uuid) = key.strip_prefix("_facts-") {
|
||||||
|
if !uuid.contains('-') || uuid.len() < 30 { continue; } // skip non-UUID
|
||||||
|
segments.push(new_transcript_segment(uuid, 0, "fact"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = segments.len();
|
||||||
|
if count > 0 {
|
||||||
|
self.append_transcript_progress(&segments)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Soft-delete the old stub nodes
|
||||||
|
let keys_to_delete: Vec<String> = self.nodes.keys()
|
||||||
|
.filter(|k| k.starts_with("_observed-transcripts-")
|
||||||
|
|| k.starts_with("_mined-transcripts")
|
||||||
|
|| (k.starts_with("_facts-") && !k.contains("fact_mine")))
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for key in &keys_to_delete {
|
||||||
|
if let Some(node) = self.nodes.get_mut(key) {
|
||||||
|
node.deleted = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !keys_to_delete.is_empty() {
|
||||||
|
self.save()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
/// Record visits for a batch of node keys from a successful agent run.
|
/// Record visits for a batch of node keys from a successful agent run.
|
||||||
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
|
pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> {
|
||||||
let visits: Vec<AgentVisit> = node_keys.iter()
|
let visits: Vec<AgentVisit> = node_keys.iter()
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ use crate::memory_capnp;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::os::unix::io::AsRawFd;
|
use std::os::unix::io::AsRawFd;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
@ -442,6 +442,9 @@ pub struct Store {
|
||||||
/// Agent visit tracking: node_key → (agent_type → last_visit_epoch)
|
/// Agent visit tracking: node_key → (agent_type → last_visit_epoch)
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub visits: VisitIndex,
|
pub visits: VisitIndex,
|
||||||
|
/// Transcript mining progress: (transcript_id, segment_index) → set of agents that processed it
|
||||||
|
#[serde(default)]
|
||||||
|
pub transcript_progress: HashMap<(String, u32), HashSet<String>>,
|
||||||
/// Log sizes at load time — used by save() to write correct staleness header.
|
/// Log sizes at load time — used by save() to write correct staleness header.
|
||||||
/// If another writer appended since we loaded, our cache will be marked stale
|
/// If another writer appended since we loaded, our cache will be marked stale
|
||||||
/// (recorded size < actual size), forcing the next reader to replay the log.
|
/// (recorded size < actual size), forcing the next reader to replay the log.
|
||||||
|
|
@ -560,6 +563,36 @@ pub fn new_visit(node_uuid: [u8; 16], node_key: &str, agent: &str, outcome: &str
|
||||||
|
|
||||||
/// Path of the append-only agent-visit log inside the memory directory.
pub(crate) fn visits_path() -> PathBuf { memory_dir().join("visits.capnp") }
|
||||||
|
|
||||||
|
/// Transcript mining progress — tracks which segments have been processed.
/// One record per completed (transcript, segment, agent) mining pass;
/// persisted to the append-only log via the capnp TranscriptSegment schema.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TranscriptSegment {
    pub transcript_id: String, // session UUID (filename stem)
    pub segment_index: u32,    // compaction segment index within the transcript
    pub agent: String,         // e.g. "observation", "experience", "fact"
    pub timestamp: i64,        // unix epoch seconds when mining completed
}
|
||||||
|
|
||||||
|
// Wire the Rust TranscriptSegment struct to its capnp schema counterpart.
// This project macro generates at least `to_capnp` (used by
// append_transcript_progress); field lists are grouped by kind — text,
// uuid, primitive, enum, skipped. NOTE(review): presumably it also
// generates the reader→struct direction — confirm against the macro def.
capnp_message!(TranscriptSegment,
    reader: memory_capnp::transcript_segment::Reader<'_>,
    builder: memory_capnp::transcript_segment::Builder<'_>,
    text: [transcript_id, agent],
    uuid: [],
    prim: [segment_index, timestamp],
    enm: [],
    skip: [],
);
|
||||||
|
|
||||||
|
pub fn new_transcript_segment(transcript_id: &str, segment_index: u32, agent: &str) -> TranscriptSegment {
|
||||||
|
TranscriptSegment {
|
||||||
|
transcript_id: transcript_id.to_string(),
|
||||||
|
segment_index,
|
||||||
|
agent: agent.to_string(),
|
||||||
|
timestamp: now_epoch(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Path of the append-only transcript mining progress log.
pub(crate) fn transcript_progress_path() -> PathBuf { memory_dir().join("transcript-progress.capnp") }
|
||||||
|
|
||||||
/// Create a new relation.
|
/// Create a new relation.
|
||||||
/// Provenance is set from POC_PROVENANCE env var if present, else "manual".
|
/// Provenance is set from POC_PROVENANCE env var if present, else "manual".
|
||||||
pub fn new_relation(
|
pub fn new_relation(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue