From 390b6c6c0a14e35799dd5bbf20019c034956e0b6 Mon Sep 17 00:00:00 2001 From: Kent Overstreet Date: Sun, 5 Apr 2026 01:48:11 -0400 Subject: [PATCH] more reorg --- src/agent/mod.rs | 8 +- src/mind/{observe.rs => log.rs} | 102 ++++++++++++++++- src/mind/mod.rs | 14 +-- .../training.rs => subconscious/learn.rs} | 2 +- src/subconscious/mod.rs | 9 +- src/thalamus/channels.rs | 3 - src/user/log.rs | 107 ------------------ src/user/mod.rs | 5 +- 8 files changed, 117 insertions(+), 133 deletions(-) rename src/mind/{observe.rs => log.rs} (76%) rename src/{agent/training.rs => subconscious/learn.rs} (99%) delete mode 100644 src/user/log.rs diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 900f1cf..97e1aa8 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -17,7 +17,6 @@ pub mod api; pub mod context; pub mod oneshot; pub mod tools; -pub mod training; use std::sync::Arc; use anyhow::Result; @@ -28,8 +27,9 @@ use api::types::{ContentPart, Message, MessageContent, Role}; use context::{ConversationEntry, ContextState, ContextBudget}; use tools::{summarize_args, working_stack}; -use crate::user::log::ConversationLog; +use crate::mind::log::ConversationLog; use crate::user::ui_channel::{ContextSection, SharedContextState, StreamTarget, StatusInfo, UiMessage, UiSender}; +use crate::subconscious::learn; /// Result of a single agent turn. pub struct TurnResult { @@ -570,7 +570,7 @@ impl Agent { } /// Build context state summary for the debug screen. - pub fn context_state_summary(&self, memory_scores: Option<&crate::agent::training::MemoryScore>) -> Vec { + pub fn context_state_summary(&self, memory_scores: Option<&learn::MemoryScore>) -> Vec { let count = |s: &str| self.tokenizer.encode_with_special_tokens(s).len(); let mut sections = Vec::new(); @@ -869,7 +869,7 @@ impl Agent { self.publish_context_state_with_scores(None); } - pub fn publish_context_state_with_scores(&self, memory_scores: Option<&crate::agent::training::MemoryScore>) { + pub fn publish_context_state_with_scores(&self, memory_scores: Option<&learn::MemoryScore>) { let summary = self.context_state_summary(memory_scores); if let Ok(mut dbg) = std::fs::OpenOptions::new().create(true).append(true) .open("/tmp/poc-journal-debug.log") { diff --git a/src/mind/observe.rs b/src/mind/log.rs similarity index 76% rename from src/mind/observe.rs rename to src/mind/log.rs index ff4caf3..de6639a 100644 --- a/src/mind/observe.rs +++ b/src/mind/log.rs @@ -9,13 +9,107 @@ // // The logfile is the history. The socket is the live wire. -use std::path::PathBuf; +use anyhow::{Context, Result}; +use std::fs::{File, OpenOptions}; +use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{broadcast, Mutex}; use crate::user::ui_channel::UiMessage; +use crate::agent::context::ConversationEntry; + +pub struct ConversationLog { + path: PathBuf, +} + +impl ConversationLog { + pub fn new(path: PathBuf) -> Result { + // Ensure parent directory exists + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("creating log dir {}", parent.display()))?; + } + Ok(Self { path }) + } + + /// Append a conversation entry to the log. + pub fn append(&self, entry: &ConversationEntry) -> Result<()> { + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + .with_context(|| format!("opening log {}", self.path.display()))?; + + let line = serde_json::to_string(entry) + .context("serializing entry for log")?; + writeln!(file, "{}", line) + .context("writing to conversation log")?; + Ok(()) + } + + /// Read the tail of the log (last `max_bytes` bytes). + /// Seeks to `file_len - max_bytes`, skips the first partial line, + /// then parses forward. For logs smaller than `max_bytes`, reads everything. + pub fn read_tail(&self, max_bytes: u64) -> Result> { + if !self.path.exists() { + return Ok(Vec::new()); + } + let file = File::open(&self.path) + .with_context(|| format!("opening log {}", self.path.display()))?; + let file_len = file.metadata()?.len(); + let mut reader = BufReader::new(file); + + if file_len > max_bytes { + reader.seek(SeekFrom::Start(file_len - max_bytes))?; + // Skip partial first line + let mut discard = String::new(); + reader.read_line(&mut discard)?; + } + + let mut entries = Vec::new(); + for line in reader.lines() { + let line = line.context("reading log tail")?; + let line = line.trim(); + if line.is_empty() { + continue; + } + // Try ConversationEntry first (new format), fall back to bare Message (old logs) + if let Ok(entry) = serde_json::from_str::(line) { + entries.push(entry); + } + } + Ok(entries) + } + + pub fn path(&self) -> &Path { + &self.path + } + + /// Get the timestamp of the oldest message in the log. + pub fn oldest_timestamp(&self) -> Option> { + let file = File::open(&self.path).ok()?; + let reader = BufReader::new(file); + for line in reader.lines().flatten() { + let line = line.trim().to_string(); + if line.is_empty() { continue; } + if let Ok(entry) = serde_json::from_str::(&line) { + if let Some(ts) = &entry.message().timestamp { + if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) { + return Some(dt.to_utc()); + } + // Try other formats + if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") { + return Some(dt.and_utc()); + } + } + } + } + None + } +} fn format_message(msg: &UiMessage) -> Option { match msg { @@ -115,7 +209,7 @@ pub async fn cmd_read_inner(follow: bool, block: bool, debug: bool) -> anyhow::R ))?; let (reader, _) = stream.into_split(); - let mut reader = BufReader::new(reader); + let mut reader = TokioBufReader::new(reader); let mut line = String::new(); loop { @@ -268,7 +362,7 @@ pub fn start( tokio::spawn(async move { let (reader, mut writer) = stream.into_split(); - let mut reader = BufReader::new(reader); + let mut reader = TokioBufReader::new(reader); let mut input_buf = String::new(); loop { diff --git a/src/mind/mod.rs b/src/mind/mod.rs index bc8dfe9..6a980cb 100644 --- a/src/mind/mod.rs +++ b/src/mind/mod.rs @@ -6,7 +6,7 @@ pub mod dmn; pub mod identity; -pub mod observe; +pub mod log; // consciousness.rs — Session state machine and event loop // @@ -30,7 +30,7 @@ use crate::agent::api::types as api_types; use crate::config::{self, AppConfig, SessionConfig}; use crate::user::{self as tui, HotkeyAction}; use crate::user::ui_channel::{self, ContextInfo, StatusInfo, StreamTarget, UiMessage}; -use crate::user::log; +use crate::subconscious::learn; /// Compaction threshold — context is rebuilt when prompt tokens exceed this. fn compaction_threshold(app: &AppConfig) -> u32 { @@ -76,7 +76,7 @@ pub struct Session { // Subconscious orchestration agent_cycles: crate::subconscious::subconscious::AgentCycleState, /// Latest memory importance scores from full matrix scoring (manual /score). - memory_scores: Option, + memory_scores: Option, /// Whether a full matrix /score task is currently running. scoring_in_flight: bool, } @@ -233,7 +233,7 @@ impl Session { (agent.context.clone(), agent.client_clone()) }; - let result = crate::agent::training::score_memories_incremental( + let result = learn::score_memories_incremental( &context, max_age as i64, response_window, &client, &ui_tx, ).await; @@ -407,7 +407,7 @@ impl Session { let agent = self.agent.clone(); let ui_tx = self.ui_tx.clone(); tokio::spawn(async move { - let result = crate::agent::training::score_memories( + let result = learn::score_memories( &context, &client, &ui_tx, ).await; let agent = agent.lock().await; @@ -805,9 +805,9 @@ pub async fn run(cli: crate::user::CliArgs) -> Result<()> { // Start observation socket let socket_path = session.config.session_dir.join("agent.sock"); - let (observe_input_tx, mut observe_input_rx) = observe::input_channel(); + let (observe_input_tx, mut observe_input_rx) = log::input_channel(); if !no_agents { - observe::start(socket_path, ui_tx.subscribe(), observe_input_tx); + log::start(socket_path, ui_tx.subscribe(), observe_input_tx); } let mut reader = EventStream::new(); diff --git a/src/agent/training.rs b/src/subconscious/learn.rs similarity index 99% rename from src/agent/training.rs rename to src/subconscious/learn.rs index a2203b8..0b57805 100644 --- a/src/agent/training.rs +++ b/src/subconscious/learn.rs @@ -14,7 +14,7 @@ // with high divergence depend on memories the model // hasn't internalized. 2 API calls. -use super::api::ApiClient; +use crate::agent::api::ApiClient; use crate::agent::api::types::*; use crate::agent::context::{ConversationEntry, ContextState}; use crate::user::ui_channel::{UiMessage, UiSender}; diff --git a/src/subconscious/mod.rs b/src/subconscious/mod.rs index 3b047ff..addd15b 100644 --- a/src/subconscious/mod.rs +++ b/src/subconscious/mod.rs @@ -17,11 +17,12 @@ // // The session hook (context injection, agent orchestration) moved to claude/hook. -pub mod subconscious; pub mod api; -pub mod prompts; -pub mod defs; pub mod audit; pub mod consolidate; -pub mod digest; pub mod daemon; +pub mod defs; +pub mod digest; +pub mod learn; +pub mod prompts; +pub mod subconscious; diff --git a/src/thalamus/channels.rs b/src/thalamus/channels.rs index 14cc489..4bc00c7 100644 --- a/src/thalamus/channels.rs +++ b/src/thalamus/channels.rs @@ -6,9 +6,6 @@ // Each daemon socket speaks the channel.capnp protocol. The channel // manager routes by prefix: "irc.#bcachefs" → connects to irc.sock. -use std::collections::HashMap; -use std::path::PathBuf; - use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem}; use futures::AsyncReadExt; use tokio::net::UnixStream; diff --git a/src/user/log.rs b/src/user/log.rs deleted file mode 100644 index 7dfc718..0000000 --- a/src/user/log.rs +++ /dev/null @@ -1,107 +0,0 @@ -// log.rs — Persistent conversation log -// -// Append-only JSONL file that records every message in the conversation. -// This is the permanent record — never truncated, never compacted. -// The in-memory message array is a view into this log; compaction -// builds that view by mixing raw recent messages with journal -// summaries of older ones. -// -// Each line is a JSON-serialized Message with its timestamp. -// The log survives session restarts, compactions, and crashes. - -use anyhow::{Context, Result}; -use std::fs::{File, OpenOptions}; -use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; - -use crate::agent::context::ConversationEntry; - -pub struct ConversationLog { - path: PathBuf, -} - -impl ConversationLog { - pub fn new(path: PathBuf) -> Result { - // Ensure parent directory exists - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("creating log dir {}", parent.display()))?; - } - Ok(Self { path }) - } - - /// Append a conversation entry to the log. - pub fn append(&self, entry: &ConversationEntry) -> Result<()> { - let mut file = OpenOptions::new() - .create(true) - .append(true) - .open(&self.path) - .with_context(|| format!("opening log {}", self.path.display()))?; - - let line = serde_json::to_string(entry) - .context("serializing entry for log")?; - writeln!(file, "{}", line) - .context("writing to conversation log")?; - Ok(()) - } - - /// Read the tail of the log (last `max_bytes` bytes). - /// Seeks to `file_len - max_bytes`, skips the first partial line, - /// then parses forward. For logs smaller than `max_bytes`, reads everything. - pub fn read_tail(&self, max_bytes: u64) -> Result> { - if !self.path.exists() { - return Ok(Vec::new()); - } - let file = File::open(&self.path) - .with_context(|| format!("opening log {}", self.path.display()))?; - let file_len = file.metadata()?.len(); - let mut reader = BufReader::new(file); - - if file_len > max_bytes { - reader.seek(SeekFrom::Start(file_len - max_bytes))?; - // Skip partial first line - let mut discard = String::new(); - reader.read_line(&mut discard)?; - } - - let mut entries = Vec::new(); - for line in reader.lines() { - let line = line.context("reading log tail")?; - let line = line.trim(); - if line.is_empty() { - continue; - } - // Try ConversationEntry first (new format), fall back to bare Message (old logs) - if let Ok(entry) = serde_json::from_str::(line) { - entries.push(entry); - } - } - Ok(entries) - } - - pub fn path(&self) -> &Path { - &self.path - } - - /// Get the timestamp of the oldest message in the log. - pub fn oldest_timestamp(&self) -> Option> { - let file = File::open(&self.path).ok()?; - let reader = BufReader::new(file); - for line in reader.lines().flatten() { - let line = line.trim().to_string(); - if line.is_empty() { continue; } - if let Ok(entry) = serde_json::from_str::(&line) { - if let Some(ts) = &entry.message().timestamp { - if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) { - return Some(dt.to_utc()); - } - // Try other formats - if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(ts, "%Y-%m-%dT%H:%M:%S") { - return Some(dt.and_utc()); - } - } - } - } - None - } -} diff --git a/src/user/mod.rs b/src/user/mod.rs index 97e6a22..b0a8437 100644 --- a/src/user/mod.rs +++ b/src/user/mod.rs @@ -4,7 +4,6 @@ // machine, DMN, identity) lives in mind/. pub mod ui_channel; -pub mod log; pub mod chat; pub mod context; @@ -725,7 +724,7 @@ pub async fn main() { match &cli.command { Some(SubCmd::Read { follow, block }) => { - if let Err(e) = crate::mind::observe::cmd_read_inner(*follow, *block, cli.debug).await { + if let Err(e) = crate::mind::log::cmd_read_inner(*follow, *block, cli.debug).await { eprintln!("{:#}", e); std::process::exit(1); } @@ -737,7 +736,7 @@ pub async fn main() { eprintln!("Usage: consciousness write "); std::process::exit(1); } - if let Err(e) = crate::mind::observe::cmd_write(&msg, cli.debug).await { + if let Err(e) = crate::mind::log::cmd_write(&msg, cli.debug).await { eprintln!("{:#}", e); std::process::exit(1); }