context: tighten timestamp schema; every AstNode has one
Previously NodeLeaf.timestamp and AstNode::Branch.timestamp accepted null or missing via a deserialize_timestamp_or_epoch fallback — legacy entries in conversation.jsonl from before Branch timestamps existed (and from before chrono serialization was wired up) would load with UNIX_EPOCH as a sentinel. Downstream, node_timestamp_ns() returned Option<i64> and callers had to handle None as "old entry, skip." That second filter was silently dropping every candidate in score_finetune_candidates when scoring an older session — the F6 screen showed "0 above threshold" even when max_divergence was orders of magnitude above the threshold, because every entry was failing the None check, not the divergence check. The fix, in three parts: 1. src/bin/fix-timestamps.rs — one-off migration tool that walks a conversation.jsonl, linearly interpolates timestamps for entries stuck at UNIX_EPOCH (using surrounding real timestamps as anchors), propagates to child leaves with per-sibling ns offsets, and bumps any collisions by 1 ns for uniqueness. Ran against the current session's log: 11887 entries, 72289 ns bumps, all unique. 2. context.rs — drop default_timestamp and deserialize_timestamp_or_epoch. NodeLeaf and Branch now require a present non-null timestamp on deserialize. Tests flip from "missing/null → UNIX_EPOCH" to "missing/null → Err." 3. subconscious/learn.rs — node_timestamp_ns now returns i64, not Option<i64>. The matching caller in score_finetune_candidates collapses from a Some/None match to a single trained-set check. mind/log.rs's oldest_timestamp no longer filters UNIX_EPOCH. Every line currently on disk has already been migrated. Going forward, new AstNodes always carry real timestamps (Utc::now() at construction time), so the strict schema is the invariant, not an aspiration. Co-Authored-By: Proof of Concept <poc@bcachefs.org>
This commit is contained in:
parent
77822992c8
commit
080b4f9084
4 changed files with 210 additions and 71 deletions
|
|
@ -85,19 +85,6 @@ pub enum NodeBody {
|
|||
Log(String),
|
||||
}
|
||||
|
||||
fn default_timestamp() -> DateTime<Utc> {
|
||||
DateTime::UNIX_EPOCH
|
||||
}
|
||||
|
||||
/// Deserialize timestamp, treating both missing and null as UNIX_EPOCH.
|
||||
fn deserialize_timestamp_or_epoch<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let opt: Option<DateTime<Utc>> = Option::deserialize(deserializer)?;
|
||||
Ok(opt.unwrap_or(DateTime::UNIX_EPOCH))
|
||||
}
|
||||
|
||||
/// A leaf node: typed content with cached token IDs.
|
||||
/// Token IDs are not serialized — they're recomputed on deserialization.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
|
@ -113,7 +100,6 @@ impl<'de> Deserialize<'de> for NodeLeaf {
|
|||
#[derive(Deserialize)]
|
||||
struct Raw {
|
||||
body: NodeBody,
|
||||
#[serde(default = "default_timestamp", deserialize_with = "deserialize_timestamp_or_epoch")]
|
||||
timestamp: DateTime<Utc>,
|
||||
}
|
||||
let raw = Raw::deserialize(deserializer)?;
|
||||
|
|
@ -133,7 +119,6 @@ pub enum AstNode {
|
|||
Branch {
|
||||
role: Role,
|
||||
children: Vec<AstNode>,
|
||||
#[serde(default = "default_timestamp", deserialize_with = "deserialize_timestamp_or_epoch")]
|
||||
timestamp: DateTime<Utc>,
|
||||
/// Per-response memory attribution from full scoring matrix.
|
||||
/// Maps memory key → divergence score for this response.
|
||||
|
|
@ -1363,45 +1348,31 @@ mod tests {
|
|||
// -- Timestamp deserialization tests ------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn test_timestamp_null_becomes_epoch() {
|
||||
// Old conversation.jsonl entries have "timestamp":null
|
||||
// serde(default) only handles missing fields, not explicit nulls.
|
||||
// We need to verify our deserialize handles this correctly.
|
||||
fn test_timestamp_null_rejected() {
|
||||
// Missing/null timestamps used to be accepted via a lenient
|
||||
// deserialize fallback. Post-migration the schema is strict.
|
||||
let json = r#"{"Leaf":{"body":{"Content":"hello"},"timestamp":null}}"#;
|
||||
let node: AstNode = serde_json::from_str(json).unwrap();
|
||||
let leaf = node.leaf().unwrap();
|
||||
assert_eq!(leaf.timestamp(), DateTime::<Utc>::UNIX_EPOCH);
|
||||
assert!(serde_json::from_str::<AstNode>(json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timestamp_missing_becomes_epoch() {
|
||||
fn test_timestamp_missing_rejected() {
|
||||
let json = r#"{"Leaf":{"body":{"Content":"hello"}}}"#;
|
||||
assert!(serde_json::from_str::<AstNode>(json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_branch_timestamp_missing_rejected() {
|
||||
let json = r#"{"Branch":{"role":"User","children":[]}}"#;
|
||||
assert!(serde_json::from_str::<AstNode>(json).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timestamp_present_accepted() {
|
||||
let json = r#"{"Leaf":{"body":{"Content":"hi"},"timestamp":"2026-04-16T12:00:00Z"}}"#;
|
||||
let node: AstNode = serde_json::from_str(json).unwrap();
|
||||
let leaf = node.leaf().unwrap();
|
||||
assert_eq!(leaf.timestamp().to_rfc3339(), "2026-04-16T12:00:00+00:00");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_branch_timestamp_null_becomes_epoch() {
|
||||
let json = r#"{"Branch":{"role":"User","children":[{"Leaf":{"body":{"Content":"hi"}}}],"timestamp":null}}"#;
|
||||
let node: AstNode = serde_json::from_str(json).unwrap();
|
||||
match node {
|
||||
AstNode::Branch { timestamp, .. } => {
|
||||
assert_eq!(timestamp, DateTime::<Utc>::UNIX_EPOCH);
|
||||
}
|
||||
_ => panic!("expected Branch"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_branch_timestamp_missing_becomes_epoch() {
|
||||
let json = r#"{"Branch":{"role":"User","children":[{"Leaf":{"body":{"Content":"hi"}}}]}}"#;
|
||||
let node: AstNode = serde_json::from_str(json).unwrap();
|
||||
match node {
|
||||
AstNode::Branch { timestamp, .. } => {
|
||||
assert_eq!(timestamp, DateTime::<Utc>::UNIX_EPOCH);
|
||||
}
|
||||
_ => panic!("expected Branch"),
|
||||
}
|
||||
assert_eq!(leaf.timestamp().to_rfc3339(),
|
||||
"2026-04-16T12:00:00+00:00");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
180
src/bin/fix-timestamps.rs
Normal file
180
src/bin/fix-timestamps.rs
Normal file
|
|
@ -0,0 +1,180 @@
|
|||
// fix-timestamps: One-off migration for ~/.consciousness/agent-sessions/
|
||||
// conversation.jsonl.
|
||||
//
|
||||
// Before Branch nodes carried their own timestamps, early entries were
|
||||
// serialized with missing/null timestamp fields — they deserialize as
|
||||
// UNIX_EPOCH via the (now-to-be-removed) deserialize_timestamp_or_epoch
|
||||
// fallback. Training needs every entry to have a unique timestamp to
|
||||
// dedup already-trained responses.
|
||||
//
|
||||
// Walks the file, synthesizes timestamps for any entry stuck at
|
||||
// UNIX_EPOCH by linear interpolation between surrounding real
|
||||
// timestamps. For child leaves inside a Branch, derives timestamps
|
||||
// from the parent with a tiny per-child offset.
|
||||
//
|
||||
// SAFETY: reads from argv[1], writes to argv[1].tmp, renames into
|
||||
// place. Keep a .bak copy before running.
|
||||
//
|
||||
// Usage: fix-timestamps <path-to-conversation.jsonl>
|
||||
|
||||
use std::io::{BufRead, BufReader, BufWriter, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
|
||||
use consciousness::agent::context::AstNode;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let path: PathBuf = std::env::args().nth(1)
|
||||
.context("usage: fix-timestamps <path>")?.into();
|
||||
|
||||
let f = std::fs::File::open(&path)
|
||||
.with_context(|| format!("open {}", path.display()))?;
|
||||
let reader = BufReader::new(f);
|
||||
|
||||
let mut nodes: Vec<AstNode> = Vec::new();
|
||||
for (i, line) in reader.lines().enumerate() {
|
||||
let line = line?;
|
||||
if line.trim().is_empty() { continue; }
|
||||
let node: AstNode = serde_json::from_str(&line)
|
||||
.with_context(|| format!("line {}: parse", i + 1))?;
|
||||
nodes.push(node);
|
||||
}
|
||||
println!("read {} entries", nodes.len());
|
||||
|
||||
fix_top_level_timestamps(&mut nodes);
|
||||
for node in &mut nodes {
|
||||
propagate_to_children(node);
|
||||
}
|
||||
|
||||
// Ensure uniqueness — real timestamps can collide when two entries
|
||||
// were written in the same ns; synthesized ones can also overlap.
|
||||
// Bump colliding ns by 1 until unique.
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
let mut bumps = 0usize;
|
||||
for (i, node) in nodes.iter_mut().enumerate() {
|
||||
let ts = top_ts(node);
|
||||
assert!(ts > DateTime::<Utc>::UNIX_EPOCH,
|
||||
"entry {}: still UNIX_EPOCH", i);
|
||||
let mut ns = ts.timestamp_nanos_opt().expect("ts in i64 ns range");
|
||||
let mut bumped = false;
|
||||
while !seen.insert(ns) {
|
||||
ns += 1;
|
||||
bumped = true;
|
||||
bumps += 1;
|
||||
}
|
||||
if bumped {
|
||||
set_top_ts(node, DateTime::<Utc>::from_timestamp_nanos(ns));
|
||||
}
|
||||
}
|
||||
println!("all {} timestamps real and unique ({} ns bumps)",
|
||||
nodes.len(), bumps);
|
||||
|
||||
let tmp = path.with_extension("jsonl.tmp");
|
||||
{
|
||||
let f = std::fs::File::create(&tmp)
|
||||
.with_context(|| format!("create {}", tmp.display()))?;
|
||||
let mut w = BufWriter::new(f);
|
||||
for node in &nodes {
|
||||
serde_json::to_writer(&mut w, node)?;
|
||||
w.write_all(b"\n")?;
|
||||
}
|
||||
w.flush()?;
|
||||
}
|
||||
std::fs::rename(&tmp, &path)
|
||||
.with_context(|| format!("rename {} -> {}", tmp.display(), path.display()))?;
|
||||
println!("wrote {}", path.display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn top_ts(node: &AstNode) -> DateTime<Utc> {
|
||||
match node {
|
||||
AstNode::Leaf(leaf) => leaf.timestamp(),
|
||||
AstNode::Branch { timestamp, .. } => *timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_top_ts(node: &mut AstNode, ts: DateTime<Utc>) {
|
||||
match node {
|
||||
AstNode::Leaf(leaf) => *leaf = leaf.clone().with_timestamp(ts),
|
||||
AstNode::Branch { timestamp, .. } => *timestamp = ts,
|
||||
}
|
||||
}
|
||||
|
||||
/// Fill in missing top-level timestamps. Strategy:
/// - If two real timestamps bracket a run of missing ones, linearly
///   interpolate between them.
/// - If missing ones precede the first real one, back-fill using
///   (first_real - N·1µs).
/// - If missing ones follow the last real one, forward-fill.
/// - If no real timestamps exist at all, synthesize from now() going
///   backwards.
///
/// Synthesized values may still collide with each other or with real
/// timestamps; the ns-bump uniqueness pass in main() resolves that.
fn fix_top_level_timestamps(nodes: &mut [AstNode]) {
    // Index + timestamp of every entry that already carries a real
    // (non-epoch) timestamp. Sorted by index by construction, which is
    // what lets find_bracket below binary-search it.
    let real: Vec<(usize, DateTime<Utc>)> = nodes.iter().enumerate()
        .filter(|(_, n)| top_ts(n) > DateTime::<Utc>::UNIX_EPOCH)
        .map(|(i, n)| (i, top_ts(n)))
        .collect();

    if real.is_empty() {
        // No anchors at all: fabricate a strictly increasing 1µs-spaced
        // ramp ending just before now().
        let now = Utc::now();
        let len = nodes.len();
        for (i, node) in nodes.iter_mut().enumerate() {
            let ts = now - Duration::microseconds((len - i) as i64);
            set_top_ts(node, ts);
        }
        return;
    }

    // Helper: bisect real[] for the nearest real entries around idx.
    // Returns (prior, next); either side is None when idx falls before
    // the first / after the last real entry.
    let find_bracket = |idx: usize| -> (Option<(usize, DateTime<Utc>)>,
                                        Option<(usize, DateTime<Utc>)>) {
        let pos = real.binary_search_by_key(&idx, |(i, _)| *i);
        let (prior_pos, next_pos) = match pos {
            Ok(p) => (Some(p), Some(p)),
            Err(p) => (
                if p == 0 { None } else { Some(p - 1) },
                if p >= real.len() { None } else { Some(p) },
            ),
        };
        (prior_pos.map(|p| real[p]), next_pos.map(|p| real[p]))
    };

    for i in 0..nodes.len() {
        // Only entries stuck at the epoch sentinel need fixing.
        if top_ts(&nodes[i]) > DateTime::<Utc>::UNIX_EPOCH {
            continue;
        }
        let (prior, next) = find_bracket(i);
        let new_ts = match (prior, next) {
            (Some((pi, pt)), Some((ni, nt))) if pi != ni => {
                // Linear interpolate.
                // num_nanoseconds overflows i64 for spans beyond ~292
                // years; the unwrap_or(0) fallback then collapses the
                // run onto pt, and the uniqueness pass separates them.
                // NOTE(review): span_ns * (i - pi) could itself overflow
                // for huge spans — acceptable for this one-off tool.
                let span_ns = (nt - pt).num_nanoseconds().unwrap_or(0);
                let offset_ns = span_ns * (i - pi) as i64 / (ni - pi) as i64;
                pt + Duration::nanoseconds(offset_ns)
            }
            // Forward-fill past the last real anchor (also defensively
            // absorbs the pi == ni case excluded by the guard above —
            // for an epoch entry the bisect always returns Err, so the
            // brackets are distinct in practice).
            (Some((pi, pt)), _) => {
                pt + Duration::microseconds((i - pi) as i64)
            }
            // Back-fill before the first real anchor.
            (None, Some((ni, nt))) => {
                nt - Duration::microseconds((ni - i) as i64)
            }
            // real is non-empty, so at least one side always exists.
            (None, None) => unreachable!(),
        };
        set_top_ts(&mut nodes[i], new_ts);
    }
}
|
||||
|
||||
/// For every Branch, ensure each child Leaf has a timestamp. If missing,
|
||||
/// use parent.ts + child_idx·1ns so siblings stay unique but close.
|
||||
fn propagate_to_children(node: &mut AstNode) {
|
||||
if let AstNode::Branch { timestamp, children, .. } = node {
|
||||
let parent_ts = *timestamp;
|
||||
for (ci, child) in children.iter_mut().enumerate() {
|
||||
if top_ts(child) <= DateTime::<Utc>::UNIX_EPOCH {
|
||||
set_top_ts(child, parent_ts + Duration::nanoseconds(ci as i64));
|
||||
}
|
||||
propagate_to_children(child);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -55,17 +55,13 @@ impl ConversationLog {
|
|||
}
|
||||
|
||||
pub fn oldest_timestamp(&self) -> Option<chrono::DateTime<chrono::Utc>> {
|
||||
// Read forward from the start to find first non-epoch timestamp
|
||||
let file = File::open(&self.path).ok()?;
|
||||
let mmap = unsafe { Mmap::map(&file).ok()? };
|
||||
for line in mmap.split(|&b| b == b'\n') {
|
||||
if line.is_empty() { continue; }
|
||||
if let Ok(node) = serde_json::from_slice::<AstNode>(line) {
|
||||
if let Some(leaf) = node.leaf() {
|
||||
let ts = leaf.timestamp();
|
||||
if ts != chrono::DateTime::UNIX_EPOCH {
|
||||
return Some(ts);
|
||||
}
|
||||
return Some(leaf.timestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -516,16 +516,11 @@ pub async fn score_finetune_candidates(
|
|||
|
||||
let node = &entries[entry_idx];
|
||||
|
||||
// Get timestamp and skip if already trained
|
||||
let timestamp_ns = match node_timestamp_ns(node) {
|
||||
Some(ts) => {
|
||||
if trained.contains(&ts) {
|
||||
continue; // Already trained, skip
|
||||
// Skip if already trained on.
|
||||
let timestamp_ns = node_timestamp_ns(node);
|
||||
if trained.contains(&timestamp_ns) {
|
||||
continue;
|
||||
}
|
||||
ts
|
||||
}
|
||||
None => continue, // No timestamp, skip
|
||||
};
|
||||
|
||||
// Extract response text
|
||||
let response_text = match node {
|
||||
|
|
@ -661,18 +656,15 @@ pub fn mark_trained(timestamp_ns: i64) {
|
|||
}
|
||||
|
||||
/// Get timestamp in nanoseconds from an AstNode.
|
||||
/// Returns None for entries with default UNIX_EPOCH timestamp (old data)
|
||||
/// or timestamps outside the representable nano range (pre-1677 or post-2262).
|
||||
pub fn node_timestamp_ns(node: &AstNode) -> Option<i64> {
|
||||
/// i64-ns representation covers 1677..2262 via chrono; timestamps
|
||||
/// outside that window would be bugs we'd want to surface, hence panic.
|
||||
pub fn node_timestamp_ns(node: &AstNode) -> i64 {
|
||||
let ts = match node {
|
||||
AstNode::Leaf(leaf) => leaf.timestamp(),
|
||||
AstNode::Branch { timestamp, .. } => *timestamp,
|
||||
};
|
||||
if ts == chrono::DateTime::UNIX_EPOCH {
|
||||
None // Old entry without real timestamp
|
||||
} else {
|
||||
ts.timestamp_nanos_opt()
|
||||
}
|
||||
.expect("timestamp outside i64-ns representable range (1677..2262)")
|
||||
}
|
||||
|
||||
// ── Training API ────────────────────────────────────────────────
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue