diff --git a/poc-memory/schema/memory.capnp b/poc-memory/schema/memory.capnp index 4b71faa..87be4c2 100644 --- a/poc-memory/schema/memory.capnp +++ b/poc-memory/schema/memory.capnp @@ -122,3 +122,18 @@ struct AgentVisit { struct AgentVisitLog { visits @0 :List(AgentVisit); } + +# Transcript mining progress — separate append-only log. +# Tracks which segments of which transcripts have been processed, +# by which agent, so we never re-mine the same content. + +struct TranscriptSegment { + transcriptId @0 :Text; # session UUID (filename stem) + segmentIndex @1 :UInt32; # compaction segment index within transcript + agent @2 :Text; # "observation", "experience", "fact" + timestamp @3 :Int64; # unix epoch seconds when mining completed +} + +struct TranscriptProgressLog { + segments @0 :List(TranscriptSegment); +} diff --git a/poc-memory/src/main.rs b/poc-memory/src/main.rs index bf479d9..d5b5157 100644 --- a/poc-memory/src/main.rs +++ b/poc-memory/src/main.rs @@ -643,6 +643,9 @@ enum AdminCmd { /// Date (default: today) date: Option, }, + /// Migrate transcript stub nodes to progress log + #[command(name = "migrate-transcript-progress")] + MigrateTranscriptProgress, } #[derive(Subcommand)] @@ -817,6 +820,12 @@ fn main() { AdminCmd::Params => cli::misc::cmd_params(), AdminCmd::LookupBump { keys } => cli::node::cmd_lookup_bump(&keys), AdminCmd::Lookups { date } => cli::node::cmd_lookups(date.as_deref()), + AdminCmd::MigrateTranscriptProgress => (|| -> Result<(), String> { + let mut store = store::Store::load()?; + let count = store.migrate_transcript_progress()?; + println!("Migrated {} transcript segment markers", count); + Ok(()) + })() }, }; diff --git a/poc-memory/src/store/persist.rs b/poc-memory/src/store/persist.rs index ca608c7..17895fe 100644 --- a/poc-memory/src/store/persist.rs +++ b/poc-memory/src/store/persist.rs @@ -35,6 +35,10 @@ impl Store { if visits_p.exists() { store.replay_visits(&visits_p).ok(); } + let tp_p = transcript_progress_path(); + if tp_p.exists() { + store.replay_transcript_progress(&tp_p).ok(); + } return Ok(store); }, Ok(None) => {}, @@ -87,6 +91,10 @@ impl Store { if visits_p.exists() { store.replay_visits(&visits_p)?; } + let tp_p = transcript_progress_path(); + if tp_p.exists() { + store.replay_transcript_progress(&tp_p)?; + } // Record log sizes after replay — this is the state we reflect store.loaded_nodes_size = fs::metadata(&nodes_p).map(|m| m.len()).unwrap_or(0); @@ -119,6 +127,10 @@ impl Store { if visits_p.exists() { store.replay_visits(&visits_p)?; } + let tp_p = transcript_progress_path(); + if tp_p.exists() { + store.replay_transcript_progress(&tp_p)?; + } Ok(store) } @@ -424,6 +436,157 @@ impl Store { Ok(()) } + /// Append transcript segment progress records. + pub fn append_transcript_progress(&mut self, segments: &[TranscriptSegment]) -> Result<(), String> { + if segments.is_empty() { return Ok(()); } + + let mut msg = message::Builder::new_default(); + { + let log = msg.init_root::(); + let mut list = log.init_segments(segments.len() as u32); + for (i, seg) in segments.iter().enumerate() { + seg.to_capnp(list.reborrow().get(i as u32)); + } + } + let mut buf = Vec::new(); + serialize::write_message(&mut buf, &msg) + .map_err(|e| format!("serialize transcript progress: {}", e))?; + + let path = transcript_progress_path(); + let file = fs::OpenOptions::new() + .create(true).append(true).open(&path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + use std::io::Write; + (&file).write_all(&buf) + .map_err(|e| format!("write transcript progress: {}", e))?; + + // Update in-memory index + for seg in segments { + self.transcript_progress + .entry((seg.transcript_id.clone(), seg.segment_index)) + .or_default() + .insert(seg.agent.clone()); + } + + Ok(()) + } + + /// Replay transcript progress log to rebuild in-memory index. + fn replay_transcript_progress(&mut self, path: &Path) -> Result<(), String> { + let file = fs::File::open(path) + .map_err(|e| format!("open {}: {}", path.display(), e))?; + let mut reader = BufReader::new(file); + + while reader.stream_position().map_err(|e| e.to_string())? + < fs::metadata(path).map_err(|e| e.to_string())?.len() + { + let msg = match serialize::read_message(&mut reader, Default::default()) { + Ok(m) => m, + Err(_) => break, + }; + let log = msg.get_root::() + .map_err(|e| format!("read transcript progress: {}", e))?; + + for seg in log.get_segments().map_err(|e| e.to_string())? { + let id = seg.get_transcript_id().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("") + .to_string(); + let agent = seg.get_agent().ok() + .and_then(|t| t.to_str().ok()) + .unwrap_or("") + .to_string(); + let idx = seg.get_segment_index(); + + if !id.is_empty() && !agent.is_empty() { + self.transcript_progress + .entry((id, idx)) + .or_default() + .insert(agent); + } + } + } + Ok(()) + } + + /// Check if a transcript segment has been processed by a given agent. + pub fn is_segment_mined(&self, transcript_id: &str, segment_index: u32, agent: &str) -> bool { + self.transcript_progress + .get(&(transcript_id.to_string(), segment_index)) + .map_or(false, |agents| agents.contains(agent)) + } + + /// Mark a transcript segment as successfully processed. + pub fn mark_segment_mined(&mut self, transcript_id: &str, segment_index: u32, agent: &str) -> Result<(), String> { + let seg = new_transcript_segment(transcript_id, segment_index, agent); + self.append_transcript_progress(&[seg]) + } + + /// Migrate old stub-node transcript markers into the new progress log. + /// Reads _observed-transcripts-f-*, _mined-transcripts#f-*, and _facts-* keys, + /// extracts transcript_id and segment_index, writes to transcript-progress.capnp, + /// then deletes the stub nodes. + pub fn migrate_transcript_progress(&mut self) -> Result { + let mut segments = Vec::new(); + + for (key, _node) in &self.nodes { + // _observed-transcripts-f-{UUID}.{segment} + if let Some(rest) = key.strip_prefix("_observed-transcripts-f-") { + if let Some((uuid, seg_str)) = rest.rsplit_once('.') { + if let Ok(seg) = seg_str.parse::() { + segments.push(new_transcript_segment(uuid, seg, "observation")); + } + } + } + // _mined-transcripts#f-{UUID}.{segment} + else if let Some(rest) = key.strip_prefix("_mined-transcripts#f-") { + if let Some((uuid, seg_str)) = rest.rsplit_once('.') { + if let Ok(seg) = seg_str.parse::() { + segments.push(new_transcript_segment(uuid, seg, "experience")); + } + } + } + // _mined-transcripts-f-{UUID}.{segment} + else if let Some(rest) = key.strip_prefix("_mined-transcripts-f-") { + if let Some((uuid, seg_str)) = rest.rsplit_once('.') { + if let Ok(seg) = seg_str.parse::() { + segments.push(new_transcript_segment(uuid, seg, "experience")); + } + } + } + // _facts-{UUID} (whole-file, segment 0) + else if let Some(uuid) = key.strip_prefix("_facts-") { + if !uuid.contains('-') || uuid.len() < 30 { continue; } // skip non-UUID + segments.push(new_transcript_segment(uuid, 0, "fact")); + } + } + + let count = segments.len(); + if count > 0 { + self.append_transcript_progress(&segments)?; + } + + // Soft-delete the old stub nodes + let keys_to_delete: Vec = self.nodes.keys() + .filter(|k| k.starts_with("_observed-transcripts-") + || k.starts_with("_mined-transcripts") + || (k.starts_with("_facts-") && !k.contains("fact_mine"))) + .cloned() + .collect(); + + for key in &keys_to_delete { + if let Some(node) = self.nodes.get_mut(key) { + node.deleted = true; + } + } + + if !keys_to_delete.is_empty() { + self.save()?; + } + + Ok(count) + } + /// Record visits for a batch of node keys from a successful agent run. pub fn record_agent_visits(&mut self, node_keys: &[String], agent: &str) -> Result<(), String> { let visits: Vec = node_keys.iter() diff --git a/poc-memory/src/store/types.rs b/poc-memory/src/store/types.rs index 2d95a61..6fb4a37 100644 --- a/poc-memory/src/store/types.rs +++ b/poc-memory/src/store/types.rs @@ -8,7 +8,7 @@ use crate::memory_capnp; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs; use std::os::unix::io::AsRawFd; use std::path::PathBuf; @@ -442,6 +442,9 @@ pub struct Store { /// Agent visit tracking: node_key → (agent_type → last_visit_epoch) #[serde(default)] pub visits: VisitIndex, + /// Transcript mining progress: (transcript_id, segment_index) → set of agents that processed it + #[serde(default)] + pub transcript_progress: HashMap<(String, u32), HashSet>, /// Log sizes at load time — used by save() to write correct staleness header. /// If another writer appended since we loaded, our cache will be marked stale /// (recorded size < actual size), forcing the next reader to replay the log. @@ -560,6 +563,36 @@ pub fn new_visit(node_uuid: [u8; 16], node_key: &str, agent: &str, outcome: &str pub(crate) fn visits_path() -> PathBuf { memory_dir().join("visits.capnp") } +/// Transcript mining progress — tracks which segments have been processed +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TranscriptSegment { + pub transcript_id: String, + pub segment_index: u32, + pub agent: String, + pub timestamp: i64, +} + +capnp_message!(TranscriptSegment, + reader: memory_capnp::transcript_segment::Reader<'_>, + builder: memory_capnp::transcript_segment::Builder<'_>, + text: [transcript_id, agent], + uuid: [], + prim: [segment_index, timestamp], + enm: [], + skip: [], +); + +pub fn new_transcript_segment(transcript_id: &str, segment_index: u32, agent: &str) -> TranscriptSegment { + TranscriptSegment { + transcript_id: transcript_id.to_string(), + segment_index, + agent: agent.to_string(), + timestamp: now_epoch(), + } +} + +pub(crate) fn transcript_progress_path() -> PathBuf { memory_dir().join("transcript-progress.capnp") } + /// Create a new relation. /// Provenance is set from POC_PROVENANCE env var if present, else "manual". pub fn new_relation(